16
16
17
17
package com .hedera .block .server .consumer ;
18
18
19
+ import static com .hedera .block .protos .BlockStreamService .BlockItem ;
20
+ import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
21
+
19
22
import com .hedera .block .server .data .ObjectEvent ;
20
23
import com .hedera .block .server .mediator .StreamMediator ;
21
24
import io .grpc .stub .ServerCallStreamObserver ;
22
25
import io .grpc .stub .StreamObserver ;
23
-
24
26
import java .time .InstantSource ;
25
27
import java .util .concurrent .atomic .AtomicBoolean ;
26
28
27
- import static com .hedera .block .protos .BlockStreamService .BlockItem ;
28
- import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
29
-
30
29
/**
31
- * The ConsumerBlockItemObserver class is the primary integration point between the LMAX Disruptor and
32
- * an instance of a downstream consumer (represented by subscribeStreamResponseObserver provided by Helidon).
33
- * The ConsumerBlockItemObserver implements the EventHandler interface so the Disruptor can invoke
34
- * the onEvent() method when a new SubscribeStreamResponse is available.
30
+ * The ConsumerBlockItemObserver class is the primary integration point between the LMAX Disruptor
31
+ * and an instance of a downstream consumer (represented by subscribeStreamResponseObserver provided
32
+ * by Helidon). The ConsumerBlockItemObserver implements the EventHandler interface so the Disruptor
33
+ * can invoke the onEvent() method when a new SubscribeStreamResponse is available.
35
34
*/
36
35
public class ConsumerBlockItemObserver
37
36
implements BlockItemEventHandler <ObjectEvent <SubscribeStreamResponse >> {
@@ -114,7 +113,9 @@ public void onEvent(
114
113
final long currentMillis = producerLivenessClock .millis ();
115
114
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis ) {
116
115
streamMediator .unsubscribe (this );
117
- LOGGER .log (System .Logger .Level .DEBUG , "Unsubscribed ConsumerBlockItemObserver due to producer timeout" );
116
+ LOGGER .log (
117
+ System .Logger .Level .DEBUG ,
118
+ "Unsubscribed ConsumerBlockItemObserver due to producer timeout" );
118
119
} else {
119
120
120
121
// Only send the response if the consumer has not cancelled
@@ -133,7 +134,10 @@ public void onEvent(
133
134
}
134
135
135
136
if (streamStarted ) {
136
- LOGGER .log (System .Logger .Level .DEBUG , "Send BlockItem downstream: {0} " , blockItem );
137
+ LOGGER .log (
138
+ System .Logger .Level .DEBUG ,
139
+ "Send BlockItem downstream: {0} " ,
140
+ blockItem );
137
141
subscribeStreamResponseObserver .onNext (subscribeStreamResponse );
138
142
}
139
143
}
0 commit comments