Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions durabletask-azuremanaged/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "durabletask.azuremanaged"
version = "0.3.0"
version = "0.3.1"
description = "Durable Task Python SDK provider implementation for the Azure Durable Task Scheduler"
keywords = [
"durable",
Expand All @@ -26,7 +26,7 @@ requires-python = ">=3.9"
license = {file = "LICENSE"}
readme = "README.md"
dependencies = [
"durabletask>=0.4.0",
"durabletask>=0.4.1",
"azure-identity>=1.19.0"
]

Expand Down
10 changes: 7 additions & 3 deletions durabletask/internal/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import durabletask.internal.orchestrator_service_pb2 as pb


class VersionFailureException(Exception):
pass
def __init__(self, error_details: pb.TaskFailureDetails) -> None:
super().__init__()
self.error_details = error_details


class AbandonOrchestrationError(Exception):
def __init__(self, *args: object) -> None:
super().__init__(*args)
pass
35 changes: 15 additions & 20 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,6 @@ def execute(
)

ctx = _RuntimeOrchestrationContext(instance_id, self._registry)
version_failure = None
try:
# Rebuild local state by replaying old history into the orchestrator function
self._logger.debug(
Expand All @@ -980,23 +979,6 @@ def execute(
for old_event in old_events:
self.process_event(ctx, old_event)

# Process versioning if applicable
execution_started_events = [e.executionStarted for e in old_events if e.HasField("executionStarted")]
# We only check versioning if there are executionStarted events - otherwise, on the first replay when
# ctx.version will be Null, we may invalidate orchestrations early depending on the versioning strategy.
if self._registry.versioning and len(execution_started_events) > 0:
version_failure = self.evaluate_orchestration_versioning(
self._registry.versioning,
ctx.version
)
if version_failure:
self._logger.warning(
f"Orchestration version did not meet worker versioning requirements. "
f"Error action = '{self._registry.versioning.failure_strategy}'. "
f"Version error = '{version_failure}'"
)
raise pe.VersionFailureException

# Get new actions by executing newly received events into the orchestrator function
if self._logger.level <= logging.DEBUG:
summary = _get_new_event_summary(new_events)
Expand All @@ -1009,8 +991,8 @@ def execute(

except pe.VersionFailureException as ex:
if self._registry.versioning and self._registry.versioning.failure_strategy == VersionFailureStrategy.FAIL:
if version_failure:
ctx.set_failed(version_failure)
if ex.error_details:
ctx.set_failed(ex.error_details)
else:
ctx.set_failed(ex)
elif self._registry.versioning and self._registry.versioning.failure_strategy == VersionFailureStrategy.REJECT:
Expand Down Expand Up @@ -1068,6 +1050,19 @@ def process_event(
if event.executionStarted.version:
ctx._version = event.executionStarted.version.value

if self._registry.versioning:
version_failure = self.evaluate_orchestration_versioning(
self._registry.versioning,
ctx.version
)
if version_failure:
self._logger.warning(
f"Orchestration version did not meet worker versioning requirements. "
f"Error action = '{self._registry.versioning.failure_strategy}'. "
f"Version error = '{version_failure}'"
)
raise pe.VersionFailureException(version_failure)

# deserialize the input, if any
input = None
if (
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "durabletask"
version = "0.4.0"
version = "0.4.1"
description = "A Durable Task Client SDK for Python"
keywords = [
"durable",
Expand Down
Loading