pd/plugin/scheduler_example/evict_leader.go

// Copyright 2019 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"net/http"
	"net/url"
	"strconv"

	"github.com/gorilla/mux"
	"github.com/unrolled/render"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"

	"github.com/tikv/pd/pkg/core"
	"github.com/tikv/pd/pkg/core/constant"
	"github.com/tikv/pd/pkg/errs"
	sche "github.com/tikv/pd/pkg/schedule/core"
	"github.com/tikv/pd/pkg/schedule/filter"
	"github.com/tikv/pd/pkg/schedule/operator"
	"github.com/tikv/pd/pkg/schedule/plan"
	"github.com/tikv/pd/pkg/schedule/schedulers"
	"github.com/tikv/pd/pkg/schedule/types"
	"github.com/tikv/pd/pkg/storage/endpoint"
	"github.com/tikv/pd/pkg/utils/apiutil"
	"github.com/tikv/pd/pkg/utils/keyutil"
	"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
	// EvictLeaderName is the evict leader scheduler name.
	EvictLeaderName = "user-evict-leader-scheduler"
	// EvictLeaderType is the evict leader scheduler type.
	EvictLeaderType = "user-evict-leader"
	// noStoreInSchedulerInfo is returned when the last store is removed from the config.
	noStoreInSchedulerInfo = "No store in user-evict-leader-scheduler-config"

	userEvictLeaderScheduler types.CheckerSchedulerType = "user-evict-leader-scheduler"
)
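
// init registers this scheduler with PD's scheduling framework: a slice
// decoder that parses the CLI-style args (a store ID followed by optional
// key-range pairs) into an evictLeaderSchedulerConfig, and a constructor
// that builds the scheduler from the decoded config.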
func init() {
	schedulers.RegisterSliceDecoderBuilder(userEvictLeaderScheduler, func(args []string) schedulers.ConfigDecoder {
		return func(v any) error {
			if len(args) < 1 {
				return errors.New("should specify the store-id")
			}
			conf, ok := v.(*evictLeaderSchedulerConfig)
			if !ok {
				return errors.New("the config does not exist")
			}
			id, err := strconv.ParseUint(args[0], 10, 64)
			if err != nil {
				return errors.WithStack(err)
			}
			ranges, err := getKeyRanges(args[1:])
			if err != nil {
				return errors.WithStack(err)
			}
			conf.StoreIDWitRanges[id] = ranges
			return nil
		}
	})

	schedulers.RegisterScheduler(userEvictLeaderScheduler, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder schedulers.ConfigDecoder, _ ...func(string) error) (schedulers.Scheduler, error) {
		conf := &evictLeaderSchedulerConfig{StoreIDWitRanges: make(map[uint64][]keyutil.KeyRange), storage: storage}
		if err := decoder(conf); err != nil {
			return nil, err
		}
		conf.cluster = opController.GetCluster()
		return newEvictLeaderScheduler(opController, conf), nil
	})
}
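
// The exported SchedulerType and SchedulerArgs functions below are resolved
// by name (via Go's plugin package) when PD loads this example as a plugin;
// they tell PD which scheduler to create and with what default arguments.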

// SchedulerType returns the type of the scheduler
func SchedulerType() string {
	return EvictLeaderType
}

// SchedulerArgs returns the args for the scheduler
func SchedulerArgs() []string {
	args := []string{"1"}
	return args
}
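
// evictLeaderSchedulerConfig holds the scheduler's configuration.
// StoreIDWitRanges maps each store whose leaders should be evicted to the
// key ranges the eviction applies to; access is guarded by mu, and the
// config is persisted through storage.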
type evictLeaderSchedulerConfig struct {
	mu               syncutil.RWMutex
	storage          endpoint.ConfigStorage
	StoreIDWitRanges map[uint64][]keyutil.KeyRange `json:"store-id-ranges"`
	cluster          *core.BasicCluster
}

// BuildWithArgs builds the config with the args.
func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
	if len(args) < 1 {
		return errors.New("should specify the store-id")
	}
	id, err := strconv.ParseUint(args[0], 10, 64)
	if err != nil {
		return errors.WithStack(err)
	}
	ranges, err := getKeyRanges(args[1:])
	if err != nil {
		return errors.WithStack(err)
	}
	conf.mu.Lock()
	defer conf.mu.Unlock()
	conf.StoreIDWitRanges[id] = ranges
	return nil
}

// Clone clones the config.
func (conf *evictLeaderSchedulerConfig) Clone() *evictLeaderSchedulerConfig {
	conf.mu.RLock()
	defer conf.mu.RUnlock()
	return &evictLeaderSchedulerConfig{
		StoreIDWitRanges: conf.StoreIDWitRanges,
	}
}

// Persist saves the config.
func (conf *evictLeaderSchedulerConfig) Persist() error {
	conf.mu.RLock()
	defer conf.mu.RUnlock()
	data, err := schedulers.EncodeConfig(conf)
	if err != nil {
		return err
	}
	return conf.storage.SaveSchedulerConfig(EvictLeaderName, data)
}
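
// getRanges returns the configured key ranges for the given store ID,
// flattened into an alternating start-key/end-key string slice.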
func (conf *evictLeaderSchedulerConfig) getRanges(id uint64) []string {
	conf.mu.RLock()
	defer conf.mu.RUnlock()
	res := make([]string, 0, len(conf.StoreIDWitRanges[id])*2)
	for index := range conf.StoreIDWitRanges[id] {
		res = append(res, (string)(conf.StoreIDWitRanges[id][index].StartKey), (string)(conf.StoreIDWitRanges[id][index].EndKey))
	}
	return res
}

type evictLeaderScheduler struct {
	*schedulers.BaseScheduler
	conf    *evictLeaderSchedulerConfig
	handler http.Handler
}

// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
// out of a store.
func newEvictLeaderScheduler(opController *operator.Controller, conf *evictLeaderSchedulerConfig) schedulers.Scheduler {
	base := schedulers.NewBaseScheduler(opController, userEvictLeaderScheduler, nil)
	handler := newEvictLeaderHandler(conf)
	return &evictLeaderScheduler{
		BaseScheduler: base,
		conf:          conf,
		handler:       handler,
	}
}

// ServeHTTP implements the http.Handler interface.
func (s *evictLeaderScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	s.handler.ServeHTTP(w, r)
}

// EncodeConfig implements the Scheduler interface.
func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
	s.conf.mu.RLock()
	defer s.conf.mu.RUnlock()
	return schedulers.EncodeConfig(s.conf)
}

// PrepareConfig implements the Scheduler interface. It pauses leader
// transfers into every store in the config.
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
	s.conf.mu.RLock()
	defer s.conf.mu.RUnlock()
	var res error
	for id := range s.conf.StoreIDWitRanges {
		if err := cluster.PauseLeaderTransfer(id, constant.In); err != nil {
			res = err
		}
	}
	return res
}

// CleanConfig implements the Scheduler interface. It resumes leader
// transfers into every store in the config.
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
	s.conf.mu.RLock()
	defer s.conf.mu.RUnlock()
	for id := range s.conf.StoreIDWitRanges {
		cluster.ResumeLeaderTransfer(id, constant.In)
	}
}

// IsScheduleAllowed checks if the scheduler is allowed to schedule.
func (s *evictLeaderScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster) bool {
	allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetSchedulerConfig().GetLeaderScheduleLimit()
	if !allowed {
		operator.IncOperatorLimitCounter(s.GetType(), operator.OpLeader)
	}
	return allowed
}

// Schedule implements the Scheduler interface. For each configured store it
// picks a random leader region (skipping pending and down regions) and builds
// an operator that transfers the leader to a suitable follower store.
func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*operator.Operator, []plan.Plan) {
	ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWitRanges))
	s.conf.mu.RLock()
	defer s.conf.mu.RUnlock()
	pendingFilter := filter.NewRegionPendingFilter()
	downFilter := filter.NewRegionDownFilter()
	for id, ranges := range s.conf.StoreIDWitRanges {
		region := filter.SelectOneRegion(cluster.RandLeaderRegions(id, ranges), nil, pendingFilter, downFilter)
		if region == nil {
			continue
		}
		target := filter.NewCandidates(s.R, cluster.GetFollowerStores(region)).
			FilterTarget(cluster.GetSchedulerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: EvictLeaderName, TransferLeader: true, OperatorLevel: constant.Urgent}).
			RandomPick()
		if target == nil {
			continue
		}
		op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
		if err != nil {
			log.Debug("fail to create evict leader operator", errs.ZapError(err))
			continue
		}
		op.SetPriorityLevel(constant.High)
		ops = append(ops, op)
	}
	return ops, nil
}
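
// evictLeaderHandler serves the scheduler's HTTP config API and applies
// updates to the shared evictLeaderSchedulerConfig.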
type evictLeaderHandler struct {
	rd     *render.Render
	config *evictLeaderSchedulerConfig
}

// updateConfig adds or updates a store entry from the posted JSON, persists
// the new config, and rolls back on failure.
func (handler *evictLeaderHandler) updateConfig(w http.ResponseWriter, r *http.Request) {
	var input map[string]any
	if err := apiutil.ReadJSONRespondError(handler.rd, w, r.Body, &input); err != nil {
		return
	}
	var args []string
	var exists bool
	var id uint64
	idFloat, ok := input["store_id"].(float64)
	if ok {
		id = (uint64)(idFloat)
		// Pause leader transfers into the store if it is not already in the config.
		if _, exists = handler.config.StoreIDWitRanges[id]; !exists {
			if err := handler.config.cluster.PauseLeaderTransfer(id, constant.In); err != nil {
				handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
				return
			}
		}
		args = append(args, strconv.FormatUint(id, 10))
	}

	ranges, ok := (input["ranges"]).([]string)
	if ok {
		args = append(args, ranges...)
	} else if exists {
		// Keep the previously configured ranges when none are supplied.
		args = append(args, handler.config.getRanges(id)...)
	}

	err := handler.config.BuildWithArgs(args)
	if err != nil {
		handler.config.mu.Lock()
		handler.config.cluster.ResumeLeaderTransfer(id, constant.In)
		handler.config.mu.Unlock()
		handler.rd.JSON(w, http.StatusBadRequest, err.Error())
		return
	}
	err = handler.config.Persist()
	if err != nil {
		handler.config.mu.Lock()
		delete(handler.config.StoreIDWitRanges, id)
		handler.config.cluster.ResumeLeaderTransfer(id, constant.In)
		handler.config.mu.Unlock()
		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	handler.rd.JSON(w, http.StatusOK, nil)
}
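
// listConfig outputs a snapshot of the current scheduler config as JSON.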
func (handler *evictLeaderHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
	conf := handler.config.Clone()
	handler.rd.JSON(w, http.StatusOK, conf)
}
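
// deleteConfig removes a store from the scheduler config, resumes leader
// transfers into it, and restores the entry if persisting the change fails.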
func (handler *evictLeaderHandler) deleteConfig(w http.ResponseWriter, r *http.Request) {
	idStr := mux.Vars(r)["store_id"]
	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		handler.rd.JSON(w, http.StatusBadRequest, err.Error())
		return
	}

	handler.config.mu.Lock()
	defer handler.config.mu.Unlock()
	ranges, exists := handler.config.StoreIDWitRanges[id]
	if !exists {
		handler.rd.JSON(w, http.StatusInternalServerError, errors.New("the config does not exist"))
		return
	}
	delete(handler.config.StoreIDWitRanges, id)
	handler.config.cluster.ResumeLeaderTransfer(id, constant.In)

	if err := handler.config.Persist(); err != nil {
		handler.config.StoreIDWitRanges[id] = ranges
		_ = handler.config.cluster.PauseLeaderTransfer(id, constant.In)
		handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}

	var resp any
	if len(handler.config.StoreIDWitRanges) == 0 {
		resp = noStoreInSchedulerInfo
	}
	handler.rd.JSON(w, http.StatusOK, resp)
}
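
// newEvictLeaderHandler builds the HTTP router that exposes the scheduler's
// config endpoints: POST /config, GET /list, and DELETE /delete/{store_id}.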
func newEvictLeaderHandler(config *evictLeaderSchedulerConfig) http.Handler {
	h := &evictLeaderHandler{
		config: config,
		rd:     render.New(render.Options{IndentJSON: true}),
	}
	router := mux.NewRouter()
	router.HandleFunc("/config", h.updateConfig).Methods(http.MethodPost)
	router.HandleFunc("/list", h.listConfig).Methods(http.MethodGet)
	router.HandleFunc("/delete/{store_id}", h.deleteConfig).Methods(http.MethodDelete)
	return router
}
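
// getKeyRanges decodes URL-escaped start/end key pairs from args. If no
// complete pair is supplied it falls back to a single range covering the
// whole key space.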
func getKeyRanges(args []string) ([]keyutil.KeyRange, error) {
	var ranges []keyutil.KeyRange
	for len(args) > 1 {
		startKey, err := url.QueryUnescape(args[0])
		if err != nil {
			return nil, err
		}
		endKey, err := url.QueryUnescape(args[1])
		if err != nil {
			return nil, err
		}
		args = args[2:]
		ranges = append(ranges, keyutil.NewKeyRange(startKey, endKey))
	}
	if len(ranges) == 0 {
		return []keyutil.KeyRange{keyutil.NewKeyRange("", "")}, nil
	}
	return ranges, nil
}