Skip to content

Commit 2febc51

Browse files
fix: fixing threading issue with unsubscribe
Signed-off-by: Matt Peterson <matt.peterson@swirldslabs.com>
1 parent e595712 commit 2febc51

File tree

4 files changed

+68
-45
lines changed

4 files changed

+68
-45
lines changed

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,16 @@ public ConsumerBlockItemObserver(
111111
public void onEvent(
112112
final ObjectEvent<SubscribeStreamResponse> event, final long l, final boolean b) {
113113

114-
final long currentMillis = producerLivenessClock.millis();
115-
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
116-
streamMediator.unsubscribe(this);
117-
LOGGER.log(
118-
System.Logger.Level.DEBUG,
119-
"Unsubscribed ConsumerBlockItemObserver due to producer timeout");
120-
} else {
121-
122-
// Only send the response if the consumer has not cancelled
123-
// or closed the stream.
124-
if (isResponsePermitted.get()) {
125-
114+
// Only send the response if the consumer has not cancelled
115+
// or closed the stream.
116+
if (isResponsePermitted.get()) {
117+
final long currentMillis = producerLivenessClock.millis();
118+
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
119+
streamMediator.unsubscribe(this);
120+
LOGGER.log(
121+
System.Logger.Level.DEBUG,
122+
"Unsubscribed ConsumerBlockItemObserver due to producer timeout");
123+
} else {
126124
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
127125
producerLivenessMillis = currentMillis;
128126

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,19 @@ public void subscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> h
153153
public void unsubscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
154154

155155
// Remove the subscriber
156-
final var batchEventProcessor = subscribers.remove(handler);
156+
if (subscribers.containsKey(handler)) {
157157

158-
// Stop the processor
159-
batchEventProcessor.halt();
158+
final var batchEventProcessor = subscribers.remove(handler);
160159

161-
// Remove the gating sequence from the ring buffer
162-
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
160+
// Stop the processor
161+
batchEventProcessor.halt();
162+
163+
// Remove the gating sequence from the ring buffer
164+
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
165+
166+
} else {
167+
LOGGER.log(System.Logger.Level.ERROR, "Subscriber not found: {0}", handler);
168+
}
163169
}
164170

165171
@Override

server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ public class BlockStreamServiceIT {
8686
private Path testPath;
8787
private Config testConfig;
8888

89+
private static final int testTimeout = 100;
90+
8991
@BeforeEach
9092
public void setUp() throws IOException {
9193
testPath = Files.createTempDirectory(TEMP_DIR);
@@ -103,11 +105,11 @@ public void tearDown() {
103105

104106
@Test
105107
public void testPublishBlockStreamRegistrationAndExecution()
106-
throws InterruptedException, IOException, NoSuchAlgorithmException {
108+
throws IOException, NoSuchAlgorithmException {
107109

108110
final BlockStreamService blockStreamService =
109111
new BlockStreamService(
110-
50L,
112+
1500L,
111113
new ItemAckBuilder(),
112114
streamMediator,
113115
blockPersistenceHandler,
@@ -131,21 +133,22 @@ public void testPublishBlockStreamRegistrationAndExecution()
131133
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
132134

133135
// Verify the BlockItem message is sent to the mediator
134-
verify(streamMediator, timeout(50).times(1)).publishEvent(blockItem);
136+
verify(streamMediator, timeout(testTimeout).times(1)).publishEvent(blockItem);
135137

136138
// Verify our custom StreamObserver implementation builds and sends
137139
// a response back to the producer
138-
verify(publishStreamResponseObserver, timeout(50).times(1)).onNext(publishStreamResponse);
140+
verify(publishStreamResponseObserver, timeout(testTimeout).times(1))
141+
.onNext(publishStreamResponse);
139142

140143
// Close the stream as Helidon does
141144
streamObserver.onCompleted();
142145

143146
// verify the onCompleted() method is invoked on the wrapped StreamObserver
144-
verify(publishStreamResponseObserver, timeout(50).times(1)).onCompleted();
147+
verify(publishStreamResponseObserver, timeout(testTimeout).times(1)).onCompleted();
145148
}
146149

147150
@Test
148-
public void testSubscribeBlockStream() throws InterruptedException {
151+
public void testSubscribeBlockStream() {
149152

150153
final ServiceStatus serviceStatus = new ServiceStatusImpl();
151154
serviceStatus.setWebServer(webServer);
@@ -185,13 +188,16 @@ public void testSubscribeBlockStream() throws InterruptedException {
185188
final SubscribeStreamResponse subscribeStreamResponse =
186189
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();
187190

188-
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
189-
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
190-
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);
191+
verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
192+
.onNext(subscribeStreamResponse);
193+
verify(subscribeStreamObserver2, timeout(testTimeout).times(1))
194+
.onNext(subscribeStreamResponse);
195+
verify(subscribeStreamObserver3, timeout(testTimeout).times(1))
196+
.onNext(subscribeStreamResponse);
191197
}
192198

193199
@Test
194-
public void testFullHappyPath() throws IOException, InterruptedException {
200+
public void testFullHappyPath() throws IOException {
195201
int numberOfBlocks = 100;
196202

197203
final BlockStreamService blockStreamService = buildBlockStreamService();
@@ -226,7 +232,7 @@ public void testFullHappyPath() throws IOException, InterruptedException {
226232
}
227233

228234
@Test
229-
public void testFullWithSubscribersAddedDynamically() throws IOException, InterruptedException {
235+
public void testFullWithSubscribersAddedDynamically() throws IOException {
230236

231237
int numberOfBlocks = 100;
232238

@@ -296,7 +302,7 @@ public void testFullWithSubscribersAddedDynamically() throws IOException, Interr
296302
}
297303

298304
@Test
299-
public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedException {
305+
public void testSubAndUnsubWhileStreaming() throws IOException {
300306

301307
int numberOfBlocks = 100;
302308

@@ -384,8 +390,7 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
384390
}
385391

386392
@Test
387-
public void testMediatorExceptionHandlingWhenPersistenceFailure()
388-
throws IOException, InterruptedException {
393+
public void testMediatorExceptionHandlingWhenPersistenceFailure() throws IOException {
389394
final Map<
390395
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
391396
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
@@ -444,9 +449,12 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
444449
// before the IOException was thrown.
445450
final SubscribeStreamResponse subscribeStreamResponse =
446451
SubscribeStreamResponse.newBuilder().setBlockItem(blockItems.getFirst()).build();
447-
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(subscribeStreamResponse);
448-
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(subscribeStreamResponse);
449-
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(subscribeStreamResponse);
452+
verify(subscribeStreamObserver1, timeout(testTimeout).times(1))
453+
.onNext(subscribeStreamResponse);
454+
verify(subscribeStreamObserver2, timeout(testTimeout).times(1))
455+
.onNext(subscribeStreamResponse);
456+
verify(subscribeStreamObserver3, timeout(testTimeout).times(1))
457+
.onNext(subscribeStreamResponse);
450458

451459
// Verify all the consumers received the end of stream response
452460
// TODO: Fix the response code when it's available
@@ -456,9 +464,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
456464
SubscribeStreamResponse.SubscribeStreamResponseCode
457465
.READ_STREAM_SUCCESS)
458466
.build();
459-
verify(subscribeStreamObserver1, timeout(50).times(1)).onNext(endStreamResponse);
460-
verify(subscribeStreamObserver2, timeout(50).times(1)).onNext(endStreamResponse);
461-
verify(subscribeStreamObserver3, timeout(50).times(1)).onNext(endStreamResponse);
467+
verify(subscribeStreamObserver1, timeout(testTimeout).times(1)).onNext(endStreamResponse);
468+
verify(subscribeStreamObserver2, timeout(testTimeout).times(1)).onNext(endStreamResponse);
469+
verify(subscribeStreamObserver3, timeout(testTimeout).times(1)).onNext(endStreamResponse);
462470

463471
// Verify all the consumers were unsubscribed
464472
for (final var s : subscribers.keySet()) {
@@ -473,8 +481,9 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
473481
.build();
474482
final var endOfStreamResponse =
475483
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
476-
verify(publishStreamResponseObserver, timeout(50).times(2)).onNext(endOfStreamResponse);
477-
verify(webServer, timeout(50).times(1)).stop();
484+
verify(publishStreamResponseObserver, timeout(testTimeout).times(2))
485+
.onNext(endOfStreamResponse);
486+
verify(webServer, timeout(testTimeout).times(1)).stop();
478487

479488
// Now verify the block was removed from the file system.
480489
final BlockReader<Block> blockReader = new BlockAsDirReader(JUNIT, testConfig);
@@ -489,7 +498,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
489498
SingleBlockResponse.SingleBlockResponseCode
490499
.READ_BLOCK_NOT_AVAILABLE)
491500
.build();
492-
verify(singleBlockResponseStreamObserver, timeout(50).times(1))
501+
verify(singleBlockResponseStreamObserver, timeout(testTimeout).times(1))
493502
.onNext(expectedSingleBlockNotAvailable);
494503

495504
// TODO: Fix the response code when it's available
@@ -499,7 +508,7 @@ public void testMediatorExceptionHandlingWhenPersistenceFailure()
499508
SubscribeStreamResponse.SubscribeStreamResponseCode
500509
.READ_STREAM_SUCCESS)
501510
.build();
502-
verify(subscribeStreamObserver4, timeout(50).times(1))
511+
verify(subscribeStreamObserver4, timeout(testTimeout).times(1))
503512
.onNext(expectedSubscriberStreamNotAvailable);
504513
}
505514

@@ -535,9 +544,9 @@ private static void verifySubscribeStreamResponse(
535544
final SubscribeStreamResponse stateProofStreamResponse =
536545
buildSubscribeStreamResponse(stateProofBlockItem);
537546

538-
verify(streamObserver, timeout(50).times(1)).onNext(headerSubStreamResponse);
539-
verify(streamObserver, timeout(50).times(8)).onNext(bodySubStreamResponse);
540-
verify(streamObserver, timeout(50).times(1)).onNext(stateProofStreamResponse);
547+
verify(streamObserver, timeout(testTimeout).times(1)).onNext(headerSubStreamResponse);
548+
verify(streamObserver, timeout(testTimeout).times(8)).onNext(bodySubStreamResponse);
549+
verify(streamObserver, timeout(testTimeout).times(1)).onNext(stateProofStreamResponse);
541550
}
542551
}
543552

server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public class LiveStreamMediatorImplTest {
6060
private final long TEST_TIME = 1_719_427_664_950L;
6161

6262
@Test
63-
public void testUnsubscribeEach() {
63+
// @Disabled
64+
public void testUnsubscribeEach() throws InterruptedException {
6465

6566
final var streamMediator =
6667
new LiveStreamMediatorImpl(
@@ -81,6 +82,8 @@ public void testUnsubscribeEach() {
8182
streamMediator.isSubscribed(observer3),
8283
"Expected the mediator to have observer3 subscribed");
8384

85+
Thread.sleep(50L);
86+
8487
streamMediator.unsubscribe(observer1);
8588
assertFalse(
8689
streamMediator.isSubscribed(observer1),
@@ -168,6 +171,7 @@ public void testMediatorPublishEventToSubscribers() throws IOException, Interrup
168171
}
169172

170173
@Test
174+
// @Disabled
171175
public void testSubAndUnsubHandling() {
172176
final var streamMediator =
173177
new LiveStreamMediatorImpl(
@@ -220,6 +224,9 @@ public void testOnCancelSubscriptionHandling() throws IOException {
220224
final List<BlockItem> blockItems = generateBlockItems(1);
221225
streamMediator.publishEvent(blockItems.getFirst());
222226

227+
// Verify the event made it to the consumer
228+
verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any());
229+
223230
// Simulate the consumer cancelling the stream
224231
testConsumerBlockItemObserver.getOnCancel().run();
225232

@@ -251,6 +258,9 @@ public void testOnCloseSubscriptionHandling() throws IOException {
251258
final List<BlockItem> blockItems = generateBlockItems(1);
252259
streamMediator.publishEvent(blockItems.getFirst());
253260

261+
// Verify the event made it to the consumer
262+
verify(serverCallStreamObserver, timeout(50).times(1)).setOnCancelHandler(any());
263+
254264
// Simulate the consumer completing the stream
255265
testConsumerBlockItemObserver.getOnClose().run();
256266

0 commit comments

Comments
 (0)