Support reuse tiny peer task (#425)

* feature: support reuse tiny peer task

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* feature: reuse context when store tiny peer task

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2021-07-07 17:21:13 +08:00 committed by Gaius
parent 77656dace4
commit f6694a87ec
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
4 changed files with 66 additions and 4 deletions

View File

@ -26,6 +26,7 @@ import (
"sync"
"time"
"d7y.io/dragonfly/v2/client/clientutil"
"github.com/go-http-utils/headers"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
@ -153,6 +154,7 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer
}
// tiny file content is returned by scheduler, just write to output
if tiny != nil {
ptm.storeTinyPeerTask(ctx, tiny)
defer tiny.span.End()
log := logger.With("peer", tiny.PeerID, "task", tiny.TaskID, "component", "peerTaskManager")
_, err = os.Stat(req.Output)
@ -210,6 +212,7 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
}
// tiny file content is returned by scheduler, just write to output
if tiny != nil {
ptm.storeTinyPeerTask(ctx, tiny)
logger.Infof("copied tasks data %d bytes to buffer", len(tiny.Content))
tiny.span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
return ioutil.NopCloser(bytes.NewBuffer(tiny.Content)), map[string]string{
@ -244,3 +247,62 @@ func (ptm *peerTaskManager) IsPeerTaskRunning(peerID string) bool {
_, ok := ptm.runningPeerTasks.Load(peerID)
return ok
}
func (ptm *peerTaskManager) storeTinyPeerTask(ctx context.Context, tiny *TinyData) {
// TODO store tiny data asynchronous
l := int64(len(tiny.Content))
err := ptm.storageManager.RegisterTask(ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: tiny.PeerID,
TaskID: tiny.TaskID,
},
ContentLength: l,
TotalPieces: 1,
})
if err != nil {
logger.Errorf("register tiny data storage failed: %s", err)
return
}
n, err := ptm.storageManager.WritePiece(ctx,
&storage.WritePieceRequest{
PeerTaskMetaData: storage.PeerTaskMetaData{
PeerID: tiny.PeerID,
TaskID: tiny.TaskID,
},
PieceMetaData: storage.PieceMetaData{
Num: 0,
Md5: "",
Offset: 0,
Range: clientutil.Range{
Start: 0,
Length: l,
},
Style: 0,
},
UnknownLength: false,
Reader: bytes.NewBuffer(tiny.Content),
})
if err != nil {
logger.Errorf("write tiny data storage failed: %s", err)
return
}
if n != l {
logger.Errorf("write tiny data storage failed", n, l)
return
}
err = ptm.storageManager.Store(ctx,
&storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: tiny.PeerID,
TaskID: tiny.TaskID,
},
MetadataOnly: true,
TotalPieces: 1,
})
if err != nil {
logger.Errorf("store tiny data failed: %s", err)
} else {
logger.Debugf("store tiny data, len: %d", l)
}
}

View File

@ -44,9 +44,8 @@ func (p *streamPeerTaskCallback) Init(pt Task) error {
err := p.ptm.storageManager.RegisterTask(p.ctx,
storage.RegisterTaskRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
Destination: "",
PeerID: pt.GetPeerID(),
TaskID: pt.GetTaskID(),
},
ContentLength: pt.GetContentLength(),
TotalPieces: pt.GetTotalPieces(),

View File

@ -210,6 +210,8 @@ func NewProxyWithOptions(options ...Option) (*Proxy, error) {
// ServeHTTP implements http.Handler.ServeHTTP
func (proxy *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx, span := proxy.tracer.Start(r.Context(), config.SpanProxy)
span.SetAttributes(config.AttributePeerHost.String(proxy.peerHost.Uuid))
span.SetAttributes(semconv.NetHostIPKey.String(proxy.peerHost.Ip))
span.SetAttributes(semconv.HTTPSchemeKey.String(r.URL.Scheme))
span.SetAttributes(semconv.HTTPHostKey.String(r.Host))
span.SetAttributes(semconv.HTTPURLKey.String(r.URL.String()))

View File

@ -59,7 +59,6 @@ type RegisterTaskRequest struct {
CommonTaskRequest
ContentLength int64
TotalPieces int32
GCCallback func(CommonTaskRequest)
}
type WritePieceRequest struct {