mirror of https://github.com/tikv/client-go.git
add newly inserted flag to optimize delete your write (#378)
Signed-off-by: cfzjywxk <lsswxrxr@163.com>
This commit is contained in:
parent
a3f1c41ac1
commit
e0bf24b24d
|
|
@ -1550,3 +1550,51 @@ func (s *testCommitterSuite) TestCommitMultipleRegions() {
|
||||||
}
|
}
|
||||||
s.mustCommit(m)
|
s.mustCommit(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *testCommitterSuite) TestNewlyInsertedMemDBFlag() {
|
||||||
|
ctx := context.Background()
|
||||||
|
txn := s.begin()
|
||||||
|
memdb := txn.GetMemBuffer()
|
||||||
|
k0 := []byte("k0")
|
||||||
|
v0 := []byte("v0")
|
||||||
|
k1 := []byte("k1")
|
||||||
|
k2 := []byte("k2")
|
||||||
|
v1 := []byte("v1")
|
||||||
|
v2 := []byte("v2")
|
||||||
|
|
||||||
|
// Insert after delete, the newly inserted flag should not exist.
|
||||||
|
err := txn.Delete(k0)
|
||||||
|
s.Nil(err)
|
||||||
|
err = txn.Set(k0, v0)
|
||||||
|
s.Nil(err)
|
||||||
|
flags, err := memdb.GetFlags(k0)
|
||||||
|
s.Nil(err)
|
||||||
|
s.False(flags.HasNewlyInserted())
|
||||||
|
|
||||||
|
// Lock then insert, the newly inserted flag should exist.
|
||||||
|
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
|
||||||
|
err = txn.LockKeys(context.Background(), lockCtx, k1)
|
||||||
|
s.Nil(err)
|
||||||
|
err = txn.GetMemBuffer().SetWithFlags(k1, v1, kv.SetNewlyInserted)
|
||||||
|
s.Nil(err)
|
||||||
|
flags, err = memdb.GetFlags(k1)
|
||||||
|
s.Nil(err)
|
||||||
|
s.True(flags.HasNewlyInserted())
|
||||||
|
|
||||||
|
// Lock then delete and insert, the newly inserted flag should not exist.
|
||||||
|
err = txn.LockKeys(ctx, lockCtx, k2)
|
||||||
|
s.Nil(err)
|
||||||
|
err = txn.Delete(k2)
|
||||||
|
s.Nil(err)
|
||||||
|
flags, err = memdb.GetFlags(k2)
|
||||||
|
s.Nil(err)
|
||||||
|
s.False(flags.HasNewlyInserted())
|
||||||
|
err = txn.Set(k2, v2)
|
||||||
|
s.Nil(err)
|
||||||
|
flags, err = memdb.GetFlags(k2)
|
||||||
|
s.Nil(err)
|
||||||
|
s.False(flags.HasNewlyInserted())
|
||||||
|
|
||||||
|
err = txn.Commit(ctx)
|
||||||
|
s.Nil(err)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,7 @@ const (
|
||||||
flagPrewriteOnly
|
flagPrewriteOnly
|
||||||
flagIgnoredIn2PC
|
flagIgnoredIn2PC
|
||||||
flagReadable
|
flagReadable
|
||||||
|
flagNewlyInserted
|
||||||
|
|
||||||
persistentFlags = flagKeyLocked | flagKeyLockedValExist
|
persistentFlags = flagKeyLocked | flagKeyLockedValExist
|
||||||
)
|
)
|
||||||
|
|
@ -98,6 +99,11 @@ func (f KeyFlags) AndPersistent() KeyFlags {
|
||||||
return f & persistentFlags
|
return f & persistentFlags
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasNewlyInserted returns whether the in-transaction key is generated by an "insert" operation.
|
||||||
|
func (f KeyFlags) HasNewlyInserted() bool {
|
||||||
|
return f&flagNewlyInserted != 0
|
||||||
|
}
|
||||||
|
|
||||||
// ApplyFlagsOps applys flagspos to origin.
|
// ApplyFlagsOps applys flagspos to origin.
|
||||||
func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
|
func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
|
||||||
for _, op := range ops {
|
for _, op := range ops {
|
||||||
|
|
@ -126,6 +132,8 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
|
||||||
origin |= flagIgnoredIn2PC
|
origin |= flagIgnoredIn2PC
|
||||||
case SetReadable:
|
case SetReadable:
|
||||||
origin |= flagReadable
|
origin |= flagReadable
|
||||||
|
case SetNewlyInserted:
|
||||||
|
origin |= flagNewlyInserted
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return origin
|
return origin
|
||||||
|
|
@ -160,4 +168,6 @@ const (
|
||||||
SetIgnoredIn2PC
|
SetIgnoredIn2PC
|
||||||
// SetReadable marks the key is readable by in-transaction read.
|
// SetReadable marks the key is readable by in-transaction read.
|
||||||
SetReadable
|
SetReadable
|
||||||
|
// SetNewlyInserted marks the key is newly inserted with value length greater than zero.
|
||||||
|
SetNewlyInserted
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -439,10 +439,21 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
|
||||||
checkCnt++
|
checkCnt++
|
||||||
memBuf.UpdateFlags(key, kv.SetPrewriteOnly)
|
memBuf.UpdateFlags(key, kv.SetPrewriteOnly)
|
||||||
} else {
|
} else {
|
||||||
// normal delete keys in optimistic txn can be delete without not exists checking
|
if flags.HasNewlyInserted() {
|
||||||
// delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them
|
// The delete-your-write keys in pessimistic transactions, only lock needed keys and skip
|
||||||
op = kvrpcpb.Op_Del
|
// other deletes for example the secondary index delete.
|
||||||
delCnt++
|
// Here if `tidb_constraint_check_in_place` is enabled and the transaction is in optimistic mode,
|
||||||
|
// the logic is same as the pessimistic mode.
|
||||||
|
if flags.HasLocked() {
|
||||||
|
op = kvrpcpb.Op_Lock
|
||||||
|
lockCnt++
|
||||||
|
} else {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
op = kvrpcpb.Op_Del
|
||||||
|
delCnt++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue