diff --git a/go.mod b/go.mod index bca5a109b..ed8d73db8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api v1.6.4 + d7y.io/api v1.6.8 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index eb026ee61..b40c70d8e 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api v1.6.4 h1:h+fe8/EXAdP6lqyg0OAgbbn2JoMhe2EdbSbUHHRd9eE= -d7y.io/api v1.6.4/go.mod h1:7G3t9YO5esDzQVUgdUrS+6yCDAMWS5c9ux8yX5L9Ync= +d7y.io/api v1.6.8 h1:/oNEZC8FC8P1vPHlzgtJbBQzh5lnf0mZ+9VBx/Nq+iU= +d7y.io/api v1.6.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/pkg/idgen/task_id.go b/pkg/idgen/task_id.go index 62fe127f7..311f29a9b 100644 --- a/pkg/idgen/task_id.go +++ b/pkg/idgen/task_id.go @@ -17,6 +17,7 @@ package idgen import ( + "fmt" "strings" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -91,11 +92,11 @@ func parseFilters(rawFilters string) []string { } // TaskIDV2 generates v2 version of task id. -func TaskIDV2(url, digest, tag, application string, filters []string) string { +func TaskIDV2(url, digest, tag, application string, pieceLength int32, filters []string) string { url, err := neturl.FilterQuery(url, filters) if err != nil { url = "" } - return pkgdigest.SHA256FromStrings(url, digest, tag, application) + return pkgdigest.SHA256FromStrings(url, digest, tag, application, fmt.Sprint(pieceLength)) } diff --git a/pkg/idgen/task_id_test.go b/pkg/idgen/task_id_test.go index dd36e77aa..8f411608c 100644 --- a/pkg/idgen/task_id_test.go +++ b/pkg/idgen/task_id_test.go @@ -113,6 +113,7 @@ func TestTaskIDV2(t *testing.T) { digest string tag string application string + pieceLength int32 filters []string expect func(t *testing.T, d any) }{ @@ -122,10 +123,11 @@ func TestTaskIDV2(t *testing.T) { digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4", tag: "foo", application: "bar", + pieceLength: 1, filters: []string{}, expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d") + assert.Equal(d, "6acf73532a2e7b8c30dfc7abce2fd7d2a2cd3746f16b0d54d3e2f136ffa61c90") }, }, { @@ -134,7 +136,7 @@ func TestTaskIDV2(t *testing.T) { digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4", expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8") + assert.Equal(d, "b08a435da662ad5ae8ab8359a9c4ebd5027cf14d04b71ccc85f1e197e898adbd") }, }, { @@ -143,7 +145,7 @@ func TestTaskIDV2(t *testing.T) { tag: "foo", expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b") + assert.Equal(d, "274c3716c538b5a49e7296ee36dd412bae29948dfb6153e5ac9694e382144f83") }, }, { @@ -152,7 +154,16 @@ func TestTaskIDV2(t *testing.T) { application: "bar", expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d") + assert.Equal(d, "ca12c6591c38f726c238f35d9c7945559b52a0dcc10ae191920be6f5f8a0326a") + }, + }, + { + name: "generate taskID with pieceLength", + url: "https://example.com", + pieceLength: 1, + expect: func(t *testing.T, d any) { + assert := assert.New(t) + assert.Equal(d, "614fb0088e7d82b2538f1ccb5861db5940aaa665b587792898e4be1f591bafec") }, }, { @@ -161,14 +172,14 @@ func TestTaskIDV2(t *testing.T) { filters: []string{"foo", "bar"}, expect: func(t *testing.T, d any) { assert := assert.New(t) - assert.Equal(d, "100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9") + assert.Equal(d, "4a89bbe790108d4987e7dc5127df2b99aea1c17828f1ff3e55176f49ac974b28") }, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters)) + tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.pieceLength, tc.filters)) }) } } diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 23503e679..b93d45dbf 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -46,19 +46,19 @@ var ( // Variables declared for metrics. var ( - AnnouncePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ + AnnouncePeerCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "announce_peer_total", Help: "Counter of the number of the announcing peer.", - }, []string{"tag", "app"}) + }) - AnnouncePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + AnnouncePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "announce_peer_failure_total", Help: "Counter of the number of failed of the announcing peer.", - }, []string{"tag", "app"}) + }) StatPeerCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, diff --git a/scheduler/resource/task_manager.go b/scheduler/resource/task_manager.go index 6f4a497ea..0bf1386dd 100644 --- a/scheduler/resource/task_manager.go +++ b/scheduler/resource/task_manager.go @@ -19,7 +19,6 @@ package resource import ( - "context" "sync" pkggc "d7y.io/dragonfly/v2/pkg/gc" @@ -112,21 +111,10 @@ func (t *taskManager) RunGC() error { return true } - // If task state is TaskStateLeave, it will be reclaimed. - if task.FSM.Is(TaskStateLeave) { + // If there is no peer then task will be reclaimed. + if task.PeerCount() == 0 { task.Log.Info("task has been reclaimed") t.Delete(task.ID) - return true - } - - // If there is no peer then switch the task state to TaskStateLeave. - if task.PeerCount() == 0 { - if err := task.FSM.Event(context.Background(), TaskEventLeave); err != nil { - task.Log.Errorf("task fsm event failed: %s", err.Error()) - return true - } - - task.Log.Info("task peer count is zero, causing the task to leave") } return true diff --git a/scheduler/resource/task_manager_test.go b/scheduler/resource/task_manager_test.go index c46172053..149053cd3 100644 --- a/scheduler/resource/task_manager_test.go +++ b/scheduler/resource/task_manager_test.go @@ -315,14 +315,7 @@ func TestTaskManager_RunGC(t *testing.T) { taskManager.Store(mockTask) err := taskManager.RunGC() assert.NoError(err) - - task, loaded := taskManager.Load(mockTask.ID) - assert.Equal(loaded, true) - assert.Equal(task.FSM.Current(), TaskStateLeave) - - err = taskManager.RunGC() - assert.NoError(err) - _, loaded = taskManager.Load(mockTask.ID) + _, loaded := taskManager.Load(mockTask.ID) assert.Equal(loaded, false) }, }, diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index 07f6d643d..d71020a94 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -51,7 +51,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 4a751ac0a..2d4c2c7a2 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -52,6 +52,12 @@ func newSchedulerServerV2( // AnnouncePeer announces peer to scheduler. func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error { + metrics.AnnouncePeerCount.Inc() + if err := s.service.AnnouncePeer(stream); err != nil { + metrics.AnnouncePeerFailureCount.Inc() + return err + } + return nil } diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index d5a35e8e3..682c62263 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -131,7 +131,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" diff --git a/scheduler/scheduling/mocks/scheduling_mock.go b/scheduler/scheduling/mocks/scheduling_mock.go index 87d528499..ed48452af 100644 --- a/scheduler/scheduling/mocks/scheduling_mock.go +++ b/scheduler/scheduling/mocks/scheduling_mock.go @@ -51,26 +51,28 @@ func (mr *MockSchedulingMockRecorder) FindCandidateParents(arg0, arg1, arg2 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindCandidateParents", reflect.TypeOf((*MockScheduling)(nil).FindCandidateParents), arg0, arg1, arg2) } -// ScheduleCandidateParentsForNormalPeer mocks base method. -func (m *MockScheduling) ScheduleCandidateParentsForNormalPeer(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) { +// ScheduleCandidateParents mocks base method. +func (m *MockScheduling) ScheduleCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "ScheduleCandidateParentsForNormalPeer", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "ScheduleCandidateParents", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 } -// ScheduleCandidateParentsForNormalPeer indicates an expected call of ScheduleCandidateParentsForNormalPeer. -func (mr *MockSchedulingMockRecorder) ScheduleCandidateParentsForNormalPeer(arg0, arg1, arg2 interface{}) *gomock.Call { +// ScheduleCandidateParents indicates an expected call of ScheduleCandidateParents. +func (mr *MockSchedulingMockRecorder) ScheduleCandidateParents(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleCandidateParentsForNormalPeer", reflect.TypeOf((*MockScheduling)(nil).ScheduleCandidateParentsForNormalPeer), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleCandidateParents", reflect.TypeOf((*MockScheduling)(nil).ScheduleCandidateParents), arg0, arg1, arg2) } -// ScheduleParentsForNormalPeer mocks base method. -func (m *MockScheduling) ScheduleParentsForNormalPeer(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) { +// ScheduleParentAndCandidateParents mocks base method. +func (m *MockScheduling) ScheduleParentAndCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) { m.ctrl.T.Helper() - m.ctrl.Call(m, "ScheduleParentsForNormalPeer", arg0, arg1, arg2) + m.ctrl.Call(m, "ScheduleParentAndCandidateParents", arg0, arg1, arg2) } -// ScheduleParentsForNormalPeer indicates an expected call of ScheduleParentsForNormalPeer. -func (mr *MockSchedulingMockRecorder) ScheduleParentsForNormalPeer(arg0, arg1, arg2 interface{}) *gomock.Call { +// ScheduleParentAndCandidateParents indicates an expected call of ScheduleParentAndCandidateParents. +func (mr *MockSchedulingMockRecorder) ScheduleParentAndCandidateParents(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleParentsForNormalPeer", reflect.TypeOf((*MockScheduling)(nil).ScheduleParentsForNormalPeer), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleParentAndCandidateParents", reflect.TypeOf((*MockScheduling)(nil).ScheduleParentAndCandidateParents), arg0, arg1, arg2) } diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index b179fbdcd..83a66ae9b 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -24,12 +24,13 @@ import ( "sort" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "d7y.io/api/pkg/apis/common/v1" commonv2 "d7y.io/api/pkg/apis/common/v2" - errordetailsv2 "d7y.io/api/pkg/apis/errordetails/v2" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" @@ -41,13 +42,13 @@ import ( ) type Scheduling interface { - // ScheduleCandidateParentsForNormalPeer schedules candidate parents to the normal peer. + // ScheduleCandidateParents schedules candidate parents to the normal peer. // Used only in v2 version of the grpc. - ScheduleCandidateParentsForNormalPeer(context.Context, *resource.Peer, set.SafeSet[string]) + ScheduleCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) error - // ScheduleParentAndCandiateParentsForNormalPeer schedules a parent and candidate parents to the normal peer. + // ScheduleParentAndCandidateParents schedules a parent and candidate parents to the normal peer. // Used only in v1 version of the grpc. - ScheduleParentsForNormalPeer(context.Context, *resource.Peer, set.SafeSet[string]) + ScheduleParentAndCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) // FindCandidateParents finds candidate parents for the peer. FindCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) ([]*resource.Peer, bool) @@ -72,15 +73,15 @@ func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, plugi } } -// ScheduleCandidateParentsForNormalPeer schedules candidate parents to the normal peer. +// ScheduleCandidateParents schedules candidate parents to the normal peer. // Used only in v2 version of the grpc. -func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) { +func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) error { var n int for { select { case <-ctx.Done(): peer.Log.Infof("context was done") - return + return ctx.Err() default: } @@ -95,38 +96,29 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context, stream, loaded := peer.LoadAnnouncePeerStream() if !loaded { peer.Log.Error("load stream failed") - return + return status.Error(codes.FailedPrecondition, "load stream failed") } // Send NeedBackToSourceResponse to peer. - reason := fmt.Sprintf("send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()) + peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()) if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: reason, + Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()), }, }, }); err != nil { peer.Log.Error(err) - return + return status.Error(codes.FailedPrecondition, err.Error()) } - peer.Log.Info(reason) if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { - peer.Log.Errorf("peer fsm event failed: %s", err.Error()) - return + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) + peer.Log.Error(msg) + return status.Error(codes.Internal, err.Error()) } - // If the task state is TaskStateFailed, - // peer back-to-source and reset task state to TaskStateRunning. - if peer.Task.FSM.Is(resource.TaskStateFailed) { - if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { - peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) - return - } - } - - return + return nil } // Check condition 2: @@ -135,78 +127,46 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context, stream, loaded := peer.LoadAnnouncePeerStream() if !loaded { peer.Log.Error("load stream failed") - return + return status.Error(codes.FailedPrecondition, "load stream failed") } // Send NeedBackToSourceResponse to peer. - reason := fmt.Sprintf("send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit) + peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit) if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: reason, + Description: "scheduling exceeded RetryBackToSourceLimit", }, }, }); err != nil { peer.Log.Error(err) - return + return status.Error(codes.FailedPrecondition, err.Error()) } - peer.Log.Info(reason) if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { - peer.Log.Errorf("peer fsm event failed: %s", err.Error()) - return + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) + peer.Log.Error(msg) + return status.Error(codes.Internal, err.Error()) } - // If the task state is TaskStateFailed, - // peer back-to-source and reset task state to TaskStateRunning. - if peer.Task.FSM.Is(resource.TaskStateFailed) { - if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { - peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) - return - } - } - - return + return nil } } - // Scheduling will send SchedulePeerFailed to peer. + // Scheduling will return schedule failed. // // Condition 1: Scheduling exceeds the RetryLimit. if n >= s.config.RetryLimit { - stream, loaded := peer.LoadAnnouncePeerStream() - if !loaded { - peer.Log.Error("load stream failed") - return - } - - // Send SchedulePeerFailed to peer. - reason := fmt.Sprintf("send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit %d", s.config.RetryLimit) - if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ - Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{ - SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{ - Description: reason, - }, - }, - }); err != nil { - peer.Log.Error(err) - return - } - - peer.Log.Error(reason) - return + peer.Log.Errorf("scheduling failed, because of scheduling exceeded RetryLimit %d", s.config.RetryLimit) + return status.Error(codes.FailedPrecondition, "scheduling exceeded RetryLimit") } // Scheduling will send NormalTaskResponse to peer. // // Condition 1: Scheduling can find candidate parents. if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - n++ - peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) - - // Sleep to avoid hot looping. - time.Sleep(s.config.RetryInterval) - continue + peer.Log.Error(err) + return status.Error(codes.Internal, err.Error()) } // Find candidate parents. @@ -223,49 +183,41 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context, // Load AnnouncePeerStream from peer. stream, loaded := peer.LoadAnnouncePeerStream() if !loaded { - n++ - peer.Log.Errorf("scheduling failed in %d times, because of loading AnnouncePeerStream failed", n) - if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - peer.Log.Errorf("peer deletes inedges failed: %s", err.Error()) - return + msg := fmt.Sprintf("peer deletes inedges failed: %s", err.Error()) + peer.Log.Error(msg) + return status.Error(codes.Internal, msg) } - return + peer.Log.Error("load stream failed") + return status.Error(codes.FailedPrecondition, "load stream failed") } // Send NormalTaskResponse to peer. - peer.Log.Info("send NormalTaskResponse to peer") + peer.Log.Info("send NormalTaskResponse") if err := stream.Send(&schedulerv2.AnnouncePeerResponse{ Response: constructSuccessNormalTaskResponse(s.dynconfig, candidateParents), }); err != nil { - n++ - peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error()) - - if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { - peer.Log.Errorf("peer deletes inedges failed: %s", err.Error()) - return - } - - return + peer.Log.Error(err) + return status.Error(codes.FailedPrecondition, err.Error()) } // Add edge from parent to peer. for _, candidateParent := range candidateParents { if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil { - peer.Log.Debugf("peer adds edge failed: %s", err.Error()) + peer.Log.Warnf("peer adds edge failed: %s", err.Error()) continue } } peer.Log.Infof("scheduling success in %d times", n+1) - return + return nil } } -// ScheduleParentsForNormalPeer schedules a parent and candidate parents to a peer. +// ScheduleParentAndCandidateParents schedules a parent and candidate parents to a peer. // Used only in v1 version of the grpc. -func (s *scheduling) ScheduleParentsForNormalPeer(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) { +func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) { var n int for { select { diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 845c3ebb1..85f79f18a 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -28,12 +28,13 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "go.uber.org/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "d7y.io/api/pkg/apis/common/v1" commonv2 "d7y.io/api/pkg/apis/common/v2" - errordetailsv2 "d7y.io/api/pkg/apis/errordetails/v2" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" @@ -159,7 +160,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" @@ -220,11 +221,11 @@ func TestScheduling_New(t *testing.T) { } } -func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { +func TestScheduling_ScheduleCandidateParents(t *testing.T) { tests := []struct { name string mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) - expect func(t *testing.T, peer *resource.Peer) + expect func(t *testing.T, peer *resource.Peer, err error) }{ { name: "context was done", @@ -232,8 +233,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { peer.FSM.SetState(resource.PeerStateRunning) cancel() }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, context.Canceled) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) }, @@ -246,8 +248,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { peer.NeedBackToSource.Store(true) peer.FSM.SetState(resource.PeerStateRunning) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "load stream failed")) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -265,13 +268,14 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true", + Description: "peer's NeedBackToSource is true", }, }, })).Return(errors.New("foo")).Times(1) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "foo")) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -289,43 +293,19 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true", + Description: "peer's NeedBackToSource is true", }, }, })).Return(nil).Times(1) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.NoError(err) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) }, }, - { - name: "peer needs back-to-source and task state is TaskStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.NeedBackToSource.Store(true) - peer.FSM.SetState(resource.PeerStateRunning) - task.FSM.SetState(resource.TaskStateFailed) - peer.StoreAnnouncePeerStream(stream) - - ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ - Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ - NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true", - }, - }, - })).Return(nil).Times(1) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(len(peer.Parents()), 0) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStateRunning)) - }, - }, { name: "schedule exceeds RetryBackToSourceLimit and peer stream load failed", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { @@ -334,8 +314,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { peer.FSM.SetState(resource.PeerStateRunning) md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "load stream failed")) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -354,14 +335,15 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1", + Description: "scheduling exceeded RetryBackToSourceLimit", }, }, })).Return(errors.New("foo")).Times(1), ) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "foo")) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -380,64 +362,22 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1", + Description: "scheduling exceeded RetryBackToSourceLimit", }, }, })).Return(nil).Times(1), ) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.NoError(err) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) }, }, { - name: "schedule exceeds RetryBackToSourceLimit and task state is TaskStateFailed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - task.FSM.SetState(resource.TaskStateFailed) - peer.StoreAnnouncePeerStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1), - ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ - Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{ - NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{ - Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1", - }, - }, - })).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(len(peer.Parents()), 0) - assert.True(peer.FSM.Is(resource.PeerStateBackToSource)) - assert.True(peer.Task.FSM.Is(resource.TaskStateRunning)) - }, - }, - { - name: "schedule exceeds RetryLimit and peer stream load failed", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - peer.Task.BackToSourceLimit.Store(-1) - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2) - }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(len(peer.Parents()), 0) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) - }, - }, - { - name: "schedule exceeds RetryLimit and send SchedulePeerFailed failed", + name: "schedule exceeds RetryLimit", mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { task := peer.Task task.StorePeer(peer) @@ -447,44 +387,11 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { gomock.InOrder( md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), - ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ - Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{ - SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{ - Description: "send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit 2", - }, - }, - })).Return(errors.New("foo")).Times(1), ) }, - expect: func(t *testing.T, peer *resource.Peer) { - assert := assert.New(t) - assert.Equal(len(peer.Parents()), 0) - assert.True(peer.FSM.Is(resource.PeerStateRunning)) - assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) - }, - }, - { - name: "schedule exceeds RetryLimit and send SchedulePeerFailed success", - mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { - task := peer.Task - task.StorePeer(peer) - peer.FSM.SetState(resource.PeerStateRunning) - peer.Task.BackToSourceLimit.Store(-1) - peer.StoreAnnouncePeerStream(stream) - - gomock.InOrder( - md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2), - ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{ - Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{ - SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{ - Description: "send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit 2", - }, - }, - })).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "scheduling exceeded RetryLimit")) assert.Equal(len(peer.Parents()), 0) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -507,8 +414,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { ma.Send(gomock.Any()).Return(nil).Times(1), ) }, - expect: func(t *testing.T, peer *resource.Peer) { + expect: func(t *testing.T, peer *resource.Peer, err error) { assert := assert.New(t) + assert.NoError(err) assert.Equal(len(peer.Parents()), 1) assert.True(peer.FSM.Is(resource.PeerStateRunning)) assert.True(peer.Task.FSM.Is(resource.TaskStatePending)) @@ -536,13 +444,12 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) { tc.mock(cancel, peer, seedPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT()) scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir) - scheduling.ScheduleCandidateParentsForNormalPeer(ctx, peer, blocklist) - tc.expect(t, peer) + tc.expect(t, peer, scheduling.ScheduleCandidateParents(ctx, peer, blocklist)) }) } } -func TestScheduling_ScheduleParentsForNormalPeer(t *testing.T) { +func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) { tests := []struct { name string mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, mr *schedulerv1mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) @@ -810,7 +717,7 @@ func TestScheduling_ScheduleParentsForNormalPeer(t *testing.T) { tc.mock(cancel, peer, seedPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT()) scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir) - scheduling.ScheduleParentsForNormalPeer(ctx, peer, blocklist) + scheduling.ScheduleParentAndCandidateParents(ctx, peer, blocklist) tc.expect(t, peer) }) } diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 3d367f450..f2ff4dd57 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -168,13 +168,14 @@ func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequ // ReportPieceResult handles the piece information reported by dfdaemon. func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error { - ctx := stream.Context() + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + var ( peer *resource.Peer initialized bool loaded bool ) - for { select { case <-ctx.Done(): @@ -327,7 +328,7 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ } task := resource.NewTask(taskID, req.Url, req.UrlMeta.Tag, req.UrlMeta.Application, types.TaskTypeV1ToV2(req.TaskType), - strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator), req.UrlMeta.Header, int32(v.config.Scheduler.BackSourceCount), options...) + strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator), req.UrlMeta.Header, int32(v.config.Scheduler.BackToSourceCount), options...) task, _ = v.resource.TaskManager().LoadOrStore(task) host := v.storeHost(ctx, req.PeerHost) peer := v.storePeer(ctx, peerID, req.UrlMeta.Priority, req.UrlMeta.Range, task, host) @@ -963,7 +964,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { return } - v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, set.NewSafeSet[string]()) + v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]()) default: } } @@ -1034,7 +1035,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece if !loaded { peer.Log.Errorf("parent %s not found", piece.DstPid) peer.BlockParents.Add(piece.DstPid) - v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, peer.BlockParents) + v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) return } @@ -1093,7 +1094,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece peer.Log.Infof("reschedule parent because of failed piece") peer.BlockParents.Add(parent.ID) - v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, peer.BlockParents) + v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) } // handlePeerSuccess handles successful peer. @@ -1135,7 +1136,7 @@ func (v *V1) handlePeerFailure(ctx context.Context, peer *resource.Peer) { // Reschedule a new parent to children of peer to exclude the current failed peer. for _, child := range peer.Children() { child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) - v.scheduling.ScheduleParentsForNormalPeer(ctx, child, child.BlockParents) + v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) } } @@ -1150,7 +1151,7 @@ func (v *V1) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) { // Reschedule a new parent to children of peer to exclude the current failed peer. for _, child := range peer.Children() { child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) - v.scheduling.ScheduleParentsForNormalPeer(ctx, child, child.BlockParents) + v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) } } diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 455be1758..7217a4188 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -178,7 +178,7 @@ var ( mockTaskBackToSourceLimit int32 = 200 mockTaskURL = "http://example.com/foo" - mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters) + mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters) mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4") mockTaskTag = "d7y" mockTaskApplication = "foo" @@ -3410,7 +3410,7 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { name: "peer state is PeerStateReceivedNormal", mock: func(peer *resource.Peer, scheduling *mocks.MockSchedulingMockRecorder) { peer.FSM.SetState(resource.PeerStateReceivedNormal) - scheduling.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1) + scheduling.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1) }, expect: func(t *testing.T, peer *resource.Peer) { assert := assert.New(t) @@ -3631,7 +3631,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(parent.ID)).Return(nil, false).Times(1), - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) svc.handlePieceFailure(context.Background(), peer, piece) @@ -3659,7 +3659,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) svc.handlePieceFailure(context.Background(), peer, piece) @@ -3688,7 +3688,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) svc.handlePieceFailure(context.Background(), peer, piece) @@ -3716,7 +3716,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) svc.handlePieceFailure(context.Background(), peer, piece) @@ -3745,7 +3745,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1), - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1), ) svc.handlePieceFailure(context.Background(), peer, piece) @@ -3943,7 +3943,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) { peer.FSM.SetState(resource.PeerStateRunning) child.FSM.SetState(resource.PeerStateRunning) - ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1) + ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1) }, expect: func(t *testing.T, peer *resource.Peer, child *resource.Peer) { assert := assert.New(t) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 2336bc294..febcb4d96 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -19,6 +19,7 @@ package service import ( "context" "fmt" + "io" "time" "google.golang.org/grpc/codes" @@ -37,7 +38,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/storage" ) -// TODO Implement v2 version of the service functions. // V2 is the interface for v2 version of the service. type V2 struct { // Resource interface. @@ -75,9 +75,121 @@ func NewV2( // AnnouncePeer announces peer to scheduler. func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + + for { + select { + case <-ctx.Done(): + logger.Infof("context was done") + return ctx.Err() + default: + } + + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + logger.Errorf("receive error: %s", err.Error()) + return err + } + + logger := logger.WithPeer(req.HostId, req.TaskId, req.PeerId) + switch announcePeerRequest := req.GetRequest().(type) { + case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest: + logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %#v", announcePeerRequest.RegisterPeerRequest.Download) + if err := v.handleRegisterPeerRequest(req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil { + logger.Error(err) + return err + } + case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest) + v.handleDownloadPeerStartedRequest(announcePeerRequest.DownloadPeerStartedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest) + v.handleDownloadPeerBackToSourceStartedRequest(announcePeerRequest.DownloadPeerBackToSourceStartedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest) + v.handleDownloadPeerFinishedRequest(announcePeerRequest.DownloadPeerFinishedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) + v.handleDownloadPeerBackToSourceFinishedRequest(announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest) + v.handleDownloadPieceFinishedRequest(announcePeerRequest.DownloadPieceFinishedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) + v.handleDownloadPieceBackToSourceFinishedRequest(announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest) + v.handleDownloadPieceFailedRequest(announcePeerRequest.DownloadPieceFailedRequest) + case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: + logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest) + v.handleDownloadPieceBackToSourceFailedRequest(announcePeerRequest.DownloadPieceBackToSourceFailedRequest) + case *schedulerv2.AnnouncePeerRequest_SyncPiecesFailedRequest: + logger.Infof("receive AnnouncePeerRequest_SyncPiecesFailedRequest: %#v", announcePeerRequest.SyncPiecesFailedRequest) + v.handleSyncPiecesFailedRequest(announcePeerRequest.SyncPiecesFailedRequest) + default: + msg := fmt.Sprintf("receive unknow request: %#v", announcePeerRequest) + logger.Error(msg) + return status.Error(codes.FailedPrecondition, msg) + } + } +} + +// TODO Implement function. +// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. +func (v *V2) handleRegisterPeerRequest(hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { return nil } +// TODO Implement function. +// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPeerStartedRequest(req *schedulerv2.DownloadPeerStartedRequest) { +} + +// TODO Implement function. +// handleDownloadPeerBackToSourceStartedRequest handles DownloadPeerBackToSourceStartedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPeerBackToSourceStartedRequest(req *schedulerv2.DownloadPeerBackToSourceStartedRequest) { +} + +// TODO Implement function. +// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPeerFinishedRequest(req *schedulerv2.DownloadPeerFinishedRequest) { +} + +// TODO Implement function. +// handleDownloadPeerBackToSourceFinishedRequest handles DownloadPeerBackToSourceFinishedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) { +} + +// TODO Implement function. +// handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPieceFinishedRequest(req *schedulerv2.DownloadPieceFinishedRequest) { +} + +// TODO Implement function. +// handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) { +} + +// TODO Implement function. +// handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPieceFailedRequest(req *schedulerv2.DownloadPieceFailedRequest) { +} + +// TODO Implement function. +// handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest. +func (v *V2) handleDownloadPieceBackToSourceFailedRequest(req *schedulerv2.DownloadPieceBackToSourceFailedRequest) { +} + +// TODO Implement function. +// handleSyncPiecesFailedRequest handles SyncPiecesFailedRequest of AnnouncePeerRequest. +func (v *V2) handleSyncPiecesFailedRequest(req *schedulerv2.SyncPiecesFailedRequest) { +} + // StatPeer checks information of peer. func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { logger.WithTaskID(req.TaskId).Infof("stat peer request: %#v", req)