diff --git a/go.mod b/go.mod index 2c6fad1a5..5f40bf851 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/docker/docker v28.1.1+incompatible github.com/docker/go-connections v0.5.0 github.com/docker/go-units v0.4.0 - github.com/dragonflyoss/machinery v1.10.10 + github.com/dragonflyoss/machinery v1.10.13 github.com/elastic/go-freelru v0.16.0 github.com/fsouza/fake-gcs-server v1.52.2 github.com/gaius-qi/ping v1.0.0 diff --git a/go.sum b/go.sum index 43ab227f0..07cb72fe5 100644 --- a/go.sum +++ b/go.sum @@ -441,8 +441,8 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 h1:ZClxb8laGDf5arX github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNEkKy96OOhEzfZl+yxihPEzKnqJwvfuSUqbZE= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= -github.com/dragonflyoss/machinery v1.10.10 h1:vvyPikg8W6Og6jyyUU5VVG+Lzjm+55v9xfVkqKaf8o8= -github.com/dragonflyoss/machinery v1.10.10/go.mod h1:YUhavio0FVIsY9e3mVrj7weroc08gWm1hiauPDu1S28= +github.com/dragonflyoss/machinery v1.10.13 h1:0iixzz4rn+oDIDHLz8sj5sQ5veTVg+Z1TOVKm2nnWv8= +github.com/dragonflyoss/machinery v1.10.13/go.mod h1:YUhavio0FVIsY9e3mVrj7weroc08gWm1hiauPDu1S28= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= diff --git a/internal/job/constants.go b/internal/job/constants.go index d5d12e10d..13ef16b8d 100644 --- a/internal/job/constants.go +++ b/internal/job/constants.go @@ -39,10 +39,13 @@ const ( // Machinery server configuration. const ( - DefaultResultsExpireIn = 86400 - DefaultRedisMaxIdle = 10 - DefaultRedisIdleTimeout = 300 - DefaultRedisReadTimeout = 60 - DefaultRedisWriteTimeout = 60 - DefaultRedisConnectTimeout = 60 + DefaultResultsExpireIn = 86400 + DefaultRedisMaxIdle = 0 + DefaultRedisMaxActive = 300 + DefaultRedisIdleTimeout = 30 + DefaultRedisReadTimeout = 60 + DefaultRedisWriteTimeout = 60 + DefaultRedisConnectTimeout = 60 + DefaultRedisNormalTasksPollPeriod = 2000 + DefaultRedisDelayedTasksPollPeriod = 500 ) diff --git a/internal/job/job.go b/internal/job/job.go index eec2434d4..876ca9dad 100644 --- a/internal/job/job.go +++ b/internal/job/job.go @@ -88,12 +88,15 @@ func New(cfg *Config, queue Queue) (*Job, error) { ResultBackend: backend, ResultsExpireIn: DefaultResultsExpireIn, Redis: &machineryv1config.RedisConfig{ - MasterName: cfg.MasterName, - MaxIdle: DefaultRedisMaxIdle, - IdleTimeout: DefaultRedisIdleTimeout, - ReadTimeout: DefaultRedisReadTimeout, - WriteTimeout: DefaultRedisWriteTimeout, - ConnectTimeout: DefaultRedisConnectTimeout, + MasterName: cfg.MasterName, + MaxIdle: DefaultRedisMaxIdle, + MaxActive: DefaultRedisMaxActive, + IdleTimeout: DefaultRedisIdleTimeout, + ReadTimeout: DefaultRedisReadTimeout, + WriteTimeout: DefaultRedisWriteTimeout, + ConnectTimeout: DefaultRedisConnectTimeout, + NormalTasksPollPeriod: DefaultRedisNormalTasksPollPeriod, + DelayedTasksPollPeriod: DefaultRedisDelayedTasksPollPeriod, }, }) if err != nil { diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 4d9652acc..d7f2f20fc 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -199,7 +199,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea } logger.Infof("[preheat]: create preheat group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) - if _, err := p.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + if _, err := p.job.Server.SendGroupWithContext(ctx, group, 50); err != nil { logger.Errorf("[preheat]: create preheat group %s failed", group.GroupUUID, err) return nil, err } diff --git a/manager/job/task.go b/manager/job/task.go index 11c67e31f..f72c688ed 100644 --- a/manager/job/task.go +++ b/manager/job/task.go @@ -102,7 +102,7 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler, } logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) - if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + if _, err := t.job.Server.SendGroupWithContext(ctx, group, 50); err != nil { logger.Errorf("create task group %s failed", group.GroupUUID, err) return nil, err } @@ -163,7 +163,7 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul } logger.Infof("create task group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) - if _, err := t.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + if _, err := t.job.Server.SendGroupWithContext(ctx, group, 50); err != nil { logger.Errorf("create preheat group %s failed", group.GroupUUID, err) return nil, err }