implement rollback for pipelined dml (#1235)

* implement rollback for pipelined dml

Signed-off-by: you06 <you1474600@gmail.com>

* add a test to ensure the locks of a rolled-back transaction cannot be read

Signed-off-by: you06 <you1474600@gmail.com>

* remove TODO comment

Signed-off-by: you06 <you1474600@gmail.com>

* address comment

Signed-off-by: you06 <you1474600@gmail.com>

* address comment

Signed-off-by: you06 <you1474600@gmail.com>

* resolve conflict

Signed-off-by: you06 <you1474600@gmail.com>

* lint

Signed-off-by: you06 <you1474600@gmail.com>

---------

Signed-off-by: you06 <you1474600@gmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
Co-authored-by: ekexium <eke@fastmail.com>
you06 2024-04-09 11:27:18 +09:00 committed by GitHub
parent 6906de0612
commit 714958ccd4
4 changed files with 88 additions and 27 deletions
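
For orientation, a minimal caller-side sketch of the behavior this commit adds, assuming a *tikv.KVStore named store; the helper name rollbackPipelined is illustrative and not part of the change, while the options and buffer calls mirror the integration test below.

package example

import "github.com/tikv/client-go/v2/tikv"

// rollbackPipelined begins a pipelined-DML transaction, forces a flush so some
// mutations reach TiKV as locks, then rolls the transaction back; with this
// change the flushed locks are also resolved (removed) asynchronously.
func rollbackPipelined(store *tikv.KVStore) error {
	txn, err := store.Begin(tikv.WithPipelinedMemDB())
	if err != nil {
		return err
	}
	if err := txn.Set([]byte("k"), []byte("v")); err != nil {
		return err
	}
	// Force a flush so part of the write set already sits in TiKV as locks.
	txn.GetMemBuffer().Flush(true)
	if err := txn.GetMemBuffer().FlushWait(); err != nil {
		return err
	}
	// Rollback now cleans up the flushed locks in the background.
	return txn.Rollback()
}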


@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/client-go/v2/tikv"
 	"github.com/tikv/client-go/v2/tikvrpc"
 	"github.com/tikv/client-go/v2/txnkv/transaction"
+	"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
 )
 
 const (
@@ -241,7 +242,7 @@ func (s *testPipelinedMemDBSuite) TestPipelinedCommit() {
 		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
 		defer cancel()
 		bo := retry.NewNoopBackoff(ctx)
-		committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"))
+		committer.ResolveFlushedLocks(bo, []byte("1"), []byte("99"), true)
 		close(done)
 	}()
 	// should be done within 10 seconds.
@@ -262,6 +263,36 @@
 	}
 }
 
+func (s *testPipelinedMemDBSuite) TestPipelinedRollback() {
+	txn, err := s.store.Begin(tikv.WithPipelinedMemDB())
+	startTS := txn.StartTS()
+	s.Nil(err)
+	keys := make([][]byte, 0, 100)
+	for i := 0; i < 100; i++ {
+		key := []byte(strconv.Itoa(i))
+		value := key
+		txn.Set(key, value)
+		keys = append(keys, key)
+	}
+	txn.GetMemBuffer().Flush(true)
+	s.Nil(txn.GetMemBuffer().FlushWait())
+	s.Nil(txn.Rollback())
+	s.Eventuallyf(func() bool {
+		txn, err := s.store.Begin(tikv.WithStartTS(startTS), tikv.WithPipelinedMemDB())
+		s.Nil(err)
+		defer func() { s.Nil(txn.Rollback()) }()
+		storageBufferedValues, err := txn.GetSnapshot().BatchGetWithTier(context.Background(), keys, txnsnapshot.BatchGetBufferTier)
+		s.Nil(err)
+		return len(storageBufferedValues) == 0
+	}, 10*time.Second, 10*time.Millisecond, "rollback should cleanup locks in time")
+	txn, err = s.store.Begin()
+	s.Nil(err)
+	defer func() { s.Nil(txn.Rollback()) }()
+	storageValues, err := txn.GetSnapshot().BatchGet(context.Background(), keys)
+	s.Nil(err)
+	s.Len(storageValues, 0)
+}
+
 func (s *testPipelinedMemDBSuite) TestPipelinedPrefetch() {
 	failpoint.Enable("tikvclient/beforeSendReqToRegion", "return")
 	defer failpoint.Disable("tikvclient/beforeSendReqToRegion")


@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/docker/go-units"
+	"github.com/golang/protobuf/proto" //nolint:staticcheck
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -335,7 +336,7 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
 	// async resolve the rest locks.
 	commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
-	go c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd)
+	c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd, true)
 	return nil
 }
@@ -355,6 +356,18 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 		maxBackOff = CommitSecondaryMaxBackoff
 	}
 	regionCache := c.store.GetRegionCache()
+	// the handler function runs in a different goroutine, should copy the required values before it to avoid race.
+	kvContext := &kvrpcpb.Context{
+		Priority:         c.priority,
+		SyncLog:          c.syncLog,
+		ResourceGroupTag: c.resourceGroupTag,
+		DiskFullOpt:      c.txn.diskFullOpt,
+		TxnSource:        c.txn.txnSource,
+		RequestSource:    PipelinedRequestSource,
+		ResourceControlContext: &kvrpcpb.ResourceControlContext{
+			ResourceGroupName: c.resourceGroupName,
+		},
+	}
 	return func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
 		start := r.StartKey
 		res := rangetask.TaskStat{}
@@ -363,9 +376,7 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 				StartVersion:  c.startTS,
 				CommitVersion: commitVersion,
 			}
-			req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
-				RequestSource: PipelinedRequestSource,
-			})
+			req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, *proto.Clone(kvContext).(*kvrpcpb.Context))
 			bo := retry.NewBackoffer(ctx, maxBackOff)
 			loc, err := regionCache.LocateKey(bo, start)
 			if err != nil {
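
The kvContext template above is built once, outside the per-range handler, and every request gets its own deep copy. A minimal sketch of that clone-per-request pattern; the helper name cloneForRequest is illustrative and not part of the diff.

package example

import (
	"github.com/golang/protobuf/proto" //nolint:staticcheck
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// cloneForRequest hands each range task its own deep copy of the shared
// template, so no kvrpcpb.Context is shared between concurrent resolve
// requests and the committer fields are only read when the template is built.
func cloneForRequest(template *kvrpcpb.Context) *kvrpcpb.Context {
	return proto.Clone(template).(*kvrpcpb.Context)
}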
@@ -413,11 +424,12 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
 	}, nil
 }
 
-func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
-	// TODO: implement cleanup.
+// resolveFlushedLocks resolves all locks in the given range [start, end) with the given status.
+// The resolve process is running in another goroutine so this function won't block.
+func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
 	const RESOLVE_CONCURRENCY = 8
 	var resolved atomic.Uint64
-	handler, err := c.buildPipelinedResolveHandler(true, &resolved)
+	handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
 	if err != nil {
 		logutil.Logger(bo.GetCtx()).Error(
 			"[pipelined dml] build buildPipelinedResolveHandler error",
@@ -428,26 +440,38 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
 		)
 		return
 	}
+	status := "rollback"
+	if commit {
+		status = "commit"
+	}
 	runner := rangetask.NewRangeTaskRunnerWithID(
-		"pipelined-dml-commit",
-		fmt.Sprintf("pipelined-dml-commit-%d", c.startTS),
+		fmt.Sprintf("pipelined-dml-%s", status),
+		fmt.Sprintf("pipelined-dml-%s-%d", status, c.startTS),
 		c.store,
 		RESOLVE_CONCURRENCY,
 		handler,
 	)
-	if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
-		logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed",
-			zap.Uint64("resolved regions", resolved.Load()),
-			zap.Uint64("startTS", c.startTS),
-			zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
-			zap.Error(err),
-		)
-	} else {
-		logutil.Logger(bo.GetCtx()).Info(
-			"[pipelined dml] commit transaction secondaries done",
-			zap.Uint64("resolved regions", resolved.Load()),
-			zap.Uint64("startTS", c.startTS),
-			zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
-		)
-	}
+	go func() {
+		if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
+			logutil.Logger(bo.GetCtx()).Error("[pipelined dml] resolve flushed locks failed",
+				zap.String("txn-status", status),
+				zap.Uint64("resolved regions", resolved.Load()),
+				zap.Uint64("startTS", c.startTS),
+				zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
+				zap.Uint64("session", c.sessionID),
+				zap.Error(err),
+			)
+		} else {
+			logutil.Logger(bo.GetCtx()).Info("[pipelined dml] resolve flushed locks done",
+				zap.String("txn-status", status),
+				zap.Uint64("resolved regions", resolved.Load()),
+				zap.Uint64("startTS", c.startTS),
+				zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
+				zap.Uint64("session", c.sessionID),
+			)
+		}
+	}()
 }
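
A small sketch of how the commit flag is expected to translate into the per-region ResolveLock request built by the handler. The CommitVersion-0-means-rollback mapping follows the usual resolve-lock convention and is assumed here, since that part of buildPipelinedResolveHandler sits outside the hunks shown; the helper name resolveLockRequest is illustrative.

package example

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// resolveLockRequest maps the commit flag onto a ResolveLock request:
// CommitVersion 0 asks TiKV to roll the flushed locks back, a nonzero
// CommitVersion commits them at that timestamp.
func resolveLockRequest(startTS, commitTS uint64, commit bool) *kvrpcpb.ResolveLockRequest {
	commitVersion := uint64(0) // rollback
	if commit {
		commitVersion = commitTS
	}
	return &kvrpcpb.ResolveLockRequest{
		StartVersion:  startTS,
		CommitVersion: commitVersion,
	}
}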


@@ -370,8 +370,8 @@ func (c CommitterProbe) CleanupMutations(ctx context.Context) error {
 }
 
 // ResolveFlushedLocks exports resolveFlushedLocks
-func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
-	c.resolveFlushedLocks(bo, start, end)
+func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []byte, commit bool) {
+	c.resolveFlushedLocks(bo, start, end, commit)
 }
 
 // SendTxnHeartBeat renews a txn's ttl.


@@ -790,6 +790,12 @@ func (txn *KVTxn) Rollback() error {
 		txn.pipelinedCancel()
 		txn.GetMemBuffer().FlushWait()
 		txn.committer.ttlManager.close()
+		// no need to clean up locks when no flush triggered.
+		pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd
+		if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 {
+			rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)
+			txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false)
+		}
 	}
 	txn.close()
 	logutil.BgLogger().Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS()))