From 6ca00989ddb4ede5dc8c218394415d040aa96465 Mon Sep 17 00:00:00 2001 From: Lei Zhao Date: Tue, 6 Jul 2021 12:11:21 +0800 Subject: [PATCH] lock_resolver: avoid pessimistic transactions using resolveLocksForWrite (#213) Signed-off-by: youjiali1995 Co-authored-by: disksing --- error/error.go | 2 +- integration_tests/async_commit_test.go | 23 +++++++++++++++++++++++ tikv/lock_resolver.go | 6 ++++-- tikv/prewrite.go | 2 +- 4 files changed, 29 insertions(+), 4 deletions(-) diff --git a/error/error.go b/error/error.go index bd78a9fe..ccf837f0 100644 --- a/error/error.go +++ b/error/error.go @@ -131,7 +131,7 @@ type ErrWriteConflict struct { } func (k *ErrWriteConflict) Error() string { - return k.WriteConflict.String() + return fmt.Sprintf("write conflict { %s }", k.WriteConflict.String()) } // IsErrWriteConflict returns true if it is ErrWriteConflict. diff --git a/integration_tests/async_commit_test.go b/integration_tests/async_commit_test.go index 73fdf713..50474e4f 100644 --- a/integration_tests/async_commit_test.go +++ b/integration_tests/async_commit_test.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/store/mockstore/unistore" "github.com/stretchr/testify/suite" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/mockstore/cluster" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -210,6 +211,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr tpc, err := txnProbe.NewCommitter(0) s.Nil(err) tpc.SetPrimaryKey(primaryKey) + tpc.SetUseAsyncCommit() ctx := context.Background() err = tpc.PrewriteAllMutations(ctx) @@ -571,3 +573,24 @@ func (m *mockResolveClient) SendRequest(ctx context.Context, addr string, req *t func (m *mockResolveClient) Close() error { return m.inner.Close() } + +// TestPessimisticTxnResolveAsyncCommitLock tests that pessimistic transactions resolve non-expired async-commit locks during the prewrite phase. +// Pessimistic transactions will resolve locks immediately during the prewrite phase because of the special logic for handling non-pessimistic lock conflict. +// However, async-commit locks can't be resolved until they expire. This test covers it. +func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() { + ctx := context.Background() + k := []byte("k") + + txn, err := s.store.Begin() + s.Nil(err) + txn.SetPessimistic(true) + err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1")) + s.Nil(err) + + // Lock the key with a async-commit lock. + s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false) + + txn.Set(k, k) + err = txn.Commit(context.Background()) + s.Nil(err) +} diff --git a/tikv/lock_resolver.go b/tikv/lock_resolver.go index 4297e88d..9af10521 100644 --- a/tikv/lock_resolver.go +++ b/tikv/lock_resolver.go @@ -452,8 +452,10 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks return msBeforeTxnExpired.value(), pushed, nil } -func (lr *LockResolver) resolveLocksForWrite(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { - msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, true, false) +func (lr *LockResolver) resolveLocksForWrite(bo *Backoffer, callerStartTS, callerForUpdateTS uint64, locks []*Lock) (int64, error) { + // The forWrite parameter is only useful for optimistic transactions which can avoid deadlock between large transactions, + // so only use forWrite if the callerForUpdateTS is zero. + msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, callerForUpdateTS == 0, false) return msBeforeTxnExpired, err } diff --git a/tikv/prewrite.go b/tikv/prewrite.go index 4ca818cc..04fba5ea 100644 --- a/tikv/prewrite.go +++ b/tikv/prewrite.go @@ -313,7 +313,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff locks = append(locks, lock) } start := time.Now() - msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, locks) + msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks) if err != nil { return errors.Trace(err) }