Allow setting session id of KVTxn (#1270)

* allow setting session id

Signed-off-by: ekexium <eke@fastmail.com>

* comment: explain usage

Signed-off-by: ekexium <eke@fastmail.com>

* try using ctx logger as much as possible

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
Co-authored-by: cfzjywxk <lsswxrxr@163.com>
This commit is contained in:
ekexium 2024-04-08 16:03:59 +08:00 committed by GitHub
parent 2bd95773ce
commit 7c70c54016
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 23 additions and 15 deletions

View File

@ -120,7 +120,6 @@ func (action actionPipelinedFlush) handleSingleBatch(
"[pipelined dml] primary key should be set before pipelined flush",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return errors.New("[pipelined dml] primary key should be set before pipelined flush")
}
@ -136,11 +135,10 @@ func (action actionPipelinedFlush) handleSingleBatch(
attempts++
reqBegin := time.Now()
if reqBegin.Sub(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn(
logutil.Logger(bo.GetCtx()).Warn(
"[pipelined dml] slow pipelined flush request",
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
zap.Stringer("region", &batch.region),
zap.Int("attempts", attempts),
)
@ -228,9 +226,8 @@ func (action actionPipelinedFlush) handleSingleBatch(
if err1 != nil {
return err1
}
logutil.BgLogger().Info(
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] encounters lock",
zap.Uint64("session", c.sessionID),
zap.Uint64("txnID", c.startTS),
zap.Uint64("generation", action.generation),
zap.Stringer("lock", lock),
@ -280,7 +277,6 @@ func (action actionPipelinedFlush) handleSingleBatch(
zap.Error(err),
zap.Uint64("startTS", c.startTS),
zap.Uint64("generation", action.generation),
zap.Uint64("session", c.sessionID),
)
return err
}
@ -299,18 +295,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio
}
func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
logutil.BgLogger().Info("[pipelined dml] start to commit transaction",
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] start to commit transaction",
zap.Int("keys", c.txn.GetMemBuffer().Len()),
zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))),
zap.Uint64("startTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope())
if err != nil {
logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed",
zap.Error(err),
zap.Uint64("txnStartTS", c.startTS),
zap.Uint64("session", c.sessionID),
)
return err
}
@ -328,11 +323,10 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
c.mu.RLock()
c.mu.committed = true
c.mu.RUnlock()
logutil.BgLogger().Info(
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] transaction is committed",
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", commitTS),
zap.Uint64("session", c.sessionID),
)
if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil {
@ -431,7 +425,6 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
return
}
@ -446,15 +439,14 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
zap.Error(err),
)
} else {
logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done",
logutil.BgLogger().Info(
"[pipelined dml] commit transaction secondaries done",
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("session", c.sessionID),
)
}
}

View File

@ -395,6 +395,22 @@ func (txn *KVTxn) SetTxnSource(txnSource uint64) {
txn.txnSource = txnSource
}
// SetSessionID sets the session ID of the transaction.
// If the committer is not initialized yet, the function won't take effect.
// It is supposed to be set before performing any writes in the transaction to avoid data race.
// It is designed to be called in ActivateTxn(), though subject to change.
// It is especially useful for pipelined transactions, as its committer is initialized immediately
// when the transaction is created.
//
// Note that commiter may also obtain a sessionID from context directly via sessionIDCtxKey.
// TODO: consider unifying them.
func (txn *KVTxn) SetSessionID(sessionID uint64) {
if txn.committer != nil {
txn.committer.sessionID = sessionID
}
}
// GetDiskFullOpt gets the options of current operation in each TiKV disk usage level.
func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt {
return txn.diskFullOpt