Move config variables to config package (#9)

* clean up configurations

Signed-off-by: disksing <i@disksing.com>
disksing 2019-03-26 15:41:51 +08:00 committed by siddontang
parent 970883c423
commit ea8b88134e
23 changed files with 191 additions and 176 deletions

View File

@ -97,3 +97,107 @@ var (
EnableTxnLocalLatch = false
TxnLocalLatchCapacity uint = 2048000
)
// RegionCache configurations.
var (
RegionCacheBTreeDegree = 32
RegionCacheTTL = time.Minute * 10
)
// RawKV configurations.
var (
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// RawBatchPutSize is the maximum size limit for each rawkv batch put request.
RawBatchPutSize = 16 * 1024
// RawBatchPairCount is the maximum key count for each rawkv batch get/delete request.
RawBatchPairCount = 512
)
// RPC configurations.
var (
// MaxConnectionCount is the maximum number of gRPC connections that will be established with
// each tikv-server.
MaxConnectionCount uint = 16
// GrpcKeepAliveTime is the duration after which, if the client sees no activity, it
// pings the server to check whether the transport is still alive.
GrpcKeepAliveTime = time.Duration(10) * time.Second
// GrpcKeepAliveTimeout is the duration the client waits after sending a keepalive ping;
// if no activity is seen even after that, the connection is closed.
GrpcKeepAliveTimeout = time.Duration(3) * time.Second
// MaxSendMsgSize sets the maximum gRPC request message size that can be sent to the server.
// If any request message exceeds this value, gRPC reports an error.
MaxSendMsgSize = 1<<31 - 1
// MaxCallMsgSize sets the maximum gRPC message size that can be received from the server.
// If any response message exceeds this value, gRPC reports an error.
MaxCallMsgSize = 1<<31 - 1
DialTimeout = 5 * time.Second
ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need to scan a region.
ReadTimeoutLong = 150 * time.Second // For requests that may need to scan regions multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute
GrpcInitialWindowSize = 1 << 30
GrpcInitialConnWindowSize = 1 << 30
)
// KV configurations.
var (
// DefaultTxnMembufCap is the default transaction membuf capacity.
DefaultTxnMembufCap = 4 * 1024
)
// Latch configurations.
var (
LatchExpireDuration = 2 * time.Minute
LatchCheckInterval = 1 * time.Minute
LatchCheckCounter = 50000
LatchListCount = 5
LatchLockChanSize = 100
)
// Oracle configurations.
var (
TsoSlowThreshold = 30 * time.Millisecond
// OracleUpdateInterval is the interval, in milliseconds, at which the oracle's lastTS is updated.
OracleUpdateInterval = 2000
)
// Txn configurations.
var (
// TiKV recommends that each RPC packet be less than ~1MB. We keep each packet's
// Key+Value size below 16KB.
TxnCommitBatchSize = 16 * 1024
// By default, a lock that is older than 3000ms is considered unusual (the client that
// created the lock might be dead). Other clients may clean up this kind of lock.
// For recently created locks, we back off and retry.
TxnDefaultLockTTL uint64 = 3000
// TODO: Consider if it's appropriate.
TxnMaxLockTTL uint64 = 120000
// ttl = ttlFactor * sqrt(writeSizeInMiB)
TxnTTLFactor = 6000
// TxnResolvedCacheSize is the maximum number of cached txn statuses.
TxnResolvedCacheSize = 2048
// Safe point configurations.
// GcSavedSafePoint is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb'.
// It is saved to PD instead of TiKV, because the table interface cannot be used
// once the safepoint on TiDB has expired.
GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point"
GcSafePointCacheInterval = time.Second * 100
GcCPUTimeInaccuracyBound = time.Second
GcSafePointUpdateInterval = time.Second * 10
GcSafePointQuickRepeatInterval = time.Second
TxnScanBatchSize = 256
TxnBatchGetSize = 5120
)
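With these knobs now exported from the config package, a caller can adjust them before constructing any client. A minimal sketch of that pattern, assuming only the variables listed above and the import path github.com/tikv/client-go/config used throughout this commit (the chosen values are illustrative, not recommendations):

package main

import (
	"time"

	"github.com/tikv/client-go/config"
)

func main() {
	// Override a few package-level defaults before any client is created;
	// clients read these variables at use time. Values are illustrative only.
	config.MaxConnectionCount = 8
	config.GrpcKeepAliveTime = 20 * time.Second
	config.RegionCacheTTL = 5 * time.Minute
	config.MaxRawKVScanLimit = 4096
}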

View File

@ -27,15 +27,11 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/codec"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
)
const (
btreeDegree = 32
rcDefaultRegionCacheTTL = time.Minute * 10
)
// CachedRegion encapsulates {Region, TTL}
type CachedRegion struct {
region *Region
@ -45,7 +41,7 @@ type CachedRegion struct {
func (c *CachedRegion) isValid() bool {
lastAccess := atomic.LoadInt64(&c.lastAccess)
lastAccessTime := time.Unix(lastAccess, 0)
return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL
return time.Since(lastAccessTime) < config.RegionCacheTTL
}
// RegionCache caches Regions loaded from PD.
@ -69,7 +65,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
pdClient: pdClient,
}
c.mu.regions = make(map[RegionVerID]*CachedRegion)
c.mu.sorted = btree.New(btreeDegree)
c.mu.sorted = btree.New(config.RegionCacheBTreeDegree)
c.storeMu.stores = make(map[uint64]*Store)
return c
}
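For reference, the validity check above reduces to comparing an atomically stored last-access timestamp against config.RegionCacheTTL. A self-contained sketch of the same pattern, with illustrative names that are not part of client-go:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cachedEntry mirrors the CachedRegion idea: a last-access time stored as
// unix seconds and refreshed atomically on every read.
type cachedEntry struct {
	lastAccess int64
}

func (c *cachedEntry) touch() {
	atomic.StoreInt64(&c.lastAccess, time.Now().Unix())
}

func (c *cachedEntry) isValid(ttl time.Duration) bool {
	last := time.Unix(atomic.LoadInt64(&c.lastAccess), 0)
	return time.Since(last) < ttl
}

func main() {
	e := &cachedEntry{}
	e.touch()
	fmt.Println(e.isValid(10 * time.Minute)) // true right after a touch
}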

View File

@ -29,19 +29,10 @@ import (
)
var (
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for a rawkv Scan is too large.
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
)
const (
// rawBatchPutSize is the maximum size limit for each rawkv batch put request.
rawBatchPutSize = 16 * 1024
// rawBatchPairCount is the maximum key count for each rawkv batch get/delete request.
rawBatchPairCount = 512
)
// RawKVClient is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type RawKVClient struct {
@ -278,7 +269,7 @@ func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, v
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
if limit > MaxRawKVScanLimit {
if limit > config.MaxRawKVScanLimit {
return nil, nil, errors.WithStack(ErrMaxScanLimitExceeded)
}
@ -319,7 +310,7 @@ func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *loc
if err != nil {
return nil, nil, err
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
@ -346,7 +337,7 @@ func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType r
var batches []batch
for regionID, groupKeys := range groups {
batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
batches = appendKeyBatches(batches, regionID, groupKeys, config.RawBatchPairCount)
}
bo, cancel := bo.Fork()
ches := make(chan singleBatchResp, len(batches))
@ -405,7 +396,7 @@ func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.C
}
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort)
batchResp := singleBatchResp{}
if err != nil {
@ -473,7 +464,7 @@ func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.R
},
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return nil, nil, err
}
@ -504,7 +495,7 @@ func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) e
var batches []batch
// split the keys by size and RegionVerID
for regionID, groupKeys := range groups {
batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize)
batches = appendBatches(batches, regionID, groupKeys, keyToValue, config.RawBatchPutSize)
}
bo, cancel := bo.Fork()
ch := make(chan error, len(batches))
@ -583,7 +574,7 @@ func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
}
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.regionID, config.ReadTimeoutShort)
if err != nil {
return err
}
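As the hunks above show, Scan rejects limits above config.MaxRawKVScanLimit with ErrMaxScanLimitExceeded instead of clamping them, so the caller has to cap the value itself. A hedged caller-side sketch, assuming only the exported config variable from this commit (the helper name is hypothetical):

package main

import (
	"fmt"

	"github.com/tikv/client-go/config"
)

// clampScanLimit caps a requested limit so that a rawkv Scan call will not
// fail with ErrMaxScanLimitExceeded.
func clampScanLimit(limit int) int {
	if limit > config.MaxRawKVScanLimit {
		return config.MaxRawKVScanLimit
	}
	return limit
}

func main() {
	fmt.Println(clampScanLimit(5000))  // 5000
	fmt.Println(clampScanLimit(50000)) // 10240 with the default MaxRawKVScanLimit
}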

View File

@ -20,6 +20,7 @@ import (
"testing"
. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/mockstore/mocktikv"
"github.com/tikv/client-go/retry"
@ -178,7 +179,7 @@ func (s *testRawKVSuite) TestRawBatch(c *C) {
size := 0
var testKeys [][]byte
var testValues [][]byte
for i := 0; size/rawBatchPutSize < 4; i++ {
for i := 0; size/config.RawBatchPutSize < 4; i++ {
key := fmt.Sprint("key", i)
size += len(key)
testKeys = append(testKeys, []byte(key))

View File

@ -37,40 +37,6 @@ import (
gstatus "google.golang.org/grpc/status"
)
// MaxConnectionCount is the maximum number of gRPC connections that will be established with
// each tikv-server.
var MaxConnectionCount uint = 16
// GrpcKeepAliveTime is the duration after which, if the client sees no activity, it
// pings the server to check whether the transport is still alive.
var GrpcKeepAliveTime = time.Duration(10) * time.Second
// GrpcKeepAliveTimeout is the duration the client waits after sending a keepalive ping;
// if no activity is seen even after that, the connection is closed.
var GrpcKeepAliveTimeout = time.Duration(3) * time.Second
// MaxSendMsgSize sets the maximum gRPC request message size that can be sent to the server.
// If any request message exceeds this value, gRPC reports an error.
var MaxSendMsgSize = 1<<31 - 1
// MaxCallMsgSize sets the maximum gRPC message size that can be received from the server.
// If any response message exceeds this value, gRPC reports an error.
var MaxCallMsgSize = 1<<31 - 1
// Timeout durations.
const (
dialTimeout = 5 * time.Second
ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute
grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
)
// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
@ -223,21 +189,21 @@ func (a *connArray) Init(addr string, security config.Security) error {
allowBatch := config.MaxBatchSize > 0
for i := range a.v {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
ctx, cancel := context.WithTimeout(context.Background(), config.DialTimeout)
conn, err := grpc.DialContext(
ctx,
addr,
opt,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithInitialWindowSize(int32(config.GrpcInitialWindowSize)),
grpc.WithInitialConnWindowSize(int32(config.GrpcInitialConnWindowSize)),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(config.MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(config.MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: GrpcKeepAliveTime,
Timeout: GrpcKeepAliveTimeout,
Time: config.GrpcKeepAliveTime,
Timeout: config.GrpcKeepAliveTimeout,
PermitWithoutStream: true,
}),
)
@ -489,7 +455,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
array, ok := c.conns[addr]
if !ok {
var err error
array, err = newConnArray(MaxConnectionCount, addr, c.security)
array, err = newConnArray(config.MaxConnectionCount, addr, c.security)
if err != nil {
return nil, err
}

View File

@ -14,18 +14,10 @@
package kv
import (
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)
var (
// DefaultTxnMembufCap is the default transaction membuf capacity.
DefaultTxnMembufCap = 4 * 1024
// ImportingTxnMembufCap is the membuf capacity used when TiDB imports data.
ImportingTxnMembufCap = 32 * 1024
// TempTxnMemBufCap is the capacity of a temporary membuf.
TempTxnMemBufCap = 64
)
// BufferStore wraps a Retriever for read and a MemBuffer for buffered write.
// Common usage pattern:
// bs := NewBufferStore(r) // use BufferStore to wrap a Retriever
@ -41,7 +33,7 @@ type BufferStore struct {
// NewBufferStore creates a BufferStore using r for read.
func NewBufferStore(r Retriever, cap int) *BufferStore {
if cap <= 0 {
cap = DefaultTxnMembufCap
cap = config.DefaultTxnMembufCap
}
return &BufferStore{
r: r,

View File

@ -19,6 +19,7 @@ import (
"testing"
. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)
@ -31,7 +32,7 @@ type testBufferStoreSuite struct{}
var _ = Suite(testBufferStoreSuite{})
func (s testBufferStoreSuite) TestGetSet(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap)
key := key.Key("key")
_, err := bs.Get(key)
c.Check(err, NotNil)
@ -45,7 +46,7 @@ func (s testBufferStoreSuite) TestGetSet(c *C) {
}
func (s testBufferStoreSuite) TestSaveTo(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(config.DefaultTxnMembufCap)}, config.DefaultTxnMembufCap)
var buf bytes.Buffer
for i := 0; i < 10; i++ {
fmt.Fprint(&buf, i)
@ -55,7 +56,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
}
bs.Set(key.Key("novalue"), nil)
mutator := NewMemDbBuffer(DefaultTxnMembufCap)
mutator := NewMemDbBuffer(config.DefaultTxnMembufCap)
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

View File

@ -21,6 +21,7 @@ import (
"testing"
. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
)
const (
@ -37,11 +38,11 @@ type testKVSuite struct {
func (s *testKVSuite) SetUpSuite(c *C) {
s.bs = make([]MemBuffer, 1)
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap)
}
func (s *testKVSuite) ResetMembuffers() {
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
s.bs[0] = NewMemDbBuffer(config.DefaultTxnMembufCap)
}
func insertData(c *C, buffer MemBuffer) {
@ -184,7 +185,7 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}
func (s *testKVSuite) TestBufferLimit(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer := NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferSizeLimit = 1000
buffer.entrySizeLimit = 500
@ -196,7 +197,7 @@ func (s *testKVSuite) TestBufferLimit(c *C) {
err = buffer.Set([]byte("yz"), make([]byte, 499))
c.Assert(err, NotNil) // buffer size limit
buffer = NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer = NewMemDbBuffer(config.DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferLenLimit = 10
for i := 0; i < 10; i++ {
err = buffer.Set([]byte{byte(i)}, []byte{byte(i)})
@ -213,7 +214,7 @@ func BenchmarkMemDbBufferSequential(b *testing.B) {
for i := 0; i < opCnt; i++ {
data[i] = encodeInt(i)
}
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
buffer := NewMemDbBuffer(config.DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
@ -224,20 +225,20 @@ func BenchmarkMemDbBufferRandom(b *testing.B) {
data[i] = encodeInt(i)
}
shuffle(data)
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
buffer := NewMemDbBuffer(config.DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
func BenchmarkMemDbIter(b *testing.B) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
buffer := NewMemDbBuffer(config.DefaultTxnMembufCap)
benchIterator(b, buffer)
b.ReportAllocs()
}
func BenchmarkMemDbCreation(b *testing.B) {
for i := 0; i < b.N; i++ {
NewMemDbBuffer(DefaultTxnMembufCap)
NewMemDbBuffer(config.DefaultTxnMembufCap)
}
b.ReportAllocs()
}

View File

@ -14,6 +14,7 @@
package kv
import (
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)
@ -73,7 +74,7 @@ type unionStore struct {
// NewUnionStore builds a new UnionStore.
func NewUnionStore(snapshot Snapshot) UnionStore {
return &unionStore{
BufferStore: NewBufferStore(snapshot, DefaultTxnMembufCap),
BufferStore: NewBufferStore(snapshot, config.DefaultTxnMembufCap),
snapshot: snapshot,
lazyConditionPairs: make(map[string]*conditionPair),
opts: make(map[Option]interface{}),

View File

@ -16,6 +16,7 @@ package kv
import (
. "github.com/pingcap/check"
"github.com/pkg/errors"
"github.com/tikv/client-go/config"
)
var _ = Suite(&testUnionStoreSuite{})
@ -26,7 +27,7 @@ type testUnionStoreSuite struct {
}
func (s *testUnionStoreSuite) SetUpTest(c *C) {
s.store = NewMemDbBuffer(DefaultTxnMembufCap)
s.store = NewMemDbBuffer(config.DefaultTxnMembufCap)
s.us = NewUnionStore(&mockSnapshot{s.store})
}

View File

@ -23,6 +23,7 @@ import (
"github.com/cznic/mathutil"
log "github.com/sirupsen/logrus"
"github.com/spaolacci/murmur3"
"github.com/tikv/client-go/config"
)
type node struct {
@ -227,7 +228,7 @@ func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
defer latch.Unlock()
// Try to recycle to limit the memory usage.
if latch.count >= latchListCount {
if latch.count >= config.LatchListCount {
latch.recycle(lock.startTS)
}
@ -268,7 +269,7 @@ func (l *latch) recycle(currentTS uint64) int {
fakeHead := node{next: l.queue}
prev := &fakeHead
for curr := prev.next; curr != nil; curr = curr.next {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
if tsoSub(currentTS, curr.maxCommitTS) >= config.LatchExpireDuration && curr.value == nil {
l.count--
prev.next = curr.next
total++

View File

@ -19,6 +19,7 @@ import (
"time"
. "github.com/pingcap/check"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/txnkv/oracle"
)
@ -142,7 +143,7 @@ func (s *testLatchSuite) TestRecycle(c *C) {
}
c.Assert(allEmpty, IsFalse)
currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(config.LatchExpireDuration)), 3)
latches.recycle(currentTS)
for i := 0; i < len(latches.slots); i++ {

View File

@ -17,11 +17,10 @@ import (
"sync"
"time"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/txnkv/oracle"
)
const lockChanSize = 100
// LatchesScheduler is used to schedule latches for transactions.
type LatchesScheduler struct {
latches *Latches
@ -34,7 +33,7 @@ type LatchesScheduler struct {
// NewScheduler create the LatchesScheduler.
func NewScheduler(size uint) *LatchesScheduler {
latches := NewLatches(size)
unlockCh := make(chan *Lock, lockChanSize)
unlockCh := make(chan *Lock, config.LatchLockChanSize)
scheduler := &LatchesScheduler{
latches: latches,
unlockCh: unlockCh,
@ -44,11 +43,6 @@ func NewScheduler(size uint) *LatchesScheduler {
return scheduler
}
const expireDuration = 2 * time.Minute
const checkInterval = 1 * time.Minute
const checkCounter = 50000
const latchListCount = 5
func (scheduler *LatchesScheduler) run() {
var counter int
wakeupList := make([]*Lock, 0)
@ -61,7 +55,7 @@ func (scheduler *LatchesScheduler) run() {
if lock.commitTS > lock.startTS {
currentTS := lock.commitTS
elapsed := tsoSub(currentTS, scheduler.lastRecycleTime)
if elapsed > checkInterval || counter > checkCounter {
if elapsed > config.LatchCheckInterval || counter > config.LatchCheckCounter {
go scheduler.latches.recycle(lock.commitTS)
scheduler.lastRecycleTime = currentTS
counter = 0

View File

@ -20,14 +20,13 @@ import (
"github.com/pingcap/pd/client"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/txnkv/oracle"
)
var _ oracle.Oracle = &pdOracle{}
const slowDist = 30 * time.Millisecond
// pdOracle is an Oracle that uses a placement driver client as source.
type pdOracle struct {
c pd.Client
@ -103,7 +102,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) {
return 0, err
}
dist := time.Since(now)
if dist > slowDist {
if dist > config.TsoSlowThreshold {
log.Warnf("get timestamp too slow: %s", dist)
}
return oracle.ComposeTS(physical, logical), nil

View File

@ -15,10 +15,6 @@ package store
import "github.com/tikv/client-go/locate"
// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
// Key+Value size below 16KB.
const txnCommitBatchSize = 16 * 1024
// batchKeys is a batch of keys in the same region.
type batchKeys struct {
region locate.RegionVerID

View File

@ -19,6 +19,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
)
@ -79,7 +80,7 @@ func (t *DeleteRangeTask) Execute() error {
},
}
resp, err := t.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
resp, err := t.store.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium)
if err != nil {
return err
}

View File

@ -29,9 +29,6 @@ import (
"github.com/tikv/client-go/rpc"
)
// ResolvedCacheSize is the maximum number of cached txn statuses.
const ResolvedCacheSize = 2048
// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store *TiKVStore
@ -76,17 +73,6 @@ func (s TxnStatus) IsCommitted() bool { return s > 0 }
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return uint64(s) }
// By default, a lock that is older than 3000ms is considered unusual (the client that
// created the lock might be dead). Other clients may clean up this kind of lock.
// For recently created locks, we back off and retry.
var defaultLockTTL uint64 = 3000
// TODO: Consider if it's appropriate.
var maxLockTTL uint64 = 120000
// ttl = ttlFactor * sqrt(writeSizeInMiB)
var ttlFactor = 6000
// Lock represents a lock from tikv server.
type Lock struct {
Key []byte
@ -99,7 +85,7 @@ type Lock struct {
func NewLock(l *kvrpcpb.LockInfo) *Lock {
ttl := l.GetLockTtl()
if ttl == 0 {
ttl = defaultLockTTL
ttl = config.TxnDefaultLockTTL
}
return &Lock{
Key: l.GetKey(),
@ -118,7 +104,7 @@ func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
}
lr.mu.resolved[txnID] = status
lr.mu.recentResolved.PushBack(txnID)
if len(lr.mu.resolved) > ResolvedCacheSize {
if len(lr.mu.resolved) > config.TxnResolvedCacheSize {
front := lr.mu.recentResolved.Front()
delete(lr.mu.resolved, front.Value.(uint64))
lr.mu.recentResolved.Remove(front)
@ -185,7 +171,7 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo
},
}
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, rpc.ReadTimeoutShort)
resp, err := lr.store.SendReq(bo, req, loc, config.ReadTimeoutShort)
if err != nil {
return false, err
}
@ -296,7 +282,7 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary
if err != nil {
return status, err
}
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return status, err
}
@ -350,7 +336,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat
if status.IsCommitted() {
req.ResolveLock.CommitVersion = status.CommitTS()
}
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := lr.store.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return err
}

View File

@ -24,22 +24,10 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"google.golang.org/grpc"
)
// Safe point constants.
const (
// GcSavedSafePoint is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb'.
// It is saved to PD instead of TiKV, because the table interface cannot be used
// once the safepoint on TiDB has expired.
GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point"
GcSafePointCacheInterval = time.Second * 100
gcCPUTimeInaccuracyBound = time.Second
gcSafePointUpdateInterval = time.Second * 10
gcSafePointQuickRepeatInterval = time.Second
)
// SafePointKV is used for seamless integration of mockTest and runtime.
type SafePointKV interface {
Put(k string, v string) error
@ -132,7 +120,7 @@ func (w *EtcdSafePointKV) Get(k string) (string, error) {
func saveSafePoint(kv SafePointKV, key string, t uint64) error {
s := strconv.FormatUint(t, 10)
err := kv.Put(GcSavedSafePoint, s)
err := kv.Put(config.GcSavedSafePoint, s)
if err != nil {
log.Error("save safepoint failed:", err)
return err
@ -141,7 +129,7 @@ func saveSafePoint(kv SafePointKV, key string, t uint64) error {
}
func loadSafePoint(kv SafePointKV, key string) (uint64, error) {
str, err := kv.Get(GcSavedSafePoint)
str, err := kv.Get(config.GcSavedSafePoint)
if err != nil {
return 0, err

View File

@ -20,6 +20,7 @@ import (
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
@ -41,7 +42,7 @@ type Scanner struct {
func newScanner(snapshot *TiKVSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
batchSize = config.TxnScanBatchSize
}
scanner := &Scanner{
snapshot: snapshot,
@ -174,7 +175,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
NotFillCache: s.snapshot.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutMedium)
if err != nil {
return err
}

View File

@ -24,6 +24,7 @@ import (
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
@ -31,11 +32,6 @@ import (
"github.com/tikv/client-go/txnkv/kv"
)
const (
scanBatchSize = 256
batchGetSize = 5120
)
// TiKVSnapshot supports read from TiKV.
type TiKVSnapshot struct {
store *TiKVStore
@ -102,7 +98,7 @@ func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte,
var batches []batchKeys
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize)
batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, config.TxnBatchGetSize)
}
if len(batches) == 0 {
@ -145,7 +141,7 @@ func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys
NotFillCache: s.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutMedium)
resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutMedium)
if err != nil {
return err
}
@ -230,7 +226,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
if err != nil {
return nil, err
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return nil, err
}
@ -273,7 +269,7 @@ func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
// Iter returns a list of key-value pair after `k`.
func (s *TiKVSnapshot) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
scanner, err := newScanner(s, k, upperBound, config.TxnScanBatchSize)
return scanner, err
}

View File

@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
@ -47,7 +48,7 @@ func SplitRegion(store *TiKVStore, splitKey key.Key) error {
log.Infof("skip split_region region at %q", splitKey)
return nil
}
res, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
res, err := sender.SendReq(bo, req, loc.Region, config.ReadTimeoutShort)
if err != nil {
return err
}

View File

@ -33,9 +33,6 @@ import (
"github.com/tikv/client-go/txnkv/oracle/oracles"
)
// update oracle's lastTS every 2000ms.
var oracleUpdateInterval = 2000
// TiKVStore contains methods to interact with a TiKV cluster.
type TiKVStore struct {
clusterID uint64
@ -67,7 +64,7 @@ func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) {
return nil, err
}
oracle, err := oracles.NewPdOracle(pdCli, time.Duration(oracleUpdateInterval)*time.Millisecond)
oracle, err := oracles.NewPdOracle(pdCli, time.Duration(config.OracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, err
}
@ -180,21 +177,21 @@ func (s *TiKVStore) GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) {
}
func (s *TiKVStore) runSafePointChecker() {
d := gcSafePointUpdateInterval
d := config.GcSafePointUpdateInterval
for {
select {
case spCachedTime := <-time.After(d):
cachedSafePoint, err := loadSafePoint(s.spkv, GcSavedSafePoint)
cachedSafePoint, err := loadSafePoint(s.spkv, config.GcSavedSafePoint)
if err == nil {
metrics.LoadSafepointCounter.WithLabelValues("ok").Inc()
s.spMutex.Lock()
s.safePoint, s.spTime = cachedSafePoint, spCachedTime
s.spMutex.Unlock()
d = gcSafePointUpdateInterval
d = config.GcSafePointUpdateInterval
} else {
metrics.LoadSafepointCounter.WithLabelValues("fail").Inc()
log.Errorf("fail to load safepoint from pd: %v", err)
d = gcSafePointQuickRepeatInterval
d = config.GcSafePointQuickRepeatInterval
}
case <-s.Closed():
return
@ -211,7 +208,7 @@ func (s *TiKVStore) CheckVisibility(startTS uint64) error {
s.spMutex.RUnlock()
diff := time.Since(cachedTime)
if diff > (GcSafePointCacheInterval - gcCPUTimeInaccuracyBound) {
if diff > (config.GcSafePointCacheInterval - config.GcCPUTimeInaccuracyBound) {
return errors.WithStack(ErrPDServerTimeout)
}
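For intuition on the check above: with the defaults in this commit, the cached safe point is trusted for config.GcSafePointCacheInterval minus config.GcCPUTimeInaccuracyBound, i.e. 100s - 1s = 99s, after which ErrPDServerTimeout is returned. A small standalone sketch of the same staleness rule (the error value here is a stand-in, not the store's own):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/tikv/client-go/config"
)

// errStaleSafePoint stands in for ErrPDServerTimeout in the real store.
var errStaleSafePoint = errors.New("cached safe point is stale")

// checkCachedSafePoint applies the same age bound as CheckVisibility.
func checkCachedSafePoint(cachedAt time.Time) error {
	maxAge := config.GcSafePointCacheInterval - config.GcCPUTimeInaccuracyBound // 99s by default
	if time.Since(cachedAt) > maxAge {
		return errStaleSafePoint
	}
	return nil
}

func main() {
	fmt.Println(checkCachedSafePoint(time.Now()))                       // <nil>
	fmt.Println(checkCachedSafePoint(time.Now().Add(-2 * time.Minute))) // stale
}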

View File

@ -143,15 +143,15 @@ func txnLockTTL(startTime time.Time, txnSize int) uint64 {
// The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`.
// When writeSize is less than 256KB, the base ttl is defaultTTL (3s).
// When writeSize is 1MiB, 100MiB, or 400MiB, ttl is 6s, 60s, or 120s respectively.
lockTTL := defaultLockTTL
if txnSize >= txnCommitBatchSize {
lockTTL := config.TxnDefaultLockTTL
if txnSize >= config.TxnCommitBatchSize {
sizeMiB := float64(txnSize) / bytesPerMiB
lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB))
if lockTTL < defaultLockTTL {
lockTTL = defaultLockTTL
lockTTL = uint64(float64(config.TxnTTLFactor) * math.Sqrt(sizeMiB))
if lockTTL < config.TxnDefaultLockTTL {
lockTTL = config.TxnDefaultLockTTL
}
if lockTTL > maxLockTTL {
lockTTL = maxLockTTL
if lockTTL > config.TxnMaxLockTTL {
lockTTL = config.TxnMaxLockTTL
}
}
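To make the TTL formula concrete: with config.TxnTTLFactor = 6000, config.TxnDefaultLockTTL = 3000, and config.TxnMaxLockTTL = 120000, a 1 MiB write gets 6s, 100 MiB gets 60s, and 400 MiB is capped at 120s. A standalone sketch reproducing the calculation (bytesPerMiB is assumed to be 1 << 20, and the function is a copy for illustration, not the committer's code):

package main

import (
	"fmt"
	"math"

	"github.com/tikv/client-go/config"
)

// bytesPerMiB is assumed to match the constant used by txnLockTTL.
const bytesPerMiB = 1 << 20

// lockTTLForSize mirrors the formula ttl = ttlFactor * sqrt(sizeInMiB),
// clamped to [TxnDefaultLockTTL, TxnMaxLockTTL].
func lockTTLForSize(txnSize int) uint64 {
	ttl := config.TxnDefaultLockTTL
	if txnSize >= config.TxnCommitBatchSize {
		sizeMiB := float64(txnSize) / bytesPerMiB
		ttl = uint64(float64(config.TxnTTLFactor) * math.Sqrt(sizeMiB))
		if ttl < config.TxnDefaultLockTTL {
			ttl = config.TxnDefaultLockTTL
		}
		if ttl > config.TxnMaxLockTTL {
			ttl = config.TxnMaxLockTTL
		}
	}
	return ttl
}

func main() {
	for _, size := range []int{100 << 10, 1 << 20, 100 << 20, 400 << 20} {
		fmt.Printf("%d bytes -> %d ms\n", size, lockTTLForSize(size))
	}
	// Prints 3000, 6000, 60000, and 120000 ms respectively.
}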
@ -183,10 +183,10 @@ func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction,
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
}
// Make sure the group that contains primary key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, config.TxnCommitBatchSize)
delete(groups, firstRegion)
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
batches = appendBatchBySize(batches, id, g, sizeFunc, config.TxnCommitBatchSize)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
@ -318,7 +318,7 @@ func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys)
},
}
for {
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort)
if err != nil {
return err
}
@ -399,7 +399,7 @@ func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) e
req.Context.Priority = c.Priority
sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient())
resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
resp, err := sender.SendReq(bo, req, batch.region, config.ReadTimeoutShort)
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
@ -470,7 +470,7 @@ func (c *TxnCommitter) cleanupSingleBatch(bo *retry.Backoffer, batch batchKeys)
SyncLog: c.SyncLog,
},
}
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
resp, err := c.store.SendReq(bo, req, batch.region, config.ReadTimeoutShort)
if err != nil {
return err
}