Skip to content

Commit a0c6ce6

Browse files
authored
feat: add a "slow mode" feature to the simulator in consumer mode (#1261)
The base "slow down" is a pause for a random time in μs within a configured range for each received batch. - Have timers where we wait a random number of blocks, within a configured range, before we "go slow" - Have timers to "go slow" for a random number of blocks within a configured range - Return to full speed after the "go slow" period elapses. Signed-off-by: Mustafa Uzun <mustafa.uzun@limechain.tech>
1 parent 20b8a9f commit a0c6ce6

File tree

11 files changed

+276
-19
lines changed

11 files changed

+276
-19
lines changed

tools-and-tests/simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public final class SimulatorMappedConfigSourceInitializer {
3030
// Block consumer configuration
3131
new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"),
3232
new ConfigMapping("consumer.endBlockNumber", "CONSUMER_END_BLOCK_NUMBER"),
33+
new ConfigMapping("consumer.slowDownType", "CONSUMER_SLOW_DOWN_TYPE"),
34+
new ConfigMapping("consumer.slowDownMilliseconds", "CONSUMER_SLOWDOWN_MILLISECONDS"),
35+
new ConfigMapping("consumer.slowDownForBlockRange", "CONSUMER_SLOWDOWN_FOR_BLOCK_RANGE"),
3336

3437
// Block generator configuration
3538
new ConfigMapping("generator.generationMode", "GENERATOR_GENERATION_MODE"),

tools-and-tests/simulator/src/main/java/org/hiero/block/simulator/config/data/ConsumerConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import com.swirlds.config.api.ConfigData;
55
import com.swirlds.config.api.ConfigProperty;
66
import org.hiero.block.simulator.config.logging.Loggable;
7+
import org.hiero.block.simulator.config.types.SlowDownType;
78

89
@ConfigData("consumer")
910
public record ConsumerConfig(
1011
@Loggable @ConfigProperty(defaultValue = "-1") long startBlockNumber,
11-
@Loggable @ConfigProperty(defaultValue = "-1") long endBlockNumber) {}
12+
@Loggable @ConfigProperty(defaultValue = "-1") long endBlockNumber,
13+
@Loggable @ConfigProperty(defaultValue = "NONE") SlowDownType slowDownType,
14+
@Loggable @ConfigProperty(defaultValue = "2") long slowDownMilliseconds,
15+
@Loggable @ConfigProperty(defaultValue = "10-30") String slowDownForBlockRange) {}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.simulator.config.types;
3+
4+
import static org.hiero.block.common.utils.StringUtilities.isBlank;
5+
6+
import java.util.HashSet;
7+
import java.util.List;
8+
import java.util.Random;
9+
import java.util.Set;
10+
import org.hiero.block.simulator.config.data.ConsumerConfig;
11+
12+
public enum SlowDownType {
13+
NONE {
14+
@Override
15+
public Set<Long> apply(final ConsumerConfig consumerConfig) {
16+
return Set.of(); // No slowdown
17+
}
18+
},
19+
FIXED {
20+
@Override
21+
public Set<Long> apply(final ConsumerConfig consumerConfig) {
22+
return parseSlowDownForBlockRange(consumerConfig.slowDownForBlockRange());
23+
}
24+
},
25+
RANDOM {
26+
@Override
27+
public Set<Long> apply(final ConsumerConfig consumerConfig) {
28+
List<Long> blockRange = parseBlockRange(consumerConfig.slowDownForBlockRange());
29+
return randomBlockRangeSet(blockRange.getFirst(), blockRange.getLast());
30+
}
31+
},
32+
RANDOM_WITH_WAIT {
33+
@Override
34+
public Set<Long> apply(final ConsumerConfig consumerConfig) {
35+
List<Long> blockRange = parseBlockRange(consumerConfig.slowDownForBlockRange());
36+
long randomBlocksToWait =
37+
new Random().nextLong(blockRange.getLast() - blockRange.getFirst() + 1) + blockRange.getFirst();
38+
Set<Long> set = parseSlowDownForBlockRange(consumerConfig.slowDownForBlockRange());
39+
set.removeIf(value -> value < randomBlocksToWait);
40+
return parseSlowDownForBlockRange(consumerConfig.slowDownForBlockRange());
41+
}
42+
};
43+
44+
public abstract Set<Long> apply(final ConsumerConfig consumerConfig);
45+
46+
private static Set<Long> parseSlowDownForBlockRange(final String slowDownForBlockRange) {
47+
final List<Long> list = parseBlockRange(slowDownForBlockRange);
48+
final long start = list.getFirst();
49+
final long end = list.getLast();
50+
51+
Set<Long> blockRangeSet = new HashSet<>();
52+
for (long i = start; i <= end; i++) {
53+
blockRangeSet.add(i);
54+
}
55+
return blockRangeSet;
56+
}
57+
58+
private static List<Long> parseBlockRange(final String slowDownForBlockRange) {
59+
if (isBlank(slowDownForBlockRange)) {
60+
return List.of();
61+
}
62+
final String[] parts = slowDownForBlockRange.split("-");
63+
if (parts.length != 2) {
64+
throw new IllegalArgumentException("Invalid range format. Expected format: start-end (e.g., 1-3)");
65+
}
66+
try {
67+
final long start = Long.parseLong(parts[0].trim());
68+
final long end = Long.parseLong(parts[1].trim());
69+
if (start > end) {
70+
throw new IllegalArgumentException("Range start cannot be greater than range end.");
71+
}
72+
73+
return List.of(start, end);
74+
} catch (NumberFormatException e) {
75+
throw new IllegalArgumentException("Range values must be valid numbers.", e);
76+
}
77+
}
78+
79+
private static Set<Long> randomBlockRangeSet(final long startBlock, final long endBlock) {
80+
final Random random = new Random();
81+
long randomStart = random.nextLong((endBlock - startBlock + 1));
82+
long randomEnd = random.nextLong(endBlock - startBlock + 1);
83+
if (randomStart > randomEnd) {
84+
long temp = randomStart;
85+
randomStart = randomEnd;
86+
randomEnd = temp;
87+
}
88+
89+
Set<Long> blockRangeSet = new HashSet<>();
90+
for (long i = randomStart; i <= randomEnd; i++) {
91+
blockRangeSet.add(i);
92+
}
93+
return blockRangeSet;
94+
}
95+
}

tools-and-tests/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/ConsumerStreamGrpcClientImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce
8383
Preconditions.requireWhole(endBlock);
8484
Preconditions.requireGreaterOrEqual(endBlock, startBlock);
8585

86-
consumerStreamObserver =
87-
new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity);
86+
consumerStreamObserver = new ConsumerStreamObserver(
87+
metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity, consumerConfig);
8888

8989
SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
9090
.setStartBlockNumber(startBlock)
@@ -97,8 +97,8 @@ public void requestBlocks(long startBlock, long endBlock) throws InterruptedExce
9797

9898
@Override
9999
public void requestBlocks() throws InterruptedException {
100-
consumerStreamObserver =
101-
new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity);
100+
consumerStreamObserver = new ConsumerStreamObserver(
101+
metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity, consumerConfig);
102102

103103
SubscribeStreamRequest request = SubscribeStreamRequest.newBuilder()
104104
.setStartBlockNumber(consumerConfig.startBlockNumber())

tools-and-tests/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/ConsumerStreamObserver.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.simulator.grpc.impl;
33

4+
import static java.lang.System.Logger.Level.DEBUG;
45
import static java.lang.System.Logger.Level.ERROR;
56
import static java.lang.System.Logger.Level.INFO;
67
import static java.lang.System.Logger.Level.WARNING;
@@ -13,9 +14,12 @@
1314
import io.grpc.stub.StreamObserver;
1415
import java.util.Deque;
1516
import java.util.List;
17+
import java.util.Set;
1618
import java.util.concurrent.CountDownLatch;
1719
import java.util.concurrent.atomic.AtomicLong;
1820
import org.hiero.block.api.protoc.SubscribeStreamResponse;
21+
import org.hiero.block.simulator.config.data.ConsumerConfig;
22+
import org.hiero.block.simulator.config.types.SlowDownType;
1923
import org.hiero.block.simulator.metrics.MetricsService;
2024

2125
/**
@@ -34,6 +38,9 @@ public class ConsumerStreamObserver implements StreamObserver<SubscribeStreamRes
3438
private final Deque<String> lastKnownStatuses;
3539
private final AtomicLong blocksConsumed = new AtomicLong(0);
3640

41+
private final ConsumerConfig consumerConfig;
42+
private final Set<Long> slowDownBlockRangeSet;
43+
3744
/**
3845
* Constructs a new ConsumerStreamObserver.
3946
*
@@ -47,11 +54,14 @@ public ConsumerStreamObserver(
4754
@NonNull final MetricsService metricsService,
4855
@NonNull final CountDownLatch streamLatch,
4956
@NonNull final Deque<String> lastKnownStatuses,
50-
final int lastKnownStatusesCapacity) {
57+
final int lastKnownStatusesCapacity,
58+
@NonNull final ConsumerConfig consumerConfig) {
5159
this.metricsService = requireNonNull(metricsService);
5260
this.streamLatch = requireNonNull(streamLatch);
5361
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
5462
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
63+
this.consumerConfig = consumerConfig;
64+
this.slowDownBlockRangeSet = consumerConfig.slowDownType().apply(consumerConfig);
5565
}
5666

5767
/**
@@ -99,6 +109,11 @@ public void onCompleted() {
99109
}
100110

101111
private void processBlockItems(List<BlockItem> blockItems) {
112+
long lastBlockConsumed = blocksConsumed.get();
113+
if (consumerConfig.slowDownType() != SlowDownType.NONE && slowDownBlockRangeSet.contains(lastBlockConsumed)) {
114+
slowDownProcessing("for block %d".formatted(lastBlockConsumed));
115+
}
116+
102117
blockItems.stream().filter(BlockItem::hasBlockProof).forEach(blockItem -> {
103118
metricsService.get(LiveBlocksConsumed).increment();
104119

@@ -122,4 +137,14 @@ private void logNonAscendingBlockNumbers(long blockNumber) {
122137
}
123138
}
124139
}
140+
141+
private void slowDownProcessing(final String message) {
142+
try {
143+
LOGGER.log(DEBUG, "Slowing down processing " + message);
144+
Thread.sleep(consumerConfig.slowDownMilliseconds());
145+
} catch (InterruptedException e) {
146+
Thread.currentThread().interrupt();
147+
LOGGER.log(INFO, "Stream processing interrupted during slowdown", e);
148+
}
149+
}
125150
}

tools-and-tests/simulator/src/test/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class SimulatorMappedConfigSourceInitializerTest {
5353
// Block consumer configuration
5454
new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"),
5555
new ConfigMapping("consumer.endBlockNumber", "CONSUMER_END_BLOCK_NUMBER"),
56+
new ConfigMapping("consumer.slowDownType", "CONSUMER_SLOW_DOWN_TYPE"),
57+
new ConfigMapping("consumer.slowDownMilliseconds", "CONSUMER_SLOWDOWN_MILLISECONDS"),
58+
new ConfigMapping("consumer.slowDownForBlockRange", "CONSUMER_SLOWDOWN_FOR_BLOCK_RANGE"),
5659

5760
// Block generator configuration
5861
new ConfigMapping("generator.generationMode", "GENERATOR_GENERATION_MODE"),

tools-and-tests/simulator/src/test/java/org/hiero/block/simulator/config/logging/SimulatorConfigurationLoggerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ void testCurrentAppProperties() throws IOException {
2626
final SimulatorConfigurationLogger configurationLogging = new SimulatorConfigurationLogger(configuration);
2727
final Map<String, Object> config = configurationLogging.collectConfig(configuration);
2828
assertNotNull(config);
29-
assertEquals(43, config.size());
29+
assertEquals(46, config.size());
3030
for (final Map.Entry<String, Object> entry : config.entrySet()) {
3131
String value = entry.getValue().toString();
3232
if (value.contains("*")) {
@@ -42,7 +42,7 @@ void testWithMockedSensitiveProperty() throws IOException {
4242
final SimulatorConfigurationLogger configurationLogging = new SimulatorConfigurationLogger(configuration);
4343
final Map<String, Object> config = configurationLogging.collectConfig(configuration);
4444
assertNotNull(config);
45-
assertEquals(45, config.size());
45+
assertEquals(48, config.size());
4646
assertEquals("*****", config.get("test.secret").toString());
4747
assertEquals("", config.get("test.emptySecret").toString());
4848
}

tools-and-tests/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/ConsumerStreamGrpcClientImplTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ public class ConsumerStreamGrpcClientImplTest {
4141
@Mock
4242
private BlockStreamConfig blockStreamConfig;
4343

44-
@Mock
45-
private ConsumerConfig consumerConfig;
46-
4744
private ConsumerStreamGrpcClient consumerStreamGrpcClientImpl;
4845
private Server server;
4946

@@ -98,6 +95,7 @@ public void subscribeBlockStream(
9895
when(blockStreamConfig.lastKnownStatusesCapacity()).thenReturn(10);
9996

10097
final Configuration config = TestUtils.getTestConfiguration();
98+
ConsumerConfig consumerConfig = config.getConfigData(ConsumerConfig.class);
10199
final MetricsService metricsService = new MetricsServiceImpl(getTestMetrics(config));
102100
consumerStreamGrpcClientImpl =
103101
new ConsumerStreamGrpcClientImpl(grpcConfig, blockStreamConfig, consumerConfig, metricsService);

tools-and-tests/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/ConsumerStreamObserverTest.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
package org.hiero.block.simulator.grpc.impl;
33

4+
import static org.hiero.block.simulator.TestUtils.getTestConfiguration;
45
import static org.hiero.block.simulator.TestUtils.getTestMetrics;
56
import static org.junit.jupiter.api.Assertions.assertEquals;
67
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -19,7 +20,7 @@
1920
import org.hiero.block.api.protoc.BlockItemSet;
2021
import org.hiero.block.api.protoc.SubscribeStreamResponse;
2122
import org.hiero.block.api.protoc.SubscribeStreamResponse.Code;
22-
import org.hiero.block.simulator.TestUtils;
23+
import org.hiero.block.simulator.config.data.ConsumerConfig;
2324
import org.hiero.block.simulator.metrics.MetricsService;
2425
import org.hiero.block.simulator.metrics.MetricsServiceImpl;
2526
import org.hiero.block.simulator.metrics.SimulatorMetricTypes.Counter;
@@ -33,31 +34,35 @@ class ConsumerStreamObserverTest {
3334
private ArrayDeque<String> lastKnownStatuses;
3435
private ConsumerStreamObserver observer;
3536
private int lastKnownStatusesCapacity;
37+
private ConsumerConfig consumerConfig;
3638

3739
@BeforeEach
3840
void setUp() throws IOException {
39-
Configuration config = TestUtils.getTestConfiguration();
41+
Configuration config = getTestConfiguration();
4042

4143
metricsService = spy(new MetricsServiceImpl(getTestMetrics(config)));
4244
streamLatch = mock(CountDownLatch.class);
4345
ArrayDeque<String> lastKnownStatuses = new ArrayDeque<>();
4446
lastKnownStatusesCapacity = 10;
45-
46-
observer =
47-
new ConsumerStreamObserver(metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity);
47+
consumerConfig = config.getConfigData(ConsumerConfig.class);
48+
observer = new ConsumerStreamObserver(
49+
metricsService, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity, consumerConfig);
4850
}
4951

5052
@Test
5153
void testConstructorWithNullArguments() {
5254
assertThrows(
5355
NullPointerException.class,
54-
() -> new ConsumerStreamObserver(null, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity));
56+
() -> new ConsumerStreamObserver(
57+
null, streamLatch, lastKnownStatuses, lastKnownStatusesCapacity, consumerConfig));
5558
assertThrows(
5659
NullPointerException.class,
57-
() -> new ConsumerStreamObserver(metricsService, null, lastKnownStatuses, lastKnownStatusesCapacity));
60+
() -> new ConsumerStreamObserver(
61+
metricsService, null, lastKnownStatuses, lastKnownStatusesCapacity, consumerConfig));
5862
assertThrows(
5963
NullPointerException.class,
60-
() -> new ConsumerStreamObserver(metricsService, streamLatch, null, lastKnownStatusesCapacity));
64+
() -> new ConsumerStreamObserver(
65+
metricsService, streamLatch, null, lastKnownStatusesCapacity, consumerConfig));
6166
}
6267

6368
@Test

tools-and-tests/suites/src/main/java/org/hiero/block/suites/subscriber/negative/NegativeSingleSubscriberTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import static org.hiero.block.suites.utils.BlockSimulatorUtils.createBlockSimulator;
55
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
import static org.junit.jupiter.api.Assertions.assertFalse;
67
import static org.junit.jupiter.api.Assertions.assertNotNull;
78
import static org.junit.jupiter.api.Assertions.assertTrue;
89

@@ -196,6 +197,67 @@ public void shouldReturnCorrectStatusForIncorrectEndBlock() throws IOException {
196197
assertEquals(0L, consumedBlocks);
197198
}
198199

200+
@Test
201+
@DisplayName("Should fail 3ms slowdown validation after each block in a range")
202+
public void shouldFailSlowdownValidationAfterEachBlock() throws IOException, InterruptedException {
203+
// ===== Prepare environment =================================================================
204+
final long startBlock = 1L;
205+
final long endBlock = 10L;
206+
final long expectedSlowdownMillis = 3L;
207+
final Map<String, String> consumerConfiguration = Map.of(
208+
"blockStream.simulatorMode",
209+
"CONSUMER",
210+
"consumer.startBlockNumber",
211+
String.valueOf(startBlock),
212+
"consumer.endBlockNumber",
213+
String.valueOf(endBlock),
214+
"consumer.slowDownMilliseconds",
215+
String.valueOf(expectedSlowdownMillis),
216+
"consumer.slowDownType",
217+
"FIXED",
218+
"consumer.slowDownForBlockRange",
219+
"1-3");
220+
final BlockStreamSimulatorApp publisherSimulator = createBlockSimulator();
221+
final BlockStreamSimulatorApp consumerSimulator = createBlockSimulator(consumerConfiguration);
222+
223+
simulatorAppsRef.add(publisherSimulator);
224+
simulatorAppsRef.add(consumerSimulator);
225+
226+
// ===== Start publisher and make sure it's streaming =======================================
227+
final Future<?> publisherSimulatorThread = startSimulatorInstance(publisherSimulator);
228+
simulators.add(publisherSimulatorThread);
229+
230+
boolean publisherReachedEndBlock = false;
231+
while (!publisherReachedEndBlock) {
232+
if (publisherSimulator.getStreamStatus().publishedBlocks() > endBlock) {
233+
publisherReachedEndBlock = true;
234+
}
235+
}
236+
237+
// ===== Start consumer and validate slowdown ===============================================
238+
final Future<?> consumerSimulatorThread = startSimulatorInThread(consumerSimulator);
239+
simulators.add(consumerSimulatorThread);
240+
241+
long previousBlockTime = System.currentTimeMillis();
242+
boolean slowdownValidated = true;
243+
244+
for (long block = startBlock; block <= endBlock; block++) {
245+
while (consumerSimulator.getStreamStatus().consumedBlocks() < block) {
246+
Thread.sleep(1); // Wait for the block to be consumed
247+
}
248+
final long currentBlockTime = System.currentTimeMillis();
249+
final long timeDifference = currentBlockTime - previousBlockTime;
250+
251+
if (timeDifference < expectedSlowdownMillis) {
252+
slowdownValidated = false;
253+
break;
254+
}
255+
previousBlockTime = currentBlockTime;
256+
}
257+
258+
assertFalse(slowdownValidated, "Not expected 3ms slowdown was validated for all blocks.");
259+
}
260+
199261
/**
200262
* Starts a simulator in a thread and make sure that it's running and trying to publish blocks
201263
*

0 commit comments

Comments
 (0)