chore(1416): Publisher cleanup items #1490

Open
wants to merge 1 commit into
base: main

Conversation

jsync-swirlds
Contributor

Reviewer Notes

  • Implemented proper condition wait for the publisher manager when there are no incoming data items to forward to messaging.
  • Added metrics for closed blocks (complete and incomplete).
  • Improved queue transfers.
  • Added a node level configuration, and used that for "earliest managed block".
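
For context on the first note, a condition wait of this kind could look roughly like the sketch below. It is only an illustration under stated assumptions, not the code in this PR: `ForwarderSketch`, `offer`, and `awaitAndDrain` are hypothetical names, and the real manager may use a different primitive or queue type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch; not the actual LiveStreamPublisherManager code. */
final class ForwarderSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition dataReady = lock.newCondition();
    private final Queue<String> incoming = new ArrayDeque<>();

    /** Called by a handler when a new item arrives. */
    void offer(final String item) {
        lock.lock();
        try {
            incoming.add(item);
            dataReady.signal(); // wake the forwarder if it is parked
        } finally {
            lock.unlock();
        }
    }

    /** Parks until data arrives or the timeout elapses, then drains the queue. */
    List<String> awaitAndDrain(final long timeout, final TimeUnit unit) {
        long nanosLeft = unit.toNanos(timeout);
        lock.lock();
        try {
            // Loop guards against spurious wakeups instead of busy-polling.
            while (incoming.isEmpty() && nanosLeft > 0L) {
                nanosLeft = dataReady.awaitNanos(nanosLeft);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            final List<String> batch = new ArrayList<>(incoming);
            incoming.clear();
            lock.unlock();
            return batchOrEmpty(batch);
        }
    }

    private static List<String> batchOrEmpty(final List<String> batch) {
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        final ForwarderSketch sketch = new ForwarderSketch();
        final Thread producer = new Thread(() -> sketch.offer("block-item-1"));
        producer.start();
        final List<String> batch = sketch.awaitAndDrain(5, TimeUnit.SECONDS);
        producer.join();
        System.out.println(batch);
    }
}
```

The point of the pattern is that the forwarder thread sleeps while the queue is empty and is woken by `signal()` rather than spinning or polling on a fixed interval.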

Related Issue(s)

Partial resolution to #1416.

@jsync-swirlds jsync-swirlds added this to the 0.17.0 milestone Aug 13, 2025
@jsync-swirlds jsync-swirlds self-assigned this Aug 13, 2025
@jsync-swirlds jsync-swirlds added the Publisher Plugin Issue related to Publisher Plugin label Aug 13, 2025
@jsync-swirlds jsync-swirlds force-pushed the 1416-publisher-cleanup branch from 3898ed5 to 43b6e5b Compare August 13, 2025 01:39
Contributor

@ata-nas ata-nas left a comment

Great improvements @jsync-swirlds! Looking forward to seeing more 🙌

@jsync-swirlds jsync-swirlds force-pushed the 1416-publisher-cleanup branch 2 times, most recently from 9b30363 to 1183720 Compare August 13, 2025 23:52
@jsync-swirlds jsync-swirlds marked this pull request as ready for review August 13, 2025 23:53
@jsync-swirlds jsync-swirlds requested review from a team as code owners August 13, 2025 23:53
@jsync-swirlds jsync-swirlds force-pushed the 1416-publisher-cleanup branch from 1183720 to c6385a9 Compare August 13, 2025 23:56
AlfredoG87
AlfredoG87 previously approved these changes Aug 14, 2025
Contributor

@AlfredoG87 AlfredoG87 left a comment

LGTM 👍

Nana-EC
Nana-EC previously approved these changes Aug 15, 2025
Contributor

@Nana-EC Nana-EC left a comment

Looks good

/**
* todo(1420) add documentation
*/
private void sendEndAndReset(final Code endOfStreamCode) {
Contributor

nit: without docs this reads as send End and Reset codes.
Rather we're sending an end and resetting State.

Suggested change
- private void sendEndAndReset(final Code endOfStreamCode) {
+ private void sendEndAndResetState(final Code endOfStreamCode) {

or

Suggested change
- private void sendEndAndReset(final Code endOfStreamCode) {
+ private void sendEndThenResetState(final Code endOfStreamCode) {

Contributor

I'd suggest:

Suggested change
- private void sendEndAndReset(final Code endOfStreamCode) {
+ private void sendEndAndShutdown(final Code endOfStreamCode) {

As we do seem to call shutdown there.

Contributor Author

It's impossible to send two codes at once.
That said, sendEndAndResetState makes sense.

Contributor

> It's impossible to send two codes at once. That said, sendEndAndResetState makes sense.

I don't follow what you mean here. We do call shutdown in the method, and shutdown does not send any responses.

Contributor Author

The original concern was that the name seems to claim it sends two codes. I can see that reading, but it is also a non sequitur, because nothing can send two response codes simultaneously.

I've changed the name to sendEndAndResetState.

Contributor

@ata-nas ata-nas left a comment

We need the changes from #1497 and we need to fix non-compiling tests. Will rebase after #1497 merges.

* to force all handlers to close their publisher communication channels.
*/
public void closeCommunication() {
sendEndOfStream(Code.SUCCESS);
Contributor

Should we call shutdown() here as well?

Contributor

The important thing is probably to call onComplete(); the manager will clear the rest (remove the handler) since it is shutting down.

Contributor

@ata-nas ata-nas Aug 18, 2025

Maybe we should call the other method instead? It will shut down, which will also call onComplete (since #1497).

Suggested change
- sendEndOfStream(Code.SUCCESS);
+ sendEndAndReset(Code.SUCCESS);

Contributor Author

I very deliberately did not shut down or call onComplete here, as doing so creates a race condition with the manager, which accomplishes much the same tasks.

Contributor

Ah, I see. We should shut down the connections at some point, however.

Contributor

P.S. please beware that with the introduction of #1497, the shutdown() method of the handler will call onComplete().

Contributor Author

Yes, shutdown should call replies.onShutdown, just need to be careful with idempotence.
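
One common way to get the idempotence mentioned above is a compare-and-set guard, so that whichever of the handler or the manager reaches shutdown first does the work and the other call becomes a no-op. The sketch below is an assumption about how that could look, not the handler code in this PR; `IdempotentHandlerSketch` and `completionCount` are hypothetical, and the counter stands in for the real completion call on the reply pipeline.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of an idempotent handler shutdown. */
final class IdempotentHandlerSketch {
    private final AtomicBoolean shutDown = new AtomicBoolean(false);
    private final AtomicInteger completions = new AtomicInteger();

    /** Returns true only for the single caller that actually performed the shutdown. */
    boolean shutdown() {
        if (!shutDown.compareAndSet(false, true)) {
            return false; // someone else already shut this handler down
        }
        completions.incrementAndGet(); // stand-in for completing the reply stream
        return true;
    }

    int completionCount() {
        return completions.get();
    }

    public static void main(String[] args) throws InterruptedException {
        final IdempotentHandlerSketch handler = new IdempotentHandlerSketch();
        // Two racing callers, e.g. the manager and the handler itself.
        final Thread manager = new Thread(handler::shutdown);
        final Thread self = new Thread(handler::shutdown);
        manager.start();
        self.start();
        manager.join();
        self.join();
        System.out.println(handler.completionCount()); // prints 1
    }
}
```

With a guard like this, it matters less which side calls shutdown, because the completion signal is sent at most once.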

@ata-nas ata-nas dismissed stale reviews from Nana-EC and AlfredoG87 via 18cc999 August 18, 2025 08:09
@ata-nas ata-nas force-pushed the 1416-publisher-cleanup branch 2 times, most recently from 18cc999 to e4420e7 Compare August 18, 2025 08:19
Contributor

@ata-nas ata-nas left a comment

Cleanup suggestions

Comment on lines +177 to +181
// Cancel the queue forwarder task if it is running.
if (queueForwarderResult != null) {
    queueForwarderResult.cancel(true);
    queueForwarderResult = null;
}
Contributor

Shouldn't we cancel the forwarder task first and then proceed to do all shutdowns?

Contributor Author

Shutdowns should happen before the forwarder task ends, so we don't lose blocks that were already received and completed.

Contributor

@ata-nas ata-nas Aug 18, 2025

I see, thanks for clarifying. Won't that interfere with future acknowledgements, though? The manager is what listens for the notifications; if we shut it down, acks will not happen.
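
To make the ordering argument in this thread concrete, here is a rough sketch: handlers hand over their completed blocks while the forwarder is still alive, and only then is the forwarder cancelled. Everything in it is hypothetical (`ShutdownOrderSketch`, the string "blocks", the shared queue), and the final `drainTo` after the forwarder stops is an added assumption to keep the example deterministic, not something this PR is known to do.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "close handlers first, cancel the forwarder last". */
final class ShutdownOrderSketch {
    final BlockingQueue<String> completedBlocks = new LinkedBlockingQueue<>();
    final List<String> forwarded = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private Future<?> forwarderResult;

    void start() {
        forwarderResult = executor.submit(() -> {
            try {
                while (true) {
                    forwarded.add(completedBlocks.take()); // forward one block
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled: stop forwarding
            }
        });
    }

    void shutdown(final List<String> pendingFromHandlers) {
        // 1. Close handlers first: their completed blocks reach the queue.
        completedBlocks.addAll(pendingFromHandlers);
        // 2. Only then cancel the forwarder task.
        if (forwarderResult != null) {
            forwarderResult.cancel(true);
            forwarderResult = null;
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 3. Assumption: forward anything the cancelled task left behind.
        completedBlocks.drainTo(forwarded);
    }

    public static void main(String[] args) {
        final ShutdownOrderSketch sketch = new ShutdownOrderSketch();
        sketch.start();
        sketch.completedBlocks.add("block-1");
        sketch.shutdown(List.of("block-2", "block-3"));
        System.out.println(sketch.forwarded);
    }
}
```

If the forwarder were cancelled before step 1, blocks handed over by closing handlers would have nothing left to forward them, which is the loss the ordering is meant to avoid.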

Comment on lines +168 to +173
for (final Long nextKey : handlers.keySet()) {
    final PublisherHandler value = handlers.remove(nextKey);
    if (value != null) {
        value.closeCommunication();
    }
}
Contributor

Would it be beneficial to fork all of these closeCommunication calls (for example, with a parallel stream)? After all, each one takes time attempting to send responses.

Contributor Author

@jsync-swirlds jsync-swirlds Aug 18, 2025

Possibly, but we have to wait for the full set to complete.
As this is part of shutdown, it isn't strictly necessary for it to be high performance, and correctness is perhaps more important.
We may want to revisit this in a follow-up.
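
If this is revisited, one possible shape for a forked close that still waits for the full set is sketched below. It is a sketch under assumptions, not a proposal for this PR: `ParallelCloseSketch` and its nested `Handler` interface are hypothetical stand-ins for the manager and `PublisherHandler`, and the executor is supplied by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of closing all handlers in parallel and waiting for all of them. */
final class ParallelCloseSketch {
    /** Stand-in for PublisherHandler; only the method used here is modelled. */
    interface Handler {
        void closeCommunication();
    }

    static void closeAll(final ConcurrentMap<Long, Handler> handlers, final ExecutorService executor) {
        final List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (final Long nextKey : handlers.keySet()) {
            final Handler value = handlers.remove(nextKey);
            if (value != null) {
                // Fork each close so slow responses do not serialize shutdown.
                pending.add(CompletableFuture.runAsync(value::closeCommunication, executor));
            }
        }
        // Block until the full set has completed before shutdown continues.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }

    public static void main(String[] args) {
        final ConcurrentMap<Long, Handler> handlers = new ConcurrentHashMap<>();
        final AtomicInteger closed = new AtomicInteger();
        for (long id = 0; id < 3; id++) {
            handlers.put(id, closed::incrementAndGet);
        }
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        closeAll(handlers, executor);
        executor.shutdown();
        System.out.println(closed.get()); // prints 3
    }
}
```

Note that `join()` rethrows the first failure as a `CompletionException`, so a real version would need to decide how one failed close should affect the rest of shutdown.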

Contributor

Noted.

@@ -472,11 +549,12 @@ private void updateBlockNumbers(final BlockNodeContext serverContext) {
// Always set the last persisted block number, even if there are no
// known blocks.
lastPersistedBlockNumber.set(latestKnownBlock);
NodeConfig nodeConfiguration = serverContext.configuration().getConfigData(NodeConfig.class);
Contributor

The NodeConfig does not seem to be properly exported. Unit tests fail, and starting up the node also fails, because the config record is not found.

For example, when running a test:

No config data record available of type 'class org.hiero.block.node.app.config.node.NodeConfig'
java.lang.IllegalArgumentException: No config data record available of type 'class org.hiero.block.node.app.config.node.NodeConfig'
	at com.swirlds.config.impl@0.61.3/com.swirlds.config.impl.internal.ConfigDataService.getConfigData(ConfigDataService.java:96)
	at com.swirlds.config.impl@0.61.3/com.swirlds.config.impl.internal.ConfigurationImpl.getConfigData(ConfigurationImpl.java:211)
	at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManager.updateBlockNumbers(LiveStreamPublisherManager.java:552)
	at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManager.<init>(LiveStreamPublisherManager.java:94)
	at org.hiero.block.node.stream.publisher@0.17.0-SNAPSHOT/org.hiero.block.node.stream.publisher.LiveStreamPublisherManagerTest$FunctionalityTests.setup(LiveStreamPublisherManagerTest.java:165)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.Optional.ifPresent(Optional.java:178)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
	at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

Contributor

ata-nas commented Aug 18, 2025

@jsync-swirlds FYI I've rebased this branch on top of main, where we had a big merge conflict.

cc: @Nana-EC @AlfredoG87

* Implemented proper condition wait for the publisher manager when there are no incoming
  data items to forward to messaging.
* Added metrics for closed blocks (complete and incomplete).
* Improved queue transfers
* Added a node level configuration, and used that for "earliest managed block".

Signed-off-by: Joseph S <121976561+jsync-swirlds@users.noreply.github.com>
Signed-off-by: Atanas Atanasov <a.v.atanasov98@gmail.com>
@jsync-swirlds jsync-swirlds force-pushed the 1416-publisher-cleanup branch from e4420e7 to 7a083ab Compare August 18, 2025 22:19
Labels
Publisher Plugin Issue related to Publisher Plugin
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants