// SPDX-License-Identifier: Apache-2.0
package org.hiero.block.node.stream.subscriber;

+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.locks.LockSupport.parkNanos;
import static org.hiero.block.node.spi.BlockNodePlugin.METRICS_CATEGORY;
import static org.hiero.block.node.spi.BlockNodePlugin.UNKNOWN_BLOCK_NUMBER;
import com.hedera.pbj.runtime.grpc.Pipeline;
import com.swirlds.metrics.api.Counter;
import com.swirlds.metrics.api.Counter.Config;
+import edu.umd.cs.findbugs.annotations.NonNull;
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -111,18 +112,17 @@ public class BlockStreamSubscriberSession implements Callable<BlockStreamSubscri
     * @param context The context for the block node
     */
    public BlockStreamSubscriberSession(
-            long clientId,
-            SubscribeStreamRequest request,
-            Pipeline<? super SubscribeStreamResponseUnparsed> responsePipeline,
-            BlockNodeContext context,
-            final CountDownLatch sessionReadyLatch) {
-        LOGGER.log(Level.TRACE, request.toString());
+            final long clientId,
+            @NonNull final SubscribeStreamRequest request,
+            @NonNull final Pipeline<? super SubscribeStreamResponseUnparsed> responsePipeline,
+            @NonNull final BlockNodeContext context,
+            @NonNull final CountDownLatch sessionReadyLatch) {
        this.clientId = clientId;
        this.startBlockNumber = request.startBlockNumber();
        this.endBlockNumber = request.endBlockNumber();
-        this.responsePipeline = Objects.requireNonNull(responsePipeline);
-        this.context = Objects.requireNonNull(context);
-        this.sessionReadyLatch = Objects.requireNonNull(sessionReadyLatch);
+        this.responsePipeline = requireNonNull(responsePipeline);
+        this.context = requireNonNull(context);
+        this.sessionReadyLatch = requireNonNull(sessionReadyLatch);
        latestLiveStreamBlock = new AtomicLong(UNKNOWN_BLOCK_NUMBER - 1);
        pluginConfiguration = context.configuration().getConfigData(SubscriberConfig.class);
        // Next block to send depends on what was requested and what is available.
@@ -162,7 +162,6 @@ public BlockStreamSubscriberSession call() {
            if (validateRequest(
                    oldestBlockNumber, latestBlockNumber, startBlockNumber, endBlockNumber, clientId, LOGGER)) {
                // register us to listen to block items from the block messaging system
-                LOGGER.log(Level.TRACE, "Registering a block subscriber handler for " + handlerName);
                context.blockMessaging().registerNoBackpressureBlockItemHandler(liveBlockHandler, false, handlerName);
                sessionReadyLatch.countDown();
                // Send blocks forever if requested, otherwise send until we reach the requested end block
@@ -174,10 +173,13 @@ public BlockStreamSubscriberSession call() {
                    // then process live blocks, if available.
                    sendLiveBlocksIfAvailable();
                }
+                if (allRequestedBlocksSent()) {
+                    // We've sent all requested blocks. Therefore, we close, according to the protocol.
+                    close(SubscribeStreamResponse.Code.READ_STREAM_SUCCESS);
+                }
            }
        } catch (RuntimeException | ParseException | InterruptedException e) {
            sessionFailedCause = e;
-            interruptedStream.set(true);
            close(SubscribeStreamResponse.Code.READ_STREAM_SUCCESS); // Need an "INCOMPLETE" code...
        }
        // Need to record a metric here with client ID tag, so we can record
@@ -193,7 +195,8 @@ private void sendHistoricalBlocks() throws ParseException {
        // We need to send historical blocks.
        // We will only send one block at a time to keep things "smooth".
        // Start by getting a block accessor for the next block to send from the historical provider.
-        BlockAccessor nextBlockAccessor = context.historicalBlockProvider().block(nextBlockToSend);
+        final BlockAccessor nextBlockAccessor =
+                context.historicalBlockProvider().block(nextBlockToSend);
        if (nextBlockAccessor != null) {
            // We have a block to send, so send it.
            sendOneBlockItemSet(nextBlockAccessor.blockUnparsed());
@@ -202,7 +205,7 @@ private void sendHistoricalBlocks() throws ParseException {
        } else {
            // Only give up if this is an historical block, otherwise just
            // go back up and see if live has the block.
-            if (!(nextBlockToSend < 0 || nextBlockToSend >= getLatestKnownBlock())) {
+            if (!(nextBlockToSend < 0 || nextBlockToSend >= getLatestHistoricalBlock())) {
                // We cannot get the block needed, something has failed.
                // close the stream with an "unavailable" response.
                final String message = "Unable to read historical block {0}.";
@@ -228,7 +231,7 @@ private void sendHistoricalBlocks() throws ParseException {
     */
    private boolean isHistoryPermitted() {
        return !(interruptedStream.get()
-                || latestLiveStreamBlock.get() < nextBlockToSend
+                || (latestLiveStreamBlock.get() < nextBlockToSend && latestLiveStreamBlock.get() >= 0)
                || allRequestedBlocksSent()
                || nextBatchIsLive());
    }
@@ -271,21 +274,35 @@ private void sendLiveBlocksIfAvailable() throws InterruptedException {
        // If we run out, get ahead of live, or have to send historical blocks,
        // then we'll also break out of the loop and return to the caller.
        while (!liveBlockQueue.isEmpty()) {
-            // take the block item from the queue and process it
-            final BlockItems blockItems = liveBlockQueue.poll();
-            // Live _might_ be behind the next expected block (particularly if
-            // the requested start block is in the future), skip this block, in that case.
-            if (blockItems.isStartOfNewBlock() && blockItems.newBlockNumber() != nextBlockToSend) {
+            // Peek at the block item from the queue and _possibly_ process it
+            BlockItems blockItems = liveBlockQueue.peek();
+            // Live _might_ be ahead or behind the next expected block (particularly if
+            // the requested start block is in the future), don't send this block, in that case.
+            // If behind, remove that item from the queue; if ahead leave it in the queue.
+            if (blockItems.isStartOfNewBlock() && blockItems.newBlockNumber() < nextBlockToSend) {
                LOGGER.log(Level.TRACE, "Skipping block {0} for client {1}", blockItems.newBlockNumber(), clientId);
                skipCurrentBlockInQueue(blockItems);
-            } else {
+            } else if (blockItems.newBlockNumber() == nextBlockToSend) {
+                blockItems = liveBlockQueue.poll();
                if (blockItems != null) {
                    sendOneBlockItemSet(blockItems);
+                    if (blockItems.isEndOfBlock()) {
+                        nextBlockToSend++;
+                        trimBlockItemQueue(nextBlockToSend);
+                    }
                }
-                if (blockItems.isEndOfBlock()) {
-                    nextBlockToSend++;
-                    trimBlockItemQueue(nextBlockToSend);
-                }
+            } else if (blockItems.isStartOfNewBlock() && blockItems.newBlockNumber() > nextBlockToSend) {
+                // This block is _future_, so we need to wait, and try to get the next block from history
+                // first, then come back to this block.
+                LOGGER.log(
+                        Level.TRACE,
+                        "Retaining future block {0} for client {1}",
+                        blockItems.newBlockNumber(),
+                        clientId);
+            } else {
+                // This is a past or future _partial_ block, so we need to trim the queue.
+                liveBlockQueue.poll(); // discard _this batch only_.
+                trimBlockItemQueue(nextBlockToSend);
            }
            // Note: We depend here on the rule that there are no gaps between blocks.
            // The header for block N immediately follows the proof for block N-1.
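Aside, not part of the diff: the hunk above replaces poll-first handling with a peek-then-decide loop, so a batch is only removed from the queue once we know whether it is stale, current, future, or a partial block. Below is a minimal, self-contained Java sketch of that pattern under those assumptions. The LiveQueueSketch class, the Batch record, and the send()/skipPastBlock() helpers are hypothetical stand-ins for the real BlockItems batches and session methods; only the java.util.concurrent types are real.

// Illustrative sketch only; Batch and send(...) are simplified stand-ins.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class LiveQueueSketch {
    /** A simplified stand-in for a BlockItems batch. */
    record Batch(long blockNumber, boolean startOfBlock, boolean endOfBlock) {}

    private final BlockingQueue<Batch> queue = new ArrayBlockingQueue<>(100);
    private long nextBlockToSend = 0;

    /** Drain whatever is currently queued, without ever sending a block out of order. */
    void sendLiveIfAvailable() {
        while (!queue.isEmpty()) {
            final Batch head = queue.peek(); // inspect without removing
            if (head.startOfBlock() && head.blockNumber() < nextBlockToSend) {
                skipPastBlock(); // stale block: drop the whole block
            } else if (head.blockNumber() == nextBlockToSend) {
                final Batch batch = queue.poll(); // the block we need: remove and send
                send(batch);
                if (batch.endOfBlock()) {
                    nextBlockToSend++;
                }
            } else if (head.startOfBlock() && head.blockNumber() > nextBlockToSend) {
                return; // future block: leave it queued and let history catch up first
            } else {
                queue.poll(); // partial past/future block: discard this batch only
            }
        }
    }

    /** Remove the header batch at the head plus every batch up to the next block header. */
    private void skipPastBlock() {
        queue.poll();
        while (queue.peek() != null && !queue.peek().startOfBlock()) {
            queue.poll();
        }
    }

    private void send(Batch batch) {
        // In the real session this forwards the batch to the client's response pipeline.
    }
}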
@@ -338,20 +355,26 @@ private boolean queueFull() {
    /**
     * Remove the remainder of a block from the queue.
     * <p>
-     * This method assumes that a possible header batch has already been removed
-     * from the queue (and is provided). The provided item is checked, and if it
-     * is a header block, the remainder of that block, up to the next header
-     * batch (which might be the next item) is removed from the queue.<br/>
-     * Note, the item provided and the next item are not removed from the queue,
-     * so it is important to only call this method after polling and item, and
-     * when this method returns, the next item in the queue will be the start of
-     * a new block (or else the queue will be empty).
+     * This method assumes that a possible header batch has already been viewed
+     * but not removed from the queue (and is provided). The provided item is
+     * checked, and if it is a header block, it is removed and then the
+     * remainder of that block, up to the next header batch (which might be the
+     * next item) is removed from the queue.<br/>
+     * Note, the item provided _may be_ removed from the queue (even if it's not
+     * a block header), so it is important to only call this method after
+     * peeking at the item without removing it, and when this method returns,
+     * the next item in the queue will be the start of a new block (or else
+     * the queue will be empty).
     */
    private void skipCurrentBlockInQueue(BlockItems queueHead) {
-        // The "head" entry is already removed, remove the rest of its block if it's a block header.
-        if (queueHead != null && queueHead.isStartOfNewBlock()) {
-            queueHead = liveBlockQueue.peek();
+        // The "head" entry is _not_ already removed, remove it, and the rest of
+        // its block. This also handles a partial block at the head of the queue
+        // when we cannot process that block (e.g. it's in the future or past from
+        // the block we currently need to send).
+        if (queueHead != null) {
+            liveBlockQueue.poll(); // remove the "head" entry
            // Now remove "head" entries until the _next_ item is a block header
+            queueHead = liveBlockQueue.peek(); // peek at the next item.
            while (queueHead != null && !(queueHead.isStartOfNewBlock())) {
                liveBlockQueue.poll();
                queueHead = liveBlockQueue.peek();
@@ -465,11 +488,6 @@ long getNextBlockToSend() {
        return nextBlockToSend;
    }

-    // Visible for testing.
-    LiveBlockHandler getLiveBlockHandler() {
-        return liveBlockHandler;
-    }
-
    /**
     * Close this session. This will unregister us from the block messaging system and cancel the subscription.
     */
@@ -480,11 +498,14 @@ synchronized void close(final SubscribeStreamResponse.Code endStreamResponseCode
            sessionReadyLatch.countDown();
            LOGGER.log(Level.DEBUG, "Session ready latch was not counted down on close, releasing now");
        }
-        // unregister us from the block messaging system, if we are not registered then this is noop
-        context.blockMessaging().unregisterBlockItemHandler(liveBlockHandler);
-        final Builder response = SubscribeStreamResponseUnparsed.newBuilder().status(endStreamResponseCode);
-        responsePipeline.onNext(response.build());
-        responsePipeline.onComplete();
+        if (!interruptedStream.get()) {
+            // unregister us from the block messaging system, if we are not registered then this is noop
+            context.blockMessaging().unregisterBlockItemHandler(liveBlockHandler);
+            final Builder response =
+                    SubscribeStreamResponseUnparsed.newBuilder().status(endStreamResponseCode);
+            responsePipeline.onNext(response.build());
+            responsePipeline.onComplete();
+        }
        if (subscription != null) {
            subscription.cancel();
            subscription = null;
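Aside, not part of the diff: the hunk above guards the terminal status and onComplete() behind interruptedStream, so nothing is written to a pipeline the client has already torn down, while the subscription is still cancelled either way. A minimal sketch of that guarded-close shape follows; the GuardedCloseSketch class, its nested Pipeline interface, Code enum, and the Runnable cancel handle are simplified stand-ins, not the real PBJ or Block Node types.

// Illustrative sketch only; Pipeline, Code, and subscription are simplified stand-ins.
import java.util.concurrent.atomic.AtomicBoolean;

final class GuardedCloseSketch {
    interface Pipeline<T> {
        void onNext(T item);
        void onComplete();
    }

    enum Code { READ_STREAM_SUCCESS, READ_STREAM_NOT_AVAILABLE }

    private final AtomicBoolean interruptedStream = new AtomicBoolean(false);
    private final Pipeline<Code> responsePipeline;
    private Runnable subscription; // stands in for a cancellable subscription handle

    GuardedCloseSketch(Pipeline<Code> responsePipeline, Runnable subscription) {
        this.responsePipeline = responsePipeline;
        this.subscription = subscription;
    }

    synchronized void close(final Code endStreamResponseCode) {
        // Only send a terminal status and complete the pipeline when the stream
        // was not already interrupted (e.g. the client cancelled or errored).
        if (!interruptedStream.get()) {
            responsePipeline.onNext(endStreamResponseCode);
            responsePipeline.onComplete();
        }
        // The subscription is cancelled in every case, and only once.
        if (subscription != null) {
            subscription.run(); // cancel
            subscription = null;
        }
    }
}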
@@ -494,10 +515,8 @@ synchronized void close(final SubscribeStreamResponse.Code endStreamResponseCode
    }

    private void sendOneBlockItemSet(final BlockUnparsed nextBlock) throws ParseException {
-        LOGGER.log(Level.TRACE, "Sending full block {0} to {1}", nextBlockToSend, handlerName);
-        BlockHeader header =
+        final BlockHeader header =
                BlockHeader.PROTOBUF.parse(nextBlock.blockItems().getFirst().blockHeader());
-        BlockItems blockBatch = new BlockItems(nextBlock.blockItems(), header.number());
        if (header.number() == nextBlockToSend) {
            sendOneBlockItemSet(nextBlock.blockItems());
        } else {
@@ -541,11 +560,11 @@ private static class LiveBlockHandler implements NoBackPressureBlockItemHandler
        private final String clientId;

        private LiveBlockHandler(
-                final BlockingQueue<BlockItems> liveBlockQueue,
-                final AtomicLong latestLiveStreamBlock,
+                @NonNull final BlockingQueue<BlockItems> liveBlockQueue,
+                @NonNull final AtomicLong latestLiveStreamBlock,
                final String clientId) {
-            this.liveBlockQueue = liveBlockQueue;
-            this.latestLiveStreamBlock = latestLiveStreamBlock;
+            this.liveBlockQueue = requireNonNull(liveBlockQueue);
+            this.latestLiveStreamBlock = requireNonNull(latestLiveStreamBlock);
            this.clientId = clientId;
        }

@@ -557,10 +576,9 @@ public void onTooFarBehindError() {
        }

        @Override
-        public void handleBlockItemsReceived(final BlockItems blockItems) {
+        public void handleBlockItemsReceived(@NonNull final BlockItems blockItems) {
            if (blockItems.newBlockNumber() > latestLiveStreamBlock.get()) {
                latestLiveStreamBlock.set(blockItems.newBlockNumber());
-                LOGGER.log(Level.TRACE, "Updated latest block to {0}.", latestLiveStreamBlock);
            }
            // Blocking so that the client thread has a chance to pull items
            // off the head when it's full.