Skip to content

Commit 6047692

Browse files
committed
migrates to the latest master updates
Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com> Signed-off-by: Oleh Dokuka <odokuka@vmware.com> Signed-off-by: Oleh Dokuka <oleh.dokuka@icloud.com>
1 parent 83b73e2 commit 6047692

File tree

4 files changed

+17
-16
lines changed

4 files changed

+17
-16
lines changed

rsocket-transport-aeron/build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
plugins {
1818
id 'java-library'
1919
id 'maven-publish'
20-
id 'com.jfrog.artifactory'
21-
id 'com.jfrog.bintray'
20+
id 'signing'
2221
id "com.google.osdetector" version "1.4.0"
2322
}
2423

@@ -35,7 +34,9 @@ dependencies {
3534
testImplementation 'org.mockito:mockito-junit-jupiter'
3635
testImplementation 'org.junit.jupiter:junit-jupiter-api'
3736
testImplementation 'org.junit.jupiter:junit-jupiter-params'
37+
testImplementation 'org.slf4j:slf4j-api'
3838

39+
testImplementation 'ch.qos.logback:logback-classic'
3940
testRuntimeOnly 'ch.qos.logback:logback-classic'
4041
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
4142
}

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/FluxReceiver.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
import org.slf4j.LoggerFactory;
3232
import reactor.core.CoreSubscriber;
3333
import reactor.core.publisher.Flux;
34-
import reactor.core.publisher.MonoProcessor;
3534
import reactor.core.publisher.Operators;
35+
import reactor.core.publisher.Sinks;
3636

3737
class FluxReceiver extends Flux<ByteBuf>
3838
implements org.reactivestreams.Subscription, ControlledFragmentHandler, Runnable {
@@ -44,7 +44,7 @@ class FluxReceiver extends Flux<ByteBuf>
4444
final Subscription subscription;
4545
final ControlledFragmentAssembler assembler;
4646
final EventLoop eventLoop;
47-
final MonoProcessor<Void> onClose;
47+
final Sinks.Empty<Void> onClose;
4848
final int effort;
4949

5050
volatile long requested;
@@ -61,7 +61,7 @@ class FluxReceiver extends Flux<ByteBuf>
6161
int produced;
6262

6363
public FluxReceiver(
64-
MonoProcessor<Void> onClose, EventLoop eventLoop, Subscription subscription, int effort) {
64+
Sinks.Empty<Void> onClose, EventLoop eventLoop, Subscription subscription, int effort) {
6565
this.onClose = onClose;
6666
this.eventLoop = eventLoop;
6767
this.subscription = subscription;
@@ -140,7 +140,7 @@ void drain(long n) {
140140
final NotConnectedException exception =
141141
new NotConnectedException("Aeron Subscription has been closed unexpectedly");
142142
this.actual.onError(exception);
143-
this.onClose.onError(exception);
143+
this.onClose.tryEmitError(exception);
144144
return;
145145
}
146146

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/MonoSendMany.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@
2828
import reactor.core.Disposable;
2929
import reactor.core.Fuseable;
3030
import reactor.core.Fuseable.QueueSubscription;
31-
import reactor.core.publisher.MonoProcessor;
3231
import reactor.core.publisher.Operators;
32+
import reactor.core.publisher.Sinks;
3333
import reactor.util.context.Context;
3434

3535
class MonoSendMany implements Disposable, CoreSubscriber<ByteBuf>, Runnable {
36-
private static final Logger logger = LoggerFactory.getLogger(MonoSendMany.class);
36+
static final Logger logger = LoggerFactory.getLogger(MonoSendMany.class);
3737

3838
static final ThreadLocal<BufferClaim> BUFFER_CLAIMS = ThreadLocal.withInitial(BufferClaim::new);
3939
static final ThreadLocal<UnsafeBuffer> UNSAFE_BUFFER = ThreadLocal.withInitial(UnsafeBuffer::new);
4040

41-
final MonoProcessor<Void> onClose;
41+
final Sinks.Empty<Void> onClose;
4242
final EventLoop eventLoop;
4343
final ExclusivePublication publication;
4444
final int effort;
@@ -59,7 +59,7 @@ class MonoSendMany implements Disposable, CoreSubscriber<ByteBuf>, Runnable {
5959
boolean cancelled;
6060

6161
public MonoSendMany(
62-
MonoProcessor<Void> onClose,
62+
Sinks.Empty<Void> onClose,
6363
EventLoop eventLoop,
6464
ExclusivePublication publication,
6565
int effort,
@@ -130,7 +130,7 @@ void drain() {
130130
}
131131

132132
void drainAsync() {
133-
final MonoProcessor<Void> onClose = this.onClose;
133+
final Sinks.Empty<Void> onClose = this.onClose;
134134
final ExclusivePublication publication = this.publication;
135135
final QueueSubscription<ByteBuf> qs = this.subscription;
136136
final EventLoop eventLoop = this.eventLoop;
@@ -159,7 +159,7 @@ void drainAsync() {
159159
qs.cancel();
160160

161161
this.done = true;
162-
onClose.onError(t);
162+
onClose.tryEmitError(t);
163163
return;
164164
}
165165

@@ -200,9 +200,9 @@ void drainAsync() {
200200

201201
final Throwable t = this.t;
202202
if (t == null) {
203-
onClose.onComplete();
203+
onClose.tryEmitEmpty();
204204
} else {
205-
onClose.onError(t);
205+
onClose.tryEmitError(t);
206206
}
207207
}
208208
break;
@@ -217,7 +217,7 @@ void drainAsync() {
217217
buf.release();
218218

219219
this.done = true;
220-
onClose.onError(t);
220+
onClose.tryEmitError(t);
221221
return;
222222
}
223223

rsocket-transport-aeron/src/main/java/io/rsocket/transport/aeron/WrappedDirectBufferByteBuf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ public int nioBufferCount() {
245245
public ByteBuffer nioBuffer(int index, int length) {
246246
final ByteBuffer buffer = directBuffer.byteBuffer();
247247
if (buffer != null) {
248-
return buffer.duplicate().position(index).limit(index + length);
248+
return (ByteBuffer) buffer.duplicate().position(index).limit(index + length);
249249
} else {
250250
final byte[] bytes = directBuffer.byteArray();
251251
return ByteBuffer.wrap(bytes, index, length);

0 commit comments

Comments
 (0)