feat: change ScheduleCandidateParentsForNormalPeer implementation (#2133)

Change the ScheduleCandidateParentsForNormalPeer implementation.
Remove TaskStateLeave event in task GC.
Implement AnnouncePeer in service v2.

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-03-02 17:44:48 +08:00
parent f5aff5e27f
commit b0e6089783
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
16 changed files with 255 additions and 282 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20
require (
d7y.io/api v1.6.4
d7y.io/api v1.6.8
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0

4
go.sum
View File

@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.6.4 h1:h+fe8/EXAdP6lqyg0OAgbbn2JoMhe2EdbSbUHHRd9eE=
d7y.io/api v1.6.4/go.mod h1:7G3t9YO5esDzQVUgdUrS+6yCDAMWS5c9ux8yX5L9Ync=
d7y.io/api v1.6.8 h1:/oNEZC8FC8P1vPHlzgtJbBQzh5lnf0mZ+9VBx/Nq+iU=
d7y.io/api v1.6.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -17,6 +17,7 @@
package idgen
import (
"fmt"
"strings"
commonv1 "d7y.io/api/pkg/apis/common/v1"
@ -91,11 +92,11 @@ func parseFilters(rawFilters string) []string {
}
// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, digest, tag, application string, filters []string) string {
func TaskIDV2(url, digest, tag, application string, pieceLength int32, filters []string) string {
url, err := neturl.FilterQuery(url, filters)
if err != nil {
url = ""
}
return pkgdigest.SHA256FromStrings(url, digest, tag, application)
return pkgdigest.SHA256FromStrings(url, digest, tag, application, fmt.Sprint(pieceLength))
}

View File

@ -113,6 +113,7 @@ func TestTaskIDV2(t *testing.T) {
digest string
tag string
application string
pieceLength int32
filters []string
expect func(t *testing.T, d any)
}{
@ -122,10 +123,11 @@ func TestTaskIDV2(t *testing.T) {
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
tag: "foo",
application: "bar",
pieceLength: 1,
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d")
assert.Equal(d, "6acf73532a2e7b8c30dfc7abce2fd7d2a2cd3746f16b0d54d3e2f136ffa61c90")
},
},
{
@ -134,7 +136,7 @@ func TestTaskIDV2(t *testing.T) {
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8")
assert.Equal(d, "b08a435da662ad5ae8ab8359a9c4ebd5027cf14d04b71ccc85f1e197e898adbd")
},
},
{
@ -143,7 +145,7 @@ func TestTaskIDV2(t *testing.T) {
tag: "foo",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "2773851c628744fb7933003195db436ce397c1722920696c4274ff804d86920b")
assert.Equal(d, "274c3716c538b5a49e7296ee36dd412bae29948dfb6153e5ac9694e382144f83")
},
},
{
@ -152,7 +154,16 @@ func TestTaskIDV2(t *testing.T) {
application: "bar",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "63dee2822037636b0109876b58e95692233840753a882afa69b9b5ee82a6c57d")
assert.Equal(d, "ca12c6591c38f726c238f35d9c7945559b52a0dcc10ae191920be6f5f8a0326a")
},
},
{
name: "generate taskID with pieceLength",
url: "https://example.com",
pieceLength: 1,
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "614fb0088e7d82b2538f1ccb5861db5940aaa665b587792898e4be1f591bafec")
},
},
{
@ -161,14 +172,14 @@ func TestTaskIDV2(t *testing.T) {
filters: []string{"foo", "bar"},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "100680ad546ce6a577f42f52df33b4cfdca756859e664b8d7de329b150d09ce9")
assert.Equal(d, "4a89bbe790108d4987e7dc5127df2b99aea1c17828f1ff3e55176f49ac974b28")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.pieceLength, tc.filters))
})
}
}

View File

@ -46,19 +46,19 @@ var (
// Variables declared for metrics.
var (
AnnouncePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
AnnouncePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peer_total",
Help: "Counter of the number of the announcing peer.",
}, []string{"tag", "app"})
})
AnnouncePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
AnnouncePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peer_failure_total",
Help: "Counter of the number of failed of the announcing peer.",
}, []string{"tag", "app"})
})
StatPeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,

View File

@ -19,7 +19,6 @@
package resource
import (
"context"
"sync"
pkggc "d7y.io/dragonfly/v2/pkg/gc"
@ -112,21 +111,10 @@ func (t *taskManager) RunGC() error {
return true
}
// If task state is TaskStateLeave, it will be reclaimed.
if task.FSM.Is(TaskStateLeave) {
// If there is no peer then task will be reclaimed.
if task.PeerCount() == 0 {
task.Log.Info("task has been reclaimed")
t.Delete(task.ID)
return true
}
// If there is no peer then switch the task state to TaskStateLeave.
if task.PeerCount() == 0 {
if err := task.FSM.Event(context.Background(), TaskEventLeave); err != nil {
task.Log.Errorf("task fsm event failed: %s", err.Error())
return true
}
task.Log.Info("task peer count is zero, causing the task to leave")
}
return true

View File

@ -315,14 +315,7 @@ func TestTaskManager_RunGC(t *testing.T) {
taskManager.Store(mockTask)
err := taskManager.RunGC()
assert.NoError(err)
task, loaded := taskManager.Load(mockTask.ID)
assert.Equal(loaded, true)
assert.Equal(task.FSM.Current(), TaskStateLeave)
err = taskManager.RunGC()
assert.NoError(err)
_, loaded = taskManager.Load(mockTask.ID)
_, loaded := taskManager.Load(mockTask.ID)
assert.Equal(loaded, false)
},
},

View File

@ -51,7 +51,7 @@ var (
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"

View File

@ -52,6 +52,12 @@ func newSchedulerServerV2(
// AnnouncePeer announces peer to scheduler.
func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error {
metrics.AnnouncePeerCount.Inc()
if err := s.service.AnnouncePeer(stream); err != nil {
metrics.AnnouncePeerFailureCount.Inc()
return err
}
return nil
}

View File

@ -131,7 +131,7 @@ var (
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"

View File

@ -51,26 +51,28 @@ func (mr *MockSchedulingMockRecorder) FindCandidateParents(arg0, arg1, arg2 inte
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindCandidateParents", reflect.TypeOf((*MockScheduling)(nil).FindCandidateParents), arg0, arg1, arg2)
}
// ScheduleCandidateParentsForNormalPeer mocks base method.
func (m *MockScheduling) ScheduleCandidateParentsForNormalPeer(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) {
// ScheduleCandidateParents mocks base method.
func (m *MockScheduling) ScheduleCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) error {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ScheduleCandidateParentsForNormalPeer", arg0, arg1, arg2)
ret := m.ctrl.Call(m, "ScheduleCandidateParents", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// ScheduleCandidateParentsForNormalPeer indicates an expected call of ScheduleCandidateParentsForNormalPeer.
func (mr *MockSchedulingMockRecorder) ScheduleCandidateParentsForNormalPeer(arg0, arg1, arg2 interface{}) *gomock.Call {
// ScheduleCandidateParents indicates an expected call of ScheduleCandidateParents.
func (mr *MockSchedulingMockRecorder) ScheduleCandidateParents(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleCandidateParentsForNormalPeer", reflect.TypeOf((*MockScheduling)(nil).ScheduleCandidateParentsForNormalPeer), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleCandidateParents", reflect.TypeOf((*MockScheduling)(nil).ScheduleCandidateParents), arg0, arg1, arg2)
}
// ScheduleParentsForNormalPeer mocks base method.
func (m *MockScheduling) ScheduleParentsForNormalPeer(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) {
// ScheduleParentAndCandidateParents mocks base method.
func (m *MockScheduling) ScheduleParentAndCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ScheduleParentsForNormalPeer", arg0, arg1, arg2)
m.ctrl.Call(m, "ScheduleParentAndCandidateParents", arg0, arg1, arg2)
}
// ScheduleParentsForNormalPeer indicates an expected call of ScheduleParentsForNormalPeer.
func (mr *MockSchedulingMockRecorder) ScheduleParentsForNormalPeer(arg0, arg1, arg2 interface{}) *gomock.Call {
// ScheduleParentAndCandidateParents indicates an expected call of ScheduleParentAndCandidateParents.
func (mr *MockSchedulingMockRecorder) ScheduleParentAndCandidateParents(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleParentsForNormalPeer", reflect.TypeOf((*MockScheduling)(nil).ScheduleParentsForNormalPeer), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleParentAndCandidateParents", reflect.TypeOf((*MockScheduling)(nil).ScheduleParentAndCandidateParents), arg0, arg1, arg2)
}

View File

@ -24,12 +24,13 @@ import (
"sort"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "d7y.io/api/pkg/apis/common/v1"
commonv2 "d7y.io/api/pkg/apis/common/v2"
errordetailsv2 "d7y.io/api/pkg/apis/errordetails/v2"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
@ -41,13 +42,13 @@ import (
)
type Scheduling interface {
// ScheduleCandidateParentsForNormalPeer schedules candidate parents to the normal peer.
// ScheduleCandidateParents schedules candidate parents to the normal peer.
// Used only in v2 version of the grpc.
ScheduleCandidateParentsForNormalPeer(context.Context, *resource.Peer, set.SafeSet[string])
ScheduleCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) error
// ScheduleParentAndCandiateParentsForNormalPeer schedules a parent and candidate parents to the normal peer.
// ScheduleParentAndCandidateParents schedules a parent and candidate parents to the normal peer.
// Used only in v1 version of the grpc.
ScheduleParentsForNormalPeer(context.Context, *resource.Peer, set.SafeSet[string])
ScheduleParentAndCandidateParents(context.Context, *resource.Peer, set.SafeSet[string])
// FindCandidateParents finds candidate parents for the peer.
FindCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) ([]*resource.Peer, bool)
@ -72,15 +73,15 @@ func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, plugi
}
}
// ScheduleCandidateParentsForNormalPeer schedules candidate parents to the normal peer.
// ScheduleCandidateParents schedules candidate parents to the normal peer.
// Used only in v2 version of the grpc.
func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) {
func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) error {
var n int
for {
select {
case <-ctx.Done():
peer.Log.Infof("context was done")
return
return ctx.Err()
default:
}
@ -95,38 +96,29 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context,
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
peer.Log.Error("load stream failed")
return
return status.Error(codes.FailedPrecondition, "load stream failed")
}
// Send NeedBackToSourceResponse to peer.
reason := fmt.Sprintf("send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
peer.Log.Infof("send NeedBackToSourceResponse, because of peer's NeedBackToSource is %t", peer.NeedBackToSource.Load())
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: reason,
Description: fmt.Sprintf("peer's NeedBackToSource is %t", peer.NeedBackToSource.Load()),
},
},
}); err != nil {
peer.Log.Error(err)
return
return status.Error(codes.FailedPrecondition, err.Error())
}
peer.Log.Info(reason)
if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, err.Error())
}
// If the task state is TaskStateFailed,
// peer back-to-source and reset task state to TaskStateRunning.
if peer.Task.FSM.Is(resource.TaskStateFailed) {
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
return
}
}
return
return nil
}
// Check condition 2:
@ -135,78 +127,46 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context,
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
peer.Log.Error("load stream failed")
return
return status.Error(codes.FailedPrecondition, "load stream failed")
}
// Send NeedBackToSourceResponse to peer.
reason := fmt.Sprintf("send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
peer.Log.Infof("send NeedBackToSourceResponse, because of scheduling exceeded RetryBackToSourceLimit %d", s.config.RetryBackToSourceLimit)
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: reason,
Description: "scheduling exceeded RetryBackToSourceLimit",
},
},
}); err != nil {
peer.Log.Error(err)
return
return status.Error(codes.FailedPrecondition, err.Error())
}
peer.Log.Info(reason)
if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, err.Error())
}
// If the task state is TaskStateFailed,
// peer back-to-source and reset task state to TaskStateRunning.
if peer.Task.FSM.Is(resource.TaskStateFailed) {
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
peer.Task.Log.Errorf("task fsm event failed: %s", err.Error())
return
return nil
}
}
return
}
}
// Scheduling will send SchedulePeerFailed to peer.
// Scheduling will return schedule failed.
//
// Condition 1: Scheduling exceeds the RetryLimit.
if n >= s.config.RetryLimit {
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
peer.Log.Error("load stream failed")
return
}
// Send SchedulePeerFailed to peer.
reason := fmt.Sprintf("send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit %d", s.config.RetryLimit)
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{
SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{
Description: reason,
},
},
}); err != nil {
peer.Log.Error(err)
return
}
peer.Log.Error(reason)
return
peer.Log.Errorf("scheduling failed, because of scheduling exceeded RetryLimit %d", s.config.RetryLimit)
return status.Error(codes.FailedPrecondition, "scheduling exceeded RetryLimit")
}
// Scheduling will send NormalTaskResponse to peer.
//
// Condition 1: Scheduling can find candidate parents.
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
n++
peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())
// Sleep to avoid hot looping.
time.Sleep(s.config.RetryInterval)
continue
peer.Log.Error(err)
return status.Error(codes.Internal, err.Error())
}
// Find candidate parents.
@ -223,49 +183,41 @@ func (s *scheduling) ScheduleCandidateParentsForNormalPeer(ctx context.Context,
// Load AnnouncePeerStream from peer.
stream, loaded := peer.LoadAnnouncePeerStream()
if !loaded {
n++
peer.Log.Errorf("scheduling failed in %d times, because of loading AnnouncePeerStream failed", n)
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
peer.Log.Errorf("peer deletes inedges failed: %s", err.Error())
return
msg := fmt.Sprintf("peer deletes inedges failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, msg)
}
return
peer.Log.Error("load stream failed")
return status.Error(codes.FailedPrecondition, "load stream failed")
}
// Send NormalTaskResponse to peer.
peer.Log.Info("send NormalTaskResponse to peer")
peer.Log.Info("send NormalTaskResponse")
if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
Response: constructSuccessNormalTaskResponse(s.dynconfig, candidateParents),
}); err != nil {
n++
peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())
if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
peer.Log.Errorf("peer deletes inedges failed: %s", err.Error())
return
}
return
peer.Log.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
}
// Add edge from parent to peer.
for _, candidateParent := range candidateParents {
if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil {
peer.Log.Debugf("peer adds edge failed: %s", err.Error())
peer.Log.Warnf("peer adds edge failed: %s", err.Error())
continue
}
}
peer.Log.Infof("scheduling success in %d times", n+1)
return
return nil
}
}
// ScheduleParentsForNormalPeer schedules a parent and candidate parents to a peer.
// ScheduleParentAndCandidateParents schedules a parent and candidate parents to a peer.
// Used only in v1 version of the grpc.
func (s *scheduling) ScheduleParentsForNormalPeer(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) {
func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) {
var n int
for {
select {

View File

@ -28,12 +28,13 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "d7y.io/api/pkg/apis/common/v1"
commonv2 "d7y.io/api/pkg/apis/common/v2"
errordetailsv2 "d7y.io/api/pkg/apis/errordetails/v2"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
@ -159,7 +160,7 @@ var (
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
@ -220,11 +221,11 @@ func TestScheduling_New(t *testing.T) {
}
}
func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
func TestScheduling_ScheduleCandidateParents(t *testing.T) {
tests := []struct {
name string
mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
expect func(t *testing.T, peer *resource.Peer)
expect func(t *testing.T, peer *resource.Peer, err error)
}{
{
name: "context was done",
@ -232,8 +233,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
peer.FSM.SetState(resource.PeerStateRunning)
cancel()
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, context.Canceled)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
@ -246,8 +248,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "load stream failed"))
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -265,13 +268,14 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true",
Description: "peer's NeedBackToSource is true",
},
},
})).Return(errors.New("foo")).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "foo"))
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -289,43 +293,19 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true",
Description: "peer's NeedBackToSource is true",
},
},
})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "peer needs back-to-source and task state is TaskStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.NeedBackToSource.Store(true)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreAnnouncePeerStream(stream)
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of peer's NeedBackToSource is true",
},
},
})).Return(nil).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "schedule exceeds RetryBackToSourceLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
@ -334,8 +314,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
peer.FSM.SetState(resource.PeerStateRunning)
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "load stream failed"))
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -354,14 +335,15 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1",
Description: "scheduling exceeded RetryBackToSourceLimit",
},
},
})).Return(errors.New("foo")).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "foo"))
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -380,64 +362,22 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1",
Description: "scheduling exceeded RetryBackToSourceLimit",
},
},
})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "schedule exceeds RetryBackToSourceLimit and task state is TaskStateFailed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
task.FSM.SetState(resource.TaskStateFailed)
peer.StoreAnnouncePeerStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1),
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Response: &schedulerv2.AnnouncePeerResponse_NeedBackToSourceResponse{
NeedBackToSourceResponse: &schedulerv2.NeedBackToSourceResponse{
Reason: "send NeedBackToSourceResponse to peer, because of scheduling exceeded RetryBackToSourceLimit 1",
},
},
})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
assert.True(peer.Task.FSM.Is(resource.TaskStateRunning))
},
},
{
name: "schedule exceeds RetryLimit and peer stream load failed",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourceLimit.Store(-1)
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "schedule exceeds RetryLimit and send SchedulePeerFailed failed",
name: "schedule exceeds RetryLimit",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
@ -447,44 +387,11 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{
SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{
Description: "send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit 2",
},
},
})).Return(errors.New("foo")).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
},
},
{
name: "schedule exceeds RetryLimit and send SchedulePeerFailed success",
mock: func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv2.Scheduler_AnnouncePeerServer, ma *schedulerv2mocks.MockScheduler_AnnouncePeerServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
task := peer.Task
task.StorePeer(peer)
peer.FSM.SetState(resource.PeerStateRunning)
peer.Task.BackToSourceLimit.Store(-1)
peer.StoreAnnouncePeerStream(stream)
gomock.InOrder(
md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(2),
ma.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{
Errordetails: &schedulerv2.AnnouncePeerResponse_SchedulePeerFailed{
SchedulePeerFailed: &errordetailsv2.SchedulePeerFailed{
Description: "send SchedulePeerFailed to peer, because of scheduling exceeded RetryLimit 2",
},
},
})).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "scheduling exceeded RetryLimit"))
assert.Equal(len(peer.Parents()), 0)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -507,8 +414,9 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
ma.Send(gomock.Any()).Return(nil).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer) {
expect: func(t *testing.T, peer *resource.Peer, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(len(peer.Parents()), 1)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(peer.Task.FSM.Is(resource.TaskStatePending))
@ -536,13 +444,12 @@ func TestScheduling_ScheduleCandidateParentsForNormalPeer(t *testing.T) {
tc.mock(cancel, peer, seedPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT())
scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir)
scheduling.ScheduleCandidateParentsForNormalPeer(ctx, peer, blocklist)
tc.expect(t, peer)
tc.expect(t, peer, scheduling.ScheduleCandidateParents(ctx, peer, blocklist))
})
}
}
func TestScheduling_ScheduleParentsForNormalPeer(t *testing.T) {
func TestScheduling_ScheduleParentAndCandidateParents(t *testing.T) {
tests := []struct {
name string
mock func(cancel context.CancelFunc, peer *resource.Peer, seedPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, mr *schedulerv1mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
@ -810,7 +717,7 @@ func TestScheduling_ScheduleParentsForNormalPeer(t *testing.T) {
tc.mock(cancel, peer, seedPeer, blocklist, stream, stream.EXPECT(), dynconfig.EXPECT())
scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir)
scheduling.ScheduleParentsForNormalPeer(ctx, peer, blocklist)
scheduling.ScheduleParentAndCandidateParents(ctx, peer, blocklist)
tc.expect(t, peer)
})
}

View File

@ -168,13 +168,14 @@ func (v *V1) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequ
// ReportPieceResult handles the piece information reported by dfdaemon.
func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultServer) error {
ctx := stream.Context()
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
var (
peer *resource.Peer
initialized bool
loaded bool
)
for {
select {
case <-ctx.Done():
@ -327,7 +328,7 @@ func (v *V1) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequ
}
task := resource.NewTask(taskID, req.Url, req.UrlMeta.Tag, req.UrlMeta.Application, types.TaskTypeV1ToV2(req.TaskType),
strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator), req.UrlMeta.Header, int32(v.config.Scheduler.BackSourceCount), options...)
strings.Split(req.UrlMeta.Filter, idgen.URLFilterSeparator), req.UrlMeta.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
task, _ = v.resource.TaskManager().LoadOrStore(task)
host := v.storeHost(ctx, req.PeerHost)
peer := v.storePeer(ctx, peerID, req.UrlMeta.Priority, req.UrlMeta.Range, task, host)
@ -963,7 +964,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
return
}
v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, set.NewSafeSet[string]())
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]())
default:
}
}
@ -1034,7 +1035,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece
if !loaded {
peer.Log.Errorf("parent %s not found", piece.DstPid)
peer.BlockParents.Add(piece.DstPid)
v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, peer.BlockParents)
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents)
return
}
@ -1093,7 +1094,7 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece
peer.Log.Infof("reschedule parent because of failed piece")
peer.BlockParents.Add(parent.ID)
v.scheduling.ScheduleParentsForNormalPeer(ctx, peer, peer.BlockParents)
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents)
}
// handlePeerSuccess handles successful peer.
@ -1135,7 +1136,7 @@ func (v *V1) handlePeerFailure(ctx context.Context, peer *resource.Peer) {
// Reschedule a new parent to children of peer to exclude the current failed peer.
for _, child := range peer.Children() {
child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID)
v.scheduling.ScheduleParentsForNormalPeer(ctx, child, child.BlockParents)
v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents)
}
}
@ -1150,7 +1151,7 @@ func (v *V1) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) {
// Reschedule a new parent to children of peer to exclude the current failed peer.
for _, child := range peer.Children() {
child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID)
v.scheduling.ScheduleParentsForNormalPeer(ctx, child, child.BlockParents)
v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents)
}
}

View File

@ -178,7 +178,7 @@ var (
mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilters)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskPieceLength, mockTaskFilters)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
@ -3410,7 +3410,7 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) {
name: "peer state is PeerStateReceivedNormal",
mock: func(peer *resource.Peer, scheduling *mocks.MockSchedulingMockRecorder) {
peer.FSM.SetState(resource.PeerStateReceivedNormal)
scheduling.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1)
scheduling.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1)
},
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
@ -3631,7 +3631,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(nil, false).Times(1),
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFailure(context.Background(), peer, piece)
@ -3659,7 +3659,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFailure(context.Background(), peer, piece)
@ -3688,7 +3688,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFailure(context.Background(), peer, piece)
@ -3716,7 +3716,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFailure(context.Background(), peer, piece)
@ -3745,7 +3745,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFailure(context.Background(), peer, piece)
@ -3943,7 +3943,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) {
peer.FSM.SetState(resource.PeerStateRunning)
child.FSM.SetState(resource.PeerStateRunning)
ms.ScheduleParentsForNormalPeer(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1)
ms.ScheduleParentAndCandidateParents(gomock.Any(), gomock.Eq(child), gomock.Eq(set.NewSafeSet[string]())).Return().Times(1)
},
expect: func(t *testing.T, peer *resource.Peer, child *resource.Peer) {
assert := assert.New(t)

View File

@ -19,6 +19,7 @@ package service
import (
"context"
"fmt"
"io"
"time"
"google.golang.org/grpc/codes"
@ -37,7 +38,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/storage"
)
// TODO Implement v2 version of the service functions.
// V2 is the interface for v2 version of the service.
type V2 struct {
// Resource interface.
@ -75,9 +75,121 @@ func NewV2(
// AnnouncePeer announces peer to scheduler. It consumes the bidirectional
// AnnouncePeer stream until the client closes it (io.EOF), the stream
// context is canceled, or an unrecoverable error occurs, dispatching each
// received message to the handler matching its concrete request type.
func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error {
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	for {
		// Stop receiving as soon as the stream context is canceled.
		select {
		case <-ctx.Done():
			logger.Info("context was done")
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				// Client finished sending; terminate the stream normally.
				return nil
			}

			logger.Errorf("receive error: %s", err.Error())
			return err
		}

		// Request-scoped logger carrying the announcing peer's identifiers.
		logger := logger.WithPeer(req.HostId, req.TaskId, req.PeerId)
		switch announcePeerRequest := req.GetRequest().(type) {
		case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:
			logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %#v", announcePeerRequest.RegisterPeerRequest.Download)
			// Registration is the only handler that can fail; a failure aborts the stream.
			if err := v.handleRegisterPeerRequest(req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil {
				logger.Error(err)
				return err
			}
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
			v.handleDownloadPeerStartedRequest(announcePeerRequest.DownloadPeerStartedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
			v.handleDownloadPeerBackToSourceStartedRequest(announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
			v.handleDownloadPeerFinishedRequest(announcePeerRequest.DownloadPeerFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
			v.handleDownloadPeerBackToSourceFinishedRequest(announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest)
			v.handleDownloadPieceFinishedRequest(announcePeerRequest.DownloadPieceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
			v.handleDownloadPieceBackToSourceFinishedRequest(announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest)
			v.handleDownloadPieceFailedRequest(announcePeerRequest.DownloadPieceFailedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
			v.handleDownloadPieceBackToSourceFailedRequest(announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
		case *schedulerv2.AnnouncePeerRequest_SyncPiecesFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_SyncPiecesFailedRequest: %#v", announcePeerRequest.SyncPiecesFailedRequest)
			v.handleSyncPiecesFailedRequest(announcePeerRequest.SyncPiecesFailedRequest)
		default:
			// Reject request types this scheduler does not understand.
			msg := fmt.Sprintf("receive unknown request: %#v", announcePeerRequest)
			logger.Error(msg)
			return status.Error(codes.FailedPrecondition, msg)
		}
	}
}
// handleRegisterPeerRequest handles the RegisterPeerRequest message of
// AnnouncePeerRequest for the peer identified by hostID, taskID and peerID.
// A non-nil error aborts the caller's AnnouncePeer stream.
//
// TODO Implement function.
func (v *V2) handleRegisterPeerRequest(hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
	return nil
}
// handleDownloadPeerStartedRequest handles the DownloadPeerStartedRequest
// message of AnnouncePeerRequest. Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPeerStartedRequest(req *schedulerv2.DownloadPeerStartedRequest) {
}
// handleDownloadPeerBackToSourceStartedRequest handles the
// DownloadPeerBackToSourceStartedRequest message of AnnouncePeerRequest.
// Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPeerBackToSourceStartedRequest(req *schedulerv2.DownloadPeerBackToSourceStartedRequest) {
}
// handleDownloadPeerFinishedRequest handles the DownloadPeerFinishedRequest
// message of AnnouncePeerRequest. Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPeerFinishedRequest(req *schedulerv2.DownloadPeerFinishedRequest) {
}
// handleDownloadPeerBackToSourceFinishedRequest handles the
// DownloadPeerBackToSourceFinishedRequest message of AnnouncePeerRequest.
// Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) {
}
// handleDownloadPieceFinishedRequest handles the DownloadPieceFinishedRequest
// message of AnnouncePeerRequest. Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPieceFinishedRequest(req *schedulerv2.DownloadPieceFinishedRequest) {
}
// handleDownloadPieceBackToSourceFinishedRequest handles the
// DownloadPieceBackToSourceFinishedRequest message of AnnouncePeerRequest.
// Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) {
}
// handleDownloadPieceFailedRequest handles the DownloadPieceFailedRequest
// message of AnnouncePeerRequest. Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPieceFailedRequest(req *schedulerv2.DownloadPieceFailedRequest) {
}
// handleDownloadPieceBackToSourceFailedRequest handles the
// DownloadPieceBackToSourceFailedRequest message of AnnouncePeerRequest.
// Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleDownloadPieceBackToSourceFailedRequest(req *schedulerv2.DownloadPieceBackToSourceFailedRequest) {
}
// handleSyncPiecesFailedRequest handles the SyncPiecesFailedRequest message
// of AnnouncePeerRequest. Currently a no-op stub.
//
// TODO Implement function.
func (v *V2) handleSyncPiecesFailedRequest(req *schedulerv2.SyncPiecesFailedRequest) {
}
// StatPeer checks information of peer.
func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
logger.WithTaskID(req.TaskId).Infof("stat peer request: %#v", req)