From 16d902a3c7e5e850c931f0e9515c3dbb4944b6f8 Mon Sep 17 00:00:00 2001
From: Yilin Chen
Date: Tue, 24 Aug 2021 17:05:36 +0800
Subject: [PATCH] reduce write backoffs but don't count busy errors (#271)

Signed-off-by: Yilin Chen
---
 integration_tests/2pc_test.go    |  2 --
 internal/client/client.go        |  7 +++++-
 internal/retry/backoff.go        | 41 +++++++++++++++++++-------------
 internal/retry/config.go         |  5 ++++
 rawkv/rawkv.go                   |  6 ++++-
 tikv/client.go                   |  5 ++--
 txnkv/transaction/2pc.go         | 14 +++++------
 txnkv/transaction/cleanup.go     |  3 ++-
 txnkv/transaction/commit.go      |  3 ++-
 txnkv/transaction/pessimistic.go |  4 +++-
 txnkv/transaction/prewrite.go    |  3 ++-
 txnkv/txnlock/lock_resolver.go   |  7 +++++-
 12 files changed, 65 insertions(+), 35 deletions(-)

diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go
index ee7f23aa..20a74ac2 100644
--- a/integration_tests/2pc_test.go
+++ b/integration_tests/2pc_test.go
@@ -84,12 +84,10 @@ type testCommitterSuite struct {
 func (s *testCommitterSuite) SetupSuite() {
 	atomic.StoreUint64(&transaction.ManagedLockTTL, 3000) // 3s
 	atomic.StoreUint64(&transaction.CommitMaxBackoff, 1000)
-	atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 1000)
 }
 
 func (s *testCommitterSuite) TearDownSuite() {
 	atomic.StoreUint64(&transaction.CommitMaxBackoff, 20000)
-	atomic.StoreUint64(&transaction.VeryLongMaxBackoff, 600000)
 }
 
 func (s *testCommitterSuite) SetupTest() {
diff --git a/internal/client/client.go b/internal/client/client.go
index c34bf814..392a2f84 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -76,8 +76,13 @@ var MaxRecvMsgSize = math.MaxInt64 - 1
 // Timeout durations.
 const (
 	dialTimeout       = 5 * time.Second
-	ReadTimeoutShort  = 20 * time.Second // For requests that read/write several key-values.
+	ReadTimeoutShort  = 30 * time.Second // For requests that read/write several key-values.
 	ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
+
+	// MaxWriteExecutionTime is the MaxExecutionDurationMs field for write requests.
+	// Because the last deadline check is before proposing, let us give it 10 more seconds
+	// after proposing.
+	MaxWriteExecutionTime = ReadTimeoutShort - 10*time.Second
 )
 
 // Grpc window size
diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go
index 8b4d02c5..c092fa77 100644
--- a/internal/retry/backoff.go
+++ b/internal/retry/backoff.go
@@ -57,9 +57,10 @@ import (
 type Backoffer struct {
 	ctx context.Context
 
-	fn         map[string]backoffFn
-	maxSleep   int
-	totalSleep int
+	fn            map[string]backoffFn
+	maxSleep      int
+	totalSleep    int
+	excludedSleep int
 
 	vars *kv.Variables
 	noop bool
@@ -140,7 +141,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 	b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
 	b.configs = append(b.configs, cfg)
-	if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) {
+	if b.noop || (b.maxSleep > 0 && (b.totalSleep-b.excludedSleep) >= b.maxSleep) {
 		errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", cfg.String(), b.maxSleep)
 		for i, err := range b.errors {
 			// Print only last 3 errors for non-DEBUG log levels.
@@ -166,7 +167,11 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 	if cfg.metric != nil {
 		(*cfg.metric).Observe(float64(realSleep) / 1000)
 	}
 
+	b.totalSleep += realSleep
+	if _, ok := isSleepExcluded[cfg]; ok {
+		b.excludedSleep += realSleep
+	}
 	if b.backoffSleepMS == nil {
 		b.backoffSleepMS = make(map[string]int)
 	}
@@ -196,6 +201,7 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 	logutil.Logger(b.ctx).Debug("retry later",
 		zap.Error(err),
 		zap.Int("totalSleep", b.totalSleep),
+		zap.Int("excludedSleep", b.excludedSleep),
 		zap.Int("maxSleep", b.maxSleep),
 		zap.Stringer("type", cfg),
 		zap.Reflect("txnStartTS", startTs))
@@ -213,12 +219,13 @@ func (b *Backoffer) String() string {
 // current Backoffer's context.
 func (b *Backoffer) Clone() *Backoffer {
 	return &Backoffer{
-		ctx:        b.ctx,
-		maxSleep:   b.maxSleep,
-		totalSleep: b.totalSleep,
-		errors:     b.errors,
-		vars:       b.vars,
-		parent:     b.parent,
+		ctx:           b.ctx,
+		maxSleep:      b.maxSleep,
+		totalSleep:    b.totalSleep,
+		excludedSleep: b.excludedSleep,
+		errors:        b.errors,
+		vars:          b.vars,
+		parent:        b.parent,
 	}
 }
 
@@ -227,12 +234,13 @@ func (b *Backoffer) Clone() *Backoffer {
 func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
 	ctx, cancel := context.WithCancel(b.ctx)
 	return &Backoffer{
-		ctx:        ctx,
-		maxSleep:   b.maxSleep,
-		totalSleep: b.totalSleep,
-		errors:     b.errors,
-		vars:       b.vars,
-		parent:     b,
+		ctx:           ctx,
+		maxSleep:      b.maxSleep,
+		totalSleep:    b.totalSleep,
+		excludedSleep: b.excludedSleep,
+		errors:        b.errors,
+		vars:          b.vars,
+		parent:        b,
 	}, cancel
 }
 
@@ -299,6 +307,7 @@ func (b *Backoffer) ErrorsNum() int {
 func (b *Backoffer) Reset() {
 	b.fn = nil
 	b.totalSleep = 0
+	b.excludedSleep = 0
 }
 
 // ResetMaxSleep resets the sleep state and max sleep limit of the backoffer.
diff --git a/internal/retry/config.go b/internal/retry/config.go
index 42f07816..c6fa5bfe 100644
--- a/internal/retry/config.go
+++ b/internal/retry/config.go
@@ -122,6 +122,11 @@ var (
 	BoTxnLockFast = NewConfig(txnLockFastName, &metrics.BackoffHistogramLockFast, NewBackoffFnCfg(2, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
 )
 
+var isSleepExcluded = map[*Config]struct{}{
+	BoTiKVServerBusy: {},
+	// add BoTiFlashServerBusy if appropriate
+}
+
 const (
 	// NoJitter makes the backoff sequence strict exponential.
 	NoJitter = 1 + iota
diff --git a/rawkv/rawkv.go b/rawkv/rawkv.go
index 4bf4f843..50c853d5 100644
--- a/rawkv/rawkv.go
+++ b/rawkv/rawkv.go
@@ -268,6 +268,7 @@ func (c *Client) Delete(ctx context.Context, key []byte) error {
 		Key:    key,
 		ForCas: c.atomic,
 	})
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, _, err := c.sendReq(ctx, key, req, false)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -450,7 +451,7 @@
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdRawCompareAndSwap, &reqArgs)
-
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, _, err := c.sendReq(ctx, key, req, false)
 	if err != nil {
 		return nil, false, errors.Trace(err)
 	}
@@ -565,6 +566,7 @@ func (c *Client) doBatchReq(bo *retry.Backoffer, batch kvrpc.Batch, cmdType tikv
 	}
 
 	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
 
 	batchResp := kvrpc.BatchResult{}
@@ -630,6 +632,7 @@ func (c *Client) sendDeleteRangeReq(ctx context.Context, startKey []byte, endKey
 		EndKey:   actualEndKey,
 	})
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -695,6 +698,7 @@ func (c *Client) doBatchPut(bo *retry.Backoffer, batch kvrpc.Batch) error {
 	req := tikvrpc.NewRequest(tikvrpc.CmdRawBatchPut, &kvrpcpb.RawBatchPutRequest{Pairs: kvPair, ForCas: c.atomic})
 	sender := locate.NewRegionRequestSender(c.regionCache, c.rpcClient)
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := sender.SendReq(bo, req, batch.RegionID, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/tikv/client.go b/tikv/client.go
index 5a724ac4..d25fed10 100644
--- a/tikv/client.go
+++ b/tikv/client.go
@@ -53,8 +53,9 @@ func WithSecurity(security config.Security) ClientOpt {
 
 // Timeout durations.
 const (
-	ReadTimeoutMedium = client.ReadTimeoutMedium
-	ReadTimeoutShort  = client.ReadTimeoutShort
+	ReadTimeoutMedium     = client.ReadTimeoutMedium
+	ReadTimeoutShort      = client.ReadTimeoutShort
+	MaxWriteExecutionTime = client.MaxWriteExecutionTime
 )
 
 // NewRPCClient creates a client that manages connections and rpc calls with tikv-servers.
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index e4acf635..39e34419 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -82,10 +82,10 @@ var (
 )
 
 var (
-	// CommitMaxBackoff is max sleep time of the 'commit' command
-	CommitMaxBackoff = uint64(41000)
 	// PrewriteMaxBackoff is max sleep time of the `pre-write` command.
-	PrewriteMaxBackoff = 20000
+	PrewriteMaxBackoff = 40000
+	// CommitMaxBackoff is max sleep time of the 'commit' command
+	CommitMaxBackoff = uint64(40000)
 )
 
 type kvstore interface {
@@ -969,6 +969,7 @@ func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startT
 	if err != nil {
 		return 0, false, errors.Trace(err)
 	}
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
 	if err != nil {
 		return 0, false, errors.Trace(err)
 	}
@@ -1077,9 +1078,6 @@ const (
 	TsoMaxBackoff = 15000
 )
 
-// VeryLongMaxBackoff is the max sleep time of transaction commit.
-var VeryLongMaxBackoff = uint64(600000) // 10mins
-
 func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
 	c.cleanWg.Add(1)
 	c.store.WaitGroup().Add(1)
@@ -1185,7 +1183,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 	//   The maxSleep should't be very long in this case.
 	// - If the region isn't found in PD, it's possible the reason is write-stall.
 	//   The maxSleep can be long in this case.
-	bo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
+	bo := retry.NewBackofferWithVars(ctx, PrewriteMaxBackoff, c.txn.vars)
 
 	// If we want to use async commit or 1PC and also want linearizability across
 	// all nodes, we have to make sure the commit TS of this transaction is greater
@@ -1401,7 +1399,7 @@ func (c *twoPhaseCommitter) commitTxn(ctx context.Context, commitDetail *util.Co
 	start := time.Now()
 	// Use the VeryLongMaxBackoff to commit the primary key.
-	commitBo := retry.NewBackofferWithVars(ctx, int(atomic.LoadUint64(&VeryLongMaxBackoff)), c.txn.vars)
+	commitBo := retry.NewBackofferWithVars(ctx, int(CommitMaxBackoff), c.txn.vars)
 	err := c.commitMutations(commitBo, c.mutations)
 	commitDetail.CommitTime = time.Since(start)
 	if commitBo.GetTotalSleep() > 0 {
diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go
index 5ed189de..bf371c0e 100644
--- a/txnkv/transaction/cleanup.go
+++ b/txnkv/transaction/cleanup.go
@@ -62,7 +62,8 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
 	req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
 		Keys:         batch.mutations.GetKeys(),
 		StartVersion: c.startTS,
-	}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
+	}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
+		MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 	resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go
index f9cfae32..7ea09825 100644
--- a/txnkv/transaction/commit.go
+++ b/txnkv/transaction/commit.go
@@ -71,7 +71,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
 		Keys:          keys,
 		CommitVersion: c.commitTS,
 	}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog,
-		ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
+		ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt,
+		MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 
 	tBegin := time.Now()
 	attempts := 0
diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go
index f2a0e039..de00e070 100644
--- a/txnkv/transaction/pessimistic.go
+++ b/txnkv/transaction/pessimistic.go
@@ -117,7 +117,8 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		WaitTimeout:  action.LockWaitTime(),
 		ReturnValues: action.ReturnValues,
 		MinCommitTs:  c.forUpdateTS + 1,
-	}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
+	}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag,
+		MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 	lockWaitStartTime := action.WaitStartTime
 	for {
 		// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
@@ -254,6 +255,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
 		ForUpdateTs: c.forUpdateTS,
 		Keys:        batch.mutations.GetKeys(),
 	})
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go
index 72370774..2bc69f68 100644
--- a/txnkv/transaction/prewrite.go
+++ b/txnkv/transaction/prewrite.go
@@ -142,7 +142,8 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
 	}
 	return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req,
-		kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
+		kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
+			DiskFullOpt: c.diskFullOpt, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
 }
 
 func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) (err error) {
diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go
index 221324b3..5df6c77b 100644
--- a/txnkv/txnlock/lock_resolver.go
+++ b/txnkv/txnlock/lock_resolver.go
@@ -245,6 +245,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
 	}
 
 	req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos})
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	startTime = time.Now()
 	resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort)
 	if err != nil {
@@ -555,6 +556,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
 	if err != nil {
 		return status, errors.Trace(err)
 	}
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
 	if err != nil {
 		return status, errors.Trace(err)
 	}
@@ -691,6 +693,7 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq)
 	metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc()
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := lr.store.SendReq(bo, req, curRegionID, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
@@ -823,7 +826,7 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
 	}
 	lreq.Keys = keys
 	req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
-
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := lr.store.SendReq(bo, req, region, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
@@ -896,6 +899,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
 		lreq.Keys = [][]byte{l.Key}
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq)
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
@@ -947,6 +951,7 @@ func (lr *LockResolver) resolvePessimisticLock(bo *retry.Backoffer, l *Lock, cle
 		Keys:         [][]byte{l.Key},
 	}
 	req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, pessimisticRollbackReq)
+	req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
 	resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort)
 	if err != nil {
 		return errors.Trace(err)
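
The accounting change in internal/retry/backoff.go comes down to this: every sleep still adds to
totalSleep, but sleep caused by backoff kinds listed in isSleepExcluded (currently only
BoTiKVServerBusy) also adds to excludedSleep, and the budget check compares
totalSleep - excludedSleep against maxSleep. Below is a minimal, self-contained Go sketch of that
idea; the names here (simpleBackoffer, kindServerBusy, excluded) are illustrative stand-ins, not
client-go API.

package main

import "fmt"

// A toy backoff kind; client-go keys the real map by *retry.Config.
type kind int

const (
	kindRegionMiss kind = iota
	kindServerBusy
)

// Mirrors the isSleepExcluded map added in internal/retry/config.go:
// sleep of these kinds is recorded but not charged to the budget.
var excluded = map[kind]bool{kindServerBusy: true}

type simpleBackoffer struct {
	maxSleep      int // budget in ms; 0 means unlimited
	totalSleep    int // every ms slept, kept for logs and metrics
	excludedSleep int // the part of totalSleep that does not count
}

// backoff records a sleep of sleepMs with kind k and reports whether
// the counted budget (totalSleep - excludedSleep) is exhausted.
func (b *simpleBackoffer) backoff(k kind, sleepMs int) error {
	b.totalSleep += sleepMs
	if excluded[k] {
		b.excludedSleep += sleepMs
	}
	if b.maxSleep > 0 && b.totalSleep-b.excludedSleep >= b.maxSleep {
		return fmt.Errorf("maxSleep %dms exceeded (counted %dms, excluded %dms)",
			b.maxSleep, b.totalSleep-b.excludedSleep, b.excludedSleep)
	}
	return nil
}

func main() {
	b := &simpleBackoffer{maxSleep: 1000}
	fmt.Println(b.backoff(kindServerBusy, 5000)) // <nil>: server-busy sleep is excluded
	fmt.Println(b.backoff(kindRegionMiss, 1200)) // error: counted sleep exceeds the budget
}

The other half of the patch stamps MaxExecutionDurationMs onto every write RPC, derived from
MaxWriteExecutionTime = ReadTimeoutShort - 10*time.Second (20s against the new 30s RPC timeout);
per the comment in internal/client/client.go, the server's last deadline check happens before
proposing, so the 10-second margin leaves room for the proposal to be applied. Together with the
smaller PrewriteMaxBackoff and CommitMaxBackoff (40s each), this replaces the removed 10-minute
VeryLongMaxBackoff.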