From e9f824e0a7fc4552cd9fdc70e5d0dcaadf2a147c Mon Sep 17 00:00:00 2001
From: Jim Ma
Date: Thu, 20 Jan 2022 14:35:12 +0800
Subject: [PATCH] chore: optimize defer and test (#1010)

* chore: optimize defer and test

Signed-off-by: Jim Ma

* fix: random test failed

Signed-off-by: Jim Ma
---
 client/daemon/peer/peertask_conductor.go    | 26 +++++++++----
 client/daemon/peer/peertask_file.go         |  2 +-
 client/daemon/peer/peertask_manager.go      | 29 +++++++++++----
 client/daemon/peer/peertask_manager_test.go | 41 +++++++++++++++------
 client/daemon/peer/peertask_stream.go       |  2 +-
 client/daemon/storage/storage_manager.go    |  2 +-
 6 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go
index 1766fdcc4..e0bb515a2 100644
--- a/client/daemon/peer/peertask_conductor.go
+++ b/client/daemon/peer/peertask_conductor.go
@@ -296,7 +296,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
 	return ptc, nil
 }
 
-func (pt *peerTaskConductor) run() {
+func (pt *peerTaskConductor) startPullAndBroadcastPieces() {
 	go pt.broker.Start()
 	go pt.pullPieces()
 }
@@ -869,7 +869,15 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) {
 }
 
 func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
-	pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos))
+	pieceCount := len(piecePacket.PieceInfos)
+	pt.Debugf("dispatch piece request, piece count: %d", pieceCount)
+	// handle the case where the CDN returns zero piece infos but sets the total piece count and content length
+	if pieceCount == 0 {
+		finished := pt.isCompleted()
+		if finished {
+			pt.Done()
+		}
+	}
 	for _, piece := range piecePacket.PieceInfos {
 		pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d",
 			piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
@@ -1132,8 +1140,10 @@ func (pt *peerTaskConductor) Done() {
 }
 
 func (pt *peerTaskConductor) done() {
-	defer pt.span.End()
-	defer pt.broker.Stop()
+	defer func() {
+		pt.broker.Stop()
+		pt.span.End()
+	}()
 	var (
 		cost    = time.Now().Sub(pt.start).Milliseconds()
 		success = true
@@ -1213,9 +1223,11 @@ func (pt *peerTaskConductor) Fail() {
 
 func (pt *peerTaskConductor) fail() {
 	metrics.PeerTaskFailedCount.Add(1)
-	defer pt.span.End()
-	defer pt.broker.Stop()
-	defer close(pt.failCh)
+	defer func() {
+		close(pt.failCh)
+		pt.broker.Stop()
+		pt.span.End()
+	}()
 	pt.peerTaskManager.PeerTaskDone(pt.taskID)
 	var end = time.Now()
 	pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", pt.failedCode, pt.failedReason)
diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go
index 1ea1c5c4d..103c1da77 100644
--- a/client/daemon/peer/peertask_file.go
+++ b/client/daemon/peer/peertask_file.go
@@ -84,7 +84,7 @@ func (ptm *peerTaskManager) newFileTask(
 	request *FileTaskRequest,
 	limit rate.Limit) (context.Context, *fileTask, error) {
 	metrics.FileTaskCount.Add(1)
-	ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
+	ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go
index 77617ed8d..3d931b7b0 100644
--- a/client/daemon/peer/peertask_manager.go
+++ b/client/daemon/peer/peertask_manager.go
@@ -158,18 +158,35 @@ func (ptm *peerTaskManager) findPeerTaskConductor(taskID string) (*peerTaskCondu
 	return pt.(*peerTaskConductor), true
 }
 
+func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
+	taskID string,
+	request *scheduler.PeerTaskRequest,
+	limit rate.Limit) (*peerTaskConductor, error) {
+	ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit)
+	if err != nil {
+		return nil, err
+	}
+	if created {
+		ptc.startPullAndBroadcastPieces()
+	}
+	return ptc, nil
+}
+
+// getOrCreatePeerTaskConductor gets an existing peerTaskConductor or creates a new one,
+// returning (ptc, true, nil) when a new conductor was created, otherwise (ptc, false, nil)
 func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
 	ctx context.Context,
 	taskID string,
 	request *scheduler.PeerTaskRequest,
-	limit rate.Limit) (*peerTaskConductor, error) {
+	limit rate.Limit) (*peerTaskConductor, bool, error) {
 	if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
 		logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
-		return ptc, nil
+		return ptc, false, nil
 	}
+	// FIXME merge register peer tasks
 	ptc, err := ptm.newPeerTaskConductor(ctx, request, limit)
 	if err != nil {
-		return nil, err
+		return nil, false, err
 	}
 
 	ptm.conductorLock.Lock()
@@ -180,13 +197,11 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
 			p.taskID, p.peerID, ptc.taskID, ptc.peerID)
 		// cancel duplicate peer task
 		ptc.cancel(base.Code_ClientContextCanceled, reasonContextCanceled)
-		return p, nil
+		return p, false, nil
 	}
 	ptm.runningPeerTasks.Store(taskID, ptc)
 	ptm.conductorLock.Unlock()
-
-	ptc.run()
-	return ptc, nil
+	return ptc, true, nil
 }
 
 func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go
index 063a9ca43..034809dc6 100644
--- a/client/daemon/peer/peertask_manager_test.go
+++ b/client/daemon/peer/peertask_manager_test.go
@@ -25,6 +25,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"os"
+	"runtime"
 	"sync"
 	"testing"
 	"time"
@@ -43,6 +44,7 @@ import (
 	mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon"
 	mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler"
 	"d7y.io/dragonfly/v2/internal/dferrors"
+	logger "d7y.io/dragonfly/v2/internal/dflog"
 	"d7y.io/dragonfly/v2/internal/dfnet"
 	"d7y.io/dragonfly/v2/pkg/idgen"
 	"d7y.io/dragonfly/v2/pkg/rpc"
@@ -327,7 +329,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes,
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "normal-size-peer",
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
 			mockPieceDownloader: commonPieceDownloader,
@@ -338,7 +340,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes,
 			pieceParallelCount:  4,
 			pieceSize:           16384,
-			peerID:              "peer-0",
+			peerID:              "small-size-peer",
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_SMALL,
 			mockPieceDownloader: commonPieceDownloader,
@@ -349,7 +351,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes[:64],
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "tiny-size-peer",
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_TINY,
 			mockPieceDownloader: nil,
@@ -360,7 +362,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes,
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "normal-size-peer-back-source",
 			backSource:          true,
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
@@ -383,7 +385,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes,
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "normal-size-peer-back-source-no-length",
 			backSource:          true,
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
@@ -402,11 +404,11 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			},
 		},
 		{
-			name:                "normal size scope - back source - content length - aligning",
+			name:                "normal size scope - back source - no content length - aligning",
 			taskData:            testBytes[:8192],
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "normal-size-peer-back-source-aligning-no-length",
 			backSource:          true,
 			url:                 "http://localhost/test/data",
 			sizeScope:           base.SizeScope_NORMAL,
@@ -429,7 +431,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			taskData:            testBytes,
 			pieceParallelCount:  4,
 			pieceSize:           1024,
-			peerID:              "peer-0",
+			peerID:              "normal-size-peer-schedule-timeout",
 			peerPacketDelay:     []time.Duration{time.Second},
 			scheduleTimeout:     time.Nanosecond,
 			urlGenerator: func(ts *testSpec) string {
@@ -456,6 +458,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 			require := testifyrequire.New(t)
 			for _, typ := range taskTypes {
 				// dup a new test case with the task type
+				logger.Infof("-------------------- test %s - type %d, started --------------------", _tc.name, typ)
 				tc := _tc
 				tc.taskType = typ
 				func() {
@@ -514,6 +517,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
 
 					tc.run(assert, require, mm, urlMeta)
 				}()
+				logger.Infof("-------------------- test %s - type %d, finished --------------------", _tc.name, typ)
 			}
 		})
 	}
@@ -599,8 +603,9 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 		PeerHost: &scheduler.PeerHost{},
 	}
 
-	ptc, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
+	ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
 	assert.Nil(err, "load first peerTaskConductor")
+	assert.True(created, "should create a new peerTaskConductor")
 
 	switch ts.sizeScope {
 	case base.SizeScope_TINY:
@@ -609,7 +614,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 		require.NotNil(ptc.singlePiece)
 	}
 
-	var ptcCount = 10
+	var ptcCount = 100
 	var wg = &sync.WaitGroup{}
 	wg.Add(ptcCount + 1)
 
@@ -643,12 +648,20 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 	}
 
 	for i := 0; i < ptcCount; i++ {
-		p, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
+		request := &scheduler.PeerTaskRequest{
+			Url:      ts.url,
+			UrlMeta:  urlMeta,
+			PeerId:   fmt.Sprintf("should-not-use-peer-%d", i),
+			PeerHost: &scheduler.PeerHost{},
+		}
+		p, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
 		assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i))
 		assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i))
+		assert.False(created, "should not create a new peerTaskConductor")
 		go syncFunc(i, p)
 	}
 
+	ptc.startPullAndBroadcastPieces()
 	wg.Wait()
 
 	for i, r := range result {
@@ -663,7 +676,10 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 	case <-ptc.successCh:
 		success = true
 	case <-ptc.failCh:
-	case <-time.After(10 * time.Minute):
+	case <-time.After(5 * time.Minute):
+		buf := make([]byte, 16384)
+		buf = buf[:runtime.Stack(buf, true)]
+		fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===\n", buf)
 	}
 
 	assert.True(success, "task should success")
@@ -675,6 +691,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
 		if noRunningTask {
 			break
 		}
+		noRunningTask = true
 		time.Sleep(100 * time.Millisecond)
 	}
 	assert.True(noRunningTask, "no running tasks")
diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go
index c6690ba19..50bf2b1c6 100644
--- a/client/daemon/peer/peertask_stream.go
+++ b/client/daemon/peer/peertask_stream.go
@@ -67,7 +67,7 @@ func (ptm *peerTaskManager) newStreamTask(
 	if ptm.perPeerRateLimit > 0 {
 		limit = ptm.perPeerRateLimit
 	}
-	ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
+	ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
 	if err != nil {
 		return nil, err
 	}
diff --git a/client/daemon/storage/storage_manager.go b/client/daemon/storage/storage_manager.go
index c90c83584..74e966542 100644
--- a/client/daemon/storage/storage_manager.go
+++ b/client/daemon/storage/storage_manager.go
@@ -95,7 +95,7 @@ var (
 	ErrTaskNotFound     = errors.New("task not found")
 	ErrPieceNotFound    = errors.New("piece not found")
 	ErrPieceCountNotSet = errors.New("total piece count not set")
-	ErrDigestNotSet     = errors.New("piece digest not set")
+	ErrDigestNotSet     = errors.New("digest not set")
 	ErrInvalidDigest    = errors.New("invalid digest")
 )
 
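---

Notes (illustrative sketches, not part of the patch):

The done() and fail() rewrites do not change cleanup order. Go runs deferred calls in last-in-first-out order, so the old stacked defers in fail() already executed close(pt.failCh), then pt.broker.Stop(), then pt.span.End(); the single deferred closure performs the same sequence but states it in source order, which is easier to review. A minimal standalone sketch of the equivalence, with fmt.Println standing in for the real cleanup calls:

    package main

    import "fmt"

    // stackedDefers mirrors the old style: deferred calls run in
    // last-in-first-out order, so "broker stop" prints before "span end".
    func stackedDefers() {
        defer fmt.Println("span end")    // registered first, runs last
        defer fmt.Println("broker stop") // registered second, runs first
    }

    // singleDefer mirrors the new style: one closure, same sequence,
    // with the order visible directly in the source.
    func singleDefer() {
        defer func() {
            fmt.Println("broker stop")
            fmt.Println("span end")
        }()
    }

    func main() {
        stackedDefers() // broker stop, span end
        singleDefer()   // broker stop, span end
    }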
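The split between getPeerTaskConductor and getOrCreatePeerTaskConductor separates lookup/creation from starting work: only the caller that actually created the conductor calls startPullAndBroadcastPieces(), so a task is started exactly once, and tests can register their waiters before the pull begins (as runConductorTest now does). A rough sketch of the pattern, simplified to create under the lock where the real code registers the new conductor first and cancels the duplicate when it loses the race; the manager, conductor, getOrCreate, and get names here are stand-ins, not the patch's API:

    package main

    import (
        "fmt"
        "sync"
    )

    type conductor struct{ started bool }

    type manager struct {
        conductorLock sync.Mutex
        running       sync.Map // taskID -> *conductor
    }

    // getOrCreate reports whether it stored a new conductor; it never
    // starts one, leaving the caller to decide when work begins.
    func (m *manager) getOrCreate(taskID string) (*conductor, bool) {
        // fast path: no lock when the conductor already exists
        if v, ok := m.running.Load(taskID); ok {
            return v.(*conductor), false
        }
        m.conductorLock.Lock()
        defer m.conductorLock.Unlock()
        // re-check under the lock: a concurrent caller may have won the race
        if v, ok := m.running.Load(taskID); ok {
            return v.(*conductor), false
        }
        c := &conductor{}
        m.running.Store(taskID, c)
        return c, true
    }

    // get is the production-path wrapper: start exactly once, on creation.
    func (m *manager) get(taskID string) *conductor {
        c, created := m.getOrCreate(taskID)
        if created {
            c.started = true // stand-in for startPullAndBroadcastPieces()
        }
        return c
    }

    func main() {
        m := &manager{}
        a := m.get("task-1") // created and started here
        b := m.get("task-1") // found, not started again
        fmt.Println(a == b, a.started) // true true
    }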
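The second commit ("fix: random test failed") is the single noRunningTask = true added near the end of runConductorTest. Without that reset, one scan that still sees a running task latches the flag to false, later iterations can never observe the drained state, and the final assert.True(noRunningTask, "no running tasks") fails intermittently. A self-contained sketch of the fixed loop shape, assuming the scan ranges over a sync.Map of running tasks the way runningPeerTasks is used elsewhere in this patch:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // waitDrained polls until running holds no entries, resetting the flag
    // before every rescan; the reset is the line the fix adds.
    func waitDrained(running *sync.Map, attempts int) bool {
        noRunningTask := true
        for i := 0; i < attempts; i++ {
            running.Range(func(key, value interface{}) bool {
                noRunningTask = false
                return false // one live entry is enough, stop ranging
            })
            if noRunningTask {
                return true
            }
            noRunningTask = true // without this reset the flag stays false forever
            time.Sleep(100 * time.Millisecond)
        }
        return false
    }

    func main() {
        var running sync.Map
        running.Store("task-1", struct{}{})
        go func() {
            time.Sleep(300 * time.Millisecond)
            running.Delete("task-1") // the task finishes while we poll
        }()
        fmt.Println(waitDrained(&running, 50)) // prints true once the map drains
    }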