mirror of https://github.com/tikv/pd.git
291 lines
8.1 KiB
Go
291 lines
8.1 KiB
Go
// Copyright 2022 TiKV Project Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package progress
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/kvproto/pkg/metapb"
|
|
|
|
"github.com/tikv/pd/pkg/core"
|
|
"github.com/tikv/pd/pkg/utils/syncutil"
|
|
)
|
|
|
|
const (
|
|
gcInterval = 2 * time.Minute
|
|
expiredDuration = 30 * time.Second
|
|
|
|
removingAction Action = "removing"
|
|
preparingAction Action = "preparing"
|
|
)
|
|
|
|
type patrolRegionsDurationGetter interface {
|
|
GetPatrolRegionsDuration() time.Duration
|
|
}
|
|
|
|
// Manager is used to maintain the progresses we care about.
|
|
type Manager struct {
|
|
syncutil.RWMutex
|
|
progresses map[uint64]*progressIndicator
|
|
completedProgress map[uint64]*progressIndicator
|
|
patrolRegionsDurationGetter patrolRegionsDurationGetter
|
|
|
|
updateInterval time.Duration
|
|
}
|
|
|
|
// NewManager creates a new Manager.
|
|
func NewManager(patrolRegionsDurationGetter patrolRegionsDurationGetter,
|
|
updateInterval time.Duration) *Manager {
|
|
return &Manager{
|
|
progresses: make(map[uint64]*progressIndicator),
|
|
completedProgress: make(map[uint64]*progressIndicator),
|
|
patrolRegionsDurationGetter: patrolRegionsDurationGetter,
|
|
updateInterval: updateInterval,
|
|
}
|
|
}
|
|
|
|
// Action is the action of the progress.
|
|
type Action string
|
|
|
|
// Progress is the progress of the online/offline store.
|
|
type Progress struct {
|
|
Action
|
|
ProgressPercent float64
|
|
LeftSecond float64
|
|
CurrentSpeed float64
|
|
}
|
|
|
|
// Reset resets the progress manager.
|
|
func (m *Manager) Reset() {
|
|
storesProgressGauge.Reset()
|
|
storesSpeedGauge.Reset()
|
|
storesETAGauge.Reset()
|
|
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
m.progresses = make(map[uint64]*progressIndicator)
|
|
}
|
|
|
|
// SetPatrolRegionsDurationGetter sets the patrol regions duration getter.
|
|
func (m *Manager) SetPatrolRegionsDurationGetter(getter patrolRegionsDurationGetter) {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
m.patrolRegionsDurationGetter = getter
|
|
}
|
|
|
|
func (m *Manager) addProgress(
|
|
storeID uint64,
|
|
action Action,
|
|
current, total float64,
|
|
) {
|
|
m.progresses[storeID] = newProgressIndicator(
|
|
action,
|
|
current, total,
|
|
m.updateInterval,
|
|
)
|
|
}
|
|
|
|
// GC starts a goroutine to clean up the completed progress.
|
|
func (m *Manager) GC(ctx context.Context) {
|
|
ticker := time.NewTicker(gcInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
m.gcCompletedProgress()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) gcCompletedProgress() {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
exactExpiredDuration := expiredDuration
|
|
failpoint.Inject("gcExpiredTime", func(val failpoint.Value) {
|
|
if s, ok := val.(string); ok {
|
|
var err error
|
|
exactExpiredDuration, err = time.ParseDuration(s)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
})
|
|
exactExpiredTime := time.Now().Add(-exactExpiredDuration)
|
|
for storeID, p := range m.completedProgress {
|
|
if p.completeAt.Before(exactExpiredTime) {
|
|
storeIDStr := strconv.FormatUint(storeID, 10)
|
|
delete(m.completedProgress, storeID)
|
|
storesProgressGauge.DeleteLabelValues("", storeIDStr, string(p.Action))
|
|
storesSpeedGauge.DeleteLabelValues("", storeIDStr, string(p.Action))
|
|
storesETAGauge.DeleteLabelValues("", storeIDStr, string(p.Action))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Manager) markProgressAsFinished(storeID uint64) {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
p, exist := m.progresses[storeID]
|
|
if !exist {
|
|
return
|
|
}
|
|
|
|
p.completeAt = time.Now()
|
|
p.push(p.targetRegionSize)
|
|
m.completedProgress[storeID] = p
|
|
delete(m.progresses, storeID)
|
|
}
|
|
|
|
// UpdateProgress updates the progress of the store.
|
|
func (m *Manager) UpdateProgress(
|
|
store *core.StoreInfo,
|
|
currentRegionSize, threshold float64,
|
|
) {
|
|
if m == nil || store == nil {
|
|
return
|
|
}
|
|
p := m.GetProgressByStoreID(store.GetID())
|
|
if p != nil && ((p.Action == preparingAction && !store.IsPreparing()) ||
|
|
(p.Action == removingAction && !store.IsRemoving())) {
|
|
m.markProgressAsFinished(store.GetID())
|
|
}
|
|
var (
|
|
storeID = store.GetID()
|
|
action Action
|
|
// targetRegionSize is the total region size that need to be added/deleted.
|
|
// - If the store is in preparing state, it represents the total region size
|
|
// that need to be added, and it may be updated during the process.
|
|
// - If the store is in removing state, it represents the total region size
|
|
// that need to be deleted, and it is set in the first time.
|
|
targetRegionSize float64
|
|
)
|
|
switch store.GetNodeState() {
|
|
case metapb.NodeState_Preparing:
|
|
action = preparingAction
|
|
targetRegionSize = threshold
|
|
case metapb.NodeState_Removing:
|
|
action = removingAction
|
|
m.RLock()
|
|
p, exist := m.progresses[storeID]
|
|
if exist && p.Action == removingAction {
|
|
// currentRegionSize represents the current deleted region size.
|
|
currentRegionSize = math.Max(0, p.targetRegionSize-currentRegionSize)
|
|
} else {
|
|
// targetRegionSize represents the total region size that need to be
|
|
// deleted. It is set in the first time when the store is in removing state.
|
|
targetRegionSize = currentRegionSize
|
|
currentRegionSize = 0
|
|
}
|
|
m.RUnlock()
|
|
default:
|
|
return
|
|
}
|
|
|
|
m.updateStoreProgress(storeID, action, currentRegionSize, targetRegionSize)
|
|
}
|
|
|
|
func (m *Manager) updateStoreProgress(
|
|
storeID uint64,
|
|
action Action,
|
|
currentRegionSize, targetRegionSize float64,
|
|
) {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
p, exist := m.progresses[storeID]
|
|
if !exist || p.Action != action {
|
|
m.addProgress(storeID, action, currentRegionSize, targetRegionSize)
|
|
return
|
|
}
|
|
|
|
// The targetRegionSize of the preparing progress may be updated.
|
|
if p.targetRegionSize < targetRegionSize {
|
|
p.targetRegionSize = targetRegionSize
|
|
}
|
|
if action == removingAction {
|
|
// If the number of regions is large, each round of PatrolRegion takes a
|
|
// long time. The regions of the offline store may have not been scanned
|
|
// during some updateInterval. In this case, if the window is not large
|
|
// enough, the offline speed will vary greatly, which is not in line
|
|
// with expectations. Therefore, we adjust the window size based on the
|
|
// time consumed by PatrolRegion to avoid excessive fluctuations in the
|
|
// offline speed, which may cause misleading results.
|
|
p.adjustWindowLength(m.patrolRegionsDurationGetter.GetPatrolRegionsDuration())
|
|
}
|
|
|
|
p.push(currentRegionSize)
|
|
|
|
storeLabel := strconv.FormatUint(storeID, 10)
|
|
// For now, we don't need to record the address of the store. We can record
|
|
// them when we need it.
|
|
storesProgressGauge.WithLabelValues("", storeLabel, string(action)).Set(p.ProgressPercent)
|
|
storesSpeedGauge.WithLabelValues("", storeLabel, string(action)).Set(p.CurrentSpeed)
|
|
storesETAGauge.WithLabelValues("", storeLabel, string(action)).Set(p.LeftSecond)
|
|
}
|
|
|
|
// GetProgressByStoreID gets progresses by the store id.
|
|
func (m *Manager) GetProgressByStoreID(storeID uint64) *Progress {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
p, exist := m.progresses[storeID]
|
|
if !exist {
|
|
return nil
|
|
}
|
|
return p.Progress
|
|
}
|
|
|
|
// GetAverageProgressByAction gets the average progress of all stores
|
|
func (m *Manager) GetAverageProgressByAction(action Action) *Progress {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
|
|
var (
|
|
totalProgressPercent, totalLeftSeconds, totalCurrentSpeed float64
|
|
count int
|
|
)
|
|
for _, p := range m.progresses {
|
|
if p.Action == action {
|
|
totalProgressPercent += p.ProgressPercent
|
|
totalLeftSeconds += p.LeftSecond
|
|
totalCurrentSpeed += p.CurrentSpeed
|
|
count++
|
|
}
|
|
}
|
|
if count == 0 {
|
|
return nil
|
|
}
|
|
if math.IsInf(totalLeftSeconds, 1) {
|
|
totalLeftSeconds = math.MaxFloat64
|
|
} else {
|
|
totalLeftSeconds /= float64(count)
|
|
}
|
|
|
|
return &Progress{
|
|
Action: action,
|
|
ProgressPercent: totalProgressPercent / float64(count),
|
|
LeftSecond: totalLeftSeconds,
|
|
CurrentSpeed: totalCurrentSpeed / float64(count),
|
|
}
|
|
}
|