feat: optimize preheat (#1824)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2022-11-10 16:53:24 +08:00
parent b1c049db46
commit 3da67311ea
3 changed files with 30 additions and 34 deletions

View File

@@ -12,7 +12,7 @@ import (
v1 "d7y.io/api/pkg/apis/common/v1"
v10 "d7y.io/api/pkg/apis/scheduler/v1"
storage "d7y.io/dragonfly/v2/client/daemon/storage"
dflog "d7y.io/dragonfly/v2/internal/dflog"
logger "d7y.io/dragonfly/v2/internal/dflog"
gomock "github.com/golang/mock/gomock"
status "google.golang.org/grpc/status"
)
@@ -322,10 +322,10 @@ func (mr *MockTaskMockRecorder) GetTraffic() *gomock.Call {
}
// Log mocks base method.
- func (m *MockTask) Log() *dflog.SugaredLoggerOnWith {
+ func (m *MockTask) Log() *logger.SugaredLoggerOnWith {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Log")
- ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith)
+ ret0, _ := ret[0].(*logger.SugaredLoggerOnWith)
return ret0
}
@@ -431,10 +431,10 @@ func (m *MockLogger) EXPECT() *MockLoggerMockRecorder {
}
// Log mocks base method.
- func (m *MockLogger) Log() *dflog.SugaredLoggerOnWith {
+ func (m *MockLogger) Log() *logger.SugaredLoggerOnWith {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Log")
- ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith)
+ ret0, _ := ret[0].(*logger.SugaredLoggerOnWith)
return ret0
}
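The hunks above only touch generated mock code: the local import alias for d7y.io/dragonfly/v2/internal/dflog changes from dflog to logger, so the mocked Log() methods return *logger.SugaredLoggerOnWith instead of *dflog.SugaredLoggerOnWith. A minimal sketch of what such an alias rename amounts to, using a standard-library package so it builds on its own (the package and call below are illustrative, not part of this commit):

package main

// Only the local alias changes; the imported package and its API are untouched.
// Before the rename this line would have read: dflog "log".
import (
	logger "log"
)

func main() {
	// Call sites read logger.X instead of dflog.X; behavior is identical.
	logger.Println("import alias renamed, no functional change")
}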

View File

@@ -100,7 +100,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
queues := getSchedulerQueues(schedulers)
// Generate download files
- var files []*internaljob.PreheatRequest
+ var files []internaljob.PreheatRequest
switch PreheatType(json.Type) {
case PreheatImageType:
// Parse image manifest url
@@ -114,7 +114,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
return nil, err
}
case PreheatFileType:
- files = []*internaljob.PreheatRequest{
+ files = []internaljob.PreheatRequest{
{
URL: url,
Tag: tag,
@@ -126,14 +126,11 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
return nil, errors.New("unknow preheat type")
}
- for _, f := range files {
- logger.Infof("preheat %s file url: %v queues: %v", json.URL, f.URL, queues)
- }
-
+ logger.Infof("preheat %s queues: %v, files: %#v", json.URL, queues, files)
return p.createGroupJob(ctx, files, queues)
}
- func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
+ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
signatures := []*machineryv1tasks.Signature{}
var urls []string
for i := range files {
@@ -173,7 +170,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe
}, nil
}
- func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
+ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
@@ -234,7 +231,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head
return resp, nil
}
- func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
+ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
@@ -245,10 +242,10 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head
return nil, err
}
- var layers []*internaljob.PreheatRequest
+ var layers []internaljob.PreheatRequest
for _, v := range manifest.References() {
digest := v.Digest.String()
layer := &internaljob.PreheatRequest{
layer := internaljob.PreheatRequest{
URL: layerURL(image.protocol, image.domain, image.name, digest),
Tag: tag,
Filter: filter,

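In the manager's preheat job above, PreheatRequest values are now passed by value ([]internaljob.PreheatRequest instead of []*internaljob.PreheatRequest) through CreatePreheat, createGroupJob, getLayers and parseLayers, and the per-file logging loop is collapsed into a single Infof over the whole batch. A self-contained sketch of the same pattern, using a hypothetical local stand-in for internaljob.PreheatRequest (field set and URLs are illustrative only):

package main

import "fmt"

// preheatRequest is a local stand-in for internaljob.PreheatRequest, carrying
// only the fields that appear in the hunks above.
type preheatRequest struct {
	URL    string
	Tag    string
	Filter string
}

func main() {
	// Layers are collected by value, so the slice owns its elements outright
	// and no *preheatRequest pointers are shared between callers.
	var layers []preheatRequest
	for _, digest := range []string{"sha256:aaa", "sha256:bbb"} {
		layers = append(layers, preheatRequest{
			URL:    "https://registry.example.com/v2/library/nginx/blobs/" + digest,
			Tag:    "preheat",
			Filter: "",
		})
	}

	// One consolidated log line replaces the previous per-file loop.
	fmt.Printf("preheat queued %d layers: %#v\n", len(layers), layers)
}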
View File

@@ -145,55 +145,54 @@ func (j *job) preheat(ctx context.Context, req string) error {
return errors.New("scheduler has disabled seed peer")
}
request := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, request); err != nil {
preheat := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, preheat); err != nil {
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), req)
return err
}
- if err := validator.New().Struct(request); err != nil {
- logger.Errorf("url %s validate failed: %s", request.URL, err.Error())
+ if err := validator.New().Struct(preheat); err != nil {
+ logger.Errorf("preheat %s validate failed: %s", preheat.URL, err.Error())
return err
}
urlMeta := &commonv1.UrlMeta{
- Header: request.Headers,
- Tag: request.Tag,
- Filter: request.Filter,
- Digest: request.Digest,
+ Header: preheat.Headers,
+ Tag: preheat.Tag,
+ Filter: preheat.Filter,
+ Digest: preheat.Digest,
}
- if request.Headers != nil {
- if r, ok := request.Headers[headers.Range]; ok {
+ if preheat.Headers != nil {
+ if r, ok := preheat.Headers[headers.Range]; ok {
// Range in dragonfly is without "bytes=".
urlMeta.Range = strings.TrimLeft(r, "bytes=")
}
}
- taskID := idgen.TaskID(request.URL, urlMeta)
- // Trigger seed peer download seeds.
- log := logger.WithTaskIDAndURL(taskID, request.URL)
+ taskID := idgen.TaskID(preheat.URL, urlMeta)
+ log := logger.WithTaskIDAndURL(taskID, preheat.URL)
log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s",
- request.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
+ preheat.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
stream, err := j.resource.SeedPeer().Client().ObtainSeeds(ctx, &cdnsystemv1.SeedRequest{
TaskId: taskID,
- Url: request.URL,
+ Url: preheat.URL,
UrlMeta: urlMeta,
})
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
log.Errorf("preheat %s failed: %s", preheat.URL, err.Error())
return err
}
for {
piece, err := stream.Recv()
if err != nil {
log.Errorf("preheat recive piece failed: %s", err.Error())
log.Errorf("preheat %s recive piece failed: %s", preheat.URL, err.Error())
return err
}
if piece.Done == true {
log.Info("preheat succeeded")
log.Infof("preheat %s succeeded", preheat.URL)
return nil
}
}
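The scheduler-side change above renames the request variable to preheat and threads the preheated URL through every log line of the ObtainSeeds receive loop, so a failed or successful preheat is attributable to a concrete URL. A sketch of that receive loop with hypothetical local stand-ins for the cdnsystemv1 stream types (only the loop structure mirrors the code above; the types and fake stream are illustrative):

package main

import (
	"errors"
	"fmt"
)

// pieceSeed and seedStream are stand-ins for the cdnsystemv1 piece message and
// the client stream returned by ObtainSeeds.
type pieceSeed struct{ Done bool }

type seedStream interface {
	Recv() (*pieceSeed, error)
}

// fakeStream yields a fixed number of in-progress pieces, then one with Done set.
type fakeStream struct{ left int }

func (s *fakeStream) Recv() (*pieceSeed, error) {
	if s.left < 0 {
		return nil, errors.New("stream closed")
	}
	done := s.left == 0
	s.left--
	return &pieceSeed{Done: done}, nil
}

// waitForSeeds mirrors the loop at the end of (*job).preheat: receive pieces
// until one reports Done, logging the preheated URL on success or failure.
func waitForSeeds(url string, stream seedStream) error {
	for {
		piece, err := stream.Recv()
		if err != nil {
			fmt.Printf("preheat %s receive piece failed: %s\n", url, err)
			return err
		}
		if piece.Done {
			fmt.Printf("preheat %s succeeded\n", url)
			return nil
		}
	}
}

func main() {
	_ = waitForSeeds("https://example.com/blob", &fakeStream{left: 3})
}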