@@ -82,6 +82,10 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider
82
82
83
83
private HealthStatusManager healthStatusManager = new HealthStatusManager ();
84
84
85
+ // Store retry and circuit breaker configs for dynamic cluster addition/removal
86
+ private RetryConfig retryConfig ;
87
+ private CircuitBreakerConfig circuitBreakerConfig ;
88
+
85
89
public MultiClusterPooledConnectionProvider (MultiClusterClientConfig multiClusterClientConfig ) {
86
90
87
91
if (multiClusterClientConfig == null ) throw new JedisValidationException (
@@ -102,7 +106,7 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
102
106
if (retryIgnoreExceptionList != null )
103
107
retryConfigBuilder .ignoreExceptions (retryIgnoreExceptionList .stream ().toArray (Class []::new ));
104
108
105
- RetryConfig retryConfig = retryConfigBuilder .build ();
109
+ this . retryConfig = retryConfigBuilder .build ();
106
110
107
111
////////////// Configure Circuit Breaker ////////////////////
108
112
@@ -127,57 +131,140 @@ public MultiClusterPooledConnectionProvider(MultiClusterClientConfig multiCluste
127
131
if (circuitBreakerIgnoreExceptionList != null ) circuitBreakerConfigBuilder
128
132
.ignoreExceptions (circuitBreakerIgnoreExceptionList .stream ().toArray (Class []::new ));
129
133
130
- CircuitBreakerConfig circuitBreakerConfig = circuitBreakerConfigBuilder .build ();
134
+ this . circuitBreakerConfig = circuitBreakerConfigBuilder .build ();
131
135
132
136
////////////// Configure Cluster Map ////////////////////
133
137
134
138
ClusterConfig [] clusterConfigs = multiClusterClientConfig .getClusterConfigs ();
135
139
for (ClusterConfig config : clusterConfigs ) {
136
- GenericObjectPoolConfig <Connection > poolConfig = config .getConnectionPoolConfig ();
140
+ addClusterInternal (config );
141
+ }
137
142
138
- String clusterId = "cluster:" + config .getFailoverOptions ().getWeight () + ":" + config .getHostAndPort ();
143
+ // selecting activeCluster with configuration values.
144
+ // all health status would be HEALTHY at this point
145
+ activeCluster = findWeightedHealthyClusterToIterate ().getValue ();
146
+
147
+ for (Endpoint endpoint : multiClusterMap .keySet ()) {
148
+ healthStatusManager .registerListener (endpoint , this ::handleStatusChange );
149
+ }
150
+ /// --- ///
139
151
140
- Retry retry = RetryRegistry .of (retryConfig ).retry (clusterId );
152
+ this .fallbackExceptionList = multiClusterClientConfig .getFallbackExceptionList ();
153
+ }
141
154
142
- Retry .EventPublisher retryPublisher = retry .getEventPublisher ();
143
- retryPublisher .onRetry (event -> log .warn (String .valueOf (event )));
144
- retryPublisher .onError (event -> log .error (String .valueOf (event )));
155
+ /**
156
+ * Adds a new cluster endpoint to the provider.
157
+ * @param clusterConfig the configuration for the new cluster
158
+ * @throws JedisValidationException if the endpoint already exists
159
+ */
160
+ public void add (ClusterConfig clusterConfig ) {
161
+ if (clusterConfig == null ) {
162
+ throw new JedisValidationException ("ClusterConfig must not be null" );
163
+ }
164
+
165
+ Endpoint endpoint = clusterConfig .getHostAndPort ();
166
+ if (multiClusterMap .containsKey (endpoint )) {
167
+ throw new JedisValidationException ("Endpoint " + endpoint + " already exists in the provider" );
168
+ }
169
+
170
+ activeClusterIndexLock .lock ();
171
+ try {
172
+ addClusterInternal (clusterConfig );
173
+ healthStatusManager .registerListener (endpoint , this ::handleStatusChange );
174
+ } finally {
175
+ activeClusterIndexLock .unlock ();
176
+ }
177
+ }
178
+
179
+ /**
180
+ * Removes a cluster endpoint from the provider.
181
+ * @param endpoint the endpoint to remove
182
+ * @throws JedisValidationException if the endpoint doesn't exist or is the last remaining endpoint
183
+ */
184
+ public void remove (Endpoint endpoint ) {
185
+ if (endpoint == null ) {
186
+ throw new JedisValidationException ("Endpoint must not be null" );
187
+ }
145
188
146
- CircuitBreaker circuitBreaker = CircuitBreakerRegistry .of (circuitBreakerConfig ).circuitBreaker (clusterId );
189
+ if (!multiClusterMap .containsKey (endpoint )) {
190
+ throw new JedisValidationException ("Endpoint " + endpoint + " does not exist in the provider" );
191
+ }
147
192
148
- CircuitBreaker .EventPublisher circuitBreakerEventPublisher = circuitBreaker .getEventPublisher ();
149
- circuitBreakerEventPublisher .onCallNotPermitted (event -> log .error (String .valueOf (event )));
150
- circuitBreakerEventPublisher .onError (event -> log .error (String .valueOf (event )));
151
- circuitBreakerEventPublisher .onFailureRateExceeded (event -> log .error (String .valueOf (event )));
152
- circuitBreakerEventPublisher .onSlowCallRateExceeded (event -> log .error (String .valueOf (event )));
153
- circuitBreakerEventPublisher .onStateTransition (event -> log .warn (String .valueOf (event )));
193
+ if (multiClusterMap .size () < 2 ) {
194
+ throw new JedisValidationException ("Cannot remove the last remaining endpoint" );
195
+ }
154
196
155
- ConnectionPool pool ;
156
- if (poolConfig != null ) {
157
- pool = new ConnectionPool (config .getHostAndPort (), config .getJedisClientConfig (), poolConfig );
158
- } else {
159
- pool = new ConnectionPool (config .getHostAndPort (), config .getJedisClientConfig ());
197
+ activeClusterIndexLock .lock ();
198
+ try {
199
+ Cluster clusterToRemove = multiClusterMap .get (endpoint );
200
+ boolean isActiveCluster = (activeCluster == clusterToRemove );
201
+
202
+ if (isActiveCluster ) {
203
+ log .info ("Active cluster is being removed. Finding a new active cluster..." );
204
+
205
+ // If we removed the active cluster, find a new one
206
+ if (isActiveCluster ) {
207
+ Map .Entry <Endpoint , Cluster > candidateCluster = findWeightedHealthyClusterToIterate ();
208
+ if (candidateCluster != null ) {
209
+ setActiveCluster (candidateCluster .getValue (), true );
210
+ }
211
+ }
160
212
}
161
- Cluster cluster = new Cluster (pool , retry , circuitBreaker , config .getFailoverOptions ());
162
- multiClusterMap .put (config .getHostAndPort (), cluster );
163
213
164
- StrategySupplier strategySupplier = config .getFailoverOptions ().getStrategySupplier ();
165
- if (strategySupplier != null ) {
166
- HealthCheckStrategy hcs = strategySupplier .get (config .getHostAndPort (), config .getJedisClientConfig ());
167
- healthStatusManager .add (config .getHostAndPort (), hcs );
214
+ // Remove from health status manager first
215
+ healthStatusManager .unregisterListener (endpoint , this ::handleStatusChange );
216
+ healthStatusManager .remove (endpoint );
217
+
218
+ // Remove from cluster map
219
+ multiClusterMap .remove (endpoint );
220
+
221
+ // Close the cluster resources
222
+ if (clusterToRemove != null ) {
223
+ clusterToRemove .getConnectionPool ().close ();
168
224
}
225
+ } finally {
226
+ activeClusterIndexLock .unlock ();
169
227
}
228
+ }
170
229
171
- // selecting activeCluster with configuration values.
172
- // all health status would be HEALTHY at this point
173
- activeCluster = findWeightedHealthyCluster ().getValue ();
230
+ /**
231
+ * Internal method to add a cluster configuration. This method is not thread-safe and should be called within
232
+ * appropriate locks.
233
+ */
234
+ private void addClusterInternal (ClusterConfig config ) {
235
+ GenericObjectPoolConfig <Connection > poolConfig = config .getConnectionPoolConfig ();
174
236
175
- for (Endpoint endpoint : multiClusterMap .keySet ()) {
176
- healthStatusManager .registerListener (endpoint , this ::handleStatusChange );
237
+ String clusterId = "cluster:" + config .getFailoverOptions ().getWeight () + ":" + config .getHostAndPort ();
238
+
239
+ Retry retry = RetryRegistry .of (retryConfig ).retry (clusterId );
240
+
241
+ Retry .EventPublisher retryPublisher = retry .getEventPublisher ();
242
+ retryPublisher .onRetry (event -> log .warn (String .valueOf (event )));
243
+ retryPublisher .onError (event -> log .error (String .valueOf (event )));
244
+
245
+ CircuitBreaker circuitBreaker = CircuitBreakerRegistry .of (circuitBreakerConfig ).circuitBreaker (clusterId );
246
+
247
+ CircuitBreaker .EventPublisher circuitBreakerEventPublisher = circuitBreaker .getEventPublisher ();
248
+ circuitBreakerEventPublisher .onCallNotPermitted (event -> log .error (String .valueOf (event )));
249
+ circuitBreakerEventPublisher .onError (event -> log .error (String .valueOf (event )));
250
+ circuitBreakerEventPublisher .onFailureRateExceeded (event -> log .error (String .valueOf (event )));
251
+ circuitBreakerEventPublisher .onSlowCallRateExceeded (event -> log .error (String .valueOf (event )));
252
+ circuitBreakerEventPublisher .onStateTransition (event -> log .warn (String .valueOf (event )));
253
+
254
+ ConnectionPool pool ;
255
+ if (poolConfig != null ) {
256
+ pool = new ConnectionPool (config .getHostAndPort (), config .getJedisClientConfig (), poolConfig );
257
+ } else {
258
+ pool = new ConnectionPool (config .getHostAndPort (), config .getJedisClientConfig ());
177
259
}
178
- /// --- ///
260
+ Cluster cluster = new Cluster (pool , retry , circuitBreaker , config .getFailoverOptions ());
261
+ multiClusterMap .put (config .getHostAndPort (), cluster );
179
262
180
- this .fallbackExceptionList = multiClusterClientConfig .getFallbackExceptionList ();
263
+ StrategySupplier strategySupplier = config .getFailoverOptions ().getStrategySupplier ();
264
+ if (strategySupplier != null ) {
265
+ HealthCheckStrategy hcs = strategySupplier .get (config .getHostAndPort (), config .getJedisClientConfig ());
266
+ healthStatusManager .add (config .getHostAndPort (), hcs );
267
+ }
181
268
}
182
269
183
270
private void handleStatusChange (HealthStatusChangeEvent eventArgs ) {
@@ -196,7 +283,7 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) {
196
283
if (newStatus .isHealthy ()) {
197
284
if (clusterWithHealthChange .isFailbackEnabled () && activeCluster != clusterWithHealthChange ) {
198
285
// lets check if weighted switching is possible
199
- Map .Entry <Endpoint , Cluster > failbackCluster = findWeightedFailbackCluster ();
286
+ Map .Entry <Endpoint , Cluster > failbackCluster = findWeightedHealthyClusterToIterate ();
200
287
if (failbackCluster == clusterWithHealthChange
201
288
&& clusterWithHealthChange .getWeight () > activeCluster .getWeight ()) {
202
289
setActiveCluster (clusterWithHealthChange , false );
@@ -208,7 +295,7 @@ private void handleStatusChange(HealthStatusChangeEvent eventArgs) {
208
295
}
209
296
210
297
public Endpoint iterateActiveCluster () {
211
- Map .Entry <Endpoint , Cluster > clusterToIterate = findWeightedHealthyCluster ();
298
+ Map .Entry <Endpoint , Cluster > clusterToIterate = findWeightedHealthyClusterToIterate ();
212
299
if (clusterToIterate == null ) {
213
300
throw new JedisConnectionException (
214
301
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
@@ -225,16 +312,15 @@ public Endpoint iterateActiveCluster() {
225
312
private static Predicate <Map .Entry <Endpoint , Cluster >> filterByHealth = c -> c .getValue ().isHealthy ();
226
313
private static Predicate <Map .Entry <Endpoint , Cluster >> filterByFailback = c -> c .getValue ().isHealthy ();
227
314
228
- private Map .Entry <Endpoint , Cluster > findWeightedHealthyCluster () {
229
- Cluster current = activeCluster ;
230
- return multiClusterMap .entrySet ().stream ().filter (filterByHealth ).filter (entry -> entry .getValue () != current )
231
- .max (maxByWeight ).orElse (null );
232
- }
315
+ // private Map.Entry<Endpoint, Cluster> findWeightedHealthyCluster() {
316
+ // Cluster current = activeCluster;
317
+ // return multiClusterMap.entrySet().stream().filter(filterByHealth).filter(entry -> entry.getValue() != current)
318
+ // .max(maxByWeight).orElse(null);
319
+ // }
233
320
234
- private Map .Entry <Endpoint , Cluster > findWeightedFailbackCluster () {
235
- Cluster current = activeCluster ;
321
+ private Map .Entry <Endpoint , Cluster > findWeightedHealthyClusterToIterate () {
236
322
return multiClusterMap .entrySet ().stream ().filter (filterByHealth ).filter (filterByFailback )
237
- .filter (entry -> entry .getValue () != current ).max (maxByWeight ).orElse (null );
323
+ .filter (entry -> entry .getValue () != activeCluster ).max (maxByWeight ).orElse (null );
238
324
}
239
325
240
326
/**
@@ -360,7 +446,7 @@ public CircuitBreaker getClusterCircuitBreaker(int multiClusterIndex) {
360
446
* manually failback to an available cluster
361
447
*/
362
448
public boolean canIterateOnceMore () {
363
- Map .Entry <Endpoint , Cluster > e = findWeightedHealthyCluster ();
449
+ Map .Entry <Endpoint , Cluster > e = findWeightedHealthyClusterToIterate ();
364
450
return e != null ;
365
451
}
366
452
0 commit comments