From 5763ee86bda1bb44e3927b2576dbaab63812bf40 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 30 May 2022 17:47:15 +0800 Subject: [PATCH] feat: add seed peer metrics (#1342) Signed-off-by: Gaius --- client/daemon/metrics/metrics.go | 40 ++++++++++++++--- client/daemon/peer/peertask_manager.go | 13 ++++-- .../daemon/peer/peertask_manager_mock_test.go | 7 +-- client/daemon/peer/peertask_manager_test.go | 2 +- client/daemon/peer/peertask_seed.go | 2 - client/daemon/rpcserver/seeder.go | 43 ++++++++++++++----- client/daemon/rpcserver/seeder_test.go | 4 +- .../daemon/test/mock/peer/peertask_manager.go | 7 +-- 8 files changed, 87 insertions(+), 31 deletions(-) diff --git a/client/daemon/metrics/metrics.go b/client/daemon/metrics/metrics.go index 1c0b18091..1a9580629 100644 --- a/client/daemon/metrics/metrics.go +++ b/client/daemon/metrics/metrics.go @@ -27,8 +27,17 @@ import ( ) const ( - FailTypeP2P = "p2p" + // Failed download task type is P2P + FailTypeP2P = "p2p" + + // Failed download task type is source FailTypeBackSource = "source" + + // SeedPeerDownload type is p2p + SeedPeerDownloadTypeP2P = "p2p" + + // SeedPeerDownload type is back-to-source + SeedPeerDownloadTypeBackToSource = "back_to_source" ) var ( @@ -109,11 +118,32 @@ var ( Help: "Counter of the total stream tasks.", }) - SeedTaskCount = promauto.NewCounter(prometheus.CounterOpts{ + SeedPeerDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: constants.MetricsNamespace, - Subsystem: constants.DfdaemonMetricsName, - Name: "seed_task_total", - Help: "Counter of the total seed tasks.", + Subsystem: constants.CDNMetricsName, + Name: "seed_peer_download_total", + Help: "Counter of the number of the seed peer downloading.", + }) + + SeedPeerDownloadFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "seed_peer_download_failure_total", + Help: "Counter of the number of failed of the seed peer 
downloading.", + }) + + SeedPeerDownloadTraffic = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "seed_peer_download_traffic", + Help: "Counter of the number of seed peer download traffic.", + }, []string{"type"}) + + SeedPeerConcurrentDownloadGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: constants.MetricsNamespace, + Subsystem: constants.CDNMetricsName, + Name: "seed_peer_concurrent_download_total", + Help: "Gauger of the number of concurrent of the seed peer downloading.", }) PeerTaskCacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index c4df765ae..df6f0f8e2 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -52,7 +52,7 @@ type TaskManager interface { readCloser io.ReadCloser, attribute map[string]string, err error) // StartSeedTask starts a seed peer task StartSeedTask(ctx context.Context, req *SeedTaskRequest) ( - seedTaskResult *SeedTaskResponse, err error) + seedTaskResult *SeedTaskResponse, reuse bool, err error) Subscribe(request *base.PieceTaskRequest) (*SubscribeResponse, bool) @@ -340,11 +340,11 @@ func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTask return readCloser, attribute, err } -func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, err error) { +func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, reuse bool, err error) { response, ok := ptm.tryReuseSeedPeerTask(ctx, req) if ok { metrics.PeerTaskCacheHitCount.Add(1) - return response, nil + return response, true, nil } var limit = rate.Inf @@ -355,7 +355,12 @@ func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequ limit = rate.Limit(req.Limit) } - return 
ptm.newSeedTask(ctx, req, limit) + response, err = ptm.newSeedTask(ctx, req, limit) + if err != nil { + return nil, false, err + } + + return response, false, nil } type SubscribeResponse struct { diff --git a/client/daemon/peer/peertask_manager_mock_test.go b/client/daemon/peer/peertask_manager_mock_test.go index 5f59abfeb..0e30edd36 100644 --- a/client/daemon/peer/peertask_manager_mock_test.go +++ b/client/daemon/peer/peertask_manager_mock_test.go @@ -99,12 +99,13 @@ func (mr *MockTaskManagerMockRecorder) StartFileTask(ctx, req interface{}) *gomo } // StartSeedTask mocks base method. -func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (*SeedTaskResponse, error) { +func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (*SeedTaskResponse, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartSeedTask", ctx, req) ret0, _ := ret[0].(*SeedTaskResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // StartSeedTask indicates an expected call of StartSeedTask.
diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 3eabaeca1..39b9caece 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -702,7 +702,7 @@ func (ts *testSpec) runStreamTaskTest(_ *testifyassert.Assertions, require *test } func (ts *testSpec) runSeedTaskTest(_ *testifyassert.Assertions, require *testifyrequire.Assertions, mm *mockManager, urlMeta *base.UrlMeta) { - r, err := mm.peerTaskManager.StartSeedTask( + r, _, err := mm.peerTaskManager.StartSeedTask( context.Background(), &SeedTaskRequest{ PeerTaskRequest: scheduler.PeerTaskRequest{ diff --git a/client/daemon/peer/peertask_seed.go b/client/daemon/peer/peertask_seed.go index afc24fe9d..d82d506fa 100644 --- a/client/daemon/peer/peertask_seed.go +++ b/client/daemon/peer/peertask_seed.go @@ -24,7 +24,6 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" ) @@ -62,7 +61,6 @@ func (ptm *peerTaskManager) newSeedTask( ctx context.Context, request *SeedTaskRequest, limit rate.Limit) (*SeedTaskResponse, error) { - metrics.SeedTaskCount.Add(1) taskID := idgen.TaskID(request.Url, request.UrlMeta) ptc, err := ptm.getPeerTaskConductor(ctx, taskID, &request.PeerTaskRequest, limit, nil, request.Range, "", true) diff --git a/client/daemon/rpcserver/seeder.go b/client/daemon/rpcserver/seeder.go index 53419efe7..8ca68ca97 100644 --- a/client/daemon/rpcserver/seeder.go +++ b/client/daemon/rpcserver/seeder.go @@ -27,6 +27,7 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/peer" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/idgen" @@ -50,6 +51,10 @@ func (s *seeder) SyncPieceTasks(tasksServer 
cdnsystem.Seeder_SyncPieceTasksServe } func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdnsystem.Seeder_ObtainSeedsServer) error { + metrics.SeedPeerConcurrentDownloadGauge.Inc() + defer metrics.SeedPeerConcurrentDownloadGauge.Dec() + metrics.SeedPeerDownloadCount.Add(1) + s.server.Keep() if seedRequest.UrlMeta == nil { seedRequest.UrlMeta = &base.UrlMeta{} @@ -74,6 +79,7 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdn if len(req.UrlMeta.Range) > 0 { r, err := rangeutils.ParseRange(req.UrlMeta.Range, math.MaxInt) if err != nil { + metrics.SeedPeerDownloadFailureCount.Add(1) err = fmt.Errorf("parse range %s error: %s", req.UrlMeta.Range, err) log.Errorf(err.Error()) return err @@ -84,19 +90,22 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdn } } - resp, err := s.server.peerTaskManager.StartSeedTask(seedsServer.Context(), &req) + resp, reuse, err := s.server.peerTaskManager.StartSeedTask(seedsServer.Context(), &req) if err != nil { + metrics.SeedPeerDownloadFailureCount.Add(1) log.Errorf("start seed task error: %s", err.Error()) return err } if resp.SubscribeResponse.Storage == nil { + metrics.SeedPeerDownloadFailureCount.Add(1) err = fmt.Errorf("invalid SubscribeResponse.Storage") log.Errorf("%s", err.Error()) return err } if resp.SubscribeResponse.Success == nil && resp.SubscribeResponse.Fail == nil { + metrics.SeedPeerDownloadFailureCount.Add(1) err = fmt.Errorf("both of SubscribeResponse.Success and SubscribeResponse.Fail is nil") log.Errorf("%s", err.Error()) return err @@ -114,6 +123,7 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdn Done: false, }) if err != nil { + metrics.SeedPeerDownloadFailureCount.Add(1) resp.Span.RecordError(err) log.Errorf("send piece seed error: %s", err.Error()) return err @@ -128,7 +138,12 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystem.SeedRequest, seedsServer cdn } defer resp.Span.End() - 
return sync.sendPieceSeeds() + if err := sync.sendPieceSeeds(reuse); err != nil { + metrics.SeedPeerDownloadFailureCount.Add(1) + return err + } + + return nil } type seedSynchronizer struct { @@ -140,7 +155,7 @@ type seedSynchronizer struct { attributeSent bool } -func (s *seedSynchronizer) sendPieceSeeds() (err error) { +func (s *seedSynchronizer) sendPieceSeeds(reuse bool) (err error) { var ( ctx = s.Context desired int32 @@ -155,7 +170,7 @@ func (s *seedSynchronizer) sendPieceSeeds() (err error) { return err case <-s.Success: s.Infof("seed task success, send reminding piece seeds") - err = s.sendRemindingPieceSeeds(desired) + err = s.sendRemindingPieceSeeds(desired, reuse) if err != nil { s.Span.RecordError(err) s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) @@ -170,7 +185,7 @@ func (s *seedSynchronizer) sendPieceSeeds() (err error) { return status.Errorf(codes.Internal, "seed task failed") case p := <-s.PieceInfoChannel: s.Infof("receive piece info, num: %d, ordered num: %d, finish: %v", p.Num, p.OrderedNum, p.Finished) - desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished) + desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished, reuse) if err != nil { s.Span.RecordError(err) s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) @@ -185,7 +200,7 @@ func (s *seedSynchronizer) sendPieceSeeds() (err error) { } } -func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error { +func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) error { for { pp, err := s.Storage.GetPieces(s.Context, &base.PieceTaskRequest{ @@ -209,7 +224,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error { // we must send done to scheduler if len(pp.PieceInfos) == 0 { - ps := s.compositePieceSeed(pp, nil) + ps := s.compositePieceSeed(pp, nil, reuse) ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: 
%dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) err = s.seedsServer.Send(&ps) @@ -224,7 +239,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error { s.Errorf("desired piece %d, not found", desired) return status.Errorf(codes.Internal, "seed task piece %d not found", desired) } - ps := s.compositePieceSeed(pp, p) + ps := s.compositePieceSeed(pp, p, reuse) if p.PieceNum == pp.TotalPiece-1 { ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) @@ -246,7 +261,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32) error { } } -func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool) (int32, error) { +func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool, reuse bool) (int32, error) { cur := desired for ; cur <= orderedNum; cur++ { pp, err := s.Storage.GetPieces(s.Context, @@ -273,7 +288,7 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini s.attributeSent = true } - ps := s.compositePieceSeed(pp, pp.PieceInfos[0]) + ps := s.compositePieceSeed(pp, pp.PieceInfos[0], reuse) if cur == orderedNum && finished { ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) @@ -289,7 +304,13 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini return cur, nil } -func (s *seedSynchronizer) compositePieceSeed(pp *base.PiecePacket, piece *base.PieceInfo) cdnsystem.PieceSeed { +func (s *seedSynchronizer) compositePieceSeed(pp *base.PiecePacket, piece *base.PieceInfo, reuse bool) cdnsystem.PieceSeed { + seedPeerDownloadType := metrics.SeedPeerDownloadTypeBackToSource + if reuse { + seedPeerDownloadType = metrics.SeedPeerDownloadTypeP2P + } + 
metrics.SeedPeerDownloadTraffic.WithLabelValues(seedPeerDownloadType).Add(float64(pp.ContentLength)) + return cdnsystem.PieceSeed{ PeerId: s.seedTaskRequest.PeerId, HostId: s.seedTaskRequest.PeerHost.Id, diff --git a/client/daemon/rpcserver/seeder_test.go b/client/daemon/rpcserver/seeder_test.go index e5148243d..b222f4879 100644 --- a/client/daemon/rpcserver/seeder_test.go +++ b/client/daemon/rpcserver/seeder_test.go @@ -256,7 +256,7 @@ func Test_ObtainSeeds(t *testing.T) { }) mockTaskManager := mock_peer.NewMockTaskManager(ctrl) mockTaskManager.EXPECT().StartSeedTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, error) { + func(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, bool, error) { ch := make(chan *peer.PieceInfo) success := make(chan struct{}) fail := make(chan struct{}) @@ -309,7 +309,7 @@ func Test_ObtainSeeds(t *testing.T) { Context: ctx, Span: span, TaskID: "fake-task-id", - }, nil + }, false, nil }) s := &server{ diff --git a/client/daemon/test/mock/peer/peertask_manager.go b/client/daemon/test/mock/peer/peertask_manager.go index a12a96648..4958e7bfd 100644 --- a/client/daemon/test/mock/peer/peertask_manager.go +++ b/client/daemon/test/mock/peer/peertask_manager.go @@ -100,12 +100,13 @@ func (mr *MockTaskManagerMockRecorder) StartFileTask(ctx, req interface{}) *gomo } // StartSeedTask mocks base method. -func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, error) { +func (m *MockTaskManager) StartSeedTask(ctx context.Context, req *peer.SeedTaskRequest) (*peer.SeedTaskResponse, bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StartSeedTask", ctx, req) ret0, _ := ret[0].(*peer.SeedTaskResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 } // StartSeedTask indicates an expected call of StartSeedTask.