chore: optimize reuse logic (#1110)

* chore: optimize reuse logic

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Jim Ma 2022-03-02 16:07:01 +08:00 committed by Gaius
parent a5f385cd99
commit 977f095c49
2 changed files with 558 additions and 30 deletions


@@ -37,31 +37,45 @@ import (
var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
// reuse task search logic:
// A. prefetch feature enabled
// for ranged request, 1, find completed subtask, 2, find partial completed parent task
// for non-ranged request, just find completed task
// B. prefetch feature disabled
// for ranged request, 1, find completed normal task, 2, find partial completed parent task
// for non-ranged request, just find completed task
func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
request *FileTaskRequest) (chan *FileTaskProgress, bool) {
taskID := idgen.TaskID(request.Url, request.UrlMeta)
- var reuse *storage.ReusePeerTask
+ var (
+ reuse *storage.ReusePeerTask
+ reuseRange *clientutil.Range // the range of parent peer task data to read
+ log *logger.SugaredLoggerOnWith
+ length int64
+ err error
+ )
if ptm.enabledPrefetch(request.Range) {
reuse = ptm.storageManager.FindCompletedSubTask(taskID)
} else {
reuse = ptm.storageManager.FindCompletedTask(taskID)
}
- var (
- rg *clientutil.Range // the range of parent peer task data to read
- log *logger.SugaredLoggerOnWith
- length int64
- err error
- )
if reuse == nil {
- rg = request.Range
if request.Range == nil {
return nil, false
}
// for ranged request, check the parent task
+ reuseRange = request.Range
taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)
- reuse = ptm.storageManager.FindPartialCompletedTask(taskID, rg)
+ reuse = ptm.storageManager.FindPartialCompletedTask(taskID, reuseRange)
if reuse == nil {
return nil, false
}
}
- if rg == nil {
+ if reuseRange == nil {
log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask")
log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
length = reuse.ContentLength
@@ -70,7 +84,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
"component", "reuseRangeFilePeerTask")
log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
- length = rg.Length
+ length = reuseRange.Length
}
_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
@@ -80,7 +94,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
span.SetAttributes(config.AttributePeerID.String(request.PeerId))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
- if rg != nil {
+ if reuseRange != nil {
span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
}
defer span.End()
@@ -89,7 +103,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))
start := time.Now()
- if rg == nil || request.KeepOriginalOffset {
+ if reuseRange == nil || request.KeepOriginalOffset {
storeRequest := &storage.StoreRequest{
CommonTaskRequest: storage.CommonTaskRequest{
PeerID: reuse.PeerID,
@@ -103,7 +117,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
}
err = ptm.storageManager.Store(ctx, storeRequest)
} else {
- err = ptm.storePartialFile(ctx, request, log, reuse, rg)
+ err = ptm.storePartialFile(ctx, request, log, reuse, reuseRange)
}
if err != nil {
@@ -168,36 +182,41 @@ func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileT
func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {
taskID := idgen.TaskID(request.URL, request.URLMeta)
- var reuse *storage.ReusePeerTask
+ var (
+ reuse *storage.ReusePeerTask
+ reuseRange *clientutil.Range // the range of parent peer task data to read
+ log *logger.SugaredLoggerOnWith
+ length int64
+ )
if ptm.enabledPrefetch(request.Range) {
reuse = ptm.storageManager.FindCompletedSubTask(taskID)
} else {
reuse = ptm.storageManager.FindCompletedTask(taskID)
}
- var (
- rg *clientutil.Range // the range of parent peer task data to read
- log *logger.SugaredLoggerOnWith
- )
if reuse == nil {
- // for ranged request, check the parent task
if request.Range == nil {
return nil, nil, false
}
- rg = request.Range
+ // for ranged request, check the parent task
+ reuseRange = request.Range
taskID = idgen.ParentTaskID(request.URL, request.URLMeta)
- reuse = ptm.storageManager.FindPartialCompletedTask(taskID, rg)
+ reuse = ptm.storageManager.FindPartialCompletedTask(taskID, reuseRange)
if reuse == nil {
return nil, nil, false
}
}
- if rg == nil {
+ if reuseRange == nil {
log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseStreamPeerTask")
log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
+ length = reuse.ContentLength
} else {
log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseRangeStreamPeerTask")
log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)
+ length = reuseRange.Length
}
ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
@@ -207,13 +226,13 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
span.SetAttributes(config.AttributePeerID.String(request.PeerID))
span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
span.SetAttributes(semconv.HTTPURLKey.String(request.URL))
- if rg != nil {
+ if reuseRange != nil {
span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))
}
defer span.End()
rc, err := ptm.storageManager.ReadAllPieces(ctx,
- &storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
+ &storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: reuseRange})
if err != nil {
log.Errorf("read pieces error when reuse peer task: %s", err)
span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
@@ -224,12 +243,20 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context,
attr := map[string]string{}
attr[config.HeaderDragonflyTask] = taskID
attr[config.HeaderDragonflyPeer] = request.PeerID
- if rg != nil {
+ attr[headers.ContentLength] = fmt.Sprintf("%d", length)
+ if reuseRange != nil {
attr[config.HeaderDragonflyRange] = request.URLMeta.Range
- attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+rg.Length-1, reuse.ContentLength)
- attr[headers.ContentLength] = fmt.Sprintf("%d", rg.Length)
- } else {
- attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength)
+ attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", reuseRange.Start,
+ reuseRange.Start+reuseRange.Length-1, reuse.ContentLength)
+ } else if request.Range != nil {
+ // the length is from reuse task, ensure it equal with request
+ if length != request.Range.Length {
+ log.Errorf("target task length %d did not match range length %d", length, request.Range.Length)
+ return nil, nil, false
+ }
+ attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/*", request.Range.Start,
+ request.Range.Start+request.Range.Length-1)
}
// TODO record time when file closed, need add a type to implement Close and WriteTo
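
The net effect of the stream-path change above is easier to see outside the diff. Below is a condensed, self-contained sketch of the response-header logic: buildReuseHeaders, the Range stand-in type, and the literal header names are illustrative only (the real code uses clientutil.Range and the go-http-utils/headers constants), but the three cases mirror the patch. A plain completed task sets only Content-Length; a partially completed parent task knows the total size, so Content-Range is "bytes start-end/total"; a completed subtask serves a ranged request without knowing the total, so Content-Range is "bytes start-end/*" and the reused length must match the requested length.

package main

import "fmt"

// Range is an illustrative stand-in for clientutil.Range.
type Range struct {
	Start  int64
	Length int64
}

// buildReuseHeaders condenses the header construction added to tryReuseStreamPeerTask.
// The function name and the map-based return are assumptions for this sketch.
func buildReuseHeaders(reuseRange, requestRange *Range, reuseContentLength int64) (map[string]string, error) {
	length := reuseContentLength
	if reuseRange != nil {
		length = reuseRange.Length
	}
	attr := map[string]string{
		"Content-Length": fmt.Sprintf("%d", length),
	}
	switch {
	case reuseRange != nil:
		// partial data read from a parent task: the total size is known
		attr["Content-Range"] = fmt.Sprintf("bytes %d-%d/%d",
			reuseRange.Start, reuseRange.Start+reuseRange.Length-1, reuseContentLength)
	case requestRange != nil:
		// a completed subtask served a ranged request: the total size is unknown,
		// and the reused length must equal the requested length
		if length != requestRange.Length {
			return nil, fmt.Errorf("target task length %d did not match range length %d",
				length, requestRange.Length)
		}
		attr["Content-Range"] = fmt.Sprintf("bytes %d-%d/*",
			requestRange.Start, requestRange.Start+requestRange.Length-1)
	}
	return attr, nil
}

func main() {
	// subtask reuse of range 0-9: matches the "bytes 0-9/*" expectation in the tests below
	attr, _ := buildReuseHeaders(nil, &Range{Start: 0, Length: 10}, 10)
	fmt.Println(attr["Content-Length"], attr["Content-Range"]) // 10 bytes 0-9/*

	// parent-task reuse of range 0-9 from a 100-byte task: "bytes 0-9/100"
	attr, _ = buildReuseHeaders(&Range{Start: 0, Length: 10}, &Range{Start: 0, Length: 10}, 100)
	fmt.Println(attr["Content-Length"], attr["Content-Range"]) // 10 bytes 0-9/100
}

Keeping the length check in the subtask branch guards against reusing a stored subtask whose length no longer matches the requested range, which is exactly the error path the patch logs and falls through from.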


@@ -0,0 +1,501 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package peer
import (
"bytes"
"context"
"io"
"os"
"path"
"testing"
"github.com/go-http-utils/headers"
"github.com/golang/mock/gomock"
testifyassert "github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/daemon/test"
ms "d7y.io/dragonfly/v2/client/daemon/test/mock/storage"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
)
func TestReuseFilePeerTask(t *testing.T) {
ctrl := gomock.NewController(t)
assert := testifyassert.New(t)
testBytes, err := os.ReadFile(test.File)
assert.Nil(err)
testOutput := path.Join(os.TempDir(), "d7y-reuse-output.data")
defer os.Remove(testOutput)
var testCases = []struct {
name string
request *FileTaskRequest
enablePrefetch bool
storageManager func(sm *ms.MockManager)
verify func(pg chan *FileTaskProgress, ok bool)
}{
{
name: "normal completed task found",
request: &FileTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
PeerId: "",
Url: "http://example.com/1",
UrlMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
},
Output: testOutput,
Range: nil,
},
enablePrefetch: false,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 10,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().Store(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.StoreRequest) error {
return os.WriteFile(req.Destination, testBytes[0:10], 0644)
})
},
verify: func(pg chan *FileTaskProgress, ok bool) {
assert.True(ok)
data, err := os.ReadFile(testOutput)
assert.Nil(err)
assert.Equal(testBytes[0:10], data)
},
},
{
name: "normal completed task not found",
request: &FileTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
PeerId: "",
Url: "http://example.com/1",
UrlMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
},
Output: testOutput,
Range: nil,
},
enablePrefetch: false,
storageManager: func(sm *ms.MockManager) {
//sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
// func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask {
// return nil
// })
//sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
// func(taskID string) *storage.ReusePeerTask {
// return nil
// })
sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn(
func(taskID string) *storage.ReusePeerTask {
return nil
})
},
verify: func(pg chan *FileTaskProgress, ok bool) {
assert.False(ok)
assert.Nil(pg)
},
},
{
name: "normal completed subtask found",
request: &FileTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
PeerId: "",
Url: "http://example.com/1",
UrlMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
},
Output: testOutput,
Range: &clientutil.Range{Start: 200, Length: 100},
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 10,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().Store(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.StoreRequest) error {
return os.WriteFile(req.Destination, testBytes[200:300], 0644)
})
},
verify: func(pg chan *FileTaskProgress, ok bool) {
assert.True(ok)
data, err := os.ReadFile(testOutput)
assert.Nil(err)
assert.Equal(testBytes[200:300], data)
},
},
{
name: "normal completed subtask not found",
request: &FileTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
PeerId: "",
Url: "http://example.com/1",
UrlMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
},
Output: testOutput,
Range: &clientutil.Range{Start: 0, Length: 10},
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask {
return nil
})
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(taskID string) *storage.ReusePeerTask {
return nil
})
},
verify: func(pg chan *FileTaskProgress, ok bool) {
assert.False(ok)
assert.Nil(pg)
},
},
{
name: "partial task found",
request: &FileTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
PeerId: "",
Url: "http://example.com/1",
UrlMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
},
Output: testOutput,
Range: &clientutil.Range{Start: 300, Length: 100},
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
return nil
})
sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(id string, rg *clientutil.Range) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 100,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
assert.Equal(taskID, req.TaskID)
return io.NopCloser(bytes.NewBuffer(testBytes[300:400])), nil
})
},
verify: func(pg chan *FileTaskProgress, ok bool) {
assert.True(ok)
data, err := os.ReadFile(testOutput)
assert.Nil(err)
assert.Equal(testBytes[300:400], data)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer os.Remove(testOutput)
sm := ms.NewMockManager(ctrl)
tc.storageManager(sm)
ptm := &peerTaskManager{
host: &scheduler.PeerHost{},
enablePrefetch: tc.enablePrefetch,
storageManager: sm,
}
tc.verify(ptm.tryReuseFilePeerTask(context.Background(), tc.request))
})
}
}
func TestReuseStreamPeerTask(t *testing.T) {
ctrl := gomock.NewController(t)
assert := testifyassert.New(t)
var testCases = []struct {
name string
request *StreamTaskRequest
enablePrefetch bool
storageManager func(sm *ms.MockManager)
verify func(rc io.ReadCloser, attr map[string]string, ok bool)
}{
{
name: "normal completed task found",
request: &StreamTaskRequest{
URL: "http://example.com/1",
URLMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
Range: nil,
PeerID: "",
},
enablePrefetch: false,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 10,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
assert.Equal(taskID, req.TaskID)
return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil
})
},
verify: func(rc io.ReadCloser, attr map[string]string, ok bool) {
assert.True(ok)
assert.NotNil(rc)
assert.Equal("10", attr[headers.ContentLength])
_, exist := attr[headers.ContentRange]
assert.False(exist)
},
},
{
name: "normal completed task not found",
request: &StreamTaskRequest{
URL: "http://example.com/1",
URLMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
Range: nil,
PeerID: "",
},
enablePrefetch: false,
storageManager: func(sm *ms.MockManager) {
//sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
// func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask {
// return nil
// })
//sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
// func(taskID string) *storage.ReusePeerTask {
// return nil
// })
sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn(
func(taskID string) *storage.ReusePeerTask {
return nil
})
},
verify: func(rc io.ReadCloser, attr map[string]string, ok bool) {
assert.False(ok)
assert.Nil(rc)
assert.Nil(attr)
},
},
{
name: "normal completed subtask found",
request: &StreamTaskRequest{
URL: "http://example.com/1",
URLMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
Range: &clientutil.Range{Start: 0, Length: 10},
PeerID: "",
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 10,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
assert.Equal(taskID, req.TaskID)
return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil
})
},
verify: func(rc io.ReadCloser, attr map[string]string, ok bool) {
assert.True(ok)
assert.NotNil(rc)
assert.Equal("10", attr[headers.ContentLength])
assert.Equal("bytes 0-9/*", attr[headers.ContentRange])
},
},
{
name: "normal completed subtask not found",
request: &StreamTaskRequest{
URL: "http://example.com/1",
URLMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
Range: &clientutil.Range{Start: 0, Length: 10},
PeerID: "",
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask {
return nil
})
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(taskID string) *storage.ReusePeerTask {
return nil
})
},
verify: func(rc io.ReadCloser, attr map[string]string, ok bool) {
assert.False(ok)
assert.Nil(rc)
assert.Nil(attr)
},
},
{
name: "partial task found",
request: &StreamTaskRequest{
URL: "http://example.com/1",
URLMeta: &base.UrlMeta{
Digest: "",
Tag: "",
Range: "",
Filter: "",
Header: nil,
},
Range: &clientutil.Range{Start: 0, Length: 10},
PeerID: "",
},
enablePrefetch: true,
storageManager: func(sm *ms.MockManager) {
var taskID string
sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn(
func(id string) *storage.ReusePeerTask {
return nil
})
sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn(
func(id string, rg *clientutil.Range) *storage.ReusePeerTask {
taskID = id
return &storage.ReusePeerTask{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: taskID,
},
ContentLength: 100,
TotalPieces: 0,
PieceMd5Sign: "",
}
})
sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) {
assert.Equal(taskID, req.TaskID)
return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil
})
},
verify: func(rc io.ReadCloser, attr map[string]string, ok bool) {
assert.True(ok)
assert.NotNil(rc)
assert.Equal("10", attr[headers.ContentLength])
assert.Equal("bytes 0-9/100", attr[headers.ContentRange])
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
sm := ms.NewMockManager(ctrl)
tc.storageManager(sm)
ptm := &peerTaskManager{
host: &scheduler.PeerHost{},
enablePrefetch: tc.enablePrefetch,
storageManager: sm,
}
tc.verify(ptm.tryReuseStreamPeerTask(context.Background(), tc.request))
})
}
}
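
To exercise the new table-driven suites locally, something like the following should work (the package path ./client/daemon/peer is an assumption based on the imports; the generated gomock stubs under client/daemon/test/mock/storage must already exist):

go test -run 'TestReuseFilePeerTask|TestReuseStreamPeerTask' ./client/daemon/peer/

Both suites stub the daemon's storage manager through gomock, so no scheduler or running daemon is required.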