mirror of https://github.com/tikv/pd.git
471 lines
15 KiB
Go
471 lines
15 KiB
Go
// Copyright 2016 TiKV Project Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/pingcap/kvproto/pkg/metapb"
|
|
"github.com/pingcap/log"
|
|
|
|
"github.com/tikv/pd/pkg/core"
|
|
"github.com/tikv/pd/pkg/core/storelimit"
|
|
"github.com/tikv/pd/pkg/encryption"
|
|
"github.com/tikv/pd/pkg/errs"
|
|
"github.com/tikv/pd/pkg/mcs/utils/constant"
|
|
"github.com/tikv/pd/pkg/progress"
|
|
"github.com/tikv/pd/pkg/schedule"
|
|
sc "github.com/tikv/pd/pkg/schedule/config"
|
|
sche "github.com/tikv/pd/pkg/schedule/core"
|
|
"github.com/tikv/pd/pkg/schedule/handler"
|
|
"github.com/tikv/pd/pkg/schedule/schedulers"
|
|
"github.com/tikv/pd/pkg/schedule/types"
|
|
"github.com/tikv/pd/pkg/statistics"
|
|
"github.com/tikv/pd/pkg/statistics/utils"
|
|
"github.com/tikv/pd/pkg/storage"
|
|
"github.com/tikv/pd/pkg/utils/apiutil"
|
|
"github.com/tikv/pd/pkg/utils/syncutil"
|
|
"github.com/tikv/pd/server/cluster"
|
|
"github.com/tikv/pd/server/config"
|
|
)
|
|
|
|
// SchedulerConfigHandlerPath is the API route prefix under which the
// per-scheduler config handlers are served.
var SchedulerConfigHandlerPath = "/pd/api/v1/scheduler-config"
|
|
|
|
type server struct {
|
|
*Server
|
|
}
|
|
|
|
// GetCoordinator returns the coordinator.
|
|
func (s *server) GetCoordinator() *schedule.Coordinator {
|
|
c := s.GetRaftCluster()
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
return c.GetCoordinator()
|
|
}
|
|
|
|
// GetCluster returns RaftCluster.
|
|
func (s *server) GetCluster() sche.SchedulerCluster {
|
|
return s.GetRaftCluster()
|
|
}
|
|
|
|
// Handler is a helper to export methods to handle API/RPC requests.
|
|
type Handler struct {
|
|
*handler.Handler
|
|
s *Server
|
|
opt *config.PersistOptions
|
|
pluginChMap map[string]chan string
|
|
pluginChMapLock syncutil.RWMutex
|
|
}
|
|
|
|
func newHandler(s *Server) *Handler {
|
|
h := handler.NewHandler(&server{
|
|
Server: s,
|
|
})
|
|
return &Handler{
|
|
Handler: h,
|
|
s: s,
|
|
opt: s.persistOptions,
|
|
pluginChMap: make(map[string]chan string),
|
|
pluginChMapLock: syncutil.RWMutex{},
|
|
}
|
|
}
|
|
|
|
// GetRaftCluster returns RaftCluster.
|
|
func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) {
|
|
rc := h.s.GetRaftCluster()
|
|
if rc == nil {
|
|
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
|
|
}
|
|
return rc, nil
|
|
}
|
|
|
|
// IsSchedulerExisted returns whether scheduler is existed.
|
|
func (h *Handler) IsSchedulerExisted(name string) (bool, error) {
|
|
rc, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return rc.GetCoordinator().GetSchedulersController().IsSchedulerExisted(name)
|
|
}
|
|
|
|
// GetScheduleConfig returns ScheduleConfig.
|
|
func (h *Handler) GetScheduleConfig() *sc.ScheduleConfig {
|
|
return h.s.GetScheduleConfig()
|
|
}
|
|
|
|
// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information..
|
|
func (h *Handler) GetHotRegionsWriteInterval() time.Duration {
|
|
return h.opt.GetHotRegionsWriteInterval()
|
|
}
|
|
|
|
// GetHotRegionsReservedDays gets days hot region information is kept.
|
|
func (h *Handler) GetHotRegionsReservedDays() uint64 {
|
|
return h.opt.GetHotRegionsReservedDays()
|
|
}
|
|
|
|
// HistoryHotRegionsRequest wraps the query conditions of a history hot region
// request sent by TiDB.
type HistoryHotRegionsRequest struct {
	// StartTime and EndTime bound the time range handed to the storage iterator.
	StartTime int64 `json:"start_time,omitempty"`
	EndTime   int64 `json:"end_time,omitempty"`
	// RegionIDs, StoreIDs and PeerIDs restrict the result to the listed IDs;
	// an empty list means the corresponding filter is not applied.
	RegionIDs []uint64 `json:"region_ids,omitempty"`
	StoreIDs  []uint64 `json:"store_ids,omitempty"`
	PeerIDs   []uint64 `json:"peer_ids,omitempty"`
	// IsLearners and IsLeaders list the accepted values of the learner and
	// leader flags of a record.
	IsLearners []bool `json:"is_learners,omitempty"`
	IsLeaders  []bool `json:"is_leaders,omitempty"`
	// HotRegionTypes selects the hot region types to read.
	HotRegionTypes []string `json:"hot_region_type,omitempty"`
}
|
|
|
|
// GetAllRequestHistoryHotRegion gets all hot region info in HistoryHotRegion form.
|
|
func (h *Handler) GetAllRequestHistoryHotRegion(request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) {
|
|
var hotRegionTypes = storage.HotRegionTypes
|
|
if len(request.HotRegionTypes) != 0 {
|
|
hotRegionTypes = request.HotRegionTypes
|
|
}
|
|
iter := h.GetHistoryHotRegionIter(hotRegionTypes, request.StartTime, request.EndTime)
|
|
var results []*storage.HistoryHotRegion
|
|
regionSet, storeSet, peerSet, learnerSet, leaderSet :=
|
|
make(map[uint64]bool), make(map[uint64]bool),
|
|
make(map[uint64]bool), make(map[bool]bool), make(map[bool]bool)
|
|
for _, id := range request.RegionIDs {
|
|
regionSet[id] = true
|
|
}
|
|
for _, id := range request.StoreIDs {
|
|
storeSet[id] = true
|
|
}
|
|
for _, id := range request.PeerIDs {
|
|
peerSet[id] = true
|
|
}
|
|
for _, isLearner := range request.IsLearners {
|
|
learnerSet[isLearner] = true
|
|
}
|
|
for _, isLeader := range request.IsLeaders {
|
|
leaderSet[isLeader] = true
|
|
}
|
|
var next *storage.HistoryHotRegion
|
|
var err error
|
|
for next, err = iter.Next(); next != nil && err == nil; next, err = iter.Next() {
|
|
if len(regionSet) != 0 && !regionSet[next.RegionID] {
|
|
continue
|
|
}
|
|
if len(storeSet) != 0 && !storeSet[next.StoreID] {
|
|
continue
|
|
}
|
|
if len(peerSet) != 0 && !peerSet[next.PeerID] {
|
|
continue
|
|
}
|
|
if !learnerSet[next.IsLearner] {
|
|
continue
|
|
}
|
|
if !leaderSet[next.IsLeader] {
|
|
continue
|
|
}
|
|
results = append(results, next)
|
|
}
|
|
return &storage.HistoryHotRegions{
|
|
HistoryHotRegion: results,
|
|
}, err
|
|
}
|
|
|
|
// AddScheduler adds a scheduler.
|
|
func (h *Handler) AddScheduler(tp types.CheckerSchedulerType, args ...string) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var removeSchedulerCb func(string) error
|
|
if c.IsServiceIndependent(constant.SchedulingServiceName) {
|
|
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler
|
|
} else {
|
|
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler
|
|
}
|
|
s, err := schedulers.CreateScheduler(tp, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(tp, args), removeSchedulerCb)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
|
|
if c.IsServiceIndependent(constant.SchedulingServiceName) {
|
|
if err = c.AddSchedulerHandler(s, args...); err != nil {
|
|
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("add scheduler handler successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
|
|
} else {
|
|
if err = c.AddScheduler(s, args...); err != nil {
|
|
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("add scheduler successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
|
|
}
|
|
if err = h.opt.Persist(c.GetStorage()); err != nil {
|
|
log.Error("can not persist scheduler config", errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("persist scheduler config successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
|
|
return nil
|
|
}
|
|
|
|
// RemoveScheduler removes a scheduler by name.
|
|
func (h *Handler) RemoveScheduler(name string) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if c.IsServiceIndependent(constant.SchedulingServiceName) {
|
|
if err = c.RemoveSchedulerHandler(name); err != nil {
|
|
log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err))
|
|
} else {
|
|
log.Info("remove scheduler handler successfully", zap.String("scheduler-name", name))
|
|
}
|
|
} else {
|
|
if err = c.RemoveScheduler(name); err != nil {
|
|
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
|
|
} else {
|
|
log.Info("remove scheduler successfully", zap.String("scheduler-name", name))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// SetAllStoresLimit is used to set limit of all stores.
|
|
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.SetAllStoresLimit(limitType, ratePerMin)
|
|
}
|
|
|
|
// SetAllStoresLimitTTL is used to set limit of all stores with ttl
|
|
func (h *Handler) SetAllStoresLimitTTL(ratePerMin float64, limitType storelimit.Type, ttl time.Duration) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl)
|
|
}
|
|
|
|
// SetLabelStoresLimit is used to set limit of label stores.
|
|
func (h *Handler) SetLabelStoresLimit(ratePerMin float64, limitType storelimit.Type, labels []*metapb.StoreLabel) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, store := range c.GetStores() {
|
|
for _, label := range labels {
|
|
for _, sl := range store.GetLabels() {
|
|
if label.Key == sl.Key && label.Value == sl.Value {
|
|
// TODO: need to handle some of stores are persisted, and some of stores are not.
|
|
_ = c.SetStoreLimit(store.GetID(), limitType, ratePerMin)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetStoreLimit is used to set the limit of a store.
|
|
func (h *Handler) SetStoreLimit(storeID uint64, ratePerMin float64, limitType storelimit.Type) error {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.SetStoreLimit(storeID, limitType, ratePerMin)
|
|
}
|
|
|
|
// GetRegionsByType gets the region with specified type.
|
|
func (h *Handler) GetRegionsByType(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) {
|
|
c := h.s.GetRaftCluster()
|
|
if c == nil {
|
|
return nil, errs.ErrNotBootstrapped.FastGenByArgs()
|
|
}
|
|
return c.GetRegionStatsByType(typ), nil
|
|
}
|
|
|
|
// GetSchedulerConfigHandler gets the handler of schedulers.
|
|
func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mux := http.NewServeMux()
|
|
for name, handler := range c.GetSchedulerHandlers() {
|
|
prefix := path.Join(SchedulerConfigHandlerPath, name)
|
|
urlPath := prefix + "/"
|
|
mux.Handle(urlPath, http.StripPrefix(prefix, handler))
|
|
}
|
|
return mux, nil
|
|
}
|
|
|
|
// ResetTS resets the ts with specified tso.
|
|
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error {
|
|
log.Info("reset-ts",
|
|
zap.Uint64("new-ts", ts),
|
|
zap.Bool("ignore-smaller", ignoreSmaller),
|
|
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
|
|
tsoAllocator := h.s.GetTSOAllocator()
|
|
if tsoAllocator == nil {
|
|
return errs.ErrServerNotStarted
|
|
}
|
|
return tsoAllocator.SetTSO(ts, ignoreSmaller, skipUpperBoundCheck)
|
|
}
|
|
|
|
// GetProgressByID returns the progress details for a given store ID.
|
|
func (h *Handler) GetProgressByID(storeID uint64) (*progress.Progress, error) {
|
|
return h.s.GetRaftCluster().GetProgressByID(storeID)
|
|
}
|
|
|
|
// GetProgressByAction returns the progress details for a given action.
|
|
func (h *Handler) GetProgressByAction(action string) (*progress.Progress, error) {
|
|
return h.s.GetRaftCluster().GetProgressByAction(action)
|
|
}
|
|
|
|
// PluginLoad loads the plugin referenced by the pluginPath
|
|
func (h *Handler) PluginLoad(pluginPath string) error {
|
|
h.pluginChMapLock.Lock()
|
|
defer h.pluginChMapLock.Unlock()
|
|
cluster, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c := cluster.GetCoordinator()
|
|
ch := make(chan string)
|
|
h.pluginChMap[pluginPath] = ch
|
|
|
|
// make sure path is in data dir
|
|
filePath, err := filepath.Abs(pluginPath)
|
|
if err != nil || !apiutil.IsPathInDirectory(filePath, h.s.GetConfig().DataDir) {
|
|
return errs.ErrFilePathAbs.Wrap(err)
|
|
}
|
|
|
|
c.LoadPlugin(pluginPath, ch)
|
|
return nil
|
|
}
|
|
|
|
// PluginUnload unloads the plugin referenced by the pluginPath
|
|
func (h *Handler) PluginUnload(pluginPath string) error {
|
|
h.pluginChMapLock.Lock()
|
|
defer h.pluginChMapLock.Unlock()
|
|
if ch, ok := h.pluginChMap[pluginPath]; ok {
|
|
ch <- schedule.PluginUnload
|
|
return nil
|
|
}
|
|
return errs.ErrPluginNotFound.FastGenByArgs(pluginPath)
|
|
}
|
|
|
|
// GetAddr returns the server urls for clients.
|
|
func (h *Handler) GetAddr() string {
|
|
return h.s.GetAddr()
|
|
}
|
|
|
|
// SetStoreLimitTTL set storeLimit with ttl
|
|
func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) error {
|
|
return h.s.SaveTTLConfig(map[string]any{
|
|
data: value,
|
|
}, ttl)
|
|
}
|
|
|
|
// IsLeader return true if this server is leader
|
|
func (h *Handler) IsLeader() bool {
|
|
return h.s.member.IsServing()
|
|
}
|
|
|
|
// GetHistoryHotRegions get hot region info in HistoryHotRegion form.
|
|
func (h *Handler) GetHistoryHotRegions(typ utils.RWType) ([]storage.HistoryHotRegion, error) {
|
|
hotRegions, err := h.GetHotRegions(typ)
|
|
if hotRegions == nil || err != nil {
|
|
return nil, err
|
|
}
|
|
hotPeers := hotRegions.AsPeer
|
|
return h.packHotRegions(hotPeers, typ.String())
|
|
}
|
|
|
|
func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []storage.HistoryHotRegion, err error) {
|
|
c, err := h.GetRaftCluster()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, hotPeersStat := range hotPeersStat {
|
|
stats := hotPeersStat.Stats
|
|
for _, hotPeerStat := range stats {
|
|
region := c.GetRegion(hotPeerStat.RegionID)
|
|
if region == nil {
|
|
continue
|
|
}
|
|
meta := region.GetMeta()
|
|
meta, err := encryption.EncryptRegion(meta, h.s.encryptionKeyManager)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
stat := storage.HistoryHotRegion{
|
|
// store in ms.
|
|
// TODO: distinguish store heartbeat interval and region heartbeat interval
|
|
// read statistic from store heartbeat, write statistic from region heartbeat
|
|
UpdateTime: int64(region.GetInterval().GetEndTimestamp() * 1000),
|
|
RegionID: hotPeerStat.RegionID,
|
|
StoreID: hotPeerStat.StoreID,
|
|
PeerID: region.GetStorePeer(hotPeerStat.StoreID).GetId(),
|
|
IsLeader: hotPeerStat.IsLeader,
|
|
IsLearner: core.IsLearner(region.GetPeer(hotPeerStat.StoreID)),
|
|
HotDegree: int64(hotPeerStat.HotDegree),
|
|
FlowBytes: hotPeerStat.ByteRate,
|
|
KeyRate: hotPeerStat.KeyRate,
|
|
QueryRate: hotPeerStat.QueryRate,
|
|
StartKey: string(region.GetStartKey()),
|
|
EndKey: string(region.GetEndKey()),
|
|
EncryptionMeta: meta.GetEncryptionMeta(),
|
|
HotRegionType: hotRegionType,
|
|
}
|
|
historyHotRegions = append(historyHotRegions, stat)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// GetHistoryHotRegionIter return a iter which iter all qualified item .
|
|
func (h *Handler) GetHistoryHotRegionIter(
|
|
hotRegionTypes []string,
|
|
startTime, endTime int64,
|
|
) storage.HotRegionStorageIterator {
|
|
iter := h.s.hotRegionStorage.NewIterator(hotRegionTypes, startTime, endTime)
|
|
return iter
|
|
}
|
|
|
|
// RedirectSchedulerUpdate update scheduler config. Export this func to help handle damaged store.
|
|
func (h *Handler) RedirectSchedulerUpdate(name string, storeID float64) error {
|
|
input := make(map[string]any)
|
|
input["name"] = name
|
|
input["store_id"] = storeID
|
|
updateURL, err := url.JoinPath(h.GetAddr(), SchedulerConfigHandlerPath, name, "config")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
body, err := json.Marshal(input)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return apiutil.PostJSONIgnoreResp(h.s.GetHTTPClient(), updateURL, body)
|
|
}
|