From e72c4cd47479e90f72527c7eaa2f09ed39c54dd8 Mon Sep 17 00:00:00 2001
From: zyguan
Date: Mon, 26 Feb 2024 11:26:25 +0800
Subject: [PATCH] enhance background job management of region cache (#1171)

* enhance background job management of region cache

Signed-off-by: zyguan

* add some comments

Signed-off-by: zyguan

* address comments

Signed-off-by: zyguan

* fix data race in ut

Signed-off-by: zyguan

* fix data race

Signed-off-by: zyguan

* refine scheduleWithTrigger

Signed-off-by: zyguan

* address the comment

Signed-off-by: zyguan

---------

Signed-off-by: zyguan
---
 internal/locate/region_cache.go      | 501 +++++++++++++++------------
 internal/locate/region_cache_test.go | 159 ++++++++-
 2 files changed, 430 insertions(+), 230 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index d1b511e2..03a935af 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -508,6 +508,130 @@ func (mu *regionIndexMu) refresh(r []*Region) {
 
 type livenessFunc func(ctx context.Context, s *Store) livenessState
 
+// repeat wraps a `func()` as a schedulable function for `bgRunner`.
+func repeat(f func()) func(context.Context, time.Time) bool {
+	return func(_ context.Context, _ time.Time) bool {
+		f()
+		return false
+	}
+}
+
+// until wraps a `func() bool` as a schedulable function for `bgRunner`.
+func until(f func() bool) func(context.Context, time.Time) bool {
+	return func(_ context.Context, _ time.Time) bool {
+		return f()
+	}
+}
+
+type bgRunner struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+}
+
+func newBackgroundRunner(ctx context.Context) *bgRunner {
+	ctx, cancel := context.WithCancel(ctx)
+	return &bgRunner{
+		ctx:    ctx,
+		cancel: cancel,
+	}
+}
+
+func (r *bgRunner) closed() bool {
+	select {
+	case <-r.ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
+
+func (r *bgRunner) shutdown(wait bool) {
+	r.cancel()
+	if wait {
+		r.wg.Wait()
+	}
+}
+
+// run calls `f` once in the background.
+func (r *bgRunner) run(f func(context.Context)) {
+	if r.closed() {
+		return
+	}
+	r.wg.Add(1)
+	go func() {
+		defer r.wg.Done()
+		f(r.ctx)
+	}()
+}
+
+// schedule calls `f` every `interval`.
+func (r *bgRunner) schedule(f func(context.Context, time.Time) bool, interval time.Duration) {
+	if r.closed() || interval <= 0 {
+		return
+	}
+	r.wg.Add(1)
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer func() {
+			r.wg.Done()
+			ticker.Stop()
+		}()
+		for {
+			select {
+			case <-r.ctx.Done():
+				return
+			case t := <-ticker.C:
+				if f(r.ctx, t) {
+					return
+				}
+			}
+		}
+	}()
+}
+
+// scheduleWithTrigger is like schedule, but also calls `f` on `<-trigger`, in which case the time arg of `f` is zero.
+func (r *bgRunner) scheduleWithTrigger(f func(context.Context, time.Time) bool, interval time.Duration, trigger <-chan struct{}) {
+	if r.closed() || interval <= 0 {
+		return
+	}
+	r.wg.Add(1)
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer func() {
+			r.wg.Done()
+			ticker.Stop()
+		}()
+		triggerEnabled := trigger != nil
+		for triggerEnabled {
+			select {
+			case <-r.ctx.Done():
+				return
+			case t := <-ticker.C:
+				if f(r.ctx, t) {
+					return
+				}
+			case _, ok := <-trigger:
+				if !ok {
+					triggerEnabled = false
+				} else if f(r.ctx, time.Time{}) {
+					return
+				}
+			}
+		}
+		for {
+			select {
+			case <-r.ctx.Done():
+				return
+			case t := <-ticker.C:
+				if f(r.ctx, t) {
+					return
+				}
+			}
+		}
+	}()
+}
+
 // RegionCache caches Regions loaded from PD.
 // All public methods of this struct should be thread-safe, unless explicitly pointed out or the method is for testing
 // purposes only.
@@ -529,10 +653,8 @@ type RegionCache struct {
 	}
 	notifyCheckCh chan struct{}
 
-	// Context for background jobs
-	ctx        context.Context
-	cancelFunc context.CancelFunc
-	wg         sync.WaitGroup
+	// runner for background jobs
+	bg *bgRunner
 
 	testingKnobs struct {
 		// Replace the requestLiveness function for test purpose. Note that in unit tests, if this is not set,
@@ -556,12 +678,12 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.tiflashComputeStoreMu.needReload = true
 	c.tiflashComputeStoreMu.stores = make([]*Store, 0)
 	c.notifyCheckCh = make(chan struct{}, 1)
-	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
-	interval := config.GetGlobalConfig().StoresRefreshInterval
+	c.bg = newBackgroundRunner(context.Background())
+	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	if config.GetGlobalConfig().EnablePreload {
 		logutil.BgLogger().Info("preload region index start")
-		if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.ctx, 20000, nil)); err != nil {
+		if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.bg.ctx, 20000, nil)); err != nil {
 			logutil.BgLogger().Error("refresh region index failed", zap.Error(err))
 		}
 		logutil.BgLogger().Info("preload region index finish")
@@ -569,22 +691,35 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 		c.mu = *newRegionIndexMu(nil)
 	}
 
-	// TODO(zyguan): refine management of background cron jobs
-	c.wg.Add(1)
-	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
-	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
-	// Default use 15s as the update inerval.
-	c.wg.Add(1)
-	go c.asyncUpdateStoreSlowScore(time.Duration(interval/4) * time.Second)
-	if config.GetGlobalConfig().RegionsRefreshInterval > 0 {
-		c.timelyRefreshCache(config.GetGlobalConfig().RegionsRefreshInterval)
+	var (
+		refreshStoreInterval = config.GetGlobalConfig().StoresRefreshInterval
+		needCheckStores      []*Store
+	)
+	c.bg.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
+		// check and resolve normal stores periodically by default.
+		filter := func(state resolveState) bool {
+			return state != unresolved && state != tombstone && state != deleted
+		}
+		if t.IsZero() {
+			// check and resolve needCheck stores because it's triggered by a CheckStoreEvent this time.
+			filter = func(state resolveState) bool { return state == needCheck }
+		}
+		needCheckStores = c.checkAndResolve(needCheckStores[:0], func(s *Store) bool { return filter(s.getResolveState()) })
+		return false
+	}, time.Duration(refreshStoreInterval/4)*time.Second, c.getCheckStoreEvents())
+	c.bg.schedule(repeat(c.checkAndUpdateStoreSlowScores), time.Duration(refreshStoreInterval/4)*time.Second)
+	c.bg.schedule(repeat(c.reportStoreReplicaFlows), time.Duration(refreshStoreInterval/2)*time.Second)
+	if refreshCacheInterval := config.GetGlobalConfig().RegionsRefreshInterval; refreshCacheInterval > 0 {
+		c.bg.schedule(func(ctx context.Context, _ time.Time) bool {
+			if err := c.refreshRegionIndex(retry.NewBackofferWithVars(ctx, int(refreshCacheInterval)*1000, nil)); err != nil {
+				logutil.BgLogger().Error("refresh region cache failed", zap.Error(err))
+			}
+			return false
+		}, time.Duration(refreshCacheInterval)*time.Second)
 	} else {
-		// cacheGC is not compatible with timelyRefreshCache
-		c.wg.Add(1)
-		go c.cacheGC()
+		// cache GC is incompatible with cache refresh
+		c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
 	}
-	c.wg.Add(1)
-	go c.asyncReportStoreReplicaFlows(time.Duration(interval/2) * time.Second)
 
 	return c
 }
@@ -595,7 +730,7 @@ func newTestRegionCache() *RegionCache {
 	c.tiflashComputeStoreMu.needReload = true
 	c.tiflashComputeStoreMu.stores = make([]*Store, 0)
 	c.notifyCheckCh = make(chan struct{}, 1)
-	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
+	c.bg = newBackgroundRunner(context.Background())
 	c.mu = *newRegionIndexMu(nil)
 	return c
 }
@@ -613,42 +748,12 @@ func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldReg
 
 // Close releases region cache's resource.
 func (c *RegionCache) Close() {
-	c.cancelFunc()
-	c.wg.Wait()
-}
-
-// asyncCheckAndResolveLoop with
-func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
-	ticker := time.NewTicker(interval)
-	defer func() {
-		c.wg.Done()
-		ticker.Stop()
-	}()
-	var needCheckStores []*Store
-	for {
-		needCheckStores = needCheckStores[:0]
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-c.getCheckStoreEvents():
-			c.checkAndResolve(needCheckStores, func(s *Store) bool {
-				return s.getResolveState() == needCheck
-			})
-		case <-ticker.C:
-			// refresh store to update labels.
-			c.checkAndResolve(needCheckStores, func(s *Store) bool {
-				state := s.getResolveState()
-				// Only valid stores should be reResolved. In fact, it's impossible
-				// there's a deleted store in the stores map which guaranteed by reReslve().
-				return state != unresolved && state != tombstone && state != deleted
-			})
-		}
-	}
+	c.bg.shutdown(true)
 }
 
 // checkAndResolve checks and resolve addr of failed stores.
 // this method isn't thread-safe and only be used by one goroutine.
-func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) {
+func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*Store) bool) []*Store {
 	defer func() {
 		r := recover()
 		if r != nil {
@@ -658,10 +763,12 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
 		}
 	}()
 
-	for _, store := range c.filterStores(needCheckStores, needCheck) {
+	needCheckStores = c.filterStores(needCheckStores, needCheck)
+	for _, store := range needCheckStores {
 		_, err := store.reResolve(c)
 		tikverr.Log(err)
 	}
+	return needCheckStores
 }
 
 // SetRegionCacheStore is used to set a store in region cache, for testing only
@@ -1775,31 +1882,6 @@ func (c *RegionCache) scanRegionsFromCache(bo *retry.Backoffer, startKey, endKey
 	return regions, nil
 }
 
-func (c *RegionCache) timelyRefreshCache(intervalS uint64) {
-	if intervalS <= 0 {
-		return
-	}
-	ticker := time.NewTicker(time.Duration(intervalS) * time.Second)
-	c.wg.Add(1)
-	go func() {
-		defer func() {
-			c.wg.Done()
-			ticker.Stop()
-		}()
-		for {
-			select {
-			case <-c.ctx.Done():
-				return
-			case <-ticker.C:
-				intervalMs := int(1000 * intervalS)
-				if err := c.refreshRegionIndex(retry.NewBackofferWithVars(c.ctx, intervalMs, nil)); err != nil {
-					logutil.BgLogger().Error("refresh region cache failed", zap.Error(err))
-				}
-			}
-		}
-	}()
-}
-
 func (c *RegionCache) refreshRegionIndex(bo *retry.Backoffer) error {
 	totalRegions := make([]*Region, 0)
 	startKey := []byte{}
@@ -2169,88 +2251,89 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
 const cleanCacheInterval = time.Second
 const cleanRegionNumPerRound = 50
 
-// This function is expected to run in a background goroutine.
+// gcScanItemHook is only used for testing
+var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
+
+// The returned function is expected to run in a background goroutine.
 // It keeps iterating over the whole region cache, searching for stale region
 // info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
 // regions. In this way, the impact of this background goroutine should be
 // negligible.
-func (c *RegionCache) cacheGC() {
-	ticker := time.NewTicker(cleanCacheInterval)
-	defer func() {
-		c.wg.Done()
-		ticker.Stop()
-	}()
-
+func (c *RegionCache) gcRoundFunc(limit int) func(context.Context, time.Time) bool {
+	if limit < 1 {
+		limit = 1
+	}
 	beginning := newBtreeSearchItem([]byte(""))
-	iterItem := beginning
-	expired := make([]*btreeItem, cleanRegionNumPerRound)
-	remaining := make([]*Region, cleanRegionNumPerRound)
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-ticker.C:
-			count := 0
-			expired = expired[:0]
-			remaining = remaining[:0]
+	cursor := beginning
+	expiredItems := make([]*btreeItem, limit)
+	needCheckRegions := make([]*Region, limit)
-			// Only RLock when checking TTL to avoid blocking other readers
-			c.mu.RLock()
-			ts := time.Now().Unix()
-			c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
-				if count > cleanRegionNumPerRound {
-					iterItem = item
-					return false
-				}
-				count++
-				if item.cachedRegion.isCacheTTLExpired(ts) {
-					expired = append(expired, item)
-				} else {
-					remaining = append(remaining, item.cachedRegion)
-				}
-				return true
-			})
-			c.mu.RUnlock()
+	return func(_ context.Context, t time.Time) bool {
+		expiredItems = expiredItems[:0]
+		needCheckRegions = needCheckRegions[:0]
+		hasMore, count, ts := false, 0, t.Unix()
+		onScanItem := gcScanItemHook.Load()
-			// Reach the end of the region cache, start from the beginning
-			if count <= cleanRegionNumPerRound {
-				iterItem = beginning
+		// Only RLock when checking TTL to avoid blocking other readers
+		c.mu.RLock()
+		c.mu.sorted.b.AscendGreaterOrEqual(cursor, func(item *btreeItem) bool {
+			count++
+			if count > limit {
+				cursor = item
+				hasMore = true
+				return false
 			}
-
-			// Clean expired regions
-			if len(expired) > 0 {
-				c.mu.Lock()
-				for _, item := range expired {
-					c.mu.sorted.b.Delete(item)
-					c.mu.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
-				}
-				c.mu.Unlock()
+			if onScanItem != nil {
+				(*onScanItem)(item)
 			}
+			if item.cachedRegion.isCacheTTLExpired(ts) {
+				expiredItems = append(expiredItems, item)
+			} else {
+				needCheckRegions = append(needCheckRegions, item.cachedRegion)
+			}
+			return true
+		})
+		c.mu.RUnlock()
-			// Check remaining regions and update sync flags
-			for _, region := range remaining {
-				syncFlags := region.getSyncFlags()
-				if syncFlags&needDelayedReloadReady > 0 {
-					// the region will be reload soon on access
-					continue
-				}
-				if syncFlags&needDelayedReloadPending > 0 {
-					region.setSyncFlags(needDelayedReloadReady)
-					// the region will be reload soon on access, no need to check if it needs to be expired
-					continue
-				}
-				if syncFlags&needExpireAfterTTL == 0 {
-					regionStore := region.getStore()
-					for i, store := range regionStore.stores {
-						// if the region has a stale or unreachable store, let it expire after TTL.
-						if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
-							region.setSyncFlags(needExpireAfterTTL)
-							break
-						}
+		// Reach the end of the region cache, start from the beginning
+		if !hasMore {
+			cursor = beginning
+		}
+
+		// Clean expired regions
+		if len(expiredItems) > 0 {
+			c.mu.Lock()
+			for _, item := range expiredItems {
+				c.mu.sorted.b.Delete(item)
+				c.mu.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
+			}
+			c.mu.Unlock()
+		}
+
+		// Check remaining regions and update sync flags
+		for _, region := range needCheckRegions {
+			syncFlags := region.getSyncFlags()
+			if syncFlags&needDelayedReloadReady > 0 {
+				// the region will be reloaded soon on access
+				continue
+			}
+			if syncFlags&needDelayedReloadPending > 0 {
+				region.setSyncFlags(needDelayedReloadReady)
+				// the region will be reloaded soon on access, no need to check if it needs to be expired
+				continue
+			}
+			if syncFlags&needExpireAfterTTL == 0 {
+				regionStore := region.getStore()
+				for i, store := range regionStore.stores {
+					// if the region has a stale or unreachable store, let it expire after TTL.
+					if atomic.LoadUint32(&store.epoch) != regionStore.storeEpochs[i] || store.getLivenessState() != reachable {
+						region.setSyncFlags(needExpireAfterTTL)
+						break
 					}
 				}
 			}
 		}
+		return false
 	}
 }
@@ -2517,7 +2600,7 @@ type Store struct {
 	loadStats atomic2.Pointer[storeLoadStats]
 
 	// whether the store is unreachable due to some reason, therefore requests to the store needs to be
-	// forwarded by other stores. this is also the flag that a checkUntilHealth goroutine is running for this store.
+	// forwarded by other stores. this is also the flag that a health check loop is running for this store.
 	// this mechanism is currently only applicable for TiKV stores.
 	livenessState    uint32
 	unreachableSince time.Time
@@ -2846,59 +2929,49 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
 		if _, err := util.EvalFailpoint("skipStoreCheckUntilHealth"); err == nil {
 			return
 		}
-		go s.checkUntilHealth(c, liveness, reResolveInterval)
+		startHealthCheckLoop(c, s, liveness, reResolveInterval)
 	}
 	return
 }
 
-func (s *Store) checkUntilHealth(c *RegionCache, liveness livenessState, reResolveInterval time.Duration) {
-	ticker := time.NewTicker(time.Second)
-	defer func() {
-		ticker.Stop()
-		if liveness != reachable {
-			logutil.BgLogger().Warn("[health check] store was still not reachable at the end of health check loop",
-				zap.Uint64("storeID", s.storeID),
-				zap.String("state", s.getResolveState().String()),
-				zap.String("liveness", s.getLivenessState().String()))
-		}
-	}()
+func startHealthCheckLoop(c *RegionCache, s *Store, liveness livenessState, reResolveInterval time.Duration) {
 	lastCheckPDTime := time.Now()
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-ticker.C:
-			if time.Since(lastCheckPDTime) > reResolveInterval {
-				lastCheckPDTime = time.Now()
+	c.bg.schedule(func(ctx context.Context, t time.Time) bool {
+		if t.Sub(lastCheckPDTime) > reResolveInterval {
+			lastCheckPDTime = t
 
-				valid, err := s.reResolve(c)
-				if err != nil {
-					logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
-				} else if !valid {
-					if s.getResolveState() == deleted {
-						// if the store is deleted, a new store with same id must be inserted (guaranteed by reResolve).
-						newStore, _ := c.getStore(s.storeID)
-						logutil.BgLogger().Info("[health check] store meta changed",
-							zap.Uint64("storeID", s.storeID),
-							zap.String("oldAddr", s.addr),
-							zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
-							zap.String("newAddr", newStore.addr),
-							zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
-						go newStore.checkUntilHealth(c, liveness, reResolveInterval)
-					}
-					return
+			valid, err := s.reResolve(c)
+			if err != nil {
+				logutil.BgLogger().Warn("[health check] failed to re-resolve unhealthy store", zap.Error(err))
+			} else if !valid {
+				if s.getResolveState() != deleted {
+					logutil.BgLogger().Warn("[health check] store was still unhealthy at the end of health check loop",
+						zap.Uint64("storeID", s.storeID),
+						zap.String("state", s.getResolveState().String()),
+						zap.String("liveness", s.getLivenessState().String()))
+					return true
 				}
-			}
-
-			liveness = s.requestLiveness(c.ctx, c)
-			atomic.StoreUint32(&s.livenessState, uint32(liveness))
-			if liveness == reachable {
-				logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
-				return
+				// if the store is deleted, a new store with the same id must be inserted (guaranteed by reResolve).
+				newStore, _ := c.getStore(s.storeID)
+				logutil.BgLogger().Info("[health check] store meta changed",
+					zap.Uint64("storeID", s.storeID),
+					zap.String("oldAddr", s.addr),
+					zap.String("oldLabels", fmt.Sprintf("%v", s.labels)),
+					zap.String("newAddr", newStore.addr),
+					zap.String("newLabels", fmt.Sprintf("%v", newStore.labels)))
+				s = newStore
 			}
 		}
-	}
+
+		liveness = s.requestLiveness(ctx, c)
+		atomic.StoreUint32(&s.livenessState, uint32(liveness))
+		if liveness == reachable {
+			logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
+			return true
+		}
+		return false
+	}, time.Second)
 }
 
 func (s *Store) requestLiveness(ctx context.Context, tk testingKnobs) (l livenessState) {
@@ -3056,24 +3129,6 @@ func (s *Store) markAlreadySlow() {
 	s.slowScore.markAlreadySlow()
 }
 
-// asyncUpdateStoreSlowScore updates the slow score of each store periodically.
-func (c *RegionCache) asyncUpdateStoreSlowScore(interval time.Duration) {
-	ticker := time.NewTicker(interval)
-	defer func() {
-		c.wg.Done()
-		ticker.Stop()
-	}()
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-ticker.C:
-			// update store slowScores
-			c.checkAndUpdateStoreSlowScores()
-		}
-	}
-}
-
 // checkAndUpdateStoreSlowScores checks and updates slowScore on each store.
 func (c *RegionCache) checkAndUpdateStoreSlowScores() {
 	defer func() {
@@ -3109,26 +3164,14 @@ func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
 	atomic.AddUint64(&s.replicaFlowsStats[destType], 1)
 }
 
-// asyncReportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
-func (c *RegionCache) asyncReportStoreReplicaFlows(interval time.Duration) {
-	ticker := time.NewTicker(interval)
-	defer func() {
-		c.wg.Done()
-		ticker.Stop()
-	}()
-	for {
-		select {
-		case <-c.ctx.Done():
-			return
-		case <-ticker.C:
-			c.forEachStore(func(store *Store) {
-				for destType := toLeader; destType < numReplicaFlowsType; destType++ {
-					metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
-					store.resetReplicaFlowsStats(destType)
-				}
-			})
+// reportStoreReplicaFlows reports the statistics on the related replicaFlowsType.
+func (c *RegionCache) reportStoreReplicaFlows() {
+	c.forEachStore(func(store *Store) {
+		for destType := toLeader; destType < numReplicaFlowsType; destType++ {
+			metrics.TiKVPreferLeaderFlowsGauge.WithLabelValues(destType.String(), store.addr).Set(float64(store.getReplicaFlowsStats(destType)))
+			store.resetReplicaFlowsStats(destType)
 		}
-	}
+	})
 }
 
 func createKVHealthClient(ctx context.Context, addr string) (*grpc.ClientConn, healthpb.HealthClient, error) {
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 2d6d9aab..397b17a6 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -49,6 +49,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"github.com/tikv/client-go/v2/config/retry"
 	"github.com/tikv/client-go/v2/internal/apicodec"
@@ -70,6 +71,142 @@ func (c *inspectedPDClient) GetRegion(ctx context.Context, key []byte, opts ...p
 	return c.Client.GetRegion(ctx, key, opts...)
 }
 
+func TestBackgroundRunner(t *testing.T) {
+	t.Run("ShutdownWait", func(t *testing.T) {
+		dur := 100 * time.Millisecond
+		r := newBackgroundRunner(context.Background())
+		r.run(func(ctx context.Context) {
+			time.Sleep(dur)
+		})
+		start := time.Now()
+		r.shutdown(true)
+		require.True(t, time.Since(start) >= dur)
+	})
+
+	t.Run("ShutdownNoWait", func(t *testing.T) {
+		dur := 100 * time.Millisecond
+		done := make(chan struct{})
+		r := newBackgroundRunner(context.Background())
+		r.run(func(ctx context.Context) {
+			select {
+			case <-ctx.Done():
+				close(done)
+			case <-time.After(dur):
+				require.Fail(t, "run should be canceled by shutdown")
+			}
+		})
+		r.shutdown(false)
+		<-done
+	})
+
+	t.Run("RunAfterShutdown", func(t *testing.T) {
+		var called atomic.Bool
+		r := newBackgroundRunner(context.Background())
+		r.shutdown(false)
+		r.run(func(ctx context.Context) {
+			called.Store(true)
+		})
+		require.False(t, called.Load())
+		r.schedule(until(func() bool {
+			called.Store(true)
+			return true
+		}), time.Second)
+		require.False(t, called.Load())
+		r.scheduleWithTrigger(until(func() bool {
+			called.Store(true)
+			return true
+		}), time.Second, make(chan struct{}))
+		require.False(t, called.Load())
+	})
+
+	t.Run("Schedule", func(t *testing.T) {
+		var (
+			done     = make(chan struct{})
+			interval = 20 * time.Millisecond
+			history  = make([]int64, 0, 3)
+			start    = time.Now().UnixMilli()
+		)
+		r := newBackgroundRunner(context.Background())
+		r.schedule(func(_ context.Context, t time.Time) bool {
+			history = append(history, t.UnixMilli())
+			if len(history) == 3 {
+				close(done)
+				return true
+			}
+			return false
+		}, interval)
+		<-done
+		require.Equal(t, 3, len(history))
+		for i := range history {
+			require.LessOrEqual(t, int64(i+1)*interval.Milliseconds(), history[i]-start)
+		}
+
+		history = history[:0]
+		start = time.Now().UnixMilli()
+		r.schedule(func(ctx context.Context, t time.Time) bool {
+			history = append(history, t.UnixMilli())
+			return false
+		}, interval)
+		time.Sleep(interval*3 + interval/2)
+		r.shutdown(true)
+		require.Equal(t, 3, len(history))
+		for i := range history {
+			require.LessOrEqual(t, int64(i+1)*interval.Milliseconds(), history[i]-start)
+		}
+	})
+
+	t.Run("ScheduleWithTrigger", func(t *testing.T) {
+		var (
+			done     = make(chan struct{})
+			trigger  = make(chan struct{})
+			interval = 20 * time.Millisecond
+			history  = make([]int64, 0, 3)
+			start    = time.Now().UnixMilli()
+		)
+		r := newBackgroundRunner(context.Background())
+		r.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
+			if t.IsZero() {
+				history = append(history, -1)
+			} else {
+				history = append(history, t.UnixMilli())
+			}
+			if len(history) == 3 {
+				close(done)
+				return true
+			}
+			return false
+		}, interval, trigger)
+		trigger <- struct{}{}
+		time.Sleep(interval + interval/2)
+		trigger <- struct{}{}
+		<-done
+		require.Equal(t, 3, len(history))
+		require.Equal(t, int64(-1), history[0])
+		require.Equal(t, int64(-1), history[2])
+		require.LessOrEqual(t, int64(1)*interval.Milliseconds(), history[1]-start)
+
+		history = history[:0]
+		start = time.Now().UnixMilli()
+		r.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
+			if t.IsZero() {
+				history = append(history, -1)
+			} else {
+				history = append(history, t.UnixMilli())
+			}
+			return false
+		}, interval, trigger)
+		trigger <- struct{}{}
+		trigger <- struct{}{}
+		close(trigger)
+		time.Sleep(interval + interval/2)
+		r.shutdown(true)
+		require.Equal(t, 3, len(history))
+		require.Equal(t, int64(-1), history[0])
+		require.Equal(t, int64(-1), history[1])
+		require.LessOrEqual(t, int64(1)*interval.Milliseconds(), history[2]-start)
+	})
+}
+
 func TestRegionCache(t *testing.T) {
 	suite.Run(t, new(testRegionCacheSuite))
 }
@@ -1815,6 +1952,26 @@ func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
 	loadRegionsToCache(s.cache, regionCnt)
 	s.checkCache(regionCnt)
 
+	var (
+		gcScanStats   = make(map[uint64]int)
+		gcScanStatsMu sync.Mutex
+		gcScanStatsFn = func(item *btreeItem) {
+			gcScanStatsMu.Lock()
+			gcScanStats[item.cachedRegion.GetID()]++
+			gcScanStatsMu.Unlock()
+		}
+	)
+	gcScanItemHook.Store(&gcScanStatsFn)
+
+	// Check that region items are scanned uniformly.
+	time.Sleep(cleanCacheInterval*time.Duration(2*regionCnt/cleanRegionNumPerRound) + cleanCacheInterval/2)
+	gcScanStatsMu.Lock()
+	s.Equal(regionCnt, len(gcScanStats))
+	for _, count := range gcScanStats {
+		s.Equal(2, count)
+	}
+	gcScanStatsMu.Unlock()
+
 	// Make parts of the regions stale
 	remaining := 0
 	s.cache.mu.Lock()
@@ -1901,7 +2058,7 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
 
 	// start health check loop
 	atomic.StoreUint32(&store1.livenessState, store1Liveness)
-	go store1.checkUntilHealth(s.cache, livenessState(store1Liveness), time.Second)
+	startHealthCheckLoop(s.cache, store1, livenessState(store1Liveness), time.Second)
 
 	// update store meta
 	s.cluster.UpdateStoreAddr(store1.storeID, store1.addr+"'", store1.labels...)
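
For reference, the following is a minimal usage sketch of the new bgRunner primitives introduced by this patch. It is illustrative only and not part of the change: the helper exampleBgRunnerUsage is hypothetical, and it assumes the bgRunner, newBackgroundRunner, and repeat definitions from the diff above are in scope in the same package.

// Hypothetical usage sketch (not part of the patch); assumes the bgRunner,
// newBackgroundRunner, and repeat definitions added above are in scope.
func exampleBgRunnerUsage() {
	r := newBackgroundRunner(context.Background())
	trigger := make(chan struct{}, 1)

	// One-shot background task.
	r.run(func(ctx context.Context) {
		logutil.BgLogger().Info("one-shot job")
	})

	// Periodic task; repeat() wraps a plain func() so it never stops on its own.
	r.schedule(repeat(func() {
		logutil.BgLogger().Info("periodic job")
	}), 50*time.Millisecond)

	// Periodic task that can also be kicked immediately; a zero time marks triggered runs.
	r.scheduleWithTrigger(func(ctx context.Context, t time.Time) bool {
		if t.IsZero() {
			logutil.BgLogger().Info("triggered run")
		} else {
			logutil.BgLogger().Info("scheduled run")
		}
		return false // returning true would stop this job
	}, 50*time.Millisecond, trigger)

	trigger <- struct{}{} // fire the trigger once
	r.shutdown(true)      // cancel the shared context and wait for all jobs to exit
}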