16
16
17
17
package com .hedera .block .server .consumer ;
18
18
19
- import static com .hedera .block .protos .BlockStreamService .BlockItem ;
20
- import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
21
-
22
19
import com .hedera .block .server .data .ObjectEvent ;
23
20
import com .hedera .block .server .mediator .StreamMediator ;
24
21
import io .grpc .stub .ServerCallStreamObserver ;
25
22
import io .grpc .stub .StreamObserver ;
23
+
26
24
import java .time .InstantSource ;
27
25
import java .util .concurrent .atomic .AtomicBoolean ;
28
26
27
+ import static com .hedera .block .protos .BlockStreamService .BlockItem ;
28
+ import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
29
+
29
30
/**
30
- * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
31
- * the downstream consumer via the notify method and manage the bidirectional stream to the consumer
32
- * via the onNext, onError, and onCompleted methods.
31
+ * The ConsumerBlockItemObserver class is the primary integration point between the LMAX Disruptor and
32
+ * an instance of a downstream consumer (represented by subscribeStreamResponseObserver provided by Helidon).
33
+ * The ConsumerBlockItemObserver implements the EventHandler interface so the Disruptor can invoke
34
+ * the onEvent() method when a new SubscribeStreamResponse is available.
33
35
*/
34
36
public class ConsumerBlockItemObserver
35
37
implements BlockItemEventHandler <ObjectEvent <SubscribeStreamResponse >> {
@@ -50,9 +52,12 @@ public class ConsumerBlockItemObserver
50
52
protected Runnable onClose ;
51
53
52
54
/**
53
- * Constructor for the LiveStreamObserverImpl class.
55
+ * Constructor for the ConsumerBlockItemObserver class.
54
56
*
55
- * @param subscribeStreamResponseObserver the response stream observer
57
+ * @param timeoutThresholdMillis The timeout threshold in milliseconds.
58
+ * @param producerLivenessClock The producer liveness clock.
59
+ * @param streamMediator The StreamMediator instance.
60
+ * @param subscribeStreamResponseObserver The StreamObserver instance.
56
61
*/
57
62
public ConsumerBlockItemObserver (
58
63
final long timeoutThresholdMillis ,
@@ -109,23 +114,28 @@ public void onEvent(
109
114
final long currentMillis = producerLivenessClock .millis ();
110
115
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis ) {
111
116
streamMediator .unsubscribe (this );
112
- LOGGER .log (System .Logger .Level .DEBUG , "Unsubscribed handler " );
117
+ LOGGER .log (System .Logger .Level .DEBUG , "Unsubscribed ConsumerBlockItemObserver due to producer timeout " );
113
118
} else {
114
119
115
- // Refresh the producer liveness and pass the BlockItem to the downstream observer.
116
- producerLivenessMillis = currentMillis ;
117
-
118
- // Only start sending BlockItems after we've reached
119
- // the beginning of a block.
120
- final SubscribeStreamResponse subscribeStreamResponse = event .get ();
121
- final BlockItem blockItem = subscribeStreamResponse .getBlockItem ();
122
- if (!streamStarted && blockItem .hasHeader ()) {
123
- streamStarted = true ;
124
- }
125
-
126
- if (streamStarted && isResponsePermitted .get ()) {
127
- LOGGER .log (System .Logger .Level .INFO , "Send BlockItem downstream: {0} " , blockItem );
128
- subscribeStreamResponseObserver .onNext (subscribeStreamResponse );
120
+ // Only send the response if the consumer has not cancelled
121
+ // or closed the stream.
122
+ if (isResponsePermitted .get ()) {
123
+
124
+ // Refresh the producer liveness and pass the BlockItem to the downstream observer.
125
+ producerLivenessMillis = currentMillis ;
126
+
127
+ // Only start sending BlockItems after we've reached
128
+ // the beginning of a block.
129
+ final SubscribeStreamResponse subscribeStreamResponse = event .get ();
130
+ final BlockItem blockItem = subscribeStreamResponse .getBlockItem ();
131
+ if (!streamStarted && blockItem .hasHeader ()) {
132
+ streamStarted = true ;
133
+ }
134
+
135
+ if (streamStarted ) {
136
+ LOGGER .log (System .Logger .Level .DEBUG , "Send BlockItem downstream: {0} " , blockItem );
137
+ subscribeStreamResponseObserver .onNext (subscribeStreamResponse );
138
+ }
129
139
}
130
140
}
131
141
}
0 commit comments