47
47
@ ExtendWith (MockitoExtension .class )
48
48
public class ProducerBlockItemObserverTest {
49
49
50
- private final Object lock = new Object ();
51
-
52
50
@ Mock private ItemAckBuilder itemAckBuilder ;
53
51
@ Mock private StreamMediator <BlockItem , ObjectEvent <SubscribeStreamResponse >> streamMediator ;
54
52
@ Mock private StreamObserver <PublishStreamResponse > publishStreamResponseObserver ;
@@ -63,11 +61,10 @@ public class ProducerBlockItemObserverTest {
63
61
@ Mock private ServiceStatus serviceStatus ;
64
62
65
63
@ Test
66
- public void testProducerOnNext ()
67
- throws InterruptedException , IOException , NoSuchAlgorithmException {
64
+ public void testProducerOnNext () throws IOException , NoSuchAlgorithmException {
68
65
69
- List <BlockItem > blockItems = generateBlockItems (1 );
70
- ProducerBlockItemObserver producerBlockItemObserver =
66
+ final List <BlockItem > blockItems = generateBlockItems (1 );
67
+ final ProducerBlockItemObserver producerBlockItemObserver =
71
68
new ProducerBlockItemObserver (
72
69
streamMediator ,
73
70
publishStreamResponseObserver ,
@@ -76,16 +73,12 @@ public void testProducerOnNext()
76
73
77
74
when (serviceStatus .isRunning ()).thenReturn (true );
78
75
79
- BlockItem blockHeader = blockItems .getFirst ();
80
- PublishStreamRequest publishStreamRequest =
76
+ final BlockItem blockHeader = blockItems .getFirst ();
77
+ final PublishStreamRequest publishStreamRequest =
81
78
PublishStreamRequest .newBuilder ().setBlockItem (blockHeader ).build ();
82
79
producerBlockItemObserver .onNext (publishStreamRequest );
83
80
84
- synchronized (lock ) {
85
- lock .wait (50 );
86
- }
87
-
88
- verify (streamMediator , times (1 )).publishEvent (blockHeader );
81
+ verify (streamMediator , timeout (50 ).times (1 )).publishEvent (blockHeader );
89
82
90
83
final ItemAcknowledgement itemAck =
91
84
ItemAcknowledgement .newBuilder ()
@@ -95,12 +88,12 @@ public void testProducerOnNext()
95
88
BlockStreamService .PublishStreamResponse .newBuilder ()
96
89
.setAcknowledgement (itemAck )
97
90
.build ();
98
- verify (publishStreamResponseObserver , times (1 )).onNext (publishStreamResponse );
91
+ verify (publishStreamResponseObserver , timeout ( 50 ). times (1 )).onNext (publishStreamResponse );
99
92
100
93
// Helidon will call onCompleted after onNext
101
94
producerBlockItemObserver .onCompleted ();
102
95
103
- verify (publishStreamResponseObserver , times (1 )).onCompleted ();
96
+ verify (publishStreamResponseObserver , timeout ( 50 ). times (1 )).onCompleted ();
104
97
}
105
98
106
99
@ Test
@@ -160,18 +153,14 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
160
153
new ItemAckBuilder (),
161
154
serviceStatus );
162
155
163
- PublishStreamRequest publishStreamRequest =
156
+ final PublishStreamRequest publishStreamRequest =
164
157
PublishStreamRequest .newBuilder ().setBlockItem (blockItem ).build ();
165
158
producerBlockItemObserver .onNext (publishStreamRequest );
166
159
167
- synchronized (lock ) {
168
- lock .wait (50 );
169
- }
170
-
171
160
// Confirm each subscriber was notified of the new block
172
- verify (streamObserver1 , times (1 )).onNext (subscribeStreamResponse );
173
- verify (streamObserver2 , times (1 )).onNext (subscribeStreamResponse );
174
- verify (streamObserver3 , times (1 )).onNext (subscribeStreamResponse );
161
+ verify (streamObserver1 , timeout ( 50 ). times (1 )).onNext (subscribeStreamResponse );
162
+ verify (streamObserver2 , timeout ( 50 ). times (1 )).onNext (subscribeStreamResponse );
163
+ verify (streamObserver3 , timeout ( 50 ). times (1 )).onNext (subscribeStreamResponse );
175
164
176
165
// Confirm the BlockStorage write method was
177
166
// called despite the absence of subscribers
@@ -180,23 +169,22 @@ public void testProducerToManyConsumers() throws IOException, InterruptedExcepti
180
169
181
170
@ Test
182
171
public void testOnError () {
183
- ProducerBlockItemObserver producerBlockItemObserver =
172
+ final ProducerBlockItemObserver producerBlockItemObserver =
184
173
new ProducerBlockItemObserver (
185
174
streamMediator ,
186
175
publishStreamResponseObserver ,
187
176
new ItemAckBuilder (),
188
177
serviceStatus );
189
178
190
- Throwable t = new Throwable ("Test error" );
179
+ final Throwable t = new Throwable ("Test error" );
191
180
producerBlockItemObserver .onError (t );
192
181
verify (publishStreamResponseObserver ).onError (t );
193
182
}
194
183
195
184
@ Test
196
- public void testItemAckBuilderExceptionTest ()
197
- throws InterruptedException , IOException , NoSuchAlgorithmException {
185
+ public void testItemAckBuilderExceptionTest () throws IOException , NoSuchAlgorithmException {
198
186
199
- ProducerBlockItemObserver producerBlockItemObserver =
187
+ final ProducerBlockItemObserver producerBlockItemObserver =
200
188
new ProducerBlockItemObserver (
201
189
streamMediator ,
202
190
publishStreamResponseObserver ,
@@ -207,16 +195,12 @@ public void testItemAckBuilderExceptionTest()
207
195
when (itemAckBuilder .buildAck (any ()))
208
196
.thenThrow (new NoSuchAlgorithmException ("Test exception" ));
209
197
210
- List <BlockItem > blockItems = generateBlockItems (1 );
211
- BlockItem blockHeader = blockItems .getFirst ();
212
- PublishStreamRequest publishStreamRequest =
198
+ final List <BlockItem > blockItems = generateBlockItems (1 );
199
+ final BlockItem blockHeader = blockItems .getFirst ();
200
+ final PublishStreamRequest publishStreamRequest =
213
201
PublishStreamRequest .newBuilder ().setBlockItem (blockHeader ).build ();
214
202
producerBlockItemObserver .onNext (publishStreamRequest );
215
203
216
- synchronized (lock ) {
217
- lock .wait (50 );
218
- }
219
-
220
204
final PublishStreamResponse .EndOfStream endOfStream =
221
205
PublishStreamResponse .EndOfStream .newBuilder ()
222
206
.setStatus (
@@ -225,6 +209,6 @@ public void testItemAckBuilderExceptionTest()
225
209
.build ();
226
210
final PublishStreamResponse errorResponse =
227
211
PublishStreamResponse .newBuilder ().setStatus (endOfStream ).build ();
228
- verify (publishStreamResponseObserver , times (1 )).onNext (errorResponse );
212
+ verify (publishStreamResponseObserver , timeout ( 50 ). times (1 )).onNext (errorResponse );
229
213
}
230
214
}
0 commit comments