feat: add handleRegisterSeedPeerRequest to AnnouncePeer in service v2 (#2147)
Signed-off-by: Gaius <gaius.qi@gmail.com>
parent cff4d31f72
commit 047a55fb84
@@ -38,4 +38,3 @@ COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe
 EXPOSE 65001

 ENTRYPOINT ["/opt/dragonfly/bin/dfget", "daemon"]
-
@@ -10,6 +10,7 @@ import (
     time "time"

     dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v1"
+    config "d7y.io/dragonfly/v2/client/config"
     gomock "github.com/golang/mock/gomock"
 )

@@ -62,6 +63,18 @@ func (mr *MockServerMockRecorder) Keep() *gomock.Call {
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keep", reflect.TypeOf((*MockServer)(nil).Keep))
 }

+// OnNotify mocks base method.
+func (m *MockServer) OnNotify(arg0 *config.DynconfigData) {
+    m.ctrl.T.Helper()
+    m.ctrl.Call(m, "OnNotify", arg0)
+}
+
+// OnNotify indicates an expected call of OnNotify.
+func (mr *MockServerMockRecorder) OnNotify(arg0 interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotify", reflect.TypeOf((*MockServer)(nil).OnNotify), arg0)
+}
+
 // ServeDownload mocks base method.
 func (m *MockServer) ServeDownload(listener net.Listener) error {
     m.ctrl.T.Helper()
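For orientation (not part of this commit), here is a minimal sketch of how the regenerated OnNotify mock could be exercised with gomock; the package placement, test name, and wiring below are assumptions, only the generated MockServer API shown in the hunk above is relied on.

package mocks // hypothetical: same package as the generated MockServer

import (
    "testing"

    gomock "github.com/golang/mock/gomock"

    config "d7y.io/dragonfly/v2/client/config"
)

// TestMockServer_OnNotify is illustrative only: it sets an expectation on the
// new OnNotify method and then triggers it, as code under test would when
// dynconfig data changes.
func TestMockServer_OnNotify(t *testing.T) {
    ctl := gomock.NewController(t)
    defer ctl.Finish()

    srv := NewMockServer(ctl)
    srv.EXPECT().OnNotify(gomock.Any()).Times(1)

    // Code under test would call OnNotify with the refreshed dynconfig data.
    srv.OnNotify(&config.DynconfigData{})
}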
go.mod (2 lines changed)

@@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
 go 1.20

 require (
-    d7y.io/api v1.6.8
+    d7y.io/api v1.7.5
     github.com/RichardKnop/machinery v1.10.6
     github.com/Showmax/go-fqdn v1.0.0
     github.com/VividCortex/mysqlerr v1.0.0
go.sum (4 lines changed)

@@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
-d7y.io/api v1.6.8 h1:/oNEZC8FC8P1vPHlzgtJbBQzh5lnf0mZ+9VBx/Nq+iU=
-d7y.io/api v1.6.8/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
+d7y.io/api v1.7.5 h1:JLtbTLAiNom+qT/sQHgzqKApw/tG5MQaTBcsH/Lb2wE=
+d7y.io/api v1.7.5/go.mod h1:LgmoxxoRDzBiseGFxNWqQP5qsro8+lhYSGwR+/Chplw=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
@@ -111,6 +111,13 @@ const (
 // PeerOption is a functional option for peer.
 type PeerOption func(peer *Peer)

+// WithAnnouncePeerStream set AnnouncePeerStream for peer.
+func WithAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) PeerOption {
+    return func(p *Peer) {
+        p.StoreAnnouncePeerStream(stream)
+    }
+}
+
 // WithPriority set Priority for peer.
 func WithPriority(priority commonv2.Priority) PeerOption {
     return func(p *Peer) {

@@ -354,7 +361,7 @@ func (p *Peer) DeleteReportPieceResultStream() {
 // LoadAnnouncePeerStream return the grpc stream of Scheduler_AnnouncePeerServer,
 // Used only in v2 version of the grpc.
 func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServer, bool) {
-    rawStream := p.ReportPieceResultStream.Load()
+    rawStream := p.AnnouncePeerStream.Load()
     if rawStream == nil {
         return nil, false
     }

@@ -365,13 +372,13 @@ func (p *Peer) LoadAnnouncePeerStream() (schedulerv2.Scheduler_AnnouncePeerServe
 // StoreAnnouncePeerStream set the grpc stream of Scheduler_AnnouncePeerServer,
 // Used only in v2 version of the grpc.
 func (p *Peer) StoreAnnouncePeerStream(stream schedulerv2.Scheduler_AnnouncePeerServer) {
-    p.ReportPieceResultStream.Store(stream)
+    p.AnnouncePeerStream.Store(stream)
 }

 // DeleteAnnouncePeerStream deletes the grpc stream of Scheduler_AnnouncePeerServer,
 // Used only in v2 version of the grpc.
 func (p *Peer) DeleteAnnouncePeerStream() {
-    p.ReportPieceResultStream = &atomic.Value{}
+    p.AnnouncePeerStream = &atomic.Value{}
 }

 // LoadPiece return piece for a key.

@@ -515,8 +522,8 @@ func (p *Peer) DownloadFile() ([]byte, error) {
     return io.ReadAll(resp.Body)
 }

-// GetPriority returns priority of peer.
-func (p *Peer) GetPriority(dynconfig config.DynconfigInterface) commonv2.Priority {
+// CalculatePriority returns priority of peer.
+func (p *Peer) CalculatePriority(dynconfig config.DynconfigInterface) commonv2.Priority {
     if p.Priority != commonv2.Priority_LEVEL0 {
         return p.Priority
     }
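For orientation (not part of this commit), a hypothetical helper shows how the new WithAnnouncePeerStream option and the corrected AnnouncePeerStream accessors compose when a peer is created for a v2 announce stream; the helper name and surrounding wiring are assumptions, the NewPeer, WithAnnouncePeerStream, and LoadAnnouncePeerStream calls are from this diff.

package resource // hypothetical placement next to peer.go

import (
    schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
)

// newPeerWithStream is an illustrative helper, not part of the commit.
func newPeerWithStream(id string, task *Task, host *Host, stream schedulerv2.Scheduler_AnnouncePeerServer) *Peer {
    // Store the AnnouncePeer stream at construction time via the new option.
    peer := NewPeer(id, task, host, WithAnnouncePeerStream(stream))

    // Later, the scheduler can load the stream back to push responses to the peer.
    if s, ok := peer.LoadAnnouncePeerStream(); ok {
        _ = s // e.g. s.Send(&schedulerv2.AnnouncePeerResponse{...})
    }
    return peer
}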
@@ -49,6 +49,10 @@ var (
 )

 func TestPeer_NewPeer(t *testing.T) {
+    ctl := gomock.NewController(t)
+    defer ctl.Finish()
+    stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)
+
     tests := []struct {
         name string
         id   string

@@ -136,6 +140,32 @@ func TestPeer_NewPeer(t *testing.T) {
                 assert.NotNil(peer.Log)
             },
         },
+        {
+            name:    "new peer with AnnouncePeerStream",
+            id:      mockPeerID,
+            options: []PeerOption{WithAnnouncePeerStream(stream)},
+            expect: func(t *testing.T, peer *Peer, mockTask *Task, mockHost *Host) {
+                assert := assert.New(t)
+                assert.Equal(peer.ID, mockPeerID)
+                assert.Nil(peer.Range)
+                assert.Equal(peer.Priority, commonv2.Priority_LEVEL0)
+                assert.Empty(peer.Pieces)
+                assert.Empty(peer.FinishedPieces)
+                assert.Equal(len(peer.PieceCosts()), 0)
+                assert.Empty(peer.ReportPieceResultStream)
+                assert.NotEmpty(peer.AnnouncePeerStream)
+                assert.Equal(peer.FSM.Current(), PeerStatePending)
+                assert.EqualValues(peer.Task, mockTask)
+                assert.EqualValues(peer.Host, mockHost)
+                assert.Equal(peer.BlockParents.Len(), uint(0))
+                assert.Equal(peer.NeedBackToSource.Load(), false)
+                assert.Equal(peer.IsBackToSource.Load(), false)
+                assert.NotEqual(peer.PieceUpdatedAt.Load(), 0)
+                assert.NotEqual(peer.CreatedAt.Load(), 0)
+                assert.NotEqual(peer.UpdatedAt.Load(), 0)
+                assert.NotNil(peer.Log)
+            },
+        },
     }

     for _, tc := range tests {

@@ -886,7 +916,7 @@ func TestPeer_DownloadFile(t *testing.T) {
     }
 }

-func TestPeer_GetPriority(t *testing.T) {
+func TestPeer_CalculatePriority(t *testing.T) {
     tests := []struct {
         name string
         mock func(peer *Peer, md *configmocks.MockDynconfigInterfaceMockRecorder)

@@ -1009,7 +1039,7 @@ func TestPeer_GetPriority(t *testing.T) {
             mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
             peer := NewPeer(mockPeerID, mockTask, mockHost)
             tc.mock(peer, dynconfig.EXPECT())
-            tc.expect(t, peer.GetPriority(dynconfig))
+            tc.expect(t, peer.CalculatePriority(dynconfig))
         })
     }
 }
@@ -24,6 +24,8 @@ import (
     "strings"
     "time"

+    "go.opentelemetry.io/otel/trace"
+
     cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1"
     commonv1 "d7y.io/api/pkg/apis/common/v1"
     commonv2 "d7y.io/api/pkg/apis/common/v2"

@@ -33,6 +35,7 @@ import (
     "d7y.io/dragonfly/v2/pkg/idgen"
     "d7y.io/dragonfly/v2/pkg/net/http"
     "d7y.io/dragonfly/v2/pkg/rpc/common"
+    "d7y.io/dragonfly/v2/pkg/types"
     "d7y.io/dragonfly/v2/scheduler/metrics"
 )

@@ -45,7 +48,7 @@ const (
 type SeedPeer interface {
     // DownloadTask downloads task back-to-source.
     // Used only in v2 version of the grpc.
-    DownloadTask(context.Context, *Task) error
+    DownloadTask(context.Context, *Task, types.HostType) error

     // TriggerTask triggers the seed peer to download task.
     // Used only in v1 version of the grpc.

@@ -80,14 +83,17 @@ func newSeedPeer(client SeedPeerClient, peerManager PeerManager, hostManager Hos
 // TODO Implement DownloadTask
 // DownloadTask downloads task back-to-source.
 // Used only in v2 version of the grpc.
-func (s *seedPeer) DownloadTask(ctx context.Context, task *Task) error {
+func (s *seedPeer) DownloadTask(ctx context.Context, task *Task, hostType types.HostType) error {
+    // ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
+    // defer cancel()
+
     return nil
 }

 // TriggerTask triggers the seed peer to download task.
 // Used only in v1 version of the grpc.
 func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) (*Peer, *schedulerv1.PeerResult, error) {
-    ctx, cancel := context.WithCancel(ctx)
+    ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
     defer cancel()

     urlMeta := &commonv1.UrlMeta{
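For orientation (not part of this commit), a hypothetical caller shows the new DownloadTask call shape with the extra types.HostType argument; note that DownloadTask itself is still a stub in this commit and simply returns nil. The helper name is an assumption.

package resource // hypothetical placement next to seed_peer.go

import (
    "context"

    "d7y.io/dragonfly/v2/pkg/types"
)

// downloadTaskBySeedPeer is illustrative only.
func downloadTaskBySeedPeer(ctx context.Context, seedPeer SeedPeer, task *Task) error {
    // The new argument tells the seed peer which host type should perform the
    // back-to-source download, for example a super seed.
    if err := seedPeer.DownloadTask(ctx, task, types.HostTypeSuperSeed); err != nil {
        task.Log.Errorf("seed peer downloads task failed: %s", err.Error())
        return err
    }
    return nil
}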
@@ -10,6 +10,7 @@ import (

     v1 "d7y.io/api/pkg/apis/scheduler/v1"
     http "d7y.io/dragonfly/v2/pkg/net/http"
+    types "d7y.io/dragonfly/v2/pkg/types"
     gomock "github.com/golang/mock/gomock"
 )

@@ -51,17 +52,17 @@ func (mr *MockSeedPeerMockRecorder) Client() *gomock.Call {
 }

 // DownloadTask mocks base method.
-func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *Task) error {
+func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *Task, arg2 types.HostType) error {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1)
+    ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1, arg2)
     ret0, _ := ret[0].(error)
     return ret0
 }

 // DownloadTask indicates an expected call of DownloadTask.
-func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1 interface{}) *gomock.Call {
+func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1, arg2 interface{}) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1, arg2)
 }

 // Stop mocks base method.
@@ -51,6 +51,21 @@ func (mr *MockSchedulingMockRecorder) FindCandidateParents(arg0, arg1, arg2 inte
     return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindCandidateParents", reflect.TypeOf((*MockScheduling)(nil).FindCandidateParents), arg0, arg1, arg2)
 }

+// FindSuccessParent mocks base method.
+func (m *MockScheduling) FindSuccessParent(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) (*resource.Peer, bool) {
+    m.ctrl.T.Helper()
+    ret := m.ctrl.Call(m, "FindSuccessParent", arg0, arg1, arg2)
+    ret0, _ := ret[0].(*resource.Peer)
+    ret1, _ := ret[1].(bool)
+    return ret0, ret1
+}
+
+// FindSuccessParent indicates an expected call of FindSuccessParent.
+func (mr *MockSchedulingMockRecorder) FindSuccessParent(arg0, arg1, arg2 interface{}) *gomock.Call {
+    mr.mock.ctrl.T.Helper()
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindSuccessParent", reflect.TypeOf((*MockScheduling)(nil).FindSuccessParent), arg0, arg1, arg2)
+}
+
 // ScheduleCandidateParents mocks base method.
 func (m *MockScheduling) ScheduleCandidateParents(arg0 context.Context, arg1 *resource.Peer, arg2 set.SafeSet[string]) error {
     m.ctrl.T.Helper()
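For orientation (not part of this commit), a hypothetical test helper shows how the newly generated FindSuccessParent mock could be stubbed in a scheduler service test; the helper name and placement are assumptions, the generated MockScheduling API is from the hunk above.

package mocks // hypothetical: same package as the generated MockScheduling

import (
    gomock "github.com/golang/mock/gomock"

    "d7y.io/dragonfly/v2/scheduler/resource"
)

// expectFindSuccessParent is illustrative only: it returns a mock that always
// reports the given peer as the success parent.
func expectFindSuccessParent(ctl *gomock.Controller, parent *resource.Peer) *MockScheduling {
    m := NewMockScheduling(ctl)
    m.EXPECT().
        FindSuccessParent(gomock.Any(), gomock.Any(), gomock.Any()).
        Return(parent, true).
        AnyTimes()
    return m
}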
@@ -52,6 +52,9 @@ type Scheduling interface {

     // FindCandidateParents finds candidate parents for the peer.
     FindCandidateParents(context.Context, *resource.Peer, set.SafeSet[string]) ([]*resource.Peer, bool)
+
+    // FindSuccessParent finds success parent for the peer.
+    FindSuccessParent(context.Context, *resource.Peer, set.SafeSet[string]) (*resource.Peer, bool)
 }

 type scheduling struct {

@@ -196,7 +199,7 @@ func (s *scheduling) ScheduleCandidateParents(ctx context.Context, peer *resourc
     // Send NormalTaskResponse to peer.
     peer.Log.Info("send NormalTaskResponse")
     if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
-        Response: constructSuccessNormalTaskResponse(s.dynconfig, candidateParents),
+        Response: ConstructSuccessNormalTaskResponse(s.dynconfig, candidateParents),
     }); err != nil {
         peer.Log.Error(err)
         return status.Error(codes.FailedPrecondition, err.Error())

@@ -358,7 +361,7 @@ func (s *scheduling) ScheduleParentAndCandidateParents(ctx context.Context, peer

     // Send PeerPacket to peer.
     peer.Log.Info("send PeerPacket to peer")
-    if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
+    if err := stream.Send(ConstructSuccessPeerPacket(s.dynconfig, peer, candidateParents[0], candidateParents[1:])); err != nil {
         n++
         peer.Log.Errorf("scheduling failed in %d times, because of %s", n, err.Error())

@@ -409,21 +412,52 @@ func (s *scheduling) FindCandidateParents(ctx context.Context, peer *resource.Pe
         },
     )

-    // Add edges between candidate parent and peer.
     var parentIDs []string
     for _, candidateParent := range candidateParents {
         parentIDs = append(parentIDs, candidateParent.ID)
     }

-    if len(candidateParents) <= 0 {
-        peer.Log.Info("can not add edges for vertex")
-        return []*resource.Peer{}, false
-    }
-
     peer.Log.Infof("scheduling candidate parents is %#v", parentIDs)
     return candidateParents, true
 }

+// FindSuccessParent finds success parent for the peer.
+func (s *scheduling) FindSuccessParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) (*resource.Peer, bool) {
+    // Only PeerStateRunning peers need to be rescheduled,
+    // and other states including the PeerStateBackToSource indicate that
+    // they have been scheduled.
+    if !peer.FSM.Is(resource.PeerStateRunning) {
+        peer.Log.Infof("peer state is %s, can not schedule parent", peer.FSM.Current())
+        return nil, false
+    }
+
+    // Find the candidate parent that can be scheduled.
+    candidateParents := s.filterCandidateParents(peer, blocklist)
+    if len(candidateParents) == 0 {
+        peer.Log.Info("can not find candidate parents")
+        return nil, false
+    }
+
+    var successParents []*resource.Peer
+    for _, candidateParent := range candidateParents {
+        if candidateParent.FSM.Is(resource.PeerStateSucceeded) {
+            successParents = append(successParents, candidateParent)
+        }
+    }
+
+    // Sort candidate parents by evaluation score.
+    taskTotalPieceCount := peer.Task.TotalPieceCount.Load()
+    sort.Slice(
+        successParents,
+        func(i, j int) bool {
+            return s.evaluator.Evaluate(successParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(successParents[j], peer, taskTotalPieceCount)
+        },
+    )
+
+    peer.Log.Infof("scheduling success parent is %s", successParents[0].ID)
+    return successParents[0], true
+}
+
 // filterCandidateParents filters the candidate parents that can be scheduled.
 func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet[string]) []*resource.Peer {
     filterParentLimit := config.DefaultSchedulerFilterParentLimit

@@ -509,9 +543,177 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S
     return candidateParents
 }

-// constructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
+// ConstructSuccessSmallTaskResponse constructs scheduling successful response of the small task.
 // Used only in v2 version of the grpc.
-func constructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
+func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedulerv2.AnnouncePeerResponse_SmallTaskResponse {
+    parent := &commonv2.Peer{
+        Id: candidateParent.ID,
+        Priority: candidateParent.Priority,
+        Cost: durationpb.New(candidateParent.Cost.Load()),
+        State: candidateParent.FSM.Current(),
+        NeedBackToSource: candidateParent.NeedBackToSource.Load(),
+        CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
+        UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
+    }
+
+    // Set range to parent.
+    if candidateParent.Range != nil {
+        parent.Range = &commonv2.Range{
+            Start: candidateParent.Range.Start,
+            Length: candidateParent.Range.Length,
+        }
+    }
+
+    // Set pieces to parent.
+    candidateParent.Pieces.Range(func(key, value any) bool {
+        candidateParentPiece, ok := value.(*resource.Piece)
+        if !ok {
+            candidateParent.Log.Errorf("invalid piece %s %#v", key, value)
+            return true
+        }
+
+        piece := &commonv2.Piece{
+            Number: candidateParentPiece.Number,
+            ParentId: candidateParentPiece.ParentID,
+            Offset: candidateParentPiece.Offset,
+            Length: candidateParentPiece.Length,
+            TrafficType: candidateParentPiece.TrafficType,
+            Cost: durationpb.New(candidateParentPiece.Cost),
+            CreatedAt: timestamppb.New(candidateParentPiece.CreatedAt),
+        }
+
+        if candidateParentPiece.Digest != nil {
+            piece.Digest = candidateParentPiece.Digest.String()
+        }
+
+        parent.Pieces = append(parent.Pieces, piece)
+        return true
+    })
+
+    // Set task to parent.
+    parent.Task = &commonv2.Task{
+        Id: candidateParent.Task.ID,
+        Type: candidateParent.Task.Type,
+        Url: candidateParent.Task.URL,
+        Tag: candidateParent.Task.Tag,
+        Application: candidateParent.Task.Application,
+        Filters: candidateParent.Task.Filters,
+        Header: candidateParent.Task.Header,
+        PieceLength: candidateParent.Task.PieceLength,
+        ContentLength: candidateParent.Task.ContentLength.Load(),
+        PieceCount: candidateParent.Task.TotalPieceCount.Load(),
+        SizeScope: candidateParent.Task.SizeScope(),
+        State: candidateParent.Task.FSM.Current(),
+        PeerCount: int32(candidateParent.Task.PeerCount()),
+        CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
+        UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
+    }
+
+    // Set digest to parent task.
+    if candidateParent.Task.Digest != nil {
+        parent.Task.Digest = candidateParent.Task.Digest.String()
+    }
+
+    // Set pieces to parent task.
+    candidateParent.Task.Pieces.Range(func(key, value any) bool {
+        taskPiece, ok := value.(*resource.Piece)
+        if !ok {
+            candidateParent.Task.Log.Errorf("invalid piece %s %#v", key, value)
+            return true
+        }
+
+        piece := &commonv2.Piece{
+            Number: taskPiece.Number,
+            ParentId: taskPiece.ParentID,
+            Offset: taskPiece.Offset,
+            Length: taskPiece.Length,
+            TrafficType: taskPiece.TrafficType,
+            Cost: durationpb.New(taskPiece.Cost),
+            CreatedAt: timestamppb.New(taskPiece.CreatedAt),
+        }
+
+        if taskPiece.Digest != nil {
+            piece.Digest = taskPiece.Digest.String()
+        }
+
+        parent.Task.Pieces = append(parent.Task.Pieces, piece)
+        return true
+    })
+
+    // Set host to parent.
+    parent.Host = &commonv2.Host{
+        Id: candidateParent.Host.ID,
+        Type: uint32(candidateParent.Host.Type),
+        Hostname: candidateParent.Host.Hostname,
+        Ip: candidateParent.Host.IP,
+        Port: candidateParent.Host.Port,
+        DownloadPort: candidateParent.Host.DownloadPort,
+        Os: candidateParent.Host.OS,
+        Platform: candidateParent.Host.Platform,
+        PlatformFamily: candidateParent.Host.PlatformFamily,
+        PlatformVersion: candidateParent.Host.PlatformVersion,
+        KernelVersion: candidateParent.Host.KernelVersion,
+        Cpu: &commonv2.CPU{
+            LogicalCount: candidateParent.Host.CPU.LogicalCount,
+            PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
+            Percent: candidateParent.Host.CPU.Percent,
+            ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
+            Times: &commonv2.CPUTimes{
+                User: candidateParent.Host.CPU.Times.User,
+                System: candidateParent.Host.CPU.Times.System,
+                Idle: candidateParent.Host.CPU.Times.Idle,
+                Nice: candidateParent.Host.CPU.Times.Nice,
+                Iowait: candidateParent.Host.CPU.Times.Iowait,
+                Irq: candidateParent.Host.CPU.Times.Irq,
+                Softirq: candidateParent.Host.CPU.Times.Softirq,
+                Steal: candidateParent.Host.CPU.Times.Steal,
+                Guest: candidateParent.Host.CPU.Times.Guest,
+                GuestNice: candidateParent.Host.CPU.Times.GuestNice,
+            },
+        },
+        Memory: &commonv2.Memory{
+            Total: candidateParent.Host.Memory.Total,
+            Available: candidateParent.Host.Memory.Available,
+            Used: candidateParent.Host.Memory.Used,
+            UsedPercent: candidateParent.Host.Memory.UsedPercent,
+            ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
+            Free: candidateParent.Host.Memory.Free,
+        },
+        Network: &commonv2.Network{
+            TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
+            UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
+            SecurityDomain: candidateParent.Host.Network.SecurityDomain,
+            Location: candidateParent.Host.Network.Location,
+            Idc: candidateParent.Host.Network.IDC,
+        },
+        Disk: &commonv2.Disk{
+            Total: candidateParent.Host.Disk.Total,
+            Free: candidateParent.Host.Disk.Free,
+            Used: candidateParent.Host.Disk.Used,
+            UsedPercent: candidateParent.Host.Disk.UsedPercent,
+            InodesTotal: candidateParent.Host.Disk.InodesTotal,
+            InodesUsed: candidateParent.Host.Disk.InodesUsed,
+            InodesFree: candidateParent.Host.Disk.InodesFree,
+            InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
+        },
+        Build: &commonv2.Build{
+            GitVersion: candidateParent.Host.Build.GitVersion,
+            GitCommit: candidateParent.Host.Build.GitCommit,
+            GoVersion: candidateParent.Host.Build.GoVersion,
+            Platform: candidateParent.Host.Build.Platform,
+        },
+    }
+
+    return &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
+        SmallTaskResponse: &schedulerv2.SmallTaskResponse{
+            CandidateParent: parent,
+        },
+    }
+}
+
+// ConstructSuccessNormalTaskResponse constructs scheduling successful response of the normal task.
+// Used only in v2 version of the grpc.
+func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, candidateParents []*resource.Peer) *schedulerv2.AnnouncePeerResponse_NormalTaskResponse {
     concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
     if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
         concurrentPieceCount = int(config.ConcurrentPieceCount)

@@ -688,9 +890,9 @@ func constructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
     }
 }

-// constructSuccessPeerPacket constructs peer successful packet.
+// ConstructSuccessPeerPacket constructs peer successful packet.
 // Used only in v1 version of the grpc.
-func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
+func ConstructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket {
     concurrentPieceCount := config.DefaultPeerConcurrentPieceCount
     if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ConcurrentPieceCount > 0 {
         concurrentPieceCount = int(config.ConcurrentPieceCount)
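For orientation (not part of this commit), a hypothetical caller shows one plausible way the new exported helpers FindSuccessParent and ConstructSuccessSmallTaskResponse could be combined to answer a small task over the v2 announce stream; this is not the commit's actual handler, and the function name and error handling are assumptions.

package scheduling // hypothetical placement next to scheduling.go

import (
    "context"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"

    "d7y.io/dragonfly/v2/pkg/container/set"
    "d7y.io/dragonfly/v2/scheduler/resource"
)

// sendSmallTaskParent is illustrative only.
func sendSmallTaskParent(ctx context.Context, scheduling Scheduling, peer *resource.Peer, stream schedulerv2.Scheduler_AnnouncePeerServer) error {
    // Pick a parent that has already finished downloading the task.
    parent, ok := scheduling.FindSuccessParent(ctx, peer, set.NewSafeSet[string]())
    if !ok {
        return status.Error(codes.FailedPrecondition, "no success parent found")
    }

    // Reply with the small-task variant of the scheduling response.
    return stream.Send(&schedulerv2.AnnouncePeerResponse{
        Response: ConstructSuccessSmallTaskResponse(parent),
    })
}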
@@ -994,13 +994,420 @@ func TestScheduling_FindCandidateParents(t *testing.T) {
             blocklist := set.NewSafeSet[string]()
             tc.mock(peer, mockPeers, blocklist, dynconfig.EXPECT())
             scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir)
-            parent, found := scheduling.FindCandidateParents(context.Background(), peer, blocklist)
+            parents, found := scheduling.FindCandidateParents(context.Background(), peer, blocklist)
+            tc.expect(t, peer, mockPeers, parents, found)
+        })
+    }
+}
+
+func TestScheduling_FindSuccessParent(t *testing.T) {
+    tests := []struct {
+        name   string
+        mock   func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder)
+        expect func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool)
+    }{
+        {
+            name: "task peers is empty",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "task contains only one peer and peer is itself",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                peer.Task.StorePeer(peer)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "peer is in blocklist",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                blocklist.Add(mockPeers[0].ID)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "peer is bad node",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateFailed)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "parent is peer's descendant",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateRunning)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                if err := peer.Task.AddPeerEdge(peer, mockPeers[0]); err != nil {
+                    t.Fatal(err)
+                }
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "parent free upload load is zero",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateRunning)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                mockPeers[0].Host.ConcurrentUploadLimit.Store(0)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.False(ok)
+            },
+        },
+        {
+            name: "find back-to-source parent",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[1].FSM.SetState(resource.PeerStateSucceeded)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                peer.Task.StorePeer(mockPeers[1])
+                peer.Task.BackToSourcePeers.Add(mockPeers[0].ID)
+                peer.Task.BackToSourcePeers.Add(mockPeers[1].ID)
+                mockPeers[0].IsBackToSource.Store(true)
+                mockPeers[1].IsBackToSource.Store(true)
+                mockPeers[0].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(1)
+                mockPeers[1].FinishedPieces.Set(2)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.True(ok)
+                assert.Equal(mockPeers[1].ID, parent.ID)
+            },
+        },
+        {
+            name: "find seed peer parent",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[1].FSM.SetState(resource.PeerStateSucceeded)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                peer.Task.StorePeer(mockPeers[1])
+                peer.Task.StorePeer(mockPeers[2])
+                mockPeers[0].Host.Type = pkgtypes.HostTypeSuperSeed
+                mockPeers[1].Host.Type = pkgtypes.HostTypeSuperSeed
+                mockPeers[0].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(1)
+                mockPeers[1].FinishedPieces.Set(2)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.True(ok)
+                assert.Equal(mockPeers[1].ID, parent.ID)
+            },
+        },
+        {
+            name: "find parent with ancestor",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[1].FSM.SetState(resource.PeerStateSucceeded)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                peer.Task.StorePeer(mockPeers[1])
+                peer.Task.StorePeer(mockPeers[2])
+                if err := peer.Task.AddPeerEdge(mockPeers[2], mockPeers[0]); err != nil {
+                    t.Fatal(err)
+                }
+
+                if err := peer.Task.AddPeerEdge(mockPeers[2], mockPeers[1]); err != nil {
+                    t.Fatal(err)
+                }
+
+                mockPeers[0].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(1)
+                mockPeers[1].FinishedPieces.Set(2)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.True(ok)
+                assert.Equal(mockPeers[1].ID, parent.ID)
+            },
+        },
+        {
+            name: "find parent with same host",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[1].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[0].IsBackToSource.Store(true)
+                mockPeers[1].Host = peer.Host
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                peer.Task.StorePeer(mockPeers[1])
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.True(ok)
+                assert.Equal(mockPeers[0].ID, parent.ID)
+            },
+        },
+        {
+            name: "find parent and fetch filterParentLimit from manager dynconfig",
+            mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) {
+                peer.FSM.SetState(resource.PeerStateRunning)
+                mockPeers[0].FSM.SetState(resource.PeerStateSucceeded)
+                mockPeers[1].FSM.SetState(resource.PeerStateSucceeded)
+                peer.Task.StorePeer(peer)
+                peer.Task.StorePeer(mockPeers[0])
+                peer.Task.StorePeer(mockPeers[1])
+                peer.Task.BackToSourcePeers.Add(mockPeers[0].ID)
+                peer.Task.BackToSourcePeers.Add(mockPeers[1].ID)
+                mockPeers[0].IsBackToSource.Store(true)
+                mockPeers[1].IsBackToSource.Store(true)
+                mockPeers[0].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(0)
+                mockPeers[1].FinishedPieces.Set(1)
+                mockPeers[1].FinishedPieces.Set(2)
+
+                md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{
+                    FilterParentLimit: 3,
+                }, nil).Times(1)
+            },
+            expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parent *resource.Peer, ok bool) {
+                assert := assert.New(t)
+                assert.True(ok)
+                assert.Contains([]string{mockPeers[0].ID, mockPeers[1].ID, peer.ID}, parent.ID)
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            ctl := gomock.NewController(t)
+            defer ctl.Finish()
+            dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+            mockHost := resource.NewHost(
+                mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+                mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+            mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
+            peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
+
+            var mockPeers []*resource.Peer
+            for i := 0; i < 11; i++ {
+                mockHost := resource.NewHost(
+                    idgen.HostIDV2("127.0.0.1", uuid.New().String()), mockRawHost.IP, mockRawHost.Hostname,
+                    mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+                peer := resource.NewPeer(idgen.PeerIDV1(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
+                mockPeers = append(mockPeers, peer)
+            }
+
+            blocklist := set.NewSafeSet[string]()
+            tc.mock(peer, mockPeers, blocklist, dynconfig.EXPECT())
+            scheduling := New(mockSchedulerConfig, dynconfig, mockPluginDir)
+            parent, found := scheduling.FindSuccessParent(context.Background(), peer, blocklist)
             tc.expect(t, peer, mockPeers, parent, found)
         })
     }
 }

-func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) {
+func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
+    tests := []struct {
+        name   string
+        expect func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer)
+    }{
+        {
+            name: "construct success",
+            expect: func(t *testing.T, resp *schedulerv2.AnnouncePeerResponse_SmallTaskResponse, candidateParent *resource.Peer) {
+                assert := assert.New(t)
+                assert.EqualValues(resp, &schedulerv2.AnnouncePeerResponse_SmallTaskResponse{
+                    SmallTaskResponse: &schedulerv2.SmallTaskResponse{
+                        CandidateParent: &commonv2.Peer{
+                            Id: candidateParent.ID,
+                            Range: &commonv2.Range{
+                                Start: candidateParent.Range.Start,
+                                Length: candidateParent.Range.Length,
+                            },
+                            Priority: candidateParent.Priority,
+                            Pieces: []*commonv2.Piece{
+                                {
+                                    Number: mockPiece.Number,
+                                    ParentId: mockPiece.ParentID,
+                                    Offset: mockPiece.Offset,
+                                    Length: mockPiece.Length,
+                                    Digest: mockPiece.Digest.String(),
+                                    TrafficType: mockPiece.TrafficType,
+                                    Cost: durationpb.New(mockPiece.Cost),
+                                    CreatedAt: timestamppb.New(mockPiece.CreatedAt),
+                                },
+                            },
+                            Cost: durationpb.New(candidateParent.Cost.Load()),
+                            State: candidateParent.FSM.Current(),
+                            Task: &commonv2.Task{
+                                Id: candidateParent.Task.ID,
+                                Type: candidateParent.Task.Type,
+                                Url: candidateParent.Task.URL,
+                                Digest: candidateParent.Task.Digest.String(),
+                                Tag: candidateParent.Task.Tag,
+                                Application: candidateParent.Task.Application,
+                                Filters: candidateParent.Task.Filters,
+                                Header: candidateParent.Task.Header,
+                                PieceLength: candidateParent.Task.PieceLength,
+                                ContentLength: candidateParent.Task.ContentLength.Load(),
+                                PieceCount: candidateParent.Task.TotalPieceCount.Load(),
+                                SizeScope: candidateParent.Task.SizeScope(),
+                                Pieces: []*commonv2.Piece{
+                                    {
+                                        Number: mockPiece.Number,
+                                        ParentId: mockPiece.ParentID,
+                                        Offset: mockPiece.Offset,
+                                        Length: mockPiece.Length,
+                                        Digest: mockPiece.Digest.String(),
+                                        TrafficType: mockPiece.TrafficType,
+                                        Cost: durationpb.New(mockPiece.Cost),
+                                        CreatedAt: timestamppb.New(mockPiece.CreatedAt),
+                                    },
+                                },
+                                State: candidateParent.Task.FSM.Current(),
+                                PeerCount: int32(candidateParent.Task.PeerCount()),
+                                CreatedAt: timestamppb.New(candidateParent.Task.CreatedAt.Load()),
+                                UpdatedAt: timestamppb.New(candidateParent.Task.UpdatedAt.Load()),
+                            },
+                            Host: &commonv2.Host{
+                                Id: candidateParent.Host.ID,
+                                Type: uint32(candidateParent.Host.Type),
+                                Hostname: candidateParent.Host.Hostname,
+                                Ip: candidateParent.Host.IP,
+                                Port: candidateParent.Host.Port,
+                                DownloadPort: candidateParent.Host.DownloadPort,
+                                Os: candidateParent.Host.OS,
+                                Platform: candidateParent.Host.Platform,
+                                PlatformFamily: candidateParent.Host.PlatformFamily,
+                                PlatformVersion: candidateParent.Host.PlatformVersion,
+                                KernelVersion: candidateParent.Host.KernelVersion,
+                                Cpu: &commonv2.CPU{
+                                    LogicalCount: candidateParent.Host.CPU.LogicalCount,
+                                    PhysicalCount: candidateParent.Host.CPU.PhysicalCount,
+                                    Percent: candidateParent.Host.CPU.Percent,
+                                    ProcessPercent: candidateParent.Host.CPU.ProcessPercent,
+                                    Times: &commonv2.CPUTimes{
+                                        User: candidateParent.Host.CPU.Times.User,
+                                        System: candidateParent.Host.CPU.Times.System,
+                                        Idle: candidateParent.Host.CPU.Times.Idle,
+                                        Nice: candidateParent.Host.CPU.Times.Nice,
+                                        Iowait: candidateParent.Host.CPU.Times.Iowait,
+                                        Irq: candidateParent.Host.CPU.Times.Irq,
+                                        Softirq: candidateParent.Host.CPU.Times.Softirq,
+                                        Steal: candidateParent.Host.CPU.Times.Steal,
+                                        Guest: candidateParent.Host.CPU.Times.Guest,
+                                        GuestNice: candidateParent.Host.CPU.Times.GuestNice,
+                                    },
+                                },
+                                Memory: &commonv2.Memory{
+                                    Total: candidateParent.Host.Memory.Total,
+                                    Available: candidateParent.Host.Memory.Available,
+                                    Used: candidateParent.Host.Memory.Used,
+                                    UsedPercent: candidateParent.Host.Memory.UsedPercent,
+                                    ProcessUsedPercent: candidateParent.Host.Memory.ProcessUsedPercent,
+                                    Free: candidateParent.Host.Memory.Free,
+                                },
+                                Network: &commonv2.Network{
+                                    TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
+                                    UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
+                                    SecurityDomain: candidateParent.Host.Network.SecurityDomain,
+                                    Location: candidateParent.Host.Network.Location,
+                                    Idc: candidateParent.Host.Network.IDC,
+                                },
+                                Disk: &commonv2.Disk{
+                                    Total: candidateParent.Host.Disk.Total,
+                                    Free: candidateParent.Host.Disk.Free,
+                                    Used: candidateParent.Host.Disk.Used,
+                                    UsedPercent: candidateParent.Host.Disk.UsedPercent,
+                                    InodesTotal: candidateParent.Host.Disk.InodesTotal,
+                                    InodesUsed: candidateParent.Host.Disk.InodesUsed,
+                                    InodesFree: candidateParent.Host.Disk.InodesFree,
+                                    InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
+                                },
+                                Build: &commonv2.Build{
+                                    GitVersion: candidateParent.Host.Build.GitVersion,
+                                    GitCommit: candidateParent.Host.Build.GitCommit,
+                                    GoVersion: candidateParent.Host.Build.GoVersion,
+                                    Platform: candidateParent.Host.Build.Platform,
+                                },
+                            },
+                            NeedBackToSource: candidateParent.NeedBackToSource.Load(),
+                            CreatedAt: timestamppb.New(candidateParent.CreatedAt.Load()),
+                            UpdatedAt: timestamppb.New(candidateParent.UpdatedAt.Load()),
+                        },
+                    },
+                })
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            mockHost := resource.NewHost(
+                mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+                mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+            mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
+            candidateParent := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost, resource.WithRange(nethttp.Range{
+                Start: 1,
+                Length: 10,
+            }))
+            candidateParent.StorePiece(&mockPiece)
+            candidateParent.Task.StorePiece(&mockPiece)
+
+            tc.expect(t, ConstructSuccessSmallTaskResponse(candidateParent), candidateParent)
+        })
+    }
+}
+
+func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
     tests := []struct {
         name string
         mock func(md *configmocks.MockDynconfigInterfaceMockRecorder)

@@ -1293,12 +1700,12 @@ func TestScheduling_constructSuccessNormalTaskResponse(t *testing.T) {
             candidateParents[0].Task.StorePiece(&mockPiece)

             tc.mock(dynconfig.EXPECT())
-            tc.expect(t, constructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents)
+            tc.expect(t, ConstructSuccessNormalTaskResponse(dynconfig, candidateParents), candidateParents)
         })
     }
 }

-func TestScheduling_constructSuccessPeerPacket(t *testing.T) {
+func TestScheduling_ConstructSuccessPeerPacket(t *testing.T) {
     tests := []struct {
         name string
         mock func(md *configmocks.MockDynconfigInterfaceMockRecorder)

@@ -1377,7 +1784,7 @@ func TestScheduling_constructSuccessPeerPacket(t *testing.T) {
             candidateParents := []*resource.Peer{resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockTask, mockHost)}

             tc.mock(dynconfig.EXPECT())
-            tc.expect(t, constructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents)
+            tc.expect(t, ConstructSuccessPeerPacket(dynconfig, peer, parent, candidateParents), parent, candidateParents)
         })
     }
 }
@@ -25,7 +25,6 @@ import (
	"strings"
	"time"

-	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/pkg/apis/common/v1"

@@ -673,7 +672,7 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest,
		priority = req.UrlMeta.Priority
	} else {
		// Compatible with v1 version of priority enum.
-		priority = types.PriorityV2ToV1(peer.GetPriority(dynconfig))
+		priority = types.PriorityV2ToV1(peer.CalculatePriority(dynconfig))
	}
	peer.Log.Infof("peer priority is %d", priority)

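The only behavioral change in this hunk is that the v1 path now computes the peer's v2 priority with CalculatePriority and bridges it back to the v1 enum for older clients via types.PriorityV2ToV1. A minimal sketch of what such an enum bridge can look like; the stand-in types and the identity mapping are assumptions, not the actual implementation in pkg/types:

package example

// v1Priority and v2Priority stand in for the generated priority enums in
// d7y.io/api; the numeric assumptions below are illustrative only.
type v1Priority int32
type v2Priority int32

const v2LevelCount = 7 // LEVEL0..LEVEL6 in the v2 proto

// priorityV2ToV1 sketches a defensive v2-to-v1 conversion: known levels map
// one-to-one, anything out of range falls back to the v1 zero value so an
// old client never receives an enum value it cannot decode.
func priorityV2ToV1(p v2Priority) v1Priority {
	if p < 0 || p >= v2LevelCount {
		return 0
	}
	return v1Priority(p)
}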
@@ -714,8 +713,6 @@ func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest,

// triggerSeedPeerTask starts to trigger seed peer task.
func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *resource.Task) {
-	ctx = trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))
-
	task.Log.Info("trigger seed peer")
	peer, endOfPiece, err := v.resource.SeedPeer().TriggerTask(ctx, rg, task)
	if err != nil {

@@ -31,6 +31,9 @@ import (
	schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"

	logger "d7y.io/dragonfly/v2/internal/dflog"
+	"d7y.io/dragonfly/v2/pkg/container/set"
+	"d7y.io/dragonfly/v2/pkg/digest"
+	"d7y.io/dragonfly/v2/pkg/net/http"
	"d7y.io/dragonfly/v2/pkg/types"
	"d7y.io/dragonfly/v2/scheduler/config"
	"d7y.io/dragonfly/v2/scheduler/resource"
@@ -100,37 +103,46 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
		switch announcePeerRequest := req.GetRequest().(type) {
		case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:
			logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %#v", announcePeerRequest.RegisterPeerRequest.Download)
-			if err := v.handleRegisterPeerRequest(req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil {
+			if err := v.handleRegisterPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil {
				logger.Error(err)
				return err
			}
+		case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest:
+			logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %#v", announcePeerRequest.RegisterSeedPeerRequest.Download)
+			v.handleRegisterSeedPeerRequest(ctx, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
-			v.handleDownloadPeerStartedRequest(announcePeerRequest.DownloadPeerStartedRequest)
+			v.handleDownloadPeerStartedRequest(ctx, announcePeerRequest.DownloadPeerStartedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
-			v.handleDownloadPeerBackToSourceStartedRequest(announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
+			v.handleDownloadPeerBackToSourceStartedRequest(ctx, announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
+		case *schedulerv2.AnnouncePeerRequest_DownloadSeedPeerBackToSourceStartedRequest:
+			logger.Infof("receive AnnouncePeerRequest_DownloadSeedPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadSeedPeerBackToSourceStartedRequest)
+			v.handleDownloadSeedPeerBackToSourceStartedRequest(ctx, announcePeerRequest.DownloadSeedPeerBackToSourceStartedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
-			v.handleDownloadPeerFinishedRequest(announcePeerRequest.DownloadPeerFinishedRequest)
+			v.handleDownloadPeerFinishedRequest(ctx, announcePeerRequest.DownloadPeerFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
-			v.handleDownloadPeerBackToSourceFinishedRequest(announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
+			v.handleDownloadPeerBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
+		case *schedulerv2.AnnouncePeerRequest_DownloadSeedPeerBackToSourceFinishedRequest:
+			logger.Infof("receive AnnouncePeerRequest_DownloadSeedPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadSeedPeerBackToSourceFinishedRequest)
+			v.handleDownloadSeedPeerBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadSeedPeerBackToSourceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest)
-			v.handleDownloadPieceFinishedRequest(announcePeerRequest.DownloadPieceFinishedRequest)
+			v.handleDownloadPieceFinishedRequest(ctx, announcePeerRequest.DownloadPieceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
-			v.handleDownloadPieceBackToSourceFinishedRequest(announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
+			v.handleDownloadPieceBackToSourceFinishedRequest(ctx, announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest)
-			v.handleDownloadPieceFailedRequest(announcePeerRequest.DownloadPieceFailedRequest)
+			v.handleDownloadPieceFailedRequest(ctx, announcePeerRequest.DownloadPieceFailedRequest)
		case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
-			v.handleDownloadPieceBackToSourceFailedRequest(announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
+			v.handleDownloadPieceBackToSourceFailedRequest(ctx, announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
		case *schedulerv2.AnnouncePeerRequest_SyncPiecesFailedRequest:
			logger.Infof("receive AnnouncePeerRequest_SyncPiecesFailedRequest: %#v", announcePeerRequest.SyncPiecesFailedRequest)
-			v.handleSyncPiecesFailedRequest(announcePeerRequest.SyncPiecesFailedRequest)
+			v.handleSyncPiecesFailedRequest(ctx, announcePeerRequest.SyncPiecesFailedRequest)
		default:
			msg := fmt.Sprintf("receive unknow request: %#v", announcePeerRequest)
			logger.Error(msg)
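The new RegisterSeedPeerRequest case gives seed peers their own entry point on the AnnouncePeer stream, alongside the existing RegisterPeerRequest case. A minimal sketch of what sending that request from the peer side could look like; the oneof wrapper and the HostId/TaskId/PeerId field names are taken from this diff, while the stream handle and the empty Download payload are placeholder assumptions:

package example

import (
	schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
)

// announceSeedPeer sketches a seed peer registering itself on an already
// opened AnnouncePeer stream. The Download metadata (url, digest, piece
// length, ...) is omitted here and would be filled in by the real client.
func announceSeedPeer(stream schedulerv2.Scheduler_AnnouncePeerClient, hostID, taskID, peerID string) error {
	// The scheduler's switch above dispatches this message to
	// handleRegisterSeedPeerRequest.
	return stream.Send(&schedulerv2.AnnouncePeerRequest{
		HostId: hostID,
		TaskId: taskID,
		PeerId: peerID,
		Request: &schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest{
			RegisterSeedPeerRequest: &schedulerv2.RegisterSeedPeerRequest{},
		},
	})
}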
@@ -140,54 +152,63 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
}

// TODO Implement function.
-// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
-func (v *V2) handleRegisterPeerRequest(hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
-	return nil
+// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
+func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) {
}

// TODO Implement function.
// handleDownloadPeerStartedRequest handles DownloadPeerStartedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPeerStartedRequest(req *schedulerv2.DownloadPeerStartedRequest) {
+func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerStartedRequest) {
}

// TODO Implement function.
// handleDownloadPeerBackToSourceStartedRequest handles DownloadPeerBackToSourceStartedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPeerBackToSourceStartedRequest(req *schedulerv2.DownloadPeerBackToSourceStartedRequest) {
+func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, req *schedulerv2.DownloadPeerBackToSourceStartedRequest) {
+}
+
+// TODO Implement function.
+// handleDownloadSeedPeerBackToSourceStartedRequest handles DownloadSeedPeerBackToSourceStartedRequest of AnnouncePeerRequest.
+func (v *V2) handleDownloadSeedPeerBackToSourceStartedRequest(ctx context.Context, req *schedulerv2.DownloadSeedPeerBackToSourceStartedRequest) {
}

// TODO Implement function.
// handleDownloadPeerFinishedRequest handles DownloadPeerFinishedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPeerFinishedRequest(req *schedulerv2.DownloadPeerFinishedRequest) {
+func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPeerFinishedRequest) {
}

// TODO Implement function.
// handleDownloadPeerBackToSourceFinishedRequest handles DownloadPeerBackToSourceFinishedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) {
+func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPeerBackToSourceFinishedRequest) {
+}
+
+// TODO Implement function.
+// handleDownloadSeedPeerBackToSourceFinishedRequest handles DownloadSeedPeerBackToSourceFinishedRequest of AnnouncePeerRequest.
+func (v *V2) handleDownloadSeedPeerBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadSeedPeerBackToSourceFinishedRequest) {
}

// TODO Implement function.
// handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceFinishedRequest(req *schedulerv2.DownloadPieceFinishedRequest) {
+func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPieceFinishedRequest) {
}

// TODO Implement function.
// handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) {
+func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) {
}

// TODO Implement function.
// handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceFailedRequest(req *schedulerv2.DownloadPieceFailedRequest) {
+func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, req *schedulerv2.DownloadPieceFailedRequest) {
}

// TODO Implement function.
// handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceBackToSourceFailedRequest(req *schedulerv2.DownloadPieceBackToSourceFailedRequest) {
+func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, req *schedulerv2.DownloadPieceBackToSourceFailedRequest) {
}

// TODO Implement function.
// handleSyncPiecesFailedRequest handles SyncPiecesFailedRequest of AnnouncePeerRequest.
-func (v *V2) handleSyncPiecesFailedRequest(req *schedulerv2.SyncPiecesFailedRequest) {
+func (v *V2) handleSyncPiecesFailedRequest(ctx context.Context, req *schedulerv2.SyncPiecesFailedRequest) {
}

// StatPeer checks information of peer.
@@ -644,3 +665,244 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error {
	host.LeavePeers()
	return nil
}
+
+// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
+func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
+	// Handle resource included host, task, and peer.
+	_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req)
+	if err != nil {
+		return status.Error(codes.FailedPrecondition, err.Error())
+	}
+
+	// When there are no available peers for a task, the scheduler needs to trigger
+	// the first task download in the p2p cluster.
+	blocklist := set.NewSafeSet[string]()
+	blocklist.Add(peer.ID)
+	if !task.HasAvailablePeer(blocklist) {
+		if err := v.downloadTaskBySeedPeer(ctx, peer); err != nil {
+			return err
+		}
+	}
+
+	// Provide different scheduling strategies for different task type.
+	sizeScope := task.SizeScope()
+	switch sizeScope {
+	case commonv2.SizeScope_EMPTY:
+		// Return an EmptyTaskResponse directly.
+		peer.Log.Info("scheduling as SizeScope_EMPTY")
+		stream, loaded := peer.LoadAnnouncePeerStream()
+		if !loaded {
+			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
+		}
+
+		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterEmpty); err != nil {
+			return status.Errorf(codes.Internal, err.Error())
+		}
+
+		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
+			Response: &schedulerv2.AnnouncePeerResponse_EmptyTaskResponse{
+				EmptyTaskResponse: &schedulerv2.EmptyTaskResponse{},
+			},
+		}); err != nil {
+			peer.Log.Error(err)
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		return nil
+	case commonv2.SizeScope_TINY:
+		// If the task.DirectPiece of the task can be reused, the data of
+		// the task will be included in the TinyTaskResponse.
+		// If the task.DirectPiece cannot be reused,
+		// it will be scheduled as a Normal Task.
+		peer.Log.Info("scheduling as SizeScope_TINY")
+		if !peer.Task.CanReuseDirectPiece() {
+			peer.Log.Warnf("can not reuse direct piece %d %d", len(task.DirectPiece), task.ContentLength.Load())
+			break
+		}
+
+		stream, loaded := peer.LoadAnnouncePeerStream()
+		if !loaded {
+			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
+		}
+
+		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterTiny); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
+			Response: &schedulerv2.AnnouncePeerResponse_TinyTaskResponse{
+				TinyTaskResponse: &schedulerv2.TinyTaskResponse{
+					Data: peer.Task.DirectPiece,
+				},
+			},
+		}); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		return nil
+	case commonv2.SizeScope_SMALL:
+		// If a parent with the state of PeerStateSucceeded can be found in the task,
+		// its information will be returned. If a parent with the state of
+		// PeerStateSucceeded cannot be found in the task,
+		// it will be scheduled as a Normal Task.
+		peer.Log.Info("scheduling as SizeScope_SMALL")
+		parent, found := v.scheduling.FindSuccessParent(ctx, peer, set.NewSafeSet[string]())
+		if !found {
+			peer.Log.Warn("candidate parents not found")
+			break
+		}
+
+		// Delete inedges of peer.
+		if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		// Add edges between success parent and peer.
+		if err := peer.Task.AddPeerEdge(parent, peer); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		stream, loaded := peer.LoadAnnouncePeerStream()
+		if !loaded {
+			return status.Error(codes.NotFound, "AnnouncePeerStream not found")
+		}
+
+		if err := peer.FSM.Event(ctx, resource.PeerEventRegisterSmall); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		if err := stream.Send(&schedulerv2.AnnouncePeerResponse{
+			Response: scheduling.ConstructSuccessSmallTaskResponse(parent),
+		}); err != nil {
+			return status.Error(codes.Internal, err.Error())
+		}
+
+		return nil
+	case commonv2.SizeScope_NORMAL, commonv2.SizeScope_UNKNOW:
+	default:
+		return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope)
+	}
+
+	// Scheduling as a normal task, it will control how peers download tasks
+	// based on RetryLimit and RetryBackToSourceLimit configurations.
+	peer.Log.Info("scheduling as SizeScope_NORMAL")
+	if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
+
+	if err := v.scheduling.ScheduleCandidateParents(ctx, peer, set.NewSafeSet[string]()); err != nil {
+		return status.Error(codes.FailedPrecondition, err.Error())
+	}
+
+	return nil
+}
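The size-scope branches above map directly onto the response types a registering peer can receive: EMPTY answers immediately, TINY inlines the task bytes, SMALL tries to hand back one succeeded parent, and everything else falls through to normal candidate-parent scheduling. A stripped-down sketch of that dispatch, using stand-in types rather than the real resource and schedulerv2 structs:

package example

// sizeScope mirrors the idea of commonv2.SizeScope; the values are illustrative.
type sizeScope int

const (
	scopeEmpty sizeScope = iota
	scopeTiny
	scopeSmall
	scopeNormal
)

// registerResponse is a stand-in for the AnnouncePeerResponse oneof.
type registerResponse struct {
	kind string
	data []byte
}

// pickResponse sketches the dispatch: empty tasks answer immediately, tiny
// tasks inline their bytes, small tasks hand back one succeeded parent, and
// everything else is scheduled normally. The fallbacks (piece not reusable,
// no succeeded parent) mimic the `break` statements in the real handler.
func pickResponse(scope sizeScope, directPiece []byte, hasSucceededParent bool) registerResponse {
	switch scope {
	case scopeEmpty:
		return registerResponse{kind: "empty"}
	case scopeTiny:
		if len(directPiece) > 0 {
			return registerResponse{kind: "tiny", data: directPiece}
		}
	case scopeSmall:
		if hasSucceededParent {
			return registerResponse{kind: "small"}
		}
	}
	return registerResponse{kind: "normal"} // schedule candidate parents
}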
+
+// handleResource handles resource included host, task, and peer.
+func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) (*resource.Host, *resource.Task, *resource.Peer, error) {
+	// If the host does not exist and the host address cannot be found,
+	// it may cause an exception.
+	host, loaded := v.resource.HostManager().Load(hostID)
+	if !loaded {
+		return nil, nil, nil, fmt.Errorf("host %s not found", hostID)
+	}
+
+	// Store new task or update task.
+	task, loaded := v.resource.TaskManager().Load(taskID)
+	if !loaded {
+		options := []resource.TaskOption{resource.WithPieceLength(req.Download.PieceLength)}
+		if req.Download.Digest != "" {
+			d, err := digest.Parse(req.Download.Digest)
+			if err != nil {
+				return nil, nil, nil, fmt.Errorf("invalid digest %s", req.Download.Digest)
+			}
+
+			// If request has invalid digest, then new task with the nil digest.
+			options = append(options, resource.WithDigest(d))
+		}
+
+		task = resource.NewTask(taskID, req.Download.Url, req.Download.Tag, req.Download.Application, req.Download.Type,
+			req.Download.Filters, req.Download.Header, int32(v.config.Scheduler.BackToSourceCount), options...)
+		v.resource.TaskManager().Store(task)
+	} else {
+		task.URL = req.Download.Url
+		task.Filters = req.Download.Filters
+		task.Header = req.Download.Header
+	}
+
+	// Store new peer or load peer.
+	peer, loaded := v.resource.PeerManager().Load(peerID)
+	if !loaded {
+		options := []resource.PeerOption{resource.WithPriority(req.Download.Priority), resource.WithAnnouncePeerStream(stream)}
+		if req.Download.Range != nil {
+			options = append(options, resource.WithRange(http.Range{Start: req.Download.Range.Start, Length: req.Download.Range.Length}))
+		}
+
+		peer = resource.NewPeer(peerID, task, host, options...)
+		v.resource.PeerManager().Store(peer)
+	}
+
+	return host, task, peer, nil
+}
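handleResource resolves the three entities a registration touches: the host must already exist, while the task and peer are created on first sight and reused afterwards. A small sketch of that load-or-create idiom with a generic in-memory manager; the real HostManager/TaskManager/PeerManager are assumed to behave similarly but are not shown here:

package example

import "sync"

// manager sketches the Load/Store pattern handleResource relies on: look the
// entity up first and only build a new one when it is missing, so repeated
// registrations reuse the same state.
type manager[T any] struct {
	mu      sync.Mutex
	entries map[string]T
}

func newManager[T any]() *manager[T] {
	return &manager[T]{entries: make(map[string]T)}
}

// loadOrCreate returns the stored value for id, or creates one with build and
// stores it. This mirrors "Store new task or update task" above, minus the
// update branch for already-known tasks.
func (m *manager[T]) loadOrCreate(id string, build func() T) T {
	m.mu.Lock()
	defer m.mu.Unlock()
	if v, ok := m.entries[id]; ok {
		return v
	}
	v := build()
	m.entries[id] = v
	return v
}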
+
+// downloadTaskBySeedPeer downloads task by seed peer.
+func (v *V2) downloadTaskBySeedPeer(ctx context.Context, peer *resource.Peer) error {
+	// Trigger the first download task based on different priority levels,
+	// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
+	priority := peer.CalculatePriority(v.dynconfig)
+	peer.Log.Infof("peer priority is %s", priority.String())
+	switch priority {
+	case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
+		// Super peer is first triggered to back-to-source.
+		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
+			go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
+				if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
+					peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
+					return
+				}
+			}(ctx, peer, types.HostTypeSuperSeed)
+			break
+		}
+
+		fallthrough
+	case commonv2.Priority_LEVEL5:
+		// Strong peer is first triggered to back-to-source.
+		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
+			go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
+				if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
+					peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
+					return
+				}
+			}(ctx, peer, types.HostTypeStrongSeed)
+			break
+		}
+
+		fallthrough
+	case commonv2.Priority_LEVEL4:
+		// Weak peer is first triggered to back-to-source.
+		if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
+			go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
+				if err := v.resource.SeedPeer().DownloadTask(context.Background(), peer.Task, hostType); err != nil {
+					peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
+					return
+				}
+			}(ctx, peer, types.HostTypeWeakSeed)
+			break
+		}
+
+		fallthrough
+	case commonv2.Priority_LEVEL3:
+		// When the task is downloaded for the first time,
+		// the normal peer is first to download back-to-source.
+		peer.NeedBackToSource.Store(true)
+	case commonv2.Priority_LEVEL2:
+		// Peer is first to download back-to-source.
+		return status.Errorf(codes.NotFound, "%s peer not found candidate peers", commonv2.Priority_LEVEL2.String())
+	case commonv2.Priority_LEVEL1:
+		// Download task is forbidden.
+		return status.Errorf(codes.FailedPrecondition, "%s peer is forbidden", commonv2.Priority_LEVEL1.String())
+	default:
+		return status.Errorf(codes.InvalidArgument, "invalid priority %#v", priority)
+	}
+
+	return nil
+}
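downloadTaskBySeedPeer walks the priority levels from strongest to weakest: LEVEL6/LEVEL0 try a super seed peer, LEVEL5 a strong one, LEVEL4 a weak one, and each level falls through to the next when seed peers are disabled or have already failed, until LEVEL3 finally marks the peer to fetch back-to-source itself. A self-contained sketch of that decision chain; the stand-in enums and the helper are illustrative, not the project's types:

package example

import "errors"

// priority and hostType stand in for commonv2.Priority and types.HostType.
type priority int

const (
	level0 priority = iota // unspecified: grouped with the highest level above
	level1                 // download forbidden
	level2                 // must find candidate peers, never back-to-source
	level3                 // peer itself goes back to source
	level4                 // weak seed peer first
	level5                 // strong seed peer first
	level6                 // super seed peer first
)

type hostType string

const (
	superSeed  hostType = "super"
	strongSeed hostType = "strong"
	weakSeed   hostType = "weak"
)

var (
	errForbidden    = errors.New("download task is forbidden")
	errNoCandidates = errors.New("no candidate peers")
)

// firstDownloadAction sketches the fallthrough chain above: each seed-backed
// level uses its seed peer type only when seed peers are usable, otherwise it
// falls through to the next weaker level, ending with the peer downloading
// back-to-source itself (LEVEL3).
func firstDownloadAction(p priority, seedPeerUsable bool) (seed hostType, backToSource bool, err error) {
	switch p {
	case level6, level0:
		if seedPeerUsable {
			return superSeed, false, nil
		}
		fallthrough
	case level5:
		if seedPeerUsable {
			return strongSeed, false, nil
		}
		fallthrough
	case level4:
		if seedPeerUsable {
			return weakSeed, false, nil
		}
		fallthrough
	case level3:
		return "", true, nil
	case level2:
		return "", false, errNoCandidates
	case level1:
		return "", false, errForbidden
	default:
		return "", false, errors.New("invalid priority")
	}
}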
File diff suppressed because it is too large