Return assertion failed error less prior to other kinds of errors (#448)

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
MyonKeminta 2022-03-11 12:36:19 +08:00 committed by GitHub
parent cd7e7681c2
commit 5042c6f2aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 12 deletions

View File

@ -221,3 +221,66 @@ func (s *testAssertionSuite) TestFastAssertion() {
s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Fast, true)
s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Fast, true)
}
func (s *testAssertionSuite) TestAssertionErrorLessPriorToOtherError() {
s.NoError(failpoint.Enable("tikvclient/shortPessimisticLockTTL", "return"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
}()
testOnce := func(lockKey []byte, nonLockKey []byte, noBatch bool, delayPrimary bool, delaySecondary bool) {
if noBatch {
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
}()
}
ctx := context.Background()
// Transaction 1 locks the keys.
tx, err := s.store.Begin()
s.NoError(err)
tx.SetPessimistic(true)
tx.SetAssertionLevel(kvrpcpb.AssertionLevel_Strict)
s.NoError(tx.LockKeysWithWaitTime(ctx, 1, lockKey))
s.NoError(tx.Set(lockKey, []byte("rv1")))
s.NoError(tx.Set(nonLockKey, []byte("iv1")))
tx.GetMemBuffer().UpdateFlags(lockKey, kv.SetAssertNotExist)
tx.GetMemBuffer().UpdateFlags(nonLockKey, kv.SetAssertNotExist)
tx.GetCommitter().CloseTTLManager()
// Transaction 2 resolves the lock of transaction 1 and then
tx2, err := s.store.Begin()
s.NoError(err)
tx2.SetPessimistic(true)
tx2.SetAssertionLevel(kvrpcpb.AssertionLevel_Strict)
s.NoError(tx.LockKeysWithWaitTime(ctx, 1000, lockKey))
s.NoError(tx2.Set(lockKey, []byte("rv2")))
s.NoError(tx2.Set(nonLockKey, []byte("iv2")))
tx2.GetMemBuffer().UpdateFlags(lockKey, kv.SetAssertNotExist)
tx2.GetMemBuffer().UpdateFlags(nonLockKey, kv.SetAssertNotExist)
s.NoError(tx2.Commit(ctx))
if delayPrimary {
s.NoError(failpoint.Enable("tikvclient/prewritePrimary", "sleep(200)"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/prewritePrimary"))
}()
}
if delaySecondary {
s.NoError(failpoint.Enable("tikvclient/prewriteSecondary", "sleep(200)"))
defer func() {
s.NoError(failpoint.Disable("tikvclient/prewriteSecondary"))
}()
}
err = tx.Commit(ctx)
s.NotNil(err)
s.IsType(&tikverr.ErrAssertionFailed{}, errors.Cause(err))
}
testOnce([]byte("kr1"), []byte("ki1"), false, false, false)
testOnce([]byte("kr2"), []byte("ki2"), true, false, false)
testOnce([]byte("kr3"), []byte("ki3"), true, true, false)
testOnce([]byte("kr4"), []byte("ki4"), true, false, true)
}

View File

@ -228,6 +228,15 @@ func (h kvHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Prewr
}
}
errs := h.mvccStore.Prewrite(req)
for i, e := range errs {
if e != nil {
if _, isLocked := errors.Cause(e).(*ErrLocked); !isLocked {
// Keep only one error if it's not a KeyIsLocked error.
errs = errs[i : i+1]
break
}
}
}
return &kvrpcpb.PrewriteResponse{
Errors: convertToKeyErrors(errs),
}

View File

@ -2026,11 +2026,17 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
}
// For prewrite, stop sending other requests after receiving first error.
// However, AssertionFailed error is less prior to other kinds of errors. If we meet an AssertionFailed error,
// we hold it to see if there's other error, and return it if there are no other kinds of errors.
// This is because when there are transaction conflicts in pessimistic transaction, it's possible that the
// non-pessimistic-locked keys may report false-positive assertion failure.
// See also: https://github.com/tikv/tikv/issues/12113
var cancel context.CancelFunc
if _, ok := batchExe.action.(actionPrewrite); ok {
batchExe.backoffer, cancel = batchExe.backoffer.Fork()
defer cancel()
}
var assertionFailedErr error = nil
// concurrently do the work for each batch.
ch := make(chan error, len(batches))
exitCh := make(chan struct{})
@ -2043,6 +2049,11 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
zap.Stringer("action type", batchExe.action),
zap.Error(e),
zap.Uint64("txnStartTS", batchExe.committer.startTS))
if _, isAssertionFailed := errors.Cause(e).(*tikverr.ErrAssertionFailed); isAssertionFailed {
if assertionFailedErr == nil {
assertionFailedErr = e
}
} else {
// Cancel other requests and return the first error.
if cancel != nil {
logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch to cancel other actions",
@ -2057,12 +2068,25 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
}
}
}
}
close(exitCh)
if batchExe.tokenWaitDuration > 0 {
metrics.TiKVTokenWaitDuration.Observe(float64(batchExe.tokenWaitDuration.Nanoseconds()))
}
if err != nil {
if assertionFailedErr != nil {
logutil.Logger(batchExe.backoffer.GetCtx()).Debug("2PC doActionOnBatch met assertion failed error but ignored due to other kinds of error",
zap.Uint64("session", batchExe.committer.sessionID),
zap.Stringer("actoin type", batchExe.action),
zap.Uint64("txnStartTS", batchExe.committer.startTS),
zap.Uint64("forUpdateTS", batchExe.committer.forUpdateTS),
zap.NamedError("assertionFailed", assertionFailedErr),
zap.Error(err))
}
return err
}
return assertionFailedErr
}
func (c *twoPhaseCommitter) setDetail(d *util.CommitDetails) {
atomic.StorePointer(&c.detail, unsafe.Pointer(d))