diff --git a/tikv/kv.go b/tikv/kv.go index fbdd65d8..e1cab15b 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -585,10 +585,12 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { wg := &sync.WaitGroup{} wg.Add(len(stores)) // Try to get the minimum resolved timestamp of the store from PD. - var err error - var storeMinResolvedTSs map[uint64]uint64 + var ( + err error + storeMinResolvedTSs map[uint64]uint64 + ) + storeIDs := make([]string, len(stores)) if s.pdHttpClient != nil { - storeIDs := make([]string, len(stores)) for i, store := range stores { storeIDs[i] = strconv.FormatUint(store.StoreID(), 10) } @@ -599,17 +601,16 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { } } - for _, store := range stores { + for i, store := range stores { storeID := store.StoreID() storeAddr := store.GetAddr() if store.IsTiFlash() { storeAddr = store.GetPeerAddr() } - go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) { + go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string, storeIDStr string) { defer wg.Done() var safeTS uint64 - storeIDStr := strconv.FormatUint(storeID, 10) // If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV. if storeMinResolvedTSs == nil || storeMinResolvedTSs[storeID] == 0 || err != nil { resp, err := tikvClient.SendRequest( @@ -645,7 +646,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc() safeTSTime := oracle.GetTimeFromTS(safeTS) metrics.TiKVMinSafeTSGapSeconds.WithLabelValues(storeIDStr).Set(time.Since(safeTSTime).Seconds()) - }(ctx, wg, storeID, storeAddr) + }(ctx, wg, storeID, storeAddr, storeIDs[i]) } txnScopeMap := make(map[string][]uint64) diff --git a/util/pd.go b/util/pd.go index 68ac8c3e..a2eb2cb8 100644 --- a/util/pd.go +++ b/util/pd.go @@ -128,15 +128,13 @@ func (p *PDHTTPClient) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs } } } - } else { + } else if tmp, ok := val.(int); ok { // Should be val.(uint64) but failpoint doesn't support that. - if tmp, ok := val.(int); ok { - // ci's store id is 1, we can change it if we have more stores. - // but for pool ci it's no need to do that :( - d.StoresMinResolvedTS = make(map[uint64]uint64) - d.StoresMinResolvedTS[1] = uint64(tmp) - logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp))) - } + // ci's store id is 1, we can change it if we have more stores. + // but for pool ci it's no need to do that :( + d.StoresMinResolvedTS = make(map[uint64]uint64) + d.StoresMinResolvedTS[1] = uint64(tmp) + logutil.BgLogger().Info("inject min resolved ts", zap.Uint64("ts", uint64(tmp))) } }