mirror of https://github.com/tikv/client-go.git
handle mismatch peer id (#935)
Signed-off-by: you06 <you1474600@gmail.com>
This commit is contained in:
parent
44f5025f5a
commit
a0ac170698
|
|
@ -1882,6 +1882,8 @@ func regionErrorToLabel(e *errorpb.Error) string {
|
|||
return "peer_is_witness"
|
||||
} else if isDeadlineExceeded(e) {
|
||||
return "deadline_exceeded"
|
||||
} else if e.GetMismatchPeerId() != nil {
|
||||
return "mismatch_peer_id"
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
|
@ -2173,6 +2175,18 @@ func (s *RegionRequestSender) onRegionError(
|
|||
s.replicaSelector.onDeadlineExceeded()
|
||||
}
|
||||
|
||||
if mismatch := regionErr.GetMismatchPeerId(); mismatch != nil {
|
||||
logutil.Logger(bo.GetCtx()).Warn(
|
||||
"tikv reports `MismatchPeerId`, invalidate region cache",
|
||||
zap.Uint64("req peer id", mismatch.GetRequestPeerId()),
|
||||
zap.Uint64("store peer id", mismatch.GetStorePeerId()),
|
||||
)
|
||||
if s.replicaSelector != nil {
|
||||
s.replicaSelector.invalidateRegion()
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
logutil.Logger(bo.GetCtx()).Debug(
|
||||
"tikv reports region failed",
|
||||
zap.Stringer("regionErr", regionErr),
|
||||
|
|
|
|||
|
|
@ -1376,3 +1376,52 @@ func (s *testRegionRequestToThreeStoresSuite) TestStaleReadFallback2Follower() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testRegionRequestToThreeStoresSuite) TestReplicaReadFallbackToLeaderRegionError() {
|
||||
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
|
||||
s.Nil(err)
|
||||
s.NotNil(regionLoc)
|
||||
|
||||
s.regionRequestSender.client = &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("timeout")
|
||||
default:
|
||||
}
|
||||
// Return `mismatch peer id` when accesses the leader.
|
||||
if addr == s.cluster.GetStore(s.storeIDs[0]).Address {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
|
||||
MismatchPeerId: &errorpb.MismatchPeerId{
|
||||
RequestPeerId: 1,
|
||||
StorePeerId: 2,
|
||||
},
|
||||
}}}, nil
|
||||
}
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{
|
||||
DataIsNotReady: &errorpb.DataIsNotReady{},
|
||||
}}}, nil
|
||||
}}
|
||||
|
||||
region := s.cache.getRegionByIDFromCache(regionLoc.Region.GetID())
|
||||
s.True(region.isValid())
|
||||
|
||||
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadLeader, nil)
|
||||
req.ReadReplicaScope = oracle.GlobalTxnScope
|
||||
req.TxnScope = oracle.GlobalTxnScope
|
||||
req.EnableStaleRead()
|
||||
req.ReplicaReadType = kv.ReplicaReadFollower
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
bo := retry.NewBackoffer(ctx, -1)
|
||||
s.Nil(err)
|
||||
resp, _, _, err := s.regionRequestSender.SendReqCtx(bo, req, regionLoc.Region, time.Second, tikvrpc.TiKV)
|
||||
s.Nil(err)
|
||||
regionErr, err := resp.GetRegionError()
|
||||
s.Nil(err)
|
||||
s.Equal(regionErrorToLabel(regionErr), "mismatch_peer_id")
|
||||
// return non-epoch-not-match region error and the upper layer can auto retry.
|
||||
s.Nil(regionErr.GetEpochNotMatch())
|
||||
// after region error returned, the region should be invalidated.
|
||||
s.False(region.isValid())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue