Skip to content

Commit b9039ec

Browse files
committed
feat: add support to the block stream simulator to fail sending mid-block (#523)
Signed-off-by: Alex Kehayov <aleks.kehayov@limechain.tech>
1 parent fb38bb7 commit b9039ec

File tree

9 files changed

+180
-66
lines changed

9 files changed

+180
-66
lines changed

simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public final class SimulatorMappedConfigSourceInitializer {
2424
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
2525
new ConfigMapping("blockStream.millisecondsPerBlock", "BLOCK_STREAM_MILLISECONDS_PER_BLOCK"),
2626
new ConfigMapping("blockStream.blockItemsBatchSize", "BLOCK_STREAM_BLOCK_ITEMS_BATCH_SIZE"),
27+
new ConfigMapping("blockStream.midBlockFailType", "BLOCK_STREAM_MID_BLOCK_FAIL_TYPE"),
28+
new ConfigMapping("blockStream.midBlockFailOffset", "BLOCK_STREAM_MID_BLOCK_FAIL_OFFSET"),
2729

2830
// Block consumer configuration
2931
new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"),

simulator/src/main/java/org/hiero/block/simulator/config/data/BlockStreamConfig.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.swirlds.config.api.ConfigData;
55
import com.swirlds.config.api.ConfigProperty;
66
import org.hiero.block.simulator.config.logging.Loggable;
7+
import org.hiero.block.simulator.config.types.MidBlockFailType;
78
import org.hiero.block.simulator.config.types.SimulatorMode;
89
import org.hiero.block.simulator.config.types.StreamingMode;
910

@@ -26,7 +27,9 @@ public record BlockStreamConfig(
2627
@Loggable @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
2728
@Loggable @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
2829
@Loggable @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock,
29-
@Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize) {
30+
@Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize,
31+
@Loggable @ConfigProperty(defaultValue = "NONE") MidBlockFailType midBlockFailType,
32+
@Loggable @ConfigProperty(defaultValue = "0") long midBlockFailOffset) {
3033

3134
/**
3235
* Creates a new {@link Builder} instance for constructing a {@code BlockStreamConfig}.
@@ -48,6 +51,8 @@ public static class Builder {
4851
private StreamingMode streamingMode = StreamingMode.MILLIS_PER_BLOCK;
4952
private int millisecondsPerBlock = 1000;
5053
private int blockItemsBatchSize = 1000;
54+
private MidBlockFailType midBlockFailType = MidBlockFailType.NONE;
55+
private long midBlockFailOffset = 0;
5156

5257
/**
5358
* Creates a new instance of the {@code Builder} class with default configuration values.
@@ -133,6 +138,28 @@ public Builder blockItemsBatchSize(int blockItemsBatchSize) {
133138
return this;
134139
}
135140

141+
/**
142+
* Sets a failure type to occur while streaming.
143+
*
144+
* @param midBlockFailType the failure type
145+
* @return this {@code Builder} instance
146+
*/
147+
public Builder midBlockFailType(MidBlockFailType midBlockFailType) {
148+
this.midBlockFailType = midBlockFailType;
149+
return this;
150+
}
151+
152+
/**
153+
* Sets the index of the failing block.
154+
*
155+
* @param midBlockFailOffset the index of the failing block
156+
* @return this {@code Builder} instance
157+
*/
158+
public Builder midBlockFailOffset(long midBlockFailOffset) {
159+
this.midBlockFailOffset = midBlockFailOffset;
160+
return this;
161+
}
162+
136163
/**
137164
* Builds a new {@link BlockStreamConfig} instance with the configured values.
138165
*
@@ -146,7 +173,9 @@ public BlockStreamConfig build() {
146173
maxBlockItemsToStream,
147174
streamingMode,
148175
millisecondsPerBlock,
149-
blockItemsBatchSize);
176+
blockItemsBatchSize,
177+
midBlockFailType,
178+
midBlockFailOffset);
150179
}
151180
}
152181
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.simulator.config.types;
3+
4+
/** The MidBlockFailType enum defines the type of failure in the process of block streaming.
5+
* Failure will occur randomly between the header and the proof of the block. */
6+
public enum MidBlockFailType {
7+
/**
8+
* The NONE value indicates no failure will be simulated during streaming.
9+
*/
10+
NONE,
11+
/**
12+
* The ABRUPT value indicates that an abrupt disconnection will occur while streaming
13+
* (without closing the connection or sending an EndOfStream message).
14+
*/
15+
ABRUPT,
16+
/**
17+
* The EOS value indicates that an EndOfStream message will be sent before the final item of the block.
18+
* Currently, onError is called, as the client is not able to send actual EOS message
19+
*/
20+
EOS
21+
}

simulator/src/main/java/org/hiero/block/simulator/grpc/PublishStreamGrpcClient.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
package org.hiero.block.simulator.grpc;
33

44
import com.hedera.hapi.block.stream.protoc.Block;
5-
import com.hedera.hapi.block.stream.protoc.BlockItem;
65
import java.util.List;
76

87
/**
@@ -14,14 +13,6 @@ public interface PublishStreamGrpcClient {
1413
*/
1514
void init();
1615

17-
/**
18-
* Streams the block item.
19-
*
20-
* @param blockItems list of the block item to be streamed
21-
* @return true if the block item is streamed successfully, false otherwise
22-
*/
23-
boolean streamBlockItem(List<BlockItem> blockItems);
24-
2516
/**
2617
* Streams the block.
2718
*

simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
import static java.lang.System.Logger.Level.DEBUG;
55
import static java.lang.System.Logger.Level.ERROR;
6-
import static java.lang.System.Logger.Level.INFO;
76
import static java.util.Objects.requireNonNull;
87
import static org.hiero.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlockItemsSent;
98
import static org.hiero.block.simulator.metrics.SimulatorMetricTypes.Counter.LiveBlocksSent;
@@ -20,11 +19,13 @@
2019
import java.util.ArrayDeque;
2120
import java.util.Deque;
2221
import java.util.List;
22+
import java.util.Random;
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import javax.inject.Inject;
2525
import org.hiero.block.common.utils.ChunkUtils;
2626
import org.hiero.block.simulator.config.data.BlockStreamConfig;
2727
import org.hiero.block.simulator.config.data.GrpcConfig;
28+
import org.hiero.block.simulator.config.types.MidBlockFailType;
2829
import org.hiero.block.simulator.grpc.PublishStreamGrpcClient;
2930
import org.hiero.block.simulator.metrics.MetricsService;
3031
import org.hiero.block.simulator.startup.SimulatorStartupData;
@@ -57,11 +58,11 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {
5758
/**
5859
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
5960
*
60-
* @param grpcConfig The configuration for gRPC connection settings
61+
* @param grpcConfig The configuration for gRPC connection settings
6162
* @param blockStreamConfig The configuration for block streaming parameters
62-
* @param metricsService The service for recording publication metrics
63-
* @param streamEnabled Flag controlling stream state
64-
* @param startupData The startup data for the simulator
63+
* @param metricsService The service for recording publication metrics
64+
* @param streamEnabled Flag controlling stream state
65+
* @param startupData The startup data for the simulator
6566
* @throws NullPointerException if any parameter is null
6667
*/
6768
@Inject
@@ -95,33 +96,6 @@ public void init() {
9596
lastKnownStatuses.clear();
9697
}
9798

98-
/**
99-
* Streams a list of block items to the server.
100-
*
101-
* @param blockItems The list of block items to stream
102-
* @return true if streaming should continue, false if streaming should stop
103-
*/
104-
@Override
105-
public boolean streamBlockItem(List<BlockItem> blockItems) {
106-
if (streamEnabled.get()) {
107-
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
108-
.setBlockItems(BlockItemSet.newBuilder()
109-
.addAllBlockItems(blockItems)
110-
.build())
111-
.build());
112-
113-
metricsService.get(LiveBlockItemsSent).add(blockItems.size());
114-
LOGGER.log(
115-
INFO,
116-
"Number of block items sent: "
117-
+ metricsService.get(LiveBlockItemsSent).get());
118-
} else {
119-
LOGGER.log(ERROR, "Not allowed to send next batch of block items");
120-
}
121-
122-
return streamEnabled.get();
123-
}
124-
12599
/**
126100
* Streams a complete block to the server, chunking it if necessary based on configuration.
127101
*
@@ -134,6 +108,7 @@ public boolean streamBlock(Block block) {
134108
ChunkUtils.chunkify(block.getItemsList(), blockStreamConfig.blockItemsBatchSize());
135109
for (List<BlockItem> streamingBatch : streamingBatches) {
136110
if (streamEnabled.get()) {
111+
handleMidBlockFailIfSet(streamingBatch);
137112
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
138113
.setBlockItems(BlockItemSet.newBuilder()
139114
.addAllBlockItems(streamingBatch)
@@ -192,4 +167,28 @@ public void shutdown() throws InterruptedException {
192167
completeStreaming();
193168
channel.shutdown();
194169
}
170+
171+
private void handleMidBlockFailIfSet(List<BlockItem> streamingBatch) {
172+
if (blockStreamConfig.midBlockFailType() != MidBlockFailType.NONE
173+
&& blockStreamConfig.midBlockFailOffset()
174+
== metricsService.get(LiveBlocksSent).get()) {
175+
// streaming fails after the BlockHeader, but before the last BlockItem or the BlockProof
176+
// the block is considered to have a proper structure
177+
if (streamingBatch.size() < 3) {
178+
throw new IllegalArgumentException("Block items batch size is too small for mid block fail simulation");
179+
}
180+
int failIndex = new Random().nextInt(1, streamingBatch.size() - 1);
181+
List<BlockItem> streamingBatchBeforeFail = streamingBatch.subList(0, failIndex);
182+
requestStreamObserver.onNext(PublishStreamRequest.newBuilder()
183+
.setBlockItems(BlockItemSet.newBuilder()
184+
.addAllBlockItems(streamingBatchBeforeFail)
185+
.build())
186+
.build());
187+
if (blockStreamConfig.midBlockFailType() == MidBlockFailType.ABRUPT) {
188+
throw new RuntimeException("Configured abrupt disconnection occurred");
189+
} else if (blockStreamConfig.midBlockFailType() == MidBlockFailType.EOS) {
190+
requestStreamObserver.onError(new Exception("Configured failure occurred, calling onError()"));
191+
}
192+
}
193+
}
195194
}

simulator/src/main/resources/app.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ prometheus.endpointPortNumber=9998
3636
#blockStream.millisecondsPerBlock=500
3737
#blockStream.blockItemsBatchSize=1_000
3838
#blockStream.delayBetweenBlockItems=3_000_000
39+
# When this property is active it causes an abrupt disconnect or sending EndOfStream message (NONE/ABRUPT/EOS)
40+
#blockStream.midBlockFailType=NONE
41+
# The number of blocks streamed before the failure
42+
#blockStream.midBlockFailOffset=0
3943

4044
# ----------------------------------------------
4145

simulator/src/test/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class SimulatorMappedConfigSourceInitializerTest {
4747
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
4848
new ConfigMapping("blockStream.millisecondsPerBlock", "BLOCK_STREAM_MILLISECONDS_PER_BLOCK"),
4949
new ConfigMapping("blockStream.blockItemsBatchSize", "BLOCK_STREAM_BLOCK_ITEMS_BATCH_SIZE"),
50+
new ConfigMapping("blockStream.midBlockFailType", "BLOCK_STREAM_MID_BLOCK_FAIL_TYPE"),
51+
new ConfigMapping("blockStream.midBlockFailOffset", "BLOCK_STREAM_MID_BLOCK_FAIL_OFFSET"),
5052

5153
// Block consumer configuration
5254
new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"),

simulator/src/test/java/org/hiero/block/simulator/config/logging/SimulatorConfigurationLoggerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ void testCurrentAppProperties() throws IOException {
2626
final SimulatorConfigurationLogger configurationLogging = new SimulatorConfigurationLogger(configuration);
2727
final Map<String, Object> config = configurationLogging.collectConfig(configuration);
2828
assertNotNull(config);
29-
assertEquals(36, config.size());
29+
assertEquals(38, config.size());
3030
for (final Map.Entry<String, Object> entry : config.entrySet()) {
3131
String value = entry.getValue().toString();
3232
if (value.contains("*")) {
@@ -42,7 +42,7 @@ void testWithMockedSensitiveProperty() throws IOException {
4242
final SimulatorConfigurationLogger configurationLogging = new SimulatorConfigurationLogger(configuration);
4343
final Map<String, Object> config = configurationLogging.collectConfig(configuration);
4444
assertNotNull(config);
45-
assertEquals(38, config.size());
45+
assertEquals(40, config.size());
4646
assertEquals("*****", config.get("test.secret").toString());
4747
assertEquals("", config.get("test.emptySecret").toString());
4848
}

0 commit comments

Comments
 (0)