2
2
package org .hiero .block .node .stream .publisher ;
3
3
4
4
import static java .lang .System .Logger .Level .TRACE ;
5
- import static java .util .concurrent .locks .LockSupport .parkNanos ;
6
5
import static org .hiero .block .node .spi .BlockNodePlugin .METRICS_CATEGORY ;
7
6
import static org .hiero .block .node .spi .BlockNodePlugin .UNKNOWN_BLOCK_NUMBER ;
8
7
23
22
import java .util .concurrent .ConcurrentSkipListMap ;
24
23
import java .util .concurrent .Future ;
25
24
import java .util .concurrent .LinkedTransferQueue ;
25
+ import java .util .concurrent .TimeUnit ;
26
26
import java .util .concurrent .atomic .AtomicLong ;
27
+ import java .util .concurrent .locks .Condition ;
28
+ import java .util .concurrent .locks .ReentrantLock ;
27
29
import org .hiero .block .api .PublishStreamResponse ;
28
30
import org .hiero .block .internal .BlockItemSetUnparsed ;
31
+ import org .hiero .block .node .app .config .node .NodeConfig ;
29
32
import org .hiero .block .node .spi .BlockNodeContext ;
30
33
import org .hiero .block .node .spi .blockmessaging .BlockItems ;
31
34
import org .hiero .block .node .spi .blockmessaging .BlockMessagingFacility ;
39
42
*/
40
43
public final class LiveStreamPublisherManager implements StreamPublisherManager {
41
44
private static final String QUEUE_ID_FORMAT = "Q%016d" ;
45
+ private static final int DATA_READY_WAIT_MICROSECONDS = 500 ;
42
46
// @todo(1413) utilize the logger
43
47
private final System .Logger LOGGER = System .getLogger (LiveStreamPublisherManager .class .getName ());
44
48
private final MetricsHolder metrics ;
@@ -48,6 +52,8 @@ public final class LiveStreamPublisherManager implements StreamPublisherManager
48
52
private final AtomicLong nextHandlerId ;
49
53
private final ConcurrentMap <String , BlockingQueue <BlockItemSetUnparsed >> transferQueueMap ;
50
54
private final ConcurrentMap <Long , BlockingQueue <BlockItemSetUnparsed >> queueByBlockMap ;
55
+ private final Condition dataReadyLatch ;
56
+ private final ReentrantLock dataReadyLock ;
51
57
52
58
/**
53
59
* Future tracking the queue forwarder task.
@@ -81,6 +87,8 @@ public LiveStreamPublisherManager(
81
87
currentStreamingBlockNumber = new AtomicLong (-1 );
82
88
nextUnstreamedBlockNumber = new AtomicLong (-1 );
83
89
lastPersistedBlockNumber = new AtomicLong (-1 );
90
+ dataReadyLock = new ReentrantLock ();
91
+ dataReadyLatch = dataReadyLock .newCondition ();
84
92
updateBlockNumbers (serverContext );
85
93
}
86
94
@@ -129,6 +137,25 @@ public BlockAction getActionForBlock(
129
137
};
130
138
}
131
139
140
+ @ Override
141
+ public void shutdown () {
142
+ // Shut down all handlers and clear the queues.
143
+ for (final Long nextKey : handlers .keySet ()) {
144
+ final PublisherHandler value = handlers .remove (nextKey );
145
+ if (value != null ) {
146
+ value .closeCommunication ();
147
+ }
148
+ }
149
+ handlers .clear ();
150
+ transferQueueMap .clear ();
151
+ queueByBlockMap .clear ();
152
+ // Cancel the queue forwarder task if it is running.
153
+ if (queueForwarderResult != null ) {
154
+ queueForwarderResult .cancel (true );
155
+ queueForwarderResult = null ;
156
+ }
157
+ }
158
+
132
159
/**
133
160
* todo(1420) add documentation
134
161
*/
@@ -167,13 +194,61 @@ private BlockAction addHandlerQueueForBlock(final long blockNumber, final long h
167
194
if (queueForwarderResult == null ) {
168
195
queueForwarderResult = launchQueueForwarder ();
169
196
}
197
+ // This should result in new data being available, so we
198
+ // signal the data ready condition.
199
+ signalDataReady ();
170
200
return BlockAction .ACCEPT ;
171
201
}
172
202
}
173
203
// Return the correct action if another handler jumped in front of the caller.
174
204
return blockNumber < nextUnstreamedBlockNumber .get () ? BlockAction .SKIP : BlockAction .END_BEHIND ;
175
205
}
176
206
207
+ /*
208
+ * Signal the data ready condition.
209
+ * <p>
210
+ * This method is called to indicate that data _might_ be available to be
211
+ * sent to the messaging facility.<br/>
212
+ * The messaging thread may wait on this condition to limit spin cycles
213
+ * and still have a low impact on latency.
214
+ */
215
+ private void signalDataReady () {
216
+ dataReadyLock .lock ();
217
+ try {
218
+ dataReadyLatch .signal ();
219
+ } finally {
220
+ dataReadyLock .unlock ();
221
+ }
222
+ }
223
+
224
+ /**
225
+ * Wait for data to be ready.
226
+ * <p>
227
+ * This method will block until the data ready condition is signaled or
228
+ * the timeout is reached.<br/>
229
+ * This method is used (with {@link #signalDataReady()}) to limit spin
230
+ * cycles and still have a low impact on latency.
231
+ * <p>
232
+ * When this method returns data _might_ be available to send to the
233
+ * messaging facility, but it is not guaranteed.
234
+ * <p>
235
+ * Note
236
+ * <blockquote>This method ignored interrupted exceptions as a specific
237
+ * optimization to avoid unnecessarily ending a thread or causing failures
238
+ * when interrupt is used as a signal rather than signaling the `Condition`
239
+ * variable.</blockquote>
240
+ */
241
+ private void waitForDataReady () {
242
+ dataReadyLock .lock ();
243
+ try {
244
+ dataReadyLatch .await (DATA_READY_WAIT_MICROSECONDS , TimeUnit .MICROSECONDS );
245
+ } catch (InterruptedException e ) {
246
+ // just ignore interruption in this specific case.
247
+ } finally {
248
+ dataReadyLock .unlock ();
249
+ }
250
+ }
251
+
177
252
/**
178
253
* todo(1420) add documentation
179
254
*/
@@ -185,6 +260,9 @@ private BlockAction getActionForCurrentlyStreaming(final long blockNumber) {
185
260
// We'll have to skip the rest of this block.
186
261
return BlockAction .SKIP ;
187
262
} else if (blockNumber >= currentStreamingBlockNumber .get () && blockNumber < nextUnstreamedBlockNumber .get ()) {
263
+ // This should result in new data being available, so we
264
+ // signal the data ready condition.
265
+ signalDataReady ();
188
266
// We're one of the handlers currently streaming, keep going.
189
267
return BlockAction .ACCEPT ;
190
268
} else if (blockNumber == nextUnstreamedBlockNumber .get ()) {
@@ -220,11 +298,13 @@ public void closeBlock(final BlockProof blockEndProof, final long handlerId) {
220
298
if (queueForwarderResult == null || queueForwarderResult .isDone ()) {
221
299
queueForwarderResult = launchQueueForwarder ();
222
300
}
223
- // @todo(1416) complete tasks that do not require the block proof data here.
301
+ // @todo(1416) complete tasks that do not require the block proof data here (before this line) .
224
302
if (blockEndProof == null ) {
225
303
// No point logging here, as the handler would have done that.
226
304
// here we just update metrics.
305
+ metrics .blocksClosedIncomplete .increment ();
227
306
} else {
307
+ metrics .blocksClosedComplete .increment ();
228
308
// @todo(1413) Also log completed blocks metric and any other relevant
229
309
// actions. Also check if we have incomplete blocks lower than the
230
310
// block that completed, and possibly enter the resend process to
@@ -393,10 +473,7 @@ private static String getQueueNameForHandlerId(final long handlerId) {
393
473
return QUEUE_ID_FORMAT .formatted (handlerId );
394
474
}
395
475
396
- // Somewhere we were supposed to set the first block number supported by
397
- // the block node. I don't know what happened to that config, but it seems
398
- // to be missing. I asked the question on the backfill PR as it's also
399
- // relevant there. The current streaming should be the next block to be
476
+ // The current streaming should be the next block to be
400
477
// streamed, but _only_ on startup. After that there should always be
401
478
// a delta (next unstreamed must always be strictly greater than the current
402
479
// streaming block number).
@@ -406,11 +483,12 @@ private void updateBlockNumbers(final BlockNodeContext serverContext) {
406
483
// Always set the last persisted block number, even if there are no
407
484
// known blocks.
408
485
lastPersistedBlockNumber .set (latestKnownBlock );
486
+ NodeConfig nodeConfiguration = serverContext .configuration ().getConfigData (NodeConfig .class );
487
+ final long earliestManagedBlock = nodeConfiguration .earliestManagedBlock ();
409
488
if (UNKNOWN_BLOCK_NUMBER == latestKnownBlock ) {
410
489
// if we have entered here, then we have no blocks available
411
- // @todo(1416) get below values from hiero config.
412
- currentStreamingBlockNumber .set (0L );
413
- nextUnstreamedBlockNumber .set (0L );
490
+ currentStreamingBlockNumber .set (earliestManagedBlock );
491
+ nextUnstreamedBlockNumber .set (earliestManagedBlock );
414
492
} else {
415
493
// if we have entered here, we know what the latest known block is,
416
494
// so we can set the next unstreamed block number to one greater
@@ -483,15 +561,11 @@ public Long call() {
483
561
}
484
562
}
485
563
// If the current block number has no batches to send, then
486
- // block on a count down latch until more data is available.
487
- // @todo(1416) need to figure out how to reset and set the countdown
488
- // latch... Until then, just park for 1/2 millisecond.
489
- // Park for 500 microseconds if there is no data available,
490
- // but not if the current block is completed (i.e. we just
491
- // sent a block proof).
564
+ // block on a condition variable until more data is
565
+ // _probably_ available, or 500 microseconds elapses.
492
566
if (publisherManager .currentStreamingBlockNumber .get () == currentBlockNumber
493
567
&& availableBatches .isEmpty ()) {
494
- parkNanos ( 500_000 ); // Park for 500 microseconds
568
+ publisherManager . waitForDataReady ();
495
569
}
496
570
}
497
571
}
@@ -501,25 +575,37 @@ public Long call() {
501
575
502
576
/**
503
577
* Metrics for tracking publisher handler activity:
578
+ * blockItemsMessaged - Number of block items delivered to the messaging service
579
+ * currentPublisherCount - Number of currently connected publishers
504
580
* lowestBlockNumber - Lowest incoming block number
505
581
* currentBlockNumber - Current incoming block number
506
582
* highestBlockNumber - Highest incoming block number
507
583
* latestBlockNumberAcknowledged - The latest block number acknowledged
584
+ * blocksClosedComplete - Number of blocks received complete (with both header and proof)
585
+ * blocksClosedIncomplete - Number of blocks received incomplete (missing header or proof)
508
586
*/
509
587
public record MetricsHolder (
510
588
Counter blockItemsMessaged ,
511
589
LongGauge currentPublisherCount ,
512
590
LongGauge lowestBlockNumber ,
513
591
LongGauge currentBlockNumber ,
514
592
LongGauge highestBlockNumber ,
515
- LongGauge latestBlockNumberAcknowledged ) {
593
+ LongGauge latestBlockNumberAcknowledged ,
594
+ Counter blocksClosedComplete ,
595
+ Counter blocksClosedIncomplete ) {
516
596
/**
517
597
* todo(1420) add documentation
518
598
*/
519
599
static MetricsHolder createMetrics (@ NonNull final Metrics metrics ) {
520
600
final Counter blockItemsMessaged =
521
601
metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_block_items_messaged" )
522
602
.withDescription ("Live block items messaged to the messaging service" ));
603
+ final Counter blocksClosedComplete =
604
+ metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_blocks_closed_complete" )
605
+ .withDescription ("Blocks received complete (with both header and proof) by any Handler" ));
606
+ final Counter blocksClosedIncomplete =
607
+ metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_blocks_closed_incomplete" )
608
+ .withDescription ("Blocks received incomplete (missing header or proof) by any Handler" ));
523
609
final LongGauge numberOfProducers =
524
610
metrics .getOrCreate (new LongGauge .Config (METRICS_CATEGORY , "publisher_open_connections" )
525
611
.withDescription ("Connected publishers" ));
@@ -541,7 +627,9 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
541
627
lowestBlockNumber ,
542
628
currentBlockNumber ,
543
629
highestBlockNumber ,
544
- latestBlockNumberAcknowledged );
630
+ latestBlockNumberAcknowledged ,
631
+ blocksClosedComplete ,
632
+ blocksClosedIncomplete );
545
633
}
546
634
}
547
635
}
0 commit comments