Skip to content

Commit e595712

Browse files
fix: removed interface
Signed-off-by: Matt Peterson <matt.peterson@swirldslabs.com>
1 parent f17a7f6 commit e595712

File tree

6 files changed

+20
-55
lines changed

6 files changed

+20
-55
lines changed

server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.hedera.block.server.data.ObjectEvent;
2323
import com.hedera.block.server.mediator.StreamMediator;
24+
import com.lmax.disruptor.EventHandler;
2425
import io.grpc.stub.ServerCallStreamObserver;
2526
import io.grpc.stub.StreamObserver;
2627
import java.time.InstantSource;
@@ -33,7 +34,7 @@
3334
* can invoke the onEvent() method when a new SubscribeStreamResponse is available.
3435
*/
3536
public class ConsumerBlockItemObserver
36-
implements BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> {
37+
implements EventHandler<ObjectEvent<SubscribeStreamResponse>> {
3738

3839
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3940

@@ -143,11 +144,4 @@ public void onEvent(
143144
}
144145
}
145146
}
146-
147-
@Override
148-
public void awaitShutdown() {
149-
if (!isResponsePermitted.get()) {
150-
LOGGER.log(System.Logger.Level.INFO, "ConsumerBlockItemObserver shutting down...");
151-
}
152-
}
153147
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
import com.hedera.block.server.ServiceStatus;
2222
import com.hedera.block.server.ServiceStatusImpl;
23-
import com.hedera.block.server.consumer.BlockItemEventHandler;
2423
import com.hedera.block.server.data.ObjectEvent;
2524
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2625
import com.lmax.disruptor.BatchEventProcessor;
2726
import com.lmax.disruptor.BatchEventProcessorBuilder;
27+
import com.lmax.disruptor.EventHandler;
2828
import com.lmax.disruptor.RingBuffer;
2929
import com.lmax.disruptor.dsl.Disruptor;
3030
import com.lmax.disruptor.util.DaemonThreadFactory;
@@ -49,7 +49,7 @@ public class LiveStreamMediatorImpl
4949
private final ExecutorService executor;
5050

5151
private final Map<
52-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>>,
52+
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
5353
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
5454
subscribers;
5555

@@ -63,7 +63,7 @@ public class LiveStreamMediatorImpl
6363
*/
6464
public LiveStreamMediatorImpl(
6565
final Map<
66-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>>,
66+
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
6767
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
6868
subscribers,
6969
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
@@ -135,8 +135,7 @@ public void publishEvent(final BlockItem blockItem) throws IOException {
135135
}
136136

137137
@Override
138-
public void subscribe(
139-
final BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
138+
public void subscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
140139

141140
// Initialize the batch event processor and set it on the ring buffer
142141
final var batchEventProcessor =
@@ -151,11 +150,7 @@ public void subscribe(
151150
}
152151

153152
@Override
154-
public void unsubscribe(
155-
final BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
156-
157-
// Shutdown the handler
158-
handler.awaitShutdown();
153+
public void unsubscribe(final EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
159154

160155
// Remove the subscriber
161156
final var batchEventProcessor = subscribers.remove(handler);
@@ -168,8 +163,7 @@ public void unsubscribe(
168163
}
169164

170165
@Override
171-
public boolean isSubscribed(
172-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
166+
public boolean isSubscribed(EventHandler<ObjectEvent<SubscribeStreamResponse>> handler) {
173167
return subscribers.containsKey(handler);
174168
}
175169

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import com.hedera.block.server.consumer.BlockItemEventHandler;
19+
import com.lmax.disruptor.EventHandler;
2020
import java.io.IOException;
2121

2222
/**
@@ -38,9 +38,9 @@ public interface StreamMediator<U, V> {
3838
*/
3939
void publishEvent(final U blockItem) throws IOException;
4040

41-
void subscribe(final BlockItemEventHandler<V> handler);
41+
void subscribe(final EventHandler<V> handler);
4242

43-
void unsubscribe(final BlockItemEventHandler<V> handler);
43+
void unsubscribe(final EventHandler<V> handler);
4444

45-
boolean isSubscribed(final BlockItemEventHandler<V> handler);
45+
boolean isSubscribed(final EventHandler<V> handler);
4646
}

server/src/test/java/com/hedera/block/server/BlockStreamServiceIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
2424
import static org.mockito.Mockito.*;
2525

26-
import com.hedera.block.server.consumer.BlockItemEventHandler;
2726
import com.hedera.block.server.data.ObjectEvent;
2827
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2928
import com.hedera.block.server.mediator.StreamMediator;
@@ -33,6 +32,7 @@
3332
import com.hedera.block.server.producer.ItemAckBuilder;
3433
import com.hedera.block.server.util.TestUtils;
3534
import com.lmax.disruptor.BatchEventProcessor;
35+
import com.lmax.disruptor.EventHandler;
3636
import io.grpc.stub.StreamObserver;
3737
import io.helidon.config.Config;
3838
import io.helidon.config.MapConfigSource;
@@ -301,7 +301,7 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
301301
int numberOfBlocks = 100;
302302

303303
final LinkedHashMap<
304-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>>,
304+
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
305305
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
306306
subscribers = new LinkedHashMap<>();
307307
final var streamMediator = buildStreamMediator(subscribers, Util.defaultPerms);
@@ -387,7 +387,7 @@ public void testSubAndUnsubWhileStreaming() throws IOException, InterruptedExcep
387387
public void testMediatorExceptionHandlingWhenPersistenceFailure()
388388
throws IOException, InterruptedException {
389389
final Map<
390-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>>,
390+
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
391391
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
392392
subscribers = new ConcurrentHashMap<>();
393393

@@ -552,7 +552,7 @@ private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStr
552552

553553
private StreamMediator<BlockItem, ObjectEvent<SubscribeStreamResponse>> buildStreamMediator(
554554
final Map<
555-
BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>>,
555+
EventHandler<ObjectEvent<SubscribeStreamResponse>>,
556556
BatchEventProcessor<ObjectEvent<SubscribeStreamResponse>>>
557557
subscribers,
558558
final FileAttribute<Set<PosixFilePermission>> filePerms)

server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import static org.junit.jupiter.api.Assertions.*;
2222
import static org.mockito.Mockito.*;
2323

24-
import com.hedera.block.server.consumer.BlockItemEventHandler;
2524
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
2625
import com.hedera.block.server.data.ObjectEvent;
2726
import com.hedera.block.server.persistence.BlockPersistenceHandler;
2827
import com.hedera.block.server.persistence.FileSystemPersistenceHandler;
2928
import com.hedera.block.server.persistence.storage.BlockReader;
3029
import com.hedera.block.server.persistence.storage.BlockWriter;
30+
import com.lmax.disruptor.EventHandler;
3131
import io.grpc.stub.ServerCallStreamObserver;
3232
import io.grpc.stub.StreamObserver;
3333
import java.io.IOException;
@@ -41,9 +41,9 @@
4141
@ExtendWith(MockitoExtension.class)
4242
public class LiveStreamMediatorImplTest {
4343

44-
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer1;
45-
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer2;
46-
@Mock private BlockItemEventHandler<ObjectEvent<SubscribeStreamResponse>> observer3;
44+
@Mock private EventHandler<ObjectEvent<SubscribeStreamResponse>> observer1;
45+
@Mock private EventHandler<ObjectEvent<SubscribeStreamResponse>> observer2;
46+
@Mock private EventHandler<ObjectEvent<SubscribeStreamResponse>> observer3;
4747

4848
@Mock private BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
4949
@Mock private BlockReader<Block> blockReader;

0 commit comments

Comments
 (0)