From 3150e385e39fbbb324fe975d68abe4fdf5dbd6ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Mon, 12 May 2025 19:19:20 +0800 Subject: [PATCH] txn: provide more information in commit RPC / log mvcc debug info when commit failed for `TxnLockNotFound` (#1640) ref tikv/client-go#1631 Signed-off-by: Chao Wang --- error/error.go | 46 +++++++++++ error/error_test.go | 68 ++++++++++++++++ internal/apicodec/codec_v2.go | 18 +++++ internal/apicodec/codec_v2_test.go | 117 ++++++++++++++++++++++++--- txnkv/transaction/commit.go | 17 +++- txnkv/transaction/pipelined_flush.go | 8 +- txnkv/txnlock/lock_resolver.go | 21 ++++- 7 files changed, 276 insertions(+), 19 deletions(-) create mode 100644 error/error_test.go diff --git a/error/error.go b/error/error.go index 59765133..4ee02287 100644 --- a/error/error.go +++ b/error/error.go @@ -35,9 +35,11 @@ package error import ( + "encoding/json" "fmt" "time" + "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -355,3 +357,47 @@ func Log(err error) { log.Error("encountered error", zap.Error(err), zap.Stack("stack")) } } + +// ExtractDebugInfoStrFromKeyErr extracts the debug info from key error +func ExtractDebugInfoStrFromKeyErr(keyErr *kvrpcpb.KeyError) string { + if keyErr.DebugInfo == nil { + return "" + } + + debugInfoToMarshal := keyErr.DebugInfo + if redact.NeedRedact() { + redactMarker := []byte{'?'} + debugInfoToMarshal = proto.Clone(debugInfoToMarshal).(*kvrpcpb.DebugInfo) + for _, mvccInfo := range debugInfoToMarshal.MvccInfo { + mvccInfo.Key = redactMarker + if mvcc := mvccInfo.Mvcc; mvcc != nil { + if lock := mvcc.Lock; lock != nil { + lock.Primary = redactMarker + lock.ShortValue = redactMarker + for i := range lock.Secondaries { + lock.Secondaries[i] = redactMarker + } + } + + for _, write := range mvcc.Writes { + if write != nil { + write.ShortValue = redactMarker + } + } + + for _, value := range mvcc.Values { + if value != nil { + value.Value = redactMarker + } + } + } + } + } + + debugStr, err := json.Marshal(debugInfoToMarshal) + if err != nil { + log.Error("encountered error when extracting debug info for keyError", zap.Error(err), zap.Stack("stack")) + return "" + } + return string(debugStr) +} diff --git a/error/error_test.go b/error/error_test.go new file mode 100644 index 00000000..67eea837 --- /dev/null +++ b/error/error_test.go @@ -0,0 +1,68 @@ +package error + +import ( + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/stretchr/testify/assert" +) + +func TestExtractDebugInfoStrFromKeyErr(t *testing.T) { + origRedact := errors.RedactLogEnabled.Load() + defer errors.RedactLogEnabled.Store(origRedact) + + errors.RedactLogEnabled.Store(errors.RedactLogDisable) + // empty debug info + assert.Equal(t, "", ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{ + TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")}, + })) + // non-empty debug info + debugInfo := &kvrpcpb.DebugInfo{ + MvccInfo: []*kvrpcpb.MvccDebugInfo{ + { + Key: []byte("byte"), + Mvcc: &kvrpcpb.MvccInfo{ + Lock: &kvrpcpb.MvccLock{ + Type: kvrpcpb.Op_Del, + StartTs: 128, + Primary: []byte("k1"), + Secondaries: [][]byte{ + []byte("k1"), + []byte("k2"), + }, + ShortValue: []byte("v1"), + }, + Writes: []*kvrpcpb.MvccWrite{ + { + Type: kvrpcpb.Op_Insert, + StartTs: 64, + CommitTs: 86, + ShortValue: []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6}, + }, + }, + Values: []*kvrpcpb.MvccValue{ + { + StartTs: 64, + Value: []byte{0x11, 0x12}, + }, + }, + }, + }, + }, + } + + expectedStr := `{"mvcc_info":[{"key":"Ynl0ZQ==","mvcc":{"lock":{"type":1,"start_ts":128,"primary":"azE=","short_value":"djE=","secondaries":["azE=","azI="]},"writes":[{"type":4,"start_ts":64,"commit_ts":86,"short_value":"AQIDBAUG"}],"values":[{"start_ts":64,"value":"ERI="}]}}]}` + assert.Equal(t, expectedStr, ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{ + TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")}, + DebugInfo: debugInfo, + })) + + // redact log enabled + errors.RedactLogEnabled.Store(errors.RedactLogEnable) + expectedStr = `{"mvcc_info":[{"key":"Pw==","mvcc":{"lock":{"type":1,"start_ts":128,"primary":"Pw==","short_value":"Pw==","secondaries":["Pw==","Pw=="]},"writes":[{"type":4,"start_ts":64,"commit_ts":86,"short_value":"Pw=="}],"values":[{"start_ts":64,"value":"Pw=="}]}}]}` + assert.Equal(t, expectedStr, ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{ + TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")}, + DebugInfo: debugInfo, + })) +} diff --git a/internal/apicodec/codec_v2.go b/internal/apicodec/codec_v2.go index d60ca424..a15a242e 100644 --- a/internal/apicodec/codec_v2.go +++ b/internal/apicodec/codec_v2.go @@ -149,6 +149,11 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error) case tikvrpc.CmdCommit: r := *req.Commit() r.Keys = c.encodeKeys(r.Keys) + if len(r.PrimaryKey) > 0 { + // Only encode the primary key if it is not empty. + // Otherwise, it means `PrimaryKey` is not set, left it as empty to indicate it is not set in RPC. + r.PrimaryKey = c.EncodeKey(r.PrimaryKey) + } req.Req = &r case tikvrpc.CmdCleanup: r := *req.Cleanup() @@ -938,6 +943,19 @@ func (c *codecV2) decodeKeyError(keyError *kvrpcpb.KeyError) (*kvrpcpb.KeyError, return nil, err } } + if keyError.TxnLockNotFound != nil { + keyError.TxnLockNotFound.Key, err = c.DecodeKey(keyError.TxnLockNotFound.Key) + if err != nil { + return nil, err + } + } + if debugInfo := keyError.DebugInfo; debugInfo != nil { + for _, mvccInfo := range debugInfo.MvccInfo { + if mvccInfo.Key, err = c.DecodeKey(mvccInfo.Key); err != nil { + return nil, err + } + } + } return keyError, nil } diff --git a/internal/apicodec/codec_v2_test.go b/internal/apicodec/codec_v2_test.go index 67d9009d..8331302d 100644 --- a/internal/apicodec/codec_v2_test.go +++ b/internal/apicodec/codec_v2_test.go @@ -50,21 +50,65 @@ func (suite *testCodecV2Suite) SetupSuite() { func (suite *testCodecV2Suite) TestEncodeRequest() { re := suite.Require() - req := &tikvrpc.Request{ - Type: tikvrpc.CmdRawGet, - Req: &kvrpcpb.RawGetRequest{ - Key: []byte("key"), + requests := []struct { + name string + req *tikvrpc.Request + validate func(*tikvrpc.Request) + }{ + { + name: "CmdRawGet", + req: &tikvrpc.Request{ + Type: tikvrpc.CmdRawGet, + Req: &kvrpcpb.RawGetRequest{ + Key: []byte("key"), + }, + }, + validate: func(encoded *tikvrpc.Request) { + re.Equal(append(keyspacePrefix, []byte("key")...), encoded.RawGet().Key) + }, + }, + { + name: "CmdCommitWithOutPrimaryKey", + req: &tikvrpc.Request{ + Type: tikvrpc.CmdCommit, + Req: &kvrpcpb.CommitRequest{ + Keys: [][]byte{[]byte("key1"), []byte("key2")}, + }, + }, + validate: func(encoded *tikvrpc.Request) { + re.Equal([][]byte{ + append(keyspacePrefix, []byte("key1")...), + append(keyspacePrefix, []byte("key2")...), + }, encoded.Commit().Keys) + re.Empty(encoded.Commit().PrimaryKey) + }, + }, + { + name: "CmdCommitWithPrimaryKey", + req: &tikvrpc.Request{ + Type: tikvrpc.CmdCommit, + Req: &kvrpcpb.CommitRequest{ + Keys: [][]byte{[]byte("key1"), []byte("key2")}, + PrimaryKey: []byte("key1"), + }, + }, + validate: func(encoded *tikvrpc.Request) { + re.Equal([][]byte{ + append(keyspacePrefix, []byte("key1")...), + append(keyspacePrefix, []byte("key2")...), + }, encoded.Commit().Keys) + re.Equal(append(keyspacePrefix, []byte("key1")...), encoded.Commit().PrimaryKey) + }, }, } - req.ApiVersion = kvrpcpb.APIVersion_V2 - r, err := suite.codec.EncodeRequest(req) - re.NoError(err) - re.Equal(append(keyspacePrefix, []byte("key")...), r.RawGet().Key) - - r, err = suite.codec.EncodeRequest(req) - re.NoError(err) - re.Equal(append(keyspacePrefix, []byte("key")...), r.RawGet().Key) + for _, req := range requests { + suite.Run(req.name, func() { + encoded, err := suite.codec.EncodeRequest(req.req) + re.NoError(err) + req.validate(encoded) + }) + } } func (suite *testCodecV2Suite) TestEncodeV2KeyRanges() { @@ -278,6 +322,55 @@ func (suite *testCodecV2Suite) TestDecodeEpochNotMatch() { } } +func (suite *testCodecV2Suite) TestDecodeKeyError() { + re := suite.Require() + errors := []struct { + name string + err *kvrpcpb.KeyError + validate func(*kvrpcpb.KeyError) + }{ + { + name: "TxnLockNotFound", + err: &kvrpcpb.KeyError{ + TxnLockNotFound: &kvrpcpb.TxnLockNotFound{ + Key: append(keyspacePrefix, []byte("key1")...), + }, + }, + validate: func(decoded *kvrpcpb.KeyError) { + re.Equal([]byte("key1"), decoded.TxnLockNotFound.Key) + }, + }, + { + name: "MvccDebugInfo", + err: &kvrpcpb.KeyError{ + TxnLockNotFound: &kvrpcpb.TxnLockNotFound{ + Key: append(keyspacePrefix, []byte("key1")...), + }, + DebugInfo: &kvrpcpb.DebugInfo{ + MvccInfo: []*kvrpcpb.MvccDebugInfo{ + { + Key: append(keyspacePrefix, []byte("key1")...), + Mvcc: &kvrpcpb.MvccInfo{}, + }, + }, + }, + }, + validate: func(decoded *kvrpcpb.KeyError) { + re.Equal([]byte("key1"), decoded.TxnLockNotFound.Key) + re.Equal(1, len(decoded.DebugInfo.MvccInfo)) + re.Equal([]byte("key1"), decoded.DebugInfo.MvccInfo[0].Key) + }, + }, + } + + codec := suite.codec + for _, keyErr := range errors { + decoded, err := codec.decodeKeyError(keyErr.err) + re.NoError(err) + keyErr.validate(decoded) + } +} + func (suite *testCodecV2Suite) TestGetKeyspaceID() { suite.Equal(KeyspaceID(testKeyspaceID), suite.codec.GetKeyspaceID()) } diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index b3ad8c19..74e87aec 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -73,10 +73,18 @@ func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer { func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { keys := batch.mutations.GetKeys() + var commitRole kvrpcpb.CommitRole + if batch.isPrimary { + commitRole = kvrpcpb.CommitRole_Primary + } else { + commitRole = kvrpcpb.CommitRole_Secondary + } req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{ StartVersion: c.startTS, Keys: keys, + PrimaryKey: c.primary(), CommitVersion: c.commitTS, + CommitRole: commitRole, }, kvrpcpb.Context{ Priority: c.priority, SyncLog: c.syncLog, @@ -212,13 +220,18 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac zap.Error(err), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("commitTS", c.commitTS), - zap.Strings("keys", hexBatchKeys(keys))) + zap.Strings("keys", hexBatchKeys(keys)), + zap.Uint64("sessionID", c.sessionID), + zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr))) return err } // The transaction maybe rolled back by concurrent transactions. logutil.Logger(bo.GetCtx()).Debug("2PC failed commit primary key", zap.Error(err), - zap.Uint64("txnStartTS", c.startTS)) + zap.Uint64("txnStartTS", c.startTS), + zap.Uint64("commitTS", c.commitTS), + zap.Uint64("sessionID", c.sessionID), + ) return err } break diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 6e19c94b..7de45993 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -438,7 +438,13 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved * cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse) if keyErr := cmdResp.GetError(); keyErr != nil { err = errors.Errorf("unexpected resolve err: %s", keyErr) - logutil.BgLogger().Error("resolveLock error", zap.Error(err)) + logutil.BgLogger().Error( + "resolveLock error", + zap.Error(err), + zap.Uint64("startVer", lreq.StartVersion), + zap.Uint64("commitVer", lreq.CommitVersion), + zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)), + ) return res, err } resolved.Add(1) diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 1b1b33cf..15299ea3 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -344,7 +344,13 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo } cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse) if keyErr := cmdResp.GetError(); keyErr != nil { - return false, errors.Errorf("unexpected resolve err: %s", keyErr) + err = errors.Errorf("unexpected resolve err: %s", keyErr) + logutil.BgLogger().Error( + "resolveLock error", + zap.Error(err), + zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)), + ) + return false, err } logutil.BgLogger().Info("BatchResolveLocks: resolve locks in a batch", @@ -1124,10 +1130,13 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse) if keyErr := cmdResp.GetError(); keyErr != nil { err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l) - logutil.BgLogger().Error("resolveLock error", zap.Error(err)) + logutil.BgLogger().Error("resolveLock error", + zap.Error(err), + zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)), + ) } - return nil + return err } func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error { @@ -1190,7 +1199,11 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse) if keyErr := cmdResp.GetError(); keyErr != nil { err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l) - logutil.BgLogger().Error("resolveLock error", zap.Error(err)) + logutil.BgLogger().Error( + "resolveLock error", + zap.Error(err), + zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)), + ) return err } if !resolveLite {