feat: support getting info of the preheated image (#4139)

* feat: support getting image info by job

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: support getting info of the preheated image

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: refactor GetImageJob to GetImageDistributionJob and improve error handling

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2025-06-18 18:57:22 +08:00 committed by GitHub
parent 7723cbd71b
commit 903ba33a5d
13 changed files with 514 additions and 84 deletions

@ -6,7 +6,7 @@ coverage:
project:
default:
enabled: yes
target: 33%
target: 32%
patch:
default:
enabled: no

@ -33,6 +33,9 @@ const (
// GetTaskJob is the name of getting task job.
GetTaskJob = "get_task"
// GetImageDistributionJob is the job name of getting image distribution.
GetImageDistributionJob = "get_image_distribution"
// DeleteTaskJob is the name of deleting task job.
DeleteTaskJob = "delete_task"
@ -43,8 +46,8 @@ const (
// Machinery server configuration.
const (
DefaultResultsExpireIn = 86400
DefaultRedisMaxIdle = 30
DefaultRedisMaxActive = 50
DefaultRedisMaxIdle = 70
DefaultRedisMaxActive = 100
DefaultRedisIdleTimeout = 30
DefaultRedisReadTimeout = 60
DefaultRedisWriteTimeout = 60

@ -36,7 +36,7 @@ const (
// cacher is a cache implementation using LRU for gorm.
type cacher struct {
// store is the LRU cache.
store *lru.LRU[string, any]
store *lru.ShardedLRU[string, any]
}
// hashStringXXHASH returns the hash of the string s.
@ -46,7 +46,7 @@ func hashStringXXHASH(s string) uint32 {
// newCacher creates a new cacher.
func newCacher() (caches.Cacher, error) {
store, err := lru.New[string, any](defaultCacheSize, hashStringXXHASH)
store, err := lru.NewSharded[string, any](defaultCacheSize, hashStringXXHASH)
if err != nil {
return nil, err
}
@ -86,7 +86,7 @@ func (c *cacher) Store(ctx context.Context, key string, val *caches.Query[any])
// INSERT / UPDATE / DELETE queries are sent to the DB.
func (c *cacher) Invalidate(ctx context.Context) error {
var err error
c.store, err = lru.New[string, any](defaultCacheSize, hashStringXXHASH)
c.store, err = lru.NewSharded[string, any](defaultCacheSize, hashStringXXHASH)
if err != nil {
return err
}
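
The lru package here looks like elastic/go-freelru, whose New and NewSharded constructors take a capacity and a uint32 hash callback exactly as called above; assuming that is the import, a standalone sketch of the sharded variant (capacity and keys invented for illustration):

package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
    lru "github.com/elastic/go-freelru"
)

// hashStringXXHASH mirrors the hash callback above: freelru buckets and
// shards entries by a uint32 hash of the key.
func hashStringXXHASH(s string) uint32 {
    return uint32(xxhash.Sum64String(s))
}

func main() {
    // NewSharded splits the capacity across internally locked shards, which
    // reduces lock contention compared to the single-lock lru.New cache.
    store, err := lru.NewSharded[string, any](1024, hashStringXXHASH)
    if err != nil {
        panic(err)
    }

    store.Add("SELECT * FROM jobs WHERE id = 1", "cached query result")
    if v, ok := store.Get("SELECT * FROM jobs WHERE id = 1"); ok {
        fmt.Println(v)
    }
}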

@ -98,6 +98,20 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}
ctx.JSON(http.StatusOK, job)
case job.GetImageDistributionJob:
var json types.CreateGetImageDistributionJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
job, err := h.service.CreateGetImageDistributionJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}
ctx.JSON(http.StatusOK, job)
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
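
For orientation, a rough client-side sketch of how the new case above might be exercised. The request body mirrors types.CreateGetImageDistributionJobRequest introduced later in this commit; the manager address and the /api/v1/jobs path are assumptions for illustration, not something this diff defines.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// getImageDistributionRequest mirrors the JSON shape of
// types.CreateGetImageDistributionJobRequest; only a few fields are shown.
type getImageDistributionRequest struct {
    Type string `json:"type"`
    Args struct {
        URL string `json:"url"`
        Tag string `json:"tag,omitempty"`
    } `json:"args"`
    SchedulerClusterIDs []uint `json:"scheduler_cluster_ids,omitempty"`
}

func main() {
    req := getImageDistributionRequest{Type: "get_image_distribution"}
    req.Args.URL = "https://registry-1.docker.io/v2/library/alpine/manifests/3.19"
    req.SchedulerClusterIDs = []uint{1}

    body, err := json.Marshal(req)
    if err != nil {
        panic(err)
    }

    // Hypothetical manager address and job endpoint; adjust to your deployment.
    resp, err := http.Post("http://manager:8080/api/v1/jobs", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    fmt.Println("status:", resp.Status)
}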

@ -67,11 +67,6 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
}
}
preheat, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool, cfg.Job.Preheat.TLS.InsecureSkipVerify)
if err != nil {
return nil, err
}
syncPeers, err := newSyncPeers(cfg, j, gdb)
if err != nil {
return nil, err
@ -79,7 +74,7 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return &Job{
Job: j,
Preheat: preheat,
Preheat: newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool, cfg.Job.Preheat.TLS.InsecureSkipVerify),
SyncPeers: syncPeers,
Task: newTask(j),
}, nil

@ -56,6 +56,11 @@ import (
// preheatImage is an image for preheat.
type PreheatType string
// String returns the string representation of PreheatType.
func (p PreheatType) String() string {
return string(p)
}
const (
// PreheatImageType is image type of preheat job.
PreheatImageType PreheatType = "image"
@ -81,26 +86,19 @@ type Preheat interface {
// preheat is an implementation of Preheat.
type preheat struct {
job *internaljob.Job
certificateChain [][]byte
rootCAs *x509.CertPool
insecureSkipVerify bool
registryTimeout time.Duration
}
// newPreheat creates a new Preheat.
func newPreheat(job *internaljob.Job, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) (Preheat, error) {
p := &preheat{
func newPreheat(job *internaljob.Job, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) Preheat {
return &preheat{
job: job,
rootCAs: rootCAs,
insecureSkipVerify: insecureSkipVerify,
registryTimeout: registryTimeout,
}
if rootCAs != nil {
p.certificateChain = rootCAs.Subjects()
}
return p, nil
}
// CreatePreheat creates a preheat job.
@ -121,7 +119,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
return nil, errors.New("invalid params: url is required")
}
files, err = p.getImageLayers(ctx, json)
files, err = GetImageLayers(ctx, json, p.registryTimeout, p.rootCAs, p.insecureSkipVerify)
if err != nil {
return nil, err
}
@ -135,6 +133,11 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
json.URLs = append(json.URLs, json.URL)
}
var certificateChain [][]byte
if p.rootCAs != nil {
certificateChain = p.rootCAs.Subjects()
}
for _, url := range json.URLs {
files = append(files, internaljob.PreheatRequest{
URL: url,
@ -147,7 +150,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
Percentage: json.Percentage,
Count: json.Count,
ConcurrentCount: json.ConcurrentCount,
CertificateChain: p.certificateChain,
CertificateChain: certificateChain,
InsecureSkipVerify: p.insecureSkipVerify,
Timeout: json.Timeout,
LoadToCache: json.LoadToCache,
@ -211,8 +214,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea
}, nil
}
// getImageLayers gets layers of image.
func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([]internaljob.PreheatRequest, error) {
func GetImageLayers(ctx context.Context, args types.PreheatArgs, registryTimeout time.Duration, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
@ -232,10 +234,10 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([
}
httpClient := &http.Client{
Timeout: p.registryTimeout,
Timeout: registryTimeout,
Transport: &http.Transport{
DialContext: nethttp.NewSafeDialer().DialContext,
TLSClientConfig: &tls.Config{RootCAs: p.rootCAs, InsecureSkipVerify: p.insecureSkipVerify},
TLSClientConfig: &tls.Config{RootCAs: rootCAs, InsecureSkipVerify: insecureSkipVerify},
MaxIdleConns: defaultHTTPTransport.MaxIdleConns,
MaxIdleConnsPerHost: defaultHTTPTransport.MaxIdleConnsPerHost,
MaxConnsPerHost: defaultHTTPTransport.MaxConnsPerHost,
@ -259,7 +261,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([
}
// Get manifests.
manifests, err := p.getManifests(ctx, client, image, header.Clone(), platform)
manifests, err := getManifests(ctx, client, image, header.Clone(), platform)
if err != nil {
return nil, err
}
@ -273,7 +275,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([
header.Set("Authorization", client.GetAuthToken())
// parse image layers to preheat
layers, err := p.parseLayers(manifests, args, header.Clone(), image)
layers, err := parseLayers(manifests, args, header.Clone(), image, rootCAs, insecureSkipVerify)
if err != nil {
return nil, err
}
@ -282,7 +284,7 @@ func (p *preheat) getImageLayers(ctx context.Context, args types.PreheatArgs) ([
}
// getManifests gets manifests of image.
func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) {
func getManifests(ctx context.Context, client *imageAuthClient, image *preheatImage, header http.Header, platform specs.Platform) ([]distribution.Manifest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, image.manifestURL(), nil)
if err != nil {
return nil, err
@ -330,9 +332,9 @@ func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, ima
return []distribution.Manifest{v}, nil
case *manifestlist.DeserializedManifestList:
var result []distribution.Manifest
for _, v := range p.filterManifests(v.Manifests, platform) {
for _, v := range filterManifests(v.Manifests, platform) {
image.tag = v.Digest.String()
manifests, err := p.getManifests(ctx, client, image, header.Clone(), platform)
manifests, err := getManifests(ctx, client, image, header.Clone(), platform)
if err != nil {
return nil, err
}
@ -347,7 +349,7 @@ func (p *preheat) getManifests(ctx context.Context, client *imageAuthClient, ima
}
// filterManifests filters manifests with platform.
func (p *preheat) filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor {
func filterManifests(manifests []manifestlist.ManifestDescriptor, platform specs.Platform) []manifestlist.ManifestDescriptor {
var matches []manifestlist.ManifestDescriptor
for _, desc := range manifests {
if desc.Platform.Architecture == platform.Architecture && desc.Platform.OS == platform.OS {
@ -359,7 +361,12 @@ func (p *preheat) filterManifests(manifests []manifestlist.ManifestDescriptor, p
}
// parseLayers parses layers of image.
func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
func parseLayers(manifests []distribution.Manifest, args types.PreheatArgs, header http.Header, image *preheatImage, rootCAs *x509.CertPool, insecureSkipVerify bool) ([]internaljob.PreheatRequest, error) {
var certificateChain [][]byte
if rootCAs != nil {
certificateChain = rootCAs.Subjects()
}
var layers []internaljob.PreheatRequest
for _, m := range manifests {
for _, v := range m.References() {
@ -375,8 +382,8 @@ func (p *preheat) parseLayers(manifests []distribution.Manifest, args types.Preh
Percentage: args.Percentage,
Count: args.Count,
ConcurrentCount: args.ConcurrentCount,
CertificateChain: p.certificateChain,
InsecureSkipVerify: p.insecureSkipVerify,
CertificateChain: certificateChain,
InsecureSkipVerify: insecureSkipVerify,
Timeout: args.Timeout,
}
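
Since getImageLayers and its helpers no longer hang off *preheat, other packages can resolve an image's layer URLs directly, which is what the manager service does later in this commit. A minimal sketch of such a caller, with the manifest URL invented and the system CA roots used (nil cert pool, verification enabled):

package main

import (
    "context"
    "fmt"
    "time"

    "d7y.io/dragonfly/v2/manager/job"
    "d7y.io/dragonfly/v2/manager/types"
)

func main() {
    // With the receiver removed, the registry timeout, CA pool and TLS
    // verification mode are passed explicitly instead of living on *preheat.
    layers, err := job.GetImageLayers(context.Background(), types.PreheatArgs{
        Type: job.PreheatImageType.String(),
        URL:  "https://registry-1.docker.io/v2/library/alpine/manifests/3.19",
    }, 30*time.Second, nil, false)
    if err != nil {
        panic(err)
    }

    for _, layer := range layers {
        fmt.Println(layer.URL)
    }
}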

@ -3,6 +3,7 @@ package job
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
@ -10,7 +11,7 @@ import (
"d7y.io/dragonfly/v2/manager/types"
)
func TestPreheat_getImageLayers(t *testing.T) {
func TestPreheat_GetImageLayers(t *testing.T) {
tests := []struct {
name string
args types.PreheatArgs
@ -42,8 +43,7 @@ func TestPreheat_getImageLayers(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
p := &preheat{}
layers, err := p.getImageLayers(context.Background(), tc.args)
layers, err := GetImageLayers(context.Background(), tc.args, 30*time.Second, nil, true)
if err != nil {
t.Fatal(err)
}

@ -18,24 +18,30 @@ package service
import (
"context"
"crypto/x509"
"errors"
"fmt"
"sync"
"time"
machineryv1tasks "github.com/dragonflyoss/machinery/v1/tasks"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"gorm.io/gorm"
logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/job"
"d7y.io/dragonfly/v2/manager/metrics"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/net/http"
"d7y.io/dragonfly/v2/pkg/retry"
"d7y.io/dragonfly/v2/pkg/slices"
"d7y.io/dragonfly/v2/pkg/structure"
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
)
const (
@ -44,6 +50,9 @@ const (
// DefaultGCJobPollingInterval is the default interval for polling GC job.
DefaultGCJobPollingInterval = 5 * time.Second
// DefaultGetImageDistributionJobConcurrentCount is the default concurrent count for getting image distribution job.
DefaultGetImageDistributionJobConcurrentCount = 30
)
func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest) (*models.Job, error) {
@ -53,6 +62,7 @@ func (s *service) CreateGCJob(ctx context.Context, json types.CreateGCJobRequest
// This is a non-block function to run the gc task, which will run the task asynchronously in the backend.
if err := s.gc.Run(ctx, json.Args.Type); err != nil {
logger.Errorf("run gc job failed: %w", err)
return nil, err
}
@ -97,6 +107,7 @@ func (s *service) pollingGCJob(ctx context.Context, jobType string, userID uint,
func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error {
schedulers, err := s.findSchedulerInClusters(ctx, json.SchedulerClusterIDs)
if err != nil {
logger.Errorf("find scheduler in clusters failed: %w", err)
return err
}
@ -122,16 +133,19 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
args, err := structure.StructToMap(json.Args)
if err != nil {
logger.Errorf("convert preheat args to map failed: %w", err)
return nil, err
}
candidateSchedulers, err := s.findAllCandidateSchedulersInClusters(ctx, json.SchedulerClusterIDs, []string{types.SchedulerFeaturePreheat})
if err != nil {
logger.Errorf("find candidate schedulers in clusters failed: %w", err)
return nil, err
}
groupJobState, err := s.job.CreatePreheat(ctx, candidateSchedulers, json.Args)
if err != nil {
logger.Errorf("create preheat job failed: %w", err)
return nil, err
}
@ -151,10 +165,11 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
}
if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
logger.Errorf("create preheat job failed: %w", err)
return nil, err
}
go s.pollingJob(context.Background(), internaljob.PreheatJob, job.ID, job.TaskID)
go s.pollingJob(context.Background(), internaljob.PreheatJob, job.ID, job.TaskID, 30, 300, 16)
return &job, nil
}
@ -165,10 +180,160 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask
args, err := structure.StructToMap(json.Args)
if err != nil {
logger.Errorf("convert get task args to map failed: %w", err)
return nil, err
}
schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs)
if err != nil {
logger.Errorf("find schedulers in clusters failed: %w", err)
return nil, err
}
groupJobState, err := s.job.CreateGetTask(ctx, schedulers, json.Args)
if err != nil {
logger.Errorf("create get task job failed: %w", err)
return nil, err
}
var schedulerClusters []models.SchedulerCluster
for _, scheduler := range schedulers {
schedulerClusters = append(schedulerClusters, scheduler.SchedulerCluster)
}
job := models.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
Type: json.Type,
State: groupJobState.State,
Args: args,
UserID: json.UserID,
SchedulerClusters: schedulerClusters,
}
if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
logger.Errorf("create get task job failed: %w", err)
return nil, err
}
go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID, 30, 300, 16)
logger.Infof("create get task job %s for task %s in scheduler clusters %v", job.ID, job.TaskID, json.SchedulerClusterIDs)
return &job, nil
}
func (s *service) CreateGetImageDistributionJob(ctx context.Context, json types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) {
layers, err := s.getImageLayers(ctx, json)
if err != nil {
err = fmt.Errorf("get image layers failed: %w", err)
logger.Error(err)
return nil, err
}
image := types.Image{Layers: make([]types.Layer, 0, len(layers))}
for _, file := range layers {
image.Layers = append(image.Layers, types.Layer{URL: file.URL})
}
schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs)
if err != nil {
err = fmt.Errorf("find schedulers in clusters failed: %w", err)
logger.Error(err)
return nil, err
}
// Create multiple get task jobs synchronously for each layer and
// extract peers from the jobs.
jobs := s.createGetTaskJobsSync(ctx, layers, json, schedulers)
peers, err := s.extractPeersFromJobs(jobs)
if err != nil {
err = fmt.Errorf("extract peers from jobs failed: %w", err)
logger.Error(err)
return nil, err
}
return &types.CreateGetImageDistributionJobResponse{
Image: image,
Peers: peers,
}, nil
}
func (s *service) getImageLayers(ctx context.Context, json types.CreateGetImageDistributionJobRequest) ([]internaljob.PreheatRequest, error) {
var certPool *x509.CertPool
if len(s.config.Job.Preheat.TLS.CACert) != 0 {
certPool = x509.NewCertPool()
if !certPool.AppendCertsFromPEM([]byte(s.config.Job.Preheat.TLS.CACert)) {
return nil, errors.New("invalid CA Cert")
}
}
layers, err := job.GetImageLayers(ctx, types.PreheatArgs{
Type: job.PreheatImageType.String(),
URL: json.Args.URL,
PieceLength: json.Args.PieceLength,
Tag: json.Args.Tag,
Application: json.Args.Application,
FilteredQueryParams: json.Args.FilteredQueryParams,
Headers: json.Args.Headers,
Username: json.Args.Username,
Password: json.Args.Password,
Platform: json.Args.Platform,
}, s.config.Job.Preheat.RegistryTimeout, certPool, s.config.Job.Preheat.TLS.InsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("get image layers failed: %w", err)
}
if len(layers) == 0 {
return nil, errors.New("no valid image layers found")
}
return layers, nil
}
func (s *service) createGetTaskJobsSync(ctx context.Context, layers []internaljob.PreheatRequest, json types.CreateGetImageDistributionJobRequest, schedulers []models.Scheduler) []*models.Job {
var mu sync.Mutex
jobs := make([]*models.Job, 0, len(layers))
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultGetImageDistributionJobConcurrentCount)
for _, file := range layers {
eg.Go(func() error {
job, err := s.createGetTaskJobSync(ctx, types.CreateGetTaskJobRequest{
BIO: json.BIO,
Type: internaljob.GetTaskJob,
Args: types.GetTaskArgs{
URL: file.URL,
PieceLength: file.PieceLength,
Tag: file.Tag,
Application: file.Application,
FilteredQueryParams: json.Args.FilteredQueryParams,
},
SchedulerClusterIDs: json.SchedulerClusterIDs,
}, schedulers)
if err != nil {
logger.Warnf("failed to create task job for image layer %s: %w", file.URL, err)
return nil
}
mu.Lock()
jobs = append(jobs, job)
mu.Unlock()
return nil
})
}
// If any of the goroutines return an error, ignore it and continue processing.
if err := eg.Wait(); err != nil {
logger.Errorf("failed to create get task jobs: %w", err)
}
return jobs
}
func (s *service) createGetTaskJobSync(ctx context.Context, json types.CreateGetTaskJobRequest, schedulers []models.Scheduler) (*models.Job, error) {
if json.Args.FilteredQueryParams == "" {
json.Args.FilteredQueryParams = http.RawDefaultFilteredQueryParams
}
args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}
@ -197,10 +362,130 @@ func (s *service) CreateGetTaskJob(ctx context.Context, json types.CreateGetTask
return nil, err
}
go s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID)
s.pollingJob(context.Background(), internaljob.GetTaskJob, job.ID, job.TaskID, 3, 5, 4)
if err := s.db.WithContext(ctx).Preload("SeedPeerClusters").Preload("SchedulerClusters").First(&job, job.ID).Error; err != nil {
return nil, err
}
return &job, nil
}
func (s *service) extractPeersFromJobs(jobs []*models.Job) ([]types.Peer, error) {
m := make(map[string]*types.Peer, len(jobs))
for _, job := range jobs {
if job.State != machineryv1tasks.StateSuccess {
continue
}
jobStates, ok := job.Result["job_states"].([]any)
if !ok {
logger.Warnf("job %s has no job_states in result", job.ID)
continue
}
url, ok := job.Args["url"].(string)
if !ok {
logger.Warnf("job %s has no url in args", job.ID)
continue
}
for _, jobState := range jobStates {
jobState, ok := jobState.(map[string]any)
if !ok {
logger.Warnf("job %s has invalid job_state in result", job.ID)
continue
}
results, ok := jobState["results"].([]any)
if !ok {
logger.Warnf("job %s has no results in job_state", job.ID)
continue
}
for _, result := range results {
result, ok := result.(map[string]any)
if !ok {
logger.Warnf("job %s has invalid result in job_state", job.ID)
continue
}
schedulerClusterID, ok := result["scheduler_cluster_id"].(float64)
if !ok {
logger.Warnf("job %s has no scheduler_cluster_id in result", job.ID)
continue
}
peers, ok := result["peers"].([]any)
if !ok {
logger.Warnf("job %s has no peers in result", job.ID)
continue
}
for _, peer := range peers {
peer, ok := peer.(map[string]any)
if !ok {
logger.Warnf("job %s has invalid peer in result", job.ID)
continue
}
id, ok := peer["id"].(string)
if !ok {
logger.Warnf("job %s has invalid peer id in result", job.ID)
continue
}
hostType, ok := peer["host_type"].(string)
if !ok {
logger.Warnf("job %s has no host_type in result for peer %s", job.ID, id)
continue
}
// Only collect normal peers and skip seed peers.
if hostType != pkgtypes.HostTypeNormalName {
continue
}
ip, ok := peer["ip"].(string)
if !ok {
logger.Warnf("job %s has no ip in result for peer %s", job.ID, id)
continue
}
hostname, ok := peer["hostname"].(string)
if !ok {
logger.Warnf("job %s has no hostname in result for peer %s", job.ID, id)
continue
}
hostID := idgen.HostIDV2(ip, hostname, false)
p, found := m[hostID]
if !found {
m[hostID] = &types.Peer{
IP: ip,
Hostname: hostname,
CachedLayers: []types.Layer{{URL: url}},
SchedulerClusterID: uint(schedulerClusterID),
}
} else {
if slices.Contains(p.CachedLayers, types.Layer{URL: url}) {
continue
}
p.CachedLayers = append(p.CachedLayers, types.Layer{URL: url})
}
}
}
}
}
peers := make([]types.Peer, 0, len(m))
for _, peer := range m {
peers = append(peers, *peer)
}
return peers, nil
}
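
The chain of type assertions above implies a particular shape for job.Result. The sketch below rebuilds that shape as a Go literal (all values invented; the host_type strings are assumed to match the "normal"/"super" names from pkg/types) and walks it the same way extractPeersFromJobs does, minus the logging and deduplication:

package main

import "fmt"

func main() {
    // Shape inferred from the type assertions in extractPeersFromJobs.
    result := map[string]any{
        "job_states": []any{
            map[string]any{
                "results": []any{
                    map[string]any{
                        "scheduler_cluster_id": float64(1),
                        "peers": []any{
                            map[string]any{
                                "id":        "peer-1",
                                "host_type": "normal",
                                "ip":        "10.0.0.7",
                                "hostname":  "worker-01",
                            },
                            map[string]any{
                                "id":        "peer-2",
                                "host_type": "super", // seed peers are skipped
                                "ip":        "10.0.0.8",
                                "hostname":  "seed-01",
                            },
                        },
                    },
                },
            },
        },
    }

    for _, js := range result["job_states"].([]any) {
        for _, r := range js.(map[string]any)["results"].([]any) {
            for _, p := range r.(map[string]any)["peers"].([]any) {
                peer := p.(map[string]any)
                if peer["host_type"] != "normal" {
                    continue
                }
                fmt.Printf("%s (%s) cached the layer\n", peer["hostname"], peer["ip"])
            }
        }
    }
}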
func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) {
if json.Args.Timeout == 0 {
json.Args.Timeout = types.DefaultJobTimeout
@ -212,16 +497,19 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
args, err := structure.StructToMap(json.Args)
if err != nil {
logger.Errorf("convert delete task args to map failed: %w", err)
return nil, err
}
schedulers, err := s.findAllSchedulersInClusters(ctx, json.SchedulerClusterIDs)
if err != nil {
logger.Errorf("find schedulers in clusters failed: %w", err)
return nil, err
}
groupJobState, err := s.job.CreateDeleteTask(ctx, schedulers, json.Args)
if err != nil {
logger.Errorf("create delete task job failed: %w", err)
return nil, err
}
@ -241,10 +529,11 @@ func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDele
}
if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
logger.Errorf("create delete task job failed: %w", err)
return nil, err
}
go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID)
go s.pollingJob(context.Background(), internaljob.DeleteTaskJob, job.ID, job.TaskID, 30, 300, 16)
return &job, nil
}
@ -413,21 +702,23 @@ func (s *service) findAllCandidateSchedulersInClusters(ctx context.Context, sche
return candidateSchedulers, nil
}
func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID string) {
func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID string, initBackoff float64, maxBackoff float64, maxAttempts int) {
var (
job models.Job
log = logger.WithGroupAndJobID(groupID, fmt.Sprint(id))
)
if _, _, err := retry.Run(ctx, 30, 300, 16, func() (any, bool, error) {
if _, _, err := retry.Run(ctx, initBackoff, maxBackoff, maxAttempts, func() (any, bool, error) {
groupJob, err := s.job.GetGroupJobState(name, groupID)
if err != nil {
log.Errorf("polling group failed: %s", err.Error())
err = fmt.Errorf("get group job state failed: %w", err)
log.Error(err)
return nil, false, err
}
result, err := structure.StructToMap(groupJob)
if err != nil {
log.Errorf("polling group failed: %s", err.Error())
err = fmt.Errorf("convert group job state to map failed: %w", err)
log.Error(err)
return nil, false, err
}
@ -435,7 +726,8 @@ func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID
State: groupJob.State,
Result: result,
}).Error; err != nil {
log.Errorf("polling group failed: %s", err.Error())
err = fmt.Errorf("update job state failed: %w", err)
log.Error(err)
return nil, true, err
}
@ -455,7 +747,8 @@ func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID
return nil, false, errors.New(msg)
}
}); err != nil {
log.Errorf("polling group failed: %s", err.Error())
err = fmt.Errorf("polling group job failed: %w", err)
log.Error(err)
}
// Polling timeout and failed.
@ -464,8 +757,10 @@ func (s *service) pollingJob(ctx context.Context, name string, id uint, groupID
if err := s.db.WithContext(ctx).First(&job, id).Updates(models.Job{
State: machineryv1tasks.StateFailure,
}).Error; err != nil {
log.Errorf("polling group failed: %s", err.Error())
err = fmt.Errorf("update job state to failure failed: %w", err)
log.Error(err)
}
log.Error("polling group timeout")
}
}
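
To make the new (initBackoff, maxBackoff, maxAttempts) parameters concrete, here is a simplified stand-in for pkg/retry.Run, assuming the backoff values are seconds as the 30/300 and 3/5 call sites suggest; it only illustrates the knobs and is not the real implementation.

package main

import (
    "context"
    "errors"
    "fmt"
    "math"
    "time"
)

// run retries f with exponential backoff capped at maxBackoff, giving up
// after maxAttempts; f may set cancel=true to stop retrying early.
func run(ctx context.Context, initBackoff, maxBackoff float64, maxAttempts int,
    f func() (data any, cancel bool, err error)) (any, bool, error) {
    var lastErr error
    for i := 0; i < maxAttempts; i++ {
        data, cancel, err := f()
        if err == nil || cancel {
            return data, cancel, err
        }
        lastErr = err

        backoff := math.Min(initBackoff*math.Pow(2, float64(i)), maxBackoff)
        select {
        case <-ctx.Done():
            return nil, false, ctx.Err()
        case <-time.After(time.Duration(backoff * float64(time.Second))):
        }
    }
    return nil, false, lastErr
}

func main() {
    // pollingJob keeps the old wide 30/300/16 budget for asynchronous jobs,
    // while createGetTaskJobSync passes 3/5/4 so the synchronous
    // image-distribution request returns quickly. Here: succeed on attempt 2.
    attempts := 0
    data, _, err := run(context.Background(), 3, 5, 4, func() (any, bool, error) {
        attempts++
        if attempts < 2 {
            return nil, false, errors.New("group job still pending")
        }
        return "SUCCESS", false, nil
    })
    fmt.Println(data, err)
}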

@ -220,6 +220,21 @@ func (mr *MockServiceMockRecorder) CreateGCJob(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGCJob", reflect.TypeOf((*MockService)(nil).CreateGCJob), arg0, arg1)
}
// CreateGetImageDistributionJob mocks base method.
func (m *MockService) CreateGetImageDistributionJob(arg0 context.Context, arg1 types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateGetImageDistributionJob", arg0, arg1)
ret0, _ := ret[0].(*types.CreateGetImageDistributionJobResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateGetImageDistributionJob indicates an expected call of CreateGetImageDistributionJob.
func (mr *MockServiceMockRecorder) CreateGetImageDistributionJob(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateGetImageDistributionJob", reflect.TypeOf((*MockService)(nil).CreateGetImageDistributionJob), arg0, arg1)
}
// CreateGetTaskJob mocks base method.
func (m *MockService) CreateGetTaskJob(arg0 context.Context, arg1 types.CreateGetTaskJobRequest) (*models.Job, error) {
m.ctrl.T.Helper()

@ -119,6 +119,7 @@ type Service interface {
CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) error
CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
CreateGetTaskJob(context.Context, types.CreateGetTaskJobRequest) (*models.Job, error)
CreateGetImageDistributionJob(context.Context, types.CreateGetImageDistributionJobRequest) (*types.CreateGetImageDistributionJobResponse, error)
CreateGCJob(context.Context, types.CreateGCJobRequest) (*models.Job, error)
DestroyJob(context.Context, uint) error
UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)

@ -116,10 +116,10 @@ type PreheatArgs struct {
URLs []string `json:"urls" binding:"omitempty"`
// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`
// be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes),
// for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
// the piece length will be calculated according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
// Tag is the tag for preheating.
Tag string `json:"tag" binding:"omitempty"`
@ -209,10 +209,10 @@ type GetTaskArgs struct {
URL string `json:"url" binding:"omitempty"`
// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`
// be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes),
// for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
// the piece length will be calculated according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`
@ -229,6 +229,91 @@ type GetTaskArgs struct {
ContentForCalculatingTaskID *string `json:"content_for_calculating_task_id" binding:"omitempty"`
}
type CreateGetImageDistributionJobRequest struct {
// BIO is the description of the job.
BIO string `json:"bio" binding:"omitempty"`
// Type is the type of the job.
Type string `json:"type" binding:"required"`
// Args is the arguments of the job.
Args GetImageDistributionArgs `json:"args" binding:"omitempty"`
// UserID is the user id of the job.
UserID uint `json:"user_id" binding:"omitempty"`
// SchedulerClusterIDs is the scheduler cluster ids of the job.
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
}
type GetImageDistributionArgs struct {
// URL is the image manifest url of the task.
URL string `json:"url" binding:"required"`
// PieceLength is the piece length(bytes) for downloading image blobs. The value needs to
// be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes),
// for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
// the piece length will be calculated according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`
// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`
// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
// Headers is the http headers for authentication.
Headers map[string]string `json:"headers" binding:"omitempty"`
// Username is the username for authentication.
Username string `json:"username" binding:"omitempty"`
// Password is the password for authentication.
Password string `json:"password" binding:"omitempty"`
// The image type preheating task can specify the image architecture type. eg: linux/amd64.
Platform string `json:"platform" binding:"omitempty"`
}
// CreateGetImageDistributionJobResponse is the response for creating a get image job.
type CreateGetImageDistributionJobResponse struct {
// Image is the image information.
Image Image `json:"image"`
// Peers is the peers that have downloaded the image.
Peers []Peer `json:"peers"`
}
// Peer represents a peer in the get image job.
type Peer struct {
// IP is the IP address of the peer.
IP string `json:"ip"`
// Hostname is the hostname of the peer.
Hostname string `json:"hostname"`
// CachedLayers is the list of layers that the peer has downloaded.
CachedLayers []Layer `json:"layers"`
// SchedulerClusterID is the scheduler cluster id of the peer.
SchedulerClusterID uint `json:"scheduler_cluster_id"`
}
// Image represents the image information.
type Image struct {
// Layers is the list of layers of the image.
Layers []Layer `json:"layers"`
}
// Layer represents a layer of the image.
type Layer struct {
// URL is the URL of the layer.
URL string `json:"url"`
}
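
As a quick illustration of the wire format these response types produce, a self-contained sketch that marshals a hand-built CreateGetImageDistributionJobResponse; the structs are local copies of the types above, and the layer URL and peer are invented.

package main

import (
    "encoding/json"
    "fmt"
)

type Layer struct {
    URL string `json:"url"`
}

type Image struct {
    Layers []Layer `json:"layers"`
}

type Peer struct {
    IP                 string  `json:"ip"`
    Hostname           string  `json:"hostname"`
    CachedLayers       []Layer `json:"layers"`
    SchedulerClusterID uint    `json:"scheduler_cluster_id"`
}

type Response struct {
    Image Image  `json:"image"`
    Peers []Peer `json:"peers"`
}

func main() {
    layer := Layer{URL: "https://example.com/v2/library/alpine/blobs/sha256:aaaa"}
    resp := Response{
        Image: Image{Layers: []Layer{layer}},
        Peers: []Peer{{
            IP:                 "10.0.0.7",
            Hostname:           "worker-01",
            CachedLayers:       []Layer{layer},
            SchedulerClusterID: 1,
        }},
    }

    out, err := json.MarshalIndent(resp, "", "  ")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}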
type CreateDeleteTaskJobRequest struct {
// BIO is the description of the job.
BIO string `json:"bio" binding:"omitempty"`
@ -254,10 +339,10 @@ type DeleteTaskArgs struct {
URL string `json:"url" binding:"omitempty"`
// PieceLength is the piece length(bytes) for downloading file. The value needs to
// be greater than or equal to 4194304, for example: 4194304(4mib), 8388608(8mib).
// If the piece length is not specified, the piece length will be calculated
// according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304"`
// be greater than 4MiB (4,194,304 bytes) and less than 64MiB (67,108,864 bytes),
// for example: 4194304(4mib), 8388608(8mib). If the piece length is not specified,
// the piece length will be calculated according to the file size.
PieceLength *uint64 `json:"piece_length" binding:"omitempty,gte=4194304,lte=67108864"`
// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

@ -194,9 +194,9 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *standar
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
msg := fmt.Sprintf("peer deletes inedges failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, msg)
err = fmt.Errorf("peer deletes inedges failed: %w", err)
peer.Log.Error(err)
return status.Error(codes.Internal, err.Error())
}
peer.Log.Error("load stream failed")
@ -215,7 +215,8 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *standar
// Add edge from parent to peer.
for _, candidateParent := range candidateParents {
if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil {
peer.Log.Warnf("peer adds edge failed: %s", err.Error())
err = fmt.Errorf("peer adds edge failed: %w", err)
peer.Log.Warn(err)
continue
}
}
@ -259,7 +260,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
peer.Log.Infof("send Code_SchedNeedBackSource to peer, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
if err := peer.FSM.Event(ctx, standard.PeerEventDownloadBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
err = fmt.Errorf("peer fsm event failed: %w", err)
peer.Log.Error(err)
return
}
@ -267,7 +269,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
// peer back-to-source and reset task state to TaskStateRunning.
if peer.Task.FSM.Is(standard.TaskStateFailed) {
if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
err = fmt.Errorf("task fsm event failed: %w", err)
peer.Task.Log.Error(err)
return
}
}
@ -292,7 +295,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
peer.Log.Infof("send Code_SchedNeedBackSource to peer, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
if err := peer.FSM.Event(ctx, standard.PeerEventDownloadBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
err = fmt.Errorf("peer fsm event failed: %w", err)
peer.Log.Error(err)
return
}
@ -300,7 +304,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
// peer back-to-source and reset task state to TaskStateRunning.
if peer.Task.FSM.Is(standard.TaskStateFailed) {
if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
err = fmt.Errorf("task fsm event failed: %w", err)
peer.Task.Log.Error(err)
return
}
}
@ -334,7 +339,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
// Condition 1: Scheduling can find candidate parents.
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
n++
peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())
err := fmt.Errorf("scheduling failed in %d times, because of %w", n, err)
peer.Log.Error(err)
// Sleep to avoid hot looping.
time.Sleep(s.config.RetryInterval)
@ -359,7 +365,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
peer.Log.Errorf("scheduling failed in %d times, because of loading peer stream failed", n)
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
peer.Log.Errorf("peer deletes inedges failed: %s", err.Error())
err = fmt.Errorf("peer deletes inedges failed: %w", err)
peer.Log.Error(err)
return
}
@ -370,10 +377,12 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
peer.Log.Info("send PeerPacket to peer")
if err := stream.Send(constructSuccessPeerPacket(peer, candidateParents[0], candidateParents[1:])); err != nil {
n++
peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())
err = fmt.Errorf("send PeerPacket to peer failed in %d times, because of %w", n, err)
peer.Log.Error(err)
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
peer.Log.Errorf("peer deletes inedges failed: %s", err.Error())
err = fmt.Errorf("peer deletes inedges failed: %w", err)
peer.Log.Error(err)
return
}
@ -383,7 +392,8 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer
// Add edge from parent to peer.
for _, candidateParent := range candidateParents {
if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil {
peer.Log.Debugf("peer adds edge failed: %s", err.Error())
err = fmt.Errorf("peer adds edge failed: %w", err)
peer.Log.Debug(err)
continue
}
}
@ -595,7 +605,8 @@ func (s *scheduling) filterCandidateParents(peer *standard.Peer, blocklist set.S
func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) ([]*persistentcache.Peer, []*persistentcache.Host, bool) {
currentPersistentReplicaCount, err := s.persistentCacheResource.TaskManager().LoadCurrentPersistentReplicaCount(ctx, task.ID)
if err != nil {
task.Log.Errorf("load current persistent replica count failed %s", err)
err = fmt.Errorf("load current persistent replica count failed: %w", err)
task.Log.Error(err)
return nil, nil, false
}
@ -638,7 +649,8 @@ func (s *scheduling) FindReplicatePersistentCacheHosts(ctx context.Context, task
// Load all current persistent peers and add them to the blocklist to avoid scheduling the same host.
currentPersistentPeers, err := s.persistentCacheResource.PeerManager().LoadPersistentAllByTaskID(ctx, task.ID)
if err != nil {
task.Log.Errorf("load all persistent cache peers failed: %s", err.Error())
err = fmt.Errorf("load all persistent cache peers failed: %w", err)
task.Log.Error(err)
return nil, nil, false
}
@ -701,7 +713,8 @@ func (s *scheduling) FindCandidatePersistentCacheParents(ctx context.Context, pe
func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context, peer *persistentcache.Peer, blocklist set.SafeSet[string]) []*persistentcache.Peer {
parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, peer.Task.ID)
if err != nil {
peer.Log.Errorf("load all persistent cache parents failed: %s", err.Error())
err = fmt.Errorf("load all persistent cache parents failed: %w", err)
peer.Log.Error(err)
return nil
}
@ -740,7 +753,8 @@ func (s *scheduling) filterCandidatePersistentCacheParents(ctx context.Context,
func (s *scheduling) filterCachedReplicatePersistentCacheParents(ctx context.Context, task *persistentcache.Task, blocklist set.SafeSet[string]) []*persistentcache.Peer {
parents, err := s.persistentCacheResource.PeerManager().LoadAllByTaskID(ctx, task.ID)
if err != nil {
task.Log.Errorf("load all persistent cache parents failed: %s", err.Error())
err = fmt.Errorf("load all persistent cache parents failed: %w", err)
task.Log.Error(err)
return nil
}
@ -785,7 +799,8 @@ func (s *scheduling) filterCachedReplicatePersistentCacheParents(ctx context.Con
func (s *scheduling) filterReplicatePersistentCacheHosts(ctx context.Context, task *persistentcache.Task, count int, blocklist set.SafeSet[string]) []*persistentcache.Host {
hosts, err := s.persistentCacheResource.HostManager().LoadRandom(ctx, count, blocklist)
if err != nil {
task.Log.Errorf("load all persistent cache hosts failed: %s", err.Error())
err = fmt.Errorf("load all persistent cache hosts failed: %w", err)
task.Log.Error(err)
return nil
}
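
The pattern swapped in throughout this file, wrapping with %w and logging the error value instead of formatting a string with Sprintf, keeps the original error in the chain so callers can still match it; a tiny standalone example:

package main

import (
    "errors"
    "fmt"
)

var errNotFound = errors.New("peer not found")

func main() {
    // fmt.Errorf with %w preserves errNotFound in the error chain, so
    // errors.Is still works after wrapping; a Sprintf-built string would not.
    err := fmt.Errorf("peer deletes inedges failed: %w", errNotFound)
    fmt.Println(errors.Is(err, errNotFound)) // true
}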

@ -441,9 +441,9 @@ func (v *V2) DeletePeer(_ctx context.Context, req *schedulerv2.DeletePeerRequest
}
if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
err = fmt.Errorf("peer fsm event failed: %w", err)
peer.Log.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
}
return nil
@ -537,8 +537,8 @@ func (v *V2) DeleteTask(_ctx context.Context, req *schedulerv2.DeleteTaskRequest
if peer.Task.ID == req.GetTaskId() {
if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
err = fmt.Errorf("peer fsm event failed: %w", err)
peer.Log.Error(err)
return true
}
}