46
46
import java .util .concurrent .atomic .AtomicInteger ;
47
47
import java .util .concurrent .atomic .AtomicLong ;
48
48
import java .util .function .BiFunction ;
49
+ import java .util .function .Predicate ;
49
50
import java .util .function .Supplier ;
50
51
import java .util .logging .Level ;
51
52
import java .util .stream .Collectors ;
@@ -247,7 +248,7 @@ default void requestChannel1() {
247
248
.requestChannel (Mono .just (createTestPayload (0 )))
248
249
.doOnNext (Payload ::release )
249
250
.as (StepVerifier ::create )
250
- .expectNextCount ( 1 )
251
+ .thenConsumeWhile ( new PayloadPredicate ( 1 ) )
251
252
.expectComplete ()
252
253
.verify (getTimeout ());
253
254
}
@@ -262,7 +263,7 @@ default void requestChannel200_000() {
262
263
.doOnNext (Payload ::release )
263
264
.limitRate (8 )
264
265
.as (StepVerifier ::create )
265
- .expectNextCount ( 200_000 )
266
+ .thenConsumeWhile ( new PayloadPredicate ( 200_000 ) )
266
267
.expectComplete ()
267
268
.verify (getTimeout ());
268
269
}
@@ -276,7 +277,7 @@ default void largePayloadRequestChannel50() {
276
277
.requestChannel (payloads )
277
278
.doOnNext (Payload ::release )
278
279
.as (StepVerifier ::create )
279
- .expectNextCount ( 50 )
280
+ .thenConsumeWhile ( new PayloadPredicate ( 50 ) )
280
281
.expectComplete ()
281
282
.verify (getTimeout ());
282
283
}
@@ -291,7 +292,7 @@ default void requestChannel20_000() {
291
292
.doOnNext (this ::assertChannelPayload )
292
293
.doOnNext (Payload ::release )
293
294
.as (StepVerifier ::create )
294
- .expectNextCount ( 20_000 )
295
+ .thenConsumeWhile ( new PayloadPredicate ( 20_000 ) )
295
296
.expectComplete ()
296
297
.verify (getTimeout ());
297
298
}
@@ -306,7 +307,7 @@ default void requestChannel2_000_000() {
306
307
.doOnNext (Payload ::release )
307
308
.limitRate (8 )
308
309
.as (StepVerifier ::create )
309
- .expectNextCount ( 2_000_000 )
310
+ .thenConsumeWhile ( new PayloadPredicate ( 2_000_000 ) )
310
311
.expectComplete ()
311
312
.verify (getTimeout ());
312
313
}
@@ -322,7 +323,7 @@ default void requestChannel3() {
322
323
.requestChannel (payloads )
323
324
.doOnNext (Payload ::release )
324
325
.as (publisher -> StepVerifier .create (publisher , 3 ))
325
- .expectNextCount ( 3 )
326
+ .thenConsumeWhile ( new PayloadPredicate ( 3 ) )
326
327
.expectComplete ()
327
328
.verify (getTimeout ());
328
329
@@ -358,7 +359,7 @@ default void check(Flux<Payload> payloads) {
358
359
.doOnNext (ReferenceCounted ::release )
359
360
.limitRate (8 )
360
361
.as (StepVerifier ::create )
361
- .expectNextCount ( 256 )
362
+ .thenConsumeWhile ( new PayloadPredicate ( 256 ) )
362
363
.as ("expected 256 items" )
363
364
.expectComplete ()
364
365
.verify (getTimeout ());
@@ -958,4 +959,27 @@ public String toString() {
958
959
}
959
960
}
960
961
}
962
+
963
+ class PayloadPredicate implements Predicate <Payload > {
964
+ final int expectedCnt ;
965
+ int cnt ;
966
+
967
+ public PayloadPredicate (int expectedCnt ) {
968
+ this .expectedCnt = expectedCnt ;
969
+ }
970
+
971
+ @ Override
972
+ public boolean test (Payload p ) {
973
+ boolean shouldConsume = cnt ++ < expectedCnt ;
974
+ if (!shouldConsume ) {
975
+ logger .info (
976
+ "Metadata: \n \r {}\n \r Data:{}" ,
977
+ p .hasMetadata ()
978
+ ? new ByteBufRepresentation ().fallbackToStringOf (p .sliceMetadata ())
979
+ : "Empty" ,
980
+ new ByteBufRepresentation ().fallbackToStringOf (p .sliceData ()));
981
+ }
982
+ return shouldConsume ;
983
+ }
984
+ }
961
985
}
0 commit comments