From 36c110b358f2e454971ff5ef8440e8afb15434a3 Mon Sep 17 00:00:00 2001 From: Chlins Zhang Date: Fri, 25 Apr 2025 12:03:48 +0800 Subject: [PATCH] refactor: migrate the job gc to manager gc server (#3991) refactor: migrate the job gc to manager gc server. Signed-off-by: chlins --- manager/gc/job.go | 110 +++++++++++++++++++++++++++++++++++++++ manager/gc/recorder.go | 87 +++++++++++++++++++++++++++++++ manager/job/gc.go | 94 --------------------------------- manager/job/job.go | 9 ---- manager/manager.go | 20 +++++++ manager/service/audit.go | 30 ++++++----- 6 files changed, 233 insertions(+), 117 deletions(-) create mode 100644 manager/gc/job.go create mode 100644 manager/gc/recorder.go delete mode 100644 manager/job/gc.go diff --git a/manager/gc/job.go b/manager/gc/job.go new file mode 100644 index 000000000..7f999e728 --- /dev/null +++ b/manager/gc/job.go @@ -0,0 +1,110 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gc + +import ( + "encoding/json" + "time" + + "gorm.io/gorm" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/manager/models" + libgc "d7y.io/dragonfly/v2/pkg/gc" +) + +const ( + // DefaultJobGCBatchSize is the default batch size for deleting jobs. + DefaultJobGCBatchSize = 5000 + + // DefaultJobGCInterval is the default interval for running job GC. + DefaultJobGCInterval = time.Hour * 3 + + // DefaultJobGCTimeout is the default timeout for running job GC. 
+ DefaultJobGCTimeout = time.Hour * 1 + + // JobGCTaskID is the ID of the job GC task. + JobGCTaskID = "job" +) + +func NewJobGCTask(db *gorm.DB) libgc.Task { + return libgc.Task{ + ID: JobGCTaskID, + Interval: DefaultJobGCInterval, + Timeout: DefaultJobGCTimeout, + Runner: &job{db: db, recorder: newJobRecorder(db)}, + } +} + +// job is the struct for cleaning up jobs which implements the gc Runner interface. +type job struct { + db *gorm.DB + recorder *jobRecorder +} + +// RunGC implements the gc Runner interface: it records the run through the recorder and deletes jobs created before now minus the configured TTL, at most DefaultJobGCBatchSize rows per statement, until no rows are affected. NOTE(review): a zero TTL would match every job, presumably including the record Init just created; confirm the config always carries a positive TTL. +func (j *job) RunGC() error { + ttl, err := j.getTTL() + if err != nil { + return err + } + + if err = j.recorder.Init(JobGCTaskID, models.JSONMap{ + "ttl": ttl, + "batch_size": DefaultJobGCBatchSize, + }); err != nil { + return err + } + + var gcResult Result + defer func() { + if err := j.recorder.Record(gcResult); err != nil { + logger.Errorf("failed to record job GC result: %v", err) + } + }() + + for { + result := j.db.Where("created_at < ?", time.Now().Add(-ttl)).Limit(DefaultJobGCBatchSize).Unscoped().Delete(&models.Job{}) + if result.Error != nil { + gcResult.Error = result.Error + return result.Error + } + + if result.RowsAffected == 0 { + break + } + + gcResult.Purged += result.RowsAffected + logger.Infof("gc job deleted %d jobs", result.RowsAffected) + } + + return nil +} + +func (j *job) getTTL() (time.Duration, error) { + var config models.Config + if err := j.db.Model(models.Config{}).First(&config, &models.Config{Name: models.ConfigGC}).Error; err != nil { + return 0, err + } + + var gcConfig models.GCConfig + if err := json.Unmarshal([]byte(config.Value), &gcConfig); err != nil { + return 0, err + } + + return gcConfig.Job.TTL, nil +} diff --git a/manager/gc/recorder.go b/manager/gc/recorder.go new file mode 100644 index 000000000..45c15f421 --- /dev/null +++ b/manager/gc/recorder.go @@ -0,0 +1,87 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use 
this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gc + +import ( + "errors" + + "gorm.io/gorm" + + "d7y.io/dragonfly/v2/manager/models" +) + +const ( + // GCJobType is the job type stored on the job record created for a gc task. + GCJobType = "gc" + + // GCStateSuccess indicates the gc task is completed successfully. + GCStateSuccess = "SUCCESS" + + // GCStateFailure indicates the gc task is completed with failure. + GCStateFailure = "FAILURE" +) + +type Result struct { + Error error + Purged int64 +} + +type jobRecorder struct { + db *gorm.DB + job *models.Job +} + +func newJobRecorder(db *gorm.DB) *jobRecorder { + return &jobRecorder{ + db: db, + } +} + +func (jb *jobRecorder) Init(taskID string, args models.JSONMap) error { + job := models.Job{ + Type: GCJobType, + TaskID: taskID, + Args: args, + } + + if err := jb.db.Create(&job).Error; err != nil { + return err + } + + jb.job = &job + return nil +} + +func (jb *jobRecorder) Record(result Result) error { + if jb.job == nil { + return errors.New("job not found") + } + + if jb.job.Result == nil { + jb.job.Result = make(models.JSONMap) + } + + jb.job.State = GCStateSuccess + jb.job.Result["purged"] = result.Purged + + if result.Error != nil { + jb.job.State = GCStateFailure + jb.job.Result["error"] = result.Error.Error() + } + + return jb.db.Save(jb.job).Error +} diff --git a/manager/job/gc.go b/manager/job/gc.go deleted file mode 100644 index 5db3f92c7..000000000 --- a/manager/job/gc.go +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2024 The Dragonfly Authors - * - * Licensed under the Apache 
License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//go:generate mockgen -destination mocks/gc_mock.go -source gc.go -package mocks - -package job - -import ( - "context" - "time" - - "gorm.io/gorm" - - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/manager/config" - "d7y.io/dragonfly/v2/manager/models" -) - -// GC is an interface for gc. -type GC interface { - // Serve started gc server. - Serve() - - // Stop gc server. - Stop() -} - -// gc is an implementation of GC. -type gc struct { - config *config.Config - db *gorm.DB - done chan struct{} -} - -// newGC returns a new GC. -func newGC(cfg *config.Config, gdb *gorm.DB) (GC, error) { - return &gc{ - config: cfg, - db: gdb, - done: make(chan struct{}), - }, nil -} - -// Serve started gc server. -func (gc *gc) Serve() { - tick := time.NewTicker(gc.config.Job.GC.Interval) - for { - select { - case <-tick.C: - logger.Infof("gc job started") - if err := gc.deleteInBatches(context.Background()); err != nil { - logger.Errorf("gc job failed: %v", err) - } - case <-gc.done: - return - } - } -} - -// Stop gc server. -func (gc *gc) Stop() { - close(gc.done) -} - -// deleteInBatches deletes jobs in batches. 
-func (gc *gc) deleteInBatches(ctx context.Context) error { - for { - result := gc.db.WithContext(ctx).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Limit(gc.config.Job.GC.BatchSize).Unscoped().Delete(&models.Job{}) - if result.Error != nil { - return result.Error - } - - if result.RowsAffected == 0 { - break - } - - logger.Infof("gc job deleted %d jobs", result.RowsAffected) - } - - return nil -} diff --git a/manager/job/job.go b/manager/job/job.go index 45e345163..55c878ea1 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -41,7 +41,6 @@ type Job struct { Preheat SyncPeers Task - GC } // New returns a new Job. @@ -78,29 +77,21 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { return nil, err } - gc, err := newGC(cfg, gdb) - if err != nil { - return nil, err - } - return &Job{ Job: j, Preheat: preheat, SyncPeers: syncPeers, Task: newTask(j), - GC: gc, }, nil } // Serve starts the job server. func (j *Job) Serve() { - go j.GC.Serve() go j.SyncPeers.Serve() } // Stop stops the job server. func (j *Job) Stop() { - j.GC.Stop() j.SyncPeers.Stop() } diff --git a/manager/manager.go b/manager/manager.go index 0dda7834c..46dcc864c 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -34,6 +34,7 @@ import ( "d7y.io/dragonfly/v2/manager/cache" "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/database" + "d7y.io/dragonfly/v2/manager/gc" "d7y.io/dragonfly/v2/manager/job" "d7y.io/dragonfly/v2/manager/metrics" "d7y.io/dragonfly/v2/manager/permission/rbac" @@ -42,6 +43,7 @@ import ( "d7y.io/dragonfly/v2/manager/searcher" "d7y.io/dragonfly/v2/manager/service" "d7y.io/dragonfly/v2/pkg/dfpath" + pkggc "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/net/ip" "d7y.io/dragonfly/v2/pkg/objectstorage" "d7y.io/dragonfly/v2/pkg/rpc" @@ -93,6 +95,9 @@ type Server struct { // Job rate limiter. jobRateLimiter ratelimiter.JobRateLimiter + // GC server. + gc pkggc.GC + // GRPC server. 
grpcServer *grpc.Server @@ -157,6 +162,14 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { return nil, err } + // Initialize garbage collector. + s.gc = pkggc.New() + + // Register job gc task. + if err := s.gc.Add(gc.NewJobGCTask(db.DB)); err != nil { + return nil, err + } + // Initialize REST server. restService := service.New(cfg, db, cache, job, enforcer, objectStorage) router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, s.jobRateLimiter, EmbedFolder(assets, assetsTargetPath)) @@ -253,6 +266,10 @@ func (s *Server) Serve() error { s.jobRateLimiter.Serve() }() + // Started gc server. + s.gc.Start() + logger.Info("started gc server") + // Generate GRPC listener. ip, ok := ip.FormatIP(s.config.Server.GRPC.ListenIP.String()) if !ok { @@ -299,6 +316,9 @@ func (s *Server) Stop() { // Stop job rate limiter. s.jobRateLimiter.Stop() + // Stop gc server. + s.gc.Stop() + // Stop GRPC server. stopped := make(chan struct{}) go func() { diff --git a/manager/service/audit.go b/manager/service/audit.go index a2e6fc0b1..462983e46 100644 --- a/manager/service/audit.go +++ b/manager/service/audit.go @@ -28,12 +28,14 @@ import ( ) const ( - // auditBatchInsertSize is the size for batch insertion. - auditBatchInsertSize = 500 - // auditBatchInsertInterval is the interval for batch insertion. - auditBatchInsertInterval = 5 * time.Second - // auditBufferSize is the size of the audit channel buffer. - auditBufferSize = 1000 + // AuditBufferSize is the size of the audit channel buffer. + AuditBufferSize = 1000 + + // AuditInsertBatchSize is the size for batch insertion. + AuditInsertBatchSize = 500 + + // AuditInsertInterval is the interval for batch insertion. 
+ AuditInsertInterval = time.Second * 5 ) var ( @@ -43,7 +45,7 @@ var ( func (s *service) AsyncCreateAudit(ctx context.Context, json *types.CreateAuditRequest) error { once.Do(func() { - auditCh = make(chan *models.Audit, auditBufferSize) + auditCh = make(chan *models.Audit, AuditBufferSize) go s.processAudit() }) @@ -67,7 +69,7 @@ func (s *service) AsyncCreateAudit(ctx context.Context, json *types.CreateAuditR return nil default: // Avoid to hang out the AsyncCreateAudit if the buffer is full. - return fmt.Errorf("audit buffer is full, buffer size: %d, drop the audit %#v", auditBufferSize, audit) + return fmt.Errorf("audit buffer is full, buffer size: %d, drop the audit %#v", AuditBufferSize, audit) } } } @@ -75,8 +77,8 @@ func (s *service) AsyncCreateAudit(ctx context.Context, json *types.CreateAuditR func (s *service) processAudit() { // Use the new context as this is asynchronous operation. ctx := context.Background() - audits := make([]*models.Audit, 0, auditBatchInsertSize) - ticker := time.NewTicker(auditBatchInsertInterval) + audits := make([]*models.Audit, 0, AuditInsertBatchSize) + ticker := time.NewTicker(AuditInsertInterval) defer ticker.Stop() createAuditInBatch := func(ctx context.Context, audits []*models.Audit) error { @@ -101,17 +103,17 @@ func (s *service) processAudit() { } audits = append(audits, audit) - if len(audits) >= auditBatchInsertSize { + if len(audits) >= AuditInsertBatchSize { if err := createAuditInBatch(ctx, audits); err == nil { - audits = make([]*models.Audit, 0, auditBatchInsertSize) + audits = make([]*models.Audit, 0, AuditInsertBatchSize) } - ticker.Reset(auditBatchInsertInterval) + ticker.Reset(AuditInsertInterval) } case <-ticker.C: if len(audits) > 0 { if err := createAuditInBatch(ctx, audits); err == nil { - audits = make([]*models.Audit, 0, auditBatchInsertSize) + audits = make([]*models.Audit, 0, AuditInsertBatchSize) } } }