-
-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Support Actions concurrency
syntax
#32751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d6a95ce
f713c68
76f58ef
249e315
b012126
91fccb7
2e6032a
1a08b87
9bba25e
e6d25e2
dc86086
8721155
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,10 +35,17 @@ type ActionRunJob struct { | |
RunsOn []string `xorm:"JSON TEXT"` | ||
TaskID int64 // the latest task of the job | ||
Status Status `xorm:"index"` | ||
Started timeutil.TimeStamp | ||
Stopped timeutil.TimeStamp | ||
Created timeutil.TimeStamp `xorm:"created"` | ||
Updated timeutil.TimeStamp `xorm:"updated index"` | ||
|
||
RawConcurrencyGroup string // raw concurrency.group | ||
RawConcurrencyCancel string // raw concurrency.cancel-in-progress | ||
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty | ||
ConcurrencyGroup string `xorm:"index"` // evaluated concurrency.group | ||
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress | ||
|
||
Started timeutil.TimeStamp | ||
Stopped timeutil.TimeStamp | ||
Created timeutil.TimeStamp `xorm:"created"` | ||
Updated timeutil.TimeStamp `xorm:"updated index"` | ||
} | ||
|
||
func init() { | ||
|
@@ -197,3 +204,82 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status { | |
return StatusUnknown // it shouldn't happen | ||
} | ||
} | ||
|
||
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) { | ||
if job.RawConcurrencyGroup == "" { | ||
return false, nil | ||
} | ||
if !job.IsConcurrencyEvaluated { | ||
return false, ErrUnevaluatedConcurrency{ | ||
Group: job.RawConcurrencyGroup, | ||
CancelInProgress: job.RawConcurrencyCancel, | ||
} | ||
} | ||
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel { | ||
return false, nil | ||
} | ||
|
||
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{ | ||
RepoID: job.RepoID, | ||
ConcurrencyGroup: job.ConcurrencyGroup, | ||
Statuses: []Status{StatusRunning, StatusWaiting}, | ||
}) | ||
if err != nil { | ||
return false, fmt.Errorf("count running and waiting jobs: %w", err) | ||
} | ||
if concurrentJobsNum > 0 { | ||
return true, nil | ||
} | ||
|
||
if err := job.LoadRun(ctx); err != nil { | ||
return false, fmt.Errorf("load run: %w", err) | ||
} | ||
|
||
return ShouldBlockRunByConcurrency(ctx, job.Run) | ||
} | ||
|
||
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) { | ||
var cancelledJobs []*ActionRunJob | ||
|
||
if job.RawConcurrencyGroup != "" { | ||
if !job.IsConcurrencyEvaluated { | ||
return cancelledJobs, ErrUnevaluatedConcurrency{ | ||
Group: job.RawConcurrencyGroup, | ||
CancelInProgress: job.RawConcurrencyCancel, | ||
} | ||
} | ||
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel { | ||
// cancel previous jobs in the same concurrency group | ||
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{ | ||
RepoID: job.RepoID, | ||
ConcurrencyGroup: job.ConcurrencyGroup, | ||
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked}, | ||
}) | ||
if err != nil { | ||
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err) | ||
} | ||
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID }) | ||
cjs, err := CancelJobs(ctx, previousJobs) | ||
if err != nil { | ||
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err) | ||
} | ||
cancelledJobs = append(cancelledJobs, cjs...) | ||
Comment on lines
+252
to
+266
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect that job concurrency key cancels equal workflow concurrency if defined on job level. Based on my experience workflow and job concurrency use the same pool of concurrency keys. (deadlock handling required) and CancelPreviousJobsByRunConcurrency to not be called here (EDIT 2 idk how this PR currently works, might break everything if not replaced) e.g. jobs:
_:
concurrency: test cancels workflow concurrency: test In response of the unexpected behavior I have seen EDIT |
||
} | ||
} | ||
|
||
return cancelledJobs, nil | ||
} | ||
|
||
type ErrUnevaluatedConcurrency struct { | ||
Group string | ||
CancelInProgress string | ||
} | ||
|
||
func IsErrUnevaluatedConcurrency(err error) bool { | ||
_, ok := err.(ErrUnevaluatedConcurrency) | ||
return ok | ||
} | ||
|
||
func (err ErrUnevaluatedConcurrency) Error() string { | ||
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
xorm:"index"
have effect? For example: are there SQLs likeWHERE concurrency_group=...
without other index? Or should the index be optimized like(repo_id, concurrency_group)
?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About the first, while not an example...
${{ github.run_id }}
, e.g. once job dependencies are ready / before the job is queued this is going to be evaluatedIn my Opinion we might need this for the ActionRun table as well, once we rerun the workflow/job variables in
${{ vars.MY_VAR }}
might be updated.I keep the point 2 unanswered, because I need to understand this as well. repo_id in the index sounds reasonable but could also be not specific enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For question 2, since the scope of
concurrency_group
is the repo, we always userepo_id
when querying withconcurrency_group
. So we can use combined index here.