Skip to content

Commit e26c859

Browse files
authored
Merge pull request #807 from scalecube/update-dependencies
Update dependencies: rsocket, netty, reactor
2 parents 889785c + c2892ed commit e26c859

File tree

31 files changed

+105
-1420
lines changed

31 files changed

+105
-1420
lines changed

pom.xml

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@
5757
</scm>
5858

5959
<properties>
60-
<scalecube-cluster.version>2.6.6</scalecube-cluster.version>
60+
<scalecube-cluster.version>2.6.7.RC1</scalecube-cluster.version>
6161
<scalecube-commons.version>1.0.12</scalecube-commons.version>
62-
<reactor.version>Dysprosium-SR9</reactor.version>
62+
<reactor.version>2020.0.5</reactor.version>
6363

6464
<jackson.version>2.11.0</jackson.version>
65-
<rsocket.version>1.0.1</rsocket.version>
65+
<rsocket.version>1.0.4</rsocket.version>
6666
<protostuff.version>1.6.0</protostuff.version>
6767
<slf4j.version>1.7.30</slf4j.version>
6868
<log4j.version>2.13.2</log4j.version>
6969
<disruptor.version>3.4.2</disruptor.version>
70+
<netty.version>4.1.60.Final</netty.version>
7071

7172
<jsr305.version>3.0.2</jsr305.version>
7273
<jctools.version>2.1.2</jctools.version>
@@ -182,6 +183,18 @@
182183
<artifactId>jctools-core</artifactId>
183184
<version>${jctools.version}</version>
184185
</dependency>
186+
187+
<!-- Enforcer / Netty -->
188+
<dependency>
189+
<groupId>io.netty</groupId>
190+
<artifactId>netty-buffer</artifactId>
191+
<version>${netty.version}</version>
192+
</dependency>
193+
<dependency>
194+
<groupId>io.netty</groupId>
195+
<artifactId>netty-common</artifactId>
196+
<version>${netty.version}</version>
197+
</dependency>
185198
</dependencies>
186199
</dependencyManagement>
187200

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
2424
import reactor.util.context.Context;
25+
import reactor.util.context.ContextView;
2526

2627
public final class ServiceMethodInvoker {
2728

@@ -70,7 +71,7 @@ public ServiceMethodInvoker(
7071
* @return mono of service message
7172
*/
7273
public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
73-
return Mono.deferWithContext(context -> authenticate(message, context))
74+
return Mono.deferContextual(context -> authenticate(message, context))
7475
.flatMap(authData -> deferWithContextOne(message, authData))
7576
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
7677
.onErrorResume(
@@ -84,7 +85,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
8485
* @return flux of service messages
8586
*/
8687
public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
87-
return Mono.deferWithContext(context -> authenticate(message, context))
88+
return Mono.deferContextual(context -> authenticate(message, context))
8889
.flatMapMany(authData -> deferWithContextMany(message, authData))
8990
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
9091
.onErrorResume(
@@ -101,7 +102,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
101102
return Flux.from(publisher)
102103
.switchOnFirst(
103104
(first, messages) ->
104-
Mono.deferWithContext(context -> authenticate(first.get(), context))
105+
Mono.deferContextual(context -> authenticate(first.get(), context))
105106
.flatMapMany(authData -> deferWithContextBidirectional(messages, authData))
106107
.map(
107108
response ->
@@ -112,18 +113,18 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
112113
}
113114

114115
private Mono<?> deferWithContextOne(ServiceMessage message, Object authData) {
115-
return Mono.deferWithContext(context -> Mono.from(invoke(toRequest(message))))
116-
.subscriberContext(context -> enhanceContextWithPrincipal(authData, context));
116+
return Mono.deferContextual(context -> Mono.from(invoke(toRequest(message))))
117+
.contextWrite(context -> enhanceContextWithPrincipal(authData, context));
117118
}
118119

119120
private Flux<?> deferWithContextMany(ServiceMessage message, Object authData) {
120-
return Flux.deferWithContext(context -> Flux.from(invoke(toRequest(message))))
121-
.subscriberContext(context -> enhanceContextWithPrincipal(authData, context));
121+
return Flux.deferContextual(context -> Flux.from(invoke(toRequest(message))))
122+
.contextWrite(context -> enhanceContextWithPrincipal(authData, context));
122123
}
123124

124125
private Flux<?> deferWithContextBidirectional(Flux<ServiceMessage> messages, Object authData) {
125-
return Flux.deferWithContext(context -> messages.map(this::toRequest).transform(this::invoke))
126-
.subscriberContext(context -> enhanceContextWithPrincipal(authData, context));
126+
return Flux.deferContextual(context -> messages.map(this::toRequest).transform(this::invoke))
127+
.contextWrite(context -> enhanceContextWithPrincipal(authData, context));
127128
}
128129

129130
private Publisher<?> invoke(Object request) {
@@ -155,7 +156,7 @@ private Object[] prepareArguments(Object request) {
155156
return arguments;
156157
}
157158

158-
private Mono<Object> authenticate(ServiceMessage message, Context context) {
159+
private Mono<Object> authenticate(ServiceMessage message, ContextView context) {
159160
if (!methodInfo.isSecured()) {
160161
return Mono.just(NULL_AUTH_CONTEXT);
161162
}

services-api/src/test/java/io/scalecube/services/methods/ServiceMethodInvokerTest.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ void testInvokeOneWhenReturnNull() throws Exception {
7878
ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build();
7979

8080
StepVerifier.create(
81-
Mono.deferWithContext(context -> serviceMethodInvoker.invokeOne(message))
82-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
81+
Mono.deferContextual(context -> serviceMethodInvoker.invokeOne(message))
82+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
8383
.verifyComplete();
8484
}
8585

@@ -116,8 +116,8 @@ void testInvokeManyWhenReturnNull() throws Exception {
116116
ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build();
117117

118118
StepVerifier.create(
119-
Flux.deferWithContext(context -> serviceMethodInvoker.invokeMany(message))
120-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
119+
Flux.deferContextual(context -> serviceMethodInvoker.invokeMany(message))
120+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
121121
.verifyComplete();
122122
}
123123

@@ -154,9 +154,9 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception {
154154
ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build();
155155

156156
StepVerifier.create(
157-
Flux.deferWithContext(
157+
Flux.deferContextual(
158158
context -> serviceMethodInvoker.invokeBidirectional(Flux.just(message)))
159-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
159+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
160160
.verifyComplete();
161161
}
162162

@@ -194,8 +194,8 @@ void testInvokeOneWhenThrowException() throws Exception {
194194

195195
// invokeOne
196196
final Mono<ServiceMessage> invokeOne =
197-
Mono.deferWithContext(context -> serviceMethodInvoker.invokeOne(message))
198-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
197+
Mono.deferContextual(context -> serviceMethodInvoker.invokeOne(message))
198+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
199199

200200
StepVerifier.create(invokeOne)
201201
.assertNext(serviceMessage -> Assertions.assertTrue(serviceMessage.isError()))
@@ -235,8 +235,8 @@ void testInvokeManyWhenThrowException() throws Exception {
235235
ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build();
236236

237237
final Flux<ServiceMessage> invokeOne =
238-
Flux.deferWithContext(context -> serviceMethodInvoker.invokeMany(message))
239-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
238+
Flux.deferContextual(context -> serviceMethodInvoker.invokeMany(message))
239+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
240240

241241
StepVerifier.create(invokeOne)
242242
.assertNext(serviceMessage -> Assertions.assertTrue(serviceMessage.isError()))
@@ -277,9 +277,9 @@ void testInvokeBidirectionalWhenThrowException() throws Exception {
277277

278278
// invokeOne
279279
final Flux<ServiceMessage> invokeOne =
280-
Flux.deferWithContext(
280+
Flux.deferContextual(
281281
context -> serviceMethodInvoker.invokeBidirectional(Flux.just(message)))
282-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
282+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA));
283283

284284
StepVerifier.create(invokeOne)
285285
.assertNext(serviceMessage -> Assertions.assertTrue(serviceMessage.isError()))
@@ -363,8 +363,8 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception {
363363
ServiceMessage.builder().qualifier(qualifierPrefix + methodName).build();
364364

365365
StepVerifier.create(
366-
Mono.deferWithContext(context -> serviceMethodInvoker.invokeOne(message))
367-
.subscriberContext(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
366+
Mono.deferContextual(context -> serviceMethodInvoker.invokeOne(message))
367+
.contextWrite(context -> context.put(AUTH_CONTEXT_KEY, AUTH_DATA)))
368368
.verifyComplete();
369369
}
370370

@@ -389,7 +389,7 @@ void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception {
389389
IS_REQUEST_TYPE_SERVICE_MESSAGE,
390390
AUTH);
391391

392-
//noinspection unchecked
392+
//noinspection unchecked,rawtypes
393393
Authenticator<Map> mockedAuthenticator = Mockito.mock(Authenticator.class);
394394
Mockito.when(mockedAuthenticator.apply(ArgumentMatchers.anyMap()))
395395
.thenReturn(Mono.just(AUTH_DATA));

services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@
3232
import org.slf4j.Logger;
3333
import org.slf4j.LoggerFactory;
3434
import reactor.core.Exceptions;
35-
import reactor.core.publisher.DirectProcessor;
3635
import reactor.core.publisher.Flux;
37-
import reactor.core.publisher.FluxSink;
3836
import reactor.core.publisher.Mono;
37+
import reactor.core.publisher.Sinks;
3938

4039
public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
4140

@@ -46,8 +45,9 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery {
4645
private ClusterConfig clusterConfig;
4746
private Cluster cluster;
4847

49-
private final DirectProcessor<ServiceDiscoveryEvent> subject = DirectProcessor.create();
50-
private final FluxSink<ServiceDiscoveryEvent> sink = subject.sink();
48+
// Sink
49+
private final Sinks.Many<ServiceDiscoveryEvent> sink =
50+
Sinks.many().multicast().directBestEffort();
5151

5252
/**
5353
* Constructor.
@@ -132,7 +132,7 @@ public ScalecubeServiceDiscovery failureDetector(UnaryOperator<FailureDetectorCo
132132
*/
133133
@Override
134134
public Mono<Void> start() {
135-
return Mono.deferWithContext(
135+
return Mono.deferContextual(
136136
context -> {
137137
ServiceDiscoveryContext.Builder discoveryContextBuilder =
138138
context.get(ServiceDiscoveryContext.Builder.class);
@@ -161,19 +161,19 @@ public void onMembershipEvent(MembershipEvent event) {
161161

162162
@Override
163163
public Flux<ServiceDiscoveryEvent> listen() {
164-
return subject.onBackpressureBuffer();
164+
return sink.asFlux().onBackpressureBuffer();
165165
}
166166

167167
@Override
168168
public Mono<Void> shutdown() {
169169
return Mono.defer(
170170
() -> {
171171
if (cluster == null) {
172-
sink.complete();
172+
sink.tryEmitComplete();
173173
return Mono.empty();
174174
}
175175
cluster.shutdown();
176-
return cluster.onShutdown().doFinally(s -> sink.complete());
176+
return cluster.onShutdown().doFinally(s -> sink.tryEmitComplete());
177177
});
178178
}
179179

@@ -190,7 +190,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {
190190

191191
if (discoveryEvent != null) {
192192
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
193-
sink.next(discoveryEvent);
193+
sink.tryEmitNext(discoveryEvent);
194194
}
195195
}
196196

services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.junit.jupiter.params.provider.MethodSource;
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Mono;
38-
import reactor.core.publisher.ReplayProcessor;
38+
import reactor.core.publisher.Sinks;
3939
import reactor.test.StepVerifier;
4040

4141
class ScalecubeServiceDiscoveryTest extends BaseTest {
@@ -244,7 +244,7 @@ private void startSeed(MetadataCodec metadataCodec) {
244244
private static class RecordingServiceDiscovery {
245245

246246
final Supplier<Mono<ServiceDiscovery>> supplier;
247-
final ReplayProcessor<ServiceDiscoveryEvent> discoveryEvents = ReplayProcessor.create();
247+
final Sinks.Many<ServiceDiscoveryEvent> sink = Sinks.many().replay().all();
248248

249249
ServiceDiscovery serviceDiscovery; // effectively final
250250

@@ -258,7 +258,9 @@ private RecordingServiceDiscovery(RecordingServiceDiscovery other) {
258258
}
259259

260260
Flux<ServiceDiscoveryEvent> nonGroupDiscoveryEvents() {
261-
return discoveryEvents.filter(ScalecubeServiceDiscoveryTest::filterNonGroupDiscoveryEvents);
261+
return sink.asFlux()
262+
.onBackpressureBuffer()
263+
.filter(ScalecubeServiceDiscoveryTest::filterNonGroupDiscoveryEvents);
262264
}
263265

264266
RecordingServiceDiscovery resubscribe() {
@@ -282,7 +284,9 @@ static RecordingServiceDiscovery create(Supplier<Mono<ServiceDiscovery>> supplie
282284
}
283285

284286
private RecordingServiceDiscovery subscribe() {
285-
serviceDiscovery.listen().subscribe(discoveryEvents);
287+
serviceDiscovery
288+
.listen()
289+
.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete);
286290
return this;
287291
}
288292

0 commit comments

Comments
 (0)