diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 151de21c..4b3afdfb 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -767,7 +767,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
 
 	needCheckStores = c.stores.filter(needCheckStores, needCheck)
 	for _, store := range needCheckStores {
-		_, err := store.reResolve(c.stores)
+		_, err := store.reResolve(c.stores, c.bg)
 		tikverr.Log(err)
 	}
 	return needCheckStores
@@ -1020,7 +1020,7 @@ func (c *RegionCache) GetTiFlashRPCContext(bo *retry.Backoffer, id RegionVerID,
 		return nil, nil
 	}
 	if store.getResolveState() == needCheck {
-		_, err := store.reResolve(c.stores)
+		_, err := store.reResolve(c.stores, c.bg)
 		tikverr.Log(err)
 	}
 	regionStore.workTiFlashIdx.Store(int32(accessIdx))
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 888b96d1..7d9ff89b 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -2833,3 +2833,44 @@ func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
 	})
 	s.Equal(batchScanRegionRes, regions)
 }
+
+func (s *testRegionCacheSuite) TestIssue1401() {
+	// init the region cache.
+	s.cache.LocateKey(s.bo, []byte("a"))
+
+	store1, _ := s.cache.stores.get(s.store1)
+	s.Require().NotNil(store1)
+	s.Require().Equal(resolved, store1.getResolveState())
+	// change store1's labels.
+	labels := store1.labels
+	labels = append(labels, &metapb.StoreLabel{Key: "host", Value: "0.0.0.0:20161"})
+	s.cluster.UpdateStoreAddr(store1.storeID, store1.addr, labels...)
+
+	// mark the store as unreachable and needing a check.
+	atomic.StoreUint32(&store1.livenessState, uint32(unreachable))
+	store1.setResolveState(needCheck)
+
+	// set up a mock liveness func.
+	s.cache.stores.setMockRequestLiveness(func(ctx context.Context, s *Store) livenessState {
+		return reachable
+	})
+
+	// start the health check loop.
+	startHealthCheckLoop(s.cache.bg, s.cache.stores, store1, unreachable, time.Second*30)
+
+	// mock the asyncCheckAndResolveLoop worker to check and resolve the store.
+	s.cache.checkAndResolve(nil, func(s *Store) bool {
+		return s.getResolveState() == needCheck
+	})
+
+	// assert that the old store is eventually marked deleted.
+	s.Eventually(func() bool {
+		return store1.getResolveState() == deleted
+	}, 3*time.Second, time.Second)
+	// assert that a new store is added and becomes resolved and reachable.
+	newStore1, _ := s.cache.stores.get(s.store1)
+	s.Eventually(func() bool {
+		return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
+	}, 3*time.Second, time.Second)
+	s.Require().True(isStoreContainLabel(newStore1.labels, "host", "0.0.0.0:20161"))
+}
diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go
index 2eba262b..d0bea45a 100644
--- a/internal/locate/store_cache.go
+++ b/internal/locate/store_cache.go
@@ -436,7 +436,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c storeCache) (addr string, err
 
 // reResolve try to resolve addr for store that need check. Returns false if the region is in tombstone state or is
 // deleted.
-func (s *Store) reResolve(c storeCache) (bool, error) {
+func (s *Store) reResolve(c storeCache, scheduler *bgRunner) (bool, error) {
 	var addr string
 	store, err := c.fetchStore(context.Background(), s.storeID)
 	if err != nil {
@@ -475,12 +475,23 @@ func (s *Store) reResolve(c storeCache) (bool, error) {
 			store.GetLabels(),
 		)
 		newStore.livenessState = atomic.LoadUint32(&s.livenessState)
-		newStore.unreachableSince = s.unreachableSince
+		if newStore.getLivenessState() != reachable {
+			newStore.unreachableSince = s.unreachableSince
+			startHealthCheckLoop(scheduler, c, newStore, newStore.getLivenessState(), storeReResolveInterval)
+		}
 		if s.addr == addr {
 			newStore.healthStatus = s.healthStatus
 		}
 		c.put(newStore)
 		s.setResolveState(deleted)
+		logutil.BgLogger().Info("store address or labels changed, add new store and mark old store deleted",
+			zap.Uint64("store", s.storeID),
+			zap.String("old-addr", s.addr),
+			zap.Any("old-labels", s.labels),
+			zap.String("old-liveness", s.getLivenessState().String()),
+			zap.String("new-addr", newStore.addr),
+			zap.Any("new-labels", newStore.labels),
+			zap.String("new-liveness", newStore.getLivenessState().String()))
 		return false, nil
 	}
 	s.changeResolveStateTo(needCheck, resolved)
@@ -580,6 +591,8 @@ func (s *Store) getLivenessState() livenessState {
 	return livenessState(atomic.LoadUint32(&s.livenessState))
 }
 
+var storeReResolveInterval = 30 * time.Second
+
 func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoffer, scheduler *bgRunner, c storeCache) (liveness livenessState) {
 	liveness = requestLiveness(bo.GetCtx(), s, c)
 	if liveness == reachable {
@@ -595,7 +608,7 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
 	// It may be already started by another thread.
 	if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
 		s.unreachableSince = time.Now()
-		reResolveInterval := 30 * time.Second
+		reResolveInterval := storeReResolveInterval
 		if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
 			if dur, err := time.ParseDuration(val.(string)); err == nil {
 				reResolveInterval = dur
@@ -613,29 +626,19 @@ func startHealthCheckLoop(scheduler *bgRunner, c storeCache, s *Store, liveness
 	lastCheckPDTime := time.Now()
 
 	scheduler.schedule(func(ctx context.Context, t time.Time) bool {
+		if s.getResolveState() == deleted {
+			logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
+			return true
+		}
 		if t.Sub(lastCheckPDTime) > reResolveInterval {
 			lastCheckPDTime = t
 
-			valid, err := s.reResolve(c)
+			valid, err := s.reResolve(c, scheduler)
 			if err != nil {
 				logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
 			} else if !valid {
-				if s.getResolveState() != deleted {
-					logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop",
-						zap.Uint64("storeID", s.storeID),
-						zap.String("state", s.getResolveState().String()),
-						zap.String("liveness", s.getLivenessState().String()))
-					return true
-				}
-				// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
-				newStore, _ := c.get(s.storeID)
-				logutil.BgLogger().Info("[health check] store meta changed",
-					zap.Uint64("storeID", s.storeID),
-					zap.String("oldAddr", s.addr),
-					zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
-					zap.String("newAddr", newStore.addr),
-					zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
-				s = newStore
+				logutil.BgLogger().Info("[health check] store meta deleted, stop checking", zap.Uint64("storeID", s.storeID), zap.String("addr", s.addr), zap.String("state", s.getResolveState().String()))
+				return true
 			}
 		}
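
Below is a minimal, self-contained Go sketch of the lifecycle this patch establishes; it is not client-go code. The store, runLoop, and reResolve names here are hypothetical stand-ins (there is no bgRunner, storeCache, or real liveness probing), but the control flow mirrors the change: a health-check loop exits as soon as the store it watches is marked deleted, and re-resolving a store whose address or labels changed installs a replacement store that gets its own health-check loop instead of being adopted by the old loop.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Resolve states, mirroring the names used in the patch (values are arbitrary here).
const (
	resolved uint32 = iota
	needCheck
	deleted
)

// store is a hypothetical stand-in for locate.Store with only the fields this lifecycle needs.
type store struct {
	id    uint64
	addr  string
	state atomic.Uint32
}

// runLoop is a hypothetical stand-in for bgRunner.schedule: re-run check on an
// interval until it returns true (stop).
func runLoop(wg *sync.WaitGroup, interval time.Duration, check func() bool) {
	defer wg.Done()
	for !check() {
		time.Sleep(interval)
	}
}

func main() {
	var wg sync.WaitGroup

	old := &store{id: 1, addr: "127.0.0.1:20160"}
	old.state.Store(needCheck)

	// reResolve models Store.reResolve with the patch applied: the address (or labels)
	// changed, so a new store replaces the old one, the old one is marked deleted,
	// and the replacement gets its own health-check loop. It returns false
	// ("not valid"), just as reResolve does when the store meta changed.
	reResolve := func(s *store) bool {
		newStore := &store{id: s.id, addr: "0.0.0.0:20161"}
		newStore.state.Store(resolved)
		s.state.Store(deleted)
		fmt.Printf("store %d: %s -> %s, old store marked deleted\n", s.id, s.addr, newStore.addr)

		wg.Add(1)
		go runLoop(&wg, 10*time.Millisecond, func() bool {
			// The new loop watches the new store object, so it keeps running
			// until that store is deleted in turn.
			if newStore.state.Load() == deleted {
				fmt.Printf("store %d (%s): meta deleted, stop checking\n", newStore.id, newStore.addr)
				return true
			}
			return false
		})
		// Let the example terminate: pretend the new store is removed later.
		time.AfterFunc(50*time.Millisecond, func() { newStore.state.Store(deleted) })
		return false
	}

	// Health-check loop for the old store, mirroring startHealthCheckLoop after the
	// patch: stop as soon as the watched store is deleted or reResolve reports that
	// it was replaced.
	wg.Add(1)
	go runLoop(&wg, 10*time.Millisecond, func() bool {
		if old.state.Load() == deleted {
			fmt.Printf("store %d (%s): meta deleted, stop checking\n", old.id, old.addr)
			return true
		}
		if valid := reResolve(old); !valid {
			fmt.Printf("store %d (%s): meta deleted, stop checking\n", old.id, old.addr)
			return true
		}
		return false
	})

	wg.Wait()
}

The design point is the same as in the diff: instead of the old loop adopting the replacement store (the removed "s = newStore" path), each store object owns at most one health-check loop, started either on the unreachable transition or by reResolve when it creates a not-yet-reachable replacement, and every loop stops once its own store is deleted.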