Skip to content

Commit 44474e3

Browse files
committed
fix: 1227 subscriber plugin does not close failed streams
* The Helidon plugin throws a `RuntimeException` to indicate a stream that was closed (even if _orderly_). * This change handles that exception by closing the session and ending the thread. * Also removed the "subscription" that is not used in "server streaming" plugins. Signed-off-by: Joseph S <121976561+jsync-swirlds@users.noreply.github.com>
1 parent 3e5fb55 commit 44474e3

File tree

2 files changed

+112
-19
lines changed

2 files changed

+112
-19
lines changed

block-node/stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSession.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.concurrent.BlockingQueue;
1818
import java.util.concurrent.Callable;
1919
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.Flow.Subscription;
2120
import java.util.concurrent.TimeUnit;
2221
import java.util.concurrent.atomic.AtomicBoolean;
2322
import java.util.concurrent.atomic.AtomicLong;
@@ -78,8 +77,6 @@ public class BlockStreamSubscriberSession implements Callable<BlockStreamSubscri
7877
private final CountDownLatch sessionReadyLatch;
7978
/** A flag indicating if the session should be interrupted */
8079
private final AtomicBoolean interruptedStream = new AtomicBoolean(false);
81-
/** The subscription for the GRPC connection with client */
82-
private Subscription subscription;
8380
/** The current block being sent to the client */
8481
private long nextBlockToSend;
8582
/** The latest block received from the live stream. */
@@ -481,17 +478,20 @@ synchronized void close(final SubscribeStreamResponse.Code endStreamResponseCode
481478
sessionReadyLatch.countDown();
482479
LOGGER.log(Level.DEBUG, "Session ready latch was not counted down on close, releasing now");
483480
}
484-
if (!interruptedStream.get()) {
485-
// unregister us from the block messaging system, if we are not registered then this is noop
486-
context.blockMessaging().unregisterBlockItemHandler(liveBlockHandler);
481+
// unregister us from the block messaging system, if we are not registered then this is noop
482+
context.blockMessaging().unregisterBlockItemHandler(liveBlockHandler);
483+
// send an end stream response, if we have a code to set and are not interrupted.
484+
if (!interruptedStream.get() && endStreamResponseCode != null) {
487485
final Builder response =
488486
SubscribeStreamResponseUnparsed.newBuilder().status(endStreamResponseCode);
489487
responsePipeline.onNext(response.build());
490-
responsePipeline.onComplete();
491488
}
492-
if (subscription != null) {
493-
subscription.cancel();
494-
subscription = null;
489+
try {
490+
responsePipeline.onComplete();
491+
} catch (RuntimeException e) {
492+
// If the pipeline cannot be completed, log and suppress this exception.
493+
final String message = "Suppressed client error when \"completing\" stream for client %d%n%s";
494+
LOGGER.log(Level.DEBUG, message.formatted(clientId, e.getMessage()), e);
495495
}
496496
// Break out of the loop that sends blocks to the client, so the thread completes.
497497
interruptedStream.set(true);
@@ -518,7 +518,17 @@ private void sendOneBlockItemSet(final BlockItems nextBatch) {
518518
private void sendOneBlockItemSet(final List<BlockItemUnparsed> blockItems) {
519519
final BlockItemSetUnparsed dataToSend = new BlockItemSetUnparsed(blockItems);
520520
final OneOf<ResponseOneOfType> responseOneOf = new OneOf<>(ResponseOneOfType.BLOCK_ITEMS, dataToSend);
521-
responsePipeline.onNext(new SubscribeStreamResponseUnparsed(responseOneOf));
521+
try {
522+
responsePipeline.onNext(new SubscribeStreamResponseUnparsed(responseOneOf));
523+
} catch (RuntimeException e) {
524+
// If the pipeline is in an error state; close this session.
525+
// Unfortunately this is the "standard" way to end a stream, so log
526+
// at debug rather than emitting noise in the logs.
527+
final String message =
528+
"Client error sending block items for client %d: %s".formatted(clientId, e.getMessage());
529+
LOGGER.log(Level.DEBUG, message, e);
530+
close(null); // cannot send the end stream response, just close the stream.
531+
}
522532
}
523533

524534
// ==== Block Item Handler Class ===========================================

block-node/stream-subscriber/src/test/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSessionTest.java

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.hiero.block.api.SubscribeStreamRequest;
2323
import org.hiero.block.api.SubscribeStreamResponse;
2424
import org.hiero.block.internal.SubscribeStreamResponseUnparsed;
25+
import org.hiero.block.internal.SubscribeStreamResponseUnparsed.ResponseOneOfType;
2526
import org.hiero.block.node.app.fixtures.plugintest.SimpleInMemoryHistoricalBlockFacility;
2627
import org.hiero.block.node.app.fixtures.plugintest.TestBlockMessagingFacility;
2728
import org.hiero.block.node.spi.BlockNodeContext;
@@ -78,7 +79,6 @@ void setUp() {
7879
@Nested
7980
@DisplayName("Streaming Functionality Tests")
8081
class StreamingTests {
81-
8282
/**
8383
* Tests the complete flow of streaming both historical and live blocks.
8484
* Verifies that the session can handle:
@@ -107,7 +107,6 @@ void shouldStreamHistoricalAndLiveBlocksSuccessfully()
107107
sessionReadyLatch.await();
108108
// Phase 1: Historical Block Streaming
109109
final int expectedResponseCount = END_BLOCK - START_BLOCK;
110-
111110
// Phase 2: Live Block Streaming
112111
BlockItems[] liveBatches = createNumberOfSimpleBlockBatches(MAX_AVAILABLE_BLOCK, END_BLOCK + 1);
113112
for (BlockItems next : liveBatches) {
@@ -116,12 +115,8 @@ void shouldStreamHistoricalAndLiveBlocksSuccessfully()
116115
// Wait for everything to complete.
117116
// Note, don't try to wait before this; there are no execution guarantees until
118117
// `get` is called; before that the thread may not run or may be parked indefinitely.
119-
sessionFuture.get(
120-
1L,
121-
TimeUnit.SECONDS); // The timeout doesn't work, for some reason, but it's here because we don't
122-
// have a better alternative.
118+
sessionFuture.get();
123119
}
124-
125120
// Verify final pipeline state
126121
assertThat(responsePipeline.getReceivedResponses()).hasSize(21);
127122
assertThat(responsePipeline.getCompletionCount()).isEqualTo(1);
@@ -148,14 +143,90 @@ void shouldStreamHistoricalBlocksSuccessfully() {
148143
setupHistoricalBlockProvider(MIN_AVAILABLE_BLOCK, MAX_AVAILABLE_BLOCK);
149144
// Execute session
150145
session.call();
151-
152146
// Verify pipeline interactions
153147
assertThat(responsePipeline.getReceivedResponses()).hasSize(11);
154148
assertThat(responsePipeline.getCompletionCount()).isEqualTo(1);
155149
assertThat(responsePipeline.getPipelineErrors()).isEmpty();
156150
}
157151
}
158152

153+
/**
154+
* Tests related to streams ending for the BlockStreamSubscriberSession.
155+
*/
156+
@Nested
157+
@DisplayName("Stream End Tests")
158+
class StreamEndTests {
159+
/**
160+
* Tests the complete flow of streaming both historical and live blocks.
161+
* Verifies that the session can handle:
162+
* 1. Historical block streaming
163+
* 2. Transition to live block streaming
164+
* 3. Proper pipeline interactions
165+
*/
166+
@Test
167+
@DisplayName("Should complete a combined stream when the client exits")
168+
void shouldCompleteWhenStreamClosed() throws InterruptedException, ExecutionException, TimeoutException {
169+
// Setup test parameters
170+
final int MIN_AVAILABLE_BLOCK = 0;
171+
final int MAX_AVAILABLE_BLOCK = 10;
172+
final int START_BLOCK = 0;
173+
final int END_BLOCK = 19;
174+
final ResponsePipeline disconnectedPipeline = new ResponsePipeline(true);
175+
176+
// Initialize session
177+
final SubscribeStreamRequest subscribeStreamRequest = createRequest(START_BLOCK, END_BLOCK);
178+
setupHistoricalBlockProvider(MIN_AVAILABLE_BLOCK, MAX_AVAILABLE_BLOCK + 1);
179+
session = new BlockStreamSubscriberSession(
180+
CLIENT_ID, subscribeStreamRequest, disconnectedPipeline, context, sessionReadyLatch);
181+
try (final ExecutorService sessionExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
182+
Future<BlockStreamSubscriberSession> sessionFuture = sessionExecutor.submit(session);
183+
sessionReadyLatch.await();
184+
// Phase 1: Historical Block Streaming
185+
final int expectedResponseCount = END_BLOCK - START_BLOCK;
186+
// Phase 2: Live Block Streaming
187+
BlockItems[] liveBatches = createNumberOfSimpleBlockBatches(MAX_AVAILABLE_BLOCK, END_BLOCK + 1);
188+
for (BlockItems next : liveBatches) {
189+
blockMessagingFacility.sendBlockItems(next);
190+
}
191+
// Wait for everything to complete.
192+
// Note, don't try to wait before this; there are no execution guarantees until
193+
// `get` is called; before that the thread may not run or may be parked indefinitely.
194+
sessionFuture.get();
195+
}
196+
// Verify final pipeline state
197+
assertThat(disconnectedPipeline.getReceivedResponses()).hasSize(0);
198+
assertThat(disconnectedPipeline.getCompletionCount()).isEqualTo(1);
199+
assertThat(disconnectedPipeline.getPipelineErrors()).isEmpty();
200+
}
201+
202+
/**
203+
* Tests the historical block streaming functionality in isolation.
204+
* Verifies that the session can properly stream blocks from the historical provider.
205+
*/
206+
@Test
207+
@DisplayName("Should complete a stream of historical blocks when the client exits")
208+
void shouldCompleteWhenStreamFails() {
209+
// Setup test parameters
210+
final int MIN_AVAILABLE_BLOCK = 0;
211+
final int MAX_AVAILABLE_BLOCK = 20;
212+
final long START_BLOCK = 1L;
213+
final long END_BLOCK = 10L;
214+
final ResponsePipeline disconnectedPipeline = new ResponsePipeline(true);
215+
216+
// Initialize session
217+
final SubscribeStreamRequest subscribeStreamRequest = createRequest(START_BLOCK, END_BLOCK);
218+
session = new BlockStreamSubscriberSession(
219+
CLIENT_ID, subscribeStreamRequest, disconnectedPipeline, context, sessionReadyLatch);
220+
setupHistoricalBlockProvider(MIN_AVAILABLE_BLOCK, MAX_AVAILABLE_BLOCK);
221+
// Execute session
222+
session.call();
223+
// Verify pipeline interactions
224+
assertThat(disconnectedPipeline.getReceivedResponses()).hasSize(0);
225+
assertThat(disconnectedPipeline.getCompletionCount()).isEqualTo(1);
226+
assertThat(disconnectedPipeline.getPipelineErrors()).isEmpty();
227+
}
228+
}
229+
159230
/**
160231
* Sets up the historical block provider with the specified block range.
161232
*
@@ -195,8 +266,17 @@ private static class ResponsePipeline implements Pipeline<SubscribeStreamRespons
195266
private final List<SubscribeStreamResponse> receivedResponses = new ArrayList<>();
196267

197268
private final List<Throwable> pipelineErrors = new ArrayList<>();
269+
private final boolean throwOnNext;
198270
private int completionCount = 0;
199271

272+
public ResponsePipeline() {
273+
this.throwOnNext = false;
274+
}
275+
276+
public ResponsePipeline(final boolean throwOnNext) {
277+
this.throwOnNext = throwOnNext;
278+
}
279+
200280
public List<SubscribeStreamResponse> getReceivedResponses() {
201281
return receivedResponses;
202282
}
@@ -217,6 +297,9 @@ public void onSubscribe(Flow.Subscription subscription) {}
217297

218298
@Override
219299
public void onNext(SubscribeStreamResponseUnparsed item) {
300+
if (throwOnNext && item.response().kind() != ResponseOneOfType.STATUS) {
301+
throw new RuntimeException("Simulated \"closed\" stream.");
302+
}
220303
try {
221304
var binary = SubscribeStreamResponseUnparsed.PROTOBUF.toBytes(item);
222305
var response = SubscribeStreamResponse.PROTOBUF.parse(binary);

0 commit comments

Comments
 (0)