mirror of https://github.com/tikv/client-go.git
Fallback to leader read when replica read meets flashback-in-progress error (#796)
* handle flashback error when follower read Signed-off-by: you06 <you1474600@gmail.com> * add test Signed-off-by: you06 <you1474600@gmail.com> * trigger CI Signed-off-by: you06 <you1474600@gmail.com> * fix panic Signed-off-by: you06 <you1474600@gmail.com> * trigger CI Signed-off-by: you06 <you1474600@gmail.com> * trigger CI Signed-off-by: you06 <you1474600@gmail.com> --------- Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
2b0667c65c
commit
c946782286
|
|
@ -1731,15 +1731,30 @@ func (s *RegionRequestSender) onRegionError(
|
|||
// Since we expect that the workload should be stopped during the flashback progress,
|
||||
// if a request meets the FlashbackInProgress error, it should stop retrying immediately
|
||||
// to avoid unnecessary backoff and potential unexpected data status to the user.
|
||||
if regionErr.GetFlashbackInProgress() != nil {
|
||||
if flashbackInProgress := regionErr.GetFlashbackInProgress(); flashbackInProgress != nil {
|
||||
logutil.BgLogger().Debug(
|
||||
"tikv reports `FlashbackInProgress`",
|
||||
zap.Stringer("req", req),
|
||||
zap.Stringer("ctx", ctx),
|
||||
)
|
||||
if req != nil {
|
||||
// if the failure is caused by replica read, we can retry it with leader safely.
|
||||
if ctx.contextPatcher.replicaRead != nil && *ctx.contextPatcher.replicaRead {
|
||||
req.BusyThresholdMs = 0
|
||||
s.replicaSelector.busyThreshold = 0
|
||||
ctx.contextPatcher.replicaRead = nil
|
||||
ctx.contextPatcher.busyThreshold = nil
|
||||
return true, nil
|
||||
}
|
||||
if req.ReplicaReadType.IsFollowerRead() {
|
||||
s.replicaSelector = nil
|
||||
req.ReplicaReadType = kv.ReplicaReadLeader
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, errors.Errorf(
|
||||
"region %d is in flashback progress, FlashbackStartTS is %d",
|
||||
regionErr.GetFlashbackInProgress().GetRegionId(), regionErr.GetFlashbackInProgress().GetFlashbackStartTs(),
|
||||
flashbackInProgress.GetRegionId(), flashbackInProgress.GetFlashbackStartTs(),
|
||||
)
|
||||
}
|
||||
// This error means a second-phase flashback request is sent to a region that is not
|
||||
|
|
|
|||
|
|
@ -1012,3 +1012,43 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() {
|
|||
s.IsType(&tryIdleReplica{}, replicaSelector.state)
|
||||
s.True(*rpcCtx.contextPatcher.replicaRead)
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadWithFlashbackInProgress() {
|
||||
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
|
||||
s.Nil(err)
|
||||
s.NotNil(regionLoc)
|
||||
|
||||
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||
// Return serverIsBusy when accesses the leader with busy threshold.
|
||||
if addr == s.cluster.GetStore(s.storeIDs[0]).Address {
|
||||
if req.BusyThresholdMs > 0 {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
|
||||
ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 500},
|
||||
}}}, nil
|
||||
} else {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil
|
||||
}
|
||||
}
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
|
||||
FlashbackInProgress: &errorpb.FlashbackInProgress{
|
||||
RegionId: regionLoc.Region.GetID(),
|
||||
},
|
||||
}}}, nil
|
||||
}}
|
||||
|
||||
reqs := []*tikvrpc.Request{
|
||||
tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kvrpcpb.Context{
|
||||
BusyThresholdMs: 50,
|
||||
}),
|
||||
tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadFollower, nil),
|
||||
}
|
||||
|
||||
for _, req := range reqs {
|
||||
bo := retry.NewBackoffer(context.Background(), -1)
|
||||
s.Nil(err)
|
||||
resp, retry, err := s.regionRequestSender.SendReq(bo, req, regionLoc.Region, time.Second)
|
||||
s.Nil(err)
|
||||
s.GreaterOrEqual(retry, 1)
|
||||
s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value"))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue