feat: implement trigger download task by seed peer in v2 (#2957)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-12-19 16:18:02 +08:00 committed by GitHub
parent 003376c6d3
commit ec66fd1229
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 287 additions and 95 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21
require (
d7y.io/api/v2 v2.0.64
d7y.io/api/v2 v2.0.67
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0

4
go.sum
View File

@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.64 h1:2mdQ0maJZZgogfQHoCyzi1TBczyby1WeyFau13ywmDw=
d7y.io/api/v2 v2.0.64/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
d7y.io/api/v2 v2.0.67 h1:4fiGXT1WHWgRXSTmnP53MU83Zbf+7i1jYeGNEJWrM7Q=
d7y.io/api/v2 v2.0.67/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -26,19 +26,29 @@ import (
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer"
"d7y.io/dragonfly/v2/pkg/resolver"
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/scheduler/config"
)
// GetV2 returns v2 version of the dfdaemon client.
func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (V2, error) {
// Register resolver and balancer.
resolver.RegisterSeedPeer(dynconfig)
builder, pickerBuilder := pkgbalancer.NewConsistentHashingBuilder()
balancer.Register(builder)
conn, err := grpc.DialContext(
ctx,
target,
resolver.SeedPeerVirtualTarget,
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.OTELUnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
@ -47,11 +57,13 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
grpc_retry.WithMax(maxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
),
rpc.RefresherUnaryClientInterceptor(dynconfig),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.OTELStreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
rpc.RefresherStreamClientInterceptor(dynconfig),
)),
}, opts...)...,
)
@ -62,6 +74,7 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
return &v2{
DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
ClientConn: conn,
ConsistentHashingPickerBuilder: pickerBuilder,
}, nil
}
@ -73,8 +86,8 @@ type V2 interface {
// DownloadPiece downloads piece from the other peer.
DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error)
// DownloadTask downloads task from the other peer.
DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error
// TriggerDownloadTask triggers download task from the other peer.
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error)
// Close tears down the ClientConn and all underlying connections.
Close() error
@ -84,6 +97,7 @@ type V2 interface {
type v2 struct {
dfdaemonv2.DfdaemonUploadClient
*grpc.ClientConn
*pkgbalancer.ConsistentHashingPickerBuilder
}
// SyncPieces syncs pieces from the other peers.
@ -110,16 +124,14 @@ func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceReq
)
}
// DownloadTask downloads task from the other peer.
func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error {
// TriggerDownloadTask triggers download task from the other peer.
func (v *v2) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error) {
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
_, err := v.DfdaemonUploadClient.DownloadTask(
return v.DfdaemonUploadClient.TriggerDownloadTask(
ctx,
req,
opts...,
)
return err
}

View File

@ -74,25 +74,6 @@ func (mr *MockV2MockRecorder) DownloadPiece(arg0, arg1 any, arg2 ...any) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockV2)(nil).DownloadPiece), varargs...)
}
// DownloadTask mocks base method.
func (m *MockV2) DownloadTask(arg0 context.Context, arg1 *dfdaemon.DownloadTaskRequest, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DownloadTask", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// DownloadTask indicates an expected call of DownloadTask.
func (mr *MockV2MockRecorder) DownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockV2)(nil).DownloadTask), varargs...)
}
// SyncPieces mocks base method.
func (m *MockV2) SyncPieces(arg0 context.Context, arg1 *dfdaemon.SyncPiecesRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) {
m.ctrl.T.Helper()
@ -112,3 +93,23 @@ func (mr *MockV2MockRecorder) SyncPieces(arg0, arg1 any, arg2 ...any) *gomock.Ca
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockV2)(nil).SyncPieces), varargs...)
}
// TriggerDownloadTask mocks base method.
func (m *MockV2) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest, arg2 ...grpc.CallOption) (*dfdaemon.TriggerDownloadTaskResponse, error) {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "TriggerDownloadTask", varargs...)
ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TriggerDownloadTask indicates an expected call of TriggerDownloadTask.
func (mr *MockV2MockRecorder) TriggerDownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockV2)(nil).TriggerDownloadTask), varargs...)
}

View File

@ -120,7 +120,7 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, opti
return nil, err
}
resource.seedPeer = newSeedPeer(&cfg.Resource, client, peerManager, hostManager)
resource.seedPeer = newSeedPeer(cfg, client, peerManager, hostManager)
}
return resource, nil

View File

@ -57,6 +57,8 @@ func TestResource_New(t *testing.T) {
md.Register(gomock.Any()).Return().Times(1),
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
md.Register(gomock.Any()).Return().Times(1),
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
md.Register(gomock.Any()).Return().Times(1),
)
},
expect: func(t *testing.T, resource Resource, err error) {
@ -132,6 +134,8 @@ func TestResource_New(t *testing.T) {
md.Register(gomock.Any()).Return().Times(1),
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
md.Register(gomock.Any()).Return().Times(1),
md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
md.Register(gomock.Any()).Return().Times(1),
)
},
expect: func(t *testing.T, resource Resource, err error) {

View File

@ -24,9 +24,12 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel/trace"
cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/pkg/digest"
@ -44,9 +47,9 @@ const (
// SeedPeer is the interface used for seed peer.
type SeedPeer interface {
// DownloadTask downloads task back-to-source.
// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
DownloadTask(context.Context, *commonv2.Download) error
TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error)
// TriggerTask triggers the seed peer to download task.
// Used only in v1 version of the grpc.
@ -62,7 +65,7 @@ type SeedPeer interface {
// seedPeer contains content for seed peer.
type seedPeer struct {
// config is the config of resource.
config *config.ResourceConfig
config *config.Config
// client is the dynamic client of seed peer.
client SeedPeerClient
@ -75,7 +78,7 @@ type seedPeer struct {
}
// New SeedPeer interface.
func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager PeerManager, hostManager HostManager) SeedPeer {
func newSeedPeer(cfg *config.Config, client SeedPeerClient, peerManager PeerManager, hostManager HostManager) SeedPeer {
return &seedPeer{
config: cfg,
client: client,
@ -84,14 +87,13 @@ func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager
}
}
// TODO Implement DownloadTask
// DownloadTask downloads task back-to-source.
// TriggerDownloadTask triggers the seed peer to download task.
// Used only in v2 version of the grpc.
func (s *seedPeer) DownloadTask(ctx context.Context, download *commonv2.Download) error {
// ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
// defer cancel()
func (s *seedPeer) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error) {
ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx)))
defer cancel()
return nil
return s.client.TriggerDownloadTask(ctx, req)
}
// TriggerTask triggers the seed peer to download task.
@ -145,7 +147,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
initialized = true
// Initialize seed peer.
peer, err = s.initSeedPeer(ctx, rg, task, pieceSeed)
peer, err = s.initSeedPeer(ctx, rg, task, pieceSeed.HostId, pieceSeed.PeerId)
if err != nil {
return nil, nil, err
}
@ -210,21 +212,21 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
}
// Initialize seed peer.
func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, ps *cdnsystemv1.PieceSeed) (*Peer, error) {
func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, hostID string, peerID string) (*Peer, error) {
// Load host from manager.
host, loaded := s.hostManager.Load(ps.HostId)
host, loaded := s.hostManager.Load(hostID)
if !loaded {
task.Log.Errorf("can not find seed host id: %s", ps.HostId)
return nil, fmt.Errorf("can not find host id: %s", ps.HostId)
task.Log.Errorf("can not find seed host id: %s", hostID)
return nil, fmt.Errorf("can not find host id: %s", hostID)
}
host.UpdatedAt.Store(time.Now())
// Load peer from manager.
peer, loaded := s.peerManager.Load(ps.PeerId)
peer, loaded := s.peerManager.Load(peerID)
if loaded {
return peer, nil
}
task.Log.Infof("can not find seed peer: %s", ps.PeerId)
task.Log.Infof("can not find seed peer: %s", peerID)
options := []PeerOption{}
if rg != nil {
@ -232,7 +234,7 @@ func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task,
}
// New and store seed peer without range.
peer = NewPeer(ps.PeerId, s.config, task, host, options...)
peer = NewPeer(peerID, &s.config.Resource, task, host, options...)
s.peerManager.Store(peer)
peer.Log.Info("seed peer has been stored")

View File

@ -23,6 +23,7 @@ import (
"fmt"
reflect "reflect"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
@ -30,7 +31,8 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
cdnsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)
@ -40,8 +42,11 @@ type SeedPeerClient interface {
// Addrs returns the addresses of seed peers.
Addrs() []string
// client is seed peer grpc client interface.
client.Client
// Client is cdnsystem grpc client interface.
cdnsystemclient.Client
// V2 is dfdaemon v2 grpc client interface.
dfdaemonclient.V2
// Observer is dynconfig observer interface.
config.Observer
@ -49,8 +54,11 @@ type SeedPeerClient interface {
// seedPeerClient contains content for client of seed peer.
type seedPeerClient struct {
// client is sedd peer grpc client instance.
client.Client
// Client is cdnsystem grpc client interface.
cdnsystemclient.Client
// V2 is dfdaemon v2 grpc client interface.
dfdaemonclient.V2
// hostManager is host manager.
hostManager HostManager
@ -71,14 +79,25 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana
logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.Scheduler.SeedPeers))
// Initialize seed peer grpc client.
client, err := client.GetClient(context.Background(), dynconfig, opts...)
cdnsystemClient, err := cdnsystemclient.GetClient(context.Background(), dynconfig, opts...)
if err != nil {
return nil, err
}
fmt.Println("cdnsystemClient", cdnsystemClient)
// Initialize dfdaemon v2 grpc client.
dfdaemonClient, err := dfdaemonclient.GetV2(context.Background(), dynconfig, opts...)
if err != nil {
return nil, err
}
fmt.Println("dfdaemonClient", dfdaemonClient)
sc := &seedPeerClient{
hostManager: hostManager,
Client: client,
Client: cdnsystemClient,
V2: dfdaemonClient,
dynconfig: dynconfig,
data: config,
}
@ -90,6 +109,20 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana
return sc, nil
}
// Close closes the seed peer client.
func (sc *seedPeerClient) Close() error {
var errs error
if err := sc.Client.Close(); err != nil {
errs = multierror.Append(errs, err)
}
if err := sc.V2.Close(); err != nil {
errs = multierror.Append(errs, err)
}
return errs
}
// Addrs returns the addresses of seed peers.
func (sc *seedPeerClient) Addrs() []string {
var addrs []string

View File

@ -14,6 +14,7 @@ import (
cdnsystem "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
common "d7y.io/api/v2/pkg/apis/common/v1"
dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
config "d7y.io/dragonfly/v2/scheduler/config"
gomock "go.uber.org/mock/gomock"
grpc "google.golang.org/grpc"
@ -70,6 +71,26 @@ func (mr *MockSeedPeerClientMockRecorder) Close() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSeedPeerClient)(nil).Close))
}
// DownloadPiece mocks base method.
func (m *MockSeedPeerClient) DownloadPiece(arg0 context.Context, arg1 *dfdaemon.DownloadPieceRequest, arg2 ...grpc.CallOption) (*dfdaemon.DownloadPieceResponse, error) {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "DownloadPiece", varargs...)
ret0, _ := ret[0].(*dfdaemon.DownloadPieceResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// DownloadPiece indicates an expected call of DownloadPiece.
func (mr *MockSeedPeerClientMockRecorder) DownloadPiece(arg0, arg1 any, arg2 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockSeedPeerClient)(nil).DownloadPiece), varargs...)
}
// GetPieceTasks mocks base method.
func (m *MockSeedPeerClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) {
m.ctrl.T.Helper()
@ -141,3 +162,43 @@ func (mr *MockSeedPeerClientMockRecorder) SyncPieceTasks(arg0, arg1 any, arg2 ..
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockSeedPeerClient)(nil).SyncPieceTasks), varargs...)
}
// SyncPieces mocks base method.
func (m *MockSeedPeerClient) SyncPieces(arg0 context.Context, arg1 *dfdaemon.SyncPiecesRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "SyncPieces", varargs...)
ret0, _ := ret[0].(dfdaemon.DfdaemonUpload_SyncPiecesClient)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SyncPieces indicates an expected call of SyncPieces.
func (mr *MockSeedPeerClientMockRecorder) SyncPieces(arg0, arg1 any, arg2 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockSeedPeerClient)(nil).SyncPieces), varargs...)
}
// TriggerDownloadTask mocks base method.
func (m *MockSeedPeerClient) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest, arg2 ...grpc.CallOption) (*dfdaemon.TriggerDownloadTaskResponse, error) {
m.ctrl.T.Helper()
varargs := []any{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "TriggerDownloadTask", varargs...)
ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TriggerDownloadTask indicates an expected call of TriggerDownloadTask.
func (mr *MockSeedPeerClientMockRecorder) TriggerDownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockSeedPeerClient)(nil).TriggerDownloadTask), varargs...)
}

View File

@ -52,6 +52,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) {
}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
hostManager.Load(gomock.Any()).Return(nil, false).Times(1),
hostManager.Store(gomock.Any()).Return().Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
@ -84,6 +86,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) {
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
)
},
expect: func(t *testing.T, err error) {
@ -183,6 +187,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) {
}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
hostManager.Load(gomock.Any()).Return(nil, false).Times(1),
hostManager.Store(gomock.Any()).Return().Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
@ -220,6 +226,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) {
}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
hostManager.Load(gomock.Any()).Return(nil, false).Times(1),
hostManager.Store(gomock.Any()).Return().Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
@ -255,6 +263,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) {
}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
hostManager.Load(gomock.Any()).Return(nil, false).Times(1),
hostManager.Store(gomock.Any()).Return().Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
@ -289,6 +299,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) {
}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),
dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1),
hostManager.Load(gomock.Any()).Return(nil, false).Times(1),
hostManager.Store(gomock.Any()).Return().Times(1),
dynconfig.Register(gomock.Any()).Return().Times(1),

View File

@ -12,7 +12,7 @@ import (
context "context"
reflect "reflect"
common "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
scheduler "d7y.io/api/v2/pkg/apis/scheduler/v1"
http "d7y.io/dragonfly/v2/pkg/net/http"
gomock "go.uber.org/mock/gomock"
@ -55,20 +55,6 @@ func (mr *MockSeedPeerMockRecorder) Client() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Client", reflect.TypeOf((*MockSeedPeer)(nil).Client))
}
// DownloadTask mocks base method.
func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *common.Download) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DownloadTask indicates an expected call of DownloadTask.
func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1)
}
// Stop mocks base method.
func (m *MockSeedPeer) Stop() error {
m.ctrl.T.Helper()
@ -83,6 +69,21 @@ func (mr *MockSeedPeerMockRecorder) Stop() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockSeedPeer)(nil).Stop))
}
// TriggerDownloadTask mocks base method.
func (m *MockSeedPeer) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest) (*dfdaemon.TriggerDownloadTaskResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "TriggerDownloadTask", arg0, arg1)
ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// TriggerDownloadTask indicates an expected call of TriggerDownloadTask.
func (mr *MockSeedPeerMockRecorder) TriggerDownloadTask(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).TriggerDownloadTask), arg0, arg1)
}
// TriggerTask mocks base method.
func (m *MockSeedPeer) TriggerTask(arg0 context.Context, arg1 *http.Range, arg2 *Task) (*Peer, *scheduler.PeerResult, error) {
m.ctrl.T.Helper()

View File

@ -26,6 +26,7 @@ import (
gomock "go.uber.org/mock/gomock"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
)
@ -51,7 +52,54 @@ func TestSeedPeer_newSeedPeer(t *testing.T) {
peerManager := NewMockPeerManager(ctl)
client := NewMockSeedPeerClient(ctl)
tc.expect(t, newSeedPeer(mockResourceConfig, client, peerManager, hostManager))
tc.expect(t, newSeedPeer(mockConfig, client, peerManager, hostManager))
})
}
}
func TestSeedPeer_TriggerDownloadTask(t *testing.T) {
tests := []struct {
name string
mock func(mc *MockSeedPeerClientMockRecorder)
expect func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error)
}{
{
name: "trigger download task failed",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1)
},
expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) {
assert := assert.New(t)
assert.EqualError(err, "foo")
},
},
{
name: "trigger download task scuccess",
mock: func(mc *MockSeedPeerClientMockRecorder) {
mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(&dfdaemonv2.TriggerDownloadTaskResponse{HostId: mockHostID, TaskId: mockTaskID, PeerId: mockPeerID}, nil).Times(1)
},
expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(mockHostID, resp.HostId)
assert.Equal(mockTaskID, resp.TaskId)
assert.Equal(mockPeerID, resp.PeerId)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
hostManager := NewMockHostManager(ctl)
peerManager := NewMockPeerManager(ctl)
client := NewMockSeedPeerClient(ctl)
tc.mock(client.EXPECT())
seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager)
resp, err := seedPeer.TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{})
tc.expect(t, resp, err)
})
}
}
@ -83,7 +131,7 @@ func TestSeedPeer_TriggerTask(t *testing.T) {
client := NewMockSeedPeerClient(ctl)
tc.mock(client.EXPECT())
seedPeer := newSeedPeer(mockResourceConfig, client, peerManager, hostManager)
seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager)
mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest))
peer, result, err := seedPeer.TriggerTask(context.Background(), nil, mockTask)
tc.expect(t, peer, result, err)

View File

@ -59,6 +59,10 @@ var (
mockTaskFilters = []string{"bar"}
mockTaskHeader = map[string]string{"content-length": "100"}
mockTaskPieceLength int32 = 2048
mockConfig = &config.Config{
Resource: *mockResourceConfig,
}
mockResourceConfig = &config.ResourceConfig{
Task: config.TaskConfig{
DownloadTiny: config.DownloadTinyConfig{

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
@ -1311,12 +1312,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0:
// Super peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeSuperSeed)
peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
}(ctx, download, types.HostTypeSuperSeed)
break
}
@ -1324,12 +1329,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL5:
// Strong peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeStrongSeed)
peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
}(ctx, download, types.HostTypeSuperSeed)
break
}
@ -1337,12 +1346,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down
case commonv2.Priority_LEVEL4:
// Weak peer is first triggered to download back-to-source.
if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() {
go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) {
if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil {
peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error())
go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) {
resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download})
if err != nil {
peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error())
return
}
}(ctx, peer, types.HostTypeWeakSeed)
peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId())
}(ctx, download, types.HostTypeSuperSeed)
break
}

View File

@ -37,6 +37,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
managerv2 "d7y.io/api/v2/pkg/apis/manager/v2"
schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"
@ -3363,7 +3364,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL6
@ -3387,7 +3388,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL6
@ -3426,7 +3427,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL5
@ -3450,7 +3451,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL5
@ -3489,7 +3490,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL4
@ -3513,7 +3514,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
gomock.InOrder(
mr.SeedPeer().Return(seedPeerClient).Times(1),
ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1),
ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1),
)
peer.Priority = commonv2.Priority_LEVEL4