chore: optimize defer and test (#1010)

* chore: optimize defer and test

Signed-off-by: Jim Ma <majinjing3@gmail.com>

* fix: random test failed

Signed-off-by: Jim Ma <majinjing3@gmail.com>
Jim Ma 2022-01-20 14:35:12 +08:00 committed by Gaius
parent 537667cfaa
commit e9f824e0a7
6 changed files with 73 additions and 29 deletions


@@ -296,7 +296,7 @@ func (ptm *peerTaskManager) newPeerTaskConductor(
     return ptc, nil
 }
 
-func (pt *peerTaskConductor) run() {
+func (pt *peerTaskConductor) startPullAndBroadcastPieces() {
     go pt.broker.Start()
     go pt.pullPieces()
 }
@@ -869,7 +869,15 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) {
 }
 
 func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *base.PiecePacket) {
-    pt.Debugf("dispatch piece request, piece count: %d", len(piecePacket.PieceInfos))
+    pieceCount := len(piecePacket.PieceInfos)
+    pt.Debugf("dispatch piece request, piece count: %d", pieceCount)
+    // fix cdn return zero piece info, but with total piece count and content length
+    if pieceCount == 0 {
+        finished := pt.isCompleted()
+        if finished {
+            pt.Done()
+        }
+    }
     for _, piece := range piecePacket.PieceInfos {
         pt.Infof("get piece %d from %s/%s, digest: %s, start: %d, size: %d",
             piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
@@ -1132,8 +1140,10 @@ func (pt *peerTaskConductor) Done() {
 }
 
 func (pt *peerTaskConductor) done() {
-    defer pt.span.End()
-    defer pt.broker.Stop()
+    defer func() {
+        pt.broker.Stop()
+        pt.span.End()
+    }()
     var (
         cost = time.Now().Sub(pt.start).Milliseconds()
         success = true
@@ -1213,9 +1223,11 @@ func (pt *peerTaskConductor) Fail() {
 func (pt *peerTaskConductor) fail() {
     metrics.PeerTaskFailedCount.Add(1)
-    defer pt.span.End()
-    defer pt.broker.Stop()
-    defer close(pt.failCh)
+    defer func() {
+        close(pt.failCh)
+        pt.broker.Stop()
+        pt.span.End()
+    }()
     pt.peerTaskManager.PeerTaskDone(pt.taskID)
     var end = time.Now()
     pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", pt.failedCode, pt.failedReason)
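For reference, a minimal runnable sketch of the pattern the done() and fail() changes above apply: several independent defer statements run in reverse order of declaration, while a single deferred closure keeps one entry on the defer stack and states the cleanup order (stop the broker, then end the span) top to bottom. The tracer and broker types here are hypothetical stand-ins for pt.span and pt.broker, not Dragonfly's real types.

package main

import "fmt"

type tracer struct{}

func (t *tracer) End() { fmt.Println("span ended") }

type broker struct{}

func (b *broker) Stop() { fmt.Println("broker stopped") }

// before: separate defers, executed LIFO, so the declaration order
// must be read backwards to know the real cleanup order.
func doneBefore(span *tracer, b *broker) {
	defer span.End()
	defer b.Stop()
	fmt.Println("doneBefore: work finished")
}

// after: one deferred closure, cleanup order written out explicitly.
func doneAfter(span *tracer, b *broker) {
	defer func() {
		b.Stop()
		span.End()
	}()
	fmt.Println("doneAfter: work finished")
}

func main() {
	doneBefore(&tracer{}, &broker{})
	doneAfter(&tracer{}, &broker{})
}

Both variants stop the broker before ending the span; the rewrite only makes that order explicit in one place.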


@@ -84,7 +84,7 @@ func (ptm *peerTaskManager) newFileTask(
     request *FileTaskRequest,
     limit rate.Limit) (context.Context, *fileTask, error) {
     metrics.FileTaskCount.Add(1)
-    ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
+    ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), &request.PeerTaskRequest, limit)
     if err != nil {
         return nil, nil, err
     }


@@ -158,18 +158,35 @@ func (ptm *peerTaskManager) findPeerTaskConductor(taskID string) (*peerTaskCondu
     return pt.(*peerTaskConductor), true
 }
 
+func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
+    taskID string,
+    request *scheduler.PeerTaskRequest,
+    limit rate.Limit) (*peerTaskConductor, error) {
+    ptc, created, err := ptm.getOrCreatePeerTaskConductor(ctx, taskID, request, limit)
+    if err != nil {
+        return nil, err
+    }
+    if created {
+        ptc.startPullAndBroadcastPieces()
+    }
+    return ptc, err
+}
+
+// getOrCreatePeerTaskConductor will get or create a peerTaskConductor,
+// if created, return (ptc, true, nil), otherwise return (ptc, false, nil)
 func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
     ctx context.Context,
     taskID string,
     request *scheduler.PeerTaskRequest,
-    limit rate.Limit) (*peerTaskConductor, error) {
+    limit rate.Limit) (*peerTaskConductor, bool, error) {
     if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
         logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
-        return ptc, nil
+        return ptc, false, nil
     }
+    // FIXME merge register peer tasks
     ptc, err := ptm.newPeerTaskConductor(ctx, request, limit)
     if err != nil {
-        return nil, err
+        return nil, false, err
     }
 
     ptm.conductorLock.Lock()
@@ -180,13 +197,11 @@ func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
             p.taskID, p.peerID, ptc.taskID, ptc.peerID)
         // cancel duplicate peer task
         ptc.cancel(base.Code_ClientContextCanceled, reasonContextCanceled)
-        return p, nil
+        return p, false, nil
     }
     ptm.runningPeerTasks.Store(taskID, ptc)
     ptm.conductorLock.Unlock()
-
-    ptc.run()
-    return ptc, nil
+    return ptc, true, nil
 }
 
 func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, *TinyData, error) {
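An illustrative sketch (hypothetical types, not Dragonfly's real API) of the contract the hunks above introduce: the get-or-create call reports whether it created the conductor, and only the creating caller starts it, so a conductor that lost the creation race is never started a second time, while ordinary callers such as newFileTask and newStreamTask go through a wrapper that starts it exactly once.

package main

import (
	"fmt"
	"sync"
)

type conductor struct{ taskID string }

func (c *conductor) startPullAndBroadcastPieces() { fmt.Println("started", c.taskID) }

type manager struct {
	mu      sync.Mutex
	running map[string]*conductor
}

// getOrCreate returns the conductor for taskID and whether this call created it.
func (m *manager) getOrCreate(taskID string) (*conductor, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if c, ok := m.running[taskID]; ok {
		return c, false // already registered: reuse, do not start again
	}
	c := &conductor{taskID: taskID}
	m.running[taskID] = c
	return c, true
}

// get is the wrapper used by normal callers: start only on creation.
func (m *manager) get(taskID string) *conductor {
	c, created := m.getOrCreate(taskID)
	if created {
		c.startPullAndBroadcastPieces()
	}
	return c
}

func main() {
	m := &manager{running: map[string]*conductor{}}
	a := m.get("task-1") // prints "started task-1"
	b := m.get("task-1") // reuses the running conductor, no second start
	fmt.Println("same conductor:", a == b)
}

Splitting creation from startup this way also lets the test below register all duplicate callers first and call startPullAndBroadcastPieces itself, instead of the conductor starting as a side effect inside the lock.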


@@ -25,6 +25,7 @@ import (
     "net/http"
     "net/http/httptest"
     "os"
+    "runtime"
     "sync"
     "testing"
     "time"
@@ -43,6 +44,7 @@ import (
     mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon"
     mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler"
     "d7y.io/dragonfly/v2/internal/dferrors"
+    logger "d7y.io/dragonfly/v2/internal/dflog"
     "d7y.io/dragonfly/v2/internal/dfnet"
     "d7y.io/dragonfly/v2/pkg/idgen"
     "d7y.io/dragonfly/v2/pkg/rpc"
@@ -327,7 +329,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes,
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "normal-size-peer",
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_NORMAL,
             mockPieceDownloader: commonPieceDownloader,
@@ -338,7 +340,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes,
             pieceParallelCount: 4,
             pieceSize: 16384,
-            peerID: "peer-0",
+            peerID: "small-size-peer",
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_SMALL,
             mockPieceDownloader: commonPieceDownloader,
@@ -349,7 +351,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes[:64],
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "tiny-size-peer",
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_TINY,
             mockPieceDownloader: nil,
@@ -360,7 +362,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes,
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "normal-size-peer-back-source",
             backSource: true,
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_NORMAL,
@@ -383,7 +385,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes,
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "normal-size-peer-back-source-no-length",
             backSource: true,
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_NORMAL,
@@ -402,11 +404,11 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             },
         },
         {
-            name: "normal size scope - back source - content length - aligning",
+            name: "normal size scope - back source - no content length - aligning",
             taskData: testBytes[:8192],
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "normal-size-peer-back-source-aligning-no-length",
             backSource: true,
             url: "http://localhost/test/data",
             sizeScope: base.SizeScope_NORMAL,
@@ -429,7 +431,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
             taskData: testBytes,
             pieceParallelCount: 4,
             pieceSize: 1024,
-            peerID: "peer-0",
+            peerID: "normal-size-peer-schedule-timeout",
             peerPacketDelay: []time.Duration{time.Second},
             scheduleTimeout: time.Nanosecond,
             urlGenerator: func(ts *testSpec) string {
@@ -456,6 +458,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
         require := testifyrequire.New(t)
         for _, typ := range taskTypes {
             // dup a new test case with the task type
+            logger.Infof("-------------------- test %s - type %d, started --------------------", _tc.name, typ)
             tc := _tc
             tc.taskType = typ
             func() {
@@ -514,6 +517,7 @@ func TestPeerTaskManager_TaskSuite(t *testing.T) {
                 tc.run(assert, require, mm, urlMeta)
             }()
+            logger.Infof("-------------------- test %s - type %d, finished --------------------", _tc.name, typ)
         }
     })
 }
@@ -599,8 +603,9 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
         PeerHost: &scheduler.PeerHost{},
     }
 
-    ptc, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
+    ptc, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*4))
     assert.Nil(err, "load first peerTaskConductor")
+    assert.True(created, "should create a new peerTaskConductor")
 
     switch ts.sizeScope {
     case base.SizeScope_TINY:
@@ -609,7 +614,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
         require.NotNil(ptc.singlePiece)
     }
 
-    var ptcCount = 10
+    var ptcCount = 100
     var wg = &sync.WaitGroup{}
     wg.Add(ptcCount + 1)
@@ -643,12 +648,20 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
     }
 
     for i := 0; i < ptcCount; i++ {
-        p, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
+        request := &scheduler.PeerTaskRequest{
+            Url: ts.url,
+            UrlMeta: urlMeta,
+            PeerId: fmt.Sprintf("should-not-use-peer-%d", i),
+            PeerHost: &scheduler.PeerHost{},
+        }
+        p, created, err := ptm.getOrCreatePeerTaskConductor(context.Background(), taskID, request, rate.Limit(pieceSize*3))
         assert.Nil(err, fmt.Sprintf("load peerTaskConductor %d", i))
         assert.Equal(ptc.peerID, p.GetPeerID(), fmt.Sprintf("ptc %d should be same with ptc", i))
+        assert.False(created, "should not create a new peerTaskConductor")
         go syncFunc(i, p)
     }
+
+    ptc.startPullAndBroadcastPieces()
     wg.Wait()
 
     for i, r := range result {
@@ -663,7 +676,10 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
     case <-ptc.successCh:
         success = true
     case <-ptc.failCh:
-    case <-time.After(10 * time.Minute):
+    case <-time.After(5 * time.Minute):
+        buf := make([]byte, 16384)
+        buf = buf[:runtime.Stack(buf, true)]
+        fmt.Printf("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
     }
     assert.True(success, "task should success")
@@ -675,6 +691,7 @@ func (ts *testSpec) runConductorTest(assert *testifyassert.Assertions, require *
         if noRunningTask {
             break
         }
+        noRunningTask = true
         time.Sleep(100 * time.Millisecond)
     }
     assert.True(noRunningTask, "no running tasks")
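To make the added noRunningTask = true line concrete, here is a small self-contained sketch (a hypothetical helper, not the real test code): the Range callback clears the flag whenever a task is still registered, so the flag has to be reset before every poll, otherwise a single busy iteration leaves it false even after all tasks finish and the final assertion fails at random.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitNoRunningTasks polls the map until it is empty or attempts run out.
func waitNoRunningTasks(running *sync.Map, attempts int, interval time.Duration) bool {
	for i := 0; i < attempts; i++ {
		noRunningTask := true // reset on every poll; this is what the added line restores
		running.Range(func(key, value interface{}) bool {
			noRunningTask = false
			return false // one live entry is enough to keep waiting
		})
		if noRunningTask {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	var running sync.Map
	running.Store("task-1", struct{}{})
	go func() {
		time.Sleep(30 * time.Millisecond)
		running.Delete("task-1") // the task finishes while the test is polling
	}()
	fmt.Println("no running tasks:", waitNoRunningTasks(&running, 20, 10*time.Millisecond))
}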


@@ -67,7 +67,7 @@ func (ptm *peerTaskManager) newStreamTask(
     if ptm.perPeerRateLimit > 0 {
         limit = ptm.perPeerRateLimit
     }
-    ptc, err := ptm.getOrCreatePeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
+    ptc, err := ptm.getPeerTaskConductor(ctx, idgen.TaskID(request.Url, request.UrlMeta), request, limit)
     if err != nil {
         return nil, err
     }


@@ -95,7 +95,7 @@ var (
     ErrTaskNotFound = errors.New("task not found")
     ErrPieceNotFound = errors.New("piece not found")
     ErrPieceCountNotSet = errors.New("total piece count not set")
-    ErrDigestNotSet = errors.New("piece digest not set")
+    ErrDigestNotSet = errors.New("digest not set")
     ErrInvalidDigest = errors.New("invalid digest")
 )