Skip to content

Commit c9c5149

Browse files
committed
Revert rebind address to initial one after delay
# Conflicts: # src/main/java/redis/clients/jedis/MaintenanceEventListener.java # src/main/java/redis/clients/jedis/RebindAware.java # src/test/java/redis/clients/jedis/upgrade/UnifiedJedisProactiveRebindTest.java
1 parent b81a79e commit c9c5149

File tree

11 files changed

+422
-43
lines changed

11 files changed

+422
-43
lines changed

src/main/java/redis/clients/jedis/Connection.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.net.SocketException;
1212
import java.nio.ByteBuffer;
1313
import java.nio.CharBuffer;
14+
import java.time.Duration;
1415
import java.util.ArrayList;
1516
import java.util.Arrays;
1617
import java.util.List;
@@ -839,8 +840,11 @@ public void accept(PushConsumerContext context) {
839840
}
840841
}
841842
private void onMoving(PushMessage message) {
842-
HostAndPort rebindTarget = getRebindTarget(message);
843-
eventHandler.getListeners().forEach(listener -> listener.onRebind(rebindTarget));
843+
RebindEvent rebindEvent = getRebindTarget(message);
844+
if (rebindEvent == null) {
845+
return;
846+
}
847+
eventHandler.getListeners().forEach(listener -> listener.onRebind(rebindEvent.target,rebindEvent.rebindTimeout));
844848
}
845849

846850
private void onMigrating() {
@@ -859,7 +863,7 @@ private void onFailedOver() {
859863
eventHandler.getListeners().forEach(MaintenanceEventListener::onFailedOver);
860864
}
861865

862-
private HostAndPort getRebindTarget(PushMessage message) {
866+
private RebindEvent getRebindTarget(PushMessage message) {
863867
// Extract domain/ip and port from the message
864868
// MOVING push message format: ["MOVING", slot, "host:port"]
865869
List<Object> content = message.getContent();
@@ -869,6 +873,14 @@ private HostAndPort getRebindTarget(PushMessage message) {
869873
return null;
870874
}
871875

876+
Object timeObject = content.get(1); // Get the 3rd element (index 2)
877+
if (!(timeObject instanceof Long)) {
878+
logger.warn("Invalid re-bind message format, expected 2rd element to be a <time> (Long), got {}",
879+
timeObject.getClass());
880+
return null;
881+
}
882+
883+
872884
Object addressObject = content.get(2); // Get the 3rd element (index 2)
873885
if (!(addressObject instanceof byte[])) {
874886
logger.warn("Invalid re-bind message format, expected 3rd element to be a byte[], got {}",
@@ -887,18 +899,29 @@ private HostAndPort getRebindTarget(PushMessage message) {
887899

888900
String address = parts[0];
889901
int port = Integer.parseInt(parts[1]);
890-
return new HostAndPort(address, port);
902+
return new RebindEvent(new HostAndPort(address, port), Duration.ofSeconds((Long) timeObject));
891903
} catch (Exception e) {
892904
logger.warn("Error parsing re-bind target from message: {}", message, e);
893905
return null;
894906
}
895907
}
908+
909+
private static class RebindEvent{
910+
Duration rebindTimeout;
911+
HostAndPort target;
912+
913+
public RebindEvent(HostAndPort target, Duration rebindTimeout) {
914+
this.target = target;
915+
this.rebindTimeout = rebindTimeout;
916+
}
917+
}
896918
}
897919

898920
private class ConnectionRebindHandler implements MaintenanceEventListener {
899-
public void onRebind(HostAndPort target) {
921+
@Override
922+
public void onRebind(HostAndPort target, Duration rebindTimeout) {
900923
rebindRequested = true;
901-
}
924+
}
902925
}
903926

904927
private static class AdaptiveTimeoutHandler implements MaintenanceEventListener {
@@ -914,35 +937,40 @@ public AdaptiveTimeoutHandler(Connection connection) {
914937
this.connectionRef = new WeakReference<>(connection);
915938
}
916939

940+
@Override
917941
public void onMigrating() {
918942
Connection connection = connectionRef.get();
919943
if (connection != null) {
920944
connection.relaxTimeouts();
921945
}
922946
}
923947

948+
@Override
924949
public void onMigrated() {
925950
Connection connection = connectionRef.get();
926951
if (connection != null) {
927952
connection.disableRelaxedTimeout();
928953
}
929954
}
930955

956+
@Override
931957
public void onFailOver() {
932958
Connection connection = connectionRef.get();
933959
if (connection != null) {
934960
connection.relaxTimeouts();
935961
}
936962
}
937963

964+
@Override
938965
public void onFailedOver() {
939966
Connection connection = connectionRef.get();
940967
if (connection != null) {
941968
connection.disableRelaxedTimeout();
942969
}
943970
}
944971

945-
public void onRebind(HostAndPort target) {
972+
@Override
973+
public void onRebind(HostAndPort target, Duration rebindTimeout) {
946974
Connection connection = connectionRef.get();
947975
if (connection != null) {
948976
connection.relaxTimeouts();

src/main/java/redis/clients/jedis/ConnectionFactory.java

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
88

9+
import java.time.Duration;
910
import java.util.function.Supplier;
1011

1112
import redis.clients.jedis.annots.Experimental;
@@ -15,6 +16,7 @@
1516
import redis.clients.jedis.csc.Cache;
1617
import redis.clients.jedis.csc.CacheConnection;
1718
import redis.clients.jedis.exceptions.JedisException;
19+
import redis.clients.jedis.util.Expirable;
1820

1921
/**
2022
* PoolableObjectFactory custom impl.
@@ -29,6 +31,7 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> , Rebi
2931
private final Supplier<Connection> objectMaker;
3032

3133
private final AuthXEventListener authXEventListener;
34+
private final RebindAwareHostPortSupplier rebindAwareHostPortSupplier;
3235

3336
public ConnectionFactory(final HostAndPort hostAndPort) {
3437
this(hostAndPort, DefaultJedisClientConfig.builder().build(), null);
@@ -66,6 +69,22 @@ private ConnectionFactory(final JedisSocketFactory jedisSocketFactory,
6669
this.authXEventListener = authXManager.getListener();
6770
authXManager.start();
6871
}
72+
73+
if (clientConfig.isProactiveRebindEnabled()) {
74+
if (!(jedisSocketFactory instanceof DefaultJedisSocketFactory)) {
75+
throw new IllegalStateException("Rebind not supported for custom JedisSocketFactory implementations");
76+
}
77+
DefaultJedisSocketFactory factory = (DefaultJedisSocketFactory) jedisSocketFactory;
78+
this.rebindAwareHostPortSupplier = wrapHostAndPortSupplier(factory);
79+
} else {
80+
this.rebindAwareHostPortSupplier = null;
81+
}
82+
}
83+
84+
private RebindAwareHostPortSupplier wrapHostAndPortSupplier(DefaultJedisSocketFactory factory) {
85+
RebindAwareHostPortSupplier hostPortSupplier = new RebindAwareHostPortSupplier(factory.getHostAndPort(), factory.getHostAndPortSupplier());
86+
factory.setHostAndPortSupplier(hostPortSupplier);
87+
return hostPortSupplier;
6988
}
7089

7190
private Supplier<Connection> connectionSupplier() {
@@ -141,17 +160,39 @@ private void reAuthenticate(Connection jedis) throws Exception {
141160
}
142161
}
143162

144-
145163
@Override
146-
public void rebind(HostAndPort newHostAndPort) {
147-
// TODO : extract interface from DefaultJedisSocketFactory so that we can support custom socket factories
148-
if (!(jedisSocketFactory instanceof DefaultJedisSocketFactory)) {
149-
throw new IllegalStateException("Rebind not supported for custom JedisSocketFactory implementations");
164+
public void rebind(HostAndPort newHostAndPort, Duration rebindTimeout) {
165+
if (rebindAwareHostPortSupplier != null) {
166+
rebindAwareHostPortSupplier.rebind(newHostAndPort, rebindTimeout);
150167
}
151-
152-
DefaultJedisSocketFactory factory = (DefaultJedisSocketFactory) jedisSocketFactory;
153-
logger.debug("Rebinding to {}", newHostAndPort);
154-
factory.updateHostAndPort(newHostAndPort);
155168
}
156169

170+
private static class RebindAwareHostPortSupplier implements Supplier<HostAndPort>, RebindAware {
171+
private final Supplier<HostAndPort> delegatedSupplier;
172+
private final HostAndPort initialHostAndPort;
173+
private volatile Expirable<HostAndPort> rebindTarget;
174+
175+
public RebindAwareHostPortSupplier(HostAndPort initialHostAndPort,
176+
Supplier<HostAndPort> hostAndPortSupplier) {
177+
this.initialHostAndPort = initialHostAndPort;
178+
this.delegatedSupplier = hostAndPortSupplier;
179+
}
180+
181+
public void rebind(HostAndPort rebindTarget, Duration rebindTimeout) {
182+
this.rebindTarget = new Expirable<>(rebindTarget, rebindTimeout);
183+
}
184+
185+
public HostAndPort get() {
186+
if (rebindTarget != null && rebindTarget.isValid()) {
187+
return rebindTarget.getValue();
188+
}
189+
190+
if (delegatedSupplier != null) {
191+
return delegatedSupplier.get();
192+
}
193+
194+
return initialHostAndPort;
195+
}
196+
197+
}
157198
}

src/main/java/redis/clients/jedis/ConnectionPool.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@
33
import org.apache.commons.pool2.PooledObjectFactory;
44
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
55

6-
import org.slf4j.Logger;
7-
import org.slf4j.LoggerFactory;
86
import redis.clients.jedis.annots.Experimental;
97
import redis.clients.jedis.authentication.AuthXManager;
108
import redis.clients.jedis.csc.Cache;
119
import redis.clients.jedis.exceptions.JedisException;
1210
import redis.clients.jedis.util.Pool;
1311

12+
import java.time.Duration;
1413
import java.util.concurrent.atomic.AtomicReference;
1514

1615
public class ConnectionPool extends Pool<Connection> {
1716

1817
private AuthXManager authXManager;
18+
private RebindHandler rebindHandler;
1919

2020
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
2121
this(new ConnectionFactory(hostAndPort, clientConfig));
@@ -89,7 +89,7 @@ private void attachAuthenticationListener(AuthXManager authXManager) {
8989

9090
private void attachRebindHandler(JedisClientConfig clientConfig, ConnectionFactory factory) {
9191
if (clientConfig.isProactiveRebindEnabled()) {
92-
RebindHandler rebindHandler = new RebindHandler(this, factory);
92+
rebindHandler = new RebindHandler(this, factory);
9393
clientConfig.getMaintenanceEventHandler().addListener(rebindHandler);
9494
}
9595
}
@@ -102,13 +102,13 @@ private static class RebindHandler implements MaintenanceEventListener {
102102
public RebindHandler(ConnectionPool pool, ConnectionFactory factory) {
103103
this.pool = pool;
104104
this.factory = factory;
105-
}
105+
}
106106

107107
@Override
108-
public void onRebind(HostAndPort target) {
108+
public void onRebind(HostAndPort target, Duration rebindTimeout) {
109109
HostAndPort previous = rebindTarget.getAndSet(target);
110110
if (previous != target) {
111-
this.factory.rebind(target);
111+
this.factory.rebind(target, rebindTimeout);
112112
this.pool.clear();
113113
}
114114
}

src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.Arrays;
99
import java.util.Collections;
1010
import java.util.List;
11+
import java.util.function.Supplier;
1112
import javax.net.ssl.HostnameVerifier;
1213
import javax.net.ssl.SSLContext;
1314
import javax.net.ssl.SSLParameters;
@@ -23,6 +24,7 @@ public class DefaultJedisSocketFactory implements JedisSocketFactory {
2324
Protocol.DEFAULT_PORT);
2425

2526
private volatile HostAndPort hostAndPort = DEFAULT_HOST_AND_PORT;
27+
private volatile Supplier<HostAndPort> hostAndPortSupplier;
2628
private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
2729
private int socketTimeout = Protocol.DEFAULT_TIMEOUT;
2830
private boolean ssl = false;
@@ -33,19 +35,22 @@ public class DefaultJedisSocketFactory implements JedisSocketFactory {
3335
private HostAndPortMapper hostAndPortMapper = null;
3436

3537
public DefaultJedisSocketFactory() {
38+
hostAndPortSupplier = ()->this.hostAndPort;
3639
}
3740

3841
public DefaultJedisSocketFactory(HostAndPort hostAndPort) {
3942
this(hostAndPort, null);
4043
}
4144

4245
public DefaultJedisSocketFactory(JedisClientConfig config) {
43-
this(null, config);
46+
this((HostAndPort) null, config);
47+
hostAndPortSupplier = ()->this.hostAndPort;
4448
}
4549

4650
public DefaultJedisSocketFactory(HostAndPort hostAndPort, JedisClientConfig config) {
4751
if (hostAndPort != null) {
4852
this.hostAndPort = hostAndPort;
53+
hostAndPortSupplier = ()->this.hostAndPort;
4954
}
5055
if (config != null) {
5156
this.connectionTimeout = config.getConnectionTimeoutMillis();
@@ -154,17 +159,55 @@ private Socket createSslSocket(HostAndPort _hostAndPort, Socket socket) throws I
154159
return new SSLSocketWrapper(sslSocket, plainSocket);
155160
}
156161

162+
/**
163+
* Updates the HostAndPort used to create the socket.
164+
* <p>
165+
* Note: if Supplier<HostAndPort> is set it has precedence over provided hostAndPort.
166+
* </p>
167+
* */
157168
public void updateHostAndPort(HostAndPort hostAndPort) {
158169
this.hostAndPort = hostAndPort;
159170
}
160171

172+
173+
/**
174+
* Returns the HostAndPort that is used to create the socket.
175+
* <p>
176+
* Note: if Supplier<HostAndPort> is set it has precedence over HostAndPort.
177+
*</p>
178+
* @return the HostAndPort that is used to create the socket.
179+
*/
161180
public HostAndPort getHostAndPort() {
162181
return this.hostAndPort;
163182
}
164183

184+
185+
/**
186+
* Returns the Supplier<HostAndPort> that is used to create the socket.
187+
* @return the Supplier<HostAndPort> that is used to create the socket.
188+
*/
189+
public Supplier<HostAndPort> getHostAndPortSupplier() {
190+
return this.hostAndPortSupplier;
191+
}
192+
193+
/**
194+
* Sets the Supplier<HostAndPort> that is used to create the socket.
195+
* @param hostAndPortSupplier the Supplier<HostAndPort> that is used to create the socket.
196+
*/
197+
public void setHostAndPortSupplier(Supplier<HostAndPort> hostAndPortSupplier) {
198+
this.hostAndPortSupplier = hostAndPortSupplier;
199+
}
200+
165201
protected HostAndPort getSocketHostAndPort() {
166202
HostAndPortMapper mapper = hostAndPortMapper;
167-
HostAndPort hap = this.hostAndPort;
203+
204+
HostAndPort hap;
205+
if (this.hostAndPortSupplier != null) {
206+
hap = this.hostAndPortSupplier.get();
207+
} else {
208+
hap = this.hostAndPort;
209+
}
210+
168211
if (mapper != null) {
169212
HostAndPort mapped = mapper.getHostAndPort(hap);
170213
if (mapped != null) {

src/main/java/redis/clients/jedis/MaintenanceEventListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package redis.clients.jedis;
22

3+
import java.time.Duration;
4+
35
public interface MaintenanceEventListener {
46

57
default void onMigrating() {
@@ -14,6 +16,6 @@ default void onFailOver() {
1416
default void onFailedOver() {
1517
};
1618

17-
default void onRebind(HostAndPort target) {
19+
default void onRebind(HostAndPort target, Duration rebindTimeout) {
1820
};
19-
}
21+
}

src/main/java/redis/clients/jedis/RebindAware.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import redis.clients.jedis.annots.Experimental;
44

5+
import java.time.Duration;
6+
57
/**
68
* Interface for components that support rebinding to a new host and port.
79
* <p>
@@ -23,6 +25,7 @@ public interface RebindAware {
2325
* Components might decide to reject the re-bind request if they are not in a state to support it.
2426
* </p>
2527
* @param newHostAndPort The new host and port to use for new connections
28+
* @param expiry The duration after which the rebind should be considered expired
2629
*/
27-
void rebind(HostAndPort newHostAndPort);
30+
void rebind(HostAndPort newHostAndPort, Duration expiry);
2831
}

0 commit comments

Comments
 (0)