pd/server/handler.go

471 lines
15 KiB
Go

// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/json"
"net/http"
"net/url"
"path"
"path/filepath"
"time"
"go.uber.org/zap"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/progress"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/utils"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
)
// SchedulerConfigHandlerPath is the api router path of the schedule config handler.
var SchedulerConfigHandlerPath = "/pd/api/v1/scheduler-config"
type server struct {
*Server
}
// GetCoordinator returns the coordinator.
func (s *server) GetCoordinator() *schedule.Coordinator {
c := s.GetRaftCluster()
if c == nil {
return nil
}
return c.GetCoordinator()
}
// GetCluster returns RaftCluster.
func (s *server) GetCluster() sche.SchedulerCluster {
return s.GetRaftCluster()
}
// Handler is a helper to export methods to handle API/RPC requests.
type Handler struct {
*handler.Handler
s *Server
opt *config.PersistOptions
pluginChMap map[string]chan string
pluginChMapLock syncutil.RWMutex
}
func newHandler(s *Server) *Handler {
h := handler.NewHandler(&server{
Server: s,
})
return &Handler{
Handler: h,
s: s,
opt: s.persistOptions,
pluginChMap: make(map[string]chan string),
pluginChMapLock: syncutil.RWMutex{},
}
}
// GetRaftCluster returns RaftCluster.
func (h *Handler) GetRaftCluster() (*cluster.RaftCluster, error) {
rc := h.s.GetRaftCluster()
if rc == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return rc, nil
}
// IsSchedulerExisted returns whether scheduler is existed.
func (h *Handler) IsSchedulerExisted(name string) (bool, error) {
rc, err := h.GetRaftCluster()
if err != nil {
return false, err
}
return rc.GetCoordinator().GetSchedulersController().IsSchedulerExisted(name)
}
// GetScheduleConfig returns ScheduleConfig.
func (h *Handler) GetScheduleConfig() *sc.ScheduleConfig {
return h.s.GetScheduleConfig()
}
// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information..
func (h *Handler) GetHotRegionsWriteInterval() time.Duration {
return h.opt.GetHotRegionsWriteInterval()
}
// GetHotRegionsReservedDays gets days hot region information is kept.
func (h *Handler) GetHotRegionsReservedDays() uint64 {
return h.opt.GetHotRegionsReservedDays()
}
// HistoryHotRegionsRequest wrap request condition from tidb.
// it is request from tidb
type HistoryHotRegionsRequest struct {
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
RegionIDs []uint64 `json:"region_ids,omitempty"`
StoreIDs []uint64 `json:"store_ids,omitempty"`
PeerIDs []uint64 `json:"peer_ids,omitempty"`
IsLearners []bool `json:"is_learners,omitempty"`
IsLeaders []bool `json:"is_leaders,omitempty"`
HotRegionTypes []string `json:"hot_region_type,omitempty"`
}
// GetAllRequestHistoryHotRegion gets all hot region info in HistoryHotRegion form.
func (h *Handler) GetAllRequestHistoryHotRegion(request *HistoryHotRegionsRequest) (*storage.HistoryHotRegions, error) {
var hotRegionTypes = storage.HotRegionTypes
if len(request.HotRegionTypes) != 0 {
hotRegionTypes = request.HotRegionTypes
}
iter := h.GetHistoryHotRegionIter(hotRegionTypes, request.StartTime, request.EndTime)
var results []*storage.HistoryHotRegion
regionSet, storeSet, peerSet, learnerSet, leaderSet :=
make(map[uint64]bool), make(map[uint64]bool),
make(map[uint64]bool), make(map[bool]bool), make(map[bool]bool)
for _, id := range request.RegionIDs {
regionSet[id] = true
}
for _, id := range request.StoreIDs {
storeSet[id] = true
}
for _, id := range request.PeerIDs {
peerSet[id] = true
}
for _, isLearner := range request.IsLearners {
learnerSet[isLearner] = true
}
for _, isLeader := range request.IsLeaders {
leaderSet[isLeader] = true
}
var next *storage.HistoryHotRegion
var err error
for next, err = iter.Next(); next != nil && err == nil; next, err = iter.Next() {
if len(regionSet) != 0 && !regionSet[next.RegionID] {
continue
}
if len(storeSet) != 0 && !storeSet[next.StoreID] {
continue
}
if len(peerSet) != 0 && !peerSet[next.PeerID] {
continue
}
if !learnerSet[next.IsLearner] {
continue
}
if !leaderSet[next.IsLeader] {
continue
}
results = append(results, next)
}
return &storage.HistoryHotRegions{
HistoryHotRegion: results,
}, err
}
// AddScheduler adds a scheduler.
func (h *Handler) AddScheduler(tp types.CheckerSchedulerType, args ...string) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
var removeSchedulerCb func(string) error
if c.IsServiceIndependent(constant.SchedulingServiceName) {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveSchedulerHandler
} else {
removeSchedulerCb = c.GetCoordinator().GetSchedulersController().RemoveScheduler
}
s, err := schedulers.CreateScheduler(tp, c.GetOperatorController(), h.s.storage, schedulers.ConfigSliceDecoder(tp, args), removeSchedulerCb)
if err != nil {
return err
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
if c.IsServiceIndependent(constant.SchedulingServiceName) {
if err = c.AddSchedulerHandler(s, args...); err != nil {
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
return err
}
log.Info("add scheduler handler successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
} else {
if err = c.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
return err
}
log.Info("add scheduler successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
}
if err = h.opt.Persist(c.GetStorage()); err != nil {
log.Error("can not persist scheduler config", errs.ZapError(err))
return err
}
log.Info("persist scheduler config successfully", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
return nil
}
// RemoveScheduler removes a scheduler by name.
func (h *Handler) RemoveScheduler(name string) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
if c.IsServiceIndependent(constant.SchedulingServiceName) {
if err = c.RemoveSchedulerHandler(name); err != nil {
log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
log.Info("remove scheduler handler successfully", zap.String("scheduler-name", name))
}
} else {
if err = c.RemoveScheduler(name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
log.Info("remove scheduler successfully", zap.String("scheduler-name", name))
}
}
return err
}
// SetAllStoresLimit is used to set limit of all stores.
func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
return c.SetAllStoresLimit(limitType, ratePerMin)
}
// SetAllStoresLimitTTL is used to set limit of all stores with ttl
func (h *Handler) SetAllStoresLimitTTL(ratePerMin float64, limitType storelimit.Type, ttl time.Duration) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
return c.SetAllStoresLimitTTL(limitType, ratePerMin, ttl)
}
// SetLabelStoresLimit is used to set limit of label stores.
func (h *Handler) SetLabelStoresLimit(ratePerMin float64, limitType storelimit.Type, labels []*metapb.StoreLabel) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
for _, store := range c.GetStores() {
for _, label := range labels {
for _, sl := range store.GetLabels() {
if label.Key == sl.Key && label.Value == sl.Value {
// TODO: need to handle some of stores are persisted, and some of stores are not.
_ = c.SetStoreLimit(store.GetID(), limitType, ratePerMin)
}
}
}
}
return nil
}
// SetStoreLimit is used to set the limit of a store.
func (h *Handler) SetStoreLimit(storeID uint64, ratePerMin float64, limitType storelimit.Type) error {
c, err := h.GetRaftCluster()
if err != nil {
return err
}
return c.SetStoreLimit(storeID, limitType, ratePerMin)
}
// GetRegionsByType gets the region with specified type.
func (h *Handler) GetRegionsByType(typ statistics.RegionStatisticType) ([]*core.RegionInfo, error) {
c := h.s.GetRaftCluster()
if c == nil {
return nil, errs.ErrNotBootstrapped.FastGenByArgs()
}
return c.GetRegionStatsByType(typ), nil
}
// GetSchedulerConfigHandler gets the handler of schedulers.
func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) {
c, err := h.GetRaftCluster()
if err != nil {
return nil, err
}
mux := http.NewServeMux()
for name, handler := range c.GetSchedulerHandlers() {
prefix := path.Join(SchedulerConfigHandlerPath, name)
urlPath := prefix + "/"
mux.Handle(urlPath, http.StripPrefix(prefix, handler))
}
return mux, nil
}
// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocator := h.s.GetTSOAllocator()
if tsoAllocator == nil {
return errs.ErrServerNotStarted
}
return tsoAllocator.SetTSO(ts, ignoreSmaller, skipUpperBoundCheck)
}
// GetProgressByID returns the progress details for a given store ID.
func (h *Handler) GetProgressByID(storeID uint64) (*progress.Progress, error) {
return h.s.GetRaftCluster().GetProgressByID(storeID)
}
// GetProgressByAction returns the progress details for a given action.
func (h *Handler) GetProgressByAction(action string) (*progress.Progress, error) {
return h.s.GetRaftCluster().GetProgressByAction(action)
}
// PluginLoad loads the plugin referenced by the pluginPath
func (h *Handler) PluginLoad(pluginPath string) error {
h.pluginChMapLock.Lock()
defer h.pluginChMapLock.Unlock()
cluster, err := h.GetRaftCluster()
if err != nil {
return err
}
c := cluster.GetCoordinator()
ch := make(chan string)
h.pluginChMap[pluginPath] = ch
// make sure path is in data dir
filePath, err := filepath.Abs(pluginPath)
if err != nil || !apiutil.IsPathInDirectory(filePath, h.s.GetConfig().DataDir) {
return errs.ErrFilePathAbs.Wrap(err)
}
c.LoadPlugin(pluginPath, ch)
return nil
}
// PluginUnload unloads the plugin referenced by the pluginPath
func (h *Handler) PluginUnload(pluginPath string) error {
h.pluginChMapLock.Lock()
defer h.pluginChMapLock.Unlock()
if ch, ok := h.pluginChMap[pluginPath]; ok {
ch <- schedule.PluginUnload
return nil
}
return errs.ErrPluginNotFound.FastGenByArgs(pluginPath)
}
// GetAddr returns the server urls for clients.
func (h *Handler) GetAddr() string {
return h.s.GetAddr()
}
// SetStoreLimitTTL set storeLimit with ttl
func (h *Handler) SetStoreLimitTTL(data string, value float64, ttl time.Duration) error {
return h.s.SaveTTLConfig(map[string]any{
data: value,
}, ttl)
}
// IsLeader return true if this server is leader
func (h *Handler) IsLeader() bool {
return h.s.member.IsServing()
}
// GetHistoryHotRegions get hot region info in HistoryHotRegion form.
func (h *Handler) GetHistoryHotRegions(typ utils.RWType) ([]storage.HistoryHotRegion, error) {
hotRegions, err := h.GetHotRegions(typ)
if hotRegions == nil || err != nil {
return nil, err
}
hotPeers := hotRegions.AsPeer
return h.packHotRegions(hotPeers, typ.String())
}
func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotRegionType string) (historyHotRegions []storage.HistoryHotRegion, err error) {
c, err := h.GetRaftCluster()
if err != nil {
return nil, err
}
for _, hotPeersStat := range hotPeersStat {
stats := hotPeersStat.Stats
for _, hotPeerStat := range stats {
region := c.GetRegion(hotPeerStat.RegionID)
if region == nil {
continue
}
meta := region.GetMeta()
meta, err := encryption.EncryptRegion(meta, h.s.encryptionKeyManager)
if err != nil {
return nil, err
}
stat := storage.HistoryHotRegion{
// store in ms.
// TODO: distinguish store heartbeat interval and region heartbeat interval
// read statistic from store heartbeat, write statistic from region heartbeat
UpdateTime: int64(region.GetInterval().GetEndTimestamp() * 1000),
RegionID: hotPeerStat.RegionID,
StoreID: hotPeerStat.StoreID,
PeerID: region.GetStorePeer(hotPeerStat.StoreID).GetId(),
IsLeader: hotPeerStat.IsLeader,
IsLearner: core.IsLearner(region.GetPeer(hotPeerStat.StoreID)),
HotDegree: int64(hotPeerStat.HotDegree),
FlowBytes: hotPeerStat.ByteRate,
KeyRate: hotPeerStat.KeyRate,
QueryRate: hotPeerStat.QueryRate,
StartKey: string(region.GetStartKey()),
EndKey: string(region.GetEndKey()),
EncryptionMeta: meta.GetEncryptionMeta(),
HotRegionType: hotRegionType,
}
historyHotRegions = append(historyHotRegions, stat)
}
}
return
}
// GetHistoryHotRegionIter return a iter which iter all qualified item .
func (h *Handler) GetHistoryHotRegionIter(
hotRegionTypes []string,
startTime, endTime int64,
) storage.HotRegionStorageIterator {
iter := h.s.hotRegionStorage.NewIterator(hotRegionTypes, startTime, endTime)
return iter
}
// RedirectSchedulerUpdate update scheduler config. Export this func to help handle damaged store.
func (h *Handler) RedirectSchedulerUpdate(name string, storeID float64) error {
input := make(map[string]any)
input["name"] = name
input["store_id"] = storeID
updateURL, err := url.JoinPath(h.GetAddr(), SchedulerConfigHandlerPath, name, "config")
if err != nil {
return err
}
body, err := json.Marshal(input)
if err != nil {
return err
}
return apiutil.PostJSONIgnoreResp(h.s.GetHTTPClient(), updateURL, body)
}