mirror of https://github.com/tikv/client-go.git
refine test
Signed-off-by: husharp <jinhao.hu@pingcap.com>
This commit is contained in:
parent
b61256bbc3
commit
ae859e2450
|
|
@ -105,48 +105,57 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
|
|||
return c.Client.CloseAddr(addr)
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
|
||||
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
|
||||
util.EnableFailpoints()
|
||||
// Try to get the minimum resolved timestamp of the cluster from PD.
|
||||
require := s.Require()
|
||||
require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
|
||||
defer func() {
|
||||
require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
|
||||
}()
|
||||
|
||||
// Set DC label for store 1.
|
||||
// Mock Cluster-level min resolved ts failed.
|
||||
dcLabel := "testDC"
|
||||
restore := config.UpdateGlobal(func(conf *config.Config) {
|
||||
conf.TxnScope = dcLabel
|
||||
})
|
||||
defer restore()
|
||||
|
||||
labels := []*metapb.StoreLabel{
|
||||
{
|
||||
Key: tikv.DCLabelKey,
|
||||
Value: dcLabel,
|
||||
},
|
||||
}
|
||||
storeID := uint64(1)
|
||||
s.store.GetRegionCache().SetRegionCacheStore(storeID, s.storeAddr(storeID), s.storeAddr(storeID), tikvrpc.TiKV, 1, labels)
|
||||
// Try to get the minimum resolved timestamp of the stores from PD.
|
||||
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
|
||||
mockClient := storeSafeTsMockClient{
|
||||
Client: s.store.GetTiKVClient(),
|
||||
}
|
||||
s.store.SetTiKVClient(&mockClient)
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
|
||||
time.Sleep(2 * time.Second)
|
||||
for s.store.GetMinSafeTS(dcLabel) != 100 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.Equal(int32(0), atomic.LoadInt32(&mockClient.requestCount))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(dcLabel))
|
||||
require.Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
|
||||
// Try to get the minimum resolved timestamp of the cluster from TiKV.
|
||||
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
|
||||
defer func() {
|
||||
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
}()
|
||||
retryCount = 0
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 150 {
|
||||
time.Sleep(2 * time.Second)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
}
|
||||
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
|
||||
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
}
|
||||
|
||||
func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
||||
util.EnableFailpoints()
|
||||
// Try to get the minimum resolved timestamp of the cluster from PD.
|
||||
require := s.Require()
|
||||
require.Nil(failpoint.Enable("tikvclient/mockFastSafeTSUpdater", `return()`))
|
||||
defer func() {
|
||||
require.Nil(failpoint.Disable("tikvclient/mockFastSafeTSUpdater"))
|
||||
}()
|
||||
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
|
||||
mockClient := storeSafeTsMockClient{
|
||||
Client: s.store.GetTiKVClient(),
|
||||
|
|
@ -154,7 +163,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
|||
s.store.SetTiKVClient(&mockClient)
|
||||
var retryCount int
|
||||
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
|
||||
time.Sleep(2 * time.Second)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
|
|
@ -162,11 +171,14 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
|||
}
|
||||
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
|
||||
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
|
||||
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
|
||||
// Set DC label for store 1.
|
||||
// Mock PD server not support get min resolved ts by stores.
|
||||
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(0)`))
|
||||
defer func() {
|
||||
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
|
||||
}()
|
||||
|
||||
// Set DC label for store 1.
|
||||
dcLabel := "testDC"
|
||||
restore := config.UpdateGlobal(func(conf *config.Config) {
|
||||
conf.TxnScope = dcLabel
|
||||
|
|
@ -185,7 +197,7 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
|
|||
// Try to get the minimum resolved timestamp of the store from TiKV.
|
||||
retryCount = 0
|
||||
for s.store.GetMinSafeTS(dcLabel) != 150 {
|
||||
time.Sleep(2 * time.Second)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
if retryCount > 5 {
|
||||
break
|
||||
}
|
||||
|
|
|
|||
|
|
@ -565,6 +565,9 @@ func (s *KVStore) updateMinSafeTS(txnScope string, storeIDs []uint64) {
|
|||
func (s *KVStore) safeTSUpdater() {
|
||||
defer s.wg.Done()
|
||||
t := time.NewTicker(safeTSUpdateInterval)
|
||||
if _, e := util.EvalFailpoint("mockFastSafeTSUpdater"); e == nil {
|
||||
t.Reset(time.Millisecond * 100)
|
||||
}
|
||||
defer t.Stop()
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC)
|
||||
|
|
|
|||
Loading…
Reference in New Issue