Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions service/history/engine/engineimpl/start_workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,19 +657,20 @@ UpdateWorkflowLoop:
return nil, err
}

// new mutable state
newMutableState, err := e.createMutableState(ctx, domainEntry, workflowExecution.GetRunID(), startRequest)
if err != nil {
return nil, err
}

var err error
if signalWithStartRequest != nil {
startRequest, err = getStartRequest(domainID, signalWithStartRequest.SignalWithStartRequest, signalWithStartRequest.PartitionConfig)
if err != nil {
return nil, err
}
}

// new mutable state
newMutableState, err := e.createMutableState(ctx, domainEntry, workflowExecution.GetRunID(), startRequest)
if err != nil {
return nil, err
}

err = e.addStartEventsAndTasks(
newMutableState,
workflowExecution,
Expand Down Expand Up @@ -847,25 +848,27 @@ func (e *historyEngineImpl) createMutableState(
runID string,
startRequest *types.HistoryStartWorkflowExecutionRequest,
) (execution.MutableState, error) {
version := domainEntry.GetFailoverVersion()
// TODO(active-active): replace with cluster attributes
if domainEntry.GetReplicationConfig().IsActiveActive() {
res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy)
if err != nil {
return nil, err
}
version = res.FailoverVersion
}

newMutableState := execution.NewMutableStateBuilderWithVersionHistories(
e.shard,
e.logger,
domainEntry,
version,
)

if err := newMutableState.SetHistoryTree(runID); err != nil {
return nil, err
}

if domainEntry.GetReplicationConfig().IsActiveActive() {
res, err := e.shard.GetActiveClusterManager().LookupNewWorkflow(ctx, domainEntry.GetInfo().ID, startRequest.StartRequest.ActiveClusterSelectionPolicy)
if err != nil {
return nil, err
}
newMutableState.UpdateCurrentVersion(res.FailoverVersion, true)
}

return newMutableState, nil
}

Expand Down
14 changes: 10 additions & 4 deletions service/history/execution/mutable_state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,14 @@ func NewMutableStateBuilder(
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
) MutableState {
return newMutableStateBuilder(shard, logger, domainEntry)
return newMutableStateBuilder(shard, logger, domainEntry, constants.EmptyVersion)
}

func newMutableStateBuilder(
shard shard.Context,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
currentVersion int64,
) *mutableStateBuilder {
s := &mutableStateBuilder{
updateActivityInfos: make(map[int64]*persistence.ActivityInfo),
Expand Down Expand Up @@ -217,7 +218,7 @@ func newMutableStateBuilder(
pendingSignalRequestedIDs: make(map[string]struct{}),
deleteSignalRequestedIDs: make(map[string]struct{}),

currentVersion: domainEntry.GetFailoverVersion(),
currentVersion: currentVersion,
hasBufferedEventsInDB: false,
stateInDB: persistence.WorkflowStateVoid,
nextEventIDInDB: 0,
Expand Down Expand Up @@ -257,13 +258,17 @@ func newMutableStateBuilder(
}

// NewMutableStateBuilderWithVersionHistories creates mutable state builder with version history initialized
// NOTE: currentVersion should be the failover version of the workflow, which is derived from domain metadata and
// the active cluster selection policy of the workflow. For passive workflows, the currentVersion will be overridden by the event version,
// so the input doesn't matter for them.
func NewMutableStateBuilderWithVersionHistories(
shard shard.Context,
logger log.Logger,
domainEntry *cache.DomainCacheEntry,
currentVersion int64,
) MutableState {

s := newMutableStateBuilder(shard, logger, domainEntry)
s := newMutableStateBuilder(shard, logger, domainEntry, currentVersion)
s.versionHistories = persistence.NewVersionHistories(&persistence.VersionHistory{})
return s
}
Expand Down Expand Up @@ -291,7 +296,7 @@ func NewMutableStateBuilderWithVersionHistoriesWithEventV2(
domainEntry *cache.DomainCacheEntry,
) MutableState {

msBuilder := NewMutableStateBuilderWithVersionHistories(shard, logger, domainEntry)
msBuilder := NewMutableStateBuilderWithVersionHistories(shard, logger, domainEntry, domainEntry.GetFailoverVersion())
err := msBuilder.UpdateCurrentVersion(version, false)
if err != nil {
logger.Error("update current version error", tag.Error(err))
Expand Down Expand Up @@ -1269,6 +1274,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(
e.shard,
e.logger,
e.domainEntry,
e.domainEntry.GetFailoverVersion(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be updated in a follow-up PR, because I don't want to mess up with the tests

).(*mutableStateBuilder)

// New mutable state initializes `currentVersion` to domain's failover version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func testMutableStateBuilder(t *testing.T) *mutableStateBuilder {
mockShard.Resource.MatchingClient.EXPECT().AddActivityTask(gomock.Any(), gomock.Any()).Return(&types.AddActivityTaskResponse{}, nil).AnyTimes()
mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()
mockShard.Resource.DomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(&cache.DomainCacheEntry{}, nil).AnyTimes()
return newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry)
return newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion())
}

func Test__AddActivityTaskScheduledEvent(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ func loadMutableState(t *testing.T, ctx *shard.TestContext, state *persistence.W
m := newMutableStateBuilder(ctx,
log.NewNoop(),
domain,
domain.GetFailoverVersion(),
)
err := m.Load(context.Background(), state)
assert.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions service/history/execution/mutable_state_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *mutableStateSuite) SetupTest() {

s.mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()

s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry)
s.msBuilder = newMutableStateBuilder(s.mockShard, s.logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion())
}

func (s *mutableStateSuite) TearDownTest() {
Expand Down Expand Up @@ -1956,7 +1956,7 @@ func TestMutableStateBuilder_CopyToPersistence_roundtrip(t *testing.T) {
activeClusterManager := activecluster.NewMockManager(ctrl)
shardContext.EXPECT().GetActiveClusterManager().Return(activeClusterManager).AnyTimes()

msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry)
msb := newMutableStateBuilder(shardContext, log.NewNoop(), constants.TestGlobalDomainEntry, constants.TestGlobalDomainEntry.GetFailoverVersion())

msb.Load(context.Background(), execution)

Expand Down Expand Up @@ -3894,7 +3894,7 @@ func createMSBWithMocks(mockCache *events.MockCache, shardContext *shardCtx.Mock
domainEntry = constants.TestGlobalDomainEntry
}

msb := newMutableStateBuilder(shardContext, log.NewNoop(), domainEntry)
msb := newMutableStateBuilder(shardContext, log.NewNoop(), domainEntry, domainEntry.GetFailoverVersion())
return msb
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestReplicateDecisionTaskCompletedEvent(t *testing.T) {
mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes()

m := &mutableStateDecisionTaskManagerImpl{
msb: newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry),
msb: newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion()),
}
eventType := types.EventTypeActivityTaskCompleted
e := &types.HistoryEvent{
Expand All @@ -77,7 +77,7 @@ func TestReplicateDecisionTaskCompletedEvent(t *testing.T) {
assert.NoError(t, err)

// test when config is nil
m.msb = newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry)
m.msb = newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion())
m.msb.config = nil
err = m.ReplicateDecisionTaskCompletedEvent(e)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion service/history/execution/mutable_state_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestCreatePersistenceMutableState(t *testing.T) {
mockShardContext.EXPECT().GetDomainCache().Return(mockDomainCache)
mockShardContext.EXPECT().GetLogger().Return(logger).AnyTimes()

builder := newMutableStateBuilder(mockShardContext, logger, constants.TestLocalDomainEntry)
builder := newMutableStateBuilder(mockShardContext, logger, constants.TestLocalDomainEntry, constants.TestLocalDomainEntry.GetFailoverVersion())
builder.pendingActivityInfoIDs[0] = &persistence.ActivityInfo{}
builder.pendingTimerInfoIDs["some-key"] = &persistence.TimerInfo{}
builder.pendingSignalInfoIDs[0] = &persistence.SignalInfo{}
Expand Down
5 changes: 4 additions & 1 deletion service/history/execution/state_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -488,10 +489,12 @@ func (b *stateBuilderImpl) ApplyEvents(

// The length of newRunHistory can be zero in resend case
if len(newRunHistory) != 0 {
domainEntry := b.mutableState.GetDomainEntry()
newRunMutableStateBuilder = NewMutableStateBuilderWithVersionHistories(
b.shard,
b.logger,
b.mutableState.GetDomainEntry(),
domainEntry,
constants.EmptyVersion,
)
newRunStateBuilder := NewStateBuilder(b.shard, b.logger, newRunMutableStateBuilder)
newRunID := event.WorkflowExecutionContinuedAsNewEventAttributes.GetNewExecutionRunID()
Expand Down
1 change: 1 addition & 0 deletions service/history/execution/state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (r *stateRebuilderImpl) initializeBuilders(
r.shard,
r.logger,
domainEntry,
constants.EmptyVersion,
)
stateBuilder := NewStateBuilder(
r.shard,
Expand Down
3 changes: 3 additions & 0 deletions service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -286,6 +287,7 @@ func NewHistoryReplicator(
shard,
logger,
domainEntry,
constants.EmptyVersion,
)
},
newReplicationTaskFn: newReplicationTask,
Expand Down Expand Up @@ -419,6 +421,7 @@ func applyStartEvents(
return err
}
requestID := uuid.New() // requestID used for start workflow execution request. This is not on the history event.
// since it's replicated from the other cluster, we don't care the active cluster policy, because the failover version will be updated after ApplyEvents
mutableState := newMutableState(domainEntry, task.getLogger())
stateBuilder := newStateBuilder(mutableState, task.getLogger())

Expand Down
Loading