client-go/txnkv/transaction/pipelined_flush.go

// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transaction

import (
	"bytes"
	"context"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/docker/go-units"
	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/config/retry"
	tikverr "github.com/tikv/client-go/v2/error"
	"github.com/tikv/client-go/v2/internal/client"
	"github.com/tikv/client-go/v2/internal/locate"
	"github.com/tikv/client-go/v2/internal/logutil"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)

// PipelinedRequestSource is the source of the Flush & ResolveLock requests in a txn with pipelined memdb.
// txn.GetRequestSource may cause a data race because the upper layer may edit the source while the flush
// requests are built in the background, so this fixed source is used instead to avoid the race.
// This also distinguishes the resource usage between p-DML (pipelined DML) and other small DMLs.
const PipelinedRequestSource = "external_pdml"

type actionPipelinedFlush struct {
	generation uint64
}

var _ twoPhaseCommitAction = actionPipelinedFlush{}

func (action actionPipelinedFlush) String() string {
	return "pipelined_flush"
}

func (action actionPipelinedFlush) tiKVTxnRegionsNumHistogram() prometheus.Observer {
	return nil
}

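// buildPipelinedFlushRequest builds a Flush RPC request that writes the given batch of
// mutations as locks of the pipelined transaction, tagged with the flush generation.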
func (c *twoPhaseCommitter) buildPipelinedFlushRequest(batch batchMutations, generation uint64) *tikvrpc.Request {
	m := batch.mutations
	mutations := make([]*kvrpcpb.Mutation, m.Len())
	for i := 0; i < m.Len(); i++ {
		assertion := kvrpcpb.Assertion_None
		if m.IsAssertExists(i) {
			assertion = kvrpcpb.Assertion_Exist
		}
		if m.IsAssertNotExist(i) {
			assertion = kvrpcpb.Assertion_NotExist
		}
		mutations[i] = &kvrpcpb.Mutation{
			Op:        m.GetOp(i),
			Key:       m.GetKey(i),
			Value:     m.GetValue(i),
			Assertion: assertion,
		}
	}
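	// The locks written by Flush carry startTS+1 as their minimum allowed commit TS.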
	minCommitTS := c.startTS + 1
	req := &kvrpcpb.FlushRequest{
		Mutations:      mutations,
		PrimaryKey:     c.primary(),
		StartTs:        c.startTS,
		MinCommitTs:    minCommitTS,
		Generation:     generation,
		LockTtl:        max(defaultLockTTL, ManagedLockTTL),
		AssertionLevel: c.txn.assertionLevel,
	}
	r := tikvrpc.NewRequest(
		tikvrpc.CmdFlush, req, kvrpcpb.Context{
			Priority:               c.priority,
			SyncLog:                c.syncLog,
			ResourceGroupTag:       c.resourceGroupTag,
			DiskFullOpt:            c.txn.diskFullOpt,
			TxnSource:              c.txn.txnSource,
			MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
			RequestSource:          PipelinedRequestSource,
			ResourceControlContext: &kvrpcpb.ResourceControlContext{
				ResourceGroupName: c.resourceGroupName,
			},
		},
	)
	return r
}

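// handleSingleBatch flushes a single batch of mutations to its region, retrying on region
// errors and resolving any conflicting locks reported by TiKV before flushing again.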
func (action actionPipelinedFlush) handleSingleBatch(
	c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) (err error) {
	if len(c.primaryKey) == 0 {
		return errors.New("[pipelined dml] primary key should be set before pipelined flush")
	}

	tBegin := time.Now()
	attempts := 0

	req := c.buildPipelinedFlushRequest(batch, action.generation)
	sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
	var resolvingRecordToken *int
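
	// Retry loop: each iteration sends the Flush request once, then either succeeds,
	// relocates the batch on region errors, or resolves the reported locks and retries.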
	for {
		attempts++
		reqBegin := time.Now()
		if reqBegin.Sub(tBegin) > slowRequestThreshold {
			logutil.BgLogger().Warn(
				"[pipelined dml] slow pipelined flush request",
				zap.Uint64("startTS", c.startTS),
				zap.Stringer("region", &batch.region),
				zap.Int("attempts", attempts),
			)
			tBegin = time.Now()
		}

		resp, _, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
		// An unexpected error occurred, return it.
		if err != nil {
			return err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return err
		}
		if regionErr != nil {
			// For other region errors and the fake region error, back off because
			// something is wrong.
			// For a real EpochNotMatch error, don't back off.
			if regionErr.GetEpochNotMatch() == nil || locate.IsFakeRegionError(regionErr) {
				err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
				if err != nil {
					return err
				}
			}
			if regionErr.GetDiskFull() != nil {
				storeIds := regionErr.GetDiskFull().GetStoreId()
				desc := " "
				for _, i := range storeIds {
					desc += strconv.FormatUint(i, 10) + " "
				}
				logutil.Logger(bo.GetCtx()).Error(
					"Request failed because of TiKV disk full",
					zap.String("store_id", desc),
					zap.String("reason", regionErr.GetDiskFull().GetReason()),
				)
				return errors.New(regionErr.String())
			}
			same, err := batch.relocate(bo, c.store.GetRegionCache())
			if err != nil {
				return err
			}
			if same {
				continue
			}
			err = c.doActionOnMutations(bo, actionPipelinedFlush{generation: action.generation}, batch.mutations)
			return err
		}
		if resp.Resp == nil {
			return errors.WithStack(tikverr.ErrBodyMissing)
		}
		flushResp := resp.Resp.(*kvrpcpb.FlushResponse)
		keyErrs := flushResp.GetErrors()
		if len(keyErrs) == 0 {
			// Clear the RPC error since the request was evaluated successfully.
			sender.SetRPCError(nil)

			// Update CommitDetails.
			reqDuration := time.Since(reqBegin)
			c.getDetail().MergeFlushReqDetails(
				reqDuration,
				batch.region.GetID(),
				sender.GetStoreAddr(),
				flushResp.ExecDetailsV2,
			)

			if batch.isPrimary {
				// Start keepalive after the primary key is written.
				c.run(c, nil)
			}
			return nil
		}
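
		// The flush hit key errors. Fatal ones (already-exists, optimistic write conflict)
		// are returned directly; the rest are locks that must be resolved before retrying.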
		locks := make([]*txnlock.Lock, 0, len(keyErrs))
		for _, keyErr := range keyErrs {
			// Check the already-exists error.
			if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
				e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
				return c.extractKeyExistsErr(e)
			}

			// Extract the lock from the key error.
			lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
			if err1 != nil {
				return err1
			}
			logutil.BgLogger().Info(
				"[pipelined dml] encounters lock",
				zap.Uint64("session", c.sessionID),
				zap.Uint64("txnID", c.startTS),
				zap.Stringer("lock", lock),
			)
			// If an optimistic transaction encounters a lock with larger TS, this transaction will certainly
			// fail due to a WriteConflict error. So we can construct and return an error here early.
			// Pessimistic transactions don't need such an optimization. If this key needs a pessimistic lock,
			// TiKV will return a PessimisticLockNotFound error directly if it encounters a different lock. Otherwise,
			// TiKV returns lock.TTL = 0, and we still need to resolve the lock.
			if lock.TxnID > c.startTS && !c.isPessimistic {
				return tikverr.NewErrWriteConflictWithArgs(
					c.startTS,
					lock.TxnID,
					0,
					lock.Key,
					kvrpcpb.WriteConflict_Optimistic,
				)
			}
			locks = append(locks, lock)
		}
		if resolvingRecordToken == nil {
			token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
			resolvingRecordToken = &token
			defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
		} else {
			c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
		}
		resolveLockOpts := txnlock.ResolveLocksOptions{
			CallerStartTS: c.startTS,
			Locks:         locks,
			Detail:        &c.getDetail().ResolveLock,
		}
		resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts)
		if err != nil {
			return err
		}
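		// A positive TTL means some locks are still held by live transactions and could not
		// be resolved yet; back off before retrying the flush.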
		msBeforeExpired := resolveLockRes.TTL
		if msBeforeExpired > 0 {
			err = bo.BackoffWithCfgAndMaxSleep(
				retry.BoTxnLock,
				int(msBeforeExpired),
				errors.Errorf("[pipelined dml] flush lockedKeys: %d", len(locks)),
			)
			if err != nil {
				return err
			}
		}
	}
}

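// pipelinedFlushMutations flushes one generation of mutations to TiKV using the
// actionPipelinedFlush action.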
func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutations CommitterMutations, generation uint64) error {
	if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("twoPhaseCommitter.pipelinedFlushMutations", opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
	}
	return c.doActionOnMutations(bo, actionPipelinedFlush{generation}, mutations)
}

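// commitFlushedMutations commits a transaction whose mutations have already been flushed to
// TiKV: it fetches a commit TS, commits the primary key, and then resolves the flushed
// secondary locks asynchronously.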
func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
	logutil.BgLogger().Info("[pipelined dml] start to commit transaction",
		zap.Int("keys", c.txn.GetMemBuffer().Len()),
		zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))))

	commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
	if err != nil {
		logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed",
			zap.Error(err),
			zap.Uint64("txnStartTS", c.startTS))
		return err
	}
	atomic.StoreUint64(&c.commitTS, commitTS)

	if _, err := util.EvalFailpoint("pipelinedCommitFail"); err == nil {
		return errors.New("pipelined DML commit failed")
	}
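
	// Commit the primary key through the normal commit path. Once the primary key is
	// committed, the whole transaction is logically committed.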
	primaryMutation := NewPlainMutations(1)
	primaryMutation.Push(c.pipelinedCommitInfo.primaryOp, c.primaryKey, nil, false, false, false, false)
	if err = c.commitMutations(bo, &primaryMutation); err != nil {
		return errors.Trace(err)
	}
	c.mu.Lock()
	c.mu.committed = true
	c.mu.Unlock()
	logutil.BgLogger().Info("[pipelined dml] transaction is committed")

	if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil {
		return nil
	}

	// Asynchronously resolve the remaining flushed locks.
	commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
	go c.resolveFlushedLocks(commitBo, c.pipelinedCommitInfo.pipelinedStart, c.pipelinedCommitInfo.pipelinedEnd)
	return nil
}

// buildPipelinedResolveHandler returns a function that resolves all locks of this transaction
// within a given key range, region by region. If the region cache is stale, it reloads the
// region info and resolves the remaining ranges. The handler also counts resolved regions.
func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *atomic.Uint64) (rangetask.TaskHandler, error) {
	commitVersion := uint64(0)
	if commit {
		commitVersion = atomic.LoadUint64(&c.commitTS)
		if commitVersion == 0 {
			return nil, errors.New("commitTS is 0")
		}
	}
	maxBackOff := cleanupMaxBackoff
	if commit {
		maxBackOff = CommitSecondaryMaxBackoff
	}
	regionCache := c.store.GetRegionCache()
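	// The handler walks the key range region by region, sending one ResolveLock request per
	// region and advancing the start key to the region's end key after each success.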
	return func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) {
		start := r.StartKey
		res := rangetask.TaskStat{}
		for {
			lreq := &kvrpcpb.ResolveLockRequest{
				StartVersion:  c.startTS,
				CommitVersion: commitVersion,
			}
			req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq, kvrpcpb.Context{
				RequestSource: PipelinedRequestSource,
			})
			bo := retry.NewBackoffer(ctx, maxBackOff)
			loc, err := regionCache.LocateKey(bo, start)
			if err != nil {
				return res, err
			}
			resp, err := c.store.SendReq(bo, req, loc.Region, client.MaxWriteExecutionTime)
			if err != nil {
				err = bo.Backoff(retry.BoRegionMiss, err)
				if err != nil {
					logutil.Logger(bo.GetCtx()).Error("send resolve lock request error", zap.Error(err))
					return res, err
				}
				continue
			}
			regionErr, err := resp.GetRegionError()
			if err != nil {
				logutil.Logger(bo.GetCtx()).Error("get region error failed", zap.Error(err))
				return res, err
			}
			if regionErr != nil {
				err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
				if err != nil {
					logutil.Logger(bo.GetCtx()).Error("send resolve lock get region error", zap.Error(err))
					return res, err
				}
				continue
			}
			if resp.Resp == nil {
				err = errors.WithStack(tikverr.ErrBodyMissing)
				logutil.Logger(bo.GetCtx()).Error("send resolve lock response body missing", zap.Error(err))
				return res, err
			}
			cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
			if keyErr := cmdResp.GetError(); keyErr != nil {
				err = errors.Errorf("unexpected resolve err: %s", keyErr)
				logutil.BgLogger().Error("resolveLock error", zap.Error(err))
				return res, err
			}
			resolved.Add(1)
			res.CompletedRegions++
			if loc.EndKey == nil || bytes.Compare(loc.EndKey, r.EndKey) >= 0 {
				return res, nil
			}
			start = loc.EndKey
		}
	}, nil
}

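// resolveFlushedLocks resolves (commits) the flushed locks in the range [start, end) using a
// concurrent range task runner. Errors are only logged because the transaction has already
// been committed at this point.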
func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end []byte) {
	// TODO: implement cleanup.
	const RESOLVE_CONCURRENCY = 8
	var resolved atomic.Uint64
	handler, err := c.buildPipelinedResolveHandler(true, &resolved)
	if err != nil {
		logutil.Logger(bo.GetCtx()).Error("[pipelined dml] build pipelined resolve handler error", zap.Error(err))
		return
	}

	runner := rangetask.NewRangeTaskRunner("pipelined-dml-commit", c.store, RESOLVE_CONCURRENCY, handler)
	if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil {
		logutil.Logger(bo.GetCtx()).Error("[pipelined dml] commit transaction secondaries failed",
			zap.Uint64("resolved regions", resolved.Load()),
			zap.Error(err))
	} else {
		logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done",
			zap.Uint64("resolved regions", resolved.Load()))
	}
}