From 700cbe60d2b104150a31a14f390abeedeadcfca0 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 23 May 2022 11:22:53 +0800 Subject: [PATCH] support find stores with tiflash_mpp label (#481) Signed-off-by: guo-shaoge Co-authored-by: Yilin Chen --- go.mod | 2 + go.sum | 4 + integration_tests/go.sum | 4 + internal/client/client.go | 3 +- internal/locate/region_cache.go | 147 +++++++++++++++++++++++-- internal/locate/region_request.go | 19 +++- internal/mockstore/mocktikv/session.go | 6 +- tikv/gc.go | 2 +- tikvrpc/endpoint.go | 18 ++- 9 files changed, 186 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index e6aaf8e3..79f79e23 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/prometheus/client_model v0.2.0 github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/stathat/consistent v1.0.0 github.com/stretchr/testify v1.7.0 github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 github.com/twmb/murmur3 v1.1.3 @@ -32,4 +33,5 @@ require ( go.uber.org/zap v1.20.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.43.0 + stathat.com/c/consistent v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index 448cfa1a..f95c1904 100644 --- a/go.sum +++ b/go.sum @@ -199,6 +199,8 @@ github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -410,3 +412,5 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 9a26e82a..3117ec94 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -703,6 +703,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= +github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U= +github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -1400,3 +1402,5 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM= sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU= sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k= +stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c= +stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0= diff --git a/internal/client/client.go b/internal/client/client.go index 36f3abad..d9e1e416 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -381,7 +381,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R } // TiDB will not send batch commands to TiFlash, to resolve the conflict with Batch Cop Request. - enableBatch := req.StoreTp != tikvrpc.TiDB && req.StoreTp != tikvrpc.TiFlash + // tiflash/tiflash_mpp/tidb don't use BatchCommand. + enableBatch := req.StoreTp == tikvrpc.TiKV connArray, err := c.getConnArray(addr, enableBatch) if err != nil { return nil, err diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 77021ac8..fa8c38fe 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -41,6 +41,7 @@ import ( "fmt" "math/rand" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -52,6 +53,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" + "github.com/stathat/consistent" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -67,10 +69,12 @@ import ( "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" ) const ( @@ -371,6 +375,11 @@ type RegionCache struct { sync.RWMutex stores map[uint64]*Store } + tiflashMPPStoreMu struct { + sync.RWMutex + needReload bool + stores []*Store + } notifyCheckCh chan struct{} closeCh chan struct{} @@ -390,6 +399,8 @@ func NewRegionCache(pdClient pd.Client) *RegionCache { c.mu.latestVersions = make(map[uint64]RegionVerID) c.mu.sorted = btree.New(btreeDegree) c.storeMu.stores = make(map[uint64]*Store) + c.tiflashMPPStoreMu.needReload = true + c.tiflashMPPStoreMu.stores = make([]*Store, 0) c.notifyCheckCh = make(chan struct{}, 1) c.closeCh = make(chan struct{}) interval := config.GetGlobalConfig().StoresRefreshInterval @@ -728,6 +739,59 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, return nil, nil } +// GetTiFlashMPPRPCContextByConsistentHash return rpcCtx of tiflash_mpp stores. +// Each mpp computation of specific region will be handled by specific tiflash_mpp node. +// 1. Get all stores with label . +// 2. Get rpcCtx that indicates where the region is stored. +// 3. Compute which tiflash_mpp node should handle this region by consistent hash. +// 4. Replace infos(addr/Store) that indicate where the region is stored to infos that indicate where the region will be computed. +// NOTE: This function make sure the returned slice of RPCContext and the input ids correspond to each other. +func (c *RegionCache) GetTiFlashMPPRPCContextByConsistentHash(bo *retry.Backoffer, ids []RegionVerID) (res []*RPCContext, err error) { + mppStores, err := c.GetTiFlashMPPStores(bo) + if err != nil { + return nil, err + } + if len(mppStores) == 0 { + return nil, errors.New("Number of tiflash_mpp node is zero") + } + + hasher := consistent.New() + for _, store := range mppStores { + hasher.Add(store.GetAddr()) + } + + for _, id := range ids { + addr, err := hasher.Get(strconv.Itoa(int(id.GetID()))) + if err != nil { + return nil, err + } + rpcCtx, err := c.GetTiFlashRPCContext(bo, id, true) + if err != nil { + return nil, err + } + if rpcCtx == nil { + return nil, nil + } + + var store *Store + for _, s := range mppStores { + if s.GetAddr() == addr { + store = s + break + } + } + if store == nil { + return nil, errors.New(fmt.Sprintf("cannot find mpp store: %v", addr)) + } + + rpcCtx.Store = store + rpcCtx.Addr = addr + // Maybe no need to replace rpcCtx.AccessMode, it's only used for loadBalance when access storeIdx. + res = append(res, rpcCtx) + } + return res, nil +} + // KeyLocation is the region and range that a key is located. type KeyLocation struct { Region RegionVerID @@ -1743,6 +1807,70 @@ func (c *RegionCache) GetTiFlashStores() []*Store { return stores } +// GetTiFlashMPPStores returns all stores with lable . +func (c *RegionCache) GetTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, err error) { + c.tiflashMPPStoreMu.RLock() + needReload := c.tiflashMPPStoreMu.needReload + stores := c.tiflashMPPStoreMu.stores + c.tiflashMPPStoreMu.RUnlock() + + if needReload { + return c.reloadTiFlashMPPStores(bo) + } + return stores, nil +} + +func (c *RegionCache) reloadTiFlashMPPStores(bo *retry.Backoffer) (res []*Store, _ error) { + stores, err := c.pdClient.GetAllStores(bo.GetCtx()) + if err != nil { + return nil, err + } + for _, s := range stores { + if isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashMPP) { + res = append(res, &Store{ + storeID: s.GetId(), + addr: s.GetAddress(), + saddr: s.GetStatusAddress(), + storeType: tikvrpc.GetStoreTypeByMeta(s), + labels: s.GetLabels(), + state: uint64(resolved), + }) + } + } + + c.tiflashMPPStoreMu.Lock() + c.tiflashMPPStoreMu.stores = res + c.tiflashMPPStoreMu.Unlock() + return res, nil +} + +// InvalidateTiFlashMPPStoresIfGRPCError will invalid cache if is GRPC error. +// For now, only consider GRPC unavailable error. +func (c *RegionCache) InvalidateTiFlashMPPStoresIfGRPCError(err error) bool { + var invalidate bool + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.Unavailable: + invalidate = true + default: + } + } + if !invalidate { + return false + } + + c.InvalidateTiFlashMPPStores() + return true +} + +// InvalidateTiFlashMPPStores set needReload be true, +// and will refresh tiflash_mpp store cache next time. +func (c *RegionCache) InvalidateTiFlashMPPStores() { + c.tiflashMPPStoreMu.Lock() + defer c.tiflashMPPStoreMu.Unlock() + c.tiflashMPPStoreMu.needReload = true +} + // UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if // the latestBucketsVer is newer than the cached one. func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) { @@ -2217,20 +2345,23 @@ func (s *Store) IsLabelsMatch(labels []*metapb.StoreLabel) bool { return true } for _, targetLabel := range labels { - match := false - for _, label := range s.labels { - if targetLabel.Key == label.Key && targetLabel.Value == label.Value { - match = true - break - } - } - if !match { + if !isStoreContainLabel(s.labels, targetLabel.Key, targetLabel.Value) { return false } } return true } +func isStoreContainLabel(labels []*metapb.StoreLabel, key string, val string) (res bool) { + for _, label := range labels { + if label.GetKey() == key && label.GetValue() == val { + res = true + break + } + } + return res +} + type livenessState uint32 var ( diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 1ea60df4..32c81f03 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -862,6 +862,17 @@ func (s *RegionRequestSender) getRPCContext( return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) case tikvrpc.TiDB: return &RPCContext{Addr: s.storeAddr}, nil + case tikvrpc.TiFlashMPP: + rpcCtxs, err := s.regionCache.GetTiFlashMPPRPCContextByConsistentHash(bo, []RegionVerID{regionID}) + if err != nil { + return nil, err + } + if rpcCtxs == nil { + return nil, nil + } else if len(rpcCtxs) != 1 { + return nil, errors.New(fmt.Sprintf("unexpected number of rpcCtx, expect 1, got: %v", len(rpcCtxs))) + } + return rpcCtxs[0], nil default: return nil, errors.Errorf("unsupported storage type: %v", et) } @@ -1271,7 +1282,9 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e } } - if ctx.Meta != nil { + if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlashMPP { + s.regionCache.InvalidateTiFlashMPPStoresIfGRPCError(err) + } else if ctx.Meta != nil { if s.replicaSelector != nil { s.replicaSelector.onSendFailure(bo, err) } else { @@ -1283,7 +1296,7 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. - if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { + if ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() { err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) @@ -1434,7 +1447,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later", zap.String("reason", regionErr.GetServerIsBusy().GetReason()), zap.Stringer("ctx", ctx)) - if ctx != nil && ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { + if ctx != nil && ctx.Store != nil && ctx.Store.storeType.IsTiFlashRelatedType() { err = bo.Backoff(retry.BoTiFlashServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) } else { err = bo.Backoff(retry.BoTiKVServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) diff --git a/internal/mockstore/mocktikv/session.go b/internal/mockstore/mocktikv/session.go index f29b8823..f03084e7 100644 --- a/internal/mockstore/mocktikv/session.go +++ b/internal/mockstore/mocktikv/session.go @@ -131,7 +131,7 @@ func (s *Session) CheckRequestContext(ctx *kvrpcpb.Context) *errorpb.Error { } } // The Peer on the Store is not leader. If it's tiflash store , we pass this check. - if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashStore(s.cluster.GetStore(storePeer.GetStoreId())) { + if storePeer.GetId() != leaderPeer.GetId() && !isTiFlashRelatedStore(s.cluster.GetStore(storePeer.GetStoreId())) { return &errorpb.Error{ Message: *proto.String("not leader"), NotLeader: &errorpb.NotLeader{ @@ -182,9 +182,9 @@ func (s *Session) checkKeyInRegion(key []byte) bool { return regionContains(s.startKey, s.endKey, NewMvccKey(key)) } -func isTiFlashStore(store *metapb.Store) bool { +func isTiFlashRelatedStore(store *metapb.Store) bool { for _, l := range store.GetLabels() { - if l.GetKey() == "engine" && l.GetValue() == "tiflash" { + if l.GetKey() == "engine" && (l.GetValue() == "tiflash" || l.GetValue() == "tiflash_mpp") { return true } } diff --git a/tikv/gc.go b/tikv/gc.go index 7d7f0598..e6111b23 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -275,7 +275,7 @@ func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Sto if store.State == metapb.StoreState_Tombstone { continue } - if tikvrpc.GetStoreTypeByMeta(store) == tikvrpc.TiFlash { + if tikvrpc.GetStoreTypeByMeta(store).IsTiFlashRelatedType() { continue } upStores = append(upStores, store) diff --git a/tikvrpc/endpoint.go b/tikvrpc/endpoint.go index 98f45ccd..b7d44c7f 100644 --- a/tikvrpc/endpoint.go +++ b/tikvrpc/endpoint.go @@ -44,6 +44,7 @@ const ( TiKV EndpointType = iota TiFlash TiDB + TiFlashMPP ) // Name returns the name of endpoint type. @@ -55,23 +56,34 @@ func (t EndpointType) Name() string { return "tiflash" case TiDB: return "tidb" + case TiFlashMPP: + return "tiflash_mpp" } return "unspecified" } +// IsTiFlashRelatedType return true if it's tiflash or tiflash_mpp. +func (t EndpointType) IsTiFlashRelatedType() bool { + return t == TiFlash || t == TiFlashMPP +} + // Constants to determine engine type. // They should be synced with PD. const ( - engineLabelKey = "engine" - engineLabelTiFlash = "tiflash" + EngineLabelKey = "engine" + EngineLabelTiFlash = "tiflash" + EngineLabelTiFlashMPP = "tiflash_mpp" ) // GetStoreTypeByMeta gets store type by store meta pb. func GetStoreTypeByMeta(store *metapb.Store) EndpointType { for _, label := range store.Labels { - if label.Key == engineLabelKey && label.Value == engineLabelTiFlash { + if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlash { return TiFlash } + if label.Key == EngineLabelKey && label.Value == EngineLabelTiFlashMPP { + return TiFlashMPP + } } return TiKV }