// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/locate/region_cache.go
//

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package locate

import (
	"bytes"
	"context"
	"fmt"
	"math/rand"
	"slices"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/gogo/protobuf/proto"
	"github.com/google/btree"
	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/config/retry"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/apicodec"
	"github.com/tikv/client-go/v2/internal/client"
	"github.com/tikv/client-go/v2/internal/logutil"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/util"
	"github.com/tikv/client-go/v2/util/redact"
	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/clients/router"
	"github.com/tikv/pd/client/opt"
	"github.com/tikv/pd/client/pkg/circuitbreaker"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	btreeDegree            = 32
	expiredTTL             = -1
	defaultRegionsPerBatch = 128
)

// LabelFilter returns false if the labels don't match, in which case the store is ignored.
type LabelFilter = func(labels []*metapb.StoreLabel) bool

// LabelFilterOnlyTiFlashWriteNode selects only stores whose labels contain both <engine, tiflash> and <engine_role, write>.
// Only used for tiflash_compute nodes.
var LabelFilterOnlyTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
		isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
}

// LabelFilterNoTiFlashWriteNode selects only stores whose labels contain <engine, tiflash> but not <engine_role, write>.
// Normally TiDB uses this filter.
var LabelFilterNoTiFlashWriteNode = func(labels []*metapb.StoreLabel) bool {
	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash) &&
		!isStoreContainLabel(labels, tikvrpc.EngineRoleLabelKey, tikvrpc.EngineRoleWrite)
}

// LabelFilterAllTiFlashNode selects all TiFlash stores.
var LabelFilterAllTiFlashNode = func(labels []*metapb.StoreLabel) bool {
	return isStoreContainLabel(labels, tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlash)
}

// LabelFilterAllNode selects all stores.
var LabelFilterAllNode = func(_ []*metapb.StoreLabel) bool {
	return true
}

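// Illustrative sketch (added for exposition, not part of the original source):
// a TiFlash write node is identified purely by its labels, so a store labeled
// as below passes LabelFilterOnlyTiFlashWriteNode but fails
// LabelFilterNoTiFlashWriteNode:
//
//	labels := []*metapb.StoreLabel{
//		{Key: tikvrpc.EngineLabelKey, Value: tikvrpc.EngineLabelTiFlash},
//		{Key: tikvrpc.EngineRoleLabelKey, Value: tikvrpc.EngineRoleWrite},
//	}
//	_ = LabelFilterOnlyTiFlashWriteNode(labels) // true
//	_ = LabelFilterNoTiFlashWriteNode(labels)   // false
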
// regionCacheTTLSec is the max idle time for regions in the region cache.
var regionCacheTTLSec int64 = 600

// SetRegionCacheTTLSec sets regionCacheTTLSec to t.
func SetRegionCacheTTLSec(t int64) {
	regionCacheTTLSec = t
}

// regionCacheTTLJitterSec is the max jitter time for region cache TTL.
var regionCacheTTLJitterSec int64 = 60

// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
	regionCacheTTLSec = base
	regionCacheTTLJitterSec = jitter
}

// nextTTL returns a random TTL in range [ts+base, ts+base+jitter). The input ts should be an epoch timestamp in seconds.
func nextTTL(ts int64) int64 {
	jitter := int64(0)
	if regionCacheTTLJitterSec > 0 {
		jitter = rand.Int63n(regionCacheTTLJitterSec)
	}
	return ts + regionCacheTTLSec + jitter
}

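// Usage sketch (added for exposition, not part of the original source): with
// the defaults above (base 600s, jitter 60s), each cached region expires at a
// random point in [now+600s, now+660s), which spreads reload traffic to PD
// instead of expiring many regions at the same instant:
//
//	SetRegionCacheTTLWithJitter(600, 60)
//	deadline := nextTTL(time.Now().Unix()) // somewhere in [now+600, now+660)
//	_ = deadline
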
var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker("region-meta",
	circuitbreaker.Settings{
		ErrorRateWindow:      30 * time.Second,
		MinQPSForOpen:        10,
		CoolDownInterval:     10 * time.Second,
		HalfOpenSuccessCount: 1,
	})

// wrap context with circuit breaker for PD region metadata calls
func withPDCircuitBreaker(ctx context.Context) context.Context {
	return circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
}

// ChangePDRegionMetaCircuitBreakerSettings changes the circuit breaker settings for region metadata calls.
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
	pdRegionMetaCircuitBreaker.ChangeSettings(apply)
}

// nextTTLWithoutJitter is used for tests.
func nextTTLWithoutJitter(ts int64) int64 {
	return ts + regionCacheTTLSec
}

const (
	needReloadOnAccess       int32 = 1 << iota // indicates the region will be reloaded on next access
	needExpireAfterTTL                         // indicates the region will expire after RegionCacheTTL (even when it's accessed continuously)
	needDelayedReloadPending                   // indicates the region will be reloaded later after it's scanned by GC
	needDelayedReloadReady                     // indicates the region has been scanned by GC and can be reloaded by id on next access
)

// InvalidReason is the reason why a cached region is invalidated.
// The region cache may take different strategies to handle different reasons.
// For example, when a cached region is invalidated due to no leader, region cache
// will always access a different peer.
type InvalidReason int32

const (
	// Ok indicates the cached region is valid
	Ok InvalidReason = iota
	// NoLeader indicates it's invalidated due to no leader
	NoLeader
	// RegionNotFound indicates it's invalidated due to region not found in the store
	RegionNotFound
	// EpochNotMatch indicates it's invalidated due to epoch not match
	EpochNotMatch
	// StoreNotFound indicates it's invalidated due to store not found in PD
	StoreNotFound
	// Other indicates it's invalidated due to other reasons, e.g., the store
	// is removed from the cluster, or requests to the store fail.
	Other
)

func (r InvalidReason) String() string {
	switch r {
	case Ok:
		return "Ok"
	case Other:
		return "Other"
	case EpochNotMatch:
		return "EpochNotMatch"
	case RegionNotFound:
		return "RegionNotFound"
	case StoreNotFound:
		return "StoreNotFound"
	case NoLeader:
		return "NoLeader"
	default:
		return "Unknown"
	}
}

// Region represents a KV region.
type Region struct {
	meta          *metapb.Region // raw region meta from PD, immutable after init
	store         unsafe.Pointer // points to region store info, see RegionStore
	ttl           int64          // region TTL in epoch seconds, see checkRegionCacheTTL
	syncFlags     int32          // region needs to be synced later, see needReloadOnAccess, needExpireAfterTTL
	invalidReason InvalidReason  // the reason why the region is invalidated
}

// AccessIndex represents the index for the accessIndex array.
type AccessIndex int

// regionStore represents region stores info.
// It is stored as an unsafe.Pointer and loaded atomically in one operation.
type regionStore struct {
	// corresponding stores (in the same order) of Region.meta.Peers in this region.
	stores []*Store
	// snapshots of store's epoch, need reload when `storeEpochs[curr] != stores[cur].fail`
	storeEpochs []uint32
	// A region can consist of stores of different types (TiKV and TiFlash). It maintains AccessMode => idx in stores,
	// e.g., stores[accessIndex[tiKVOnly][workTiKVIdx]] is the current working TiKV.
	accessIndex [numAccessMode][]int
	// accessIndex[tiKVOnly][workTiKVIdx] is the index of the current working TiKV in stores.
	workTiKVIdx AccessIndex
	// accessIndex[tiKVOnly][proxyTiKVIdx] is the index of TiKV that can forward requests to the leader in stores, -1 means not using proxy.
	proxyTiKVIdx AccessIndex
	// accessIndex[tiFlashOnly][workTiFlashIdx] is the index of the current working TiFlash in stores.
	workTiFlashIdx atomic.Int32
	// buckets is not accurate and it can change even if the region is not changed.
	// It can be stale and bucket keys can be out of the region range.
	buckets *metapb.Buckets
	// pendingPeers refers to pdRegion.PendingPeers. It's immutable and can be used to reconstruct pdRegions.
	pendingPeers []*metapb.Peer
	// downPeers refers to pdRegion.DownPeers. It's immutable and can be used to reconstruct pdRegions.
	downPeers []*metapb.Peer
}

func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
	sidx := r.accessIndex[mode][idx]
	return sidx, r.stores[sidx]
}

func (r *regionStore) getAccessIndex(mode accessMode, store *Store) AccessIndex {
	for index, sidx := range r.accessIndex[mode] {
		if r.stores[sidx].storeID == store.storeID {
			return AccessIndex(index)
		}
	}
	return -1
}

func (r *regionStore) accessStoreNum(mode accessMode) int {
	return len(r.accessIndex[mode])
}

// clone clones region store struct.
func (r *regionStore) clone() *regionStore {
	storeEpochs := make([]uint32, len(r.stores))
	copy(storeEpochs, r.storeEpochs)
	rs := &regionStore{
		proxyTiKVIdx: r.proxyTiKVIdx,
		workTiKVIdx:  r.workTiKVIdx,
		stores:       r.stores,
		storeEpochs:  storeEpochs,
		buckets:      r.buckets,
	}
	rs.workTiFlashIdx.Store(r.workTiFlashIdx.Load())
	for i := 0; i < int(numAccessMode); i++ {
		rs.accessIndex[i] = make([]int, len(r.accessIndex[i]))
		copy(rs.accessIndex[i], r.accessIndex[i])
	}
	return rs
}

// return next follower store's index
func (r *regionStore) follower(seed uint32, op *storeSelectorOp) AccessIndex {
	l := uint32(r.accessStoreNum(tiKVOnly))
	if l <= 1 {
		return r.workTiKVIdx
	}

	for retry := l - 1; retry > 0; retry-- {
		followerIdx := AccessIndex(seed % (l - 1))
		if followerIdx >= r.workTiKVIdx {
			followerIdx++
		}
		storeIdx, s := r.accessStore(tiKVOnly, followerIdx)
		if r.storeEpochs[storeIdx] == atomic.LoadUint32(&s.epoch) && r.filterStoreCandidate(followerIdx, op) {
			return followerIdx
		}
		seed++
	}
	return r.workTiKVIdx
}

// return next leader or follower store's index
func (r *regionStore) kvPeer(seed uint32, op *storeSelectorOp) AccessIndex {
	if op.leaderOnly {
		return r.workTiKVIdx
	}
	candidates := make([]AccessIndex, 0, r.accessStoreNum(tiKVOnly))
	for i := 0; i < r.accessStoreNum(tiKVOnly); i++ {
		accessIdx := AccessIndex(i)
		storeIdx, s := r.accessStore(tiKVOnly, accessIdx)
		if r.storeEpochs[storeIdx] != atomic.LoadUint32(&s.epoch) || !r.filterStoreCandidate(accessIdx, op) {
			continue
		}
		candidates = append(candidates, accessIdx)
	}
	// If there are no candidates, send to the current workTiKVIdx, which generally is the leader.
	if len(candidates) == 0 {
		return r.workTiKVIdx
	}
	return candidates[seed%uint32(len(candidates))]
}

func (r *regionStore) filterStoreCandidate(aidx AccessIndex, op *storeSelectorOp) bool {
	_, s := r.accessStore(tiKVOnly, aidx)
	// filter out stores with unmatched labels, and slow stores when ReplicaReadMode == PreferLeader
	return s.IsLabelsMatch(op.labels) && (!op.preferLeader || (aidx == r.workTiKVIdx && !s.healthStatus.IsSlow()))
}

func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *router.Region) (*Region, error) {
	r := &Region{meta: pdRegion.Meta}
	// regionStore pulls the used stores from the global store map
	// to avoid acquiring storeMu in later accesses.
	rs := &regionStore{
		workTiKVIdx:  0,
		proxyTiKVIdx: -1,
		stores:       make([]*Store, 0, len(r.meta.Peers)),
		storeEpochs:  make([]uint32, 0, len(r.meta.Peers)),
		buckets:      pdRegion.Buckets,
		pendingPeers: pdRegion.PendingPeers,
		downPeers:    pdRegion.DownPeers,
	}

	leader := pdRegion.Leader
	var leaderAccessIdx AccessIndex
	availablePeers := r.meta.GetPeers()[:0]
	for _, p := range r.meta.Peers {
		store, exists := c.stores.get(p.StoreId)
		if !exists {
			store = c.stores.getOrInsertDefault(p.StoreId)
		}
		addr, err := store.initResolve(bo, c.stores)
		if err != nil {
			return nil, err
		}
		// Filter out the peer on a tombstone or down store.
		if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) {
			continue
		}

		// Since the witness is read-write prohibited, it does not make sense to send requests to it
		// unless it is the leader. When it is the leader and transfer leader encounters a problem,
		// the backoff timeout will be triggered, and the client can give a more accurate error message.
		if p.IsWitness && !isSamePeer(p, leader) {
			continue
		}

		if isSamePeer(p, leader) {
			leaderAccessIdx = AccessIndex(len(rs.accessIndex[tiKVOnly]))
		}
		availablePeers = append(availablePeers, p)
		switch store.storeType {
		case tikvrpc.TiKV:
			rs.accessIndex[tiKVOnly] = append(rs.accessIndex[tiKVOnly], len(rs.stores))
		case tikvrpc.TiFlash:
			rs.accessIndex[tiFlashOnly] = append(rs.accessIndex[tiFlashOnly], len(rs.stores))
		}
		rs.stores = append(rs.stores, store)
		rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
	}
	// TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
	// Maybe we need backoff here.
	if len(availablePeers) == 0 {
		return nil, errors.Errorf("no available peers, region: {%v}", r.meta)
	}

	rs.workTiKVIdx = leaderAccessIdx
	r.setStore(rs)
	r.meta.Peers = availablePeers
	// if the region has down peers, let it expire after TTL.
	if len(pdRegion.DownPeers) > 0 {
		r.syncFlags |= needExpireAfterTTL
	}

	// mark the region as init-accessed.
	r.ttl = nextTTL(time.Now().Unix())
	return r, nil
}

func (r *Region) getStore() (store *regionStore) {
	store = (*regionStore)(atomic.LoadPointer(&r.store))
	return
}

func (r *Region) setStore(store *regionStore) {
	atomic.StorePointer(&r.store, unsafe.Pointer(store))
}

func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
	return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
}

func (r *Region) isCacheTTLExpired(ts int64) bool {
	return ts > atomic.LoadInt64(&r.ttl)
}

// checkRegionCacheTTL returns false if the region cache is expired.
func (r *Region) checkRegionCacheTTL(ts int64) bool {
	// Only consider using a percentage on this failpoint, for example, "2%return"
	if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
		r.invalidate(Other)
	}
	newTTL := int64(0)
	for {
		ttl := atomic.LoadInt64(&r.ttl)
		if ts > ttl {
			return false
		}
		// skip updating TTL when:
		// 1. the region has been marked as `needExpireAfterTTL`
		// 2. the TTL is far away from ts (still within jitter time)
		if r.checkSyncFlags(needExpireAfterTTL) || ttl > ts+regionCacheTTLSec {
			return true
		}
		if newTTL == 0 {
			newTTL = nextTTL(ts)
		}
		// now we have ts <= ttl <= ts+regionCacheTTLSec <= newTTL = ts+regionCacheTTLSec+randomJitter
		if atomic.CompareAndSwapInt64(&r.ttl, ttl, newTTL) {
			return true
		}
	}
}

// invalidate invalidates a region; the next access to it will get a nil result.
func (r *Region) invalidate(reason InvalidReason, nocount ...bool) {
	if atomic.CompareAndSwapInt32((*int32)(&r.invalidReason), int32(Ok), int32(reason)) {
		if len(nocount) == 0 || !nocount[0] {
			metrics.RegionCacheCounterWithInvalidateRegionFromCacheOK.Inc()
		}
		atomic.StoreInt64(&r.ttl, expiredTTL)
	}
}

func (r *Region) getSyncFlags() int32 {
	return atomic.LoadInt32(&r.syncFlags)
}

// checkSyncFlags returns true if sync_flags contains any of flags.
func (r *Region) checkSyncFlags(flags int32) bool {
	return atomic.LoadInt32(&r.syncFlags)&flags > 0
}

// setSyncFlags sets the sync_flags bits to sync_flags|flags.
func (r *Region) setSyncFlags(flags int32) {
	for {
		oldFlags := atomic.LoadInt32(&r.syncFlags)
		if oldFlags&flags == flags {
			return
		}
		if atomic.CompareAndSwapInt32(&r.syncFlags, oldFlags, oldFlags|flags) {
			return
		}
	}
}

// resetSyncFlags reverts flags from sync_flags (that is sync_flags&^flags), returns the flags that are reset (0 means no flags are reverted).
func (r *Region) resetSyncFlags(flags int32) int32 {
	for {
		oldFlags := atomic.LoadInt32(&r.syncFlags)
		if oldFlags&flags == 0 {
			return 0
		}
		if atomic.CompareAndSwapInt32(&r.syncFlags, oldFlags, oldFlags&^flags) {
			return oldFlags & flags
		}
	}
}

func (r *Region) isValid() bool {
	return r != nil && !r.checkSyncFlags(needReloadOnAccess) && r.checkRegionCacheTTL(time.Now().Unix())
}

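// Illustrative sketch (added for exposition, not part of the original source):
// the sync flags form a small lock-free bitset. A caller that wants to both
// test and clear a flag uses resetSyncFlags, whose return value tells which of
// the requested bits were actually set, so exactly one goroutine wins:
//
//	r.setSyncFlags(needReloadOnAccess | needDelayedReloadReady)
//	if flags := r.resetSyncFlags(needReloadOnAccess); flags&needReloadOnAccess > 0 {
//		// this caller won the CAS and is responsible for the reload
//	}
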
type regionIndexMu struct {
	sync.RWMutex
	regions        map[RegionVerID]*Region // cached regions are organized as regionVerID to region ref mapping
	latestVersions map[uint64]RegionVerID  // cache the map from regionID to its latest RegionVerID
	sorted         *SortedRegions          // cache regions are organized as sorted key to region ref mapping
}

func newRegionIndexMu(rs []*Region) *regionIndexMu {
	r := &regionIndexMu{}
	r.regions = make(map[RegionVerID]*Region)
	r.latestVersions = make(map[uint64]RegionVerID)
	r.sorted = NewSortedRegions(btreeDegree)
	for _, region := range rs {
		r.insertRegionToCache(region, true, false)
	}
	return r
}

func (mu *regionIndexMu) refresh(r []*Region) {
	newMu := newRegionIndexMu(r)
	mu.Lock()
	defer mu.Unlock()
	mu.regions = newMu.regions
	mu.latestVersions = newMu.latestVersions
	mu.sorted = newMu.sorted
}

// repeat wraps a `func()` as a schedulable function for `bgRunner`.
func repeat(f func()) func(context.Context, time.Time) bool {
	return func(_ context.Context, _ time.Time) bool {
		f()
		return false
	}
}

// until wraps a `func() bool` as a schedulable function for `bgRunner`.
func until(f func() bool) func(context.Context, time.Time) bool {
	return func(_ context.Context, _ time.Time) bool {
		return f()
	}
}

type bgRunner struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newBackgroundRunner(ctx context.Context) *bgRunner {
	ctx, cancel := context.WithCancel(ctx)
	return &bgRunner{
		ctx:    ctx,
		cancel: cancel,
	}
}

func (r *bgRunner) closed() bool {
	select {
	case <-r.ctx.Done():
		return true
	default:
		return false
	}
}

func (r *bgRunner) shutdown(wait bool) {
	r.cancel()
	if wait {
		r.wg.Wait()
	}
}

// run calls `f` once in background.
func (r *bgRunner) run(f func(context.Context)) {
	if r.closed() {
		return
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		f(r.ctx)
	}()
}

// schedule calls `f` every `interval`.
func (r *bgRunner) schedule(f func(context.Context, time.Time) bool, interval time.Duration) {
	if r.closed() || interval <= 0 {
		return
	}
	r.wg.Add(1)
	go func() {
		ticker := time.NewTicker(interval)
		defer func() {
			r.wg.Done()
			ticker.Stop()
		}()
		for {
			select {
			case <-r.ctx.Done():
				return
			case t := <-ticker.C:
				if f(r.ctx, t) {
					return
				}
			}
		}
	}()
}

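// Usage sketch (added for exposition, not part of the original source): a true
// return value from the scheduled function stops its loop, so `repeat` runs
// until shutdown while `until` runs to the first success:
//
//	bg := newBackgroundRunner(context.Background())
//	bg.schedule(repeat(func() { /* periodic work */ }), time.Minute)
//	bg.schedule(until(func() bool { return true /* stop after one tick */ }), time.Second)
//	bg.shutdown(true) // cancel the shared ctx and wait for both goroutines
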
// scheduleWithTrigger is like schedule, but also calls `f` when `<-trigger` fires, in which case the time arg of `f` is zero.
func (r *bgRunner) scheduleWithTrigger(f func(context.Context, time.Time) bool, interval time.Duration, trigger <-chan struct{}) {
	if r.closed() || interval <= 0 {
		return
	}
	r.wg.Add(1)
	go func() {
		ticker := time.NewTicker(interval)
		defer func() {
			r.wg.Done()
			ticker.Stop()
		}()
		triggerEnabled := trigger != nil
		for triggerEnabled {
			select {
			case <-r.ctx.Done():
				return
			case t := <-ticker.C:
				if f(r.ctx, t) {
					return
				}
			case _, ok := <-trigger:
				if !ok {
					triggerEnabled = false
				} else if f(r.ctx, time.Time{}) {
					return
				}
			}
		}
		for {
			select {
			case <-r.ctx.Done():
				return
			case t := <-ticker.C:
				if f(r.ctx, t) {
					return
				}
			}
		}
	}()
}

// RegionCache caches Regions loaded from PD.
// All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
// purposes only.
type RegionCache struct {
	pdClient         pd.Client
	codec            apicodec.Codec
	enableForwarding bool

	requestHealthFeedbackCallback func(ctx context.Context, addr string) error

	mu regionIndexMu

	stores storeCache

	// runner for background jobs
	bg *bgRunner

	clusterID uint64
}

type regionCacheOptions struct {
	noHealthTick                  bool
	requestHealthFeedbackCallback func(ctx context.Context, addr string) error
}

type RegionCacheOpt func(*regionCacheOptions)

func RegionCacheNoHealthTick(o *regionCacheOptions) {
	o.noHealthTick = true
}

func WithRequestHealthFeedbackCallback(callback func(ctx context.Context, addr string) error) RegionCacheOpt {
	return func(options *regionCacheOptions) {
		options.requestHealthFeedbackCallback = callback
	}
}

// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
	var options regionCacheOptions
	for _, o := range opt {
		o(&options)
	}

	c := &RegionCache{
		pdClient:                      pdClient.WithCallerComponent("region-cache"),
		requestHealthFeedbackCallback: options.requestHealthFeedbackCallback,
	}

	c.codec = apicodec.NewCodecV1(apicodec.ModeRaw)
	if codecPDClient, ok := pdClient.(*CodecPDClient); ok {
		c.codec = codecPDClient.GetCodec()
	}

	c.stores = newStoreCache(pdClient)
	c.bg = newBackgroundRunner(context.Background())
	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
	if c.pdClient != nil {
		c.clusterID = c.pdClient.GetClusterID(context.Background())
	}
	if c.clusterID == 0 {
		logutil.BgLogger().Error("cluster id is not set properly")
	}

	if config.GetGlobalConfig().EnablePreload {
		logutil.BgLogger().Info("preload region index start")
		if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.bg.ctx, 20000, nil)); err != nil {
			logutil.BgLogger().Error("refresh region index failed", zap.Error(err))
		}
		logutil.BgLogger().Info("preload region index finish")
	} else {
		c.mu = *newRegionIndexMu(nil)
	}

	var (
		refreshStoreInterval = config.GetGlobalConfig().StoresRefreshInterval
		needCheckStores      []*Store
	)
	c.bg.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
		// check and resolve normal stores periodically by default.
		filter := func(state resolveState) bool {
			return state != unresolved && state != tombstone && state != deleted
		}
		if t.IsZero() {
			// check and resolve needCheck stores because it's triggered by a CheckStoreEvent this time.
			filter = func(state resolveState) bool { return state == needCheck }
		}
		needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
		return false
	}, time.Duration(refreshStoreInterval/4)*time.Second, c.stores.getCheckStoreEvents())
	if !options.noHealthTick {
		c.bg.schedule(c.checkAndUpdateStoreHealthStatus, time.Duration(refreshStoreInterval/4)*time.Second)
	}
	c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
	if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
		c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
			if err := c.refreshRegionIndex(retry.NewBackofferWithVars(ctx, int(refreshCacheInterval)*1000, nil)); err != nil {
				logutil.BgLogger().Error("refresh region cache failed", zap.Error(err))
			}
			return false
		}, time.Duration(refreshCacheInterval)*time.Second)
	} else {
		// cache GC is incompatible with cache refresh
		c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
	}
	c.bg.schedule(
		func(ctx context.Context, _ time.Time) bool {
			refreshFullStoreList(ctx, c.stores)
			return false
		}, refreshStoreListInterval,
	)
	return c
}

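// Usage sketch (added for exposition, not part of the original source;
// `pdClient` and `myCallback` are placeholders):
//
//	cache := NewRegionCache(pdClient, WithRequestHealthFeedbackCallback(myCallback))
//	defer cache.Close() // stops the background goroutines and waits for them
//
// The background jobs above (store resolution, health checks, region refresh
// or GC) are owned by the cache's bgRunner, so calling Close is needed to
// avoid leaking their goroutines.
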
// Try to refresh the full store list. Errors are ignored.
func refreshFullStoreList(ctx context.Context, stores storeCache) {
	storeList, err := stores.fetchAllStores(ctx)
	if err != nil {
		logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
		return
	}
	for _, store := range storeList {
		_, exist := stores.get(store.GetId())
		if exist {
			continue
		}
		// GetAllStores is supposed to return only Up and Offline stores.
		// This check is defensive and keeps it consistent with the store resolve code.
		if store == nil || store.GetState() == metapb.StoreState_Tombstone {
			continue
		}
		addr := store.GetAddress()
		if addr == "" {
			continue
		}
		s := stores.getOrInsertDefault(store.GetId())
		// TODO: maybe refactor this, together with other places initializing Store
		s.addr = addr
		s.peerAddr = store.GetPeerAddress()
		s.saddr = store.GetStatusAddress()
		s.storeType = tikvrpc.GetStoreTypeByMeta(store)
		s.labels = store.GetLabels()
		s.changeResolveStateTo(unresolved, resolved)
	}
}

// only used for tests.
func newTestRegionCache() *RegionCache {
	c := &RegionCache{}
	c.bg = newBackgroundRunner(context.Background())
	c.mu = *newRegionIndexMu(nil)
	return c
}

// clear clears all cached data in the RegionCache. It's only used in tests.
func (c *RegionCache) clear() {
	c.mu.refresh(nil)
	c.stores.clear()
}

// thread unsafe, should be used with the lock held
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
	return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion, shouldCount)
}

// Close releases region cache's resource.
func (c *RegionCache) Close() {
	c.bg.shutdown(true)
}

// checkAndResolve checks and resolves the addrs of failed stores.
// This method isn't thread-safe and should only be used by one goroutine.
func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) []*Store {
	defer func() {
		r := recover()
		if r != nil {
			logutil.BgLogger().Error("panic in the checkAndResolve goroutine",
				zap.Any("r", r),
				zap.Stack("stack trace"))
		}
	}()

	needCheckStores = c.stores.filter(needCheckStores, needCheck)
	for _, store := range needCheckStores {
		_, err := store.reResolve(c.stores, c.bg)
		tikverr.Log(err)
	}
	return needCheckStores
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
	c.stores.put(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
}

// SetPDClient replaces the pd client, for testing only
func (c *RegionCache) SetPDClient(client pd.Client) {
	c.pdClient = client
	c.stores = newStoreCache(client)
}

// RPCContext contains data that is needed to send RPC to a region.
type RPCContext struct {
	ClusterID     uint64
	Region        RegionVerID
	Meta          *metapb.Region
	Peer          *metapb.Peer
	AccessIdx     AccessIndex
	Store         *Store
	Addr          string
	AccessMode    accessMode
	ProxyStore    *Store // nil means proxy is not used
	ProxyAddr     string // valid when ProxyStore is not nil
	TiKVNum       int    // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
	BucketVersion uint64
}

func (c *RPCContext) String() string {
	var runStoreType string
	if c.Store != nil {
		runStoreType = c.Store.storeType.Name()
	}
	res := fmt.Sprintf("region ID: %d, meta: %s, peer: %s, addr: %s, idx: %d, reqStoreType: %s, runStoreType: %s",
		c.Region.GetID(), c.Meta, c.Peer, c.Addr, c.AccessIdx, c.AccessMode, runStoreType)
	if c.ProxyStore != nil {
		res += fmt.Sprintf(", proxy store id: %d, proxy addr: %s", c.ProxyStore.storeID, c.ProxyStore.addr)
	}
	return res
}

type storeSelectorOp struct {
	leaderOnly   bool
	preferLeader bool
	labels       []*metapb.StoreLabel
	stores       []uint64
}

// StoreSelectorOption configures storeSelectorOp.
type StoreSelectorOption func(*storeSelectorOp)

// WithMatchLabels indicates selecting stores with matched labels.
func WithMatchLabels(labels []*metapb.StoreLabel) StoreSelectorOption {
	return func(op *storeSelectorOp) {
		op.labels = append(op.labels, labels...)
	}
}

// WithLeaderOnly indicates selecting the leader store only.
func WithLeaderOnly() StoreSelectorOption {
	return func(op *storeSelectorOp) {
		op.leaderOnly = true
	}
}

// WithPerferLeader indicates preferring the leader store until it becomes inaccessible.
func WithPerferLeader() StoreSelectorOption {
	return func(op *storeSelectorOp) {
		op.preferLeader = true
	}
}

// WithMatchStores indicates selecting stores with matched store ids.
func WithMatchStores(stores []uint64) StoreSelectorOption {
	return func(op *storeSelectorOp) {
		op.stores = stores
	}
}

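// Usage sketch (added for exposition, not part of the original source): the
// options compose into a single storeSelectorOp. For example, a follower read
// pinned to one zone (`cache`, `bo`, `regionVerID`, `seed`, and the label
// values are placeholders):
//
//	rpcCtx, err := cache.GetTiKVRPCContext(bo, regionVerID, kv.ReplicaReadFollower, seed,
//		WithMatchLabels([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}))
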
// GetTiKVRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32, opts ...StoreSelectorOption) (*RPCContext, error) {
	cachedRegion := c.GetCachedRegionWithRLock(id)
	if !cachedRegion.isValid() {
		return nil, nil
	}

	regionStore := cachedRegion.getStore()
	var (
		store     *Store
		peer      *metapb.Peer
		storeIdx  int
		accessIdx AccessIndex
	)
	options := &storeSelectorOp{}
	for _, op := range opts {
		op(options)
	}
	isLeaderReq := false
	switch replicaRead {
	case kv.ReplicaReadFollower:
		store, peer, accessIdx, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed, options)
	case kv.ReplicaReadMixed:
		store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
	case kv.ReplicaReadPreferLeader:
		options.preferLeader = true
		store, peer, accessIdx, storeIdx = cachedRegion.AnyStorePeer(regionStore, followerStoreSeed, options)
	default:
		isLeaderReq = true
		store, peer, accessIdx, storeIdx = cachedRegion.WorkStorePeer(regionStore)
	}
	addr, err := c.getStoreAddr(bo, cachedRegion, store)
	if err != nil {
		return nil, err
	}
	// enable by `curl -XPUT -d '1*return("[some-addr]")->return("")' http://host:port/tikvclient/injectWrongStoreAddr`
	if val, err := util.EvalFailpoint("injectWrongStoreAddr"); err == nil {
		if a, ok := val.(string); ok && len(a) > 0 {
			addr = a
		}
	}
	if store == nil || len(addr) == 0 {
		// Store not found, region must be out of date.
		cachedRegion.invalidate(StoreNotFound)
		return nil, nil
	}

	storeFailEpoch := atomic.LoadUint32(&store.epoch)
	if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
		cachedRegion.invalidate(Other)
		logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
			zap.Uint64("region", id.GetID()),
			zap.String("store", store.addr))
		return nil, nil
	}

	var (
		proxyStore *Store
		proxyAddr  string
	)
	if c.enableForwarding && isLeaderReq {
		if store.getLivenessState() == reachable {
			regionStore.unsetProxyStoreIfNeeded(cachedRegion)
		} else {
			proxyStore, _, _ = c.getProxyStore(cachedRegion, store, regionStore, accessIdx)
			if proxyStore != nil {
				proxyAddr, err = c.getStoreAddr(bo, cachedRegion, proxyStore)
				if err != nil {
					return nil, err
				}
			}
		}
	}

	return &RPCContext{
		ClusterID:  c.clusterID,
		Region:     id,
		Meta:       cachedRegion.meta,
		Peer:       peer,
		AccessIdx:  accessIdx,
		Store:      store,
		Addr:       addr,
		AccessMode: tiKVOnly,
		ProxyStore: proxyStore,
		ProxyAddr:  proxyAddr,
		TiKVNum:    regionStore.accessStoreNum(tiKVOnly),
	}, nil
}

// GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores; the store id of currentStore is always the first one.
// Callers may try `nonPendingStores` first, which avoids tasks having to wait for the TiFlash replica to sync from TiKV.
// But if all TiFlash peers are pending (len(nonPendingStores) == 0), using `allStores` is also OK.
func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) ([]uint64, []uint64) {
	// set the cap to 2 because usually, a TiFlash table will have 2 replicas
	allStores := make([]uint64, 0, 2)
	nonPendingStores := make([]uint64, 0, 2)
	// make sure currentStore id is always the first in allStores
	allStores = append(allStores, currentStore.storeID)
	ts := time.Now().Unix()
	cachedRegion := c.GetCachedRegionWithRLock(id)
	if cachedRegion == nil {
		return allStores, nonPendingStores
	}
	if !cachedRegion.checkRegionCacheTTL(ts) {
		return allStores, nonPendingStores
	}
	regionStore := cachedRegion.getStore()
	currentIndex := regionStore.getAccessIndex(tiFlashOnly, currentStore)
	if currentIndex == -1 {
		return allStores, nonPendingStores
	}
	for startOffset := 1; startOffset < regionStore.accessStoreNum(tiFlashOnly); startOffset++ {
		accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(tiFlashOnly))
		storeIdx, store := regionStore.accessStore(tiFlashOnly, accessIdx)
		if store.getResolveState() == needCheck {
			continue
		}
		storeFailEpoch := atomic.LoadUint32(&store.epoch)
		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
			continue
		}
		if !labelFilter(store.labels) {
			continue
		}
		allStores = append(allStores, store.storeID)
	}
	for _, storeID := range allStores {
		if !slices.ContainsFunc(regionStore.pendingPeers, func(p *metapb.Peer) bool { return p.StoreId == storeID }) {
			nonPendingStores = append(nonPendingStores, storeID)
		}
	}
	return allStores, nonPendingStores
}

// GetTiFlashRPCContext returns RPCContext for a region that must access a TiFlash store. If it returns nil, the region
// must be out of date and already dropped from cache, or no TiFlash store was found.
// `loadBalance` is an option. For batch cop it is pointless and might cause the failed store to be retried repeatedly.
func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID, loadBalance bool, labelFilter LabelFilter) (*RPCContext, error) {
	cachedRegion := c.GetCachedRegionWithRLock(id)
	if !cachedRegion.isValid() {
		return nil, nil
	}

	regionStore := cachedRegion.getStore()

	// sIdx is for load balance of TiFlash store.
	var sIdx int
	if loadBalance {
		sIdx = int(regionStore.workTiFlashIdx.Add(1))
	} else {
		sIdx = int(regionStore.workTiFlashIdx.Load())
	}
	for i := 0; i < regionStore.accessStoreNum(tiFlashOnly); i++ {
		accessIdx := AccessIndex((sIdx + i) % regionStore.accessStoreNum(tiFlashOnly))
		storeIdx, store := regionStore.accessStore(tiFlashOnly, accessIdx)
		if !labelFilter(store.labels) {
			continue
		}
		addr, err := c.getStoreAddr(bo, cachedRegion, store)
		if err != nil {
			return nil, err
		}
		if len(addr) == 0 {
			cachedRegion.invalidate(StoreNotFound)
			return nil, nil
		}
		if store.getResolveState() == needCheck {
			_, err := store.reResolve(c.stores, c.bg)
			tikverr.Log(err)
		}
		regionStore.workTiFlashIdx.Store(int32(accessIdx))
		peer := cachedRegion.meta.Peers[storeIdx]
		storeFailEpoch := atomic.LoadUint32(&store.epoch)
		if storeFailEpoch != regionStore.storeEpochs[storeIdx] {
			cachedRegion.invalidate(Other)
			logutil.Logger(bo.GetCtx()).Info("invalidate current region, because others failed on same store",
				zap.Uint64("region", id.GetID()),
				zap.String("store", store.addr))
			// TiFlash will always try to find out a valid peer, avoiding retrying too many times.
			continue
		}
		return &RPCContext{
			ClusterID:  c.clusterID,
			Region:     id,
			Meta:       cachedRegion.meta,
			Peer:       peer,
			AccessIdx:  accessIdx,
			Store:      store,
			Addr:       addr,
			AccessMode: tiFlashOnly,
			TiKVNum:    regionStore.accessStoreNum(tiKVOnly),
		}, nil
	}

	cachedRegion.invalidate(Other)
	return nil, nil
}

// KeyLocation is the region and range in which a key is located.
type KeyLocation struct {
	Region   RegionVerID
	StartKey []byte
	EndKey   []byte
	Buckets  *metapb.Buckets
}

// Contains checks if key is in [StartKey, EndKey).
func (l *KeyLocation) Contains(key []byte) bool {
	return bytes.Compare(l.StartKey, key) <= 0 &&
		(bytes.Compare(key, l.EndKey) < 0 || len(l.EndKey) == 0)
}

// String implements fmt.Stringer interface.
func (l *KeyLocation) String() string {
	return fmt.Sprintf("region %s,startKey:%s,endKey:%s", l.Region.String(), redact.Key(l.StartKey), redact.Key(l.EndKey))
}

// GetBucketVersion gets the bucket version of the region.
// If the region doesn't contain buckets, returns 0.
func (l *KeyLocation) GetBucketVersion() uint64 {
	if l.Buckets == nil {
		return 0
	}
	return l.Buckets.GetVersion()
}

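// Illustrative sketch (added for exposition, not part of the original source):
// an empty EndKey means the range extends to +inf, so the last region of the
// keyspace contains every key at or after its StartKey:
//
//	loc := &KeyLocation{StartKey: []byte("b"), EndKey: nil}
//	_ = loc.Contains([]byte("z")) // true: EndKey is empty, range is [b, +inf)
//	_ = loc.Contains([]byte("a")) // false: before StartKey
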
// LocateBucket handles an edge case of locateBucket that returns nil.
// There are two cases where locateBucket returns nil:
// Case one is that the key belongs neither to any bucket nor to the region.
// Case two is that the key belongs to the region but not to any bucket.
// LocateBucket will not return nil in case two.
// Specifically, when the key is in [KeyLocation.StartKey, first Bucket key), the result returned by locateBucket will be nil
// as there's no bucket containing this key. LocateBucket will return Bucket{KeyLocation.StartKey, first Bucket key}
// as it's reasonable to assume that Bucket{KeyLocation.StartKey, first Bucket key} is a bucket belonging to the region.
// Keys in [last Bucket key, KeyLocation.EndKey) are handled similarly.
func (l *KeyLocation) LocateBucket(key []byte) *Bucket {
	bucket := l.locateBucket(key)
	// Return the bucket when locateBucket can locate the key
	if bucket != nil {
		return bucket
	}
	// Case one returns nil too.
	if !l.Contains(key) {
		return nil
	}
	counts := len(l.Buckets.Keys)
	if counts == 0 {
		return &Bucket{
			l.StartKey,
			l.EndKey,
		}
	}
	// Handle case two
	firstBucketKey := l.Buckets.Keys[0]
	if bytes.Compare(key, firstBucketKey) < 0 {
		return &Bucket{
			l.StartKey,
			firstBucketKey,
		}
	}
	lastBucketKey := l.Buckets.Keys[counts-1]
	if bytes.Compare(lastBucketKey, key) <= 0 {
		return &Bucket{
			lastBucketKey,
			l.EndKey,
		}
	}
	// unreachable
	logutil.Logger(context.Background()).Info(
		"Unreachable place", zap.String("KeyLocation", l.String()), zap.String("Key", redact.Key(key)))
	panic("Unreachable")
}

// locateBucket returns the bucket in which the key is located. It returns nil if the key is outside all buckets.
func (l *KeyLocation) locateBucket(key []byte) *Bucket {
	keys := l.Buckets.GetKeys()
	searchLen := len(keys) - 1
	i := sort.Search(searchLen, func(i int) bool {
		return bytes.Compare(key, keys[i]) < 0
	})

	// buckets contain the region's start/end key, so i==0 means it can't find a suitable bucket,
	// which can happen if the bucket information is stale.
	if i == 0 ||
		// the key isn't located in the last range.
		(i == searchLen && len(keys[searchLen]) != 0 && bytes.Compare(key, keys[searchLen]) >= 0) {
		return nil
	}
	return &Bucket{keys[i-1], keys[i]}
}

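// Worked example (added for exposition, not part of the original source):
// suppose the region is ["a", "z") and Buckets.Keys is ["a", "m", "z"].
// locateBucket("k") finds the first i with "k" < keys[i], i.e. i=1, and
// returns Bucket{"a", "m"}. With a stale bucket list ["c", "m", "x"], the key
// "b" gives i=0, so locateBucket returns nil, while LocateBucket falls back to
// Bucket{region start "a", first bucket key "c"}.
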
// Bucket is a single bucket of a region.
type Bucket struct {
	StartKey []byte
	EndKey   []byte
}

// Contains checks whether the key is in the Bucket.
func (b *Bucket) Contains(key []byte) bool {
	return contains(b.StartKey, b.EndKey, key)
}

// LocateKeyRange lists the regions and ranges that keys in [startKey, endKey) are located in.
// Regions without a leader won't be returned.
func (c *RegionCache) LocateKeyRange(bo *retry.Backoffer, startKey, endKey []byte) ([]*KeyLocation, error) {
	var res []*KeyLocation
	for {
		// 1. find regions from cache
		for {
			r := c.tryFindRegionByKey(startKey, false)
			if r == nil {
				break
			}
			res = append(res, &KeyLocation{
				Region:   r.VerID(),
				StartKey: r.StartKey(),
				EndKey:   r.EndKey(),
				Buckets:  r.getStore().buckets,
			})
			if r.ContainsByEnd(endKey) {
				return res, nil
			}
			startKey = r.EndKey()
		}
		// 2. load remaining regions from pd client
		batchRegions, err := c.BatchLoadRegionsWithKeyRanges(bo, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, defaultRegionsPerBatch, WithNeedRegionHasLeaderPeer())
		if err != nil {
			return nil, err
		}
		if len(batchRegions) == 0 {
			// should never happen
			err := errors.Errorf("BatchLoadRegionsWithKeyRange return empty batchRegions without err")
			return nil, err
		}
		for _, r := range batchRegions {
			res = append(res, &KeyLocation{
				Region:   r.VerID(),
				StartKey: r.StartKey(),
				EndKey:   r.EndKey(),
				Buckets:  r.getStore().buckets,
			})
		}
		endRegion := batchRegions[len(batchRegions)-1]
		if endRegion.ContainsByEnd(endKey) {
			break
		}
		startKey = endRegion.EndKey()
	}
	return res, nil
}

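// Usage sketch (added for exposition, not part of the original source; `cache`
// and `bo` are placeholders): splitting a scan into per-region sub-ranges is
// the typical use of LocateKeyRange:
//
//	locs, err := cache.LocateKeyRange(bo, []byte("a"), []byte("z"))
//	if err == nil {
//		for _, loc := range locs {
//			// each loc covers a disjoint piece of ["a", "z"), in key order
//			_ = loc.Region
//		}
//	}
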
type batchLocateKeyRangesOption struct {
	// whether to load regions with a leader only; if it's set to true, regions without a leader will be skipped.
	// Note the region has a leader even if the leader is invalid or outdated.
	needRegionHasLeaderPeer bool
	// whether to load buckets; if it's set to true, more traffic will be consumed.
	needBuckets bool
}

type BatchLocateKeyRangesOpt func(*batchLocateKeyRangesOption)

func WithNeedBuckets() BatchLocateKeyRangesOpt {
	return func(opt *batchLocateKeyRangesOption) {
		opt.needBuckets = true
	}
}

func WithNeedRegionHasLeaderPeer() BatchLocateKeyRangesOpt {
	return func(opt *batchLocateKeyRangesOption) {
		opt.needRegionHasLeaderPeer = true
	}
}

// BatchLocateKeyRanges lists the regions in the given ranges.
func (c *RegionCache) BatchLocateKeyRanges(bo *retry.Backoffer, keyRanges []kv.KeyRange, opts ...BatchLocateKeyRangesOpt) ([]*KeyLocation, error) {
	uncachedRanges := make([]router.KeyRange, 0, len(keyRanges))
	cachedRegions := make([]*Region, 0, len(keyRanges))
	// 1. find regions from cache
	var lastRegion *Region
	for _, keyRange := range keyRanges {
		if lastRegion != nil {
			if lastRegion.ContainsByEnd(keyRange.EndKey) {
				continue
			} else if lastRegion.Contains(keyRange.StartKey) {
				keyRange.StartKey = lastRegion.EndKey()
			}
		}
		// TODO: find all the cached regions in the range.
		// For now we only check whether regions are cached from the lower bound; if there is an uncached hole
		// in the middle, we will load the remaining regions even if they are cached.
		r := c.tryFindRegionByKey(keyRange.StartKey, false)
		lastRegion = r
		if r == nil {
			// region cache miss, add the cut range to uncachedRanges, load from PD later.
			uncachedRanges = append(uncachedRanges, router.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey})
			continue
		}
		// region cache hit, add the region to cachedRegions.
		cachedRegions = append(cachedRegions, r)
		if r.ContainsByEnd(keyRange.EndKey) {
			// the range is fully hit in the region cache.
			continue
		}
		keyRange.StartKey = r.EndKey()
		// Batch load the rest of the regions from cache.
		containsAll := false
	outer:
		for {
			batchRegionInCache, err := c.scanRegionsFromCache(bo, keyRange.StartKey, keyRange.EndKey, defaultRegionsPerBatch)
			if err != nil {
				return nil, err
			}
			for _, r = range batchRegionInCache {
				if !r.Contains(keyRange.StartKey) { // uncached hole, load the rest of the regions
					break outer
				}
				cachedRegions = append(cachedRegions, r)
				lastRegion = r
				if r.ContainsByEnd(keyRange.EndKey) {
					// the range is fully hit in the region cache.
					containsAll = true
					break outer
				}
				keyRange.StartKey = r.EndKey()
			}
			if len(batchRegionInCache) < defaultRegionsPerBatch { // region cache miss, load the rest of the regions
				break
			}
		}
		if !containsAll {
			uncachedRanges = append(uncachedRanges, router.KeyRange{StartKey: keyRange.StartKey, EndKey: keyRange.EndKey})
		}
	}

	merger := newBatchLocateRegionMerger(cachedRegions, len(cachedRegions)+len(uncachedRanges))

	// 2. load remaining regions from pd client
	for len(uncachedRanges) > 0 {
		regions, err := c.BatchLoadRegionsWithKeyRanges(bo, uncachedRanges, defaultRegionsPerBatch, opts...)
		if err != nil {
			return nil, err
		}
		if len(regions) == 0 {
			return nil, errors.Errorf("BatchLoadRegionsWithKeyRanges return empty batchedRegions without err")
		}

		for _, r := range regions {
			merger.appendRegion(r)
		}
		// if the regions are not loaded completely, split uncachedRanges and load the rest.
		uncachedRanges = rangesAfterKey(uncachedRanges, regions[len(regions)-1].EndKey())
	}
	return merger.build(), nil
}

type batchLocateRangesMerger struct {
	lastEndKey      *[]byte
	cachedIdx       int
	cachedRegions   []*Region
	mergedLocations []*KeyLocation
}

func newBatchLocateRegionMerger(cachedRegions []*Region, sizeHint int) *batchLocateRangesMerger {
	return &batchLocateRangesMerger{
		lastEndKey:      nil,
		cachedRegions:   cachedRegions,
		mergedLocations: make([]*KeyLocation, 0, sizeHint),
	}
}

func (m *batchLocateRangesMerger) appendKeyLocation(r *Region) {
	m.mergedLocations = append(m.mergedLocations, &KeyLocation{
		Region:   r.VerID(),
		StartKey: r.StartKey(),
		EndKey:   r.EndKey(),
		Buckets:  r.getStore().buckets,
	})
}

func (m *batchLocateRangesMerger) appendRegion(uncachedRegion *Region) {
	defer func() {
		endKey := uncachedRegion.EndKey()
		if len(endKey) == 0 {
			// len(end_key) == 0 means the region ends at +inf, it should be the last region.
			// discard the rest of the cached regions by moving cachedIdx to the end of cachedRegions.
			m.cachedIdx = len(m.cachedRegions)
		} else {
			lastEndKey := uncachedRegion.EndKey()
			m.lastEndKey = &lastEndKey
		}
	}()
	if len(uncachedRegion.StartKey()) == 0 {
		// len(start_key) == 0 means the region starts from -inf, it should be the first region.
		m.appendKeyLocation(uncachedRegion)
		return
	}
	if m.lastEndKey != nil && bytes.Compare(*m.lastEndKey, uncachedRegion.StartKey()) >= 0 {
		// the uncached regions are contiguous, do not consider cached regions for now.
		m.appendKeyLocation(uncachedRegion)
		return
	}
	for ; m.cachedIdx < len(m.cachedRegions); m.cachedIdx++ {
		if m.lastEndKey != nil && bytes.Compare(*m.lastEndKey, m.cachedRegions[m.cachedIdx].EndKey()) >= 0 {
			// skip the cached region that is covered by the uncached region.
			continue
		}
		if bytes.Compare(m.cachedRegions[m.cachedIdx].StartKey(), uncachedRegion.StartKey()) >= 0 {
			break
		}
		// append the cached regions that are before the uncached region.
		m.appendKeyLocation(m.cachedRegions[m.cachedIdx])
	}
	m.appendKeyLocation(uncachedRegion)
}

func (m *batchLocateRangesMerger) build() []*KeyLocation {
	// append the remaining cache-hit regions
	for ; m.cachedIdx < len(m.cachedRegions); m.cachedIdx++ {
		if m.lastEndKey != nil && bytes.Compare(*m.lastEndKey, m.cachedRegions[m.cachedIdx].EndKey()) >= 0 {
			// skip the cached region that is covered by the uncached region.
			continue
		}
		m.appendKeyLocation(m.cachedRegions[m.cachedIdx])
	}
	return m.mergedLocations
}

// rangesAfterKey splits the key ranges and returns the remaining ranges after splitKey.
// The returned ranges reference the input keyRanges and may be changed in place, so
// the input keyRanges should not be used after calling this function.
func rangesAfterKey(keyRanges []router.KeyRange, splitKey []byte) []router.KeyRange {
	if len(keyRanges) == 0 {
		return nil
	}
	if len(splitKey) == 0 || len(keyRanges[len(keyRanges)-1].EndKey) > 0 && bytes.Compare(splitKey, keyRanges[len(keyRanges)-1].EndKey) >= 0 {
		// fast check, if all ranges are loaded from PD, quit the loop.
		return nil
	}

	n := sort.Search(len(keyRanges), func(i int) bool {
		return len(keyRanges[i].EndKey) == 0 || bytes.Compare(keyRanges[i].EndKey, splitKey) > 0
	})

	keyRanges = keyRanges[n:]
	if bytes.Compare(splitKey, keyRanges[0].StartKey) > 0 {
		keyRanges[0].StartKey = splitKey
	}
	return keyRanges
}

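// Worked example (added for exposition, not part of the original source):
// given ranges [a, c), [d, f), [g, i) and splitKey "e", sort.Search finds the
// first range whose EndKey > "e" (index 1), the ranges before it are dropped,
// and the surviving first range is trimmed in place to [e, f):
//
//	rest := rangesAfterKey([]router.KeyRange{
//		{StartKey: []byte("a"), EndKey: []byte("c")},
//		{StartKey: []byte("d"), EndKey: []byte("f")},
//		{StartKey: []byte("g"), EndKey: []byte("i")},
//	}, []byte("e"))
//	// rest == [{e, f}, {g, i}]
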
// LocateKey searches for the region and range that the key is located in.
func (c *RegionCache) LocateKey(bo *retry.Backoffer, key []byte) (*KeyLocation, error) {
	r, err := c.findRegionByKey(bo, key, false)
	if err != nil {
		return nil, err
	}
	return &KeyLocation{
		Region:   r.VerID(),
		StartKey: r.StartKey(),
		EndKey:   r.EndKey(),
		Buckets:  r.getStore().buckets,
	}, nil
}

// TryLocateKey searches for the region and range that the key is located in, but returns nil on a cache miss or an invalid region.
func (c *RegionCache) TryLocateKey(key []byte) *KeyLocation {
	r := c.tryFindRegionByKey(key, false)
	if r == nil {
		return nil
	}
	return &KeyLocation{
		Region:   r.VerID(),
		StartKey: r.StartKey(),
		EndKey:   r.EndKey(),
		Buckets:  r.getStore().buckets,
	}
}

// LocateEndKey searches for the region and range that the key is located in.
// Unlike LocateKey, the start key of a region is exclusive and the end key is inclusive.
func (c *RegionCache) LocateEndKey(bo *retry.Backoffer, key []byte) (*KeyLocation, error) {
	r, err := c.findRegionByKey(bo, key, true)
	if err != nil {
		return nil, err
	}
	return &KeyLocation{
		Region:   r.VerID(),
		StartKey: r.StartKey(),
		EndKey:   r.EndKey(),
		Buckets:  r.getStore().buckets,
	}, nil
}

func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey bool) (r *Region, err error) {
	var expired bool
	r, expired = c.searchCachedRegionByKey(key, isEndKey)
	tag := "ByKey"
	if isEndKey {
		tag = "ByEndKey"
	}
	if r == nil || expired {
		// load the region when it does not exist or is expired.
		observeLoadRegion(tag, r, expired, 0)
		lr, err := c.loadRegion(bo, key, isEndKey, opt.WithAllowFollowerHandle())
		if err != nil {
			// no region data, return error if failure.
			return nil, err
		}
		logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
		r = lr
		c.mu.Lock()
		stale := !c.insertRegionToCache(r, true, true)
		c.mu.Unlock()
		// just retry once, it won't bring much overhead.
		if stale {
			observeLoadRegion(tag+":Retry", r, expired, 0)
			lr, err = c.loadRegion(bo, key, isEndKey)
			if err != nil {
				// no region data, return error if failure.
				return nil, err
			}
			r = lr
			c.mu.Lock()
			c.insertRegionToCache(r, true, true)
			c.mu.Unlock()
		}
	} else if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 {
		// load the region when it is marked as needing reload.
		observeLoadRegion(tag, r, expired, flags)
		// NOTE: we can NOT use c.loadRegionByID(bo, r.GetID()) here because the new region (loaded by id) is not
		// guaranteed to contain the key. (ref: https://github.com/tikv/client-go/pull/1299)
		lr, err := c.loadRegion(bo, key, isEndKey)
		if err != nil {
			// ignore error and use old region info.
			logutil.Logger(bo.GetCtx()).Error("load region failure",
				zap.String("key", redact.Key(key)), zap.Error(err),
				zap.String("encode-key", redact.Key(c.codec.EncodeRegionKey(key))))
		} else {
			logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to need-reload", lr.GetID())
			reloadOnAccess := flags&needReloadOnAccess > 0
			r = lr
			c.mu.Lock()
			c.insertRegionToCache(r, reloadOnAccess, reloadOnAccess)
			c.mu.Unlock()
		}
	}
	return r, nil
}

func (c *RegionCache) tryFindRegionByKey(key []byte, isEndKey bool) (r *Region) {
	var expired bool
	r, expired = c.searchCachedRegionByKey(key, isEndKey)
	if r == nil || expired || r.checkSyncFlags(needReloadOnAccess) {
		return nil
	}
	return r
}

// OnSendFailForTiFlash handles the send-request-fail logic for TiFlash.
func (c *RegionCache) OnSendFailForTiFlash(bo *retry.Backoffer, store *Store, region RegionVerID, prev *metapb.Region, scheduleReload bool, err error, skipSwitchPeerLog bool) {
	r := c.GetCachedRegionWithRLock(region)
	if r == nil {
		return
	}

	rs := r.getStore()
	peersNum := len(r.GetMeta().Peers)
	if len(prev.Peers) != peersNum {
		logutil.Logger(bo.GetCtx()).Info("retry and refresh current region after send request fail and up/down stores length changed",
			zap.Stringer("region", &region),
			zap.Bool("needReload", scheduleReload),
			zap.Reflect("oldPeers", prev.Peers),
			zap.Reflect("newPeers", r.GetMeta().Peers),
			zap.Error(err))
		return
	}

	accessMode := tiFlashOnly
	accessIdx := rs.getAccessIndex(accessMode, store)
	if accessIdx == -1 {
		logutil.Logger(bo.GetCtx()).Warn("can not get access index for region " + region.String())
		return
	}
	if err != nil {
		storeIdx, s := rs.accessStore(accessMode, accessIdx)
		c.markRegionNeedBeRefill(s, storeIdx, rs)
	}

	// try next peer
	rs.switchNextFlashPeer(r, accessIdx)
	// In most scenarios, TiFlash will batch all the regions in one TiFlash store into one request, so when a send failure occurs,
	// this function is called repeatedly for all the regions. Since one TiFlash store might contain thousands of regions, we
	// need a way to avoid generating too many useless logs.
	if !skipSwitchPeerLog {
		logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
			zap.Stringer("region", &region),
			zap.Bool("needReload", scheduleReload),
			zap.Error(err))
	}

	// force reload region when retry all known peers in region.
	if scheduleReload {
		r.setSyncFlags(needReloadOnAccess)
	}
}

func (c *RegionCache) markRegionNeedBeRefill(s *Store, storeIdx int, rs *regionStore) int {
|
|
incEpochStoreIdx := -1
|
|
// invalidate regions in store.
|
|
epoch := rs.storeEpochs[storeIdx]
|
|
if atomic.CompareAndSwapUint32(&s.epoch, epoch, epoch+1) {
|
|
logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr))
|
|
incEpochStoreIdx = storeIdx
|
|
metrics.RegionCacheCounterWithInvalidateStoreRegionsOK.Inc()
|
|
}
|
|
// schedule a store addr resolve.
|
|
c.stores.markStoreNeedCheck(s)
|
|
return incEpochStoreIdx
|
|
}
|
|
|
|

// OnSendFail handles the send-request failure logic.
func (c *RegionCache) OnSendFail(bo *retry.Backoffer, ctx *RPCContext, scheduleReload bool, err error) {
	metrics.RegionCacheCounterWithSendFail.Inc()
	r := c.GetCachedRegionWithRLock(ctx.Region)
	if r == nil {
		return
	}
	peersNum := len(r.meta.Peers)
	if len(ctx.Meta.Peers) != peersNum {
		logutil.Logger(bo.GetCtx()).Info("retry and refresh current ctx after send request fail and up/down stores length changed",
			zap.Stringer("current", ctx),
			zap.Bool("needReload", scheduleReload),
			zap.Reflect("oldPeers", ctx.Meta.Peers),
			zap.Reflect("newPeers", r.meta.Peers),
			zap.Error(err))
		return
	}

	rs := r.getStore()

	if err != nil {
		storeIdx, s := rs.accessStore(ctx.AccessMode, ctx.AccessIdx)

		// invalidate regions in store.
		c.markRegionNeedBeRefill(s, storeIdx, rs)
	}

	// try the next peer to find the new leader.
	if ctx.AccessMode == tiKVOnly {
		rs.switchNextTiKVPeer(r, ctx.AccessIdx)
		logutil.Logger(bo.GetCtx()).Info("switch region peer to next due to send request fail",
			zap.Stringer("current", ctx),
			zap.Bool("needReload", scheduleReload),
			zap.Error(err))
	} else {
		rs.switchNextFlashPeer(r, ctx.AccessIdx)
		logutil.Logger(bo.GetCtx()).Info("switch region tiflash peer to next due to send request fail",
			zap.Stringer("current", ctx),
			zap.Bool("needReload", scheduleReload),
			zap.Error(err))
	}

	// force reload region when retry all known peers in region.
	if scheduleReload {
		r.setSyncFlags(needReloadOnAccess)
	}
}

// LocateRegionByID searches for the region with the given ID.
func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) {
	r, expired := c.searchCachedRegionByID(regionID)
	if r != nil && !expired {
		if flags := r.resetSyncFlags(needReloadOnAccess | needDelayedReloadReady); flags > 0 {
			reloadOnAccess := flags&needReloadOnAccess > 0
			observeLoadRegion("ByID", r, expired, flags)
			lr, err := c.loadRegionByID(bo, regionID)
			if err != nil {
				// ignore the error and use the old region info.
				logutil.Logger(bo.GetCtx()).Error("load region failure",
					zap.Uint64("regionID", regionID), zap.Error(err))
			} else {
				r = lr
				c.mu.Lock()
				c.insertRegionToCache(r, reloadOnAccess, reloadOnAccess)
				c.mu.Unlock()
			}
		}
		loc := &KeyLocation{
			Region:   r.VerID(),
			StartKey: r.StartKey(),
			EndKey:   r.EndKey(),
			Buckets:  r.getStore().buckets,
		}
		return loc, nil
	}

	observeLoadRegion("ByID", r, expired, 0)
	r, err := c.loadRegionByID(bo, regionID)
	if err != nil {
		return nil, err
	}

	c.mu.Lock()
	c.insertRegionToCache(r, true, true)
	c.mu.Unlock()
	return &KeyLocation{
		Region:   r.VerID(),
		StartKey: r.StartKey(),
		EndKey:   r.EndKey(),
		Buckets:  r.getStore().buckets,
	}, nil
}
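
// A minimal usage sketch for LocateRegionByID (illustrative only; it assumes a
// ready *RegionCache `cache`, a *retry.Backoffer `bo` and a region id, none of
// which are defined here):
//
//	loc, err := cache.LocateRegionByID(bo, regionID)
//	if err != nil {
//		return err
//	}
//	// EndKey is exclusive; an empty EndKey means the range extends to +inf.
//	fmt.Printf("region %d covers [%q, %q)\n", loc.Region.GetID(), loc.StartKey, loc.EndKey)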

// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
// filter is used to filter some unwanted keys.
func (c *RegionCache) GroupKeysByRegion(bo *retry.Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) {
	groups := make(map[RegionVerID][][]byte)
	var first RegionVerID
	var lastLoc *KeyLocation
	for i, k := range keys {
		if lastLoc == nil || !lastLoc.Contains(k) {
			var err error
			lastLoc, err = c.LocateKey(bo, k)
			if err != nil {
				return nil, first, err
			}
			if filter != nil && filter(k, lastLoc.StartKey) {
				continue
			}
		}
		id := lastLoc.Region
		if i == 0 {
			first = id
		}
		groups[id] = append(groups[id], k)
	}
	return groups, first, nil
}
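
// A hedged usage sketch for GroupKeysByRegion (illustrative only; `cache`, `bo`
// and `keys` are assumed to exist). The second return value identifies the
// first key's region, which callers typically commit ahead of the others:
//
//	groups, first, err := cache.GroupKeysByRegion(bo, keys, nil)
//	if err != nil {
//		return err
//	}
//	for verID, ks := range groups {
//		if verID == first {
//			// keys in the primary region, e.g. holding the primary lock key
//		}
//		_ = ks
//	}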

// ListRegionIDsInKeyRange lists the ids of regions in [start_key, end_key].
func (c *RegionCache) ListRegionIDsInKeyRange(bo *retry.Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error) {
	for {
		curRegion, err := c.LocateKey(bo, startKey)
		if err != nil {
			return nil, err
		}
		regionIDs = append(regionIDs, curRegion.Region.id)
		if curRegion.Contains(endKey) {
			break
		}
		startKey = curRegion.EndKey
	}
	return regionIDs, nil
}

// LoadRegionsInKeyRange lists regions in [start_key, end_key].
func (c *RegionCache) LoadRegionsInKeyRange(bo *retry.Backoffer, startKey, endKey []byte) (regions []*Region, err error) {
	var batchRegions []*Region
	for {
		batchRegions, err = c.BatchLoadRegionsWithKeyRange(bo, startKey, endKey, defaultRegionsPerBatch)
		if err != nil {
			return nil, err
		}
		if len(batchRegions) == 0 {
			// should never happen
			break
		}
		regions = append(regions, batchRegions...)
		endRegion := batchRegions[len(batchRegions)-1]
		if endRegion.ContainsByEnd(endKey) {
			break
		}
		startKey = endRegion.EndKey()
	}
	return
}
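
// A hedged sketch of the pagination contract above (illustrative only; `cache`
// and `bo` are assumed): each iteration asks PD for at most
// defaultRegionsPerBatch regions and advances startKey to the end key of the
// last region returned, so the loop terminates once some batch covers endKey.
//
//	regions, err := cache.LoadRegionsInKeyRange(bo, []byte("a"), []byte("z"))
//	if err != nil {
//		return err
//	}
//	// regions are in key order and jointly cover ["a", "z"].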

// BatchLoadRegionsWithKeyRange loads at most the given number of regions into the RegionCache,
// within the given key range from startKey to endKey. It returns the loaded regions.
func (c *RegionCache) BatchLoadRegionsWithKeyRange(bo *retry.Backoffer, startKey []byte, endKey []byte, count int) (regions []*Region, err error) {
	regions, err = c.scanRegions(bo, startKey, endKey, count)
	if err != nil {
		return
	}
	if len(regions) == 0 {
		err = errors.Errorf("PD returned no region, start_key: %q, end_key: %q, encode_start_key: %q, encode_end_key: %q",
			redact.Key(startKey), redact.Key(endKey),
			redact.Key(c.codec.EncodeRegionKey(startKey)), redact.Key(c.codec.EncodeRegionKey(endKey)))
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// TODO(youjiali1995): scanRegions always fetches regions from PD, and these regions don't carry buckets
	// information (to reduce traffic), so newly inserted regions in the region cache lack buckets information.
	// We should improve this.
	for _, region := range regions {
		c.insertRegionToCache(region, true, false)
	}

	return
}

// BatchLoadRegionsWithKeyRanges loads at most the given number of regions into the RegionCache,
// within the given key ranges. It returns the loaded regions.
func (c *RegionCache) BatchLoadRegionsWithKeyRanges(bo *retry.Backoffer, keyRanges []router.KeyRange, count int, opts ...BatchLocateKeyRangesOpt) (regions []*Region, err error) {
	if len(keyRanges) == 0 {
		return nil, nil
	}
	regions, err = c.batchScanRegions(bo, keyRanges, count, opts...)
	if err != nil {
		return
	}
	if len(regions) == 0 {
		err = errors.Errorf("PD returned no region, range num: %d, count: %d", len(keyRanges), count)
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	for _, region := range regions {
		c.insertRegionToCache(region, true, false)
	}
	return
}

// BatchLoadRegionsFromKey loads at most the given number of regions into the RegionCache, starting from the given
// startKey. It returns the endKey of the last loaded region. If some of the regions have no leader, their entries
// in the RegionCache will not be updated.
func (c *RegionCache) BatchLoadRegionsFromKey(bo *retry.Backoffer, startKey []byte, count int) ([]byte, error) {
	regions, err := c.BatchLoadRegionsWithKeyRange(bo, startKey, nil, count)
	if err != nil {
		return nil, err
	}
	return regions[len(regions)-1].EndKey(), nil
}

// InvalidateCachedRegion removes a cached Region.
func (c *RegionCache) InvalidateCachedRegion(id RegionVerID) {
	c.InvalidateCachedRegionWithReason(id, Other)
}

// InvalidateCachedRegionWithReason removes a cached Region with the reason why it's invalidated.
func (c *RegionCache) InvalidateCachedRegionWithReason(id RegionVerID, reason InvalidReason) {
	cachedRegion := c.GetCachedRegionWithRLock(id)
	if cachedRegion == nil {
		return
	}
	cachedRegion.invalidate(reason)
}

// UpdateLeader updates some region cache with newer leader info.
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leader *metapb.Peer, currentPeerIdx AccessIndex) {
	r := c.GetCachedRegionWithRLock(regionID)
	if r == nil {
		logutil.BgLogger().Debug("regionCache: cannot find region when updating leader",
			zap.Uint64("regionID", regionID.GetID()))
		return
	}

	if leader == nil {
		rs := r.getStore()
		rs.switchNextTiKVPeer(r, currentPeerIdx)
		logutil.BgLogger().Info("switch region peer to next due to NotLeader with NULL leader",
			zap.Int("currIdx", int(currentPeerIdx)),
			zap.Uint64("regionID", regionID.GetID()))
		return
	}

	if !r.switchWorkLeaderToPeer(leader) {
		logutil.BgLogger().Info("invalidate region cache due to cannot find peer when updating leader",
			zap.Uint64("regionID", regionID.GetID()),
			zap.Int("currIdx", int(currentPeerIdx)),
			zap.Uint64("leaderStoreID", leader.GetStoreId()))
		r.invalidate(StoreNotFound)
	} else {
		logutil.BgLogger().Info("switch region leader to specific leader due to kv return NotLeader",
			zap.Uint64("regionID", regionID.GetID()),
			zap.Int("currIdx", int(currentPeerIdx)),
			zap.Uint64("leaderStoreID", leader.GetStoreId()))
	}
}

// removeVersionFromCache removes a RegionVerID from the cache, trying to clean up
// both c.mu.regions and c.mu.versions. Note that this function is not thread-safe.
func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uint64) {
	delete(mu.regions, oldVer)
	if ver, ok := mu.latestVersions[regionID]; ok && ver.Equals(oldVer) {
		delete(mu.latestVersions, regionID)
	}
}

// insertRegionToCache tries to insert the Region into the cache.
// It should be protected by c.mu.l.Lock().
// If `invalidateOldRegion` is false, the old region cache should still be valid,
// and it may still be used by some kv requests.
// Moreover, it returns false if the region is stale.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool, shouldCount bool) bool {
	newVer := cachedRegion.VerID()
	oldVer, ok := mu.latestVersions[newVer.id]
	// There are two or more situations in which the region we got is stale.
	// The first case is that the process of getting a region is concurrent.
	// The stale region may be returned later due to network reasons.
	// The second case is that the region may be obtained from the PD follower,
	// and there is a synchronization delay between the PD follower and the leader.
	// So we should check the epoch.
	if ok && (oldVer.GetVer() > newVer.GetVer() || oldVer.GetConfVer() > newVer.GetConfVer()) {
		metrics.TiKVStaleRegionFromPDCounter.Inc()
		logutil.BgLogger().Debug("get stale region",
			zap.Uint64("region", newVer.GetID()), zap.Uint64("new-ver", newVer.GetVer()), zap.Uint64("new-conf", newVer.GetConfVer()),
			zap.Uint64("old-ver", oldVer.GetVer()), zap.Uint64("old-conf", oldVer.GetConfVer()))
		return false
	}
	// Also check and remove the intersecting regions, including the old region.
	intersectedRegions, stale := mu.sorted.removeIntersecting(cachedRegion, newVer)
	if stale {
		return false
	}
	// Insert the region (it won't replace anything because of the deletion above).
	mu.sorted.ReplaceOrInsert(cachedRegion)
	// Inherit the workTiKVIdx, workTiFlashIdx and buckets from the first intersected region.
	if len(intersectedRegions) > 0 {
		oldRegion := intersectedRegions[0].cachedRegion
		store := cachedRegion.getStore()
		oldRegionStore := oldRegion.getStore()
		// TODO(youjiali1995): remove this because the new retry logic can handle this issue.
		//
		// Joint consensus is enabled in v5.0, which makes it possible for a leader to step down as a learner during a conf change.
		// And if hibernate region is enabled, after the leader steps down, there can be a long time during which there is no leader
		// in the region and the leader info in PD is stale, until requests are sent to followers or the hibernate timeout fires.
		// To solve it, one solution is to always try a different peer if the invalid reason of the old cached region is no-leader.
		// There is a small probability that the current peer who reports no-leader becomes the leader and TiDB has to retry once in this case.
		if InvalidReason(atomic.LoadInt32((*int32)(&oldRegion.invalidReason))) == NoLeader {
			store.workTiKVIdx = (oldRegionStore.workTiKVIdx + 1) % AccessIndex(store.accessStoreNum(tiKVOnly))
		}
		// Don't refresh the TiFlash work idx for the region. Otherwise, it will always go to an invalid store which
		// is in the middle of transferring regions.
		store.workTiFlashIdx.Store(oldRegionStore.workTiFlashIdx.Load())

		// Keep the buckets information if needed.
		if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
			store.buckets = oldRegionStore.buckets
		}
	}
	// The intersecting regions in the cache are probably stale, clear them.
	for _, region := range intersectedRegions {
		mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
		// If the old region is still valid, do not invalidate it to avoid unnecessary backoff.
		if invalidateOldRegion {
			// Invalidate the old region in case it's not invalidated and some requests try with the stale region information.
			region.cachedRegion.invalidate(Other, !shouldCount)
		}
	}
	// update related vars.
	mu.regions[newVer] = cachedRegion
	mu.latestVersions[newVer.id] = newVer
	return true
}
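
// The staleness rule above can be read as: a descriptor is stale if either
// half of its epoch is older than the cached latest version. A minimal
// illustration of just that comparison (not part of the cache logic):
//
//	isStale := func(old, new RegionVerID) bool {
//		return old.GetVer() > new.GetVer() || old.GetConfVer() > new.GetConfVer()
//	}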

// searchCachedRegionByKey finds the region from the cache by key.
func (c *RegionCache) searchCachedRegionByKey(key []byte, isEndKey bool) (*Region, bool) {
	c.mu.RLock()
	region := c.mu.sorted.SearchByKey(key, isEndKey)
	c.mu.RUnlock()
	if region == nil {
		return nil, false
	}
	return region, !region.checkRegionCacheTTL(time.Now().Unix())
}

// searchCachedRegionByID finds the region from the cache by id.
func (c *RegionCache) searchCachedRegionByID(regionID uint64) (*Region, bool) {
	c.mu.RLock()
	ver, ok := c.mu.latestVersions[regionID]
	if !ok {
		c.mu.RUnlock()
		return nil, false
	}
	region, ok := c.mu.regions[ver]
	c.mu.RUnlock()
	if !ok {
		// should not happen
		logutil.BgLogger().Warn("region not found", zap.Uint64("id", regionID), zap.Stringer("ver", &ver))
		return nil, false
	}
	return region, !region.checkRegionCacheTTL(time.Now().Unix())
}

// GetStoresByType gets stores by type `typ`.
func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
	return c.stores.filter(nil, func(s *Store) bool {
		return s.getResolveState() == resolved && s.storeType == typ
	})
}

// GetAllStores gets TiKV and TiFlash stores.
func (c *RegionCache) GetAllStores() []*Store {
	return c.stores.filter(nil, func(s *Store) bool {
		return s.getResolveState() == resolved && (s.storeType == tikvrpc.TiKV || s.storeType == tikvrpc.TiFlash)
	})
}

var loadRegionCounters sync.Map

const (
	loadRegionReasonMissing        = "Missing"
	loadRegionReasonExpiredNormal  = "Expired:Normal"
	loadRegionReasonExpiredFrozen  = "Expired:Frozen"
	loadRegionReasonExpiredInvalid = "Expired:Invalid:"
	loadRegionReasonReloadOnAccess = "Reload:OnAccess"
	loadRegionReasonReloadDelayed  = "Reload:Delayed"
	loadRegionReasonUpdateBuckets  = "UpdateBuckets"
	loadRegionReasonUnknown        = "Unknown"
)

func observeLoadRegion(tag string, region *Region, expired bool, reloadFlags int32, explicitReason ...string) {
	reason := loadRegionReasonUnknown
	if len(explicitReason) > 0 {
		reason = strings.Join(explicitReason, ":")
	} else if region == nil {
		reason = loadRegionReasonMissing
	} else if expired {
		invalidReason := InvalidReason(atomic.LoadInt32((*int32)(&region.invalidReason)))
		if invalidReason != Ok {
			reason = loadRegionReasonExpiredInvalid + invalidReason.String()
		} else if region.checkSyncFlags(needExpireAfterTTL) {
			reason = loadRegionReasonExpiredFrozen
		} else {
			reason = loadRegionReasonExpiredNormal
		}
	} else if reloadFlags > 0 {
		if reloadFlags&needReloadOnAccess > 0 {
			reason = loadRegionReasonReloadOnAccess
		} else if reloadFlags&needDelayedReloadReady > 0 {
			reason = loadRegionReasonReloadDelayed
		}
	}
	type key struct {
		t string
		r string
	}
	counter, ok := loadRegionCounters.Load(key{tag, reason})
	if !ok {
		counter = metrics.TiKVLoadRegionCounter.WithLabelValues(tag, reason)
		loadRegionCounters.Store(key{tag, reason}, counter)
	}
	counter.(prometheus.Counter).Inc()
}

// loadRegion loads a region from the pd client, and picks the first peer as the leader.
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
// when processing in reverse order.
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...opt.GetRegionOption) (*Region, error) {
	ctx := bo.GetCtx()
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("loadRegion", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	var backoffErr error
	searchPrev := false
	opts = append(opts, opt.WithBuckets())
	for {
		if backoffErr != nil {
			err := bo.Backoff(retry.BoPDRPC, backoffErr)
			if err != nil {
				return nil, errors.WithStack(err)
			}
		}
		start := time.Now()
		var reg *router.Region
		var err error
		if searchPrev {
			reg, err = c.pdClient.GetPrevRegion(withPDCircuitBreaker(ctx), key, opts...)
		} else {
			reg, err = c.pdClient.GetRegion(withPDCircuitBreaker(ctx), key, opts...)
		}
		metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
		if err != nil {
			metrics.RegionCacheCounterWithGetCacheMissError.Inc()
		} else {
			metrics.RegionCacheCounterWithGetCacheMissOK.Inc()
		}
		if err != nil {
			if apicodec.IsDecodeError(err) {
				return nil, errors.Errorf("failed to decode region range key, key: %q, err: %v, encode_key: %q",
					redact.Key(key), err, redact.KeyBytes(c.codec.EncodeRegionKey(key)))
			}
			backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", redact.Key(key), err)
			continue
		}
		if reg == nil || reg.Meta == nil {
			backoffErr = errors.Errorf("region not found for key %q, encode_key: %q", redact.Key(key), redact.KeyBytes(c.codec.EncodeRegionKey(key)))
			continue
		}
		if len(reg.Meta.Peers) == 0 {
			return nil, errors.New("receive Region with no available peer")
		}
		if isEndKey && !searchPrev && bytes.Equal(reg.Meta.StartKey, key) && len(reg.Meta.StartKey) != 0 {
			searchPrev = true
			continue
		}
		return newRegion(bo, c, reg)
	}
}
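
// A hedged note on the isEndKey contract above: when a caller scans in reverse
// order, the key in hand is the *end* key of the region it wants, so if PD
// returns the region that starts exactly at `key`, loadRegion switches to
// GetPrevRegion to fetch the region on the left. Illustrative only:
//
//	// forward scan: region containing key        -> c.loadRegion(bo, key, false)
//	// reverse scan: region whose end key is key  -> c.loadRegion(bo, key, true)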

// loadRegionByID loads a region from the pd client, and picks the first peer as the leader.
func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Region, error) {
	ctx := bo.GetCtx()
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("loadRegionByID", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}
	var backoffErr error
	for {
		if backoffErr != nil {
			err := bo.Backoff(retry.BoPDRPC, backoffErr)
			if err != nil {
				return nil, errors.WithStack(err)
			}
		}
		start := time.Now()
		reg, err := c.pdClient.GetRegionByID(withPDCircuitBreaker(ctx), regionID, opt.WithBuckets())
		metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds())
		if err != nil {
			metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
		} else {
			metrics.RegionCacheCounterWithGetRegionByIDOK.Inc()
		}
		if err != nil {
			if apicodec.IsDecodeError(err) {
				return nil, errors.Errorf("failed to decode region range key, regionID: %q, err: %v", regionID, err)
			}
			backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err)
			continue
		}
		if reg == nil || reg.Meta == nil {
			return nil, errors.Errorf("region not found for regionID %d", regionID)
		}
		if len(reg.Meta.Peers) == 0 {
			return nil, errors.New("receive Region with no available peer")
		}
		return newRegion(bo, c, reg)
	}
}

// scanRegionsFromCache scans at most `limit` regions from the cache, for optimizing BatchLocateKeyRanges.
// It is the caller's responsibility to make sure that startKey is a node in the B-tree; otherwise, startKey
// will not be included in the returned regions.
func (c *RegionCache) scanRegionsFromCache(bo *retry.Backoffer, startKey, endKey []byte, limit int) ([]*Region, error) {
	if limit == 0 {
		return nil, nil
	}

	var regions []*Region
	c.mu.RLock()
	defer c.mu.RUnlock()
	regions = c.mu.sorted.AscendGreaterOrEqual(startKey, endKey, limit)

	return regions, nil
}

func (c *RegionCache) refreshRegionIndex(bo *retry.Backoffer) error {
	totalRegions := make([]*Region, 0)
	startKey := []byte{}
	for {
		regions, err := c.scanRegions(bo, startKey, nil, 10000)
		if err != nil {
			return err
		}
		totalRegions = append(totalRegions, regions...)
		if len(regions) == 0 || len(regions[len(regions)-1].meta.EndKey) == 0 {
			break
		}
		startKey = regions[len(regions)-1].meta.EndKey
	}
	c.mu.refresh(totalRegions)
	return nil
}

// scanRegions scans at most `limit` regions from PD in key order, starting from the region that contains `startKey`.
// Regions with no leader will not be returned.
func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, limit int) ([]*Region, error) {
	if limit == 0 {
		return nil, nil
	}
	ctx := bo.GetCtx()
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("scanRegions", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	var backoffErr error
	for {
		if backoffErr != nil {
			err := bo.Backoff(retry.BoPDRPC, backoffErr)
			if err != nil {
				return nil, errors.WithStack(err)
			}
		}
		start := time.Now()
		// TODO: ScanRegions has been deprecated in favor of BatchScanRegions.
		regionsInfo, err := c.pdClient.ScanRegions(withPDCircuitBreaker(ctx), startKey, endKey, limit, opt.WithAllowFollowerHandle())
		metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
		if err != nil {
			if apicodec.IsDecodeError(err) {
				return nil, errors.Errorf("failed to decode region range key, limit: %d, err: %v",
					limit, err)
			}
			metrics.RegionCacheCounterWithScanRegionsError.Inc()
			backoffErr = errors.Errorf(
				"scanRegion from PD failed, limit: %d, err: %v",
				limit,
				err)
			continue
		}

		metrics.RegionCacheCounterWithScanRegionsOK.Inc()

		if len(regionsInfo) == 0 {
			backoffErr = errors.Errorf("PD returned no region, limit: %d", limit)
			continue
		}
		if regionsHaveGapInRanges([]router.KeyRange{{StartKey: startKey, EndKey: endKey}}, regionsInfo, limit) {
			backoffErr = errors.Errorf("PD returned regions have gaps, limit: %d", limit)
			continue
		}
		validRegions, err := c.handleRegionInfos(bo, regionsInfo, true)
		if err != nil {
			return nil, err
		}
		// If the region information is loaded from the local disk and the current leader has not
		// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
		// Retry if there are no valid regions with leaders.
		if len(validRegions) == 0 {
			backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
			continue
		}
		return validRegions, nil
	}
}

// batchScanRegions scans at most `limit` regions covering the given key ranges from PD, in key order.
func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []router.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) {
	if limit == 0 || len(keyRanges) == 0 {
		return nil, nil
	}
	ctx := bo.GetCtx()
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("batchScanRegions", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}
	var batchOpt batchLocateKeyRangesOption
	for _, op := range opts {
		op(&batchOpt)
	}
	// TODO: return start key and end key after redact is introduced.
	var backoffErr error
	for {
		if backoffErr != nil {
			err := bo.Backoff(retry.BoPDRPC, backoffErr)
			if err != nil {
				return nil, errors.WithStack(err)
			}
		}
		start := time.Now()
		pdOpts := []opt.GetRegionOption{
			opt.WithAllowFollowerHandle(),
			opt.WithOutputMustContainAllKeyRange(),
		}
		if batchOpt.needBuckets {
			pdOpts = append(pdOpts, opt.WithBuckets())
		}
		regionsInfo, err := c.pdClient.BatchScanRegions(withPDCircuitBreaker(ctx), keyRanges, limit, pdOpts...)
		metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds())
		if err != nil {
			if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
				return c.batchScanRegionsFallback(bo, keyRanges, limit, opts...)
			}
			if apicodec.IsDecodeError(err) {
				return nil, errors.Errorf("failed to decode region range key, range num: %d, limit: %d, err: %v",
					len(keyRanges), limit, err)
			}
			metrics.RegionCacheCounterWithBatchScanRegionsError.Inc()
			backoffErr = errors.Errorf(
				"batchScanRegion from PD failed, range num: %d, limit: %d, err: %v",
				len(keyRanges),
				limit,
				err)
			continue
		}

		metrics.RegionCacheCounterWithBatchScanRegionsOK.Inc()
		if len(regionsInfo) == 0 {
			backoffErr = errors.Errorf(
				"PD returned no region, range num: %d, limit: %d",
				len(keyRanges), limit,
			)
			continue
		}
		if regionsHaveGapInRanges(keyRanges, regionsInfo, limit) {
			backoffErr = errors.Errorf(
				"PD returned regions have gaps, range num: %d, limit: %d",
				len(keyRanges), limit,
			)
			continue
		}
		validRegions, err := c.handleRegionInfos(bo, regionsInfo, batchOpt.needRegionHasLeaderPeer)
		if err != nil {
			return nil, err
		}
		// If the region information is loaded from the local disk and the current leader has not
		// yet reported a heartbeat to PD, the region information scanned at this time will not include the leader.
		// Retry if there are no valid regions with leaders.
		if len(validRegions) == 0 {
			backoffErr = errors.Errorf("All returned regions have no leaders, limit: %d", limit)
			continue
		}
		return validRegions, nil
	}
}

// regionsHaveGapInRanges checks whether the loaded regions fully cover the key ranges.
// If there is any gap between the regions, it returns true, and the request may then be retried.
// TODO: the PD client now supports gap detection. This function is currently retained as a double check;
// remove it after the validation completes.
func regionsHaveGapInRanges(ranges []router.KeyRange, regionsInfo []*router.Region, limit int) bool {
	if len(ranges) == 0 {
		return false
	}
	if len(regionsInfo) == 0 {
		return true
	}
	checkIdx := 0                  // checked index of ranges
	checkKey := ranges[0].StartKey // checked key of ranges
	for _, r := range regionsInfo {
		if r.Meta == nil {
			return true
		}
		if bytes.Compare(r.Meta.StartKey, checkKey) > 0 {
			// there is a gap between the returned region's start_key and the current check key
			return true
		}
		if len(r.Meta.EndKey) == 0 {
			// the current region contains all the rest of the ranges.
			return false
		}
		checkKey = r.Meta.EndKey
		for len(ranges[checkIdx].EndKey) > 0 && bytes.Compare(checkKey, ranges[checkIdx].EndKey) >= 0 {
			// the end_key of the returned region can cover multiple ranges.
			checkIdx++
			if checkIdx == len(ranges) {
				// all ranges are covered.
				return false
			}
		}
		if bytes.Compare(checkKey, ranges[checkIdx].StartKey) < 0 {
			// if check_key < start_key, move it forward to start_key.
			checkKey = ranges[checkIdx].StartKey
		}
	}
	if limit > 0 && len(regionsInfo) == limit {
		// regionsInfo is capped by limit, so some ranges may still be uncovered here.
		// The regions seen so far are contiguous, so only the remaining ranges need checking.
		return false
	}
	if checkIdx < len(ranges)-1 {
		// there are still some ranges not covered.
		return true
	}
	if len(checkKey) == 0 {
		return false
	} else if len(ranges[checkIdx].EndKey) == 0 {
		return true
	}
	return bytes.Compare(checkKey, ranges[checkIdx].EndKey) < 0
}
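
// A minimal illustration of the gap check (hypothetical values; only the
// fields the check reads are populated):
//
//	ranges := []router.KeyRange{{StartKey: []byte("a"), EndKey: []byte("z")}}
//	regions := []*router.Region{
//		{Meta: &metapb.Region{StartKey: []byte(""), EndKey: []byte("m")}},
//		{Meta: &metapb.Region{StartKey: []byte("n"), EndKey: []byte("")}},
//	}
//	// ("m", "n") is uncovered, so this reports true and the caller backs off.
//	gap := regionsHaveGapInRanges(ranges, regions, -1)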

func (c *RegionCache) batchScanRegionsFallback(bo *retry.Backoffer, keyRanges []router.KeyRange, limit int, opts ...BatchLocateKeyRangesOpt) ([]*Region, error) {
	logutil.BgLogger().Warn("batch scan regions fallback to scan regions", zap.Int("range-num", len(keyRanges)))
	res := make([]*Region, 0, len(keyRanges))
	var lastRegion *Region
	for _, keyRange := range keyRanges {
		if lastRegion != nil {
			endKey := lastRegion.EndKey()
			if len(endKey) == 0 {
				// an empty end_key means the last region is the last region of the store, which certainly contains all the rest of the ranges.
				break
			}
			if bytes.Compare(endKey, keyRange.EndKey) >= 0 {
				continue
			}
			if bytes.Compare(endKey, keyRange.StartKey) > 0 {
				keyRange.StartKey = endKey
			}
		}
		regions, err := c.scanRegions(bo, keyRange.StartKey, keyRange.EndKey, limit)
		if err != nil {
			return nil, err
		}
		if len(regions) > 0 {
			lastRegion = regions[len(regions)-1]
		}
		res = append(res, regions...)
		if len(regions) >= limit {
			return res, nil
		}
		limit -= len(regions)
	}
	return res, nil
}
func (c *RegionCache) handleRegionInfos(bo *retry.Backoffer, regionsInfo []*router.Region, needLeader bool) ([]*Region, error) {
|
|
regions := make([]*Region, 0, len(regionsInfo))
|
|
for _, r := range regionsInfo {
|
|
// Leader id = 0 indicates no leader.
|
|
if needLeader && (r.Leader == nil || r.Leader.GetId() == 0) {
|
|
continue
|
|
}
|
|
region, err := newRegion(bo, c, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
regions = append(regions, region)
|
|
}
|
|
if len(regions) == 0 {
|
|
return nil, nil
|
|
}
|
|
if len(regions) < len(regionsInfo) {
|
|
logutil.Logger(context.Background()).Debug(
|
|
"regionCache: scanRegion finished but some regions has no leader.")
|
|
}
|
|
return regions, nil
|
|
}
|
|
|
|
// GetCachedRegionWithRLock returns region with lock.
|
|
func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) {
|
|
c.mu.RLock()
|
|
r = c.mu.regions[regionID]
|
|
c.mu.RUnlock()
|
|
return
|
|
}
|
|
|
|
func (c *RegionCache) getStoreAddr(bo *retry.Backoffer, region *Region, store *Store) (addr string, err error) {
|
|
state := store.getResolveState()
|
|
switch state {
|
|
case resolved, needCheck:
|
|
addr = store.addr
|
|
return
|
|
case unresolved:
|
|
addr, err = store.initResolve(bo, c.stores)
|
|
return
|
|
case deleted:
|
|
addr = c.changeToActiveStore(region, store.storeID)
|
|
return
|
|
case tombstone:
|
|
return "", nil
|
|
default:
|
|
panic("unsupported resolve state")
|
|
}
|
|
}

func (c *RegionCache) getProxyStore(region *Region, store *Store, rs *regionStore, workStoreIdx AccessIndex) (proxyStore *Store, proxyAccessIdx AccessIndex, proxyStoreIdx int) {
	if !c.enableForwarding || store.storeType != tikvrpc.TiKV || store.getLivenessState() == reachable {
		return
	}

	if rs.proxyTiKVIdx >= 0 {
		storeIdx, proxyStore := rs.accessStore(tiKVOnly, rs.proxyTiKVIdx)
		return proxyStore, rs.proxyTiKVIdx, storeIdx
	}

	tikvNum := rs.accessStoreNum(tiKVOnly)
	if tikvNum <= 1 {
		return
	}

	// Randomly select a non-leader peer
	first := rand.Intn(tikvNum - 1)
	if first >= int(workStoreIdx) {
		first = (first + 1) % tikvNum
	}

	// If the currently selected peer is not reachable, switch to the next one, until a reachable peer is found or all
	// peers have been checked.
	for i := 0; i < tikvNum; i++ {
		index := (i + first) % tikvNum
		// Skip the work store, which is the actual store to be accessed
		if index == int(workStoreIdx) {
			continue
		}
		storeIdx, store := rs.accessStore(tiKVOnly, AccessIndex(index))
		// Skip unreachable stores.
		if store.getLivenessState() == unreachable {
			continue
		}

		rs.setProxyStoreIdx(region, AccessIndex(index))
		return store, AccessIndex(index), storeIdx
	}

	return nil, 0, 0
}

// changeToActiveStore replaces the deleted store in the region with an up-to-date store from the stores map.
// The order is guaranteed by reResolve(), which adds the new store before marking the old store deleted.
func (c *RegionCache) changeToActiveStore(region *Region, storeID uint64) (addr string) {
	store, _ := c.stores.get(storeID)
	for {
		oldRegionStore := region.getStore()
		newRegionStore := oldRegionStore.clone()
		newRegionStore.stores = make([]*Store, 0, len(oldRegionStore.stores))
		for _, s := range oldRegionStore.stores {
			if s.storeID == store.storeID {
				newRegionStore.stores = append(newRegionStore.stores, store)
			} else {
				newRegionStore.stores = append(newRegionStore.stores, s)
			}
		}
		if region.compareAndSwapStore(oldRegionStore, newRegionStore) {
			break
		}
	}
	addr = store.addr
	return
}

// OnBucketVersionNotMatch removes the old buckets meta if the version is stale.
func (c *RegionCache) OnBucketVersionNotMatch(ctx *RPCContext, version uint64, keys [][]byte) {
	r := c.GetCachedRegionWithRLock(ctx.Region)
	if r == nil {
		return
	}

	buckets := r.getStore().buckets
	if buckets == nil || buckets.GetVersion() < version {
		oldStore := r.getStore()
		store := oldStore.clone()
		store.buckets = &metapb.Buckets{
			Version:  version,
			Keys:     keys,
			RegionId: r.meta.GetId(),
		}
		r.compareAndSwapStore(oldStore, store)
	}
}

// OnRegionEpochNotMatch removes the old region and inserts new regions into the cache.
// It returns whether the request should be retried, because it's possible that the region epoch in the cache is
// ahead of TiKV's due to slow applying.
func (c *RegionCache) OnRegionEpochNotMatch(bo *retry.Backoffer, ctx *RPCContext, currentRegions []*metapb.Region) (bool, error) {
	if len(currentRegions) == 0 {
		c.InvalidateCachedRegionWithReason(ctx.Region, EpochNotMatch)
		return false, nil
	}

	// Find whether the region epoch in `ctx` is ahead of TiKV's. If so, backoff.
	for _, meta := range currentRegions {
		if meta.GetId() == ctx.Region.id &&
			(meta.GetRegionEpoch().GetConfVer() < ctx.Region.confVer ||
				meta.GetRegionEpoch().GetVersion() < ctx.Region.ver) {
			err := errors.Errorf("region epoch is ahead of tikv. rpc ctx: %+v, currentRegions: %+v", ctx, currentRegions)
			logutil.Logger(bo.GetCtx()).Info("region epoch is ahead of tikv", zap.Error(err))
			return true, bo.Backoff(retry.BoRegionMiss, err)
		}
	}

	var buckets *metapb.Buckets
	c.mu.Lock()
	cachedRegion, ok := c.mu.regions[ctx.Region]
	if ok {
		buckets = cachedRegion.getStore().buckets
	}
	c.mu.Unlock()

	needInvalidateOld := true
	newRegions := make([]*Region, 0, len(currentRegions))
	// If the region epoch is not ahead of TiKV's, replace the region meta in the region cache.
	for _, meta := range currentRegions {
		// TODO(youjiali1995): new regions inherit the old region's buckets now. Maybe we should make the
		// EpochNotMatch error carry buckets information. Can it bring much overhead?
		region, err := newRegion(bo, c, &router.Region{Meta: meta, Buckets: buckets})
		if err != nil {
			return false, err
		}
		var initLeaderStoreID uint64
		if ctx.Store.storeType == tikvrpc.TiFlash {
			initLeaderStoreID = region.findElectableStoreID()
		} else {
			initLeaderStoreID = ctx.Store.storeID
		}
		region.switchWorkLeaderToPeer(region.getPeerOnStore(initLeaderStoreID))
		newRegions = append(newRegions, region)
		if ctx.Region == region.VerID() {
			needInvalidateOld = false
		}
	}
	if needInvalidateOld && cachedRegion != nil {
		cachedRegion.invalidate(EpochNotMatch)
	}

	c.mu.Lock()
	for _, region := range newRegions {
		c.insertRegionToCache(region, true, true)
	}
	c.mu.Unlock()

	return false, nil
}

// PDClient returns the pd.Client in RegionCache.
func (c *RegionCache) PDClient() pd.Client {
	return c.pdClient
}

// GetTiFlashStores returns the information of all tiflash nodes. Like `GetAllStores`, the method only returns resolved
// stores so that users won't be bothered by tombstones. (related issue: https://github.com/pingcap/tidb/issues/46602)
func (c *RegionCache) GetTiFlashStores(labelFilter LabelFilter) []*Store {
	return c.stores.filter(nil, func(s *Store) bool {
		return s.storeType == tikvrpc.TiFlash && labelFilter(s.labels) && s.getResolveState() == resolved
	})
}

// GetTiFlashComputeStores returns all stores with the label <engine, tiflash_compute>.
func (c *RegionCache) GetTiFlashComputeStores(bo *retry.Backoffer) (res []*Store, err error) {
	stores, needReload := c.stores.listTiflashComputeStores()

	if needReload {
		stores, err = reloadTiFlashComputeStores(bo.GetCtx(), c.stores)
		if err == nil {
			c.stores.setTiflashComputeStores(stores)
		}
		return stores, err
	}
	return stores, nil
}

func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (res []*Store, _ error) {
	stores, err := registry.fetchAllStores(ctx)
	if err != nil {
		return nil, err
	}
	for _, s := range stores {
		if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
			res = append(res, newStore(
				s.GetId(),
				s.GetAddress(),
				s.GetPeerAddress(),
				s.GetStatusAddress(),
				tikvrpc.GetStoreTypeByMeta(s),
				resolved,
				s.GetLabels(),
			))
		}
	}
	return res, nil
}

// InvalidateTiFlashComputeStoresIfGRPCError invalidates the cache if the error is a gRPC error.
// For now, only the gRPC Unavailable error is considered.
func (c *RegionCache) InvalidateTiFlashComputeStoresIfGRPCError(err error) bool {
	var invalidate bool
	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.Unavailable:
			invalidate = true
		default:
		}
	}
	if !invalidate {
		return false
	}

	c.InvalidateTiFlashComputeStores()
	return true
}

// InvalidateTiFlashComputeStores sets needReload to true,
// so the tiflash_compute store cache will be refreshed on the next access.
func (c *RegionCache) InvalidateTiFlashComputeStores() {
	c.stores.markTiflashComputeStoresNeedReload()
}

// UpdateBucketsIfNeeded queries PD to update the buckets of the region in the cache if
// the latestBucketsVer is newer than the cached one.
func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) {
	r := c.GetCachedRegionWithRLock(regionID)
	if r == nil {
		return
	}

	buckets := r.getStore().buckets
	var bucketsVer uint64
	if buckets != nil {
		bucketsVer = buckets.GetVersion()
	}
	if bucketsVer < latestBucketsVer {
		// TODO(youjiali1995): use singleflight.
		go func() {
			bo := retry.NewBackoffer(context.Background(), 20000)
			observeLoadRegion("ByID", r, false, 0, loadRegionReasonUpdateBuckets)
			new, err := c.loadRegionByID(bo, regionID.id)
			if err != nil {
				logutil.Logger(bo.GetCtx()).Error("failed to update buckets",
					zap.String("region", regionID.String()), zap.Uint64("bucketsVer", bucketsVer),
					zap.Uint64("latestBucketsVer", latestBucketsVer), zap.Error(err))
				return
			}
			c.mu.Lock()
			c.insertRegionToCache(new, true, true)
			c.mu.Unlock()
		}()
	}
}

const cleanCacheInterval = time.Second
const cleanRegionNumPerRound = 50
const refreshStoreListInterval = 10 * time.Second

// gcScanItemHook is only used for testing
var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])

// gcRoundFunc returns a function that is expected to run in a background
// goroutine. It keeps iterating over the whole region cache, searching for
// stale region info. It runs every cleanCacheInterval and checks only
// cleanRegionNumPerRound regions per round. In this way, the impact of this
// background goroutine should be negligible.
func (c *RegionCache) gcRoundFunc(limit int) func(context.Context, time.Time) bool {
	if limit < 1 {
		limit = 1
	}
	beginning := newBtreeSearchItem([]byte(""))
	cursor := beginning
	expiredItems := make([]*btreeItem, limit)
	needCheckRegions := make([]*Region, limit)

	return func(_ context.Context, t time.Time) bool {
		expiredItems = expiredItems[:0]
		needCheckRegions = needCheckRegions[:0]
		hasMore, count, ts := false, 0, t.Unix()
		onScanItem := gcScanItemHook.Load()

		// Only RLock when checking TTL to avoid blocking other readers
		c.mu.RLock()
		c.mu.sorted.b.AscendGreaterOrEqual(cursor, func(item *btreeItem) bool {
			count++
			if count > limit {
				cursor = item
				hasMore = true
				return false
			}
			if onScanItem != nil {
				(*onScanItem)(item)
			}
			if item.cachedRegion.isCacheTTLExpired(ts) {
				expiredItems = append(expiredItems, item)
			} else {
				needCheckRegions = append(needCheckRegions, item.cachedRegion)
			}
			return true
		})
		c.mu.RUnlock()

		// Reached the end of the region cache; start from the beginning
		if !hasMore {
			cursor = beginning
		}

		// Clean expired regions
		if len(expiredItems) > 0 {
			c.mu.Lock()
			for _, item := range expiredItems {
				c.mu.sorted.b.Delete(item)
				c.mu.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
			}
			c.mu.Unlock()
		}

		// Check the remaining regions and update their sync flags
		for _, region := range needCheckRegions {
			syncFlags := region.getSyncFlags()
			if syncFlags&needDelayedReloadReady > 0 {
				// the region will be reloaded soon on access
				continue
			}
			if syncFlags&needDelayedReloadPending > 0 {
				region.setSyncFlags(needDelayedReloadReady)
				// the region will be reloaded soon on access, no need to check whether it needs to expire
				continue
			}
			if syncFlags&needExpireAfterTTL == 0 {
				regionStore := region.getStore()
				for i, store := range regionStore.stores {
					// if the region has a stale or unreachable store, let it expire after TTL.
					if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
						region.setSyncFlags(needExpireAfterTTL)
						break
					}
				}
			}
		}
		return false
	}
}
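
// A hedged sketch of how the returned closure is expected to be driven (`ctx`
// is assumed; the real wiring lives in the cache's background worker):
//
//	round := c.gcRoundFunc(cleanRegionNumPerRound)
//	ticker := time.NewTicker(cleanCacheInterval)
//	defer ticker.Stop()
//	for t := range ticker.C {
//		if stop := round(ctx, t); stop {
//			return
//		}
//	}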

// btreeItem is the BTree's Item that uses []byte for comparison.
type btreeItem struct {
	key          []byte
	cachedRegion *Region
}

func newBtreeItem(cr *Region) *btreeItem {
	return &btreeItem{
		key:          cr.StartKey(),
		cachedRegion: cr,
	}
}

func newBtreeSearchItem(key []byte) *btreeItem {
	return &btreeItem{
		key: key,
	}
}

func (item *btreeItem) Less(other btree.Item) bool {
	return bytes.Compare(item.key, other.(*btreeItem).key) < 0
}

// GetID returns the region's id.
func (r *Region) GetID() uint64 {
	return r.meta.GetId()
}

// GetMeta returns a copy of the region meta.
func (r *Region) GetMeta() *metapb.Region {
	return proto.Clone(r.meta).(*metapb.Region)
}

// GetLeaderPeerID returns the leader peer ID.
func (r *Region) GetLeaderPeerID() uint64 {
	store := r.getStore()
	if int(store.workTiKVIdx) >= store.accessStoreNum(tiKVOnly) {
		return 0
	}
	storeIdx, _ := store.accessStore(tiKVOnly, store.workTiKVIdx)
	return r.meta.Peers[storeIdx].Id
}

// GetLeaderStoreID returns the store ID of the leader peer.
func (r *Region) GetLeaderStoreID() uint64 {
	store := r.getStore()
	if int(store.workTiKVIdx) >= store.accessStoreNum(tiKVOnly) {
		return 0
	}
	storeIdx, _ := store.accessStore(tiKVOnly, store.workTiKVIdx)
	return r.meta.Peers[storeIdx].StoreId
}

func (r *Region) getKvStorePeer(rs *regionStore, aidx AccessIndex) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
	storeIdx, store = rs.accessStore(tiKVOnly, aidx)
	peer = r.meta.Peers[storeIdx]
	accessIdx = aidx
	return
}

// WorkStorePeer returns the current work store with its work peer.
func (r *Region) WorkStorePeer(rs *regionStore) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
	return r.getKvStorePeer(rs, rs.workTiKVIdx)
}

// FollowerStorePeer returns a follower store with its follower peer.
func (r *Region) FollowerStorePeer(rs *regionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
	return r.getKvStorePeer(rs, rs.follower(followerStoreSeed, op))
}

// AnyStorePeer returns a leader or follower store with the associated peer.
func (r *Region) AnyStorePeer(rs *regionStore, followerStoreSeed uint32, op *storeSelectorOp) (store *Store, peer *metapb.Peer, accessIdx AccessIndex, storeIdx int) {
	return r.getKvStorePeer(rs, rs.kvPeer(followerStoreSeed, op))
}

// RegionVerID is a unique ID that can identify a Region at a specific version.
type RegionVerID struct {
	id      uint64
	confVer uint64
	ver     uint64
}

// NewRegionVerID creates a region ver id, which is used for invalidating regions.
func NewRegionVerID(id, confVer, ver uint64) RegionVerID {
	return RegionVerID{id, confVer, ver}
}

// GetID returns the id of the region
func (r *RegionVerID) GetID() uint64 {
	return r.id
}

// GetVer returns the version of the region's epoch
func (r *RegionVerID) GetVer() uint64 {
	return r.ver
}

// GetConfVer returns the conf ver of the region's epoch
func (r *RegionVerID) GetConfVer() uint64 {
	return r.confVer
}

// String formats the RegionVerID to string
func (r *RegionVerID) String() string {
	return fmt.Sprintf("{ region id: %v, ver: %v, confVer: %v }", r.id, r.ver, r.confVer)
}

// Equals checks whether the RegionVerID equals another one
func (r *RegionVerID) Equals(another RegionVerID) bool {
	return r.id == another.id && r.confVer == another.confVer && r.ver == another.ver
}
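
// A minimal illustration of RegionVerID equality (hypothetical values):
//
//	a := NewRegionVerID(1, 2, 3)
//	b := NewRegionVerID(1, 2, 3)
//	c := NewRegionVerID(1, 2, 4) // same region, newer epoch version
//	_ = a.Equals(b)              // true
//	_ = a.Equals(c)              // false: id, confVer and ver must all match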

// VerID returns the Region's RegionVerID.
func (r *Region) VerID() RegionVerID {
	return RegionVerID{
		id:      r.meta.GetId(),
		confVer: r.meta.GetRegionEpoch().GetConfVer(),
		ver:     r.meta.GetRegionEpoch().GetVersion(),
	}
}

// StartKey returns StartKey.
func (r *Region) StartKey() []byte {
	return r.meta.StartKey
}

// EndKey returns EndKey.
func (r *Region) EndKey() []byte {
	return r.meta.EndKey
}

func (r *Region) getPeerStoreIndex(peer *metapb.Peer) (idx int, found bool) {
	if len(r.meta.Peers) == 0 || peer == nil {
		return
	}
	for i, p := range r.meta.Peers {
		if isSamePeer(p, peer) {
			idx = i
			found = true
			return
		}
	}
	return
}

// switchWorkLeaderToPeer switches the current leader to the peer on the specified store. It returns
// false if the region has no peer matching the given one.
func (r *Region) switchWorkLeaderToPeer(peer *metapb.Peer) (found bool) {
	globalStoreIdx, found := r.getPeerStoreIndex(peer)
	if !found {
		return
	}
retry:
	// switch to the new leader.
	oldRegionStore := r.getStore()
	var leaderIdx AccessIndex
	for i, gIdx := range oldRegionStore.accessIndex[tiKVOnly] {
		if gIdx == globalStoreIdx {
			leaderIdx = AccessIndex(i)
		}
	}
	if oldRegionStore.workTiKVIdx == leaderIdx {
		return
	}
	newRegionStore := oldRegionStore.clone()
	newRegionStore.workTiKVIdx = leaderIdx
	newRegionStore.storeEpochs[leaderIdx] = atomic.LoadUint32(&newRegionStore.stores[leaderIdx].epoch)
	if !r.compareAndSwapStore(oldRegionStore, newRegionStore) {
		goto retry
	}
	return
}
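
// The retry loop above is the cache's copy-on-write pattern: read the current
// regionStore, mutate a clone, and publish it with compare-and-swap, starting
// over if a concurrent writer won the race. Stripped to its skeleton:
//
//	for {
//		oldStore := r.getStore()
//		newStore := oldStore.clone()
//		// ... mutate newStore ...
//		if r.compareAndSwapStore(oldStore, newStore) {
//			break
//		}
//	}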

func (r *regionStore) switchNextFlashPeer(rr *Region, currentPeerIdx AccessIndex) {
	nextIdx := (currentPeerIdx + 1) % AccessIndex(r.accessStoreNum(tiFlashOnly))
	newRegionStore := r.clone()
	newRegionStore.workTiFlashIdx.Store(int32(nextIdx))
	rr.compareAndSwapStore(r, newRegionStore)
}

func (r *regionStore) switchNextTiKVPeer(rr *Region, currentPeerIdx AccessIndex) {
	if r.workTiKVIdx != currentPeerIdx {
		return
	}
	nextIdx := (currentPeerIdx + 1) % AccessIndex(r.accessStoreNum(tiKVOnly))
	newRegionStore := r.clone()
	newRegionStore.workTiKVIdx = nextIdx
	rr.compareAndSwapStore(r, newRegionStore)
}
func (r *regionStore) setProxyStoreIdx(rr *Region, idx AccessIndex) {
|
|
if r.proxyTiKVIdx == idx {
|
|
return
|
|
}
|
|
|
|
newRegionStore := r.clone()
|
|
newRegionStore.proxyTiKVIdx = idx
|
|
success := rr.compareAndSwapStore(r, newRegionStore)
|
|
logutil.BgLogger().Debug("try set proxy store index",
|
|
zap.Uint64("region", rr.GetID()),
|
|
zap.Int("index", int(idx)),
|
|
zap.Bool("success", success))
|
|
}
|
|
|
|
func (r *regionStore) unsetProxyStoreIfNeeded(rr *Region) {
|
|
r.setProxyStoreIdx(rr, -1)
|
|
}
|
|
|
|

func (r *Region) findElectableStoreID() uint64 {
	if len(r.meta.Peers) == 0 {
		return 0
	}
	for _, p := range r.meta.Peers {
		if p.Role != metapb.PeerRole_Learner {
			return p.StoreId
		}
	}
	return 0
}

func (r *Region) getPeerOnStore(storeID uint64) *metapb.Peer {
	for _, p := range r.meta.Peers {
		if p.StoreId == storeID {
			return p
		}
	}
	return nil
}

// Contains checks whether the key is in the region; for the maximum region the endKey is empty.
// startKey <= key < endKey.
func (r *Region) Contains(key []byte) bool {
	return contains(r.meta.GetStartKey(), r.meta.GetEndKey(), key)
}

// ContainsByEnd checks whether the region contains the greatest key that is less than `key`;
// for the maximum region the endKey is empty.
// startKey < key <= endKey.
func (r *Region) ContainsByEnd(key []byte) bool {
	// Only a region whose right bound expands to infinity contains the point at infinity.
	if len(key) == 0 {
		return len(r.EndKey()) == 0
	}
	return bytes.Compare(r.meta.GetStartKey(), key) < 0 &&
		(bytes.Compare(key, r.meta.GetEndKey()) <= 0 || len(r.meta.GetEndKey()) == 0)
}
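
// The two predicates differ only in which bound is inclusive. For a
// hypothetical region covering ["b", "d"):
//
//	// Contains:      "b" <= key < "d"  -> Contains("b") == true,  Contains("d") == false
//	// ContainsByEnd: "b" <  key <= "d" -> ContainsByEnd("b") == false, ContainsByEnd("d") == true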

// checkAndUpdateStoreHealthStatus checks and updates health stats on each store.
func (c *RegionCache) checkAndUpdateStoreHealthStatus(ctx context.Context, now time.Time) bool {
	defer func() {
		r := recover()
		if r != nil {
			logutil.BgLogger().Error("panic in the checkAndUpdateStoreHealthStatus goroutine",
				zap.Any("r", r),
				zap.Stack("stack trace"))
			if _, err := util.EvalFailpoint("doNotRecoverStoreHealthCheckPanic"); err == nil {
				panic(r)
			}
		}
	}()
	var stores []*Store
	c.stores.forEach(func(store *Store) {
		stores = append(stores, store)
	})
	for _, store := range stores {
		store.healthStatus.tick(ctx, now, store, c.requestHealthFeedbackCallback)
		healthDetails := store.healthStatus.GetHealthStatusDetail()
		metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.ClientSideSlowScore))
		metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(healthDetails.TiKVSideSlowScore))
	}

	return false
}

// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
func (c *RegionCache) reportStoreReplicaFlows() {
	c.stores.forEach(func(store *Store) {
		for destType := toLeader; destType < numReplicaFlowsType; destType++ {
			metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), strconv.FormatUint(store.storeID, 10)).Set(float64(store.getReplicaFlowsStats(destType)))
			store.resetReplicaFlowsStats(destType)
		}
	})
}

func isSamePeer(lhs *metapb.Peer, rhs *metapb.Peer) bool {
	return lhs == rhs || (lhs.GetId() == rhs.GetId() && lhs.GetStoreId() == rhs.GetStoreId())
}

// contains returns true if startKey <= key < endKey. An empty endKey is the maximum key.
func contains(startKey, endKey, key []byte) bool {
	return bytes.Compare(startKey, key) <= 0 &&
		(bytes.Compare(key, endKey) < 0 || len(endKey) == 0)
}

func (c *RegionCache) onHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
	store, ok := c.stores.get(feedback.GetStoreId())
	if !ok {
		logutil.BgLogger().Info("dropped health feedback info due to unknown store id", zap.Uint64("storeID", feedback.GetStoreId()))
		return
	}
	store.recordHealthFeedback(feedback)
}

// GetClientEventListener returns the listener that observes the RPC client's events and lets the region cache respond
// to them. When creating the `KVStore` using the `tikv.NewKVStore` function, the listener will be set up immediately.
func (c *RegionCache) GetClientEventListener() client.ClientEventListener {
	return &regionCacheClientEventListener{c: c}
}

// regionCacheClientEventListener is the listener that lets the RegionCache respond to events in the RPC client.
type regionCacheClientEventListener struct {
	c *RegionCache
}

// OnHealthFeedback implements the `client.ClientEventListener` interface.
func (l *regionCacheClientEventListener) OnHealthFeedback(feedback *kvrpcpb.HealthFeedback) {
	l.c.onHealthFeedback(feedback)
}