mirror of https://github.com/tikv/client-go.git
Fix batch client batchSendLoop panic (#1021)
* init
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* add test
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* refine test
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* try to fix
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* fix test
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* fix
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* refine
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* address comment
Signed-off-by: crazycs520 <crazycs520@gmail.com>
* address comment
Signed-off-by: crazycs520 <crazycs520@gmail.com>
---------
Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
e70513e671
commit
2eaf68e0cd
|
|
@ -92,28 +92,27 @@ func attachAPICtx(c Codec, req *tikvrpc.Request) *tikvrpc.Request {
|
||||||
// Shallow copy the request to avoid concurrent modification.
|
// Shallow copy the request to avoid concurrent modification.
|
||||||
r := *req
|
r := *req
|
||||||
|
|
||||||
ctx := &r.Context
|
r.Context.ApiVersion = c.GetAPIVersion()
|
||||||
ctx.ApiVersion = c.GetAPIVersion()
|
r.Context.KeyspaceId = uint32(c.GetKeyspaceID())
|
||||||
ctx.KeyspaceId = uint32(c.GetKeyspaceID())
|
|
||||||
|
|
||||||
switch r.Type {
|
switch r.Type {
|
||||||
case tikvrpc.CmdMPPTask:
|
case tikvrpc.CmdMPPTask:
|
||||||
mpp := *r.DispatchMPPTask()
|
mpp := *r.DispatchMPPTask()
|
||||||
// Shallow copy the meta to avoid concurrent modification.
|
// Shallow copy the meta to avoid concurrent modification.
|
||||||
meta := *mpp.Meta
|
meta := *mpp.Meta
|
||||||
meta.KeyspaceId = ctx.KeyspaceId
|
meta.KeyspaceId = r.Context.KeyspaceId
|
||||||
meta.ApiVersion = ctx.ApiVersion
|
meta.ApiVersion = r.Context.ApiVersion
|
||||||
mpp.Meta = &meta
|
mpp.Meta = &meta
|
||||||
r.Req = &mpp
|
r.Req = &mpp
|
||||||
|
|
||||||
case tikvrpc.CmdCompact:
|
case tikvrpc.CmdCompact:
|
||||||
compact := *r.Compact()
|
compact := *r.Compact()
|
||||||
compact.KeyspaceId = ctx.KeyspaceId
|
compact.KeyspaceId = r.Context.KeyspaceId
|
||||||
compact.ApiVersion = ctx.ApiVersion
|
compact.ApiVersion = r.Context.ApiVersion
|
||||||
r.Req = &compact
|
r.Req = &compact
|
||||||
}
|
}
|
||||||
|
|
||||||
tikvrpc.AttachContext(&r, ctx)
|
tikvrpc.AttachContext(&r, r.Context)
|
||||||
|
|
||||||
return &r
|
return &r
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -298,6 +298,9 @@ func (a *batchConn) fetchMorePendingRequests(
|
||||||
|
|
||||||
const idleTimeout = 3 * time.Minute
|
const idleTimeout = 3 * time.Minute
|
||||||
|
|
||||||
|
// BatchSendLoopPanicCounter is only used for testing.
|
||||||
|
var BatchSendLoopPanicCounter int64 = 0
|
||||||
|
|
||||||
func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
|
func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
|
@ -305,7 +308,8 @@ func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
|
||||||
logutil.BgLogger().Error("batchSendLoop",
|
logutil.BgLogger().Error("batchSendLoop",
|
||||||
zap.Any("r", r),
|
zap.Any("r", r),
|
||||||
zap.Stack("stack"))
|
zap.Stack("stack"))
|
||||||
logutil.BgLogger().Info("restart batchSendLoop")
|
atomic.AddInt64(&BatchSendLoopPanicCounter, 1)
|
||||||
|
logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter)))
|
||||||
go a.batchSendLoop(cfg)
|
go a.batchSendLoop(cfg)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,10 @@ package locate
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
@ -733,3 +735,52 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC
|
||||||
s.True(IsFakeRegionError(regionErr))
|
s.True(IsFakeRegionError(regionErr))
|
||||||
s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0.
|
s.Equal(0, bo.GetTotalBackoffTimes()) // use kv read timeout will do fast retry, so backoff times should be 0.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() {
|
||||||
|
// This test should be run with `go test -race`.
|
||||||
|
config.UpdateGlobal(func(conf *config.Config) {
|
||||||
|
conf.TiKVClient.MaxBatchSize = 128
|
||||||
|
})()
|
||||||
|
|
||||||
|
server, port := mock_server.StartMockTikvService()
|
||||||
|
s.True(port > 0)
|
||||||
|
rpcClient := client.NewRPCClient()
|
||||||
|
fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||||
|
return rpcClient.SendRequest(ctx, server.Addr(), req, timeout)
|
||||||
|
}}
|
||||||
|
tf := func(s *Store, bo *retry.Backoffer) livenessState {
|
||||||
|
return reachable
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
rpcClient.Close()
|
||||||
|
server.Stop()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < 100; j++ {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
bo := retry.NewBackofferWithVars(ctx, int(client.ReadTimeoutShort.Milliseconds()), nil)
|
||||||
|
region, err := s.cache.LocateRegionByID(bo, s.region)
|
||||||
|
s.Nil(err)
|
||||||
|
s.NotNil(region)
|
||||||
|
go func() {
|
||||||
|
// Simulate the query being killed or timing out.
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5)+1))
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
req := tikvrpc.NewRequest(tikvrpc.CmdCop, &coprocessor.Request{Data: []byte("a"), StartTs: 1})
|
||||||
|
regionRequestSender := NewRegionRequestSender(s.cache, fnClient)
|
||||||
|
regionRequestSender.regionCache.testingKnobs.mockRequestLiveness.Store((*livenessFunc)(&tf))
|
||||||
|
regionRequestSender.SendReq(bo, req, region.Region, client.ReadTimeoutShort)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
// batchSendLoop should not panic.
|
||||||
|
s.Equal(atomic.LoadInt64(&client.BatchSendLoopPanicCounter), int64(0))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -709,7 +709,9 @@ type MPPStreamResponse struct {
|
||||||
|
|
||||||
// AttachContext sets the request context to the request,
|
// AttachContext sets the request context to the request,
|
||||||
// return false if encounter unknown request type.
|
// return false if encounter unknown request type.
|
||||||
func AttachContext(req *Request, ctx *kvrpcpb.Context) bool {
|
// Parameter `rpcCtx` is a `kvrpcpb.Context` value rather than a `*kvrpcpb.Context`, so the function works on a shallow copy and avoids concurrent modification.
|
||||||
|
func AttachContext(req *Request, rpcCtx kvrpcpb.Context) bool {
|
||||||
|
ctx := &rpcCtx
|
||||||
switch req.Type {
|
switch req.Type {
|
||||||
case CmdGet:
|
case CmdGet:
|
||||||
req.Get().Context = ctx
|
req.Get().Context = ctx
|
||||||
|
|
@ -807,13 +809,14 @@ func AttachContext(req *Request, ctx *kvrpcpb.Context) bool {
|
||||||
|
|
||||||
// SetContext set the Context field for the given req to the specified ctx.
|
// SetContext set the Context field for the given req to the specified ctx.
|
||||||
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
|
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
|
||||||
ctx := &req.Context
|
|
||||||
if region != nil {
|
if region != nil {
|
||||||
ctx.RegionId = region.Id
|
req.Context.RegionId = region.Id
|
||||||
ctx.RegionEpoch = region.RegionEpoch
|
req.Context.RegionEpoch = region.RegionEpoch
|
||||||
}
|
}
|
||||||
ctx.Peer = peer
|
req.Context.Peer = peer
|
||||||
if !AttachContext(req, ctx) {
|
|
||||||
|
// Shallow copy the context to avoid concurrent modification.
|
||||||
|
if !AttachContext(req, req.Context) {
|
||||||
return errors.Errorf("invalid request type %v", req.Type)
|
return errors.Errorf("invalid request type %v", req.Type)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue