From 483c0b522062d5afd2b53f12d7d26faaf44d03e8 Mon Sep 17 00:00:00 2001 From: sunwp <244372610@qq.com> Date: Wed, 8 Sep 2021 14:41:42 +0800 Subject: [PATCH] feat: notice client back source when rescheduled parent reach max times (#611) Signed-off-by: santong <244372610@qq.com> --- client/daemon/proxy/proxy_sni.go | 3 +- scheduler/core/events.go | 155 ++++++++++-------- .../core/scheduler/basic/basic_scheduler.go | 21 ++- scheduler/core/scheduler/scheduler.go | 5 +- scheduler/core/service.go | 29 +++- 5 files changed, 123 insertions(+), 90 deletions(-) diff --git a/client/daemon/proxy/proxy_sni.go b/client/daemon/proxy/proxy_sni.go index 6fb128382..8124985ad 100644 --- a/client/daemon/proxy/proxy_sni.go +++ b/client/daemon/proxy/proxy_sni.go @@ -26,13 +26,14 @@ import ( "time" "github.com/golang/groupcache/lru" + "github.com/pkg/errors" logger "d7y.io/dragonfly/v2/internal/dflog" ) func (proxy *Proxy) ServeSNI(l net.Listener) error { if proxy.cert == nil { - return fmt.Errorf("empty cert") + return errors.New("empty cert") } if proxy.cert.Leaf != nil && proxy.cert.Leaf.IsCA { logger.Infof("hijack sni https request with CA <%s>", proxy.cert.Leaf.Subject.CommonName) diff --git a/scheduler/core/events.go b/scheduler/core/events.go index bf0e8f5c1..5cb8261ca 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -25,11 +25,11 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/structure/sortedlist" - "d7y.io/dragonfly/v2/pkg/synclock" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/supervisor" "go.opentelemetry.io/otel/trace" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" ) @@ -38,6 +38,12 @@ type event interface { apply(s *state) } +type rsPeer struct { + times int32 + peer *supervisor.Peer + blankParents sets.String +} + type state struct { sched scheduler.Scheduler peerManager supervisor.PeerMgr @@ -55,17 +61,48 @@ func newState(sched scheduler.Scheduler, peerManager supervisor.PeerMgr, cdnMana } type reScheduleParentEvent struct { - peer *supervisor.Peer + rsPeer *rsPeer } var _ event = reScheduleParentEvent{} func (e reScheduleParentEvent) apply(s *state) { - reScheduleParent(e.peer, s) + rsPeer := e.rsPeer + rsPeer.times = rsPeer.times + 1 + peer := rsPeer.peer + if peer.Task.IsFail() { + if err := peer.CloseChannel(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("close peer channel failed: %v", err) + } + return + } + oldParent := peer.GetParent() + blankParents := rsPeer.blankParents + if oldParent != nil && !blankParents.Has(oldParent.PeerID) { + logger.WithTaskAndPeerID(peer.Task.TaskID, + peer.PeerID).Warnf("reScheduleParent: peer already schedule a parent %s and new parent is not in blank parents", oldParent.PeerID) + return + } + parent, candidates, hasParent := s.sched.ScheduleParent(peer, blankParents) + if !hasParent { + if peer.Task.NeedClientBackSource() && !peer.Task.IsBackSourcePeer(peer.PeerID) { + if peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", peer.PeerID)) == nil { + peer.Task.IncreaseBackSourcePeer(peer.PeerID) + } + return + } + logger.Errorf("reScheduleParent: failed to schedule parent to peer %s, reschedule it later", peer.PeerID) + s.waitScheduleParentPeerQueue.AddAfter(rsPeer, time.Second) + return + } + // TODO if parentPeer is equal with oldParent, need schedule again ? + if err := peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)); err != nil { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", peer.PeerID, err) + } } func (e reScheduleParentEvent) hashKey() string { - return e.peer.Task.TaskID + return e.rsPeer.peer.Task.TaskID } type startReportPieceResultEvent struct { @@ -80,33 +117,36 @@ func (e startReportPieceResultEvent) apply(s *state) { parent := e.peer.GetParent() if parent != nil { logger.WithTaskAndPeerID(e.peer.Task.TaskID, - e.peer.PeerID).Warnf("startReportPieceResultEvent: no need schedule parent because peer already had parent %s", e.peer.GetParent().PeerID) + e.peer.PeerID).Warnf("startReportPieceResultEvent: no need schedule parent because peer already had parent %s", parent.PeerID) + if err := e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, nil)); err != nil { + logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("send schedule packet to peer failed: %v", err) + } return } if e.peer.Task.IsBackSourcePeer(e.peer.PeerID) { logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Info("startReportPieceResultEvent: no need schedule parent because peer is back source peer") - s.waitScheduleParentPeerQueue.Done(e.peer) return } - parent, candidates, hasParent := s.sched.ScheduleParent(e.peer) + parent, candidates, hasParent := s.sched.ScheduleParent(e.peer, sets.NewString()) // No parent node is currently available if !hasParent { - if e.peer.Task.NeedClientBackSource() { + if e.peer.Task.NeedClientBackSource() && !e.peer.Task.IsBackSourcePeer(e.peer.PeerID) { span.SetAttributes(config.AttributeClientBackSource.Bool(true)) if e.peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", e.peer.PeerID)) == nil { e.peer.Task.IncreaseBackSourcePeer(e.peer.PeerID) - s.waitScheduleParentPeerQueue.Done(e.peer) } + logger.WithTaskAndPeerID(e.peer.Task.TaskID, + e.peer.PeerID).Info("startReportPieceResultEvent: peer need back source because no parent node is available for scheduling") return } logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("startReportPieceResultEvent: no parent node is currently available,reschedule it later") - s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) + s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: e.peer}, time.Second) return } if err := e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates)); err != nil { - logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", e.peer.PeerID, err) + logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("send schedule packet failed: %v", err) } } @@ -138,18 +178,20 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { if ok { oldParent := e.peer.GetParent() if e.pr.DstPid != e.peer.PeerID && (oldParent == nil || oldParent.PeerID != e.pr.DstPid) { + logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Debugf("parent peerID is not same as DestPid, replace it's parent node with %s", + e.pr.DstPid) e.peer.ReplaceParent(parentPeer) } } else if parentPeer.IsLeave() { logger.WithTaskAndPeerID(e.peer.Task.TaskID, - e.peer.PeerID).Warnf("peerDownloadPieceSuccessEvent: need reschedule parent for peer because it's parent is leave") + e.peer.PeerID).Warnf("peerDownloadPieceSuccessEvent: need reschedule parent for peer because it's parent is already left") e.peer.ReplaceParent(nil) var hasParent bool - parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer) + parentPeer, candidates, hasParent = s.sched.ScheduleParent(e.peer, sets.NewString(parentPeer.PeerID)) if !hasParent { logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("peerDownloadPieceSuccessEvent: no parent node is currently available, " + "reschedule it later") - s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) + s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: e.peer, blankParents: sets.NewString(parentPeer.PeerID)}, time.Second) return } } @@ -176,40 +218,25 @@ type peerDownloadPieceFailEvent struct { var _ event = peerDownloadPieceFailEvent{} func (e peerDownloadPieceFailEvent) apply(s *state) { - span := trace.SpanFromContext(e.ctx) if e.peer.Task.IsBackSourcePeer(e.peer.PeerID) { return } switch e.pr.Code { case dfcodes.ClientWaitPieceReady: return - case dfcodes.PeerTaskNotFound, dfcodes.ClientPieceRequestFail, dfcodes.ClientPieceDownloadFail: - // TODO PeerTaskNotFound remove dest peer task, ClientPieceDownloadFail add blank list - reScheduleParent(e.peer, s) - return - case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: - go func(task *supervisor.Task) { - // TODO - synclock.Lock(task.TaskID, false) - defer synclock.UnLock(task.TaskID, false) - if cdnPeer, err := s.cdnManager.StartSeedTask(context.Background(), task); err != nil { - logger.Errorf("start seed task fail: %v", err) - span.AddEvent(config.EventCDNFailBackClientSource, trace.WithAttributes(config.AttributeTriggerCDNError.String(err.Error()))) - handleSeedTaskFail(task) - } else { - logger.Debugf("===== successfully obtain seeds from cdn, task: %+v =====", e.peer.Task) - children := s.sched.ScheduleChildren(cdnPeer) - for _, child := range children { - if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, cdnPeer, nil)); err != nil { - logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", child.PeerID, err) - } - } + case dfcodes.PeerTaskNotFound: + s.peerManager.Delete(e.pr.DstPid) + case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskDownloadFail: + s.peerManager.Delete(e.pr.DstPid) + go func() { + if _, err := s.cdnManager.StartSeedTask(e.ctx, e.peer.Task); err != nil { + logger.WithTaskID(e.peer.Task.TaskID).Errorf("peerDownloadPieceFailEvent: seed task failed: %v", err) } - }(e.peer.Task) + }() default: - reScheduleParent(e.peer, s) - return + logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Debugf("report piece download fail message, piece result %s", e.pr.String()) } + s.waitScheduleParentPeerQueue.Add(&rsPeer{peer: e.peer, blankParents: sets.NewString(e.pr.DstPid)}) } func (e peerDownloadPieceFailEvent) hashKey() string { return e.peer.Task.TaskID @@ -222,7 +249,7 @@ type taskSeedFailEvent struct { var _ event = taskSeedFailEvent{} func (e taskSeedFailEvent) apply(s *state) { - handleSeedTaskFail(e.task) + handleCDNSeedTaskFail(e.task) } func (e taskSeedFailEvent) hashKey() string { @@ -238,11 +265,11 @@ var _ event = peerDownloadSuccessEvent{} func (e peerDownloadSuccessEvent) apply(s *state) { e.peer.SetStatus(supervisor.PeerStatusSuccess) - if e.peer.Task.IsBackSourcePeer(e.peer.PeerID) { + if e.peer.Task.IsBackSourcePeer(e.peer.PeerID) && !e.peer.Task.IsSuccess() { e.peer.Task.UpdateTaskSuccessResult(e.peerResult.TotalPieceCount, e.peerResult.ContentLength) } removePeerFromCurrentTree(e.peer, s) - children := s.sched.ScheduleChildren(e.peer) + children := s.sched.ScheduleChildren(e.peer, sets.NewString()) for _, child := range children { if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil)); err != nil { logger.WithTaskAndPeerID(e.peer.Task.TaskID, e.peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", child.PeerID, err) @@ -265,16 +292,16 @@ func (e peerDownloadFailEvent) apply(s *state) { e.peer.SetStatus(supervisor.PeerStatusFail) if e.peer.Task.IsBackSourcePeer(e.peer.PeerID) && !e.peer.Task.IsSuccess() { e.peer.Task.SetStatus(supervisor.TaskStatusFail) - handleSeedTaskFail(e.peer.Task) + handleCDNSeedTaskFail(e.peer.Task) return } removePeerFromCurrentTree(e.peer, s) e.peer.GetChildren().Range(func(key, value interface{}) bool { child := (value).(*supervisor.Peer) - parent, candidates, hasParent := s.sched.ScheduleParent(child) + parent, candidates, hasParent := s.sched.ScheduleParent(child, sets.NewString(e.peer.PeerID)) if !hasParent { logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("peerDownloadFailEvent: there is no available parent, reschedule it later") - s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) + s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: e.peer, blankParents: sets.NewString(e.peer.PeerID)}, time.Second) return true } if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)); err != nil { @@ -300,10 +327,10 @@ func (e peerLeaveEvent) apply(s *state) { removePeerFromCurrentTree(e.peer, s) e.peer.GetChildren().Range(func(key, value interface{}) bool { child := value.(*supervisor.Peer) - parent, candidates, hasParent := s.sched.ScheduleParent(child) + parent, candidates, hasParent := s.sched.ScheduleParent(child, sets.NewString(e.peer.PeerID)) if !hasParent { logger.WithTaskAndPeerID(child.Task.TaskID, child.PeerID).Warnf("handlePeerLeave: there is no available parent,reschedule it later") - s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) + s.waitScheduleParentPeerQueue.AddAfter(&rsPeer{peer: child, blankParents: sets.NewString(e.peer.PeerID)}, time.Second) return true } if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)); err != nil { @@ -345,33 +372,15 @@ func constructSuccessPeerPacket(peer *supervisor.Peer, parent *supervisor.Peer, return peerPacket } -func reScheduleParent(peer *supervisor.Peer, s *state) { - parent, candidates, hasParent := s.sched.ScheduleParent(peer) - if !hasParent { - if peer.Task.NeedClientBackSource() { - if peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", peer.PeerID)) == nil { - peer.Task.IncreaseBackSourcePeer(peer.PeerID) - } - return - } - logger.Errorf("reScheduleParent: failed to schedule parent to peer %s, reschedule it later", peer.PeerID) - //peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.SchedWithoutParentPeer) - s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) - return - } - // TODO if parentPeer is equal with oldParent, need schedule again ? - if err := peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)); err != nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", peer.PeerID, err) - } -} - -func handleSeedTaskFail(task *supervisor.Task) { +func handleCDNSeedTaskFail(task *supervisor.Task) { if task.NeedClientBackSource() { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*supervisor.Peer) if task.NeedClientBackSource() { - if peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", peer.PeerID)) == nil { - task.IncreaseBackSourcePeer(peer.PeerID) + if !task.IsBackSourcePeer(peer.PeerID) { + if peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.PeerID)) == nil { + task.IncreaseBackSourcePeer(peer.PeerID) + } } return true } @@ -380,7 +389,9 @@ func handleSeedTaskFail(task *supervisor.Task) { } else { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*supervisor.Peer) - peer.CloseChannel(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")) + if err := peer.CloseChannel(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("close peer conn channel failed: %v", err) + } return true }) } @@ -391,7 +402,7 @@ func removePeerFromCurrentTree(peer *supervisor.Peer, s *state) { peer.ReplaceParent(nil) // parent frees up upload resources if parent != nil { - children := s.sched.ScheduleChildren(parent) + children := s.sched.ScheduleChildren(parent, sets.NewString(peer.PeerID)) for _, child := range children { if err := child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil)); err != nil { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Warnf("send schedule packet to peer %s failed: %v", child.PeerID, err) diff --git a/scheduler/core/scheduler/basic/basic_scheduler.go b/scheduler/core/scheduler/basic/basic_scheduler.go index feefe00e0..682f57eaf 100644 --- a/scheduler/core/scheduler/basic/basic_scheduler.go +++ b/scheduler/core/scheduler/basic/basic_scheduler.go @@ -25,6 +25,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/core/evaluator/basic" "d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/supervisor" + "k8s.io/apimachinery/pkg/util/sets" ) const name = "basic" @@ -69,13 +70,13 @@ type Scheduler struct { cfg *config.SchedulerConfig } -func (s *Scheduler) ScheduleChildren(peer *supervisor.Peer) (children []*supervisor.Peer) { +func (s *Scheduler) ScheduleChildren(peer *supervisor.Peer, blankChildren sets.String) (children []*supervisor.Peer) { if s.evaluator.IsBadNode(peer) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("terminate schedule children flow because peer is bad node") return } freeUpload := peer.Host.GetFreeUploadLoad() - candidateChildren := s.selectCandidateChildren(peer, freeUpload*2) + candidateChildren := s.selectCandidateChildren(peer, freeUpload*2, blankChildren) if len(candidateChildren) == 0 { return nil } @@ -110,7 +111,7 @@ func (s *Scheduler) ScheduleChildren(peer *supervisor.Peer) (children []*supervi return } -func (s *Scheduler) ScheduleParent(peer *supervisor.Peer) (*supervisor.Peer, []*supervisor.Peer, bool) { +func (s *Scheduler) ScheduleParent(peer *supervisor.Peer, blankParents sets.String) (*supervisor.Peer, []*supervisor.Peer, bool) { //if !s.evaluator.NeedAdjustParent(peer) { // logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("stop schedule parent flow because peer is not need adjust parent", peer.PeerID) // if peer.GetParent() == nil { @@ -118,7 +119,7 @@ func (s *Scheduler) ScheduleParent(peer *supervisor.Peer) (*supervisor.Peer, []* // } // return peer.GetParent(), []*types.Peer{peer.GetParent()}, true //} - candidateParents := s.selectCandidateParents(peer, s.cfg.CandidateParentCount) + candidateParents := s.selectCandidateParents(peer, s.cfg.CandidateParentCount, blankParents) if len(candidateParents) == 0 { return nil, nil, false } @@ -142,7 +143,7 @@ func (s *Scheduler) ScheduleParent(peer *supervisor.Peer) (*supervisor.Peer, []* return parents[0], parents[1:], true } -func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int) (candidateChildren []*supervisor.Peer) { +func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, blankChildren sets.String) (candidateChildren []*supervisor.Peer) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start schedule children flow") defer logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("finish schedule parent flow, select num %d candidate children, "+ "current task tree node count %d, back source peers: %s", len(candidateChildren), peer.Task.ListPeers().Size(), peer.Task.GetBackSourcePeers()) @@ -151,6 +152,10 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int) (c logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it is nil******") return false } + if blankChildren != nil && blankChildren.Has(candidateNode.PeerID) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer is not selected because it in blank children set******") + return false + } if candidateNode.IsDone() { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("******candidate child peer %s is not selected because it has done******", candidateNode.PeerID) @@ -213,7 +218,7 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int) (c return } -func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int) (candidateParents []*supervisor.Peer) { +func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int, blankParents sets.String) (candidateParents []*supervisor.Peer) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debug("start schedule parent flow") defer logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("finish schedule parent flow, select num %d candidates parents, "+ "current task tree node count %d, back source peers: %s", len(candidateParents), peer.Task.ListPeers().Size(), peer.Task.GetBackSourcePeers()) @@ -228,6 +233,10 @@ func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int) (ca logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it is nil++++++") return false } + if blankParents != nil && blankParents.Has(candidateNode.PeerID) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer is not selected because it in blank parent set++++++") + return false + } if s.evaluator.IsBadNode(candidateNode) { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("++++++candidate parent peer %s is not selected because it is badNode++++++", candidateNode.PeerID) diff --git a/scheduler/core/scheduler/scheduler.go b/scheduler/core/scheduler/scheduler.go index 1ae4c5a57..425e8b0d5 100644 --- a/scheduler/core/scheduler/scheduler.go +++ b/scheduler/core/scheduler/scheduler.go @@ -21,14 +21,15 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/supervisor" + "k8s.io/apimachinery/pkg/util/sets" ) type Scheduler interface { // ScheduleChildren schedule children to a peer - ScheduleChildren(peer *supervisor.Peer) (children []*supervisor.Peer) + ScheduleChildren(peer *supervisor.Peer, blankChildren sets.String) (children []*supervisor.Peer) // ScheduleParent schedule a parent and candidates to a peer - ScheduleParent(peer *supervisor.Peer) (parent *supervisor.Peer, candidateParents []*supervisor.Peer, hasParent bool) + ScheduleParent(peer *supervisor.Peer, blankParents sets.String) (parent *supervisor.Peer, candidateParents []*supervisor.Peer, hasParent bool) } type BuildOptions struct { diff --git a/scheduler/core/service.go b/scheduler/core/service.go index 0bf3b01e4..14fe621eb 100644 --- a/scheduler/core/service.go +++ b/scheduler/core/service.go @@ -22,6 +22,7 @@ import ( "time" "d7y.io/dragonfly/v2/internal/dfcodes" + "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -39,9 +40,12 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" ) +const maxRescheduleTimes = 8 + type Options struct { openTel bool disableCDN bool @@ -154,23 +158,30 @@ func (s *SchedulerService) runReScheduleParentLoop(wsdq workqueue.DelayingInterf default: v, shutdown := wsdq.Get() if shutdown { + logger.Infof("wait schedule delay queue is shutdown") break } - peer := v.(*supervisor.Peer) + rsPeer := v.(*rsPeer) + peer := rsPeer.peer wsdq.Done(v) + if rsPeer.times > maxRescheduleTimes { + if peer.CloseChannel(dferrors.Newf(dfcodes.SchedNeedBackSource, "reschedule parent for peer %s already reaches max reschedule times", + peer.PeerID)) == nil { + peer.Task.IncreaseBackSourcePeer(peer.PeerID) + } + continue + } + if peer.Task.IsBackSourcePeer(peer.PeerID) { + logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("runReScheduleLoop: peer is back source client, no need to reschedule it") + continue + } if peer.IsDone() || peer.IsLeave() { logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("runReScheduleLoop: peer has left from waitScheduleParentPeerQueue because peer is done or leave, peer status is %s, "+ "isLeave %t", peer.GetStatus(), peer.IsLeave()) continue } - if peer.GetParent() != nil { - logger.WithTaskAndPeerID(peer.Task.TaskID, - peer.PeerID).Debugf("runReScheduleLoop: peer has left from waitScheduleParentPeerQueue because peer has parent %s", - peer.GetParent().PeerID) - continue - } - s.worker.send(reScheduleParentEvent{peer}) + s.worker.send(reScheduleParentEvent{rsPeer: rsPeer}) } } } @@ -199,7 +210,7 @@ func (s *SchedulerService) GenerateTaskID(url string, meta *base.UrlMeta, peerID } func (s *SchedulerService) SelectParent(peer *supervisor.Peer) (parent *supervisor.Peer, err error) { - parent, _, hasParent := s.sched.ScheduleParent(peer) + parent, _, hasParent := s.sched.ScheduleParent(peer, sets.NewString()) if !hasParent || parent == nil { return nil, errors.Errorf("no parent peer available for peer %v", peer.PeerID) }