From 7c70c54016871c4ea3ce1727f5fbdb3640624c59 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 8 Apr 2024 16:03:59 +0800 Subject: [PATCH] Allow setting session id of KVTxn (#1270) * allow setting session id Signed-off-by: ekexium * comment: explain usage Signed-off-by: ekexium * try using ctx logger as much as possible Signed-off-by: ekexium --------- Signed-off-by: ekexium Co-authored-by: cfzjywxk --- txnkv/transaction/pipelined_flush.go | 22 +++++++--------------- txnkv/transaction/txn.go | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 288f5da9..0ea033d3 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -120,7 +120,6 @@ func (action actionPipelinedFlush) handleSingleBatch( "[pipelined dml] primary key should be set before pipelined flush", zap.Uint64("startTS", c.startTS), zap.Uint64("generation", action.generation), - zap.Uint64("session", c.sessionID), ) return errors.New("[pipelined dml] primary key should be set before pipelined flush") } @@ -136,11 +135,10 @@ func (action actionPipelinedFlush) handleSingleBatch( attempts++ reqBegin := time.Now() if reqBegin.Sub(tBegin) > slowRequestThreshold { - logutil.BgLogger().Warn( + logutil.Logger(bo.GetCtx()).Warn( "[pipelined dml] slow pipelined flush request", zap.Uint64("startTS", c.startTS), zap.Uint64("generation", action.generation), - zap.Uint64("session", c.sessionID), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts), ) @@ -228,9 +226,8 @@ func (action actionPipelinedFlush) handleSingleBatch( if err1 != nil { return err1 } - logutil.BgLogger().Info( + logutil.Logger(bo.GetCtx()).Info( "[pipelined dml] encounters lock", - zap.Uint64("session", c.sessionID), zap.Uint64("txnID", c.startTS), zap.Uint64("generation", action.generation), zap.Stringer("lock", lock), @@ -280,7 +277,6 @@ func (action actionPipelinedFlush) handleSingleBatch( zap.Error(err), zap.Uint64("startTS", c.startTS), zap.Uint64("generation", action.generation), - zap.Uint64("session", c.sessionID), ) return err } @@ -299,18 +295,17 @@ func (c *twoPhaseCommitter) pipelinedFlushMutations(bo *retry.Backoffer, mutatio } func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { - logutil.BgLogger().Info("[pipelined dml] start to commit transaction", + logutil.Logger(bo.GetCtx()).Info( + "[pipelined dml] start to commit transaction", zap.Int("keys", c.txn.GetMemBuffer().Len()), zap.String("size", units.HumanSize(float64(c.txn.GetMemBuffer().Size()))), zap.Uint64("startTS", c.startTS), - zap.Uint64("session", c.sessionID), ) commitTS, err := c.store.GetTimestampWithRetry(bo, c.txn.GetScope()) if err != nil { logutil.Logger(bo.GetCtx()).Warn("[pipelined dml] commit transaction get commitTS failed", zap.Error(err), zap.Uint64("txnStartTS", c.startTS), - zap.Uint64("session", c.sessionID), ) return err } @@ -328,11 +323,10 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { c.mu.RLock() c.mu.committed = true c.mu.RUnlock() - logutil.BgLogger().Info( + logutil.Logger(bo.GetCtx()).Info( "[pipelined dml] transaction is committed", zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", commitTS), - zap.Uint64("session", c.sessionID), ) if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil { @@ -431,7 +425,6 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), - zap.Uint64("session", c.sessionID), ) return } @@ -446,15 +439,14 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), - zap.Uint64("session", c.sessionID), zap.Error(err), ) } else { - logutil.BgLogger().Info("[pipelined dml] commit transaction secondaries done", + logutil.BgLogger().Info( + "[pipelined dml] commit transaction secondaries done", zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), - zap.Uint64("session", c.sessionID), ) } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index c190494e..2546e35e 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -395,6 +395,22 @@ func (txn *KVTxn) SetTxnSource(txnSource uint64) { txn.txnSource = txnSource } +// SetSessionID sets the session ID of the transaction. +// If the committer is not initialized yet, the function won't take effect. +// It is supposed to be set before performing any writes in the transaction to avoid data race. +// It is designed to be called in ActivateTxn(), though subject to change. +// It is especially useful for pipelined transactions, as its committer is initialized immediately +// when the transaction is created. +// +// Note that commiter may also obtain a sessionID from context directly via sessionIDCtxKey. +// TODO: consider unifying them. + +func (txn *KVTxn) SetSessionID(sessionID uint64) { + if txn.committer != nil { + txn.committer.sessionID = sessionID + } +} + // GetDiskFullOpt gets the options of current operation in each TiKV disk usage level. func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt { return txn.diskFullOpt