2
2
package org .hiero .block .node .stream .publisher ;
3
3
4
4
import static java .lang .System .Logger .Level .TRACE ;
5
- import static java .util .concurrent .locks .LockSupport .parkNanos ;
6
5
import static org .hiero .block .node .spi .BlockNodePlugin .METRICS_CATEGORY ;
7
6
import static org .hiero .block .node .spi .BlockNodePlugin .UNKNOWN_BLOCK_NUMBER ;
8
7
23
22
import java .util .concurrent .ConcurrentSkipListMap ;
24
23
import java .util .concurrent .Future ;
25
24
import java .util .concurrent .LinkedTransferQueue ;
25
+ import java .util .concurrent .TimeUnit ;
26
26
import java .util .concurrent .atomic .AtomicLong ;
27
+ import java .util .concurrent .locks .Condition ;
28
+ import java .util .concurrent .locks .ReentrantLock ;
27
29
import org .hiero .block .api .PublishStreamResponse ;
28
30
import org .hiero .block .internal .BlockItemSetUnparsed ;
31
+ import org .hiero .block .node .app .config .node .NodeConfig ;
29
32
import org .hiero .block .node .spi .BlockNodeContext ;
30
33
import org .hiero .block .node .spi .blockmessaging .BlockItems ;
31
34
import org .hiero .block .node .spi .blockmessaging .BlockMessagingFacility ;
38
41
*/
39
42
public final class LiveStreamPublisherManager implements StreamPublisherManager {
40
43
private static final String QUEUE_ID_FORMAT = "Q%016d" ;
44
+ private static final int DATA_READY_WAIT_MICROSECONDS = 500 ;
41
45
// @todo(1413) utilize the logger
42
46
private final System .Logger LOGGER = System .getLogger (LiveStreamPublisherManager .class .getName ());
43
47
private final MetricsHolder metrics ;
@@ -47,6 +51,8 @@ public final class LiveStreamPublisherManager implements StreamPublisherManager
47
51
private final AtomicLong nextHandlerId ;
48
52
private final ConcurrentMap <String , BlockingQueue <BlockItemSetUnparsed >> transferQueueMap ;
49
53
private final ConcurrentMap <Long , BlockingQueue <BlockItemSetUnparsed >> queueByBlockMap ;
54
+ private final Condition dataReadyLatch ;
55
+ private final ReentrantLock dataReadyLock ;
50
56
51
57
/**
52
58
* Future tracking the queue forwarder task.
@@ -80,6 +86,8 @@ public LiveStreamPublisherManager(
80
86
currentStreamingBlockNumber = new AtomicLong (-1 );
81
87
nextUnstreamedBlockNumber = new AtomicLong (-1 );
82
88
lastPersistedBlockNumber = new AtomicLong (-1 );
89
+ dataReadyLock = new ReentrantLock ();
90
+ dataReadyLatch = dataReadyLock .newCondition ();
83
91
updateBlockNumbers (serverContext );
84
92
}
85
93
@@ -128,6 +136,25 @@ public BlockAction getActionForBlock(
128
136
};
129
137
}
130
138
139
+ @ Override
140
+ public void shutdown () {
141
+ // Shut down all handlers and clear the queues.
142
+ for (final Long nextKey : handlers .keySet ()) {
143
+ final PublisherHandler value = handlers .remove (nextKey );
144
+ if (value != null ) {
145
+ value .closeCommunication ();
146
+ }
147
+ }
148
+ handlers .clear ();
149
+ transferQueueMap .clear ();
150
+ queueByBlockMap .clear ();
151
+ // Cancel the queue forwarder task if it is running.
152
+ if (queueForwarderResult != null ) {
153
+ queueForwarderResult .cancel (true );
154
+ queueForwarderResult = null ;
155
+ }
156
+ }
157
+
131
158
/**
132
159
* todo(1420) add documentation
133
160
*/
@@ -166,13 +193,61 @@ private BlockAction addHandlerQueueForBlock(final long blockNumber, final long h
166
193
if (queueForwarderResult == null ) {
167
194
queueForwarderResult = launchQueueForwarder ();
168
195
}
196
+ // This should result in new data being available, so we
197
+ // count down the data ready latch.
198
+ signalDataReady ();
169
199
return BlockAction .ACCEPT ;
170
200
}
171
201
}
172
202
// Return the correct action if another handler jumped in front of the caller.
173
203
return blockNumber < nextUnstreamedBlockNumber .get () ? BlockAction .SKIP : BlockAction .END_BEHIND ;
174
204
}
175
205
206
+ /*
207
+ * Signal the data ready condition.
208
+ * <p>
209
+ * This method is called to indicate that data _might_ be available to be
210
+ * sent to the messaging facility.<br/>
211
+ * The messaging thread may wait on this condition to limit spin cycles
212
+ * and still have a low impact on latency.
213
+ */
214
+ private void signalDataReady () {
215
+ dataReadyLock .lock ();
216
+ try {
217
+ dataReadyLatch .signal ();
218
+ } finally {
219
+ dataReadyLock .unlock ();
220
+ }
221
+ }
222
+
223
+ /**
224
+ * Wait for data to be ready.
225
+ * <p>
226
+ * This method will block until the data ready condition is signaled or
227
+ * the timeout is reached.<br/>
228
+ * This method is used (with `signalDataReady`) to limit spin cycles and
229
+ * still have a low impact on latency.
230
+ * <p>
231
+ * When this method returns data _might_ be available to send to the
232
+ * messaging facility, but it is not guaranteed.
233
+ * <p>
234
+ * Note
235
+ * <blockquote>This method ignored interrupted exceptions as a specific
236
+ * optimization to avoid unnecessarily ending a thread or causing failures
237
+ * when interrupt is used as a signal rather than signaling the `Condition`
238
+ * variable.
239
+ */
240
+ private void waitForDataReady () {
241
+ dataReadyLock .lock ();
242
+ try {
243
+ dataReadyLatch .await (DATA_READY_WAIT_MICROSECONDS , TimeUnit .MICROSECONDS );
244
+ } catch (InterruptedException e ) {
245
+ // just ignore interruption in this specific case.
246
+ } finally {
247
+ dataReadyLock .unlock ();
248
+ }
249
+ }
250
+
176
251
/**
177
252
* todo(1420) add documentation
178
253
*/
@@ -184,6 +259,9 @@ private BlockAction getActionForCurrentlyStreaming(final long blockNumber) {
184
259
// We'll have to skip the rest of this block.
185
260
return BlockAction .SKIP ;
186
261
} else if (blockNumber >= currentStreamingBlockNumber .get () && blockNumber < nextUnstreamedBlockNumber .get ()) {
262
+ // This should result in new data being available, so we
263
+ // count down the data ready latch.
264
+ signalDataReady ();
187
265
// We're one of the handlers currently streaming, keep going.
188
266
return BlockAction .ACCEPT ;
189
267
} else if (blockNumber == nextUnstreamedBlockNumber .get ()) {
@@ -219,11 +297,13 @@ public void closeBlock(final BlockProof blockEndProof, final long handlerId) {
219
297
if (queueForwarderResult == null || queueForwarderResult .isDone ()) {
220
298
queueForwarderResult = launchQueueForwarder ();
221
299
}
222
- // @todo(1416) complete tasks that do not require the block proof data here.
300
+ // @todo(1416) complete tasks that do not require the block proof data here (before this line) .
223
301
if (blockEndProof == null ) {
224
302
// No point logging here, as the handler would have done that.
225
303
// here we just update metrics.
304
+ metrics .blocksClosedIncomplete .increment ();
226
305
} else {
306
+ metrics .blocksClosedComplete .increment ();
227
307
// @todo(1413) Also log completed blocks metric and any other relevant
228
308
// actions. Also check if we have incomplete blocks lower than the
229
309
// block that completed, and possibly enter the resend process to
@@ -379,10 +459,7 @@ private static String getQueueNameForHandlerId(final long handlerId) {
379
459
return QUEUE_ID_FORMAT .formatted (handlerId );
380
460
}
381
461
382
- // Somewhere we were supposed to set the first block number supported by
383
- // the block node. I don't know what happened to that config, but it seems
384
- // to be missing. I asked the question on the backfill PR as it's also
385
- // relevant there. The current streaming should be the next block to be
462
+ // The current streaming should be the next block to be
386
463
// streamed, but _only_ on startup. After that there should always be
387
464
// a delta (next unstreamed must always be strictly greater than the current
388
465
// streaming block number).
@@ -392,11 +469,12 @@ private void updateBlockNumbers(final BlockNodeContext serverContext) {
392
469
// Always set the last persisted block number, even if there are no
393
470
// known blocks.
394
471
lastPersistedBlockNumber .set (latestKnownBlock );
472
+ NodeConfig nodeConfiguration = serverContext .configuration ().getConfigData (NodeConfig .class );
473
+ final long earliestManagedBlock = nodeConfiguration .earliestManagedBlock ();
395
474
if (UNKNOWN_BLOCK_NUMBER == latestKnownBlock ) {
396
475
// if we have entered here, then we have no blocks available
397
- // @todo(1416) get below values from hiero config.
398
- currentStreamingBlockNumber .set (0L );
399
- nextUnstreamedBlockNumber .set (0L );
476
+ currentStreamingBlockNumber .set (earliestManagedBlock );
477
+ nextUnstreamedBlockNumber .set (earliestManagedBlock );
400
478
} else {
401
479
// if we have entered here, we know what the latest known block is,
402
480
// so we can set the next unstreamed block number to one greater
@@ -469,15 +547,11 @@ public Long call() {
469
547
}
470
548
}
471
549
// If the current block number has no batches to send, then
472
- // block on a count down latch until more data is available.
473
- // @todo(1416) need to figure out how to reset and set the countdown
474
- // latch... Until then, just park for 1/2 millisecond.
475
- // Park for 500 microseconds if there is no data available,
476
- // but not if the current block is completed (i.e. we just
477
- // sent a block proof).
550
+ // block on a condition variable until more data is
551
+ // _probably_ available, or 500 microseconds elapses.
478
552
if (publisherManager .currentStreamingBlockNumber .get () == currentBlockNumber
479
553
&& availableBatches .isEmpty ()) {
480
- parkNanos ( 500_000 ); // Park for 500 microseconds
554
+ publisherManager . waitForDataReady ();
481
555
}
482
556
}
483
557
}
@@ -487,25 +561,37 @@ public Long call() {
487
561
488
562
/**
489
563
* Metrics for tracking publisher handler activity:
564
+ * blockItemsMessaged - Number of block items delivered to the messaging service
565
+ * currentPublisherCount - Number of currently connected publishers
490
566
* lowestBlockNumber - Lowest incoming block number
491
567
* currentBlockNumber - Current incoming block number
492
568
* highestBlockNumber - Highest incoming block number
493
569
* latestBlockNumberAcknowledged - The latest block number acknowledged
570
+ * blocksClosedComplete - Number of blocks received complete (with both header and proof)
571
+ * blocksClosedIncomplete - Number of blocks received incomplete (missing header or proof)
494
572
*/
495
573
public record MetricsHolder (
496
574
Counter blockItemsMessaged ,
497
575
LongGauge currentPublisherCount ,
498
576
LongGauge lowestBlockNumber ,
499
577
LongGauge currentBlockNumber ,
500
578
LongGauge highestBlockNumber ,
501
- LongGauge latestBlockNumberAcknowledged ) {
579
+ LongGauge latestBlockNumberAcknowledged ,
580
+ Counter blocksClosedComplete ,
581
+ Counter blocksClosedIncomplete ) {
502
582
/**
503
583
* todo(1420) add documentation
504
584
*/
505
585
static MetricsHolder createMetrics (@ NonNull final Metrics metrics ) {
506
586
final Counter blockItemsMessaged =
507
587
metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_block_items_messaged" )
508
588
.withDescription ("Live block items messaged to the messaging service" ));
589
+ final Counter blocksClosedComplete =
590
+ metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_blocks_closed_complete" )
591
+ .withDescription ("Blocks received complete (with both header and proof) by any Handler" ));
592
+ final Counter blocksClosedIncomplete =
593
+ metrics .getOrCreate (new Counter .Config (METRICS_CATEGORY , "publisher_blocks_closed_incomplete" )
594
+ .withDescription ("Blocks received incomplete (missing header or proof) by any Handler" ));
509
595
final LongGauge numberOfProducers =
510
596
metrics .getOrCreate (new LongGauge .Config (METRICS_CATEGORY , "publisher_open_connections" )
511
597
.withDescription ("Connected publishers" ));
@@ -527,7 +613,9 @@ static MetricsHolder createMetrics(@NonNull final Metrics metrics) {
527
613
lowestBlockNumber ,
528
614
currentBlockNumber ,
529
615
highestBlockNumber ,
530
- latestBlockNumberAcknowledged );
616
+ latestBlockNumberAcknowledged ,
617
+ blocksClosedComplete ,
618
+ blocksClosedIncomplete );
531
619
}
532
620
}
533
621
}
0 commit comments