*: allow to custom disk_full_opt for transactions (#256)

Signed-off-by: tier-cap <zhengxiaojin@pingcap.com>
This commit is contained in:
tier-cap 2021-08-06 16:56:16 +08:00 committed by GitHub
parent 6f33dd97af
commit 14892a598e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 119 additions and 4 deletions

2
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210525032559-c37778aff307
github.com/prometheus/client_golang v1.5.1

4
go.sum
View File

@ -292,6 +292,10 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=

View File

@ -218,6 +218,24 @@ func (s *testCommitterSuite) TestCommitRollback() {
})
}
func (s *testCommitterSuite) TestCommitOnTiKVDiskFullOpt() {
s.Nil(failpoint.Enable("tikvclient/rpcAllowedOnAlmostFull", `return("true")`))
txn := s.begin()
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
txn.Set([]byte("a"), []byte("a1"))
err := txn.Commit(context.Background())
s.Nil(err)
s.checkValues(map[string]string{"a": "a1"})
s.Nil(failpoint.Disable("tikvclient/rpcAllowedOnAlmostFull"))
s.Nil(failpoint.Enable("tikvclient/rpcAllowedOnAlmostFull", `return("true")`))
txn = s.begin()
txn.Set([]byte("c"), []byte("c1"))
err = txn.Commit(context.Background())
s.NotNil(err)
s.Nil(failpoint.Disable("tikvclient/rpcAllowedOnAlmostFull"))
}
func (s *testCommitterSuite) TestPrewriteRollback() {
s.mustCommit(map[string]string{
"a": "a0",

View File

@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4
github.com/pingcap/parser v0.0.0-20210728060616-75cff0c906d2
github.com/pingcap/tidb v1.1.0-beta.0.20210729073017-a27d306e65a0
github.com/stretchr/testify v1.7.0

View File

@ -422,6 +422,10 @@ github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLy
github.com/pingcap/kvproto v0.0.0-20210219064844-c1844a4775d6/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db h1:PSW6P83KZi5WopPBiecU286PWMSl2rvxCBZT94iBX+I=
github.com/pingcap/kvproto v0.0.0-20210722091755-91a52cd9e8db/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1 h1:zl56/I6s/UMG/kH+9epaIAiwfIx2gZO5FW8hvkhDYAg=
github.com/pingcap/kvproto v0.0.0-20210802073939-62630088ebc1/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4 h1:4EUpHzPFHwleKkVALyMqQbQcNziPZvU+vhUT9Wzj93E=
github.com/pingcap/kvproto v0.0.0-20210806074406-317f69fb54b4/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=

View File

@ -950,6 +950,14 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}
}
// Not Retry when tikv disk full happens.
if diskFull := regionErr.GetDiskFull(); diskFull != nil {
logutil.BgLogger().Error("tikv reports `DiskFull` not retry",
zap.String("diskFull", diskFull.String()),
zap.String("ctx", ctx.String()))
return false, nil
}
// This peer is removed from the region. Invalidate the region since it's too stale.
if regionErr.GetRegionNotFound() != nil {
if seed != nil {

View File

@ -674,6 +674,21 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
resp.Resp = kvHandler{session}.handleKvScan(r)
case tikvrpc.CmdPrewrite:
if val, err := util.EvalFailpoint("rpcAllowedOnAlmostFull"); err == nil {
switch val.(string) {
case "true":
if req.Context.DiskFullOpt != kvrpcpb.DiskFullOpt_AllowedOnAlmostFull {
return &tikvrpc.Response{
Resp: &kvrpcpb.PrewriteResponse{
RegionError: &errorpb.Error{
DiskFull: &errorpb.DiskFull{StoreId: []uint64{1, 10}, Reason: "disk almost full"},
},
},
}, nil
}
}
}
if val, err := util.EvalFailpoint("rpcPrewriteResult"); err == nil {
switch val.(string) {
case "notLeader":
@ -704,6 +719,21 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
}
resp.Resp = kvHandler{session}.handleKvPessimisticRollback(r)
case tikvrpc.CmdCommit:
if val, err := util.EvalFailpoint("rpcAllowedOnAlmostFull"); err == nil {
switch val.(string) {
case "true":
if req.Context.DiskFullOpt != kvrpcpb.DiskFullOpt_AllowedOnAlmostFull {
return &tikvrpc.Response{
Resp: &kvrpcpb.CommitResponse{
RegionError: &errorpb.Error{
DiskFull: &errorpb.DiskFull{StoreId: []uint64{1, 10}, Reason: "disk almost full"},
},
},
}, nil
}
}
}
if val, err := util.EvalFailpoint("rpcCommitResult"); err == nil {
switch val.(string) {
case "timeout":

View File

@ -39,6 +39,7 @@ import (
"sync"
"unsafe"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
)
@ -163,6 +164,11 @@ func (db *MemDB) Reset() {
db.allocator.reset()
}
// SetDiskFullOpt is used by TiDB test case.
func (db *MemDB) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
// Nothing to do.
}
// DiscardValues releases the memory used by all values.
// NOTE: any operation need value will panic after this function.
func (db *MemDB) DiscardValues() {

View File

@ -168,6 +168,9 @@ type twoPhaseCommitter struct {
binlog BinlogExecutor
resourceGroupTag []byte
// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt
}
type memBufferMutations struct {
@ -362,6 +365,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
regionTxnSize: map[uint64]int{},
isPessimistic: txn.IsPessimistic(),
binlog: txn.binlog,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
}, nil
}
@ -833,6 +837,10 @@ func (c *twoPhaseCommitter) keySize(key, value []byte) int {
return len(key)
}
func (c *twoPhaseCommitter) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
c.diskFullOpt = level
}
type ttlManagerState uint32
const (

View File

@ -68,7 +68,8 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
StartVersion: c.startTS,
Keys: keys,
CommitVersion: c.commitTS,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog,
ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
tBegin := time.Now()
attempts := 0

View File

@ -35,6 +35,7 @@ package transaction
import (
"encoding/hex"
"math"
"strconv"
"sync/atomic"
"time"
@ -138,7 +139,8 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
req.TryOnePc = true
}
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag})
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req,
kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt})
}
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) (err error) {
@ -220,6 +222,19 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
return errors.Trace(err)
}
}
if regionErr.GetDiskFull() != nil {
storeIds := regionErr.GetDiskFull().GetStoreId()
desc := " "
for _, i := range storeIds {
desc += strconv.FormatUint(i, 10) + " "
}
logutil.Logger(bo.GetCtx()).Error("Request failed cause of TiKV disk full",
zap.String("store_id", desc),
zap.String("reason", regionErr.GetDiskFull().GetReason()))
return errors.Trace(errors.New(regionErr.String()))
}
same, err := batch.relocate(bo, c.store.GetRegionCache())
if err != nil {
return errors.Trace(err)

View File

@ -107,6 +107,7 @@ type KVTxn struct {
scope string
kvFilter KVFilter
resourceGroupTag []byte
diskFullOpt kvrpcpb.DiskFullOpt
}
// NewTiKVTxn creates a new KVTxn.
@ -123,6 +124,7 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64,
scope: scope,
enableAsyncCommit: cfg.EnableAsyncCommit,
enable1PC: cfg.Enable1PC,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
}
return newTiKVTxn, nil
}
@ -267,6 +269,21 @@ func (txn *KVTxn) SetKVFilter(filter KVFilter) {
txn.kvFilter = filter
}
// SetDiskFullOpt sets whether current operation is allowed in each TiKV disk usage level.
func (txn *KVTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
txn.diskFullOpt = level
}
// GetDiskFullOpt gets the options of current operation in each TiKV disk usage level.
func (txn *KVTxn) GetDiskFullOpt() kvrpcpb.DiskFullOpt {
return txn.diskFullOpt
}
// ClearDiskFullOpt clears the options of current operation in each tikv disk usage level.
func (txn *KVTxn) ClearDiskFullOpt() {
txn.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull
}
// IsPessimistic returns true if it is pessimistic.
func (txn *KVTxn) IsPessimistic() bool {
return txn.isPessimistic
@ -324,6 +341,9 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
}
txn.committer = committer
}
txn.committer.SetDiskFullOpt(txn.diskFullOpt)
defer committer.ttlManager.close()
initRegion := trace.StartRegion(ctx, "InitKeys")
@ -390,6 +410,7 @@ func (txn *KVTxn) Commit(ctx context.Context) error {
func (txn *KVTxn) close() {
txn.valid = false
txn.ClearDiskFullOpt()
}
// Rollback undoes the transaction operations to KV store.