Skip to content

Commit e1d3826

Browse files
authored
feat: add support for PublishBlockStream to simulator (#548)
This PR adds support for PublishBlockStream in the simulator, by introducing new wokring mode, called PublisherServerMode, where the application acts a server, which listents and awaits stream of block data. With the effort here I aim mainly to introduce the feature, where in future ones we will refine it, according to needs to be able to use it as testing driver. Signed-off-by: georgi-l95 <glazarov95@gmail.com>
1 parent 4b5bb5b commit e1d3826

30 files changed

+1037
-226
lines changed

simulator/build.gradle.kts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,32 @@ testModuleInfo {
3434
}
3535

3636
// Task to run simulator in Publisher mode
37-
tasks.register<JavaExec>("runPublisher") {
38-
description = "Run the simulator in Publisher mode"
37+
tasks.register<JavaExec>("runPublisherClient") {
38+
description = "Run the simulator in Publisher Client mode"
3939
group = "application"
4040

4141
mainClass = application.mainClass
4242
mainModule = application.mainModule
4343
classpath = sourceSets["main"].runtimeClasspath
4444

45-
environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER")
45+
environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_CLIENT")
4646
environment("PROMETHEUS_ENDPOINT_ENABLED", "true")
4747
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9998")
4848
}
4949

50+
tasks.register<JavaExec>("runPublisherServer") {
51+
description = "Run the simulator in Publisher Server mode"
52+
group = "application"
53+
54+
mainClass = application.mainClass
55+
mainModule = application.mainModule
56+
classpath = sourceSets["main"].runtimeClasspath
57+
58+
environment("BLOCK_STREAM_SIMULATOR_MODE", "PUBLISHER_SERVER")
59+
environment("PROMETHEUS_ENDPOINT_ENABLED", "true")
60+
environment("PROMETHEUS_ENDPOINT_PORT_NUMBER", "9996")
61+
}
62+
5063
// Task to run simulator in Consumer mode
5164
tasks.register<JavaExec>("runConsumer") {
5265
description = "Run the simulator in Consumer mode"

simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorApp.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,12 @@
66
import static java.lang.System.Logger.Level.INFO;
77
import static java.util.Objects.requireNonNull;
88

9-
import com.hedera.block.simulator.config.data.BlockStreamConfig;
109
import com.hedera.block.simulator.config.data.StreamStatus;
11-
import com.hedera.block.simulator.config.types.SimulatorMode;
1210
import com.hedera.block.simulator.exception.BlockSimulatorParsingException;
1311
import com.hedera.block.simulator.generator.BlockStreamManager;
1412
import com.hedera.block.simulator.grpc.ConsumerStreamGrpcClient;
1513
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
16-
import com.hedera.block.simulator.metrics.MetricsService;
17-
import com.hedera.block.simulator.mode.CombinedModeHandler;
18-
import com.hedera.block.simulator.mode.ConsumerModeHandler;
19-
import com.hedera.block.simulator.mode.PublisherModeHandler;
14+
import com.hedera.block.simulator.grpc.PublishStreamGrpcServer;
2015
import com.hedera.block.simulator.mode.SimulatorModeHandler;
2116
import com.swirlds.config.api.Configuration;
2217
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -46,6 +41,7 @@ public class BlockStreamSimulatorApp {
4641

4742
// Service dependencies
4843
private final PublishStreamGrpcClient publishStreamGrpcClient;
44+
private final PublishStreamGrpcServer publishStreamGrpcServer;
4945
private final ConsumerStreamGrpcClient consumerStreamGrpcClient;
5046
private final SimulatorModeHandler simulatorModeHandler;
5147

@@ -62,7 +58,6 @@ public class BlockStreamSimulatorApp {
6258
* generation
6359
* @param publishStreamGrpcClient The gRPC client for publishing blocks
6460
* @param consumerStreamGrpcClient The gRPC client for consuming blocks
65-
* @param metricsService The service for recording metrics
6661
* @throws NullPointerException if any parameter is null
6762
* @throws IllegalArgumentException if an unknown simulator mode is configured
6863
*/
@@ -71,27 +66,18 @@ public BlockStreamSimulatorApp(
7166
@NonNull Configuration configuration,
7267
@NonNull BlockStreamManager blockStreamManager,
7368
@NonNull PublishStreamGrpcClient publishStreamGrpcClient,
69+
@NonNull PublishStreamGrpcServer publishStreamGrpcServer,
7470
@NonNull ConsumerStreamGrpcClient consumerStreamGrpcClient,
75-
@NonNull MetricsService metricsService) {
71+
@NonNull SimulatorModeHandler simulatorModeHandler) {
7672

7773
requireNonNull(configuration);
7874
requireNonNull(blockStreamManager);
7975
loadLoggingProperties();
8076

8177
this.publishStreamGrpcClient = requireNonNull(publishStreamGrpcClient);
78+
this.publishStreamGrpcServer = requireNonNull(publishStreamGrpcServer);
8279
this.consumerStreamGrpcClient = requireNonNull(consumerStreamGrpcClient);
83-
84-
// Initialize the appropriate mode handler based on configuration
85-
final BlockStreamConfig blockStreamConfig =
86-
requireNonNull(configuration.getConfigData(BlockStreamConfig.class));
87-
// @todo(386) Load simulator mode using dagger
88-
final SimulatorMode simulatorMode = blockStreamConfig.simulatorMode();
89-
this.simulatorModeHandler = switch (simulatorMode) {
90-
case PUBLISHER -> new PublisherModeHandler(
91-
blockStreamConfig, publishStreamGrpcClient, blockStreamManager, metricsService);
92-
case CONSUMER -> new ConsumerModeHandler(consumerStreamGrpcClient);
93-
case BOTH -> new CombinedModeHandler();
94-
};
80+
this.simulatorModeHandler = requireNonNull(simulatorModeHandler);
9581
}
9682

9783
/**
@@ -148,8 +134,10 @@ public void stop() throws InterruptedException {
148134
public StreamStatus getStreamStatus() {
149135
return StreamStatus.builder()
150136
.publishedBlocks(publishStreamGrpcClient.getPublishedBlocks())
137+
.processedBlocks(publishStreamGrpcServer.getProcessedBlocks())
151138
.consumedBlocks(consumerStreamGrpcClient.getConsumedBlocks())
152-
.lastKnownPublisherStatuses(publishStreamGrpcClient.getLastKnownStatuses())
139+
.lastKnownPublisherClientStatuses(publishStreamGrpcClient.getLastKnownStatuses())
140+
.lastKnownPublisherServerStatuses(publishStreamGrpcServer.getLastKnownStatuses())
153141
.lastKnownConsumersStatuses(consumerStreamGrpcClient.getLastKnownStatuses())
154142
.build();
155143
}

simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulatorInjectionComponent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.hedera.block.simulator.generator.GeneratorInjectionModule;
66
import com.hedera.block.simulator.grpc.GrpcInjectionModule;
77
import com.hedera.block.simulator.metrics.MetricsInjectionModule;
8+
import com.hedera.block.simulator.mode.SimulatorModeInjectionModule;
89
import com.swirlds.config.api.Configuration;
910
import dagger.BindsInstance;
1011
import dagger.Component;
@@ -18,6 +19,7 @@
1819
ConfigInjectionModule.class,
1920
GeneratorInjectionModule.class,
2021
GrpcInjectionModule.class,
22+
SimulatorModeInjectionModule.class
2123
})
2224
public interface BlockStreamSimulatorInjectionComponent {
2325

simulator/src/main/java/com/hedera/block/simulator/config/data/BlockStreamConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
*/
2020
@ConfigData("blockStream")
2121
public record BlockStreamConfig(
22-
@ConfigProperty(defaultValue = "PUBLISHER") SimulatorMode simulatorMode,
22+
@ConfigProperty(defaultValue = "PUBLISHER_SERVER") SimulatorMode simulatorMode,
2323
@ConfigProperty(defaultValue = "10") int lastKnownStatusesCapacity,
2424
@ConfigProperty(defaultValue = "1_500_000") int delayBetweenBlockItems,
2525
@ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@@ -40,7 +40,7 @@ public static Builder builder() {
4040
* A builder for creating instances of {@link BlockStreamConfig}.
4141
*/
4242
public static class Builder {
43-
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER;
43+
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER_CLIENT;
4444
private int lastKnownStatusesCapacity = 10;
4545
private int delayBetweenBlockItems = 1_500_000;
4646
private int maxBlockItemsToStream = 10_000;

simulator/src/main/java/com/hedera/block/simulator/config/data/StreamStatus.java

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,19 @@
1111
/**
1212
* Represents the status of the stream.
1313
*
14-
* @param publishedBlocks the number of published blocks
14+
* @param publishedBlocks the number of published blocks from publish client
15+
* @param processedBlocks the number of processed blocks from publish server
1516
* @param consumedBlocks the number of consumed blocks
16-
* @param lastKnownPublisherStatuses the last known publisher statuses
17+
* @param lastKnownPublisherClientStatuses the last known statuses from publish client
18+
* @param lastKnownPublisherServerStatuses the last known statuses from publish server
1719
* @param lastKnownConsumersStatuses the last known consumers statuses
1820
*/
1921
public record StreamStatus(
2022
long publishedBlocks,
23+
long processedBlocks,
2124
long consumedBlocks,
22-
Deque<String> lastKnownPublisherStatuses,
25+
Deque<String> lastKnownPublisherClientStatuses,
26+
Deque<String> lastKnownPublisherServerStatuses,
2327
Deque<String> lastKnownConsumersStatuses) {
2428

2529
/**
@@ -36,8 +40,10 @@ public static Builder builder() {
3640
*/
3741
public static class Builder {
3842
private long publishedBlocks = 0;
43+
private long processedBlocks = 0;
3944
private long consumedBlocks = 0;
40-
private Deque<String> lastKnownPublisherStatuses = new ArrayDeque<>();
45+
private Deque<String> lastKnownPublisherClientStatuses = new ArrayDeque<>();
46+
private Deque<String> lastKnownPublisherServerStatuses = new ArrayDeque<>();
4147
private Deque<String> lastKnownConsumersStatuses = new ArrayDeque<>();
4248

4349
/**
@@ -48,7 +54,7 @@ public Builder() {
4854
}
4955

5056
/**
51-
* Sets the number of published blocks.
57+
* Sets the number of published blocks by publish client.
5258
*
5359
* @param publishedBlocks the number of published blocks
5460
* @return the builder instance
@@ -59,6 +65,18 @@ public Builder publishedBlocks(long publishedBlocks) {
5965
return this;
6066
}
6167

68+
/**
69+
* Sets the number of processed blocks by publish server.
70+
*
71+
* @param processedBlocks the number of processed blocks by publish server.
72+
* @return the builder instance
73+
*/
74+
public Builder processedBlocks(long processedBlocks) {
75+
requireWhole(processedBlocks);
76+
this.processedBlocks = processedBlocks;
77+
return this;
78+
}
79+
6280
/**
6381
* Sets the number of consumed blocks.
6482
*
@@ -72,14 +90,26 @@ public Builder consumedBlocks(long consumedBlocks) {
7290
}
7391

7492
/**
75-
* Sets the last known publisher statuses.
93+
* Sets the last known publisher client statuses.
94+
*
95+
* @param lastKnownPublisherClientStatuses the last known publisher statuses from publish client
96+
* @return the builder instance
97+
*/
98+
public Builder lastKnownPublisherClientStatuses(List<String> lastKnownPublisherClientStatuses) {
99+
requireNonNull(lastKnownPublisherClientStatuses);
100+
this.lastKnownPublisherClientStatuses = new ArrayDeque<>(lastKnownPublisherClientStatuses);
101+
return this;
102+
}
103+
104+
/**
105+
* Sets the last known publisher server statuses.
76106
*
77-
* @param lastKnownPublisherStatuses the last known publisher statuses
107+
* @param lastKnownPublisherServerStatuses the last known publisher statuses from publish server
78108
* @return the builder instance
79109
*/
80-
public Builder lastKnownPublisherStatuses(List<String> lastKnownPublisherStatuses) {
81-
requireNonNull(lastKnownPublisherStatuses);
82-
this.lastKnownPublisherStatuses = new ArrayDeque<>(lastKnownPublisherStatuses);
110+
public Builder lastKnownPublisherServerStatuses(List<String> lastKnownPublisherServerStatuses) {
111+
requireNonNull(lastKnownPublisherServerStatuses);
112+
this.lastKnownPublisherServerStatuses = new ArrayDeque<>(lastKnownPublisherServerStatuses);
83113
return this;
84114
}
85115

@@ -102,7 +132,12 @@ public Builder lastKnownConsumersStatuses(List<String> lastKnownConsumersStatuse
102132
*/
103133
public StreamStatus build() {
104134
return new StreamStatus(
105-
publishedBlocks, consumedBlocks, lastKnownPublisherStatuses, lastKnownConsumersStatuses);
135+
publishedBlocks,
136+
processedBlocks,
137+
consumedBlocks,
138+
lastKnownPublisherClientStatuses,
139+
lastKnownPublisherServerStatuses,
140+
lastKnownConsumersStatuses);
106141
}
107142
}
108143
}

simulator/src/main/java/com/hedera/block/simulator/config/types/SimulatorMode.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@
33

44
/** The SimulatorMode enum defines the work modes of the block stream simulator. */
55
public enum SimulatorMode {
6-
/**
7-
* Indicates a work mode in which the simulator is working as both consumer and publisher.
8-
*/
9-
BOTH,
106
/**
117
* Indicates a work mode in which the simulator is working in consumer mode.
128
*/
139
CONSUMER,
10+
/**
11+
* Indicates a work mode in which the simulator is working as both consumer and publisher.
12+
*/
13+
PUBLISHER_SERVER,
1414
/**
1515
* Indicates a work mode in which the simulator is working in publisher mode.
1616
*/
17-
PUBLISHER
17+
PUBLISHER_CLIENT
1818
}

simulator/src/main/java/com/hedera/block/simulator/grpc/GrpcInjectionModule.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import com.hedera.block.simulator.grpc.impl.ConsumerStreamGrpcClientImpl;
55
import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcClientImpl;
6+
import com.hedera.block.simulator.grpc.impl.PublishStreamGrpcServerImpl;
67
import dagger.Binds;
78
import dagger.Module;
89
import dagger.Provides;
@@ -33,6 +34,16 @@ public interface GrpcInjectionModule {
3334
@Binds
3435
ConsumerStreamGrpcClient bindConsumerStreamGrpcClient(ConsumerStreamGrpcClientImpl consumerStreamGrpcClient);
3536

37+
/**
38+
* Binds the PublishStreamGrpcServer to the PublishStreamGrpcServerImpl.
39+
*
40+
* @param PublishStreamGrpcServer the PublishStreamGrpcServerImpl
41+
* @return the ConsumerStreamGrpcClient
42+
*/
43+
@Singleton
44+
@Binds
45+
PublishStreamGrpcServer bindPublishStreamGrpcServer(PublishStreamGrpcServerImpl PublishStreamGrpcServer);
46+
3647
/**
3748
* Provides the stream enabled flag
3849
*
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package com.hedera.block.simulator.grpc;
3+
4+
import java.util.List;
5+
6+
public interface PublishStreamGrpcServer {
7+
/**
8+
* Initialize, opens a gRPC channel and creates the needed services with the passed configuration.
9+
*/
10+
void init();
11+
12+
/**
13+
* Starts the gRPC server.
14+
*/
15+
void start();
16+
17+
/**
18+
* Gets the number of processed blocks.
19+
*
20+
* @return the number of published blocks
21+
*/
22+
long getProcessedBlocks();
23+
24+
/**
25+
* Gets the last known statuses.
26+
*
27+
* @return the last known statuses
28+
*/
29+
List<String> getLastKnownStatuses();
30+
31+
/**
32+
* Shutdowns the server.
33+
*
34+
* @throws InterruptedException if the thread is interrupted
35+
*/
36+
void shutdown() throws InterruptedException;
37+
}

0 commit comments

Comments
 (0)