From 97edee854cb660b97ebf81e9d52015d4eb24622d Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 29 Jun 2021 16:38:15 +0800 Subject: [PATCH] clear RPC error only if prewrite succeeds (#187) Signed-off-by: Yilin Chen --- integration_tests/async_commit_fail_test.go | 48 --------------------- internal/locate/region_request.go | 2 - internal/locate/region_request_test.go | 2 - tikv/2pc.go | 10 ++--- tikv/prewrite.go | 13 ++---- 5 files changed, 7 insertions(+), 68 deletions(-) diff --git a/integration_tests/async_commit_fail_test.go b/integration_tests/async_commit_fail_test.go index 35fd3a86..2dfb635e 100644 --- a/integration_tests/async_commit_fail_test.go +++ b/integration_tests/async_commit_fail_test.go @@ -259,51 +259,3 @@ func (s *testAsyncCommitFailSuite) TestAsyncCommitContextCancelCausingUndetermin s.NotNil(err) s.NotNil(txn.GetCommitter().GetUndeterminedErr()) } - -// TestAsyncCommitRPCErrorThenWriteConflict verifies that the determined failure error overwrites undetermined error. -func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflict() { - // This test doesn't support tikv mode because it needs setting failpoint in unistore. - if *mockstore.WithTiKV { - return - } - - txn := s.beginAsyncCommit() - err := txn.Set([]byte("a"), []byte("va")) - s.Nil(err) - - s.Nil(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`)) - defer func() { - s.Nil(failpoint.Disable("tikvclient/rpcPrewriteResult")) - }() - - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = txn.Commit(ctx) - s.NotNil(err) - s.Nil(txn.GetCommitter().GetUndeterminedErr()) -} - -// TestAsyncCommitRPCErrorThenWriteConflictInChild verifies that the determined failure error in a child recursion -// overwrites the undetermined error in the parent. -func (s *testAsyncCommitFailSuite) TestAsyncCommitRPCErrorThenWriteConflictInChild() { - // This test doesn't support tikv mode because it needs setting failpoint in unistore. - if *mockstore.WithTiKV { - return - } - - txn := s.beginAsyncCommit() - err := txn.Set([]byte("a"), []byte("va")) - s.Nil(err) - - s.Nil(failpoint.Enable("tikvclient/rpcPrewriteResult", `1*return("timeout")->return("writeConflict")`)) - s.Nil(failpoint.Enable("tikvclient/forceRecursion", `return`)) - - defer func() { - s.Nil(failpoint.Disable("tikvclient/rpcPrewriteResult")) - s.Nil(failpoint.Disable("tikvclient/forceRecursion")) - }() - - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - err = txn.Commit(ctx) - s.NotNil(err) - s.Nil(txn.GetCommitter().GetUndeterminedErr()) -} diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 35dba542..3fce0dcc 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -584,8 +584,6 @@ func (s *RegionRequestSender) SendReqCtx( continue } } else { - // Clear the RPC Error since the request is evaluated successfully on a store. - s.rpcError = nil if s.leaderReplicaSelector != nil { s.leaderReplicaSelector.OnSendSuccess() } diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 3e4dc5ea..3427a732 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -163,8 +163,6 @@ func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithStoreRestart() resp, err = s.regionRequestSender.SendReq(s.bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp.Resp) - // The RPC error should be nil since it's evaluated successfully. - s.Nil(s.regionRequestSender.rpcError) } func (s *testRegionRequestToSingleStoreSuite) TestOnSendFailedWithCloseKnownStoreThenUseNewOne() { diff --git a/tikv/2pc.go b/tikv/2pc.go index 43d0da95..b455f957 100644 --- a/tikv/2pc.go +++ b/tikv/2pc.go @@ -120,7 +120,6 @@ type twoPhaseCommitter struct { maxCommitTS uint64 prewriteStarted bool prewriteCancelled uint32 - prewriteFailed uint32 useOnePC uint32 onePCCommitTS uint64 @@ -1126,13 +1125,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { start := time.Now() err = c.prewriteMutations(bo, c.mutations) - // Return an undetermined error only if we don't know the transaction fails. - // If it fails due to a write conflict or a already existed unique key, we - // needn't return an undetermined error even if such an error is set. - if atomic.LoadUint32(&c.prewriteFailed) == 1 { - c.setUndeterminedErr(nil) - } if err != nil { + // TODO: Now we return an undetermined error as long as one of the prewrite + // RPCs fails. However, if there are multiple errors and some of the errors + // are not RPC failures, we can return the actual error instead of undetermined. if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil { logutil.Logger(ctx).Error("2PC commit result undetermined", zap.Error(err), diff --git a/tikv/prewrite.go b/tikv/prewrite.go index e33ffafa..de7859d3 100644 --- a/tikv/prewrite.go +++ b/tikv/prewrite.go @@ -223,9 +223,6 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff if err != nil { return errors.Trace(err) } - if _, err := util.EvalFailpoint("forceRecursion"); err == nil { - same = false - } if same { continue } @@ -239,6 +236,9 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse) keyErrs := prewriteResp.GetErrors() if len(keyErrs) == 0 { + // Clear the RPC Error since the request is evaluated successfully. + sender.SetRPCError(nil) + if batch.isPrimary { // After writing the primary key, if the size of the transaction is larger than 32M, // start the ttlManager. The ttlManager will be closed in tikvTxn.Commit(). @@ -299,17 +299,12 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff // Check already exists error if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist} - err = c.extractKeyExistsErr(e) - if err != nil { - atomic.StoreUint32(&c.prewriteFailed, 1) - } - return err + return c.extractKeyExistsErr(e) } // Extract lock from key error lock, err1 := extractLockFromKeyErr(keyErr) if err1 != nil { - atomic.StoreUint32(&c.prewriteFailed, 1) return errors.Trace(err1) } logutil.BgLogger().Info("prewrite encounters lock",