Skip to content

Commit 326fc0f

Browse files
davsclausf2par0
authored andcommitted
CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesfo… (apache#17696)
* CAMEL-21901: camel-salesforce: Fix using fallbackReplyId when salesforce cannot subscribe due to invalid initial replyId from a preconfigured initialReplyIdMap.
1 parent c47378d commit 326fc0f

File tree

3 files changed

+44
-5
lines changed

3 files changed

+44
-5
lines changed

components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@
3636

3737
/**
3838
* The Bayeux extension for replay
39-
*
40-
* @author hal.hildebrand
41-
* @since API v37.0
4239
*/
4340
public class ReplayExtension implements Extension {
4441
private static final String EXTENSION_NAME = "replay";
@@ -52,6 +49,11 @@ public void setReplayIdIfAbsent(final String channelName, final long replayId) {
5249
dataMap.putIfAbsent(channelName, replayId);
5350
}
5451

52+
public void setReplayId(final String channelName, final long replayId) {
53+
// force setting with a specific value
54+
dataMap.put(channelName, replayId);
55+
}
56+
5557
@Override
5658
public boolean rcv(ClientSession session, Message.Mutable message) {
5759
Long replayId = getReplayId(message);

components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,10 @@ private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message mess
228228
} else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
229229
abort = false;
230230
long fallBackReplayId
231-
= ((SalesforceEndpoint) firstConsumer.getEndpoint()).getConfiguration().getFallBackReplayId();
231+
= firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
232232
LOG.warn(error);
233233
LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
234-
REPLAY_EXTENSION.setReplayIdIfAbsent(channelName, fallBackReplayId);
234+
REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
235235
for (var consumer : consumers) {
236236
subscribe(consumer);
237237
}

components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.component.salesforce.internal.streaming;
1818

19+
import java.lang.reflect.Field;
1920
import java.util.Collections;
2021
import java.util.HashMap;
2122
import java.util.Map;
@@ -28,10 +29,12 @@
2829
import org.apache.camel.component.salesforce.SalesforceLoginConfig;
2930
import org.apache.camel.component.salesforce.api.SalesforceException;
3031
import org.apache.camel.component.salesforce.internal.SalesforceSession;
32+
import org.apache.camel.util.ReflectionHelper;
3133
import org.cometd.client.BayeuxClient;
3234
import org.hamcrest.MatcherAssert;
3335
import org.junit.jupiter.api.Test;
3436

37+
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.REPLAY_EXTENSION;
3538
import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
3639
import static org.assertj.core.api.Assertions.assertThat;
3740
import static org.cometd.client.transport.ClientTransport.MAX_NETWORK_DELAY_OPTION;
@@ -187,4 +190,38 @@ public void defaultLongPollingTimeoutShouldBeGreaterThanSalesforceTimeout() thro
187190
MatcherAssert.assertThat(longPollingTimeout, instanceOf(Integer.class));
188191
MatcherAssert.assertThat((Integer) longPollingTimeout, greaterThan(110000));
189192
}
193+
194+
@Test
195+
public void fallbackReplyId() throws Exception {
196+
final SalesforceEndpointConfig componentConfig = new SalesforceEndpointConfig();
197+
componentConfig.setFallBackReplayId(-2L);
198+
199+
final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig();
200+
endpointConfig.setDefaultReplayId(-1L);
201+
endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2L));
202+
203+
final SalesforceComponent component = mock(SalesforceComponent.class);
204+
when(component.getConfig()).thenReturn(componentConfig);
205+
206+
final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
207+
when(endpoint.getReplayId()).thenReturn(null);
208+
when(endpoint.getComponent()).thenReturn(component);
209+
when(endpoint.getConfiguration()).thenReturn(endpointConfig);
210+
211+
assertEquals(Optional.of(2L), determineReplayIdFor(endpoint, "my-topic-1"),
212+
"Expecting replayId for `my-topic-1` to be 2, from initial reply id map");
213+
214+
REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 3L);
215+
REPLAY_EXTENSION.setReplayIdIfAbsent("my-topic-1", 4L);
216+
217+
// should still be 3L
218+
Field f = REPLAY_EXTENSION.getClass().getDeclaredField("dataMap");
219+
Map m = (Map) ReflectionHelper.getField(f, REPLAY_EXTENSION);
220+
assertEquals(3L, m.get("my-topic-1"));
221+
222+
// there is some subscription error due to INVALID_REPLAY_ID_PATTERN so we force setting another reply id
223+
REPLAY_EXTENSION.setReplayId("my-topic-1", -2L);
224+
assertEquals(-2L, m.get("my-topic-1"));
225+
}
226+
190227
}

0 commit comments

Comments
 (0)