Skip to content

Commit 426f343

Browse files
authored
Fix deprecations after reactor upgrade (#355)
* Minor fixes at TcpReceiver and TcpChannelInitializer * Fixed deprecations
1 parent 2dd5f98 commit 426f343

File tree

15 files changed

+146
-173
lines changed

15 files changed

+146
-173
lines changed

cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,9 @@
4343
import reactor.core.Disposable;
4444
import reactor.core.Disposables;
4545
import reactor.core.Exceptions;
46-
import reactor.core.publisher.DirectProcessor;
4746
import reactor.core.publisher.Flux;
48-
import reactor.core.publisher.FluxSink;
4947
import reactor.core.publisher.Mono;
50-
import reactor.core.publisher.MonoProcessor;
48+
import reactor.core.publisher.Sinks;
5149
import reactor.core.scheduler.Scheduler;
5250
import reactor.core.scheduler.Schedulers;
5351

@@ -78,18 +76,17 @@ public final class ClusterImpl implements Cluster {
7876
private Function<Cluster, ? extends ClusterMessageHandler> handler =
7977
cluster -> new ClusterMessageHandler() {};
8078

81-
// Subject
82-
private final DirectProcessor<MembershipEvent> membershipEvents = DirectProcessor.create();
83-
private final FluxSink<MembershipEvent> membershipSink = membershipEvents.sink();
79+
// Sink
80+
private final Sinks.Many<MembershipEvent> sink = Sinks.many().multicast().directBestEffort();
8481

8582
// Disposables
8683
private final Disposable.Composite actionsDisposables = Disposables.composite();
8784

8885
// Lifecycle
89-
private final MonoProcessor<Void> start = MonoProcessor.create();
90-
private final MonoProcessor<Void> onStart = MonoProcessor.create();
91-
private final MonoProcessor<Void> shutdown = MonoProcessor.create();
92-
private final MonoProcessor<Void> onShutdown = MonoProcessor.create();
86+
private final Sinks.One<Void> start = Sinks.one();
87+
private final Sinks.One<Void> onStart = Sinks.one();
88+
private final Sinks.One<Void> shutdown = Sinks.one();
89+
private final Sinks.One<Void> onShutdown = Sinks.one();
9390

9491
// Cluster components
9592
private Transport transport;
@@ -119,14 +116,16 @@ private ClusterImpl(ClusterImpl that) {
119116

120117
private void initLifecycle() {
121118
start
119+
.asMono()
122120
.then(doStart())
123-
.doOnSuccess(avoid -> onStart.onComplete())
124-
.doOnError(onStart::onError)
121+
.doOnSuccess(c -> onStart.tryEmitEmpty())
122+
.doOnError(onStart::tryEmitError)
125123
.subscribe(null, th -> LOGGER.error("[{}][doStart] Exception occurred:", localMember, th));
126124

127125
shutdown
126+
.asMono()
128127
.then(doShutdown())
129-
.doFinally(s -> onShutdown.onComplete())
128+
.doFinally(s -> onShutdown.tryEmitEmpty())
130129
.subscribe(
131130
null,
132131
th ->
@@ -232,8 +231,8 @@ public ClusterImpl handler(Function<Cluster, ClusterMessageHandler> handler) {
232231
public Mono<Cluster> start() {
233232
return Mono.defer(
234233
() -> {
235-
start.onComplete();
236-
return onStart.thenReturn(this);
234+
start.tryEmitEmpty();
235+
return onStart.asMono().thenReturn(this);
237236
});
238237
}
239238

@@ -260,7 +259,7 @@ private Mono<Cluster> doStart0() {
260259
new FailureDetectorImpl(
261260
localMember,
262261
transport,
263-
membershipEvents.onBackpressureBuffer(),
262+
sink.asFlux().onBackpressureBuffer(),
264263
config.failureDetectorConfig(),
265264
scheduler,
266265
cidGenerator);
@@ -269,7 +268,7 @@ private Mono<Cluster> doStart0() {
269268
new GossipProtocolImpl(
270269
localMember,
271270
transport,
272-
membershipEvents.onBackpressureBuffer(),
271+
sink.asFlux().onBackpressureBuffer(),
273272
config.gossipConfig(),
274273
scheduler);
275274

@@ -295,7 +294,7 @@ private Mono<Cluster> doStart0() {
295294
.listen()
296295
/*.publishOn(scheduler)*/
297296
// Dont uncomment, already beign executed inside sc-cluster thread
298-
.subscribe(membershipSink::next, this::onError, membershipSink::complete));
297+
.subscribe(sink::tryEmitNext, this::onError, sink::tryEmitComplete));
299298

300299
return Mono.fromRunnable(() -> failureDetector.start())
301300
.then(Mono.fromRunnable(() -> gossip.start()))
@@ -373,7 +372,7 @@ private Flux<Message> listenGossip() {
373372

374373
private Flux<MembershipEvent> listenMembership() {
375374
// listen on live stream
376-
return membershipEvents.onBackpressureBuffer();
375+
return sink.asFlux().onBackpressureBuffer();
377376
}
378377

379378
/**
@@ -481,7 +480,7 @@ public <T> Mono<Void> updateMetadata(T metadata) {
481480

482481
@Override
483482
public void shutdown() {
484-
shutdown.onComplete();
483+
shutdown.tryEmitEmpty();
485484
}
486485

487486
private Mono<Void> doShutdown() {
@@ -524,12 +523,12 @@ private Mono<Void> dispose() {
524523

525524
@Override
526525
public Mono<Void> onShutdown() {
527-
return onShutdown;
526+
return onShutdown.asMono();
528527
}
529528

530529
@Override
531530
public boolean isShutdown() {
532-
return onShutdown.isDisposed();
531+
return onShutdown.asMono().toFuture().isDone();
533532
}
534533

535534
private static class SenderAwareTransport implements Transport {

cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import org.slf4j.LoggerFactory;
2121
import reactor.core.Disposable;
2222
import reactor.core.Disposables;
23-
import reactor.core.publisher.DirectProcessor;
2423
import reactor.core.publisher.Flux;
25-
import reactor.core.publisher.FluxProcessor;
26-
import reactor.core.publisher.FluxSink;
24+
import reactor.core.publisher.Sinks;
2725
import reactor.core.scheduler.Scheduler;
2826

2927
public final class FailureDetectorImpl implements FailureDetector {
@@ -46,18 +44,15 @@ public final class FailureDetectorImpl implements FailureDetector {
4644
// State
4745

4846
private long currentPeriod = 0;
49-
private List<Member> pingMembers = new ArrayList<>();
5047
private int pingMemberIndex = 0; // index for sequential ping member selection
48+
private final List<Member> pingMembers = new ArrayList<>();
5149

5250
// Disposables
5351

5452
private final Disposable.Composite actionsDisposables = Disposables.composite();
5553

56-
// Subject
57-
private final FluxProcessor<FailureDetectorEvent, FailureDetectorEvent> subject =
58-
DirectProcessor.<FailureDetectorEvent>create().serialize();
59-
60-
private final FluxSink<FailureDetectorEvent> sink = subject.sink();
54+
// Sink
55+
private final Sinks.Many<FailureDetectorEvent> sink = Sinks.many().multicast().directBestEffort();
6156

6257
// Scheduled
6358
private final Scheduler scheduler;
@@ -111,12 +106,12 @@ public void stop() {
111106
actionsDisposables.dispose();
112107

113108
// Stop publishing events
114-
sink.complete();
109+
sink.tryEmitComplete();
115110
}
116111

117112
@Override
118113
public Flux<FailureDetectorEvent> listen() {
119-
return subject.onBackpressureBuffer();
114+
return sink.asFlux().onBackpressureBuffer();
120115
}
121116

122117
// ================================================
@@ -376,7 +371,7 @@ private List<Member> selectPingReqMembers(Member pingMember) {
376371

377372
private void publishPingResult(long period, Member member, MemberStatus status) {
378373
LOGGER.debug("[{}][{}] Member {} detected as {}", localMember, period, member, status);
379-
sink.next(new FailureDetectorEvent(member, status));
374+
sink.tryEmitNext(new FailureDetectorEvent(member, status));
380375
}
381376

382377
private MemberStatus computeMemberStatus(Message message, long period) {

cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@
2121
import org.slf4j.LoggerFactory;
2222
import reactor.core.Disposable;
2323
import reactor.core.Disposables;
24-
import reactor.core.publisher.DirectProcessor;
2524
import reactor.core.publisher.Flux;
26-
import reactor.core.publisher.FluxProcessor;
27-
import reactor.core.publisher.FluxSink;
2825
import reactor.core.publisher.Mono;
2926
import reactor.core.publisher.MonoSink;
27+
import reactor.core.publisher.Sinks;
3028
import reactor.core.scheduler.Scheduler;
3129

3230
public final class GossipProtocolImpl implements GossipProtocol {
@@ -58,12 +56,8 @@ public final class GossipProtocolImpl implements GossipProtocol {
5856

5957
private final Disposable.Composite actionsDisposables = Disposables.composite();
6058

61-
// Subject
62-
63-
private final FluxProcessor<Message, Message> subject =
64-
DirectProcessor.<Message>create().serialize();
65-
66-
private final FluxSink<Message> sink = subject.sink();
59+
// Sink
60+
private final Sinks.Many<Message> sink = Sinks.many().multicast().directBestEffort();
6761

6862
// Scheduled
6963

@@ -119,7 +113,7 @@ public void stop() {
119113
actionsDisposables.dispose();
120114

121115
// Stop publishing events
122-
sink.complete();
116+
sink.tryEmitComplete();
123117
}
124118

125119
@Override
@@ -131,7 +125,7 @@ public Mono<String> spread(Message message) {
131125

132126
@Override
133127
public Flux<Message> listen() {
134-
return subject.onBackpressureBuffer();
128+
return sink.asFlux().onBackpressureBuffer();
135129
}
136130

137131
// ================================================
@@ -207,7 +201,7 @@ private void onGossipReq(Message message) {
207201
if (gossipState == null) { // new gossip
208202
gossipState = new GossipState(gossip, period);
209203
gossips.put(gossip.gossipId(), gossipState);
210-
sink.next(gossip.message());
204+
sink.tryEmitNext(gossip.message());
211205
}
212206
gossipState.addToInfected(gossipRequest.from());
213207
}

cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,10 @@
4444
import org.slf4j.LoggerFactory;
4545
import reactor.core.Disposable;
4646
import reactor.core.Disposables;
47-
import reactor.core.publisher.DirectProcessor;
4847
import reactor.core.publisher.Flux;
49-
import reactor.core.publisher.FluxProcessor;
50-
import reactor.core.publisher.FluxSink;
5148
import reactor.core.publisher.Mono;
5249
import reactor.core.publisher.MonoSink;
50+
import reactor.core.publisher.Sinks;
5351
import reactor.core.scheduler.Scheduler;
5452

5553
public final class MembershipProtocolImpl implements MembershipProtocol {
@@ -91,11 +89,8 @@ private enum MembershipUpdateReason {
9189
private final List<MembershipEvent> removedMembersHistory = new CopyOnWriteArrayList<>();
9290
private final Set<String> aliveEmittedSet = new HashSet<>();
9391

94-
// Subject
95-
96-
private final FluxProcessor<MembershipEvent, MembershipEvent> subject =
97-
DirectProcessor.<MembershipEvent>create().serialize();
98-
private final FluxSink<MembershipEvent> sink = subject.sink();
92+
// Sink
93+
private final Sinks.Many<MembershipEvent> sink = Sinks.many().multicast().directBestEffort();
9994

10095
// Disposables
10196
private final Disposable.Composite actionsDisposables = Disposables.composite();
@@ -204,7 +199,7 @@ private boolean checkAddressesNotEqual(Address address0, Address address1) {
204199

205200
@Override
206201
public Flux<MembershipEvent> listen() {
207-
return subject.onBackpressureBuffer();
202+
return sink.asFlux().onBackpressureBuffer();
208203
}
209204

210205
/**
@@ -307,7 +302,7 @@ public void stop() {
307302
suspicionTimeoutTasks.clear();
308303

309304
// Stop publishing events
310-
sink.complete();
305+
sink.tryEmitComplete();
311306
}
312307

313308
@Override
@@ -735,7 +730,7 @@ private Mono<Void> onLeavingDetected(MembershipRecord r0, MembershipRecord r1) {
735730

736731
private void publishEvent(MembershipEvent event) {
737732
LOGGER.info("[{}][publishEvent] {}", localMember, event);
738-
sink.next(event);
733+
sink.tryEmitNext(event);
739734
}
740735

741736
private Mono<Void> onDeadMemberDetected(MembershipRecord r1) {

cluster/src/test/java/io/scalecube/cluster/ClusterNamespacesTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ public void testIsolatedParentNamespaces() {
231231
parent1.address(), parent2.address(), bob.address(), carol.address()))
232232
.startAwait();
233233

234+
//noinspection unused
234235
Cluster eve =
235236
new ClusterImpl()
236237
.transportFactory(WebsocketTransportFactory::new)

cluster/src/test/java/io/scalecube/cluster/ClusterTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.junit.jupiter.api.Test;
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
28-
import reactor.core.publisher.ReplayProcessor;
28+
import reactor.core.publisher.Sinks;
2929

3030
public class ClusterTest extends BaseTest {
3131

@@ -449,7 +449,7 @@ public void onMembershipEvent(MembershipEvent event) {
449449
@Test
450450
public void testMemberMetadataRemoved() throws InterruptedException {
451451
// Start seed member
452-
ReplayProcessor<MembershipEvent> seedEvents = ReplayProcessor.create();
452+
final Sinks.Many<MembershipEvent> sink = Sinks.many().replay().all();
453453
Map<String, String> seedMetadata = new HashMap<>();
454454
seedMetadata.put("seed", "shmid");
455455
final Cluster seedNode =
@@ -460,7 +460,7 @@ public void testMemberMetadataRemoved() throws InterruptedException {
460460
new ClusterMessageHandler() {
461461
@Override
462462
public void onMembershipEvent(MembershipEvent event) {
463-
seedEvents.onNext(event);
463+
sink.tryEmitNext(event);
464464
}
465465
})
466466
.startAwait();
@@ -469,7 +469,7 @@ public void onMembershipEvent(MembershipEvent event) {
469469
// Start member with metadata
470470
Map<String, String> node1Metadata = new HashMap<>();
471471
node1Metadata.put("node", "shmod");
472-
ReplayProcessor<MembershipEvent> node1Events = ReplayProcessor.create();
472+
final Sinks.Many<MembershipEvent> sink1 = Sinks.many().replay().all();
473473
final Cluster node1 =
474474
new ClusterImpl()
475475
.config(opts -> opts.metadata(node1Metadata))
@@ -479,16 +479,18 @@ public void onMembershipEvent(MembershipEvent event) {
479479
new ClusterMessageHandler() {
480480
@Override
481481
public void onMembershipEvent(MembershipEvent event) {
482-
node1Events.onNext(event);
482+
sink1.tryEmitNext(event);
483483
}
484484
})
485485
.startAwait();
486486

487487
// Check events
488-
MembershipEvent nodeAddedEvent = seedEvents.as(Mono::from).block(Duration.ofSeconds(3));
488+
MembershipEvent nodeAddedEvent = sink.asFlux().blockFirst(Duration.ofSeconds(3));
489+
//noinspection ConstantConditions
489490
assertEquals(Type.ADDED, nodeAddedEvent.type());
490491

491-
MembershipEvent seedAddedEvent = node1Events.as(Mono::from).block(Duration.ofSeconds(3));
492+
MembershipEvent seedAddedEvent = sink1.asFlux().blockFirst(Duration.ofSeconds(3));
493+
//noinspection ConstantConditions
492494
assertEquals(Type.ADDED, seedAddedEvent.type());
493495

494496
// Check metadata
@@ -498,7 +500,7 @@ public void onMembershipEvent(MembershipEvent event) {
498500
// Remove node1 from cluster
499501
CountDownLatch latch = new CountDownLatch(1);
500502
AtomicReference<Map<String, String>> removedMetadata = new AtomicReference<>();
501-
seedEvents
503+
sink.asFlux()
502504
.filter(MembershipEvent::isRemoved)
503505
.subscribe(
504506
event -> {

0 commit comments

Comments
 (0)