mirror of https://github.com/tikv/client-go.git
txn: provide more information in commit RPC / log mvcc debug info when commit failed for `TxnLockNotFound` (#1640)
ref tikv/client-go#1631 Signed-off-by: Chao Wang <cclcwangchao@hotmail.com>
This commit is contained in:
parent
2058fbc062
commit
3150e385e3
|
@ -35,9 +35,11 @@
|
|||
package error
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/pdpb"
|
||||
"github.com/pingcap/log"
|
||||
|
@ -355,3 +357,47 @@ func Log(err error) {
|
|||
log.Error("encountered error", zap.Error(err), zap.Stack("stack"))
|
||||
}
|
||||
}
|
||||
|
||||
// ExtractDebugInfoStrFromKeyErr extracts the debug info from key error
|
||||
func ExtractDebugInfoStrFromKeyErr(keyErr *kvrpcpb.KeyError) string {
|
||||
if keyErr.DebugInfo == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
debugInfoToMarshal := keyErr.DebugInfo
|
||||
if redact.NeedRedact() {
|
||||
redactMarker := []byte{'?'}
|
||||
debugInfoToMarshal = proto.Clone(debugInfoToMarshal).(*kvrpcpb.DebugInfo)
|
||||
for _, mvccInfo := range debugInfoToMarshal.MvccInfo {
|
||||
mvccInfo.Key = redactMarker
|
||||
if mvcc := mvccInfo.Mvcc; mvcc != nil {
|
||||
if lock := mvcc.Lock; lock != nil {
|
||||
lock.Primary = redactMarker
|
||||
lock.ShortValue = redactMarker
|
||||
for i := range lock.Secondaries {
|
||||
lock.Secondaries[i] = redactMarker
|
||||
}
|
||||
}
|
||||
|
||||
for _, write := range mvcc.Writes {
|
||||
if write != nil {
|
||||
write.ShortValue = redactMarker
|
||||
}
|
||||
}
|
||||
|
||||
for _, value := range mvcc.Values {
|
||||
if value != nil {
|
||||
value.Value = redactMarker
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debugStr, err := json.Marshal(debugInfoToMarshal)
|
||||
if err != nil {
|
||||
log.Error("encountered error when extracting debug info for keyError", zap.Error(err), zap.Stack("stack"))
|
||||
return ""
|
||||
}
|
||||
return string(debugStr)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package error
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestExtractDebugInfoStrFromKeyErr(t *testing.T) {
|
||||
origRedact := errors.RedactLogEnabled.Load()
|
||||
defer errors.RedactLogEnabled.Store(origRedact)
|
||||
|
||||
errors.RedactLogEnabled.Store(errors.RedactLogDisable)
|
||||
// empty debug info
|
||||
assert.Equal(t, "", ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{
|
||||
TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")},
|
||||
}))
|
||||
// non-empty debug info
|
||||
debugInfo := &kvrpcpb.DebugInfo{
|
||||
MvccInfo: []*kvrpcpb.MvccDebugInfo{
|
||||
{
|
||||
Key: []byte("byte"),
|
||||
Mvcc: &kvrpcpb.MvccInfo{
|
||||
Lock: &kvrpcpb.MvccLock{
|
||||
Type: kvrpcpb.Op_Del,
|
||||
StartTs: 128,
|
||||
Primary: []byte("k1"),
|
||||
Secondaries: [][]byte{
|
||||
[]byte("k1"),
|
||||
[]byte("k2"),
|
||||
},
|
||||
ShortValue: []byte("v1"),
|
||||
},
|
||||
Writes: []*kvrpcpb.MvccWrite{
|
||||
{
|
||||
Type: kvrpcpb.Op_Insert,
|
||||
StartTs: 64,
|
||||
CommitTs: 86,
|
||||
ShortValue: []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6},
|
||||
},
|
||||
},
|
||||
Values: []*kvrpcpb.MvccValue{
|
||||
{
|
||||
StartTs: 64,
|
||||
Value: []byte{0x11, 0x12},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
expectedStr := `{"mvcc_info":[{"key":"Ynl0ZQ==","mvcc":{"lock":{"type":1,"start_ts":128,"primary":"azE=","short_value":"djE=","secondaries":["azE=","azI="]},"writes":[{"type":4,"start_ts":64,"commit_ts":86,"short_value":"AQIDBAUG"}],"values":[{"start_ts":64,"value":"ERI="}]}}]}`
|
||||
assert.Equal(t, expectedStr, ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{
|
||||
TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")},
|
||||
DebugInfo: debugInfo,
|
||||
}))
|
||||
|
||||
// redact log enabled
|
||||
errors.RedactLogEnabled.Store(errors.RedactLogEnable)
|
||||
expectedStr = `{"mvcc_info":[{"key":"Pw==","mvcc":{"lock":{"type":1,"start_ts":128,"primary":"Pw==","short_value":"Pw==","secondaries":["Pw==","Pw=="]},"writes":[{"type":4,"start_ts":64,"commit_ts":86,"short_value":"Pw=="}],"values":[{"start_ts":64,"value":"Pw=="}]}}]}`
|
||||
assert.Equal(t, expectedStr, ExtractDebugInfoStrFromKeyErr(&kvrpcpb.KeyError{
|
||||
TxnLockNotFound: &kvrpcpb.TxnLockNotFound{Key: []byte("byte")},
|
||||
DebugInfo: debugInfo,
|
||||
}))
|
||||
}
|
|
@ -149,6 +149,11 @@ func (c *codecV2) EncodeRequest(req *tikvrpc.Request) (*tikvrpc.Request, error)
|
|||
case tikvrpc.CmdCommit:
|
||||
r := *req.Commit()
|
||||
r.Keys = c.encodeKeys(r.Keys)
|
||||
if len(r.PrimaryKey) > 0 {
|
||||
// Only encode the primary key if it is not empty.
|
||||
// Otherwise, it means `PrimaryKey` is not set, left it as empty to indicate it is not set in RPC.
|
||||
r.PrimaryKey = c.EncodeKey(r.PrimaryKey)
|
||||
}
|
||||
req.Req = &r
|
||||
case tikvrpc.CmdCleanup:
|
||||
r := *req.Cleanup()
|
||||
|
@ -938,6 +943,19 @@ func (c *codecV2) decodeKeyError(keyError *kvrpcpb.KeyError) (*kvrpcpb.KeyError,
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if keyError.TxnLockNotFound != nil {
|
||||
keyError.TxnLockNotFound.Key, err = c.DecodeKey(keyError.TxnLockNotFound.Key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if debugInfo := keyError.DebugInfo; debugInfo != nil {
|
||||
for _, mvccInfo := range debugInfo.MvccInfo {
|
||||
if mvccInfo.Key, err = c.DecodeKey(mvccInfo.Key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return keyError, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -50,21 +50,65 @@ func (suite *testCodecV2Suite) SetupSuite() {
|
|||
|
||||
func (suite *testCodecV2Suite) TestEncodeRequest() {
|
||||
re := suite.Require()
|
||||
req := &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdRawGet,
|
||||
Req: &kvrpcpb.RawGetRequest{
|
||||
Key: []byte("key"),
|
||||
requests := []struct {
|
||||
name string
|
||||
req *tikvrpc.Request
|
||||
validate func(*tikvrpc.Request)
|
||||
}{
|
||||
{
|
||||
name: "CmdRawGet",
|
||||
req: &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdRawGet,
|
||||
Req: &kvrpcpb.RawGetRequest{
|
||||
Key: []byte("key"),
|
||||
},
|
||||
},
|
||||
validate: func(encoded *tikvrpc.Request) {
|
||||
re.Equal(append(keyspacePrefix, []byte("key")...), encoded.RawGet().Key)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "CmdCommitWithOutPrimaryKey",
|
||||
req: &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Req: &kvrpcpb.CommitRequest{
|
||||
Keys: [][]byte{[]byte("key1"), []byte("key2")},
|
||||
},
|
||||
},
|
||||
validate: func(encoded *tikvrpc.Request) {
|
||||
re.Equal([][]byte{
|
||||
append(keyspacePrefix, []byte("key1")...),
|
||||
append(keyspacePrefix, []byte("key2")...),
|
||||
}, encoded.Commit().Keys)
|
||||
re.Empty(encoded.Commit().PrimaryKey)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "CmdCommitWithPrimaryKey",
|
||||
req: &tikvrpc.Request{
|
||||
Type: tikvrpc.CmdCommit,
|
||||
Req: &kvrpcpb.CommitRequest{
|
||||
Keys: [][]byte{[]byte("key1"), []byte("key2")},
|
||||
PrimaryKey: []byte("key1"),
|
||||
},
|
||||
},
|
||||
validate: func(encoded *tikvrpc.Request) {
|
||||
re.Equal([][]byte{
|
||||
append(keyspacePrefix, []byte("key1")...),
|
||||
append(keyspacePrefix, []byte("key2")...),
|
||||
}, encoded.Commit().Keys)
|
||||
re.Equal(append(keyspacePrefix, []byte("key1")...), encoded.Commit().PrimaryKey)
|
||||
},
|
||||
},
|
||||
}
|
||||
req.ApiVersion = kvrpcpb.APIVersion_V2
|
||||
|
||||
r, err := suite.codec.EncodeRequest(req)
|
||||
re.NoError(err)
|
||||
re.Equal(append(keyspacePrefix, []byte("key")...), r.RawGet().Key)
|
||||
|
||||
r, err = suite.codec.EncodeRequest(req)
|
||||
re.NoError(err)
|
||||
re.Equal(append(keyspacePrefix, []byte("key")...), r.RawGet().Key)
|
||||
for _, req := range requests {
|
||||
suite.Run(req.name, func() {
|
||||
encoded, err := suite.codec.EncodeRequest(req.req)
|
||||
re.NoError(err)
|
||||
req.validate(encoded)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *testCodecV2Suite) TestEncodeV2KeyRanges() {
|
||||
|
@ -278,6 +322,55 @@ func (suite *testCodecV2Suite) TestDecodeEpochNotMatch() {
|
|||
}
|
||||
}
|
||||
|
||||
func (suite *testCodecV2Suite) TestDecodeKeyError() {
|
||||
re := suite.Require()
|
||||
errors := []struct {
|
||||
name string
|
||||
err *kvrpcpb.KeyError
|
||||
validate func(*kvrpcpb.KeyError)
|
||||
}{
|
||||
{
|
||||
name: "TxnLockNotFound",
|
||||
err: &kvrpcpb.KeyError{
|
||||
TxnLockNotFound: &kvrpcpb.TxnLockNotFound{
|
||||
Key: append(keyspacePrefix, []byte("key1")...),
|
||||
},
|
||||
},
|
||||
validate: func(decoded *kvrpcpb.KeyError) {
|
||||
re.Equal([]byte("key1"), decoded.TxnLockNotFound.Key)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "MvccDebugInfo",
|
||||
err: &kvrpcpb.KeyError{
|
||||
TxnLockNotFound: &kvrpcpb.TxnLockNotFound{
|
||||
Key: append(keyspacePrefix, []byte("key1")...),
|
||||
},
|
||||
DebugInfo: &kvrpcpb.DebugInfo{
|
||||
MvccInfo: []*kvrpcpb.MvccDebugInfo{
|
||||
{
|
||||
Key: append(keyspacePrefix, []byte("key1")...),
|
||||
Mvcc: &kvrpcpb.MvccInfo{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
validate: func(decoded *kvrpcpb.KeyError) {
|
||||
re.Equal([]byte("key1"), decoded.TxnLockNotFound.Key)
|
||||
re.Equal(1, len(decoded.DebugInfo.MvccInfo))
|
||||
re.Equal([]byte("key1"), decoded.DebugInfo.MvccInfo[0].Key)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
codec := suite.codec
|
||||
for _, keyErr := range errors {
|
||||
decoded, err := codec.decodeKeyError(keyErr.err)
|
||||
re.NoError(err)
|
||||
keyErr.validate(decoded)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *testCodecV2Suite) TestGetKeyspaceID() {
|
||||
suite.Equal(KeyspaceID(testKeyspaceID), suite.codec.GetKeyspaceID())
|
||||
}
|
||||
|
|
|
@ -73,10 +73,18 @@ func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
|
|||
|
||||
func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
|
||||
keys := batch.mutations.GetKeys()
|
||||
var commitRole kvrpcpb.CommitRole
|
||||
if batch.isPrimary {
|
||||
commitRole = kvrpcpb.CommitRole_Primary
|
||||
} else {
|
||||
commitRole = kvrpcpb.CommitRole_Secondary
|
||||
}
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
|
||||
StartVersion: c.startTS,
|
||||
Keys: keys,
|
||||
PrimaryKey: c.primary(),
|
||||
CommitVersion: c.commitTS,
|
||||
CommitRole: commitRole,
|
||||
}, kvrpcpb.Context{
|
||||
Priority: c.priority,
|
||||
SyncLog: c.syncLog,
|
||||
|
@ -212,13 +220,18 @@ func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Bac
|
|||
zap.Error(err),
|
||||
zap.Uint64("txnStartTS", c.startTS),
|
||||
zap.Uint64("commitTS", c.commitTS),
|
||||
zap.Strings("keys", hexBatchKeys(keys)))
|
||||
zap.Strings("keys", hexBatchKeys(keys)),
|
||||
zap.Uint64("sessionID", c.sessionID),
|
||||
zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)))
|
||||
return err
|
||||
}
|
||||
// The transaction maybe rolled back by concurrent transactions.
|
||||
logutil.Logger(bo.GetCtx()).Debug("2PC failed commit primary key",
|
||||
zap.Error(err),
|
||||
zap.Uint64("txnStartTS", c.startTS))
|
||||
zap.Uint64("txnStartTS", c.startTS),
|
||||
zap.Uint64("commitTS", c.commitTS),
|
||||
zap.Uint64("sessionID", c.sessionID),
|
||||
)
|
||||
return err
|
||||
}
|
||||
break
|
||||
|
|
|
@ -438,7 +438,13 @@ func (c *twoPhaseCommitter) buildPipelinedResolveHandler(commit bool, resolved *
|
|||
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("unexpected resolve err: %s", keyErr)
|
||||
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
|
||||
logutil.BgLogger().Error(
|
||||
"resolveLock error",
|
||||
zap.Error(err),
|
||||
zap.Uint64("startVer", lreq.StartVersion),
|
||||
zap.Uint64("commitVer", lreq.CommitVersion),
|
||||
zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)),
|
||||
)
|
||||
return res, err
|
||||
}
|
||||
resolved.Add(1)
|
||||
|
|
|
@ -344,7 +344,13 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
|
|||
}
|
||||
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
return false, errors.Errorf("unexpected resolve err: %s", keyErr)
|
||||
err = errors.Errorf("unexpected resolve err: %s", keyErr)
|
||||
logutil.BgLogger().Error(
|
||||
"resolveLock error",
|
||||
zap.Error(err),
|
||||
zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)),
|
||||
)
|
||||
return false, err
|
||||
}
|
||||
|
||||
logutil.BgLogger().Info("BatchResolveLocks: resolve locks in a batch",
|
||||
|
@ -1124,10 +1130,13 @@ func (lr *LockResolver) resolveRegionLocks(bo *retry.Backoffer, l *Lock, region
|
|||
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
|
||||
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
|
||||
logutil.BgLogger().Error("resolveLock error",
|
||||
zap.Error(err),
|
||||
zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, lite bool, cleanRegions map[locate.RegionVerID]struct{}) error {
|
||||
|
@ -1190,7 +1199,11 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
|
|||
cmdResp := resp.Resp.(*kvrpcpb.ResolveLockResponse)
|
||||
if keyErr := cmdResp.GetError(); keyErr != nil {
|
||||
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
|
||||
logutil.BgLogger().Error("resolveLock error", zap.Error(err))
|
||||
logutil.BgLogger().Error(
|
||||
"resolveLock error",
|
||||
zap.Error(err),
|
||||
zap.String("debugInfo", tikverr.ExtractDebugInfoStrFromKeyErr(keyErr)),
|
||||
)
|
||||
return err
|
||||
}
|
||||
if !resolveLite {
|
||||
|
|
Loading…
Reference in New Issue