diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 602f63de6..08cd9fb43 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -101,84 +101,84 @@ var ( Subsystem: types.SchedulerMetricsName, Name: "register_peer_total", Help: "Counter of the number of the register peer.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) RegisterPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "register_peer_failure_total", Help: "Counter of the number of failed of the register peer.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_started_total", Help: "Counter of the number of the download peer started.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_started_failure_total", Help: "Counter of the number of failed of the download peer started.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerBackToSourceStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_back_to_source_started_total", Help: "Counter of the number of the download peer back-to-source started.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, 
[]string{"priority", "task_type", "host_type"}) DownloadPeerBackToSourceStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_back_to_source_started_failure_total", Help: "Counter of the number of failed of the download peer back-to-source started.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_finished_total", Help: "Counter of the number of the download peer.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_finished_failure_total", Help: "Counter of the number of failed of the download peer.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPeerBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_peer_back_to_source_finished_failure_total", Help: "Counter of the number of failed of the download peer back-to-source.", - }, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"priority", "task_type", "host_type"}) DownloadPieceCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_piece_finished_total", Help: "Counter of the number of the download piece.", - }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) + }, 
[]string{"traffic_type", "task_type", "host_type"}) DownloadPieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_piece_finished_failure_total", Help: "Counter of the number of failed of the download piece.", - }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"traffic_type", "task_type", "host_type"}) DownloadPieceBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "download_piece_back_to_source_finished_failure_total", Help: "Counter of the number of failed of the download piece back-to-source.", - }, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"traffic_type", "task_type", "host_type"}) StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, @@ -257,14 +257,14 @@ var ( Subsystem: types.SchedulerMetricsName, Name: "traffic", Help: "Counter of the number of traffic.", - }, []string{"type", "task_type", "task_tag", "task_app", "host_type"}) + }, []string{"type", "task_type", "host_type"}) HostTraffic = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "host_traffic", Help: "Counter of the number of per host traffic.", - }, []string{"type", "task_type", "task_tag", "task_app", "host_type", "host_id", "host_ip", "host_name"}) + }, []string{"type", "task_type", "host_type", "host_id", "host_ip", "host_name"}) DownloadPeerDuration = promauto.NewSummaryVec(prometheus.SummaryOpts{ Namespace: types.MetricsNamespace, @@ -281,6 +281,14 @@ var ( Help: "Gauge of the number of concurrent of the scheduling.", }) + ScheduleDuration = promauto.NewSummary(prometheus.SummaryOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: 
"schedule_duration_milliseconds", + Help: "Summary of the time each scheduling.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }) + VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index 1323eef83..989310b2c 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -213,7 +213,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) trafficType = commonv2.TrafficType_LOCAL_PEER } metrics.Traffic.WithLabelValues(trafficType.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize)) + peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize)) } // Handle end of piece. diff --git a/scheduler/rpcserver/scheduler_server_v1.go b/scheduler/rpcserver/scheduler_server_v1.go index 678f4eecc..61aa78430 100644 --- a/scheduler/rpcserver/scheduler_server_v1.go +++ b/scheduler/rpcserver/scheduler_server_v1.go @@ -59,19 +59,16 @@ func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *scheduler if req.TaskId == "" { req.TaskId = idgen.TaskIDV1(req.Url, req.UrlMeta) } - - tag := req.UrlMeta.Tag - application := req.UrlMeta.Application priority := req.UrlMeta.Priority // Collect RegisterPeerCount metrics. metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), - tag, application, types.HostTypeNormalName).Inc() + types.HostTypeNormalName).Inc() resp, err := s.service.RegisterPeerTask(ctx, req) if err != nil { // Collect RegisterPeerFailureCount metrics. 
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), - tag, application, types.HostTypeNormalName).Inc() + types.HostTypeNormalName).Inc() } return resp, err diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 955d013b8..0fb162645 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -252,10 +252,10 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer // Collect host traffic metrics. if v.config.Metrics.Enable && v.config.Metrics.EnableHost { - metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) if parent, loaded := v.resource.PeerManager().Load(piece.DstPid); loaded { - metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) } else if !resource.IsPieceBackToSource(piece.DstPid) { peer.Log.Warnf("dst peer %s not found", piece.DstPid) @@ -265,10 +265,10 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer // Collect traffic metrics. 
if !resource.IsPieceBackToSource(piece.DstPid) { metrics.Traffic.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) + peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) } else { metrics.Traffic.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) + peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) } continue } @@ -305,7 +305,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) // Collect DownloadPeerCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() parents := peer.Parents() if !req.GetSuccess() { @@ -313,7 +313,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) if peer.FSM.Is(resource.PeerStateBackToSource) { // Collect DownloadPeerBackToSourceFailureCount metrics. metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() go v.createDownloadRecord(peer, parents, req) v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil) @@ -323,7 +323,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) // Collect DownloadPeerFailureCount metrics. 
metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() go v.createDownloadRecord(peer, parents, req) v.handlePeerFailure(ctx, peer) @@ -1035,12 +1035,17 @@ func (v *V1) registerTinyTask(ctx context.Context, peer *resource.Peer) (*schedu // registerSmallTask registers the small task. func (v *V1) registerSmallTask(ctx context.Context, peer *resource.Peer) (*schedulerv1.RegisterResult, error) { + // Record the start time. + start := time.Now() candidateParents, found := v.scheduling.FindParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]()) if !found { return nil, errors.New("candidate parent not found") } candidateParent := candidateParents[0] + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) + // When task size scope is small, parent must be downloaded successfully // before returning to the parent directly. if !candidateParent.FSM.Is(resource.PeerStateSucceeded) { @@ -1147,7 +1152,12 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) { return } + // Record the start time. + start := time.Now() v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]()) + + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) default: } } @@ -1218,7 +1228,13 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece if !loaded { peer.Log.Errorf("parent %s not found", piece.DstPid) peer.BlockParents.Add(piece.DstPid) + + // Record the start time. + start := time.Now() v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) + + // Collect ScheduleDuration metrics. 
+ metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) return } @@ -1277,7 +1293,13 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece peer.Log.Infof("reschedule parent because of failed piece") peer.BlockParents.Add(parent.ID) + + // Record the start time. + start := time.Now() v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) + + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) } // handlePeerSuccess handles successful peer. @@ -1319,7 +1341,13 @@ func (v *V1) handlePeerFailure(ctx context.Context, peer *resource.Peer) { // Reschedule a new parent to children of peer to exclude the current failed peer. for _, child := range peer.Children() { child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) + + // Record the start time. + start := time.Now() v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) + + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) } } @@ -1334,7 +1362,13 @@ func (v *V1) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) { // Reschedule a new parent to children of peer to exclude the current failed peer. for _, child := range peer.Children() { child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) + + // Record the start time. + start := time.Now() v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) + + // Collect ScheduleDuration metrics. 
+ metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) } } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index e415466ea..39d20edc3 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -873,7 +873,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S // Collect RegisterPeerCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() blocklist := set.NewSafeSet[string]() blocklist.Add(peer.ID) @@ -904,7 +904,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { // Collect RegisterPeerFailureCount metrics. metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return err } } else { @@ -920,7 +920,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { // Collect RegisterPeerFailureCount metrics. metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } } else { @@ -960,13 +960,18 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S // Scheduling parent for the peer. peer.BlockParents.Add(peer.ID) + + // Record the start time. + start := time.Now() if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { // Collect RegisterPeerFailureCount metrics. 
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return status.Error(codes.FailedPrecondition, err.Error()) } + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) return nil default: return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope) @@ -983,14 +988,14 @@ func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string // Collect DownloadPeerStartedCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() // Handle peer with peer started request. if !peer.FSM.Is(resource.PeerStateRunning) { if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil { // Collect DownloadPeerStartedFailureCount metrics. metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } } @@ -1008,14 +1013,14 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p // Collect DownloadPeerBackToSourceStartedCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() // Handle peer with peer back-to-source started request. 
if !peer.FSM.Is(resource.PeerStateRunning) { if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { // Collect DownloadPeerBackToSourceStartedFailureCount metrics. 
metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, err.Error()) } } @@ -1035,10 +1040,14 @@ func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string, candida peer.BlockParents.Add(candidateParent.GetId()) } + // Record the start time. + start := time.Now() if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { return status.Error(codes.FailedPrecondition, err.Error()) } + // Collect ScheduleDuration metrics. + metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds())) return nil } @@ -1058,7 +1067,7 @@ func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID strin // Collect DownloadPeerCount and DownloadPeerDuration metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() // TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds())) @@ -1091,7 +1100,7 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context, // Collect DownloadPeerCount and DownloadPeerDuration metrics. 
priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() // TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds())) @@ -1116,9 +1125,9 @@ func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string) // Collect DownloadPeerCount and DownloadPeerFailureCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return nil } @@ -1146,9 +1155,9 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe // Collect DownloadPeerCount and DownloadPeerBackToSourceFailureCount metrics. priority := peer.CalculatePriority(v.dynconfig) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return nil } @@ -1201,14 +1210,14 @@ func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2. // Collect piece and traffic metrics. 
metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) + peer.Host.Type.Name()).Add(float64(piece.Length)) if v.config.Metrics.EnableHost { - metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) if loadedParent { - metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length)) } } @@ -1257,11 +1266,11 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, // Collect piece and traffic metrics. 
metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) + peer.Host.Type.Name()).Add(float64(piece.Length)) if v.config.Metrics.EnableHost { - metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, + metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) } @@ -1277,9 +1286,9 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string // Collect DownloadPieceCount and DownloadPieceFailureCount metrics. metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() if req.Temporary { // Handle peer with piece temporary failed request. @@ -1312,9 +1321,9 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p // Collect DownloadPieceCount and DownloadPieceFailureCount metrics. 
metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), - peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() + peer.Host.Type.Name()).Inc() return status.Error(codes.Internal, "download piece from source failed") }