mirror of https://github.com/tikv/client-go.git
fix issue read req timeout bug cause by pr#1223 (#1232)
* fix issue read req timeout bug cause by pr#1223 Signed-off-by: crazycs520 <crazycs520@gmail.com> * add test Signed-off-by: crazycs520 <crazycs520@gmail.com> * fix test Signed-off-by: crazycs520 <crazycs520@gmail.com> * implement Cause Signed-off-by: crazycs520 <crazycs520@gmail.com> * fix race test Signed-off-by: crazycs520 <crazycs520@gmail.com> --------- Signed-off-by: crazycs520 <crazycs520@gmail.com> Co-authored-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
87a984a72d
commit
98a7df8f41
|
|
@ -138,6 +138,10 @@ func (e *ErrConn) Error() string {
|
||||||
return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error())
|
return fmt.Sprintf("[%s](%d) %s", e.Addr, e.Ver, e.Err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *ErrConn) Cause() error {
|
||||||
|
return e.Err
|
||||||
|
}
|
||||||
|
|
||||||
func (e *ErrConn) Unwrap() error {
|
func (e *ErrConn) Unwrap() error {
|
||||||
return e.Err
|
return e.Err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"github.com/tikv/client-go/v2/config/retry"
|
"github.com/tikv/client-go/v2/config/retry"
|
||||||
"github.com/tikv/client-go/v2/internal/apicodec"
|
"github.com/tikv/client-go/v2/internal/apicodec"
|
||||||
"github.com/tikv/client-go/v2/internal/client"
|
"github.com/tikv/client-go/v2/internal/client"
|
||||||
|
"github.com/tikv/client-go/v2/internal/client/mockserver"
|
||||||
"github.com/tikv/client-go/v2/internal/logutil"
|
"github.com/tikv/client-go/v2/internal/logutil"
|
||||||
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
|
||||||
"github.com/tikv/client-go/v2/kv"
|
"github.com/tikv/client-go/v2/kv"
|
||||||
|
|
@ -3215,6 +3216,71 @@ func (s *testReplicaSelectorSuite) getRegion() *Region {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTiKVClientReadTimeout(t *testing.T) {
|
||||||
|
if israce.RaceEnabled {
|
||||||
|
t.Skip("the test run with race will failed, so skip it")
|
||||||
|
}
|
||||||
|
config.UpdateGlobal(func(conf *config.Config) {
|
||||||
|
// enable batch client.
|
||||||
|
conf.TiKVClient.MaxBatchSize = 128
|
||||||
|
})()
|
||||||
|
s := new(testReplicaSelectorSuite)
|
||||||
|
s.SetupTest(t)
|
||||||
|
defer s.TearDownTest()
|
||||||
|
|
||||||
|
server, port := mockserver.StartMockTikvService()
|
||||||
|
s.True(port > 0)
|
||||||
|
server.SetMetaChecker(func(ctx context.Context) error {
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
rpcClient := client.NewRPCClient()
|
||||||
|
defer func() {
|
||||||
|
rpcClient.Close()
|
||||||
|
server.Stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
accessPath := []string{}
|
||||||
|
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||||
|
accessPath = append(accessPath, addr)
|
||||||
|
accessPath = append(accessPath, fmt.Sprintf("{addr: %v, replica-read: %v, stale-read: %v, timeout: %v}", addr, req.ReplicaRead, req.StaleRead, req.MaxExecutionDurationMs))
|
||||||
|
return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
|
||||||
|
}}
|
||||||
|
rc := s.getRegion()
|
||||||
|
s.NotNil(rc)
|
||||||
|
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key"), Version: 1})
|
||||||
|
req.ReplicaReadType = kv.ReplicaReadLeader
|
||||||
|
req.MaxExecutionDurationMs = 1
|
||||||
|
bo := retry.NewBackofferWithVars(context.Background(), 2000, nil)
|
||||||
|
sender := NewRegionRequestSender(s.cache, fnClient)
|
||||||
|
resp, _, err := sender.SendReq(bo, req, rc.VerID(), time.Millisecond)
|
||||||
|
s.Nil(err)
|
||||||
|
s.NotNil(resp)
|
||||||
|
regionErr, _ := resp.GetRegionError()
|
||||||
|
s.True(IsFakeRegionError(regionErr))
|
||||||
|
s.Equal(0, bo.GetTotalBackoffTimes())
|
||||||
|
s.Equal([]string{
|
||||||
|
"store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}",
|
||||||
|
"store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}",
|
||||||
|
"store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}",
|
||||||
|
}, accessPath)
|
||||||
|
// clear max execution duration for retry.
|
||||||
|
req.MaxExecutionDurationMs = 0
|
||||||
|
sender = NewRegionRequestSender(s.cache, fnClient)
|
||||||
|
resp, _, err = sender.SendReq(bo, req, rc.VerID(), time.Second) // use a longer timeout.
|
||||||
|
s.Nil(err)
|
||||||
|
s.NotNil(resp)
|
||||||
|
regionErr, _ = resp.GetRegionError()
|
||||||
|
s.Nil(regionErr)
|
||||||
|
s.Equal(0, bo.GetTotalBackoffTimes())
|
||||||
|
s.Equal([]string{
|
||||||
|
"store1", "{addr: store1, replica-read: false, stale-read: false, timeout: 1}",
|
||||||
|
"store2", "{addr: store2, replica-read: true, stale-read: false, timeout: 1}",
|
||||||
|
"store3", "{addr: store3, replica-read: true, stale-read: false, timeout: 1}",
|
||||||
|
"store1", "{addr: store1, replica-read: true, stale-read: false, timeout: 1000}",
|
||||||
|
}, accessPath)
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkReplicaSelector(b *testing.B) {
|
func BenchmarkReplicaSelector(b *testing.B) {
|
||||||
mvccStore := mocktikv.MustNewMVCCStore()
|
mvccStore := mocktikv.MustNewMVCCStore()
|
||||||
cluster := mocktikv.NewCluster(mvccStore)
|
cluster := mocktikv.NewCluster(mvccStore)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue