mirror of https://github.com/tikv/client-go.git
Abort if the mismatching lock is from the current transaction when resolving locks (#367)
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
5ae005dac3
commit
6ac6a8daf2
|
|
@ -856,3 +856,40 @@ func (s *testLockSuite) TestStartHeartBeatAfterLockingPrimary() {
|
||||||
|
|
||||||
s.Nil(txn.Rollback())
|
s.Nil(txn.Rollback())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testLockSuite) TestPrewriteEncountersLargerTsLock() {
|
||||||
|
t1, err := s.store.Begin()
|
||||||
|
s.Nil(err)
|
||||||
|
s.Nil(t1.Set([]byte("k1"), []byte("v1")))
|
||||||
|
s.Nil(t1.Set([]byte("k2"), []byte("v2")))
|
||||||
|
|
||||||
|
// t2 has larger TS. Let t2 prewrite only the secondary lock.
|
||||||
|
t2, err := s.store.Begin()
|
||||||
|
s.Nil(err)
|
||||||
|
s.Nil(t2.Set([]byte("k1"), []byte("v1")))
|
||||||
|
s.Nil(t2.Set([]byte("k2"), []byte("v2")))
|
||||||
|
committer, err := t2.NewCommitter(1)
|
||||||
|
s.Nil(err)
|
||||||
|
committer.SetLockTTL(20000) // set TTL to 20s
|
||||||
|
|
||||||
|
s.Nil(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", "return"))
|
||||||
|
defer failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit")
|
||||||
|
s.Nil(failpoint.Enable("tikvclient/prewritePrimary", "pause"))
|
||||||
|
ch := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
err = committer.PrewriteAllMutations(context.Background())
|
||||||
|
s.Nil(err)
|
||||||
|
ch <- struct{}{}
|
||||||
|
}()
|
||||||
|
time.Sleep(200 * time.Millisecond) // make prewrite earlier than t1 commits
|
||||||
|
|
||||||
|
// Set 1 second timeout. If we still need to wait until t2 expires, we will get a timeout error
|
||||||
|
// instead of write conflict.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
err = t1.Commit(ctx)
|
||||||
|
s.True(tikverr.IsErrWriteConflict(err))
|
||||||
|
|
||||||
|
s.Nil(failpoint.Disable("tikvclient/prewritePrimary"))
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -328,7 +328,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
|
||||||
}
|
}
|
||||||
logutil.BgLogger().Info("prewrite encounters lock",
|
logutil.BgLogger().Info("prewrite encounters lock",
|
||||||
zap.Uint64("session", c.sessionID),
|
zap.Uint64("session", c.sessionID),
|
||||||
|
zap.Uint64("txnID", c.startTS),
|
||||||
zap.Stringer("lock", lock))
|
zap.Stringer("lock", lock))
|
||||||
|
// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
|
||||||
|
// fail due to a WriteConflict error. So we can construct and return an error here early.
|
||||||
|
// Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
|
||||||
|
// TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
|
||||||
|
// TiKV returns lock.TTL = 0, and we still need to resolve the lock.
|
||||||
|
if lock.TxnID > c.startTS && !c.isPessimistic {
|
||||||
|
return tikverr.NewErrWriteConfictWithArgs(c.startTS, lock.TxnID, 0, lock.Key)
|
||||||
|
}
|
||||||
locks = append(locks, lock)
|
locks = append(locks, lock)
|
||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue