lock_resolver: handle pessimistic locks in BatchResolveLocks (#794)

* lock_resolver: handle pessimistic locks in BatchResolveLocks

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Add test but failed on unistore because unistore's ScanLock doesnt return lock type

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Address comments

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

* Fix test

Signed-off-by: zyguan <zhongyangguan@gmail.com>

* Fix golangci

Signed-off-by: zyguan <zhongyangguan@gmail.com>

---------

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: zyguan <zhongyangguan@gmail.com>
Co-authored-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
This commit is contained in:
zyguan 2023-05-11 12:26:01 +08:00 committed by GitHub
parent 5c324b7c1e
commit 2b0667c65c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 218 additions and 0 deletions

View File

@ -1,3 +1,5 @@
run:
timeout: 5m
linters:
disable-all: true
enable:

View File

@ -37,9 +37,12 @@ package tikv_test
import (
"bytes"
"context"
"encoding/json"
stderrs "errors"
"fmt"
"io"
"math"
"net/http"
"sync"
"sync/atomic"
"testing"
@ -48,8 +51,11 @@ import (
"github.com/pingcap/failpoint"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
@ -1083,6 +1089,126 @@ func (s *testLockWithTiKVSuite) checkIsKeyLocked(key []byte, expectedLocked bool
s.NoError(err)
}
func (s *testLockWithTiKVSuite) trySetTiKVConfig(name string, value interface{}) func() {
stores, err := s.store.GetPDClient().GetAllStores(context.Background())
s.NoError(err)
type configItem struct {
url string
name string
value interface{}
}
var recoverConfigs []configItem
httpScheme := "http"
if c, err := config.GetGlobalConfig().Security.ToTLSConfig(); err == nil && c != nil {
httpScheme = "https"
}
t := s.Suite.T()
setCfg := func(url, name string, value interface{}) error {
postBody, err := json.Marshal(map[string]interface{}{name: value})
if err != nil {
return err
}
resp, err := http.Post(url, "text/json", bytes.NewReader(postBody))
if err != nil {
return err
}
s.NoError(resp.Body.Close())
if resp.StatusCode != 200 {
return errors.Errorf("post config got unexpected status code: %v, request body: %s", resp.StatusCode, postBody)
}
t.Logf("set config for tikv at %s finished: %s", url, string(postBody))
return nil
}
storeIter:
for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}
for _, label := range store.Labels {
if label.Key == "engine" && label.Value != "tikv" {
continue storeIter
}
}
err := func() (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.Errorf("set config for store at %v panicked: %v", store.StatusAddress, r)
}
}()
url := fmt.Sprintf("%s://%s/config", httpScheme, store.StatusAddress)
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return errors.Errorf("unexpected response status: %v", resp.Status)
}
oldCfgRaw, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
oldCfg := make(map[string]interface{})
err = json.Unmarshal(oldCfgRaw, &oldCfg)
if err != nil {
return err
}
oldValue := oldCfg["pessimistic-txn"].(map[string]interface{})["in-memory"]
if assert.ObjectsAreEqual(oldValue, value) {
return nil
}
err = setCfg(url, name, value)
if err != nil {
return err
}
recoverConfigs = append(recoverConfigs, configItem{
url: url,
name: name,
value: oldValue,
})
return nil
}()
if err != nil {
t.Logf("failed to set config for store at %s: %v", store.StatusAddress, err)
}
}
// Prevent goleak from complaining about its internal connections.
http.DefaultClient.CloseIdleConnections()
if len(recoverConfigs) > 0 {
// Sleep for a while to ensure the new configs are applied.
time.Sleep(time.Second)
}
return func() {
for _, item := range recoverConfigs {
err = setCfg(item.url, item.name, item.value)
if err != nil {
t.Logf("failed to recover config for store at %s: %v", item.url, err)
}
}
// Prevent goleak from complaining about its internal connections.
http.DefaultClient.CloseIdleConnections()
}
}
func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
test := func(asyncCommit bool, onePC bool, causalConsistency bool) {
k1 := []byte("k1")
@ -1337,3 +1463,72 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() {
s.NoError(err)
s.Equal([]byte("v1-1"), v)
}
func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
if *withTiKV {
recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", false)
defer recoverFunc()
} else {
s.T().Skip("this test only works with tikv")
}
s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return("skip")`))
defer func() {
s.NoError(failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
s.NoError(failpoint.Disable("tikvclient/beforeCommitSecondaries"))
s.NoError(failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
}()
k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
v2, v3 := []byte("v2"), []byte("v3")
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
txn, err := s.store.Begin()
s.NoError(err)
txn.SetPessimistic(true)
{
// Produce write conflict on key k2
txn2, err := s.store.Begin()
s.NoError(err)
s.NoError(txn2.Set(k2, []byte("v0")))
s.NoError(txn2.Commit(ctx))
}
lockCtx := kv.NewLockCtx(txn.StartTS(), 200, time.Now())
err = txn.LockKeys(ctx, lockCtx, k1, k2)
s.IsType(&tikverr.ErrWriteConflict{}, errors.Cause(err))
// k1 has txn's stale pessimistic lock now.
forUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
lockCtx = kv.NewLockCtx(forUpdateTS, 200, time.Now())
s.NoError(txn.LockKeys(ctx, lockCtx, k2, k3))
s.NoError(txn.Set(k2, v2))
s.NoError(txn.Set(k3, v3))
s.NoError(txn.Commit(ctx))
// k3 has txn's stale prewrite lock now.
// Perform ScanLock - BatchResolveLock.
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
s.NoError(err)
s.NoError(s.store.GCResolveLockPhase(ctx, currentTS, 1))
// Check data consistency
readTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
snapshot := s.store.GetSnapshot(readTS)
_, err = snapshot.Get(ctx, k1)
s.Equal(tikverr.ErrNotExist, err)
v, err := snapshot.Get(ctx, k2)
s.NoError(err)
s.Equal(v2, v)
v, err = snapshot.Get(ctx, k3)
s.NoError(err)
s.Equal(v3, v)
}

View File

@ -109,6 +109,11 @@ func (s StoreProbe) SetSafeTS(storeID, safeTS uint64) {
s.setSafeTS(storeID, safeTS)
}
// GCResolveLockPhase performs the resolve-locks phase of GC, which scans all locks and resolves them.
func (s StoreProbe) GCResolveLockPhase(ctx context.Context, safepoint uint64, concurrency int) error {
return s.resolveLocks(ctx, safepoint, concurrency)
}
// LockResolverProbe wraps a LockResolver and exposes internal stats for testing purpose.
type LockResolverProbe struct {
*txnlock.LockResolverProbe

View File

@ -238,11 +238,27 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
txnInfos := make(map[uint64]uint64)
startTime := time.Now()
for _, l := range expiredLocks {
logutil.Logger(bo.GetCtx()).Debug("BatchResolveLocks handling lock", zap.Stringer("lock", l))
if _, ok := txnInfos[l.TxnID]; ok {
continue
}
metrics.LockResolverCountWithExpired.Inc()
if l.LockType == kvrpcpb.Op_PessimisticLock {
// BatchResolveLocks forces resolving the locks ignoring whether whey are expired.
// For pessimistic locks, committing them makes no sense, but it won't affect transaction
// correctness if we always roll back them.
// Pessimistic locks needs special handling logic because their primary may not point
// to the real primary of that transaction, and their state cannot be put in `txnInfos`.
// (see: https://github.com/pingcap/tidb/issues/42937).
err := lr.resolvePessimisticLock(bo, l)
if err != nil {
return false, err
}
continue
}
// Use currentTS = math.MaxUint64 means rollback the txn, no matter the lock is expired or not!
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary, 0, math.MaxUint64, true, false, l)
if err != nil {