diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index cbab0b14..5a353a81 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -376,7 +376,7 @@ type RegionCache struct { sync.RWMutex stores map[uint64]*Store } - tiflashMPPStoreMu struct { + tiflashComputeStoreMu struct { sync.RWMutex needReload bool stores []*Store @@ -411,8 +411,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.mu.latestVersions = make(map[uint64]RegionVerID) c.mu.sorted = NewSortedRegions(btreeDegree) c.storeMu.stores = make(map[uint64]*Store) - c.tiflashMPPStoreMu.needReload = true - c.tiflashMPPStoreMu.stores = make([]*Store, 0) + c.tiflashComputeStoreMu.needReload = true + c.tiflashComputeStoreMu.stores = make([]*Store, 0) c.notifyCheckCh = make(chan struct{}, 1) c.ctx, c.cancelFunc = context.WithCancel(context.Background()) interval := config.GetGlobalConfig().StoresRefreshInterval @@ -751,24 +751,24 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, return nil, nil } -// GetTiFlashMPPRPCContextByConsistentHash return rpcCtx of tiflash_mpp stores. -// Each mpp computation of specific region will be handled by specific tiflash_mpp node. -// 1. Get all stores with label . +// GetTiFlashComputeRPCContextByConsistentHash return rpcCtx of tiflash_compute stores. +// Each mpp computation of specific region will be handled by specific node whose engine-label is tiflash_compute. +// 1. Get all stores with label . // 2. Get rpcCtx that indicates where the region is stored. -// 3. Compute which tiflash_mpp node should handle this region by consistent hash. +// 3. Compute which tiflash_compute node should handle this region by consistent hash. // 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed. // NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other. -func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) { - mppStores, err := c.GetTiFlashMPPStores(bo) +func (c *RegionCache) GetTiFlashComputeRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) { + stores, err := c.GetTiFlashComputeStores(bo) if err != nil { return nil, err } - if len(mppStores) == 0 { - return nil, errors.New("Number of tiflash_mpp node is zero") + if len(stores) == 0 { + return nil, errors.New("number of tiflash_compute node is zero") } hasher := consistent.New() - for _, store := range mppStores { + for _, store := range stores { hasher.Add(store.GetAddr()) } @@ -786,14 +786,14 @@ func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffe } var store *Store - for _, s := range mppStores { + for _, s := range stores { if s.GetAddr() == addr { store = s break } } if store == nil { - return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr)) + return nil, errors.New(fmt.Sprintf("cannot find tiflash_compute store: %v", addr)) } rpcCtx.Store = store @@ -1811,26 +1811,26 @@ func (c *RegionCache) GetTiFlashStores() []*Store { return stores } -// GetTiFlashMPPStores returns all stores with lable . -func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) { - c.tiflashMPPStoreMu.RLock() - needReload := c.tiflashMPPStoreMu.needReload - stores := c.tiflashMPPStoreMu.stores - c.tiflashMPPStoreMu.RUnlock() +// GetTiFlashComputeStores returns all stores with lable . +func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) { + c.tiflashComputeStoreMu.RLock() + needReload := c.tiflashComputeStoreMu.needReload + stores := c.tiflashComputeStoreMu.stores + c.tiflashComputeStoreMu.RUnlock() if needReload { - return c.reloadTiFlashMPPStores(bo) + return c.reloadTiFlashComputeStores(bo) } return stores, nil } -func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) { +func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, _ error) { stores, err := c.pdClient.GetAllStores(bo.GetCtx()) if err != nil { return nil, err } for _, s := range stores { - if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) { + if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) { res = append(res, &Store{ storeID: s.GetId(), addr: s.GetAddress(), @@ -1842,15 +1842,15 @@ func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, } } - c.tiflashMPPStoreMu.Lock() - c.tiflashMPPStoreMu.stores = res - c.tiflashMPPStoreMu.Unlock() + c.tiflashComputeStoreMu.Lock() + c.tiflashComputeStoreMu.stores = res + c.tiflashComputeStoreMu.Unlock() return res, nil } -// InvalidateTiFlashMPPStoresIfGRPCError will invalid cache if is GRPC error. +// InvalidateTiFlashComputeStoresIfGRPCError will invalid cache if is GRPC error. // For now, only consider GRPC unavailable error. -func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool { +func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool { var invalidate bool if st, ok := status.FromError(err); ok { switch st.Code() { @@ -1863,16 +1863,16 @@ func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool { return false } - c.InvalidateTiFlashMPPStores() + c.InvalidateTiFlashComputeStores() return true } -// InvalidateTiFlashMPPStores set needReload be true, -// and will refresh tiflash_mpp store cache next time. -func (c *RegionCache) InvalidateTiFlashMPPStores() { - c.tiflashMPPStoreMu.Lock() - defer c.tiflashMPPStoreMu.Unlock() - c.tiflashMPPStoreMu.needReload = true +// InvalidateTiFlashComputeStores set needReload be true, +// and will refresh tiflash_compute store cache next time. +func (c *RegionCache) InvalidateTiFlashComputeStores() { + c.tiflashComputeStoreMu.Lock() + defer c.tiflashComputeStoreMu.Unlock() + c.tiflashComputeStoreMu.needReload = true } // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index d4470b0b..2b640cdb 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -878,8 +878,8 @@ func (s *RegionRequestSender) getRPCContext( return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) case tikvrpc.TiDB: return &RPCContext{Addr: s.storeAddr}, nil - case tikvrpc.TiFlashMPP: - rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID}) + case tikvrpc.TiFlashCompute: + rpcCtxs, err := s.regionCache.GetTiFlashComputeRPCContextByConsistentHash(bo, []RegionVerID{regionID}) if err != nil { return nil, err } @@ -1308,8 +1308,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e } } - if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP { - s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err) + if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashCompute { + s.regionCache.InvalidateTiFlashComputeStoresIfGRPCError(err) } else if ctx.Meta != nil { if s.replicaSelector != nil { s.replicaSelector.onSendFailure(bo, err) diff --git a/tikvrpc/endpoint.go b/tikvrpc/endpoint.go index b7d44c7f..5e82229e 100644 --- a/tikvrpc/endpoint.go +++ b/tikvrpc/endpoint.go @@ -44,7 +44,7 @@ const ( TiKV EndpointType = iota TiFlash TiDB - TiFlashMPP + TiFlashCompute ) // Name returns the name of endpoint type. @@ -56,23 +56,23 @@ func (t EndpointType) Name() string { return "tiflash" case TiDB: return "tidb" - case TiFlashMPP: - return "tiflash_mpp" + case TiFlashCompute: + return "tiflash_compute" } return "unspecified" } -// IsTiFlashRelatedType return true if it's tiflash or tiflash_mpp. +// IsTiFlashRelatedType return true if it's tiflash or tiflash_compute. func (t EndpointType) IsTiFlashRelatedType() bool { - return t == TiFlash || t == TiFlashMPP + return t == TiFlash || t == TiFlashCompute } // Constants to determine engine type. // They should be synced with PD. const ( - EngineLabelKey = "engine" - EngineLabelTiFlash = "tiflash" - EngineLabelTiFlashMPP = "tiflash_mpp" + EngineLabelKey = "engine" + EngineLabelTiFlash = "tiflash" + EngineLabelTiFlashCompute = "tiflash_compute" ) // GetStoreTypeByMeta gets store type by store meta pb. @@ -81,8 +81,8 @@ func GetStoreTypeByMeta(store *metapb.Store) EndpointType { if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash { return TiFlash } - if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP { - return TiFlashMPP + if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashCompute { + return TiFlashCompute } } return TiKV