mirror of https://github.com/tikv/client-go.git
fix issue that store's liveness may be incorrectly marked as unreachable when the store restarts with labels changed (#1407)
* add test Signed-off-by: crazycs520 <crazycs520@gmail.com> * fix Signed-off-by: crazycs520 <crazycs520@gmail.com> * fix ci Signed-off-by: crazycs520 <crazycs520@gmail.com> --------- Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
eec8198343
commit
cd64e24de8
|
|
@ -767,7 +767,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
|
|||
|
||||
needCheckStores = c.stores.filter(needCheckStores, needCheck)
|
||||
for _, store := range needCheckStores {
|
||||
_, err := store.reResolve(c.stores)
|
||||
_, err := store.reResolve(c.stores, c.bg)
|
||||
tikverr.Log(err)
|
||||
}
|
||||
return needCheckStores
|
||||
|
|
@ -1020,7 +1020,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
|
|||
return nil, nil
|
||||
}
|
||||
if store.getResolveState() == needCheck {
|
||||
_, err := store.reResolve(c.stores)
|
||||
_, err := store.reResolve(c.stores, c.bg)
|
||||
tikverr.Log(err)
|
||||
}
|
||||
regionStore.workTiFlashIdx.Store(int32(accessIdx))
|
||||
|
|
|
|||
|
|
@ -2833,3 +2833,44 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
|
|||
})
|
||||
s.Equal(batchScanRegionRes, regions)
|
||||
}
|
||||
|
||||
func (s *testRegionCacheSuite) TestIssue1401() {
|
||||
// init region cache
|
||||
s.cache.LocateKey(s.bo, []byte("a"))
|
||||
|
||||
store1, _ := s.cache.stores.get(s.store1)
|
||||
s.Require().NotNil(store1)
|
||||
s.Require().Equal(resolved, store1.getResolveState())
|
||||
// change store1 label.
|
||||
labels := store1.labels
|
||||
labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
|
||||
s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
|
||||
|
||||
// mark the store is unreachable and need check.
|
||||
atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
|
||||
store1.setResolveState(needCheck)
|
||||
|
||||
// setup mock liveness func
|
||||
s.cache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
|
||||
return reachable
|
||||
})
|
||||
|
||||
// start health check loop
|
||||
startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, unreachable, time.Second*30)
|
||||
|
||||
// mock asyncCheckAndResolveLoop worker to check and resolve store.
|
||||
s.cache.checkAndResolve(nil, func(s *Store) bool {
|
||||
return s.getResolveState() == needCheck
|
||||
})
|
||||
|
||||
// assert that the old store should be deleted.
|
||||
s.Eventually(func() bool {
|
||||
return store1.getResolveState() == deleted
|
||||
}, 3*time.Second, time.Second)
|
||||
// assert the new store should be added and it should be resolved and reachable.
|
||||
newStore1, _ := s.cache.stores.get(s.store1)
|
||||
s.Eventually(func() bool {
|
||||
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
|
||||
}, 3*time.Second, time.Second)
|
||||
s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -436,7 +436,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c storeCache) (addr string, err
|
|||
|
||||
// reResolve try to resolve addr for store that need check. Returns false if the region is in tombstone state or is
|
||||
// deleted.
|
||||
func (s *Store) reResolve(c storeCache) (bool, error) {
|
||||
func (s *Store) reResolve(c storeCache, scheduler *bgRunner) (bool, error) {
|
||||
var addr string
|
||||
store, err := c.fetchStore(context.Background(), s.storeID)
|
||||
if err != nil {
|
||||
|
|
@ -475,12 +475,23 @@ func (s *Store) reResolve(c storeCache) (bool, error) {
|
|||
store.GetLabels(),
|
||||
)
|
||||
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
|
||||
newStore.unreachableSince = s.unreachableSince
|
||||
if newStore.getLivenessState() != reachable {
|
||||
newStore.unreachableSince = s.unreachableSince
|
||||
startHealthCheckLoop(scheduler, c, newStore, newStore.getLivenessState(), storeReResolveInterval)
|
||||
}
|
||||
if s.addr == addr {
|
||||
newStore.healthStatus = s.healthStatus
|
||||
}
|
||||
c.put(newStore)
|
||||
s.setResolveState(deleted)
|
||||
logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted",
|
||||
zap.Uint64("store", s.storeID),
|
||||
zap.String("old-addr", s.addr),
|
||||
zap.Any("old-labels", s.labels),
|
||||
zap.String("old-liveness", s.getLivenessState().String()),
|
||||
zap.String("new-addr", newStore.addr),
|
||||
zap.Any("new-labels", newStore.labels),
|
||||
zap.String("new-liveness", newStore.getLivenessState().String()))
|
||||
return false, nil
|
||||
}
|
||||
s.changeResolveStateTo(needCheck, resolved)
|
||||
|
|
@ -580,6 +591,8 @@ func (s *Store) getLivenessState() livenessState {
|
|||
return livenessState(atomic.LoadUint32(&s.livenessState))
|
||||
}
|
||||
|
||||
var storeReResolveInterval = 30 * time.Second
|
||||
|
||||
func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, scheduler *bgRunner, c storeCache) (liveness livenessState) {
|
||||
liveness = requestLiveness(bo.GetCtx(), s, c)
|
||||
if liveness == reachable {
|
||||
|
|
@ -595,7 +608,7 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
|
|||
// It may be already started by another thread.
|
||||
if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
|
||||
s.unreachableSince = time.Now()
|
||||
reResolveInterval := 30 * time.Second
|
||||
reResolveInterval := storeReResolveInterval
|
||||
if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
|
||||
if dur, err := time.ParseDuration(val.(string)); err == nil {
|
||||
reResolveInterval = dur
|
||||
|
|
@ -613,29 +626,19 @@ func startHealthCheckLoop(scheduler *bgRunner, c storeCache, s *Store, liveness
|
|||
lastCheckPDTime := time.Now()
|
||||
|
||||
scheduler.schedule(func(ctx context.Context, t time.Time) bool {
|
||||
if s.getResolveState() == deleted {
|
||||
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
|
||||
return true
|
||||
}
|
||||
if t.Sub(lastCheckPDTime) > reResolveInterval {
|
||||
lastCheckPDTime = t
|
||||
|
||||
valid, err := s.reResolve(c)
|
||||
valid, err := s.reResolve(c, scheduler)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
|
||||
} else if !valid {
|
||||
if s.getResolveState() != deleted {
|
||||
logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop",
|
||||
zap.Uint64("storeID", s.storeID),
|
||||
zap.String("state", s.getResolveState().String()),
|
||||
zap.String("liveness", s.getLivenessState().String()))
|
||||
return true
|
||||
}
|
||||
// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
|
||||
newStore, _ := c.get(s.storeID)
|
||||
logutil.BgLogger().Info("[health check] store meta changed",
|
||||
zap.Uint64("storeID", s.storeID),
|
||||
zap.String("oldAddr", s.addr),
|
||||
zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
|
||||
zap.String("newAddr", newStore.addr),
|
||||
zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
|
||||
s = newStore
|
||||
logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue