From f6694a87ec46ffc9392536e5aff08942d6e0cb6c Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 7 Jul 2021 17:21:13 +0800 Subject: [PATCH] Support reuse tiny peer task (#425) * feature: support reuse tiny peer task Signed-off-by: Jim Ma * feature: reuse context when store tiny peer task Signed-off-by: Jim Ma --- client/daemon/peer/peertask_manager.go | 62 +++++++++++++++++++ .../daemon/peer/peertask_stream_callback.go | 5 +- client/daemon/proxy/proxy.go | 2 + client/daemon/storage/metadata.go | 1 - 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 99c97bcf6..9b1b0bc35 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "d7y.io/dragonfly/v2/client/clientutil" "github.com/go-http-utils/headers" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -153,6 +154,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer } // tiny file content is returned by scheduler, just write to output if tiny != nil { + ptm.storeTinyPeerTask(ctx, tiny) defer tiny.span.End() log := logger.With("peer", tiny.PeerID, "task", tiny.TaskID, "component", "peerTaskManager") _, err = os.Stat(req.Output) @@ -210,6 +212,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu } // tiny file content is returned by scheduler, just write to output if tiny != nil { + ptm.storeTinyPeerTask(ctx, tiny) logger.Infof("copied tasks data %d bytes to buffer", len(tiny.Content)) tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true)) return ioutil.NopCloser(bytes.NewBuffer(tiny.Content)), map[string]string{ @@ -244,3 +247,62 @@ func (ptm *peerTaskManager) IsPeerTaskRunning(peerID string) bool { _, ok := ptm.runningPeerTasks.Load(peerID) return ok } + +func (ptm *peerTaskManager) storeTinyPeerTask(ctx context.Context, tiny *TinyData) { + // TODO store tiny data asynchronous + l := int64(len(tiny.Content)) + err := ptm.storageManager.RegisterTask(ctx, + storage.RegisterTaskRequest{ + CommonTaskRequest: storage.CommonTaskRequest{ + PeerID: tiny.PeerID, + TaskID: tiny.TaskID, + }, + ContentLength: l, + TotalPieces: 1, + }) + if err != nil { + logger.Errorf("register tiny data storage failed: %s", err) + return + } + n, err := ptm.storageManager.WritePiece(ctx, + &storage.WritePieceRequest{ + PeerTaskMetaData: storage.PeerTaskMetaData{ + PeerID: tiny.PeerID, + TaskID: tiny.TaskID, + }, + PieceMetaData: storage.PieceMetaData{ + Num: 0, + Md5: "", + Offset: 0, + Range: clientutil.Range{ + Start: 0, + Length: l, + }, + Style: 0, + }, + UnknownLength: false, + Reader: bytes.NewBuffer(tiny.Content), + }) + if err != nil { + logger.Errorf("write tiny data storage failed: %s", err) + return + } + if n != l { + logger.Errorf("write tiny data storage failed", n, l) + return + } + err = ptm.storageManager.Store(ctx, + &storage.StoreRequest{ + CommonTaskRequest: storage.CommonTaskRequest{ + PeerID: tiny.PeerID, + TaskID: tiny.TaskID, + }, + MetadataOnly: true, + TotalPieces: 1, + }) + if err != nil { + logger.Errorf("store tiny data failed: %s", err) + } else { + logger.Debugf("store tiny data, len: %d", l) + } +} diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index cd7ac75b0..07b00c0f4 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -44,9 +44,8 @@ func (p *streamPeerTaskCallback) Init(pt Task) error { err := p.ptm.storageManager.RegisterTask(p.ctx, storage.RegisterTaskRequest{ CommonTaskRequest: storage.CommonTaskRequest{ - PeerID: pt.GetPeerID(), - TaskID: pt.GetTaskID(), - Destination: "", + PeerID: pt.GetPeerID(), + TaskID: pt.GetTaskID(), }, ContentLength: pt.GetContentLength(), TotalPieces: pt.GetTotalPieces(), diff --git a/client/daemon/proxy/proxy.go b/client/daemon/proxy/proxy.go index 60721b19d..1af59bc61 100644 --- a/client/daemon/proxy/proxy.go +++ b/client/daemon/proxy/proxy.go @@ -210,6 +210,8 @@ func NewProxyWithOptions(options ...Option) (*Proxy, error) { // ServeHTTP implements http.Handler.ServeHTTP func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, span := proxy.tracer.Start(r.Context(), config.SpanProxy) + span.SetAttributes(config.AttributePeerHost.String(proxy.peerHost.Uuid)) + span.SetAttributes(semconv.NetHostIPKey.String(proxy.peerHost.Ip)) span.SetAttributes(semconv.HTTPSchemeKey.String(r.URL.Scheme)) span.SetAttributes(semconv.HTTPHostKey.String(r.Host)) span.SetAttributes(semconv.HTTPURLKey.String(r.URL.String())) diff --git a/client/daemon/storage/metadata.go b/client/daemon/storage/metadata.go index 7e3902e0b..64e6427fe 100644 --- a/client/daemon/storage/metadata.go +++ b/client/daemon/storage/metadata.go @@ -59,7 +59,6 @@ type RegisterTaskRequest struct { CommonTaskRequest ContentLength int64 TotalPieces int32 - GCCallback func(CommonTaskRequest) } type WritePieceRequest struct {