Skip to content

[automatic failover] Implement HealthStatusManager + weighted endpoints #4189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final CommandObject<String> ping() {
return PING_COMMAND_OBJECT;
}

/**
 * Builds the command object for {@code ECHO} carrying the given message.
 *
 * @param msg the message added as the single argument of the command
 * @return a command object whose reply is built with {@link BuilderFactory#STRING}
 */
public final CommandObject<String> echo(String msg) {
  return new CommandObject<>(
      commandArguments(ECHO).add(msg),
      BuilderFactory.STRING);
}

private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);

public final CommandObject<String> flushAll() {
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/redis/clients/jedis/HostAndPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.io.Serializable;

public class HostAndPort implements Serializable {
import redis.clients.jedis.mcf.Endpoint;

public class HostAndPort implements Serializable, Endpoint {

private static final long serialVersionUID = -519876229978427751L;

Expand All @@ -14,10 +16,12 @@ public HostAndPort(String host, int port) {
this.port = port;
}

@Override
public String getHost() {
return host;
}

@Override
public int getPort() {
return port;
}
Expand Down
133 changes: 123 additions & 10 deletions src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisValidationException;
import redis.clients.jedis.mcf.ConnectionFailoverException;
import redis.clients.jedis.mcf.EchoStrategy;
import redis.clients.jedis.mcf.HealthCheckStrategy;

/**
* @author Allen Terleto (aterleto)
Expand All @@ -31,6 +33,19 @@
@Experimental
public final class MultiClusterClientConfig {

/**
 * Factory for {@link HealthCheckStrategy} instances bound to a specific endpoint.
 * <p>
 * The interface declares exactly one abstract method, so it can be implemented with a lambda
 * expression or a method reference.
 */
@FunctionalInterface
public interface StrategySupplier {
  /**
   * Creates a HealthCheckStrategy for the given endpoint.
   * @param hostAndPort the endpoint to create a strategy for
   * @param jedisClientConfig the client configuration, may be null for implementations that
   *          don't need it
   * @return a HealthCheckStrategy instance
   */
  HealthCheckStrategy get(HostAndPort hostAndPort, JedisClientConfig jedisClientConfig);
}

private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3;
private static final int RETRY_WAIT_DURATION_DEFAULT = 500; // measured in milliseconds
private static final int RETRY_WAIT_DURATION_EXPONENTIAL_BACKOFF_MULTIPLIER_DEFAULT = 2;
Expand Down Expand Up @@ -129,6 +144,14 @@ public final class MultiClusterClientConfig {

private List<Class<? extends Throwable>> fallbackExceptionList;

//////////// Failover Config ////////////

/** Whether to retry failed commands during failover */
private boolean retryOnFailover = false;

/** Whether failback is enabled */
private boolean failback = false;

public MultiClusterClientConfig(ClusterConfig[] clusterConfigs) {
this.clusterConfigs = clusterConfigs;
}
Expand Down Expand Up @@ -193,13 +216,23 @@ public List<Class<? extends Throwable>> getFallbackExceptionList() {
return fallbackExceptionList;
}

/**
 * Tells whether failed commands are retried during failover.
 *
 * @return the configured retry-on-failover flag ({@code false} unless set through the builder)
 */
public boolean isRetryOnFailover() {
  return this.retryOnFailover;
}

/**
 * Tells whether failback is enabled.
 *
 * @return the configured failback flag ({@code false} unless set through the builder)
 */
public boolean isFailback() {
  return this.failback;
}

public static class ClusterConfig {

private int priority;
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> connectionPoolConfig;

private float weight = 1.0f;
private StrategySupplier healthCheckStrategySupplier;

public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
Expand All @@ -212,25 +245,92 @@ public ClusterConfig(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this.connectionPoolConfig = connectionPoolConfig;
}

public int getPriority() {
return priority;
}

private void setPriority(int priority) {
this.priority = priority;
/**
 * Creates a cluster configuration from the values collected by the given builder.
 *
 * @param builder source of the endpoint, client and pool configuration, weight and health check
 *          strategy supplier
 */
private ClusterConfig(Builder builder) {
  // Failover-related settings
  this.weight = builder.weight;
  this.healthCheckStrategySupplier = builder.healthCheckStrategySupplier;

  // Connection-related settings
  this.hostAndPort = builder.hostAndPort;
  this.clientConfig = builder.clientConfig;
  this.connectionPoolConfig = builder.connectionPoolConfig;
}

public HostAndPort getHostAndPort() {
return hostAndPort;
}

public static Builder builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
return new Builder(hostAndPort, clientConfig);
}

public JedisClientConfig getJedisClientConfig() {
return clientConfig;
}

public GenericObjectPoolConfig<Connection> getConnectionPoolConfig() {
return connectionPoolConfig;
}

/**
 * Returns the weight assigned to this cluster.
 *
 * @return the configured weight; {@code 1.0f} unless another value was set through the builder
 */
public float getWeight() {
  return this.weight;
}

/**
 * Returns the supplier used to create the health check strategy for this cluster.
 *
 * @return the configured supplier; may be {@code null}, e.g. when health checks were disabled
 *         through the builder or the instance was not created through the builder
 */
public StrategySupplier getHealthCheckStrategySupplier() {
  return this.healthCheckStrategySupplier;
}

/**
 * Fluent builder for {@link ClusterConfig}.
 * <p>
 * Defaults: weight {@code 1.0f}, health checks enabled with {@code EchoStrategy.DEFAULT} as the
 * strategy supplier, and no connection pool configuration.
 */
public static class Builder {
  private HostAndPort hostAndPort;
  private JedisClientConfig clientConfig;
  private GenericObjectPoolConfig<Connection> connectionPoolConfig;

  private float weight = 1.0f;
  private StrategySupplier healthCheckStrategySupplier = EchoStrategy.DEFAULT;
  private boolean healthCheckEnabled = true;

  /**
   * @param hostAndPort the endpoint of the cluster/database
   * @param clientConfig the client configuration used for that endpoint
   */
  public Builder(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
    this.hostAndPort = hostAndPort;
    this.clientConfig = clientConfig;
  }

  /**
   * Sets the connection pool configuration.
   * @param connectionPoolConfig the pool configuration to use
   * @return this builder
   */
  public Builder connectionPoolConfig(GenericObjectPoolConfig<Connection> connectionPoolConfig) {
    this.connectionPoolConfig = connectionPoolConfig;
    return this;
  }

  /**
   * Sets the weight of the endpoint.
   * @param weight the weight to assign
   * @return this builder
   */
  public Builder weight(float weight) {
    this.weight = weight;
    return this;
  }

  /**
   * Sets the supplier that creates the health check strategy. Supplying one also marks health
   * checks as enabled, so the enabled flag never disagrees with the presence of a supplier.
   * @param healthCheckStrategySupplier the supplier to use, must not be null
   * @return this builder
   * @throws IllegalArgumentException if {@code healthCheckStrategySupplier} is null
   */
  public Builder healthCheckStrategySupplier(StrategySupplier healthCheckStrategySupplier) {
    if (healthCheckStrategySupplier == null) {
      throw new IllegalArgumentException("healthCheckStrategySupplier must not be null");
    }
    this.healthCheckStrategySupplier = healthCheckStrategySupplier;
    this.healthCheckEnabled = true;
    return this;
  }

  /**
   * Uses the given strategy instance for health checks, regardless of the endpoint and client
   * configuration handed to the supplier. Supplying one also marks health checks as enabled.
   * @param healthCheckStrategy the strategy to use, must not be null
   * @return this builder
   * @throws IllegalArgumentException if {@code healthCheckStrategy} is null
   */
  public Builder healthCheckStrategy(HealthCheckStrategy healthCheckStrategy) {
    if (healthCheckStrategy == null) {
      throw new IllegalArgumentException("healthCheckStrategy must not be null");
    }
    // Lambda parameters are named differently from the builder's fields to avoid shadowing them.
    this.healthCheckStrategySupplier = (endpoint, config) -> healthCheckStrategy;
    this.healthCheckEnabled = true;
    return this;
  }

  /**
   * Enables or disables health checks. Disabling clears the strategy supplier; enabling restores
   * {@code EchoStrategy.DEFAULT} only when no supplier is currently set.
   * @param healthCheckEnabled whether health checks should be enabled
   * @return this builder
   */
  public Builder healthCheckEnabled(boolean healthCheckEnabled) {
    this.healthCheckEnabled = healthCheckEnabled;
    if (!healthCheckEnabled) {
      this.healthCheckStrategySupplier = null;
    } else if (healthCheckStrategySupplier == null) {
      this.healthCheckStrategySupplier = EchoStrategy.DEFAULT;
    }
    return this;
  }

  /**
   * @return a new {@link ClusterConfig} holding the values collected by this builder
   */
  public ClusterConfig build() {
    return new ClusterConfig(this);
  }
}
}

public static class Builder {
Expand All @@ -253,14 +353,14 @@ public static class Builder {
private List<Class> circuitBreakerIgnoreExceptionList = null;
private List<Class<? extends Throwable>> fallbackExceptionList = FALLBACK_EXCEPTIONS_DEFAULT;

private boolean retryOnFailover = false;
private boolean failback = false;

public Builder(ClusterConfig[] clusterConfigs) {

if (clusterConfigs == null || clusterConfigs.length < 1) throw new JedisValidationException(
"ClusterClientConfigs are required for MultiClusterPooledConnectionProvider");

for (int i = 0; i < clusterConfigs.length; i++)
clusterConfigs[i].setPriority(i + 1);

this.clusterConfigs = clusterConfigs;
}

Expand Down Expand Up @@ -348,6 +448,16 @@ public Builder fallbackExceptionList(List<Class<? extends Throwable>> fallbackEx
return this;
}

public Builder retryOnFailover(boolean retryOnFailover) {
this.retryOnFailover = retryOnFailover;
return this;
}

public Builder failback(boolean failback) {
this.failback = failback;
return this;
}

public MultiClusterClientConfig build() {
MultiClusterClientConfig config = new MultiClusterClientConfig(this.clusterConfigs);

Expand All @@ -373,6 +483,9 @@ public MultiClusterClientConfig build() {

config.fallbackExceptionList = this.fallbackExceptionList;

config.retryOnFailover = this.retryOnFailover;
config.failback = this.failback;

return config;
}
}
Expand Down
19 changes: 5 additions & 14 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import redis.clients.jedis.json.Path2;
import redis.clients.jedis.json.JsonObjectMapper;
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
import redis.clients.jedis.mcf.FailoverOptions;
import redis.clients.jedis.mcf.MultiClusterPipeline;
import redis.clients.jedis.mcf.MultiClusterTransaction;
import redis.clients.jedis.params.*;
Expand Down Expand Up @@ -238,19 +237,7 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
*/
@Experimental
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
this(new CircuitBreakerCommandExecutor(provider, FailoverOptions.builder().build()), provider);
}

/**
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
* <p>
* With this Constructor users can seamlessly failover to Disaster Recovery (DR), Backup, and Active-Active cluster(s)
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
* <p>
*/
@Experimental
public UnifiedJedis(MultiClusterPooledConnectionProvider provider, FailoverOptions failoverOptions) {
this(new CircuitBreakerCommandExecutor(provider, failoverOptions), provider);
this(new CircuitBreakerCommandExecutor(provider), provider);
}

/**
Expand Down Expand Up @@ -354,6 +341,10 @@ public String ping() {
return checkAndBroadcastCommand(commandObjects.ping());
}

/**
 * Executes the command object produced by {@code commandObjects.echo(string)}.
 *
 * @param string the argument handed to {@code commandObjects.echo}
 * @return the result of executing that command object
 */
public String echo(String string) {
  return executeCommand(
      commandObjects.echo(string));
}

public String flushDB() {
return checkAndBroadcastCommand(commandObjects.flushDB());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
@Experimental
public class CircuitBreakerCommandExecutor extends CircuitBreakerFailoverBase implements CommandExecutor {

private final FailoverOptions options;

public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider, FailoverOptions options) {
public CircuitBreakerCommandExecutor(MultiClusterPooledConnectionProvider provider) {
super(provider);
this.options = options != null ? options : FailoverOptions.builder().build();
}

@Override
Expand All @@ -50,8 +47,7 @@ private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster clust
try (Connection connection = cluster.getConnection()) {
return connection.executeCommand(commandObject);
} catch (Exception e) {

if (retryOnFailover() && !isActiveCluster(cluster)
if (cluster.retryOnFailover() && !isActiveCluster(cluster)
&& isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
throw new ConnectionFailoverException(
"Command failed during failover: " + cluster.getCircuitBreaker().getName(), e);
Expand All @@ -65,10 +61,6 @@ private boolean isCircuitBreakerTrackedException(Exception e, CircuitBreaker cb)
return cb.getCircuitBreakerConfig().getRecordExceptionPredicate().test(e);
}

private boolean retryOnFailover() {
return options.isRetryOnFailover();
}

private boolean isActiveCluster(Cluster cluster) {
Cluster activeCluster = provider.getCluster();
return activeCluster != null && activeCluster.equals(cluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
lock.lock();

try {
// Check state to handle race conditions since incrementActiveMultiClusterIndex() is
// Check state to handle race conditions since iterateActiveCluster() is
// non-idempotent
if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) {

Expand All @@ -48,25 +48,26 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {
// To recover/transition from this forced state the user will need to manually failback
circuitBreaker.transitionToForcedOpenState();

// Incrementing the activeMultiClusterIndex will allow subsequent calls to the
// executeCommand()
// to use the next cluster's connection pool - according to the configuration's
// prioritization/order
int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex();
// Iterating the active cluster will allow subsequent calls to the executeCommand() to use the next
// cluster's connection pool - according to the configuration's prioritization/order/weight
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
provider.iterateActiveCluster();

// Implementation is optionally provided during configuration. Typically, used for
// activeMultiClusterIndex persistence or custom logging
provider.runClusterFailoverPostProcessor(activeMultiClusterIndex);
provider.runClusterFailoverPostProcessor(provider.getCluster());
}

// Once the priority list is exhausted only a manual failback can open the circuit breaker so
// all subsequent operations will fail
else if (provider.isLastClusterCircuitBreakerForcedOpen()) {
// This check relies on the fact that many failover attempts can arrive with the same CB:
// only the first one triggers a failover and moves the CB to FORCED_OPEN.
// When the rest reach this point, the active cluster is already the next one, so its CB should differ
// from this CB. If it is the same one and there are no more clusters to fail over to, throw an exception.
else if (circuitBreaker == provider.getCluster().getCircuitBreaker() && !provider.canIterateOnceMore()) {
throw new JedisConnectionException(
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
// Ignore exceptions since we are already in a failure state
} finally {
lock.unlock();
}
Expand Down
Loading
Loading