min-safe-ts: fix MinSafeTS might be set to MaxUint64 permanently (#994)

* fix

Signed-off-by: husharp <jinhao.hu@pingcap.com>

* refine test

Signed-off-by: husharp <jinhao.hu@pingcap.com>

* address comment

Signed-off-by: husharp <jinhao.hu@pingcap.com>

* refine test

Signed-off-by: husharp <jinhao.hu@pingcap.com>

---------

Signed-off-by: husharp <jinhao.hu@pingcap.com>
This commit is contained in:
Hu# 2023-10-09 10:08:55 +08:00 committed by GitHub
parent 18a8414120
commit 652de4de2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 46 deletions

View File

@ -12,31 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/prewrite_test.go
//
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv_test
import (
"context"
"fmt"
"math"
"strings"
"sync/atomic"
"testing"
@ -71,6 +52,7 @@ func (s *apiTestSuite) SetupTest() {
pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
s.Require().NoError(err)
rpcClient := tikv.NewRPCClient()
s.Require().NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
// Set PD HTTP client.
store, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0, tikv.WithPDHTTPClient(nil, addrs))
s.store = store
@ -85,6 +67,18 @@ func (s *apiTestSuite) storeAddr(id uint64) string {
type storeSafeTsMockClient struct {
tikv.Client
requestCount int32
kvSafeTS uint64
}
func newStoreSafeTsMockClient(client tikv.Client) storeSafeTsMockClient {
return storeSafeTsMockClient{
Client: client,
kvSafeTS: 150, // Set a default value.
}
}
func (c *storeSafeTsMockClient) SetKVSafeTS(ts uint64) {
c.kvSafeTS = ts
}
func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
@ -92,9 +86,9 @@ func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, re
return c.Client.SendRequest(ctx, addr, req, timeout)
}
atomic.AddInt32(&c.requestCount, 1)
resp := &tikvrpc.Response{}
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 150}
return resp, nil
return &tikvrpc.Response{
Resp: &kvrpcpb.StoreSafeTSResponse{SafeTs: c.kvSafeTS},
}, nil
}
func (c *storeSafeTsMockClient) Close() error {
@ -108,10 +102,8 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
util.EnableFailpoints()
require := s.Require()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Set DC label for store 1.
// Mock Cluster-level min resolved ts failed.
@ -131,10 +123,6 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the stores from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(dcLabel) != 100 {
time.Sleep(100 * time.Millisecond)
@ -145,22 +133,16 @@ func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
}
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}
func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.NoError(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}()
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(100 * time.Millisecond)
@ -176,9 +158,6 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
// Set DC label for store 1.
// Mock PD server not support get min resolved ts by stores.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
defer func() {
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
@ -193,7 +172,6 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
}
storeID := uint64(1)
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
@ -206,10 +184,64 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}
func (s *apiTestSuite) TestInitClusterMinResolvedTSZero() {
util.EnableFailpoints()
require := s.Require()
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
s.store.SetTiKVClient(&mockClient)
// Make sure the store's min resolved ts is not initialized.
mockClient.SetKVSafeTS(0)
// Try to get the minimum resolved timestamp of the cluster from TiKV.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the store's min resolved ts is not initialized.
require.Equal(uint64(math.MaxUint64), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
// Try to get the minimum resolved timestamp of the cluster from PD.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) == math.MaxUint64 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the store's min resolved ts is not regarded as MaxUint64.
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
// Fallback to KV Request when PD server not support get min resolved ts.
require.NoError(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
mockClient.SetKVSafeTS(150)
retryCount = 0
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
time.Sleep(100 * time.Millisecond)
if retryCount > 5 {
break
}
retryCount++
}
// Make sure the minSafeTS can advance.
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
require.NoError(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}
func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
}
s.Require().NoError(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
}

View File

@ -690,7 +690,9 @@ func (s *KVStore) updateGlobalTxnScopeTSFromPD(ctx context.Context) bool {
} else if clusterMinSafeTS != 0 {
// Update ts and metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
if preClusterMinSafeTS > clusterMinSafeTS {
// If preClusterMinSafeTS is maxUint64, it means that the min safe ts has not been initialized.
// related to https://github.com/tikv/client-go/issues/991
if preClusterMinSafeTS != math.MaxUint64 && preClusterMinSafeTS > clusterMinSafeTS {
skipSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())