/*
 * Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package peer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"go.opentelemetry.io/otel/trace"
	"go.uber.org/atomic"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/pkg/apis/common/v1"
	dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
	schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"

	"d7y.io/dragonfly/v2/client/config"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/dfnet"
	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)

type pieceTaskSyncManager struct {
	sync.RWMutex
	ctx               context.Context
	ctxCancel         context.CancelFunc
	peerTaskConductor *peerTaskConductor
	pieceRequestCh    chan *DownloadPieceRequest
	workers           map[string]*pieceTaskSynchronizer
	watchdog          *synchronizerWatchdog
}

type pieceTaskSynchronizer struct {
	*logger.SugaredLoggerOnWith
	span              trace.Span
	client            dfdaemonv1.Daemon_SyncPieceTasksClient
	dstPeer           *schedulerv1.PeerPacket_DestPeer
	error             atomic.Value
	peerTaskConductor *peerTaskConductor
	pieceRequestCh    chan *DownloadPieceRequest
}

type synchronizerWatchdog struct {
	done              chan struct{}
	mainPeer          atomic.Value // stores *schedulerv1.PeerPacket_DestPeer
	syncSuccess       *atomic.Bool
	peerTaskConductor *peerTaskConductor
}

type pieceTaskSynchronizerError struct {
	err error
}

// FIXME kept for compatibility: sync will become the only entry point once
// dfdaemonclient.GetPieceTasks is deprecated and the pieceTaskPoller is removed
func (s *pieceTaskSyncManager) sync(pp *schedulerv1.PeerPacket, desiredPiece int32) error {
	var (
		peers = map[string]bool{}
		errs  []error
	)
	peers[pp.MainPeer.PeerId] = true
	// TODO if the worker failed, reconnect and retry
	s.Lock()
	defer s.Unlock()
	if _, ok := s.workers[pp.MainPeer.PeerId]; !ok {
		err := s.newPieceTaskSynchronizer(s.ctx, pp.MainPeer, desiredPiece)
		if err != nil {
			s.peerTaskConductor.Errorf("main peer SyncPieceTasks error: %s", err)
			errs = append(errs, err)
		}
	}
	for _, p := range pp.CandidatePeers {
		peers[p.PeerId] = true
		if _, ok := s.workers[p.PeerId]; !ok {
			err := s.newPieceTaskSynchronizer(s.ctx, p, desiredPiece)
			if err != nil {
				s.peerTaskConductor.Errorf("candidate peer SyncPieceTasks error: %s", err)
				errs = append(errs, err)
			}
		}
	}

	// cancel workers for peers that are no longer in the packet
	if len(s.workers) != len(peers) {
		var peersToRemove []string
		for p, worker := range s.workers {
			if !peers[p] {
				worker.close()
				peersToRemove = append(peersToRemove, p)
			}
		}
		for _, p := range peersToRemove {
			delete(s.workers, p)
		}
	}

	if len(errs) > 0 {
		return errs[0]
	}
	return nil
}

func (s *pieceTaskSyncManager) cleanStaleWorker(destPeers []*schedulerv1.PeerPacket_DestPeer) {
	peers := map[string]bool{}
	for _, p := range destPeers {
		peers[p.PeerId] = true
	}

	// cancel workers for peers that are no longer in the packet
	if len(s.workers) != len(peers) {
		var peersToRemove []string
		for p, worker := range s.workers {
			if !peers[p] {
				worker.close()
				peersToRemove = append(peersToRemove, p)
			}
		}
		for _, p := range peersToRemove {
			delete(s.workers, p)
		}
	}
}
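// newPieceTaskSynchronizer dials dstPeer, opens a SyncPieceTasks stream and
// registers a worker that keeps receiving piece metadata from it. Callers
// must hold s.Lock(). The first PiecePacket is received synchronously so
// that dial and protocol errors surface to the caller.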
func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
	ctx context.Context,
	dstPeer *schedulerv1.PeerPacket_DestPeer,
	desiredPiece int32) error {
	request := &commonv1.PieceTaskRequest{
		TaskId:   s.peerTaskConductor.taskID,
		SrcPid:   s.peerTaskConductor.peerID,
		DstPid:   dstPeer.PeerId,
		StartNum: uint32(desiredPiece),
		Limit:    16,
	}
	if worker, ok := s.workers[dstPeer.PeerId]; ok {
		// worker is still healthy, reuse it
		if worker.error.Load() == nil {
			s.peerTaskConductor.Infof("reuse PieceTaskSynchronizer %s", dstPeer.PeerId)
			return nil
		}
		// remove the broken worker, a new one is created below
		delete(s.workers, dstPeer.PeerId)
	}

	netAddr := &dfnet.NetAddr{
		Type: dfnet.TCP,
		Addr: fmt.Sprintf("%s:%d", dstPeer.Ip, dstPeer.RpcPort),
	}
	credentialOpt := grpc.WithTransportCredentials(s.peerTaskConductor.GRPCCredentials)

	dialCtx, cancel := context.WithTimeout(ctx, s.peerTaskConductor.GRPCDialTimeout)
	client, err := dfdaemonclient.GetClient(dialCtx, netAddr.String(), credentialOpt)
	cancel()
	if err != nil {
		s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId)
		return err
	}

	stream, err := client.SyncPieceTasks(ctx, request)
	// Refer: https://github.com/grpc/grpc-go/blob/v1.44.0/stream.go#L104
	// When io.EOF is received, the real error should be discovered via RecvMsg,
	// which stream.Recv() wraps here
	if err == io.EOF && stream != nil {
		_, err = stream.Recv()
	}
	if err != nil {
		s.peerTaskConductor.Errorf("call SyncPieceTasks error: %s, dest peer: %s", err, dstPeer.PeerId)
		return err
	}

	// TODO codes.Unimplemented is only surfaced by stream.Recv();
	// once the legacy get piece grpc is removed, this check can move into synchronizer.receive
	piecePacket, err := stream.Recv()
	if err != nil {
		s.peerTaskConductor.Warnf("receive from SyncPieceTasksClient error: %s, dest peer: %s", err, dstPeer.PeerId)
		_ = stream.CloseSend()
		return err
	}

	_, span := tracer.Start(s.ctx, config.SpanSyncPieceTasks)
	span.SetAttributes(config.AttributeTargetPeerID.String(dstPeer.PeerId))

	synchronizer := &pieceTaskSynchronizer{
		span:                span,
		peerTaskConductor:   s.peerTaskConductor,
		pieceRequestCh:      s.pieceRequestCh,
		client:              stream,
		dstPeer:             dstPeer,
		error:               atomic.Value{},
		SugaredLoggerOnWith: s.peerTaskConductor.With("targetPeerID", request.DstPid),
	}
	s.workers[dstPeer.PeerId] = synchronizer
	go synchronizer.receive(piecePacket)
	return nil
}
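// newMultiPieceTaskSynchronizer connects to every destination peer from the
// scheduler and returns the peers whose daemons do not implement
// SyncPieceTasks (codes.Unimplemented), so the caller can fall back to the
// legacy GetPieceTasks polling path for them.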
func (s *pieceTaskSyncManager) newMultiPieceTaskSynchronizer(
	destPeers []*schedulerv1.PeerPacket_DestPeer,
	desiredPiece int32) (legacyPeers []*schedulerv1.PeerPacket_DestPeer) {
	s.Lock()
	defer func() {
		if s.peerTaskConductor.WatchdogTimeout > 0 {
			s.resetWatchdog(destPeers[0])
		}
		s.Unlock()
	}()

	for _, peer := range destPeers {
		err := s.newPieceTaskSynchronizer(s.ctx, peer, desiredPiece)
		if err == nil {
			s.peerTaskConductor.Infof("connected to peer: %s", peer.PeerId)
			continue
		}
		// when err is codes.Unimplemented, fall back to the legacy get piece grpc
		stat, ok := status.FromError(err)
		if ok && stat.Code() == codes.Unimplemented {
			// for legacy peers, errors are reported when getting pieces fails
			s.peerTaskConductor.Warnf("connect peer %s error: %s, fallback to legacy get piece grpc", peer.PeerId, err)
			legacyPeers = append(legacyPeers, peer)
			continue
		}
		// other errors, report to scheduler
		if errors.Is(err, context.DeadlineExceeded) {
			// connect timeout, report to scheduler to get more available peers
			s.reportInvalidPeer(peer, commonv1.Code_ClientConnectionError)
			s.peerTaskConductor.Infof("connect to peer %s with error: %s, peer is invalid, skip legacy grpc", peer.PeerId, err)
		} else {
			// other errors, report to scheduler to get more available peers
			s.reportInvalidPeer(peer, commonv1.Code_ClientPieceRequestFail)
			s.peerTaskConductor.Errorf("connect peer %s error: %s, not codes.Unimplemented", peer.PeerId, err)
		}
	}
	s.cleanStaleWorker(destPeers)
	return legacyPeers
}

func (s *pieceTaskSyncManager) resetWatchdog(mainPeer *schedulerv1.PeerPacket_DestPeer) {
	if s.watchdog != nil {
		close(s.watchdog.done)
		s.peerTaskConductor.Debugf("close old watchdog")
	}
	s.watchdog = &synchronizerWatchdog{
		done:              make(chan struct{}),
		mainPeer:          atomic.Value{},
		syncSuccess:       atomic.NewBool(false),
		peerTaskConductor: s.peerTaskConductor,
	}
	s.watchdog.mainPeer.Store(mainPeer)
	s.peerTaskConductor.Infof("start new watchdog")
	go s.watchdog.watch(s.peerTaskConductor.WatchdogTimeout)
}

func compositePieceResult(peerTaskConductor *peerTaskConductor, destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) *schedulerv1.PieceResult {
	return &schedulerv1.PieceResult{
		TaskId:        peerTaskConductor.taskID,
		SrcPid:        peerTaskConductor.peerID,
		DstPid:        destPeer.PeerId,
		PieceInfo:     &commonv1.PieceInfo{},
		Success:       false,
		Code:          code,
		HostLoad:      nil,
		FinishedCount: peerTaskConductor.readyPieces.Settled(),
	}
}

func (s *pieceTaskSyncManager) reportInvalidPeer(destPeer *schedulerv1.PeerPacket_DestPeer, code commonv1.Code) {
	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, destPeer, code))
	if sendError != nil {
		s.peerTaskConductor.Errorf("connect peer %s failed and send piece result with error: %s", destPeer.PeerId, sendError)
		go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
	} else {
		s.peerTaskConductor.Debugf("report invalid peer %s/%d to scheduler", destPeer.PeerId, code)
	}
}

// acquire sends the target piece request to all running synchronizers
func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (attempt int, success int) {
	s.RLock()
	for _, p := range s.workers {
		attempt++
		if p.acquire(request) == nil {
			success++
		}
	}
	s.RUnlock()
	return
}

func (s *pieceTaskSyncManager) cancel() {
	s.ctxCancel()
	s.Lock()
	for _, p := range s.workers {
		p.close()
	}
	s.workers = map[string]*pieceTaskSynchronizer{}
	s.Unlock()
}

func (s *pieceTaskSynchronizer) close() {
	if err := s.client.CloseSend(); err != nil {
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.Debugf("close send error: %s, dest peer: %s", err, s.dstPeer.PeerId)
		s.span.RecordError(err)
	}
	s.span.End()
}
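// dispatchPieceRequest marks the pieces in piecePacket as requested and feeds
// a DownloadPieceRequest for each of them into the shared pieceRequestCh,
// bailing out once the peer task has already succeeded or failed.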
func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.PiecePacket) {
	s.peerTaskConductor.updateMetadata(piecePacket)

	pieceCount := len(piecePacket.PieceInfos)
	s.Debugf("dispatch piece request, piece count: %d, dest peer: %s", pieceCount, s.dstPeer.PeerId)
	// peers may send zero piece infos, but with total piece count and content length
	if pieceCount == 0 {
		finished := s.peerTaskConductor.isCompleted()
		if finished {
			s.peerTaskConductor.Done()
		}
		return
	}
	for _, piece := range piecePacket.PieceInfos {
		s.Infof("got piece %d from %s/%s, digest: %s, start: %d, size: %d",
			piece.PieceNum, piecePacket.DstAddr, piecePacket.DstPid, piece.PieceMd5, piece.RangeStart, piece.RangeSize)
		// FIXME when the total piece count is set but the total digest is not, fetch again
		s.peerTaskConductor.requestedPiecesLock.Lock()
		if !s.peerTaskConductor.requestedPieces.IsSet(piece.PieceNum) {
			s.peerTaskConductor.requestedPieces.Set(piece.PieceNum)
		}
		s.peerTaskConductor.requestedPiecesLock.Unlock()
		req := &DownloadPieceRequest{
			storage: s.peerTaskConductor.GetStorage(),
			piece:   piece,
			log:     s.peerTaskConductor.Log(),
			TaskID:  s.peerTaskConductor.GetTaskID(),
			PeerID:  s.peerTaskConductor.GetPeerID(),
			DstPid:  piecePacket.DstPid,
			DstAddr: piecePacket.DstAddr,
		}
		select {
		case s.pieceRequestCh <- req:
			s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))
		case <-s.peerTaskConductor.successCh:
			s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
		case <-s.peerTaskConductor.failCh:
			s.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
		}
	}
}

func (s *pieceTaskSynchronizer) receive(piecePacket *commonv1.PiecePacket) {
	var err error
	for {
		s.dispatchPieceRequest(piecePacket)
		piecePacket, err = s.client.Recv()
		if err != nil {
			break
		}
	}
	if err == io.EOF {
		s.Debugf("synchronizer receives io.EOF")
	} else if s.canceled(err) {
		s.Debugf("synchronizer receives canceled")
		s.error.Store(&pieceTaskSynchronizerError{err})
	} else {
		s.Errorf("synchronizer receives with error: %s", err)
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.reportError(err)
	}
}

func (s *pieceTaskSynchronizer) acquire(request *commonv1.PieceTaskRequest) error {
	if s.error.Load() != nil {
		err := s.error.Load().(*pieceTaskSynchronizerError).err
		s.Debugf("synchronizer already error %s, skip acquire more pieces", err)
		return err
	}
	request.DstPid = s.dstPeer.PeerId
	err := s.client.Send(request)
	s.span.AddEvent(fmt.Sprintf("send piece #%d request", request.StartNum))
	if err != nil {
		// send should always succeed
		s.error.Store(&pieceTaskSynchronizerError{err})
		s.Errorf("synchronizer sends with error: %s", err)
		s.reportError(err)
	}
	return err
}

func (s *pieceTaskSynchronizer) reportError(err error) {
	s.span.RecordError(err)
	sendError := s.peerTaskConductor.sendPieceResult(compositePieceResult(s.peerTaskConductor, s.dstPeer, commonv1.Code_ClientPieceRequestFail))
	if sendError != nil {
		s.Errorf("sync piece info failed and send piece result with error: %s", sendError)
		go s.peerTaskConductor.cancel(commonv1.Code_SchedError, sendError.Error())
	} else {
		s.Debugf("report sync piece error to scheduler")
	}
}

func (s *pieceTaskSynchronizer) canceled(err error) bool {
	if err == context.Canceled {
		s.Debugf("context canceled, dst peer: %s", s.dstPeer.PeerId)
		return true
	}
	if stat, ok := err.(interface{ GRPCStatus() *status.Status }); ok {
		if stat.GRPCStatus().Code() == codes.Canceled {
			s.Debugf("grpc canceled, dst peer: %s", s.dstPeer.PeerId)
			return true
		}
	}
	return false
}

func (s *synchronizerWatchdog) watch(timeout time.Duration) {
	select {
	case <-time.After(timeout):
		if s.peerTaskConductor.readyPieces.Settled() == 0 {
			s.peerTaskConductor.Warnf("watch sync pieces timeout, may be a bug, " +
				"please file an issue in https://github.com/dragonflyoss/Dragonfly2/issues")
			s.syncSuccess.Store(false)
			s.reportWatchFailed()
		} else {
			s.peerTaskConductor.Infof("watch sync pieces ok")
		}
	case <-s.peerTaskConductor.successCh:
		s.peerTaskConductor.Debugf("peer task success, watchdog exit")
	case <-s.peerTaskConductor.failCh:
		s.peerTaskConductor.Debugf("peer task fail, watchdog exit")
	case <-s.done:
		s.peerTaskConductor.Debugf("watchdog done, exit")
	}
}
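// reportWatchFailed tells the scheduler that no piece was synchronized from
// the current main peer before the watchdog timeout, so that the scheduler
// can offer more available peers.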
error to scheduler") } }