mirror of https://github.com/tikv/client-go.git
reduce write backoffs but don't count busy errors (#271)
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent feeb9d1ed1
commit 16d902a3c7

@@ -84,12 +84,10 @@ type testCommitterSuite struct {
 func (s *testCommitterSuite) SetupSuite() {
     atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
     atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
-    atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 1000)
 }
 
 func (s *testCommitterSuite) TearDownSuite() {
     atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
-    atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 600000)
 }
 
 func (s *testCommitterSuite) SetupTest() {

@@ -76,8 +76,13 @@ var MaxRecvMsgSize = math.MaxInt64 - 1
 // Timeout durations.
 const (
     dialTimeout       = 5 * time.Second
-    ReadTimeoutShort  = 20 * time.Second // For requests that read/write several key-values.
+    ReadTimeoutShort  = 30 * time.Second // For requests that read/write several key-values.
     ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
+
+    // MaxWriteExecutionTime is the MaxExecutionDurationMs field for write requests.
+    // Because the last deadline check is before proposing, let us give it 10 more seconds
+    // after proposing.
+    MaxWriteExecutionTime = ReadTimeoutShort - 10*time.Second
 )
 
 // Grpc window size
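
The hunk above raises ReadTimeoutShort from 20s to 30s and derives MaxWriteExecutionTime from it, leaving a 10-second margin between the server-side execution budget sent with write requests and the client RPC timeout. An illustrative, standalone sketch of the arithmetic (not part of the commit; lower-case names are stand-ins for the constants above):

package main

import (
    "fmt"
    "time"
)

// Stand-ins mirroring the diff: the client RPC timeout for short requests and
// the server-side execution budget derived from it.
const (
    readTimeoutShort      = 30 * time.Second
    maxWriteExecutionTime = readTimeoutShort - 10*time.Second
)

func main() {
    // The value sent as MaxExecutionDurationMs is 10s shorter than the RPC
    // timeout, so TiKV can reject an over-deadline write before the client
    // gives up waiting on the RPC.
    fmt.Println(uint64(maxWriteExecutionTime.Milliseconds())) // 20000
}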

@@ -57,9 +57,10 @@ import (
 type Backoffer struct {
     ctx context.Context
 
-    fn         map[string]backoffFn
-    maxSleep   int
-    totalSleep int
+    fn            map[string]backoffFn
+    maxSleep      int
+    totalSleep    int
+    excludedSleep int
 
     vars *kv.Variables
     noop bool

@@ -140,7 +141,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 
     b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
     b.configs = append(b.configs, cfg)
-    if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) {
+    if b.noop || (b.maxSleep > 0 && (b.totalSleep-b.excludedSleep) >= b.maxSleep) {
         errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", cfg.String(), b.maxSleep)
         for i, err := range b.errors {
             // Print only last 3 errors for non-DEBUG log levels.

@@ -166,7 +167,11 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
     if cfg.metric != nil {
         (*cfg.metric).Observe(float64(realSleep) / 1000)
     }
+
     b.totalSleep += realSleep
+    if _, ok := isSleepExcluded[cfg]; ok {
+        b.excludedSleep += realSleep
+    }
     if b.backoffSleepMS == nil {
         b.backoffSleepMS = make(map[string]int)
     }
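
The two hunks above are the heart of the change: sleep caused by backoff kinds listed in isSleepExcluded still happens, but no longer counts toward the backoffer's maxSleep budget, so server-busy retries cannot exhaust it. A simplified, self-contained model of that accounting (illustrative only; the real Backoffer checks the budget before sleeping and carries more state):

package main

import "fmt"

// backoffer models only the accounting added in this commit: excluded sleep is
// tracked separately and subtracted before comparing against maxSleep.
type backoffer struct {
    maxSleep      int // ms; 0 means unlimited
    totalSleep    int // ms
    excludedSleep int // ms
}

func (b *backoffer) backoff(sleepMs int, excluded bool) error {
    b.totalSleep += sleepMs
    if excluded {
        b.excludedSleep += sleepMs
    }
    if b.maxSleep > 0 && b.totalSleep-b.excludedSleep >= b.maxSleep {
        return fmt.Errorf("backoffer.maxSleep %dms is exceeded", b.maxSleep)
    }
    return nil
}

func main() {
    b := &backoffer{maxSleep: 1000}
    // 5s of server-busy (excluded) sleep alone never trips the 1s limit...
    for i := 0; i < 50; i++ {
        _ = b.backoff(100, true)
    }
    // ...but ordinary backoff still counts toward maxSleep.
    fmt.Println(b.backoff(1000, false)) // budget exceeded
}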

@@ -196,6 +201,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
     logutil.Logger(b.ctx).Debug("retry later",
         zap.Error(err),
         zap.Int("totalSleep", b.totalSleep),
+        zap.Int("excludedSleep", b.excludedSleep),
         zap.Int("maxSleep", b.maxSleep),
         zap.Stringer("type", cfg),
         zap.Reflect("txnStartTS", startTs))

@@ -213,12 +219,13 @@ func (b *Backoffer) String() string {
 // current Backoffer's context.
 func (b *Backoffer) Clone() *Backoffer {
     return &Backoffer{
-        ctx:        b.ctx,
-        maxSleep:   b.maxSleep,
-        totalSleep: b.totalSleep,
-        errors:     b.errors,
-        vars:       b.vars,
-        parent:     b.parent,
+        ctx:           b.ctx,
+        maxSleep:      b.maxSleep,
+        totalSleep:    b.totalSleep,
+        excludedSleep: b.excludedSleep,
+        errors:        b.errors,
+        vars:          b.vars,
+        parent:        b.parent,
     }
 }
 

@@ -227,12 +234,13 @@ func (b *Backoffer) Clone() *Backoffer {
 func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
     ctx, cancel := context.WithCancel(b.ctx)
     return &Backoffer{
-        ctx:        ctx,
-        maxSleep:   b.maxSleep,
-        totalSleep: b.totalSleep,
-        errors:     b.errors,
-        vars:       b.vars,
-        parent:     b,
+        ctx:           ctx,
+        maxSleep:      b.maxSleep,
+        totalSleep:    b.totalSleep,
+        excludedSleep: b.excludedSleep,
+        errors:        b.errors,
+        vars:          b.vars,
+        parent:        b,
     }, cancel
 }
 

@@ -299,6 +307,7 @@ func (b *Backoffer) ErrorsNum() int {
 func (b *Backoffer) Reset() {
     b.fn = nil
     b.totalSleep = 0
+    b.excludedSleep = 0
 }
 
 // ResetMaxSleep resets the sleep state and max sleep limit of the backoffer.

@@ -122,6 +122,11 @@ var (
     BoTxnLockFast = NewConfig(txnLockFastName, &metrics.BackoffHistogramLockFast, NewBackoffFnCfg(2, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
 )
 
+var isSleepExcluded = map[*Config]struct{}{
+    BoTiKVServerBusy: {},
+    // add BoTiFlashServerBusy if appropriate
+}
+
 const (
     // NoJitter makes the backoff sequence strict exponential.
     NoJitter = 1 + iota
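
isSleepExcluded is a set keyed by config identity, so deciding whether a given backoff counts is a single map lookup on the *Config pointer, as in the BackoffWithCfgAndMaxSleep hunk above. A hedged sketch of that lookup with stand-in types (Config here is a placeholder, not the real retry.Config):

package main

import "fmt"

type Config struct{ name string }

// Stand-ins for the package-level backoff configs.
var (
    boTiKVServerBusy = &Config{name: "tikvServerBusy"}
    boRegionMiss     = &Config{name: "regionMiss"}
)

// The excluded set, modeled as in the diff: map[*Config]struct{}.
var isSleepExcluded = map[*Config]struct{}{
    boTiKVServerBusy: {},
}

func main() {
    for _, cfg := range []*Config{boTiKVServerBusy, boRegionMiss} {
        _, excluded := isSleepExcluded[cfg]
        fmt.Printf("%s excluded=%v\n", cfg.name, excluded)
    }
}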

@@ -268,6 +268,7 @@ func (c *Client) Delete(ctx context.Context, key []byte) error {
         Key:    key,
         ForCas: c.atomic,
     })
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, _, err := c.sendReq(ctx, key, req, false)
     if err != nil {
         return errors.Trace(err)
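
This hunk and most of the ones below follow the same pattern: build the tikvrpc request, stamp MaxExecutionDurationMs from MaxWriteExecutionTime, then send it with the ReadTimeoutShort RPC timeout. A minimal sketch of that stamping using the public kvproto/tikvrpc types; the 20s value is inlined as a stand-in because the client package above is internal to client-go:

package main

import (
    "time"

    "github.com/pingcap/kvproto/pkg/kvrpcpb"
    "github.com/tikv/client-go/v2/tikvrpc"
)

// Stand-in for client.MaxWriteExecutionTime (ReadTimeoutShort - 10s).
const maxWriteExecutionTime = 20 * time.Second

func buildRawPut(key, value []byte) *tikvrpc.Request {
    req := tikvrpc.NewRequest(tikvrpc.CmdRawPut, &kvrpcpb.RawPutRequest{
        Key:   key,
        Value: value,
    })
    // The same stamping the diff adds to every write request: the server-side
    // execution budget travels in the request's kvrpcpb.Context.
    req.MaxExecutionDurationMs = uint64(maxWriteExecutionTime.Milliseconds())
    return req
}

func main() {
    _ = buildRawPut([]byte("k"), []byte("v"))
}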

@@ -450,7 +451,7 @@ func (c *Client) CompareAndSwap(ctx context.Context, key, previousValue, newValu
     }
 
     req := tikvrpc.NewRequest(tikvrpc.CmdRawCompareAndSwap, &reqArgs)
-
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, _, err := c.sendReq(ctx, key, req, false)
     if err != nil {
         return nil, false, errors.Trace(err)

@@ -565,6 +566,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikv
     }
 
     sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
 
     batchResp := kvrpc.BatchResult{}

@@ -630,6 +632,7 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey
         EndKey:   actualEndKey,
     })
 
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
     if err != nil {
         return nil, nil, errors.Trace(err)

@@ -695,6 +698,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error {
     req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair, ForCas: c.atomic})
 
     sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)

@@ -53,8 +53,9 @@ func WithSecurity(security config.Security) ClientOpt {
 
 // Timeout durations.
 const (
-    ReadTimeoutMedium = client.ReadTimeoutMedium
-    ReadTimeoutShort  = client.ReadTimeoutShort
+    ReadTimeoutMedium     = client.ReadTimeoutMedium
+    ReadTimeoutShort      = client.ReadTimeoutShort
+    MaxWriteExecutionTime = client.MaxWriteExecutionTime
 )
 
 // NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.

@@ -82,10 +82,10 @@ var (
 )
 
 var (
-    // CommitMaxBackoff is max sleep time of the 'commit' command
-    CommitMaxBackoff = uint64(41000)
     // PrewriteMaxBackoff is max sleep time of the `pre-write` command.
-    PrewriteMaxBackoff = 20000
+    PrewriteMaxBackoff = 40000
+    // CommitMaxBackoff is max sleep time of the 'commit' command
+    CommitMaxBackoff = uint64(40000)
 )
 
 type kvstore interface {
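
CommitMaxBackoff stays a uint64 so callers and tests (see the first hunk of this diff) can shrink and restore it with sync/atomic, while PrewriteMaxBackoff doubles to 40s and the 10-minute VeryLongMaxBackoff bound disappears in the hunks below. A small sketch of that atomic adjust-and-restore pattern, with a stand-in variable:

package main

import (
    "fmt"
    "sync/atomic"
)

// Stand-in for transaction.CommitMaxBackoff (milliseconds).
var commitMaxBackoff = uint64(40000)

func main() {
    // A test lowers the cap so commit retries give up quickly...
    atomic.StoreUint64(&commitMaxBackoff, 1000)
    fmt.Println(atomic.LoadUint64(&commitMaxBackoff)) // 1000
    // ...and restores the default on teardown.
    atomic.StoreUint64(&commitMaxBackoff, 40000)
}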

@@ -969,6 +969,7 @@ func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startT
     if err != nil {
         return 0, false, errors.Trace(err)
     }
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
     if err != nil {
         return 0, false, errors.Trace(err)

@@ -1077,9 +1078,6 @@ const (
     TsoMaxBackoff = 15000
 )
 
-// VeryLongMaxBackoff is the max sleep time of transaction commit.
-var VeryLongMaxBackoff = uint64(600000) // 10mins
-
 func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
     c.cleanWg.Add(1)
     c.store.WaitGroup().Add(1)

@@ -1185,7 +1183,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
     // The maxSleep should't be very long in this case.
     // - If the region isn't found in PD, it's possible the reason is write-stall.
     //   The maxSleep can be long in this case.
-    bo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
+    bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
 
     // If we want to use async commit or 1PC and also want linearizability across
     // all nodes, we have to make sure the commit TS of this transaction is greater

@@ -1401,7 +1399,7 @@ func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *util.Co
     start := time.Now()
 
     // Use the VeryLongMaxBackoff to commit the primary key.
-    commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
+    commitBo := retry.NewBackofferWithVars(ctx, int(CommitMaxBackoff), c.txn.vars)
     err := c.commitMutations(commitBo, c.mutations)
     commitDetail.CommitTime = time.Since(start)
     if commitBo.GetTotalSleep() > 0 {

@@ -62,7 +62,8 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
     req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
         Keys:         batch.mutations.GetKeys(),
         StartVersion: c.startTS,
-    }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
+    }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
+        MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
     resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)
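
This and the following 2PC hunks pass the budget through the kvrpcpb.Context argument of tikvrpc.NewRequest rather than assigning the field afterwards; both forms end up in the same request context. A hedged sketch of that variant with pared-down fields and the same stand-in 20s value:

package main

import (
    "time"

    "github.com/pingcap/kvproto/pkg/kvrpcpb"
    "github.com/tikv/client-go/v2/tikvrpc"
)

// Stand-in for client.MaxWriteExecutionTime.
const maxWriteExecutionTime = 20 * time.Second

func buildBatchRollback(keys [][]byte, startTS uint64) *tikvrpc.Request {
    // The deadline rides in the request context, next to fields like Priority
    // and SyncLog that the committer already sets.
    return tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
        Keys:         keys,
        StartVersion: startTS,
    }, kvrpcpb.Context{
        MaxExecutionDurationMs: uint64(maxWriteExecutionTime.Milliseconds()),
    })
}

func main() {
    _ = buildBatchRollback([][]byte{[]byte("k")}, 1)
}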

@@ -71,7 +71,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
         Keys:          keys,
         CommitVersion: c.commitTS,
     }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog,
-        ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
+        ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt,
+        MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 
     tBegin := time.Now()
     attempts := 0

@@ -117,7 +117,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
         WaitTimeout:  action.LockWaitTime(),
         ReturnValues: action.ReturnValues,
         MinCommitTs:  c.forUpdateTS + 1,
-    }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
+    }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag,
+        MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
     lockWaitStartTime := action.WaitStartTime
     for {
         // if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit

@@ -254,6 +255,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
         ForUpdateTs: c.forUpdateTS,
         Keys:        batch.mutations.GetKeys(),
     })
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)

@@ -142,7 +142,8 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
     }
 
     return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req,
-        kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
+        kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
+            DiskFullOpt: c.diskFullOpt, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 }
 
 func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) (err error) {

@@ -245,6 +245,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
     }
 
     req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos})
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     startTime = time.Now()
     resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort)
     if err != nil {

@@ -555,6 +556,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
     if err != nil {
         return status, errors.Trace(err)
     }
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
     if err != nil {
         return status, errors.Trace(err)

@@ -691,6 +693,7 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
     }
     req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq)
     metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := lr.store.SendReq(bo, req, curRegionID, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)

@@ -823,7 +826,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
     }
     lreq.Keys = keys
     req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
-
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := lr.store.SendReq(bo, req, region, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)

@@ -896,6 +899,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
         lreq.Keys = [][]byte{l.Key}
     }
     req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)

@@ -947,6 +951,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, cle
         Keys: [][]byte{l.Key},
     }
     req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq)
+    req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
     resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
     if err != nil {
         return errors.Trace(err)