diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 7fd6a868..c877dfad 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -1731,15 +1731,30 @@ func (s *RegionRequestSender) onRegionError( // Since we expect that the workload should be stopped during the flashback progress, // if a request meets the FlashbackInProgress error, it should stop retrying immediately // to avoid unnecessary backoff and potential unexpected data status to the user. - if regionErr.GetFlashbackInProgress() != nil { + if flashbackInProgress := regionErr.GetFlashbackInProgress(); flashbackInProgress != nil { logutil.BgLogger().Debug( "tikv reports `FlashbackInProgress`", zap.Stringer("req", req), zap.Stringer("ctx", ctx), ) + if req != nil { + // if the failure is caused by replica read, we can retry it with leader safely. + if ctx.contextPatcher.replicaRead != nil && *ctx.contextPatcher.replicaRead { + req.BusyThresholdMs = 0 + s.replicaSelector.busyThreshold = 0 + ctx.contextPatcher.replicaRead = nil + ctx.contextPatcher.busyThreshold = nil + return true, nil + } + if req.ReplicaReadType.IsFollowerRead() { + s.replicaSelector = nil + req.ReplicaReadType = kv.ReplicaReadLeader + return true, nil + } + } return false, errors.Errorf( "region %d is in flashback progress, FlashbackStartTS is %d", - regionErr.GetFlashbackInProgress().GetRegionId(), regionErr.GetFlashbackInProgress().GetFlashbackStartTs(), + flashbackInProgress.GetRegionId(), flashbackInProgress.GetFlashbackStartTs(), ) } // This error means a second-phase flashback request is sent to a region that is not diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index c8fe0f61..e05c2cac 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -1012,3 +1012,43 @@ func (s *testRegionRequestToThreeStoresSuite) TestLoadBasedReplicaRead() { s.IsType(&tryIdleReplica{}, replicaSelector.state) s.True(*rpcCtx.contextPatcher.replicaRead) } + +func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadWithFlashbackInProgress() { + regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID) + s.Nil(err) + s.NotNil(regionLoc) + + s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + // Return serverIsBusy when accesses the leader with busy threshold. + if addr == s.cluster.GetStore(s.storeIDs[0]).Address { + if req.BusyThresholdMs > 0 { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + ServerIsBusy: &errorpb.ServerIsBusy{EstimatedWaitMs: 500}, + }}}, nil + } else { + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{Value: []byte("value")}}, nil + } + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{ + FlashbackInProgress: &errorpb.FlashbackInProgress{ + RegionId: regionLoc.Region.GetID(), + }, + }}}, nil + }} + + reqs := []*tikvrpc.Request{ + tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kvrpcpb.Context{ + BusyThresholdMs: 50, + }), + tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadFollower, nil), + } + + for _, req := range reqs { + bo := retry.NewBackoffer(context.Background(), -1) + s.Nil(err) + resp, retry, err := s.regionRequestSender.SendReq(bo, req, regionLoc.Region, time.Second) + s.Nil(err) + s.GreaterOrEqual(retry, 1) + s.Equal(resp.Resp.(*kvrpcpb.GetResponse).Value, []byte("value")) + } +}