/* * Copyright 2020 The Dragonfly Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //go:generate mockgen -destination mocks/scheduling_mock.go -source scheduling.go -package mocks package scheduling import ( "context" "sort" "time" commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling/evaluator" ) type Scheduling interface { // ScheduleParent schedule a parent and candidates to a peer. ScheduleParent(context.Context, *resource.Peer, set.SafeSet[string]) // Find the parent that best matches the evaluation and notify peer. NotifyAndFindParent(context.Context, *resource.Peer, set.SafeSet[string]) ([]*resource.Peer, bool) // Find the parent that best matches the evaluation. FindParent(context.Context, *resource.Peer, set.SafeSet[string]) (*resource.Peer, bool) } type scheduling struct { // Evaluator interface. evaluator evaluator.Evaluator // Scheduler configuration. config *config.SchedulerConfig // Scheduler dynamic configuration. dynconfig config.DynconfigInterface } func New(cfg *config.SchedulerConfig, dynconfig config.DynconfigInterface, pluginDir string) Scheduling { return &scheduling{ evaluator: evaluator.New(cfg.Algorithm, pluginDir), config: cfg, dynconfig: dynconfig, } } // ScheduleParent schedule a parent and candidates to a peer. func (s *scheduling) ScheduleParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) { var n int for { select { case <-ctx.Done(): peer.Log.Infof("context was done") return default: } // If the scheduling exceeds the RetryBackToSourceLimit or peer needs back-to-source, // peer will download the task back-to-source. needBackToSource := peer.NeedBackToSource.Load() peer.Log.Infof("peer needs to back-to-source: %t", needBackToSource) if (n >= s.config.RetryBackToSourceLimit || needBackToSource) && peer.Task.CanBackToSource() { stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load stream failed") return } peer.Log.Infof("schedule peer back-to-source in %d times", n) // Notify peer back-to-source. if err := stream.Send(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedNeedBackSource}); err != nil { peer.Log.Error(err) return } if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) return } // If the task state is TaskStateFailed, // peer back-to-source and reset task state to TaskStateRunning. if peer.Task.FSM.Is(resource.TaskStateFailed) { if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { peer.Task.Log.Errorf("task fsm event failed: %s", err.Error()) return } } return } // Handle peer schedule failed. if n >= s.config.RetryLimit { stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load stream failed") return } // Notify peer schedule failed. if err := stream.Send(&schedulerv1.PeerPacket{Code: commonv1.Code_SchedTaskStatusError}); err != nil { peer.Log.Error(err) return } peer.Log.Errorf("peer scheduling exceeds the limit %d times", s.config.RetryLimit) return } if _, ok := s.NotifyAndFindParent(ctx, peer, blocklist); !ok { n++ peer.Log.Infof("schedule parent failed in %d times ", n) // Sleep to avoid hot looping. time.Sleep(s.config.RetryInterval) continue } peer.Log.Infof("schedule parent successfully in %d times", n+1) return } } // NotifyAndFindParent finds parent that best matches the evaluation and notify peer. func (s *scheduling) NotifyAndFindParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) ([]*resource.Peer, bool) { // Only PeerStateRunning peers need to be rescheduled, // and other states including the PeerStateBackToSource indicate that // they have been scheduled. if !peer.FSM.Is(resource.PeerStateRunning) { peer.Log.Infof("peer state is %s, can not schedule parent", peer.FSM.Current()) return []*resource.Peer{}, false } // Delete inedges of vertex. if err := peer.Task.DeletePeerInEdges(peer.ID); err != nil { peer.Log.Errorf("peer deletes inedges failed: %s", err.Error()) return []*resource.Peer{}, false } // Find the candidate parent that can be scheduled. candidateParents := s.filterCandidateParents(peer, blocklist) if len(candidateParents) == 0 { peer.Log.Info("can not find candidate parents") return []*resource.Peer{}, false } // Sort candidate parents by evaluation score. taskTotalPieceCount := peer.Task.TotalPieceCount.Load() sort.Slice( candidateParents, func(i, j int) bool { return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount) }, ) // Add edges between candidate parent and peer. var ( parents []*resource.Peer parentIDs []string ) for _, candidateParent := range candidateParents { if err := peer.Task.AddPeerEdge(candidateParent, peer); err != nil { peer.Log.Debugf("peer adds edge failed: %s", err.Error()) continue } parents = append(parents, candidateParent) parentIDs = append(parentIDs, candidateParent.ID) } if len(parents) <= 0 { peer.Log.Info("can not add edges for vertex") return []*resource.Peer{}, false } // Send scheduling success message. stream, loaded := peer.LoadReportPieceResultStream() if !loaded { peer.Log.Error("load peer stream failed") return []*resource.Peer{}, false } if err := stream.Send(constructSuccessPeerPacket(s.dynconfig, peer, parents[0], parents[1:])); err != nil { peer.Log.Error(err) return []*resource.Peer{}, false } peer.Log.Infof("schedule candidate parents is %#v", parentIDs) return candidateParents, true } // FindParent finds parent that best matches the evaluation. func (s *scheduling) FindParent(ctx context.Context, peer *resource.Peer, blocklist set.SafeSet[string]) (*resource.Peer, bool) { // Filter the candidate parent that can be scheduled. candidateParents := s.filterCandidateParents(peer, blocklist) if len(candidateParents) == 0 { peer.Log.Info("can not find candidate parents") return nil, false } // Sort candidate parents by evaluation score. taskTotalPieceCount := peer.Task.TotalPieceCount.Load() sort.Slice( candidateParents, func(i, j int) bool { return s.evaluator.Evaluate(candidateParents[i], peer, taskTotalPieceCount) > s.evaluator.Evaluate(candidateParents[j], peer, taskTotalPieceCount) }, ) peer.Log.Infof("schedule candidate parent is %s", candidateParents[0].ID) return candidateParents[0], true } // Filter the candidate parent that can be scheduled. func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.SafeSet[string]) []*resource.Peer { filterParentLimit := config.DefaultSchedulerFilterParentLimit filterParentRangeLimit := config.DefaultSchedulerFilterParentRangeLimit if config, err := s.dynconfig.GetSchedulerClusterConfig(); err == nil { if config.FilterParentLimit > 0 { filterParentLimit = int(config.FilterParentLimit) } if config.FilterParentRangeLimit > 0 { filterParentRangeLimit = int(config.FilterParentRangeLimit) } } var ( candidateParents []*resource.Peer candidateParentIDs []string ) for _, candidateParent := range peer.Task.LoadRandomPeers(uint(filterParentRangeLimit)) { // Parent length limit after filtering. if len(candidateParents) >= filterParentLimit { break } // Candidate parent is in blocklist. if blocklist.Contains(candidateParent.ID) { peer.Log.Debugf("parent %s is not selected because it is in blocklist", candidateParent.ID) continue } // Candidate parent can add edge with peer. if !peer.Task.CanAddPeerEdge(candidateParent.ID, peer.ID) { peer.Log.Debugf("can not add edge with parent %s", candidateParent.ID) continue } // Candidate parent host is not allowed to be the same as the peer host, // because dfdaemon cannot handle the situation // where two tasks are downloading and downloading each other. if peer.Host.ID == candidateParent.Host.ID { peer.Log.Debugf("parent %s host %s is the same as peer host", candidateParent.ID, candidateParent.Host.ID) continue } // Candidate parent is bad node. if s.evaluator.IsBadNode(candidateParent) { peer.Log.Debugf("parent %s is not selected because it is bad node", candidateParent.ID) continue } // Candidate parent can not find in dag. inDegree, err := peer.Task.PeerInDegree(candidateParent.ID) if err != nil { peer.Log.Debugf("can not find parent %s vertex in dag", candidateParent.ID) continue } // Conditions for candidate parent to be a parent: // 1. parent has parent. // 2. parent has been back-to-source. // 3. parent has been succeeded. // 4. parent is seed peer. isBackToSource := candidateParent.IsBackToSource.Load() if candidateParent.Host.Type == types.HostTypeNormal && inDegree == 0 && !isBackToSource && !candidateParent.FSM.Is(resource.PeerStateSucceeded) { peer.Log.Debugf("parent %s is not selected, because its download state is %d %d %t %s", candidateParent.ID, inDegree, int(candidateParent.Host.Type), isBackToSource, candidateParent.FSM.Current()) continue } // Candidate parent's free upload is empty. if candidateParent.Host.FreeUploadCount() <= 0 { peer.Log.Debugf("parent %s is not selected because its free upload is empty, upload limit is %d, upload count is %d", candidateParent.ID, candidateParent.Host.ConcurrentUploadLimit.Load(), candidateParent.Host.ConcurrentUploadCount.Load()) continue } candidateParents = append(candidateParents, candidateParent) candidateParentIDs = append(candidateParentIDs, candidateParent.ID) } peer.Log.Infof("filter candidate parents is %#v", candidateParentIDs) return candidateParents } // Construct peer successful packet. func constructSuccessPeerPacket(dynconfig config.DynconfigInterface, peer *resource.Peer, parent *resource.Peer, candidateParents []*resource.Peer) *schedulerv1.PeerPacket { parallelCount := config.DefaultPeerParallelCount if config, err := dynconfig.GetSchedulerClusterClientConfig(); err == nil && config.ParallelCount > 0 { parallelCount = int(config.ParallelCount) } var CandidatePeers []*schedulerv1.PeerPacket_DestPeer for _, candidateParent := range candidateParents { CandidatePeers = append(CandidatePeers, &schedulerv1.PeerPacket_DestPeer{ Ip: candidateParent.Host.IP, RpcPort: candidateParent.Host.Port, PeerId: candidateParent.ID, }) } return &schedulerv1.PeerPacket{ TaskId: peer.Task.ID, SrcPid: peer.ID, ParallelCount: int32(parallelCount), MainPeer: &schedulerv1.PeerPacket_DestPeer{ Ip: parent.Host.IP, RpcPort: parent.Host.Port, PeerId: parent.ID, }, CandidatePeers: CandidatePeers, Code: commonv1.Code_Success, } }