daemon: add additional peer id for some logs (#205)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Authored by Jim Ma on 2021-05-08 17:11:57 +08:00, committed by Gaius
parent 89f8a9b3fb
commit 48bb920806
6 changed files with 38 additions and 26 deletions

View File

@@ -644,7 +644,7 @@ retry:
}
if code == dfcodes.CdnTaskNotFound && curPeerPacket == pt.peerPacket {
span.AddEvent("RetryForCdnTaskNotFound")
span.AddEvent("retry for CdnTaskNotFound")
goto retry
}
return nil, err
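
The hunk above only rewords a tracing event. For context, recording an event on an OpenTelemetry span before retrying an operation looks roughly like the sketch below; the tracer name, the retry condition and fetchFromCDN are illustrative stand-ins, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"

	"go.opentelemetry.io/otel"
)

// fetchFromCDN is a hypothetical stand-in for the real scheduler/CDN call.
func fetchFromCDN(attempt int) error {
	if attempt == 0 {
		return errors.New("cdn task not found")
	}
	return nil
}

func main() {
	// otel.Tracer uses the global provider, which is a no-op until one is
	// installed, so this example runs without any collector configured.
	_, span := otel.Tracer("example").Start(context.Background(), "getPieceTasks")
	defer span.End()

	for attempt := 0; ; attempt++ {
		if err := fetchFromCDN(attempt); err == nil {
			break
		}
		// Record a human-readable event on the span before retrying,
		// mirroring the span.AddEvent call in the hunk above.
		span.AddEvent("retry for CdnTaskNotFound")
	}
	fmt.Println("done")
}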

View File

@@ -129,8 +129,9 @@ func newFilePeerTask(ctx context.Context,
logger.Infof("%s/%s size scope: tiny", result.TaskId, request.PeerId)
if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok {
return ctx, nil, &TinyData{
span: span,
TaskId: result.TaskId,
PeerID: request.PeerId,
PeerId: request.PeerId,
Content: piece.PieceContent,
}, nil
}

View File

@@ -82,8 +82,10 @@ type PeerTaskCallback interface {
}
type TinyData struct {
// span is used by peer task manager to record events without peer task
span trace.Span
TaskId string
PeerID string
PeerId string
Content []byte
}
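
The struct change above hands the span over to whoever consumes the TinyData, so the producer no longer ends it. A minimal sketch of that ownership transfer, using the OpenTelemetry API directly with simplified stand-in names, is:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// tinyResult mirrors the shape of TinyData: the producer starts the span and
// stores it in the result; the consumer records the outcome and ends it.
type tinyResult struct {
	span    trace.Span
	taskID  string
	peerID  string
	content []byte
}

// register is a hypothetical producer that starts the span and returns it
// inside the result instead of ending it itself.
func register(ctx context.Context) *tinyResult {
	_, span := otel.Tracer("example").Start(ctx, "registerTinyTask")
	return &tinyResult{
		span:    span,
		taskID:  "task-1",
		peerID:  "peer-1",
		content: []byte("tiny file content"),
	}
}

func main() {
	tiny := register(context.Background())
	// The consumer owns the span from here on.
	defer tiny.span.End()

	fmt.Printf("%s/%s: %d bytes\n", tiny.taskID, tiny.peerID, len(tiny.content))
	// "peer.task.success" is an illustrative attribute key, not the one
	// defined in this repository's config package.
	tiny.span.SetAttributes(attribute.Bool("peer.task.success", true))
}
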
@@ -127,7 +129,6 @@ func NewPeerTaskManager(
func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeerTaskRequest) (chan *FilePeerTaskProgress, *TinyData, error) {
// TODO ensure scheduler is ok first
start := time.Now()
ctx, pt, tiny, err := newFilePeerTask(ctx, ptm.host, ptm.pieceManager,
&req.PeerTaskRequest, ptm.schedulerClient, ptm.schedulerOption, ptm.perPeerRateLimit)
@@ -136,27 +137,31 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
}
// tiny file content is returned by scheduler, just write to output
if tiny != nil {
// TODO enable trace for tiny peer task
//defer pt.Span().End()
defer tiny.span.End()
log := logger.With("peer", tiny.PeerId, "task", tiny.TaskId, "component", "peerTaskManager")
_, err = os.Stat(req.Output)
if err == nil {
// remove exist file
logger.Infof("destination file %q exists, purge it first", req.Output)
log.Infof("destination file %q exists, purge it first", req.Output)
tiny.span.AddEvent("purge exist output")
os.Remove(req.Output)
}
dstFile, err := os.OpenFile(req.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
//pt.Span().RecordError(err)
logger.Errorf("open tasks destination file error: %s", err)
tiny.span.RecordError(err)
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
log.Errorf("open task destination file error: %s", err)
return nil, nil, err
}
defer dstFile.Close()
n, err := dstFile.Write(tiny.Content)
if err != nil {
//pt.Span().RecordError(err)
tiny.span.RecordError(err)
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
return nil, nil, err
}
logger.Debugf("copied tasks data %d bytes to %s", n, req.Output)
log.Debugf("copied tasks data %d bytes to %s", n, req.Output)
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return nil, tiny, nil
}
pt.SetCallback(&filePeerTaskCallback{
@@ -183,6 +188,8 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
}
// tiny file content is returned by scheduler, just write to output
if tiny != nil {
logger.Infof("copied tasks data %d bytes to buffer", len(tiny.Content))
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return bytes.NewBuffer(tiny.Content), map[string]string{
headers.ContentLength: fmt.Sprintf("%d", len(tiny.Content)),
}, nil
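
The recurring pattern in this file is logger.With("peer", ..., "task", ..., "component", ...), which derives a sub-logger that stamps both IDs on every message. A small sketch of the same idea with zap's SugaredLogger (which the dfdaemon logger package resembles; the values here are made up):

package main

import "go.uber.org/zap"

func main() {
	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer base.Sync()

	// Derive a sub-logger that attaches peer/task/component fields to every
	// message, so individual log calls no longer need to repeat them.
	log := base.Sugar().With("peer", "peer-1", "task", "task-1", "component", "peerTaskManager")

	log.Infof("destination file %q exists, purge it first", "/tmp/out")
	log.Debugf("copied tasks data %d bytes to %s", 17, "/tmp/out")
}

With the fields attached once, every subsequent Infof/Errorf during the request's lifetime carries the peer and task IDs automatically, which is what the commit title is after.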

View File

@@ -110,8 +110,9 @@ func newStreamPeerTask(ctx context.Context,
logger.Debugf("%s/%s size scope: tiny", result.TaskId, request.PeerId)
if piece, ok := result.DirectPiece.(*scheduler.RegisterResult_PieceContent); ok {
return ctx, nil, &TinyData{
span: span,
TaskId: result.TaskId,
PeerID: request.PeerId,
PeerId: request.PeerId,
Content: piece.PieceContent,
}, nil
}

View File

@@ -147,6 +147,7 @@ func (m *manager) Download(ctx context.Context,
},
Output: req.Output,
}
log := logger.With("peer", peerTask.PeerId, "component", "downloadService")
peerTaskProgress, tiny, err := m.peerTaskManager.StartFilePeerTask(ctx, peerTask)
if err != nil {
@@ -155,14 +156,14 @@ func (m *manager) Download(ctx context.Context,
if tiny != nil {
results <- &dfdaemongrpc.DownResult{
TaskId: tiny.TaskId,
PeerId: tiny.PeerID,
PeerId: tiny.PeerId,
CompletedLength: uint64(len(tiny.Content)),
Done: true,
}
logger.Infof("tiny file, wrote to output")
log.Infof("tiny file, wrote to output")
if req.Uid != 0 && req.Gid != 0 {
if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil {
logger.Errorf("change own failed: %s", err)
log.Errorf("change own failed: %s", err)
return err
}
}
@@ -174,11 +175,11 @@ func (m *manager) Download(ctx context.Context,
case p, ok := <-peerTaskProgress:
if !ok {
err = errors.New("progress closed unexpected")
logger.Errorf(err.Error())
log.Errorf(err.Error())
return dferrors.New(dfcodes.UnknownError, err.Error())
}
if !p.State.Success {
logger.Errorf("task %s failed: %d/%s", p.TaskId, p.State.Code, p.State.Msg)
log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskId, p.State.Code, p.State.Msg)
return dferrors.New(p.State.Code, p.State.Msg)
}
results <- &dfdaemongrpc.DownResult{
@@ -190,11 +191,11 @@ func (m *manager) Download(ctx context.Context,
// peer task sets PeerTaskDone to true only once
if p.PeerTaskDone {
p.DoneCallback()
logger.Infof("task %s done", p.TaskId)
log.Infof("task %s/%s done", p.PeerID, p.TaskId)
if req.Uid != 0 && req.Gid != 0 {
logger.Infof("change own to uid %d gid %d", req.Uid, req.Gid)
log.Infof("change own to uid %d gid %d", req.Uid, req.Gid)
if err = os.Chown(req.Output, int(req.Uid), int(req.Gid)); err != nil {
logger.Errorf("change own failed: %s", err)
log.Errorf("change own failed: %s", err)
return err
}
}
@@ -205,7 +206,7 @@ func (m *manager) Download(ctx context.Context,
CompletedLength: 0,
Done: true,
}
logger.Infof("context done due to %s", ctx.Err())
log.Infof("context done due to %s", ctx.Err())
return status.Error(codes.Canceled, ctx.Err().Error())
}
}
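
The uid/gid branches in this hunk reduce to: if the caller supplied a non-zero uid and gid, chown the written output to them and log on failure. A stand-alone sketch of that check (paths and IDs are made up):

package main

import (
	"fmt"
	"os"
)

// chownOutput changes ownership of the downloaded file when the request
// carried a non-zero uid/gid, mirroring the checks in the Download handler.
func chownOutput(output string, uid, gid int64) error {
	if uid == 0 || gid == 0 {
		return nil // nothing requested, keep the daemon's ownership
	}
	if err := os.Chown(output, int(uid), int(gid)); err != nil {
		return fmt.Errorf("change own failed: %w", err)
	}
	return nil
}

func main() {
	f, err := os.CreateTemp("", "download-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.Close()

	// Chown to the current user still exercises the code path without
	// needing root privileges.
	if err := chownOutput(f.Name(), int64(os.Getuid()), int64(os.Getgid())); err != nil {
		fmt.Println(err)
	}
}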

View File

@@ -131,7 +131,7 @@ func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) {
return rt.baseRoundTripper.RoundTrip(req)
}
// needUseGetter is the default value for shouldUseDragonfly, which downloads all
// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
// images layers with dragonfly.
func NeedUseDragonfly(req *http.Request) bool {
return req.Method == http.MethodGet && layerReg.MatchString(req.URL.Path)
@@ -140,7 +140,9 @@ func NeedUseDragonfly(req *http.Request) bool {
// download uses dragonfly to download.
func (rt *transport) download(req *http.Request) (*http.Response, error) {
url := req.URL.String()
logger.Infof("start download with url: %s", url)
peerId := clientutil.GenPeerID(rt.peerHost)
log := logger.With("peer", peerId, "component", "transport")
log.Infof("start download with url: %s", url)
meta := &base.UrlMeta{Header: map[string]string{}}
if rg := req.Header.Get("Range"); len(rg) > 0 {
@@ -186,21 +188,21 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
Filter: filter,
BizId: biz,
UrlMata: meta,
PeerId: clientutil.GenPeerID(rt.peerHost),
PeerId: peerId,
PeerHost: rt.peerHost,
HostLoad: nil,
IsMigrating: false,
},
)
if err != nil {
logger.Errorf("download fail: %v", err)
log.Errorf("download fail: %v", err)
return nil, err
}
var hdr = http.Header{}
for k, v := range attr {
hdr.Set(k, v)
}
logger.Infof("download stream attribute: %v", hdr)
log.Infof("download stream attribute: %v", hdr)
resp := &http.Response{
StatusCode: 200,
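
The point of the transport change is that the peer ID is generated once at the top of download() and then reused both for the scoped logger and for the scheduler request, so logs and RPCs agree on the ID. A toy sketch of that pattern follows; genPeerID and downloadRequest are hypothetical stand-ins for clientutil.GenPeerID and the real request type.

package main

import (
	"fmt"
	"os"
	"time"

	"go.uber.org/zap"
)

// genPeerID is a hypothetical stand-in for clientutil.GenPeerID: it simply
// combines the host name with a timestamp.
func genPeerID(host string) string {
	return fmt.Sprintf("%s-%d", host, time.Now().UnixNano())
}

// downloadRequest is a simplified placeholder for the scheduler request.
type downloadRequest struct {
	URL    string
	PeerID string
}

func main() {
	host, _ := os.Hostname()

	// Generate the peer ID once, then thread the same value through the log
	// fields and the outgoing request so both report the same peer.
	peerID := genPeerID(host)

	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer base.Sync()
	log := base.Sugar().With("peer", peerID, "component", "transport")

	req := downloadRequest{URL: "https://example.com/v2/library/nginx/blobs/sha256:abc", PeerID: peerID}
	log.Infof("start download with url: %s", req.URL)
}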