From dcdb4873c8d77a444526fad5b1c8e705fdfe149d Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 6 Dec 2021 15:19:28 +0800 Subject: [PATCH] Move repo archiver to models/repo (#17913) * Move repo archiver to models/repo * Move archiver service into services/repository/ * Fix imports * Fix test * Fix test --- models/repo.go | 51 +------- models/repo/archiver.go | 109 ++++++++++++++++++ models/repo/main_test.go | 1 + models/repo_archiver.go | 75 +----------- routers/init.go | 2 +- routers/web/repo/repo.go | 18 +-- services/cron/tasks_basic.go | 7 +- services/cron/tasks_extended.go | 3 +- services/repository/archive.go | 20 ---- .../{ => repository}/archiver/archiver.go | 89 +++++++++++--- .../archiver/archiver_test.go | 2 +- 11 files changed, 205 insertions(+), 172 deletions(-) create mode 100644 models/repo/archiver.go delete mode 100644 services/repository/archive.go rename services/{ => repository}/archiver/archiver.go (72%) rename services/{ => repository}/archiver/archiver_test.go (98%) diff --git a/models/repo.go b/models/repo.go index 67879fefd..4f6b1c346 100644 --- a/models/repo.go +++ b/models/repo.go @@ -1605,19 +1605,18 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error { } // Remove archives - var archives []*RepoArchiver + var archives []*repo_model.RepoArchiver if err = sess.Where("repo_id=?", repoID).Find(&archives); err != nil { return err } var archivePaths = make([]string, 0, len(archives)) for _, v := range archives { - v.Repo = repo p, _ := v.RelativePath() archivePaths = append(archivePaths, p) } - if _, err := sess.Delete(&RepoArchiver{RepoID: repoID}); err != nil { + if _, err := sess.Delete(&repo_model.RepoArchiver{RepoID: repoID}); err != nil { return err } @@ -1824,52 +1823,6 @@ func GetPrivateRepositoryCount(u *user_model.User) (int64, error) { return getPrivateRepositoryCount(db.GetEngine(db.DefaultContext), u) } -// DeleteOldRepositoryArchives deletes old repository archives. -func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error { - log.Trace("Doing: ArchiveCleanup") - - for { - var archivers []RepoArchiver - err := db.GetEngine(db.DefaultContext).Where("created_unix < ?", time.Now().Add(-olderThan).Unix()). - Asc("created_unix"). - Limit(100). - Find(&archivers) - if err != nil { - log.Trace("Error: ArchiveClean: %v", err) - return err - } - - for _, archiver := range archivers { - if err := deleteOldRepoArchiver(ctx, &archiver); err != nil { - return err - } - } - if len(archivers) < 100 { - break - } - } - - log.Trace("Finished: ArchiveCleanup") - return nil -} - -var delRepoArchiver = new(RepoArchiver) - -func deleteOldRepoArchiver(ctx context.Context, archiver *RepoArchiver) error { - p, err := archiver.RelativePath() - if err != nil { - return err - } - _, err = db.GetEngine(db.DefaultContext).ID(archiver.ID).Delete(delRepoArchiver) - if err != nil { - return err - } - if err := storage.RepoArchives.Delete(p); err != nil { - log.Error("delete repo archive file failed: %v", err) - } - return nil -} - type repoChecker struct { querySQL, correctSQL string desc string diff --git a/models/repo/archiver.go b/models/repo/archiver.go new file mode 100644 index 000000000..cee6013ca --- /dev/null +++ b/models/repo/archiver.go @@ -0,0 +1,109 @@ +// Copyright 2021 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package repo + +import ( + "context" + "fmt" + "time" + + "code.gitea.io/gitea/models/db" + "code.gitea.io/gitea/modules/git" + "code.gitea.io/gitea/modules/timeutil" + + "xorm.io/builder" +) + +// ArchiverStatus represents repo archive status +type ArchiverStatus int + +// enumerate all repo archive statuses +const ( + ArchiverGenerating = iota // the archiver is generating + ArchiverReady // it's ready +) + +// RepoArchiver represents all archivers +type RepoArchiver struct { //revive:disable-line:exported + ID int64 `xorm:"pk autoincr"` + RepoID int64 `xorm:"index unique(s)"` + Type git.ArchiveType `xorm:"unique(s)"` + Status ArchiverStatus + CommitID string `xorm:"VARCHAR(40) unique(s)"` + CreatedUnix timeutil.TimeStamp `xorm:"INDEX NOT NULL created"` +} + +func init() { + db.RegisterModel(new(RepoArchiver)) +} + +// RelativePath returns relative path +func (archiver *RepoArchiver) RelativePath() (string, error) { + return fmt.Sprintf("%d/%s/%s.%s", archiver.RepoID, archiver.CommitID[:2], archiver.CommitID, archiver.Type.String()), nil +} + +var delRepoArchiver = new(RepoArchiver) + +// DeleteRepoArchiver delete archiver +func DeleteRepoArchiver(ctx context.Context, archiver *RepoArchiver) error { + _, err := db.GetEngine(db.DefaultContext).ID(archiver.ID).Delete(delRepoArchiver) + return err +} + +// GetRepoArchiver get an archiver +func GetRepoArchiver(ctx context.Context, repoID int64, tp git.ArchiveType, commitID string) (*RepoArchiver, error) { + var archiver RepoArchiver + has, err := db.GetEngine(ctx).Where("repo_id=?", repoID).And("`type`=?", tp).And("commit_id=?", commitID).Get(&archiver) + if err != nil { + return nil, err + } + if has { + return &archiver, nil + } + return nil, nil +} + +// AddRepoArchiver adds an archiver +func AddRepoArchiver(ctx context.Context, archiver *RepoArchiver) error { + _, err := db.GetEngine(ctx).Insert(archiver) + return err +} + +// UpdateRepoArchiverStatus updates archiver's status +func UpdateRepoArchiverStatus(ctx context.Context, archiver *RepoArchiver) error { + _, err := db.GetEngine(ctx).ID(archiver.ID).Cols("status").Update(archiver) + return err +} + +// DeleteAllRepoArchives deletes all repo archives records +func DeleteAllRepoArchives() error { + _, err := db.GetEngine(db.DefaultContext).Where("1=1").Delete(new(RepoArchiver)) + return err +} + +// FindRepoArchiversOption represents an archiver options +type FindRepoArchiversOption struct { + db.ListOptions + OlderThan time.Duration +} + +func (opts FindRepoArchiversOption) toConds() builder.Cond { + var cond = builder.NewCond() + if opts.OlderThan > 0 { + cond = cond.And(builder.Lt{"created_unix": time.Now().Add(-opts.OlderThan).Unix()}) + } + return cond +} + +// FindRepoArchives find repo archivers +func FindRepoArchives(opts FindRepoArchiversOption) ([]*RepoArchiver, error) { + var archivers = make([]*RepoArchiver, 0, opts.PageSize) + start, limit := opts.GetSkipTake() + err := db.GetEngine(db.DefaultContext).Where(opts.toConds()). + Asc("created_unix"). + Limit(limit, start). + Find(&archivers) + return archivers, err +} diff --git a/models/repo/main_test.go b/models/repo/main_test.go index ac62df9d9..aa960bf13 100644 --- a/models/repo/main_test.go +++ b/models/repo/main_test.go @@ -14,5 +14,6 @@ import ( func TestMain(m *testing.M) { unittest.MainTest(m, filepath.Join("..", ".."), "attachment.yml", + "repo_archiver.yml", ) } diff --git a/models/repo_archiver.go b/models/repo_archiver.go index 647a3b47b..2369a1610 100644 --- a/models/repo_archiver.go +++ b/models/repo_archiver.go @@ -5,44 +5,13 @@ package models import ( - "context" - "fmt" - "code.gitea.io/gitea/models/db" - "code.gitea.io/gitea/modules/git" - "code.gitea.io/gitea/modules/timeutil" + + repo_model "code.gitea.io/gitea/models/repo" ) -// RepoArchiverStatus represents repo archive status -type RepoArchiverStatus int - -// enumerate all repo archive statuses -const ( - RepoArchiverGenerating = iota // the archiver is generating - RepoArchiverReady // it's ready -) - -// RepoArchiver represents all archivers -type RepoArchiver struct { - ID int64 `xorm:"pk autoincr"` - RepoID int64 `xorm:"index unique(s)"` - Repo *Repository `xorm:"-"` - Type git.ArchiveType `xorm:"unique(s)"` - Status RepoArchiverStatus - CommitID string `xorm:"VARCHAR(40) unique(s)"` - CreatedUnix timeutil.TimeStamp `xorm:"INDEX NOT NULL created"` -} - -func init() { - db.RegisterModel(new(RepoArchiver)) -} - -// LoadRepo loads repository -func (archiver *RepoArchiver) LoadRepo() (*Repository, error) { - if archiver.Repo != nil { - return archiver.Repo, nil - } - +// LoadArchiverRepo loads repository +func LoadArchiverRepo(archiver *repo_model.RepoArchiver) (*Repository, error) { var repo Repository has, err := db.GetEngine(db.DefaultContext).ID(archiver.RepoID).Get(&repo) if err != nil { @@ -55,39 +24,3 @@ func (archiver *RepoArchiver) LoadRepo() (*Repository, error) { } return &repo, nil } - -// RelativePath returns relative path -func (archiver *RepoArchiver) RelativePath() (string, error) { - return fmt.Sprintf("%d/%s/%s.%s", archiver.RepoID, archiver.CommitID[:2], archiver.CommitID, archiver.Type.String()), nil -} - -// GetRepoArchiver get an archiver -func GetRepoArchiver(ctx context.Context, repoID int64, tp git.ArchiveType, commitID string) (*RepoArchiver, error) { - var archiver RepoArchiver - has, err := db.GetEngine(ctx).Where("repo_id=?", repoID).And("`type`=?", tp).And("commit_id=?", commitID).Get(&archiver) - if err != nil { - return nil, err - } - if has { - return &archiver, nil - } - return nil, nil -} - -// AddRepoArchiver adds an archiver -func AddRepoArchiver(ctx context.Context, archiver *RepoArchiver) error { - _, err := db.GetEngine(ctx).Insert(archiver) - return err -} - -// UpdateRepoArchiverStatus updates archiver's status -func UpdateRepoArchiverStatus(ctx context.Context, archiver *RepoArchiver) error { - _, err := db.GetEngine(ctx).ID(archiver.ID).Cols("status").Update(archiver) - return err -} - -// DeleteAllRepoArchives deletes all repo archives records -func DeleteAllRepoArchives() error { - _, err := db.GetEngine(db.DefaultContext).Where("1=1").Delete(new(RepoArchiver)) - return err -} diff --git a/routers/init.go b/routers/init.go index 4cce7992d..2143ab476 100644 --- a/routers/init.go +++ b/routers/init.go @@ -35,7 +35,6 @@ import ( "code.gitea.io/gitea/routers/common" "code.gitea.io/gitea/routers/private" web_routers "code.gitea.io/gitea/routers/web" - "code.gitea.io/gitea/services/archiver" "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/auth/source/oauth2" "code.gitea.io/gitea/services/cron" @@ -44,6 +43,7 @@ import ( mirror_service "code.gitea.io/gitea/services/mirror" pull_service "code.gitea.io/gitea/services/pull" repo_service "code.gitea.io/gitea/services/repository" + "code.gitea.io/gitea/services/repository/archiver" "code.gitea.io/gitea/services/task" "code.gitea.io/gitea/services/webhook" diff --git a/routers/web/repo/repo.go b/routers/web/repo/repo.go index bacfa549b..69bd1ed41 100644 --- a/routers/web/repo/repo.go +++ b/routers/web/repo/repo.go @@ -24,9 +24,9 @@ import ( "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" "code.gitea.io/gitea/modules/web" - archiver_service "code.gitea.io/gitea/services/archiver" "code.gitea.io/gitea/services/forms" repo_service "code.gitea.io/gitea/services/repository" + archiver_service "code.gitea.io/gitea/services/repository/archiver" ) const ( @@ -387,12 +387,12 @@ func Download(ctx *context.Context) { return } - archiver, err := models.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) + archiver, err := repo_model.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) if err != nil { ctx.ServerError("models.GetRepoArchiver", err) return } - if archiver != nil && archiver.Status == models.RepoArchiverReady { + if archiver != nil && archiver.Status == repo_model.ArchiverReady { download(ctx, aReq.GetArchiveName(), archiver) return } @@ -417,12 +417,12 @@ func Download(ctx *context.Context) { return } times++ - archiver, err = models.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) + archiver, err = repo_model.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) if err != nil { ctx.ServerError("archiver_service.StartArchive", err) return } - if archiver != nil && archiver.Status == models.RepoArchiverReady { + if archiver != nil && archiver.Status == repo_model.ArchiverReady { download(ctx, aReq.GetArchiveName(), archiver) return } @@ -430,7 +430,7 @@ func Download(ctx *context.Context) { } } -func download(ctx *context.Context, archiveName string, archiver *models.RepoArchiver) { +func download(ctx *context.Context, archiveName string, archiver *repo_model.RepoArchiver) { downloadName := ctx.Repo.Repository.Name + "-" + archiveName rPath, err := archiver.RelativePath() @@ -473,12 +473,12 @@ func InitiateDownload(ctx *context.Context) { return } - archiver, err := models.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) + archiver, err := repo_model.GetRepoArchiver(db.DefaultContext, aReq.RepoID, aReq.Type, aReq.CommitID) if err != nil { ctx.ServerError("archiver_service.StartArchive", err) return } - if archiver == nil || archiver.Status != models.RepoArchiverReady { + if archiver == nil || archiver.Status != repo_model.ArchiverReady { if err := archiver_service.StartArchive(aReq); err != nil { ctx.ServerError("archiver_service.StartArchive", err) return @@ -486,7 +486,7 @@ func InitiateDownload(ctx *context.Context) { } var completed bool - if archiver != nil && archiver.Status == models.RepoArchiverReady { + if archiver != nil && archiver.Status == repo_model.ArchiverReady { completed = true } diff --git a/services/cron/tasks_basic.go b/services/cron/tasks_basic.go index 814f6eae4..fdf8550c3 100644 --- a/services/cron/tasks_basic.go +++ b/services/cron/tasks_basic.go @@ -15,7 +15,8 @@ import ( "code.gitea.io/gitea/services/auth" "code.gitea.io/gitea/services/migrations" mirror_service "code.gitea.io/gitea/services/mirror" - repository_service "code.gitea.io/gitea/services/repository" + repo_service "code.gitea.io/gitea/services/repository" + archiver_service "code.gitea.io/gitea/services/repository/archiver" ) func registerUpdateMirrorTask() { @@ -56,7 +57,7 @@ func registerRepoHealthCheck() { Args: []string{}, }, func(ctx context.Context, _ *user_model.User, config Config) error { rhcConfig := config.(*RepoHealthCheckConfig) - return repository_service.GitFsck(ctx, rhcConfig.Timeout, rhcConfig.Args) + return repo_service.GitFsck(ctx, rhcConfig.Timeout, rhcConfig.Args) }) } @@ -80,7 +81,7 @@ func registerArchiveCleanup() { OlderThan: 24 * time.Hour, }, func(ctx context.Context, _ *user_model.User, config Config) error { acConfig := config.(*OlderThanConfig) - return models.DeleteOldRepositoryArchives(ctx, acConfig.OlderThan) + return archiver_service.DeleteOldRepositoryArchives(ctx, acConfig.OlderThan) }) } diff --git a/services/cron/tasks_extended.go b/services/cron/tasks_extended.go index 26dbe548a..bc6add090 100644 --- a/services/cron/tasks_extended.go +++ b/services/cron/tasks_extended.go @@ -13,6 +13,7 @@ import ( "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/updatechecker" repo_service "code.gitea.io/gitea/services/repository" + archiver_service "code.gitea.io/gitea/services/repository/archiver" user_service "code.gitea.io/gitea/services/user" ) @@ -36,7 +37,7 @@ func registerDeleteRepositoryArchives() { RunAtStart: false, Schedule: "@annually", }, func(ctx context.Context, _ *user_model.User, _ Config) error { - return repo_service.DeleteRepositoryArchives(ctx) + return archiver_service.DeleteRepositoryArchives(ctx) }) } diff --git a/services/repository/archive.go b/services/repository/archive.go deleted file mode 100644 index bea636c57..000000000 --- a/services/repository/archive.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2021 The Gitea Authors. All rights reserved. -// Use of this source code is governed by a MIT-style -// license that can be found in the LICENSE file. - -package repository - -import ( - "context" - - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/storage" -) - -// DeleteRepositoryArchives deletes all repositories' archives. -func DeleteRepositoryArchives(ctx context.Context) error { - if err := models.DeleteAllRepoArchives(); err != nil { - return err - } - return storage.Clean(storage.RepoArchives) -} diff --git a/services/archiver/archiver.go b/services/repository/archiver/archiver.go similarity index 72% rename from services/archiver/archiver.go rename to services/repository/archiver/archiver.go index 06686220f..7e886d79c 100644 --- a/services/archiver/archiver.go +++ b/services/repository/archiver/archiver.go @@ -1,20 +1,22 @@ -// Copyright 2020 The Gitea Authors. -// All rights reserved. +// Copyright 2020 The Gitea Authors. All rights reserved. // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. package archiver import ( + "context" "errors" "fmt" "io" "os" "regexp" "strings" + "time" "code.gitea.io/gitea/models" "code.gitea.io/gitea/models/db" + repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -113,14 +115,14 @@ func (aReq *ArchiveRequest) GetArchiveName() string { return strings.ReplaceAll(aReq.refName, "/", "-") + "." + aReq.Type.String() } -func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { +func doArchive(r *ArchiveRequest) (*repo_model.RepoArchiver, error) { ctx, committer, err := db.TxContext() if err != nil { return nil, err } defer committer.Close() - archiver, err := models.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID) + archiver, err := repo_model.GetRepoArchiver(ctx, r.RepoID, r.Type, r.CommitID) if err != nil { return nil, err } @@ -128,17 +130,17 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { if archiver != nil { // FIXME: If another process are generating it, we think it's not ready and just return // Or we should wait until the archive generated. - if archiver.Status == models.RepoArchiverGenerating { + if archiver.Status == repo_model.ArchiverGenerating { return nil, nil } } else { - archiver = &models.RepoArchiver{ + archiver = &repo_model.RepoArchiver{ RepoID: r.RepoID, Type: r.Type, CommitID: r.CommitID, - Status: models.RepoArchiverGenerating, + Status: repo_model.ArchiverGenerating, } - if err := models.AddRepoArchiver(ctx, archiver); err != nil { + if err := repo_model.AddRepoArchiver(ctx, archiver); err != nil { return nil, err } } @@ -150,9 +152,9 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { _, err = storage.RepoArchives.Stat(rPath) if err == nil { - if archiver.Status == models.RepoArchiverGenerating { - archiver.Status = models.RepoArchiverReady - if err = models.UpdateRepoArchiverStatus(ctx, archiver); err != nil { + if archiver.Status == repo_model.ArchiverGenerating { + archiver.Status = repo_model.ArchiverReady + if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil { return nil, err } } @@ -169,7 +171,7 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { rd.Close() }() var done = make(chan error) - repo, err := archiver.LoadRepo() + repo, err := models.LoadArchiverRepo(archiver) if err != nil { return nil, fmt.Errorf("archiver.LoadRepo failed: %v", err) } @@ -180,7 +182,7 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { } defer gitRepo.Close() - go func(done chan error, w *io.PipeWriter, archiver *models.RepoArchiver, gitRepo *git.Repository) { + go func(done chan error, w *io.PipeWriter, archiver *repo_model.RepoArchiver, gitRepo *git.Repository) { defer func() { if r := recover(); r != nil { done <- fmt.Errorf("%v", r) @@ -218,9 +220,9 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { return nil, err } - if archiver.Status == models.RepoArchiverGenerating { - archiver.Status = models.RepoArchiverReady - if err = models.UpdateRepoArchiverStatus(ctx, archiver); err != nil { + if archiver.Status == repo_model.ArchiverGenerating { + archiver.Status = repo_model.ArchiverReady + if err = repo_model.UpdateRepoArchiverStatus(ctx, archiver); err != nil { return nil, err } } @@ -234,7 +236,7 @@ func doArchive(r *ArchiveRequest) (*models.RepoArchiver, error) { // anything. In all cases, the caller should be examining the *ArchiveRequest // being returned for completion, as it may be different than the one they passed // in. -func ArchiveRepository(request *ArchiveRequest) (*models.RepoArchiver, error) { +func ArchiveRepository(request *ArchiveRequest) (*repo_model.RepoArchiver, error) { return doArchive(request) } @@ -277,3 +279,56 @@ func StartArchive(request *ArchiveRequest) error { } return archiverQueue.Push(request) } + +func deleteOldRepoArchiver(ctx context.Context, archiver *repo_model.RepoArchiver) error { + p, err := archiver.RelativePath() + if err != nil { + return err + } + if err := repo_model.DeleteRepoArchiver(ctx, archiver); err != nil { + return err + } + if err := storage.RepoArchives.Delete(p); err != nil { + log.Error("delete repo archive file failed: %v", err) + } + return nil +} + +// DeleteOldRepositoryArchives deletes old repository archives. +func DeleteOldRepositoryArchives(ctx context.Context, olderThan time.Duration) error { + log.Trace("Doing: ArchiveCleanup") + + for { + archivers, err := repo_model.FindRepoArchives(repo_model.FindRepoArchiversOption{ + ListOptions: db.ListOptions{ + PageSize: 100, + Page: 1, + }, + OlderThan: olderThan, + }) + if err != nil { + log.Trace("Error: ArchiveClean: %v", err) + return err + } + + for _, archiver := range archivers { + if err := deleteOldRepoArchiver(ctx, archiver); err != nil { + return err + } + } + if len(archivers) < 100 { + break + } + } + + log.Trace("Finished: ArchiveCleanup") + return nil +} + +// DeleteRepositoryArchives deletes all repositories' archives. +func DeleteRepositoryArchives(ctx context.Context) error { + if err := repo_model.DeleteAllRepoArchives(); err != nil { + return err + } + return storage.Clean(storage.RepoArchives) +} diff --git a/services/archiver/archiver_test.go b/services/repository/archiver/archiver_test.go similarity index 98% rename from services/archiver/archiver_test.go rename to services/repository/archiver/archiver_test.go index 5417dfb93..2c0b46d3d 100644 --- a/services/archiver/archiver_test.go +++ b/services/repository/archiver/archiver_test.go @@ -17,7 +17,7 @@ import ( ) func TestMain(m *testing.M) { - unittest.MainTest(m, filepath.Join("..", "..")) + unittest.MainTest(m, filepath.Join("..", "..", "..")) } func TestArchive_Basic(t *testing.T) {