mirror of https://github.com/tikv/pd.git
2167 lines
70 KiB
Go
2167 lines
70 KiB
Go
// Copyright 2016 TiKV Project Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/coreos/go-semver/semver"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/gorilla/mux"
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
etcdtypes "go.etcd.io/etcd/client/pkg/v3/types"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.etcd.io/etcd/server/v3/embed"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/kvproto/pkg/diagnosticspb"
|
|
"github.com/pingcap/kvproto/pkg/keyspacepb"
|
|
"github.com/pingcap/kvproto/pkg/metapb"
|
|
"github.com/pingcap/kvproto/pkg/pdpb"
|
|
"github.com/pingcap/kvproto/pkg/tsopb"
|
|
"github.com/pingcap/log"
|
|
"github.com/pingcap/sysutil"
|
|
|
|
"github.com/tikv/pd/pkg/audit"
|
|
bs "github.com/tikv/pd/pkg/basicserver"
|
|
"github.com/tikv/pd/pkg/cgroup"
|
|
"github.com/tikv/pd/pkg/core"
|
|
"github.com/tikv/pd/pkg/encryption"
|
|
"github.com/tikv/pd/pkg/errs"
|
|
"github.com/tikv/pd/pkg/gc"
|
|
"github.com/tikv/pd/pkg/id"
|
|
"github.com/tikv/pd/pkg/keyspace"
|
|
"github.com/tikv/pd/pkg/keyspace/constant"
|
|
ms_server "github.com/tikv/pd/pkg/mcs/metastorage/server"
|
|
"github.com/tikv/pd/pkg/mcs/registry"
|
|
rm_server "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
|
|
mcs "github.com/tikv/pd/pkg/mcs/utils/constant"
|
|
"github.com/tikv/pd/pkg/member"
|
|
"github.com/tikv/pd/pkg/metering"
|
|
"github.com/tikv/pd/pkg/ratelimit"
|
|
"github.com/tikv/pd/pkg/replication"
|
|
sc "github.com/tikv/pd/pkg/schedule/config"
|
|
"github.com/tikv/pd/pkg/schedule/hbstream"
|
|
"github.com/tikv/pd/pkg/schedule/placement"
|
|
"github.com/tikv/pd/pkg/storage"
|
|
"github.com/tikv/pd/pkg/storage/endpoint"
|
|
"github.com/tikv/pd/pkg/storage/kv"
|
|
"github.com/tikv/pd/pkg/syncer"
|
|
"github.com/tikv/pd/pkg/systimemon"
|
|
"github.com/tikv/pd/pkg/tso"
|
|
"github.com/tikv/pd/pkg/utils/apiutil"
|
|
"github.com/tikv/pd/pkg/utils/etcdutil"
|
|
"github.com/tikv/pd/pkg/utils/grpcutil"
|
|
"github.com/tikv/pd/pkg/utils/jsonutil"
|
|
"github.com/tikv/pd/pkg/utils/keypath"
|
|
"github.com/tikv/pd/pkg/utils/logutil"
|
|
"github.com/tikv/pd/pkg/utils/syncutil"
|
|
"github.com/tikv/pd/pkg/utils/tsoutil"
|
|
"github.com/tikv/pd/pkg/utils/typeutil"
|
|
"github.com/tikv/pd/pkg/versioninfo"
|
|
"github.com/tikv/pd/server/cluster"
|
|
"github.com/tikv/pd/server/config"
|
|
|
|
_ "github.com/tikv/pd/pkg/mcs/resourcemanager/server/apis/v1" // init API group
|
|
_ "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1" // init tso API group
|
|
)
|
|
|
|
const (
	// serverMetricsInterval is the interval for collecting etcd state metrics
	// (see serverMetricsLoop / collectEtcdStateMetrics).
	serverMetricsInterval = time.Minute
	// pdAPIPrefix is the URL prefix under which the PD HTTP API is served.
	pdAPIPrefix = "/pd/"

	// PD is name of member.
	PD = "PD"

	// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
	// Note: it needs to be less than client.defaultPDTimeout
	maxRetryTimesGetServicePrimary = 25
	// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
	retryIntervalGetServicePrimary = 100 * time.Millisecond

	// lostPDLeaderMaxTimeoutSecs and lostPDLeaderReElectionFactor tune the
	// behavior after the PD leader is lost; their usage is outside this view.
	lostPDLeaderMaxTimeoutSecs   = 10
	lostPDLeaderReElectionFactor = 10
)
|
|
|
|
// EtcdStartTimeout is the timeout for starting up the embedded etcd.
// It is a variable (not a const) so tests can shorten it.
var EtcdStartTimeout = time.Minute * 5
|
|
|
|
var (
	// WithLabelValues is a heavy operation, define variable to avoid call it every time.
	// These gauges track the local etcd server's raft term and log indexes.
	etcdTermGauge           = etcdStateGauge.WithLabelValues("term")
	etcdAppliedIndexGauge   = etcdStateGauge.WithLabelValues("appliedIndex")
	etcdCommittedIndexGauge = etcdStateGauge.WithLabelValues("committedIndex")
)
|
|
|
|
// streamWrapper bundles a TSO forwarding stream with a mutex that callers
// use to serialize access to the stream.
type streamWrapper struct {
	tsopb.TSO_TsoClient
	syncutil.Mutex
}
|
|
|
|
// Server is the pd server. It implements bs.Server
type Server struct {
	diagnosticspb.DiagnosticsServer

	// Server state. 0 is not running, 1 is running.
	// Accessed atomically; see startServer, Close and IsClosed.
	isRunning int64

	// Server start timestamp
	startTimestamp int64

	// Configs and initial fields.
	cfg                             *config.Config
	serviceMiddlewareCfg            *config.ServiceMiddlewareConfig
	etcdCfg                         *embed.Config
	serviceMiddlewarePersistOptions *config.ServiceMiddlewarePersistOptions
	persistOptions                  *config.PersistOptions
	handler                         *Handler

	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel func()
	serverLoopWg     sync.WaitGroup

	// for PD leader election.
	member *member.Member
	// etcd client
	client *clientv3.Client
	// electionClient is used for leader election.
	electionClient *clientv3.Client
	// http client
	httpClient *http.Client

	// Server services.
	// for id allocator, we can use one allocator for
	// store, region and peer, because we just need
	// a unique ID.
	idAllocator id.Allocator
	// for encryption
	encryptionKeyManager *encryption.Manager
	// for storage operation.
	storage storage.Storage
	// GC states manager
	gcStateManager *gc.GCStateManager
	// keyspace manager
	keyspaceManager *keyspace.Manager
	// keyspace group manager
	keyspaceGroupManager *keyspace.GroupManager
	// metering writer
	meteringWriter *metering.Writer
	// for basicCluster operation.
	basicCluster *core.BasicCluster
	// for tso.
	tsoAllocator *tso.Allocator
	// for raft cluster
	cluster *cluster.RaftCluster
	// For async region heartbeat.
	hbStreams *hbstream.HeartbeatStreams
	// Zap logger
	lg       *zap.Logger
	logProps *log.ZapProperties

	// Callback functions for different stages
	// startCallbacks will be called after the server is started.
	startCallbacks []func()
	// leaderCallbacks will be called after the server becomes leader.
	leaderCallbacks []func(context.Context) error
	// closeCallbacks will be called before the server is closed.
	closeCallbacks []func()

	// hot region history info storage
	hotRegionStorage *storage.HotRegionStorage
	// Store as map[string]*grpc.ClientConn
	clientConns sync.Map

	// tsoClientPool caches TSO forwarding streams, keyed by address.
	tsoClientPool struct {
		syncutil.RWMutex
		clients map[string]*streamWrapper
	}

	// tsoDispatcher is used to dispatch different TSO requests to
	// the corresponding forwarding TSO channel.
	tsoDispatcher *tsoutil.TSODispatcher
	// tsoProtoFactory is the abstract factory for creating tso
	// related data structures defined in the TSO grpc service
	tsoProtoFactory *tsoutil.TSOProtoFactory
	// pdProtoFactory is the abstract factory for creating tso
	// related data structures defined in the PD grpc service
	pdProtoFactory *tsoutil.PDProtoFactory

	// HTTP-side rate limiting and audit bookkeeping.
	serviceRateLimiter *ratelimit.Controller
	serviceLabels      map[string][]apiutil.AccessPath
	apiServiceLabelMap map[apiutil.AccessPath]string

	// gRPC-side rate limiting; grpcServiceLabels is filled from the
	// registered gRPC method names (see initGRPCServiceLabels).
	grpcServiceRateLimiter *ratelimit.Controller
	grpcServiceLabels      map[string]struct{}
	grpcServer             *grpc.Server

	serviceAuditBackendLabels map[string]*audit.BackendLabels

	auditBackends []audit.Backend

	registry               *registry.ServiceRegistry
	isKeyspaceGroupEnabled bool
	servicePrimaryMap      sync.Map /* Store as map[string]string */
	tsoPrimaryWatcher      *etcdutil.LoopWatcher
	schedulingPrimaryWatcher *etcdutil.LoopWatcher

	// Cgroup Monitor
	cgMonitor cgroup.Monitor
}
|
|
|
|
// HandlerBuilder builds a server HTTP handler.
// It receives the server context and the server itself, and returns the
// handler together with the API service group it belongs to.
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)
|
|
|
|
// CreateServer creates the UNINITIALIZED pd server with given configuration.
// The returned server is not yet started; callers are expected to call Run.
// `services` being non-empty enables the keyspace group (see TODO below);
// `legacyServiceBuilders` install extra HTTP handlers on the embedded etcd.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
	// TODO: Currently, whether we enable microservice or not is determined by the service list.
	// It's equal to whether we enable the keyspace group or not.
	// But indeed the keyspace group is independent of the microservice.
	// There could be the following scenarios:
	// 1. Enable microservice but disable keyspace group. (non-serverless scenario)
	// 2. Enable microservice and enable keyspace group. (serverless scenario)
	// 3. Disable microservice and disable keyspace group. (both serverless scenario and non-serverless scenario)
	// We should separate the keyspace group from the microservice later.
	isKeyspaceGroupEnabled := len(services) != 0
	log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg))
	serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

	s := &Server{
		cfg:                             cfg,
		persistOptions:                  config.NewPersistOptions(cfg),
		serviceMiddlewareCfg:            serviceMiddlewareCfg,
		serviceMiddlewarePersistOptions: config.NewServiceMiddlewarePersistOptions(serviceMiddlewareCfg),
		member:                          &member.Member{},
		ctx:                             ctx,
		startTimestamp:                  time.Now().Unix(),
		DiagnosticsServer:               sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
		isKeyspaceGroupEnabled:          isKeyspaceGroupEnabled,
		tsoClientPool: struct {
			syncutil.RWMutex
			clients map[string]*streamWrapper
		}{
			clients: make(map[string]*streamWrapper),
		},
	}
	s.handler = newHandler(s)

	// create audit backend
	s.auditBackends = []audit.Backend{
		audit.NewLocalLogBackend(true),
		audit.NewPrometheusBackend(serviceAuditHistogram, serviceAuditCounter, false),
	}
	// Separate rate-limit controllers for the HTTP and gRPC surfaces.
	s.serviceRateLimiter = ratelimit.NewController(s.ctx, "http", apiConcurrencyGauge)
	s.grpcServiceRateLimiter = ratelimit.NewController(s.ctx, "grpc", apiConcurrencyGauge)
	s.serviceAuditBackendLabels = make(map[string]*audit.BackendLabels)
	s.serviceLabels = make(map[string][]apiutil.AccessPath)
	s.grpcServiceLabels = make(map[string]struct{})
	s.apiServiceLabelMap = make(map[apiutil.AccessPath]string)

	// Adjust etcd config.
	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
	if err != nil {
		return nil, err
	}
	if len(legacyServiceBuilders) != 0 {
		userHandlers, err := combineBuilderServerHTTPService(ctx, s, legacyServiceBuilders...)
		if err != nil {
			return nil, err
		}
		etcdCfg.UserHandlers = userHandlers
	}
	// New way to register services.
	s.registry = registry.NewServerServiceRegistry()
	s.registry.RegisterService("MetaStorage", ms_server.NewService)
	s.registry.RegisterService("ResourceManager", rm_server.NewService[*Server])
	// Register the microservices REST path.
	s.registry.InstallAllRESTHandler(s, etcdCfg.UserHandlers)

	// The gRPC services are registered lazily when the embedded etcd creates
	// its gRPC server; s.grpcServer is captured here for later introspection
	// (see initGRPCServiceLabels).
	etcdCfg.ServiceRegister = func(gs *grpc.Server) {
		grpcServer := &GrpcServer{Server: s}
		pdpb.RegisterPDServer(gs, grpcServer)
		keyspacepb.RegisterKeyspaceServer(gs, &KeyspaceServer{GrpcServer: grpcServer})
		diagnosticspb.RegisterDiagnosticsServer(gs, s)
		// Register the microservices GRPC service.
		s.registry.InstallAllGRPCServices(s, gs)
		s.grpcServer = gs
	}
	s.etcdCfg = etcdCfg
	s.lg = cfg.Logger
	s.logProps = cfg.LogProps
	return s, nil
}
|
|
|
|
// startEtcd starts the embedded etcd, validates the cluster ID, waits for
// etcd readiness (bounded by EtcdStartTimeout), then creates the etcd/HTTP
// clients and initializes the member.
func (s *Server) startEtcd(ctx context.Context) error {
	newCtx, cancel := context.WithTimeout(ctx, EtcdStartTimeout)
	defer cancel()

	etcd, err := embed.StartEtcd(s.etcdCfg)
	if err != nil {
		return errs.ErrStartEtcd.Wrap(err).GenWithStackByCause()
	}

	// Check cluster ID
	urlMap, err := etcdtypes.NewURLsMap(s.cfg.InitialCluster)
	if err != nil {
		return errs.ErrEtcdURLMap.Wrap(err).GenWithStackByCause()
	}
	tlsConfig, err := s.cfg.Security.ToClientTLSConfig()
	if err != nil {
		return err
	}

	if err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlMap, tlsConfig); err != nil {
		return err
	}

	select {
	// Wait etcd until it is ready to use
	case <-etcd.Server.ReadyNotify():
	case <-newCtx.Done():
		return errs.ErrCancelStartEtcd.FastGenByArgs()
	}

	// Start the etcd and HTTP clients, then init the member.
	err = s.startClient()
	if err != nil {
		return err
	}
	err = s.initMember(newCtx, etcd)
	if err != nil {
		return err
	}

	// The gRPC server exists by now (etcd registered it), so the method
	// labels can be collected for rate limiting.
	s.initGRPCServiceLabels()
	return nil
}
|
|
|
|
func (s *Server) initGRPCServiceLabels() {
|
|
for name, serviceInfo := range s.grpcServer.GetServiceInfo() {
|
|
if name == gRPCServiceName {
|
|
for _, methodInfo := range serviceInfo.Methods {
|
|
s.grpcServiceLabels[methodInfo.Name] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// startClient creates the two etcd clients (general-purpose and
// election-only) plus the HTTP client, all sharing the same TLS config.
func (s *Server) startClient() error {
	tlsConfig, err := s.cfg.Security.ToClientTLSConfig()
	if err != nil {
		return err
	}
	etcdCfg, err := s.cfg.GenEmbedEtcdConfig()
	if err != nil {
		return err
	}
	/* Starting two different etcd clients here is to avoid the throttling. */
	// This etcd client will be used to access the etcd cluster to read and write all kinds of meta data.
	s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "server-etcd-client")
	if err != nil {
		return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
	}
	// This etcd client will only be used to read and write the election-related data, such as leader key.
	s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.AdvertiseClientUrls, "election-etcd-client")
	if err != nil {
		return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
	}
	s.httpClient = etcdutil.CreateHTTPClient(tlsConfig)
	return nil
}
|
|
|
|
// initMember syncs the advertise peer URLs with what etcd actually reports
// for this member, then constructs the Member used for leader election.
func (s *Server) initMember(ctx context.Context, etcd *embed.Etcd) error {
	// Update advertise peer URLs.
	etcdMembers, err := etcdutil.ListEtcdMembers(ctx, s.client)
	if err != nil {
		return err
	}
	etcdServerID := uint64(etcd.Server.ID())
	for _, m := range etcdMembers.Members {
		if etcdServerID == m.ID {
			etcdPeerURLs := strings.Join(m.PeerURLs, ",")
			if s.cfg.AdvertisePeerUrls != etcdPeerURLs {
				log.Info("update advertise peer urls", zap.String("from", s.cfg.AdvertisePeerUrls), zap.String("to", etcdPeerURLs))
				s.cfg.AdvertisePeerUrls = etcdPeerURLs
			}
		}
	}
	failpoint.Inject("memberNil", func() {
		time.Sleep(1500 * time.Millisecond)
	})
	// The member uses the dedicated election client, not the general one.
	s.member = member.NewMember(etcd, s.electionClient, etcdServerID)
	return nil
}
|
|
|
|
// AddStartCallback adds a callback in the startServer phase.
|
|
func (s *Server) AddStartCallback(callbacks ...func()) {
|
|
s.startCallbacks = append(s.startCallbacks, callbacks...)
|
|
}
|
|
|
|
// startServer initializes all server-side components (member info, ID
// allocator, encryption, storage, TSO, raft cluster, keyspace managers,
// metering, GC, heartbeat streams, hot-region storage), runs the start
// callbacks, seeds the rate limiters, and finally flips isRunning to 1.
// The initialization order matters: later components depend on earlier ones.
func (s *Server) startServer(ctx context.Context) error {
	clusterID, err := endpoint.InitClusterID(s.client)
	if err != nil {
		log.Error("failed to init cluster id", errs.ZapError(err))
		return err
	}
	// It may lose accuracy if use float64 to store uint64. So we store the cluster id in label.
	metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", clusterID)).Set(0)
	bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

	// Publish this member's metadata (deploy path, version, git hash) to etcd.
	s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name())
	if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil {
		return err
	}
	if err := s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion); err != nil {
		return err
	}
	if err := s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash); err != nil {
		return err
	}
	s.idAllocator = id.NewAllocator(&id.AllocatorParams{
		Client: s.client,
		Label:  id.DefaultLabel,
		Member: s.member.MemberValue(),
	})
	s.encryptionKeyManager, err = encryption.NewManager(s.client, &s.cfg.Security.Encryption)
	if err != nil {
		return err
	}
	// Initialize an etcd storage as the default storage.
	defaultStorage := storage.NewStorageWithEtcdBackend(s.client)
	// Initialize a specialized LevelDB storage to store the region-related meta info independently.
	regionStorage, err := storage.NewRegionStorageWithLevelDBBackend(
		ctx,
		filepath.Join(s.cfg.DataDir, "region-meta"),
		s.encryptionKeyManager)
	if err != nil {
		return err
	}
	s.storage = storage.NewCoreStorage(defaultStorage, regionStorage)
	s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
	s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
	s.pdProtoFactory = &tsoutil.PDProtoFactory{}
	s.tsoAllocator = tso.NewAllocator(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.storage, s)
	s.basicCluster = core.NewBasicCluster()
	s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocator)
	// Keyspace IDs come from a separate allocator namespace with a larger step.
	keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{
		Client: s.client,
		Label:  id.KeyspaceLabel,
		Member: s.member.MemberValue(),
		Step:   keyspace.AllocStep,
	})
	if s.IsKeyspaceGroupEnabled() {
		s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
	}
	s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
	// Only start the metering writer if a valid metering config is provided.
	if len(s.cfg.Metering.Type) > 0 {
		s.meteringWriter, err = metering.NewWriter(s.ctx, &s.cfg.Metering, fmt.Sprintf("pd%d", s.GetMember().ID()))
		if err != nil {
			// Metering is best-effort: a failure here is logged, not fatal.
			log.Warn("failed to initialize the metering writer", errs.ZapError(err))
		} else {
			s.meteringWriter.Start()
		}
	} else {
		log.Info("no metering config provided, the metering writer will not be started")
	}
	s.gcStateManager = gc.NewGCStateManager(s.storage.GetGCStateProvider(), s.cfg.PDServerCfg, s.keyspaceManager)
	s.hbStreams = hbstream.NewHeartbeatStreams(ctx, "", s.cluster)
	// initial hot_region_storage in here.

	s.hotRegionStorage, err = storage.NewHotRegionsStorage(
		ctx, filepath.Join(s.cfg.DataDir, "hot-region"), s.encryptionKeyManager, s.handler)
	if err != nil {
		return err
	}

	// Run callbacks
	log.Info("triggering the start callback functions")
	for _, cb := range s.startCallbacks {
		cb()
	}

	// to init all rate limiter and metrics
	for service := range s.serviceLabels {
		s.serviceRateLimiter.Update(service, ratelimit.InitLimiter())
	}
	for service := range s.grpcServiceLabels {
		s.grpcServiceRateLimiter.Update(service, ratelimit.InitLimiter())
	}

	failpoint.InjectCall("delayStartServer")
	// Server has started.
	atomic.StoreInt64(&s.isRunning, 1)
	bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0)))
	return nil
}
|
|
|
|
// AddCloseCallback adds a callback in the Close phase.
|
|
func (s *Server) AddCloseCallback(callbacks ...func()) {
|
|
s.closeCallbacks = append(s.closeCallbacks, callbacks...)
|
|
}
|
|
|
|
// Close closes the server. It is idempotent: the CAS on isRunning makes
// every call after the first a no-op. Components are shut down roughly in
// reverse order of startup; the order below is deliberate (e.g. the server
// loop stops before its dependents are torn down).
func (s *Server) Close() {
	if !atomic.CompareAndSwapInt64(&s.isRunning, 1, 0) {
		// server is already closed
		return
	}

	log.Info("closing server")

	s.cgMonitor.StopMonitor()

	// Stop background loops first so nothing races with the teardown below.
	s.stopServerLoop()
	if s.IsKeyspaceGroupEnabled() {
		s.keyspaceGroupManager.Close()
	}
	if s.tsoAllocator != nil {
		s.tsoAllocator.Close()
	}
	if s.meteringWriter != nil {
		s.meteringWriter.Stop()
	}

	if s.client != nil {
		if err := s.client.Close(); err != nil {
			log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
		}
	}
	if s.electionClient != nil {
		if err := s.electionClient.Close(); err != nil {
			log.Error("close election client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err))
		}
	}

	if s.httpClient != nil {
		s.httpClient.CloseIdleConnections()
	}

	if s.member.Etcd() != nil {
		s.member.Close()
	}

	if s.hbStreams != nil {
		s.hbStreams.Close()
	}
	if err := s.storage.Close(); err != nil {
		log.Error("close storage meet error", errs.ZapError(err))
	}

	if s.hotRegionStorage != nil {
		if err := s.hotRegionStorage.Close(); err != nil {
			log.Error("close hot region storage meet error", errs.ZapError(err))
		}
	}

	s.grpcServiceRateLimiter.Close()
	s.serviceRateLimiter.Close()
	// Run callbacks
	log.Info("triggering the close callback functions")
	for _, cb := range s.closeCallbacks {
		cb()
	}

	// Close every cached outbound gRPC connection; errors are logged only.
	s.clientConns.Range(func(_, value any) bool {
		conn := value.(*grpc.ClientConn)
		if err := conn.Close(); err != nil {
			log.Error("close grpc client meet error", errs.ZapError(err))
		}
		return true
	})

	log.Info("close server")
}
|
|
|
|
// IsClosed checks whether server is closed or not.
|
|
func (s *Server) IsClosed() bool {
|
|
return atomic.LoadInt64(&s.isRunning) == 0
|
|
}
|
|
|
|
// Run runs the pd server: starts the system-time monitor, the embedded
// etcd, the server components, the cgroup monitor and the background loops.
// It returns once startup is complete (or on the first startup error).
func (s *Server) Run() error {
	go systimemon.StartMonitor(s.ctx, time.Now, func() {
		log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime))
		timeJumpBackCounter.Inc()
	})
	if err := s.startEtcd(s.ctx); err != nil {
		return err
	}

	if err := s.startServer(s.ctx); err != nil {
		return err
	}

	s.cgMonitor.StartMonitor(s.ctx)

	failpoint.Inject("delayStartServerLoop", func() {
		time.Sleep(2 * time.Second)
	})
	s.startServerLoop(s.ctx)

	return nil
}
|
|
|
|
// SetServiceAuditBackendForHTTP is used to register service audit config for HTTP.
|
|
func (s *Server) SetServiceAuditBackendForHTTP(route *mux.Route, labels ...string) {
|
|
if len(route.GetName()) == 0 {
|
|
return
|
|
}
|
|
if len(labels) > 0 {
|
|
s.SetServiceAuditBackendLabels(route.GetName(), labels)
|
|
}
|
|
}
|
|
|
|
// Context returns the root context of server (the one passed to CreateServer).
func (s *Server) Context() context.Context {
	return s.ctx
}
|
|
|
|
// LoopContext returns the loop context of server.
// It is only set after startServerLoop has run; nil before that.
func (s *Server) LoopContext() context.Context {
	return s.serverLoopCtx
}
|
|
|
|
// startServerLoop spawns the background loops of the server. The WaitGroup
// count (4) must match the number of goroutines started below; each loop
// calls serverLoopWg.Done on exit and stopServerLoop waits on it.
func (s *Server) startServerLoop(ctx context.Context) {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)
	s.serverLoopWg.Add(4)
	go s.leaderLoop()
	go s.etcdLeaderLoop()
	go s.serverMetricsLoop()
	go s.encryptionKeyManagerLoop()
	if s.IsKeyspaceGroupEnabled() {
		// Watchers are not counted in the WaitGroup; they manage their own lifecycle.
		s.initTSOPrimaryWatcher()
		s.initSchedulingPrimaryWatcher()
	}
}
|
|
|
|
// stopServerLoop cancels the server loop context and blocks until all
// background loops started by startServerLoop have exited.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}
|
|
|
|
// serverMetricsLoop periodically collects etcd state metrics every
// serverMetricsInterval until the server loop context is canceled.
func (s *Server) serverMetricsLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()
	ticker := time.NewTicker(serverMetricsInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.collectEtcdStateMetrics()
		case <-ctx.Done():
			log.Info("server is closed, exit metrics loop")
			return
		}
	}
}
|
|
|
|
// encryptionKeyManagerLoop is used to start monitor encryption key changes.
|
|
func (s *Server) encryptionKeyManagerLoop() {
|
|
defer logutil.LogPanic()
|
|
defer s.serverLoopWg.Done()
|
|
|
|
ctx, cancel := context.WithCancel(s.serverLoopCtx)
|
|
defer cancel()
|
|
s.encryptionKeyManager.StartBackgroundLoop(ctx)
|
|
log.Info("server is closed, exist encryption key manager loop")
|
|
}
|
|
|
|
// collectEtcdStateMetrics exports the local etcd server's raft term,
// applied index and committed index to their Prometheus gauges.
func (s *Server) collectEtcdStateMetrics() {
	etcdTermGauge.Set(float64(s.member.Etcd().Server.Term()))
	etcdAppliedIndexGauge.Set(float64(s.member.Etcd().Server.AppliedIndex()))
	etcdCommittedIndexGauge.Set(float64(s.member.Etcd().Server.CommittedIndex()))
}
|
|
|
|
// bootstrapCluster bootstraps the raft cluster from a BootstrapRequest:
// it writes the cluster meta, bootstrap time, first store and first region
// to etcd in one transaction guarded by "cluster path does not exist yet",
// then starts the raft cluster and the keyspace manager. A concurrent
// bootstrap loses the transaction and gets ErrEtcdTxnConflict.
func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error) {
	clusterID := keypath.ClusterID()

	log.Info("try to bootstrap raft cluster",
		zap.Uint64("cluster-id", clusterID),
		zap.String("request", fmt.Sprintf("%v", req)))

	if err := checkBootstrapRequest(req); err != nil {
		return nil, err
	}

	clusterMeta := metapb.Cluster{
		Id:           clusterID,
		MaxPeerCount: uint32(s.persistOptions.GetMaxReplicas()),
	}

	// Set cluster meta
	clusterValue, err := clusterMeta.Marshal()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	var ops []clientv3.Op
	ops = append(ops, clientv3.OpPut(keypath.ClusterPath(), string(clusterValue)))

	// Set bootstrap time
	// Because we will write the cluster meta into etcd directly,
	// so we need to handle the root key path manually here.
	timeData := typeutil.Uint64ToBytes(uint64(time.Now().UnixNano()))
	ops = append(ops, clientv3.OpPut(keypath.ClusterBootstrapTimePath(), string(timeData)))

	// Set store meta
	storeMeta := req.GetStore()
	storeValue, err := storeMeta.Marshal()
	if err != nil {
		return nil, errors.WithStack(err)
	}
	ops = append(ops, clientv3.OpPut(keypath.StorePath(storeMeta.GetId()), string(storeValue)))

	regionValue, err := req.GetRegion().Marshal()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Set region meta with region id.
	ops = append(ops, clientv3.OpPut(keypath.RegionPath(req.GetRegion().GetId()), string(regionValue)))

	// TODO: we must figure out a better way to handle bootstrap failed, maybe intervene manually.
	bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(keypath.ClusterPath()), "=", 0)
	resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit()
	if err != nil {
		return nil, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause()
	}
	if !resp.Succeeded {
		log.Warn("cluster already bootstrapped", zap.Uint64("cluster-id", clusterID))
		return nil, errs.ErrEtcdTxnConflict.FastGenByArgs()
	}

	log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID))
	// Best-effort: persisting the region locally may fail without failing
	// the bootstrap itself.
	err = s.storage.SaveRegion(req.GetRegion())
	if err != nil {
		log.Warn("save the bootstrap region failed", errs.ZapError(err))
	}
	err = s.storage.Flush()
	if err != nil {
		log.Warn("flush the bootstrap region failed", errs.ZapError(err))
	}

	if err := s.cluster.Start(s, true); err != nil {
		return nil, err
	}

	if err = s.GetKeyspaceManager().Bootstrap(); err != nil {
		log.Warn("bootstrapping keyspace manager failed", errs.ZapError(err))
	}

	return &pdpb.BootstrapResponse{
		ReplicationStatus: s.cluster.GetReplicationMode().GetReplicationStatus(),
	}, nil
}
|
|
|
|
func (s *Server) createRaftCluster() error {
|
|
if s.cluster.IsRunning() {
|
|
return nil
|
|
}
|
|
|
|
return s.cluster.Start(s, false)
|
|
}
|
|
|
|
// stopRaftCluster stops the raft cluster.
// The failpoint lets tests simulate a busy raft cluster during shutdown.
func (s *Server) stopRaftCluster() {
	failpoint.Inject("raftclusterIsBusy", func() {})
	s.cluster.Stop()
}
|
|
|
|
// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled
// (set in CreateServer when a non-empty service list is given).
func (s *Server) IsKeyspaceGroupEnabled() bool {
	return s.isKeyspaceGroupEnabled
}
|
|
|
|
// GetAddr returns the server urls for clients (AdvertiseClientUrls).
func (s *Server) GetAddr() string {
	return s.cfg.AdvertiseClientUrls
}
|
|
|
|
// GetClientScheme returns the client URL scheme
|
|
func (s *Server) GetClientScheme() string {
|
|
if len(s.cfg.Security.CertPath) == 0 && len(s.cfg.Security.KeyPath) == 0 {
|
|
return "http"
|
|
}
|
|
return "https"
|
|
}
|
|
|
|
// GetMemberInfo returns a deep copy of the server member information,
// so callers cannot mutate the server's own member state.
func (s *Server) GetMemberInfo() *pdpb.Member {
	return typeutil.DeepClone(s.member.Member(), core.MemberFactory)
}
|
|
|
|
// GetHandler returns the handler for API.
func (s *Server) GetHandler() *Handler {
	return s.handler
}
|
|
|
|
// GetEndpoints returns the etcd endpoints for outer use.
func (s *Server) GetEndpoints() []string {
	return s.client.Endpoints()
}
|
|
|
|
// GetClient returns the builtin etcd client (the general-purpose one,
// not the election client).
func (s *Server) GetClient() *clientv3.Client {
	return s.client
}
|
|
|
|
// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
	return s.httpClient
}
|
|
|
|
// GetLeader returns the leader of PD cluster(i.e the PD leader).
func (s *Server) GetLeader() *pdpb.Member {
	return s.member.GetLeader()
}
|
|
|
|
// GetServingUrls gets service endpoints, delegating to the member.
func (s *Server) GetServingUrls() []string {
	return s.member.GetServingUrls()
}
|
|
|
|
// GetMember returns the member of server.
func (s *Server) GetMember() *member.Member {
	return s.member
}
|
|
|
|
// GetStorage returns the backend storage of server.
func (s *Server) GetStorage() storage.Storage {
	return s.storage
}
|
|
|
|
// GetGCStateManager returns the GC state manager of the server.
func (s *Server) GetGCStateManager() *gc.GCStateManager {
	return s.gcStateManager
}
|
|
|
|
// GetHistoryHotRegionStorage returns the backend storage of historyHotRegion.
func (s *Server) GetHistoryHotRegionStorage() *storage.HotRegionStorage {
	return s.hotRegionStorage
}
|
|
|
|
// SetStorage changes the storage only for test purpose.
// When we use it, we should prevent calling GetStorage, otherwise, it may cause a data race problem.
func (s *Server) SetStorage(storage storage.Storage) {
	s.storage = storage
}
|
|
|
|
// GetBasicCluster returns the basic cluster of server.
func (s *Server) GetBasicCluster() *core.BasicCluster {
	return s.basicCluster
}
|
|
|
|
// GetPersistOptions returns the persistent options of the server
// (schedule/replication/etc. configuration backed by storage).
func (s *Server) GetPersistOptions() *config.PersistOptions {
	return s.persistOptions
}
|
|
|
|
// GetServiceMiddlewarePersistOptions returns the service middleware persist option.
func (s *Server) GetServiceMiddlewarePersistOptions() *config.ServiceMiddlewarePersistOptions {
	return s.serviceMiddlewarePersistOptions
}
|
|
|
|
// GetHBStreams returns the heartbeat streams.
func (s *Server) GetHBStreams() *hbstream.HeartbeatStreams {
	return s.hbStreams
}
|
|
|
|
// GetAllocator returns the ID allocator of server.
func (s *Server) GetAllocator() id.Allocator {
	return s.idAllocator
}
|
|
|
|
// GetTSOAllocator returns the TSO Allocator.
func (s *Server) GetTSOAllocator() *tso.Allocator {
	return s.tsoAllocator
}
|
|
|
|
// GetKeyspaceManager returns the keyspace manager of server.
func (s *Server) GetKeyspaceManager() *keyspace.Manager {
	return s.keyspaceManager
}
|
|
|
|
// SetKeyspaceManager sets the keyspace manager of server.
// Note: it is only used for test.
func (s *Server) SetKeyspaceManager(keyspaceManager *keyspace.Manager) {
	s.keyspaceManager = keyspaceManager
}
|
|
|
|
// GetKeyspaceGroupManager returns the keyspace group manager of server.
// It is nil unless the keyspace group is enabled (see startServer).
func (s *Server) GetKeyspaceGroupManager() *keyspace.GroupManager {
	return s.keyspaceGroupManager
}
|
|
|
|
// SetKeyspaceGroupManager sets the keyspace group manager of server.
// Note: it is only used for test.
func (s *Server) SetKeyspaceGroupManager(keyspaceGroupManager *keyspace.GroupManager) {
	s.keyspaceGroupManager = keyspaceGroupManager
}
|
|
|
|
// GetMeteringWriter returns the metering writer.
// It is nil when no metering config was provided (see startServer).
func (s *Server) GetMeteringWriter() *metering.Writer {
	return s.meteringWriter
}
|
|
|
|
// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
	return s.cfg.Name
}
|
|
|
|
// StartTimestamp returns the start timestamp of this server.
func (s *Server) StartTimestamp() int64 {
	return s.startTimestamp
}
|
|
|
|
// GetMembers returns PD server list.
// It returns ErrServerNotStarted when the server has already been closed.
func (s *Server) GetMembers() ([]*pdpb.Member, error) {
	if s.IsClosed() {
		return nil, errs.ErrServerNotStarted.FastGenByArgs()
	}
	return cluster.GetMembers(s.GetClient())
}
|
|
|
|
// GetServiceMiddlewareConfig gets the service middleware config information.
|
|
func (s *Server) GetServiceMiddlewareConfig() *config.ServiceMiddlewareConfig {
|
|
cfg := s.serviceMiddlewareCfg.Clone()
|
|
cfg.AuditConfig = *s.serviceMiddlewarePersistOptions.GetAuditConfig().Clone()
|
|
cfg.RateLimitConfig = *s.serviceMiddlewarePersistOptions.GetRateLimitConfig().Clone()
|
|
cfg.GRPCRateLimitConfig = *s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().Clone()
|
|
return cfg
|
|
}
|
|
|
|
// GetConfig gets the config information.
// It clones the static config and then overlays every dynamically persisted
// section so the caller sees the current live values as one snapshot.
func (s *Server) GetConfig() *config.Config {
	cfg := s.cfg.Clone()
	cfg.Schedule = *s.persistOptions.GetScheduleConfig().Clone()
	cfg.Replication = *s.persistOptions.GetReplicationConfig().Clone()
	cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
	cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
	cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
	cfg.Microservice = *s.persistOptions.GetMicroserviceConfig().Clone()
	cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
	cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
	return cfg
}
|
|
|
|
// GetKeyspaceConfig gets the keyspace config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetKeyspaceConfig() *config.KeyspaceConfig {
	return s.persistOptions.GetKeyspaceConfig().Clone()
}
|
|
|
|
// SetKeyspaceConfig sets the keyspace config information.
|
|
func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
old := s.persistOptions.GetKeyspaceConfig()
|
|
s.persistOptions.SetKeyspaceConfig(&cfg)
|
|
if err := s.persistOptions.Persist(s.storage); err != nil {
|
|
s.persistOptions.SetKeyspaceConfig(old)
|
|
log.Error("failed to update keyspace config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
s.keyspaceManager.UpdateConfig(&cfg)
|
|
log.Info("keyspace config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// GetMicroserviceConfig gets the microservice config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetMicroserviceConfig() *config.MicroserviceConfig {
	return s.persistOptions.GetMicroserviceConfig().Clone()
}
|
|
|
|
// SetMicroserviceConfig sets the microservice config information.
|
|
func (s *Server) SetMicroserviceConfig(cfg config.MicroserviceConfig) error {
|
|
old := s.persistOptions.GetMicroserviceConfig()
|
|
s.persistOptions.SetMicroserviceConfig(&cfg)
|
|
if err := s.persistOptions.Persist(s.storage); err != nil {
|
|
s.persistOptions.SetMicroserviceConfig(old)
|
|
log.Error("failed to update microservice config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("microservice config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// GetScheduleConfig gets the balance config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
	return s.persistOptions.GetScheduleConfig().Clone()
}
|
|
|
|
// SetScheduleConfig sets the balance config information.
// This function is exported to be used by the API.
// It validates the new config, rejects deprecated fields, applies it in
// memory, persists it, and finally syncs the scheduling halt status.
// On persist failure the previous in-memory config is restored.
func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error {
	if err := cfg.Validate(); err != nil {
		return err
	}
	// Reject configs that still set deprecated fields.
	if err := cfg.Deprecated(); err != nil {
		return err
	}
	old := s.persistOptions.GetScheduleConfig()
	s.persistOptions.SetScheduleConfig(&cfg)
	if err := s.persistOptions.Persist(s.storage); err != nil {
		// Roll back so memory and persistent state stay consistent.
		s.persistOptions.SetScheduleConfig(old)
		log.Error("failed to update schedule config",
			zap.Reflect("new", cfg),
			zap.Reflect("old", old),
			errs.ZapError(err))
		return err
	}
	// Update the scheduling halt status at the same time.
	s.persistOptions.SetSchedulingAllowanceStatus(cfg.HaltScheduling, "manually")
	log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
	return nil
}
|
|
|
|
// GetReplicationConfig get the replication config.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetReplicationConfig() *sc.ReplicationConfig {
	return s.persistOptions.GetReplicationConfig().Clone()
}
|
|
|
|
// SetReplicationConfig sets the replication config.
|
|
func (s *Server) SetReplicationConfig(cfg sc.ReplicationConfig) error {
|
|
if err := cfg.Validate(); err != nil {
|
|
return err
|
|
}
|
|
old := s.persistOptions.GetReplicationConfig()
|
|
if cfg.EnablePlacementRules != old.EnablePlacementRules {
|
|
rc := s.GetRaftCluster()
|
|
if rc == nil {
|
|
return errs.ErrNotBootstrapped.GenWithStackByArgs()
|
|
}
|
|
if cfg.EnablePlacementRules {
|
|
// initialize rule manager.
|
|
if err := rc.GetRuleManager().Initialize(int(cfg.MaxReplicas), cfg.LocationLabels, cfg.IsolationLevel, false); err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
// NOTE: can be removed after placement rules feature is enabled by default.
|
|
for _, s := range rc.GetStores() {
|
|
if !s.IsRemoved() && s.IsTiFlash() {
|
|
return errors.New("cannot disable placement rules with TiFlash nodes")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var rule *placement.Rule
|
|
if cfg.EnablePlacementRules {
|
|
rc := s.GetRaftCluster()
|
|
if rc == nil {
|
|
return errs.ErrNotBootstrapped.GenWithStackByArgs()
|
|
}
|
|
// replication.MaxReplicas won't work when placement rule is enabled and not only have one default rule.
|
|
defaultRule := rc.GetRuleManager().GetRule(placement.DefaultGroupID, placement.DefaultRuleID)
|
|
|
|
CheckInDefaultRule := func() error {
|
|
// replication config won't work when placement rule is enabled and exceeds one default rule
|
|
if defaultRule == nil ||
|
|
len(defaultRule.StartKey) != 0 || len(defaultRule.EndKey) != 0 {
|
|
return errors.New("cannot update MaxReplicas, LocationLabels or IsolationLevel when placement rules feature is enabled and not only default rule exists, please update rule instead")
|
|
}
|
|
if defaultRule.Count != int(old.MaxReplicas) || !typeutil.AreStringSlicesEqual(defaultRule.LocationLabels, []string(old.LocationLabels)) || defaultRule.IsolationLevel != old.IsolationLevel {
|
|
return errors.New("cannot to update replication config, the default rules do not consistent with replication config, please update rule instead")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if cfg.MaxReplicas != old.MaxReplicas || !typeutil.AreStringSlicesEqual(cfg.LocationLabels, old.LocationLabels) || cfg.IsolationLevel != old.IsolationLevel {
|
|
if err := CheckInDefaultRule(); err != nil {
|
|
return err
|
|
}
|
|
rule = defaultRule
|
|
}
|
|
}
|
|
|
|
if rule != nil {
|
|
rule.Count = int(cfg.MaxReplicas)
|
|
rule.LocationLabels = cfg.LocationLabels
|
|
rule.IsolationLevel = cfg.IsolationLevel
|
|
rc := s.GetRaftCluster()
|
|
if rc == nil {
|
|
return errs.ErrNotBootstrapped.GenWithStackByArgs()
|
|
}
|
|
if err := rc.GetRuleManager().SetRule(rule); err != nil {
|
|
log.Error("failed to update rule count",
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.persistOptions.SetReplicationConfig(&cfg)
|
|
if err := s.persistOptions.Persist(s.storage); err != nil {
|
|
s.persistOptions.SetReplicationConfig(old)
|
|
if rule != nil {
|
|
rule.Count = int(old.MaxReplicas)
|
|
rc := s.GetRaftCluster()
|
|
if rc == nil {
|
|
return errs.ErrNotBootstrapped.GenWithStackByArgs()
|
|
}
|
|
if e := rc.GetRuleManager().SetRule(rule); e != nil {
|
|
log.Error("failed to roll back count of rule when update replication config", errs.ZapError(e))
|
|
}
|
|
}
|
|
log.Error("failed to update replication config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// GetAuditConfig gets the audit config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetAuditConfig() *config.AuditConfig {
	return s.serviceMiddlewarePersistOptions.GetAuditConfig().Clone()
}
|
|
|
|
// SetAuditConfig sets the audit config.
|
|
func (s *Server) SetAuditConfig(cfg config.AuditConfig) error {
|
|
old := s.serviceMiddlewarePersistOptions.GetAuditConfig()
|
|
s.serviceMiddlewarePersistOptions.SetAuditConfig(&cfg)
|
|
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
|
|
s.serviceMiddlewarePersistOptions.SetAuditConfig(old)
|
|
log.Error("failed to update Audit config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("audit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// UpdateRateLimitConfig is used to update rate-limit config which will reserve old limiter-config
|
|
func (s *Server) UpdateRateLimitConfig(key, label string, value ratelimit.DimensionConfig) error {
|
|
cfg := s.GetServiceMiddlewareConfig()
|
|
rateLimitCfg := make(map[string]ratelimit.DimensionConfig)
|
|
for label, item := range cfg.RateLimitConfig.LimiterConfig {
|
|
rateLimitCfg[label] = item
|
|
}
|
|
rateLimitCfg[label] = value
|
|
return s.UpdateRateLimit(&cfg.RateLimitConfig, key, &rateLimitCfg)
|
|
}
|
|
|
|
// UpdateRateLimit is used to update rate-limit config which will overwrite limiter-config
|
|
func (s *Server) UpdateRateLimit(cfg *config.RateLimitConfig, key string, value any) error {
|
|
updated, found, err := jsonutil.AddKeyValue(cfg, key, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !found {
|
|
return errors.Errorf("config item %s not found", key)
|
|
}
|
|
|
|
if updated {
|
|
err = s.SetRateLimitConfig(*cfg)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// GetRateLimitConfig gets the rate limit config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetRateLimitConfig() *config.RateLimitConfig {
	return s.serviceMiddlewarePersistOptions.GetRateLimitConfig().Clone()
}
|
|
|
|
// SetRateLimitConfig sets the rate limit config.
|
|
func (s *Server) SetRateLimitConfig(cfg config.RateLimitConfig) error {
|
|
old := s.serviceMiddlewarePersistOptions.GetRateLimitConfig()
|
|
s.serviceMiddlewarePersistOptions.SetRateLimitConfig(&cfg)
|
|
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
|
|
s.serviceMiddlewarePersistOptions.SetRateLimitConfig(old)
|
|
log.Error("failed to update rate limit config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("rate limit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// UpdateGRPCRateLimitConfig is used to update rate-limit config which will reserve old limiter-config
|
|
func (s *Server) UpdateGRPCRateLimitConfig(key, label string, value ratelimit.DimensionConfig) error {
|
|
cfg := s.GetServiceMiddlewareConfig()
|
|
rateLimitCfg := make(map[string]ratelimit.DimensionConfig)
|
|
for label, item := range cfg.GRPCRateLimitConfig.LimiterConfig {
|
|
rateLimitCfg[label] = item
|
|
}
|
|
rateLimitCfg[label] = value
|
|
return s.UpdateGRPCRateLimit(&cfg.GRPCRateLimitConfig, key, &rateLimitCfg)
|
|
}
|
|
|
|
// UpdateGRPCRateLimit is used to update gRPC rate-limit config which will overwrite limiter-config
|
|
func (s *Server) UpdateGRPCRateLimit(cfg *config.GRPCRateLimitConfig, key string, value any) error {
|
|
updated, found, err := jsonutil.AddKeyValue(cfg, key, value)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !found {
|
|
return errors.Errorf("config item %s not found", key)
|
|
}
|
|
|
|
if updated {
|
|
err = s.SetGRPCRateLimitConfig(*cfg)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// GetGRPCRateLimitConfig gets the rate limit config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetGRPCRateLimitConfig() *config.GRPCRateLimitConfig {
	return s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().Clone()
}
|
|
|
|
// SetGRPCRateLimitConfig sets the rate limit config.
|
|
func (s *Server) SetGRPCRateLimitConfig(cfg config.GRPCRateLimitConfig) error {
|
|
old := s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig()
|
|
s.serviceMiddlewarePersistOptions.SetGRPCRateLimitConfig(&cfg)
|
|
if err := s.serviceMiddlewarePersistOptions.Persist(s.storage); err != nil {
|
|
s.serviceMiddlewarePersistOptions.SetGRPCRateLimitConfig(old)
|
|
log.Error("failed to update gRPC rate limit config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("gRPC rate limit config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// GetPDServerConfig gets the balance config information.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetPDServerConfig() *config.PDServerConfig {
	return s.persistOptions.GetPDServerConfig().Clone()
}
|
|
|
|
// SetPDServerConfig sets the server config.
// Besides the usual validate/apply/persist/rollback flow, it normalizes the
// dashboard address: any value other than "auto"/"none" is prefixed with the
// client scheme if needed and must match a member's client URL.
func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error {
	switch cfg.DashboardAddress {
	case "auto":
	case "none":
	default:
		// Normalize a bare host:port into a full URL using the client scheme.
		if !strings.HasPrefix(cfg.DashboardAddress, "http") {
			cfg.DashboardAddress = fmt.Sprintf("%s://%s", s.GetClientScheme(), cfg.DashboardAddress)
		}
		// The dashboard must be served by one of the cluster members.
		if !cluster.IsClientURL(cfg.DashboardAddress, s.client) {
			return errors.Errorf("%s is not the client url of any member", cfg.DashboardAddress)
		}
	}
	if err := cfg.Validate(); err != nil {
		return err
	}

	old := s.persistOptions.GetPDServerConfig()
	s.persistOptions.SetPDServerConfig(&cfg)
	if err := s.persistOptions.Persist(s.storage); err != nil {
		// Roll back so memory and persistent state stay consistent.
		s.persistOptions.SetPDServerConfig(old)
		log.Error("failed to update PDServer config",
			zap.Reflect("new", cfg),
			zap.Reflect("old", old),
			errs.ZapError(err))
		return err
	}
	log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
	return nil
}
|
|
|
|
// SetLabelPropertyConfig sets the label property config.
|
|
func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error {
|
|
old := s.persistOptions.GetLabelPropertyConfig()
|
|
s.persistOptions.SetLabelPropertyConfig(cfg)
|
|
if err := s.persistOptions.Persist(s.storage); err != nil {
|
|
s.persistOptions.SetLabelPropertyConfig(old)
|
|
log.Error("failed to update label property config",
|
|
zap.Reflect("new", cfg),
|
|
zap.Reflect("old", &old),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("label property config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
|
|
return nil
|
|
}
|
|
|
|
// SetLabelProperty inserts a label property config.
|
|
func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error {
|
|
s.persistOptions.SetLabelProperty(typ, labelKey, labelValue)
|
|
err := s.persistOptions.Persist(s.storage)
|
|
if err != nil {
|
|
s.persistOptions.DeleteLabelProperty(typ, labelKey, labelValue)
|
|
log.Error("failed to update label property config",
|
|
zap.String("typ", typ),
|
|
zap.String("label-key", labelKey),
|
|
zap.String("label-value", labelValue),
|
|
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
|
|
log.Info("label property config is updated", zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()))
|
|
return nil
|
|
}
|
|
|
|
// DeleteLabelProperty deletes a label property config.
|
|
func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
|
|
s.persistOptions.DeleteLabelProperty(typ, labelKey, labelValue)
|
|
err := s.persistOptions.Persist(s.storage)
|
|
if err != nil {
|
|
s.persistOptions.SetLabelProperty(typ, labelKey, labelValue)
|
|
log.Error("failed to delete label property config",
|
|
zap.String("typ", typ),
|
|
zap.String("label-key", labelKey),
|
|
zap.String("label-value", labelValue),
|
|
zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
|
|
log.Info("label property config is deleted", zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()))
|
|
return nil
|
|
}
|
|
|
|
// GetLabelProperty returns the whole label property config.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetLabelProperty() config.LabelPropertyConfig {
	return s.persistOptions.GetLabelPropertyConfig().Clone()
}
|
|
|
|
// SetClusterVersion sets the version of cluster.
|
|
func (s *Server) SetClusterVersion(v string) error {
|
|
version, err := versioninfo.ParseVersion(v)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
old := s.persistOptions.GetClusterVersion()
|
|
s.persistOptions.SetClusterVersion(version)
|
|
err = s.persistOptions.Persist(s.storage)
|
|
if err != nil {
|
|
s.persistOptions.SetClusterVersion(old)
|
|
log.Error("failed to update cluster version",
|
|
zap.String("old-version", old.String()),
|
|
zap.String("new-version", v),
|
|
errs.ZapError(err))
|
|
return err
|
|
}
|
|
log.Info("cluster version is updated", zap.String("new-version", v))
|
|
return nil
|
|
}
|
|
|
|
// GetClusterVersion returns the version of cluster.
// The value is returned by copy, so the caller cannot mutate the stored version.
func (s *Server) GetClusterVersion() semver.Version {
	return *s.persistOptions.GetClusterVersion()
}
|
|
|
|
// GetTLSConfig get the security config.
// The returned pointer aliases the server's config; do not mutate it.
func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
	return &s.cfg.Security.TLSConfig
}
|
|
|
|
// GetControllerConfig gets the resource manager controller config.
// The returned pointer aliases the server's config; do not mutate it.
func (s *Server) GetControllerConfig() *rm_server.ControllerConfig {
	return &s.cfg.Controller
}
|
|
|
|
// GetRaftCluster gets Raft cluster.
// If cluster has not been bootstrapped, return nil.
// It also returns nil when the server itself is closed or the cluster is not
// running, so callers must always nil-check the result.
func (s *Server) GetRaftCluster() *cluster.RaftCluster {
	if s.IsClosed() || !s.cluster.IsRunning() {
		return nil
	}
	return s.cluster
}
|
|
|
|
// IsServiceIndependent returns whether the service is independent.
// It always reports false when keyspace groups are disabled or the server is
// closed. The TSO service is treated as independent whenever dynamic TSO
// switching is disabled; other services defer to the cluster's own record.
func (s *Server) IsServiceIndependent(name string) bool {
	if s.isKeyspaceGroupEnabled && !s.IsClosed() {
		if name == mcs.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
			return true
		}
		return s.cluster.IsServiceIndependent(name)
	}
	return false
}
|
|
|
|
// DirectlyGetRaftCluster returns raft cluster directly.
// Only used for test.
// Unlike GetRaftCluster, it skips the closed/running checks.
func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster {
	return s.cluster
}
|
|
|
|
// GetCluster gets cluster.
// The result is assembled from the cluster ID in the key path and the current
// max-replicas setting.
func (s *Server) GetCluster() *metapb.Cluster {
	return &metapb.Cluster{
		Id:           keypath.ClusterID(),
		MaxPeerCount: uint32(s.persistOptions.GetMaxReplicas()),
	}
}
|
|
|
|
// GetServerOption gets the option of the server.
// The returned value is the live options object shared by the server.
func (s *Server) GetServerOption() *config.PersistOptions {
	return s.persistOptions
}
|
|
|
|
// GetMetaRegions gets meta regions from cluster.
|
|
func (s *Server) GetMetaRegions() []*metapb.Region {
|
|
rc := s.GetRaftCluster()
|
|
if rc != nil {
|
|
return rc.GetMetaRegions()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetRegions gets regions from cluster.
|
|
func (s *Server) GetRegions() []*core.RegionInfo {
|
|
rc := s.GetRaftCluster()
|
|
if rc != nil {
|
|
return rc.GetRegions()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetServiceLabels returns ApiAccessPaths by given service label
|
|
// TODO: this function will be used for updating api rate limit config
|
|
func (s *Server) GetServiceLabels(serviceLabel string) []apiutil.AccessPath {
|
|
if apis, ok := s.serviceLabels[serviceLabel]; ok {
|
|
return apis
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsGRPCServiceLabelExist returns if the service label exists
// in the registered gRPC service label set.
func (s *Server) IsGRPCServiceLabelExist(serviceLabel string) bool {
	_, ok := s.grpcServiceLabels[serviceLabel]
	return ok
}
|
|
|
|
// GetAPIAccessServiceLabel returns service label by given access path
|
|
// TODO: this function will be used for updating api rate limit config
|
|
func (s *Server) GetAPIAccessServiceLabel(accessPath apiutil.AccessPath) string {
|
|
if serviceLabel, ok := s.apiServiceLabelMap[accessPath]; ok {
|
|
return serviceLabel
|
|
}
|
|
accessPathNoMethod := apiutil.NewAccessPath(accessPath.Path, "")
|
|
if serviceLabel, ok := s.apiServiceLabelMap[accessPathNoMethod]; ok {
|
|
return serviceLabel
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// AddServiceLabel is used to add the relationship between service label and api access path
|
|
// TODO: this function will be used for updating api rate limit config
|
|
func (s *Server) AddServiceLabel(serviceLabel string, accessPath apiutil.AccessPath) {
|
|
if slice, ok := s.serviceLabels[serviceLabel]; ok {
|
|
slice = append(slice, accessPath)
|
|
s.serviceLabels[serviceLabel] = slice
|
|
} else {
|
|
slice = []apiutil.AccessPath{accessPath}
|
|
s.serviceLabels[serviceLabel] = slice
|
|
}
|
|
|
|
s.apiServiceLabelMap[accessPath] = serviceLabel
|
|
}
|
|
|
|
// GetAuditBackend returns audit backends.
func (s *Server) GetAuditBackend() []audit.Backend {
	return s.auditBackends
}
|
|
|
|
// GetServiceAuditBackendLabels returns audit backend labels by serviceLabel.
// It returns nil when no labels are registered for the service.
func (s *Server) GetServiceAuditBackendLabels(serviceLabel string) *audit.BackendLabels {
	return s.serviceAuditBackendLabels[serviceLabel]
}
|
|
|
|
// SetServiceAuditBackendLabels is used to add audit backend labels for service by service label.
// Any existing labels for the service are replaced.
func (s *Server) SetServiceAuditBackendLabels(serviceLabel string, labels []string) {
	s.serviceAuditBackendLabels[serviceLabel] = &audit.BackendLabels{Labels: labels}
}
|
|
|
|
// GetServiceRateLimiter is used to get rate limiter for the HTTP services.
func (s *Server) GetServiceRateLimiter() *ratelimit.Controller {
	return s.serviceRateLimiter
}
|
|
|
|
// IsInRateLimitAllowList returns whether given service label is in the allow list.
func (s *Server) IsInRateLimitAllowList(serviceLabel string) bool {
	return s.serviceRateLimiter.IsInAllowList(serviceLabel)
}
|
|
|
|
// UpdateServiceRateLimiter is used to update RateLimiter for the given service label.
func (s *Server) UpdateServiceRateLimiter(serviceLabel string, opts ...ratelimit.Option) ratelimit.UpdateStatus {
	return s.serviceRateLimiter.Update(serviceLabel, opts...)
}
|
|
|
|
// GetGRPCRateLimiter is used to get rate limiter for the gRPC services.
func (s *Server) GetGRPCRateLimiter() *ratelimit.Controller {
	return s.grpcServiceRateLimiter
}
|
|
|
|
// UpdateGRPCServiceRateLimiter is used to update RateLimiter for the given gRPC service label.
func (s *Server) UpdateGRPCServiceRateLimiter(serviceLabel string, opts ...ratelimit.Option) ratelimit.UpdateStatus {
	return s.grpcServiceRateLimiter.Update(serviceLabel, opts...)
}
|
|
|
|
// GetClusterStatus gets cluster status by loading it from the cluster.
func (s *Server) GetClusterStatus() (*cluster.Status, error) {
	return s.cluster.LoadClusterStatus()
}
|
|
|
|
// SetLogLevel sets log level.
// The level is validated first; on success both the in-memory config and the
// global logger level are updated.
func (s *Server) SetLogLevel(level string) error {
	if !logutil.IsLevelLegal(level) {
		return errors.Errorf("log level %s is illegal", level)
	}
	s.cfg.Log.Level = level
	log.SetLevel(logutil.StringToZapLogLevel(level))
	// Logged at Warn so the change is visible regardless of the new level.
	log.Warn("log level changed", zap.String("level", log.GetLevel().String()))
	return nil
}
|
|
|
|
// GetReplicationModeConfig returns the replication mode config.
// A clone is returned, so the caller may mutate it freely.
func (s *Server) GetReplicationModeConfig() *config.ReplicationModeConfig {
	return s.persistOptions.GetReplicationModeConfig().Clone()
}
|
|
|
|
// SetReplicationModeConfig sets the replication mode.
// The mode string is validated, the config is applied in memory and persisted,
// and finally the running cluster's replication mode is updated. Because the
// two storage mutations cannot be batched, a failure in the last step triggers
// a best-effort revert of the persisted config.
func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) error {
	if config.NormalizeReplicationMode(cfg.ReplicationMode) == "" {
		return errors.Errorf("invalid replication mode: %v", cfg.ReplicationMode)
	}

	old := s.persistOptions.GetReplicationModeConfig()
	s.persistOptions.SetReplicationModeConfig(&cfg)
	if err := s.persistOptions.Persist(s.storage); err != nil {
		// Roll back so memory and persistent state stay consistent.
		s.persistOptions.SetReplicationModeConfig(old)
		log.Error("failed to update replication mode config",
			zap.Reflect("new", cfg),
			zap.Reflect("old", &old),
			errs.ZapError(err))
		return err
	}
	log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))

	rc := s.GetRaftCluster()
	if rc != nil {
		err := rc.GetReplicationMode().UpdateConfig(cfg)
		if err != nil {
			log.Warn("failed to update replication mode", errs.ZapError(err))
			// revert to old config
			// NOTE: since we can't put the 2 storage mutations in a batch, it
			// is possible that memory and persistent data become different
			// (when below revert fail). They will become the same after PD is
			// restart or PD leader is changed.
			s.persistOptions.SetReplicationModeConfig(old)
			revertErr := s.persistOptions.Persist(s.storage)
			if revertErr != nil {
				log.Error("failed to revert replication mode persistent config", errs.ZapError(revertErr))
			}
		}
		return err
	}

	return nil
}
|
|
|
|
// IsServing returns whether the server is the leader if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
	return s.member.IsServing()
}
|
|
|
|
// AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise.
// Callbacks are invoked in registration order after leadership is won.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
	s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
}
|
|
|
|
// leaderLoop is the PD leader election loop. Each iteration checks whether a
// leader already exists (following it and syncing regions if so), and
// otherwise campaigns — but only when this member is also the etcd leader, so
// the etcd leader and PD leader stay on the same server. It runs until the
// server is closed.
func (s *Server) leaderLoop() {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	for {
		if s.IsClosed() {
			log.Info("server is closed, return PD leader loop")
			return
		}

		leader, checkAgain := s.member.CheckLeader()
		// add failpoint to test leader check go to stuck.
		failpoint.Inject("leaderLoopCheckAgain", func(val failpoint.Value) {
			memberString := val.(string)
			memberID, _ := strconv.ParseUint(memberString, 10, 64)
			if s.member.ID() == memberID {
				checkAgain = true
			}
		})
		if checkAgain {
			continue
		}
		if leader != nil {
			// Another member is leader: follow it as long as it holds the
			// leadership.
			err := s.reloadConfigFromKV()
			if err != nil {
				log.Error("reload config failed", errs.ZapError(err))
				continue
			}
			syncer := s.cluster.GetRegionSyncer()
			if s.persistOptions.IsUseRegionStorage() {
				syncer.StartSyncWithLeader(leader.GetListenUrls()[0])
			}
			log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
			// WatchLeader will keep looping and never return unless the PD leader has changed.
			leader.Watch(s.serverLoopCtx)
			syncer.StopSyncWithLeader()
			log.Info("pd leader has changed, try to re-campaign a pd leader")
		}

		// To make sure the etcd leader and PD leader are on the same server.
		etcdLeader := s.member.GetEtcdLeader()
		if etcdLeader != s.member.ID() {
			if s.member.GetLeader() == nil {
				lastUpdated := s.member.GetLastLeaderUpdatedTime()
				// use random timeout to avoid leader campaigning storm.
				randomTimeout := time.Duration(rand.Intn(lostPDLeaderMaxTimeoutSecs))*time.Second + lostPDLeaderMaxTimeoutSecs*time.Second + lostPDLeaderReElectionFactor*s.cfg.ElectionInterval.Duration
				// add failpoint to test the campaign leader logic.
				failpoint.Inject("timeoutWaitPDLeader", func() {
					log.Info("timeoutWaitPDLeader is injected, skip wait other etcd leader be etcd leader")
					randomTimeout = time.Duration(rand.Intn(10))*time.Millisecond + 100*time.Millisecond
				})
				// If there has been no PD leader for long enough, pull the
				// etcd leadership over to this member so it can campaign.
				if lastUpdated.Add(randomTimeout).Before(time.Now()) && !lastUpdated.IsZero() && etcdLeader != 0 {
					log.Info("the pd leader is lost for a long time, try to re-campaign a pd leader with resign etcd leader",
						zap.Duration("timeout", randomTimeout),
						zap.Time("last-updated", lastUpdated),
						zap.String("current-leader-member-id", etcdtypes.ID(etcdLeader).String()),
						zap.String("transferee-member-id", etcdtypes.ID(s.member.ID()).String()),
					)
					if err := s.member.MoveEtcdLeader(s.ctx, etcdLeader, s.member.ID()); err != nil {
						log.Error("failed to move etcd leader", errs.ZapError(err))
					}
				}
			}
			log.Info("skip campaigning of pd leader and check later",
				zap.String("server-name", s.Name()),
				zap.Uint64("etcd-leader-id", etcdLeader),
				zap.Uint64("member-id", s.member.ID()))
			time.Sleep(200 * time.Millisecond)
			continue
		}
		s.campaignLeader()
	}
}
|
|
|
|
// campaignLeader tries to win the PD leadership and, on success, performs the
// full leader bring-up (config reload, encryption, callbacks, raft cluster,
// ID rebase, self promotion) and then serves until the lease expires, the
// etcd leader moves away, or the server closes. All leadership state is torn
// down via the resetLeaderOnce defers when this function returns.
func (s *Server) campaignLeader() {
	log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name()))
	if err := s.member.Campaign(s.ctx, s.cfg.LeaderLease); err != nil {
		if err.Error() == errs.ErrEtcdTxnConflict.Error() {
			// Losing the campaign to another member is expected; log at Info.
			log.Info("campaign PD leader meets error due to txn conflict, another PD may campaign successfully",
				zap.String("campaign-leader-name", s.Name()))
		} else {
			log.Error("campaign PD leader meets error due to etcd error",
				zap.String("campaign-leader-name", s.Name()),
				errs.ZapError(err))
		}
		return
	}

	// Start keepalive the leadership and enable TSO service.
	// TSO service is strictly enabled/disabled by PD leader lease for 2 reasons:
	// 1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
	// 2. load region could be slow. Based on lease we can recover TSO service faster.
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	var resetLeaderOnce sync.Once
	defer resetLeaderOnce.Do(func() {
		cancel()
		s.member.Resign()
	})

	// maintain the PD leadership, after this, TSO can be service.
	s.member.GetLeadership().Keep(ctx)
	log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))

	if err := s.reloadConfigFromKV(); err != nil {
		log.Error("failed to reload configuration", errs.ZapError(err))
		return
	}

	if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
		log.Error("failed to load persistOptions from etcd", errs.ZapError(err))
		return
	}

	if err := s.encryptionKeyManager.SetLeadership(s.member.GetLeadership()); err != nil {
		log.Error("failed to initialize encryption", errs.ZapError(err))
		return
	}

	log.Info("triggering the leader callback functions")
	for _, cb := range s.leaderCallbacks {
		if err := cb(ctx); err != nil {
			log.Error("failed to execute leader callback function", errs.ZapError(err))
			return
		}
	}

	// Try to create raft cluster.
	if err := s.createRaftCluster(); err != nil {
		log.Error("failed to create raft cluster", errs.ZapError(err))
		return
	}
	defer s.stopRaftCluster()
	failpoint.Inject("rebaseErr", func() {
		failpoint.Return()
	})
	if err := s.idAllocator.Rebase(); err != nil {
		log.Error("failed to sync id from etcd", errs.ZapError(err))
		return
	}
	// PromoteSelf to accept the remaining service, such as GetStore, GetRegion.
	s.member.PromoteSelf()
	member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
	// This second Do replaces the earlier one: once fully promoted, stepping
	// down must also reset the service-member gauge.
	defer resetLeaderOnce.Do(func() {
		// as soon as cancel the leadership keepalive, then other member have chance
		// to be new leader.
		cancel()
		s.member.Resign()
		member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
	})

	CheckPDVersionWithClusterVersion(s.persistOptions)
	log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))

	leaderTicker := time.NewTicker(mcs.LeaderTickInterval)
	defer leaderTicker.Stop()

	for {
		select {
		case <-leaderTicker.C:
			if !s.member.IsServing() {
				log.Info("no longer a leader because lease has expired, PD leader will step down")
				return
			}
			// add failpoint to test exit leader, failpoint judge the member is the give value, then break
			failpoint.Inject("exitCampaignLeader", func(val failpoint.Value) {
				memberString := val.(string)
				memberID, _ := strconv.ParseUint(memberString, 10, 64)
				if s.member.ID() == memberID {
					log.Info("exit PD leader")
					failpoint.Return()
				}
			})

			etcdLeader := s.member.GetEtcdLeader()
			if etcdLeader != s.member.ID() {
				log.Info("etcd leader changed, resigns pd leadership", zap.String("old-pd-leader-name", s.Name()))
				return
			}
		case <-ctx.Done():
			// Server is closed and it should return nil.
			log.Info("server is closed")
			return
		}
	}
}
|
|
|
|
func (s *Server) etcdLeaderLoop() {
|
|
defer logutil.LogPanic()
|
|
defer s.serverLoopWg.Done()
|
|
|
|
ctx, cancel := context.WithCancel(s.serverLoopCtx)
|
|
defer cancel()
|
|
ticker := time.NewTicker(s.cfg.LeaderPriorityCheckInterval.Duration)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.member.CheckPriority(ctx)
|
|
// Note: we reset the ticker here to support updating configuration dynamically.
|
|
ticker.Reset(s.cfg.LeaderPriorityCheckInterval.Duration)
|
|
case <-ctx.Done():
|
|
log.Info("server is closed, exit etcd leader loop")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) reloadConfigFromKV() error {
|
|
err := s.persistOptions.Reload(s.storage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = s.serviceMiddlewarePersistOptions.Reload(s.storage)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.loadRateLimitConfig()
|
|
s.loadGRPCRateLimitConfig()
|
|
s.loadKeyspaceConfig()
|
|
useRegionStorage := s.persistOptions.IsUseRegionStorage()
|
|
regionStorage := storage.TrySwitchRegionStorage(s.storage, useRegionStorage)
|
|
if regionStorage != nil {
|
|
if useRegionStorage {
|
|
log.Info("server enable region storage")
|
|
} else {
|
|
log.Info("server disable region storage")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) loadKeyspaceConfig() {
|
|
if s.keyspaceManager == nil {
|
|
return
|
|
}
|
|
cfg := s.persistOptions.GetKeyspaceConfig()
|
|
s.keyspaceManager.UpdateConfig(cfg)
|
|
}
|
|
|
|
func (s *Server) loadRateLimitConfig() {
|
|
cfg := s.serviceMiddlewarePersistOptions.GetRateLimitConfig().LimiterConfig
|
|
for key := range cfg {
|
|
value := cfg[key]
|
|
s.serviceRateLimiter.Update(key, ratelimit.UpdateDimensionConfig(&value))
|
|
}
|
|
}
|
|
|
|
func (s *Server) loadGRPCRateLimitConfig() {
|
|
cfg := s.serviceMiddlewarePersistOptions.GetGRPCRateLimitConfig().LimiterConfig
|
|
for key := range cfg {
|
|
value := cfg[key]
|
|
s.grpcServiceRateLimiter.Update(key, ratelimit.UpdateDimensionConfig(&value))
|
|
}
|
|
}
|
|
|
|
// ReplicateFileToMember is used to synchronize state to a member.
|
|
// Each member will write `data` to a local file named `name`.
|
|
// For security reason, data should be in JSON format.
|
|
func (s *Server) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error {
|
|
clientUrls := member.GetClientUrls()
|
|
if len(clientUrls) == 0 {
|
|
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()))
|
|
return errs.ErrClientURLEmpty.FastGenByArgs()
|
|
}
|
|
url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name)
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(data))
|
|
req.Header.Set(apiutil.PDAllowFollowerHandleHeader, "true")
|
|
res, err := s.httpClient.Do(req)
|
|
if err != nil {
|
|
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
|
|
return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
|
|
}
|
|
// Since we don't read the body, we can close it immediately.
|
|
res.Body.Close()
|
|
if res.StatusCode != http.StatusOK {
|
|
log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Int("status-code", res.StatusCode))
|
|
return errs.ErrSendRequest.FastGenByArgs()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PersistFile saves a file in DataDir.
|
|
func (s *Server) PersistFile(name string, data []byte) error {
|
|
if name != replication.DrStatusFile {
|
|
return errors.New("Invalid file name")
|
|
}
|
|
log.Info("persist file", zap.String("name", name), zap.Binary("data", data))
|
|
path := filepath.Join(s.GetConfig().DataDir, name)
|
|
if !apiutil.IsPathInDirectory(path, s.GetConfig().DataDir) {
|
|
return errors.New("Invalid file path")
|
|
}
|
|
return os.WriteFile(path, data, 0644) // #nosec
|
|
}
|
|
|
|
// SaveTTLConfig save ttl config
|
|
func (s *Server) SaveTTLConfig(data map[string]any, ttl time.Duration) error {
|
|
for k := range data {
|
|
if !config.IsSupportedTTLConfig(k) {
|
|
return fmt.Errorf("unsupported ttl config %s", k)
|
|
}
|
|
}
|
|
for k, v := range data {
|
|
var valueStr string
|
|
switch val := v.(type) {
|
|
case float64:
|
|
// math.Trunc(val) returns the integer part of val
|
|
if val == math.Trunc(val) {
|
|
valueStr = strconv.FormatInt(int64(val), 10)
|
|
} else {
|
|
valueStr = strconv.FormatFloat(val, 'f', -1, 64)
|
|
}
|
|
case int, int8, int16, int32, int64:
|
|
valueStr = fmt.Sprintf("%d", val)
|
|
case uint, uint8, uint16, uint32, uint64:
|
|
valueStr = fmt.Sprintf("%d", val)
|
|
default:
|
|
valueStr = fmt.Sprint(v)
|
|
}
|
|
if err := s.persistOptions.SetTTLData(s.ctx, s.client, k, valueStr, ttl); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsTTLConfigExist returns true if the ttl config is existed for a given config.
|
|
func (s *Server) IsTTLConfigExist(key string) bool {
|
|
if config.IsSupportedTTLConfig(key) {
|
|
if _, ok := s.persistOptions.GetTTLData(key); ok {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// MarkSnapshotRecovering mark pd that we're recovering
|
|
// tikv will get this state during BR EBS restore.
|
|
// we write this info into etcd for simplicity, the key only stays inside etcd temporary
|
|
// during BR EBS restore in which period the cluster is not able to serve request.
|
|
// and is deleted after BR EBS restore is done.
|
|
func (s *Server) MarkSnapshotRecovering() error {
|
|
log.Info("mark snapshot recovering")
|
|
markPath := keypath.RecoveringMarkPath()
|
|
// the value doesn't matter, set to a static string
|
|
_, err := kv.NewSlowLogTxn(s.client).
|
|
If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)).
|
|
Then(clientv3.OpPut(markPath, "on")).
|
|
Commit()
|
|
// if other client already marked, return success too
|
|
return err
|
|
}
|
|
|
|
// IsSnapshotRecovering check whether recovering-mark marked
|
|
func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) {
|
|
resp, err := s.client.Get(ctx, keypath.RecoveringMarkPath())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(resp.Kvs) > 0, nil
|
|
}
|
|
|
|
// UnmarkSnapshotRecovering unmark recovering mark
|
|
func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error {
|
|
log.Info("unmark snapshot recovering")
|
|
_, err := s.client.Delete(ctx, keypath.RecoveringMarkPath())
|
|
// if other client already unmarked, return success too
|
|
return err
|
|
}
|
|
|
|
// MarkPitrRestoreMode mark pd that we're pitr restore mode
|
|
func (s *Server) MarkPitrRestoreMode() error {
|
|
log.Info("mark pitr restore mode")
|
|
markPath := keypath.PitrRestoreModeMarkPath()
|
|
// the value doesn't matter, set to a static string
|
|
_, err := kv.NewSlowLogTxn(s.client).
|
|
If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)).
|
|
Then(clientv3.OpPut(markPath, "on")).
|
|
Commit()
|
|
// if other client already marked, return success too
|
|
return err
|
|
}
|
|
|
|
// IsPitrRestoreMode check whether pitr-restore-mode-mark marked
|
|
func (s *Server) IsPitrRestoreMode(ctx context.Context) (bool, error) {
|
|
resp, err := s.client.Get(ctx, keypath.PitrRestoreModeMarkPath())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return len(resp.Kvs) > 0, nil
|
|
}
|
|
|
|
// UnmarkPitrRestoreMode unmark pitr restore mode mark
|
|
func (s *Server) UnmarkPitrRestoreMode(ctx context.Context) error {
|
|
log.Info("unmark pitr restore mode")
|
|
_, err := s.client.Delete(ctx, keypath.PitrRestoreModeMarkPath())
|
|
// if other client already unmarked, return success too
|
|
return err
|
|
}
|
|
|
|
// GetServicePrimaryAddr returns the primary address for a given service.
|
|
// Note: This function will only return primary address without judging if it's alive.
|
|
func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) (string, bool) {
|
|
ticker := time.NewTicker(retryIntervalGetServicePrimary)
|
|
defer ticker.Stop()
|
|
for range maxRetryTimesGetServicePrimary {
|
|
if v, ok := s.servicePrimaryMap.Load(serviceName); ok {
|
|
return v.(string), true
|
|
}
|
|
select {
|
|
case <-s.ctx.Done():
|
|
return "", false
|
|
case <-ctx.Done():
|
|
return "", false
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
// SetServicePrimaryAddr sets the primary address directly.
// It bypasses the watcher mechanism and writes straight into the map.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
	s.servicePrimaryMap.Store(serviceName, addr)
}
|
|
|
|
func (s *Server) initTSOPrimaryWatcher() {
|
|
serviceName := mcs.TSOServiceName
|
|
tsoServicePrimaryKey := keypath.ElectionPath(&keypath.MsParam{
|
|
ServiceName: mcs.TSOServiceName,
|
|
GroupID: constant.DefaultKeyspaceGroupID,
|
|
})
|
|
s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey)
|
|
s.tsoPrimaryWatcher.StartWatchLoop()
|
|
}
|
|
|
|
func (s *Server) initSchedulingPrimaryWatcher() {
|
|
serviceName := mcs.SchedulingServiceName
|
|
primaryKey := keypath.ElectionPath(&keypath.MsParam{
|
|
ServiceName: mcs.SchedulingServiceName,
|
|
})
|
|
s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey)
|
|
s.schedulingPrimaryWatcher.StartWatchLoop()
|
|
}
|
|
|
|
func (s *Server) initServicePrimaryWatcher(serviceName string, primaryKey string) *etcdutil.LoopWatcher {
|
|
putFn := func(kv *mvccpb.KeyValue) error {
|
|
primary := member.NewParticipantByService(serviceName)
|
|
if err := proto.Unmarshal(kv.Value, primary); err != nil {
|
|
return err
|
|
}
|
|
listenUrls := primary.GetListenUrls()
|
|
if len(listenUrls) > 0 {
|
|
// listenUrls[0] is the primary service endpoint of the keyspace group
|
|
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
|
|
log.Info("update service primary", zap.String("service-name", serviceName), zap.String("primary", listenUrls[0]))
|
|
}
|
|
return nil
|
|
}
|
|
deleteFn := func(*mvccpb.KeyValue) error {
|
|
var oldPrimary string
|
|
v, ok := s.servicePrimaryMap.Load(serviceName)
|
|
if ok {
|
|
oldPrimary = v.(string)
|
|
}
|
|
log.Info("delete service primary", zap.String("service-name", serviceName), zap.String("old-primary", oldPrimary))
|
|
s.servicePrimaryMap.Delete(serviceName)
|
|
return nil
|
|
}
|
|
name := fmt.Sprintf("%s-primary-watcher", serviceName)
|
|
return etcdutil.NewLoopWatcher(
|
|
s.serverLoopCtx,
|
|
&s.serverLoopWg,
|
|
s.client,
|
|
name,
|
|
primaryKey,
|
|
func([]*clientv3.Event) error { return nil },
|
|
putFn,
|
|
deleteFn,
|
|
func([]*clientv3.Event) error { return nil },
|
|
false, /* withPrefix */
|
|
)
|
|
}
|
|
|
|
// RecoverAllocID recover alloc id. set current base id to input id
// The context parameter is unused.
func (s *Server) RecoverAllocID(_ context.Context, id uint64) error {
	return s.idAllocator.SetBase(id)
}
|
|
|
|
// GetExternalTS returns external timestamp.
|
|
func (s *Server) GetExternalTS() uint64 {
|
|
rc := s.GetRaftCluster()
|
|
if rc == nil {
|
|
return 0
|
|
}
|
|
return rc.GetExternalTS()
|
|
}
|
|
|
|
// SetExternalTS returns external timestamp.
|
|
func (s *Server) SetExternalTS(externalTS, globalTS uint64) error {
|
|
if tsoutil.CompareTimestampUint64(externalTS, globalTS) == 1 {
|
|
desc := "the external timestamp should not be larger than global ts"
|
|
log.Error(desc, zap.Uint64("request-timestamp", externalTS), zap.Uint64("global-ts", globalTS))
|
|
return errors.New(desc)
|
|
}
|
|
c := s.GetRaftCluster()
|
|
if c == nil {
|
|
return errs.ErrNotBootstrapped.FastGenByArgs()
|
|
}
|
|
currentExternalTS := c.GetExternalTS()
|
|
if tsoutil.CompareTimestampUint64(externalTS, currentExternalTS) != 1 {
|
|
desc := "the external timestamp should be larger than current external timestamp"
|
|
log.Error(desc, zap.Uint64("request", externalTS), zap.Uint64("current", currentExternalTS))
|
|
return errors.New(desc)
|
|
}
|
|
|
|
return c.SetExternalTS(externalTS)
|
|
}
|
|
|
|
// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
// It delegates to the server configuration.
func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
	return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}
|
|
|
|
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
// It delegates to the server configuration.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
	return s.cfg.GetTSOProxyRecvFromClientTimeout()
}
|
|
|
|
// GetLease returns the leader lease.
// It delegates to the server configuration.
func (s *Server) GetLease() int64 {
	return s.cfg.GetLease()
}
|
|
|
|
// GetTSOSaveInterval returns TSO save interval.
// It delegates to the server configuration.
func (s *Server) GetTSOSaveInterval() time.Duration {
	return s.cfg.GetTSOSaveInterval()
}
|
|
|
|
// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
// It delegates to the server configuration.
func (s *Server) GetTSOUpdatePhysicalInterval() time.Duration {
	return s.cfg.GetTSOUpdatePhysicalInterval()
}
|
|
|
|
// GetMaxResetTSGap gets the max gap to reset the tso.
// It delegates to the persisted options rather than the static config.
func (s *Server) GetMaxResetTSGap() time.Duration {
	return s.persistOptions.GetMaxResetTSGap()
}
|
|
|
|
// SetClient sets the etcd client.
// The assignment is not synchronized, so callers must not race with readers.
// Notes: it is only used for test.
func (s *Server) SetClient(client *clientv3.Client) {
	s.client = client
}
|