From 0d003d077be7f7b4da3dbb59a6c138d08d4ad98b Mon Sep 17 00:00:00 2001
From: guo-shaoge
Date: Mon, 6 Mar 2023 17:53:57 +0800
Subject: [PATCH] region_cache: support for disaggregated tiflash

Signed-off-by: guo-shaoge
---
 internal/locate/region_cache.go   | 88 ++++++++++++-------------------
 internal/locate/region_request.go | 19 ++-----
 tikv/region.go                    |  6 +++
 tikvrpc/endpoint.go               |  2 +
 4 files changed, 45 insertions(+), 70 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index bf969b5a..33fa929e 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -42,7 +42,6 @@ import (
 	"math"
 	"math/rand"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -55,7 +54,6 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pkg/errors"
-	"github.com/stathat/consistent"
 	"github.com/tikv/client-go/v2/config"
 	tikverr "github.com/tikv/client-go/v2/error"
 	"github.com/tikv/client-go/v2/internal/apicodec"
@@ -86,6 +84,27 @@ const (
 	defaultRegionsPerBatch = 128
 )
 
+// LabelFilter returns false if the store's labels don't match; such a store is ignored.
+type LabelFilter = func(labels []*metapb.StoreLabel) bool
+
+var LabelFilterOnlyTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
+		isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
+}
+
+var LabelFilterNoTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
+		!isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
+}
+
+var LabelFilterAllTiFlashNode = func(labels []*metapb.StoreLabel) bool {
+	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash)
+}
+
+var LabelFilterAllNode = func(_ []*metapb.StoreLabel) bool {
+	return true
+}
+
 // regionCacheTTLSec is the max idle time for regions in the region cache.
 var regionCacheTTLSec int64 = 600
 
@@ -703,7 +722,7 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
 }
 
 // GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores; the store id of currentStore is always the first one
-func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store) []uint64 {
+func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
 	// set the cap to 2 because usually, TiFlash table will have 2 replicas
 	allStores := make([]uint64, 0, 2)
 	// make sure currentStore id is always the first in allStores
@@ -731,6 +750,9 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
 			continue
 		}
+		if !labelFilter(store.labels) {
+			continue
+		}
 		allStores = append(allStores, store.storeID)
 	}
 	return allStores
@@ -739,7 +761,7 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
 // GetTiFlashRPCContext returns the RPCContext for a region that must access a TiFlash store. If it returns nil, the region
 // must be out of date and already dropped from cache, or no TiFlash store is found.
 // `loadBalance` is an option. For batch cop, it is pointless and might cause retrying the failed store repeatedly.
-func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool) (*RPCContext, error) {
+func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
 	ts := time.Now().Unix()
 
 	cachedRegion := c.GetCachedRegionWithRLock(id)
@@ -762,6 +784,9 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 	for i := 0; i < regionStore.accessStoreNum(tiFlashOnly); i++ {
 		accessIdx := AccessIndex((sIdx + i) % regionStore.accessStoreNum(tiFlashOnly))
 		storeIdx, store := regionStore.accessStore(tiFlashOnly, accessIdx)
+		if !labelFilter(store.labels) {
+			continue
+		}
 		addr, err := c.getStoreAddr(bo, cachedRegion, store)
 		if err != nil {
 			return nil, err
@@ -801,56 +826,6 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 	return nil, nil
 }
 
-// GetTiFlashComputeRPCContextByConsistentHash return rpcCtx of tiflash_compute stores.
-// Each mpp computation of specific region will be handled by specific node whose engine-label is tiflash_compute.
-// 1. Get all stores with label .
-// 2. Get rpcCtx that indicates where the region is stored.
-// 3. Compute which tiflash_compute node should handle this region by consistent hash.
-// 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed.
-// NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other.
-func (c *RegionCache) GetTiFlashComputeRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID, stores []*Store) (res []*RPCContext, err error) {
-	hasher := consistent.New()
-	hasher.NumberOfReplicas = 200 // Larger replicas can balance requests more evenly
-	for _, store := range stores {
-		if !isStoreContainLabel(store.labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
-			return nil, errors.New("expect store should be tiflash_compute")
-		}
-		hasher.Add(store.GetAddr())
-	}
-
-	for _, id := range ids {
-		addr, err := hasher.Get(strconv.Itoa(int(id.GetID())))
-		if err != nil {
-			return nil, err
-		}
-		rpcCtx, err := c.GetTiFlashRPCContext(bo, id, false)
-		if err != nil {
-			return nil, err
-		}
-		if rpcCtx == nil {
-			logutil.Logger(context.Background()).Info("rpcCtx is nil", zap.Any("region", id.String()))
-			return nil, nil
-		}
-
-		var store *Store
-		for _, s := range stores {
-			if s.GetAddr() == addr {
-				store = s
-				break
-			}
-		}
-		if store == nil {
-			return nil, errors.New(fmt.Sprintf("cannot find tiflash_compute store: %v", addr))
-		}
-
-		rpcCtx.Store = store
-		rpcCtx.Addr = addr
-		// Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx.
-		res = append(res, rpcCtx)
-	}
-	return res, nil
-}
-
 // KeyLocation is the region and range that a key is located.
 type KeyLocation struct {
 	Region   RegionVerID
@@ -1841,12 +1816,15 @@ func (c *RegionCache) PDClient() pd.Client {
 }
 
 // GetTiFlashStores returns the information of all tiflash nodes.
-func (c *RegionCache) GetTiFlashStores() []*Store {
+func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
 	c.storeMu.RLock()
 	defer c.storeMu.RUnlock()
 	var stores []*Store
 	for _, s := range c.storeMu.stores {
 		if s.storeType == tikvrpc.TiFlash {
+			if !labelFilter(s.labels) {
+				continue
+			}
 			stores = append(stores, s)
 		}
 	}
diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go
index d3bd89c9..dce6d953 100644
--- a/internal/locate/region_request.go
+++ b/internal/locate/region_request.go
@@ -1002,24 +1002,13 @@ func (s *RegionRequestSender) getRPCContext(
 		}
 		return s.replicaSelector.next(bo)
 	case tikvrpc.TiFlash:
-		return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
+		// Ignore TiFlash write nodes (WN): in disaggregated tiflash mode, TiDB builds the rpcCtx for them itself.
+		return s.regionCache.GetTiFlashRPCContext(bo, regionID, true, LabelFilterNoTiFlashWriteNode)
 	case tikvrpc.TiDB:
 		return &RPCContext{Addr: s.storeAddr}, nil
 	case tikvrpc.TiFlashCompute:
-		stores, err := s.regionCache.GetTiFlashComputeStores(bo)
-		if err != nil {
-			return nil, err
-		}
-		rpcCtxs, err := s.regionCache.GetTiFlashComputeRPCContextByConsistentHash(bo, []RegionVerID{regionID}, stores)
-		if err != nil {
-			return nil, err
-		}
-		if rpcCtxs == nil {
-			return nil, nil
-		} else if len(rpcCtxs) != 1 {
-			return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs)))
-		}
-		return rpcCtxs[0], nil
+		// In disaggregated tiflash mode, TiDB builds the rpcCtx itself, so this branch should not be reached.
+		return nil, errors.Errorf("should not reach here for disaggregated tiflash mode")
 	default:
 		return nil, errors.Errorf("unsupported storage type: %v", et)
 	}
diff --git a/tikv/region.go b/tikv/region.go
index 97845aa7..9e87d804 100644
--- a/tikv/region.go
+++ b/tikv/region.go
@@ -203,3 +203,9 @@ func SetStoreLivenessTimeout(t time.Duration) {
 func NewRegionCache(pdClient pd.Client) *locate.RegionCache {
 	return locate.NewRegionCache(pdClient)
 }
+
+type LabelFilter = locate.LabelFilter
+var LabelFilterOnlyTiFlashWriteNode = locate.LabelFilterOnlyTiFlashWriteNode
+var LabelFilterNoTiFlashWriteNode = locate.LabelFilterNoTiFlashWriteNode
+var LabelFilterAllTiFlashNode = locate.LabelFilterAllTiFlashNode
+var LabelFilterAllNode = locate.LabelFilterAllNode
diff --git a/tikvrpc/endpoint.go b/tikvrpc/endpoint.go
index 5e82229e..6047b70a 100644
--- a/tikvrpc/endpoint.go
+++ b/tikvrpc/endpoint.go
@@ -73,6 +73,8 @@ const (
 	EngineLabelKey = "engine"
 	EngineLabelTiFlash = "tiflash"
 	EngineLabelTiFlashCompute = "tiflash_compute"
+	EngineRoleLabelKey = "engine_role"
+	EngineRoleWrite = "write"
 )
 
 // GetStoreTypeByMeta gets store type by store meta pb.
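
Usage sketch (not part of the patch): the snippet below shows what the four label filters added in region_cache.go, and re-exported from the tikv package, evaluate to for a store labeled as a disaggregated TiFlash write node. The label values are illustrative and mirror the constants added to tikvrpc/endpoint.go.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/client-go/v2/tikv"
)

func main() {
	// Labels a disaggregated TiFlash write node would carry
	// (EngineLabelKey/EngineLabelTiFlash and EngineRoleLabelKey/EngineRoleWrite).
	writeNode := []*metapb.StoreLabel{
		{Key: "engine", Value: "tiflash"},
		{Key: "engine_role", Value: "write"},
	}

	fmt.Println(tikv.LabelFilterOnlyTiFlashWriteNode(writeNode)) // true: tiflash engine with the write role
	fmt.Println(tikv.LabelFilterNoTiFlashWriteNode(writeNode))   // false: write nodes are excluded
	fmt.Println(tikv.LabelFilterAllTiFlashNode(writeNode))       // true: any tiflash store matches
	fmt.Println(tikv.LabelFilterAllNode(writeNode))              // true: matches every store
}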
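
A second sketch, assuming a reachable PD endpoint at a placeholder address, of how a caller might list only the TiFlash write nodes through the region cache using the new labelFilter parameter of GetTiFlashStores:

package main

import (
	"fmt"

	"github.com/tikv/client-go/v2/tikv"
	pd "github.com/tikv/pd/client"
)

func main() {
	// "127.0.0.1:2379" is a placeholder PD address used only for illustration.
	pdCli, err := pd.NewClient([]string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		panic(err)
	}
	cache := tikv.NewRegionCache(pdCli)
	defer cache.Close()

	// Only stores labeled engine=tiflash and engine_role=write pass the filter.
	// The store cache is filled lazily, so this reflects stores the region
	// cache has already seen.
	for _, store := range cache.GetTiFlashStores(tikv.LabelFilterOnlyTiFlashWriteNode) {
		fmt.Println(store.GetAddr())
	}
}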