Skip to content

Commit 7cbf29b

Browse files
committed
Replace RxJava 2 with RxJava 3.
1 parent b78047d commit 7cbf29b

File tree

51 files changed

+128
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+128
-134
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ allprojects {
6666
/* ******************** dependencies ******************** */
6767

6868
dependencies {
69-
api("io.reactivex.rxjava2:rxjava:${property("rxjava.version")}")
69+
api("io.reactivex.rxjava3:rxjava:${property("rxjava.version")}")
7070
api("org.reactivestreams:reactive-streams:${property("reactive-streams.version")}")
7171

7272
implementation("io.netty:netty-buffer:${property("netty.version")}")

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ prevVersion=1.3.4
33
#
44
# main dependencies
55
#
6-
rxjava.version=2.2.21
6+
rxjava.version=3.1.10
77
reactive-streams.version=1.0.4
88
netty.version=4.1.119.Final
99
jctools.version=2.1.2

reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
3535
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;
3636
import com.hivemq.client.rx.reactor.FluxWithSingle;
37-
import io.reactivex.Flowable;
37+
import io.reactivex.rxjava3.core.Flowable;
3838
import org.jetbrains.annotations.NotNull;
3939
import org.reactivestreams.Publisher;
4040
import reactor.adapter.rxjava.RxJava2Adapter;

reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
3939
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
4040
import com.hivemq.client.rx.reactor.FluxWithSingle;
41-
import io.reactivex.Flowable;
41+
import io.reactivex.rxjava3.core.Flowable;
4242
import org.jetbrains.annotations.NotNull;
4343
import org.reactivestreams.Publisher;
4444
import reactor.adapter.rxjava.RxJava2Adapter;

src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
4141
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
4242
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
43-
import io.reactivex.FlowableSubscriber;
44-
import io.reactivex.schedulers.Schedulers;
43+
import io.reactivex.rxjava3.core.FlowableSubscriber;
44+
import io.reactivex.rxjava3.schedulers.Schedulers;
4545
import org.jetbrains.annotations.NotNull;
4646
import org.jetbrains.annotations.Nullable;
4747
import org.reactivestreams.Subscription;

src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@
4545
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
4646
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
4747
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
48-
import io.reactivex.Flowable;
49-
import io.reactivex.FlowableSubscriber;
50-
import io.reactivex.internal.subscriptions.SubscriptionHelper;
48+
import io.reactivex.rxjava3.core.Flowable;
49+
import io.reactivex.rxjava3.core.FlowableSubscriber;
50+
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
5151
import org.jetbrains.annotations.NotNull;
5252
import org.jetbrains.annotations.Nullable;
5353
import org.reactivestreams.Subscription;

src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.hivemq.client.internal.mqtt;
1818

1919
import com.hivemq.client.mqtt.MqttClientExecutorConfig;
20-
import io.reactivex.Scheduler;
20+
import io.reactivex.rxjava3.core.Scheduler;
2121
import org.jetbrains.annotations.NotNull;
2222
import org.jetbrains.annotations.Nullable;
2323

src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.hivemq.client.internal.util.Checks;
2020
import com.hivemq.client.mqtt.MqttClientExecutorConfigBuilder;
21-
import io.reactivex.Scheduler;
21+
import io.reactivex.rxjava3.core.Scheduler;
2222
import org.jetbrains.annotations.NotNull;
2323
import org.jetbrains.annotations.Nullable;
2424

src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable;
2424
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable;
2525
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle;
26-
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingleFlowable;
2726
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubAckSingle;
2827
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubAckSingle;
2928
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
@@ -49,12 +48,11 @@
4948
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
5049
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
5150
import com.hivemq.client.rx.FlowableWithSingle;
52-
import io.reactivex.Completable;
53-
import io.reactivex.Flowable;
54-
import io.reactivex.Scheduler;
55-
import io.reactivex.Single;
56-
import io.reactivex.functions.Function;
57-
import io.reactivex.internal.fuseable.ScalarCallable;
51+
import io.reactivex.rxjava3.core.Completable;
52+
import io.reactivex.rxjava3.core.Flowable;
53+
import io.reactivex.rxjava3.core.Scheduler;
54+
import io.reactivex.rxjava3.core.Single;
55+
import io.reactivex.rxjava3.functions.Function;
5856
import org.jetbrains.annotations.NotNull;
5957
import org.jetbrains.annotations.Nullable;
6058

@@ -214,20 +212,6 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) {
214212
final @NotNull Flowable<P> publishFlowable, final @NotNull Function<P, MqttPublish> publishMapper) {
215213

216214
final Scheduler applicationScheduler = clientConfig.getExecutorConfig().getApplicationScheduler();
217-
if (publishFlowable instanceof ScalarCallable) {
218-
//noinspection unchecked
219-
final P publish = ((ScalarCallable<P>) publishFlowable).call();
220-
if (publish == null) {
221-
return Flowable.empty();
222-
}
223-
final MqttPublish mqttPublish;
224-
try {
225-
mqttPublish = publishMapper.apply(publish);
226-
} catch (final Throwable t) {
227-
return Flowable.error(t);
228-
}
229-
return new MqttAckSingleFlowable(clientConfig, mqttPublish).observeOn(applicationScheduler, true);
230-
}
231215
return new MqttAckFlowable(
232216
clientConfig, publishFlowable.subscribeOn(applicationScheduler).map(publishMapper)).observeOn(
233217
applicationScheduler, true);

src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.hivemq.client.mqtt.mqtt3.exceptions.*;
2525
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5MessageException;
2626
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5Message;
27-
import io.reactivex.functions.Function;
27+
import io.reactivex.rxjava3.functions.Function;
2828
import org.jetbrains.annotations.NotNull;
2929

3030
/**

0 commit comments

Comments
 (0)