feat: add AllSeedPeersScope for preheating (#3698)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-12-11 11:51:51 +08:00 committed by GitHub
parent 79a845edb6
commit 62b6c3709d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 202 additions and 15 deletions

View File

@ -45,7 +45,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP
func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*models.Job, error) {
if json.Args.Scope == "" {
json.Args.Scope = types.SinglePeerScope
json.Args.Scope = types.SingleSeedPeerScope
}
if json.Args.ConcurrentCount == 0 {

View File

@ -19,8 +19,11 @@ package types
import "time"
const (
// SinglePeerScope represents the scope that only single peer will be preheated.
SinglePeerScope = "single_peer"
// SingleSeedPeerScope represents the scope that only single seed peer will be preheated.
SingleSeedPeerScope = "single_seed_peer"
// AllSeedPeersScope represents the scope that all seed peers will be preheated.
AllSeedPeersScope = "all_seed_peers"
// AllPeersScope represents the scope that all peers will be preheated.
AllPeersScope = "all_peers"
@ -126,8 +129,8 @@ type PreheatArgs struct {
// The image type preheating task can specify the image architecture type. eg: linux/amd64.
Platform string `json:"platform" binding:"omitempty"`
// Scope is the scope for preheating, default is single_peer.
Scope string `json:"scope" binding:"omitempty,oneof=single_peer all_peers"`
// Scope is the scope for preheating, default is single_seed_peer.
Scope string `json:"scope" binding:"omitempty"`
// BatchSize is the batch size for preheating all peers, default is 50.
ConcurrentCount int64 `json:"concurrent_count" binding:"omitempty,gte=1,lte=500"`

View File

@ -178,13 +178,22 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
defer cancel()
switch req.Scope {
case managertypes.SinglePeerScope:
log.Info("preheat single peer")
case managertypes.SingleSeedPeerScope:
log.Info("preheat single seed peer")
resp, err := j.preheatSinglePeer(ctx, taskID, req, log)
if err != nil {
return "", err
}
resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllSeedPeersScope:
log.Info("preheat all seed peers")
resp, err := j.preheatAllSeedPeers(ctx, taskID, req, log)
if err != nil {
return "", err
}
resp.SchedulerClusterID = j.config.Manager.SchedulerClusterID
return internaljob.MarshalResponse(resp)
case managertypes.AllPeersScope:
@ -237,10 +246,21 @@ func (j *job) preheatSinglePeer(ctx context.Context, taskID string, req *interna
return resp, nil
}
// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peer download task failed, return error. If some of the peer download task failed, return success tasks and failure tasks.
// preheatAllSeedPeers preheats job by all peer seed peers, only suoported by v2 protocol. Scheduler will trigger all seed peers to download task.
// If all the seed peers download task failed, return error. If some of the seed peers download task failed, return success tasks and failure tasks.
// Notify the client that the preheat is successful.
func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
func (j *job) preheatAllSeedPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
// If seed peer is disabled, return error.
if !j.config.SeedPeer.Enable {
return nil, fmt.Errorf("cluster %d scheduler %s has disabled seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}
// If scheduler has no available seed peer, return error.
seedPeers := j.resource.SeedPeer().Client().SeedPeers()
if len(seedPeers) == 0 {
return nil, fmt.Errorf("cluster %d scheduler %s has no available seed peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}
var (
successTasks = sync.Map{}
failureTasks = sync.Map{}
@ -248,15 +268,156 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(int(req.ConcurrentCount))
for _, host := range j.resource.HostManager().LoadAll() {
for _, seedPeer := range seedPeers {
var (
hostname = host.Hostname
ip = host.IP
port = host.Port
hostname = seedPeer.Hostname
ip = seedPeer.Ip
port = seedPeer.Port
)
target := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(host.ID, hostname, ip)
log := logger.WithHost(idgen.HostIDV2(ip, hostname, true), hostname, ip)
eg.Go(func() error {
log.Info("preheat started")
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
dfdaemonClient, err := dfdaemonclient.GetV2ByAddr(ctx, target, dialOptions...)
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
URL: req.URL,
Hostname: hostname,
IP: ip,
Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()),
})
return err
}
stream, err := dfdaemonClient.DownloadTask(
ctx,
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Priority: commonv2.Priority(req.Priority),
FilteredQueryParams: strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator),
RequestHeader: req.Headers,
Timeout: durationpb.New(req.Timeout),
CertificateChain: req.CertificateChain,
}})
if err != nil {
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
URL: req.URL,
Hostname: hostname,
IP: ip,
Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()),
})
return err
}
// Wait for the download task to complete.
for {
_, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Info("preheat succeeded")
successTasks.Store(ip, &internaljob.PreheatSuccessTask{
URL: req.URL,
Hostname: hostname,
IP: ip,
})
return nil
}
log.Errorf("preheat failed: %s", err.Error())
failureTasks.Store(ip, &internaljob.PreheatFailureTask{
URL: req.URL,
Hostname: hostname,
IP: ip,
Description: fmt.Sprintf("task %s failed: %s", taskID, err.Error()),
})
return err
}
}
})
}
// Wait for all tasks to complete and print the errors.
if err := eg.Wait(); err != nil {
log.Errorf("preheat failed: %s", err.Error())
}
// If successTasks is not empty, return success tasks and failure tasks.
// Notify the client that the preheat is successful.
var preheatResponse internaljob.PreheatResponse
failureTasks.Range(func(_, value any) bool {
if failureTask, ok := value.(*internaljob.PreheatFailureTask); ok {
preheatResponse.FailureTasks = append(preheatResponse.FailureTasks, failureTask)
}
return true
})
successTasks.Range(func(_, value any) bool {
if successTask, ok := value.(*internaljob.PreheatSuccessTask); ok {
for _, failureTask := range preheatResponse.FailureTasks {
if failureTask.IP == successTask.IP {
return true
}
}
preheatResponse.SuccessTasks = append(preheatResponse.SuccessTasks, successTask)
}
return true
})
if len(preheatResponse.SuccessTasks) > 0 {
return &preheatResponse, nil
}
msg := "no error message"
if len(preheatResponse.FailureTasks) > 0 {
msg = fmt.Sprintf("%s %s %s %s", taskID, preheatResponse.FailureTasks[0].IP, preheatResponse.FailureTasks[0].Hostname,
preheatResponse.FailureTasks[0].Description)
}
return nil, fmt.Errorf("all peers preheat failed: %s", msg)
}
// preheatAllPeers preheats job by all peers, only suoported by v2 protocol. Scheduler will trigger all peers to download task.
// If all the peers download task failed, return error. If some of the peers download task failed, return success tasks and
// failure tasks. Notify the client that the preheat is successful.
func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
// If scheduler has no available peer, return error.
peers := j.resource.HostManager().LoadAll()
if len(peers) == 0 {
return nil, fmt.Errorf("cluster %d scheduler %s has no available peer", j.config.Manager.SchedulerClusterID, j.config.Server.AdvertiseIP)
}
var (
successTasks = sync.Map{}
failureTasks = sync.Map{}
)
eg, _ := errgroup.WithContext(ctx)
eg.SetLimit(int(req.ConcurrentCount))
for _, peer := range peers {
var (
hostname = peer.Hostname
ip = peer.IP
port = peer.Port
)
target := fmt.Sprintf("%s:%d", ip, port)
log := logger.WithHost(peer.ID, hostname, ip)
eg.Go(func() error {
log.Info("preheat started")

View File

@ -42,6 +42,9 @@ type SeedPeerClient interface {
// Addrs returns the addresses of seed peers.
Addrs() []string
// SeedPeers returns the seed peers working for the scheduler.
SeedPeers() []*managerv2.SeedPeer
// Client is cdnsystem grpc client interface.
cdnsystemclient.Client
@ -132,6 +135,11 @@ func (sc *seedPeerClient) Addrs() []string {
return addrs
}
// SeedPeers returns the seed peers working for the scheduler.
func (sc *seedPeerClient) SeedPeers() []*managerv2.SeedPeer {
return sc.data.Scheduler.SeedPeers
}
// Dynamic config notify function.
func (sc *seedPeerClient) OnNotify(data *config.DynconfigData) {
if reflect.DeepEqual(sc.data, data) {

View File

@ -17,6 +17,7 @@ import (
common "d7y.io/api/v2/pkg/apis/common/v1"
common0 "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
manager "d7y.io/api/v2/pkg/apis/manager/v2"
config "d7y.io/dragonfly/v2/scheduler/config"
gomock "go.uber.org/mock/gomock"
grpc "google.golang.org/grpc"
@ -224,6 +225,20 @@ func (mr *MockSeedPeerClientMockRecorder) OnNotify(arg0 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotify", reflect.TypeOf((*MockSeedPeerClient)(nil).OnNotify), arg0)
}
// SeedPeers mocks base method.
func (m *MockSeedPeerClient) SeedPeers() []*manager.SeedPeer {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SeedPeers")
ret0, _ := ret[0].([]*manager.SeedPeer)
return ret0
}
// SeedPeers indicates an expected call of SeedPeers.
func (mr *MockSeedPeerClientMockRecorder) SeedPeers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeedPeers", reflect.TypeOf((*MockSeedPeerClient)(nil).SeedPeers))
}
// StatPersistentCacheTask mocks base method.
func (m *MockSeedPeerClient) StatPersistentCacheTask(arg0 context.Context, arg1 *dfdaemon.StatPersistentCacheTaskRequest, arg2 ...grpc.CallOption) (*common0.PersistentCacheTask, error) {
m.ctrl.T.Helper()