Skip to content

Commit d7825b8

Browse files
Removed BlockPersistenceHandler interface and associated materials
Signed-off-by: Matt Peterson <matt.peterson@swirldslabs.com>
1 parent 7066a7a commit d7825b8

17 files changed

+141
-543
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
2424
import com.hedera.block.server.data.ObjectEvent;
2525
import com.hedera.block.server.mediator.StreamMediator;
26-
import com.hedera.block.server.persistence.BlockPersistenceHandler;
26+
import com.hedera.block.server.persistence.storage.BlockReader;
2727
import com.hedera.block.server.producer.ItemAckBuilder;
2828
import com.hedera.block.server.producer.ProducerBlockItemObserver;
2929
import io.grpc.stub.StreamObserver;
@@ -50,18 +50,18 @@ public class BlockStreamService implements GrpcService {
5050
private final ItemAckBuilder itemAckBuilder;
5151
private final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator;
5252
private final ServiceStatus serviceStatus;
53-
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
53+
private final BlockReader<Block> blockReader;
5454

5555
BlockStreamService(
5656
final long timeoutThresholdMillis,
5757
final ItemAckBuilder itemAckBuilder,
5858
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
59-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
59+
final BlockReader<Block> blockReader,
6060
final ServiceStatus serviceStatus) {
6161
this.timeoutThresholdMillis = timeoutThresholdMillis;
6262
this.itemAckBuilder = itemAckBuilder;
6363
this.streamMediator = streamMediator;
64-
this.blockPersistenceHandler = blockPersistenceHandler;
64+
this.blockReader = blockReader;
6565
this.serviceStatus = serviceStatus;
6666
}
6767

@@ -144,7 +144,7 @@ void singleBlock(
144144
if (serviceStatus.isRunning()) {
145145
final long blockNumber = singleBlockRequest.getBlockNumber();
146146
try {
147-
final Optional<Block> blockOpt = blockPersistenceHandler.retrieve(blockNumber);
147+
final Optional<Block> blockOpt = blockReader.read(blockNumber);
148148
if (blockOpt.isPresent()) {
149149
LOGGER.log(
150150
System.Logger.Level.INFO,

server/src/main/java/com/hedera/block/server/Server.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import com.hedera.block.server.data.ObjectEvent;
2323
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2424
import com.hedera.block.server.mediator.StreamMediator;
25-
import com.hedera.block.server.persistence.BlockPersistenceHandler;
26-
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
2725
import com.hedera.block.server.persistence.storage.BlockAsDirReader;
2826
import com.hedera.block.server.persistence.storage.BlockAsDirWriter;
27+
import com.hedera.block.server.persistence.storage.BlockReader;
28+
import com.hedera.block.server.persistence.storage.BlockWriter;
2929
import com.hedera.block.server.producer.ItemAckBuilder;
3030
import io.grpc.stub.ServerCalls;
3131
import io.grpc.stub.StreamObserver;
@@ -70,15 +70,16 @@ public static void main(final String[] args) {
7070

7171
try {
7272
final ServiceStatus serviceStatus = new ServiceStatusImpl();
73-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler =
74-
new FileSystemPersistenceHandler(
75-
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
76-
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config));
7773
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator =
78-
buildStreamMediator(serviceStatus, blockPersistenceHandler);
74+
buildStreamMediator(
75+
serviceStatus,
76+
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config));
7977
final BlockStreamService blockStreamService =
8078
buildBlockStreamService(
81-
config, streamMediator, blockPersistenceHandler, serviceStatus);
79+
config,
80+
streamMediator,
81+
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config),
82+
serviceStatus);
8283
final GrpcRouting.Builder grpcRouting = buildGrpcRouting(blockStreamService);
8384

8485
// Build the web server
@@ -97,11 +98,9 @@ public static void main(final String[] args) {
9798

9899
private static StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>>
99100
buildStreamMediator(
100-
final ServiceStatus serviceStatus,
101-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler)
101+
final ServiceStatus serviceStatus, final BlockWriter<BlockItem> blockWriter)
102102
throws IOException {
103-
return new LiveStreamMediatorImpl(
104-
new ConcurrentHashMap<>(32), blockPersistenceHandler, serviceStatus);
103+
return new LiveStreamMediatorImpl(new ConcurrentHashMap<>(32), blockWriter, serviceStatus);
105104
}
106105

107106
private static GrpcRouting.Builder buildGrpcRouting(
@@ -129,7 +128,7 @@ private static GrpcRouting.Builder buildGrpcRouting(
129128
private static BlockStreamService buildBlockStreamService(
130129
final Config config,
131130
final StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> streamMediator,
132-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
131+
final BlockReader<Block> blockReader,
133132
final ServiceStatus serviceStatus) {
134133

135134
// Get Timeout threshold from configuration
@@ -140,7 +139,7 @@ private static BlockStreamService buildBlockStreamService(
140139
consumerTimeoutThreshold,
141140
new ItemAckBuilder(),
142141
streamMediator,
143-
blockPersistenceHandler,
142+
blockReader,
144143
serviceStatus);
145144
}
146145
}

server/src/main/java/com/hedera/block/server/ServiceStatus.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,32 @@
1818

1919
import io.helidon.webserver.WebServer;
2020

21+
/**
22+
* The ServiceStatus interface defines the contract for checking the status of the service and
23+
* shutting down the web server.
24+
*/
2125
public interface ServiceStatus {
26+
27+
/**
28+
* Checks if the service is running.
29+
*
30+
* @return true if the service is running, false otherwise
31+
*/
2232
boolean isRunning();
2333

34+
/**
35+
* Sets the running status of the service.
36+
*
37+
* @param running true if the service is running, false otherwise
38+
*/
2439
void setRunning(final boolean running);
2540

41+
/** Sets the web server instance. */
2642
void setWebServer(final WebServer webServer);
2743

28-
void stopService();
44+
/**
45+
* Stops the service and web server. This method is called to shut down the service and the web
46+
* server in the event of an error or when the service needs to restart.
47+
*/
48+
void stopWebServer();
2949
}

server/src/main/java/com/hedera/block/server/ServiceStatusImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public void setWebServer(final WebServer webServer) {
3535
this.webServer = webServer;
3636
}
3737

38-
public void stopService() {
38+
public void stopWebServer() {
3939
isRunning.set(false);
4040
webServer.stop();
4141
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import static com.hedera.block.protos.BlockStreamService.*;
19+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
2021

2122
import com.hedera.block.server.ServiceStatus;
2223
import com.hedera.block.server.ServiceStatusImpl;
2324
import com.hedera.block.server.data.ObjectEvent;
24-
import com.hedera.block.server.persistence.BlockPersistenceHandler;
25+
import com.hedera.block.server.persistence.storage.BlockWriter;
2526
import com.lmax.disruptor.BatchEventProcessor;
2627
import com.lmax.disruptor.BatchEventProcessorBuilder;
2728
import com.lmax.disruptor.EventHandler;
@@ -37,7 +38,7 @@
3738
/**
3839
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
3940
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
40-
* live blocks to the subscribers as they arrive and persists the blocks to the block persistence
41+
* block items to the subscribers as they arrive and persists the blocks to a block persistence
4142
* store.
4243
*/
4344
public class LiveStreamMediatorImpl
@@ -53,24 +54,27 @@ public class LiveStreamMediatorImpl
5354
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
5455
subscribers;
5556

56-
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
57+
private final BlockWriter<BlockItem> blockWriter;
5758
private final ServiceStatus serviceStatus;
5859

5960
/**
60-
* Constructor for the LiveStreamMediatorImpl class.
61+
* Constructs a new LiveStreamMediatorImpl instance with the given subscribers, block
62+
* persistence handler, and service status.
6163
*
62-
* @param blockPersistenceHandler the block persistence handler
64+
* @param subscribers the map of subscribers to their corresponding representation batch event
65+
* processor in the LMAX Disruptor data structure.
66+
* @param serviceStatus the service status singleton instance
6367
*/
6468
public LiveStreamMediatorImpl(
6569
final Map<
6670
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
6771
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
6872
subscribers,
69-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
73+
final BlockWriter<BlockItem> blockWriter,
7074
final ServiceStatus serviceStatus) {
7175

7276
this.subscribers = subscribers;
73-
this.blockPersistenceHandler = blockPersistenceHandler;
77+
this.blockWriter = blockWriter;
7478

7579
// Initialize and start the disruptor
7680
final Disruptor<ObjectEvent<SubscribeStreamResponse>> disruptor =
@@ -80,15 +84,13 @@ public LiveStreamMediatorImpl(
8084
this.serviceStatus = serviceStatus;
8185
}
8286

83-
public LiveStreamMediatorImpl(
84-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler) {
85-
this(new ConcurrentHashMap<>(), blockPersistenceHandler, new ServiceStatusImpl());
87+
public LiveStreamMediatorImpl(final BlockWriter<BlockItem> blockWriter) {
88+
this(new ConcurrentHashMap<>(), blockWriter, new ServiceStatusImpl());
8689
}
8790

8891
public LiveStreamMediatorImpl(
89-
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
90-
final ServiceStatus serviceStatus) {
91-
this(new ConcurrentHashMap<>(), blockPersistenceHandler, serviceStatus);
92+
final BlockWriter<BlockItem> blockWriter, final ServiceStatus serviceStatus) {
93+
this(new ConcurrentHashMap<>(), blockWriter, serviceStatus);
9294
}
9395

9496
@Override
@@ -104,7 +106,7 @@ public void publish(final BlockItem blockItem) throws IOException {
104106

105107
try {
106108
// Persist the BlockItem
107-
blockPersistenceHandler.persist(blockItem);
109+
blockWriter.write(blockItem);
108110
} catch (IOException e) {
109111
// Disable BlockItem publication for upstream producers
110112
serviceStatus.setRunning(false);

server/src/main/java/com/hedera/block/server/mediator/EventPublisher.java renamed to server/src/main/java/com/hedera/block/server/mediator/Publisher.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,20 @@
1818

1919
import java.io.IOException;
2020

21-
public interface EventPublisher<U> {
22-
void publish(final U blockItem) throws IOException;
21+
/**
22+
* The Publisher interface defines the contract for publishing items emitted by the producer to
23+
* downstream subscribers.
24+
*
25+
* @param <U> the type of the item to publish
26+
*/
27+
public interface Publisher<U> {
28+
29+
/**
30+
* Publishes the given item to the downstream subscribers.
31+
*
32+
* @param item the item emitted by an upstream producer to publish to downstream subscribers.
33+
* @throws IOException thrown if an I/O error occurs while publishing the item to the
34+
* subscribers.
35+
*/
36+
void publish(final U item) throws IOException;
2337
}

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package com.hedera.block.server.mediator;
1818

1919
/**
20-
* The StreamMediator interface represents a bridge between a bidirectional stream of items from a
21-
* producer (e.g. a Consensus Node) and N consumers each requesting a gRPC server stream of items.
20+
* The StreamMediator marker interface defines the combination of Publisher and SubscriptionHandler
21+
* contracts. It defines multiple views of the underlying implementation, allowing producers to
22+
* publish items while the service and downstream subscribers can manage which consumers are
23+
* subscribed to the stream of events.
2224
*
23-
* @param <U> The type of items sent by the upstream producer.
25+
* @param <U> the type of the item to publish
26+
* @param <V> the type of the event
2427
*/
25-
public interface StreamMediator<U, V> extends EventPublisher<U>, SubscriptionHandler<V> {}
28+
public interface StreamMediator<U, V> extends Publisher<U>, SubscriptionHandler<V> {}

server/src/main/java/com/hedera/block/server/mediator/SubscriptionHandler.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,32 @@
1919
import com.lmax.disruptor.EventHandler;
2020

2121
/**
22-
* The SubscriptionHandler interface represents a bridge between a producer and N consumers each
23-
* requesting a gRPC server stream of items. The SubscriptionHandler manages adding and removing
24-
* consumers dynamically to receive items as they arrive from upstream.
22+
* The SubscriptionHandler interface defines the contract for subscribing and unsubscribing
23+
* downstream consumers to the stream of events.
2524
*
26-
* @param <V> The type of the response published to the downstream consumers.
25+
* @param <V> the type of the subscription events
2726
*/
2827
public interface SubscriptionHandler<V> {
2928

29+
/**
30+
* Subscribes the given handler to the stream of items.
31+
*
32+
* @param handler the handler to subscribe
33+
*/
3034
void subscribe(final EventHandler<V> handler);
3135

36+
/**
37+
* Unsubscribes the given handler from the stream of events.
38+
*
39+
* @param handler the handler to unsubscribe
40+
*/
3241
void unsubscribe(final EventHandler<V> handler);
3342

43+
/**
44+
* Checks if the given handler is subscribed to the stream of events.
45+
*
46+
* @param handler the handler to check
47+
* @return true if the handler is subscribed, false otherwise
48+
*/
3449
boolean isSubscribed(final EventHandler<V> handler);
3550
}

server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)