diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go
index 62b2b4aa3..da3116b7a 100644
--- a/client/daemon/peer/peertask_base.go
+++ b/client/daemon/peer/peertask_base.go
@@ -644,7 +644,7 @@ retry:
 		}
 		if code == dfcodes.CdnTaskNotFound && curPeerPacket == pt.peerPacket {
-			span.AddEvent("RetryForCdnTaskNotFound")
+			span.AddEvent("retry for CdnTaskNotFound")
 			goto retry
 		}
 		return nil, err
diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 37599ef2b..2dc92dfca 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -129,8 +129,9 @@ func newFilePeerTask(ctx context.Context,
 		logger.Infof("%s/%s size scope: tiny", result.TaskId, request.PeerId)
 		if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok {
 			return ctx, nil, &TinyData{
+				span:    span,
 				TaskId:  result.TaskId,
-				PeerID:  request.PeerId,
+				PeerId:  request.PeerId,
 				Content: piece.PieceContent,
 			}, nil
 		}
diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go
index e06f757f5..83361bbf9 100644
--- a/client/daemon/peer/peertask_manager.go
+++ b/client/daemon/peer/peertask_manager.go
@@ -82,8 +82,10 @@ type PeerTaskCallback interface {
 }
 
 type TinyData struct {
+	// span is used by the peer task manager to record events when no peer task is created
+	span    trace.Span
 	TaskId  string
-	PeerID  string
+	PeerId  string
 	Content []byte
 }
@@ -127,7 +129,6 @@ func NewPeerTaskManager(
 
 func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeerTaskRequest) (chan *FilePeerTaskProgress, *TinyData, error) {
 	// TODO ensure scheduler is ok first
-	start := time.Now()
 	ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
 		&req.PeerTaskRequest, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
@@ -136,27 +137,31 @@
 	}
 	// tiny file content is returned by scheduler, just write to output
 	if tiny != nil {
-		// TODO enable trace for tiny peer task
-		//defer pt.Span().End()
+		defer tiny.span.End()
+		log := logger.With("peer", tiny.PeerId, "task", tiny.TaskId, "component", "peerTaskManager")
 		_, err = os.Stat(req.Output)
 		if err == nil {
 			// remove exist file
-			logger.Infof("destination file %q exists, purge it first", req.Output)
+			log.Infof("destination file %q exists, purge it first", req.Output)
+			tiny.span.AddEvent("purge existing output")
 			os.Remove(req.Output)
 		}
 		dstFile, err := os.OpenFile(req.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
 		if err != nil {
-			//pt.Span().RecordError(err)
-			logger.Errorf("open tasks destination file error: %s", err)
+			tiny.span.RecordError(err)
+			tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
+			log.Errorf("open task destination file error: %s", err)
 			return nil, nil, err
 		}
 		defer dstFile.Close()
 		n, err := dstFile.Write(tiny.Content)
 		if err != nil {
-			//pt.Span().RecordError(err)
+			tiny.span.RecordError(err)
+			tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
 			return nil, nil, err
 		}
-		logger.Debugf("copied tasks data %d bytes to %s", n, req.Output)
+		log.Debugf("copied tasks data %d bytes to %s", n, req.Output)
+		tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
 		return nil, tiny, nil
 	}
 	pt.SetCallback(&filePeerTaskCallback{
@@ -183,6 +188,8 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
 	}
 	// tiny file content is returned by scheduler, just write to output
 	if tiny != nil {
+		logger.Infof("copied tasks data %d bytes to buffer", len(tiny.Content))
+		tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
 		return bytes.NewBuffer(tiny.Content), map[string]string{
 			headers.ContentLength: fmt.Sprintf("%d", len(tiny.Content)),
 		}, nil
diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go
index 7aba7a52d..07dff6d81 100644
--- a/client/daemon/peer/peertask_stream.go
+++ b/client/daemon/peer/peertask_stream.go
@@ -110,8 +110,9 @@ func newStreamPeerTask(ctx context.Context,
 		logger.Debugf("%s/%s size scope: tiny", result.TaskId, request.PeerId)
 		if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok {
 			return ctx, nil, &TinyData{
+				span:    span,
 				TaskId:  result.TaskId,
-				PeerID:  request.PeerId,
+				PeerId:  request.PeerId,
 				Content: piece.PieceContent,
 			}, nil
 		}
diff --git a/client/daemon/service/manager.go b/client/daemon/service/manager.go
index 6e06b8e71..c4d7af574 100644
--- a/client/daemon/service/manager.go
+++ b/client/daemon/service/manager.go
@@ -147,6 +147,7 @@ func (m *manager) Download(ctx context.Context,
 		},
 		Output: req.Output,
 	}
+	log := logger.With("peer", peerTask.PeerId, "component", "downloadService")
 	peerTaskProgress, tiny, err := m.peerTaskManager.StartFilePeerTask(ctx, peerTask)
 	if err != nil {
@@ -155,14 +156,14 @@
 	if tiny != nil {
 		results <- &dfdaemongrpc.DownResult{
 			TaskId:          tiny.TaskId,
-			PeerId:          tiny.PeerID,
+			PeerId:          tiny.PeerId,
 			CompletedLength: uint64(len(tiny.Content)),
 			Done:            true,
 		}
-		logger.Infof("tiny file, wrote to output")
+		log.Infof("tiny file, wrote to output")
 		if req.Uid != 0 && req.Gid != 0 {
 			if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil {
-				logger.Errorf("change own failed: %s", err)
+				log.Errorf("change own failed: %s", err)
 				return err
 			}
 		}
@@ -174,11 +175,11 @@
 		case p, ok := <-peerTaskProgress:
 			if !ok {
 				err = errors.New("progress closed unexpected")
-				logger.Errorf(err.Error())
+				log.Errorf(err.Error())
 				return dferrors.New(dfcodes.UnknownError, err.Error())
 			}
 			if !p.State.Success {
-				logger.Errorf("task %s failed: %d/%s", p.TaskId, p.State.Code, p.State.Msg)
+				log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskId, p.State.Code, p.State.Msg)
 				return dferrors.New(p.State.Code, p.State.Msg)
 			}
 			results <- &dfdaemongrpc.DownResult{
@@ -190,11 +191,11 @@
 			// peer task sets PeerTaskDone to true only once
 			if p.PeerTaskDone {
 				p.DoneCallback()
-				logger.Infof("task %s done", p.TaskId)
+				log.Infof("task %s/%s done", p.PeerID, p.TaskId)
 				if req.Uid != 0 && req.Gid != 0 {
-					logger.Infof("change own to uid %d gid %d", req.Uid, req.Gid)
+					log.Infof("change own to uid %d gid %d", req.Uid, req.Gid)
 					if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil {
-						logger.Errorf("change own failed: %s", err)
+						log.Errorf("change own failed: %s", err)
 						return err
 					}
 				}
@@ -205,7 +206,7 @@
 				CompletedLength: 0,
 				Done:            true,
 			}
-			logger.Infof("context done due to %s", ctx.Err())
+			log.Infof("context done due to %s", ctx.Err())
 			return status.Error(codes.Canceled, ctx.Err().Error())
 		}
 	}
diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go
index 2bc908d75..cdc3bf375 100644
--- a/client/daemon/transport/transport.go
+++ b/client/daemon/transport/transport.go
@@ -131,7 +131,7 @@ func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) {
 	return rt.baseRoundTripper.RoundTrip(req)
 }
-// needUseGetter is the default value for shouldUseDragonfly, which downloads all
+// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
 // images layers with dragonfly.
 func NeedUseDragonfly(req *http.Request) bool {
 	return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
 }
@@ -140,7 +140,9 @@
 // download uses dragonfly to download.
 func (rt *transport) download(req *http.Request) (*http.Response, error) {
 	url := req.URL.String()
-	logger.Infof("start download with url: %s", url)
+	peerId := clientutil.GenPeerID(rt.peerHost)
+	log := logger.With("peer", peerId, "component", "transport")
+	log.Infof("start download with url: %s", url)
 	meta := &base.UrlMeta{Header: map[string]string{}}
 
 	if rg := req.Header.Get("Range"); len(rg) > 0 {
@@ -186,21 +188,21 @@
 			Filter:      filter,
 			BizId:       biz,
 			UrlMata:     meta,
-			PeerId:      clientutil.GenPeerID(rt.peerHost),
+			PeerId:      peerId,
 			PeerHost:    rt.peerHost,
 			HostLoad:    nil,
 			IsMigrating: false,
 		},
 	)
 	if err != nil {
-		logger.Errorf("download fail: %v", err)
+		log.Errorf("download fail: %v", err)
 		return nil, err
 	}
 	var hdr = http.Header{}
 	for k, v := range attr {
 		hdr.Set(k, v)
 	}
-	logger.Infof("download stream attribute: %v", hdr)
+	log.Infof("download stream attribute: %v", hdr)
 	resp := &http.Response{
 		StatusCode: 200,
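The hunks above thread an OpenTelemetry span through TinyData so the tiny-file fast path, which never creates a peer task object, can still end the span and record success, failure and events. The Go sketch below illustrates that pattern under the same assumptions as the diff; the helper name writeTinyOutput and the config import path are illustrative, not part of the patch.

// Illustrative sketch, not part of the patch: how the span stored in TinyData
// is ended and annotated on the tiny-file path. Field and attribute names come
// from the diff above; writeTinyOutput and the import path are hypothetical.
package peer

import (
	"os"

	"d7y.io/dragonfly/v2/client/config" // assumed location of AttributePeerTaskSuccess
)

func writeTinyOutput(tiny *TinyData, output string) error {
	// span was started by newFilePeerTask / newStreamPeerTask before TinyData was returned.
	defer tiny.span.End()

	if err := os.WriteFile(output, tiny.Content, 0644); err != nil {
		tiny.span.RecordError(err)
		tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
		return err
	}
	tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
	return nil
}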