// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/locate/region_request_test.go
//

// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package locate

import (
	"context"
	"fmt"
	"math"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
	"unsafe"

	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/coprocessor"
	"github.com/pingcap/kvproto/pkg/disaggregated"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/kvproto/pkg/mpp"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/config/retry"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/apicodec"
	"github.com/tikv/client-go/v2/internal/client"
	"github.com/tikv/client-go/v2/internal/client/mockserver"
	"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/oracle/oracles"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/util/async"
	pd "github.com/tikv/pd/client"
	pderr "github.com/tikv/pd/client/errs"
	"google.golang.org/grpc"
)

func TestRegionRequestToSingleStore(t *testing.T) {
	suite.Run(t, new(testRegionRequestToSingleStoreSuite))
}

type testRegionRequestToSingleStoreSuite struct {
	suite.Suite
	cluster             *mocktikv.Cluster
	store               uint64
	peer                uint64
	region              uint64
	pdCli               pd.Client
	cache               *RegionCache
	bo                  *retry.Backoffer
	regionRequestSender *RegionRequestSender
	mvccStore           mocktikv.MVCCStore
}

func (s *testRegionRequestToSingleStoreSuite) SetupTest() {
	s.mvccStore = mocktikv.MustNewMVCCStore()
	s.cluster = mocktikv.NewCluster(s.mvccStore)
	s.store, s.peer, s.region = mocktikv.BootstrapWithSingleStore(s.cluster)
	s.pdCli = &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
	s.cache = NewRegionCache(s.pdCli)
	s.bo = retry.NewNoopBackoff(context.Background())
	client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
	s.regionRequestSender = NewRegionRequestSender(s.cache, client, oracle.NoopReadTSValidator{})

	s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
}

func (s *testRegionRequestToSingleStoreSuite) TearDownTest() {
	s.cache.Close()
	s.mvccStore.Close()

	s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
}

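// fnClient is a stub client.Client whose SendRequest and SendRequestAsync delegate to a
// caller-provided function; CloseAddr and CloseAddrVer record the last closed address and
// version so tests can assert on them.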
type fnClient struct {
	fn         func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
	closedAddr string
	closedVer  uint64
}

func (f *fnClient) Close() error {
	return nil
}

func (f *fnClient) CloseAddr(addr string) error {
	return f.CloseAddrVer(addr, math.MaxUint64)
}

func (f *fnClient) CloseAddrVer(addr string, ver uint64) error {
	f.closedAddr = addr
	f.closedVer = ver
	return nil
}

func (f *fnClient) SetEventListener(listener client.ClientEventListener) {}

func (f *fnClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	tikvrpc.AttachContext(req, req.Context)
	return f.fn(ctx, addr, req, timeout)
}

func (f *fnClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
	go func() {
		tikvrpc.AttachContext(req, req.Context)
		cb.Schedule(f.fn(ctx, addr, req, 0))
	}()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnRegionError() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	// test stale command retry.
	test := func() {
		oc := s.regionRequestSender.client
		defer func() {
			s.regionRequestSender.client = oc
		}()
		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
			staleResp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
				RegionError: &errorpb.Error{StaleCommand: &errorpb.StaleCommand{}},
			}}
			return staleResp, nil
		}}
		bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
		resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
		s.Nil(err)
		s.NotNil(resp)
		regionErr, _ := resp.GetRegionError()
		s.NotNil(regionErr)
	}

	s.Run("Default", test)

	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.Run("AsyncAPI", test)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailByResourceGroupThrottled() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	// test that ErrClientResourceGroupThrottled is handled by regionRequestSender
	test := func() {
		oc := s.regionRequestSender.client
		defer func() {
			s.regionRequestSender.client = oc
		}()
		storeOld, _ := s.regionRequestSender.regionCache.stores.get(1)
		epoch := storeOld.epoch
		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
			return nil, pderr.ErrClientResourceGroupThrottled
		}}
		bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
		_, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
		s.NotNil(err)
		storeNew, _ := s.regionRequestSender.regionCache.stores.get(1)
		// the store is not marked as needing a refill, so the epoch should not change.
		s.Equal(epoch, storeNew.epoch)
		// no rpc error is recorded when the error is ErrClientResourceGroupThrottled
		s.Nil(s.regionRequestSender.rpcError)
	}

	s.Run("Default", test)

	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.Run("AsyncAPI", test)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() {
	s.testOnSendFailedWithStoreRestart()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestartUsingAsyncAPI() {
	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.testOnSendFailedWithStoreRestart()
}

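// testOnSendFailedWithStoreRestart checks that a request fails (and records an rpcError)
// while the only store is stopped, and succeeds again after the store restarts and the
// region is located anew.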
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithStoreRestart() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp.Resp)
	s.Nil(s.regionRequestSender.rpcError)

	// stop store.
	s.cluster.StopStore(s.store)
	_, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.NotNil(err)
	// The RPC error shouldn't be nil since it failed to send the request.
	s.NotNil(s.regionRequestSender.rpcError)

	// start store.
	s.cluster.StartStore(s.store)

	// The region needs to be located again
	// since the last request on it failed and the region's info has been cleared.
	region, err = s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	s.NotNil(s.regionRequestSender.rpcError)
	resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp.Resp)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne() {
	s.testOnSendFailedWithCloseKnownStoreThenUseNewOne()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOneUsingAsyncAPI() {
	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.testOnSendFailedWithCloseKnownStoreThenUseNewOne()
}

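// testOnSendFailedWithCloseKnownStoreThenUseNewOne checks that when the current leader
// store is stopped and leadership moves back to the original store, the sender fails over
// and the request eventually succeeds on the new leader.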
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCloseKnownStoreThenUseNewOne() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})

	// add a new store2 and make it the leader.
	store2 := s.cluster.AllocID()
	peer2 := s.cluster.AllocID()
	s.cluster.AddStore(store2, fmt.Sprintf("store%d", store2))
	s.cluster.AddPeer(s.region, store2, peer2)
	s.cluster.ChangeLeader(s.region, peer2)

	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp.Resp)

	// stop store2 and make store1 the new leader.
	s.cluster.StopStore(store2)
	s.cluster.ChangeLeader(s.region, s.peer)

	// sending to store2 fails, then the request is sent to the new leader store1.
	bo2 := retry.NewBackofferWithVars(context.Background(), 100, nil)
	resp, _, err = s.regionRequestSender.SendReq(bo2, req, region.Region, time.Second)
	s.Nil(err)
	regionErr, err := resp.GetRegionError()
	s.Nil(err)
	s.Nil(regionErr)
	s.NotNil(resp.Resp)
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelled() {
	s.testOnSendFailedWithCancelled()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCancelledUsingAsyncAPI() {
	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.testOnSendFailedWithCancelled()
}

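// testOnSendFailedWithCancelled checks that a canceled store surfaces context.Canceled to
// the caller, and that requests succeed again once the store is un-canceled and the region
// is relocated.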
func (s *testRegionRequestToSingleStoreSuite) testOnSendFailedWithCancelled() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	resp, _, err := s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp.Resp)

	// set the store to the canceled state.
	s.cluster.CancelStore(s.store)
	// the request should fail with context.Canceled
	// since the store has been canceled.
	_, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.NotNil(err)
	s.Equal(errors.Cause(err), context.Canceled)

	// set the store back to the normal state.
	s.cluster.UnCancelStore(s.store)
	region, err = s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	resp, _, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp.Resp)
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceled() {
	s.testNoReloadRegionWhenCtxCanceled()
}

func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionWhenCtxCanceledUsingAsyncAPI() {
	failpoint.Enable("tikvclient/useSendReqAsync", `return(true)`)
	defer failpoint.Disable("tikvclient/useSendReqAsync")
	s.testNoReloadRegionWhenCtxCanceled()
}

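// testNoReloadRegionWhenCtxCanceled checks that a request failing with context.Canceled
// does not invalidate the cached region.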
func (s *testRegionRequestToSingleStoreSuite) testNoReloadRegionWhenCtxCanceled() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	sender := s.regionRequestSender
	bo, cancel := s.bo.Fork()
	cancel()
	// Call SendReq with a canceled context.
	_, _, err = sender.SendReq(bo, req, region.Region, time.Second)
	// Check that this kind of error won't cause the cached region to be dropped.
	s.Equal(errors.Cause(err), context.Canceled)
	r, expired := sender.regionCache.searchCachedRegionByID(s.region)
	s.False(expired)
	s.NotNil(r)
}

func (s *testRegionRequestToSingleStoreSuite) TestSendReqCtx() {
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	resp, ctx, _, err := s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	s.NotNil(resp.Resp)
	s.NotNil(ctx)
	req.ReplicaRead = true
	resp, ctx, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)
	s.Nil(err)
	s.NotNil(resp.Resp)
	s.NotNil(ctx)
}

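// TestSendReqAsync drives SendReqAsync through an async RunLoop and covers the basic
// success path, store token limit errors, RPC cancellation, and request timeout.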
func (s *testRegionRequestToSingleStoreSuite) TestSendReqAsync() {
	reachable.injectConstantLiveness(s.regionRequestSender.regionCache.stores)

	ctx := context.Background()
	rl := async.NewRunLoop()

	s.Run("Basic", func() {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
			Key:   []byte("key"),
			Value: []byte("value"),
		})
		region, err := s.cache.LocateRegionByID(s.bo, s.region)
		s.Nil(err)
		s.NotNil(region)

		complete := false
		s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
			s.Nil(err)
			s.NotNil(resp.Resp)
			s.NotEmpty(resp.Addr)
			complete = true
		}))
		for !complete {
			_, err := rl.Exec(ctx)
			s.Require().NoError(err)
		}
	})

	s.Run("StoreLimit", func() {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
			Key:   []byte("key"),
			Value: []byte("value"),
		})
		region, err := s.cache.LocateRegionByID(s.bo, s.region)
		s.Nil(err)
		s.NotNil(region)

		store := s.cache.stores.getOrInsertDefault(s.store)

		defer func(storeLimit int64, tokenCount int64) {
			kv.StoreLimit.Store(storeLimit)
			store.tokenCount.Store(tokenCount)
		}(kv.StoreLimit.Load(), store.tokenCount.Load())
		kv.StoreLimit.Store(100)
		store.tokenCount.Store(100)

		complete := false
		s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
			s.Nil(resp)
			s.NotNil(err)
			e, ok := errors.Cause(err).(*tikverr.ErrTokenLimit)
			s.True(ok)
			s.Equal(s.store, e.StoreID)
			complete = true
		}))
		for !complete {
			_, err := rl.Exec(ctx)
			s.Require().NoError(err)
		}
	})

	s.Run("RPCCancel", func() {
		req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
			Key:   []byte("key"),
			Value: []byte("value"),
		})
		region, err := s.cache.LocateRegionByID(s.bo, s.region)
		s.Nil(err)
		s.NotNil(region)

		defer func(ctx context.Context, cli client.Client) {
			s.bo.SetCtx(ctx)
			s.regionRequestSender.client = cli
		}(s.bo.GetCtx(), s.regionRequestSender.client)

		var once sync.Once
		rpcCanceller := NewRPCanceller()
		s.bo.SetCtx(context.WithValue(s.bo.GetCtx(), RPCCancellerCtxKey{}, rpcCanceller))
		s.regionRequestSender.client = &fnClient{
			fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
				once.Do(func() { rpcCanceller.CancelAll() })
				return nil, context.Canceled
			},
		}

		complete := false
		s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, time.Second, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
			s.Nil(resp)
			s.ErrorIs(err, context.Canceled)
			complete = true
		}))
		for !complete {
			_, err := rl.Exec(ctx)
			s.Require().NoError(err)
		}
	})

	s.Run("Timeout", func() {
		req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
			Key:     []byte("key"),
			Version: math.MaxUint64,
		})
		region, err := s.cache.LocateRegionByID(s.bo, s.region)
		s.Nil(err)
		s.NotNil(region)

		defer func(cli client.Client) {
			s.regionRequestSender.client = cli
		}(s.regionRequestSender.client)

		s.regionRequestSender.client = &fnClient{
			fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
				<-ctx.Done()
				return nil, ctx.Err()
			},
		}

		complete := false
		s.regionRequestSender.SendReqAsync(s.bo, req, region.Region, 100*time.Millisecond, async.NewCallback(rl, func(resp *tikvrpc.ResponseExt, err error) {
			s.Nil(err)
			s.NotNil(resp)
			regionErr, err := resp.GetRegionError()
			s.Nil(err)
			s.True(IsFakeRegionError(regionErr))
			complete = true
		}))
		for !complete {
			_, err := rl.Exec(ctx)
			s.Require().NoError(err)
		}
	})
}

// cancelContextClient wraps rpcClient and always cancels context before sending requests.
type cancelContextClient struct {
	client.Client
	redirectAddr string
}

func (c *cancelContextClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
	childCtx, cancel := context.WithCancel(ctx)
	cancel()
	return c.Client.SendRequest(childCtx, c.redirectAddr, req, timeout)
}

// mockTikvGrpcServer mocks a TiKV gRPC server for testing.
type mockTikvGrpcServer struct{}

var _ tikvpb.TikvServer = &mockTikvGrpcServer{}

// KvGet commands with mvcc/txn supported.
func (s *mockTikvGrpcServer) KvGet(context.Context, *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvScan(context.Context, *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvPrewrite(context.Context, *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvCommit(context.Context, *kvrpcpb.CommitRequest) (*kvrpcpb.CommitResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvImport(context.Context, *kvrpcpb.ImportRequest) (*kvrpcpb.ImportResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvCleanup(context.Context, *kvrpcpb.CleanupRequest) (*kvrpcpb.CleanupResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvBatchGet(context.Context, *kvrpcpb.BatchGetRequest) (*kvrpcpb.BatchGetResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvBatchRollback(context.Context, *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvScanLock(context.Context, *kvrpcpb.ScanLockRequest) (*kvrpcpb.ScanLockResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvResolveLock(context.Context, *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvCheckTxnStatus(ctx context.Context, in *kvrpcpb.CheckTxnStatusRequest) (*kvrpcpb.CheckTxnStatusResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvCheckSecondaryLocks(ctx context.Context, in *kvrpcpb.CheckSecondaryLocksRequest) (*kvrpcpb.CheckSecondaryLocksResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) KvDeleteRange(context.Context, *kvrpcpb.DeleteRangeRequest) (*kvrpcpb.DeleteRangeResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawGet(context.Context, *kvrpcpb.RawGetRequest) (*kvrpcpb.RawGetResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawBatchGet(context.Context, *kvrpcpb.RawBatchGetRequest) (*kvrpcpb.RawBatchGetResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawPut(context.Context, *kvrpcpb.RawPutRequest) (*kvrpcpb.RawPutResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawBatchPut(context.Context, *kvrpcpb.RawBatchPutRequest) (*kvrpcpb.RawBatchPutResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawDelete(context.Context, *kvrpcpb.RawDeleteRequest) (*kvrpcpb.RawDeleteResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawBatchDelete(context.Context, *kvrpcpb.RawBatchDeleteRequest) (*kvrpcpb.RawBatchDeleteResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawScan(context.Context, *kvrpcpb.RawScanRequest) (*kvrpcpb.RawScanResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawDeleteRange(context.Context, *kvrpcpb.RawDeleteRangeRequest) (*kvrpcpb.RawDeleteRangeResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawBatchScan(context.Context, *kvrpcpb.RawBatchScanRequest) (*kvrpcpb.RawBatchScanResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawGetKeyTTL(context.Context, *kvrpcpb.RawGetKeyTTLRequest) (*kvrpcpb.RawGetKeyTTLResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) UnsafeDestroyRange(context.Context, *kvrpcpb.UnsafeDestroyRangeRequest) (*kvrpcpb.UnsafeDestroyRangeResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RegisterLockObserver(context.Context, *kvrpcpb.RegisterLockObserverRequest) (*kvrpcpb.RegisterLockObserverResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) CheckLockObserver(context.Context, *kvrpcpb.CheckLockObserverRequest) (*kvrpcpb.CheckLockObserverResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RemoveLockObserver(context.Context, *kvrpcpb.RemoveLockObserverRequest) (*kvrpcpb.RemoveLockObserverResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) PhysicalScanLock(context.Context, *kvrpcpb.PhysicalScanLockRequest) (*kvrpcpb.PhysicalScanLockResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) Coprocessor(context.Context, *coprocessor.Request) (*coprocessor.Response, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) BatchCoprocessor(*coprocessor.BatchRequest, tikvpb.Tikv_BatchCoprocessorServer) error {
	return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) RawCoprocessor(context.Context, *kvrpcpb.RawCoprocessorRequest) (*kvrpcpb.RawCoprocessorResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) DispatchMPPTask(context.Context, *mpp.DispatchTaskRequest) (*mpp.DispatchTaskResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) IsAlive(context.Context, *mpp.IsAliveRequest) (*mpp.IsAliveResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) ReportMPPTaskStatus(context.Context, *mpp.ReportTaskStatusRequest) (*mpp.ReportTaskStatusResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) EstablishMPPConnection(*mpp.EstablishMPPConnectionRequest, tikvpb.Tikv_EstablishMPPConnectionServer) error {
	return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) CancelMPPTask(context.Context, *mpp.CancelTaskRequest) (*mpp.CancelTaskResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) Raft(tikvpb.Tikv_RaftServer) error {
	return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) BatchRaft(tikvpb.Tikv_BatchRaftServer) error {
	return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) Snapshot(tikvpb.Tikv_SnapshotServer) error {
	return errors.New("unreachable")
}
func (s *mockTikvGrpcServer) MvccGetByKey(context.Context, *kvrpcpb.MvccGetByKeyRequest) (*kvrpcpb.MvccGetByKeyResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetByStartTsRequest) (*kvrpcpb.MvccGetByStartTsResponse, error) {
	return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error {
	return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) error {
	return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) CheckLeader(context.Context, *kvrpcpb.CheckLeaderRequest) (*kvrpcpb.CheckLeaderResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest) (*kvrpcpb.StoreSafeTSResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) RawCompareAndSwap(context.Context, *kvrpcpb.RawCASRequest) (*kvrpcpb.RawCASResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) RawChecksum(context.Context, *kvrpcpb.RawChecksumRequest) (*kvrpcpb.RawChecksumResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) Compact(ctx context.Context, request *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetLockWaitHistory(ctx context.Context, request *kvrpcpb.GetLockWaitHistoryRequest) (*kvrpcpb.GetLockWaitHistoryResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) TryAddLock(context.Context, *disaggregated.TryAddLockRequest) (*disaggregated.TryAddLockResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) TryMarkDelete(context.Context, *disaggregated.TryMarkDeleteRequest) (*disaggregated.TryMarkDeleteResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.FlashbackToVersionRequest) (*kvrpcpb.FlashbackToVersionResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvPrepareFlashbackToVersion(context.Context, *kvrpcpb.PrepareFlashbackToVersionRequest) (*kvrpcpb.PrepareFlashbackToVersionResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) EstablishDisaggTask(context.Context, *disaggregated.EstablishDisaggTaskRequest) (*disaggregated.EstablishDisaggTaskResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) FetchDisaggPages(*disaggregated.FetchDisaggPagesRequest, tikvpb.Tikv_FetchDisaggPagesServer) error {
	return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) TabletSnapshot(_ tikvpb.Tikv_TabletSnapshotServer) error {
	return errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetTiFlashSystemTable(context.Context, *kvrpcpb.TiFlashSystemTableRequest) (*kvrpcpb.TiFlashSystemTableResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetDisaggConfig(context.Context, *disaggregated.GetDisaggConfigRequest) (*disaggregated.GetDisaggConfigResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) CancelDisaggTask(context.Context, *disaggregated.CancelDisaggTaskRequest) (*disaggregated.CancelDisaggTaskResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvFlush(context.Context, *kvrpcpb.FlushRequest) (*kvrpcpb.FlushResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) KvBufferBatchGet(context.Context, *kvrpcpb.BufferBatchGetRequest) (*kvrpcpb.BufferBatchGetResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvrpcpb.GetHealthFeedbackRequest) (*kvrpcpb.GetHealthFeedbackResponse, error) {
	return nil, errors.New("unreachable")
}

func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) {
	return nil, errors.New("unreachable")
}

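// TestNoReloadRegionForGrpcWhenCtxCanceled sends requests to a real gRPC server backed by
// mockTikvGrpcServer and checks that a canceled context does not drop the cached region.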
func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
	// prepare a mock TiKV gRPC server
	addr := "localhost:56341"
	lis, err := net.Listen("tcp", addr)
	s.Nil(err)
	server := grpc.NewServer()
	tikvpb.RegisterTikvServer(server, &mockTikvGrpcServer{})
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		server.Serve(lis)
		wg.Done()
	}()

	cli := client.NewRPCClient()
	sender := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{})
	req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
		Key:   []byte("key"),
		Value: []byte("value"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)

	bo, cancel := s.bo.Fork()
	cancel()
	_, _, err = sender.SendReq(bo, req, region.Region, 3*time.Second)
	s.Equal(errors.Cause(err), context.Canceled)
	r, expired := sender.regionCache.searchCachedRegionByID(s.region)
	s.False(expired)
	s.NotNil(r)

	// Just for covering the error code = codes.Canceled path.
	client1 := &cancelContextClient{
		Client:       client.NewRPCClient(),
		redirectAddr: addr,
	}
	sender = NewRegionRequestSender(s.cache, client1, oracle.NoopReadTSValidator{})
	sender.SendReq(s.bo, req, region.Region, 3*time.Second)

	// cleanup
	server.Stop()
	wg.Wait()
	cli.Close()
	client1.Close()
}

func (s *testRegionRequestToSingleStoreSuite) TestOnMaxTimestampNotSyncedError() {
	req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	// test retry for max timestamp not synced
	func() {
		oc := s.regionRequestSender.client
		defer func() {
			s.regionRequestSender.client = oc
		}()
		count := 0
		s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
			count++
			var resp *tikvrpc.Response
			if count < 3 {
				resp = &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{
					RegionError: &errorpb.Error{MaxTimestampNotSynced: &errorpb.MaxTimestampNotSynced{}},
				}}
			} else {
				resp = &tikvrpc.Response{Resp: &kvrpcpb.PrewriteResponse{}}
			}
			return resp, nil
		}}
		bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
		resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
		s.Nil(err)
		s.NotNil(resp)
	}()
}

func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	// test that kv epochNotMatch with empty regions marks the cached region as expired
	s.cache.OnRegionEpochNotMatch(s.bo, &RPCContext{Region: region.Region, Store: &Store{storeID: s.store}}, []*metapb.Region{})
	s.Nil(err)
	r, expired := s.cache.searchCachedRegionByID(s.region)
	s.True(expired)
	s.NotNil(r)

	// refill cache
	region, err = s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	// test that kv loads a new region with a new start-key and a new epoch
	v2 := region.Region.confVer + 1
	r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
	st := newUninitializedStore(s.store)
	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
	region, err = s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	s.Equal(region.Region.confVer, v2)
	s.Equal(region.Region.ver, region.Region.ver)

	v3 := region.Region.confVer + 1
	r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
	st = newUninitializedStore(s.store)
	s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
	region, err = s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	s.Equal(region.Region.confVer, region.Region.confVer)
	s.Equal(region.Region.ver, v3)
}

func (s *testRegionRequestToSingleStoreSuite) TestCloseConnectionOnStoreNotMatch() {
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
		Key: []byte("key"),
	})
	region, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	s.NotNil(region)

	oc := s.regionRequestSender.client
	defer func() {
		s.regionRequestSender.client = oc
	}()

	var target string
	client := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		target = addr
		resp := &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{
			RegionError: &errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}},
		}}
		return resp, nil
	}}

	s.regionRequestSender.client = client
	bo := retry.NewBackofferWithVars(context.Background(), 5, nil)
	resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Second)
	s.Nil(err)
	s.NotNil(resp)
	regionErr, _ := resp.GetRegionError()
	s.NotNil(regionErr)
	s.Equal(target, client.closedAddr)
	var expected uint64 = math.MaxUint64
	s.Equal(expected, client.closedVer)
}

func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchClient() {
	config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxBatchSize = 0
	})()

	server, port := mockserver.StartMockTikvService()
	s.True(port > 0)
	server.SetMetaChecker(func(ctx context.Context) error {
		return context.DeadlineExceeded
	})
	rpcClient := client.NewRPCClient()
	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
	}}
	defer func() {
		rpcClient.Close()
		server.Stop()
	}()

	bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
	region, err := s.cache.LocateRegionByID(bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1})
	// send a probe request to make sure the mock server is ready.
	s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second)
	resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10)
	s.Nil(err)
	s.NotNil(resp)
	regionErr, _ := resp.GetRegionError()
	s.True(retry.IsFakeRegionError(regionErr))
	s.Equal(0, bo.GetTotalBackoffTimes()) // the kv read timeout triggers a fast retry, so the backoff count should be 0.
}

func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
	// This test should be run with `go test -race`.
	config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxBatchSize = 128
	})()

	server, port := mockserver.StartMockTikvService()
	s.True(port > 0)
	rpcClient := client.NewRPCClient()
	fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
	}}

	defer func() {
		rpcClient.Close()
		server.Stop()
	}()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				ctx, cancel := context.WithCancel(context.Background())
				bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil)
				region, err := s.cache.LocateRegionByID(bo, s.region)
				s.Nil(err)
				s.NotNil(region)
				go func() {
					// simulate a killed query execution or a timeout.
					time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1))
					cancel()
				}()
				req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
				regionRequestSender := NewRegionRequestSender(s.cache, fnClient, oracle.NoopReadTSValidator{})
				reachable.injectConstantLiveness(regionRequestSender.regionCache.stores)
				regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
			}
		}()
	}
	wg.Wait()
	// batchSendLoop should not panic.
	s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0))
}

func (s *testRegionRequestToSingleStoreSuite) TestClusterIDInReq() {
	server, port := mockserver.StartMockTikvService()
	s.True(port > 0)
	rpcClient := client.NewRPCClient()
	s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
		s.True(req.ClusterId > 0)
		return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
	}}
	defer func() {
		rpcClient.Close()
		server.Stop()
	}()

	bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
	region, err := s.cache.LocateRegionByID(bo, s.region)
	s.Nil(err)
	s.NotNil(region)
	req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("a"), Version: 1})
	// send a probe request to make sure the mock server is ready.
	s.regionRequestSender.SendReq(retry.NewNoopBackoff(context.Background()), req, region.Region, time.Second)
	resp, _, err := s.regionRequestSender.SendReq(bo, req, region.Region, time.Millisecond*10)
	s.Nil(err)
	s.NotNil(resp)
	regionErr, _ := resp.GetRegionError()
	s.Nil(regionErr)
}

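// emptyClient embeds client.Client and implements nothing else; TestClientExt uses it to
// verify that getClientExt returns nil for clients without the extended interface.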
type emptyClient struct {
	client.Client
}

func (s *testRegionRequestToSingleStoreSuite) TestClientExt() {
	var cli client.Client = client.NewRPCClient()
	sender := NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{})
	s.NotNil(sender.client)
	s.NotNil(sender.getClientExt())
	cli.Close()

	cli = &emptyClient{}
	sender = NewRegionRequestSender(s.cache, cli, oracle.NoopReadTSValidator{})
	s.NotNil(sender.client)
	s.Nil(sender.getClientExt())
}

func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestSenderString() {
	sender := NewRegionRequestSender(s.cache, &fnClient{}, oracle.NoopReadTSValidator{})
	loc, err := s.cache.LocateRegionByID(s.bo, s.region)
	s.Nil(err)
	// invalidate the cached region before sending the request.
	s.cache.InvalidateCachedRegion(loc.Region)
	sender.SendReqCtx(s.bo, tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}), loc.Region, time.Second, tikvrpc.TiKV)
	s.Equal("{rpcError:<nil>, replicaSelector: <nil>}", sender.String())
}

func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() {
	reqStats := NewRegionRequestRuntimeStats()
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Second)
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdGet, time.Millisecond)
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Second*2)
	reqStats.RecordRPCRuntimeStats(tikvrpc.CmdCop, time.Millisecond*200)
	reqStats.RecordRPCErrorStats("context canceled")
	reqStats.RecordRPCErrorStats("context canceled")
	reqStats.RecordRPCErrorStats("region_not_found")
	reqStats.Merge(NewRegionRequestRuntimeStats())
	reqStats2 := NewRegionRequestRuntimeStats()
	reqStats2.Merge(reqStats.Clone())
	expecteds := []string{
		// Since map iteration order is random, we need to check all possible orders.
		"Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{region_not_found:1, context canceled:2}",
		"Get:{num_rpc:2, total_time:1s},Cop:{num_rpc:2, total_time:2.2s}, rpc_errors:{context canceled:2, region_not_found:1}",
		"Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{context canceled:2, region_not_found:1}",
		"Cop:{num_rpc:2, total_time:2.2s},Get:{num_rpc:2, total_time:1s}, rpc_errors:{region_not_found:1, context canceled:2}",
	}
	s.Contains(expecteds, reqStats.String())
	s.Contains(expecteds, reqStats2.String())
	for i := 0; i < 50; i++ {
		reqStats.RecordRPCErrorStats("err_" + strconv.Itoa(i))
	}
	s.Regexp("{.*err_.*:1.*, other_error:36}", reqStats.RequestErrorStats.String())
	s.Regexp(".*num_rpc.*total_time.*, rpc_errors:{.*err.*, other_error:36}", reqStats.String())

	access := &ReplicaAccessStats{}
	access.recordReplicaAccessInfo(true, false, 1, 2, "data_not_ready")
	access.recordReplicaAccessInfo(false, false, 3, 4, "not_leader")
	access.recordReplicaAccessInfo(false, true, 5, 6, "server_is_Busy")
	s.Equal("{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}", access.String())
	for i := 0; i < 20; i++ {
		access.recordReplicaAccessInfo(false, false, 5+uint64(i)%2, 6, "server_is_Busy")
	}
	expecteds = []string{
		// Since map iteration order is random, we need to check all possible orders.
		"{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:5, error_stats:{server_is_Busy:9}}, {peer:6, error_stats:{server_is_Busy:9}}}",
		"{stale_read, peer:1, store:2, err:data_not_ready}, {peer:3, store:4, err:not_leader}, {replica_read, peer:5, store:6, err:server_is_Busy}, {peer:5, store:6, err:server_is_Busy}, {peer:6, store:6, err:server_is_Busy}, overflow_count:{{peer:6, error_stats:{server_is_Busy:9}}, {peer:5, error_stats:{server_is_Busy:9}}}",
	}
	s.Contains(expecteds, access.String())
}

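// TestRegionRequestValidateReadTS checks read-TS validation: current and slightly old
// timestamps pass, future timestamps are rejected with oracle.ErrFutureTSRead, and a
// math.MaxUint64 stale read is rejected with oracle.ErrLatestStaleRead.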
func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() {
	oracles.EnableTSValidation.Store(true)
	defer oracles.EnableTSValidation.Store(false)
	o, err := oracles.NewPdOracle(s.pdCli, &oracles.PDOracleOptions{
		UpdateInterval: time.Second * 2,
	})
	s.NoError(err)
	s.regionRequestSender.readTSValidator = o
	defer o.Close()

	testImpl := func(ts func() uint64, staleRead bool, expectedErrorType error) {
		region, err := s.cache.LocateRegionByID(s.bo, s.region)
		s.Nil(err)
		s.NotNil(region)

		req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{
			Key:     []byte("k"),
			Version: ts(),
		})

		req.StaleRead = staleRead
		_, _, _, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second, tikvrpc.TiKV)

		if expectedErrorType == nil {
			s.NoError(err)
		} else {
			s.Error(err)
			s.IsType(err, expectedErrorType)
		}
	}

	getTS := func() uint64 {
		ts, err := o.GetTimestamp(s.bo.GetCtx(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
		s.NoError(err)
		return ts
	}

	addTS := func(ts uint64, diff time.Duration) uint64 {
		return oracle.ComposeTS(oracle.GetPhysical(oracle.GetTimeFromTS(ts).Add(diff)), oracle.ExtractLogical(ts))
	}

	testImpl(getTS, false, nil)
	testImpl(getTS, true, nil)
	testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, false, nil)
	testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, true, nil)
	testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, oracle.ErrFutureTSRead{})
	testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, true, oracle.ErrFutureTSRead{})
	testImpl(func() uint64 { return math.MaxUint64 }, false, nil)
	testImpl(func() uint64 { return math.MaxUint64 }, true, oracle.ErrLatestStaleRead{})
}

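// noCauseError is an error whose Cause method returns nil, used by TestGetErrMsg to
// exercise getErrMsg when errors.Cause yields nothing.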
type noCauseError struct {
	error
}

func (_ noCauseError) Cause() error {
	return nil
}

func TestGetErrMsg(t *testing.T) {
	err := noCauseError{error: errors.New("no cause err")}
	require.Equal(t, nil, errors.Cause(err))
	require.Panicsf(t, func() {
		_ = errors.Cause(err).Error()
	}, "should panic")
	require.Equal(t, "no cause err", getErrMsg(err))
}