mirror of https://github.com/tikv/client-go.git
fix the issue that health check may set liveness wrongly (#1127)
* fix the issue that health check may set liveness wrongly Signed-off-by: zyguan <zhongyangguan@gmail.com> * fix lint issue Signed-off-by: zyguan <zhongyangguan@gmail.com> * fix rawkv ut Signed-off-by: zyguan <zhongyangguan@gmail.com> * fix data race Signed-off-by: zyguan <zhongyangguan@gmail.com> * use getStore instead of accessing storeMu directly Signed-off-by: zyguan <zhongyangguan@gmail.com> * make TestAccessFollowerAfter1TiKVDown stable Signed-off-by: zyguan <zhongyangguan@gmail.com> * make TestBackoffErrorType stable Signed-off-by: zyguan <zhongyangguan@gmail.com> * address comments Signed-off-by: zyguan <zhongyangguan@gmail.com> --------- Signed-off-by: zyguan <zhongyangguan@gmail.com> Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
parent
1af8d0b0ca
commit
e79e8008ce
|
|
@ -75,7 +75,8 @@ func TestBackoffErrorType(t *testing.T) {
|
|||
err = b.Backoff(BoTxnNotFound, errors.New("txn not found"))
|
||||
if err != nil {
|
||||
// Next backoff should return error of backoff that sleeps for longest time.
|
||||
assert.ErrorIs(t, err, BoTxnNotFound.err)
|
||||
cfg, _ := b.longestSleepCfg()
|
||||
assert.ErrorIs(t, err, cfg.err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2618,6 +2618,8 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
|
|||
addr = store.GetAddress()
|
||||
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
|
||||
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
|
||||
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
|
||||
newStore.unreachableSince = s.unreachableSince
|
||||
if s.addr == addr {
|
||||
newStore.slowScore = s.slowScore
|
||||
}
|
||||
|
|
@ -2783,16 +2785,28 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
|
|||
// It may be already started by another thread.
|
||||
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
|
||||
s.unreachableSince = time.Now()
|
||||
go s.checkUntilHealth(c)
|
||||
reResolveInterval := 30 * time.Second
|
||||
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
|
||||
if dur, err := time.ParseDuration(val.(string)); err == nil {
|
||||
reResolveInterval = dur
|
||||
}
|
||||
}
|
||||
go s.checkUntilHealth(c, liveness, reResolveInterval)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) checkUntilHealth(c *RegionCache) {
|
||||
defer atomic.StoreUint32(&s.livenessState, uint32(reachable))
|
||||
|
||||
func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
if liveness != reachable {
|
||||
logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
|
||||
zap.Uint64("storeID", s.storeID),
|
||||
zap.String("state", s.getResolveState().String()),
|
||||
zap.String("liveness", s.getLivenessState().String()))
|
||||
}
|
||||
}()
|
||||
lastCheckPDTime := time.Now()
|
||||
|
||||
for {
|
||||
|
|
@ -2800,25 +2814,34 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
|
|||
case <-c.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if time.Since(lastCheckPDTime) > time.Second*30 {
|
||||
if time.Since(lastCheckPDTime) > reResolveInterval {
|
||||
lastCheckPDTime = time.Now()
|
||||
|
||||
valid, err := s.reResolve(c)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
|
||||
} else if !valid {
|
||||
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr))
|
||||
if s.getResolveState() == deleted {
|
||||
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
|
||||
newStore, _ := c.getStore(s.storeID)
|
||||
logutil.BgLogger().Info("[health check] store meta changed",
|
||||
zap.Uint64("storeID", s.storeID),
|
||||
zap.String("oldAddr", s.addr),
|
||||
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
|
||||
zap.String("newAddr", newStore.addr),
|
||||
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
|
||||
go newStore.checkUntilHealth(c, liveness, reResolveInterval)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
l := s.requestLiveness(c.ctx, c)
|
||||
if l == reachable {
|
||||
liveness = s.requestLiveness(c.ctx, c)
|
||||
atomic.StoreUint32(&s.livenessState, uint32(liveness))
|
||||
if liveness == reachable {
|
||||
logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
|
||||
|
||||
return
|
||||
}
|
||||
atomic.StoreUint32(&s.livenessState, uint32(l))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2826,7 +2849,20 @@ func (s *Store) checkUntilHealth(c *RegionCache) {
|
|||
func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) {
|
||||
// It's not convenient to mock liveness in integration tests. Use failpoint to achieve that instead.
|
||||
if val, err := util.EvalFailpoint("injectLiveness"); err == nil {
|
||||
switch val.(string) {
|
||||
liveness := val.(string)
|
||||
if strings.Contains(liveness, " ") {
|
||||
for _, item := range strings.Split(liveness, " ") {
|
||||
kv := strings.Split(item, ":")
|
||||
if len(kv) != 2 {
|
||||
continue
|
||||
}
|
||||
if kv[0] == s.addr {
|
||||
liveness = kv[1]
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
switch liveness {
|
||||
case "unreachable":
|
||||
return unreachable
|
||||
case "reachable":
|
||||
|
|
|
|||
|
|
@ -1793,6 +1793,48 @@ func (s *testRegionCacheSuite) TestSlowScoreStat() {
|
|||
s.True(slowScore.isSlow())
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
|
||||
// init region cache
|
||||
s.cache.LocateKey(s.bo, []byte("a"))
|
||||
|
||||
store1, _ := s.cache.getStore(s.store1)
|
||||
s.Require().NotNil(store1)
|
||||
s.Require().Equal(resolved, store1.getResolveState())
|
||||
|
||||
// setup mock liveness func
|
||||
store1Liveness := uint32(unreachable)
|
||||
s.cache.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
|
||||
if s.storeID == store1.storeID {
|
||||
return livenessState(atomic.LoadUint32(&store1Liveness))
|
||||
}
|
||||
return reachable
|
||||
})
|
||||
|
||||
// start health check loop
|
||||
atomic.StoreUint32(&store1.livenessState, store1Liveness)
|
||||
go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)
|
||||
|
||||
// update store meta
|
||||
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)
|
||||
|
||||
// assert that the old store should be deleted and it's not reachable
|
||||
s.Eventually(func() bool {
|
||||
return store1.getResolveState() == deleted && store1.getLivenessState() != reachable
|
||||
}, 3*time.Second, time.Second)
|
||||
|
||||
// assert that the new store should be added and it's also not reachable
|
||||
newStore1, _ := s.cache.getStore(store1.storeID)
|
||||
s.Require().NotEqual(reachable, newStore1.getLivenessState())
|
||||
|
||||
// recover store1
|
||||
atomic.StoreUint32(&store1Liveness, uint32(reachable))
|
||||
|
||||
// assert that the new store should be reachable
|
||||
s.Eventually(func() bool {
|
||||
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
|
||||
}, 3*time.Second, time.Second)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
|
||||
_ = s.cache.refreshRegionIndex(s.bo)
|
||||
r, _ := s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
|
||||
|
|
|
|||
|
|
@ -1160,7 +1160,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestAccessFollowerAfter1TiKVDown()
|
|||
regionStore := region.getStore()
|
||||
leaderAddr = regionStore.stores[regionStore.workTiKVIdx].addr
|
||||
s.NotEqual(leaderAddr, "")
|
||||
for i := 0; i < 10; i++ {
|
||||
for i := 0; i < 30; i++ {
|
||||
bo := retry.NewBackofferWithVars(context.Background(), 100, nil)
|
||||
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, loc.Region, client.ReadTimeoutShort, tikvrpc.TiKV)
|
||||
s.Nil(err)
|
||||
|
|
|
|||
|
|
@ -41,14 +41,17 @@ import (
|
|||
"hash/crc64"
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config/retry"
|
||||
"github.com/tikv/client-go/v2/internal/locate"
|
||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||
"github.com/tikv/client-go/v2/kv"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
)
|
||||
|
||||
func TestRawKV(t *testing.T) {
|
||||
tikv.EnableFailpoints()
|
||||
suite.Run(t, new(testRawkvSuite))
|
||||
}
|
||||
|
||||
|
|
@ -77,9 +80,11 @@ func (s *testRawkvSuite) SetupTest() {
|
|||
s.peer1 = peerIDs[0]
|
||||
s.peer2 = peerIDs[1]
|
||||
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
|
||||
s.Nil(failpoint.Enable("tikvclient/injectReResolveInterval", `return("1s")`))
|
||||
}
|
||||
|
||||
func (s *testRawkvSuite) TearDownTest() {
|
||||
s.Nil(failpoint.Disable("tikvclient/injectReResolveInterval"))
|
||||
s.mvccStore.Close()
|
||||
}
|
||||
|
||||
|
|
@ -110,6 +115,9 @@ func (s *testRawkvSuite) TestReplaceAddrWithNewStore() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
getVal, err := client.Get(context.Background(), testKey)
|
||||
|
||||
s.Nil(err)
|
||||
|
|
@ -172,6 +180,9 @@ func (s *testRawkvSuite) TestReplaceNewAddrAndOldOfflineImmediately() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
getVal, err := client.Get(context.Background(), testKey)
|
||||
s.Nil(err)
|
||||
s.Equal(getVal, testValue)
|
||||
|
|
@ -200,6 +211,9 @@ func (s *testRawkvSuite) TestReplaceStore() {
|
|||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
s.cluster.ChangeLeader(s.region1, peer3)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
err = client.Put(context.Background(), testKey, testValue)
|
||||
s.Nil(err)
|
||||
}
|
||||
|
|
@ -234,6 +248,9 @@ func (s *testRawkvSuite) TestColumnFamilyForClient() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test get
|
||||
client.SetColumnFamily(cf1)
|
||||
getVal, err := client.Get(context.Background(), testKeyCf1)
|
||||
|
|
@ -303,6 +320,9 @@ func (s *testRawkvSuite) TestColumnFamilyForOptions() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test get
|
||||
getVal, err := client.Get(context.Background(), keyInCf1, SetColumnFamily(cf1))
|
||||
s.Nil(err)
|
||||
|
|
@ -370,6 +390,9 @@ func (s *testRawkvSuite) TestBatch() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test BatchGet
|
||||
returnValues, err := client.BatchGet(context.Background(), keys, SetColumnFamily(cf))
|
||||
s.Nil(err)
|
||||
|
|
@ -426,6 +449,9 @@ func (s *testRawkvSuite) TestScan() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test scan
|
||||
startKey, endKey := []byte("key1"), []byte("keyz")
|
||||
limit := 3
|
||||
|
|
@ -498,6 +524,9 @@ func (s *testRawkvSuite) TestDeleteRange() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test DeleteRange
|
||||
startKey, endKey := []byte("key3"), []byte(nil)
|
||||
err = client.DeleteRange(context.Background(), startKey, endKey, SetColumnFamily(cf))
|
||||
|
|
@ -535,6 +564,9 @@ func (s *testRawkvSuite) TestCompareAndSwap() {
|
|||
s.cluster.ChangeLeader(s.region1, s.peer2)
|
||||
s.cluster.RemovePeer(s.region1, s.peer1)
|
||||
|
||||
s.Nil(failpoint.Enable("tikvclient/injectLiveness", `return("store1:reachable store2:unreachable")`))
|
||||
defer failpoint.Disable("tikvclient/injectLiveness")
|
||||
|
||||
// test CompareAndSwap for false atomic
|
||||
_, _, err = client.CompareAndSwap(
|
||||
context.Background(),
|
||||
|
|
|
|||
Loading…
Reference in New Issue