feat: add reschedule handler for scheduler v2 (#2882)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
86ce09f53a
commit
4d4ea8dda9
2
go.mod
2
go.mod
|
|
@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
|
|||
go 1.21
|
||||
|
||||
require (
|
||||
d7y.io/api/v2 v2.0.45
|
||||
d7y.io/api/v2 v2.0.46
|
||||
github.com/MysteriousPotato/go-lockable v1.0.0
|
||||
github.com/RichardKnop/machinery v1.10.6
|
||||
github.com/Showmax/go-fqdn v1.0.0
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
|||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
|
||||
d7y.io/api/v2 v2.0.45 h1:a39URUlu6SpkFeeGxDTnl9QQTn4bHaEdex1ARpZfmAo=
|
||||
d7y.io/api/v2 v2.0.45/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
|
||||
d7y.io/api/v2 v2.0.46 h1:oPPjp3eKUDAWX9VnCdKG3Mpdwdp57a4gRfnLAZHyMiw=
|
||||
d7y.io/api/v2 v2.0.46/go.mod h1:yeVjEpNTQB4vEqnTxtdzLizDzsICcBzq3zTIyhQJF5E=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
|
|
|
|||
|
|
@ -107,10 +107,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
|
|||
// Send NeedBackToSourceResponse to peer.
|
||||
peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t and peer's schedule count is %d",
|
||||
peer.NeedBackToSource.Load(), peer.ScheduleCount.Load())
|
||||
description := fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
|
||||
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()),
|
||||
Description: &description,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
|
|
@ -132,10 +133,11 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
|
|||
|
||||
// Send NeedBackToSourceResponse to peer.
|
||||
peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
|
||||
description := "scheduling exceeded RetryBackToSourceLimit"
|
||||
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: "scheduling exceeded RetryBackToSourceLimit",
|
||||
Description: &description,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
|
|
|
|||
|
|
@ -232,6 +232,9 @@ func TestScheduling_New(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestScheduling_ScheduleCandidateParents(t *testing.T) {
|
||||
needBackToSourceDescription := "peer's NeedBackToSource is true"
|
||||
exceededLimitDescription := "scheduling exceeded RetryBackToSourceLimit"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
|
||||
|
|
@ -294,7 +297,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) {
|
|||
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: "peer's NeedBackToSource is true",
|
||||
Description: &needBackToSourceDescription,
|
||||
},
|
||||
},
|
||||
})).Return(errors.New("foo")).Times(1)
|
||||
|
|
@ -319,7 +322,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) {
|
|||
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: "peer's NeedBackToSource is true",
|
||||
Description: &needBackToSourceDescription,
|
||||
},
|
||||
},
|
||||
})).Return(nil).Times(1)
|
||||
|
|
@ -361,7 +364,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) {
|
|||
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: "scheduling exceeded RetryBackToSourceLimit",
|
||||
Description: &exceededLimitDescription,
|
||||
},
|
||||
},
|
||||
})).Return(errors.New("foo")).Times(1),
|
||||
|
|
@ -388,7 +391,7 @@ func TestScheduling_ScheduleCandidateParents(t *testing.T) {
|
|||
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
|
||||
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
|
||||
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
|
||||
Description: "scheduling exceeded RetryBackToSourceLimit",
|
||||
Description: &exceededLimitDescription,
|
||||
},
|
||||
},
|
||||
})).Return(nil).Times(1),
|
||||
|
|
|
|||
|
|
@ -132,6 +132,12 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
|
|||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
case *schedulerv2.AnnouncePeerRequest_RescheduleRequest:
|
||||
logger.Infof("receive AnnouncePeerRequest_RescheduleRequest: %#v", announcePeerRequest.RescheduleRequest)
|
||||
if err := v.handleRescheduleRequest(ctx, req.GetPeerId()); err != nil {
|
||||
logger.Error(err)
|
||||
return err
|
||||
}
|
||||
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
|
||||
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
|
||||
if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
|
||||
|
|
@ -949,6 +955,20 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
|
|||
return nil
|
||||
}
|
||||
|
||||
// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
//
// It loads the peer identified by peerID from the peer manager and calls
// ScheduleCandidateParents for it again, passing peer.BlockParents as the
// third argument.
//
// It returns a gRPC status error with codes.NotFound when the peer is not
// loaded, a status error with codes.FailedPrecondition carrying the
// scheduling error's message when scheduling fails, and nil otherwise.
|
||||
func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string) error {
|
||||
peer, loaded := v.resource.PeerManager().Load(peerID)
|
||||
if !loaded {
|
||||
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
|
||||
}
|
||||
|
||||
// The scheduling error is not returned as-is; only its message is kept,
// wrapped in a FailedPrecondition status.
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
|
||||
return status.Error(codes.FailedPrecondition, err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
|
||||
func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID string) error {
|
||||
peer, loaded := v.resource.PeerManager().Load(peerID)
|
||||
|
|
@ -1192,10 +1212,6 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
|
|||
// Handle peer with piece temporary failed request.
|
||||
peer.UpdatedAt.Store(time.Now())
|
||||
peer.BlockParents.Add(req.Piece.GetParentId())
|
||||
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
|
||||
return status.Error(codes.FailedPrecondition, err.Error())
|
||||
}
|
||||
|
||||
if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded {
|
||||
parent.Host.UploadFailedCount.Inc()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1104,7 +1104,6 @@ func TestServiceV2_SyncProbes(t *testing.T) {
|
|||
Disk: mockV2Probe.Host.Disk,
|
||||
Build: mockV2Probe.Host.Build,
|
||||
},
|
||||
Description: "foo",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
@ -2130,6 +2129,78 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestServiceV2_handleRescheduleRequest is a table-driven test for
// V2.handleRescheduleRequest. Each case sets up ordered gomock expectations
// and then asserts on the returned error:
//   - "peer can not be loaded": PeerManager.Load reports false, expecting a
//     codes.NotFound status error.
//   - "reschedule failed": ScheduleCandidateParents returns an error,
//     expecting a codes.FailedPrecondition status error with that message.
//   - "reschedule succeeded": ScheduleCandidateParents returns nil, expecting
//     no error.
func TestServiceV2_handleRescheduleRequest(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
run func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
|
||||
}{
|
||||
{
|
||||
name: "peer can not be loaded",
|
||||
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
|
||||
)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Errorf(codes.NotFound, "peer %s not found", peer.ID))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reschedule failed",
|
||||
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
|
||||
)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.ErrorIs(svc.handleRescheduleRequest(context.Background(), peer.ID), status.Error(codes.FailedPrecondition, "foo"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reschedule succeeded",
|
||||
run: func(t *testing.T, svc *V2, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
|
||||
)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.NoError(svc.handleRescheduleRequest(context.Background(), peer.ID))
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Every case gets its own gomock controller, mocks, peer fixture and V2
// service, so expectations do not carry over between subtests.
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctl := gomock.NewController(t)
|
||||
defer ctl.Finish()
|
||||
scheduling := schedulingmocks.NewMockScheduling(ctl)
|
||||
res := resource.NewMockResource(ctl)
|
||||
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
|
||||
storage := storagemocks.NewMockStorage(ctl)
|
||||
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
|
||||
peerManager := resource.NewMockPeerManager(ctl)
|
||||
|
||||
mockHost := resource.NewHost(
|
||||
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
|
||||
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
|
||||
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
|
||||
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
|
||||
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
|
||||
|
||||
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
@ -2887,7 +2958,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
name string
|
||||
req *schedulerv2.DownloadPieceFailedRequest
|
||||
run func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder)
|
||||
mp *resource.MockPeerManagerMockRecorder)
|
||||
}{
|
||||
{
|
||||
name: "peer can not be loaded",
|
||||
|
|
@ -2898,7 +2969,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
Temporary: true,
|
||||
},
|
||||
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
mp *resource.MockPeerManagerMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(nil, false).Times(1),
|
||||
|
|
@ -2917,7 +2988,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
Temporary: false,
|
||||
},
|
||||
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
mp *resource.MockPeerManagerMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
|
|
@ -2927,28 +2998,6 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "download piece failed"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "schedule failed",
|
||||
req: &schedulerv2.DownloadPieceFailedRequest{
|
||||
Piece: &commonv2.Piece{
|
||||
ParentId: &mockSeedPeerID,
|
||||
},
|
||||
Temporary: true,
|
||||
},
|
||||
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("foo")).Times(1),
|
||||
)
|
||||
|
||||
assert := assert.New(t)
|
||||
assert.ErrorIs(svc.handleDownloadPieceFailedRequest(context.Background(), peer.ID, req), status.Error(codes.FailedPrecondition, "foo"))
|
||||
assert.NotEqual(peer.UpdatedAt.Load(), 0)
|
||||
assert.True(peer.BlockParents.Contains(req.Piece.GetParentId()))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "parent can not be loaded",
|
||||
req: &schedulerv2.DownloadPieceFailedRequest{
|
||||
|
|
@ -2958,11 +3007,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
Temporary: true,
|
||||
},
|
||||
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
mp *resource.MockPeerManagerMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(nil, false).Times(1),
|
||||
)
|
||||
|
|
@ -2983,11 +3031,10 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
Temporary: true,
|
||||
},
|
||||
run: func(t *testing.T, svc *V2, req *schedulerv2.DownloadPieceFailedRequest, peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder,
|
||||
mp *resource.MockPeerManagerMockRecorder, ms *schedulingmocks.MockSchedulingMockRecorder) {
|
||||
mp *resource.MockPeerManagerMockRecorder) {
|
||||
gomock.InOrder(
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(peer.ID)).Return(peer, true).Times(1),
|
||||
ms.ScheduleCandidateParents(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1),
|
||||
mr.PeerManager().Return(peerManager).Times(1),
|
||||
mp.Load(gomock.Eq(req.Piece.GetParentId())).Return(peer, true).Times(1),
|
||||
)
|
||||
|
|
@ -3020,7 +3067,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
|
|||
peer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost)
|
||||
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
|
||||
|
||||
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
|
||||
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue