-
Notifications
You must be signed in to change notification settings - Fork 9
chore(1416): Publisher cleanup items #1490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
3898ed5
to
43b6e5b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great improvements @jsync-swirlds! Looking forward to seeing more 🙌
...ublisher/src/main/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManager.java
Show resolved
Hide resolved
...ublisher/src/main/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManager.java
Outdated
Show resolved
Hide resolved
...ublisher/src/main/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManager.java
Show resolved
Hide resolved
...ublisher/src/main/java/org/hiero/block/node/stream/publisher/LiveStreamPublisherManager.java
Show resolved
Hide resolved
...e/stream-publisher/src/main/java/org/hiero/block/node/stream/publisher/PublisherHandler.java
Outdated
Show resolved
Hide resolved
9b30363
to
1183720
Compare
1183720
to
c6385a9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good
/** | ||
* todo(1420) add documentation | ||
*/ | ||
private void sendEndAndReset(final Code endOfStreamCode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: without docs this reads as send End and Reset codes.
Rather we're sending an end and resetting State.
private void sendEndAndReset(final Code endOfStreamCode) { | |
private void sendEndAndResetState(final Code endOfStreamCode) { |
or
private void sendEndAndReset(final Code endOfStreamCode) { | |
private void sendEndThenResetState(final Code endOfStreamCode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest:
private void sendEndAndReset(final Code endOfStreamCode) { | |
private void sendEndAndShutdown(final Code endOfStreamCode) { |
As we do seem to call shutdown there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's impossible to send two codes at once.
That said, sendEndAndResetState
makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's impossible to send two codes at once. That said,
sendEndAndResetState
makes sense.
I do not seem to get what you mean here? We do call shutdown in the method. Shutdown does not send any responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original question was that the name seems to claim it's sending two codes, which I can see, but it's also a non-sequitur, because nothing can send two response codes simultaneously.
I've changed the name to sendEndAndResetState
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* to force all handlers to close their publisher communication channels. | ||
*/ | ||
public void closeCommunication() { | ||
sendEndOfStream(Code.SUCCESS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should shutdown()
here as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The important thing is to probably call onComplete()
, the manager will clear the rest (remove handler) since it's shutting down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we aim to call the other method instead? It will shutdown, which will call onComplete
as well (since #1497).
sendEndOfStream(Code.SUCCESS); | |
sendEndAndReset(Code.SUCCESS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I very deliberately did not shutdown or call onComplete here, as it creates a race condition with the manager accomplishing much the same tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I see. We should shutdown the connections at some point however.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P.S. please beware that with the introduction of #1497, the shutdown()
method of the handler will call onComplete()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, shutdown should call replies.onShutdown
, just need to be careful with idempotence.
18cc999
to
e4420e7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleanup suggestions
// Cancel the queue forwarder task if it is running. | ||
if (queueForwarderResult != null) { | ||
queueForwarderResult.cancel(true); | ||
queueForwarderResult = null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we cancel the forwarder task first and then proceed to do all shutdowns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shutdowns should happen before the forwarder task ends, so we don't lose already-received-and-completed blocks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks for clarifying. Was wondering, won't that interfere with future acknowledgements however? Cause it is the manager that listens for the notifications. If we shut it down, acks will not happen.
for (final Long nextKey : handlers.keySet()) { | ||
final PublisherHandler value = handlers.remove(nextKey); | ||
if (value != null) { | ||
value.closeCommunication(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be beneficial to fork all of these close communications, after all, we take time to attempt to send responses? (Parallel stream)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly, but we have to wait for the full set to complete.
As this is part of shutdown, it isn't strictly necessary to be high performance and correctness is perhaps more imporant.
We may want to revisit this in follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted.
@@ -472,11 +549,12 @@ private void updateBlockNumbers(final BlockNodeContext serverContext) { | |||
// Always set the last persisted block number, even if there are no | |||
// known blocks. | |||
lastPersistedBlockNumber.set(latestKnownBlock); | |||
NodeConfig nodeConfiguration = serverContext.configuration().getConfigData(NodeConfig.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NodeConfig
seems to not be properly exported. Unit tests fail and starting up the node also fails because the config record is not found.
E.G. when running a test:
No config data record available of type 'class org.hiero.block.node.app.config.node.NodeConfig'
java.lang.IllegalArgumentException: No config data record available of type 'class org.hiero.block.node.app.config.node.NodeConfig'
at com.swirlds.config.impl@0.61.3/com.swirlds.config.impl.internal.ConfigDataService.getConfigData(ConfigDataService.java:96)
at com.swirlds.config.impl@0.61.3/com.swirlds.config.impl.internal.ConfigurationImpl.getConfigData(ConfigurationImpl.java:211)
at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManager.updateBlockNumbers(LiveStreamPublisherManager.java:552)
at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManager.<init>(LiveStreamPublisherManager.java:94)
at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManagerTest$FunctionalityTests.setup(LiveStreamPublisherManagerTest.java:165)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
/** | ||
* todo(1420) add documentation | ||
*/ | ||
private void sendEndAndReset(final Code endOfStreamCode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest:
private void sendEndAndReset(final Code endOfStreamCode) { | |
private void sendEndAndShutdown(final Code endOfStreamCode) { |
As we do seem to call shutdown there.
@jsync-swirlds FYI I've rebased this branch on top of main, where we had a big merge conflict. cc: @Nana-EC @AlfredoG87 |
* Implemented proper condition wait for the publisher manager when there are no incoming data items to forward to messaging. * Added metrics for closed blocks (complete and incomplete). * Improved queue transfers * Added a node level configuration, and used that for "earliest managed block". Signed-off-by: Joseph S <121976561+jsync-swirlds@users.noreply.github.com> Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
e4420e7
to
7a083ab
Compare
Reviewer Notes
Related Issue(s)
Partial resolution to #1416.