support finding stores with the tiflash_mpp label (#481)

Signed-off-by: guo-shaoge <shaoge1994@163.com>
Co-authored-by: Yilin Chen <sticnarf@gmail.com>
guo-shaoge 2022-05-23 11:22:53 +08:00 committed by GitHub
parent de7ca289ac
commit 700cbe60d2
9 changed files with 186 additions and 19 deletions

go.mod (2 changes)

@@ -22,6 +22,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.7.0
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
@@ -32,4 +33,5 @@
go.uber.org/zap v1.20.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.43.0
stathat.com/c/consistent v1.0.0 // indirect
)
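This brings in github.com/stathat/consistent for the consistent-hash ring used by the region cache below; stathat.com/c/consistent comes along as its indirect requirement. Assuming a module-aware Go toolchain, the dependency can be fetched with:

go get github.com/stathat/consistent@v1.0.0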

go.sum (4 changes)

@@ -199,6 +199,8 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -410,3 +412,5 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=


@@ -703,6 +703,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -1400,3 +1402,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=


@@ -381,7 +381,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
// TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request.
enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash
// tiflash/tiflash_mpp/tidb don't use BatchCommand.
enableBatch := req.StoreTp == tikvrpc.TiKV
connArray, err := c.getConnArray(addr, enableBatch)
if err != nil {
return nil, err
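For reference, the new rule evaluates as follows (a sketch, not part of the diff):

// enableBatch := req.StoreTp == tikvrpc.TiKV
// tikvrpc.TiKV       -> true  (batch commands enabled)
// tikvrpc.TiFlash    -> false
// tikvrpc.TiFlashMPP -> false
// tikvrpc.TiDB       -> false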


@@ -41,6 +41,7 @@ import (
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -52,6 +53,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stathat/consistent"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
@@ -67,10 +69,12 @@ import (
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
const (
@@ -371,6 +375,11 @@ type RegionCache struct {
sync.RWMutex
stores map[uint64]*Store
}
tiflashMPPStoreMu struct {
sync.RWMutex
needReload bool
stores []*Store
}
notifyCheckCh chan struct{}
closeCh chan struct{}
@@ -390,6 +399,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.mu.latestVersions = make(map[uint64]RegionVerID)
c.mu.sorted = btree.New(btreeDegree)
c.storeMu.stores = make(map[uint64]*Store)
c.tiflashMPPStoreMu.needReload = true
c.tiflashMPPStoreMu.stores = make([]*Store, 0)
c.notifyCheckCh = make(chan struct{}, 1)
c.closeCh = make(chan struct{})
interval := config.GetGlobalConfig().StoresRefreshInterval
@@ -728,6 +739,59 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
return nil, nil
}
// GetTiFlashMPPRPCContextByConsistentHash returns the RPCContexts of tiflash_mpp stores.
// Each MPP computation for a given region is handled by a specific tiflash_mpp node.
// 1. Get all stores with the label <engine, tiflash_mpp>.
// 2. Get the rpcCtx that indicates where the region is stored.
// 3. Compute which tiflash_mpp node should handle this region by consistent hashing.
// 4. Replace the info (addr/Store) that indicates where the region is stored with the info that indicates where the region will be computed.
// NOTE: This function makes sure the returned slice of RPCContexts corresponds one-to-one with the input ids.
func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) {
mppStores, err := c.GetTiFlashMPPStores(bo)
if err != nil {
return nil, err
}
if len(mppStores) == 0 {
return nil, errors.New("Number of tiflash_mpp node is zero")
}
hasher := consistent.New()
for _, store := range mppStores {
hasher.Add(store.GetAddr())
}
for _, id := range ids {
addr, err := hasher.Get(strconv.Itoa(int(id.GetID())))
if err != nil {
return nil, err
}
rpcCtx, err := c.GetTiFlashRPCContext(bo, id, true)
if err != nil {
return nil, err
}
if rpcCtx == nil {
return nil, nil
}
var store *Store
for _, s := range mppStores {
if s.GetAddr() == addr {
store = s
break
}
}
if store == nil {
return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr))
}
rpcCtx.Store = store
rpcCtx.Addr = addr
// Maybe there's no need to replace rpcCtx.AccessMode; it's only used for load balancing when accessing storeIdx.
res = append(res, rpcCtx)
}
return res, nil
}
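A minimal, self-contained sketch of the region-to-node mapping used above, with hypothetical store addresses (the stathat/consistent API is the same one imported by this file):

package main

import (
	"fmt"
	"strconv"

	"github.com/stathat/consistent"
)

func main() {
	hasher := consistent.New()
	// Hypothetical tiflash_mpp store addresses.
	hasher.Add("tiflash-mpp-0:3930")
	hasher.Add("tiflash-mpp-1:3930")
	hasher.Add("tiflash-mpp-2:3930")
	// The same region ID always hashes to the same node, so each region's
	// MPP computation is pinned to one tiflash_mpp store.
	for _, regionID := range []uint64{101, 102, 103} {
		addr, err := hasher.Get(strconv.Itoa(int(regionID)))
		if err != nil {
			panic(err)
		}
		fmt.Printf("region %d -> %s\n", regionID, addr)
	}
}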
// KeyLocation is the region and range that a key is located.
type KeyLocation struct {
Region RegionVerID
@@ -1743,6 +1807,70 @@ func (c *RegionCache) GetTiFlashStores() []*Store {
return stores
}
// GetTiFlashMPPStores returns all stores with the label <engine, tiflash_mpp>.
func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) {
c.tiflashMPPStoreMu.RLock()
needReload := c.tiflashMPPStoreMu.needReload
stores := c.tiflashMPPStoreMu.stores
c.tiflashMPPStoreMu.RUnlock()
if needReload {
return c.reloadTiFlashMPPStores(bo)
}
return stores, nil
}
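// reloadTiFlashMPPStores fetches all stores from PD and caches the ones labeled <engine, tiflash_mpp>.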
func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) {
stores, err := c.pdClient.GetAllStores(bo.GetCtx())
if err != nil {
return nil, err
}
for _, s := range stores {
if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) {
res = append(res, &Store{
storeID: s.GetId(),
addr: s.GetAddress(),
saddr: s.GetStatusAddress(),
storeType: tikvrpc.GetStoreTypeByMeta(s),
labels: s.GetLabels(),
state: uint64(resolved),
})
}
}
c.tiflashMPPStoreMu.Lock()
c.tiflashMPPStoreMu.stores = res
// Clear the flag so subsequent reads hit the cache instead of reloading every time.
c.tiflashMPPStoreMu.needReload = false
c.tiflashMPPStoreMu.Unlock()
return res, nil
}
// InvalidateTiFlashMPPStoresIfGRPCError invalidates the cached tiflash_mpp stores if err is a gRPC error.
// For now, only the gRPC Unavailable error is considered.
func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool {
var invalidate bool
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.Unavailable:
invalidate = true
default:
}
}
if !invalidate {
return false
}
c.InvalidateTiFlashMPPStores()
return true
}
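A hedged usage sketch, assuming cache is a *RegionCache and using the grpc status/codes packages imported above:

// A gRPC Unavailable error marks the tiflash_mpp store cache for reload.
unavailable := status.Error(codes.Unavailable, "connection refused")
cache.InvalidateTiFlashMPPStoresIfGRPCError(unavailable) // returns true

// A plain, non-gRPC error is ignored and the cache stays valid.
cache.InvalidateTiFlashMPPStoresIfGRPCError(errors.New("disk full")) // returns false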
// InvalidateTiFlashMPPStores sets needReload to true,
// so the tiflash_mpp store cache will be refreshed on the next read.
func (c *RegionCache) InvalidateTiFlashMPPStores() {
c.tiflashMPPStoreMu.Lock()
defer c.tiflashMPPStoreMu.Unlock()
c.tiflashMPPStoreMu.needReload = true
}
// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
// the latestBucketsVer is newer than the cached one.
func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) {
@@ -2217,20 +2345,23 @@ func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool {
return true
}
for _, targetLabel := range labels {
match := false
for _, label := range s.labels {
if targetLabel.Key == label.Key && targetLabel.Value == label.Value {
match = true
break
}
}
if !match {
if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) {
return false
}
}
return true
}
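// isStoreContainLabel reports whether labels contain the given key/value pair.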
func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) {
for _, label := range labels {
if label.GetKey() == key && label.GetValue() == val {
res = true
break
}
}
return res
}
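For illustration, a small sketch of the helper's behavior (the zone label is a hypothetical extra):

labels := []*metapb.StoreLabel{
	{Key: "engine", Value: "tiflash_mpp"},
	{Key: "zone", Value: "z1"},
}
isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) // true
isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash)    // false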
type livenessState uint32
var (


@@ -862,6 +862,17 @@ func (s *RegionRequestSender) getRPCContext(
return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
case tikvrpc.TiDB:
return &RPCContext{Addr: s.storeAddr}, nil
case tikvrpc.TiFlashMPP:
rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID})
if err != nil {
return nil, err
}
if rpcCtxs == nil {
return nil, nil
} else if len(rpcCtxs) != 1 {
return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs)))
}
return rpcCtxs[0], nil
default:
return nil, errors.Errorf("unsupported storage type: %v", et)
}
@@ -1271,7 +1282,9 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
}
}
if ctx.Meta != nil {
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP {
s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err)
} else if ctx.Meta != nil {
if s.replicaSelector != nil {
s.replicaSelector.onSendFailure(bo, err)
} else {
@@ -1283,7 +1296,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
// When a store is not available, the leader of related region should be elected quickly.
// TODO: the number of retry time should be limited:since region may be unavailable
// when some unrecoverable disaster happened.
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
if ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx))
} else {
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
@@ -1434,7 +1447,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
zap.Stringer("ctx", ctx))
if ctx != nil && ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() {
err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
} else {
err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))


@@ -131,7 +131,7 @@ func (s *Session) CheckRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
}
}
// The Peer on the Store is not the leader. If it's a tiflash store, we pass this check.
if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashStore(s.cluster.GetStore(storePeer.GetStoreId())) {
if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashRelatedStore(s.cluster.GetStore(storePeer.GetStoreId())) {
return &errorpb.Error{
Message: *proto.String("not leader"),
NotLeader: &errorpb.NotLeader{
@@ -182,9 +182,9 @@ func (s *Session) checkKeyInRegion(key []byte) bool {
return regionContains(s.startKey, s.endKey, NewMvccKey(key))
}
func isTiFlashStore(store *metapb.Store) bool {
func isTiFlashRelatedStore(store *metapb.Store) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == "engine" && l.GetValue() == "tiflash" {
if l.GetKey() == "engine" && (l.GetValue() == "tiflash" || l.GetValue() == "tiflash_mpp") {
return true
}
}


@@ -275,7 +275,7 @@ func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Sto
if store.State == metapb.StoreState_Tombstone {
continue
}
if tikvrpc.GetStoreTypeByMeta(store) == tikvrpc.TiFlash {
if tikvrpc.GetStoreTypeByMeta(store).IsTiFlashRelatedType() {
continue
}
upStores = append(upStores, store)


@@ -44,6 +44,7 @@ const (
TiKV EndpointType = iota
TiFlash
TiDB
TiFlashMPP
)
// Name returns the name of endpoint type.
@@ -55,23 +56,34 @@ func (t EndpointType) Name() string {
return "tiflash"
case TiDB:
return "tidb"
case TiFlashMPP:
return "tiflash_mpp"
}
return "unspecified"
}
// IsTiFlashRelatedType returns true if the endpoint type is tiflash or tiflash_mpp.
func (t EndpointType) IsTiFlashRelatedType() bool {
return t == TiFlash || t == TiFlashMPP
}
// Constants to determine engine type.
// They should be synced with PD.
const (
engineLabelKey = "engine"
engineLabelTiFlash = "tiflash"
EngineLabelKey = "engine"
EngineLabelTiFlash = "tiflash"
EngineLabelTiFlashMPP = "tiflash_mpp"
)
// GetStoreTypeByMeta gets store type by store meta pb.
func GetStoreTypeByMeta(store *metapb.Store) EndpointType {
for _, label := range store.Labels {
if label.Key == engineLabelKey && label.Value == engineLabelTiFlash {
if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash {
return TiFlash
}
if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP {
return TiFlashMPP
}
}
return TiKV
}
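A short usage sketch of the endpoint-type resolution (the store meta is constructed ad hoc):

store := &metapb.Store{Labels: []*metapb.StoreLabel{
	{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlashMPP},
}}
tp := tikvrpc.GetStoreTypeByMeta(store) // TiFlashMPP
fmt.Println(tp.Name())                  // "tiflash_mpp"
fmt.Println(tp.IsTiFlashRelatedType())  // true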