feat: add ScheduleDuration for recording duration of the scheduling (#3444)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-08-19 13:59:12 +08:00 committed by GitHub
parent eaaeec4629
commit 2b2635c9c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 100 additions and 52 deletions

View File

@ -101,84 +101,84 @@ var (
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "register_peer_total", Name: "register_peer_total",
Help: "Counter of the number of the register peer.", Help: "Counter of the number of the register peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
RegisterPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ RegisterPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "register_peer_failure_total", Name: "register_peer_failure_total",
Help: "Counter of the number of failed of the register peer.", Help: "Counter of the number of failed of the register peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_started_total", Name: "download_peer_started_total",
Help: "Counter of the number of the download peer started.", Help: "Counter of the number of the download peer started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_started_failure_total", Name: "download_peer_started_failure_total",
Help: "Counter of the number of failed of the download peer started.", Help: "Counter of the number of failed of the download peer started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerBackToSourceStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerBackToSourceStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_back_to_source_started_total", Name: "download_peer_back_to_source_started_total",
Help: "Counter of the number of the download peer back-to-source started.", Help: "Counter of the number of the download peer back-to-source started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerBackToSourceStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerBackToSourceStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_back_to_source_started_failure_total", Name: "download_peer_back_to_source_started_failure_total",
Help: "Counter of the number of failed of the download peer back-to-source started.", Help: "Counter of the number of failed of the download peer back-to-source started.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_finished_total", Name: "download_peer_finished_total",
Help: "Counter of the number of the download peer.", Help: "Counter of the number of the download peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_finished_failure_total", Name: "download_peer_finished_failure_total",
Help: "Counter of the number of failed of the download peer.", Help: "Counter of the number of failed of the download peer.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPeerBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPeerBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_peer_back_to_source_finished_failure_total", Name: "download_peer_back_to_source_finished_failure_total",
Help: "Counter of the number of failed of the download peer back-to-source.", Help: "Counter of the number of failed of the download peer back-to-source.",
}, []string{"priority", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"priority", "task_type", "host_type"})
DownloadPieceCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPieceCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_piece_finished_total", Name: "download_piece_finished_total",
Help: "Counter of the number of the download piece.", Help: "Counter of the number of the download piece.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"traffic_type", "task_type", "host_type"})
DownloadPieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_piece_finished_failure_total", Name: "download_piece_finished_failure_total",
Help: "Counter of the number of failed of the download piece.", Help: "Counter of the number of failed of the download piece.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"traffic_type", "task_type", "host_type"})
DownloadPieceBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ DownloadPieceBackToSourceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "download_piece_back_to_source_finished_failure_total", Name: "download_piece_back_to_source_finished_failure_total",
Help: "Counter of the number of failed of the download piece back-to-source.", Help: "Counter of the number of failed of the download piece back-to-source.",
}, []string{"traffic_type", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"traffic_type", "task_type", "host_type"})
StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{ StatTaskCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
@ -257,14 +257,14 @@ var (
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "traffic", Name: "traffic",
Help: "Counter of the number of traffic.", Help: "Counter of the number of traffic.",
}, []string{"type", "task_type", "task_tag", "task_app", "host_type"}) }, []string{"type", "task_type", "host_type"})
HostTraffic = promauto.NewCounterVec(prometheus.CounterOpts{ HostTraffic = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,
Name: "host_traffic", Name: "host_traffic",
Help: "Counter of the number of per host traffic.", Help: "Counter of the number of per host traffic.",
}, []string{"type", "task_type", "task_tag", "task_app", "host_type", "host_id", "host_ip", "host_name"}) }, []string{"type", "task_type", "host_type", "host_id", "host_ip", "host_name"})
DownloadPeerDuration = promauto.NewSummaryVec(prometheus.SummaryOpts{ DownloadPeerDuration = promauto.NewSummaryVec(prometheus.SummaryOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
@ -281,6 +281,14 @@ var (
Help: "Gauge of the number of concurrent of the scheduling.", Help: "Gauge of the number of concurrent of the scheduling.",
}) })
ScheduleDuration = promauto.NewSummary(prometheus.SummaryOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "schedule_duration_milliseconds",
Help: "Summary of the time each scheduling.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
})
VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace, Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName, Subsystem: types.SchedulerMetricsName,

View File

@ -213,7 +213,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task)
trafficType = commonv2.TrafficType_LOCAL_PEER trafficType = commonv2.TrafficType_LOCAL_PEER
} }
metrics.Traffic.WithLabelValues(trafficType.String(), peer.Task.Type.String(), metrics.Traffic.WithLabelValues(trafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize)) peer.Host.Type.Name()).Add(float64(pieceSeed.PieceInfo.RangeSize))
} }
// Handle end of piece. // Handle end of piece.

View File

@ -59,19 +59,16 @@ func (s *schedulerServerV1) RegisterPeerTask(ctx context.Context, req *scheduler
if req.TaskId == "" { if req.TaskId == "" {
req.TaskId = idgen.TaskIDV1(req.Url, req.UrlMeta) req.TaskId = idgen.TaskIDV1(req.Url, req.UrlMeta)
} }
tag := req.UrlMeta.Tag
application := req.UrlMeta.Application
priority := req.UrlMeta.Priority priority := req.UrlMeta.Priority
// Collect RegisterPeerCount metrics. // Collect RegisterPeerCount metrics.
metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), metrics.RegisterPeerCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
tag, application, types.HostTypeNormalName).Inc() types.HostTypeNormalName).Inc()
resp, err := s.service.RegisterPeerTask(ctx, req) resp, err := s.service.RegisterPeerTask(ctx, req)
if err != nil { if err != nil {
// Collect RegisterPeerFailureCount metrics. // Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(), metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), commonv1.TaskType_Normal.String(),
tag, application, types.HostTypeNormalName).Inc() types.HostTypeNormalName).Inc()
} }
return resp, err return resp, err

View File

@ -252,10 +252,10 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer
// Collect host traffic metrics. // Collect host traffic metrics.
if v.config.Metrics.Enable && v.config.Metrics.EnableHost { if v.config.Metrics.Enable && v.config.Metrics.EnableHost {
metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(),
peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize))
if parent, loaded := v.resource.PeerManager().Load(piece.DstPid); loaded { if parent, loaded := v.resource.PeerManager().Load(piece.DstPid); loaded {
metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(),
parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize)) parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.PieceInfo.RangeSize))
} else if !resource.IsPieceBackToSource(piece.DstPid) { } else if !resource.IsPieceBackToSource(piece.DstPid) {
peer.Log.Warnf("dst peer %s not found", piece.DstPid) peer.Log.Warnf("dst peer %s not found", piece.DstPid)
@ -265,10 +265,10 @@ func (v *V1) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResultSer
// Collect traffic metrics. // Collect traffic metrics.
if !resource.IsPieceBackToSource(piece.DstPid) { if !resource.IsPieceBackToSource(piece.DstPid) {
metrics.Traffic.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), metrics.Traffic.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize))
} else { } else {
metrics.Traffic.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), metrics.Traffic.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize)) peer.Host.Type.Name()).Add(float64(piece.PieceInfo.RangeSize))
} }
continue continue
} }
@ -305,7 +305,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
// Collect DownloadPeerCount metrics. // Collect DownloadPeerCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
parents := peer.Parents() parents := peer.Parents()
if !req.GetSuccess() { if !req.GetSuccess() {
@ -313,7 +313,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
if peer.FSM.Is(resource.PeerStateBackToSource) { if peer.FSM.Is(resource.PeerStateBackToSource) {
// Collect DownloadPeerBackToSourceFailureCount metrics. // Collect DownloadPeerBackToSourceFailureCount metrics.
metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
go v.createDownloadRecord(peer, parents, req) go v.createDownloadRecord(peer, parents, req)
v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil) v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil)
@ -323,7 +323,7 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
// Collect DownloadPeerFailureCount metrics. // Collect DownloadPeerFailureCount metrics.
metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
go v.createDownloadRecord(peer, parents, req) go v.createDownloadRecord(peer, parents, req)
v.handlePeerFailure(ctx, peer) v.handlePeerFailure(ctx, peer)
@ -1035,12 +1035,17 @@ func (v *V1) registerTinyTask(ctx context.Context, peer *resource.Peer) (*schedu
// registerSmallTask registers the small task. // registerSmallTask registers the small task.
func (v *V1) registerSmallTask(ctx context.Context, peer *resource.Peer) (*schedulerv1.RegisterResult, error) { func (v *V1) registerSmallTask(ctx context.Context, peer *resource.Peer) (*schedulerv1.RegisterResult, error) {
// Record the start time.
start := time.Now()
candidateParents, found := v.scheduling.FindParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]()) candidateParents, found := v.scheduling.FindParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]())
if !found { if !found {
return nil, errors.New("candidate parent not found") return nil, errors.New("candidate parent not found")
} }
candidateParent := candidateParents[0] candidateParent := candidateParents[0]
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
// When task size scope is small, parent must be downloaded successfully // When task size scope is small, parent must be downloaded successfully
// before returning to the parent directly. // before returning to the parent directly.
if !candidateParent.FSM.Is(resource.PeerStateSucceeded) { if !candidateParent.FSM.Is(resource.PeerStateSucceeded) {
@ -1147,7 +1152,12 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
return return
} }
// Record the start time.
start := time.Now()
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]()) v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, set.NewSafeSet[string]())
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
default: default:
} }
} }
@ -1218,7 +1228,13 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece
if !loaded { if !loaded {
peer.Log.Errorf("parent %s not found", piece.DstPid) peer.Log.Errorf("parent %s not found", piece.DstPid)
peer.BlockParents.Add(piece.DstPid) peer.BlockParents.Add(piece.DstPid)
// Record the start time.
start := time.Now()
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents)
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
return return
} }
@ -1277,7 +1293,13 @@ func (v *V1) handlePieceFailure(ctx context.Context, peer *resource.Peer, piece
peer.Log.Infof("reschedule parent because of failed piece") peer.Log.Infof("reschedule parent because of failed piece")
peer.BlockParents.Add(parent.ID) peer.BlockParents.Add(parent.ID)
// Record the start time.
start := time.Now()
v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents) v.scheduling.ScheduleParentAndCandidateParents(ctx, peer, peer.BlockParents)
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
} }
// handlePeerSuccess handles successful peer. // handlePeerSuccess handles successful peer.
@ -1319,7 +1341,13 @@ func (v *V1) handlePeerFailure(ctx context.Context, peer *resource.Peer) {
// Reschedule a new parent to children of peer to exclude the current failed peer. // Reschedule a new parent to children of peer to exclude the current failed peer.
for _, child := range peer.Children() { for _, child := range peer.Children() {
child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID)
// Record the start time.
start := time.Now()
v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents)
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
} }
} }
@ -1334,7 +1362,13 @@ func (v *V1) handleLegacySeedPeer(ctx context.Context, peer *resource.Peer) {
// Reschedule a new parent to children of peer to exclude the current failed peer. // Reschedule a new parent to children of peer to exclude the current failed peer.
for _, child := range peer.Children() { for _, child := range peer.Children() {
child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID) child.Log.Infof("reschedule parent because of parent peer %s is failed", peer.ID)
// Record the start time.
start := time.Now()
v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents) v.scheduling.ScheduleParentAndCandidateParents(ctx, child, child.BlockParents)
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
} }
} }

View File

@ -873,7 +873,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// Collect RegisterPeerCount metrics. // Collect RegisterPeerCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.RegisterPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
blocklist := set.NewSafeSet[string]() blocklist := set.NewSafeSet[string]()
blocklist.Add(peer.ID) blocklist.Add(peer.ID)
@ -904,7 +904,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil { if err := v.downloadTaskBySeedPeer(ctx, taskID, download, peer); err != nil {
// Collect RegisterPeerFailureCount metrics. // Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return err return err
} }
} else { } else {
@ -920,7 +920,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
// Collect RegisterPeerFailureCount metrics. // Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
} else { } else {
@ -960,13 +960,18 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// Scheduling parent for the peer. // Scheduling parent for the peer.
peer.BlockParents.Add(peer.ID) peer.BlockParents.Add(peer.ID)
// Record the start time.
start := time.Now()
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
// Collect RegisterPeerFailureCount metrics. // Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
} }
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
return nil return nil
default: default:
return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope) return status.Errorf(codes.FailedPrecondition, "invalid size cope %#v", sizeScope)
@ -983,14 +988,14 @@ func (v *V2) handleDownloadPeerStartedRequest(ctx context.Context, peerID string
// Collect DownloadPeerStartedCount metrics. // Collect DownloadPeerStartedCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
// Handle peer with peer started request. // Handle peer with peer started request.
if !peer.FSM.Is(resource.PeerStateRunning) { if !peer.FSM.Is(resource.PeerStateRunning) {
if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil { if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
// Collect DownloadPeerStartedFailureCount metrics. // Collect DownloadPeerStartedFailureCount metrics.
metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
} }
@ -1008,14 +1013,14 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
// Collect DownloadPeerBackToSourceStartedCount metrics. // Collect DownloadPeerBackToSourceStartedCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerBackToSourceStartedCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
// Handle peer with peer back-to-source started request. // Handle peer with peer back-to-source started request.
if !peer.FSM.Is(resource.PeerStateRunning) { if !peer.FSM.Is(resource.PeerStateRunning) {
if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil { if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
// Collect DownloadPeerBackToSourceStartedFailureCount metrics. // Collect DownloadPeerBackToSourceStartedFailureCount metrics.
metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
} }
@ -1035,10 +1040,14 @@ func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string, candida
peer.BlockParents.Add(candidateParent.GetId()) peer.BlockParents.Add(candidateParent.GetId())
} }
// Record the start time.
start := time.Now()
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
} }
// Collect SchedulingDuration metrics.
metrics.ScheduleDuration.Observe(float64(time.Since(start).Milliseconds()))
return nil return nil
} }
@ -1058,7 +1067,7 @@ func (v *V2) handleDownloadPeerFinishedRequest(ctx context.Context, peerID strin
// Collect DownloadPeerCount and DownloadPeerDuration metrics. // Collect DownloadPeerCount and DownloadPeerDuration metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead // TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead
metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds())) metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds()))
@ -1091,7 +1100,7 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context,
// Collect DownloadPeerCount and DownloadPeerDuration metrics. // Collect DownloadPeerCount and DownloadPeerDuration metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
// TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead // TODO to be determined which traffic type to use, temporarily use TrafficType_REMOTE_PEER instead
metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds())) metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(peer.Cost.Load().Milliseconds()))
@ -1116,9 +1125,9 @@ func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string)
// Collect DownloadPeerCount and DownloadPeerFailureCount metrics. // Collect DownloadPeerCount and DownloadPeerFailureCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return nil return nil
} }
@ -1146,9 +1155,9 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe
// Collect DownloadPeerCount and DownloadPeerBackToSourceFailureCount metrics. // Collect DownloadPeerCount and DownloadPeerBackToSourceFailureCount metrics.
priority := peer.CalculatePriority(v.dynconfig) priority := peer.CalculatePriority(v.dynconfig)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return nil return nil
} }
@ -1201,14 +1210,14 @@ func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2.
// Collect piece and traffic metrics. // Collect piece and traffic metrics.
metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) peer.Host.Type.Name()).Add(float64(piece.Length))
if v.config.Metrics.EnableHost { if v.config.Metrics.EnableHost {
metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(),
peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))
if loadedParent { if loadedParent {
metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, metrics.HostTraffic.WithLabelValues(metrics.HostTrafficUploadType, peer.Task.Type.String(),
parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length)) parent.Host.Type.Name(), parent.Host.ID, parent.Host.IP, parent.Host.Hostname).Add(float64(piece.Length))
} }
} }
@ -1257,11 +1266,11 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context,
// Collect piece and traffic metrics. // Collect piece and traffic metrics.
metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(), metrics.Traffic.WithLabelValues(piece.TrafficType.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Add(float64(piece.Length)) peer.Host.Type.Name()).Add(float64(piece.Length))
if v.config.Metrics.EnableHost { if v.config.Metrics.EnableHost {
metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(), peer.Task.Tag, peer.Task.Application, metrics.HostTraffic.WithLabelValues(metrics.HostTrafficDownloadType, peer.Task.Type.String(),
peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length)) peer.Host.Type.Name(), peer.Host.ID, peer.Host.IP, peer.Host.Hostname).Add(float64(piece.Length))
} }
@ -1277,9 +1286,9 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
// Collect DownloadPieceCount and DownloadPieceFailureCount metrics. // Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(), metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_REMOTE_PEER.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
if req.Temporary { if req.Temporary {
// Handle peer with piece temporary failed request. // Handle peer with piece temporary failed request.
@ -1312,9 +1321,9 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p
// Collect DownloadPieceCount and DownloadPieceFailureCount metrics. // Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(), metrics.DownloadPieceFailureCount.WithLabelValues(commonv2.TrafficType_BACK_TO_SOURCE.String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, "download piece from source failed") return status.Error(codes.Internal, "download piece from source failed")
} }