mirror of https://github.com/tikv/client-go.git
region_cache: only load UP tiflash_mpp store (#598)
Signed-off-by: guo-shaoge <shaoge1994@163.com>
parent 6def8d7b90
commit e76cd3e240
@@ -376,7 +376,7 @@ type RegionCache struct {
         sync.RWMutex
         stores map[uint64]*Store
     }
-    tiflashMPPStoreMu struct {
+    tiflashComputeStoreMu struct {
         sync.RWMutex
         needReload bool
         stores     []*Store
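The renamed mutex keeps client-go's guarded-struct idiom: the `sync.RWMutex` is embedded in the same anonymous struct as the flag and slice it protects, so lock calls are made on the value that owns the data. Below is a minimal self-contained sketch of that idiom; the string element type and the literal values are illustrative, not client-go's own.

```go
package main

import (
	"fmt"
	"sync"
)

// cache mirrors the shape of tiflashComputeStoreMu: the RWMutex sits next to
// the fields it guards, so cache.Lock()/cache.RLock() protect exactly this data.
var cache struct {
	sync.RWMutex
	needReload bool
	stores     []string // []*Store in the real code
}

func main() {
	// Writer: swap in a fresh list and clear the reload flag under the write lock.
	cache.Lock()
	cache.stores = []string{"tc-0:3930", "tc-1:3930"}
	cache.needReload = false
	cache.Unlock()

	// Reader: snapshot the flag and slice under the read lock, the same access
	// pattern GetTiFlashComputeStores uses later in this diff.
	cache.RLock()
	reload, stores := cache.needReload, cache.stores
	cache.RUnlock()
	fmt.Println(reload, stores)
}
```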

@@ -411,8 +411,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
     c.mu.latestVersions = make(map[uint64]RegionVerID)
     c.mu.sorted = NewSortedRegions(btreeDegree)
     c.storeMu.stores = make(map[uint64]*Store)
-    c.tiflashMPPStoreMu.needReload = true
-    c.tiflashMPPStoreMu.stores = make([]*Store, 0)
+    c.tiflashComputeStoreMu.needReload = true
+    c.tiflashComputeStoreMu.stores = make([]*Store, 0)
     c.notifyCheckCh = make(chan struct{}, 1)
     c.ctx, c.cancelFunc = context.WithCancel(context.Background())
     interval := config.GetGlobalConfig().StoresRefreshInterval

@@ -751,24 +751,24 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
     return nil, nil
 }
 
-// GetTiFlashMPPRPCContextByConsistentHash return rpcCtx of tiflash_mpp stores.
-// Each mpp computation of specific region will be handled by specific tiflash_mpp node.
-// 1. Get all stores with label <engine, tiflash_mpp>.
+// GetTiFlashComputeRPCContextByConsistentHash returns rpcCtx of tiflash_compute stores.
+// Each MPP computation of a specific region will be handled by a specific node whose engine label is tiflash_compute.
+// 1. Get all stores with label <engine, tiflash_compute>.
 // 2. Get rpcCtx that indicates where the region is stored.
-// 3. Compute which tiflash_mpp node should handle this region by consistent hash.
+// 3. Compute which tiflash_compute node should handle this region by consistent hash.
 // 4. Replace infos(addr/Store) that indicate where the region is stored with infos that indicate where the region will be computed.
 // NOTE: This function makes sure the returned slice of RPCContext and the input ids correspond to each other.
-func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) {
-    mppStores, err := c.GetTiFlashMPPStores(bo)
+func (c *RegionCache) GetTiFlashComputeRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) {
+    stores, err := c.GetTiFlashComputeStores(bo)
     if err != nil {
         return nil, err
     }
-    if len(mppStores) == 0 {
-        return nil, errors.New("Number of tiflash_mpp node is zero")
+    if len(stores) == 0 {
+        return nil, errors.New("number of tiflash_compute node is zero")
     }
 
     hasher := consistent.New()
-    for _, store := range mppStores {
+    for _, store := range stores {
         hasher.Add(store.GetAddr())
     }
 
@@ -786,14 +786,14 @@ func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) {
         }
 
         var store *Store
-        for _, s := range mppStores {
+        for _, s := range stores {
             if s.GetAddr() == addr {
                 store = s
                 break
             }
         }
         if store == nil {
-            return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr))
+            return nil, errors.New(fmt.Sprintf("cannot find tiflash_compute store: %v", addr))
         }
 
         rpcCtx.Store = store
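The two hunks above populate the hash ring and look stores back up by address, but the lookup half of step 3, mapping a region to one ring member, falls outside the hunks. Below is a runnable sketch of that routing, assuming the ring is the stathat consistent-hash package suggested by the `consistent.New()` call (its `Get` returns the member that owns a key); the addresses and region keys are made up.

```go
package main

import (
	"fmt"

	"github.com/stathat/consistent" // assumed package behind consistent.New() in the hunk
)

func main() {
	hasher := consistent.New()
	// Add every tiflash_compute store address to the ring, as the loop over
	// stores does in GetTiFlashComputeRPCContextByConsistentHash.
	for _, addr := range []string{"tc-0:3930", "tc-1:3930", "tc-2:3930"} {
		hasher.Add(addr)
	}
	// Get maps each key to exactly one member, so the same region is always
	// computed on the same node, and removing a node only remaps the regions
	// that hashed to it.
	for _, regionKey := range []string{"region-101", "region-102", "region-103"} {
		addr, err := hasher.Get(regionKey)
		if err != nil {
			panic(err) // Get only fails when the ring is empty
		}
		fmt.Printf("%s -> %s\n", regionKey, addr)
	}
}
```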

@@ -1811,26 +1811,26 @@ func (c *RegionCache) GetTiFlashStores() []*Store {
     return stores
 }
 
-// GetTiFlashMPPStores returns all stores with lable <engine, tiflash_mpp>.
-func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) {
-    c.tiflashMPPStoreMu.RLock()
-    needReload := c.tiflashMPPStoreMu.needReload
-    stores := c.tiflashMPPStoreMu.stores
-    c.tiflashMPPStoreMu.RUnlock()
+// GetTiFlashComputeStores returns all stores with label <engine, tiflash_compute>.
+func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) {
+    c.tiflashComputeStoreMu.RLock()
+    needReload := c.tiflashComputeStoreMu.needReload
+    stores := c.tiflashComputeStoreMu.stores
+    c.tiflashComputeStoreMu.RUnlock()
 
     if needReload {
-        return c.reloadTiFlashMPPStores(bo)
+        return c.reloadTiFlashComputeStores(bo)
     }
     return stores, nil
 }
 
-func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) {
+func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, _ error) {
     stores, err := c.pdClient.GetAllStores(bo.GetCtx())
     if err != nil {
         return nil, err
     }
     for _, s := range stores {
-        if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) {
+        if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
             res = append(res, &Store{
                 storeID: s.GetId(),
                 addr:    s.GetAddress(),
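The commit's functional change is the added `s.GetState() == metapb.StoreState_Up` guard: Offline and Tombstone stores returned by `pdClient.GetAllStores` no longer enter the tiflash_compute cache. Below is a self-contained sketch of the filter; `storeHasLabel` is a hypothetical stand-in for client-go's unexported `isStoreContainLabel`.

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
)

// storeHasLabel reports whether labels contain the key/value pair; a stand-in
// for the unexported isStoreContainLabel called in the hunk above.
func storeHasLabel(labels []*metapb.StoreLabel, key, value string) bool {
	for _, l := range labels {
		if l.GetKey() == key && l.GetValue() == value {
			return true
		}
	}
	return false
}

// upComputeStores keeps only stores that are Up and labeled
// <engine, tiflash_compute>, mirroring the new condition in reloadTiFlashComputeStores.
func upComputeStores(all []*metapb.Store) []*metapb.Store {
	var res []*metapb.Store
	for _, s := range all {
		if s.GetState() == metapb.StoreState_Up &&
			storeHasLabel(s.GetLabels(), "engine", "tiflash_compute") {
			res = append(res, s)
		}
	}
	return res
}

func main() {
	stores := []*metapb.Store{
		{Id: 1, State: metapb.StoreState_Up, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash_compute"}}},
		{Id: 2, State: metapb.StoreState_Offline, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash_compute"}}},
	}
	fmt.Println(len(upComputeStores(stores))) // 1: the Offline store is filtered out
}
```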

@@ -1842,15 +1842,15 @@ func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) {
         }
     }
 
-    c.tiflashMPPStoreMu.Lock()
-    c.tiflashMPPStoreMu.stores = res
-    c.tiflashMPPStoreMu.Unlock()
+    c.tiflashComputeStoreMu.Lock()
+    c.tiflashComputeStoreMu.stores = res
+    c.tiflashComputeStoreMu.Unlock()
     return res, nil
 }
 
-// InvalidateTiFlashMPPStoresIfGRPCError will invalid cache if is GRPC error.
+// InvalidateTiFlashComputeStoresIfGRPCError invalidates the cache if the error is a gRPC error.
 // For now, only consider GRPC unavailable error.
-func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool {
+func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool {
     var invalidate bool
     if st, ok := status.FromError(err); ok {
         switch st.Code() {

@@ -1863,16 +1863,16 @@ func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool {
     return false
 }
 
-    c.InvalidateTiFlashMPPStores()
+    c.InvalidateTiFlashComputeStores()
     return true
 }
 
-// InvalidateTiFlashMPPStores set needReload be true,
-// and will refresh tiflash_mpp store cache next time.
-func (c *RegionCache) InvalidateTiFlashMPPStores() {
-    c.tiflashMPPStoreMu.Lock()
-    defer c.tiflashMPPStoreMu.Unlock()
-    c.tiflashMPPStoreMu.needReload = true
+// InvalidateTiFlashComputeStores sets needReload to true,
+// so the tiflash_compute store cache will be refreshed next time.
+func (c *RegionCache) InvalidateTiFlashComputeStores() {
+    c.tiflashComputeStoreMu.Lock()
+    defer c.tiflashComputeStoreMu.Unlock()
+    c.tiflashComputeStoreMu.needReload = true
 }
 
 // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
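The switch body is elided by the hunk, but the comment pins down the policy: only the gRPC Unavailable code triggers invalidation for now. Below is a minimal sketch of that classification using grpc-go's status and codes packages; the helper name is hypothetical.

```go
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldInvalidate reports whether err is a gRPC status error carrying
// codes.Unavailable, the one case the diff's comment says is considered.
func shouldInvalidate(err error) bool {
	st, ok := status.FromError(err)
	if !ok {
		return false // not a gRPC status error: keep the cached store list
	}
	return st.Code() == codes.Unavailable
}

func main() {
	err := status.Error(codes.Unavailable, "tiflash_compute node is down")
	fmt.Println(shouldInvalidate(err)) // true: the store cache would be reloaded on the next read
}
```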

@@ -878,8 +878,8 @@ func (s *RegionRequestSender) getRPCContext(
         return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
     case tikvrpc.TiDB:
         return &RPCContext{Addr: s.storeAddr}, nil
-    case tikvrpc.TiFlashMPP:
-        rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID})
+    case tikvrpc.TiFlashCompute:
+        rpcCtxs, err := s.regionCache.GetTiFlashComputeRPCContextByConsistentHash(bo, []RegionVerID{regionID})
         if err != nil {
             return nil, err
         }

@@ -1308,8 +1308,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, err error) {
         }
     }
 
-    if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP {
-        s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err)
+    if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute {
+        s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err)
     } else if ctx.Meta != nil {
         if s.replicaSelector != nil {
             s.replicaSelector.onSendFailure(bo, err)

@@ -44,7 +44,7 @@ const (
     TiKV EndpointType = iota
     TiFlash
     TiDB
-    TiFlashMPP
+    TiFlashCompute
 )
 
 // Name returns the name of endpoint type.
@@ -56,23 +56,23 @@ func (t EndpointType) Name() string {
         return "tiflash"
     case TiDB:
         return "tidb"
-    case TiFlashMPP:
-        return "tiflash_mpp"
+    case TiFlashCompute:
+        return "tiflash_compute"
     }
     return "unspecified"
 }
 
-// IsTiFlashRelatedType return true if it's tiflash or tiflash_mpp.
+// IsTiFlashRelatedType returns true if it's tiflash or tiflash_compute.
 func (t EndpointType) IsTiFlashRelatedType() bool {
-    return t == TiFlash || t == TiFlashMPP
+    return t == TiFlash || t == TiFlashCompute
 }
 
 // Constants to determine engine type.
 // They should be synced with PD.
 const (
-    EngineLabelKey        = "engine"
-    EngineLabelTiFlash    = "tiflash"
-    EngineLabelTiFlashMPP = "tiflash_mpp"
+    EngineLabelKey            = "engine"
+    EngineLabelTiFlash        = "tiflash"
+    EngineLabelTiFlashCompute = "tiflash_compute"
 )
 
 // GetStoreTypeByMeta gets store type by store meta pb.
@@ -81,8 +81,8 @@ func GetStoreTypeByMeta(store *metapb.Store) EndpointType {
         if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash {
             return TiFlash
         }
-        if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP {
-            return TiFlashMPP
+        if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashCompute {
+            return TiFlashCompute
         }
     }
     return TiKV
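A short usage sketch tying the renamed pieces together: deriving the endpoint type from store metadata and printing its name. The kvproto and client-go import paths are assumptions, not shown in this diff.

```go
package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/client-go/v2/tikvrpc" // assumed import path for the package in this hunk
)

func main() {
	// A store advertising the new engine label resolves to TiFlashCompute...
	store := &metapb.Store{
		Labels: []*metapb.StoreLabel{{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlashCompute}},
	}
	t := tikvrpc.GetStoreTypeByMeta(store)
	fmt.Println(t.Name(), t.IsTiFlashRelatedType()) // tiflash_compute true

	// ...while a store with no engine label falls through to TiKV.
	fmt.Println(tikvrpc.GetStoreTypeByMeta(&metapb.Store{}).Name()) // tikv
}
```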