Support setting assertions and receiving assertion errors for Prewrite requests (#311)

* support set assertion in 2pc mutation

Signed-off-by: lysu <sulifx@gmail.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Receive assertion fail errors from TiKV

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test log

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Remove verbose log

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* update kvproto

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add metrics counter for assertions

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address some comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Try to optimize assertion for pessimistic transactions

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix panic on optimistic transactions

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add InitCheckExistence method for LockCtx

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Support assertion level

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Check assertion level before doing assertion on client side

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Test bitoperations of menBUfferMutations.Push

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test for assertion in tikv

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Support run assertion test with unistore

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix license

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* export DeleteKey

Signed-off-by: ekexium <ekexium@gmail.com>

* Renaming DeleteKey

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* fix build

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Avoid panic when running with old version of TiKV; Add schema check on fast assertion

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test for fast assertion

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test for pessimistic lock check existence

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Test assertion takes no effect if amending is enabled

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add HasAssertUnknown function

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Cleanup locks after assertion fail

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* update tidb dependency

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix panic in TestIllegalTSO

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Update dependency to tidb

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

Co-authored-by: lysu <sulifx@gmail.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Co-authored-by: ekexium <ekexium@gmail.com>
This commit is contained in:
MyonKeminta 2022-02-10 15:31:32 +08:00 committed by GitHub
parent cea0c4c6e6
commit b5eb031edd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 723 additions and 86 deletions

View File

@ -231,6 +231,15 @@ func (e *ErrTokenLimit) Error() string {
return fmt.Sprintf("Store token is up to the limit, store id = %d.", e.StoreID)
}
// ErrAssertionFailed is the error that assertion on data failed.
type ErrAssertionFailed struct {
*kvrpcpb.AssertionFailed
}
func (e *ErrAssertionFailed) Error() string {
return fmt.Sprintf("assertion failed { %s }", e.AssertionFailed.String())
}
// ExtractKeyErr extracts a KeyError.
func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error {
if val, err := util.EvalFailpoint("mockRetryableErrorResp"); err == nil {
@ -248,6 +257,10 @@ func ExtractKeyErr(keyErr *kvrpcpb.KeyError) error {
return errors.WithStack(&ErrRetryable{Retryable: keyErr.Retryable})
}
if keyErr.AssertionFailed != nil {
return &ErrAssertionFailed{AssertionFailed: keyErr.AssertionFailed}
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
logutil.BgLogger().Warn("2PC failed", zap.Error(err))

View File

@ -748,6 +748,39 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues() {
s.Equal(lockCtx.Values[string(key2)].Value, key2)
}
func (s *testCommitterSuite) TestPessimisticLockCheckExistence() {
key := []byte("key")
key2 := []byte("key2")
txn := s.begin()
s.Nil(txn.Set(key, key))
s.Nil(txn.Commit(context.Background()))
txn = s.begin()
txn.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockCtx.InitCheckExistence(2)
s.Nil(txn.LockKeys(context.Background(), lockCtx, key, key2))
s.Len(lockCtx.Values, 2)
s.Empty(lockCtx.Values[string(key)].Value)
s.True(lockCtx.Values[string(key)].Exists)
s.Empty(lockCtx.Values[string(key2)].Value)
s.False(lockCtx.Values[string(key2)].Exists)
s.Nil(txn.Rollback())
txn = s.begin()
txn.SetPessimistic(true)
lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
lockCtx.InitCheckExistence(2)
lockCtx.InitReturnValues(2)
s.Nil(txn.LockKeys(context.Background(), lockCtx, key, key2))
s.Len(lockCtx.Values, 2)
s.Equal(lockCtx.Values[string(key)].Value, key)
s.True(lockCtx.Values[string(key)].Exists)
s.Empty(lockCtx.Values[string(key2)].Value)
s.False(lockCtx.Values[string(key2)].Exists)
s.Nil(txn.Rollback())
}
// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
@ -1217,7 +1250,7 @@ func (s *testCommitterSuite) TestResolveMixed() {
// stop txn ttl manager and remove primary key, make the other keys left behind
committer.CloseTTLManager()
muts := transaction.NewPlainMutations(1)
muts.Push(kvrpcpb.Op_Lock, pk, nil, true)
muts.Push(kvrpcpb.Op_Lock, pk, nil, true, false, false)
err = committer.PessimisticRollbackMutations(context.Background(), &muts)
s.Nil(err)
@ -1600,3 +1633,56 @@ func (s *testCommitterSuite) TestNewlyInsertedMemDBFlag() {
err = txn.Commit(ctx)
s.Nil(err)
}
func (s *testCommitterSuite) TestFlagsInMemBufferMutations() {
// Get a MemDB object from a transaction object.
db := s.begin().GetMemBuffer()
// A helper for iterating all cases.
forEachCase := func(f func(op kvrpcpb.Op, key []byte, value []byte, index int, isPessimisticLock, assertExist, assertNotExist bool)) {
keyIndex := 0
for _, op := range []kvrpcpb.Op{kvrpcpb.Op_Put, kvrpcpb.Op_Del, kvrpcpb.Op_CheckNotExists} {
for flags := 0; flags < (1 << 3); flags++ {
key := []byte(fmt.Sprintf("k%05d", keyIndex))
value := []byte(fmt.Sprintf("v%05d", keyIndex))
// `flag` Iterates all combinations of flags in binary.
isPessimisticLock := (flags & 0x4) != 0
assertExist := (flags & 0x2) != 0
assertNotExist := (flags & 0x1) != 0
f(op, key, value, keyIndex, isPessimisticLock, assertExist, assertNotExist)
keyIndex++
}
}
}
// Put some keys to the MemDB
forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
if op == kvrpcpb.Op_Put {
err := db.Set(key, value)
s.Nil(err)
} else if op == kvrpcpb.Op_Del {
err := db.Delete(key)
s.Nil(err)
} else {
db.UpdateFlags(key, kv.SetPresumeKeyNotExists)
}
})
// Create memBufferMutations object and add keys with flags to it.
mutations := transaction.NewMemBufferMutationsProbe(db.Len(), db)
forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
handle := db.IterWithFlags(key, nil).Handle()
mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle)
})
forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) {
s.Equal(key, mutations.GetKey(i))
s.Equal(op, mutations.GetOp(i))
s.Equal(isPessimisticLock, mutations.IsPessimisticLock(i))
s.Equal(assertExist, mutations.IsAssertExists(i))
s.Equal(assertNotExist, mutations.IsAssertNotExist(i))
})
}

View File

@ -0,0 +1,223 @@
// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tikv_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/stretchr/testify/suite"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)
func TestAssertion(t *testing.T) {
suite.Run(t, new(testAssertionSuite))
}
type testAssertionSuite struct {
suite.Suite
cluster testutils.Cluster
store tikv.StoreProbe
}
func (s *testAssertionSuite) SetupTest() {
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
}
func (s *testAssertionSuite) TearDownTest() {
s.store.Close()
}
type mockAmender struct{}
func (*mockAmender) AmendTxn(ctx context.Context, startInfoSchema transaction.SchemaVer, change *transaction.RelatedSchemaChange, mutations transaction.CommitterMutations) (transaction.CommitterMutations, error) {
return nil, nil
}
func (s *testAssertionSuite) testAssertionImpl(keyPrefix string, pessimistic bool, lockKeys bool, assertionLevel kvrpcpb.AssertionLevel, enableAmend bool) {
if assertionLevel != kvrpcpb.AssertionLevel_Strict {
s.Nil(failpoint.Enable("tikvclient/assertionSkipCheckFromPrewrite", "return"))
defer func() {
s.Nil(failpoint.Disable("tikvclient/assertionSkipCheckFromPrewrite"))
}()
}
if assertionLevel != kvrpcpb.AssertionLevel_Fast {
s.Nil(failpoint.Enable("tikvclient/assertionSkipCheckFromLock", "return"))
defer func() {
s.Nil(failpoint.Disable("tikvclient/assertionSkipCheckFromLock"))
}()
}
// Compose the key
k := func(i byte) []byte {
return append([]byte(keyPrefix), 'k', i)
}
// Prepare some data. Make k1, k3, k7 exist.
prepareTxn, err := s.store.Begin()
s.Nil(err)
err = prepareTxn.Set(k(1), []byte("v1"))
s.Nil(err)
err = prepareTxn.Set(k(3), []byte("v3"))
s.Nil(err)
err = prepareTxn.Set(k(7), []byte("v7"))
s.Nil(err)
err = prepareTxn.Commit(context.Background())
s.Nil(err)
prepareStartTS := prepareTxn.GetCommitter().GetStartTS()
prepareCommitTS := prepareTxn.GetCommitTS()
// A helper to perform a complete transaction. When multiple keys are passed in, assertion will be set on only
// the last key.
doTxn := func(lastAssertion kv.FlagsOp, keys ...[]byte) (uint64, error) {
txn, err := s.store.Begin()
s.Nil(err)
txn.SetAssertionLevel(assertionLevel)
txn.SetPessimistic(pessimistic)
if enableAmend {
txn.SetSchemaAmender(&mockAmender{})
}
if lockKeys {
lockCtx := kv.NewLockCtx(txn.StartTS(), 1000, time.Now())
lockCtx.InitCheckExistence(1)
err = txn.LockKeys(context.Background(), lockCtx, keys...)
s.Nil(err)
} else if pessimistic {
// Since we don't want to lock the keys to be tested, set another key as the primary.
err = txn.LockKeysWithWaitTime(context.Background(), 10000, []byte("primary"))
s.Nil(err)
}
for _, key := range keys {
err = txn.Set(key, append([]byte{'v'}, key...))
s.Nil(err)
}
txn.GetMemBuffer().UpdateFlags(keys[len(keys)-1], lastAssertion)
err = txn.Commit(context.Background())
startTS := txn.GetCommitter().GetStartTS()
return startTS, err
}
checkAssertionFailError := func(err error, startTS uint64, key []byte, assertion kvrpcpb.Assertion, existingStartTS uint64, existingCommitTS uint64) {
assertionFailed, ok := errors.Cause(err).(*tikverr.ErrAssertionFailed)
s.True(ok)
s.Equal(startTS, assertionFailed.StartTs)
s.Equal(key, assertionFailed.Key)
s.Equal(assertion, assertionFailed.Assertion)
s.Equal(existingStartTS, assertionFailed.ExistingStartTs)
s.Equal(existingCommitTS, assertionFailed.ExistingCommitTs)
}
if assertionLevel == kvrpcpb.AssertionLevel_Strict && !enableAmend {
// Single key.
_, err = doTxn(kv.SetAssertExist, k(1))
s.Nil(err)
_, err = doTxn(kv.SetAssertNotExist, k(2))
s.Nil(err)
startTS, err := doTxn(kv.SetAssertNotExist, k(3))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(3), kvrpcpb.Assertion_NotExist, prepareStartTS, prepareCommitTS)
startTS, err = doTxn(kv.SetAssertExist, k(4))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(4), kvrpcpb.Assertion_Exist, 0, 0)
// Multiple keys
startTS, err = doTxn(kv.SetAssertNotExist, k(5), k(6), k(7))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(7), kvrpcpb.Assertion_NotExist, prepareStartTS, prepareCommitTS)
startTS, err = doTxn(kv.SetAssertExist, k(8), k(9), k(10))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(10), kvrpcpb.Assertion_Exist, 0, 0)
} else if assertionLevel == kvrpcpb.AssertionLevel_Fast && pessimistic && lockKeys && !enableAmend {
// Different from STRICT level, the already-existing version's startTS and commitTS cannot be fetched.
// Single key.
_, err = doTxn(kv.SetAssertExist, k(1))
s.Nil(err)
_, err = doTxn(kv.SetAssertNotExist, k(2))
s.Nil(err)
startTS, err := doTxn(kv.SetAssertNotExist, k(3))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(3), kvrpcpb.Assertion_NotExist, 0, 0)
startTS, err = doTxn(kv.SetAssertExist, k(4))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(4), kvrpcpb.Assertion_Exist, 0, 0)
// Multiple keys
startTS, err = doTxn(kv.SetAssertNotExist, k(5), k(6), k(7))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(7), kvrpcpb.Assertion_NotExist, 0, 0)
startTS, err = doTxn(kv.SetAssertExist, k(8), k(9), k(10))
s.NotNil(err)
checkAssertionFailError(err, startTS, k(10), kvrpcpb.Assertion_Exist, 0, 0)
} else {
// Nothing will be detected.
// Single key.
_, err = doTxn(kv.SetAssertExist, k(1))
s.Nil(err)
_, err = doTxn(kv.SetAssertNotExist, k(2))
s.Nil(err)
_, err := doTxn(kv.SetAssertNotExist, k(3))
s.Nil(err)
_, err = doTxn(kv.SetAssertExist, k(4))
s.Nil(err)
// Multiple keys
_, err = doTxn(kv.SetAssertNotExist, k(5), k(6), k(7))
s.Nil(err)
_, err = doTxn(kv.SetAssertExist, k(8), k(9), k(10))
s.Nil(err)
}
}
func (s *testAssertionSuite) TestPrewriteAssertion() {
// When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by
// previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a
// key prefix with a timestamp to ensure the keys to be unique.
ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
prefix := fmt.Sprintf("test-prewrite-assertion-%d-", ts)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Strict, false)
s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Strict, false)
s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Strict, false)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Strict, true)
s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Strict, true)
s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Strict, true)
}
func (s *testAssertionSuite) TestFastAssertion() {
// When the test cases runs with TiKV, the TiKV cluster can be reused, thus there may be deleted versions caused by
// previous tests. This test case may meet different behavior if there are deleted versions. To avoid it, compose a
// key prefix with a timestamp to ensure the keys to be unique.
ts, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.Nil(err)
prefix := fmt.Sprintf("test-fast-assertion-%d-", ts)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Fast, false)
s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Fast, false)
s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Fast, false)
s.testAssertionImpl(prefix+"a", false, false, kvrpcpb.AssertionLevel_Fast, true)
s.testAssertionImpl(prefix+"b", true, false, kvrpcpb.AssertionLevel_Fast, true)
s.testAssertionImpl(prefix+"c", true, true, kvrpcpb.AssertionLevel_Fast, true)
}

View File

@ -4,10 +4,11 @@ go 1.16
require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20211224055123-d1a140660c39
github.com/pingcap/tidb v1.1.0-beta.0.20220208061135-294a094d9055
github.com/pingcap/tidb/parser v0.0.0-20220208061135-294a094d9055 // indirect
github.com/pingcap/tidb v1.1.0-beta.0.20220209090336-37c9dc9e5206
github.com/pingcap/tidb/parser v0.0.0-20220209083136-a850b044c134 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/tikv/client-go/v2 v2.0.0

View File

@ -668,15 +668,15 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d/go.mod h1:7j18ezaWTao2LHOyMlsc2Dg1vW+mDY9dEbPzVyOlaeM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM=
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops=
github.com/pingcap/tidb v1.1.0-beta.0.20220208061135-294a094d9055 h1:T5B3nfeO98NRFkgacnXiQt1lZd7CYBfDRqBzhkTSF6w=
github.com/pingcap/tidb v1.1.0-beta.0.20220208061135-294a094d9055/go.mod h1:1ATxD29hPy7OXNIxyZWb9PqA12F5YjmYoO9uh/655OY=
github.com/pingcap/tidb v1.1.0-beta.0.20220209090336-37c9dc9e5206 h1:xioqMwQm+D9iSQjG2V+fEkZ++TCKd+mr4bf9xF7sXk8=
github.com/pingcap/tidb v1.1.0-beta.0.20220209090336-37c9dc9e5206/go.mod h1:1ATxD29hPy7OXNIxyZWb9PqA12F5YjmYoO9uh/655OY=
github.com/pingcap/tidb-dashboard v0.0.0-20211206031355-bcc43a01d537/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba/go.mod h1:4hk/3owVGWdvI9Kx6yCqqvM1T5PVgwyQNyMQxD3rwfc=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg=
github.com/pingcap/tidb/parser v0.0.0-20220208061135-294a094d9055 h1:ZiFxY5X03o8bzt7BH/5vCLS/eK+pjv7uJBWXFBHtFco=
github.com/pingcap/tidb/parser v0.0.0-20220208061135-294a094d9055/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tidb/parser v0.0.0-20220209083136-a850b044c134 h1:G/poRrk+Uqe1vwNjL0QMbgesxkBVucP+9/WxD7gm/OI=
github.com/pingcap/tidb/parser v0.0.0-20220209083136-a850b044c134/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI=
github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7 h1:DHU4vw0o15qdKsf7d/Pyhun4YtX8FwoDQxG0plPByUg=
github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=

View File

@ -501,8 +501,10 @@ type lockCtx struct {
ttl uint64
minCommitTs uint64
returnValues bool
values [][]byte
returnValues bool
checkExistence bool
values [][]byte
keyNotFound []bool
}
// PessimisticLock writes the pessimistic lock.
@ -512,12 +514,13 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
defer mvcc.mu.Unlock()
mutations := req.Mutations
lCtx := &lockCtx{
startTS: req.StartVersion,
forUpdateTS: req.ForUpdateTs,
primary: req.PrimaryLock,
ttl: req.LockTtl,
minCommitTs: req.MinCommitTs,
returnValues: req.ReturnValues,
startTS: req.StartVersion,
forUpdateTS: req.ForUpdateTs,
primary: req.PrimaryLock,
ttl: req.LockTtl,
minCommitTs: req.MinCommitTs,
returnValues: req.ReturnValues,
checkExistence: req.CheckExistence,
}
lockWaitTime := req.WaitTimeout
@ -550,6 +553,9 @@ func (mvcc *MVCCLevelDB) PessimisticLock(req *kvrpcpb.PessimisticLockRequest) *k
}
if req.ReturnValues {
resp.Values = lCtx.values
resp.NotFounds = lCtx.keyNotFound
} else if req.CheckExistence {
resp.NotFounds = lCtx.keyNotFound
}
return resp
}
@ -587,12 +593,15 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
// For pessimisticLockMutation, check the correspond rollback record, there may be rollbackLock
// operation between startTS and forUpdateTS
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true)
val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off)
if err != nil {
return err
}
if lctx.returnValues {
lctx.values = append(lctx.values, val)
lctx.keyNotFound = append(lctx.keyNotFound, len(val) == 0)
} else if lctx.checkExistence {
lctx.keyNotFound = append(lctx.keyNotFound, len(val) == 0)
}
lock := mvccLock{
@ -700,7 +709,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
continue
}
isPessimisticLock := len(req.IsPessimisticLock) > 0 && req.IsPessimisticLock[i]
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS)
err = prewriteMutation(mvcc.db, batch, m, startTS, primary, ttl, txnSize, isPessimisticLock, minCommitTS, req.AssertionLevel)
errs = append(errs, err)
if err != nil {
anyError = true
@ -716,7 +725,7 @@ func (mvcc *MVCCLevelDB) Prewrite(req *kvrpcpb.PrewriteRequest) []error {
return errs
}
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool) ([]byte, error) {
func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, startTS uint64, getVal bool, assertionLevel kvrpcpb.AssertionLevel) ([]byte, error) {
dec := &valueDecoder{
expectKey: m.Key,
}
@ -725,6 +734,10 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
return nil, err
}
if !ok {
if m.Assertion == kvrpcpb.Assertion_Exist && assertionLevel != kvrpcpb.AssertionLevel_Off && m.Op != kvrpcpb.Op_PessimisticLock {
logutil.BgLogger().Error("assertion failed!!! non-exist for must exist key", zap.Stringer("mutation", m))
return nil, errors.Errorf("assertion failed!!! non-exist for must exist key, mutation: %v", m.String())
}
return nil, nil
}
@ -739,7 +752,8 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
}
needGetVal := getVal
needCheckAssertion := m.Assertion == kvrpcpb.Assertion_NotExist
needCheckShouldNotExistForPessimisticLock := m.Assertion == kvrpcpb.Assertion_NotExist && m.Op == kvrpcpb.Op_PessimisticLock
needCheckAssertionForPrewerite := m.Assertion != kvrpcpb.Assertion_None && m.Op != kvrpcpb.Op_PessimisticLock && assertionLevel != kvrpcpb.AssertionLevel_Off
needCheckRollback := true
var retVal []byte
// do the check or get operations within one iteration to make CI faster
@ -762,30 +776,40 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
needCheckRollback = false
}
}
if needCheckAssertion {
if dec.value.valueType == typePut || dec.value.valueType == typeLock {
if m.Op == kvrpcpb.Op_PessimisticLock {
return nil, &ErrKeyAlreadyExist{
Key: m.Key,
}
if dec.value.valueType == typePut || dec.value.valueType == typeLock {
if needCheckShouldNotExistForPessimisticLock {
return nil, &ErrKeyAlreadyExist{
Key: m.Key,
}
} else if dec.value.valueType == typeDelete {
needCheckAssertion = false
}
if needCheckAssertionForPrewerite && m.Assertion == kvrpcpb.Assertion_NotExist {
logutil.BgLogger().Error("assertion failed!!! exist for must non-exist key", zap.Stringer("mutation", m))
// TODO: Use specific error type
return nil, errors.Errorf("assertion failed!!! exist for must non-exist key, mutation: %v", m.String())
}
} else if dec.value.valueType == typeDelete {
needCheckShouldNotExistForPessimisticLock = false
}
if needGetVal {
if dec.value.valueType == typeDelete || dec.value.valueType == typePut {
retVal = dec.value.value
needGetVal = false
}
}
if !needCheckAssertion && !needGetVal && !needCheckRollback {
if !needCheckShouldNotExistForPessimisticLock && !needGetVal && !needCheckRollback {
break
}
ok, err = dec.Decode(iter)
if err != nil {
return nil, err
}
if m.Assertion == kvrpcpb.Assertion_Exist && !ok && assertionLevel != kvrpcpb.AssertionLevel_Off {
logutil.BgLogger().Error("assertion failed!!! non-exist for must exist key", zap.Stringer("mutation", m))
// TODO: Use specific error type
return nil, errors.Errorf("assertion failed!!! non-exist for must exist key, mutation: %v", m.String())
}
}
if getVal {
return retVal, nil
@ -796,7 +820,8 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64,
func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
mutation *kvrpcpb.Mutation, startTS uint64,
primary []byte, ttl uint64, txnSize uint64,
isPessimisticLock bool, minCommitTS uint64) error {
isPessimisticLock bool, minCommitTS uint64,
assertionLevel kvrpcpb.AssertionLevel) error {
startKey := mvccEncode(mutation.Key, lockVer)
iter := newIterator(db, &util.Range{
Start: startKey,
@ -836,7 +861,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch,
if isPessimisticLock {
return ErrAbort("pessimistic lock not found")
}
_, err = checkConflictValue(iter, mutation, startTS, startTS, false)
_, err = checkConflictValue(iter, mutation, startTS, startTS, false, assertionLevel)
if err != nil {
return err
}

View File

@ -51,9 +51,40 @@ const (
flagReadable
flagNewlyInserted
// The following are assertion related flags.
// There are four choices of the two bits:
// * 0: Assertion is not set and can be set later.
// * flagAssertExists: We assert the key exists.
// * flagAssertNotExists: We assert the key doesn't exist.
// * flagAssertExists | flagAssertNotExists: Assertion cannot be made on this key (unknown).
// Once either (or both) of the two flags is set, we say assertion is set (`HasAssertionFlags` becomes true), and
// it's expected to be unchangeable within the current transaction.
flagAssertExist
flagAssertNotExist
persistentFlags = flagKeyLocked | flagKeyLockedValExist
)
// HasAssertExist returns whether the key need ensure exists in 2pc.
func (f KeyFlags) HasAssertExist() bool {
return f&flagAssertExist != 0 && f&flagAssertNotExist == 0
}
// HasAssertNotExist returns whether the key need ensure non-exists in 2pc.
func (f KeyFlags) HasAssertNotExist() bool {
return f&flagAssertNotExist != 0 && f&flagAssertExist == 0
}
// HasAssertUnknown returns whether the key is marked unable to do any assertion.
func (f KeyFlags) HasAssertUnknown() bool {
return f&flagAssertExist != 0 && f&flagAssertNotExist != 0
}
// HasAssertionFlags returns whether the key's assertion is set.
func (f KeyFlags) HasAssertionFlags() bool {
return f&flagAssertExist != 0 || f&flagAssertNotExist != 0
}
// HasPresumeKeyNotExists returns whether the associated key use lazy check.
func (f KeyFlags) HasPresumeKeyNotExists() bool {
return f&flagPresumeKNE != 0
@ -134,13 +165,25 @@ func ApplyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
origin |= flagReadable
case SetNewlyInserted:
origin |= flagNewlyInserted
case SetAssertExist:
origin &= ^flagAssertNotExist
origin |= flagAssertExist
case SetAssertNotExist:
origin &= ^flagAssertExist
origin |= flagAssertNotExist
case SetAssertUnknown:
origin |= flagAssertNotExist
origin |= flagAssertExist
case SetAssertNone:
origin &= ^flagAssertExist
origin &= ^flagAssertNotExist
}
}
return origin
}
// FlagsOp describes KeyFlags modify operation.
type FlagsOp uint16
type FlagsOp uint32
const (
// SetPresumeKeyNotExists marks the existence of the associated key is checked lazily.
@ -170,4 +213,12 @@ const (
SetReadable
// SetNewlyInserted marks the key is newly inserted with value length greater than zero.
SetNewlyInserted
// SetAssertExist marks the key must exist.
SetAssertExist
// SetAssertNotExist marks the key must not exist.
SetAssertNotExist
// SetAssertUnknown mark the key maybe exists or not exists.
SetAssertUnknown
// SetAssertNone cleans up the key's assert.
SetAssertNone
)

View File

@ -33,6 +33,7 @@ import (
// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
type ReturnedValue struct {
Value []byte
Exists bool
AlreadyLocked bool
}
@ -62,6 +63,7 @@ type LockCtx struct {
LockKeysDuration *int64
LockKeysCount *int32
ReturnValues bool
CheckExistence bool
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
@ -93,9 +95,19 @@ func NewLockCtx(forUpdateTS uint64, lockWaitTime int64, waitStartTime time.Time)
}
// InitReturnValues creates the map to store returned value.
func (ctx *LockCtx) InitReturnValues(valueLen int) {
func (ctx *LockCtx) InitReturnValues(capacity int) {
ctx.ReturnValues = true
ctx.Values = make(map[string]ReturnedValue, valueLen)
if ctx.Values == nil {
ctx.Values = make(map[string]ReturnedValue, capacity)
}
}
// InitCheckExistence creates the map to store whether each key exists or not.
func (ctx *LockCtx) InitCheckExistence(capacity int) {
ctx.CheckExistence = true
if ctx.Values == nil {
ctx.Values = make(map[string]ReturnedValue, capacity)
}
}
// GetValueNotLocked returns a value if the key is not already locked.

View File

@ -91,6 +91,7 @@ var (
TiKVSmallReadDuration prometheus.Histogram
TiKVReadThroughput prometheus.Histogram
TiKVUnsafeDestroyRangeFailuresCounterVec *prometheus.CounterVec
TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec
)
// Label constants.
@ -541,6 +542,14 @@ func initMetrics(namespace, subsystem string) {
Help: "Counter of unsafe destroyrange failures",
}, []string{LblType})
TiKVPrewriteAssertionUsageCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "prewrite_assertion_count",
Help: "Counter of assertions used in prewrite requests",
}, []string{LblType})
initShortcuts()
}
@ -606,6 +615,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVSmallReadDuration)
prometheus.MustRegister(TiKVReadThroughput)
prometheus.MustRegister(TiKVUnsafeDestroyRangeFailuresCounterVec)
prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter)
}
// readCounter reads the value of a prometheus.Counter.

View File

@ -123,6 +123,11 @@ var (
BatchRecvHistogramOK prometheus.Observer
BatchRecvHistogramError prometheus.Observer
PrewriteAssertionUsageCounterNone prometheus.Counter
PrewriteAssertionUsageCounterExist prometheus.Counter
PrewriteAssertionUsageCounterNotExist prometheus.Counter
PrewriteAssertionUsageCounterUnknown prometheus.Counter
)
func initShortcuts() {
@ -211,4 +216,9 @@ func initShortcuts() {
BatchRecvHistogramOK = TiKVBatchRecvLatency.WithLabelValues("ok")
BatchRecvHistogramError = TiKVBatchRecvLatency.WithLabelValues("err")
PrewriteAssertionUsageCounterNone = TiKVPrewriteAssertionUsageCounter.WithLabelValues("none")
PrewriteAssertionUsageCounterExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("exist")
PrewriteAssertionUsageCounterNotExist = TiKVPrewriteAssertionUsageCounter.WithLabelValues("not-exist")
PrewriteAssertionUsageCounterUnknown = TiKVPrewriteAssertionUsageCounter.WithLabelValues("unknown")
}

View File

@ -179,6 +179,10 @@ type twoPhaseCommitter struct {
type memBufferMutations struct {
storage *unionstore.MemDB
// The format to put to the UserData of the handles:
// MSB LSB
// [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock]
handles []unionstore.MemKeyHandle
}
@ -211,13 +215,21 @@ func (m *memBufferMutations) GetValue(i int) []byte {
}
func (m *memBufferMutations) GetOp(i int) kvrpcpb.Op {
return kvrpcpb.Op(m.handles[i].UserData >> 1)
return kvrpcpb.Op(m.handles[i].UserData >> 3)
}
func (m *memBufferMutations) IsPessimisticLock(i int) bool {
return m.handles[i].UserData&1 != 0
}
func (m *memBufferMutations) IsAssertExists(i int) bool {
return m.handles[i].UserData&(1<<1) != 0
}
func (m *memBufferMutations) IsAssertNotExist(i int) bool {
return m.handles[i].UserData&(1<<2) != 0
}
func (m *memBufferMutations) Slice(from, to int) CommitterMutations {
return &memBufferMutations{
handles: m.handles[from:to],
@ -225,15 +237,50 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations {
}
}
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock bool, handle unionstore.MemKeyHandle) {
aux := uint16(op) << 1
func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) {
// See comments of `m.handles` field about the format of the user data `aux`.
aux := uint16(op) << 3
if isPessimisticLock {
aux |= 1
}
if assertExist {
aux |= 1 << 1
}
if assertNotExist {
aux |= 1 << 2
}
handle.UserData = aux
m.handles = append(m.handles, handle)
}
// CommitterMutationFlags represents various bit flags of mutations.
type CommitterMutationFlags uint8
const (
// MutationFlagIsPessimisticLock is the flag that marks a mutation needs to be pessimistic-locked.
MutationFlagIsPessimisticLock CommitterMutationFlags = 1 << iota
// MutationFlagIsAssertExists is the flag that marks a mutation needs to be asserted to be existed when prewriting.
MutationFlagIsAssertExists
// MutationFlagIsAssertNotExists is the flag that marks a mutation needs to be asserted to be not-existed when prewriting.
MutationFlagIsAssertNotExists
)
func makeMutationFlags(isPessimisticLock, assertExist, assertNotExist bool) CommitterMutationFlags {
var flags CommitterMutationFlags = 0
if isPessimisticLock {
flags |= MutationFlagIsPessimisticLock
}
if assertExist {
flags |= MutationFlagIsAssertExists
}
if assertNotExist {
flags |= MutationFlagIsAssertNotExists
}
return flags
}
// CommitterMutations contains the mutations to be submitted.
type CommitterMutations interface {
Len() int
@ -243,23 +290,25 @@ type CommitterMutations interface {
GetValue(i int) []byte
IsPessimisticLock(i int) bool
Slice(from, to int) CommitterMutations
IsAssertExists(i int) bool
IsAssertNotExist(i int) bool
}
// PlainMutations contains transaction operations.
type PlainMutations struct {
ops []kvrpcpb.Op
keys [][]byte
values [][]byte
isPessimisticLock []bool
ops []kvrpcpb.Op
keys [][]byte
values [][]byte
flags []CommitterMutationFlags
}
// NewPlainMutations creates a PlainMutations object with sizeHint reserved.
func NewPlainMutations(sizeHint int) PlainMutations {
return PlainMutations{
ops: make([]kvrpcpb.Op, 0, sizeHint),
keys: make([][]byte, 0, sizeHint),
values: make([][]byte, 0, sizeHint),
isPessimisticLock: make([]bool, 0, sizeHint),
ops: make([]kvrpcpb.Op, 0, sizeHint),
keys: make([][]byte, 0, sizeHint),
values: make([][]byte, 0, sizeHint),
flags: make([]CommitterMutationFlags, 0, sizeHint),
}
}
@ -273,18 +322,18 @@ func (c *PlainMutations) Slice(from, to int) CommitterMutations {
if c.values != nil {
res.values = c.values[from:to]
}
if c.isPessimisticLock != nil {
res.isPessimisticLock = c.isPessimisticLock[from:to]
if c.flags != nil {
res.flags = c.flags[from:to]
}
return &res
}
// Push another mutation into mutations.
func (c *PlainMutations) Push(op kvrpcpb.Op, key []byte, value []byte, isPessimisticLock bool) {
func (c *PlainMutations) Push(op kvrpcpb.Op, key []byte, value []byte, isPessimisticLock, assertExist, assertNotExist bool) {
c.ops = append(c.ops, op)
c.keys = append(c.keys, key)
c.values = append(c.values, value)
c.isPessimisticLock = append(c.isPessimisticLock, isPessimisticLock)
c.flags = append(c.flags, makeMutationFlags(isPessimisticLock, assertExist, assertNotExist))
}
// Len returns the count of mutations.
@ -312,9 +361,19 @@ func (c *PlainMutations) GetValues() [][]byte {
return c.values
}
// GetPessimisticFlags returns the key pessimistic flags.
func (c *PlainMutations) GetPessimisticFlags() []bool {
return c.isPessimisticLock
// GetFlags returns the flags on the mutations.
func (c *PlainMutations) GetFlags() []CommitterMutationFlags {
return c.flags
}
// IsAssertExists returns the key assertExist flag at index.
func (c *PlainMutations) IsAssertExists(i int) bool {
return c.flags[i]&MutationFlagIsAssertExists != 0
}
// IsAssertNotExist returns the key assertNotExist flag at index.
func (c *PlainMutations) IsAssertNotExist(i int) bool {
return c.flags[i]&MutationFlagIsAssertNotExists != 0
}
// GetOp returns the key op at index.
@ -332,15 +391,15 @@ func (c *PlainMutations) GetValue(i int) []byte {
// IsPessimisticLock returns the key pessimistic flag at index.
func (c *PlainMutations) IsPessimisticLock(i int) bool {
return c.isPessimisticLock[i]
return c.flags[i]&MutationFlagIsPessimisticLock != 0
}
// PlainMutation represents a single transaction operation.
type PlainMutation struct {
KeyOp kvrpcpb.Op
Key []byte
Value []byte
IsPessimisticLock bool
KeyOp kvrpcpb.Op
Key []byte
Value []byte
Flags CommitterMutationFlags
}
// MergeMutations append input mutations into current mutations.
@ -348,7 +407,7 @@ func (c *PlainMutations) MergeMutations(mutations PlainMutations) {
c.ops = append(c.ops, mutations.ops...)
c.keys = append(c.keys, mutations.keys...)
c.values = append(c.values, mutations.values...)
c.isPessimisticLock = append(c.isPessimisticLock, mutations.isPessimisticLock...)
c.flags = append(c.flags, mutations.flags...)
}
// AppendMutation merges a single Mutation into the current mutations.
@ -356,7 +415,7 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) {
c.ops = append(c.ops, mutation.KeyOp)
c.keys = append(c.keys, mutation.Key)
c.values = append(c.values, mutation.Value)
c.isPessimisticLock = append(c.isPessimisticLock, mutation.IsPessimisticLock)
c.flags = append(c.flags, mutation.Flags)
}
// newTwoPhaseCommitter creates a twoPhaseCommitter.
@ -386,7 +445,53 @@ type KVFilter interface {
IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) (bool, error)
}
func (c *twoPhaseCommitter) initKeysAndMutations() error {
func (c *twoPhaseCommitter) checkAssertionByPessimisticLockResults(ctx context.Context, key []byte, flags kv.KeyFlags, mustExist, mustNotExist bool) error {
var assertionFailed *tikverr.ErrAssertionFailed
if flags.HasLockedValueExists() && mustNotExist {
assertionFailed = &tikverr.ErrAssertionFailed{
AssertionFailed: &kvrpcpb.AssertionFailed{
StartTs: c.startTS,
Key: key,
Assertion: kvrpcpb.Assertion_NotExist,
ExistingStartTs: 0,
ExistingCommitTs: 0,
},
}
} else if !flags.HasLockedValueExists() && mustExist {
assertionFailed = &tikverr.ErrAssertionFailed{
AssertionFailed: &kvrpcpb.AssertionFailed{
StartTs: c.startTS,
Key: key,
Assertion: kvrpcpb.Assertion_Exist,
ExistingStartTs: 0,
ExistingCommitTs: 0,
},
}
}
if assertionFailed != nil {
return c.checkSchemaOnAssertionFail(ctx, assertionFailed)
}
return nil
}
func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, assertionFailed *tikverr.ErrAssertionFailed) error {
// If the schema has changed, it might be a false-positive. In this case we should return schema changed, which
// is a usual case, instead of assertion failed.
ts, err := c.store.GetTimestampWithRetry(retry.NewBackofferWithVars(ctx, TsoMaxBackoff, c.txn.vars), c.txn.GetScope())
if err != nil {
return err
}
_, _, err = c.checkSchemaValid(ctx, ts, c.txn.schemaVer, false)
if err != nil {
return err
}
return assertionFailed
}
func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
var size, putCnt, delCnt, lockCnt, checkCnt int
txn := c.txn
@ -397,6 +502,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
filter := txn.kvFilter
var err error
var assertionError error
for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
_ = err
key := it.Key()
@ -470,9 +576,42 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
if flags.HasLocked() {
isPessimistic = c.isPessimistic
}
c.mutations.Push(op, isPessimistic, it.Handle())
mustExist, mustNotExist, hasAssertUnknown := flags.HasAssertExist(), flags.HasAssertNotExist(), flags.HasAssertUnknown()
if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off {
mustExist, mustNotExist, hasAssertUnknown = false, false, false
}
c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle())
size += len(key) + len(value)
if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off {
// Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests.
// This can be disabled by failpoint.
skipCheckFromLock := false
if _, err := util.EvalFailpoint("assertionSkipCheckFromLock"); err == nil {
skipCheckFromLock = true
}
if isPessimistic && !skipCheckFromLock {
err1 := c.checkAssertionByPessimisticLockResults(ctx, key, flags, mustExist, mustNotExist)
// Do not exit immediately here. To rollback the pessimistic locks (if any), we need to finish
// collecting all the keys.
// Keep only the first assertion error.
if err1 != nil && assertionError == nil {
assertionError = errors.WithStack(err1)
}
}
// Update metrics
if mustExist {
metrics.PrewriteAssertionUsageCounterExist.Inc()
} else if mustNotExist {
metrics.PrewriteAssertionUsageCounterNotExist.Inc()
} else if hasAssertUnknown {
metrics.PrewriteAssertionUsageCounterUnknown.Inc()
} else {
metrics.PrewriteAssertionUsageCounterNone.Inc()
}
}
if len(c.primaryKey) == 0 && op != kvrpcpb.Op_CheckNotExists {
c.primaryKey = key
}
@ -517,6 +656,11 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
c.resourceGroupTag = txn.resourceGroupTag
c.resourceGroupTagger = txn.resourceGroupTagger
c.setDetail(commitDetail)
if assertionError != nil {
return assertionError
}
return nil
}
@ -1263,6 +1407,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
err = c.prewriteMutations(bo, c.mutations)
if err != nil {
if assertionFailed, ok := errors.Cause(err).(*tikverr.ErrAssertionFailed); ok {
err = c.checkSchemaOnAssertionFail(ctx, assertionFailed)
}
// TODO: Now we return an undetermined error as long as one of the prewrite
// RPCs fails. However, if there are multiple errors and some of the errors
// are not RPC failures, we can return the actual error instead of undetermined.
@ -1523,7 +1671,8 @@ func (c *twoPhaseCommitter) amendPessimisticLock(ctx context.Context, addMutatio
keysNeedToLock := NewPlainMutations(addMutations.Len())
for i := 0; i < addMutations.Len(); i++ {
if addMutations.IsPessimisticLock(i) {
keysNeedToLock.Push(addMutations.GetOp(i), addMutations.GetKey(i), addMutations.GetValue(i), addMutations.IsPessimisticLock(i))
keysNeedToLock.Push(addMutations.GetOp(i), addMutations.GetKey(i), addMutations.GetValue(i), addMutations.IsPessimisticLock(i),
addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i))
}
}
// For unique index amend, we need to pessimistic lock the generated new index keys first.
@ -1607,7 +1756,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch
return false, err
}
handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle()
c.mutations.Push(op, addMutations.IsPessimisticLock(i), handle)
c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle)
}
}
return false, nil
@ -1651,7 +1800,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64
logutil.Logger(ctx).Warn("schemaLeaseChecker is not set for this transaction",
zap.Uint64("sessionID", c.sessionID),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", checkTS))
zap.Uint64("checkTS", checkTS))
}
return nil, false, nil
}
@ -1940,7 +2089,8 @@ func (c *twoPhaseCommitter) mutationsOfKeys(keys [][]byte) CommitterMutations {
for i := 0; i < c.mutations.Len(); i++ {
for _, key := range keys {
if bytes.Equal(c.mutations.GetKey(i), key) {
res.Push(c.mutations.GetOp(i), c.mutations.GetKey(i), c.mutations.GetValue(i), c.mutations.IsPessimisticLock(i))
res.Push(c.mutations.GetOp(i), c.mutations.GetKey(i), c.mutations.GetValue(i), c.mutations.IsPessimisticLock(i),
c.mutations.IsAssertExists(i), c.mutations.IsAssertNotExist(i))
break
}
}

View File

@ -108,15 +108,16 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
LockTtl: ttl,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
MinCommitTs: c.forUpdateTS + 1,
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
LockTtl: ttl,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
@ -182,10 +183,23 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
c.run(c, action.LockCtx)
}
if action.ReturnValues {
// Handle the case that the TiKV's version is too old and doesn't support `CheckExistence`.
// If `CheckExistence` is set, `ReturnValues` is not set and `CheckExistence` is not supported, skip
// retrieving value totally (indicated by `skipRetrievingValue`) to avoid panicking.
skipRetrievingValue := !action.ReturnValues && action.CheckExistence && len(lockResp.NotFounds) == 0
if (action.ReturnValues || action.CheckExistence) && !skipRetrievingValue {
action.ValuesLock.Lock()
for i, mutation := range mutations {
action.Values[string(mutation.Key)] = kv.ReturnedValue{Value: lockResp.Values[i]}
var value []byte
if action.ReturnValues {
value = lockResp.Values[i]
}
var exists = !lockResp.NotFounds[i]
action.Values[string(mutation.Key)] = kv.ReturnedValue{
Value: value,
Exists: exists,
}
}
action.ValuesLock.Unlock()
}

View File

@ -75,10 +75,18 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
mutations := make([]*kvrpcpb.Mutation, m.Len())
isPessimisticLock := make([]bool, m.Len())
for i := 0; i < m.Len(); i++ {
assertion := kvrpcpb.Assertion_None
if m.IsAssertExists(i) {
assertion = kvrpcpb.Assertion_Exist
}
if m.IsAssertNotExist(i) {
assertion = kvrpcpb.Assertion_NotExist
}
mutations[i] = &kvrpcpb.Mutation{
Op: m.GetOp(i),
Key: m.GetKey(i),
Value: m.GetValue(i),
Op: m.GetOp(i),
Key: m.GetKey(i),
Value: m.GetValue(i),
Assertion: assertion,
}
isPessimisticLock[i] = m.IsPessimisticLock(i)
}
@ -112,6 +120,11 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
}
}
assertionLevel := c.txn.assertionLevel
if _, err := util.EvalFailpoint("assertionSkipCheckFromPrewrite"); err == nil {
assertionLevel = kvrpcpb.AssertionLevel_Off
}
req := &kvrpcpb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
@ -122,6 +135,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
TxnSize: txnSize,
MinCommitTs: minCommitTS,
MaxCommitTs: c.maxCommitTS,
AssertionLevel: assertionLevel,
}
if _, err := util.EvalFailpoint("invalidMaxCommitTS"); err == nil {

View File

@ -96,7 +96,7 @@ func newTwoPhaseCommitterWithInit(txn *KVTxn, sessionID uint64) (*twoPhaseCommit
if err != nil {
return nil, err
}
if err = c.initKeysAndMutations(); err != nil {
if err = c.initKeysAndMutations(context.Background()); err != nil {
return nil, err
}
return c, nil
@ -109,7 +109,7 @@ type CommitterProbe struct {
// InitKeysAndMutations prepares the committer for commit.
func (c CommitterProbe) InitKeysAndMutations() error {
return c.initKeysAndMutations()
return c.initKeysAndMutations(context.Background())
}
// SetPrimaryKey resets the committer's commit ts.
@ -366,3 +366,13 @@ func (c ConfigProbe) LoadPreSplitSizeThreshold() uint32 {
func (c ConfigProbe) StorePreSplitSizeThreshold(v uint32) {
atomic.StoreUint32(&preSplitSizeThreshold, v)
}
// MemBufferMutationsProbe exports memBufferMutations for test purposes.
type MemBufferMutationsProbe struct {
*memBufferMutations
}
// NewMemBufferMutationsProbe creates a new memBufferMutations instance for testing purpose.
func NewMemBufferMutationsProbe(sizeHint int, storage *unionstore.MemDB) MemBufferMutationsProbe {
return MemBufferMutationsProbe{newMemBufferMutations(sizeHint, storage)}
}

View File

@ -115,7 +115,8 @@ type KVTxn struct {
diskFullOpt kvrpcpb.DiskFullOpt
commitTSUpperBoundCheck func(uint64) bool
// interceptor is used to decorate the RPC request logic related to the txn.
interceptor interceptor.RPCInterceptor
interceptor interceptor.RPCInterceptor
assertionLevel kvrpcpb.AssertionLevel
}
// NewTiKVTxn creates a new KVTxn.
@ -325,6 +326,11 @@ func (txn *KVTxn) ClearDiskFullOpt() {
txn.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull
}
// SetAssertionLevel sets how strict the assertions in the transaction should be.
func (txn *KVTxn) SetAssertionLevel(assertionLevel kvrpcpb.AssertionLevel) {
txn.assertionLevel = assertionLevel
}
// IsPessimistic returns true if it is pessimistic.
func (txn *KVTxn) IsPessimistic() bool {
return txn.isPessimistic
@ -395,9 +401,12 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
defer committer.ttlManager.close()
initRegion := trace.StartRegion(ctx, "InitKeys")
err = committer.initKeysAndMutations()
err = committer.initKeysAndMutations(ctx)
initRegion.End()
if err != nil {
if txn.IsPessimistic() {
txn.asyncPessimisticRollback(ctx, committer.mutations.GetKeys())
}
return err
}
if committer.mutations.Len() == 0 {
@ -631,6 +640,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
return nil
}
keys = deduplicateKeys(keys)
checkedExistence := false
if txn.IsPessimistic() && lockCtx.ForUpdateTS > 0 {
if txn.committer == nil {
// sessionID is used for log.
@ -714,15 +724,23 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
}
return err
}
if lockCtx.CheckExistence {
checkedExistence = true
}
}
for _, key := range keys {
valExists := tikv.SetKeyLockedValueExists
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
// For other lock modes, the locked key values always exist.
if lockCtx.ReturnValues {
val := lockCtx.Values[string(key)]
if len(val.Value) == 0 {
valExists = tikv.SetKeyLockedValueNotExists
if lockCtx.ReturnValues || checkedExistence {
// If ReturnValue is disabled and CheckExistence is requested, it's still possible that the TiKV's version
// is too old and CheckExistence is not supported.
if val, ok := lockCtx.Values[string(key)]; ok {
// TODO: Check if it's safe to use `val.Exists` instead of assuming empty value.
if !val.Exists {
valExists = tikv.SetKeyLockedValueNotExists
}
}
}
memBuf.UpdateFlags(key, tikv.SetKeyLocked, tikv.DelNeedCheckExists, valExists)