Skip to content

Commit 8abd104

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 645420f + d4a3242 commit 8abd104

32 files changed

+836
-181
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,8 @@ target
99
/out
1010
dummy
1111
$buildDir
12-
src/main/idls/*
12+
src/main/idls/*
13+
/bin
14+
.classpath
15+
.project
16+
.settings

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Changelog
22

3+
## v2.7.0
4+
- Add ParentClosePolicy to child workflows and also expose parent execution info for child workflows
5+
- Add context propagation
6+
- Fix various bugs around test workflow service and test mutable state implementation
7+
- Use thrift IDLs from uber/cadence-idl repo as a submodule
8+
- Various dependency updates including Docker base image and Gradle wrapper
9+
- Miscellaneous bug fixes
10+
311
## v2.6.3
412
- Add Upsert Search Attributes
513
- Support get search attributes inside workflow

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ Add *cadence-client* as a dependency to your *pom.xml*:
3131
<dependency>
3232
<groupId>com.uber.cadence</groupId>
3333
<artifactId>cadence-client</artifactId>
34-
<version>2.6.3</version>
34+
<version>2.7.0</version>
3535
</dependency>
3636

3737
or to *build.gradle*:
3838

39-
compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.6.3'
39+
compile group: 'com.uber.cadence', name: 'cadence-client', version: '2.7.0'
4040

4141
## Documentation
4242

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ googleJavaFormat {
3737
}
3838

3939
group = 'com.uber.cadence'
40-
version = '2.6.3'
40+
version = '2.7.0'
4141

4242
description = '''Uber Cadence Java Client'''
4343

src/main/idls

Submodule idls updated from 56ca0b5 to 085f956

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21+
import com.uber.cadence.DataBlob;
22+
import com.uber.cadence.History;
23+
import com.uber.cadence.HistoryEvent;
24+
import com.uber.cadence.HistoryEventFilterType;
2125
import com.uber.cadence.SearchAttributes;
2226
import com.uber.cadence.TaskList;
2327
import com.uber.cadence.TaskListKind;
@@ -27,10 +31,16 @@
2731
import com.uber.cadence.workflow.WorkflowMethod;
2832
import java.lang.reflect.Method;
2933
import java.nio.ByteBuffer;
34+
import java.util.ArrayList;
35+
import java.util.Arrays;
3036
import java.util.HashMap;
37+
import java.util.List;
3138
import java.util.Map;
3239
import java.util.concurrent.ExecutorService;
3340
import java.util.concurrent.TimeUnit;
41+
import org.apache.thrift.TDeserializer;
42+
import org.apache.thrift.TException;
43+
import org.apache.thrift.TSerializer;
3444

3545
/** Utility functions shared by the implementation code. */
3646
public final class InternalUtils {
@@ -143,6 +153,54 @@ public static SearchAttributes convertMapToSearchAttributes(
143153
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
144154
}
145155

156+
// This method deserialize the DataBlob data to the HistoriyEvent data
157+
public static History DeserializeFromBlobToHistoryEvents(
158+
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {
159+
160+
List<HistoryEvent> events = new ArrayList<HistoryEvent>();
161+
for (DataBlob data : blobData) {
162+
History history = new History();
163+
try {
164+
byte[] dataByte = data.getData();
165+
dataByte = Arrays.copyOfRange(dataByte, 1, dataByte.length);
166+
deSerializer.deserialize(history, dataByte);
167+
168+
if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
169+
return null;
170+
}
171+
} catch (org.apache.thrift.TException err) {
172+
throw new TException("Deserialize blob data to history event failed with unknown error");
173+
}
174+
175+
events.addAll(history.getEvents());
176+
}
177+
178+
if (events.size() > 0 && historyEventFilterType == HistoryEventFilterType.CLOSE_EVENT) {
179+
events = events.subList(events.size() - 1, events.size());
180+
}
181+
182+
return new History().setEvents(events);
183+
}
184+
185+
// This method serializes history event to blob data
186+
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
187+
List<DataBlob> blobs = new ArrayList<>(events.size());
188+
for (HistoryEvent event : events) {
189+
DataBlob blob = new DataBlob();
190+
try {
191+
blob.setData(serializer.serialize(event));
192+
} catch (org.apache.thrift.TException err) {
193+
throw new RuntimeException("Serialize to blob data failed", err);
194+
}
195+
blobs.add(blob);
196+
}
197+
198+
return blobs;
199+
}
200+
201+
private static final TDeserializer deSerializer = new TDeserializer();
202+
private static final TSerializer serializer = new TSerializer();
203+
146204
/** Prohibit instantiation */
147205
private InternalUtils() {}
148206
}

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ public class WorkflowExecutionUtils {
100100
.setDoNotRetry(BadRequestError.class, EntityNotExistsError.class)
101101
.build();
102102

103+
// Wait period for passive cluster to retry getting workflow result in case of replication delay.
104+
private static final long ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS = 500;
105+
103106
/**
104107
* Returns result of a workflow instance execution or throws an exception if workflow did not
105108
* complete successfully.
@@ -178,7 +181,7 @@ private static byte[] getResultFromCloseEvent(
178181
}
179182

180183
/** Returns an instance closing event, potentially waiting for workflow to complete. */
181-
public static HistoryEvent getInstanceCloseEvent(
184+
private static HistoryEvent getInstanceCloseEvent(
182185
IWorkflowService service,
183186
String domain,
184187
WorkflowExecution workflowExecution,
@@ -191,31 +194,53 @@ public static HistoryEvent getInstanceCloseEvent(
191194
long start = System.currentTimeMillis();
192195
HistoryEvent event;
193196
do {
197+
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
198+
throw new TimeoutException(
199+
"WorkflowId="
200+
+ workflowExecution.getWorkflowId()
201+
+ ", runId="
202+
+ workflowExecution.getRunId()
203+
+ ", timeout="
204+
+ timeout
205+
+ ", unit="
206+
+ unit);
207+
}
208+
194209
GetWorkflowExecutionHistoryRequest r = new GetWorkflowExecutionHistoryRequest();
195210
r.setDomain(domain);
196211
r.setExecution(workflowExecution);
197212
r.setHistoryEventFilterType(HistoryEventFilterType.CLOSE_EVENT);
198213
r.setNextPageToken(pageToken);
199214
r.setWaitForNewEvent(true);
215+
r.setSkipArchival(true);
200216
try {
201217
response =
202218
Retryer.retryWithResult(retryParameters, () -> service.GetWorkflowExecutionHistory(r));
203219
} catch (EntityNotExistsError e) {
220+
if (e.activeCluster != null
221+
&& e.currentCluster != null
222+
&& !e.activeCluster.equals(e.currentCluster)) {
223+
// Current cluster is passive cluster. Execution might not exist because of replication
224+
// lag. If we are still within timeout, wait for a little bit and retry.
225+
if (timeout != 0
226+
&& System.currentTimeMillis() + ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS - start
227+
> unit.toMillis(timeout)) {
228+
throw e;
229+
}
230+
231+
try {
232+
Thread.sleep(ENTITY_NOT_EXIST_RETRY_WAIT_MILLIS);
233+
} catch (InterruptedException ie) {
234+
// Throw entity not exist here.
235+
throw e;
236+
}
237+
continue;
238+
}
204239
throw e;
205240
} catch (TException e) {
206241
throw CheckedExceptionWrapper.wrap(e);
207242
}
208-
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
209-
throw new TimeoutException(
210-
"WorkflowId="
211-
+ workflowExecution.getWorkflowId()
212-
+ ", runId="
213-
+ workflowExecution.getRunId()
214-
+ ", timeout="
215-
+ timeout
216-
+ ", unit="
217-
+ unit);
218-
}
243+
219244
pageToken = response.getNextPageToken();
220245
History history = response.getHistory();
221246
if (history != null && history.getEvents().size() > 0) {

src/main/java/com/uber/cadence/internal/metrics/ServiceMethod.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class ServiceMethod {
7171
MetricsType.CADENCE_METRICS_PREFIX + "RespondDecisionTaskFailed";
7272
public static final String SIGNAL_WORKFLOW_EXECUTION =
7373
MetricsType.CADENCE_METRICS_PREFIX + "SignalWorkflowExecution";
74+
public static final String RESET_WORKFLOW_EXECUTION =
75+
MetricsType.CADENCE_METRICS_PREFIX + "ResetWorkflowExecution";
7476
public static final String SIGNAL_WITH_START_WORKFLOW_EXECUTION =
7577
MetricsType.CADENCE_METRICS_PREFIX + "SignalWithStartWorkflowExecution";
7678
public static final String START_WORKFLOW_EXECUTION =

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.concurrent.CancellationException;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.locks.Condition;
42-
import java.util.concurrent.locks.Lock;
43-
import java.util.concurrent.locks.ReentrantLock;
4442
import java.util.function.BiConsumer;
4543
import java.util.function.BiFunction;
4644
import java.util.function.Consumer;
@@ -88,8 +86,7 @@ public void accept(Exception reason) {
8886
private final Map<String, ExecuteLocalActivityParameters> unstartedLaTasks = new HashMap<>();
8987
private final ReplayDecider replayDecider;
9088
private final DataConverter dataConverter;
91-
private final Lock laTaskLock = new ReentrantLock();
92-
private final Condition taskCondition = laTaskLock.newCondition();
89+
private final Condition taskCondition;
9390
private boolean taskCompleted = false;
9491

9592
ClockDecisionContext(
@@ -98,6 +95,7 @@ public void accept(Exception reason) {
9895
ReplayDecider replayDecider,
9996
DataConverter dataConverter) {
10097
this.decisions = decisions;
98+
this.taskCondition = replayDecider.getLock().newCondition();
10199
mutableSideEffectHandler =
102100
new MarkerHandler(decisions, MUTABLE_SIDE_EFFECT_MARKER_NAME, () -> replaying);
103101
versionHandler = new MarkerHandler(decisions, VERSION_MARKER_NAME, () -> replaying);
@@ -272,13 +270,9 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
272270
completionHandle.accept(marker.getResult(), failure);
273271
setReplayCurrentTimeMilliseconds(marker.getReplayTimeMillis());
274272

275-
laTaskLock.lock();
276-
try {
277-
taskCompleted = true;
278-
taskCondition.signal();
279-
} finally {
280-
laTaskLock.unlock();
281-
}
273+
taskCompleted = true;
274+
// This method is already called under the lock.
275+
taskCondition.signal();
282276
}
283277
}
284278

@@ -311,7 +305,8 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
311305
}
312306

313307
private void validateVersion(String changeID, int version, int minSupported, int maxSupported) {
314-
if (version < minSupported || version > maxSupported) {
308+
if ((version < minSupported || version > maxSupported)
309+
&& version != WorkflowInternal.DEFAULT_VERSION) {
315310
throw new Error(
316311
String.format(
317312
"Version %d of changeID %s is not supported. Supported version is between %d and %d.",
@@ -341,7 +336,7 @@ boolean startUnstartedLaTasks(Duration maxWaitAllowed) {
341336
laTaskPoller.apply(
342337
new LocalActivityWorker.Task(
343338
params,
344-
replayDecider,
339+
replayDecider.getLocalActivityCompletionSink(),
345340
replayDecider.getDecisionTimeoutSeconds(),
346341
this::currentTimeMillis,
347342
this::replayTimeUpdatedAtMillis),
@@ -359,14 +354,10 @@ int numPendingLaTasks() {
359354
}
360355

361356
void awaitTaskCompletion(Duration duration) throws InterruptedException {
362-
laTaskLock.lock();
363-
try {
364-
while (!taskCompleted) {
365-
taskCondition.awaitNanos(duration.toNanos());
366-
}
367-
taskCompleted = false;
368-
} finally {
369-
laTaskLock.unlock();
357+
while (!taskCompleted) {
358+
// This call is called from already locked object
359+
taskCondition.awaitNanos(duration.toNanos());
370360
}
361+
taskCompleted = false;
371362
}
372363
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public interface DecisionContext extends ReplayAware {
4343

4444
WorkflowExecution getWorkflowExecution();
4545

46-
// TODO: Add to Cadence
47-
// com.uber.cadence.WorkflowExecution getParentWorkflowExecution();
46+
WorkflowExecution getParentWorkflowExecution();
4847

4948
WorkflowType getWorkflowType();
5049

@@ -87,11 +86,7 @@ public interface DecisionContext extends ReplayAware {
8786
*/
8887
Map<String, Object> getPropagatedContexts();
8988

90-
/**
91-
* Returns the set of configured context propagators
92-
*
93-
* @return
94-
*/
89+
/** Returns the set of configured context propagators */
9590
List<ContextPropagator> getContextPropagators();
9691

9792
/**

0 commit comments

Comments
 (0)