Skip to content

Commit 9187e24

Browse files
Kguswoggivo
andauthored
Fix JedisBroadcastException in functionLoadReplace for Redis Cluster (#4219)
* Fix JedisBroadcastException in functionLoadReplace * Clean up PRIMARY_ONLY_COMMANDS initialization * Fix JedisBroadcastException in FUNCTION commands for cluster * Broadcast to primary nodes only PR #3306 introduces broadcasting of commands like FUNCTION DELETE, FUNCTION FLUSH, FUNCTION KILL, FUNCTION RESTORE ... to all nodes of the cluster. This leads to error when command is executed on non-writable (replica) node. This commit introduces a fix to broadcast the commands only to primary nodes from the cluster. * format --------- Co-authored-by: Ivo Gaydazhiev <ivo.gaydazhiev@redis.com>
1 parent f1a12f3 commit 9187e24

File tree

10 files changed

+457
-12
lines changed

10 files changed

+457
-12
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@
348348
<include>src/test/java/redis/clients/jedis/commands/jedis/ClusterStreamsCommandsTest.java</include>
349349
<include>src/test/java/redis/clients/jedis/commands/jedis/PooledStreamsCommandsTest.java</include>
350350
<include>src/test/java/redis/clients/jedis/resps/StreamEntryDeletionResultTest.java</include>
351+
<include>**/*FunctionCommandsTest*</include>
351352
</includes>
352353
</configuration>
353354
<executions>

src/main/java/redis/clients/jedis/JedisClusterInfoCache.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class JedisClusterInfoCache {
3939
private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);
4040

4141
private final Map<String, ConnectionPool> nodes = new HashMap<>();
42+
private final Map<String, ConnectionPool> primaryNodesCache = new HashMap<>();
4243
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
4344
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
4445
private final List<ConnectionPool>[] replicaSlots;
@@ -176,6 +177,7 @@ public void discoverClusterNodesAndSlots(Connection jedis) {
176177
HostAndPort targetNode = generateHostAndPort(hostInfos);
177178
setupNodeIfNotExist(targetNode);
178179
if (i == MASTER_NODE_INDEX) {
180+
primaryNodesCache.put(getNodeKey(targetNode), getNode(targetNode));
179181
assignSlotsToNode(slotNums, targetNode);
180182
} else if (clientConfig.isReadOnlyForRedisClusterReplicas()) {
181183
assignSlotsToReplicaNode(slotNums, targetNode);
@@ -425,6 +427,26 @@ public Map<String, ConnectionPool> getNodes() {
425427
}
426428
}
427429

430+
public Map<String, ConnectionPool> getPrimaryNodes() {
431+
r.lock();
432+
try {
433+
return new HashMap<>(primaryNodesCache);
434+
} finally {
435+
r.unlock();
436+
}
437+
}
438+
439+
public List<ConnectionPool> getShuffledPrimaryNodesPool() {
440+
r.lock();
441+
try {
442+
List<ConnectionPool> pools = new ArrayList<>(primaryNodesCache.values());
443+
Collections.shuffle(pools);
444+
return pools;
445+
} finally {
446+
r.unlock();
447+
}
448+
}
449+
428450
public List<ConnectionPool> getShuffledNodesPool() {
429451
r.lock();
430452
try {
@@ -475,6 +497,7 @@ private void resetNodes() {
475497
}
476498
}
477499
nodes.clear();
500+
primaryNodesCache.clear();
478501
}
479502

480503
public void close() {

src/main/java/redis/clients/jedis/executors/ClusterCommandExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void close() {
4545

4646
@Override
4747
public final <T> T broadcastCommand(CommandObject<T> commandObject) {
48-
Map<String, ConnectionPool> connectionMap = provider.getConnectionMap();
48+
Map<String, ConnectionPool> connectionMap = provider.getPrimaryNodesConnectionMap();
4949

5050
boolean isErrored = false;
5151
T reply = null;

src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ public Map<String, ConnectionPool> getNodes() {
112112
return cache.getNodes();
113113
}
114114

115+
public Map<String, ConnectionPool> getPrimaryNodes() {
116+
return cache.getPrimaryNodes();
117+
}
118+
115119
public HostAndPort getNode(int slot) {
116120
return slot >= 0 ? cache.getSlotNode(slot) : null;
117121
}
@@ -136,7 +140,7 @@ public Connection getConnection() {
136140
// In antirez's redis-rb-cluster implementation, getRandomConnection always return
137141
// valid connection (able to ping-pong) or exception if all connections are invalid
138142

139-
List<ConnectionPool> pools = cache.getShuffledNodesPool();
143+
List<ConnectionPool> pools = cache.getShuffledPrimaryNodesPool();
140144

141145
JedisException suppressed = null;
142146
for (ConnectionPool pool : pools) {
@@ -205,8 +209,15 @@ public Connection getReplicaConnectionFromSlot(int slot) {
205209
return getConnectionFromSlot(slot);
206210
}
207211

212+
208213
@Override
209214
public Map<String, ConnectionPool> getConnectionMap() {
210215
return Collections.unmodifiableMap(getNodes());
211216
}
217+
218+
@Override
219+
public Map<String, ConnectionPool> getPrimaryNodesConnectionMap() {
220+
return Collections.unmodifiableMap(getPrimaryNodes());
221+
}
222+
212223
}

src/main/java/redis/clients/jedis/providers/ConnectionProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,9 @@ public interface ConnectionProvider extends AutoCloseable {
1515
final Connection c = getConnection();
1616
return Collections.singletonMap(c.toString(), c);
1717
}
18+
19+
default Map<?, ?> getPrimaryNodesConnectionMap() {
20+
final Connection c = getConnection();
21+
return Collections.singletonMap(c.toString(), c);
22+
}
1823
}

src/test/java/redis/clients/jedis/ClusterCommandExecutorTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.mockito.InOrder;
2020
import org.mockito.Mockito;
2121
import org.mockito.invocation.InvocationOnMock;
22-
import org.mockito.stubbing.Answer;
2322

2423
import redis.clients.jedis.exceptions.JedisAskDataException;
2524
import redis.clients.jedis.exceptions.JedisClusterOperationException;

src/test/java/redis/clients/jedis/JedisClusterInfoCacheTest.java

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515
import java.util.stream.Collectors;
1616

1717
import static org.hamcrest.MatcherAssert.assertThat;
18+
import static org.hamcrest.Matchers.aMapWithSize;
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.hasEntry;
1821
import static org.hamcrest.Matchers.hasItem;
1922
import static org.junit.jupiter.api.Assertions.assertEquals;
2023
import static org.junit.jupiter.api.Assertions.assertNotNull;
2124
import static org.junit.jupiter.api.Assertions.assertNull;
2225
import static org.mockito.ArgumentMatchers.argThat;
2326
import static org.mockito.Mockito.when;
27+
import static redis.clients.jedis.JedisClusterInfoCache.getNodeKey;
2428
import static redis.clients.jedis.Protocol.Command.CLUSTER;
2529
import static redis.clients.jedis.util.CommandArgumentMatchers.commandWithArgs;
2630

@@ -49,7 +53,7 @@ public void testReplicaNodeRemovalAndRediscovery() {
4953

5054
// Mock the cluster slots responses
5155
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
52-
masterReplicaSlotsResponse()).thenReturn(masterOnlySlotsResponse())
56+
masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)).thenReturn(masterOnlySlotsResponse())
5357
.thenReturn(masterReplica2SlotsResponse());
5458

5559
// Initial discovery with one master and one replica (replica-1)
@@ -78,7 +82,7 @@ public void testResetWithReplicaSlots() {
7882

7983
// Mock the cluster slots responses
8084
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
81-
masterReplicaSlotsResponse());
85+
masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST));
8286

8387
// Initial discovery
8488
cache.discoverClusterNodesAndSlots(mockConnection);
@@ -94,10 +98,68 @@ public void testResetWithReplicaSlots() {
9498
assertReplicasAvailable(cache, REPLICA_1_HOST);
9599
}
96100

97-
private List<Object> masterReplicaSlotsResponse() {
101+
@Test
102+
public void getPrimaryNodesAfterReplicaNodeRemovalAndRediscovery() {
103+
// Create client config with read-only replicas enabled
104+
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
105+
.readOnlyForRedisClusterReplicas().build();
106+
107+
Set<HostAndPort> startNodes = new HashSet<>();
108+
startNodes.add(MASTER_HOST);
109+
110+
JedisClusterInfoCache cache = new JedisClusterInfoCache(clientConfig, startNodes);
111+
112+
// Mock the cluster slots responses
113+
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS")))).thenReturn(
114+
masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST)).thenReturn(masterOnlySlotsResponse())
115+
.thenReturn(masterReplica2SlotsResponse());
116+
117+
// Initial discovery with one master and one replica (replica-1)
118+
cache.discoverClusterNodesAndSlots(mockConnection);
119+
assertThat(cache.getPrimaryNodes(),aMapWithSize(1));
120+
assertThat(cache.getPrimaryNodes(),
121+
hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST))));
122+
123+
// Simulate rediscovery - master only
124+
cache.discoverClusterNodesAndSlots(mockConnection);
125+
assertThat( cache.getPrimaryNodes(),aMapWithSize(1));
126+
assertThat(cache.getPrimaryNodes(),
127+
hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST))));
128+
}
129+
130+
@Test
131+
public void getPrimaryNodesAfterMasterReplicaFailover() {
132+
// Create client config with read-only replicas enabled
133+
JedisClientConfig clientConfig = DefaultJedisClientConfig.builder()
134+
.readOnlyForRedisClusterReplicas().build();
135+
136+
Set<HostAndPort> startNodes = new HashSet<>();
137+
startNodes.add(MASTER_HOST);
138+
139+
JedisClusterInfoCache cache = new JedisClusterInfoCache(clientConfig, startNodes);
140+
141+
// Mock the cluster slots responses
142+
when(mockConnection.executeCommand(argThat(commandWithArgs(CLUSTER, "SLOTS"))))
143+
.thenReturn(masterReplicaSlotsResponse(MASTER_HOST, REPLICA_1_HOST))
144+
.thenReturn(masterReplicaSlotsResponse(REPLICA_1_HOST, MASTER_HOST));
145+
146+
// Initial discovery with one master and one replica (replica-1)
147+
cache.discoverClusterNodesAndSlots(mockConnection);
148+
assertThat(cache.getPrimaryNodes(),aMapWithSize(1));
149+
assertThat(cache.getPrimaryNodes(),
150+
hasEntry(equalTo(getNodeKey(MASTER_HOST)), equalTo(cache.getNode(MASTER_HOST))));
151+
152+
// Simulate rediscovery - master only
153+
cache.discoverClusterNodesAndSlots(mockConnection);
154+
assertThat( cache.getPrimaryNodes(),aMapWithSize(1));
155+
assertThat(cache.getPrimaryNodes(),
156+
hasEntry(equalTo(getNodeKey(REPLICA_1_HOST)), equalTo(cache.getNode(REPLICA_1_HOST))));
157+
}
158+
159+
private List<Object> masterReplicaSlotsResponse(HostAndPort masterHost, HostAndPort replicaHost) {
98160
return createClusterSlotsResponse(
99-
new SlotRange.Builder(0, 16383).master(MASTER_HOST, "master-id-1")
100-
.replica(REPLICA_1_HOST, "replica-id-1").build());
161+
new SlotRange.Builder(0, 16383).master(masterHost, masterHost.toString() + "-id")
162+
.replica(replicaHost, replicaHost.toString() + "-id").build());
101163
}
102164

103165
private List<Object> masterOnlySlotsResponse() {

0 commit comments

Comments
 (0)