Skip to content

Commit 31aed8d

Browse files
committed
Improve recovery and usage of threads
Schedule reconnection attempts instead of waiting and blocking the thread. Use platform threads for the environment scheduler. Use a common event loop to record topology. Virtual threads do not behave as expected, especially for long-running tasks (e.g. polling a blocking queue for new tasks to process), which makes recovery fails. They will be used only for short-lived task, like polling a future to get the settlement.
1 parent 4074016 commit 31aed8d

22 files changed

+852
-238
lines changed

ci/start-cluster.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ wait_for_message rabbitmq0 "completed with"
1717

1818
docker exec rabbitmq0 rabbitmqctl await_online_nodes 3
1919

20-
docker exec rabbitmq0 rabbitmqctl enable_feature_flag khepri_db
21-
docker exec rabbitmq1 rabbitmqctl enable_feature_flag khepri_db
22-
docker exec rabbitmq2 rabbitmqctl enable_feature_flag khepri_db
20+
docker exec rabbitmq0 rabbitmqctl enable_feature_flag --experimental khepri_db
21+
docker exec rabbitmq1 rabbitmqctl enable_feature_flag --experimental khepri_db
22+
docker exec rabbitmq2 rabbitmqctl enable_feature_flag --experimental khepri_db
2323

2424
docker exec rabbitmq0 rabbitmqctl cluster_status

pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@
346346
<argLine>${test-arguments}</argLine>
347347
<systemPropertyVariables>
348348
<net.bytebuddy.experimental>true</net.bytebuddy.experimental>
349-
<rabbitmqctl.bin>DOCKER:rabbitmq</rabbitmqctl.bin>
350349
</systemPropertyVariables>
351350
</configuration>
352351
</plugin>

src/main/java/com/rabbitmq/client/amqp/BackOffDelayPolicy.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,85 @@ public interface BackOffDelayPolicy {
2525

2626
Duration delay(int recoveryAttempt);
2727

28+
static BackOffDelayPolicy fixedWithInitialDelay(Duration initialDelay, Duration delay) {
29+
return new FixedWithInitialDelayBackOffPolicy(initialDelay, delay);
30+
}
31+
32+
static BackOffDelayPolicy fixedWithInitialDelay(
33+
Duration initialDelay, Duration delay, Duration timeout) {
34+
return new FixedWithInitialDelayAndTimeoutBackOffPolicy(initialDelay, delay, timeout);
35+
}
36+
2837
static BackOffDelayPolicy fixed(Duration delay) {
29-
return new FixedBackOffDelayPolicy(delay);
38+
return new FixedWithInitialDelayBackOffPolicy(delay, delay);
3039
}
3140

32-
class FixedBackOffDelayPolicy implements BackOffDelayPolicy {
41+
final class FixedWithInitialDelayBackOffPolicy implements BackOffDelayPolicy {
3342

43+
private final Duration initialDelay;
3444
private final Duration delay;
3545

36-
private FixedBackOffDelayPolicy(Duration delay) {
46+
private FixedWithInitialDelayBackOffPolicy(Duration initialDelay, Duration delay) {
47+
this.initialDelay = initialDelay;
3748
this.delay = delay;
3849
}
3950

4051
@Override
4152
public Duration delay(int recoveryAttempt) {
42-
return this.delay;
53+
return recoveryAttempt == 0 ? initialDelay : delay;
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return "FixedWithInitialDelayBackOffPolicy{"
59+
+ "initialDelay="
60+
+ initialDelay
61+
+ ", delay="
62+
+ delay
63+
+ '}';
64+
}
65+
}
66+
67+
final class FixedWithInitialDelayAndTimeoutBackOffPolicy implements BackOffDelayPolicy {
68+
69+
private final int attemptLimitBeforeTimeout;
70+
private final BackOffDelayPolicy delegate;
71+
72+
private FixedWithInitialDelayAndTimeoutBackOffPolicy(
73+
Duration initialDelay, Duration delay, Duration timeout) {
74+
this(fixedWithInitialDelay(initialDelay, delay), timeout);
75+
}
76+
77+
private FixedWithInitialDelayAndTimeoutBackOffPolicy(
78+
BackOffDelayPolicy policy, Duration timeout) {
79+
if (timeout.toMillis() < policy.delay(0).toMillis()) {
80+
throw new IllegalArgumentException("Timeout must be longer than initial delay");
81+
}
82+
this.delegate = policy;
83+
// best effort, assume FixedWithInitialDelay-ish policy
84+
Duration initialDelay = policy.delay(0);
85+
Duration delay = policy.delay(1);
86+
long timeoutWithInitialDelay = timeout.toMillis() - initialDelay.toMillis();
87+
this.attemptLimitBeforeTimeout = (int) (timeoutWithInitialDelay / delay.toMillis()) + 1;
88+
}
89+
90+
@Override
91+
public Duration delay(int recoveryAttempt) {
92+
if (recoveryAttempt >= attemptLimitBeforeTimeout) {
93+
return TIMEOUT;
94+
} else {
95+
return delegate.delay(recoveryAttempt);
96+
}
97+
}
98+
99+
@Override
100+
public String toString() {
101+
return "FixedWithInitialDelayAndTimeoutBackOffPolicy{"
102+
+ "attemptLimitBeforeTimeout="
103+
+ attemptLimitBeforeTimeout
104+
+ ", delegate="
105+
+ delegate
106+
+ '}';
43107
}
44108
}
45109
}

0 commit comments

Comments
 (0)