From 5042c6f2aaa64ec055b8c9943f77f9662a8bcc36 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Fri, 11 Mar 2022 12:36:19 +0800 Subject: [PATCH] Return assertion failed error less prior to other kinds of errors (#448) Signed-off-by: MyonKeminta --- integration_tests/assertion_test.go | 63 +++++++++++++++++++++++++++++ internal/mockstore/mocktikv/rpc.go | 9 +++++ txnkv/transaction/2pc.go | 48 ++++++++++++++++------ 3 files changed, 108 insertions(+), 12 deletions(-) diff --git a/integration_tests/assertion_test.go b/integration_tests/assertion_test.go index 8a3b718f..c7e9147d 100644 --- a/integration_tests/assertion_test.go +++ b/integration_tests/assertion_test.go @@ -221,3 +221,66 @@ func (s *testAssertionSuite) TestFastAssertion() { s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Fast, true) s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Fast, true) } + +func (s *testAssertionSuite) TestAssertionErrorLessPriorToOtherError() { + s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return")) + defer func() { + s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL")) + }() + + testOnce := func(lockKey []byte, nonLockKey []byte, noBatch bool, delayPrimary bool, delaySecondary bool) { + if noBatch { + s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return")) + defer func() { + s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit")) + }() + } + + ctx := context.Background() + // Transaction 1 locks the keys. + tx, err := s.store.Begin() + s.NoError(err) + tx.SetPessimistic(true) + tx.SetAssertionLevel(kvrpcpb.AssertionLevel_Strict) + s.NoError(tx.LockKeysWithWaitTime(ctx, 1, lockKey)) + s.NoError(tx.Set(lockKey, []byte("rv1"))) + s.NoError(tx.Set(nonLockKey, []byte("iv1"))) + tx.GetMemBuffer().UpdateFlags(lockKey, kv.SetAssertNotExist) + tx.GetMemBuffer().UpdateFlags(nonLockKey, kv.SetAssertNotExist) + tx.GetCommitter().CloseTTLManager() + + // Transaction 2 resolves the lock of transaction 1 and then + tx2, err := s.store.Begin() + s.NoError(err) + tx2.SetPessimistic(true) + tx2.SetAssertionLevel(kvrpcpb.AssertionLevel_Strict) + s.NoError(tx.LockKeysWithWaitTime(ctx, 1000, lockKey)) + s.NoError(tx2.Set(lockKey, []byte("rv2"))) + s.NoError(tx2.Set(nonLockKey, []byte("iv2"))) + tx2.GetMemBuffer().UpdateFlags(lockKey, kv.SetAssertNotExist) + tx2.GetMemBuffer().UpdateFlags(nonLockKey, kv.SetAssertNotExist) + s.NoError(tx2.Commit(ctx)) + + if delayPrimary { + s.NoError(failpoint.Enable("tikvclient/prewritePrimary", "sleep(200)")) + defer func() { + s.NoError(failpoint.Disable("tikvclient/prewritePrimary")) + }() + } + if delaySecondary { + s.NoError(failpoint.Enable("tikvclient/prewriteSecondary", "sleep(200)")) + defer func() { + s.NoError(failpoint.Disable("tikvclient/prewriteSecondary")) + }() + } + + err = tx.Commit(ctx) + s.NotNil(err) + s.IsType(&tikverr.ErrAssertionFailed{}, errors.Cause(err)) + } + + testOnce([]byte("kr1"), []byte("ki1"), false, false, false) + testOnce([]byte("kr2"), []byte("ki2"), true, false, false) + testOnce([]byte("kr3"), []byte("ki3"), true, true, false) + testOnce([]byte("kr4"), []byte("ki4"), true, false, true) +} diff --git a/internal/mockstore/mocktikv/rpc.go b/internal/mockstore/mocktikv/rpc.go index 86908e8c..5be81eb0 100644 --- a/internal/mockstore/mocktikv/rpc.go +++ b/internal/mockstore/mocktikv/rpc.go @@ -228,6 +228,15 @@ func (h kvHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Prewr } } errs := h.mvccStore.Prewrite(req) + for i, e := range errs { + if e != nil { + if _, isLocked := errors.Cause(e).(*ErrLocked); !isLocked { + // Keep only one error if it's not a KeyIsLocked error. + errs = errs[i : i+1] + break + } + } + } return &kvrpcpb.PrewriteResponse{ Errors: convertToKeyErrors(errs), } diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 4e35c188..347f6bd6 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -2026,11 +2026,17 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { } // For prewrite, stop sending other requests after receiving first error. + // However, AssertionFailed error is less prior to other kinds of errors. If we meet an AssertionFailed error, + // we hold it to see if there's other error, and return it if there are no other kinds of errors. + // This is because when there are transaction conflicts in pessimistic transaction, it's possible that the + // non-pessimistic-locked keys may report false-positive assertion failure. + // See also: https://github.com/tikv/tikv/issues/12113 var cancel context.CancelFunc if _, ok := batchExe.action.(actionPrewrite); ok { batchExe.backoffer, cancel = batchExe.backoffer.Fork() defer cancel() } + var assertionFailedErr error = nil // concurrently do the work for each batch. ch := make(chan error, len(batches)) exitCh := make(chan struct{}) @@ -2043,17 +2049,23 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { zap.Stringer("action type", batchExe.action), zap.Error(e), zap.Uint64("txnStartTS", batchExe.committer.startTS)) - // Cancel other requests and return the first error. - if cancel != nil { - logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch to cancel other actions", - zap.Uint64("session", batchExe.committer.sessionID), - zap.Stringer("action type", batchExe.action), - zap.Uint64("txnStartTS", batchExe.committer.startTS)) - atomic.StoreUint32(&batchExe.committer.prewriteCancelled, 1) - cancel() - } - if err == nil { - err = e + if _, isAssertionFailed := errors.Cause(e).(*tikverr.ErrAssertionFailed); isAssertionFailed { + if assertionFailedErr == nil { + assertionFailedErr = e + } + } else { + // Cancel other requests and return the first error. + if cancel != nil { + logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch to cancel other actions", + zap.Uint64("session", batchExe.committer.sessionID), + zap.Stringer("action type", batchExe.action), + zap.Uint64("txnStartTS", batchExe.committer.startTS)) + atomic.StoreUint32(&batchExe.committer.prewriteCancelled, 1) + cancel() + } + if err == nil { + err = e + } } } } @@ -2061,7 +2073,19 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { if batchExe.tokenWaitDuration > 0 { metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration.Nanoseconds())) } - return err + if err != nil { + if assertionFailedErr != nil { + logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch met assertion failed error but ignored due to other kinds of error", + zap.Uint64("session", batchExe.committer.sessionID), + zap.Stringer("actoin type", batchExe.action), + zap.Uint64("txnStartTS", batchExe.committer.startTS), + zap.Uint64("forUpdateTS", batchExe.committer.forUpdateTS), + zap.NamedError("assertionFailed", assertionFailedErr), + zap.Error(err)) + } + return err + } + return assertionFailedErr } func (c *twoPhaseCommitter) setDetail(d *util.CommitDetails) {