From d956d9628f269a3e3a3f937d26b0ca3ac5313263 Mon Sep 17 00:00:00 2001 From: Zijian Date: Wed, 8 Oct 2025 18:00:43 +0000 Subject: [PATCH] Refactor mutable state to accept currentVersion as input --- .../engineimpl/start_workflow_execution.go | 31 ++++++++++--------- .../execution/mutable_state_builder.go | 14 ++++++--- ...ble_state_builder_methods_activity_test.go | 2 +- ...ate_builder_methods_child_workflow_test.go | 1 + .../execution/mutable_state_builder_test.go | 6 ++-- ...utable_state_decision_task_manager_test.go | 4 +-- .../execution/mutable_state_util_test.go | 2 +- service/history/execution/state_builder.go | 5 ++- service/history/execution/state_rebuilder.go | 1 + service/history/ndc/history_replicator.go | 3 ++ 10 files changed, 43 insertions(+), 26 deletions(-) diff --git a/service/history/engine/engineimpl/start_workflow_execution.go b/service/history/engine/engineimpl/start_workflow_execution.go index 97dc30ed8dd..e32d1c1527e 100644 --- a/service/history/engine/engineimpl/start_workflow_execution.go +++ b/service/history/engine/engineimpl/start_workflow_execution.go @@ -657,12 +657,7 @@ UpdateWorkflowLoop: return nil, err } - // new mutable state - newMutableState, err := e.createMutableState(ctx, domainEntry, workflowExecution.GetRunID(), startRequest) - if err != nil { - return nil, err - } - + var err error if signalWithStartRequest != nil { startRequest, err = getStartRequest(domainID, signalWithStartRequest.SignalWithStartRequest, signalWithStartRequest.PartitionConfig) if err != nil { @@ -670,6 +665,12 @@ UpdateWorkflowLoop: } } + // new mutable state + newMutableState, err := e.createMutableState(ctx, domainEntry, workflowExecution.GetRunID(), startRequest) + if err != nil { + return nil, err + } + err = e.addStartEventsAndTasks( newMutableState, workflowExecution, @@ -847,25 +848,27 @@ func (e *historyEngineImpl) createMutableState( runID string, startRequest *types.HistoryStartWorkflowExecutionRequest, ) (execution.MutableState, error) { + version := domainEntry.GetFailoverVersion() + // TODO(active-active): replace with cluster attributes + if domainEntry.GetReplicationConfig().IsActiveActive() { + res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy) + if err != nil { + return nil, err + } + version = res.FailoverVersion + } newMutableState := execution.NewMutableStateBuilderWithVersionHistories( e.shard, e.logger, domainEntry, + version, ) if err := newMutableState.SetHistoryTree(runID); err != nil { return nil, err } - if domainEntry.GetReplicationConfig().IsActiveActive() { - res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy) - if err != nil { - return nil, err - } - newMutableState.UpdateCurrentVersion(res.FailoverVersion, true) - } - return newMutableState, nil } diff --git a/service/history/execution/mutable_state_builder.go b/service/history/execution/mutable_state_builder.go index 3054bdd8a7b..c9c28b731ff 100644 --- a/service/history/execution/mutable_state_builder.go +++ b/service/history/execution/mutable_state_builder.go @@ -181,13 +181,14 @@ func NewMutableStateBuilder( logger log.Logger, domainEntry *cache.DomainCacheEntry, ) MutableState { - return newMutableStateBuilder(shard, logger, domainEntry) + return newMutableStateBuilder(shard, logger, domainEntry, constants.EmptyVersion) } func newMutableStateBuilder( shard shard.Context, logger log.Logger, domainEntry *cache.DomainCacheEntry, + currentVersion int64, ) *mutableStateBuilder { s := &mutableStateBuilder{ updateActivityInfos: make(map[int64]*persistence.ActivityInfo), @@ -217,7 +218,7 @@ func newMutableStateBuilder( pendingSignalRequestedIDs: make(map[string]struct{}), deleteSignalRequestedIDs: make(map[string]struct{}), - currentVersion: domainEntry.GetFailoverVersion(), + currentVersion: currentVersion, hasBufferedEventsInDB: false, stateInDB: persistence.WorkflowStateVoid, nextEventIDInDB: 0, @@ -257,13 +258,17 @@ func newMutableStateBuilder( } // NewMutableStateBuilderWithVersionHistories creates mutable state builder with version history initialized +// NOTE: currentVersion should be the failover version of the workflow, which is derived from domain metadata and +// the active cluster selection policy of the workflow. For passive workflows, the currentVersion will be overridden by the event version, +// so the input doesn't matter for them. func NewMutableStateBuilderWithVersionHistories( shard shard.Context, logger log.Logger, domainEntry *cache.DomainCacheEntry, + currentVersion int64, ) MutableState { - s := newMutableStateBuilder(shard, logger, domainEntry) + s := newMutableStateBuilder(shard, logger, domainEntry, currentVersion) s.versionHistories = persistence.NewVersionHistories(&persistence.VersionHistory{}) return s } @@ -291,7 +296,7 @@ func NewMutableStateBuilderWithVersionHistoriesWithEventV2( domainEntry *cache.DomainCacheEntry, ) MutableState { - msBuilder := NewMutableStateBuilderWithVersionHistories(shard, logger, domainEntry) + msBuilder := NewMutableStateBuilderWithVersionHistories(shard, logger, domainEntry, domainEntry.GetFailoverVersion()) err := msBuilder.UpdateCurrentVersion(version, false) if err != nil { logger.Error("update current version error", tag.Error(err)) @@ -1269,6 +1274,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent( e.shard, e.logger, e.domainEntry, + e.domainEntry.GetFailoverVersion(), ).(*mutableStateBuilder) // New mutable state initializes `currentVersion` to domain's failover version. diff --git a/service/history/execution/mutable_state_builder_methods_activity_test.go b/service/history/execution/mutable_state_builder_methods_activity_test.go index 63ffebe9900..8fb6426b082 100644 --- a/service/history/execution/mutable_state_builder_methods_activity_test.go +++ b/service/history/execution/mutable_state_builder_methods_activity_test.go @@ -68,7 +68,7 @@ func testMutableStateBuilder(t *testing.T) *mutableStateBuilder { mockShard.Resource.MatchingClient.EXPECT().AddActivityTask(gomock.Any(), gomock.Any()).Return(&types.AddActivityTaskResponse{}, nil).AnyTimes() mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes() mockShard.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(&cache.DomainCacheEntry{}, nil).AnyTimes() - return newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry) + return newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()) } func Test__AddActivityTaskScheduledEvent(t *testing.T) { diff --git a/service/history/execution/mutable_state_builder_methods_child_workflow_test.go b/service/history/execution/mutable_state_builder_methods_child_workflow_test.go index 732c379da17..24830e8d297 100644 --- a/service/history/execution/mutable_state_builder_methods_child_workflow_test.go +++ b/service/history/execution/mutable_state_builder_methods_child_workflow_test.go @@ -739,6 +739,7 @@ func loadMutableState(t *testing.T, ctx *shard.TestContext, state *persistence.W m := newMutableStateBuilder(ctx, log.NewNoop(), domain, + domain.GetFailoverVersion(), ) err := m.Load(context.Background(), state) assert.NoError(t, err) diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index b73e165a63d..24a63c739fd 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -117,7 +117,7 @@ func (s *mutableStateSuite) SetupTest() { s.mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes() - s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry) + s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()) } func (s *mutableStateSuite) TearDownTest() { @@ -1956,7 +1956,7 @@ func TestMutableStateBuilder_CopyToPersistence_roundtrip(t *testing.T) { activeClusterManager := activecluster.NewMockManager(ctrl) shardContext.EXPECT().GetActiveClusterManager().Return(activeClusterManager).AnyTimes() - msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry) + msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry, constants.TestGlobalDomainEntry.GetFailoverVersion()) msb.Load(context.Background(), execution) @@ -3894,7 +3894,7 @@ func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.Mock domainEntry = constants.TestGlobalDomainEntry } - msb := newMutableStateBuilder(shardContext, log.NewNoop(), domainEntry) + msb := newMutableStateBuilder(shardContext, log.NewNoop(), domainEntry, domainEntry.GetFailoverVersion()) return msb } diff --git a/service/history/execution/mutable_state_decision_task_manager_test.go b/service/history/execution/mutable_state_decision_task_manager_test.go index 749a868e8f6..7fcae5b4f58 100644 --- a/service/history/execution/mutable_state_decision_task_manager_test.go +++ b/service/history/execution/mutable_state_decision_task_manager_test.go @@ -61,7 +61,7 @@ func TestReplicateDecisionTaskCompletedEvent(t *testing.T) { mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes() m := &mutableStateDecisionTaskManagerImpl{ - msb: newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry), + msb: newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()), } eventType := types.EventTypeActivityTaskCompleted e := &types.HistoryEvent{ @@ -77,7 +77,7 @@ func TestReplicateDecisionTaskCompletedEvent(t *testing.T) { assert.NoError(t, err) // test when config is nil - m.msb = newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry) + m.msb = newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()) m.msb.config = nil err = m.ReplicateDecisionTaskCompletedEvent(e) assert.NoError(t, err) diff --git a/service/history/execution/mutable_state_util_test.go b/service/history/execution/mutable_state_util_test.go index 704edc7988c..f9d61aacbb8 100644 --- a/service/history/execution/mutable_state_util_test.go +++ b/service/history/execution/mutable_state_util_test.go @@ -343,7 +343,7 @@ func TestCreatePersistenceMutableState(t *testing.T) { mockShardContext.EXPECT().GetDomainCache().Return(mockDomainCache) mockShardContext.EXPECT().GetLogger().Return(logger).AnyTimes() - builder := newMutableStateBuilder(mockShardContext, logger, constants.TestLocalDomainEntry) + builder := newMutableStateBuilder(mockShardContext, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()) builder.pendingActivityInfoIDs[0] = &persistence.ActivityInfo{} builder.pendingTimerInfoIDs["some-key"] = &persistence.TimerInfo{} builder.pendingSignalInfoIDs[0] = &persistence.SignalInfo{} diff --git a/service/history/execution/state_builder.go b/service/history/execution/state_builder.go index 59fc502e772..e329bc9d65a 100644 --- a/service/history/execution/state_builder.go +++ b/service/history/execution/state_builder.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -488,10 +489,12 @@ func (b *stateBuilderImpl) ApplyEvents( // The length of newRunHistory can be zero in resend case if len(newRunHistory) != 0 { + domainEntry := b.mutableState.GetDomainEntry() newRunMutableStateBuilder = NewMutableStateBuilderWithVersionHistories( b.shard, b.logger, - b.mutableState.GetDomainEntry(), + domainEntry, + constants.EmptyVersion, ) newRunStateBuilder := NewStateBuilder(b.shard, b.logger, newRunMutableStateBuilder) newRunID := event.WorkflowExecutionContinuedAsNewEventAttributes.GetNewExecutionRunID() diff --git a/service/history/execution/state_rebuilder.go b/service/history/execution/state_rebuilder.go index dfcc1e02610..108f2cb0326 100644 --- a/service/history/execution/state_rebuilder.go +++ b/service/history/execution/state_rebuilder.go @@ -210,6 +210,7 @@ func (r *stateRebuilderImpl) initializeBuilders( r.shard, r.logger, domainEntry, + constants.EmptyVersion, ) stateBuilder := NewStateBuilder( r.shard, diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index 2932dfd54a2..c36b5611769 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -29,6 +29,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/constants" "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -286,6 +287,7 @@ func NewHistoryReplicator( shard, logger, domainEntry, + constants.EmptyVersion, ) }, newReplicationTaskFn: newReplicationTask, @@ -419,6 +421,7 @@ func applyStartEvents( return err } requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event. + // since it's replicated from the other cluster, we don't care the active cluster policy, because the failover version will be updated after ApplyEvents mutableState := newMutableState(domainEntry, task.getLogger()) stateBuilder := newStateBuilder(mutableState, task.getLogger())