diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index cf5605b0c..2c7992846 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -21,6 +21,7 @@ package config import ( "net" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" "golang.org/x/time/rate" "d7y.io/dragonfly/v2/client/clientutil" @@ -42,7 +43,12 @@ var peerHostConfig = DaemonOption{ GCInterval: clientutil.Duration{Duration: DefaultGCInterval}, KeepStorage: false, Scheduler: SchedulerOption{ - NetAddrs: nil, + NetAddrs: []dfnet.NetAddr{ + { + Type: dfnet.TCP, + Addr: "127.0.0.1:8002", + }, + }, ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout}, }, Host: HostOption{ diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index dd28df4b8..2376aae07 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -684,7 +684,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer if getErr != nil { span.RecordError(getErr) // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr, curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true @@ -709,7 +709,7 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer pt.Errorf("send piece result error: %s, code: %d", peer.PeerId, er) } // fast way to exit retry - if curPeerPacket != pt.peerPacket { + if curPeerPacket.MainPeer.PeerId != pt.peerPacket.MainPeer.PeerId { pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId) peerPacketChanged = true diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 7a05b027f..48d6edf7b 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -99,7 +99,7 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() { pprofPort, _ = freeport.GetFreePort() } - debugAddr := fmt.Sprintf("localhost:%d", pprofPort) + debugAddr := fmt.Sprintf("%s:%d", iputils.HostIP, pprofPort) viewer.SetConfiguration(viewer.WithAddr(debugAddr)) logger.With("pprof", fmt.Sprintf("http://%s/debug/pprof", debugAddr), diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 505acd58f..3456e6be1 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -214,7 +214,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st } if selected { if client, ok := conn.node2ClientMap.Load(node); ok { - return client.(*candidateClient), nil + return &candidateClient{ + node: candidateNode, + Ref: client, + }, nil } } } @@ -226,7 +229,7 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st if len(ringNodes) == 0 { return nil, dferrors.ErrNoCandidateNode } - candidateNodes := make([]string, 0, 0) + candidateNodes := make([]string, 0) for _, ringNode := range ringNodes { candidate := true for _, exclusiveNode := range exclusiveNodes { diff --git a/pkg/rpc/scheduler/client/peer_packet_stream.go b/pkg/rpc/scheduler/client/peer_packet_stream.go index a62bbf01f..d3983dba0 100644 --- a/pkg/rpc/scheduler/client/peer_packet_stream.go +++ b/pkg/rpc/scheduler/client/peer_packet_stream.go @@ -19,7 +19,6 @@ package client import ( "context" "io" - "time" "github.com/pkg/errors" "google.golang.org/grpc" @@ -56,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin pps := &peerPacketStream{ sc: sc, - ctx: ctx, + ctx: context.Background(), hashKey: hashKey, ptr: ptr, opts: opts, @@ -126,9 +125,9 @@ func (pps *peerPacketStream) retryRecv(cause error) (*scheduler.PeerPacket, erro if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } @@ -198,9 +197,9 @@ func (pps *peerPacketStream) replaceClient(cause error) error { if err != nil { return nil, err } - timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = client.RegisterPeerTask(timeCtx, pps.ptr) + //timeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + //defer cancel() + _, err = client.RegisterPeerTask(pps.ctx, pps.ptr) if err != nil { return nil, err } diff --git a/scheduler/core/events.go b/scheduler/core/events.go index aee72229c..6e9b24f43 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -74,11 +74,7 @@ func (s *state) start() { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) continue } - if peer.PacketChan == nil { - logger.Errorf("waitScheduleParentPeerQueue: there is no packet chan associated with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) logger.WithTaskAndPeerID(peer.Task.TaskID, peer.PeerID).Debugf("waitScheduleParentPeerQueue: peer has left from waitScheduleParentPeerQueue because it has scheduled new parent %v", parent) s.waitScheduleParentPeerQueue.Done(v) @@ -104,15 +100,11 @@ func (e startReportPieceResultEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return } - if e.peer.PacketChan == nil { - logger.Errorf("start report piece result: there is no packet chan associated with peer %s", e.peer.PeerID) - return - } - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parent, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parent, candidates)) } func (e startReportPieceResultEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadPieceSuccessEvent struct { @@ -140,17 +132,13 @@ func (e peerDownloadPieceSuccessEvent) apply(s *state) { if oldParent != nil { candidates = append(candidates, oldParent) } - if e.peer.PacketChan == nil { - logger.Errorf("peerDownloadPieceSuccessEvent: there is no packet chan with peer %s", e.peer.PeerID) - return - } // TODO if parentPeer is equal with oldParent, need schedule again ? - e.peer.PacketChan <- constructSuccessPeerPacket(e.peer, parentPeer, candidates) + e.peer.SendSchedulePacket(constructSuccessPeerPacket(e.peer, parentPeer, candidates)) return } func (e peerDownloadPieceSuccessEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadPieceFailEvent struct { @@ -183,7 +171,7 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { } } func (e peerDownloadPieceFailEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerReplaceParentEvent struct { @@ -191,7 +179,7 @@ type peerReplaceParentEvent struct { } func (e peerReplaceParentEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } func (e peerReplaceParentEvent) apply(s *state) { @@ -226,20 +214,13 @@ func (e peerDownloadSuccessEvent) apply(s *state) { removePeerFromCurrentTree(e.peer, s) children := s.sched.ScheduleChildren(e.peer) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("reportPeerSuccessResult: there is no packet chan with peer %s", e.peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, e.peer, nil) - } - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) - e.peer.PacketChan = nil + child.SendSchedulePacket(constructSuccessPeerPacket(child, e.peer, nil)) } + e.peer.UnBindSendChannel() } func (e peerDownloadSuccessEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerDownloadFailEvent struct { @@ -260,22 +241,14 @@ func (e peerDownloadFailEvent) apply(s *state) { s.waitScheduleParentPeerQueue.AddAfter(e.peer, time.Second) return true } - if child.PacketChan == nil { - logger.Warnf("reportPeerFailResult: there is no packet chan associated with peer %s", e.peer.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) - if e.peer.PacketChan != nil { - close(e.peer.PacketChan) - e.peer.PacketChan = nil - } s.peerManager.Delete(e.peer.PeerID) } func (e peerDownloadFailEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } type peerLeaveEvent struct { @@ -289,7 +262,7 @@ func (e peerLeaveEvent) apply(s *state) { } func (e peerLeaveEvent) hashKey() string { - return e.peer.PeerID + return e.peer.Task.TaskID } func constructSuccessPeerPacket(peer *types.Peer, parent *types.Peer, candidates []*types.Peer) *schedulerRPC.PeerPacket { @@ -337,11 +310,7 @@ func handlePeerLeave(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(child, time.Second) return true } - if child.PacketChan == nil { - logger.Debugf("handlePeerLeave: there is no packet chan with peer %s", child.PeerID) - return true - } - child.PacketChan <- constructSuccessPeerPacket(child, parent, candidates) + child.SendSchedulePacket(constructSuccessPeerPacket(child, parent, candidates)) return true }) s.peerManager.Delete(peer.PeerID) @@ -355,22 +324,14 @@ func handleReplaceParent(peer *types.Peer, s *state) { s.waitScheduleParentPeerQueue.AddAfter(peer, time.Second) return } - if peer.PacketChan == nil { - logger.Errorf("handleReplaceParent: there is no packet chan with peer %s", peer.PeerID) - return - } - peer.PacketChan <- constructSuccessPeerPacket(peer, parent, candidates) + peer.SendSchedulePacket(constructSuccessPeerPacket(peer, parent, candidates)) } func handleSeedTaskFail(task *types.Task) { if task.IsFail() { task.ListPeers().Range(func(data sortedlist.Item) bool { peer := data.(*types.Peer) - if peer.PacketChan == nil { - logger.Debugf("taskSeedFailEvent: there is no packet chan with peer %s", peer.PeerID) - return true - } - peer.PacketChan <- constructFailPeerPacket(peer, dfcodes.CdnError) + peer.SendSchedulePacket(constructFailPeerPacket(peer, dfcodes.CdnError)) return true }) } @@ -383,11 +344,7 @@ func removePeerFromCurrentTree(peer *types.Peer, s *state) { if parent != nil { children := s.sched.ScheduleChildren(parent) for _, child := range children { - if child.PacketChan == nil { - logger.Debugf("removePeerFromCurrentTree: there is no packet chan with peer %s", peer.PeerID) - continue - } - child.PacketChan <- constructSuccessPeerPacket(child, peer, nil) + child.SendSchedulePacket(constructSuccessPeerPacket(child, peer, nil)) } } } diff --git a/scheduler/core/monitor.go b/scheduler/core/monitor.go index f6b5e6acb..dd23de574 100644 --- a/scheduler/core/monitor.go +++ b/scheduler/core/monitor.go @@ -87,7 +87,7 @@ func (m *monitor) printDebugInfo() string { parentNode = peer.GetParent().PeerID } - table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)-5], parentNode, peer.GetStatus().String(), + table.Append([]string{peer.PeerID, peer.Task.URL[len(peer.Task.URL)-15 : len(peer.Task.URL)], parentNode, peer.GetStatus().String(), peer.CreateTime.String(), strconv.Itoa(int(peer.GetFinishedNum())), strconv.FormatBool(peer.IsSuccess()), strconv.Itoa(peer.Host.GetFreeUploadLoad())}) diff --git a/scheduler/core/scheduler_service.go b/scheduler/core/scheduler_service.go index defba62e2..e10f70f2e 100644 --- a/scheduler/core/scheduler_service.go +++ b/scheduler/core/scheduler_service.go @@ -184,6 +184,9 @@ func (s *SchedulerService) GetOrCreateTask(ctx context.Context, task *types.Task if task.IsFrozen() { task.SetStatus(types.TaskStatusRunning) } + //if s.config.DisableCDN { + // TODO NeedBackSource + //} go func() { if err := s.cdnManager.StartSeedTask(ctx, task); err != nil { if !task.IsSuccess() { diff --git a/scheduler/core/worker.go b/scheduler/core/worker.go index 7f35c1687..2c14a1628 100644 --- a/scheduler/core/worker.go +++ b/scheduler/core/worker.go @@ -64,7 +64,7 @@ func (wg *workerGroup) send(e event) bool { func (wg *workerGroup) stop() { close(wg.stopCh) - wg.s.start() + wg.s.stop() for _, worker := range wg.workerList { worker.stop() } diff --git a/scheduler/daemon/cdn/d7y/manager.go b/scheduler/daemon/cdn/d7y/manager.go index 083d0900e..5bedc2f86 100644 --- a/scheduler/daemon/cdn/d7y/manager.go +++ b/scheduler/daemon/cdn/d7y/manager.go @@ -114,7 +114,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *types.Task) error { } func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream) error { - var once sync.Once + var initialized bool var cdnPeer *types.Peer for { piece, err := stream.Recv() @@ -138,9 +138,10 @@ func (cm *manager) receivePiece(task *types.Task, stream *client.PieceSeedStream return errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err) } if piece != nil { - once.Do(func() { + if !initialized { cdnPeer, err = cm.initCdnPeer(task, piece) - }) + initialized = true + } if err != nil || cdnPeer == nil { return err } diff --git a/scheduler/daemon/peer/manager.go b/scheduler/daemon/peer/manager.go index b6e77252d..d450f486b 100644 --- a/scheduler/daemon/peer/manager.go +++ b/scheduler/daemon/peer/manager.go @@ -70,11 +70,7 @@ func (m *manager) Delete(peerID string) { if ok { peer.Host.DeletePeer(peerID) peer.Task.DeletePeer(peer) - if peer.PacketChan != nil { - close(peer.PacketChan) - logger.Infof("close peer %s stream", peerID) - peer.PacketChan = nil - } + peer.UnBindSendChannel() m.peerMap.Delete(peerID) } return diff --git a/scheduler/server/service/scheduler_server.go b/scheduler/server/service/scheduler_server.go index d190dca0a..9aa0c0eb8 100644 --- a/scheduler/server/service/scheduler_server.go +++ b/scheduler/server/service/scheduler_server.go @@ -33,6 +33,8 @@ import ( "d7y.io/dragonfly/v2/scheduler/core" "d7y.io/dragonfly/v2/scheduler/types" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type SchedulerServer struct { @@ -118,24 +120,33 @@ func (s *SchedulerServer) RegisterPeerTask(ctx context.Context, request *schedul func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultServer) error { peerPacketChan := make(chan *scheduler.PeerPacket, 1) + var peer *types.Peer + var initialized bool + ctx, cancel := context.WithCancel(stream.Context()) + g, ctx := errgroup.WithContext(ctx) var once sync.Once - g, ctx := errgroup.WithContext(context.Background()) - stopCh := make(chan struct{}) g.Go(func() error { + defer func() { + cancel() + once.Do(peer.UnBindSendChannel) + }() for { - var peer *types.Peer select { case <-ctx.Done(): return nil - case <-stopCh: - return nil default: pieceResult, err := stream.Recv() if err == io.EOF { return nil } if err != nil { - return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error") + if status.Code(err) == codes.Canceled { + if peer != nil { + logger.Info("peer %s canceled", peer.PeerID) + return nil + } + } + return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err) } logger.Debugf("report piece result %v of peer %s", pieceResult, pieceResult.SrcPid) var ok bool @@ -143,10 +154,11 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie if !ok { return dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", pieceResult.SrcPid) } - once.Do(func() { + if !initialized { peer.BindSendChannel(peerPacketChan) peer.SetStatus(types.PeerStatusRunning) - }) + initialized = true + } if err := s.service.HandlePieceResult(peer, pieceResult); err != nil { logger.Errorf("handle piece result %v fail: %v", pieceResult, err) } @@ -155,21 +167,22 @@ func (s *SchedulerServer) ReportPieceResult(stream scheduler.Scheduler_ReportPie }) g.Go(func() error { + defer func() { + cancel() + once.Do(peer.UnBindSendChannel) + }() for { select { case <-ctx.Done(): return nil - case <-stopCh: - return nil case pp, ok := <-peerPacketChan: if !ok { - close(stopCh) return nil } err := stream.Send(pp) if err != nil { logger.Errorf("send peer %s schedule packet %v failed: %v", pp.SrcPid, pp, err) - return err + return dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer piece result report error: %v", err) } } } diff --git a/scheduler/types/peer.go b/scheduler/types/peer.go index 0675ee429..28ce9712b 100644 --- a/scheduler/types/peer.go +++ b/scheduler/types/peer.go @@ -59,8 +59,10 @@ type Peer struct { Task *Task // Host specifies Host *PeerHost + // bindPacketChan + bindPacketChan bool // PacketChan send schedulerPacket to peer client - PacketChan chan *scheduler.PeerPacket + packetChan chan *scheduler.PeerPacket // createTime CreateTime time.Time // finishedNum specifies downloaded finished piece number @@ -293,11 +295,27 @@ func (peer *Peer) SetStatus(status PeerStatus) { func (peer *Peer) BindSendChannel(packetChan chan *scheduler.PeerPacket) { peer.lock.Lock() defer peer.lock.Unlock() - peer.PacketChan = packetChan + peer.bindPacketChan = true + peer.packetChan = packetChan } -func (peer *Peer) GetSendChannel() chan *scheduler.PeerPacket { - return peer.PacketChan +func (peer *Peer) UnBindSendChannel() { + peer.lock.Lock() + defer peer.lock.Unlock() + if peer.bindPacketChan { + if peer.packetChan != nil { + close(peer.packetChan) + } + peer.bindPacketChan = false + } +} + +func (peer *Peer) SendSchedulePacket(packet *scheduler.PeerPacket) { + peer.lock.Lock() + defer peer.lock.Unlock() + if peer.bindPacketChan { + peer.packetChan <- packet + } } func (peer *Peer) IsRunning() bool {