From d79a2a687737f331f81bd701624d17059af9ecee Mon Sep 17 00:00:00 2001 From: sunwp <244372610@qq.com> Date: Mon, 29 Nov 2021 11:13:35 +0800 Subject: [PATCH] move RPC code definition to proto file (#829) * move rpc code to proto * replace Cdn to CDN Signed-off-by: sunwp <244372610@qq.com> --- cdn/rpcserver/rpcserver.go | 25 +- client/daemon/peer/peertask_base.go | 55 ++-- client/daemon/peer/peertask_dummy.go | 4 +- client/daemon/peer/peertask_file.go | 7 +- client/daemon/peer/peertask_file_callback.go | 3 +- client/daemon/peer/peertask_manager_test.go | 3 +- client/daemon/peer/peertask_reuse.go | 4 +- client/daemon/peer/peertask_stream.go | 3 +- ...peertask_stream_backsource_partial_test.go | 5 +- .../daemon/peer/peertask_stream_callback.go | 3 +- client/daemon/peer/piece_manager.go | 5 +- client/daemon/rpcserver/rpcserver.go | 9 +- internal/dfcodes/rpc_code.go | 65 ----- manager/middlewares/error.go | 4 +- pkg/rpc/base/base.pb.go | 144 ++++++++++- pkg/rpc/base/base.pb.validate.go | 240 +++++++++++++++++- pkg/rpc/base/base.proto | 38 +++ pkg/rpc/base/common/common.go | 7 +- pkg/rpc/client.go | 4 +- pkg/rpc/scheduler/client/client.go | 3 +- scheduler/core/events.go | 20 +- scheduler/core/service.go | 6 +- scheduler/rpcserver/rpcserver.go | 15 +- scheduler/supervisor/cdn.go | 10 +- scheduler/supervisor/cdn_test.go | 17 +- 25 files changed, 499 insertions(+), 200 deletions(-) delete mode 100644 internal/dfcodes/rpc_code.go diff --git a/cdn/rpcserver/rpcserver.go b/cdn/rpcserver/rpcserver.go index bffe25ebc..be932bab8 100644 --- a/cdn/rpcserver/rpcserver.go +++ b/cdn/rpcserver/rpcserver.go @@ -30,7 +30,6 @@ import ( cdnerrors "d7y.io/dragonfly/v2/cdn/errors" "d7y.io/dragonfly/v2/cdn/supervisor" "d7y.io/dragonfly/v2/cdn/types" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" @@ -112,7 +111,7 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, logger.Infof("obtain seeds request: %+v", req) defer func() { if r := recover(); r != nil { - err = dferrors.Newf(dfcodes.UnknownError, "obtain task(%s) seeds encounter an panic: %v", req.TaskId, r) + err = dferrors.Newf(base.Code_UnknownError, "obtain task(%s) seeds encounter an panic: %v", req.TaskId, r) span.RecordError(err) logger.WithTaskID(req.TaskId).Errorf("%v", err) } @@ -120,7 +119,7 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, }() registerRequest, err := constructRegisterRequest(req) if err != nil { - err = dferrors.Newf(dfcodes.BadRequest, "bad seed request for task(%s): %v", req.TaskId, err) + err = dferrors.Newf(base.Code_BadRequest, "bad seed request for task(%s): %v", req.TaskId, err) span.RecordError(err) return err } @@ -128,11 +127,11 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, pieceChan, err := css.taskMgr.Register(ctx, registerRequest) if err != nil { if cdnerrors.IsResourcesLacked(err) { - err = dferrors.Newf(dfcodes.ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err) + err = dferrors.Newf(base.Code_ResourceLacked, "resources lacked for task(%s): %v", req.TaskId, err) span.RecordError(err) return err } - err = dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err) + err = dferrors.Newf(base.Code_CDNTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err) span.RecordError(err) return err } @@ -154,12 +153,12 @@ func (css *server) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, } task, err := css.taskMgr.Get(req.TaskId) if err != nil { - err = dferrors.Newf(dfcodes.CdnError, "failed to get task(%s): %v", req.TaskId, err) + err = dferrors.Newf(base.Code_CDNError, "failed to get task(%s): %v", req.TaskId, err) span.RecordError(err) return err } if !task.IsSuccess() { - err = dferrors.Newf(dfcodes.CdnTaskDownloadFail, "task(%s) status error , status: %s", req.TaskId, task.CdnStatus) + err = dferrors.Newf(base.Code_CDNTaskDownloadFail, "task(%s) status error , status: %s", req.TaskId, task.CdnStatus) span.RecordError(err) return err } @@ -181,36 +180,36 @@ func (css *server) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest span.SetAttributes(config.AttributeTaskID.String(req.TaskId)) defer func() { if r := recover(); r != nil { - err = dferrors.Newf(dfcodes.UnknownError, "get task(%s) piece tasks encounter an panic: %v", req.TaskId, r) + err = dferrors.Newf(base.Code_UnknownError, "get task(%s) piece tasks encounter an panic: %v", req.TaskId, r) span.RecordError(err) logger.WithTaskID(req.TaskId).Errorf("%v", err) } }() logger.Infof("get piece tasks: %+v", req) if err := checkPieceTasksRequestParams(req); err != nil { - err = dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err) + err = dferrors.Newf(base.Code_BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err) span.RecordError(err) return nil, err } task, err := css.taskMgr.Get(req.TaskId) if err != nil { if cdnerrors.IsDataNotFound(err) { - err = dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err) + err = dferrors.Newf(base.Code_CDNTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err) span.RecordError(err) return nil, err } - err = dferrors.Newf(dfcodes.CdnError, "failed to get task(%s) from cdn: %v", req.TaskId, err) + err = dferrors.Newf(base.Code_CDNError, "failed to get task(%s) from cdn: %v", req.TaskId, err) span.RecordError(err) return nil, err } if task.IsError() { - err = dferrors.Newf(dfcodes.CdnTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskID, task.CdnStatus) + err = dferrors.Newf(base.Code_CDNTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskID, task.CdnStatus) span.RecordError(err) return nil, err } pieces, err := css.taskMgr.GetPieces(ctx, req.TaskId) if err != nil { - err = dferrors.Newf(dfcodes.CdnError, "failed to get pieces of task(%s) from cdn: %v", task.TaskID, err) + err = dferrors.Newf(base.Code_CDNError, "failed to get pieces of task(%s) from cdn: %v", task.TaskID, err) span.RecordError(err) return nil, err } diff --git a/client/daemon/peer/peertask_base.go b/client/daemon/peer/peertask_base.go index e7104a843..7b2f8c110 100644 --- a/client/daemon/peer/peertask_base.go +++ b/client/daemon/peer/peertask_base.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc/status" "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/retry" @@ -245,9 +244,9 @@ loop: if pt.success { return } - pt.failedCode = dfcodes.UnknownError + pt.failedCode = base.Code_UnknownError if de, ok := err.(*dferrors.DfError); ok { - if de.Code == dfcodes.SchedNeedBackSource { + if de.Code == base.Code_SchedNeedBackSource { pt.needBackSource = true close(pt.peerPacketReady) return @@ -266,7 +265,7 @@ loop: } logger.Debugf("receive peerPacket %v for peer %s", peerPacket, pt.peerID) - if peerPacket.Code != dfcodes.Success { + if peerPacket.Code != base.Code_Success { pt.Errorf("receive peer packet with error: %d", peerPacket.Code) if pt.isExitPeerPacketCode(peerPacket) { pt.cancel() @@ -316,21 +315,21 @@ loop: func (pt *peerTask) isExitPeerPacketCode(pp *scheduler.PeerPacket) bool { switch pp.Code { - case dfcodes.ResourceLacked, dfcodes.BadRequest, dfcodes.PeerTaskNotFound, dfcodes.UnknownError, dfcodes.RequestTimeOut: + case base.Code_ResourceLacked, base.Code_BadRequest, base.Code_PeerTaskNotFound, base.Code_UnknownError, base.Code_RequestTimeOut: // 1xxx pt.failedCode = pp.Code pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code) return true - case dfcodes.SchedError: + case base.Code_SchedError: // 5xxx pt.failedCode = pp.Code pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code) return true - case dfcodes.SchedPeerGone: + case base.Code_SchedPeerGone: pt.failedReason = reasonPeerGoneFromScheduler - pt.failedCode = dfcodes.SchedPeerGone + pt.failedCode = base.Code_SchedPeerGone return true - case dfcodes.CdnError, dfcodes.CdnTaskRegistryFail, dfcodes.CdnTaskDownloadFail: + case base.Code_CDNError, base.Code_CDNTaskRegistryFail, base.Code_CDNTaskDownloadFail: // 6xxx pt.failedCode = pp.Code pt.failedReason = fmt.Sprintf("receive exit peer packet with code %d", pp.Code) @@ -351,7 +350,7 @@ func (pt *peerTask) pullSinglePiece(cleanUnfinishedFunc func()) { pt.SetPieceMd5Sign(pt.singlePiece.PieceInfo.PieceMd5) if err := pt.callback.Init(pt); err != nil { pt.failedReason = err.Error() - pt.failedCode = dfcodes.ClientError + pt.failedCode = base.Code_ClientError cleanUnfinishedFunc() span.RecordError(err) span.SetAttributes(config.AttributePieceSuccess.Bool(false)) @@ -414,7 +413,7 @@ loop: if !pt.success { if pt.failedCode == failedCodeNotSet { pt.failedReason = reasonContextCanceled - pt.failedCode = dfcodes.ClientContextCanceled + pt.failedCode = base.Code_ClientContextCanceled if err := pt.callback.Fail(pt, pt.failedCode, pt.ctx.Err().Error()); err != nil { pt.Errorf("peer task callback failed %s", err) } @@ -501,7 +500,7 @@ func (pt *peerTask) init(piecePacket *base.PiecePacket, pieceBufferSize int32) ( if err := pt.callback.Init(pt); err != nil { pt.span.RecordError(err) pt.failedReason = err.Error() - pt.failedCode = dfcodes.ClientError + pt.failedCode = base.Code_ClientError return nil, false } pc := pt.peerPacket.Load().(*scheduler.PeerPacket).ParallelCount @@ -529,8 +528,8 @@ func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) { time.Now().Sub(pt.callback.GetStartTime()).Microseconds(), pt.peerPacket.Load().(*scheduler.PeerPacket).MainPeer) return true, false } - // when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady - pt.Infof("start download from source due to dfcodes.SchedNeedBackSource") + // when scheduler says base.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady + pt.Infof("start download from source due to base.Code_SchedNeedBackSource") pt.span.AddEvent("back source due to scheduler says need back source") pt.needBackSource = true pt.backSource() @@ -538,7 +537,7 @@ func (pt *peerTask) waitFirstPeerPacket() (done bool, backSource bool) { case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration): if pt.schedulerOption.DisableAutoBackSource { pt.failedReason = reasonScheduleTimeout - pt.failedCode = dfcodes.ClientScheduleTimeout + pt.failedCode = base.Code_ClientScheduleTimeout err := fmt.Errorf("%s, auto back source disabled", pt.failedReason) pt.span.RecordError(err) pt.Errorf(err.Error()) @@ -564,7 +563,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { if !pt.success { if pt.failedCode == failedCodeNotSet { pt.failedReason = reasonContextCanceled - pt.failedCode = dfcodes.ClientContextCanceled + pt.failedCode = base.Code_ClientContextCanceled } } case _, ok := <-pt.peerPacketReady: @@ -574,8 +573,8 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { // research from piece 0 return pt.getNextPieceNum(0), true } - // when scheduler says dfcodes.SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady - pt.Infof("start download from source due to dfcodes.SchedNeedBackSource") + // when scheduler says base.Code_SchedNeedBackSource, receivePeerPacket will close pt.peerPacketReady + pt.Infof("start download from source due to base.Code_SchedNeedBackSource") pt.span.AddEvent("back source due to scheduler says need back source ") pt.needBackSource = true // TODO optimize back source when already downloaded some pieces @@ -583,7 +582,7 @@ func (pt *peerTask) waitAvailablePeerPacket() (int32, bool) { case <-time.After(pt.schedulerOption.ScheduleTimeout.Duration): if pt.schedulerOption.DisableAutoBackSource { pt.failedReason = reasonReScheduleTimeout - pt.failedCode = dfcodes.ClientScheduleTimeout + pt.failedCode = base.Code_ClientScheduleTimeout err := fmt.Errorf("%s, auto back source disabled", pt.failedReason) pt.span.RecordError(err) pt.Errorf(err.Error()) @@ -620,7 +619,7 @@ func (pt *peerTask) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceReque if !pt.success { if pt.failedCode == failedCodeNotSet { pt.failedReason = reasonContextCanceled - pt.failedCode = dfcodes.ClientContextCanceled + pt.failedCode = base.Code_ClientContextCanceled } } } @@ -673,7 +672,7 @@ func (pt *peerTask) downloadPieceWorker(id int32, pti Task, requests chan *Downl DstPid: request.DstPid, PieceInfo: request.piece, Success: false, - Code: dfcodes.ClientRequestLimitFail, + Code: base.Code_ClientRequestLimitFail, HostLoad: nil, FinishedCount: 0, // update by peer task }, @@ -683,7 +682,7 @@ func (pt *peerTask) downloadPieceWorker(id int32, pti Task, requests chan *Downl } pt.failedReason = err.Error() - pt.failedCode = dfcodes.ClientRequestLimitFail + pt.failedCode = base.Code_ClientRequestLimitFail pt.cancel() span.SetAttributes(config.AttributePieceSuccess.Bool(false)) span.End() @@ -749,7 +748,7 @@ func (pt *peerTask) preparePieceTasksByPeer(curPeerPacket *scheduler.PeerPacket, span.SetAttributes(config.AttributeGetPieceLimit.Int(int(request.Limit))) defer span.End() - // when cdn returns dfcodes.CdnTaskNotFound, report it to scheduler and wait cdn download it. + // when cdn returns base.Code_CDNTaskNotFound, report it to scheduler and wait cdn download it. retry: pt.Debugf("try get piece task from peer %s, piece num: %d, limit: %d\"", peer.PeerId, request.StartNum, request.Limit) p, err := pt.getPieceTasks(span, curPeerPacket, peer, request) @@ -774,7 +773,7 @@ retry: return nil, err } } - code := dfcodes.ClientPieceRequestFail + code := base.Code_ClientPieceRequestFail // not grpc error if de, ok := err.(*dferrors.DfError); ok && uint32(de.Code) > uint32(codes.Unauthenticated) { pt.Debugf("get piece task from peer %s with df error, code: %d", peer.PeerId, de.Code) @@ -796,8 +795,8 @@ retry: pt.Errorf("send piece result error: %s, code to send: %d", err, code) } - if code == dfcodes.CdnTaskNotFound && curPeerPacket == pt.peerPacket.Load().(*scheduler.PeerPacket) { - span.AddEvent("retry for CdnTaskNotFound") + if code == base.Code_CDNTaskNotFound && curPeerPacket == pt.peerPacket.Load().(*scheduler.PeerPacket) { + span.AddEvent("retry for CDNTaskNotFound") goto retry } return nil, err @@ -832,13 +831,13 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer DstPid: peer.PeerId, PieceInfo: &base.PieceInfo{}, Success: false, - Code: dfcodes.ClientWaitPieceReady, + Code: base.Code_ClientWaitPieceReady, HostLoad: nil, FinishedCount: pt.readyPieces.Settled(), }) if er != nil { span.RecordError(er) - pt.Errorf("send piece result with dfcodes.ClientWaitPieceReady error: %s", er) + pt.Errorf("send piece result with base.Code_ClientWaitPieceReady error: %s", er) } // fast way to exit retry lastPeerPacket := pt.peerPacket.Load().(*scheduler.PeerPacket) diff --git a/client/daemon/peer/peertask_dummy.go b/client/daemon/peer/peertask_dummy.go index ef19144a7..619d32a89 100644 --- a/client/daemon/peer/peertask_dummy.go +++ b/client/daemon/peer/peertask_dummy.go @@ -21,9 +21,9 @@ import ( "google.golang.org/grpc" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/pkg/basic/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" ) @@ -58,7 +58,7 @@ type dummyPeerPacketStream struct { } func (d *dummyPeerPacketStream) Recv() (pp *scheduler.PeerPacket, err error) { - return nil, dferrors.New(dfcodes.SchedNeedBackSource, "") + return nil, dferrors.New(base.Code_SchedNeedBackSource, "") } func (d *dummyPeerPacketStream) Send(pr *scheduler.PieceResult) (err error) { diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index 4d3a46f90..232add020 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -27,7 +27,6 @@ import ( "golang.org/x/time/rate" "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -193,7 +192,7 @@ func newFilePeerTask(ctx context.Context, requestedPieces: NewBitmap(), failedPieceCh: make(chan int32, config.DefaultPieceChanSize), failedReason: failedReasonNotSet, - failedCode: dfcodes.UnknownError, + failedCode: base.Code_UnknownError, contentLength: atomic.NewInt64(-1), pieceParallelCount: atomic.NewInt32(0), totalPiece: -1, @@ -315,7 +314,7 @@ func (pt *filePeerTask) finish() error { var ( success = true - code = dfcodes.Success + code = base.Code_Success message = "Success" progressDone bool ) @@ -325,7 +324,7 @@ func (pt *filePeerTask) finish() error { pt.Errorf("peer task done callback failed: %s", err) pt.span.RecordError(err) success = false - code = dfcodes.ClientError + code = base.Code_ClientError message = err.Error() } diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index 984a7f1fb..a4364ac9e 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -24,7 +24,6 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" ) @@ -112,7 +111,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error { TotalPieceCount: pt.GetTotalPieces(), Cost: uint32(cost), Success: true, - Code: dfcodes.Success, + Code: base.Code_Success, }) if err != nil { peerResultSpan.RecordError(err) diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index f03ddbe5f..e954ccc27 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -42,7 +42,6 @@ import ( "d7y.io/dragonfly/v2/client/daemon/test" mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon" mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -124,7 +123,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio delayCount++ } return &scheduler.PeerPacket{ - Code: dfcodes.Success, + Code: base.Code_Success, TaskId: opt.taskID, SrcPid: "127.0.0.1", ParallelCount: opt.pieceParallelCount, diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index 09a656eae..de800af5c 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -28,9 +28,9 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" + "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" ) @@ -83,7 +83,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, pg := &FilePeerTaskProgress{ State: &ProgressState{ Success: true, - Code: dfcodes.Success, + Code: base.Code_Success, Msg: "Success", }, TaskID: taskID, diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index a1f268c1a..b61f78b4d 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -30,7 +30,6 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -169,7 +168,7 @@ func newStreamPeerTask(ctx context.Context, requestedPieces: NewBitmap(), failedPieceCh: make(chan int32, config.DefaultPieceChanSize), failedReason: failedReasonNotSet, - failedCode: dfcodes.UnknownError, + failedCode: base.Code_UnknownError, contentLength: atomic.NewInt64(-1), pieceParallelCount: atomic.NewInt32(0), totalPiece: -1, diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index 00ce65362..cf59ce995 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -40,7 +40,6 @@ import ( "d7y.io/dragonfly/v2/client/daemon/test" mock_daemon "d7y.io/dragonfly/v2/client/daemon/test/mock/daemon" mock_scheduler "d7y.io/dragonfly/v2/client/daemon/test/mock/scheduler" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" @@ -129,11 +128,11 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, if schedPeerPacket { // send back source after piece 0 is done wg.Wait() - return nil, dferrors.New(dfcodes.SchedNeedBackSource, "") + return nil, dferrors.New(base.Code_SchedNeedBackSource, "") } schedPeerPacket = true return &scheduler.PeerPacket{ - Code: dfcodes.Success, + Code: base.Code_Success, TaskId: opt.taskID, SrcPid: "127.0.0.1", ParallelCount: opt.pieceParallelCount, diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 7d2e81cc2..2b7b4e35b 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -24,7 +24,6 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" ) @@ -110,7 +109,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { TotalPieceCount: p.pt.totalPiece, Cost: uint32(cost), Success: true, - Code: dfcodes.Success, + Code: base.Code_Success, }) if err != nil { peerResultSpan.RecordError(err) diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 78a9b9004..d09deacaf 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -28,7 +28,6 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" @@ -171,7 +170,7 @@ func (pm *pieceManager) pushSuccessResult(peerTask Task, dstPid string, piece *b BeginTime: uint64(start), EndTime: uint64(end), Success: true, - Code: dfcodes.Success, + Code: base.Code_Success, HostLoad: nil, // TODO(jim): update host load FinishedCount: piece.PieceNum + 1, // update by peer task // TODO range_start, range_size, piece_md5, piece_offset, piece_style @@ -195,7 +194,7 @@ func (pm *pieceManager) pushFailResult(peerTask Task, dstPid string, piece *base BeginTime: uint64(start), EndTime: uint64(end), Success: false, - Code: dfcodes.ClientPieceDownloadFail, + Code: base.Code_ClientPieceDownloadFail, HostLoad: nil, FinishedCount: 0, // update by peer task }, diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 1d899312d..faa509454 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -30,7 +30,6 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" @@ -88,7 +87,7 @@ func (m *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque m.Keep() p, err := m.storageManager.GetPieces(ctx, request) if err != nil { - code := dfcodes.UnknownError + code := base.Code_UnknownError if err != storage.ErrTaskNotFound { logger.Errorf("get piece tasks error: %s, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d", err, request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) @@ -96,7 +95,7 @@ func (m *server) GetPieceTasks(ctx context.Context, request *base.PieceTaskReque } // dst peer is not running if !m.peerTaskManager.IsPeerTaskRunning(request.DstPid) { - code = dfcodes.PeerTaskNotFound + code = base.Code_PeerTaskNotFound logger.Errorf("get piece tasks error: peer task not found, task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d", request.TaskId, request.SrcPid, request.DstPid, request.StartNum, request.Limit) return nil, dferrors.New(code, err.Error()) @@ -151,7 +150,7 @@ func (m *server) Download(ctx context.Context, peerTaskProgress, tiny, err := m.peerTaskManager.StartFilePeerTask(ctx, peerTask) if err != nil { - return dferrors.New(dfcodes.UnknownError, fmt.Sprintf("%s", err)) + return dferrors.New(base.Code_UnknownError, fmt.Sprintf("%s", err)) } if tiny != nil { results <- &dfdaemongrpc.DownResult{ @@ -176,7 +175,7 @@ func (m *server) Download(ctx context.Context, if !ok { err = errors.New("progress closed unexpected") log.Errorf(err.Error()) - return dferrors.New(dfcodes.UnknownError, err.Error()) + return dferrors.New(base.Code_UnknownError, err.Error()) } if !p.State.Success { log.Errorf("task %s/%s failed: %d/%s", p.PeerID, p.TaskID, p.State.Code, p.State.Msg) diff --git a/internal/dfcodes/rpc_code.go b/internal/dfcodes/rpc_code.go deleted file mode 100644 index 1a3f9882b..000000000 --- a/internal/dfcodes/rpc_code.go +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package dfcodes - -import "d7y.io/dragonfly/v2/pkg/rpc/base" - -// rpc code -const ( - // success code 200-299 - Success base.Code = 200 - ServerUnavailable base.Code = 500 // framework can not find server node - - // common response error 1000-1999 - ResourceLacked base.Code = 1000 // client can be migrated to another scheduler/CDN - BadRequest base.Code = 1400 - PeerTaskNotFound base.Code = 1404 - UnknownError base.Code = 1500 - RequestTimeOut base.Code = 1504 - - // client response error 4000-4999 - ClientError base.Code = 4000 - ClientPieceRequestFail base.Code = 4001 // get piece task from other peer error - ClientScheduleTimeout base.Code = 4002 // wait scheduler response timeout - ClientContextCanceled base.Code = 4003 - ClientWaitPieceReady base.Code = 4004 // when target peer downloads from source slowly, should wait - ClientPieceDownloadFail base.Code = 4005 - ClientRequestLimitFail base.Code = 4006 - - // scheduler response error 5000-5999 - SchedError base.Code = 5000 - SchedNeedBackSource base.Code = 5001 // client should try to download from source - SchedPeerGone base.Code = 5002 // client should disconnect from scheduler - SchedPeerNotFound base.Code = 5004 // peer not found in scheduler - SchedPeerPieceResultReportFail base.Code = 5005 // report piece - SchedTaskStatusError base.Code = 5006 // task status is fail - - // cdnsystem response error 6000-6999 - CdnError base.Code = 6000 - CdnTaskRegistryFail base.Code = 6001 - CdnTaskDownloadFail base.Code = 6002 - CdnTaskNotFound base.Code = 6404 - - // manager response error 7000-7999 - ManagerError base.Code = 7000 - InvalidResourceType base.Code = 7001 - ManagerHostError base.Code = 7002 - ManagerStoreError base.Code = 7003 - ManagerConfigError base.Code = 7004 - ManagerStoreNotFound base.Code = 7005 - SchedulerNodesNotFound base.Code = 7006 -) diff --git a/manager/middlewares/error.go b/manager/middlewares/error.go index fc2f48d25..9ea9ffa6d 100644 --- a/manager/middlewares/error.go +++ b/manager/middlewares/error.go @@ -27,8 +27,8 @@ import ( "golang.org/x/crypto/bcrypt" "gorm.io/gorm" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" + "d7y.io/dragonfly/v2/pkg/rpc/base" ) type ErrorResponse struct { @@ -57,7 +57,7 @@ func Error() gin.HandlerFunc { // RPC error handler if err, ok := errors.Cause(err.Err).(*dferrors.DfError); ok { switch err.Code { - case dfcodes.InvalidResourceType: + case base.Code_InvalidResourceType: c.JSON(http.StatusBadRequest, ErrorResponse{ Message: http.StatusText(http.StatusBadRequest), }) diff --git a/pkg/rpc/base/base.pb.go b/pkg/rpc/base/base.pb.go index a93bdc883..757aa1dfd 100644 --- a/pkg/rpc/base/base.pb.go +++ b/pkg/rpc/base/base.pb.go @@ -39,15 +39,98 @@ type Code int32 const ( Code_X_UNSPECIFIED Code = 0 + // success code 200-299 + Code_Success Code = 200 + // framework can not find server node + Code_ServerUnavailable Code = 500 + // common response error 1000-1999 + // client can be migrated to another scheduler/CDN + Code_ResourceLacked Code = 1000 + Code_BadRequest Code = 1400 + Code_PeerTaskNotFound Code = 1404 + Code_UnknownError Code = 1500 + Code_RequestTimeOut Code = 1504 + // client response error 4000-4999 + Code_ClientError Code = 4000 + Code_ClientPieceRequestFail Code = 4001 // get piece task from other peer error + Code_ClientScheduleTimeout Code = 4002 // wait scheduler response timeout + Code_ClientContextCanceled Code = 4003 + Code_ClientWaitPieceReady Code = 4004 // when target peer downloads from source slowly, should wait + Code_ClientPieceDownloadFail Code = 4005 + Code_ClientRequestLimitFail Code = 4006 + // scheduler response error 5000-5999 + Code_SchedError Code = 5000 + Code_SchedNeedBackSource Code = 5001 // client should try to download from source + Code_SchedPeerGone Code = 5002 // client should disconnect from scheduler + Code_SchedPeerNotFound Code = 5004 // peer not found in scheduler + Code_SchedPeerPieceResultReportFail Code = 5005 // report piece + Code_SchedTaskStatusError Code = 5006 // task status is fail + // cdnsystem response error 6000-6999 + Code_CDNError Code = 6000 + Code_CDNTaskRegistryFail Code = 6001 + Code_CDNTaskDownloadFail Code = 6002 + Code_CDNTaskNotFound Code = 6404 + // manager response error 7000-7999 + Code_InvalidResourceType Code = 7001 ) // Enum value maps for Code. var ( Code_name = map[int32]string{ - 0: "X_UNSPECIFIED", + 0: "X_UNSPECIFIED", + 200: "Success", + 500: "ServerUnavailable", + 1000: "ResourceLacked", + 1400: "BadRequest", + 1404: "PeerTaskNotFound", + 1500: "UnknownError", + 1504: "RequestTimeOut", + 4000: "ClientError", + 4001: "ClientPieceRequestFail", + 4002: "ClientScheduleTimeout", + 4003: "ClientContextCanceled", + 4004: "ClientWaitPieceReady", + 4005: "ClientPieceDownloadFail", + 4006: "ClientRequestLimitFail", + 5000: "SchedError", + 5001: "SchedNeedBackSource", + 5002: "SchedPeerGone", + 5004: "SchedPeerNotFound", + 5005: "SchedPeerPieceResultReportFail", + 5006: "SchedTaskStatusError", + 6000: "CDNError", + 6001: "CDNTaskRegistryFail", + 6002: "CDNTaskDownloadFail", + 6404: "CDNTaskNotFound", + 7001: "InvalidResourceType", } Code_value = map[string]int32{ - "X_UNSPECIFIED": 0, + "X_UNSPECIFIED": 0, + "Success": 200, + "ServerUnavailable": 500, + "ResourceLacked": 1000, + "BadRequest": 1400, + "PeerTaskNotFound": 1404, + "UnknownError": 1500, + "RequestTimeOut": 1504, + "ClientError": 4000, + "ClientPieceRequestFail": 4001, + "ClientScheduleTimeout": 4002, + "ClientContextCanceled": 4003, + "ClientWaitPieceReady": 4004, + "ClientPieceDownloadFail": 4005, + "ClientRequestLimitFail": 4006, + "SchedError": 5000, + "SchedNeedBackSource": 5001, + "SchedPeerGone": 5002, + "SchedPeerNotFound": 5004, + "SchedPeerPieceResultReportFail": 5005, + "SchedTaskStatusError": 5006, + "CDNError": 6000, + "CDNTaskRegistryFail": 6001, + "CDNTaskDownloadFail": 6002, + "CDNTaskNotFound": 6404, + "InvalidResourceType": 7001, } ) @@ -712,16 +795,53 @@ var file_pkg_rpc_base_base_proto_rawDesc = []byte{ 0x74, 0x68, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x12, 0x24, 0x0a, 0x0e, 0x70, 0x69, 0x65, 0x63, 0x65, 0x5f, 0x6d, 0x64, 0x35, 0x5f, 0x73, 0x69, 0x67, 0x6e, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0c, 0x70, 0x69, 0x65, 0x63, 0x65, 0x4d, 0x64, 0x35, 0x53, 0x69, 0x67, 0x6e, 0x2a, 0x19, 0x0a, - 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x58, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, - 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x2a, 0x17, 0x0a, 0x0a, 0x50, 0x69, 0x65, 0x63, - 0x65, 0x53, 0x74, 0x79, 0x6c, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x50, 0x4c, 0x41, 0x49, 0x4e, 0x10, - 0x00, 0x2a, 0x2c, 0x0a, 0x09, 0x53, 0x69, 0x7a, 0x65, 0x53, 0x63, 0x6f, 0x70, 0x65, 0x12, 0x0a, - 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x4d, - 0x41, 0x4c, 0x4c, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x49, 0x4e, 0x59, 0x10, 0x02, 0x42, - 0x22, 0x5a, 0x20, 0x64, 0x37, 0x79, 0x2e, 0x69, 0x6f, 0x2f, 0x64, 0x72, 0x61, 0x67, 0x6f, 0x6e, - 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x62, - 0x61, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x0c, 0x70, 0x69, 0x65, 0x63, 0x65, 0x4d, 0x64, 0x35, 0x53, 0x69, 0x67, 0x6e, 0x2a, 0xeb, 0x04, + 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x58, 0x5f, 0x55, 0x4e, 0x53, 0x50, + 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x07, 0x53, 0x75, 0x63, + 0x63, 0x65, 0x73, 0x73, 0x10, 0xc8, 0x01, 0x12, 0x16, 0x0a, 0x11, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x55, 0x6e, 0x61, 0x76, 0x61, 0x69, 0x6c, 0x61, 0x62, 0x6c, 0x65, 0x10, 0xf4, 0x03, 0x12, + 0x13, 0x0a, 0x0e, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4c, 0x61, 0x63, 0x6b, 0x65, + 0x64, 0x10, 0xe8, 0x07, 0x12, 0x0f, 0x0a, 0x0a, 0x42, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x10, 0xf8, 0x0a, 0x12, 0x15, 0x0a, 0x10, 0x50, 0x65, 0x65, 0x72, 0x54, 0x61, 0x73, + 0x6b, 0x4e, 0x6f, 0x74, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x10, 0xfc, 0x0a, 0x12, 0x11, 0x0a, 0x0c, + 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0xdc, 0x0b, 0x12, + 0x13, 0x0a, 0x0e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x4f, 0x75, + 0x74, 0x10, 0xe0, 0x0b, 0x12, 0x10, 0x0a, 0x0b, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x10, 0xa0, 0x1f, 0x12, 0x1b, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x50, 0x69, 0x65, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, + 0x10, 0xa1, 0x1f, 0x12, 0x1a, 0x0a, 0x15, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x68, + 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x10, 0xa2, 0x1f, 0x12, + 0x1a, 0x0a, 0x15, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, + 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x65, 0x64, 0x10, 0xa3, 0x1f, 0x12, 0x19, 0x0a, 0x14, 0x43, + 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x57, 0x61, 0x69, 0x74, 0x50, 0x69, 0x65, 0x63, 0x65, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x10, 0xa4, 0x1f, 0x12, 0x1c, 0x0a, 0x17, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x50, 0x69, 0x65, 0x63, 0x65, 0x44, 0x6f, 0x77, 0x6e, 0x6c, 0x6f, 0x61, 0x64, 0x46, 0x61, 0x69, + 0x6c, 0x10, 0xa5, 0x1f, 0x12, 0x1b, 0x0a, 0x16, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x10, 0xa6, + 0x1f, 0x12, 0x0f, 0x0a, 0x0a, 0x53, 0x63, 0x68, 0x65, 0x64, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, + 0x88, 0x27, 0x12, 0x18, 0x0a, 0x13, 0x53, 0x63, 0x68, 0x65, 0x64, 0x4e, 0x65, 0x65, 0x64, 0x42, + 0x61, 0x63, 0x6b, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x10, 0x89, 0x27, 0x12, 0x12, 0x0a, 0x0d, + 0x53, 0x63, 0x68, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x47, 0x6f, 0x6e, 0x65, 0x10, 0x8a, 0x27, + 0x12, 0x16, 0x0a, 0x11, 0x53, 0x63, 0x68, 0x65, 0x64, 0x50, 0x65, 0x65, 0x72, 0x4e, 0x6f, 0x74, + 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x10, 0x8c, 0x27, 0x12, 0x23, 0x0a, 0x1e, 0x53, 0x63, 0x68, 0x65, + 0x64, 0x50, 0x65, 0x65, 0x72, 0x50, 0x69, 0x65, 0x63, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x10, 0x8d, 0x27, 0x12, 0x19, 0x0a, + 0x14, 0x53, 0x63, 0x68, 0x65, 0x64, 0x54, 0x61, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x8e, 0x27, 0x12, 0x0d, 0x0a, 0x08, 0x43, 0x44, 0x4e, 0x45, + 0x72, 0x72, 0x6f, 0x72, 0x10, 0xf0, 0x2e, 0x12, 0x18, 0x0a, 0x13, 0x43, 0x44, 0x4e, 0x54, 0x61, + 0x73, 0x6b, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x72, 0x79, 0x46, 0x61, 0x69, 0x6c, 0x10, 0xf1, + 0x2e, 0x12, 0x18, 0x0a, 0x13, 0x43, 0x44, 0x4e, 0x54, 0x61, 0x73, 0x6b, 0x44, 0x6f, 0x77, 0x6e, + 0x6c, 0x6f, 0x61, 0x64, 0x46, 0x61, 0x69, 0x6c, 0x10, 0xf2, 0x2e, 0x12, 0x14, 0x0a, 0x0f, 0x43, + 0x44, 0x4e, 0x54, 0x61, 0x73, 0x6b, 0x4e, 0x6f, 0x74, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x10, 0x84, + 0x32, 0x12, 0x18, 0x0a, 0x13, 0x49, 0x6e, 0x76, 0x61, 0x6c, 0x69, 0x64, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x10, 0xd9, 0x36, 0x2a, 0x17, 0x0a, 0x0a, 0x50, + 0x69, 0x65, 0x63, 0x65, 0x53, 0x74, 0x79, 0x6c, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x50, 0x4c, 0x41, + 0x49, 0x4e, 0x10, 0x00, 0x2a, 0x2c, 0x0a, 0x09, 0x53, 0x69, 0x7a, 0x65, 0x53, 0x63, 0x6f, 0x70, + 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, + 0x05, 0x53, 0x4d, 0x41, 0x4c, 0x4c, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x49, 0x4e, 0x59, + 0x10, 0x02, 0x42, 0x22, 0x5a, 0x20, 0x64, 0x37, 0x79, 0x2e, 0x69, 0x6f, 0x2f, 0x64, 0x72, 0x61, + 0x67, 0x6f, 0x6e, 0x66, 0x6c, 0x79, 0x2f, 0x76, 0x32, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x72, 0x70, + 0x63, 0x2f, 0x62, 0x61, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/rpc/base/base.pb.validate.go b/pkg/rpc/base/base.pb.validate.go index 5c4ac4140..e0ae9d0e9 100644 --- a/pkg/rpc/base/base.pb.validate.go +++ b/pkg/rpc/base/base.pb.validate.go @@ -34,20 +34,53 @@ var ( ) // Validate checks the field values on GrpcDfError with the rules defined in -// the proto definition for this message. If any rules are violated, an error -// is returned. +// the proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. func (m *GrpcDfError) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on GrpcDfError with the rules defined in +// the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in GrpcDfErrorMultiError, or +// nil if none found. +func (m *GrpcDfError) ValidateAll() error { + return m.validate(true) +} + +func (m *GrpcDfError) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for Code // no validation rules for Message + if len(errors) > 0 { + return GrpcDfErrorMultiError(errors) + } return nil } +// GrpcDfErrorMultiError is an error wrapping multiple validation errors +// returned by GrpcDfError.ValidateAll() if the designated constraints aren't met. +type GrpcDfErrorMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m GrpcDfErrorMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m GrpcDfErrorMultiError) AllErrors() []error { return m } + // GrpcDfErrorValidationError is the validation error returned by // GrpcDfError.Validate if the designated constraints aren't met. type GrpcDfErrorValidationError struct { @@ -103,12 +136,26 @@ var _ interface { } = GrpcDfErrorValidationError{} // Validate checks the field values on UrlMeta with the rules defined in the -// proto definition for this message. If any rules are violated, an error is returned. +// proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. func (m *UrlMeta) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on UrlMeta with the rules defined in the +// proto definition for this message. If any rules are violated, the result is +// a list of violation errors wrapped in UrlMetaMultiError, or nil if none found. +func (m *UrlMeta) ValidateAll() error { + return m.validate(true) +} + +func (m *UrlMeta) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for Digest // no validation rules for Tag @@ -119,9 +166,28 @@ func (m *UrlMeta) Validate() error { // no validation rules for Header + if len(errors) > 0 { + return UrlMetaMultiError(errors) + } return nil } +// UrlMetaMultiError is an error wrapping multiple validation errors returned +// by UrlMeta.ValidateAll() if the designated constraints aren't met. +type UrlMetaMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m UrlMetaMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m UrlMetaMultiError) AllErrors() []error { return m } + // UrlMetaValidationError is the validation error returned by UrlMeta.Validate // if the designated constraints aren't met. type UrlMetaValidationError struct { @@ -177,21 +243,55 @@ var _ interface { } = UrlMetaValidationError{} // Validate checks the field values on HostLoad with the rules defined in the -// proto definition for this message. If any rules are violated, an error is returned. +// proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. func (m *HostLoad) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on HostLoad with the rules defined in +// the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in HostLoadMultiError, or nil +// if none found. +func (m *HostLoad) ValidateAll() error { + return m.validate(true) +} + +func (m *HostLoad) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for CpuRatio // no validation rules for MemRatio // no validation rules for DiskRatio + if len(errors) > 0 { + return HostLoadMultiError(errors) + } return nil } +// HostLoadMultiError is an error wrapping multiple validation errors returned +// by HostLoad.ValidateAll() if the designated constraints aren't met. +type HostLoadMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m HostLoadMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m HostLoadMultiError) AllErrors() []error { return m } + // HostLoadValidationError is the validation error returned by // HostLoad.Validate if the designated constraints aren't met. type HostLoadValidationError struct { @@ -247,13 +347,27 @@ var _ interface { } = HostLoadValidationError{} // Validate checks the field values on PieceTaskRequest with the rules defined -// in the proto definition for this message. If any rules are violated, an -// error is returned. +// in the proto definition for this message. If any rules are violated, the +// first error encountered is returned, or nil if there are no violations. func (m *PieceTaskRequest) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on PieceTaskRequest with the rules +// defined in the proto definition for this message. If any rules are +// violated, the result is a list of violation errors wrapped in +// PieceTaskRequestMultiError, or nil if none found. +func (m *PieceTaskRequest) ValidateAll() error { + return m.validate(true) +} + +func (m *PieceTaskRequest) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for TaskId // no validation rules for SrcPid @@ -264,9 +378,29 @@ func (m *PieceTaskRequest) Validate() error { // no validation rules for Limit + if len(errors) > 0 { + return PieceTaskRequestMultiError(errors) + } return nil } +// PieceTaskRequestMultiError is an error wrapping multiple validation errors +// returned by PieceTaskRequest.ValidateAll() if the designated constraints +// aren't met. +type PieceTaskRequestMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m PieceTaskRequestMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m PieceTaskRequestMultiError) AllErrors() []error { return m } + // PieceTaskRequestValidationError is the validation error returned by // PieceTaskRequest.Validate if the designated constraints aren't met. type PieceTaskRequestValidationError struct { @@ -322,12 +456,27 @@ var _ interface { } = PieceTaskRequestValidationError{} // Validate checks the field values on PieceInfo with the rules defined in the -// proto definition for this message. If any rules are violated, an error is returned. +// proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. func (m *PieceInfo) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on PieceInfo with the rules defined in +// the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in PieceInfoMultiError, or nil +// if none found. +func (m *PieceInfo) ValidateAll() error { + return m.validate(true) +} + +func (m *PieceInfo) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for PieceNum // no validation rules for RangeStart @@ -340,9 +489,28 @@ func (m *PieceInfo) Validate() error { // no validation rules for PieceStyle + if len(errors) > 0 { + return PieceInfoMultiError(errors) + } return nil } +// PieceInfoMultiError is an error wrapping multiple validation errors returned +// by PieceInfo.ValidateAll() if the designated constraints aren't met. +type PieceInfoMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m PieceInfoMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m PieceInfoMultiError) AllErrors() []error { return m } + // PieceInfoValidationError is the validation error returned by // PieceInfo.Validate if the designated constraints aren't met. type PieceInfoValidationError struct { @@ -398,13 +566,27 @@ var _ interface { } = PieceInfoValidationError{} // Validate checks the field values on PiecePacket with the rules defined in -// the proto definition for this message. If any rules are violated, an error -// is returned. +// the proto definition for this message. If any rules are violated, the first +// error encountered is returned, or nil if there are no violations. func (m *PiecePacket) Validate() error { + return m.validate(false) +} + +// ValidateAll checks the field values on PiecePacket with the rules defined in +// the proto definition for this message. If any rules are violated, the +// result is a list of violation errors wrapped in PiecePacketMultiError, or +// nil if none found. +func (m *PiecePacket) ValidateAll() error { + return m.validate(true) +} + +func (m *PiecePacket) validate(all bool) error { if m == nil { return nil } + var errors []error + // no validation rules for TaskId // no validation rules for DstPid @@ -414,7 +596,26 @@ func (m *PiecePacket) Validate() error { for idx, item := range m.GetPieceInfos() { _, _ = idx, item - if v, ok := interface{}(item).(interface{ Validate() error }); ok { + if all { + switch v := interface{}(item).(type) { + case interface{ ValidateAll() error }: + if err := v.ValidateAll(); err != nil { + errors = append(errors, PiecePacketValidationError{ + field: fmt.Sprintf("PieceInfos[%v]", idx), + reason: "embedded message failed validation", + cause: err, + }) + } + case interface{ Validate() error }: + if err := v.Validate(); err != nil { + errors = append(errors, PiecePacketValidationError{ + field: fmt.Sprintf("PieceInfos[%v]", idx), + reason: "embedded message failed validation", + cause: err, + }) + } + } + } else if v, ok := interface{}(item).(interface{ Validate() error }); ok { if err := v.Validate(); err != nil { return PiecePacketValidationError{ field: fmt.Sprintf("PieceInfos[%v]", idx), @@ -432,9 +633,28 @@ func (m *PiecePacket) Validate() error { // no validation rules for PieceMd5Sign + if len(errors) > 0 { + return PiecePacketMultiError(errors) + } return nil } +// PiecePacketMultiError is an error wrapping multiple validation errors +// returned by PiecePacket.ValidateAll() if the designated constraints aren't met. +type PiecePacketMultiError []error + +// Error returns a concatenation of all the error messages it wraps. +func (m PiecePacketMultiError) Error() string { + var msgs []string + for _, err := range m { + msgs = append(msgs, err.Error()) + } + return strings.Join(msgs, "; ") +} + +// AllErrors returns a list of validation violation errors. +func (m PiecePacketMultiError) AllErrors() []error { return m } + // PiecePacketValidationError is the validation error returned by // PiecePacket.Validate if the designated constraints aren't met. type PiecePacketValidationError struct { diff --git a/pkg/rpc/base/base.proto b/pkg/rpc/base/base.proto index 87a286637..88ddefcbe 100644 --- a/pkg/rpc/base/base.proto +++ b/pkg/rpc/base/base.proto @@ -22,6 +22,44 @@ option go_package = "d7y.io/dragonfly/v2/pkg/rpc/base"; enum Code{ X_UNSPECIFIED = 0; + // success code 200-299 + Success = 200; + // framework can not find server node + ServerUnavailable = 500; + + // common response error 1000-1999 + // client can be migrated to another scheduler/CDN + ResourceLacked = 1000; + BadRequest = 1400; + PeerTaskNotFound = 1404; + UnknownError = 1500; + RequestTimeOut = 1504; + + // client response error 4000-4999 + ClientError = 4000; + ClientPieceRequestFail = 4001; // get piece task from other peer error + ClientScheduleTimeout = 4002; // wait scheduler response timeout + ClientContextCanceled = 4003; + ClientWaitPieceReady = 4004; // when target peer downloads from source slowly, should wait + ClientPieceDownloadFail = 4005; + ClientRequestLimitFail = 4006; + + // scheduler response error 5000-5999 + SchedError = 5000; + SchedNeedBackSource = 5001; // client should try to download from source + SchedPeerGone = 5002; // client should disconnect from scheduler + SchedPeerNotFound = 5004; // peer not found in scheduler + SchedPeerPieceResultReportFail = 5005; // report piece + SchedTaskStatusError = 5006; // task status is fail + + // cdnsystem response error 6000-6999 + CDNError = 6000; + CDNTaskRegistryFail = 6001; + CDNTaskDownloadFail = 6002; + CDNTaskNotFound = 6404; + + // manager response error 7000-7999 + InvalidResourceType = 7001; } enum PieceStyle{ diff --git a/pkg/rpc/base/common/common.go b/pkg/rpc/base/common/common.go index b7845e230..73b055535 100644 --- a/pkg/rpc/base/common/common.go +++ b/pkg/rpc/base/common/common.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/pkg/rpc/base" ) @@ -53,11 +52,11 @@ func NewResWithErr(ptr interface{}, err error) interface{} { var code base.Code switch st.Code() { case codes.DeadlineExceeded: - code = dfcodes.RequestTimeOut + code = base.Code_RequestTimeOut case codes.OK: - code = dfcodes.Success + code = base.Code_Success default: - code = dfcodes.UnknownError + code = base.Code_UnknownError } return NewResWithCodeAndMsg(ptr, code, st.Message()) } diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 37c041a64..cef82be6d 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -30,10 +30,10 @@ import ( "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/util/sets" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/basic/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc/base" ) const ( @@ -372,7 +372,7 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str } // TODO recover findCandidateClientConn error if e, ok := cause.(*dferrors.DfError); ok { - if e.Code != dfcodes.ResourceLacked { + if e.Code != base.Code_ResourceLacked { return "", cause } } diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index c2d9d7b03..d6bf28b84 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -25,7 +25,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" @@ -199,7 +198,7 @@ func (sc *schedulerClient) retryReportPeerResult(ctx context.Context, pr *schedu var client scheduler.SchedulerClient client, schedulerNode, err = sc.getSchedulerClient(pr.TaskId, true) if err != nil { - code = dfcodes.ServerUnavailable + code = base.Code_ServerUnavailable return nil, err } return client.ReportPeerResult(ctx, pr, opts...) diff --git a/scheduler/core/events.go b/scheduler/core/events.go index 2b6103abf..0eed41562 100644 --- a/scheduler/core/events.go +++ b/scheduler/core/events.go @@ -24,10 +24,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/container/list" + "d7y.io/dragonfly/v2/pkg/rpc/base" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core/scheduler" @@ -72,7 +72,7 @@ func (e reScheduleParentEvent) apply(s *state) { rsPeer.times = rsPeer.times + 1 peer := rsPeer.peer if peer.Task.IsFail() { - if err := peer.CloseChannelWithError(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { + if err := peer.CloseChannelWithError(dferrors.New(base.Code_SchedTaskStatusError, "schedule task status failed")); err != nil { logger.WithTaskAndPeerID(peer.Task.ID, peer.ID).Warnf("close peer channel failed: %v", err) } return @@ -88,7 +88,7 @@ func (e reScheduleParentEvent) apply(s *state) { parent, candidates, hasParent := s.sched.ScheduleParent(peer, blankParents) if !hasParent { if peer.Task.CanBackToSource() && !peer.Task.ContainsBackToSourcePeer(peer.ID) { - if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", peer.ID)) == nil { + if peer.CloseChannelWithError(dferrors.Newf(base.Code_SchedNeedBackSource, "peer %s need back source", peer.ID)) == nil { peer.Task.AddBackToSourcePeer(peer.ID) } return @@ -134,7 +134,7 @@ func (e startReportPieceResultEvent) apply(s *state) { if !hasParent { if e.peer.Task.CanBackToSource() && !e.peer.Task.ContainsBackToSourcePeer(e.peer.ID) { span.SetAttributes(config.AttributeClientBackSource.Bool(true)) - if e.peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source", e.peer.ID)) == nil { + if e.peer.CloseChannelWithError(dferrors.Newf(base.Code_SchedNeedBackSource, "peer %s need back source", e.peer.ID)) == nil { e.peer.Task.AddBackToSourcePeer(e.peer.ID) } logger.WithTaskAndPeerID(e.peer.Task.ID, @@ -226,11 +226,11 @@ func (e peerDownloadPieceFailEvent) apply(s *state) { return } switch e.pr.Code { - case dfcodes.ClientWaitPieceReady: + case base.Code_ClientWaitPieceReady: return - case dfcodes.PeerTaskNotFound: + case base.Code_PeerTaskNotFound: s.peerManager.Delete(e.pr.DstPid) - case dfcodes.CdnTaskNotFound, dfcodes.CdnError, dfcodes.CdnTaskDownloadFail: + case base.Code_CDNTaskNotFound, base.Code_CDNError, base.Code_CDNTaskDownloadFail: s.peerManager.Delete(e.pr.DstPid) go func() { if _, err := s.cdn.StartSeedTask(e.ctx, e.peer.Task); err != nil { @@ -370,7 +370,7 @@ func constructSuccessPeerPacket(peer *supervisor.Peer, parent *supervisor.Peer, ParallelCount: 1, MainPeer: mainPeer, StealPeers: stealPeers, - Code: dfcodes.Success, + Code: base.Code_Success, } logger.Debugf("send peerPacket %+v to peer %s", peerPacket, peer.ID) return peerPacket @@ -386,7 +386,7 @@ func handleCDNSeedTaskFail(task *supervisor.Task) { if task.CanBackToSource() { if !task.ContainsBackToSourcePeer(peer.ID) { - if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.ID)) == nil { + if peer.CloseChannelWithError(dferrors.Newf(base.Code_SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.ID)) == nil { task.AddBackToSourcePeer(peer.ID) } } @@ -403,7 +403,7 @@ func handleCDNSeedTaskFail(task *supervisor.Task) { return true } - if err := peer.CloseChannelWithError(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { + if err := peer.CloseChannelWithError(dferrors.New(base.Code_SchedTaskStatusError, "schedule task status failed")); err != nil { peer.Log().Warnf("close peer conn channel failed: %v", err) } return true diff --git a/scheduler/core/service.go b/scheduler/core/service.go index 7b92e154b..c5c7f846b 100644 --- a/scheduler/core/service.go +++ b/scheduler/core/service.go @@ -28,10 +28,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/gc" + "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/base/common" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/synclock" @@ -171,7 +171,7 @@ func (s *SchedulerService) runReScheduleParentLoop(wsdq workqueue.DelayingInterf peer := rsPeer.peer wsdq.Done(v) if rsPeer.times > maxRescheduleTimes { - if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "reschedule parent for peer %s already reaches max reschedule times", + if peer.CloseChannelWithError(dferrors.Newf(base.Code_SchedNeedBackSource, "reschedule parent for peer %s already reaches max reschedule times", peer.ID)) == nil { peer.Task.AddBackToSourcePeer(peer.ID) } @@ -327,7 +327,7 @@ func (s *SchedulerService) HandlePieceResult(ctx context.Context, peer *supervis pr: pieceResult, }) return nil - } else if pieceResult.Code != dfcodes.Success { + } else if pieceResult.Code != base.Code_Success { s.worker.send(peerDownloadPieceFailEvent{ ctx: ctx, peer: peer, diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index 35d5033b7..ffa5de568 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" @@ -67,7 +66,7 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa logger.Debugf("register peer task, req: %+v", request) resp = new(scheduler.RegisterResult) if verifyErr := validateParams(request); verifyErr != nil { - err = dferrors.Newf(dfcodes.BadRequest, "bad request param: %v", verifyErr) + err = dferrors.Newf(base.Code_BadRequest, "bad request param: %v", verifyErr) logger.Errorf("register request: %v", err) span.RecordError(err) return @@ -77,7 +76,7 @@ func (s *server) RegisterPeerTask(ctx context.Context, request *scheduler.PeerTa span.SetAttributes(config.AttributeTaskID.String(taskID)) task := s.service.GetOrCreateTask(ctx, supervisor.NewTask(taskID, request.Url, request.UrlMeta)) if task.IsFail() { - err = dferrors.New(dfcodes.SchedTaskStatusError, "task status is fail") + err = dferrors.New(base.Code_SchedTaskStatusError, "task status is fail") logger.Errorf("task %s status is fail", task.ID) span.RecordError(err) return @@ -133,7 +132,7 @@ func (s *server) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultS if err == io.EOF { return nil } - err = dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "receive an error from peer stream: %v", err) + err = dferrors.Newf(base.Code_SchedPeerPieceResultReportFail, "receive an error from peer stream: %v", err) span.RecordError(err) return err } @@ -141,20 +140,20 @@ func (s *server) ReportPieceResult(stream scheduler.Scheduler_ReportPieceResultS peer, ok := s.service.GetPeer(pieceResult.SrcPid) if !ok { - err = dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", pieceResult.SrcPid) + err = dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", pieceResult.SrcPid) span.RecordError(err) return err } if peer.Task.IsFail() { - err = dferrors.Newf(dfcodes.SchedTaskStatusError, "peer's task status is fail, task status %s", peer.Task.GetStatus()) + err = dferrors.Newf(base.Code_SchedTaskStatusError, "peer's task status is fail, task status %s", peer.Task.GetStatus()) span.RecordError(err) return err } conn, ok := peer.BindNewConn(stream) if !ok { - err = dferrors.Newf(dfcodes.SchedPeerPieceResultReportFail, "peer can not bind conn") + err = dferrors.Newf(base.Code_SchedPeerPieceResultReportFail, "peer can not bind conn") span.RecordError(err) return err } @@ -194,7 +193,7 @@ func (s *server) ReportPeerResult(ctx context.Context, result *scheduler.PeerRes peer, ok := s.service.GetPeer(result.PeerId) if !ok { logger.Warnf("report peer result: peer %s is not exists", result.PeerId) - err = dferrors.Newf(dfcodes.SchedPeerNotFound, "peer %s not found", result.PeerId) + err = dferrors.Newf(base.Code_SchedPeerNotFound, "peer %s not found", result.PeerId) span.RecordError(err) return err } diff --git a/scheduler/supervisor/cdn.go b/scheduler/supervisor/cdn.go index ab8d35f63..e690f3bf6 100644 --- a/scheduler/supervisor/cdn.go +++ b/scheduler/supervisor/cdn.go @@ -31,11 +31,11 @@ import ( "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" + "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem" cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" "d7y.io/dragonfly/v2/scheduler/config" @@ -110,9 +110,9 @@ func (c *cdn) StartSeedTask(ctx context.Context, task *Task) (*Peer, error) { if cdnErr, ok := err.(*dferrors.DfError); ok { logger.Errorf("failed to obtain cdn seed: %v", cdnErr) switch cdnErr.Code { - case dfcodes.CdnTaskRegistryFail: + case base.Code_CDNTaskRegistryFail: return nil, errors.Wrap(ErrCDNRegisterFail, "obtain seeds") - case dfcodes.CdnTaskDownloadFail: + case base.Code_CDNTaskDownloadFail: return nil, errors.Wrapf(ErrCDNDownloadFail, "obtain seeds") default: return nil, errors.Wrapf(ErrCDNUnknown, "obtain seeds") @@ -145,9 +145,9 @@ func (c *cdn) receivePiece(ctx context.Context, task *Task, stream *cdnclient.Pi logger.Errorf("task %s add piece err %v", task.ID, err) if recvErr, ok := err.(*dferrors.DfError); ok { switch recvErr.Code { - case dfcodes.CdnTaskRegistryFail: + case base.Code_CDNTaskRegistryFail: return nil, errors.Wrapf(ErrCDNRegisterFail, "receive piece") - case dfcodes.CdnTaskDownloadFail: + case base.Code_CDNTaskDownloadFail: return nil, errors.Wrapf(ErrCDNDownloadFail, "receive piece") default: return nil, errors.Wrapf(ErrCDNUnknown, "recive piece") diff --git a/scheduler/supervisor/cdn_test.go b/scheduler/supervisor/cdn_test.go index 927602b22..8c283be53 100644 --- a/scheduler/supervisor/cdn_test.go +++ b/scheduler/supervisor/cdn_test.go @@ -31,7 +31,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" - "d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" @@ -102,13 +101,13 @@ func TestCDN_Initial(t *testing.T) { expect func(t *testing.T, cdn supervisor.CDN, peer *supervisor.Peer, err error) }{ { - name: "ObtainSeeds cause CdnTaskRegistryFail", + name: "ObtainSeeds cause CDNTaskRegistryFail", status: supervisor.TaskStatusWaiting, mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { ctl := gomock.NewController(t) defer ctl.Finish() - err := dferrors.New(dfcodes.CdnTaskRegistryFail, "mockError") + err := dferrors.New(base.Code_CDNTaskRegistryFail, "mockError") mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) mockPeerManager := mocks.NewMockPeerManager(ctl) mockHostManager := mocks.NewMockHostManager(ctl) @@ -124,13 +123,13 @@ func TestCDN_Initial(t *testing.T) { }, }, { - name: "ObtainSeeds cause CdnTaskDownloadFail", + name: "ObtainSeeds cause CDNTaskDownloadFail", status: supervisor.TaskStatusWaiting, mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { ctl := gomock.NewController(t) defer ctl.Finish() - err := dferrors.New(dfcodes.CdnTaskDownloadFail, "mockError") + err := dferrors.New(base.Code_CDNTaskDownloadFail, "mockError") mockCDNDynmaicClient := mocks.NewMockCDNDynmaicClient(ctl) mockPeerManager := mocks.NewMockPeerManager(ctl) mockHostManager := mocks.NewMockHostManager(ctl) @@ -265,7 +264,7 @@ func TestCDN_Initial(t *testing.T) { }, }, { - name: "receivePiece cause CdnTaskRegistryFail", + name: "receivePiece cause CDNTaskRegistryFail", status: supervisor.TaskStatusWaiting, mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { ctl := gomock.NewController(t) @@ -276,7 +275,7 @@ func TestCDN_Initial(t *testing.T) { mockHostManager := mocks.NewMockHostManager(ctl) mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() - err := dferrors.New(dfcodes.CdnTaskRegistryFail, "mockError") + err := dferrors.New(base.Code_CDNTaskRegistryFail, "mockError") streamRet := []gomonkey.OutputCell{ {Values: gomonkey.Params{nil, err}}, } @@ -290,7 +289,7 @@ func TestCDN_Initial(t *testing.T) { }, }, { - name: "receivePiece cause CdnTaskDownloadFail", + name: "receivePiece cause CDNTaskDownloadFail", status: supervisor.TaskStatusWaiting, mock: func(t *testing.T) (supervisor.CDNDynmaicClient, supervisor.PeerManager, supervisor.HostManager, *gomonkey.Patches) { ctl := gomock.NewController(t) @@ -301,7 +300,7 @@ func TestCDN_Initial(t *testing.T) { mockHostManager := mocks.NewMockHostManager(ctl) mockCDNDynmaicClient.EXPECT().ObtainSeeds(gomock.Any(), gomock.Any()).Return(mockPieceSeedStream, nil).AnyTimes() - err := dferrors.New(dfcodes.CdnTaskDownloadFail, "mockError") + err := dferrors.New(base.Code_CDNTaskDownloadFail, "mockError") streamRet := []gomonkey.OutputCell{ {Values: gomonkey.Params{nil, err}}, }