1
1
// SPDX-License-Identifier: Apache-2.0
2
2
package org .hiero .block .node .backfill .client ;
3
3
4
+ import static java .lang .System .Logger .Level .DEBUG ;
4
5
import static java .lang .System .Logger .Level .TRACE ;
5
6
import static java .util .Objects .requireNonNull ;
6
7
import static org .hiero .block .api .BlockStreamSubscribeServiceInterface .FULL_NAME ;
7
8
8
9
import com .hedera .hapi .block .stream .output .BlockHeader ;
9
- import com .hedera .pbj .runtime .Codec ;
10
10
import com .hedera .pbj .runtime .ParseException ;
11
- import com .hedera .pbj .runtime .UncheckedParseException ;
12
11
import com .hedera .pbj .runtime .grpc .GrpcCall ;
13
12
import com .hedera .pbj .runtime .grpc .GrpcClient ;
14
13
import com .hedera .pbj .runtime .grpc .Pipeline ;
@@ -44,12 +43,11 @@ public class BlockStreamSubscribeUnparsedClient {
44
43
45
44
// From constructor
46
45
private final GrpcClient grpcClient ;
47
- private final ServiceInterface .RequestOptions requestOptions ;
48
46
49
47
public BlockStreamSubscribeUnparsedClient (
50
48
@ NonNull final GrpcClient grpcClient , @ NonNull final ServiceInterface .RequestOptions requestOptions ) {
51
49
this .grpcClient = requireNonNull (grpcClient );
52
- this . requestOptions = requireNonNull (requestOptions );
50
+ ServiceInterface . RequestOptions requestOptions1 = requireNonNull (requestOptions );
53
51
}
54
52
55
53
/**
@@ -77,72 +75,13 @@ public List<BlockUnparsed> getBatchOfBlocks(long startBlockNumber, long endBlock
77
75
.build ();
78
76
79
77
// Create a per-request pipeline that closes over `ctx`
80
- final Pipeline <SubscribeStreamResponseUnparsed > pipeline = new Pipeline <>() {
81
- @ Override
82
- public void onSubscribe (Flow .Subscription subscription ) {
83
- LOGGER .log (TRACE , "received onSubscribe confirmation" );
84
- // No backpressure negotiation needed for this pattern.
85
- }
86
-
87
- @ Override
88
- public void onNext (SubscribeStreamResponseUnparsed subscribeStreamResponse ) {
89
- if (subscribeStreamResponse .hasBlockItems ()) {
90
- final List <BlockItemUnparsed > blockItems =
91
- subscribeStreamResponse .blockItems ().blockItems ();
92
-
93
- // Start of a new block
94
- if (blockItems .getFirst ().hasBlockHeader ()) {
95
- final long expected = ctx .expectedBlockNumber ;
96
- final long actual = extractBlockNumberFromBlockHeader (blockItems .getFirst ());
97
- if (actual != expected ) {
98
- ctx .fail (new IllegalStateException (
99
- "Expected block number " + expected + " but received " + actual ));
100
- return ;
101
- }
102
- // Begin a new block with the header and following items in this frame
103
- ctx .currentBlockItems = new ArrayList <>(blockItems );
104
- } else {
105
- // Continuation of the current block
106
- ctx .currentBlockItems .addAll (blockItems );
107
- }
108
-
109
- // End of block
110
- if (blockItems .getLast ().hasBlockProof ()) {
111
- ctx .blocks .add (BlockUnparsed .newBuilder ()
112
- .blockItems (ctx .currentBlockItems )
113
- .build ());
114
- ctx .currentBlockItems = new ArrayList <>();
115
- ctx .expectedBlockNumber ++;
116
- }
117
-
118
- } else if (subscribeStreamResponse .hasStatus ()) {
119
- final SubscribeStreamResponse .Code code = subscribeStreamResponse .status ();
120
- if (code != SubscribeStreamResponse .Code .SUCCESS ) {
121
- ctx .fail (new RuntimeException ("Received error code: " + code ));
122
- }
123
- } else {
124
- ctx .fail (new RuntimeException ("Received unexpected response without block items or code" ));
125
- }
126
- }
127
-
128
- @ Override
129
- public void onError (Throwable throwable ) {
130
- LOGGER .log (TRACE , "received onError" , throwable );
131
- ctx .fail (throwable );
132
- }
133
-
134
- @ Override
135
- public void onComplete () {
136
- LOGGER .log (TRACE , "received onComplete" );
137
- ctx .complete ();
138
- }
139
- };
78
+ final Pipeline <SubscribeStreamResponseUnparsed > pipeline = new SubscribePipeline (ctx );
140
79
141
80
// Issue the call using the per-request pipeline
142
81
final GrpcCall <SubscribeStreamRequest , SubscribeStreamResponseUnparsed > call = grpcClient .createCall (
143
82
FULL_NAME + "/subscribeBlockStream" ,
144
- getSubscribeStreamRequestCodec ( requestOptions ) ,
145
- getSubscribeStreamResponseUnparsedCodec ( requestOptions ) ,
83
+ SubscribeStreamRequest . PROTOBUF ,
84
+ SubscribeStreamResponseUnparsed . PROTOBUF ,
146
85
pipeline );
147
86
148
87
call .sendRequest (request , true );
@@ -154,35 +93,8 @@ public void onComplete() {
154
93
/**
155
94
* Extracts the block number from a block header item.
156
95
*/
157
- private static long extractBlockNumberFromBlockHeader (BlockItemUnparsed itemUnparsed ) {
158
- try {
159
- return BlockHeader .PROTOBUF .parse (itemUnparsed .blockHeaderOrThrow ()).number ();
160
- } catch (ParseException e ) {
161
- throw new UncheckedParseException (e );
162
- }
163
- }
164
-
165
- private static Codec <SubscribeStreamRequest > getSubscribeStreamRequestCodec (
166
- @ NonNull final ServiceInterface .RequestOptions options ) {
167
- requireNonNull (options );
168
- // Default to protobuf, and don't error out if both are set:
169
- if (options .isJson () && !options .isProtobuf ()) {
170
- return SubscribeStreamRequest .JSON ;
171
- } else {
172
- return SubscribeStreamRequest .PROTOBUF ;
173
- }
174
- }
175
-
176
- @ NonNull
177
- private static Codec <SubscribeStreamResponseUnparsed > getSubscribeStreamResponseUnparsedCodec (
178
- @ NonNull final ServiceInterface .RequestOptions options ) {
179
- requireNonNull (options );
180
- // Default to protobuf, and don't error out if both are set:
181
- if (options .isJson () && !options .isProtobuf ()) {
182
- return SubscribeStreamResponseUnparsed .JSON ;
183
- } else {
184
- return SubscribeStreamResponseUnparsed .PROTOBUF ;
185
- }
96
+ private static long extractBlockNumberFromBlockHeader (BlockItemUnparsed itemUnparsed ) throws ParseException {
97
+ return BlockHeader .PROTOBUF .parse (itemUnparsed .blockHeaderOrThrow ()).number ();
186
98
}
187
99
188
100
/**
@@ -214,12 +126,85 @@ List<BlockUnparsed> await() {
214
126
done .await ();
215
127
} catch (InterruptedException e ) {
216
128
Thread .currentThread ().interrupt ();
217
- throw new RuntimeException ("Interrupted while waiting for blocks" , e );
218
129
}
219
130
if (error != null ) {
220
131
throw new RuntimeException ("Error fetching blocks" , error );
221
132
}
222
133
return blocks ;
223
134
}
224
135
}
136
+
137
+ private static final class SubscribePipeline implements Pipeline <SubscribeStreamResponseUnparsed > {
138
+ private final RequestContext ctx ;
139
+
140
+ SubscribePipeline (RequestContext ctx ) {
141
+ this .ctx = ctx ;
142
+ }
143
+
144
+ @ Override
145
+ public void onSubscribe (Flow .Subscription subscription ) {
146
+ LOGGER .log (TRACE , "received onSubscribe confirmation" );
147
+ // No backpressure negotiation needed for this pattern.
148
+ }
149
+
150
+ @ Override
151
+ public void onNext (SubscribeStreamResponseUnparsed resp ) {
152
+ try {
153
+ if (resp .hasBlockItems ()) {
154
+ final List <BlockItemUnparsed > frame = resp .blockItems ().blockItems ();
155
+
156
+ if (frame .getFirst ().hasBlockHeader ()) {
157
+ final long expected = ctx .expectedBlockNumber ;
158
+ final long actual = extractBlockNumberFromBlockHeader (frame .getFirst ());
159
+ if (actual != expected ) {
160
+ ctx .fail (new IllegalStateException (
161
+ "Expected block number " + expected + " but received " + actual ));
162
+ return ;
163
+ }
164
+ // Start a new block: reuse the buffer and populate it.
165
+ ctx .currentBlockItems .clear ();
166
+ ctx .currentBlockItems .addAll (frame );
167
+ } else {
168
+ // Continuation: append to the same buffer.
169
+ ctx .currentBlockItems .addAll (frame );
170
+ }
171
+
172
+ if (frame .getLast ().hasBlockProof ()) {
173
+ // Snapshot the current items to avoid retaining the large buffer in the finished block.
174
+ final List <BlockItemUnparsed > snapshot = List .copyOf (ctx .currentBlockItems );
175
+ ctx .blocks .add (
176
+ BlockUnparsed .newBuilder ().blockItems (snapshot ).build ());
177
+ ctx .currentBlockItems .clear ();
178
+ ctx .expectedBlockNumber ++;
179
+ }
180
+
181
+ } else if (resp .hasStatus ()) {
182
+ final SubscribeStreamResponse .Code code = resp .status ();
183
+ if (code != SubscribeStreamResponse .Code .SUCCESS ) {
184
+ ctx .fail (new RuntimeException ("Received error code: " + code ));
185
+ }
186
+ } else {
187
+ ctx .fail (new RuntimeException ("Received unexpected response without block items or code" ));
188
+ }
189
+ } catch (ParseException e ) {
190
+ LOGGER .log (DEBUG , "Parse error in block item" , e );
191
+ ctx .fail (e );
192
+ } catch (RuntimeException e ) {
193
+ LOGGER .log (DEBUG , "Runtime error processing SubscribeStreamResponseUnparsed" , e );
194
+ ctx .fail (e );
195
+ }
196
+ }
197
+
198
+ @ Override
199
+ public void onError (Throwable throwable ) {
200
+ LOGGER .log (TRACE , "received onError" , throwable );
201
+ ctx .fail (throwable );
202
+ }
203
+
204
+ @ Override
205
+ public void onComplete () {
206
+ LOGGER .log (TRACE , "received onComplete" );
207
+ ctx .complete ();
208
+ }
209
+ }
225
210
}
0 commit comments