mirror of https://github.com/tikv/pd.git
1180 lines
37 KiB
Go
1180 lines
37 KiB
Go
// Copyright 2023 TiKV Project Authors.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package keyspace
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/kvproto/pkg/tsopb"
|
|
"github.com/pingcap/log"
|
|
|
|
"github.com/tikv/pd/pkg/balancer"
|
|
"github.com/tikv/pd/pkg/errs"
|
|
"github.com/tikv/pd/pkg/keyspace/constant"
|
|
"github.com/tikv/pd/pkg/mcs/discovery"
|
|
mcs "github.com/tikv/pd/pkg/mcs/utils/constant"
|
|
"github.com/tikv/pd/pkg/slice"
|
|
"github.com/tikv/pd/pkg/storage/endpoint"
|
|
"github.com/tikv/pd/pkg/storage/kv"
|
|
"github.com/tikv/pd/pkg/utils/etcdutil"
|
|
"github.com/tikv/pd/pkg/utils/keypath"
|
|
"github.com/tikv/pd/pkg/utils/logutil"
|
|
"github.com/tikv/pd/pkg/utils/syncutil"
|
|
"github.com/tikv/pd/pkg/utils/typeutil"
|
|
)
|
|
|
|
const (
|
|
defaultBalancerPolicy = balancer.PolicyRoundRobin
|
|
allocNodesToKeyspaceGroupsInterval = 1 * time.Second
|
|
allocNodesTimeout = 1 * time.Second
|
|
allocNodesInterval = 10 * time.Millisecond
|
|
)
|
|
|
|
// Mutation kinds understood by updateKeyspaceForGroupLocked.
const (
	// opAdd appends the keyspace to the keyspace group.
	opAdd int = iota
	// opDelete removes the keyspace from the keyspace group.
	opDelete
)
|
|
|
|
// GroupManager is the manager of keyspace group related data.
|
|
type GroupManager struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
client *clientv3.Client
|
|
|
|
syncutil.RWMutex
|
|
// groups is the cache of keyspace group related information.
|
|
// user kind -> keyspace group
|
|
groups map[endpoint.UserKind]*indexedHeap
|
|
|
|
// store is the storage for keyspace group related information.
|
|
store endpoint.KeyspaceGroupStorage
|
|
|
|
// nodeBalancer is the balancer for tso nodes.
|
|
// TODO: add user kind with different balancer when we ensure where the correspondence between tso node and user kind will be found
|
|
nodesBalancer balancer.Balancer[string]
|
|
// serviceRegistryMap stores the mapping from the service registry key to the service address.
|
|
// Note: it is only used in tsoNodesWatcher.
|
|
serviceRegistryMap map[string]string
|
|
// tsoNodesWatcher is the watcher for the registered tso servers.
|
|
tsoNodesWatcher *etcdutil.LoopWatcher
|
|
}
|
|
|
|
// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
|
|
func NewKeyspaceGroupManager(
|
|
ctx context.Context,
|
|
store endpoint.KeyspaceGroupStorage,
|
|
client *clientv3.Client,
|
|
) *GroupManager {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
groups := make(map[endpoint.UserKind]*indexedHeap)
|
|
for i := range endpoint.UserKindCount {
|
|
groups[i] = newIndexedHeap(int(mcs.MaxKeyspaceGroupCountInUse))
|
|
}
|
|
m := &GroupManager{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
store: store,
|
|
groups: groups,
|
|
client: client,
|
|
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
|
|
serviceRegistryMap: make(map[string]string),
|
|
}
|
|
|
|
// If the etcd client is not nil, start the watch loop for the registered tso servers.
|
|
// The PD(TSO) Client relies on this info to discover tso servers.
|
|
if m.client != nil {
|
|
m.initTSONodesWatcher(m.client)
|
|
m.tsoNodesWatcher.StartWatchLoop()
|
|
}
|
|
return m
|
|
}
|
|
|
|
// Bootstrap saves default keyspace group info and init group mapping in the memory.
|
|
func (m *GroupManager) Bootstrap(ctx context.Context) error {
|
|
// Force the membership restriction that the default keyspace must belong to default keyspace group.
|
|
// Have no information to specify the distribution of the default keyspace group replicas, so just
|
|
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
|
|
// to every tso node/pod by default.
|
|
keyspaces := []uint32{GetBootstrapKeyspaceID()}
|
|
defaultKeyspaceGroup := &endpoint.KeyspaceGroup{
|
|
ID: constant.DefaultKeyspaceGroupID,
|
|
UserKind: endpoint.Basic.String(),
|
|
Keyspaces: keyspaces,
|
|
}
|
|
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
|
|
// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
|
|
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
|
|
if err != nil && err != errs.ErrKeyspaceGroupExists {
|
|
return err
|
|
}
|
|
|
|
// Load all the keyspace groups from the storage and add to the respective userKind groups.
|
|
groups, err := m.store.LoadKeyspaceGroups(constant.DefaultKeyspaceGroupID, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, group := range groups {
|
|
userKind := endpoint.StringUserKind(group.UserKind)
|
|
m.groups[userKind].Put(group)
|
|
}
|
|
|
|
// It will only alloc node when the group manager is on API leader.
|
|
if m.client != nil {
|
|
m.wg.Add(1)
|
|
go m.allocNodesToAllKeyspaceGroups(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Close closes the manager.
|
|
func (m *GroupManager) Close() {
|
|
m.cancel()
|
|
m.wg.Wait()
|
|
}
|
|
|
|
func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
|
|
defer logutil.LogPanic()
|
|
defer m.wg.Done()
|
|
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
|
|
failpoint.Inject("acceleratedAllocNodes", func() {
|
|
ticker.Reset(time.Millisecond * 100)
|
|
})
|
|
defer ticker.Stop()
|
|
log.Info("start to alloc nodes to all keyspace groups")
|
|
for {
|
|
select {
|
|
case <-m.ctx.Done():
|
|
// When the group manager is closed, we should stop to alloc nodes to all keyspace groups.
|
|
// Note: If raftcluster is created failed but the group manager has been bootstrapped,
|
|
// we need to close this goroutine by m.cancel() rather than ctx.Done() from the raftcluster.
|
|
// because the ctx.Done() from the raftcluster will be triggered after raftcluster is created successfully.
|
|
log.Info("server is closed, stop to alloc nodes to all keyspace groups")
|
|
return
|
|
case <-ctx.Done():
|
|
// When the API leader is changed, we should stop to alloc nodes to all keyspace groups.
|
|
log.Info("the raftcluster is closed, stop to alloc nodes to all keyspace groups")
|
|
return
|
|
case <-ticker.C:
|
|
if m.GetNodesCount() == 0 {
|
|
continue
|
|
}
|
|
}
|
|
groups, err := m.store.LoadKeyspaceGroups(constant.DefaultKeyspaceGroupID, 0)
|
|
if err != nil {
|
|
log.Error("failed to load all keyspace groups", zap.Error(err))
|
|
continue
|
|
}
|
|
// if the default keyspace is not initialized, we should wait for the default keyspace to be initialized.
|
|
if len(groups) == 0 {
|
|
continue
|
|
}
|
|
for _, group := range groups {
|
|
existMembers := make(map[string]struct{})
|
|
for _, member := range group.Members {
|
|
if exist, addr := m.IsExistNode(member.Address); exist {
|
|
existMembers[addr] = struct{}{}
|
|
}
|
|
}
|
|
numExistMembers := len(existMembers)
|
|
if numExistMembers != 0 && numExistMembers == len(group.Members) && numExistMembers == m.GetNodesCount() {
|
|
continue
|
|
}
|
|
if numExistMembers < mcs.DefaultKeyspaceGroupReplicaCount {
|
|
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, existMembers, mcs.DefaultKeyspaceGroupReplicaCount)
|
|
if err != nil {
|
|
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
|
|
continue
|
|
}
|
|
log.Info("alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Any("nodes", nodes))
|
|
group.Members = nodes
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
|
|
putFn := func(kv *mvccpb.KeyValue) error {
|
|
s := &discovery.ServiceRegistryEntry{}
|
|
if err := json.Unmarshal(kv.Value, s); err != nil {
|
|
log.Warn("failed to unmarshal service registry entry",
|
|
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
|
|
return err
|
|
}
|
|
m.nodesBalancer.Put(s.ServiceAddr)
|
|
m.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
|
|
return nil
|
|
}
|
|
deleteFn := func(kv *mvccpb.KeyValue) error {
|
|
key := string(kv.Key)
|
|
if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
|
|
delete(m.serviceRegistryMap, key)
|
|
m.nodesBalancer.Delete(serviceAddr)
|
|
return nil
|
|
}
|
|
return errors.Errorf("failed to find the service address for key %s", key)
|
|
}
|
|
|
|
m.tsoNodesWatcher = etcdutil.NewLoopWatcher(
|
|
m.ctx,
|
|
&m.wg,
|
|
client,
|
|
"tso-nodes-watcher",
|
|
keypath.ServicePath(mcs.TSOServiceName),
|
|
func([]*clientv3.Event) error { return nil },
|
|
putFn,
|
|
deleteFn,
|
|
func([]*clientv3.Event) error { return nil },
|
|
true, /* withPrefix */
|
|
)
|
|
}
|
|
|
|
// CreateKeyspaceGroups creates keyspace groups.
|
|
func (m *GroupManager) CreateKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup) error {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.saveKeyspaceGroups(keyspaceGroups, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, keyspaceGroup := range keyspaceGroups {
|
|
userKind := endpoint.StringUserKind(keyspaceGroup.UserKind)
|
|
m.groups[userKind].Put(keyspaceGroup)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetTSOServiceAddrs gets all TSO service addresses.
|
|
func (m *GroupManager) GetTSOServiceAddrs() []string {
|
|
if m == nil || m.nodesBalancer == nil {
|
|
return nil
|
|
}
|
|
return m.nodesBalancer.GetAll()
|
|
}
|
|
|
|
// GetKeyspaceGroups gets keyspace groups from the start ID with limit.
|
|
// If limit is 0, it will load all keyspace groups from the start ID.
|
|
func (m *GroupManager) GetKeyspaceGroups(startID uint32, limit int) ([]*endpoint.KeyspaceGroup, error) {
|
|
return m.store.LoadKeyspaceGroups(startID, limit)
|
|
}
|
|
|
|
// GetKeyspaceGroupByID returns the keyspace group by ID.
|
|
func (m *GroupManager) GetKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) {
|
|
var (
|
|
kg *endpoint.KeyspaceGroup
|
|
err error
|
|
)
|
|
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
kg, err = m.store.LoadKeyspaceGroup(txn, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
return kg, nil
|
|
}
|
|
|
|
// DeleteKeyspaceGroupByID deletes the keyspace group by ID.
|
|
func (m *GroupManager) DeleteKeyspaceGroupByID(id uint32) (*endpoint.KeyspaceGroup, error) {
|
|
var (
|
|
kg *endpoint.KeyspaceGroup
|
|
err error
|
|
)
|
|
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
kg, err = m.store.LoadKeyspaceGroup(txn, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg == nil {
|
|
return nil
|
|
}
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(id)
|
|
}
|
|
return m.store.DeleteKeyspaceGroup(txn, id)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
userKind := endpoint.StringUserKind(kg.UserKind)
|
|
// TODO: move out the keyspace to another group
|
|
// we don't need the keyspace group as the return value
|
|
m.groups[userKind].Remove(id)
|
|
|
|
return kg, nil
|
|
}
|
|
|
|
// saveKeyspaceGroups will try to save the given keyspace groups into the storage.
|
|
// If any keyspace group already exists and `overwrite` is false, it will return ErrKeyspaceGroupExists.
|
|
func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGroup, overwrite bool) error {
|
|
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
for _, keyspaceGroup := range keyspaceGroups {
|
|
// Check if keyspace group has already existed.
|
|
oldKG, err := m.store.LoadKeyspaceGroup(txn, keyspaceGroup.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if oldKG != nil && !overwrite {
|
|
return errs.ErrKeyspaceGroupExists
|
|
}
|
|
if oldKG.IsSplitting() && overwrite {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(keyspaceGroup.ID)
|
|
}
|
|
if oldKG.IsMerging() && overwrite {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(keyspaceGroup.ID)
|
|
}
|
|
newKG := &endpoint.KeyspaceGroup{
|
|
ID: keyspaceGroup.ID,
|
|
UserKind: keyspaceGroup.UserKind,
|
|
Members: keyspaceGroup.Members,
|
|
Keyspaces: keyspaceGroup.Keyspaces,
|
|
}
|
|
err = m.store.SaveKeyspaceGroup(txn, newKG)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// GetKeyspaceConfigByKind returns the keyspace config for the given user kind.
|
|
func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[string]string, error) {
|
|
if m == nil {
|
|
return map[string]string{}, nil
|
|
}
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
return m.getKeyspaceConfigByKindLocked(userKind)
|
|
}
|
|
|
|
func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) (map[string]string, error) {
|
|
groups, ok := m.groups[userKind]
|
|
if !ok {
|
|
return map[string]string{}, errors.Errorf("user kind %s not found", userKind)
|
|
}
|
|
kg := groups.Top()
|
|
if kg == nil {
|
|
return map[string]string{}, errors.Errorf("no keyspace group for user kind %s", userKind)
|
|
}
|
|
id := strconv.FormatUint(uint64(kg.ID), 10)
|
|
config := map[string]string{
|
|
UserKindKey: userKind.String(),
|
|
TSOKeyspaceGroupIDKey: id,
|
|
}
|
|
return config, nil
|
|
}
|
|
|
|
// GetGroupByKeyspaceID returns the keyspace group ID for the given keyspace ID.
|
|
func (m *GroupManager) GetGroupByKeyspaceID(id uint32) (uint32, error) {
|
|
m.RLock()
|
|
defer m.RUnlock()
|
|
for _, groups := range m.groups {
|
|
for _, group := range groups.GetAll() {
|
|
if slice.Contains(group.Keyspaces, id) {
|
|
return group.ID, nil
|
|
}
|
|
}
|
|
}
|
|
return 0, errs.ErrKeyspaceNotInAnyKeyspaceGroup
|
|
}
|
|
|
|
// failpointOnce makes the body of the "externalAllocNode" failpoint in
// UpdateKeyspaceForGroup run at most once per process.
var failpointOnce sync.Once
|
|
|
|
// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
|
|
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
id, err := strconv.ParseUint(groupID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
|
|
failpointOnce.Do(func() {
|
|
addrs := val.(string)
|
|
_ = m.SetNodesForKeyspaceGroup(constant.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
|
|
})
|
|
})
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
|
|
}
|
|
|
|
func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error {
|
|
kg := m.groups[userKind].Get(uint32(groupID))
|
|
if kg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(uint32(groupID))
|
|
}
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(uint32(groupID))
|
|
}
|
|
if kg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(uint32(groupID))
|
|
}
|
|
|
|
changed := false
|
|
|
|
switch mutation {
|
|
case opAdd:
|
|
if !slice.Contains(kg.Keyspaces, keyspaceID) {
|
|
kg.Keyspaces = append(kg.Keyspaces, keyspaceID)
|
|
changed = true
|
|
}
|
|
case opDelete:
|
|
lenOfKeyspaces := len(kg.Keyspaces)
|
|
kg.Keyspaces = slice.Remove(kg.Keyspaces, keyspaceID)
|
|
if lenOfKeyspaces != len(kg.Keyspaces) {
|
|
changed = true
|
|
}
|
|
}
|
|
|
|
if changed {
|
|
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
|
|
return err
|
|
}
|
|
m.groups[userKind].Put(kg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateKeyspaceGroup updates the keyspace group.
|
|
func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUserKind, newUserKind endpoint.UserKind, keyspaceID uint32) error {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
oldID, err := strconv.ParseUint(oldGroupID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newID, err := strconv.ParseUint(newGroupID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
oldKG := m.groups[oldUserKind].Get(uint32(oldID))
|
|
if oldKG == nil {
|
|
return errors.Errorf("keyspace group %s not found in %s group", oldGroupID, oldUserKind)
|
|
}
|
|
newKG := m.groups[newUserKind].Get(uint32(newID))
|
|
if newKG == nil {
|
|
return errors.Errorf("keyspace group %s not found in %s group", newGroupID, newUserKind)
|
|
}
|
|
if oldKG.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(uint32(oldID))
|
|
} else if newKG.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(uint32(newID))
|
|
} else if oldKG.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(uint32(oldID))
|
|
} else if newKG.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(uint32(newID))
|
|
}
|
|
|
|
var updateOld, updateNew bool
|
|
if !slice.Contains(newKG.Keyspaces, keyspaceID) {
|
|
newKG.Keyspaces = append(newKG.Keyspaces, keyspaceID)
|
|
updateNew = true
|
|
}
|
|
|
|
lenOfOldKeyspaces := len(oldKG.Keyspaces)
|
|
oldKG.Keyspaces = slice.Remove(oldKG.Keyspaces, keyspaceID)
|
|
if lenOfOldKeyspaces != len(oldKG.Keyspaces) {
|
|
updateOld = true
|
|
}
|
|
|
|
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{oldKG, newKG}, true); err != nil {
|
|
return err
|
|
}
|
|
|
|
if updateOld {
|
|
m.groups[oldUserKind].Put(oldKG)
|
|
}
|
|
|
|
if updateNew {
|
|
m.groups[newUserKind].Put(newKG)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
|
|
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
|
|
func (m *GroupManager) SplitKeyspaceGroupByID(
|
|
splitSourceID, splitTargetID uint32,
|
|
keyspaces []uint32, keyspaceIDRange ...uint32,
|
|
) error {
|
|
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
|
|
// Load the old keyspace group first.
|
|
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if splitSourceKg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(splitSourceID)
|
|
}
|
|
// A keyspace group can not take part in multiple split processes.
|
|
if splitSourceKg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(splitSourceID)
|
|
}
|
|
// A keyspace group can not be split when it is in merging.
|
|
if splitSourceKg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(splitSourceID)
|
|
}
|
|
// Build the new keyspace groups for split source and target.
|
|
var startKeyspaceID, endKeyspaceID uint32
|
|
if len(keyspaceIDRange) >= 2 {
|
|
startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1]
|
|
}
|
|
splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces(
|
|
splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Check if the source keyspace group has enough replicas.
|
|
if len(splitSourceKg.Members) < mcs.DefaultKeyspaceGroupReplicaCount {
|
|
return errs.ErrKeyspaceGroupNotEnoughReplicas
|
|
}
|
|
// Check if the new keyspace group already exists.
|
|
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if splitTargetKg != nil {
|
|
return errs.ErrKeyspaceGroupExists
|
|
}
|
|
// Update the old keyspace group.
|
|
splitSourceKg.Keyspaces = splitSourceKeyspaces
|
|
splitSourceKg.SplitState = &endpoint.SplitState{
|
|
SplitSource: splitSourceKg.ID,
|
|
}
|
|
if err = m.store.SaveKeyspaceGroup(txn, splitSourceKg); err != nil {
|
|
return err
|
|
}
|
|
splitTargetKg = &endpoint.KeyspaceGroup{
|
|
ID: splitTargetID,
|
|
// Keep the same user kind and members as the old keyspace group.
|
|
UserKind: splitSourceKg.UserKind,
|
|
Members: splitSourceKg.Members,
|
|
Keyspaces: splitTargetKeyspaces,
|
|
SplitState: &endpoint.SplitState{
|
|
SplitSource: splitSourceKg.ID,
|
|
},
|
|
}
|
|
// Create the new split keyspace group.
|
|
return m.store.SaveKeyspaceGroup(txn, splitTargetKg)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Update the keyspace group cache.
|
|
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
|
|
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
|
|
return nil
|
|
}
|
|
|
|
func buildSplitKeyspaces(
|
|
// `old` is the original keyspace list which will be split out,
|
|
// `new` is the keyspace list which will be split from the old keyspace list.
|
|
old, new []uint32,
|
|
startKeyspaceID, endKeyspaceID uint32,
|
|
) (oldSplit, newSplit []uint32, err error) {
|
|
oldNum, newNum := len(old), len(new)
|
|
// Split according to the new keyspace list.
|
|
if newNum != 0 {
|
|
if newNum > oldNum {
|
|
return nil, nil, errs.ErrKeyspaceNotInKeyspaceGroup
|
|
}
|
|
var (
|
|
oldKeyspaceMap = make(map[uint32]struct{}, oldNum)
|
|
newKeyspaceMap = make(map[uint32]struct{}, newNum)
|
|
)
|
|
for _, keyspace := range old {
|
|
oldKeyspaceMap[keyspace] = struct{}{}
|
|
}
|
|
for _, keyspace := range new {
|
|
if isProtectedKeyspaceID(keyspace) {
|
|
return nil, nil, newModifyProtectedKeyspaceError()
|
|
}
|
|
if _, ok := oldKeyspaceMap[keyspace]; !ok {
|
|
return nil, nil, errs.ErrKeyspaceNotInKeyspaceGroup
|
|
}
|
|
newKeyspaceMap[keyspace] = struct{}{}
|
|
}
|
|
// Get the split keyspace list for the old keyspace group.
|
|
oldSplit = make([]uint32, 0, oldNum-newNum)
|
|
for _, keyspace := range old {
|
|
if _, ok := newKeyspaceMap[keyspace]; !ok {
|
|
oldSplit = append(oldSplit, keyspace)
|
|
}
|
|
}
|
|
// If newNum != len(newKeyspaceMap), it means the provided new keyspace list contains
|
|
// duplicate keyspaces, and we need to dedup them (https://github.com/tikv/pd/issues/6687);
|
|
// otherwise, we can just return the old split and new keyspace list.
|
|
if newNum == len(newKeyspaceMap) {
|
|
return oldSplit, new, nil
|
|
}
|
|
newSplit = make([]uint32, 0, len(newKeyspaceMap))
|
|
for keyspace := range newKeyspaceMap {
|
|
newSplit = append(newSplit, keyspace)
|
|
}
|
|
return oldSplit, newSplit, nil
|
|
}
|
|
// Split according to the start and end keyspace ID.
|
|
if startKeyspaceID == 0 && endKeyspaceID == 0 {
|
|
return nil, nil, errs.ErrKeyspaceNotInKeyspaceGroup
|
|
}
|
|
newSplit = make([]uint32, 0, oldNum)
|
|
newKeyspaceMap := make(map[uint32]struct{}, newNum)
|
|
for _, keyspace := range old {
|
|
if isProtectedKeyspaceID(keyspace) {
|
|
// The source keyspace group must be the default keyspace group and we always keep
|
|
// the bootstrap keyspace (default or system) in the default keyspace group.
|
|
continue
|
|
}
|
|
|
|
if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID {
|
|
newSplit = append(newSplit, keyspace)
|
|
newKeyspaceMap[keyspace] = struct{}{}
|
|
}
|
|
}
|
|
// Check if the new keyspace list is empty.
|
|
if len(newSplit) == 0 {
|
|
return nil, nil, errs.ErrKeyspaceGroupWithEmptyKeyspace
|
|
}
|
|
// Get the split keyspace list for the old keyspace group.
|
|
oldSplit = make([]uint32, 0, oldNum-len(newSplit))
|
|
for _, keyspace := range old {
|
|
if _, ok := newKeyspaceMap[keyspace]; !ok {
|
|
oldSplit = append(oldSplit, keyspace)
|
|
}
|
|
}
|
|
return oldSplit, newSplit, nil
|
|
}
|
|
|
|
// FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID.
|
|
func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
|
|
var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
|
|
// Load the split target keyspace group first.
|
|
splitTargetKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if splitTargetKg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(splitTargetID)
|
|
}
|
|
// Check if it's in the split state.
|
|
if !splitTargetKg.IsSplitTarget() {
|
|
return errs.ErrKeyspaceGroupNotInSplit.FastGenByArgs(splitTargetID)
|
|
}
|
|
// Load the split source keyspace group then.
|
|
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitTargetKg.SplitSource())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if splitSourceKg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(splitTargetKg.SplitSource())
|
|
}
|
|
if !splitSourceKg.IsSplitSource() {
|
|
return errs.ErrKeyspaceGroupNotInSplit.FastGenByArgs(splitTargetKg.SplitSource())
|
|
}
|
|
splitTargetKg.SplitState = nil
|
|
splitSourceKg.SplitState = nil
|
|
err = m.store.SaveKeyspaceGroup(txn, splitTargetKg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return m.store.SaveKeyspaceGroup(txn, splitSourceKg)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Update the keyspace group cache.
|
|
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg)
|
|
m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg)
|
|
log.Info("finish split keyspace group", zap.Uint32("split-source-id", splitSourceKg.ID), zap.Uint32("split-target-id", splitTargetID))
|
|
return nil
|
|
}
|
|
|
|
// GetNodesCount returns the count of nodes.
|
|
func (m *GroupManager) GetNodesCount() int {
|
|
if m.nodesBalancer == nil {
|
|
return 0
|
|
}
|
|
return m.nodesBalancer.Len()
|
|
}
|
|
|
|
// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
|
|
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, existMembers map[string]struct{}, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
|
|
defer cancel()
|
|
ticker := time.NewTicker(allocNodesInterval)
|
|
defer ticker.Stop()
|
|
|
|
var kg *endpoint.KeyspaceGroup
|
|
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
|
|
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
var err error
|
|
kg, err = m.store.LoadKeyspaceGroup(txn, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(id)
|
|
}
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(id)
|
|
}
|
|
if kg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(id)
|
|
}
|
|
|
|
for addr := range existMembers {
|
|
nodes = append(nodes, endpoint.KeyspaceGroupMember{
|
|
Address: addr,
|
|
Priority: mcs.DefaultKeyspaceGroupReplicaPriority,
|
|
})
|
|
}
|
|
|
|
for len(existMembers) < desiredReplicaCount {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
}
|
|
if m.GetNodesCount() == 0 { // double check
|
|
return errs.ErrNoAvailableNode
|
|
}
|
|
if len(existMembers) == m.GetNodesCount() {
|
|
break
|
|
}
|
|
addr := m.nodesBalancer.Next()
|
|
if addr == "" {
|
|
return errs.ErrNoAvailableNode
|
|
}
|
|
if _, ok := existMembers[addr]; ok {
|
|
continue
|
|
}
|
|
existMembers[addr] = struct{}{}
|
|
nodes = append(nodes, endpoint.KeyspaceGroupMember{
|
|
Address: addr,
|
|
Priority: mcs.DefaultKeyspaceGroupReplicaPriority,
|
|
})
|
|
}
|
|
kg.Members = nodes
|
|
return m.store.SaveKeyspaceGroup(txn, kg)
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
|
|
log.Info("alloc nodes for keyspace group",
|
|
zap.Uint32("keyspace-group-id", id),
|
|
zap.Reflect("nodes", nodes))
|
|
return nodes, nil
|
|
}
|
|
|
|
// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
|
|
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
var kg *endpoint.KeyspaceGroup
|
|
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
var err error
|
|
kg, err = m.store.LoadKeyspaceGroup(txn, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(id)
|
|
}
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(id)
|
|
}
|
|
if kg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(id)
|
|
}
|
|
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
|
|
for _, node := range nodes {
|
|
members = append(members, endpoint.KeyspaceGroupMember{
|
|
Address: node,
|
|
Priority: mcs.DefaultKeyspaceGroupReplicaPriority,
|
|
})
|
|
}
|
|
kg.Members = members
|
|
return m.store.SaveKeyspaceGroup(txn, kg)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
|
|
return nil
|
|
}
|
|
|
|
// SetPriorityForKeyspaceGroup sets the priority of node for the keyspace group.
|
|
func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, priority int) error {
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
var kg *endpoint.KeyspaceGroup
|
|
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
|
|
var err error
|
|
kg, err = m.store.LoadKeyspaceGroup(txn, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(id)
|
|
}
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(id)
|
|
}
|
|
if kg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(id)
|
|
}
|
|
inKeyspaceGroup := false
|
|
members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members))
|
|
for _, member := range kg.Members {
|
|
if member.IsAddressEquivalent(node) {
|
|
inKeyspaceGroup = true
|
|
member.Priority = priority
|
|
}
|
|
members = append(members, member)
|
|
}
|
|
if !inKeyspaceGroup {
|
|
return errs.ErrNodeNotInKeyspaceGroup
|
|
}
|
|
kg.Members = members
|
|
return m.store.SaveKeyspaceGroup(txn, kg)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
|
|
log.Info("set priority for keyspace group",
|
|
zap.Uint32("keyspace-group-id", id),
|
|
zap.String("node", node),
|
|
zap.Int("priority", priority))
|
|
return nil
|
|
}
|
|
|
|
// IsExistNode checks if the node exists.
|
|
func (m *GroupManager) IsExistNode(addr string) (bool, string) {
|
|
nodes := m.nodesBalancer.GetAll()
|
|
for _, node := range nodes {
|
|
if typeutil.EqualBaseURLs(node, addr) {
|
|
return true, node
|
|
}
|
|
}
|
|
return false, ""
|
|
}
|
|
|
|
// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group.
|
|
func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uint32) error {
|
|
mergeListNum := len(mergeList)
|
|
if mergeListNum == 0 {
|
|
return nil
|
|
}
|
|
// The transaction below will:
|
|
// - Load and delete the keyspace groups in the merge list.
|
|
// - Load and update the target keyspace group.
|
|
// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
|
|
if (mergeListNum+1)*2 > etcdutil.MaxEtcdTxnOps {
|
|
return errs.ErrExceedMaxEtcdTxnOps
|
|
}
|
|
if slice.Contains(mergeList, constant.DefaultKeyspaceGroupID) {
|
|
return errs.ErrModifyDefaultKeyspaceGroup
|
|
}
|
|
var (
|
|
groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1)
|
|
mergeTargetKg *endpoint.KeyspaceGroup
|
|
)
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
|
|
// Load and check all keyspace groups first.
|
|
for _, kgID := range append(mergeList, mergeTargetID) {
|
|
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(kgID)
|
|
}
|
|
// A keyspace group can not be merged if it's in splitting.
|
|
if kg.IsSplitting() {
|
|
return errs.ErrKeyspaceGroupInSplit.FastGenByArgs(kgID)
|
|
}
|
|
// A keyspace group can not be split when it is in merging.
|
|
if kg.IsMerging() {
|
|
return errs.ErrKeyspaceGroupInMerging.FastGenByArgs(kgID)
|
|
}
|
|
groups[kgID] = kg
|
|
}
|
|
// Build the new keyspaces for the merge target keyspace group.
|
|
mergeTargetKg = groups[mergeTargetID]
|
|
keyspaces := make(map[uint32]struct{})
|
|
for _, keyspace := range mergeTargetKg.Keyspaces {
|
|
keyspaces[keyspace] = struct{}{}
|
|
}
|
|
for _, kgID := range mergeList {
|
|
kg := groups[kgID]
|
|
for _, keyspace := range kg.Keyspaces {
|
|
keyspaces[keyspace] = struct{}{}
|
|
}
|
|
}
|
|
mergedKeyspaces := make([]uint32, 0, len(keyspaces))
|
|
for keyspace := range keyspaces {
|
|
mergedKeyspaces = append(mergedKeyspaces, keyspace)
|
|
}
|
|
sort.Slice(mergedKeyspaces, func(i, j int) bool {
|
|
return mergedKeyspaces[i] < mergedKeyspaces[j]
|
|
})
|
|
mergeTargetKg.Keyspaces = mergedKeyspaces
|
|
// Update the merge state of the target keyspace group.
|
|
mergeTargetKg.MergeState = &endpoint.MergeState{
|
|
MergeList: mergeList,
|
|
}
|
|
err = m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group.
|
|
for _, kgID := range mergeList {
|
|
if err := m.store.DeleteKeyspaceGroup(txn, kgID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Update the keyspace group cache.
|
|
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
|
|
for _, kgID := range mergeList {
|
|
kg := groups[kgID]
|
|
m.groups[endpoint.StringUserKind(kg.UserKind)].Remove(kgID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID.
|
|
func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error {
|
|
var (
|
|
mergeTargetKg *endpoint.KeyspaceGroup
|
|
mergeList []uint32
|
|
)
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
|
|
// Load the merge target keyspace group first.
|
|
mergeTargetKg, err = m.store.LoadKeyspaceGroup(txn, mergeTargetID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if mergeTargetKg == nil {
|
|
return errs.ErrKeyspaceGroupNotExists.FastGenByArgs(mergeTargetID)
|
|
}
|
|
// Check if it's in the merging state.
|
|
if !mergeTargetKg.IsMergeTarget() {
|
|
return errs.ErrKeyspaceGroupNotInMerging.FastGenByArgs(mergeTargetID)
|
|
}
|
|
// Make sure all merging keyspace groups are deleted.
|
|
for _, kgID := range mergeTargetKg.MergeState.MergeList {
|
|
kg, err := m.store.LoadKeyspaceGroup(txn, kgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if kg != nil {
|
|
return errs.ErrKeyspaceGroupNotInMerging.FastGenByArgs(kgID)
|
|
}
|
|
}
|
|
mergeList = mergeTargetKg.MergeState.MergeList
|
|
mergeTargetKg.MergeState = nil
|
|
return m.store.SaveKeyspaceGroup(txn, mergeTargetKg)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
// Update the keyspace group cache.
|
|
m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg)
|
|
log.Info("finish merge keyspace group",
|
|
zap.Uint32("merge-target-id", mergeTargetKg.ID),
|
|
zap.Reflect("merge-list", mergeList))
|
|
return nil
|
|
}
|
|
|
|
// MergeAllIntoDefaultKeyspaceGroup merges all other keyspace groups into the default keyspace group.
|
|
func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
|
|
defer logutil.LogPanic()
|
|
// Since we don't take the default keyspace group into account,
|
|
// the number of unmerged keyspace groups is -1.
|
|
unmergedGroupNum := -1
|
|
// Calculate the total number of keyspace groups to merge.
|
|
for _, groups := range m.groups {
|
|
unmergedGroupNum += groups.Len()
|
|
}
|
|
mergedGroupNum := 0
|
|
// Start to merge all keyspace groups into the default one.
|
|
for userKind, groups := range m.groups {
|
|
mergeNum := groups.Len()
|
|
log.Info("start to merge all keyspace groups into the default one",
|
|
zap.Stringer("user-kind", userKind),
|
|
zap.Int("merge-num", mergeNum),
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum))
|
|
if mergeNum == 0 {
|
|
continue
|
|
}
|
|
var (
|
|
maxBatchSize = etcdutil.MaxEtcdTxnOps/2 - 1
|
|
groupsToMerge = make([]uint32, 0, maxBatchSize)
|
|
)
|
|
for idx, group := range groups.GetAll() {
|
|
if group.ID == constant.DefaultKeyspaceGroupID {
|
|
continue
|
|
}
|
|
groupsToMerge = append(groupsToMerge, group.ID)
|
|
if len(groupsToMerge) < maxBatchSize && idx < mergeNum-1 {
|
|
continue
|
|
}
|
|
log.Info("merge keyspace groups into the default one",
|
|
zap.Int("index", idx),
|
|
zap.Int("batch-size", len(groupsToMerge)),
|
|
zap.Int("merge-num", mergeNum),
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum))
|
|
// Reach the batch size, merge them into the default keyspace group.
|
|
if err := m.MergeKeyspaceGroups(constant.DefaultKeyspaceGroupID, groupsToMerge); err != nil {
|
|
log.Error("failed to merge all keyspace groups into the default one",
|
|
zap.Int("index", idx),
|
|
zap.Int("batch-size", len(groupsToMerge)),
|
|
zap.Int("merge-num", mergeNum),
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum),
|
|
zap.Error(err))
|
|
return err
|
|
}
|
|
// Wait for the merge to finish.
|
|
ctx, cancel := context.WithTimeout(m.ctx, time.Minute)
|
|
ticker := time.NewTicker(time.Second)
|
|
checkLoop:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Info("cancel merging all keyspace groups into the default one",
|
|
zap.Int("index", idx),
|
|
zap.Int("batch-size", len(groupsToMerge)),
|
|
zap.Int("merge-num", mergeNum),
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum))
|
|
cancel()
|
|
ticker.Stop()
|
|
return nil
|
|
case <-ticker.C:
|
|
kg, err := m.GetKeyspaceGroupByID(constant.DefaultKeyspaceGroupID)
|
|
if err != nil {
|
|
log.Error("failed to check the default keyspace group merge state",
|
|
zap.Int("index", idx),
|
|
zap.Int("batch-size", len(groupsToMerge)),
|
|
zap.Int("merge-num", mergeNum),
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum),
|
|
zap.Error(err))
|
|
cancel()
|
|
ticker.Stop()
|
|
return err
|
|
}
|
|
if !kg.IsMergeTarget() {
|
|
break checkLoop
|
|
}
|
|
}
|
|
}
|
|
cancel()
|
|
ticker.Stop()
|
|
mergedGroupNum += len(groupsToMerge)
|
|
unmergedGroupNum -= len(groupsToMerge)
|
|
groupsToMerge = groupsToMerge[:0]
|
|
}
|
|
}
|
|
log.Info("finish merging all keyspace groups into the default one",
|
|
zap.Int("merged-group-num", mergedGroupNum),
|
|
zap.Int("unmerged-group-num", unmergedGroupNum))
|
|
return nil
|
|
}
|
|
|
|
// GetKeyspaceGroupPrimaryByID returns the primary node of the keyspace group by ID.
|
|
func (m *GroupManager) GetKeyspaceGroupPrimaryByID(id uint32) (string, error) {
|
|
// check if the keyspace group exists
|
|
kg, err := m.GetKeyspaceGroupByID(id)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if kg == nil {
|
|
return "", errs.ErrKeyspaceGroupNotExists.FastGenByArgs(id)
|
|
}
|
|
|
|
primaryPath := keypath.ElectionPath(&keypath.MsParam{
|
|
ServiceName: mcs.TSOServiceName,
|
|
GroupID: id,
|
|
})
|
|
leader := &tsopb.Participant{}
|
|
ok, _, err := etcdutil.GetProtoMsgWithModRev(m.client, primaryPath, leader)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if !ok {
|
|
return "", errs.ErrKeyspaceGroupPrimaryNotFound
|
|
}
|
|
// The format of leader name is address-groupID.
|
|
return parsePrimaryName(leader.Name), err
|
|
}
|
|
|
|
// parsePrimaryName drops everything from the last "-" in name onwards and
// returns what precedes it. A name that contains no "-" is returned unchanged.
func parsePrimaryName(name string) string {
	if cut := strings.LastIndexByte(name, '-'); cut >= 0 {
		return name[:cut]
	}
	return name
}
|