// Copyright 2021 TiKV Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // NOTE: The code in this file is based on code from the // TiDB project, licensed under the Apache License v 2.0 // // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/locate/region_request_test.go // // Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package locate import ( "context" "fmt" "math" "math/rand" "net" "strconv" "sync" "sync/atomic" "testing" "time" "unsafe" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/disaggregated" "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/client" "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/oracle/oracles" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util/async" pd "github.com/tikv/pd/client" pderr "github.com/tikv/pd/client/errs" "google.golang.org/grpc" ) func TestRegionRequestToSingleStore(t *testing.T) { suite.Run(t, new(testRegionRequestToSingleStoreSuite)) } type testRegionRequestToSingleStoreSuite struct { suite.Suite cluster *mocktikv.Cluster store uint64 peer uint64 region uint64 pdCli pd.Client cache *RegionCache bo *retry.Backoffer regionRequestSender *RegionRequestSender mvccStore mocktikv.MVCCStore } func (s *testRegionRequestToSingleStoreSuite) SetupTest() { s.mvccStore = mocktikv.MustNewMVCCStore() s.cluster = mocktikv.NewCluster(s.mvccStore) s.store, s.peer, s.region = mocktikv.BootstrapWithSingleStore(s.cluster) s.pdCli = &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)} s.cache = NewRegionCache(s.pdCli) s.bo = retry.NewNoopBackoff(context.Background()) client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil) s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{}) s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return")) } func (s *testRegionRequestToSingleStoreSuite) TearDownTest() { s.cache.Close() s.mvccStore.Close() s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic")) } type fnClient struct { fn func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) closedAddr string closedVer uint64 } func (f *fnClient) Close() error { return nil } func (f *fnClient) CloseAddr(addr string) error { return f.CloseAddrVer(addr, math.MaxUint64) } func (f *fnClient) CloseAddrVer(addr string, ver uint64) error { f.closedAddr = addr f.closedVer = ver return nil } func (f *fnClient) SetEventListener(listener client.ClientEventListener) {} func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { tikvrpc.AttachContext(req, req.Context) return f.fn(ctx, addr, req, timeout) } func (f *fnClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) { go func() { tikvrpc.AttachContext(req, req.Context) cb.Schedule(f.fn(ctx, addr, req, 0)) }() } func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) // test stale command retry. test := func() { oc := s.regionRequestSender.client defer func() { s.regionRequestSender.client = oc }() s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { staleResp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ RegionError: &errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}, }} return staleResp, nil }} bo := retry.NewBackofferWithVars(context.Background(), 5, nil) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) } s.Run("Default", test) failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.Run("AsyncAPI", test) } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) // test ErrClientResourceGroupThrottled handled by regionRequestSender test := func() { oc := s.regionRequestSender.client defer func() { s.regionRequestSender.client = oc }() storeOld, _ := s.regionRequestSender.regionCache.stores.get(1) epoch := storeOld.epoch s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return nil, pderr.ErrClientResourceGroupThrottled }} bo := retry.NewBackofferWithVars(context.Background(), 5, nil) _, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) s.NotNil(err) storeNew, _ := s.regionRequestSender.regionCache.stores.get(1) // not mark the store need be refill, then the epoch should not be changed. s.Equal(epoch, storeNew.epoch) // no rpc error if the error is ErrClientResourceGroupThrottled s.Nil(s.regionRequestSender.rpcError) } s.Run("Default", test) failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.Run("AsyncAPI", test) } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() { s.testOnSendFailedWithStoreRestart() } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestartUsingAsyncAPI() { failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.testOnSendFailedWithStoreRestart() } func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithStoreRestart() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) s.Nil(s.regionRequestSender.rpcError) // stop store. s.cluster.StopStore(s.store) _, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.NotNil(err) // The RPC error shouldn't be nil since it failed to sent the request. s.NotNil(s.regionRequestSender.rpcError) // start store. s.cluster.StartStore(s.store) // locate region again is needed // since last request on the region failed and region's info had been cleared. region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) s.NotNil(s.regionRequestSender.rpcError) resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne() { s.testOnSendFailedWithCloseKnownStoreThenUseNewOne() } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOneUsingAsyncAPI() { failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.testOnSendFailedWithCloseKnownStoreThenUseNewOne() } func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCloseKnownStoreThenUseNewOne() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) // add new store2 and make store2 as leader. store2 := s.cluster.AllocID() peer2 := s.cluster.AllocID() s.cluster.AddStore(store2, fmt.Sprintf("store%d", store2)) s.cluster.AddPeer(s.region, store2, peer2) s.cluster.ChangeLeader(s.region, peer2) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) // stop store2 and make store1 as new leader. s.cluster.StopStore(store2) s.cluster.ChangeLeader(s.region, s.peer) // send to store2 fail and send to new leader store1. bo2 := retry.NewBackofferWithVars(context.Background(), 100, nil) resp, _, err = s.regionRequestSender.SendReq(bo2, req, region.Region, time.Second) s.Nil(err) regionErr, err := resp.GetRegionError() s.Nil(err) s.Nil(regionErr) s.NotNil(resp.Resp) } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() { s.testOnSendFailedWithCancelled() } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelledUsingAsyncAPI() { failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.testOnSendFailedWithCancelled() } func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCancelled() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) // set store to cancel state. s.cluster.CancelStore(s.store) // locate region again is needed // since last request on the region failed and region's info had been cleared. _, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.NotNil(err) s.Equal(errors.Cause(err), context.Canceled) // set store to normal state. s.cluster.UnCancelStore(s.store) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) } func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled() { s.testNoReloadRegionWhenCtxCanceled() } func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceledUsingAsyncAPI() { failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`) defer failpoint.Disable("tikvclient/useSendReqAsync") s.testNoReloadRegionWhenCtxCanceled() } func (s *testRegionRequestToSingleStoreSuite) testNoReloadRegionWhenCtxCanceled() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) sender := s.regionRequestSender bo, cancel := s.bo.Fork() cancel() // Call SendKVReq with a canceled context. _, _, err = sender.SendReq(bo, req, region.Region, time.Second) // Check this kind of error won't cause region cache drop. s.Equal(errors.Cause(err), context.Canceled) r, expired := sender.regionCache.searchCachedRegionByID(s.region) s.False(expired) s.NotNil(r) } func (s *testRegionRequestToSingleStoreSuite) TestSendReqCtx() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) resp, ctx, _, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp.Resp) s.NotNil(ctx) req.ReplicaRead = true resp, ctx, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV) s.Nil(err) s.NotNil(resp.Resp) s.NotNil(ctx) } func (s *testRegionRequestToSingleStoreSuite) TestSendReqAsync() { reachable.injectConstantLiveness(s.regionRequestSender.regionCache.stores) ctx := context.Background() rl := async.NewRunLoop() s.Run("Basic", func() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) complete := false s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) { s.Nil(err) s.NotNil(resp.Resp) s.NotEmpty(resp.Addr) complete = true })) for !complete { _, err := rl.Exec(ctx) s.Require().NoError(err) } }) s.Run("StoreLimit", func() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) store := s.cache.stores.getOrInsertDefault(s.store) defer func(storeLimit int64, tokenCount int64) { kv.StoreLimit.Store(storeLimit) store.tokenCount.Store(tokenCount) }(kv.StoreLimit.Load(), store.tokenCount.Load()) kv.StoreLimit.Store(100) store.tokenCount.Store(100) complete := false s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) { s.Nil(resp) s.NotNil(err) e, ok := errors.Cause(err).(*tikverr.ErrTokenLimit) s.True(ok) s.Equal(s.store, e.StoreID) complete = true })) for !complete { _, err := rl.Exec(ctx) s.Require().NoError(err) } }) s.Run("RPCCancel", func() { req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) defer func(ctx context.Context, cli client.Client) { s.bo.SetCtx(ctx) s.regionRequestSender.client = cli }(s.bo.GetCtx(), s.regionRequestSender.client) var once sync.Once rpcCanceller := NewRPCanceller() s.bo.SetCtx(context.WithValue(s.bo.GetCtx(), RPCCancellerCtxKey{}, rpcCanceller)) s.regionRequestSender.client = &fnClient{ fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { once.Do(func() { rpcCanceller.CancelAll() }) return nil, context.Canceled }, } complete := false s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) { s.Nil(resp) s.ErrorIs(err, context.Canceled) complete = true })) for !complete { _, err := rl.Exec(ctx) s.Require().NoError(err) } }) s.Run("Timeout", func() { req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: []byte("key"), Version: math.MaxUint64, }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) defer func(cli client.Client) { s.regionRequestSender.client = cli }(s.regionRequestSender.client) s.regionRequestSender.client = &fnClient{ fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { <-ctx.Done() return nil, ctx.Err() }, } complete := false s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, 100*time.Millisecond, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) { s.Nil(err) s.NotNil(resp) regionErr, err := resp.GetRegionError() s.Nil(err) s.True(retry.IsFakeRegionError(regionErr)) complete = true })) for !complete { _, err := rl.Exec(ctx) s.Require().NoError(err) } }) } // cancelContextClient wraps rpcClient and always cancels context before sending requests. type cancelContextClient struct { client.Client redirectAddr string } func (c *cancelContextClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { childCtx, cancel := context.WithCancel(ctx) cancel() return c.Client.SendRequest(childCtx, c.redirectAddr, req, timeout) } // mockTikvGrpcServer mock a tikv gprc server for testing. type mockTikvGrpcServer struct{} var _ tikvpb.TikvServer = &mockTikvGrpcServer{} // KvGet commands with mvcc/txn supported. func (s *mockTikvGrpcServer) KvGet(context.Context, *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvScan(context.Context, *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvPrewrite(context.Context, *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvCommit(context.Context, *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvCleanup(context.Context, *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvBatchGet(context.Context, *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvBatchRollback(context.Context, *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvScanLock(context.Context, *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvResolveLock(context.Context, *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvCheckTxnStatus(ctx context.Context, in *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvCheckSecondaryLocks(ctx context.Context, in *kvrpcpb.CheckSecondaryLocksRequest) (*kvrpcpb.CheckSecondaryLocksResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvDeleteRange(context.Context, *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawGetKeyTTL(context.Context, *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) UnsafeDestroyRange(context.Context, *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RegisterLockObserver(context.Context, *kvrpcpb.RegisterLockObserverRequest) (*kvrpcpb.RegisterLockObserverResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) CheckLockObserver(context.Context, *kvrpcpb.CheckLockObserverRequest) (*kvrpcpb.CheckLockObserverResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserverRequest) (*kvrpcpb.RemoveLockObserverResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) PhysicalScanLock(context.Context, *kvrpcpb.PhysicalScanLockRequest) (*kvrpcpb.PhysicalScanLockResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request) (*coprocessor.Response, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) DelegateCoprocessor(context.Context, *coprocessor.DelegateRequest) (*coprocessor.DelegateResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) DispatchMPPTask(context.Context, *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) IsAlive(context.Context, *mpp.IsAliveRequest) (*mpp.IsAliveResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) ReportMPPTaskStatus(context.Context, *mpp.ReportTaskStatusRequest) (*mpp.ReportTaskStatusResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) EstablishMPPConnection(*mpp.EstablishMPPConnectionRequest, tikvpb.Tikv_EstablishMPPConnectionServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) CancelMPPTask(context.Context, *mpp.CancelTaskRequest) (*mpp.CancelTaskResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) Raft(tikvpb.Tikv_RaftServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) BatchRaft(tikvpb.Tikv_BatchRaftServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) Snapshot(tikvpb.Tikv_SnapshotServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest) (*kvrpcpb.StoreSafeTSResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawCompareAndSwap(context.Context, *kvrpcpb.RawCASRequest) (*kvrpcpb.RawCASResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) RawChecksum(context.Context, *kvrpcpb.RawChecksumRequest) (*kvrpcpb.RawChecksumResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) Compact(ctx context.Context, request *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) GetLockWaitHistory(ctx context.Context, request *kvrpcpb.GetLockWaitHistoryRequest) (*kvrpcpb.GetLockWaitHistoryResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) TryAddLock(context.Context, *disaggregated.TryAddLockRequest) (*disaggregated.TryAddLockResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) TryMarkDelete(context.Context, *disaggregated.TryMarkDeleteRequest) (*disaggregated.TryMarkDeleteResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.FlashbackToVersionRequest) (*kvrpcpb.FlashbackToVersionResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvPrepareFlashbackToVersion(context.Context, *kvrpcpb.PrepareFlashbackToVersionRequest) (*kvrpcpb.PrepareFlashbackToVersionResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) EstablishDisaggTask(context.Context, *disaggregated.EstablishDisaggTaskRequest) (*disaggregated.EstablishDisaggTaskResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) FetchDisaggPages(*disaggregated.FetchDisaggPagesRequest, tikvpb.Tikv_FetchDisaggPagesServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) TabletSnapshot(_ tikvpb.Tikv_TabletSnapshotServer) error { return errors.New("unreachable") } func (s *mockTikvGrpcServer) GetTiFlashSystemTable(context.Context, *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) GetDisaggConfig(context.Context, *disaggregated.GetDisaggConfigRequest) (*disaggregated.GetDisaggConfigResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) CancelDisaggTask(context.Context, *disaggregated.CancelDisaggTaskRequest) (*disaggregated.CancelDisaggTaskResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvFlush(context.Context, *kvrpcpb.FlushRequest) (*kvrpcpb.FlushResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) KvBufferBatchGet(context.Context, *kvrpcpb.BufferBatchGetRequest) (*kvrpcpb.BufferBatchGetResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvrpcpb.GetHealthFeedbackRequest) (*kvrpcpb.GetHealthFeedbackResponse, error) { return nil, errors.New("unreachable") } func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) { return nil, errors.New("unreachable") } func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() { // prepare a mock tikv grpc server addr := "localhost:56341" lis, err := net.Listen("tcp", addr) s.Nil(err) server := grpc.NewServer() tikvpb.RegisterTikvServer(server, &mockTikvGrpcServer{}) wg := &sync.WaitGroup{} wg.Add(1) go func() { server.Serve(lis) wg.Done() }() cli := client.NewRPCClient() sender := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{}) req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{ Key: []byte("key"), Value: []byte("value"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) bo, cancel := s.bo.Fork() cancel() _, _, err = sender.SendReq(bo, req, region.Region, 3*time.Second) s.Equal(errors.Cause(err), context.Canceled) r, expired := sender.regionCache.searchCachedRegionByID(s.region) s.False(expired) s.NotNil(r) // Just for covering error code = codes.Canceled. client1 := &cancelContextClient{ Client: client.NewRPCClient(), redirectAddr: addr, } sender = NewRegionRequestSender(s.cache, client1, oracle.NoopReadTSValidator{}) sender.SendReq(s.bo, req, region.Region, 3*time.Second) // cleanup server.Stop() wg.Wait() cli.Close() client1.Close() } func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError() { req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) // test retry for max timestamp not synced func() { oc := s.regionRequestSender.client defer func() { s.regionRequestSender.client = oc }() count := 0 s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { count++ var resp *tikvrpc.Response if count < 3 { resp = &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{ RegionError: &errorpb.Error{MaxTimestampNotSynced: &errorpb.MaxTimestampNotSynced{}}, }} } else { resp = &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}} } return resp, nil }} bo := retry.NewBackofferWithVars(context.Background(), 5, nil) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) }() } func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() { region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) // test kv epochNotMatch return empty regions s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: region.Region, Store: &Store{storeID: s.store}}, []*metapb.Region{}) s.Nil(err) r, expired := s.cache.searchCachedRegionByID(s.region) s.True(expired) s.NotNil(r) // refill cache region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) // test kv load new region with new start-key and new epoch v2 := region.Region.confVer + 1 r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}} st := newUninitializedStore(s.store) s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) s.Equal(region.Region.confVer, v2) s.Equal(region.Region.ver, region.Region.ver) v3 := region.Region.confVer + 1 r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}} st = newUninitializedStore(s.store) s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true) region, err = s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) s.Equal(region.Region.confVer, region.Region.confVer) s.Equal(region.Region.ver, v3) } func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch() { req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: []byte("key"), }) region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) oc := s.regionRequestSender.client defer func() { s.regionRequestSender.client = oc }() var target string client := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { target = addr resp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ RegionError: &errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}, }} return resp, nil }} s.regionRequestSender.client = client bo := retry.NewBackofferWithVars(context.Background(), 5, nil) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) s.Equal(target, client.closedAddr) var expected uint64 = math.MaxUint64 s.Equal(expected, client.closedVer) } func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() { config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 0 })() server, port := mockserver.StartMockTikvService() s.True(port > 0) server.SetMetaChecker(func(ctx context.Context) error { return context.DeadlineExceeded }) rpcClient := client.NewRPCClient() s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} defer func() { rpcClient.Close() server.Stop() }() bo := retry.NewBackofferWithVars(context.Background(), 2000, nil) region, err := s.cache.LocateRegionByID(bo, s.region) s.Nil(err) s.NotNil(region) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) // send a probe request to make sure the mock server is ready. s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10) s.Nil(err) s.NotNil(resp) regionErr, _ := resp.GetRegionError() s.True(retry.IsFakeRegionError(regionErr)) s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0. } func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { // This test should use `go test -race` to run. config.UpdateGlobal(func(conf *config.Config) { conf.TiKVClient.MaxBatchSize = 128 })() server, port := mockserver.StartMockTikvService() s.True(port > 0) rpcClient := client.NewRPCClient() fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} defer func() { rpcClient.Close() server.Stop() }() var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 100; j++ { ctx, cancel := context.WithCancel(context.Background()) bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil) region, err := s.cache.LocateRegionByID(bo, s.region) s.Nil(err) s.NotNil(region) go func() { // mock for kill query execution or timeout. time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1)) cancel() }() req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1}) regionRequestSender := NewRegionRequestSender(s.cache, fnClient, oracle.NoopReadTSValidator{}) reachable.injectConstantLiveness(regionRequestSender.regionCache.stores) regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort) } }() } wg.Wait() // batchSendLoop should not panic. s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0)) } func (s *testRegionRequestToSingleStoreSuite) TestClusterIDInReq() { server, port := mockserver.StartMockTikvService() s.True(port > 0) rpcClient := client.NewRPCClient() s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { s.True(req.ClusterId > 0) return rpcClient.SendRequest(ctx, server.Addr(), req, timeout) }} defer func() { rpcClient.Close() server.Stop() }() bo := retry.NewBackofferWithVars(context.Background(), 2000, nil) region, err := s.cache.LocateRegionByID(bo, s.region) s.Nil(err) s.NotNil(region) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1}) // send a probe request to make sure the mock server is ready. s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second) resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10) s.Nil(err) s.NotNil(resp) regionErr, _ := resp.GetRegionError() s.Nil(regionErr) } type emptyClient struct { client.Client } func (s *testRegionRequestToSingleStoreSuite) TestClientExt() { var cli client.Client = client.NewRPCClient() sender := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{}) s.NotNil(sender.client) s.NotNil(sender.getClientExt()) cli.Close() cli = &emptyClient{} sender = NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{}) s.NotNil(sender.client) s.Nil(sender.getClientExt()) } func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestSenderString() { sender := NewRegionRequestSender(s.cache, &fnClient{}, oracle.NoopReadTSValidator{}) loc, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) // invalid region cache before sending request. s.cache.InvalidateCachedRegion(loc.Region) sender.SendReqCtx(s.bo, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), loc.Region, time.Second, tikvrpc.TiKV) s.Equal("{rpcError:, replicaSelector: }", sender.String()) } func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() { reqStats := NewRegionRequestRuntimeStats() reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second) reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond) reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Second*2) reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Millisecond*200) reqStats.RecordRPCErrorStats("context canceled") reqStats.RecordRPCErrorStats("context canceled") reqStats.RecordRPCErrorStats("region_not_found") reqStats.Merge(NewRegionRequestRuntimeStats()) reqStats2 := NewRegionRequestRuntimeStats() reqStats2.Merge(reqStats.Clone()) expecteds := []string{ // Since map iteration order is random, we need to check all possible orders. "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{region_not_found:1, context canceled:2}", "Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{context canceled:2, region_not_found:1}", "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{context canceled:2, region_not_found:1}", "Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{region_not_found:1, context canceled:2}", } s.Contains(expecteds, reqStats.String()) s.Contains(expecteds, reqStats2.String()) for i := 0; i < 50; i++ { reqStats.RecordRPCErrorStats("err_" + strconv.Itoa(i)) } s.Regexp("{.*err_.*:1.*, other_error:36}", reqStats.RequestErrorStats.String()) s.Regexp(".*num_rpc.*total_time.*, rpc_errors:{.*err.*, other_error:36}", reqStats.String()) access := &ReplicaAccessStats{} access.recordReplicaAccessInfo(true, false, 1, 2, "data_not_ready") access.recordReplicaAccessInfo(false, false, 3, 4, "not_leader") access.recordReplicaAccessInfo(false, true, 5, 6, "server_is_Busy") s.Equal("{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}", access.String()) for i := 0; i < 20; i++ { access.recordReplicaAccessInfo(false, false, 5+uint64(i)%2, 6, "server_is_Busy") } expecteds = []string{ // Since map iteration order is random, we need to check all possible orders. "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:5, error_stats:{server_is_Busy:9}}, {peer:6, error_stats:{server_is_Busy:9}}}", "{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:6, error_stats:{server_is_Busy:9}}, {peer:5, error_stats:{server_is_Busy:9}}}", } s.Contains(expecteds, access.String()) } func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() { oracles.EnableTSValidation.Store(true) defer oracles.EnableTSValidation.Store(false) o, err := oracles.NewPdOracle(s.pdCli, &oracles.PDOracleOptions{ UpdateInterval: time.Second * 2, }) s.NoError(err) s.regionRequestSender.readTSValidator = o defer o.Close() testImpl := func(ts func() uint64, staleRead bool, expectedErrorType error) { region, err := s.cache.LocateRegionByID(s.bo, s.region) s.Nil(err) s.NotNil(region) req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{ Key: []byte("k"), Version: ts(), }) req.StaleRead = staleRead _, _, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV) if expectedErrorType == nil { s.NoError(err) } else { s.Error(err) s.IsType(err, expectedErrorType) } } getTS := func() uint64 { ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) s.NoError(err) return ts } addTS := func(ts uint64, diff time.Duration) uint64 { return oracle.ComposeTS(oracle.GetPhysical(oracle.GetTimeFromTS(ts).Add(diff)), oracle.ExtractLogical(ts)) } testImpl(getTS, false, nil) testImpl(getTS, true, nil) testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, false, nil) testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, true, nil) testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, oracle.ErrFutureTSRead{}) testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, true, oracle.ErrFutureTSRead{}) testImpl(func() uint64 { return math.MaxUint64 }, false, nil) testImpl(func() uint64 { return math.MaxUint64 }, true, oracle.ErrLatestStaleRead{}) } type noCauseError struct { error } func (_ noCauseError) Cause() error { return nil } func TestGetErrMsg(t *testing.T) { err := noCauseError{error: errors.New("no cause err")} require.Equal(t, nil, errors.Cause(err)) require.Panicsf(t, func() { _ = errors.Cause(err).Error() }, "should panic") require.Equal(t, "no cause err", getErrMsg(err)) }