From 8cf3b61fb9e43c773f3ca975874d0f0e69210d44 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Mon, 21 Aug 2023 22:07:52 +0800 Subject: [PATCH] Add optimistic lock to ActionRun table (#26563) Should fix #26559. How xorm works: https://xorm.io/docs/chapter-06/1.lock/ --------- Co-authored-by: Giteabot --- models/actions/run.go | 15 +++++++++++-- models/actions/run_job.go | 39 ++++++++++++++++++++------------- models/actions/task.go | 8 ------- models/migrations/migrations.go | 2 ++ models/migrations/v1_21/v272.go | 14 ++++++++++++ 5 files changed, 53 insertions(+), 25 deletions(-) create mode 100644 models/migrations/v1_21/v272.go diff --git a/models/actions/run.go b/models/actions/run.go index ab6e319b1..18ed447e8 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -43,6 +43,7 @@ type ActionRun struct { EventPayload string `xorm:"LONGTEXT"` TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow Status Status `xorm:"index"` + Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed Started timeutil.TimeStamp Stopped timeutil.TimeStamp Created timeutil.TimeStamp `xorm:"created"` @@ -332,12 +333,22 @@ func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) return run, nil } +// UpdateRun updates a run. +// It requires the inputted run has Version set. +// It will return error if the version is not matched (it means the run has been changed after loaded). func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { sess := db.GetEngine(ctx).ID(run.ID) if len(cols) > 0 { sess.Cols(cols...) } - _, err := sess.Update(run) + affected, err := sess.Update(run) + if err != nil { + return err + } + if affected == 0 { + return fmt.Errorf("run has changed") + // It's impossible that the run is not found, since Gitea never deletes runs. + } if run.Status != 0 || util.SliceContains(cols, "status") { if run.RepoID == 0 { @@ -358,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error { } } - return err + return nil } type ActionRunIndex db.ResourceIndex diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 0239cc0a8..1da58bb65 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -114,32 +114,41 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col if affected != 0 && util.SliceContains(cols, "status") && job.Status.IsWaiting() { // if the status of job changes to waiting again, increase tasks version. if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { - return affected, err + return 0, err } } if job.RunID == 0 { var err error if job, err = GetRunJobByID(ctx, job.ID); err != nil { - return affected, err + return 0, err } } - jobs, err := GetRunJobsByRunID(ctx, job.RunID) - if err != nil { - return affected, err + { + // Other goroutines may aggregate the status of the run and update it too. + // So we need load the run and its jobs before updating the run. + run, err := GetRunByID(ctx, job.RunID) + if err != nil { + return 0, err + } + jobs, err := GetRunJobsByRunID(ctx, job.RunID) + if err != nil { + return 0, err + } + run.Status = aggregateJobStatus(jobs) + if run.Started.IsZero() && run.Status.IsRunning() { + run.Started = timeutil.TimeStampNow() + } + if run.Stopped.IsZero() && run.Status.IsDone() { + run.Stopped = timeutil.TimeStampNow() + } + if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { + return 0, fmt.Errorf("update run %d: %w", run.ID, err) + } } - runStatus := aggregateJobStatus(jobs) - - run := &ActionRun{ - ID: job.RunID, - Status: runStatus, - } - if runStatus.IsDone() { - run.Stopped = timeutil.TimeStampNow() - } - return affected, UpdateRun(ctx, run) + return affected, nil } func aggregateJobStatus(jobs []*ActionRunJob) Status { diff --git a/models/actions/task.go b/models/actions/task.go index b31afb212..69f52cf08 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -317,14 +317,6 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask return nil, false, nil } - if job.Run.Status.IsWaiting() { - job.Run.Status = StatusRunning - job.Run.Started = now - if err := UpdateRun(ctx, job.Run, "status", "started"); err != nil { - return nil, false, err - } - } - task.Job = job if err := commiter.Commit(); err != nil { diff --git a/models/migrations/migrations.go b/models/migrations/migrations.go index 7a126593d..87c597b57 100644 --- a/models/migrations/migrations.go +++ b/models/migrations/migrations.go @@ -524,6 +524,8 @@ var migrations = []Migration{ NewMigration("Fix PackageProperty typo", v1_21.FixPackagePropertyTypo), // v271 -> v272 NewMigration("Allow archiving labels", v1_21.AddArchivedUnixColumInLabelTable), + // v272 -> v273 + NewMigration("Add Version to ActionRun table", v1_21.AddVersionToActionRunTable), } // GetCurrentDBVersion returns the current db version diff --git a/models/migrations/v1_21/v272.go b/models/migrations/v1_21/v272.go new file mode 100644 index 000000000..a729c49f1 --- /dev/null +++ b/models/migrations/v1_21/v272.go @@ -0,0 +1,14 @@ +// Copyright 2023 The Gitea Authors. All rights reserved. +// SPDX-License-Identifier: MIT + +package v1_21 //nolint +import ( + "xorm.io/xorm" +) + +func AddVersionToActionRunTable(x *xorm.Engine) error { + type ActionRun struct { + Version int `xorm:"version default 0"` + } + return x.Sync(new(ActionRun)) +}