pd/server/server.go

// Copyright 2016 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/coreos/go-semver/semver"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"go.etcd.io/etcd/api/v3/mvccpb"
etcdtypes "go.etcd.io/etcd/client/pkg/v3/types"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/tikv/pd/pkg/audit"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/cgroup"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/gc"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
"github.com/tikv/pd/pkg/keyspace/constant"
ms_server "github.com/tikv/pd/pkg/mcs/metastorage/server"
"github.com/tikv/pd/pkg/mcs/registry"
rm_server "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
mcs "github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/metering"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/replication"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/syncer"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/jsonutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/config"
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1" // init API group
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" // init tso API group
)
const (
serverMetricsInterval = time.Minute
pdAPIPrefix = "/pd/"
// PD is the name of the PD member.
PD = "PD"
// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
// Note: it needs to be less than client.defaultPDTimeout.
maxRetryTimesGetServicePrimary = 25
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond
lostPDLeaderMaxTimeoutSecs = 10
lostPDLeaderReElectionFactor = 10
)
// EtcdStartTimeout is the timeout for starting up the embedded etcd.
var EtcdStartTimeout = time.Minute * 5
var (
// WithLabelValues is a heavy operation, so define variables to avoid calling it every time.
etcdTermGauge = etcdStateGauge.WithLabelValues("term")
etcdAppliedIndexGauge = etcdStateGauge.WithLabelValues("appliedIndex")
etcdCommittedIndexGauge = etcdStateGauge.WithLabelValues("committedIndex")
)
type streamWrapper struct {
tsopb.TSO_TsoClient
syncutil.Mutex
}
// Server is the pd server. It implements bs.Server
type Server struct {
diagnosticspb.DiagnosticsServer
// Server state. 0 is not running, 1 is running.
isRunning int64
// Server start timestamp
startTimestamp int64
// Configs and initial fields.
cfg *config.Config
serviceMiddlewareCfg *config.ServiceMiddlewareConfig
etcdCfg *embed.Config
serviceMiddlewarePersistOptions *config.ServiceMiddlewarePersistOptions
persistOptions *config.PersistOptions
handler *Handler
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel func()
serverLoopWg sync.WaitGroup
// for PD leader election.
member *member.Member
// etcd client
client *clientv3.Client
// electionClient is used for leader election.
electionClient *clientv3.Client
// http client
httpClient *http.Client
// Server services.
// for id allocator, we can use one allocator for
// store, region and peer, because we just need
// a unique ID.
idAllocator id.Allocator
// for encryption
encryptionKeyManager *encryption.Manager
// for storage operation.
storage storage.Storage
// GC states manager
gcStateManager *gc.GCStateManager
// keyspace manager
keyspaceManager *keyspace.Manager
// keyspace group manager
keyspaceGroupManager *keyspace.GroupManager
// metering writer
meteringWriter *metering.Writer
// for basicCluster operation.
basicCluster *core.BasicCluster
// for tso.
tsoAllocator *tso.Allocator
// for raft cluster
cluster *cluster.RaftCluster
// For async region heartbeat.
hbStreams *hbstream.HeartbeatStreams
// Zap logger
lg *zap.Logger
logProps *log.ZapProperties
// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
// leaderCallbacks will be called after the server becomes leader.
leaderCallbacks []func(context.Context) error
// closeCallbacks will be called before the server is closed.
closeCallbacks []func()
// hot region history info storage
hotRegionStorage *storage.HotRegionStorage
// Store as map[string]*grpc.ClientConn
clientConns sync.Map
tsoClientPool struct {
syncutil.RWMutex
clients map[string]*streamWrapper
}
// tsoDispatcher is used to dispatch different TSO requests to
// the corresponding forwarding TSO channel.
tsoDispatcher *tsoutil.TSODispatcher
// tsoProtoFactory is the abstract factory for creating tso
// related data structures defined in the TSO grpc service
tsoProtoFactory *tsoutil.TSOProtoFactory
// pdProtoFactory is the abstract factory for creating tso
// related data structures defined in the PD grpc service
pdProtoFactory *tsoutil.PDProtoFactory
serviceRateLimiter *ratelimit.Controller
serviceLabels map[string][]apiutil.AccessPath
apiServiceLabelMap map[apiutil.AccessPath]string
grpcServiceRateLimiter *ratelimit.Controller
grpcServiceLabels map[string]struct{}
grpcServer *grpc.Server
serviceAuditBackendLabels map[string]*audit.BackendLabels
auditBackends []audit.Backend
registry *registry.ServiceRegistry
isKeyspaceGroupEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
// Cgroup Monitor
cgMonitor cgroup.Monitor
}
// HandlerBuilder builds a server HTTP handler.
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
// TODO: Currently, whether we enable microservice or not is determined by the service list.
// It's equal to whether we enable the keyspace group or not.
// But indeed the keyspace group is independent of the microservice.
// There could be the following scenarios:
// 1. Enable microservice but disable keyspace group. (non-serverless scenario)
// 2. Enable microservice and enable keyspace group. (serverless scenario)
// 3. Disable microservice and disable keyspace group. (both serverless scenario and non-serverless scenario)
// We should separate the keyspace group from the microservice later.
isKeyspaceGroupEnabled := len(services) != 0
log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()
s := &Server{
cfg: cfg,
persistOptions: config.NewPersistOptions(cfg),
serviceMiddlewareCfg: serviceMiddlewareCfg,
serviceMiddlewarePersistOptions: config.NewServiceMiddlewarePersistOptions(serviceMiddlewareCfg),
member: &member.Member{},
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
isKeyspaceGroupEnabled: isKeyspaceGroupEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]*streamWrapper
}{
clients: make(map[string]*streamWrapper),
},
}
s.handler = newHandler(s)
// create audit backend
s.auditBackends = []audit.Backend{
audit.NewLocalLogBackend(true),
audit.NewPrometheusBackend(serviceAuditHistogram, serviceAuditCounter, false),
}
s.serviceRateLimiter = ratelimit.NewController(s.ctx, "http", apiConcurrencyGauge)
s.grpcServiceRateLimiter = ratelimit.NewController(s.ctx, "grpc", apiConcurrencyGauge)
s.serviceAuditBackendLabels = make(map[string]*audit.BackendLabels)
s.serviceLabels = make(map[string][]apiutil.AccessPath)
s.grpcServiceLabels = make(map[string]struct{})
s.apiServiceLabelMap = make(map[apiutil.AccessPath]string)
// Adjust etcd config.
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
if err != nil {
return nil, err
}
if len(legacyServiceBuilders) != 0 {
userHandlers, err := combineBuilderServerHTTPService(ctx, s, legacyServiceBuilders...)
if err != nil {
return nil, err
}
etcdCfg.UserHandlers = userHandlers
}
// New way to register services.
s.registry = registry.NewServerServiceRegistry()
s.registry.RegisterService("MetaStorage", ms_server.NewService)
s.registry.RegisterService("ResourceManager", rm_server.NewService[*Server])
// Register the microservices REST path.
s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)
etcdCfg.ServiceRegister = func(gs *grpc.Server) {
grpcServer := &GrpcServer{Server: s}
pdpb.RegisterPDServer(gs, grpcServer)
keyspacepb.RegisterKeyspaceServer(gs, &KeyspaceServer{GrpcServer: grpcServer})
diagnosticspb.RegisterDiagnosticsServer(gs, s)
// Register the microservices GRPC service.
s.registry.InstallAllGRPCServices(s, gs)
s.grpcServer = gs
}
s.etcdCfg = etcdCfg
s.lg = cfg.Logger
s.logProps = cfg.LogProps
return s, nil
}
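// startEtcd starts the embedded etcd, checks the cluster ID against the
// initial cluster configuration, waits until etcd is ready to serve, and then
// starts the etcd/HTTP clients, initializes the member, and collects the gRPC
// service labels.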
func (s *Server) startEtcd(ctx context.Context) error {
newCtx, cancel := context.WithTimeout(ctx, EtcdStartTimeout)
defer cancel()
etcd, err := embed.StartEtcd(s.etcdCfg)
if err != nil {
return errs.ErrStartEtcd.Wrap(err).GenWithStackByCause()
}
// Check cluster ID
urlMap, err := etcdtypes.NewURLsMap(s.cfg.InitialCluster)
if err != nil {
return errs.ErrEtcdURLMap.Wrap(err).GenWithStackByCause()
}
tlsConfig, err := s.cfg.Security.ToClientTLSConfig()
if err != nil {
return err
}
if err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlMap, tlsConfig); err != nil {
return err
}
select {
// Wait until etcd is ready to use.
case <-etcd.Server.ReadyNotify():
case <-newCtx.Done():
return errs.ErrCancelStartEtcd.FastGenByArgs()
}
// Start the etcd and HTTP clients, then init the member.
err = s.startClient()
if err != nil {
return err
}
err = s.initMember(newCtx, etcd)
if err != nil {
return err
}
s.initGRPCServiceLabels()
return nil
}
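// initGRPCServiceLabels collects the method names of the PD gRPC service so
// that they can be used as gRPC rate-limit labels.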
func (s *Server) initGRPCServiceLabels() {
for name, serviceInfo := range s.grpcServer.GetServiceInfo() {
if name == gRPCServiceName {
for _, methodInfo := range serviceInfo.Methods {
s.grpcServiceLabels[methodInfo.Name] = struct{}{}
}
}
}
}
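// startClient creates two etcd clients (a general-purpose one for metadata
// access and a dedicated one for leader election) as well as the HTTP client.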
func (s *Server) startClient() error {
tlsConfig, err := s.cfg.Security.ToClientTLSConfig()
if err != nil {
return err
}
etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
if err != nil {
return err
}
/* We start two different etcd clients here to avoid throttling. */
// This etcd client is used to read and write all kinds of metadata in the etcd cluster.
s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "server-etcd-client")
if err != nil {
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
// This etcd client will only be used to read and write the election-related data, such as leader key.
s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "election-etcd-client")
if err != nil {
return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}
s.httpClient = etcdutil.CreateHTTPClient(tlsConfig)
return nil
}
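// initMember syncs the advertise peer URLs with the etcd member list and
// creates the member used for the PD leader election.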
func (s *Server) initMember(ctx context.Context, etcd *embed.Etcd) error {
// Update advertise peer URLs.
etcdMembers, err := etcdutil.ListEtcdMembers(ctx, s.client)
if err != nil {
return err
}
etcdServerID := uint64(etcd.Server.ID())
for _, m := range etcdMembers.Members {
if etcdServerID == m.ID {
etcdPeerURLs := strings.Join(m.PeerURLs, ",")
if s.cfg.AdvertisePeerUrls != etcdPeerURLs {
log.Info("update advertise peer urls", zap.String("from", s.cfg.AdvertisePeerUrls), zap.String("to", etcdPeerURLs))
s.cfg.AdvertisePeerUrls = etcdPeerURLs
}
}
}
failpoint.Inject("memberNil", func() {
time.Sleep(1500 * time.Millisecond)
})
s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
return nil
}
// AddStartCallback adds a callback in the startServer phase.
func (s *Server) AddStartCallback(callbacks ...func()) {
s.startCallbacks = append(s.startCallbacks, callbacks...)
}
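// startServer initializes the cluster ID, member info, ID allocators, storage,
// managers and other server components, runs the start callbacks, and then
// marks the server as running.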
func (s *Server) startServer(ctx context.Context) error {
clusterID, err := endpoint.InitClusterID(s.client)
if err != nil {
log.Error("failed to init cluster id", errs.ZapError(err))
return err
}
// It may lose accuracy if we use float64 to store a uint64, so we store the cluster ID in the label.
metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", clusterID)).Set(0)
bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))
s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name())
if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil {
return err
}
if err := s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion); err != nil {
return err
}
if err := s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash); err != nil {
return err
}
s.idAllocator = id.NewAllocator(&id.AllocatorParams{
Client: s.client,
Label: id.DefaultLabel,
Member: s.member.MemberValue(),
})
s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption)
if err != nil {
return err
}
// Initialize an etcd storage as the default storage.
defaultStorage := storage.NewStorageWithEtcdBackend(s.client)
// Initialize a specialized LevelDB storage to store the region-related meta info independently.
regionStorage, err := storage.NewRegionStorageWithLevelDBBackend(
ctx,
filepath.Join(s.cfg.DataDir, "region-meta"),
s.encryptionKeyManager)
if err != nil {
return err
}
s.storage = storage.NewCoreStorage(defaultStorage, regionStorage)
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
s.tsoAllocator = tso.NewAllocator(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.storage, s)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocator)
keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
Client: s.client,
Label: id.KeyspaceLabel,
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
}
s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
// Only start the metering writer if a valid metering config is provided.
if len(s.cfg.Metering.Type) > 0 {
s.meteringWriter, err = metering.NewWriter(s.ctx, &s.cfg.Metering, fmt.Sprintf("pd%d", s.GetMember().ID()))
if err != nil {
log.Warn("failed to initialize the metering writer", errs.ZapError(err))
} else {
s.meteringWriter.Start()
}
} else {
log.Info("no metering config provided, the metering writer will not be started")
}
s.gcStateManager = gc.NewGCStateManager(s.storage.GetGCStateProvider(), s.cfg.PDServerCfg, s.keyspaceManager)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, "", s.cluster)
// Initialize the hot region storage here.
s.hotRegionStorage, err = storage.NewHotRegionsStorage(
ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler)
if err != nil {
return err
}
// Run callbacks
log.Info("triggering the start callback functions")
for _, cb := range s.startCallbacks {
cb()
}
// Initialize all rate limiters and metrics.
for service := range s.serviceLabels {
s.serviceRateLimiter.Update(service, ratelimit.InitLimiter())
}
for service := range s.grpcServiceLabels {
s.grpcServiceRateLimiter.Update(service, ratelimit.InitLimiter())
}
failpoint.InjectCall("delayStartServer")
// Server has started.
atomic.StoreInt64(&s.isRunning, 1)
bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0)))
return nil
}
// AddCloseCallback adds a callback in the Close phase.
func (s *Server) AddCloseCallback(callbacks ...func()) {
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
// Close closes the server.
func (s *Server) Close() {
if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) {
// server is already closed
return
}
log.Info("closing server")
s.cgMonitor.StopMonitor()
s.stopServerLoop()
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager.Close()
}
if s.tsoAllocator != nil {
s.tsoAllocator.Close()
}
if s.meteringWriter != nil {
s.meteringWriter.Stop()
}
if s.client != nil {
if err := s.client.Close(); err != nil {
log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}
if s.electionClient != nil {
if err := s.electionClient.Close(); err != nil {
log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
}
}
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
if s.member.Etcd() != nil {
s.member.Close()
}
if s.hbStreams != nil {
s.hbStreams.Close()
}
if err := s.storage.Close(); err != nil {
log.Error("close storage meet error", errs.ZapError(err))
}
if s.hotRegionStorage != nil {
if err := s.hotRegionStorage.Close(); err != nil {
log.Error("close hot region storage meet error", errs.ZapError(err))
}
}
s.grpcServiceRateLimiter.Close()
s.serviceRateLimiter.Close()
// Run callbacks
log.Info("triggering the close callback functions")
for _, cb := range s.closeCallbacks {
cb()
}
s.clientConns.Range(func(_, value any) bool {
conn := value.(*grpc.ClientConn)
if err := conn.Close(); err != nil {
log.Error("close grpc client meet error", errs.ZapError(err))
}
return true
})
log.Info("close server")
}
// IsClosed checks whether server is closed or not.
func (s *Server) IsClosed() bool {
return atomic.LoadInt64(&s.isRunning) == 0
}
// Run runs the pd server.
func (s *Server) Run() error {
go systimemon.StartMonitor(s.ctx, time.Now, func() {
log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime))
timeJumpBackCounter.Inc()
})
if err := s.startEtcd(s.ctx); err != nil {
return err
}
if err := s.startServer(s.ctx); err != nil {
return err
}
s.cgMonitor.StartMonitor(s.ctx)
failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
})
s.startServerLoop(s.ctx)
return nil
}
// SetServiceAuditBackendForHTTP is used to register service audit config for HTTP.
func (s *Server) SetServiceAuditBackendForHTTP(route *mux.Route, labels ...string) {
if len(route.GetName()) == 0 {
return
}
if len(labels) > 0 {
s.SetServiceAuditBackendLabels(route.GetName(), labels)
}
}
// Context returns the context of server.
func (s *Server) Context() context.Context {
return s.ctx
}
// LoopContext returns the loop context of server.
func (s *Server) LoopContext() context.Context {
return s.serverLoopCtx
}
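// startServerLoop starts the background loops of the server: the PD leader
// loop, the etcd leader loop, the metrics loop and the encryption key manager
// loop, plus the service primary watchers when the keyspace group is enabled.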
func (s *Server) startServerLoop(ctx context.Context) {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
s.serverLoopWg.Add(4)
go s.leaderLoop()
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.encryptionKeyManagerLoop()
if s.IsKeyspaceGroupEnabled() {
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}
}
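// stopServerLoop cancels the server loop context and waits for all background
// loops to finish.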
func (s *Server) stopServerLoop() {
s.serverLoopCancel()
s.serverLoopWg.Wait()
}
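// serverMetricsLoop periodically collects the etcd state metrics until the
// server is closed.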
func (s *Server) serverMetricsLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
ticker := time.NewTicker(serverMetricsInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.collectEtcdStateMetrics()
case <-ctx.Done():
log.Info("server is closed, exit metrics loop")
return
}
}
}
// encryptionKeyManagerLoop is used to start monitor encryption key changes.
func (s *Server) encryptionKeyManagerLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
s.encryptionKeyManager.StartBackgroundLoop(ctx)
log.Info("server is closed, exist encryption key manager loop")
}
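// collectEtcdStateMetrics reports the current etcd term, applied index and
// committed index.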
func (s *Server) collectEtcdStateMetrics() {
etcdTermGauge.Set(float64(s.member.Etcd().Server.Term()))
etcdAppliedIndexGauge.Set(float64(s.member.Etcd().Server.AppliedIndex()))
etcdCommittedIndexGauge.Set(float64(s.member.Etcd().Server.CommittedIndex()))
}
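// bootstrapCluster writes the cluster meta, bootstrap time, first store and
// first region into etcd in a single transaction, then starts the raft
// cluster and bootstraps the keyspace manager.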
func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error) {
clusterID := keypath.ClusterID()
log.Info("try to bootstrap raft cluster",
zap.Uint64("cluster-id", clusterID),
zap.String("request", fmt.Sprintf("%v", req)))
if err := checkBootstrapRequest(req); err != nil {
return nil, err
}
clusterMeta := metapb.Cluster{
Id: clusterID,
MaxPeerCount: uint32(s.persistOptions.GetMaxReplicas()),
}
// Set cluster meta
clusterValue, err := clusterMeta.Marshal()
if err != nil {
return nil, errors.WithStack(err)
}
var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(keypath.ClusterPath(), string(clusterValue)))
// Set bootstrap time.
// Because we write the cluster meta into etcd directly, we need to handle
// the root key path manually here.
timeData := typeutil.Uint64ToBytes(uint64(time.Now().UnixNano()))
ops = append(ops, clientv3.OpPut(keypath.ClusterBootstrapTimePath(), string(timeData)))
// Set store meta
storeMeta := req.GetStore()
storeValue, err := storeMeta.Marshal()
if err != nil {
return nil, errors.WithStack(err)
}
ops = append(ops, clientv3.OpPut(keypath.StorePath(storeMeta.GetId()), string(storeValue)))
regionValue, err := req.GetRegion().Marshal()
if err != nil {
return nil, errors.WithStack(err)
}
// Set region meta with region id.
ops = append(ops, clientv3.OpPut(keypath.RegionPath(req.GetRegion().GetId()), string(regionValue)))
// TODO: we must figure out a better way to handle bootstrap failure, maybe intervene manually.
bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(keypath.ClusterPath()), "=", 0)
resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit()
if err != nil {
return nil, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
}
if !resp.Succeeded {
log.Warn("cluster already bootstrapped", zap.Uint64("cluster-id", clusterID))
return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
}
log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID))
err = s.storage.SaveRegion(req.GetRegion())
if err != nil {
log.Warn("save the bootstrap region failed", errs.ZapError(err))
}
err = s.storage.Flush()
if err != nil {
log.Warn("flush the bootstrap region failed", errs.ZapError(err))
}
if err := s.cluster.Start(s, true); err != nil {
return nil, err
}
if err = s.GetKeyspaceManager().Bootstrap(); err != nil {
log.Warn("bootstrapping keyspace manager failed", errs.ZapError(err))
}
return &pdpb.BootstrapResponse{
ReplicationStatus: s.cluster.GetReplicationMode().GetReplicationStatus(),
}, nil
}
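// createRaftCluster starts the raft cluster if it is not running yet.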
func (s *Server) createRaftCluster() error {
if s.cluster.IsRunning() {
return nil
}
return s.cluster.Start(s, false)
}
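// stopRaftCluster stops the raft cluster.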
func (s *Server) stopRaftCluster() {
failpoint.Inject("raftclusterIsBusy", func() {})
s.cluster.Stop()
}
// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled.
func (s *Server) IsKeyspaceGroupEnabled() bool {
return s.isKeyspaceGroupEnabled
}
// GetAddr returns the server urls for clients.
func (s *Server) GetAddr() string {
return s.cfg.AdvertiseClientUrls
}
// GetClientScheme returns the client URL scheme
func (s *Server) GetClientScheme() string {
if len(s.cfg.Security.CertPath) == 0 && len(s.cfg.Security.KeyPath) == 0 {
return "http"
}
return "https"
}
// GetMemberInfo returns the server member information.
func (s *Server) GetMemberInfo() *pdpb.Member {
return typeutil.DeepClone(s.member.Member(), core.MemberFactory)
}
// GetHandler returns the handler for API.
func (s *Server) GetHandler() *Handler {
return s.handler
}
// GetEndpoints returns the etcd endpoints for outer use.
func (s *Server) GetEndpoints() []string {
return s.client.Endpoints()
}
// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return s.client
}
// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return s.httpClient
}
// GetLeader returns the leader of the PD cluster (i.e. the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
return s.member.GetLeader()
}
// GetServingUrls gets service endpoints.
func (s *Server) GetServingUrls() []string {
return s.member.GetServingUrls()
}
// GetMember returns the member of server.
func (s *Server) GetMember() *member.Member {
return s.member
}
// GetStorage returns the backend storage of server.
func (s *Server) GetStorage() storage.Storage {
return s.storage
}
// GetGCStateManager returns the GC state manager of the server.
func (s *Server) GetGCStateManager() *gc.GCStateManager {
return s.gcStateManager
}
// GetHistoryHotRegionStorage returns the backend storage of historyHotRegion.
func (s *Server) GetHistoryHotRegionStorage() *storage.HotRegionStorage {
return s.hotRegionStorage
}
// SetStorage changes the storage only for test purpose.
// When we use it, we should prevent calling GetStorage, otherwise, it may cause a data race problem.
func (s *Server) SetStorage(storage storage.Storage) {
s.storage = storage
}
// GetBasicCluster returns the basic cluster of server.
func (s *Server) GetBasicCluster() *core.BasicCluster {
return s.basicCluster
}
// GetPersistOptions returns the schedule option.
func (s *Server) GetPersistOptions() *config.PersistOptions {
return s.persistOptions
}
// GetServiceMiddlewarePersistOptions returns the service middleware persist option.
func (s *Server) GetServiceMiddlewarePersistOptions() *config.ServiceMiddlewarePersistOptions {
return s.serviceMiddlewarePersistOptions
}
// GetHBStreams returns the heartbeat streams.
func (s *Server) GetHBStreams() *hbstream.HeartbeatStreams {
return s.hbStreams
}
// GetAllocator returns the ID allocator of server.
func (s *Server) GetAllocator() id.Allocator {
return s.idAllocator
}
// GetTSOAllocator returns the TSO Allocator.
func (s *Server) GetTSOAllocator() *tso.Allocator {
return s.tsoAllocator
}
// GetKeyspaceManager returns the keyspace manager of server.
func (s *Server) GetKeyspaceManager() *keyspace.Manager {
return s.keyspaceManager
}
// SetKeyspaceManager sets the keyspace manager of server.
// Note: it is only used for test.
func (s *Server) SetKeyspaceManager(keyspaceManager *keyspace.Manager) {
s.keyspaceManager = keyspaceManager
}
// GetKeyspaceGroupManager returns the keyspace group manager of server.
func (s *Server) GetKeyspaceGroupManager() *keyspace.GroupManager {
return s.keyspaceGroupManager
}
// SetKeyspaceGroupManager sets the keyspace group manager of server.
// Note: it is only used for test.
func (s *Server) SetKeyspaceGroupManager(keyspaceGroupManager *keyspace.GroupManager) {
s.keyspaceGroupManager = keyspaceGroupManager
}
// GetMeteringWriter returns the metering writer.
func (s *Server) GetMeteringWriter() *metering.Writer {
return s.meteringWriter
}
// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return s.cfg.Name
}
// StartTimestamp returns the start timestamp of this server
func (s *Server) StartTimestamp() int64 {
return s.startTimestamp
}
// GetMembers returns PD server list.
func (s *Server) GetMembers() ([]*pdpb.Member, error) {
if s.IsClosed() {
return nil, errs.ErrServerNotStarted.FastGenByArgs()
}
return cluster.GetMembers(s.GetClient())
}
// GetServiceMiddlewareConfig gets the service middleware config information.
func (s *Server) GetServiceMiddlewareConfig() *config.ServiceMiddlewareConfig {
cfg := s.serviceMiddlewareCfg.Clone()
cfg.AuditConfig = *s.serviceMiddlewarePersistOptions.GetAuditConfig().Clone()
cfg.RateLimitConfig = *s.serviceMiddlewarePersistOptions.GetRateLimitConfig().Clone()
cfg.GRPCRateLimitConfig = *s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().Clone()
return cfg
}
// GetConfig gets the config information.
func (s *Server) GetConfig() *config.Config {
cfg := s.cfg.Clone()
cfg.Schedule = *s.persistOptions.GetScheduleConfig().Clone()
cfg.Replication = *s.persistOptions.GetReplicationConfig().Clone()
cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
cfg.Microservice = *s.persistOptions.GetMicroserviceConfig().Clone()
cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
return cfg
}
// GetKeyspaceConfig gets the keyspace config information.
func (s *Server) GetKeyspaceConfig() *config.KeyspaceConfig {
return s.persistOptions.GetKeyspaceConfig().Clone()
}
// SetKeyspaceConfig sets the keyspace config information.
func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
if err := cfg.Validate(); err != nil {
return err
}
old := s.persistOptions.GetKeyspaceConfig()
s.persistOptions.SetKeyspaceConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetKeyspaceConfig(old)
log.Error("failed to update keyspace config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
s.keyspaceManager.UpdateConfig(&cfg)
log.Info("keyspace config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// GetMicroserviceConfig gets the microservice config information.
func (s *Server) GetMicroserviceConfig() *config.MicroserviceConfig {
return s.persistOptions.GetMicroserviceConfig().Clone()
}
// SetMicroserviceConfig sets the microservice config information.
func (s *Server) SetMicroserviceConfig(cfg config.MicroserviceConfig) error {
old := s.persistOptions.GetMicroserviceConfig()
s.persistOptions.SetMicroserviceConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetMicroserviceConfig(old)
log.Error("failed to update microservice config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("microservice config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// GetScheduleConfig gets the schedule config information.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistOptions.GetScheduleConfig().Clone()
}
// SetScheduleConfig sets the schedule config information.
// This function is exported to be used by the API.
func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error {
if err := cfg.Validate(); err != nil {
return err
}
if err := cfg.Deprecated(); err != nil {
return err
}
old := s.persistOptions.GetScheduleConfig()
s.persistOptions.SetScheduleConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetScheduleConfig(old)
log.Error("failed to update schedule config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
// Update the scheduling halt status at the same time.
s.persistOptions.SetSchedulingAllowanceStatus(cfg.HaltScheduling, "manually")
log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// GetReplicationConfig get the replication config.
func (s *Server) GetReplicationConfig() *sc.ReplicationConfig {
return s.persistOptions.GetReplicationConfig().Clone()
}
// SetReplicationConfig sets the replication config.
func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
if err := cfg.Validate(); err != nil {
return err
}
old := s.persistOptions.GetReplicationConfig()
if cfg.EnablePlacementRules != old.EnablePlacementRules {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if cfg.EnablePlacementRules {
// initialize rule manager.
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
return err
}
} else {
// NOTE: can be removed after placement rules feature is enabled by default.
for _, s := range rc.GetStores() {
if !s.IsRemoved() && s.IsTiFlash() {
return errors.New("cannot disable placement rules with TiFlash nodes")
}
}
}
}
var rule *placement.Rule
if cfg.EnablePlacementRules {
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
// replication.MaxReplicas won't take effect when placement rules are enabled and there is more than just the default rule.
defaultRule := rc.GetRuleManager().GetRule(placement.DefaultGroupID, placement.DefaultRuleID)
CheckInDefaultRule := func() error {
// The replication config won't take effect when placement rules are enabled and there is more than just the default rule.
if defaultRule == nil ||
len(defaultRule.StartKey) != 0 || len(defaultRule.EndKey) != 0 {
return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead")
}
if defaultRule.Count != int(old.MaxReplicas) || !typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) || defaultRule.IsolationLevel != old.IsolationLevel {
return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead")
}
return nil
}
if cfg.MaxReplicas != old.MaxReplicas || !typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) || cfg.IsolationLevel != old.IsolationLevel {
if err := CheckInDefaultRule(); err != nil {
return err
}
rule = defaultRule
}
}
if rule != nil {
rule.Count = int(cfg.MaxReplicas)
rule.LocationLabels = cfg.LocationLabels
rule.IsolationLevel = cfg.IsolationLevel
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if err := rc.GetRuleManager().SetRule(rule); err != nil {
log.Error("failed to update rule count",
errs.ZapError(err))
return err
}
}
s.persistOptions.SetReplicationConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetReplicationConfig(old)
if rule != nil {
rule.Count = int(old.MaxReplicas)
rc := s.GetRaftCluster()
if rc == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if e := rc.GetRuleManager().SetRule(rule); e != nil {
log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
}
}
log.Error("failed to update replication config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// GetAuditConfig gets the audit config information.
func (s *Server) GetAuditConfig() *config.AuditConfig {
return s.serviceMiddlewarePersistOptions.GetAuditConfig().Clone()
}
// SetAuditConfig sets the audit config.
func (s *Server) SetAuditConfig(cfg config.AuditConfig) error {
old := s.serviceMiddlewarePersistOptions.GetAuditConfig()
s.serviceMiddlewarePersistOptions.SetAuditConfig(&cfg)
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
s.serviceMiddlewarePersistOptions.SetAuditConfig(old)
log.Error("failed to update Audit config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("audit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// UpdateRateLimitConfig is used to update the rate-limit config while preserving the old limiter config.
func (s *Server) UpdateRateLimitConfig(key, label string, value ratelimit.DimensionConfig) error {
cfg := s.GetServiceMiddlewareConfig()
rateLimitCfg := make(map[string]ratelimit.DimensionConfig)
for label, item := range cfg.RateLimitConfig.LimiterConfig {
rateLimitCfg[label] = item
}
rateLimitCfg[label] = value
return s.UpdateRateLimit(&cfg.RateLimitConfig, key, &rateLimitCfg)
}
// UpdateRateLimit is used to update the rate-limit config, overwriting the limiter config.
func (s *Server) UpdateRateLimit(cfg *config.RateLimitConfig, key string, value any) error {
updated, found, err := jsonutil.AddKeyValue(cfg, key, value)
if err != nil {
return err
}
if !found {
return errors.Errorf("config item %s not found", key)
}
if updated {
err = s.SetRateLimitConfig(*cfg)
}
return err
}
// GetRateLimitConfig gets the rate limit config information.
func (s *Server) GetRateLimitConfig() *config.RateLimitConfig {
return s.serviceMiddlewarePersistOptions.GetRateLimitConfig().Clone()
}
// SetRateLimitConfig sets the rate limit config.
func (s *Server) SetRateLimitConfig(cfg config.RateLimitConfig) error {
old := s.serviceMiddlewarePersistOptions.GetRateLimitConfig()
s.serviceMiddlewarePersistOptions.SetRateLimitConfig(&cfg)
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
s.serviceMiddlewarePersistOptions.SetRateLimitConfig(old)
log.Error("failed to update rate limit config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("rate limit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// UpdateGRPCRateLimitConfig is used to update the gRPC rate-limit config while preserving the old limiter config.
func (s *Server) UpdateGRPCRateLimitConfig(key, label string, value ratelimit.DimensionConfig) error {
cfg := s.GetServiceMiddlewareConfig()
rateLimitCfg := make(map[string]ratelimit.DimensionConfig)
for label, item := range cfg.GRPCRateLimitConfig.LimiterConfig {
rateLimitCfg[label] = item
}
rateLimitCfg[label] = value
return s.UpdateGRPCRateLimit(&cfg.GRPCRateLimitConfig, key, &rateLimitCfg)
}
// UpdateGRPCRateLimit is used to update the gRPC rate-limit config, overwriting the limiter config.
func (s *Server) UpdateGRPCRateLimit(cfg *config.GRPCRateLimitConfig, key string, value any) error {
updated, found, err := jsonutil.AddKeyValue(cfg, key, value)
if err != nil {
return err
}
if !found {
return errors.Errorf("config item %s not found", key)
}
if updated {
err = s.SetGRPCRateLimitConfig(*cfg)
}
return err
}
// GetGRPCRateLimitConfig gets the gRPC rate limit config information.
func (s *Server) GetGRPCRateLimitConfig() *config.GRPCRateLimitConfig {
return s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().Clone()
}
// SetGRPCRateLimitConfig sets the gRPC rate limit config.
func (s *Server) SetGRPCRateLimitConfig(cfg config.GRPCRateLimitConfig) error {
old := s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig()
s.serviceMiddlewarePersistOptions.SetGRPCRateLimitConfig(&cfg)
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
s.serviceMiddlewarePersistOptions.SetGRPCRateLimitConfig(old)
log.Error("failed to update gRPC rate limit config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("gRPC rate limit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// GetPDServerConfig gets the PD server config information.
func (s *Server) GetPDServerConfig() *config.PDServerConfig {
return s.persistOptions.GetPDServerConfig().Clone()
}
// SetPDServerConfig sets the server config.
func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error {
switch cfg.DashboardAddress {
case "auto":
case "none":
default:
if !strings.HasPrefix(cfg.DashboardAddress, "http") {
cfg.DashboardAddress = fmt.Sprintf("%s://%s", s.GetClientScheme(), cfg.DashboardAddress)
}
if !cluster.IsClientURL(cfg.DashboardAddress, s.client) {
return errors.Errorf("%s is not the client url of any member", cfg.DashboardAddress)
}
}
if err := cfg.Validate(); err != nil {
return err
}
old := s.persistOptions.GetPDServerConfig()
s.persistOptions.SetPDServerConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetPDServerConfig(old)
log.Error("failed to update PDServer config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
errs.ZapError(err))
return err
}
log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// SetLabelPropertyConfig sets the label property config.
func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error {
old := s.persistOptions.GetLabelPropertyConfig()
s.persistOptions.SetLabelPropertyConfig(cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetLabelPropertyConfig(old)
log.Error("failed to update label property config",
zap.Reflect("new", cfg),
zap.Reflect("old", &old),
errs.ZapError(err))
return err
}
log.Info("label property config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
// SetLabelProperty inserts a label property config.
func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error {
s.persistOptions.SetLabelProperty(typ, labelKey, labelValue)
err := s.persistOptions.Persist(s.storage)
if err != nil {
s.persistOptions.DeleteLabelProperty(typ, labelKey, labelValue)
log.Error("failed to update label property config",
zap.String("typ", typ),
zap.String("label-key", labelKey),
zap.String("label-value", labelValue),
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
errs.ZapError(err))
return err
}
log.Info("label property config is updated", zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()))
return nil
}
// DeleteLabelProperty deletes a label property config.
func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
s.persistOptions.DeleteLabelProperty(typ, labelKey, labelValue)
err := s.persistOptions.Persist(s.storage)
if err != nil {
s.persistOptions.SetLabelProperty(typ, labelKey, labelValue)
log.Error("failed to delete label property config",
zap.String("typ", typ),
zap.String("label-key", labelKey),
zap.String("label-value", labelValue),
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
errs.ZapError(err))
return err
}
log.Info("label property config is deleted", zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()))
return nil
}
// GetLabelProperty returns the whole label property config.
func (s *Server) GetLabelProperty() config.LabelPropertyConfig {
return s.persistOptions.GetLabelPropertyConfig().Clone()
}
// SetClusterVersion sets the version of cluster.
func (s *Server) SetClusterVersion(v string) error {
version, err := versioninfo.ParseVersion(v)
if err != nil {
return err
}
old := s.persistOptions.GetClusterVersion()
s.persistOptions.SetClusterVersion(version)
err = s.persistOptions.Persist(s.storage)
if err != nil {
s.persistOptions.SetClusterVersion(old)
log.Error("failed to update cluster version",
zap.String("old-version", old.String()),
zap.String("new-version", v),
errs.ZapError(err))
return err
}
log.Info("cluster version is updated", zap.String("new-version", v))
return nil
}
// GetClusterVersion returns the version of cluster.
func (s *Server) GetClusterVersion() semver.Version {
return *s.persistOptions.GetClusterVersion()
}
// GetTLSConfig get the security config.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}
// GetControllerConfig gets the resource manager controller config.
func (s *Server) GetControllerConfig() *rm_server.ControllerConfig {
return &s.cfg.Controller
}
// GetRaftCluster gets Raft cluster.
// If cluster has not been bootstrapped, return nil.
func (s *Server) GetRaftCluster() *cluster.RaftCluster {
if s.IsClosed() || !s.cluster.IsRunning() {
return nil
}
return s.cluster
}
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.isKeyspaceGroupEnabled && !s.IsClosed() {
if name == mcs.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
return s.cluster.IsServiceIndependent(name)
}
return false
}
// DirectlyGetRaftCluster returns raft cluster directly.
// Only used for test.
func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster {
return s.cluster
}
// GetCluster gets cluster.
func (s *Server) GetCluster() *metapb.Cluster {
return &metapb.Cluster{
Id: keypath.ClusterID(),
MaxPeerCount: uint32(s.persistOptions.GetMaxReplicas()),
}
}
// GetServerOption gets the option of the server.
func (s *Server) GetServerOption() *config.PersistOptions {
return s.persistOptions
}
// GetMetaRegions gets meta regions from cluster.
func (s *Server) GetMetaRegions() []*metapb.Region {
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetMetaRegions()
}
return nil
}
// GetRegions gets regions from cluster.
func (s *Server) GetRegions() []*core.RegionInfo {
rc := s.GetRaftCluster()
if rc != nil {
return rc.GetRegions()
}
return nil
}
// GetServiceLabels returns ApiAccessPaths by given service label
// TODO: this function will be used for updating api rate limit config
func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath {
if apis, ok := s.serviceLabels[serviceLabel]; ok {
return apis
}
return nil
}
// IsGRPCServiceLabelExist returns whether the given gRPC service label exists.
func (s *Server) IsGRPCServiceLabelExist(serviceLabel string) bool {
_, ok := s.grpcServiceLabels[serviceLabel]
return ok
}
// GetAPIAccessServiceLabel returns service label by given access path
// TODO: this function will be used for updating api rate limit config
func (s *Server) GetAPIAccessServiceLabel(accessPath apiutil.AccessPath) string {
if serviceLabel, ok := s.apiServiceLabelMap[accessPath]; ok {
return serviceLabel
}
accessPathNoMethod := apiutil.NewAccessPath(accessPath.Path, "")
if serviceLabel, ok := s.apiServiceLabelMap[accessPathNoMethod]; ok {
return serviceLabel
}
return ""
}
// AddServiceLabel is used to add the relationship between service label and api access path
// TODO: this function will be used for updating api rate limit config
func (s *Server) AddServiceLabel(serviceLabel string, accessPath apiutil.AccessPath) {
if slice, ok := s.serviceLabels[serviceLabel]; ok {
slice = append(slice, accessPath)
s.serviceLabels[serviceLabel] = slice
} else {
slice = []apiutil.AccessPath{accessPath}
s.serviceLabels[serviceLabel] = slice
}
s.apiServiceLabelMap[accessPath] = serviceLabel
}
// GetAuditBackend returns audit backends
func (s *Server) GetAuditBackend() []audit.Backend {
return s.auditBackends
}
// GetServiceAuditBackendLabels returns audit backend labels by serviceLabel
func (s *Server) GetServiceAuditBackendLabels(serviceLabel string) *audit.BackendLabels {
return s.serviceAuditBackendLabels[serviceLabel]
}
// SetServiceAuditBackendLabels is used to add audit backend labels for service by service label
func (s *Server) SetServiceAuditBackendLabels(serviceLabel string, labels []string) {
s.serviceAuditBackendLabels[serviceLabel] = &audit.BackendLabels{Labels: labels}
}
// GetServiceRateLimiter is used to get rate limiter
func (s *Server) GetServiceRateLimiter() *ratelimit.Controller {
return s.serviceRateLimiter
}
// IsInRateLimitAllowList returns whether the given service label is in the allow list.
func (s *Server) IsInRateLimitAllowList(serviceLabel string) bool {
return s.serviceRateLimiter.IsInAllowList(serviceLabel)
}
// UpdateServiceRateLimiter is used to update RateLimiter
func (s *Server) UpdateServiceRateLimiter(serviceLabel string, opts ...ratelimit.Option) ratelimit.UpdateStatus {
return s.serviceRateLimiter.Update(serviceLabel, opts...)
}
// GetGRPCRateLimiter is used to get rate limiter
func (s *Server) GetGRPCRateLimiter() *ratelimit.Controller {
return s.grpcServiceRateLimiter
}
// UpdateGRPCServiceRateLimiter is used to update RateLimiter
func (s *Server) UpdateGRPCServiceRateLimiter(serviceLabel string, opts ...ratelimit.Option) ratelimit.UpdateStatus {
return s.grpcServiceRateLimiter.Update(serviceLabel, opts...)
}
// GetClusterStatus gets cluster status.
func (s *Server) GetClusterStatus() (*cluster.Status, error) {
return s.cluster.LoadClusterStatus()
}
// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
return errors.Errorf("log level %s is illegal", level)
}
s.cfg.Log.Level = level
log.SetLevel(logutil.StringToZapLogLevel(level))
log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
return nil
}
// GetReplicationModeConfig returns the replication mode config.
func (s *Server) GetReplicationModeConfig() *config.ReplicationModeConfig {
return s.persistOptions.GetReplicationModeConfig().Clone()
}
// SetReplicationModeConfig sets the replication mode.
func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) error {
if config.NormalizeReplicationMode(cfg.ReplicationMode) == "" {
return errors.Errorf("invalid replication mode: %v", cfg.ReplicationMode)
}
old := s.persistOptions.GetReplicationModeConfig()
s.persistOptions.SetReplicationModeConfig(&cfg)
if err := s.persistOptions.Persist(s.storage); err != nil {
s.persistOptions.SetReplicationModeConfig(old)
log.Error("failed to update replication mode config",
zap.Reflect("new", cfg),
zap.Reflect("old", &old),
errs.ZapError(err))
return err
}
log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
rc := s.GetRaftCluster()
if rc != nil {
err := rc.GetReplicationMode().UpdateConfig(cfg)
if err != nil {
log.Warn("failed to update replication mode", errs.ZapError(err))
// revert to old config
// NOTE: since we can't put the 2 storage mutations in a batch, it
// is possible that memory and persistent data become different
// (when below revert fail). They will become the same after PD is
// restart or PD leader is changed.
s.persistOptions.SetReplicationModeConfig(old)
revertErr := s.persistOptions.Persist(s.storage)
if revertErr != nil {
log.Error("failed to revert replication mode persistent config", errs.ZapError(revertErr))
}
}
return err
}
return nil
}
// IsServing returns whether the server is the leader if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
return s.member.IsServing()
}
// AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}
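// leaderLoop keeps checking the PD leader: it watches the current leader if
// one exists, makes sure the etcd leader and the PD leader stay on the same
// server, and campaigns for the PD leadership when there is no leader.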
func (s *Server) leaderLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
for {
if s.IsClosed() {
log.Info("server is closed, return PD leader loop")
return
}
leader, checkAgain := s.member.CheckLeader()
// Add a failpoint to test the case where the leader check gets stuck.
failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
checkAgain = true
}
})
if checkAgain {
continue
}
if leader != nil {
err := s.reloadConfigFromKV()
if err != nil {
log.Error("reload config failed", errs.ZapError(err))
continue
}
syncer := s.cluster.GetRegionSyncer()
if s.persistOptions.IsUseRegionStorage() {
syncer.StartSyncWithLeader(leader.GetListenUrls()[0])
}
log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
// WatchLeader will keep looping and never return unless the PD leader has changed.
leader.Watch(s.serverLoopCtx)
syncer.StopSyncWithLeader()
log.Info("pd leader has changed, try to re-campaign a pd leader")
}
// To make sure the etcd leader and PD leader are on the same server.
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
if s.member.GetLeader() == nil {
lastUpdated := s.member.GetLastLeaderUpdatedTime()
// use random timeout to avoid leader campaigning storm.
randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
// add failpoint to test the campaign leader logic.
failpoint.Inject("timeoutWaitPDLeader", func() {
log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond
})
if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 {
log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader",
zap.Duration("timeout", randomTimeout),
zap.Time("last-updated", lastUpdated),
zap.String("current-leader-member-id", etcdtypes.ID(etcdLeader).String()),
zap.String("transferee-member-id", etcdtypes.ID(s.member.ID()).String()),
)
if err := s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()); err != nil {
log.Error("failed to move etcd leader", errs.ZapError(err))
}
}
}
log.Info("skip campaigning of pd leader and check later",
zap.String("server-name", s.Name()),
zap.Uint64("etcd-leader-id", etcdLeader),
zap.Uint64("member-id", s.member.ID()))
time.Sleep(200 * time.Millisecond)
continue
}
s.campaignLeader()
}
}
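// campaignLeader campaigns for the PD leadership and, on success, keeps the
// leadership alive, reloads the configuration, starts the raft cluster and
// serves as the PD leader until the lease expires, the etcd leader changes or
// the server is closed.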
func (s *Server) campaignLeader() {
log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name()))
if err := s.member.Campaign(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign PD leader meets error due to txn conflict, another PD may campaign successfully",
zap.String("campaign-leader-name", s.Name()))
} else {
log.Error("campaign PD leader meets error due to etcd error",
zap.String("campaign-leader-name", s.Name()),
errs.ZapError(err))
}
return
}
// Start keeping the leadership alive and enable the TSO service.
// The TSO service is strictly enabled/disabled by the PD leader lease for 2 reasons:
// 1. The lease-based approach is not affected by thread pauses, slow runtime scheduling, etc.
// 2. Loading regions could be slow. Based on the lease, we can recover the TSO service faster.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
var resetLeaderOnce sync.Once
defer resetLeaderOnce.Do(func() {
cancel()
s.member.Resign()
})
// Maintain the PD leadership; after this, the TSO service can be provided.
s.member.GetLeadership().Keep(ctx)
log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))
if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
return
}
if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
return
}
if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
log.Error("failed to initialize encryption", errs.ZapError(err))
return
}
log.Info("triggering the leader callback functions")
for _, cb := range s.leaderCallbacks {
if err := cb(ctx); err != nil {
log.Error("failed to execute leader callback function", errs.ZapError(err))
return
}
}
// Try to create raft cluster.
if err := s.createRaftCluster(); err != nil {
log.Error("failed to create raft cluster", errs.ZapError(err))
return
}
defer s.stopRaftCluster()
failpoint.Inject("rebaseErr", func() {
failpoint.Return()
})
if err := s.idAllocator.Rebase(); err != nil {
log.Error("failed to sync id from etcd", errs.ZapError(err))
return
}
// PromoteSelf to accept the remaining services, such as GetStore and GetRegion.
s.member.PromoteSelf()
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
defer resetLeaderOnce.Do(func() {
// As soon as the leadership keepalive is canceled, other members have the
// chance to become the new leader.
cancel()
s.member.Resign()
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
})
CheckPDVersionWithClusterVersion(s.persistOptions)
log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))
leaderTicker := time.NewTicker(mcs.LeaderTickInterval)
defer leaderTicker.Stop()
for {
select {
case <-leaderTicker.C:
if !s.member.IsServing() {
log.Info("no longer a leader because lease has expired, PD leader will step down")
return
}
// Add a failpoint to test exiting the leader loop: if the member ID matches the given value, return.
failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
memberString := val.(string)
memberID, _ := strconv.ParseUint(memberString, 10, 64)
if s.member.ID() == memberID {
log.Info("exit PD leader")
failpoint.Return()
}
})
etcdLeader := s.member.GetEtcdLeader()
if etcdLeader != s.member.ID() {
log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
return
}
case <-ctx.Done():
// The server is closed, so exit the campaign loop.
log.Info("server is closed")
return
}
}
}
func (s *Server) etcdLeaderLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
ticker := time.NewTicker(s.cfg.LeaderPriorityCheckInterval.Duration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.member.CheckPriority(ctx)
// Note: we reset the ticker here to support updating configuration dynamically.
ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration)
case <-ctx.Done():
log.Info("server is closed, exit etcd leader loop")
return
}
}
}
func (s *Server) reloadConfigFromKV() error {
err := s.persistOptions.Reload(s.storage)
if err != nil {
return err
}
err = s.serviceMiddlewarePersistOptions.Reload(s.storage)
if err != nil {
return err
}
s.loadRateLimitConfig()
s.loadGRPCRateLimitConfig()
s.loadKeyspaceConfig()
useRegionStorage := s.persistOptions.IsUseRegionStorage()
regionStorage := storage.TrySwitchRegionStorage(s.storage, useRegionStorage)
if regionStorage != nil {
if useRegionStorage {
log.Info("server enable region storage")
} else {
log.Info("server disable region storage")
}
}
return nil
}
func (s *Server) loadKeyspaceConfig() {
if s.keyspaceManager == nil {
return
}
cfg := s.persistOptions.GetKeyspaceConfig()
s.keyspaceManager.UpdateConfig(cfg)
}
func (s *Server) loadRateLimitConfig() {
cfg := s.serviceMiddlewarePersistOptions.GetRateLimitConfig().LimiterConfig
for key := range cfg {
value := cfg[key]
s.serviceRateLimiter.Update(key, ratelimit.UpdateDimensionConfig(&value))
}
}
func (s *Server) loadGRPCRateLimitConfig() {
cfg := s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().LimiterConfig
for key := range cfg {
value := cfg[key]
s.grpcServiceRateLimiter.Update(key, ratelimit.UpdateDimensionConfig(&value))
}
}
// ReplicateFileToMember is used to synchronize state to a member.
// Each member will write `data` to a local file named `name`.
// For security reasons, the data should be in JSON format.
func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
clientUrls := member.GetClientUrls()
if len(clientUrls) == 0 {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()))
return errs.ErrClientURLEmpty.FastGenByArgs()
}
url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(data))
if err != nil {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
}
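// Allow the receiving member to handle this request even if it is not the PD leader.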
req.Header.Set(apiutil.PDAllowFollowerHandleHeader, "true")
res, err := s.httpClient.Do(req)
if err != nil {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
}
// Since we don't read the body, we can close it immediately.
res.Body.Close()
if res.StatusCode != http.StatusOK {
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Int("status-code", res.StatusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
return nil
}
// PersistFile saves a file in DataDir.
func (s *Server) PersistFile(name string, data []byte) error {
if name != replication.DrStatusFile {
return errors.New("Invalid file name")
}
log.Info("persist file", zap.String("name", name), zap.Binary("data", data))
path := filepath.Join(s.GetConfig().DataDir, name)
if !apiutil.IsPathInDirectory(path, s.GetConfig().DataDir) {
return errors.New("Invalid file path")
}
return os.WriteFile(path, data, 0644) // #nosec
}
// SaveTTLConfig saves the given config items with a TTL.
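// A minimal usage sketch (the key below is only illustrative; every key must pass
// config.IsSupportedTTLConfig or the call fails):
//
//	err := s.SaveTTLConfig(map[string]any{"schedule.max-merge-region-size": 20}, 5*time.Minute)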
func (s *Server) SaveTTLConfig(data map[string]any, ttl time.Duration) error {
for k := range data {
if !config.IsSupportedTTLConfig(k) {
return fmt.Errorf("unsupported ttl config %s", k)
}
}
for k, v := range data {
var valueStr string
switch val := v.(type) {
case float64:
// math.Trunc(val) returns the integer part of val, so an integral float like 20.0 is written as "20" while 0.5 keeps its fractional form.
if val == math.Trunc(val) {
valueStr = strconv.FormatInt(int64(val), 10)
} else {
valueStr = strconv.FormatFloat(val, 'f', -1, 64)
}
case int, int8, int16, int32, int64:
valueStr = fmt.Sprintf("%d", val)
case uint, uint8, uint16, uint32, uint64:
valueStr = fmt.Sprintf("%d", val)
default:
valueStr = fmt.Sprint(v)
}
if err := s.persistOptions.SetTTLData(s.ctx, s.client, k, valueStr, ttl); err != nil {
return err
}
}
return nil
}
// IsTTLConfigExist returns true if a TTL value exists for the given config key.
func (s *Server) IsTTLConfigExist(key string) bool {
if config.IsSupportedTTLConfig(key) {
if _, ok := s.persistOptions.GetTTLData(key); ok {
return true
}
}
return false
}
// MarkSnapshotRecovering marks in PD that we are recovering.
// TiKV will get this state during BR EBS restore.
// We write this info into etcd for simplicity; the key only stays in etcd temporarily
// during BR EBS restore, a period in which the cluster cannot serve requests,
// and is deleted after BR EBS restore is done.
func (s *Server) MarkSnapshotRecovering() error {
log.Info("mark snapshot recovering")
markPath := keypath.RecoveringMarkPath()
// the value doesn't matter, set to a static string
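// CreateRevision == 0 means the key does not exist yet, so the put only happens on the
// first marking and repeated calls are effectively idempotent.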
_, err := kv.NewSlowLogTxn(s.client).
If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)).
Then(clientv3.OpPut(markPath, "on")).
Commit()
// If another client has already set the mark, this still returns success.
return err
}
// IsSnapshotRecovering checks whether the recovering mark is set.
func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) {
resp, err := s.client.Get(ctx, keypath.RecoveringMarkPath())
if err != nil {
return false, err
}
return len(resp.Kvs) > 0, nil
}
// UnmarkSnapshotRecovering removes the recovering mark.
func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error {
log.Info("unmark snapshot recovering")
_, err := s.client.Delete(ctx, keypath.RecoveringMarkPath())
// If another client has already removed the mark, this still returns success.
return err
}
// MarkPitrRestoreMode marks in PD that we are in PITR restore mode.
func (s *Server) MarkPitrRestoreMode() error {
log.Info("mark pitr restore mode")
markPath := keypath.PitrRestoreModeMarkPath()
// the value doesn't matter, set to a static string
_, err := kv.NewSlowLogTxn(s.client).
If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)).
Then(clientv3.OpPut(markPath, "on")).
Commit()
// If another client has already set the mark, this still returns success.
return err
}
// IsPitrRestoreMode checks whether the PITR restore mode mark is set.
func (s *Server) IsPitrRestoreMode(ctx context.Context) (bool, error) {
resp, err := s.client.Get(ctx, keypath.PitrRestoreModeMarkPath())
if err != nil {
return false, err
}
return len(resp.Kvs) > 0, nil
}
// UnmarkPitrRestoreMode removes the PITR restore mode mark.
func (s *Server) UnmarkPitrRestoreMode(ctx context.Context) error {
log.Info("unmark pitr restore mode")
_, err := s.client.Delete(ctx, keypath.PitrRestoreModeMarkPath())
// If another client has already removed the mark, this still returns success.
return err
}
// GetServicePrimaryAddr returns the primary address for a given service.
// Note: This function only returns the primary address without checking whether it is alive.
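// A minimal usage sketch (mcs.TSOServiceName is the same constant used by initTSOPrimaryWatcher below):
//
//	if addr, ok := s.GetServicePrimaryAddr(ctx, mcs.TSOServiceName); ok {
//		// addr is the cached primary endpoint kept up to date by the primary watcher.
//	}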
func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
ticker := time.NewTicker(retryIntervalGetServicePrimary)
defer ticker.Stop()
for range maxRetryTimesGetServicePrimary {
if v, ok := s.servicePrimaryMap.Load(serviceName); ok {
return v.(string), true
}
select {
case <-s.ctx.Done():
return "", false
case <-ctx.Done():
return "", false
case <-ticker.C:
}
}
return "", false
}
// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for tests.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}
func (s *Server) initTSOPrimaryWatcher() {
serviceName := mcs.TSOServiceName
tsoServicePrimaryKey := keypath.ElectionPath(&keypath.MsParam{
ServiceName: mcs.TSOServiceName,
GroupID: constant.DefaultKeyspaceGroupID,
})
s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey)
s.tsoPrimaryWatcher.StartWatchLoop()
}
func (s *Server) initSchedulingPrimaryWatcher() {
serviceName := mcs.SchedulingServiceName
primaryKey := keypath.ElectionPath(&keypath.MsParam{
ServiceName: mcs.SchedulingServiceName,
})
s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey)
s.schedulingPrimaryWatcher.StartWatchLoop()
}
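// initServicePrimaryWatcher creates a LoopWatcher on the given election path: putFn caches the
// primary's first listen URL in servicePrimaryMap and deleteFn evicts it, keeping the cache
// that GetServicePrimaryAddr polls up to date.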
func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher {
putFn := func(kv *mvccpb.KeyValue) error {
primary := member.NewParticipantByService(serviceName)
if err := proto.Unmarshal(kv.Value, primary); err != nil {
return err
}
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0]))
}
return nil
}
deleteFn := func(*mvccpb.KeyValue) error {
var oldPrimary string
v, ok := s.servicePrimaryMap.Load(serviceName)
if ok {
oldPrimary = v.(string)
}
log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
name := fmt.Sprintf("%s-primary-watcher", serviceName)
return etcdutil.NewLoopWatcher(
s.serverLoopCtx,
&s.serverLoopWg,
s.client,
name,
primaryKey,
func([]*clientv3.Event) error { return nil },
putFn,
deleteFn,
func([]*clientv3.Event) error { return nil },
false, /* withPrefix */
)
}
// RecoverAllocID recovers the ID allocator by setting its current base to the given ID.
func (s *Server) RecoverAllocID(_ context.Context, id uint64) error {
return s.idAllocator.SetBase(id)
}
// GetExternalTS returns external timestamp.
func (s *Server) GetExternalTS() uint64 {
rc := s.GetRaftCluster()
if rc == nil {
return 0
}
return rc.GetExternalTS()
}
// SetExternalTS sets the external timestamp.
func (s *Server) SetExternalTS(externalTS, globalTS uint64) error {
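// The update is accepted only when currentExternalTS < externalTS <= globalTS; the two checks below enforce this.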
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
desc := "the external timestamp should not be larger than global ts"
log.Error(desc, zap.Uint64("request-timestamp", externalTS), zap.Uint64("global-ts", globalTS))
return errors.New(desc)
}
c := s.GetRaftCluster()
if c == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()
}
currentExternalTS := c.GetExternalTS()
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
desc := "the external timestamp should be larger than current external timestamp"
log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS))
return errors.New(desc)
}
return c.SetExternalTS(externalTS)
}
// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}
// GetLease returns the leader lease.
func (s *Server) GetLease() int64 {
return s.cfg.GetLease()
}
// GetTSOSaveInterval returns TSO save interval.
func (s *Server) GetTSOSaveInterval() time.Duration {
return s.cfg.GetTSOSaveInterval()
}
// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
return s.cfg.GetTSOUpdatePhysicalInterval()
}
// GetMaxResetTSGap gets the max gap to reset the tso.
func (s *Server) GetMaxResetTSGap() time.Duration {
return s.persistOptions.GetMaxResetTSGap()
}
// SetClient sets the etcd client.
// Note: it is only used for tests.
func (s *Server) SetClient(client *clientv3.Client) {
s.client = client
}