pd/pkg/progress/progress.go

291 lines
8.1 KiB
Go

// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package progress
import (
"context"
"math"
"strconv"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/utils/syncutil"
)
const (
	// gcInterval is the period at which GC sweeps the completed progresses.
	gcInterval = 2 * time.Minute
	// expiredDuration is how long a completed progress (and its metrics) is
	// retained before gcCompletedProgress removes it.
	expiredDuration = 30 * time.Second
)

// Actions tracked by the progress manager.
const (
	// removingAction tracks a store whose regions are being deleted from it.
	removingAction Action = "removing"
	// preparingAction tracks a store that is receiving regions.
	preparingAction Action = "preparing"
)
// patrolRegionsDurationGetter reports how long one round of PatrolRegion
// takes. The manager passes this duration to the indicators of removing
// progresses to adjust their speed window.
type patrolRegionsDurationGetter interface {
	GetPatrolRegionsDuration() time.Duration
}
// Manager keeps track of the online/offline progress of stores.
//
// progresses holds the in-flight progress of each store, keyed by store ID.
// completedProgress holds finished progresses until gcCompletedProgress drops
// them, together with their metrics, once they are older than expiredDuration.
// Every field below the embedded lock is guarded by it.
type Manager struct {
	syncutil.RWMutex

	progresses, completedProgress map[uint64]*progressIndicator

	// patrolRegionsDurationGetter sizes the speed window of removing progresses.
	patrolRegionsDurationGetter patrolRegionsDurationGetter
	// updateInterval is handed to every progress indicator the manager creates.
	updateInterval time.Duration
}
// NewManager returns a Manager with empty in-flight and completed progress
// tables. patrolRegionsDurationGetter supplies the PatrolRegion duration used
// for removing progresses, and updateInterval is passed to every progress
// indicator the manager creates.
func NewManager(patrolRegionsDurationGetter patrolRegionsDurationGetter,
	updateInterval time.Duration) *Manager {
	m := &Manager{
		patrolRegionsDurationGetter: patrolRegionsDurationGetter,
		updateInterval:              updateInterval,
	}
	m.progresses = make(map[uint64]*progressIndicator)
	m.completedProgress = make(map[uint64]*progressIndicator)
	return m
}
// Action names the kind of store transition a Progress describes.
type Action string

// Progress is the progress of the online/offline store.
//
// It carries the action being tracked together with the indicator's latest
// completion ratio, remaining-time estimate (presumably seconds, per the field
// name — confirm against the indicator) and current speed.
type Progress struct {
	Action
	ProgressPercent, LeftSecond, CurrentSpeed float64
}
// Reset discards every tracked progress, both in-flight and completed, and
// clears all store progress/speed/ETA metrics.
//
// Everything happens under the write lock, so a concurrent updateStoreProgress
// cannot re-publish a gauge for a progress that is being discarded between
// the metric reset and the map reset.
func (m *Manager) Reset() {
	m.Lock()
	defer m.Unlock()
	m.progresses = make(map[uint64]*progressIndicator)
	// In this file completedProgress is only consulted by gcCompletedProgress
	// to delete the metrics of finished progresses. The metrics are wiped
	// below, so a surviving entry could only make a later GC delete a gauge
	// published by a new progress with the same store and action.
	m.completedProgress = make(map[uint64]*progressIndicator)
	storesProgressGauge.Reset()
	storesSpeedGauge.Reset()
	storesETAGauge.Reset()
}
// SetPatrolRegionsDurationGetter replaces, under the write lock, the source of
// PatrolRegion durations used to adjust the window of removing progresses.
func (m *Manager) SetPatrolRegionsDurationGetter(getter patrolRegionsDurationGetter) {
	m.Lock()
	m.patrolRegionsDurationGetter = getter
	m.Unlock()
}
// addProgress starts tracking a fresh progress for storeID, replacing any
// in-flight entry the store already had. The caller must hold m's write lock
// (updateStoreProgress does).
func (m *Manager) addProgress(
	storeID uint64,
	action Action,
	current, total float64,
) {
	indicator := newProgressIndicator(action, current, total, m.updateInterval)
	m.progresses[storeID] = indicator
}
// GC runs gcCompletedProgress every gcInterval until ctx is cancelled. It
// blocks for that whole time, so callers should run it in a dedicated
// goroutine.
func (m *Manager) GC(ctx context.Context) {
	ticker := time.NewTicker(gcInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			m.gcCompletedProgress()
		}
	}
}
// gcCompletedProgress forgets every completed progress that finished more than
// expiredDuration ago and deletes the progress/speed/ETA metrics published for
// it. The "gcExpiredTime" failpoint can override the retention with a duration
// string; an unparsable value panics.
func (m *Manager) gcCompletedProgress() {
	m.Lock()
	defer m.Unlock()

	retention := expiredDuration
	// failpoint-ctl inlines this closure body, so it must not use `return`.
	failpoint.Inject("gcExpiredTime", func(val failpoint.Value) {
		if raw, ok := val.(string); ok {
			parsed, err := time.ParseDuration(raw)
			if err != nil {
				panic(err)
			}
			retention = parsed
		}
	})

	cutoff := time.Now().Add(-retention)
	for storeID, finished := range m.completedProgress {
		if !finished.completeAt.Before(cutoff) {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(m.completedProgress, storeID)
		labels := []string{"", strconv.FormatUint(storeID, 10), string(finished.Action)}
		storesProgressGauge.DeleteLabelValues(labels...)
		storesSpeedGauge.DeleteLabelValues(labels...)
		storesETAGauge.DeleteLabelValues(labels...)
	}
}
// markProgressAsFinished retires the in-flight progress of storeID, if any.
//
// It stamps the completion time and feeds the target region size to the
// indicator as the final sample. It then moves the indicator from progresses
// to completedProgress, where gcCompletedProgress later drops it and its
// metrics. It does nothing when the store has no in-flight progress.
func (m *Manager) markProgressAsFinished(storeID uint64) {
	m.Lock()
	defer m.Unlock()
	finished, ok := m.progresses[storeID]
	if !ok {
		return
	}
	finished.completeAt = time.Now()
	finished.push(finished.targetRegionSize)
	m.completedProgress[storeID] = finished
	delete(m.progresses, storeID)
}
// UpdateProgress feeds the latest region size of store into its progress.
//
// If the store's existing progress belongs to a state the store has left, it
// is marked finished first. Only preparing and removing stores are then
// tracked:
//   - preparing: currentRegionSize is the region size the store holds now, and
//     threshold is the target, which may grow between calls.
//   - removing: on the first call currentRegionSize becomes the target (the
//     total size to delete) and progress starts at 0. Later calls report how
//     much has been deleted since then, floored at 0.
//
// A nil receiver or store is a no-op.
func (m *Manager) UpdateProgress(
	store *core.StoreInfo,
	currentRegionSize, threshold float64,
) {
	if m == nil || store == nil {
		return
	}
	storeID := store.GetID()

	if prev := m.GetProgressByStoreID(storeID); prev != nil {
		leftPreparing := prev.Action == preparingAction && !store.IsPreparing()
		leftRemoving := prev.Action == removingAction && !store.IsRemoving()
		if leftPreparing || leftRemoving {
			m.markProgressAsFinished(storeID)
		}
	}

	var action Action
	// targetRegionSize stays 0 for an ongoing removing progress, which leaves
	// its recorded target untouched in updateStoreProgress.
	var targetRegionSize float64
	switch store.GetNodeState() {
	case metapb.NodeState_Preparing:
		action, targetRegionSize = preparingAction, threshold
	case metapb.NodeState_Removing:
		action = removingAction
		m.RLock()
		ongoing, ok := m.progresses[storeID]
		if ok && ongoing.Action == removingAction {
			// Convert "size still on the store" into "size deleted so far".
			currentRegionSize = math.Max(0, ongoing.targetRegionSize-currentRegionSize)
		} else {
			// First sample: everything currently on the store must be deleted.
			targetRegionSize, currentRegionSize = currentRegionSize, 0
		}
		m.RUnlock()
	default:
		return
	}
	m.updateStoreProgress(storeID, action, currentRegionSize, targetRegionSize)
}
// updateStoreProgress records one sample for storeID and publishes the
// resulting progress, speed and ETA metrics.
//
// When the store has no in-flight progress, or has one for a different action,
// a new indicator is created from the sample and no metrics are published yet.
// Otherwise the recorded target is raised if targetRegionSize is larger, and
// currentRegionSize is pushed into the indicator. Preparing targets may grow;
// UpdateProgress passes 0 for an ongoing removing progress so that its target
// stays fixed.
func (m *Manager) updateStoreProgress(
	storeID uint64,
	action Action,
	currentRegionSize, targetRegionSize float64,
) {
	m.Lock()
	defer m.Unlock()

	p, exist := m.progresses[storeID]
	if !exist || p.Action != action {
		m.addProgress(storeID, action, currentRegionSize, targetRegionSize)
		return
	}
	// The targetRegionSize of the preparing progress may be updated.
	if p.targetRegionSize < targetRegionSize {
		p.targetRegionSize = targetRegionSize
	}
	if action == removingAction {
		// If the number of regions is large, each round of PatrolRegion takes a
		// long time. The regions of the offline store may have not been scanned
		// during some updateInterval. In this case, if the window is not large
		// enough, the offline speed will vary greatly, which is not in line
		// with expectations. Therefore, we adjust the window size based on the
		// time consumed by PatrolRegion to avoid excessive fluctuations in the
		// offline speed, which may cause misleading results.
		//
		// NewManager and SetPatrolRegionsDurationGetter both accept a nil
		// getter; in that case keep the current window instead of panicking on
		// a nil interface call.
		if getter := m.patrolRegionsDurationGetter; getter != nil {
			p.adjustWindowLength(getter.GetPatrolRegionsDuration())
		}
	}
	p.push(currentRegionSize)

	storeLabel := strconv.FormatUint(storeID, 10)
	// For now, we don't need to record the address of the store. We can record
	// them when we need it.
	storesProgressGauge.WithLabelValues("", storeLabel, string(action)).Set(p.ProgressPercent)
	storesSpeedGauge.WithLabelValues("", storeLabel, string(action)).Set(p.CurrentSpeed)
	storesETAGauge.WithLabelValues("", storeLabel, string(action)).Set(p.LeftSecond)
}
// GetProgressByStoreID returns the in-flight progress of storeID, or nil when
// the store has none. Completed progresses are never returned. The result is
// the indicator's own *Progress, not a copy.
func (m *Manager) GetProgressByStoreID(storeID uint64) *Progress {
	m.RLock()
	defer m.RUnlock()
	if indicator, ok := m.progresses[storeID]; ok {
		return indicator.Progress
	}
	return nil
}
// GetAverageProgressByAction returns the mean progress, remaining time and
// speed over all in-flight progresses with the given action. It returns nil
// when there are none.
//
// If the summed remaining time is +Inf, LeftSecond is reported as
// math.MaxFloat64 instead of being averaged.
func (m *Manager) GetAverageProgressByAction(action Action) *Progress {
	m.RLock()
	defer m.RUnlock()

	var sum Progress
	matched := 0
	for _, indicator := range m.progresses {
		if indicator.Action != action {
			continue
		}
		sum.ProgressPercent += indicator.ProgressPercent
		sum.LeftSecond += indicator.LeftSecond
		sum.CurrentSpeed += indicator.CurrentSpeed
		matched++
	}
	if matched == 0 {
		return nil
	}

	count := float64(matched)
	avg := &Progress{
		Action:          action,
		ProgressPercent: sum.ProgressPercent / count,
		LeftSecond:      math.MaxFloat64,
		CurrentSpeed:    sum.CurrentSpeed / count,
	}
	if !math.IsInf(sum.LeftSecond, 1) {
		avg.LeftSecond = sum.LeftSecond / count
	}
	return avg
}