feat: delete jobs in batches (#3682)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
6695100dac
commit
a97584a104
|
|
@ -321,6 +321,9 @@ type GCConfig struct {
|
|||
|
||||
// TTL is the ttl for job.
|
||||
TTL time.Duration `yaml:"ttl" mapstructure:"ttl"`
|
||||
|
||||
// BatchSize is the batch size when operating gorm database.
|
||||
BatchSize int `yaml:"batchSize" mapstructure:"batchSize"`
|
||||
}
|
||||
|
||||
type PreheatConfig struct {
|
||||
|
|
@ -444,6 +447,7 @@ func New() *Config {
|
|||
GC: GCConfig{
|
||||
Interval: DefaultJobGCInterval,
|
||||
TTL: DefaultJobGCTTL,
|
||||
BatchSize: DefaultJobGCBatchSize,
|
||||
},
|
||||
Preheat: PreheatConfig{
|
||||
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
|
||||
|
|
@ -629,6 +633,10 @@ func (cfg *Config) Validate() error {
|
|||
return errors.New("gc requires parameter ttl")
|
||||
}
|
||||
|
||||
if cfg.Job.GC.BatchSize == 0 {
|
||||
return errors.New("gc requires parameter batchSize")
|
||||
}
|
||||
|
||||
if cfg.Job.Preheat.RegistryTimeout == 0 {
|
||||
return errors.New("preheat requires parameter registryTimeout")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -180,6 +180,7 @@ func TestConfig_Load(t *testing.T) {
|
|||
GC: GCConfig{
|
||||
Interval: 1 * time.Second,
|
||||
TTL: 1 * time.Second,
|
||||
BatchSize: 100,
|
||||
},
|
||||
Preheat: PreheatConfig{
|
||||
RegistryTimeout: DefaultJobPreheatRegistryTimeout,
|
||||
|
|
@ -765,6 +766,21 @@ func TestConfig_Validate(t *testing.T) {
|
|||
assert.EqualError(err, "gc requires parameter ttl")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "gc requires parameter batchSize",
|
||||
config: New(),
|
||||
mock: func(cfg *Config) {
|
||||
cfg.Auth.JWT = mockJWTConfig
|
||||
cfg.Database.Type = DatabaseTypeMysql
|
||||
cfg.Database.Mysql = mockMysqlConfig
|
||||
cfg.Database.Redis = mockRedisConfig
|
||||
cfg.Job.GC.BatchSize = 0
|
||||
},
|
||||
expect: func(t *testing.T, err error) {
|
||||
assert := assert.New(t)
|
||||
assert.EqualError(err, "gc requires parameter batchSize")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "preheat requires parameter registryTimeout",
|
||||
config: New(),
|
||||
|
|
|
|||
|
|
@ -93,6 +93,9 @@ const (
|
|||
// DefaultJobGCTTL is the default ttl for job.
|
||||
DefaultJobGCTTL = 12 * time.Hour
|
||||
|
||||
// DefaultJobGCBatchSize is the default batch size for operating on the database in gc job.
|
||||
DefaultJobGCBatchSize = 5000
|
||||
|
||||
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
|
||||
DefaultJobPreheatRegistryTimeout = 1 * time.Minute
|
||||
|
||||
|
|
|
|||
|
|
@ -72,6 +72,7 @@ job:
|
|||
gc:
|
||||
interval: 1s
|
||||
ttl: 1s
|
||||
batchSize: 100
|
||||
preheat:
|
||||
registryTimeout: 1m
|
||||
tls:
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func (gc *gc) Serve() {
|
|||
select {
|
||||
case <-tick.C:
|
||||
logger.Infof("gc job started")
|
||||
if err := gc.db.WithContext(context.Background()).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Unscoped().Delete(&models.Job{}).Error; err != nil {
|
||||
if err := gc.deleteInBatches(context.Background()); err != nil {
|
||||
logger.Errorf("gc job failed: %v", err)
|
||||
}
|
||||
case <-gc.done:
|
||||
|
|
@ -74,3 +74,21 @@ func (gc *gc) Serve() {
|
|||
func (gc *gc) Stop() {
|
||||
close(gc.done)
|
||||
}
|
||||
|
||||
// deleteInBatches deletes jobs in batches.
|
||||
func (gc *gc) deleteInBatches(ctx context.Context) error {
|
||||
for {
|
||||
result := gc.db.WithContext(ctx).Where("created_at < ?", time.Now().Add(-gc.config.Job.GC.TTL)).Limit(gc.config.Job.GC.BatchSize).Unscoped().Delete(&models.Job{})
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
if result.RowsAffected == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
logger.Infof("gc job deleted %d jobs", result.RowsAffected)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue