mirror of https://github.com/tikv/client-go.git
MinSafeTS support TiFlash (#642)
Signed-off-by: hehechen <awd123456sss@gmail.com>
This commit is contained in:
parent
a4f5c00b46
commit
fe3536dd59
|
|
@ -491,7 +491,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
|
|||
}
|
||||
|
||||
// SetRegionCacheStore is used to set a store in region cache, for testing only
|
||||
func (c *RegionCache) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
|
||||
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
|
||||
c.storeMu.Lock()
|
||||
defer c.storeMu.Unlock()
|
||||
c.storeMu.stores[id] = &Store{
|
||||
|
|
@ -499,6 +499,8 @@ func (c *RegionCache) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointT
|
|||
storeType: storeType,
|
||||
state: state,
|
||||
labels: labels,
|
||||
addr: addr,
|
||||
peerAddr: peerAddr,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1381,15 +1383,24 @@ func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
|
|||
})
|
||||
}
|
||||
stores = append(stores, &Store{
|
||||
addr: store.addr,
|
||||
storeID: store.storeID,
|
||||
labels: storeLabel,
|
||||
addr: store.addr,
|
||||
peerAddr: store.peerAddr,
|
||||
storeID: store.storeID,
|
||||
labels: storeLabel,
|
||||
storeType: typ,
|
||||
})
|
||||
}
|
||||
}
|
||||
return stores
|
||||
}
|
||||
|
||||
// GetAllStores gets TiKV and TiFlash stores.
|
||||
func (c *RegionCache) GetAllStores() []*Store {
|
||||
stores := c.GetStoresByType(tikvrpc.TiKV)
|
||||
tiflashStores := c.GetStoresByType(tikvrpc.TiFlash)
|
||||
return append(stores, tiflashStores...)
|
||||
}
|
||||
|
||||
func filterUnavailablePeers(region *pd.Region) {
|
||||
if len(region.DownPeers) == 0 {
|
||||
return
|
||||
|
|
@ -1830,6 +1841,7 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St
|
|||
res = append(res, &Store{
|
||||
storeID: s.GetId(),
|
||||
addr: s.GetAddress(),
|
||||
peerAddr: s.GetPeerAddress(),
|
||||
saddr: s.GetStatusAddress(),
|
||||
storeType: tikvrpc.GetStoreTypeByMeta(s),
|
||||
labels: s.GetLabels(),
|
||||
|
|
@ -2148,6 +2160,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
|
|||
// Store contains a kv process's address.
|
||||
type Store struct {
|
||||
addr string // loaded store address
|
||||
peerAddr string // TiFlash Proxy use peerAddr
|
||||
saddr string // loaded store status address
|
||||
storeID uint64 // store's id
|
||||
state uint64 // unsafe store storeState
|
||||
|
|
@ -2235,6 +2248,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e
|
|||
return "", errors.Errorf("empty store(%d) address", s.storeID)
|
||||
}
|
||||
s.addr = addr
|
||||
s.peerAddr = store.GetPeerAddress()
|
||||
s.saddr = store.GetStatusAddress()
|
||||
s.storeType = tikvrpc.GetStoreTypeByMeta(store)
|
||||
s.labels = store.GetLabels()
|
||||
|
|
@ -2281,7 +2295,7 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
|
|||
storeType := tikvrpc.GetStoreTypeByMeta(store)
|
||||
addr = store.GetAddress()
|
||||
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
|
||||
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
|
||||
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
|
||||
c.storeMu.Lock()
|
||||
c.storeMu.stores[newStore.storeID] = newStore
|
||||
c.storeMu.Unlock()
|
||||
|
|
@ -2503,6 +2517,11 @@ func (s *Store) GetAddr() string {
|
|||
return s.addr
|
||||
}
|
||||
|
||||
// GetPeerAddr returns the peer address of the store
|
||||
func (s *Store) GetPeerAddr() string {
|
||||
return s.peerAddr
|
||||
}
|
||||
|
||||
func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
|
|
|
|||
|
|
@ -53,14 +53,14 @@ var _ cluster.Cluster = &Cluster{}
|
|||
|
||||
// Cluster simulates a TiKV cluster. It focuses on management and the change of
|
||||
// meta data. A Cluster mainly includes following 3 kinds of meta data:
|
||||
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
|
||||
// The data of a Region is duplicated to multiple Peers and distributed in
|
||||
// multiple Stores.
|
||||
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
|
||||
// a group, each group elects a Leader to provide services.
|
||||
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
|
||||
// process. Only the store with request's Region's leader Peer could respond
|
||||
// to client's request.
|
||||
// 1. Region: A Region is a fragment of TiKV's data whose range is [start, end).
|
||||
// The data of a Region is duplicated to multiple Peers and distributed in
|
||||
// multiple Stores.
|
||||
// 2. Peer: A Peer is a replica of a Region's data. All peers of a Region form
|
||||
// a group, each group elects a Leader to provide services.
|
||||
// 3. Store: A Store is a storage/service node. Try to think it as a TiKV server
|
||||
// process. Only the store with request's Region's leader Peer could respond
|
||||
// to client's request.
|
||||
type Cluster struct {
|
||||
sync.RWMutex
|
||||
id uint64
|
||||
|
|
@ -224,7 +224,7 @@ func (c *Cluster) AddStore(storeID uint64, addr string, labels ...*metapb.StoreL
|
|||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
c.stores[storeID] = newStore(storeID, addr, labels...)
|
||||
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
|
||||
}
|
||||
|
||||
// RemoveStore removes a Store from the cluster.
|
||||
|
|
@ -248,7 +248,15 @@ func (c *Cluster) MarkTombstone(storeID uint64) {
|
|||
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.stores[storeID] = newStore(storeID, addr, labels...)
|
||||
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
|
||||
}
|
||||
|
||||
// UpdateStorePeerAddr updates store peer address for cluster.
|
||||
func (c *Cluster) UpdateStorePeerAddr(storeID uint64, peerAddr string, labels ...*metapb.StoreLabel) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
addr := c.stores[storeID].meta.Address
|
||||
c.stores[storeID] = newStore(storeID, addr, peerAddr, labels...)
|
||||
}
|
||||
|
||||
// GetRegion returns a Region's meta and leader ID.
|
||||
|
|
@ -691,12 +699,13 @@ type Store struct {
|
|||
cancel bool // return context.Cancelled error when cancel is true.
|
||||
}
|
||||
|
||||
func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
|
||||
func newStore(storeID uint64, addr string, peerAddr string, labels ...*metapb.StoreLabel) *Store {
|
||||
return &Store{
|
||||
meta: &metapb.Store{
|
||||
Id: storeID,
|
||||
Address: addr,
|
||||
Labels: labels,
|
||||
Id: storeID,
|
||||
Address: addr,
|
||||
PeerAddress: peerAddr,
|
||||
Labels: labels,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -521,13 +521,16 @@ func (s *KVStore) safeTSUpdater() {
|
|||
}
|
||||
|
||||
func (s *KVStore) updateSafeTS(ctx context.Context) {
|
||||
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
|
||||
stores := s.regionCache.GetAllStores()
|
||||
tikvClient := s.GetTiKVClient()
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(stores))
|
||||
for _, store := range stores {
|
||||
storeID := store.StoreID()
|
||||
storeAddr := store.GetAddr()
|
||||
if store.IsTiFlash() {
|
||||
storeAddr = store.GetPeerAddr()
|
||||
}
|
||||
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
|
||||
defer wg.Done()
|
||||
resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{
|
||||
|
|
|
|||
|
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2022 TiKV Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tikv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"github.com/tikv/client-go/v2/testutils"
|
||||
"github.com/tikv/client-go/v2/tikvrpc"
|
||||
)
|
||||
|
||||
func TestKV(t *testing.T) {
|
||||
suite.Run(t, new(testKVSuite))
|
||||
}
|
||||
|
||||
type testKVSuite struct {
|
||||
suite.Suite
|
||||
store *KVStore
|
||||
cluster *mocktikv.Cluster
|
||||
tikvStoreID uint64
|
||||
tiflashStoreID uint64
|
||||
tiflashPeerStoreID uint64
|
||||
}
|
||||
|
||||
func (s *testKVSuite) SetupTest() {
|
||||
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
|
||||
s.Require().Nil(err)
|
||||
testutils.BootstrapWithSingleStore(cluster)
|
||||
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
|
||||
s.Require().Nil(err)
|
||||
|
||||
s.store = store
|
||||
s.cluster = cluster
|
||||
|
||||
storeIDs, _, _, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
|
||||
s.tikvStoreID = storeIDs[0]
|
||||
s.tiflashStoreID = storeIDs[1]
|
||||
tiflashPeerAddrID := cluster.AllocIDs(1)
|
||||
s.tiflashPeerStoreID = tiflashPeerAddrID[0]
|
||||
|
||||
s.cluster.UpdateStorePeerAddr(s.tiflashStoreID, s.storeAddr(s.tiflashPeerStoreID), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
|
||||
s.store.regionCache.SetRegionCacheStore(s.tikvStoreID, s.storeAddr(s.tikvStoreID), s.storeAddr(s.tikvStoreID), tikvrpc.TiKV, 1, nil)
|
||||
var labels []*metapb.StoreLabel
|
||||
labels = append(labels, &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
|
||||
s.store.regionCache.SetRegionCacheStore(s.tiflashStoreID, s.storeAddr(s.tiflashStoreID), s.storeAddr(s.tiflashPeerStoreID), tikvrpc.TiFlash, 1, labels)
|
||||
|
||||
}
|
||||
|
||||
func (s *testKVSuite) TearDownTest() {
|
||||
s.Require().Nil(s.store.Close())
|
||||
}
|
||||
|
||||
func (s *testKVSuite) storeAddr(id uint64) string {
|
||||
return fmt.Sprintf("store%d", id)
|
||||
}
|
||||
|
||||
type storeSafeTsMockClient struct {
|
||||
Client
|
||||
requestCount int32
|
||||
testSuite *testKVSuite
|
||||
}
|
||||
|
||||
func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
|
||||
if req.Type != tikvrpc.CmdStoreSafeTS {
|
||||
return c.Client.SendRequest(ctx, addr, req, timeout)
|
||||
}
|
||||
atomic.AddInt32(&c.requestCount, 1)
|
||||
resp := &tikvrpc.Response{}
|
||||
if addr == c.testSuite.storeAddr(c.testSuite.tiflashPeerStoreID) {
|
||||
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 80}
|
||||
} else {
|
||||
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 100}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *storeSafeTsMockClient) Close() error {
|
||||
return c.Client.Close()
|
||||
}
|
||||
|
||||
func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
|
||||
return c.Client.CloseAddr(addr)
|
||||
}
|
||||
|
||||
func (s *testKVSuite) TestMinSafeTs() {
|
||||
mockClient := storeSafeTsMockClient{
|
||||
Client: s.store.GetTiKVClient(),
|
||||
testSuite: s,
|
||||
}
|
||||
s.store.SetTiKVClient(&mockClient)
|
||||
|
||||
// wait for updateMinSafeTS
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 80 {
|
||||
time.Sleep(2 * time.Second)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
s.Require().GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(2))
|
||||
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {
|
|||
|
||||
// SetRegionCacheStore is used to set a store in region cache, for testing only
|
||||
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
|
||||
s.regionCache.SetRegionCacheStore(id, storeType, state, labels)
|
||||
s.regionCache.SetRegionCacheStore(id, "", "", storeType, state, labels)
|
||||
}
|
||||
|
||||
// SetSafeTS is used to set safeTS for the store with `storeID`
|
||||
|
|
|
|||
Loading…
Reference in New Issue