From 903ba33a5d35a80d1a8b004abc1aaf2a91bb2978 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 18 Jun 2025 18:57:22 +0800 Subject: [PATCH] feat: support get info of the preheating image (#4139) * feat: support get image by job Signed-off-by: Gaius * feat: support get info of the preheating image Signed-off-by: Gaius * feat: refactor GetImageJob to GetImageDistributionJob and improve error handling Signed-off-by: Gaius --------- Signed-off-by: Gaius --- codecov.yml | 2 +- internal/job/constants.go | 7 +- manager/database/cacher.go | 6 +- manager/handlers/job.go | 14 ++ manager/job/job.go | 7 +- manager/job/preheat.go | 55 +++-- manager/job/preheat_test.go | 6 +- manager/service/job.go | 315 +++++++++++++++++++++++++- manager/service/mocks/service_mock.go | 15 ++ manager/service/service.go | 1 + manager/types/job.go | 109 ++++++++- scheduler/scheduling/scheduling.go | 51 +++-- scheduler/service/service_v2.go | 10 +- 13 files changed, 514 insertions(+), 84 deletions(-) diff --git a/codecov.yml b/codecov.yml index ed609900b..1301fff67 100644 --- a/codecov.yml +++ b/codecov.yml @@ -6,7 +6,7 @@ coverage: project: default: enabled: yes - target: 33% + target: 32% patch: default: enabled: no diff --git a/internal/job/constants.go b/internal/job/constants.go index d9c475bcb..46c4224f0 100644 --- a/internal/job/constants.go +++ b/internal/job/constants.go @@ -33,6 +33,9 @@ const ( // GetTaskJob is the name of getting task job. GetTaskJob = "get_task" + // GetImageDistributionJob is the job name of getting image distribution. + GetImageDistributionJob = "get_image_distribution" + // DeleteTaskJob is the name of deleting task job. DeleteTaskJob = "delete_task" @@ -43,8 +46,8 @@ const ( // Machinery server configuration. const ( DefaultResultsExpireIn = 86400 - DefaultRedisMaxIdle = 30 - DefaultRedisMaxActive = 50 + DefaultRedisMaxIdle = 70 + DefaultRedisMaxActive = 100 DefaultRedisIdleTimeout = 30 DefaultRedisReadTimeout = 60 DefaultRedisWriteTimeout = 60 diff --git a/manager/database/cacher.go b/manager/database/cacher.go index f8a64f478..432cade38 100644 --- a/manager/database/cacher.go +++ b/manager/database/cacher.go @@ -36,7 +36,7 @@ const ( // cacher is a cache implementation using LRU for gorm. type cacher struct { // store is the LRU cache. - store *lru.LRU[string, any] + store *lru.ShardedLRU[string, any] } // hashStringXXHASH returns the hash of the string s. @@ -46,7 +46,7 @@ func hashStringXXHASH(s string) uint32 { // newCacher creates a new cacher. func newCacher() (caches.Cacher, error) { - store, err := lru.New[string, any](defaultCacheSize, hashStringXXHASH) + store, err := lru.NewSharded[string, any](defaultCacheSize, hashStringXXHASH) if err != nil { return nil, err } @@ -86,7 +86,7 @@ func (c *cacher) Store(ctx context.Context, key string, val *caches.Query[any]) // INSERT / UPDATE / DELETE queries are sent to the DB. 
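The cacher above switches from a single-lock LRU to a sharded LRU so concurrent gorm cache lookups no longer serialize on one mutex. Below is a minimal sketch of the same pattern, assuming the cache library is github.com/elastic/go-freelru (its New/NewSharded signatures match the calls in this diff) and that hashStringXXHASH wraps cespare/xxhash — both assumptions, not stated by the patch.

package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
    lru "github.com/elastic/go-freelru"
)

// hashStringXXHASH mirrors the hash callback used by the cacher:
// go-freelru expects a func(K) uint32.
func hashStringXXHASH(s string) uint32 {
    return uint32(xxhash.Sum64String(s))
}

func main() {
    // NewSharded splits the cache into per-shard LRUs, each guarded by its
    // own lock, which reduces contention under concurrent access.
    store, err := lru.NewSharded[string, any](8192, hashStringXXHASH)
    if err != nil {
        panic(err)
    }

    store.Add("SELECT * FROM jobs WHERE id = 1", "cached row")
    if v, ok := store.Get("SELECT * FROM jobs WHERE id = 1"); ok {
        fmt.Println("cache hit:", v)
    }
}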
func (c *cacher) Invalidate(ctx context.Context) error { var err error - c.store, err = lru.New[string, any](defaultCacheSize, hashStringXXHASH) + c.store, err = lru.NewSharded[string, any](defaultCacheSize, hashStringXXHASH) if err != nil { return err } diff --git a/manager/handlers/job.go b/manager/handlers/job.go index c559ea1e2..8ff61b1e6 100644 --- a/manager/handlers/job.go +++ b/manager/handlers/job.go @@ -98,6 +98,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) { return } + ctx.JSON(http.StatusOK, job) + case job.GetImageDistributionJob: + var json types.CreateGetImageDistributionJobRequest + if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + job, err := h.service.CreateGetImageDistributionJob(ctx.Request.Context(), json) + if err != nil { + ctx.Error(err) // nolint: errcheck + return + } + ctx.JSON(http.StatusOK, job) case job.DeleteTaskJob: var json types.CreateDeleteTaskJobRequest diff --git a/manager/job/job.go b/manager/job/job.go index 55c878ea1..8075fa391 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -67,11 +67,6 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { } } - preheat, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool, cfg.Job.Preheat.TLS.InsecureSkipVerify) - if err != nil { - return nil, err - } - syncPeers, err := newSyncPeers(cfg, j, gdb) if err != nil { return nil, err @@ -79,7 +74,7 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { return &Job{ Job: j, - Preheat: preheat, + Preheat: newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool, cfg.Job.Preheat.TLS.InsecureSkipVerify), SyncPeers: syncPeers, Task: newTask(j), }, nil diff --git a/manager/job/preheat.go b/manager/job/preheat.go index d7f2f20fc..8d313b816 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -56,6 +56,11 @@ import ( // preheatImage is an image for preheat. type PreheatType string +// String returns the string representation of PreheatType. +func (p PreheatType) String() string { + return string(p) +} + const ( // PreheatImageType is image type of preheat job. PreheatImageType PreheatType = "image" @@ -81,26 +86,19 @@ type Preheat interface { // preheat is an implementation of Preheat. type preheat struct { job *internaljob.Job - certificateChain [][]byte rootCAs *x509.CertPool insecureSkipVerify bool registryTimeout time.Duration } // newPreheat creates a new Preheat. -func newPreheat(job *internaljob.Job, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) (Preheat, error) { - p := &preheat{ +func newPreheat(job *internaljob.Job, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) Preheat { + return &preheat{ job: job, rootCAs: rootCAs, insecureSkipVerify: insecureSkipVerify, registryTimeout: registryTimeout, } - - if rootCAs != nil { - p.certificateChain = rootCAs.Subjects() - } - - return p, nil } // CreatePreheat creates a preheat job. 
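With certificateChain removed from the preheat struct, the chain is now derived from rootCAs.Subjects() at the point of use (see CreatePreheat and parseLayers below). A standard-library-only sketch of that pattern; the ca.crt path is just an example value.

package main

import (
    "crypto/x509"
    "fmt"
    "os"
)

func main() {
    // Build the root pool the same way the manager builds certPool from
    // cfg.Job.Preheat.TLS.CACert.
    pemBytes, err := os.ReadFile("ca.crt")
    if err != nil {
        panic(err)
    }

    rootCAs := x509.NewCertPool()
    if !rootCAs.AppendCertsFromPEM(pemBytes) {
        panic("invalid CA cert")
    }

    // Derive the certificate chain on demand instead of caching it on the
    // preheat struct; this is the shape CreatePreheat and parseLayers now use.
    var certificateChain [][]byte
    if rootCAs != nil {
        certificateChain = rootCAs.Subjects()
    }
    fmt.Printf("collected %d raw subject entries\n", len(certificateChain))
}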
@@ -121,7 +119,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul return nil, errors.New("invalid params: url is required") } - files, err = p.getImageLayers(ctx, json) + files, err = GetImageLayers(ctx, json, p.registryTimeout, p.rootCAs, p.insecureSkipVerify) if err != nil { return nil, err } @@ -135,6 +133,11 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul json.URLs = append(json.URLs, json.URL) } + var certificateChain [][]byte + if p.rootCAs != nil { + certificateChain = p.rootCAs.Subjects() + } + for _, url := range json.URLs { files = append(files, internaljob.PreheatRequest{ URL: url, @@ -147,7 +150,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul Percentage: json.Percentage, Count: json.Count, ConcurrentCount: json.ConcurrentCount, - CertificateChain: p.certificateChain, + CertificateChain: certificateChain, InsecureSkipVerify: p.insecureSkipVerify, Timeout: json.Timeout, LoadToCache: json.LoadToCache, @@ -211,8 +214,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea }, nil } -// getImageLayers gets layers of image. -func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([]internaljob.PreheatRequest, error) { +func GetImageLayers(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) { ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) defer span.End() @@ -232,10 +234,10 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([ } httpClient := &http.Client{ - Timeout: p.registryTimeout, + Timeout: registryTimeout, Transport: &http.Transport{ DialContext: nethttp.NewSafeDialer().DialContext, - TLSClientConfig: &tls.Config{RootCAs: p.rootCAs, InsecureSkipVerify: p.insecureSkipVerify}, + TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify}, MaxIdleConns: defaultHTTPTransport.MaxIdleConns, MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost, MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost, @@ -259,7 +261,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([ } // Get manifests. - manifests, err := p.getManifests(ctx, client, image, header.Clone(), platform) + manifests, err := getManifests(ctx, client, image, header.Clone(), platform) if err != nil { return nil, err } @@ -273,7 +275,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([ header.Set("Authorization", client.GetAuthToken()) // parse image layers to preheat - layers, err := p.parseLayers(manifests, args, header.Clone(), image) + layers, err := parseLayers(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify) if err != nil { return nil, err } @@ -282,7 +284,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([ } // getManifests gets manifests of image. 
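Because getImageLayers is now the exported, receiver-free job.GetImageLayers, it can be called outside a preheat job — the manager service does exactly that for the new distribution endpoint, and the updated test calls it the same way. A minimal caller sketch; the registry URL and platform are example values, and passing nil root CAs with insecureSkipVerify=true mirrors the test below.

package main

import (
    "context"
    "fmt"
    "time"

    "d7y.io/dragonfly/v2/manager/job"
    "d7y.io/dragonfly/v2/manager/types"
)

func main() {
    // Resolve the layers of an image manifest without creating a preheat job.
    layers, err := job.GetImageLayers(context.Background(), types.PreheatArgs{
        Type:     job.PreheatImageType.String(),
        URL:      "https://registry-1.docker.io/v2/library/alpine/manifests/3.19",
        Platform: "linux/amd64",
    }, 30*time.Second, nil, true)
    if err != nil {
        panic(err)
    }

    for _, layer := range layers {
        fmt.Println(layer.URL)
    }
}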
-func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) { +func getManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, image.manifestURL(), nil) if err != nil { return nil, err @@ -330,9 +332,9 @@ func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, ima return []distribution.Manifest{v}, nil case *manifestlist.DeserializedManifestList: var result []distribution.Manifest - for _, v := range p.filterManifests(v.Manifests, platform) { + for _, v := range filterManifests(v.Manifests, platform) { image.tag = v.Digest.String() - manifests, err := p.getManifests(ctx, client, image, header.Clone(), platform) + manifests, err := getManifests(ctx, client, image, header.Clone(), platform) if err != nil { return nil, err } @@ -347,7 +349,7 @@ func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, ima } // filterManifests filters manifests with platform. -func (p *preheat) filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor { +func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor { var matches []manifestlist.ManifestDescriptor for _, desc := range manifests { if desc.Platform.Architecture == platform.Architecture && desc.Platform.OS == platform.OS { @@ -359,7 +361,12 @@ func (p *preheat) filterManifests(manifests []manifestlist.ManifestDescriptor, p } // parseLayers parses layers of image. -func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) { +func parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) { + var certificateChain [][]byte + if rootCAs != nil { + certificateChain = rootCAs.Subjects() + } + var layers []internaljob.PreheatRequest for _, m := range manifests { for _, v := range m.References() { @@ -375,8 +382,8 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh Percentage: args.Percentage, Count: args.Count, ConcurrentCount: args.ConcurrentCount, - CertificateChain: p.certificateChain, - InsecureSkipVerify: p.insecureSkipVerify, + CertificateChain: certificateChain, + InsecureSkipVerify: insecureSkipVerify, Timeout: args.Timeout, } diff --git a/manager/job/preheat_test.go b/manager/job/preheat_test.go index ba786c443..df4c5c656 100644 --- a/manager/job/preheat_test.go +++ b/manager/job/preheat_test.go @@ -3,6 +3,7 @@ package job import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" @@ -10,7 +11,7 @@ import ( "d7y.io/dragonfly/v2/manager/types" ) -func TestPreheat_getImageLayers(t *testing.T) { +func TestPreheat_GetImageLayers(t *testing.T) { tests := []struct { name string args types.PreheatArgs @@ -42,8 +43,7 @@ func TestPreheat_getImageLayers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - p := &preheat{} - layers, err := p.getImageLayers(context.Background(), tc.args) + layers, err := GetImageLayers(context.Background(), tc.args, 30*time.Second, nil, true) if err != nil { t.Fatal(err) 
} diff --git a/manager/service/job.go b/manager/service/job.go index 3550bb909..5e081489f 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -18,24 +18,30 @@ package service import ( "context" + "crypto/x509" "errors" "fmt" + "sync" "time" machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks" "github.com/google/uuid" + "golang.org/x/sync/errgroup" "gorm.io/gorm" logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/job" "d7y.io/dragonfly/v2/manager/metrics" "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/manager/types" pkggc "d7y.io/dragonfly/v2/pkg/gc" + "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/retry" "d7y.io/dragonfly/v2/pkg/slices" "d7y.io/dragonfly/v2/pkg/structure" + pkgtypes "d7y.io/dragonfly/v2/pkg/types" ) const ( @@ -44,6 +50,9 @@ const ( // DefaultGCJobPollingInterval is the default interval for polling GC job. DefaultGCJobPollingInterval = 5 * time.Second + + // DefaultGetImageDistributionJobConcurrentCount is the default concurrent count for getting image distribution job. + DefaultGetImageDistributionJobConcurrentCount = 30 ) func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) { @@ -53,6 +62,7 @@ func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest // This is a non-block function to run the gc task, which will run the task asynchronously in the backend. if err := s.gc.Run(ctx, json.Args.Type); err != nil { + logger.Errorf("run gc job failed: %w", err) return nil, err } @@ -97,6 +107,7 @@ func (s *service) pollingGCJob(ctx context.Context, jobType string, userID uint, func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error { schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs) if err != nil { + logger.Errorf("find scheduler in clusters failed: %w", err) return err } @@ -122,16 +133,19 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat args, err := structure.StructToMap(json.Args) if err != nil { + logger.Errorf("convert preheat args to map failed: %w", err) return nil, err } candidateSchedulers, err := s.findAllCandidateSchedulersInClusters(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat}) if err != nil { + logger.Errorf("find candidate schedulers in clusters failed: %w", err) return nil, err } groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args) if err != nil { + logger.Errorf("create preheat job failed: %w", err) return nil, err } @@ -151,10 +165,11 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + logger.Errorf("create preheat job failed: %w", err) return nil, err } - go s.pollingJob(context.Background(), internaljob.PreheatJob, job.ID, job.TaskID) + go s.pollingJob(context.Background(), internaljob.PreheatJob, job.ID, job.TaskID, 30, 300, 16) return &job, nil } @@ -165,10 +180,160 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask args, err := structure.StructToMap(json.Args) if err != nil { + logger.Errorf("convert get task args to map failed: %w", err) return nil, err } schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs) + if err != nil { + logger.Errorf("find schedulers in clusters failed: %w", err) + return nil, err + } + + 
groupJobState, err := s.job.CreateGetTask(ctx, schedulers, json.Args) + if err != nil { + logger.Errorf("create get task job failed: %w", err) + return nil, err + } + + var schedulerClusters []models.SchedulerCluster + for _, scheduler := range schedulers { + schedulerClusters = append(schedulerClusters, scheduler.SchedulerCluster) + } + + job := models.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + State: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: schedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + logger.Errorf("create get task job failed: %w", err) + return nil, err + } + + go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID, 30, 300, 16) + logger.Infof("create get task job %s for task %s in scheduler clusters %v", job.ID, job.TaskID, json.SchedulerClusterIDs) + return &job, nil +} + +func (s *service) CreateGetImageDistributionJob(ctx context.Context, json types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) { + layers, err := s.getImageLayers(ctx, json) + if err != nil { + err = fmt.Errorf("get image layers failed: %w", err) + logger.Error(err) + return nil, err + } + + image := types.Image{Layers: make([]types.Layer, 0, len(layers))} + for _, file := range layers { + image.Layers = append(image.Layers, types.Layer{URL: file.URL}) + } + + schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs) + if err != nil { + err = fmt.Errorf("find schedulers in clusters failed: %w", err) + logger.Error(err) + return nil, err + } + + // Create multiple get task jobs synchronously for each layer and + // extract peers from the jobs. + jobs := s.createGetTaskJobsSync(ctx, layers, json, schedulers) + peers, err := s.extractPeersFromJobs(jobs) + if err != nil { + err = fmt.Errorf("extract peers from jobs failed: %w", err) + logger.Error(err) + return nil, err + } + + return &types.CreateGetImageDistributionJobResponse{ + Image: image, + Peers: peers, + }, nil +} + +func (s *service) getImageLayers(ctx context.Context, json types.CreateGetImageDistributionJobRequest) ([]internaljob.PreheatRequest, error) { + var certPool *x509.CertPool + if len(s.config.Job.Preheat.TLS.CACert) != 0 { + certPool = x509.NewCertPool() + if !certPool.AppendCertsFromPEM([]byte(s.config.Job.Preheat.TLS.CACert)) { + return nil, errors.New("invalid CA Cert") + } + } + + layers, err := job.GetImageLayers(ctx, types.PreheatArgs{ + Type: job.PreheatImageType.String(), + URL: json.Args.URL, + PieceLength: json.Args.PieceLength, + Tag: json.Args.Tag, + Application: json.Args.Application, + FilteredQueryParams: json.Args.FilteredQueryParams, + Headers: json.Args.Headers, + Username: json.Args.Username, + Password: json.Args.Password, + Platform: json.Args.Platform, + }, s.config.Job.Preheat.RegistryTimeout, certPool, s.config.Job.Preheat.TLS.InsecureSkipVerify) + if err != nil { + return nil, fmt.Errorf("get image layers failed: %w", err) + } + + if len(layers) == 0 { + return nil, errors.New("no valid image layers found") + } + + return layers, nil +} + +func (s *service) createGetTaskJobsSync(ctx context.Context, layers []internaljob.PreheatRequest, json types.CreateGetImageDistributionJobRequest, schedulers []models.Scheduler) []*models.Job { + var mu sync.Mutex + jobs := make([]*models.Job, 0, len(layers)) + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(DefaultGetImageDistributionJobConcurrentCount) + for _, file := 
range layers { + eg.Go(func() error { + job, err := s.createGetTaskJobSync(ctx, types.CreateGetTaskJobRequest{ + BIO: json.BIO, + Type: internaljob.GetTaskJob, + Args: types.GetTaskArgs{ + URL: file.URL, + PieceLength: file.PieceLength, + Tag: file.Tag, + Application: file.Application, + FilteredQueryParams: json.Args.FilteredQueryParams, + }, + SchedulerClusterIDs: json.SchedulerClusterIDs, + }, schedulers) + if err != nil { + logger.Warnf("failed to create task job for image layer %s: %w", file.URL, err) + return nil + } + + mu.Lock() + jobs = append(jobs, job) + mu.Unlock() + return nil + }) + } + + // If any of the goroutines return an error, ignore it and continue processing. + if err := eg.Wait(); err != nil { + logger.Errorf("failed to create get task jobs: %w", err) + } + + return jobs +} + +func (s *service) createGetTaskJobSync(ctx context.Context, json types.CreateGetTaskJobRequest, schedulers []models.Scheduler) (*models.Job, error) { + if json.Args.FilteredQueryParams == "" { + json.Args.FilteredQueryParams = http.RawDefaultFilteredQueryParams + } + + args, err := structure.StructToMap(json.Args) if err != nil { return nil, err } @@ -197,10 +362,130 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask return nil, err } - go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID) + s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID, 3, 5, 4) + if err := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters").First(&job, job.ID).Error; err != nil { + return nil, err + } + return &job, nil } +func (s *service) extractPeersFromJobs(jobs []*models.Job) ([]types.Peer, error) { + m := make(map[string]*types.Peer, len(jobs)) + for _, job := range jobs { + if job.State != machineryv1tasks.StateSuccess { + continue + } + + jobStates, ok := job.Result["job_states"].([]any) + if !ok { + logger.Warnf("job %s has no job_states in result", job.ID) + continue + } + + url, ok := job.Args["url"].(string) + if !ok { + logger.Warnf("job %s has no url in args", job.ID) + continue + } + + for _, jobState := range jobStates { + jobState, ok := jobState.(map[string]any) + if !ok { + logger.Warnf("job %s has invalid job_state in result", job.ID) + continue + } + + results, ok := jobState["results"].([]any) + if !ok { + logger.Warnf("job %s has no results in job_state", job.ID) + continue + } + + for _, result := range results { + result, ok := result.(map[string]any) + if !ok { + logger.Warnf("job %s has invalid result in job_state", job.ID) + continue + } + + schedulerClusterID, ok := result["scheduler_cluster_id"].(float64) + if !ok { + logger.Warnf("job %s has no scheduler_cluster_id in result", job.ID) + continue + } + + peers, ok := result["peers"].([]any) + if !ok { + logger.Warnf("job %s has no peers in result", job.ID) + continue + } + + for _, peer := range peers { + peer, ok := peer.(map[string]any) + if !ok { + logger.Warnf("job %s has invalid peer in result", job.ID) + continue + } + + id, ok := peer["id"].(string) + if !ok { + logger.Warnf("job %s has invalid peer id in result", job.ID) + continue + } + + hostType, ok := peer["host_type"].(string) + if !ok { + logger.Warnf("job %s has no host_type in result for peer %s", job.ID, id) + continue + } + + // Only collect normal peers and skip seed peers. 
+ if hostType != pkgtypes.HostTypeNormalName { + continue + } + + ip, ok := peer["ip"].(string) + if !ok { + logger.Warnf("job %s has no ip in result for peer %s", job.ID, id) + continue + } + + hostname, ok := peer["hostname"].(string) + if !ok { + logger.Warnf("job %s has no hostname in result for peer %s", job.ID, id) + continue + } + + hostID := idgen.HostIDV2(ip, hostname, false) + p, found := m[hostID] + if !found { + m[hostID] = &types.Peer{ + IP: ip, + Hostname: hostname, + CachedLayers: []types.Layer{{URL: url}}, + SchedulerClusterID: uint(schedulerClusterID), + } + } else { + if slices.Contains(p.CachedLayers, types.Layer{URL: url}) { + continue + } + + p.CachedLayers = append(p.CachedLayers, types.Layer{URL: url}) + } + } + } + } + } + + peers := make([]types.Peer, 0, len(m)) + for _, peer := range m { + peers = append(peers, *peer) + } + + return peers, nil +} + func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) { if json.Args.Timeout == 0 { json.Args.Timeout = types.DefaultJobTimeout @@ -212,16 +497,19 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele args, err := structure.StructToMap(json.Args) if err != nil { + logger.Errorf("convert delete task args to map failed: %w", err) return nil, err } schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs) if err != nil { + logger.Errorf("find schedulers in clusters failed: %w", err) return nil, err } groupJobState, err := s.job.CreateDeleteTask(ctx, schedulers, json.Args) if err != nil { + logger.Errorf("create delete task job failed: %w", err) return nil, err } @@ -241,10 +529,11 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele } if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + logger.Errorf("create delete task job failed: %w", err) return nil, err } - go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID) + go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID, 30, 300, 16) return &job, nil } @@ -413,21 +702,23 @@ func (s *service) findAllCandidateSchedulersInClusters(ctx context.Context, sche return candidateSchedulers, nil } -func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID string) { +func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID string, initBackoff float64, maxBackoff float64, maxAttempts int) { var ( job models.Job log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id)) ) - if _, _, err := retry.Run(ctx, 30, 300, 16, func() (any, bool, error) { + if _, _, err := retry.Run(ctx, initBackoff, maxBackoff, maxAttempts, func() (any, bool, error) { groupJob, err := s.job.GetGroupJobState(name, groupID) if err != nil { - log.Errorf("polling group failed: %s", err.Error()) + err = fmt.Errorf("get group job state failed: %w", err) + log.Error(err) return nil, false, err } result, err := structure.StructToMap(groupJob) if err != nil { - log.Errorf("polling group failed: %s", err.Error()) + err = fmt.Errorf("convert group job state to map failed: %w", err) + log.Error(err) return nil, false, err } @@ -435,7 +726,8 @@ func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID State: groupJob.State, Result: result, }).Error; err != nil { - log.Errorf("polling group failed: %s", err.Error()) + err = fmt.Errorf("update job state failed: %w", err) + log.Error(err) return nil, true, err } @@ -455,7 +747,8 @@ func (s 
*service) pollingJob(ctx context.Context, name string, id uint, groupID return nil, false, errors.New(msg) } }); err != nil { - log.Errorf("polling group failed: %s", err.Error()) + err = fmt.Errorf("polling group job failed: %w", err) + log.Error(err) } // Polling timeout and failed. @@ -464,8 +757,10 @@ func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{ State: machineryv1tasks.StateFailure, }).Error; err != nil { - log.Errorf("polling group failed: %s", err.Error()) + err = fmt.Errorf("update job state to failure failed: %w", err) + log.Error(err) } + log.Error("polling group timeout") } } diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index 570ebcf2b..a07c61d5c 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -220,6 +220,21 @@ func (mr *MockServiceMockRecorder) CreateGCJob(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGCJob", reflect.TypeOf((*MockService)(nil).CreateGCJob), arg0, arg1) } +// CreateGetImageDistributionJob mocks base method. +func (m *MockService) CreateGetImageDistributionJob(arg0 context.Context, arg1 types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateGetImageDistributionJob", arg0, arg1) + ret0, _ := ret[0].(*types.CreateGetImageDistributionJobResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateGetImageDistributionJob indicates an expected call of CreateGetImageDistributionJob. +func (mr *MockServiceMockRecorder) CreateGetImageDistributionJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGetImageDistributionJob", reflect.TypeOf((*MockService)(nil).CreateGetImageDistributionJob), arg0, arg1) +} + // CreateGetTaskJob mocks base method. func (m *MockService) CreateGetTaskJob(arg0 context.Context, arg1 types.CreateGetTaskJobRequest) (*models.Job, error) { m.ctrl.T.Helper() diff --git a/manager/service/service.go b/manager/service/service.go index 0813d458e..1a38e3c4b 100644 --- a/manager/service/service.go +++ b/manager/service/service.go @@ -119,6 +119,7 @@ type Service interface { CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error) CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error) + CreateGetImageDistributionJob(context.Context, types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) CreateGCJob(context.Context, types.CreateGCJobRequest) (*models.Job, error) DestroyJob(context.Context, uint) error UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error) diff --git a/manager/types/job.go b/manager/types/job.go index dc13d2e01..99cbace6d 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -116,10 +116,10 @@ type PreheatArgs struct { URLs []string `json:"urls" binding:"omitempty"` // PieceLength is the piece length(bytes) for downloading file. The value needs to - // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). - // If the piece length is not specified, the piece length will be calculated - // according to the file size. 
-    PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`
+    // be greater than or equal to 4MiB (4,194,304 bytes) and less than or equal to 64MiB (67,108,864 bytes),
+    // for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
+    // the piece length will be calculated according to the file size.
+    PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
 
     // Tag is the tag for preheating.
     Tag string `json:"tag" binding:"omitempty"`
@@ -209,10 +209,10 @@ type GetTaskArgs struct {
     URL string `json:"url" binding:"omitempty"`
 
     // PieceLength is the piece length(bytes) for downloading file. The value needs to
-    // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
-    // If the piece length is not specified, the piece length will be calculated
-    // according to the file size.
-    PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`
+    // be greater than or equal to 4MiB (4,194,304 bytes) and less than or equal to 64MiB (67,108,864 bytes),
+    // for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
+    // the piece length will be calculated according to the file size.
+    PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
 
     // Tag is the tag of the task.
     Tag string `json:"tag" binding:"omitempty"`
@@ -229,6 +229,91 @@ type GetTaskArgs struct {
     ContentForCalculatingTaskID *string `json:"content_for_calculating_task_id" binding:"omitempty"`
 }
 
+type CreateGetImageDistributionJobRequest struct {
+    // BIO is the description of the job.
+    BIO string `json:"bio" binding:"omitempty"`
+
+    // Type is the type of the job.
+    Type string `json:"type" binding:"required"`
+
+    // Args is the arguments of the job.
+    Args GetImageDistributionArgs `json:"args" binding:"omitempty"`
+
+    // UserID is the user id of the job.
+    UserID uint `json:"user_id" binding:"omitempty"`
+
+    // SchedulerClusterIDs is the scheduler cluster ids of the job.
+    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
+}
+
+type GetImageDistributionArgs struct {
+    // URL is the image manifest URL of the task.
+    URL string `json:"url" binding:"required"`
+
+    // PieceLength is the piece length(bytes) for downloading image blobs. The value needs to
+    // be greater than or equal to 4MiB (4,194,304 bytes) and less than or equal to 64MiB (67,108,864 bytes),
+    // for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
+    // the piece length will be calculated according to the file size.
+    PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
+
+    // Tag is the tag of the task.
+    Tag string `json:"tag" binding:"omitempty"`
+
+    // Application is the application of the task.
+    Application string `json:"application" binding:"omitempty"`
+
+    // FilteredQueryParams is the filtered query params of the task.
+    FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
+
+    // Headers is the http headers for authentication.
+    Headers map[string]string `json:"headers" binding:"omitempty"`
+
+    // Username is the username for authentication.
+    Username string `json:"username" binding:"omitempty"`
+
+    // Password is the password for authentication.
+    Password string `json:"password" binding:"omitempty"`
+
+    // The image type preheating task can specify the image architecture type, e.g. linux/amd64.
+    Platform string `json:"platform" binding:"omitempty"`
+}
+
+// CreateGetImageDistributionJobResponse is the response for creating a get image distribution job.
+type CreateGetImageDistributionJobResponse struct { + // Image is the image information. + Image Image `json:"image"` + + // Peers is the peers that have downloaded the image. + Peers []Peer `json:"peers"` +} + +// Peer represents a peer in the get image job. +type Peer struct { + // IP is the IP address of the peer. + IP string `json:"ip"` + + // Hostname is the hostname of the peer. + Hostname string `json:"hostname"` + + // CachedLayers is the list of layers that the peer has downloaded. + CachedLayers []Layer `json:"layers"` + + // SchedulerClusterID is the scheduler cluster id of the peer. + SchedulerClusterID uint `json:"scheduler_cluster_id"` +} + +// Image represents the image information. +type Image struct { + // Layers is the list of layers of the image. + Layers []Layer `json:"layers"` +} + +// Layer represents a layer of the image. +type Layer struct { + // URL is the URL of the layer. + URL string `json:"url"` +} + type CreateDeleteTaskJobRequest struct { // BIO is the description of the job. BIO string `json:"bio" binding:"omitempty"` @@ -254,10 +339,10 @@ type DeleteTaskArgs struct { URL string `json:"url" binding:"omitempty"` // PieceLength is the piece length(bytes) for downloading file. The value needs to - // be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib). - // If the piece length is not specified, the piece length will be calculated - // according to the file size. - PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"` + // be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes), + // for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified, + // the piece length will be calculated according to the file size. + PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"` // Tag is the tag of the task. Tag string `json:"tag" binding:"omitempty"` diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index ba23c4803..5b47b1b6d 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -194,9 +194,9 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *standar stream, loaded := peer.LoadAnnouncePeerStream() if !loaded { if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - msg := fmt.Sprintf("peer deletes inedges failed: %s", err.Error()) - peer.Log.Error(msg) - return status.Error(codes.Internal, msg) + err = fmt.Errorf("peer deletes inedges failed: %w", err) + peer.Log.Error(err) + return status.Error(codes.Internal, err.Error()) } peer.Log.Error("load stream failed") @@ -215,7 +215,8 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *standar // Add edge from parent to peer. 
for _, candidateParent := range candidateParents { if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil { - peer.Log.Warnf("peer adds edge failed: %s", err.Error()) + err = fmt.Errorf("peer adds edge failed: %w", err) + peer.Log.Warn(err) continue } } @@ -259,7 +260,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer peer.Log.Infof("send Code_SchedNeedBackSource to peer, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()) if err := peer.FSM.Event(ctx, standard.PeerEventDownloadBackToSource); err != nil { - peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + err = fmt.Errorf("peer fsm event failed: %w", err) + peer.Log.Error(err) return } @@ -267,7 +269,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // peer back-to-source and reset task state to TaskStateRunning. if peer.Task.FSM.Is(standard.TaskStateFailed) { if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownload); err != nil { - peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) + err = fmt.Errorf("task fsm event failed: %w", err) + peer.Task.Log.Error(err) return } } @@ -292,7 +295,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer peer.Log.Infof("send Code_SchedNeedBackSource to peer, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit) if err := peer.FSM.Event(ctx, standard.PeerEventDownloadBackToSource); err != nil { - peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + err = fmt.Errorf("peer fsm event failed: %w", err) + peer.Log.Error(err) return } @@ -300,7 +304,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // peer back-to-source and reset task state to TaskStateRunning. if peer.Task.FSM.Is(standard.TaskStateFailed) { if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownload); err != nil { - peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) + err = fmt.Errorf("task fsm event failed: %w", err) + peer.Task.Log.Error(err) return } } @@ -334,7 +339,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // Condition 1: Scheduling can find candidate parents. if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { n++ - peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) + err := fmt.Errorf("scheduling failed in %d times, because of %w", n, err) + peer.Log.Error(err) // Sleep to avoid hot looping. 
time.Sleep(s.config.RetryInterval) @@ -359,7 +365,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer peer.Log.Errorf("scheduling failed in %d times, because of loading peer stream failed", n) if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - peer.Log.Errorf("peer deletes inedges failed: %s", err.Error()) + err = fmt.Errorf("peer deletes inedges failed: %w", err) + peer.Log.Error(err) return } @@ -370,10 +377,12 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer peer.Log.Info("send PeerPacket to peer") if err := stream.Send(constructSuccessPeerPacket(peer, candidateParents[0], candidateParents[1:])); err != nil { n++ - peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) + err = fmt.Errorf("send PeerPacket to peer failed in %d times, because of %w", n, err) + peer.Log.Error(err) if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - peer.Log.Errorf("peer deletes inedges failed: %s", err.Error()) + err = fmt.Errorf("peer deletes inedges failed: %w", err) + peer.Log.Error(err) return } @@ -383,7 +392,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer // Add edge from parent to peer. for _, candidateParent := range candidateParents { if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil { - peer.Log.Debugf("peer adds edge failed: %s", err.Error()) + err = fmt.Errorf("peer adds edge failed: %w", err) + peer.Log.Debug(err) continue } } @@ -595,7 +605,8 @@ func (s *scheduling) filterCandidateParents(peer *standard.Peer, blocklist set.S func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Peer, []*persistentcache.Host, bool) { currentPersistentReplicaCount, err := s.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID) if err != nil { - task.Log.Errorf("load current persistent replica count failed %s", err) + err = fmt.Errorf("load current persistent replica count failed: %w", err) + task.Log.Error(err) return nil, nil, false } @@ -638,7 +649,8 @@ func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task // Load all current persistent peers and add them to the blocklist to avoid scheduling the same host. 
currentPersistentPeers, err := s.persistentCacheResource.PeerManager().LoadPersistentAllByTaskID(ctx, task.ID) if err != nil { - task.Log.Errorf("load all persistent cache peers failed: %s", err.Error()) + err = fmt.Errorf("load all persistent cache peers failed: %w", err) + task.Log.Error(err) return nil, nil, false } @@ -701,7 +713,8 @@ func (s *scheduling) FindCandidatePersistentCacheParents(ctx context.Context, pe func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context, peer *persistentcache.Peer, blocklist set.SafeSet[string]) []*persistentcache.Peer { parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, peer.Task.ID) if err != nil { - peer.Log.Errorf("load all persistent cache parents failed: %s", err.Error()) + err = fmt.Errorf("load all persistent cache parents failed: %w", err) + peer.Log.Error(err) return nil } @@ -740,7 +753,8 @@ func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context, func (s *scheduling) filterCachedReplicatePersistentCacheParents(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) []*persistentcache.Peer { parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, task.ID) if err != nil { - task.Log.Errorf("load all persistent cache parents failed: %s", err.Error()) + err = fmt.Errorf("load all persistent cache parents failed: %w", err) + task.Log.Error(err) return nil } @@ -785,7 +799,8 @@ func (s *scheduling) filterCachedReplicatePersistentCacheParents(ctx context.Con func (s *scheduling) filterReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, count int, blocklist set.SafeSet[string]) []*persistentcache.Host { hosts, err := s.persistentCacheResource.HostManager().LoadRandom(ctx, count, blocklist) if err != nil { - task.Log.Errorf("load all persistent cache hosts failed: %s", err.Error()) + err = fmt.Errorf("load all persistent cache hosts failed: %w", err) + task.Log.Error(err) return nil } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index d43ea5e8a..7db333666 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -441,9 +441,9 @@ func (v *V2) DeletePeer(_ctx context.Context, req *schedulerv2.DeletePeerRequest } if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil { - msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) - peer.Log.Error(msg) - return status.Error(codes.FailedPrecondition, msg) + err = fmt.Errorf("peer fsm event failed: %w", err) + peer.Log.Error(err) + return status.Error(codes.FailedPrecondition, err.Error()) } return nil @@ -537,8 +537,8 @@ func (v *V2) DeleteTask(_ctx context.Context, req *schedulerv2.DeleteTaskRequest if peer.Task.ID == req.GetTaskId() { if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil { - msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) - peer.Log.Error(msg) + err = fmt.Errorf("peer fsm event failed: %w", err) + peer.Log.Error(err) return true } }
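Taken together, the handler, service, and type changes above add a synchronous get_image_distribution job. A client-side sketch of calling it follows; the manager address and the /api/v1/jobs route are assumptions for illustration, and the local response struct simply mirrors types.CreateGetImageDistributionJobResponse.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// getImageDistributionResponse mirrors types.CreateGetImageDistributionJobResponse.
type getImageDistributionResponse struct {
    Image struct {
        Layers []struct {
            URL string `json:"url"`
        } `json:"layers"`
    } `json:"image"`
    Peers []struct {
        IP           string `json:"ip"`
        Hostname     string `json:"hostname"`
        CachedLayers []struct {
            URL string `json:"url"`
        } `json:"layers"`
        SchedulerClusterID uint `json:"scheduler_cluster_id"`
    } `json:"peers"`
}

func main() {
    // The image URL and scheduler cluster id are example values.
    body, err := json.Marshal(map[string]any{
        "type": "get_image_distribution",
        "args": map[string]any{
            "url":      "https://registry-1.docker.io/v2/library/alpine/manifests/3.19",
            "platform": "linux/amd64",
        },
        "scheduler_cluster_ids": []uint{1},
    })
    if err != nil {
        panic(err)
    }

    // Manager address and route are assumptions; adjust to the deployment.
    resp, err := http.Post("http://127.0.0.1:8080/api/v1/jobs", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var out getImageDistributionResponse
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        panic(err)
    }

    for _, peer := range out.Peers {
        fmt.Printf("%s (%s) cached %d of %d layers\n",
            peer.Hostname, peer.IP, len(peer.CachedLayers), len(out.Image.Layers))
    }
}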
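extractPeersFromJobs walks the machinery group results of the per-layer get_task jobs and folds them into one entry per host, recording which layers each host has cached. A simplified, dependency-free model of that aggregation; the real code keys hosts with idgen.HostIDV2 and reads its inputs out of job.Result.

package main

import "fmt"

type layer struct{ URL string }

type peerInfo struct {
    IP           string
    Hostname     string
    CachedLayers []layer
}

// report is one (host, layer) observation pulled out of a per-layer job result.
type report struct {
    IP       string
    Hostname string
    LayerURL string
}

// aggregate folds per-layer observations into one entry per host, keeping each
// layer URL at most once — the same idea extractPeersFromJobs implements.
func aggregate(reports []report) []peerInfo {
    m := make(map[string]*peerInfo)
    for _, r := range reports {
        key := r.IP + "-" + r.Hostname
        p, ok := m[key]
        if !ok {
            m[key] = &peerInfo{IP: r.IP, Hostname: r.Hostname, CachedLayers: []layer{{URL: r.LayerURL}}}
            continue
        }

        cached := false
        for _, l := range p.CachedLayers {
            if l.URL == r.LayerURL {
                cached = true
                break
            }
        }
        if !cached {
            p.CachedLayers = append(p.CachedLayers, layer{URL: r.LayerURL})
        }
    }

    peers := make([]peerInfo, 0, len(m))
    for _, p := range m {
        peers = append(peers, *p)
    }
    return peers
}

func main() {
    peers := aggregate([]report{
        {IP: "10.0.0.1", Hostname: "host-1", LayerURL: "https://example.com/blobs/layer-a"},
        {IP: "10.0.0.1", Hostname: "host-1", LayerURL: "https://example.com/blobs/layer-b"},
        {IP: "10.0.0.2", Hostname: "host-2", LayerURL: "https://example.com/blobs/layer-a"},
    })
    for _, p := range peers {
        fmt.Printf("%s (%s): %d layers cached\n", p.Hostname, p.IP, len(p.CachedLayers))
    }
}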
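createGetTaskJobsSync fans the per-layer jobs out through an errgroup bounded by DefaultGetImageDistributionJobConcurrentCount and deliberately swallows per-layer failures so one bad layer does not fail the whole query. A self-contained sketch of that pattern; lookup is a placeholder, and the closure relies on Go 1.22 per-iteration loop variables just as the patch does.

package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

// lookup stands in for createGetTaskJobSync; it is a placeholder, not part of
// the manager API.
func lookup(ctx context.Context, layerURL string) (string, error) {
    return "peers-for-" + layerURL, nil
}

func main() {
    layerURLs := []string{"layer-a", "layer-b", "layer-c"} // example inputs

    var (
        mu      sync.Mutex
        results []string
    )

    eg, ctx := errgroup.WithContext(context.Background())
    // Bound the number of in-flight per-layer jobs, the way
    // DefaultGetImageDistributionJobConcurrentCount does above.
    eg.SetLimit(2)

    for _, layerURL := range layerURLs {
        eg.Go(func() error {
            // Per-layer failures are logged and swallowed so that a single bad
            // layer does not fail the whole distribution query.
            out, err := lookup(ctx, layerURL)
            if err != nil {
                fmt.Println("skipping layer:", layerURL, err)
                return nil
            }

            mu.Lock()
            results = append(results, out)
            mu.Unlock()
            return nil
        })
    }

    // Workers never return errors, so Wait only blocks until they finish.
    _ = eg.Wait()
    fmt.Println(results)
}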
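pollingJob now takes its backoff parameters explicitly: the asynchronous jobs keep the previous 30s/300s/16-attempt budget, while createGetTaskJobSync polls inline with a much tighter 3s/5s/4 so the HTTP request returns quickly. A sketch of that tighter budget using the retry.Run signature shown above; checkGroupJob is a placeholder, and reading the callback's middle return value as "stop retrying" is an inference from the usage in pollingJob rather than something the patch states.

package main

import (
    "context"
    "errors"
    "fmt"

    "d7y.io/dragonfly/v2/pkg/retry"
)

func main() {
    attempts := 0

    // 3s initial backoff, 5s cap, 4 attempts: the budget createGetTaskJobSync
    // uses so the caller is not blocked for long.
    if _, _, err := retry.Run(context.Background(), 3, 5, 4, func() (any, bool, error) {
        attempts++

        // checkGroupJob stands in for s.job.GetGroupJobState; it is a placeholder.
        if checkGroupJob(attempts) {
            // The middle return value is read here as "stop retrying", inferred
            // from how pollingJob uses it above.
            return nil, true, nil
        }
        return nil, false, errors.New("job has not finished yet")
    }); err != nil {
        fmt.Println("polling gave up:", err)
    }

    fmt.Println("attempts:", attempts)
}

func checkGroupJob(attempt int) bool {
    return attempt >= 3
}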