mirror of https://github.com/tikv/client-go.git
lock_resolver: avoid pessimistic transactions using resolveLocksForWrite (#213)
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com> Co-authored-by: disksing <i@disksing.com>
This commit is contained in:
parent
814e61ee06
commit
6ca00989dd
|
|
@ -131,7 +131,7 @@ type ErrWriteConflict struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k *ErrWriteConflict) Error() string {
|
func (k *ErrWriteConflict) Error() string {
|
||||||
return k.WriteConflict.String()
|
return fmt.Sprintf("write conflict { %s }", k.WriteConflict.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsErrWriteConflict returns true if it is ErrWriteConflict.
|
// IsErrWriteConflict returns true if it is ErrWriteConflict.
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ import (
|
||||||
"github.com/pingcap/tidb/store/mockstore/unistore"
|
"github.com/pingcap/tidb/store/mockstore/unistore"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
tikverr "github.com/tikv/client-go/v2/error"
|
tikverr "github.com/tikv/client-go/v2/error"
|
||||||
|
"github.com/tikv/client-go/v2/kv"
|
||||||
"github.com/tikv/client-go/v2/mockstore/cluster"
|
"github.com/tikv/client-go/v2/mockstore/cluster"
|
||||||
"github.com/tikv/client-go/v2/oracle"
|
"github.com/tikv/client-go/v2/oracle"
|
||||||
"github.com/tikv/client-go/v2/tikv"
|
"github.com/tikv/client-go/v2/tikv"
|
||||||
|
|
@ -210,6 +211,7 @@ func (s *testAsyncCommitSuite) lockKeysWithAsyncCommit(keys, values [][]byte, pr
|
||||||
tpc, err := txnProbe.NewCommitter(0)
|
tpc, err := txnProbe.NewCommitter(0)
|
||||||
s.Nil(err)
|
s.Nil(err)
|
||||||
tpc.SetPrimaryKey(primaryKey)
|
tpc.SetPrimaryKey(primaryKey)
|
||||||
|
tpc.SetUseAsyncCommit()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
err = tpc.PrewriteAllMutations(ctx)
|
err = tpc.PrewriteAllMutations(ctx)
|
||||||
|
|
@ -571,3 +573,24 @@ func (m *mockResolveClient) SendRequest(ctx context.Context, addr string, req *t
|
||||||
func (m *mockResolveClient) Close() error {
|
func (m *mockResolveClient) Close() error {
|
||||||
return m.inner.Close()
|
return m.inner.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestPessimisticTxnResolveAsyncCommitLock tests that pessimistic transactions resolve non-expired async-commit locks during the prewrite phase.
|
||||||
|
// Pessimistic transactions will resolve locks immediately during the prewrite phase because of the special logic for handling non-pessimistic lock conflict.
|
||||||
|
// However, async-commit locks can't be resolved until they expire. This test covers it.
|
||||||
|
func (s *testAsyncCommitSuite) TestPessimisticTxnResolveAsyncCommitLock() {
|
||||||
|
ctx := context.Background()
|
||||||
|
k := []byte("k")
|
||||||
|
|
||||||
|
txn, err := s.store.Begin()
|
||||||
|
s.Nil(err)
|
||||||
|
txn.SetPessimistic(true)
|
||||||
|
err = txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS()}, []byte("k1"))
|
||||||
|
s.Nil(err)
|
||||||
|
|
||||||
|
// Lock the key with a async-commit lock.
|
||||||
|
s.lockKeysWithAsyncCommit([][]byte{}, [][]byte{}, k, k, false)
|
||||||
|
|
||||||
|
txn.Set(k, k)
|
||||||
|
err = txn.Commit(context.Background())
|
||||||
|
s.Nil(err)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -452,8 +452,10 @@ func (lr *LockResolver) resolveLocks(bo *Backoffer, callerStartTS uint64, locks
|
||||||
return msBeforeTxnExpired.value(), pushed, nil
|
return msBeforeTxnExpired.value(), pushed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lr *LockResolver) resolveLocksForWrite(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
|
func (lr *LockResolver) resolveLocksForWrite(bo *Backoffer, callerStartTS, callerForUpdateTS uint64, locks []*Lock) (int64, error) {
|
||||||
msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, true, false)
|
// The forWrite parameter is only useful for optimistic transactions which can avoid deadlock between large transactions,
|
||||||
|
// so only use forWrite if the callerForUpdateTS is zero.
|
||||||
|
msBeforeTxnExpired, _, err := lr.resolveLocks(bo, callerStartTS, locks, callerForUpdateTS == 0, false)
|
||||||
return msBeforeTxnExpired, err
|
return msBeforeTxnExpired, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -313,7 +313,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
|
||||||
locks = append(locks, lock)
|
locks = append(locks, lock)
|
||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, locks)
|
msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, c.forUpdateTS, locks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Trace(err)
|
return errors.Trace(err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue