diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 5fdf1cf9d..088733041 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -88,7 +88,9 @@ jobs: - name: Run E2E test run: | make build-e2e-sha256sum - ginkgo -v -r --race --fail-fast --cover --trace --progress --skip=${{ matrix.skip }} test/e2e -- --feature-gates=dfget-range=true + # generate an empty file + docker exec kind-control-plane touch /tmp/empty-file + ginkgo -v -r --race --fail-fast --cover --trace --progress --skip=${{ matrix.skip }} test/e2e -- --feature-gates=dfget-range=true --feature-gates=dfget-empty-file=true cat coverprofile.out >> coverage.txt - name: Upload coverage to Codecov diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index d236d64d1..5ae03b59c 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -340,6 +340,16 @@ func (pt *peerTaskConductor) register() error { if result.ExtendAttribute != nil { header = result.ExtendAttribute.Header } + case commonv1.SizeScope_EMPTY: + tinyData = &TinyData{ + TaskID: result.TaskId, + PeerID: pt.request.PeerId, + Content: []byte{}, + } + pt.span.SetAttributes(config.AttributePeerTaskSizeScope.String("empty")) + if result.ExtendAttribute != nil { + header = result.ExtendAttribute.Header + } } } @@ -550,6 +560,8 @@ func (pt *peerTaskConductor) pullPieces() { return } switch pt.sizeScope { + case commonv1.SizeScope_EMPTY: + pt.storeEmptyPeerTask() case commonv1.SizeScope_TINY: pt.storeTinyPeerTask() case commonv1.SizeScope_SMALL: @@ -580,6 +592,37 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() { pt.receivePeerPacket(pieceRequestCh) } +func (pt *peerTaskConductor) storeEmptyPeerTask() { + pt.SetContentLength(0) + pt.SetTotalPieces(0) + ctx := pt.ctx + var err error + storageDriver, err := pt.StorageManager.RegisterTask(ctx, + &storage.RegisterTaskRequest{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + PeerID: pt.peerID, + TaskID: pt.taskID, + }, + DesiredLocation: "", + ContentLength: 0, + TotalPieces: 0, + }) + pt.storage = storageDriver + if err != nil { + pt.Errorf("register tiny data storage failed: %s", err) + pt.cancel(commonv1.Code_ClientError, err.Error()) + return + } + + if err = pt.UpdateStorage(); err != nil { + pt.Errorf("update tiny data storage failed: %s", err) + pt.cancel(commonv1.Code_ClientError, err.Error()) + return + } + pt.Debug("store empty metadata") + pt.Done() +} + func (pt *peerTaskConductor) storeTinyPeerTask() { contentLength := int64(len(pt.tinyData.Content)) pt.SetContentLength(contentLength) diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index aff2bba7e..26ac9d92f 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -286,7 +286,7 @@ func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequ // FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry progress, err := pt.Start(ctx) - return progress, nil, err + return progress, pt.peerTaskConductor.tinyData, err } func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTaskRequest) (io.ReadCloser, map[string]string, error) { diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 741e54fa3..c98c40716 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -332,7 +332,7 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.Piece pieceCount := len(piecePacket.PieceInfos) s.Debugf("dispatch piece request, piece count: %d, dest peer: %s", pieceCount, s.dstPeer.PeerId) - // fix cdn return zero piece info, but with total piece count and content length + // peers maybe send zero piece info, but with total piece count and content length if pieceCount == 0 { finished := s.peerTaskConductor.isCompleted() if finished { diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 055018dc1..e39cb625f 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -350,6 +350,7 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, peerTaskReq TaskID: pt.GetTaskID(), }, ContentLength: targetContentLength, + TotalPieces: pt.GetTotalPieces(), Header: &metadata.Header, }) if err != nil { @@ -412,6 +413,7 @@ singleDownload: TaskID: pt.GetTaskID(), }, ContentLength: contentLength, + TotalPieces: pt.GetTotalPieces(), Header: &response.Header, }) if err != nil { @@ -498,6 +500,7 @@ func (pm *pieceManager) downloadKnownLengthSource(ctx context.Context, pt Task, TaskID: pt.GetTaskID(), }, ContentLength: targetContentLength, + TotalPieces: pt.GetTotalPieces(), Header: &metadata.Header, }) if err != nil { diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 7dfdabbed..4c76bcf7e 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -83,7 +83,8 @@ func sendExistPieces( log.Errorf("get piece error: %s", err) return -1, err } - if len(pp.PieceInfos) == 0 && skipSendZeroPiece { + // when ContentLength is zero, it's an empty file, need send metadata + if pp.ContentLength != 0 && len(pp.PieceInfos) == 0 && skipSendZeroPiece { return pp.TotalPiece, nil } if err = sync.Send(pp); err != nil { @@ -209,7 +210,8 @@ loop: s.Infof("peer task is success, send remaining pieces") s.Lock() // all pieces already sent - if s.totalPieces > -1 && nextPieceNum == uint32(s.totalPieces) { + // empty piece task will reach sendExistPieces to sync content length and piece count + if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) { s.Unlock() break loop } diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index 6088ed400..95ba16a82 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -214,6 +214,11 @@ func (t *localTaskStore) UpdateTask(ctx context.Context, req *UpdateTaskRequest) if req.ContentLength > t.persistentMetadata.ContentLength { t.ContentLength = req.ContentLength t.Debugf("update content length: %d", t.ContentLength) + // update empty file TotalPieces + // the default req.TotalPieces is 0, need check ContentLength + if t.ContentLength == 0 { + t.TotalPieces = 0 + } } if req.TotalPieces > 0 { t.TotalPieces = req.TotalPieces @@ -420,7 +425,7 @@ func (t *localTaskStore) GetPieces(ctx context.Context, req *commonv1.PieceTaskR ContentLength: t.ContentLength, PieceMd5Sign: t.PieceMd5Sign, } - if t.TotalPieces > -1 && int32(req.StartNum) >= t.TotalPieces { + if t.TotalPieces > 0 && int32(req.StartNum) >= t.TotalPieces { t.Warnf("invalid start num: %d", req.StartNum) } for i := int32(0); i < int32(req.Limit); i++ { diff --git a/client/daemon/storage/local_storage_subtask.go b/client/daemon/storage/local_storage_subtask.go index b4b610645..412aed86d 100644 --- a/client/daemon/storage/local_storage_subtask.go +++ b/client/daemon/storage/local_storage_subtask.go @@ -255,7 +255,15 @@ func (t *localSubTaskStore) UpdateTask(ctx context.Context, req *UpdateTaskReque t.parent.touch() t.Lock() defer t.Unlock() - t.persistentMetadata.ContentLength = req.ContentLength + if req.ContentLength > t.persistentMetadata.ContentLength { + t.ContentLength = req.ContentLength + t.Debugf("update content length: %d", t.ContentLength) + // update empty file TotalPieces + // the default req.TotalPieces is 0, need check ContentLength + if t.ContentLength == 0 { + t.TotalPieces = 0 + } + } if req.TotalPieces > 0 { t.TotalPieces = req.TotalPieces t.Debugf("update total pieces: %d", t.TotalPieces) diff --git a/go.mod b/go.mod index db4587dc4..a2936e8c9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.18 require ( - d7y.io/api v1.1.6 + d7y.io/api v1.1.7 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index 7b8c8725c..ef5bcfdd9 100644 --- a/go.sum +++ b/go.sum @@ -70,8 +70,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= cloud.google.com/go/storage v1.22.1/go.mod h1:S8N1cAStu7BOeFfE8KAQzmyyLkK8p/vmRq6kuBTW58Y= -d7y.io/api v1.1.6 h1:0osnBvnLmS9Jb1hAGyzNlBnGosmrPXkCTmNJNXOtFf4= -d7y.io/api v1.1.6/go.mod h1:dvakC+UhcyPoLbGk4gHPoQ9h9gJr7+p70lCgHS7WPNM= +d7y.io/api v1.1.7 h1:oafz16gQeFHjRfVdW8uAKfiNT8/4vHaemS0vcCm+SMY= +d7y.io/api v1.1.7/go.mod h1:dvakC+UhcyPoLbGk4gHPoQ9h9gJr7+p70lCgHS7WPNM= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= diff --git a/hack/gen-buildx.sh b/hack/gen-buildx.sh index b546fdb3f..ccbbc6024 100755 --- a/hack/gen-buildx.sh +++ b/hack/gen-buildx.sh @@ -1,19 +1,15 @@ #!/bin/bash -components="scheduler manager-server" +components="dfdaemon scheduler manager" set -x for c in ${components}; do file=build/images/"${c}"/Dockerfile sed -i '1i# syntax=docker/dockerfile:1.3' "${file}" - sed -i "s#RUN make build-$c && make install-$c#RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/go/ export GOPATH=/root/go \&\& make build-$c \&\& make install-$c#" "${file}" + sed -i "s#RUN make build#RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/go/ export GOPATH=/root/go \&\& make build#" "${file}" done -# dfdaemon is subcommand, need to update alone -sed -i '1i# syntax=docker/dockerfile:1.3' build/images/dfdaemon/Dockerfile -sed -i "s#RUN make build-dfget && make install-dfget#RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/go/ export GOPATH=/root/go \&\& make build-dfget \&\& make install-dfget#" build/images/dfdaemon/Dockerfile - # buildx need "--load" to export images to docker sed -i 's/docker build/docker build --load/' hack/docker-build.sh sed -i 's/docker build/docker build --load/' test/tools/no-content-length/build.sh diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 7bb78a14b..08a90dbd9 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -51,6 +51,9 @@ const ( // Peer has been created but did not start running. PeerStatePending = "Pending" + // Peer successfully registered as empty scope size. + PeerStateReceivedEmpty = "ReceivedEmpty" + // Peer successfully registered as tiny scope size. PeerStateReceivedTiny = "ReceivedTiny" @@ -77,6 +80,9 @@ const ( ) const ( + // Peer is registered as empty scope size. + PeerEventRegisterEmpty = "RegisterEmpty" + // Peer is registered as tiny scope size. PeerEventRegisterTiny = "RegisterTiny" @@ -203,27 +209,32 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p.FSM = fsm.NewFSM( PeerStatePending, fsm.Events{ + {Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending}, Dst: PeerStateReceivedEmpty}, {Name: PeerEventRegisterTiny, Src: []string{PeerStatePending}, Dst: PeerStateReceivedTiny}, {Name: PeerEventRegisterSmall, Src: []string{PeerStatePending}, Dst: PeerStateReceivedSmall}, {Name: PeerEventRegisterNormal, Src: []string{PeerStatePending}, Dst: PeerStateReceivedNormal}, - {Name: PeerEventDownload, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning}, - {Name: PeerEventDownloadBackToSource, Src: []string{PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource}, + {Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal}, Dst: PeerStateRunning}, + {Name: PeerEventDownloadBackToSource, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning}, Dst: PeerStateBackToSource}, {Name: PeerEventDownloadSucceeded, Src: []string{ // Since ReportPeerResult and ReportPieceResult are called in no order, // the result may be reported after the register is successful. - PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, + PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning, PeerStateBackToSource, }, Dst: PeerStateSucceeded}, {Name: PeerEventDownloadFailed, Src: []string{ - PeerStatePending, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, + PeerStatePending, PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning, PeerStateBackToSource, PeerStateSucceeded, }, Dst: PeerStateFailed}, {Name: PeerEventLeave, Src: []string{ - PeerStatePending, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, + PeerStatePending, PeerStateReceivedEmpty, PeerStateReceivedTiny, PeerStateReceivedSmall, PeerStateReceivedNormal, PeerStateRunning, PeerStateBackToSource, PeerStateFailed, PeerStateSucceeded, }, Dst: PeerStateLeave}, }, fsm.Callbacks{ + PeerEventRegisterEmpty: func(e *fsm.Event) { + p.UpdateAt.Store(time.Now()) + p.Log.Infof("peer state is %s", e.FSM.Current()) + }, PeerEventRegisterTiny: func(e *fsm.Event) { p.UpdateAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index 72a5bb22f..7952bada1 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -112,31 +112,33 @@ func (s *seedPeer) TriggerTask(ctx context.Context, task *Task) (*Peer, *schedul } } - // Handle begin of piece. - if piece.PieceInfo != nil && piece.PieceInfo.PieceNum == common.BeginOfPiece { - peer.Log.Infof("receive begin of piece from seed peer: %#v %#v", piece, piece.PieceInfo) - if err := peer.FSM.Event(PeerEventDownload); err != nil { - return nil, nil, err + if piece.PieceInfo != nil { + // Handle begin of piece. + if piece.PieceInfo.PieceNum == common.BeginOfPiece { + peer.Log.Infof("receive begin of piece from seed peer: %#v %#v", piece, piece.PieceInfo) + if err := peer.FSM.Event(PeerEventDownload); err != nil { + return nil, nil, err + } + + continue } - continue + // Handle piece download successfully. + peer.Log.Infof("receive piece from seed peer: %#v %#v", piece, piece.PieceInfo) + peer.Pieces.Add(&schedulerv1.PieceResult{ + TaskId: task.ID, + SrcPid: peer.ID, + BeginTime: piece.BeginTime, + EndTime: piece.EndTime, + Success: true, + PieceInfo: piece.PieceInfo, + ExtendAttribute: piece.ExtendAttribute, + }) + peer.FinishedPieces.Set(uint(piece.PieceInfo.PieceNum)) + peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds()) + task.StorePiece(piece.PieceInfo) } - // Handle piece download successfully. - peer.Log.Infof("receive piece from seed peer: %#v %#v", piece, piece.PieceInfo) - peer.Pieces.Add(&schedulerv1.PieceResult{ - TaskId: task.ID, - SrcPid: peer.ID, - BeginTime: piece.BeginTime, - EndTime: piece.EndTime, - Success: true, - PieceInfo: piece.PieceInfo, - ExtendAttribute: piece.ExtendAttribute, - }) - peer.FinishedPieces.Set(uint(piece.PieceInfo.PieceNum)) - peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds()) - task.StorePiece(piece.PieceInfo) - // Handle end of piece. if piece.Done { peer.Log.Infof("receive done piece") diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 10251a807..a8d042358 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -37,6 +37,9 @@ const ( // Tiny file size is 128 bytes. TinyFileSize = 128 + // Empty file size is 0 bytes. + EmptyFileSize = 0 + // Peer failure limit in task. FailedPeerCountLimit = 200 @@ -383,10 +386,14 @@ func (t *Task) SizeScope() (commonv1.SizeScope, error) { return -1, errors.New("invalid content length") } - if t.TotalPieceCount.Load() <= 0 { + if t.TotalPieceCount.Load() < 0 { return -1, errors.New("invalid total piece count") } + if t.ContentLength.Load() == EmptyFileSize { + return commonv1.SizeScope_EMPTY, nil + } + if t.ContentLength.Load() <= TinyFileSize { return commonv1.SizeScope_TINY, nil } diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index d934922f8..379a46896 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -1252,6 +1252,21 @@ func TestTask_SizeScope(t *testing.T) { assert.Equal(sizeScope, commonv1.SizeScope_TINY) }, }, + { + name: "scope size is empty", + id: mockTaskID, + urlMeta: mockTaskURLMeta, + url: mockTaskURL, + backToSourceLimit: mockTaskBackToSourceLimit, + contentLength: 0, + totalPieceCount: 0, + expect: func(t *testing.T, task *Task) { + assert := assert.New(t) + sizeScope, err := task.SizeScope() + assert.NoError(err) + assert.Equal(sizeScope, commonv1.SizeScope_EMPTY) + }, + }, { name: "scope size is small", id: mockTaskID, @@ -1303,7 +1318,7 @@ func TestTask_SizeScope(t *testing.T) { url: mockTaskURL, backToSourceLimit: mockTaskBackToSourceLimit, contentLength: TinyFileSize + 1, - totalPieceCount: 0, + totalPieceCount: -1, expect: func(t *testing.T, task *Task) { assert := assert.New(t) _, err := task.SizeScope() diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 60b6d3ac0..52a9db14e 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -99,6 +99,22 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTas if task.FSM.Is(resource.TaskStateSucceeded) && err == nil { peer.Log.Info("task can be reused") switch sizeScope { + case commonv1.SizeScope_EMPTY: + peer.Log.Info("task size scope is empty and return empty content directly") + if err := peer.FSM.Event(resource.PeerEventRegisterEmpty); err != nil { + msg := fmt.Sprintf("peer %s register is failed: %s", req.PeerId, err.Error()) + peer.Log.Error(msg) + return nil, dferrors.New(commonv1.Code_SchedError, msg) + } + + return &schedulerv1.RegisterResult{ + TaskId: task.ID, + TaskType: task.Type, + SizeScope: commonv1.SizeScope_EMPTY, + DirectPiece: &schedulerv1.RegisterResult_PieceContent{ + PieceContent: []byte{}, + }, + }, nil case commonv1.SizeScope_TINY: peer.Log.Info("task size scope is tiny and return piece content directly") if len(task.DirectPiece) > 0 && int64(len(task.DirectPiece)) == task.ContentLength.Load() { diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go index 357dac6c1..05679d9e7 100644 --- a/scheduler/service/service_test.go +++ b/scheduler/service/service_test.go @@ -264,6 +264,41 @@ func TestService_RegisterPeerTask(t *testing.T) { assert.Equal(peer.NeedBackToSource.Load(), false) }, }, + { + name: "task scope size is SizeScope_EMPTY", + req: &schedulerv1.PeerTaskRequest{ + UrlMeta: &commonv1.UrlMeta{}, + PeerHost: &schedulerv1.PeerHost{ + Id: mockRawHost.Id, + }, + }, + mock: func( + req *schedulerv1.PeerTaskRequest, mockPeer *resource.Peer, mockSeedPeer *resource.Peer, + scheduler scheduler.Scheduler, res resource.Resource, hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, + ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder, + ) { + mockPeer.Task.FSM.SetState(resource.TaskStateSucceeded) + mockPeer.Task.StorePeer(mockSeedPeer) + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.LoadOrStore(gomock.Any()).Return(mockPeer.Task, true).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockPeer.Host.ID)).Return(mockPeer.Host, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.LoadOrStore(gomock.Any()).Return(mockPeer, true).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer, result *schedulerv1.RegisterResult, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.Equal(result.TaskId, peer.Task.ID) + assert.Equal(result.SizeScope, commonv1.SizeScope_EMPTY) + assert.Equal(result.DirectPiece, &schedulerv1.RegisterResult_PieceContent{ + PieceContent: []byte{}, + }) + assert.Equal(peer.NeedBackToSource.Load(), false) + }, + }, { name: "task scope size is SizeScope_TINY", req: &schedulerv1.PeerTaskRequest{ diff --git a/test/e2e/dfget_test.go b/test/e2e/dfget_test.go index 0d3c8c462..d9009ce47 100644 --- a/test/e2e/dfget_test.go +++ b/test/e2e/dfget_test.go @@ -32,23 +32,30 @@ import ( var _ = Describe("Download with dfget and proxy", func() { Context("dfget", func() { - files := getFileSizes() singleDfgetTest("dfget daemon download should be ok", dragonflyNamespace, "component=dfdaemon", - "dragonfly-dfdaemon-", "dfdaemon", files) + "dragonfly-dfdaemon-", "dfdaemon") for i := 0; i < 3; i++ { singleDfgetTest( fmt.Sprintf("dfget daemon proxy-%d should be ok", i), dragonflyE2ENamespace, fmt.Sprintf("statefulset.kubernetes.io/pod-name=proxy-%d", i), - "proxy-", "proxy", files) + "proxy-", "proxy") } }) }) func getFileSizes() map[string]int { - var details = map[string]int{} - for _, path := range e2eutil.GetFileList() { + var ( + details = map[string]int{} + files = e2eutil.GetFileList() + ) + + if featureGates.Enabled(featureGateEmptyFile) { + fmt.Printf("dfget-empty-file feature gate enabled\n") + files = append(files, "/tmp/empty-file") + } + for _, path := range files { out, err := e2eutil.DockerCommand("stat", "--printf=%s", path).CombinedOutput() Expect(err).NotTo(HaveOccurred()) size, err := strconv.Atoi(string(out)) @@ -59,6 +66,12 @@ func getFileSizes() map[string]int { } func getRandomRange(size int) *util.Range { + if size == 0 { + return &util.Range{ + Start: 0, + Length: 0, + } + } rnd := rand.New(rand.NewSource(time.Now().UnixNano())) r1 := rnd.Intn(size - 1) r2 := rnd.Intn(size - 1) @@ -77,8 +90,9 @@ func getRandomRange(size int) *util.Range { return rg } -func singleDfgetTest(name, ns, label, podNamePrefix, container string, fileDetails map[string]int) { +func singleDfgetTest(name, ns, label, podNamePrefix, container string) { It(name, func() { + fileDetails := getFileSizes() out, err := e2eutil.KubeCtlCommand("-n", ns, "get", "pod", "-l", label, "-o", "jsonpath='{range .items[*]}{.metadata.name}{end}'").CombinedOutput() podName := strings.Trim(string(out), "'") diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 00bcdd479..dca27c809 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -36,16 +36,12 @@ var ( featureGates = featuregate.NewFeatureGate() featureGatesFlag string - featureGateRange featuregate.Feature = "dfget-range" - featureGateCommit featuregate.Feature = "dfget-commit" - featureGateNoLength featuregate.Feature = "dfget-no-length" + featureGateRange featuregate.Feature = "dfget-range" + featureGateCommit featuregate.Feature = "dfget-commit" + featureGateNoLength featuregate.Feature = "dfget-no-length" + featureGateEmptyFile featuregate.Feature = "dfget-empty-file" defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - featureGateRange: { - Default: false, - LockToDefault: false, - PreRelease: featuregate.Alpha, - }, featureGateCommit: { Default: true, LockToDefault: false, @@ -56,6 +52,16 @@ var ( LockToDefault: false, PreRelease: featuregate.Alpha, }, + featureGateRange: { + Default: false, + LockToDefault: false, + PreRelease: featuregate.Alpha, + }, + featureGateEmptyFile: { + Default: false, + LockToDefault: false, + PreRelease: featuregate.Alpha, + }, } )