mirror of https://github.com/tikv/client-go.git
region_cache: support <engine_role, write> for disaggregated tiflash
Signed-off-by: guo-shaoge <shaoge1994@163.com>
commit 0d003d077b
parent 3f7860f109
@@ -42,7 +42,6 @@ import (
 	"math"
 	"math/rand"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -55,7 +54,6 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pkg/errors"
-	"github.com/stathat/consistent"
 	"github.com/tikv/client-go/v2/config"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/internal/apicodec"
@@ -86,6 +84,27 @@ const (
 	defaultRegionsPerBatch = 128
 )
 
+// Return false means the labels don't match, and this store will be ignored.
+type LabelFilter = func(labels []*metapb.StoreLabel) bool
+
+var LabelFilterOnlyTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
+		isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
+}
+
+var LabelFilterNoTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
+		!isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
+}
+
+var LabelFilterAllTiFlashNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash)
+}
+
+var LabelFilterAllNode = func(_ []*metapb.StoreLabel) bool {
+	return true
+}
+
 // regionCacheTTLSec is the max idle time for regions in the region cache.
 var regionCacheTTLSec int64 = 600
 
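As a rough illustration of how these filters classify a store's label set, here is a minimal, self-contained sketch (not part of the diff; containsLabel is a hypothetical stand-in for the package's unexported isStoreContainLabel helper):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// containsLabel reports whether the label set carries the given key/value pair.
// It stands in for the unexported isStoreContainLabel helper used by the filters above.
func containsLabel(labels []*metapb.StoreLabel, key, val string) bool {
	for _, l := range labels {
		if l.GetKey() == key && l.GetValue() == val {
			return true
		}
	}
	return false
}

func main() {
	writeNode := []*metapb.StoreLabel{
		{Key: "engine", Value: "tiflash"},
		{Key: "engine_role", Value: "write"},
	}
	classicNode := []*metapb.StoreLabel{
		{Key: "engine", Value: "tiflash"},
	}

	// Same predicate as LabelFilterOnlyTiFlashWriteNode, expressed with the local helper.
	onlyWriteNode := func(labels []*metapb.StoreLabel) bool {
		return containsLabel(labels, "engine", "tiflash") &&
			containsLabel(labels, "engine_role", "write")
	}

	fmt.Println(onlyWriteNode(writeNode))   // true: both engine and engine_role match
	fmt.Println(onlyWriteNode(classicNode)) // false: the engine_role label is missing
}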
@@ -703,7 +722,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
 }
 
 // GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores; the store id of currentStore is always the first one.
-func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 {
+func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
 	// set the cap to 2 because a TiFlash table usually has 2 replicas
 	allStores := make([]uint64, 0, 2)
 	// make sure currentStore id is always the first in allStores
@@ -731,6 +750,9 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
 			continue
 		}
+		if !labelFilter(store.labels) {
+			continue
+		}
 		allStores = append(allStores, store.storeID)
 	}
 	return allStores
@@ -739,7 +761,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 // GetTiFlashRPCContext returns the RPCContext for a region that must access a TiFlash store. If it returns nil, the region
 // must be out of date and already dropped from the cache, or no TiFlash store was found.
 // `loadBalance` is an option. For batch cop it is pointless and might cause the failed store to be retried repeatedly.
-func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool) (*RPCContext, error) {
+func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
 	ts := time.Now().Unix()
 
 	cachedRegion := c.GetCachedRegionWithRLock(id)
@@ -762,6 +784,9 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 	for i := 0; i < regionStore.accessStoreNum(tiFlashOnly); i++ {
 		accessIdx := AccessIndex((sIdx + i) % regionStore.accessStoreNum(tiFlashOnly))
 		storeIdx, store := regionStore.accessStore(tiFlashOnly, accessIdx)
+		if !labelFilter(store.labels) {
+			continue
+		}
 		addr, err := c.getStoreAddr(bo, cachedRegion, store)
 		if err != nil {
 			return nil, err
@@ -801,56 +826,6 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 	return nil, nil
 }
 
-// GetTiFlashComputeRPCContextByConsistentHash return rpcCtx of tiflash_compute stores.
-// Each mpp computation of specific region will be handled by specific node whose engine-label is tiflash_compute.
-// 1. Get all stores with label <engine, tiflash_compute>.
-// 2. Get rpcCtx that indicates where the region is stored.
-// 3. Compute which tiflash_compute node should handle this region by consistent hash.
-// 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed.
-// NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other.
-func (c *RegionCache) GetTiFlashComputeRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID, stores []*Store) (res []*RPCContext, err error) {
-	hasher := consistent.New()
-	hasher.NumberOfReplicas = 200 // Larger replicas can balance requests more evenly
-	for _, store := range stores {
-		if !isStoreContainLabel(store.labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
-			return nil, errors.New("expect store should be tiflash_compute")
-		}
-		hasher.Add(store.GetAddr())
-	}
-
-	for _, id := range ids {
-		addr, err := hasher.Get(strconv.Itoa(int(id.GetID())))
-		if err != nil {
-			return nil, err
-		}
-		rpcCtx, err := c.GetTiFlashRPCContext(bo, id, false)
-		if err != nil {
-			return nil, err
-		}
-		if rpcCtx == nil {
-			logutil.Logger(context.Background()).Info("rpcCtx is nil", zap.Any("region", id.String()))
-			return nil, nil
-		}
-
-		var store *Store
-		for _, s := range stores {
-			if s.GetAddr() == addr {
-				store = s
-				break
-			}
-		}
-		if store == nil {
-			return nil, errors.New(fmt.Sprintf("cannot find tiflash_compute store: %v", addr))
-		}
-
-		rpcCtx.Store = store
-		rpcCtx.Addr = addr
-		// Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx.
-		res = append(res, rpcCtx)
-	}
-	return res, nil
-}
-
 // KeyLocation is the region and range that a key is located.
 type KeyLocation struct {
 	Region RegionVerID
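Since GetTiFlashComputeRPCContextByConsistentHash is deleted here, the consistent-hash dispatch has to live with whoever builds the rpcCtx now (per the commit, TiDB itself in disaggregated mode). A minimal sketch of that hashing step, using github.com/stathat/consistent the same way the deleted code did (illustrative only; pickComputeNodes is not an API of this repository, and the strconv and consistent imports are assumed):

// pickComputeNodes maps each region id onto a tiflash_compute address by
// consistent hashing, mirroring the dispatch deleted above.
func pickComputeNodes(regionIDs []uint64, addrs []string) (map[uint64]string, error) {
	hasher := consistent.New()
	hasher.NumberOfReplicas = 200 // more virtual replicas spread requests more evenly
	for _, addr := range addrs {
		hasher.Add(addr)
	}
	res := make(map[uint64]string, len(regionIDs))
	for _, id := range regionIDs {
		addr, err := hasher.Get(strconv.Itoa(int(id)))
		if err != nil {
			return nil, err
		}
		res[id] = addr
	}
	return res, nil
}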
@@ -1841,12 +1816,15 @@ func (c *RegionCache) PDClient() pd.Client {
 }
 
 // GetTiFlashStores returns the information of all tiflash nodes.
-func (c *RegionCache) GetTiFlashStores() []*Store {
+func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
 	c.storeMu.RLock()
 	defer c.storeMu.RUnlock()
 	var stores []*Store
 	for _, s := range c.storeMu.stores {
 		if s.storeType == tikvrpc.TiFlash {
+			if !labelFilter(s.labels) {
+				continue
+			}
 			stores = append(stores, s)
 		}
 	}
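Taken together, a caller that should only ever see the disaggregated write nodes threads the new filter through the updated RegionCache methods roughly as follows (an illustrative sketch written as if inside the locate package; getWriteNodeRPCContext is hypothetical and not part of this commit):

// getWriteNodeRPCContext shows how the labelFilter parameter flows through the
// signatures changed above; it only considers stores labelled <engine_role, write>.
func getWriteNodeRPCContext(c *RegionCache, bo *retry.Backoffer, id RegionVerID) (*RPCContext, error) {
	if stores := c.GetTiFlashStores(LabelFilterOnlyTiFlashWriteNode); len(stores) == 0 {
		return nil, errors.New("no TiFlash store with engine_role=write is registered")
	}
	// The same filter is applied again when resolving the peer for this region.
	return c.GetTiFlashRPCContext(bo, id, true /* loadBalance */, LabelFilterOnlyTiFlashWriteNode)
}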
@@ -1002,24 +1002,13 @@ func (s *RegionRequestSender) getRPCContext(
 		}
 		return s.replicaSelector.next(bo)
 	case tikvrpc.TiFlash:
-		return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
+		// Write nodes should be ignored here: in disaggregated tiflash mode, TiDB builds the rpcCtx itself.
+		return s.regionCache.GetTiFlashRPCContext(bo, regionID, true, LabelFilterNoTiFlashWriteNode)
 	case tikvrpc.TiDB:
 		return &RPCContext{Addr: s.storeAddr}, nil
 	case tikvrpc.TiFlashCompute:
-		stores, err := s.regionCache.GetTiFlashComputeStores(bo)
-		if err != nil {
-			return nil, err
-		}
-		rpcCtxs, err := s.regionCache.GetTiFlashComputeRPCContextByConsistentHash(bo, []RegionVerID{regionID}, stores)
-		if err != nil {
-			return nil, err
-		}
-		if rpcCtxs == nil {
-			return nil, nil
-		} else if len(rpcCtxs) != 1 {
-			return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs)))
-		}
-		return rpcCtxs[0], nil
+		// In disaggregated tiflash mode, TiDB builds the rpcCtx itself, so this branch cannot be reached.
+		return nil, errors.Errorf("should not reach here for disaggregated tiflash mode")
 	default:
 		return nil, errors.Errorf("unsupported storage type: %v", et)
 	}
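One point worth spelling out: LabelFilterNoTiFlashWriteNode still accepts a classic TiFlash store, because such a store carries no engine_role label at all, so the sender's TiFlash branch keeps its old behaviour outside of disaggregated deployments; only stores explicitly labelled <engine_role, write> are skipped. A quick check (illustrative fragment, assuming the metapb and tikvrpc imports shown earlier):

// A classic TiFlash store advertises only the engine label.
classic := []*metapb.StoreLabel{
	{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash},
}
// A disaggregated write node additionally advertises engine_role=write.
writeNode := []*metapb.StoreLabel{
	{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash},
	{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineRoleWrite},
}

fmt.Println(LabelFilterNoTiFlashWriteNode(classic))   // true: still routable by the request sender
fmt.Println(LabelFilterNoTiFlashWriteNode(writeNode)) // false: write nodes are skipped here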
@@ -203,3 +203,9 @@ func SetStoreLivenessTimeout(t time.Duration) {
 func NewRegionCache(pdClient pd.Client) *locate.RegionCache {
 	return locate.NewRegionCache(pdClient)
 }
+
+type LabelFilter = locate.LabelFilter
+var LabelFilterOnlyTiFlashWriteNode = locate.LabelFilterOnlyTiFlashWriteNode
+var LabelFilterNoTiFlashWriteNode = locate.LabelFilterNoTiFlashWriteNode
+var LabelFilterAllTiFlashNode = locate.LabelFilterAllTiFlashNode
+var LabelFilterAllNode = locate.LabelFilterAllNode
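With these aliases, code outside client-go can pick a filter without importing the internal locate package. For example (a sketch; pdCli stands for an already-constructed pd.Client and is assumed, not shown in this commit):

cache := tikv.NewRegionCache(pdCli)
// Only the disaggregated write nodes:
writeNodes := cache.GetTiFlashStores(tikv.LabelFilterOnlyTiFlashWriteNode)
// Every TiFlash node, regardless of engine_role:
allNodes := cache.GetTiFlashStores(tikv.LabelFilterAllTiFlashNode)
_, _ = writeNodes, allNodes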
@@ -73,6 +73,8 @@ const (
 	EngineLabelKey = "engine"
 	EngineLabelTiFlash = "tiflash"
 	EngineLabelTiFlashCompute = "tiflash_compute"
+	EngineRoleLabelKey = "engine_role"
+	EngineRoleWrite = "write"
 )
 
 // GetStoreTypeByMeta gets store type by store meta pb.
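For reference, a disaggregated TiFlash write node is expected to register with PD carrying both labels, roughly like the following store meta (illustrative values; only the label keys and values come from the constants above):

store := &metapb.Store{
	Id:      1,                   // hypothetical store id
	Address: "tiflash-wn-0:3930", // hypothetical address
	Labels: []*metapb.StoreLabel{
		{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash},  // engine=tiflash
		{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineRoleWrite}, // engine_role=write
	},
}
_ = store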