From 3da67311ea0a53dc034fbae30644f688334924e5 Mon Sep 17 00:00:00 2001
From: Gaius
Date: Thu, 10 Nov 2022 16:53:24 +0800
Subject: [PATCH] feat: optimize preheat (#1824)

Signed-off-by: Gaius
---
 client/daemon/peer/peertask_manager_mock.go | 10 +++---
 manager/job/preheat.go                      | 19 +++++------
 scheduler/job/job.go                        | 35 ++++++++++-----------
 3 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/client/daemon/peer/peertask_manager_mock.go b/client/daemon/peer/peertask_manager_mock.go
index 382a6bae1..29223d4a1 100644
--- a/client/daemon/peer/peertask_manager_mock.go
+++ b/client/daemon/peer/peertask_manager_mock.go
@@ -12,7 +12,7 @@ import (
 	v1 "d7y.io/api/pkg/apis/common/v1"
 	v10 "d7y.io/api/pkg/apis/scheduler/v1"
 	storage "d7y.io/dragonfly/v2/client/daemon/storage"
-	dflog "d7y.io/dragonfly/v2/internal/dflog"
+	logger "d7y.io/dragonfly/v2/internal/dflog"
 	gomock "github.com/golang/mock/gomock"
 	status "google.golang.org/grpc/status"
 )
@@ -322,10 +322,10 @@ func (mr *MockTaskMockRecorder) GetTraffic() *gomock.Call {
 }
 
 // Log mocks base method.
-func (m *MockTask) Log() *dflog.SugaredLoggerOnWith {
+func (m *MockTask) Log() *logger.SugaredLoggerOnWith {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Log")
-	ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith)
+	ret0, _ := ret[0].(*logger.SugaredLoggerOnWith)
 	return ret0
 }
 
@@ -431,10 +431,10 @@ func (m *MockLogger) EXPECT() *MockLoggerMockRecorder {
 }
 
 // Log mocks base method.
-func (m *MockLogger) Log() *dflog.SugaredLoggerOnWith {
+func (m *MockLogger) Log() *logger.SugaredLoggerOnWith {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "Log")
-	ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith)
+	ret0, _ := ret[0].(*logger.SugaredLoggerOnWith)
 	return ret0
 }
 
diff --git a/manager/job/preheat.go b/manager/job/preheat.go
index e11ddcbf3..4922954e8 100644
--- a/manager/job/preheat.go
+++ b/manager/job/preheat.go
@@ -100,7 +100,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
 	queues := getSchedulerQueues(schedulers)
 
 	// Generate download files
-	var files []*internaljob.PreheatRequest
+	var files []internaljob.PreheatRequest
 	switch PreheatType(json.Type) {
 	case PreheatImageType:
 		// Parse image manifest url
@@ -114,7 +114,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
 			return nil, err
 		}
 	case PreheatFileType:
-		files = []*internaljob.PreheatRequest{
+		files = []internaljob.PreheatRequest{
 			{
 				URL:    url,
 				Tag:    tag,
@@ -126,14 +126,11 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
 		return nil, errors.New("unknow preheat type")
 	}
 
-	for _, f := range files {
-		logger.Infof("preheat %s file url: %v queues: %v", json.URL, f.URL, queues)
-	}
-
+	logger.Infof("preheat %s queues: %v, files: %#v", json.URL, queues, files)
 	return p.createGroupJob(ctx, files, queues)
 }
 
-func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
+func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
 	signatures := []*machineryv1tasks.Signature{}
 	var urls []string
 	for i := range files {
@@ -173,7 +170,7 @@
 	}, nil
 }
 
-func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
+func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
 	ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
 	defer span.End()
 
@@ -234,7 +231,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head
 	return resp, nil
 }
 
-func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
+func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
 	body, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return nil, err
@@ -245,10 +242,10 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head
 		return nil, err
 	}
 
-	var layers []*internaljob.PreheatRequest
+	var layers []internaljob.PreheatRequest
 	for _, v := range manifest.References() {
 		digest := v.Digest.String()
-		layer := &internaljob.PreheatRequest{
+		layer := internaljob.PreheatRequest{
 			URL:    layerURL(image.protocol, image.domain, image.name, digest),
 			Tag:    tag,
 			Filter: filter,
diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index 30d23a18c..7c795cefc 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -145,55 +145,54 @@ func (j *job) preheat(ctx context.Context, req string) error {
 		return errors.New("scheduler has disabled seed peer")
 	}
 
-	request := &internaljob.PreheatRequest{}
-	if err := internaljob.UnmarshalRequest(req, request); err != nil {
+	preheat := &internaljob.PreheatRequest{}
+	if err := internaljob.UnmarshalRequest(req, preheat); err != nil {
 		logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
 		return err
 	}
 
-	if err := validator.New().Struct(request); err != nil {
-		logger.Errorf("url %s validate failed: %s", request.URL, err.Error())
+	if err := validator.New().Struct(preheat); err != nil {
+		logger.Errorf("preheat %s validate failed: %s", preheat.URL, err.Error())
 		return err
 	}
 
 	urlMeta := &commonv1.UrlMeta{
-		Header: request.Headers,
-		Tag:    request.Tag,
-		Filter: request.Filter,
-		Digest: request.Digest,
+		Header: preheat.Headers,
+		Tag:    preheat.Tag,
+		Filter: preheat.Filter,
+		Digest: preheat.Digest,
 	}
 
-	if request.Headers != nil {
-		if r, ok := request.Headers[headers.Range]; ok {
+	if preheat.Headers != nil {
+		if r, ok := preheat.Headers[headers.Range]; ok {
 			// Range in dragonfly is without "bytes=".
 			urlMeta.Range = strings.TrimLeft(r, "bytes=")
 		}
 	}
 
-	taskID := idgen.TaskID(request.URL, urlMeta)
-	// Trigger seed peer download seeds.
-	log := logger.WithTaskIDAndURL(taskID, request.URL)
+	taskID := idgen.TaskID(preheat.URL, urlMeta)
+	log := logger.WithTaskIDAndURL(taskID, preheat.URL)
 	log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s",
-		request.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
+		preheat.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
 
 	stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
 		TaskId:  taskID,
-		Url:     request.URL,
+		Url:     preheat.URL,
 		UrlMeta: urlMeta,
 	})
 	if err != nil {
-		log.Errorf("preheat failed: %s", err.Error())
+		log.Errorf("preheat %s failed: %s", preheat.URL, err.Error())
 		return err
 	}
 
 	for {
 		piece, err := stream.Recv()
 		if err != nil {
-			log.Errorf("preheat recive piece failed: %s", err.Error())
+			log.Errorf("preheat %s receive piece failed: %s", preheat.URL, err.Error())
 			return err
 		}
 
 		if piece.Done == true {
-			log.Info("preheat succeeded")
+			log.Infof("preheat %s succeeded", preheat.URL)
 			return nil
 		}
 	}
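
Note for reviewers (not part of the patch to apply): a minimal sketch of why switching the slice from []*internaljob.PreheatRequest to []internaljob.PreheatRequest pairs well with the new single %#v log line in CreatePreheat. Formatting a slice of pointers with %#v prints only element addresses, while a slice of values prints the full request fields. The PreheatRequest struct below is a simplified stand-in with assumed fields, not the real internaljob type.

package main

import "fmt"

// PreheatRequest is a simplified stand-in for internaljob.PreheatRequest,
// used only to illustrate the value-vs-pointer logging difference.
type PreheatRequest struct {
	URL    string
	Tag    string
	Filter string
}

func main() {
	byValue := []PreheatRequest{{URL: "https://example.com/layer/sha256:abc", Tag: "v1"}}
	byPointer := []*PreheatRequest{{URL: "https://example.com/layer/sha256:abc", Tag: "v1"}}

	// A value slice formatted with %#v shows every field, which is what the new
	// "preheat %s queues: %v, files: %#v" log line relies on.
	fmt.Printf("%#v\n", byValue)

	// A pointer slice formatted with %#v shows only element addresses, which is
	// why the old code logged each file's URL in a separate loop instead.
	fmt.Printf("%#v\n", byPointer)
}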