add txnkv support and examples (#6)

* add txnkv support

Signed-off-by: disksing <i@disksing.com>

* add examples

Signed-off-by: disksing <i@disksing.com>
disksing 2019-03-18 19:47:54 +08:00 committed by GitHub
parent 445fc2e42e
commit a9347f9bec
23 changed files with 2981 additions and 12 deletions


@ -87,4 +87,13 @@ var (
TxnEntryCountLimit uint64 = 300 * 1000
// TxnTotalSizeLimit is the limit of the total size of all entries.
TxnTotalSizeLimit = 100 * 1024 * 1024
// MaxTxnTimeUse is the max time a transaction can run.
MaxTxnTimeUse = 590
)
// Local latches for transactions. Enable it when
// there are lots of conflicts between transactions.
var (
EnableTxnLocalLatch = false
TxnLocalLatchCapacity uint = 2048000
)
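These knobs are package-level variables, so they can be adjusted before any client is created. A hedged sketch (assuming they live in the config package, as the two local-latch variables demonstrably do in store.go further down; treating MaxTxnTimeUse as seconds is an assumption here):

package main

import "github.com/tikv/client-go/config"

func main() {
	// Turn on local latches when the workload has many conflicting transactions.
	config.EnableTxnLocalLatch = true
	config.TxnLocalLatchCapacity = 1024000

	// Assumption: MaxTxnTimeUse is exported from the same config package and is
	// measured in seconds; lowering it makes long-running transactions fail earlier.
	config.MaxTxnTimeUse = 300

	// ...create rawkv/txnkv clients only after the knobs are set...
}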

62 examples/rawkv/rawkv.go Normal file

@ -0,0 +1,62 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/rawkv"
)
func main() {
cli, err := rawkv.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
if err != nil {
panic(err)
}
defer cli.Close()
fmt.Printf("cluster ID: %d\n", cli.ClusterID())
key := []byte("Company")
val := []byte("PingCAP")
// put key into tikv
err = cli.Put(key, val)
if err != nil {
panic(err)
}
fmt.Printf("Successfully put %s:%s to tikv\n", key, val)
// get key from tikv
val, err = cli.Get(key)
if err != nil {
panic(err)
}
fmt.Printf("found val: %s for key: %s\n", val, key)
// delete key from tikv
err = cli.Delete(key)
if err != nil {
panic(err)
}
fmt.Printf("key: %s deleted\n", key)
// get the key again from tikv (the value is empty now that the key has been deleted)
val, err = cli.Get(key)
if err != nil {
panic(err)
}
fmt.Printf("found val: %s for key: %s\n", val, key)
}

153 examples/txnkv/txnkv.go Normal file

@ -0,0 +1,153 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"flag"
"fmt"
"os"
"github.com/pingcap/errors"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/txnkv"
)
type KV struct {
K, V []byte
}
func (kv KV) String() string {
return fmt.Sprintf("%s => %s (%v)", kv.K, kv.V, kv.V)
}
var (
client *txnkv.TxnClient
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
)
// initStore initializes the global txnkv client.
func initStore() {
var err error
client, err = txnkv.NewTxnClient([]string{*pdAddr}, config.Security{})
if err != nil {
panic(err)
}
}
// key1 val1 key2 val2 ...
func puts(args ...[]byte) error {
tx, err := client.Begin()
if err != nil {
return errors.Trace(err)
}
for i := 0; i < len(args); i += 2 {
key, val := args[i], args[i+1]
err := tx.Set(key, val)
if err != nil {
return errors.Trace(err)
}
}
err = tx.Commit(context.Background())
if err != nil {
return errors.Trace(err)
}
return nil
}
func get(k []byte) (KV, error) {
tx, err := client.Begin()
if err != nil {
return KV{}, errors.Trace(err)
}
v, err := tx.Get(k)
if err != nil {
return KV{}, errors.Trace(err)
}
return KV{K: k, V: v}, nil
}
func dels(keys ...[]byte) error {
tx, err := client.Begin()
if err != nil {
return errors.Trace(err)
}
for _, key := range keys {
err := tx.Delete(key)
if err != nil {
return errors.Trace(err)
}
}
err = tx.Commit(context.Background())
if err != nil {
return errors.Trace(err)
}
return nil
}
func scan(keyPrefix []byte, limit int) ([]KV, error) {
tx, err := client.Begin()
if err != nil {
return nil, errors.Trace(err)
}
it, err := tx.Iter(key.Key(keyPrefix), nil)
if err != nil {
return nil, errors.Trace(err)
}
defer it.Close()
var ret []KV
for it.Valid() && limit > 0 {
ret = append(ret, KV{K: it.Key()[:], V: it.Value()[:]})
limit--
it.Next()
}
return ret, nil
}
func main() {
pdAddr := os.Getenv("PD_ADDR")
if pdAddr != "" {
os.Args = append(os.Args, "-pd", pdAddr)
}
flag.Parse()
initStore()
// set
err := puts([]byte("key1"), []byte("value1"), []byte("key2"), []byte("value2"))
if err != nil {
panic(err)
}
// get
kv, err := get([]byte("key1"))
if err != nil {
panic(err)
}
fmt.Println(kv)
// scan
ret, err := scan([]byte("key"), 10)
if err != nil {
panic(err)
}
for _, kv := range ret {
fmt.Println(kv)
}
// delete
err = dels([]byte("key1"), []byte("key2"))
if err != nil {
panic(err)
}
}

6 go.mod

@ -1,8 +1,10 @@
module github.com/tikv/client-go
require (
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
github.com/coreos/bbolt v1.3.2 // indirect
github.com/coreos/etcd v3.3.12+incompatible // indirect
github.com/coreos/etcd v3.3.12+incompatible
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
@ -32,6 +34,7 @@ require (
github.com/pingcap/pd v2.1.5+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/soheilhy/cmux v0.1.4 // indirect
@ -46,5 +49,6 @@ require (
go.uber.org/zap v1.9.1 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/grpc v1.19.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)

8 go.sum

@ -1,6 +1,10 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf h1:qet1QNfXsQxTZqLG4oE62mJzwPIB8+Tee4RNCL9ulrY=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@ -46,7 +50,7 @@ github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U=
github.com/gorilla/mux v1.7.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
@ -159,6 +163,8 @@ google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoA
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=


@ -220,6 +220,15 @@ var (
Help: "Bucketed histogram of seconds cost for waiting timestamp future.",
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 18), // 5us ~ 128 ms
})
LocalLatchWaitTimeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "tikvclient",
Name: "local_latch_wait_seconds",
Help: "Wait time of a get local latch.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
)
// RetLabel returns "ok" when err == nil and "err" when err != nil.
@ -254,4 +263,5 @@ func init() {
prometheus.MustRegister(PendingBatchRequests)
prometheus.MustRegister(BatchWaitDuration)
prometheus.MustRegister(TSFutureWaitDuration)
prometheus.MustRegister(LocalLatchWaitTimeHistogram)
}


@ -133,19 +133,19 @@ func (t BackoffType) String() string {
// Maximum total sleep time (in ms) for kv/cop commands.
const (
copBuildTaskMaxBackoff = 5000
tsoMaxBackoff = 15000
scannerNextMaxBackoff = 20000
batchGetMaxBackoff = 20000
copNextMaxBackoff = 20000
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
CopBuildTaskMaxBackoff = 5000
TsoMaxBackoff = 15000
ScannerNextMaxBackoff = 20000
BatchGetMaxBackoff = 20000
CopNextMaxBackoff = 20000
GetMaxBackoff = 20000
PrewriteMaxBackoff = 20000
CleanupMaxBackoff = 20000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
deleteRangeOneRegionMaxBackoff = 100000
DeleteRangeOneRegionMaxBackoff = 100000
RawkvMaxBackoff = 20000
splitRegionBackoff = 20000
SplitRegionBackoff = 20000
)
// CommitMaxBackoff is max sleep time of the 'commit' command
@ -252,3 +252,8 @@ func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
func (b *Backoffer) GetContext() context.Context {
return b.ctx
}
// TotalSleep returns the total sleep time of the backoffer.
func (b *Backoffer) TotalSleep() time.Duration {
return time.Duration(b.totalSleep) * time.Millisecond
}
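Since this change exports the per-command backoff budgets, external callers can build a Backoffer directly. A minimal hedged sketch:

package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/retry"
)

func main() {
	// A Backoffer whose total sleep is capped by the now-exported GetMaxBackoff budget.
	bo := retry.NewBackoffer(context.Background(), retry.GetMaxBackoff)
	// Real callers invoke bo.Backoff(...) when they hit retryable errors;
	// TotalSleep reports how much time has been spent sleeping so far.
	fmt.Println("total backoff sleep:", bo.TotalSleep())
}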


@ -27,6 +27,9 @@ import (
"google.golang.org/grpc/status"
)
// ErrBodyMissing is the error returned when the response body is missing.
var ErrBodyMissing = errors.New("response body is missing")
// RegionRequestSender sends KV/Cop requests to tikv server. It handles network
// errors and some region errors internally.
//
@ -57,6 +60,11 @@ func NewRegionRequestSender(regionCache *locate.RegionCache, client Client) *Reg
}
}
// RPCError returns an error if an RPC error is encountered during request.
func (s *RegionRequestSender) RPCError() error {
return s.rpcError
}
// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionID locate.RegionVerID, timeout time.Duration) (*Response, error) {

58 txnkv/client.go Normal file

@ -0,0 +1,58 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnkv
import (
"context"
"github.com/pingcap/errors"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/txnkv/store"
)
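// TxnClient is a transactional client of a TiKV cluster.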
type TxnClient struct {
tikvStore *store.TiKVStore
}
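// NewTxnClient creates a transactional client that connects to the PD endpoints in pdAddrs.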
func NewTxnClient(pdAddrs []string, security config.Security) (*TxnClient, error) {
tikvStore, err := store.NewStore(pdAddrs, security)
if err != nil {
return nil, errors.Trace(err)
}
return &TxnClient{
tikvStore: tikvStore,
}, nil
}
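// Close releases the resources held by the underlying store.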
func (c *TxnClient) Close() error {
err := c.tikvStore.Close()
return errors.Trace(err)
}
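// Begin starts a new transaction with a start timestamp fetched from PD.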
func (c *TxnClient) Begin() (*Transaction, error) {
ts, err := c.GetTS()
if err != nil {
return nil, errors.Trace(err)
}
return c.BeginWithTS(ts), nil
}
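// BeginWithTS starts a new transaction with the given start timestamp.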
func (c *TxnClient) BeginWithTS(ts uint64) *Transaction {
return newTransaction(c.tikvStore, ts)
}
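// GetTS fetches a timestamp from PD.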
func (c *TxnClient) GetTS() (uint64, error) {
return c.tikvStore.GetTimestampWithRetry(retry.NewBackoffer(context.TODO(), retry.TsoMaxBackoff))
}


@ -28,6 +28,8 @@ var (
ErrEntryTooLarge = errors.New("entry is too large")
// ErrKeyExists is returned when the key already exists.
ErrKeyExists = errors.New("key already exist")
// ErrInvalidTxn is the error of using a transaction after it has been committed or rolled back.
ErrInvalidTxn = errors.New("invalid transaction")
)
// IsErrNotFound checks if err is a kind of NotFound error.

43 txnkv/store/batch.go Normal file

@ -0,0 +1,43 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import "github.com/tikv/client-go/locate"
// TiKV recommends each RPC packet should be less than ~1MB. We keep each packet's
// Key+Value size below 16KB.
const txnCommitBatchSize = 16 * 1024
// batchKeys is a batch of keys in the same region.
type batchKeys struct {
region locate.RegionVerID
keys [][]byte
}
// appendBatchBySize appends keys to []batchKeys. It may split the keys to make
// sure each batch's size does not exceed the limit.
func appendBatchBySize(b []batchKeys, region locate.RegionVerID, keys [][]byte, sizeFn func([]byte) int, limit int) []batchKeys {
var start, end int
for start = 0; start < len(keys); start = end {
var size int
for end = start; end < len(keys) && size < limit; end++ {
size += sizeFn(keys[end])
}
b = append(b, batchKeys{
region: region,
keys: keys[start:end],
})
}
return b
}
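For illustration, a hedged in-package sketch of how commit code might split one region's keys with this helper (batchesForRegion and the byte-count sizeFn are illustrative names, not part of the API):

// batchesForRegion splits one region's keys into batches whose estimated size
// stays below txnCommitBatchSize; here the size of a key is just its length.
func batchesForRegion(region locate.RegionVerID, keys [][]byte) []batchKeys {
	sizeFn := func(k []byte) int { return len(k) }
	return appendBatchBySize(nil, region, keys, sizeFn, txnCommitBatchSize)
}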


@ -0,0 +1,106 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"fmt"
"strings"
"sync/atomic"
"time"
)
// ExecDetails contains execution detail information.
type ExecDetails struct {
CalleeAddress string
ProcessTime time.Duration
WaitTime time.Duration
BackoffTime time.Duration
RequestCount int
TotalKeys int64
ProcessedKeys int64
CommitDetail *CommitDetails
}
// CommitDetails contains commit detail information.
type CommitDetails struct {
GetCommitTsTime time.Duration
PrewriteTime time.Duration
CommitTime time.Duration
LocalLatchTime time.Duration
TotalBackoffTime time.Duration
ResolveLockTime int64
WriteKeys int
WriteSize int
PrewriteRegionNum int32
TxnRetry int
}
// String implements the fmt.Stringer interface.
func (d ExecDetails) String() string {
parts := make([]string, 0, 6)
if d.ProcessTime > 0 {
parts = append(parts, fmt.Sprintf("process_time:%vs", d.ProcessTime.Seconds()))
}
if d.WaitTime > 0 {
parts = append(parts, fmt.Sprintf("wait_time:%vs", d.WaitTime.Seconds()))
}
if d.BackoffTime > 0 {
parts = append(parts, fmt.Sprintf("backoff_time:%vs", d.BackoffTime.Seconds()))
}
if d.RequestCount > 0 {
parts = append(parts, fmt.Sprintf("request_count:%d", d.RequestCount))
}
if d.TotalKeys > 0 {
parts = append(parts, fmt.Sprintf("total_keys:%d", d.TotalKeys))
}
if d.ProcessedKeys > 0 {
parts = append(parts, fmt.Sprintf("processed_keys:%d", d.ProcessedKeys))
}
commitDetails := d.CommitDetail
if commitDetails != nil {
if commitDetails.PrewriteTime > 0 {
parts = append(parts, fmt.Sprintf("prewrite_time:%vs", commitDetails.PrewriteTime.Seconds()))
}
if commitDetails.CommitTime > 0 {
parts = append(parts, fmt.Sprintf("commit_time:%vs", commitDetails.CommitTime.Seconds()))
}
if commitDetails.GetCommitTsTime > 0 {
parts = append(parts, fmt.Sprintf("get_commit_ts_time:%vs", commitDetails.GetCommitTsTime.Seconds()))
}
if commitDetails.TotalBackoffTime > 0 {
parts = append(parts, fmt.Sprintf("total_backoff_time:%vs", commitDetails.TotalBackoffTime.Seconds()))
}
resolveLockTime := atomic.LoadInt64(&commitDetails.ResolveLockTime)
if resolveLockTime > 0 {
parts = append(parts, fmt.Sprintf("resolve_lock_time:%vs", time.Duration(resolveLockTime).Seconds()))
}
if commitDetails.LocalLatchTime > 0 {
parts = append(parts, fmt.Sprintf("local_latch_wait_time:%vs", commitDetails.LocalLatchTime.Seconds()))
}
if commitDetails.WriteKeys > 0 {
parts = append(parts, fmt.Sprintf("write_keys:%d", commitDetails.WriteKeys))
}
if commitDetails.WriteSize > 0 {
parts = append(parts, fmt.Sprintf("write_size:%d", commitDetails.WriteSize))
}
prewriteRegionNum := atomic.LoadInt32(&commitDetails.PrewriteRegionNum)
if prewriteRegionNum > 0 {
parts = append(parts, fmt.Sprintf("prewrite_region:%d", prewriteRegionNum))
}
if commitDetails.TxnRetry > 0 {
parts = append(parts, fmt.Sprintf("txn_retry:%d", commitDetails.TxnRetry))
}
}
return strings.Join(parts, " ")
}

122 txnkv/store/delete_range.go Normal file

@ -0,0 +1,122 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bytes"
"context"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
)
// DeleteRangeTask is used to delete all keys in a range. After performing
// DeleteRange, it records how many regions were affected and whether the
// task was canceled.
type DeleteRangeTask struct {
completedRegions int
canceled bool
store *TiKVStore
ctx context.Context
startKey []byte
endKey []byte
}
// NewDeleteRangeTask creates a DeleteRangeTask. Deleting will not be performed right away.
// WARNING: Currently, this API may leave some waste key-value pairs uncleaned in TiKV. Be careful while using it.
func NewDeleteRangeTask(ctx context.Context, store *TiKVStore, startKey []byte, endKey []byte) *DeleteRangeTask {
return &DeleteRangeTask{
completedRegions: 0,
canceled: false,
store: store,
ctx: ctx,
startKey: startKey,
endKey: endKey,
}
}
// Execute performs the delete range operation.
func (t *DeleteRangeTask) Execute() error {
startKey, rangeEndKey := t.startKey, t.endKey
for {
select {
case <-t.ctx.Done():
t.canceled = true
return nil
default:
}
bo := retry.NewBackoffer(t.ctx, retry.DeleteRangeOneRegionMaxBackoff)
loc, err := t.store.GetRegionCache().LocateKey(bo, startKey)
if err != nil {
return errors.Trace(err)
}
// Delete to the end of the region, except if it's the last region overlapping the range
endKey := loc.EndKey
// If it is the last region
if loc.Contains(rangeEndKey) {
endKey = rangeEndKey
}
req := &rpc.Request{
Type: rpc.CmdDeleteRange,
DeleteRange: &kvrpcpb.DeleteRangeRequest{
StartKey: startKey,
EndKey: endKey,
},
}
resp, err := t.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
deleteRangeResp := resp.DeleteRange
if deleteRangeResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
if err := deleteRangeResp.GetError(); err != "" {
return errors.Errorf("unexpected delete range err: %v", err)
}
t.completedRegions++
if bytes.Equal(endKey, rangeEndKey) {
break
}
startKey = endKey
}
return nil
}
// CompletedRegions returns the number of regions that were affected by this delete range task.
func (t *DeleteRangeTask) CompletedRegions() int {
return t.completedRegions
}
// IsCanceled returns true if the delete range operation was canceled halfway through.
func (t *DeleteRangeTask) IsCanceled() bool {
return t.canceled
}
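A hedged end-to-end sketch of the exported API above, assuming a local PD at 127.0.0.1:2379 as in the examples directory:

package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/txnkv/store"
)

func main() {
	s, err := store.NewStore([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// Delete every key in ["a", "z") and report how many regions were touched.
	task := store.NewDeleteRangeTask(context.Background(), s, []byte("a"), []byte("z"))
	if err := task.Execute(); err != nil {
		panic(err)
	}
	fmt.Printf("completed regions: %d, canceled: %v\n", task.CompletedRegions(), task.IsCanceled())
}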

47 txnkv/store/errors.go Normal file

@ -0,0 +1,47 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"fmt"
"github.com/pingcap/errors"
"github.com/tikv/client-go/key"
)
// TxnRetryableMark is used to direct the user to restart a transaction.
// TiDB decides whether to retry a transaction by checking whether the error message
// literally contains the string "try again later". The common usage is `errors.Annotate(err, TxnRetryableMark)`.
// Note that it should only be used if i) the error occurs inside a transaction
// and ii) the error is not totally unexpected and hopefully will recover soon.
const TxnRetryableMark = "[try again later]"
var (
// ErrResultUndetermined means that the commit status is unknown.
ErrResultUndetermined = errors.New("result undetermined")
// ErrNotImplemented returns when a function is not implemented yet.
ErrNotImplemented = errors.New("not implemented")
// ErrPDServerTimeout is the error that PD does not respond in time.
ErrPDServerTimeout = errors.New("PD server timeout")
// ErrStartTSFallBehind is the error that a transaction runs too long and the data
// loaded from TiKV may be out of date because of GC.
ErrStartTSFallBehind = errors.New("StartTS may fall behind safePoint")
)
// ErrKeyAlreadyExist is the error that a key exists in TiKV when it should not.
type ErrKeyAlreadyExist key.Key
func (e ErrKeyAlreadyExist) Error() string {
return fmt.Sprintf("key already exists: %q", e)
}
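Since the retryable mark is matched by substring, callers can annotate and detect it as in this hedged sketch (the helper flow is illustrative; only errors.Annotate and TxnRetryableMark come from the code above):

package main

import (
	"fmt"
	"strings"

	"github.com/pingcap/errors"
	"github.com/tikv/client-go/txnkv/store"
)

func main() {
	// Annotate a failure that happened inside a transaction and may recover soon.
	err := errors.Annotate(errors.New("write conflict"), store.TxnRetryableMark)

	// Detect it the same way TiDB does: a literal substring match on the message.
	if strings.Contains(err.Error(), store.TxnRetryableMark) {
		fmt.Println("transaction should be retried:", err)
	}
}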


@ -0,0 +1,381 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"container/list"
"context"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
)
// ResolvedCacheSize is the max number of cached txn statuses.
const ResolvedCacheSize = 2048
// LockResolver resolves locks and also caches resolved txn status.
type LockResolver struct {
store *TiKVStore
mu struct {
sync.RWMutex
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]TxnStatus
recentResolved *list.List
}
}
func newLockResolver(store *TiKVStore) *LockResolver {
r := &LockResolver{
store: store,
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.recentResolved = list.New()
return r
}
// NewLockResolver is exported for other pkg to use, suppress unused warning.
var _ = NewLockResolver
// NewLockResolver creates a LockResolver.
// It is exported for other pkg to use. For instance, binlog service needs
// to determine a transaction's commit state.
func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolver, error) {
s, err := NewStore(etcdAddrs, security)
if err != nil {
return nil, errors.Trace(err)
}
return s.GetLockResolver(), nil
}
// TxnStatus represents a txn's final status. It should be Commit or Rollback.
type TxnStatus uint64
// IsCommitted returns true if the txn's final status is Commit.
func (s TxnStatus) IsCommitted() bool { return s > 0 }
// CommitTS returns the txn's commitTS. It is valid iff `IsCommitted` is true.
func (s TxnStatus) CommitTS() uint64 { return uint64(s) }
// By default, a lock that has existed for more than 3000ms is considered unusual (the client
// that created the lock might be dead). Other clients may clean up this kind of lock.
// For locks created recently, we will back off and retry.
var defaultLockTTL uint64 = 3000
// TODO: Consider if it's appropriate.
var maxLockTTL uint64 = 120000
// ttl = ttlFactor * sqrt(writeSizeInMiB)
var ttlFactor = 6000
// Lock represents a lock from tikv server.
type Lock struct {
Key []byte
Primary []byte
TxnID uint64
TTL uint64
}
// NewLock creates a new *Lock.
func NewLock(l *kvrpcpb.LockInfo) *Lock {
ttl := l.GetLockTtl()
if ttl == 0 {
ttl = defaultLockTTL
}
return &Lock{
Key: l.GetKey(),
Primary: l.GetPrimaryLock(),
TxnID: l.GetLockVersion(),
TTL: ttl,
}
}
func (lr *LockResolver) saveResolved(txnID uint64, status TxnStatus) {
lr.mu.Lock()
defer lr.mu.Unlock()
if _, ok := lr.mu.resolved[txnID]; ok {
return
}
lr.mu.resolved[txnID] = status
lr.mu.recentResolved.PushBack(txnID)
if len(lr.mu.resolved) > ResolvedCacheSize {
front := lr.mu.recentResolved.Front()
delete(lr.mu.resolved, front.Value.(uint64))
lr.mu.recentResolved.Remove(front)
}
}
func (lr *LockResolver) getResolved(txnID uint64) (TxnStatus, bool) {
lr.mu.RLock()
defer lr.mu.RUnlock()
s, ok := lr.mu.resolved[txnID]
return s, ok
}
// BatchResolveLocks resolves locks in a batch.
func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, loc locate.RegionVerID) (bool, error) {
if len(locks) == 0 {
return true, nil
}
metrics.LockResolverCounter.WithLabelValues("batch_resolve").Inc()
var expiredLocks []*Lock
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
metrics.LockResolverCounter.WithLabelValues("expired").Inc()
expiredLocks = append(expiredLocks, l)
} else {
metrics.LockResolverCounter.WithLabelValues("not_expired").Inc()
}
}
if len(expiredLocks) != len(locks) {
log.Errorf("BatchResolveLocks: get %d Locks, but only %d are expired, maybe safe point is wrong!", len(locks), len(expiredLocks))
return false, nil
}
startTime := time.Now()
txnInfos := make(map[uint64]uint64)
for _, l := range expiredLocks {
if _, ok := txnInfos[l.TxnID]; ok {
continue
}
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
return false, errors.Trace(err)
}
txnInfos[l.TxnID] = uint64(status)
}
log.Infof("BatchResolveLocks: it took %v to lookup %v txn status", time.Since(startTime), len(txnInfos))
var listTxnInfos []*kvrpcpb.TxnInfo
for txnID, status := range txnInfos {
listTxnInfos = append(listTxnInfos, &kvrpcpb.TxnInfo{
Txn: txnID,
Status: status,
})
}
req := &rpc.Request{
Type: rpc.CmdResolveLock,
ResolveLock: &kvrpcpb.ResolveLockRequest{
TxnInfos: listTxnInfos,
},
}
startTime = time.Now()
resp, err := lr.store.SendReq(bo, req, loc, rpc.ReadTimeoutShort)
if err != nil {
return false, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return false, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return false, errors.Trace(err)
}
return false, nil
}
cmdResp := resp.ResolveLock
if cmdResp == nil {
return false, errors.Trace(rpc.ErrBodyMissing)
}
if keyErr := cmdResp.GetError(); keyErr != nil {
return false, errors.Errorf("unexpected resolve err: %s", keyErr)
}
log.Infof("BatchResolveLocks: it took %v to resolve %v locks in a batch.", time.Since(startTime), len(expiredLocks))
return true, nil
}
// ResolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
// are expired then all locks will be resolved, so the returned `ok` will be
// true; otherwise the caller should sleep a while before retrying.
// 2) For each lock, query the primary key to get the commit status of the
// txn which left the lock.
// 3) Send a `ResolveLock` cmd to the lock's region to resolve all locks that
// belong to the same transaction.
func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, locks []*Lock) (ok bool, err error) {
if len(locks) == 0 {
return true, nil
}
metrics.LockResolverCounter.WithLabelValues("resolve").Inc()
var expiredLocks []*Lock
for _, l := range locks {
if lr.store.GetOracle().IsExpired(l.TxnID, l.TTL) {
metrics.LockResolverCounter.WithLabelValues("expired").Inc()
expiredLocks = append(expiredLocks, l)
} else {
metrics.LockResolverCounter.WithLabelValues("not_expired").Inc()
}
}
if len(expiredLocks) == 0 {
return false, nil
}
// TxnID -> []Region, record resolved Regions.
// TODO: Maybe put it in LockResolver and share by all txns.
cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{})
for _, l := range expiredLocks {
status, err := lr.getTxnStatus(bo, l.TxnID, l.Primary)
if err != nil {
return false, errors.Trace(err)
}
cleanRegions := cleanTxns[l.TxnID]
if cleanRegions == nil {
cleanRegions = make(map[locate.RegionVerID]struct{})
cleanTxns[l.TxnID] = cleanRegions
}
err = lr.resolveLock(bo, l, status, cleanRegions)
if err != nil {
return false, errors.Trace(err)
}
}
return len(expiredLocks) == len(locks), nil
}
// GetTxnStatus queries tikv-server for a txn's status (commit/rollback).
// If the primary key is still locked, it will launch a Rollback to abort it.
// To avoid unnecessarily aborting too many txns, it is wiser to wait a few
// seconds before calling it after Prewrite.
func (lr *LockResolver) GetTxnStatus(txnID uint64, primary []byte) (TxnStatus, error) {
bo := retry.NewBackoffer(context.Background(), retry.CleanupMaxBackoff)
status, err := lr.getTxnStatus(bo, txnID, primary)
return status, errors.Trace(err)
}
func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary []byte) (TxnStatus, error) {
if s, ok := lr.getResolved(txnID); ok {
return s, nil
}
metrics.LockResolverCounter.WithLabelValues("query_txn_status").Inc()
var status TxnStatus
req := &rpc.Request{
Type: rpc.CmdCleanup,
Cleanup: &kvrpcpb.CleanupRequest{
Key: primary,
StartVersion: txnID,
},
}
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, primary)
if err != nil {
return status, errors.Trace(err)
}
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return status, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return status, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return status, errors.Trace(err)
}
continue
}
cmdResp := resp.Cleanup
if cmdResp == nil {
return status, errors.Trace(rpc.ErrBodyMissing)
}
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected cleanup err: %s, tid: %v", keyErr, txnID)
log.Error(err)
return status, err
}
if cmdResp.CommitVersion != 0 {
status = TxnStatus(cmdResp.GetCommitVersion())
metrics.LockResolverCounter.WithLabelValues("query_txn_status_committed").Inc()
} else {
metrics.LockResolverCounter.WithLabelValues("query_txn_status_rolled_back").Inc()
}
lr.saveResolved(txnID, status)
return status, nil
}
}
func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStatus, cleanRegions map[locate.RegionVerID]struct{}) error {
metrics.LockResolverCounter.WithLabelValues("query_resolve_locks").Inc()
for {
loc, err := lr.store.GetRegionCache().LocateKey(bo, l.Key)
if err != nil {
return errors.Trace(err)
}
if _, ok := cleanRegions[loc.Region]; ok {
return nil
}
req := &rpc.Request{
Type: rpc.CmdResolveLock,
ResolveLock: &kvrpcpb.ResolveLockRequest{
StartVersion: l.TxnID,
},
}
if status.IsCommitted() {
req.ResolveLock.CommitVersion = status.CommitTS()
}
resp, err := lr.store.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
cmdResp := resp.ResolveLock
if cmdResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
if keyErr := cmdResp.GetError(); keyErr != nil {
err = errors.Errorf("unexpected resolve err: %s, lock: %v", keyErr, l)
log.Error(err)
return err
}
cleanRegions[loc.Region] = struct{}{}
return nil
}
}
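For the binlog-style use case mentioned above, a hedged sketch of querying a transaction's final status through the exported LockResolver (the PD address, startTS and primary key below are placeholders):

package main

import (
	"fmt"

	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/txnkv/store"
)

func main() {
	lr, err := store.NewLockResolver([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}

	// Placeholder startTS and primary key; note GetTxnStatus rolls back the txn
	// if its primary key is still locked.
	status, err := lr.GetTxnStatus(400000000000000001, []byte("primary-key"))
	if err != nil {
		panic(err)
	}
	if status.IsCommitted() {
		fmt.Println("committed at", status.CommitTS())
	} else {
		fmt.Println("rolled back")
	}
}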

80 txnkv/store/pd_codec.go Normal file

@ -0,0 +1,80 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
"github.com/tikv/client-go/codec"
)
type codecPDClient struct {
pd.Client
}
// GetRegion encodes the key before sending the request to pd-server and decodes
// the returned StartKey and EndKey.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
encodedKey := codec.EncodeBytes(key)
region, peer, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
}
func (c *codecPDClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
encodedKey := codec.EncodeBytes(key)
region, peer, err := c.Client.GetPrevRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
}
// GetRegionByID gets the region from pd-server by region ID and decodes the
// returned StartKey and EndKey.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
region, peer, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, peer, err)
}
func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) {
if err != nil {
return nil, nil, errors.Trace(err)
}
if region == nil {
return nil, nil, nil
}
err = decodeRegionMetaKey(region)
if err != nil {
return nil, nil, errors.Trace(err)
}
return region, peer, nil
}
func decodeRegionMetaKey(r *metapb.Region) error {
if len(r.StartKey) != 0 {
_, decoded, err := codec.DecodeBytes(r.StartKey)
if err != nil {
return errors.Trace(err)
}
r.StartKey = decoded
}
if len(r.EndKey) != 0 {
_, decoded, err := codec.DecodeBytes(r.EndKey)
if err != nil {
return errors.Trace(err)
}
r.EndKey = decoded
}
return nil
}

159 txnkv/store/safepoint.go Normal file

@ -0,0 +1,159 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"crypto/tls"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// Safe point constants.
const (
// This is almost the same as 'tikv_gc_safe_point' in the table 'mysql.tidb'.
// We save it to PD instead of TiKV, because we cannot use the table interface
// if the safe point in TiDB has expired.
GcSavedSafePoint = "/tidb/store/gcworker/saved_safe_point"
GcSafePointCacheInterval = time.Second * 100
gcCPUTimeInaccuracyBound = time.Second
gcSafePointUpdateInterval = time.Second * 10
gcSafePointQuickRepeatInterval = time.Second
)
// SafePointKV is used for seamless integration of mock tests and runtime.
type SafePointKV interface {
Put(k string, v string) error
Get(k string) (string, error)
}
// MockSafePointKV implements SafePointKV for mock tests.
type MockSafePointKV struct {
store map[string]string
mockLock sync.RWMutex
}
// NewMockSafePointKV creates an instance of MockSafePointKV
func NewMockSafePointKV() *MockSafePointKV {
return &MockSafePointKV{
store: make(map[string]string),
}
}
// Put implements the Put method for SafePointKV
func (w *MockSafePointKV) Put(k string, v string) error {
w.mockLock.Lock()
defer w.mockLock.Unlock()
w.store[k] = v
return nil
}
// Get implements the Get method for SafePointKV
func (w *MockSafePointKV) Get(k string) (string, error) {
w.mockLock.RLock()
defer w.mockLock.RUnlock()
elem := w.store[k]
return elem, nil
}
// EtcdSafePointKV implements SafePointKV at runtime
type EtcdSafePointKV struct {
cli *clientv3.Client
}
// NewEtcdSafePointKV creates an instance of EtcdSafePointKV
func NewEtcdSafePointKV(addrs []string, tlsConfig *tls.Config) (*EtcdSafePointKV, error) {
etcdCli, err := createEtcdKV(addrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
return &EtcdSafePointKV{cli: etcdCli}, nil
}
func createEtcdKV(addrs []string, tlsConfig *tls.Config) (*clientv3.Client, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
},
TLS: tlsConfig,
})
if err != nil {
return nil, errors.Trace(err)
}
return cli, nil
}
// Put implements the Put method for SafePointKV
func (w *EtcdSafePointKV) Put(k string, v string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
_, err := w.cli.Put(ctx, k, v)
cancel()
if err != nil {
return errors.Trace(err)
}
return nil
}
// Get implements the Get method for SafePointKV
func (w *EtcdSafePointKV) Get(k string) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
resp, err := w.cli.Get(ctx, k)
cancel()
if err != nil {
return "", errors.Trace(err)
}
if len(resp.Kvs) > 0 {
return string(resp.Kvs[0].Value), nil
}
return "", nil
}
func saveSafePoint(kv SafePointKV, key string, t uint64) error {
s := strconv.FormatUint(t, 10)
err := kv.Put(GcSavedSafePoint, s)
if err != nil {
log.Error("save safepoint failed:", err)
return errors.Trace(err)
}
return nil
}
func loadSafePoint(kv SafePointKV, key string) (uint64, error) {
str, err := kv.Get(GcSavedSafePoint)
if err != nil {
return 0, errors.Trace(err)
}
if str == "" {
return 0, nil
}
t, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return 0, errors.Trace(err)
}
return t, nil
}

234 txnkv/store/scan.go Normal file

@ -0,0 +1,234 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bytes"
"context"
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
"github.com/tikv/client-go/txnkv/kv"
)
// Scanner supports tikv scan.
type Scanner struct {
snapshot *TiKVSnapshot
batchSize int
valid bool
cache []*pb.KvPair
idx int
nextStartKey []byte
endKey []byte
eof bool
}
func newScanner(snapshot *TiKVSnapshot, startKey []byte, endKey []byte, batchSize int) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
}
scanner := &Scanner{
snapshot: snapshot,
batchSize: batchSize,
valid: true,
nextStartKey: startKey,
endKey: endKey,
}
err := scanner.Next()
if kv.IsErrNotFound(err) {
return scanner, nil
}
return scanner, errors.Trace(err)
}
// Valid returns whether the scanner is still valid.
func (s *Scanner) Valid() bool {
return s.valid
}
// Key returns the current key.
func (s *Scanner) Key() key.Key {
if s.valid {
return s.cache[s.idx].Key
}
return nil
}
// Value returns the current value.
func (s *Scanner) Value() []byte {
if s.valid {
return s.cache[s.idx].Value
}
return nil
}
// Next advances the scanner to the next entry.
func (s *Scanner) Next() error {
bo := retry.NewBackoffer(context.Background(), retry.ScannerNextMaxBackoff)
if !s.valid {
return errors.New("scanner iterator is invalid")
}
for {
s.idx++
if s.idx >= len(s.cache) {
if s.eof {
s.Close()
return nil
}
err := s.getData(bo)
if err != nil {
s.Close()
return errors.Trace(err)
}
if s.idx >= len(s.cache) {
continue
}
}
current := s.cache[s.idx]
if len(s.endKey) > 0 && key.Key(current.Key).Cmp(key.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
// Try to resolve the lock
if current.GetError() != nil {
// 'current' would be modified if the lock being resolved
if err := s.resolveCurrentLock(bo, current); err != nil {
s.Close()
return errors.Trace(err)
}
// The check here does not violate the KeyOnly semantic, because current's value
// is filled by resolveCurrentLock which fetches the value by snapshot.get, so an empty
// value stands for NotExist
if len(current.Value) == 0 {
continue
}
}
return nil
}
}
// Close closes the iterator.
func (s *Scanner) Close() {
s.valid = false
}
func (s *Scanner) startTS() uint64 {
return s.snapshot.ts
}
func (s *Scanner) resolveCurrentLock(bo *retry.Backoffer, current *pb.KvPair) error {
val, err := s.snapshot.get(bo, key.Key(current.Key))
if err != nil {
return errors.Trace(err)
}
current.Error = nil
current.Value = val
return nil
}
func (s *Scanner) getData(bo *retry.Backoffer) error {
log.Debugf("txn getData nextStartKey[%q], txn %d", s.nextStartKey, s.startTS())
sender := rpc.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetRPCClient())
for {
loc, err := s.snapshot.store.regionCache.LocateKey(bo, s.nextStartKey)
if err != nil {
return errors.Trace(err)
}
reqEndKey := s.endKey
if len(reqEndKey) > 0 && len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, reqEndKey) < 0 {
reqEndKey = loc.EndKey
}
req := &rpc.Request{
Type: rpc.CmdScan,
Scan: &pb.ScanRequest{
StartKey: s.nextStartKey,
EndKey: reqEndKey,
Limit: uint32(s.batchSize),
Version: s.startTS(),
KeyOnly: s.snapshot.KeyOnly,
},
Context: pb.Context{
Priority: s.snapshot.Priority,
NotFillCache: s.snapshot.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
log.Debugf("scanner getData failed: %s", regionErr)
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
cmdScanResp := resp.Scan
if cmdScanResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
err = s.snapshot.store.CheckVisibility(s.startTS())
if err != nil {
return errors.Trace(err)
}
kvPairs := cmdScanResp.Pairs
// Check if the kvPair contains an error; if so, it should be a Lock.
for _, pair := range kvPairs {
if keyErr := pair.GetError(); keyErr != nil {
lock, err := extractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
}
pair.Key = lock.Key
}
}
s.cache, s.idx = kvPairs, 0
if len(kvPairs) < s.batchSize {
// No more data in current Region. Next getData() starts
// from current Region's endKey.
s.nextStartKey = loc.EndKey
if len(loc.EndKey) == 0 || (len(s.endKey) > 0 && key.Key(s.nextStartKey).Cmp(key.Key(s.endKey)) >= 0) {
// Current Region is the last one.
s.eof = true
}
return nil
}
// next getData() starts from the last key in kvPairs (but skip
// it by appending a '\x00' to the key). Note that next getData()
// may get an empty response if the Region in fact does not have
// more data.
lastKey := kvPairs[len(kvPairs)-1].GetKey()
s.nextStartKey = key.Key(lastKey).Next()
return nil
}
}

316 txnkv/store/snapshot.go Normal file

@ -0,0 +1,316 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"unsafe"
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
"github.com/tikv/client-go/txnkv/kv"
)
const (
scanBatchSize = 256
batchGetSize = 5120
)
// TiKVSnapshot supports read from TiKV.
type TiKVSnapshot struct {
store *TiKVStore
ts uint64
Priority pb.CommandPri
NotFillCache bool
SyncLog bool
KeyOnly bool
}
func newTiKVSnapshot(store *TiKVStore, ts uint64) *TiKVSnapshot {
return &TiKVSnapshot{
store: store,
ts: ts,
Priority: pb.CommandPri_Normal,
}
}
// BatchGet gets the values of all given keys from kv-server and returns a map that
// contains the key/value pairs. The map will not contain nonexistent keys.
func (s *TiKVSnapshot) BatchGet(keys []key.Key) (map[string][]byte, error) {
m := make(map[string][]byte)
if len(keys) == 0 {
return m, nil
}
metrics.TxnCmdCounter.WithLabelValues("batch_get").Inc()
start := time.Now()
defer func() { metrics.TxnCmdHistogram.WithLabelValues("batch_get").Observe(time.Since(start).Seconds()) }()
// We want [][]byte instead of []key.Key, use some magic to save memory.
bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys))
bo := retry.NewBackoffer(context.Background(), retry.BatchGetMaxBackoff)
// Create a map to collect key-values from region servers.
var mu sync.Mutex
err := s.batchGetKeysByRegions(bo, bytesKeys, func(k, v []byte) {
if len(v) == 0 {
return
}
mu.Lock()
m[string(k)] = v
mu.Unlock()
})
if err != nil {
return nil, errors.Trace(err)
}
err = s.store.CheckVisibility(s.ts)
if err != nil {
return nil, errors.Trace(err)
}
return m, nil
}
func (s *TiKVSnapshot) batchGetKeysByRegions(bo *retry.Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys)
if err != nil {
return errors.Trace(err)
}
metrics.TxnRegionsNumHistogram.WithLabelValues("snapshot").Observe(float64(len(groups)))
var batches []batchKeys
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, func([]byte) int { return 1 }, batchGetSize)
}
if len(batches) == 0 {
return nil
}
if len(batches) == 1 {
return errors.Trace(s.batchGetSingleRegion(bo, batches[0], collectF))
}
ch := make(chan error)
for _, batch1 := range batches {
batch := batch1
go func() {
backoffer, cancel := bo.Fork()
defer cancel()
ch <- s.batchGetSingleRegion(backoffer, batch, collectF)
}()
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("snapshot batchGet failed: %v, tid: %d", e, s.ts)
err = e
}
}
return errors.Trace(err)
}
func (s *TiKVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
sender := rpc.NewRegionRequestSender(s.store.GetRegionCache(), s.store.GetRPCClient())
pending := batch.keys
for {
req := &rpc.Request{
Type: rpc.CmdBatchGet,
BatchGet: &pb.BatchGetRequest{
Keys: pending,
Version: s.ts,
},
Context: pb.Context{
Priority: s.Priority,
NotFillCache: s.NotFillCache,
},
}
resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutMedium)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
err = s.batchGetKeysByRegions(bo, pending, collectF)
return errors.Trace(err)
}
batchGetResp := resp.BatchGet
if batchGetResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
var (
lockedKeys [][]byte
locks []*Lock
)
for _, pair := range batchGetResp.Pairs {
keyErr := pair.GetError()
if keyErr == nil {
collectF(pair.GetKey(), pair.GetValue())
continue
}
lock, err := extractLockFromKeyErr(keyErr)
if err != nil {
return errors.Trace(err)
}
lockedKeys = append(lockedKeys, lock.Key)
locks = append(locks, lock)
}
if len(lockedKeys) > 0 {
ok, err := s.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
if !ok {
err = bo.Backoff(retry.BoTxnLockFast, errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys)))
if err != nil {
return errors.Trace(err)
}
}
pending = lockedKeys
continue
}
return nil
}
}
// Get gets the value for key k from snapshot.
func (s *TiKVSnapshot) Get(k key.Key) ([]byte, error) {
val, err := s.get(retry.NewBackoffer(context.Background(), retry.GetMaxBackoff), k)
if err != nil {
return nil, errors.Trace(err)
}
if len(val) == 0 {
return nil, kv.ErrNotExist
}
return val, nil
}
func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
sender := rpc.NewRegionRequestSender(s.store.GetRegionCache(), s.store.GetRPCClient())
req := &rpc.Request{
Type: rpc.CmdGet,
Get: &pb.GetRequest{
Key: k,
Version: s.ts,
},
Context: pb.Context{
Priority: s.Priority,
NotFillCache: s.NotFillCache,
},
}
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, errors.Trace(err)
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
cmdGetResp := resp.Get
if cmdGetResp == nil {
return nil, errors.Trace(rpc.ErrBodyMissing)
}
val := cmdGetResp.GetValue()
if keyErr := cmdGetResp.GetError(); keyErr != nil {
lock, err := extractLockFromKeyErr(keyErr)
if err != nil {
return nil, errors.Trace(err)
}
ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
if err != nil {
return nil, errors.Trace(err)
}
if !ok {
err = bo.Backoff(retry.BoTxnLockFast, errors.New(keyErr.String()))
if err != nil {
return nil, errors.Trace(err)
}
}
continue
}
return val, nil
}
}
// Iter returns an iterator over key-value pairs after `k`.
func (s *TiKVSnapshot) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize)
return scanner, errors.Trace(err)
}
// IterReverse creates a reversed Iterator positioned on the first entry whose key is less than k.
func (s *TiKVSnapshot) IterReverse(k key.Key) (kv.Iterator, error) {
return nil, ErrNotImplemented
}
// SetPriority sets the priority of read requests.
func (s *TiKVSnapshot) SetPriority(priority int) {
s.Priority = pb.CommandPri(priority)
}
func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
if locked := keyErr.GetLocked(); locked != nil {
return NewLock(locked), nil
}
if keyErr.Conflict != nil {
err := errors.New(conflictToString(keyErr.Conflict))
return nil, errors.Annotate(err, TxnRetryableMark)
}
if keyErr.Retryable != "" {
err := errors.Errorf("tikv restarts txn: %s", keyErr.GetRetryable())
log.Debug(err)
return nil, errors.Annotate(err, TxnRetryableMark)
}
if keyErr.Abort != "" {
err := errors.Errorf("tikv aborts txn: %s", keyErr.GetAbort())
log.Warn(err)
return nil, errors.Trace(err)
}
return nil, errors.Errorf("unexpected KeyError: %s", keyErr.String())
}
func conflictToString(conflict *pb.WriteConflict) string {
var buf bytes.Buffer
fmt.Fprintf(&buf, "WriteConflict: startTS=%d, conflictTS=%d, key=%q, primary=%q", conflict.StartTs, conflict.ConflictTs, conflict.Key, conflict.Primary)
return buf.String()
}
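A hedged sketch of reading through a snapshot directly (rather than via a Transaction), assuming a local PD as in the examples; it fetches a read timestamp, then batch-reads two keys:

package main

import (
	"context"
	"fmt"

	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/key"
	"github.com/tikv/client-go/retry"
	"github.com/tikv/client-go/txnkv/store"
)

func main() {
	s, err := store.NewStore([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// Get a read timestamp from PD and open a snapshot at that version.
	ts, err := s.GetTimestampWithRetry(retry.NewBackoffer(context.Background(), retry.TsoMaxBackoff))
	if err != nil {
		panic(err)
	}
	snap := s.GetSnapshot(ts)

	// BatchGet skips nonexistent keys, so the map may have fewer entries than requested.
	vals, err := snap.BatchGet([]key.Key{key.Key("key1"), key.Key("key2")})
	if err != nil {
		panic(err)
	}
	for k, v := range vals {
		fmt.Printf("%s => %s\n", k, v)
	}
}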


@ -0,0 +1,68 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bytes"
"context"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
)
// SplitRegion splits the region that contains splitKey into two regions: [start,
// splitKey) and [splitKey, end).
func SplitRegion(store *TiKVStore, splitKey key.Key) error {
log.Infof("start split_region at %q", splitKey)
bo := retry.NewBackoffer(context.Background(), retry.SplitRegionBackoff)
sender := rpc.NewRegionRequestSender(store.GetRegionCache(), store.GetRPCClient())
req := &rpc.Request{
Type: rpc.CmdSplitRegion,
SplitRegion: &kvrpcpb.SplitRegionRequest{
SplitKey: splitKey,
},
}
req.Context.Priority = kvrpcpb.CommandPri_Normal
for {
loc, err := store.GetRegionCache().LocateKey(bo, splitKey)
if err != nil {
return errors.Trace(err)
}
if bytes.Equal(splitKey, loc.StartKey) {
log.Infof("skip split_region region at %q", splitKey)
return nil
}
res, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := res.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
continue
}
log.Infof("split_region at %q complete, new regions: %v, %v", splitKey, res.SplitRegion.GetLeft(), res.SplitRegion.GetRight())
return nil
}
}
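A hedged usage sketch of SplitRegion, again assuming a store created against a local PD:

package main

import (
	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/key"
	"github.com/tikv/client-go/txnkv/store"
)

func main() {
	s, err := store.NewStore([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}
	defer s.Close()

	// Split the region containing "m" into [start, "m") and ["m", end).
	if err := store.SplitRegion(s, key.Key("m")); err != nil {
		panic(err)
	}
}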

223 txnkv/store/store.go Normal file

@ -0,0 +1,223 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"
"github.com/pingcap/errors"
pd "github.com/pingcap/pd/client"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
"github.com/tikv/client-go/txnkv/latch"
"github.com/tikv/client-go/txnkv/oracle"
"github.com/tikv/client-go/txnkv/oracle/oracles"
)
// update oracle's lastTS every 2000ms.
var oracleUpdateInterval = 2000
// TiKVStore contains methods to interact with a TiKV cluster.
type TiKVStore struct {
clusterID uint64
uuid string
oracle oracle.Oracle
client rpc.Client
pdClient pd.Client
regionCache *locate.RegionCache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
etcdAddrs []string
tlsConfig *tls.Config
spkv SafePointKV
safePoint uint64
spTime time.Time
spMutex sync.RWMutex // protects updates to safePoint and spTime
closed chan struct{} // used to notify when the store is closed
}
// NewStore creates a TiKVStore instance.
func NewStore(pdAddrs []string, security config.Security) (*TiKVStore, error) {
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
CAPath: security.SSLCA,
CertPath: security.SSLCert,
KeyPath: security.SSLKey,
})
if err != nil {
return nil, errors.Trace(err)
}
oracle, err := oracles.NewPdOracle(pdCli, time.Duration(oracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
}
tlsConfig, err := security.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
spkv, err := NewEtcdSafePointKV(pdAddrs, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
clusterID := pdCli.GetClusterID(context.TODO())
store := &TiKVStore{
clusterID: clusterID,
uuid: fmt.Sprintf("tikv-%d", clusterID),
oracle: oracle,
client: rpc.NewRPCClient(security),
pdClient: &codecPDClient{pdCli},
regionCache: locate.NewRegionCache(pdCli),
etcdAddrs: pdAddrs,
tlsConfig: tlsConfig,
spkv: spkv,
spTime: time.Now(),
closed: make(chan struct{}),
}
store.lockResolver = newLockResolver(store)
if config.EnableTxnLocalLatch {
store.txnLatches = latch.NewScheduler(config.TxnLocalLatchCapacity)
}
go store.runSafePointChecker()
return store, nil
}
// GetLockResolver returns the lock resolver instance.
func (s *TiKVStore) GetLockResolver() *LockResolver {
return s.lockResolver
}
// GetOracle returns the oracle instance.
func (s *TiKVStore) GetOracle() oracle.Oracle {
return s.oracle
}
// GetRegionCache returns the region cache instance.
func (s *TiKVStore) GetRegionCache() *locate.RegionCache {
return s.regionCache
}
// GetRPCClient returns the rpc client instance.
func (s *TiKVStore) GetRPCClient() rpc.Client {
return s.client
}
// GetTxnLatches returns the latch scheduler instance.
func (s *TiKVStore) GetTxnLatches() *latch.LatchesScheduler {
return s.txnLatches
}
// GetSnapshot creates a snapshot for read.
func (s *TiKVStore) GetSnapshot(ts uint64) *TiKVSnapshot {
return newTiKVSnapshot(s, ts)
}
// SendReq sends a request to TiKV server.
func (s *TiKVStore) SendReq(bo *retry.Backoffer, req *rpc.Request, regionID locate.RegionVerID, timeout time.Duration) (*rpc.Response, error) {
sender := rpc.NewRegionRequestSender(s.regionCache, s.client)
return sender.SendReq(bo, req, regionID, timeout)
}
// Closed returns a channel that will be closed when TiKVStore is closed.
func (s *TiKVStore) Closed() <-chan struct{} {
return s.closed
}
// Close stops the TiKVStore instance and releases resources.
func (s *TiKVStore) Close() error {
s.oracle.Close()
s.pdClient.Close()
close(s.closed)
if err := s.client.Close(); err != nil {
return errors.Trace(err)
}
if s.txnLatches != nil {
s.txnLatches.Close()
}
return nil
}
// GetTimestampWithRetry queries PD for a new timestamp.
func (s *TiKVStore) GetTimestampWithRetry(bo *retry.Backoffer) (uint64, error) {
for {
startTS, err := s.oracle.GetTimestamp(bo.GetContext())
if err == nil {
return startTS, nil
}
err = bo.Backoff(retry.BoPDRPC, errors.Errorf("get timestamp failed: %v", err))
if err != nil {
return 0, errors.Trace(err)
}
}
}
func (s *TiKVStore) runSafePointChecker() {
d := gcSafePointUpdateInterval
for {
select {
case spCachedTime := <-time.After(d):
cachedSafePoint, err := loadSafePoint(s.spkv, GcSavedSafePoint)
if err == nil {
metrics.LoadSafepointCounter.WithLabelValues("ok").Inc()
s.spMutex.Lock()
s.safePoint, s.spTime = cachedSafePoint, spCachedTime
s.spMutex.Unlock()
d = gcSafePointUpdateInterval
} else {
metrics.LoadSafepointCounter.WithLabelValues("fail").Inc()
log.Errorf("fail to load safepoint from pd: %v", err)
d = gcSafePointQuickRepeatInterval
}
case <-s.Closed():
return
}
}
}
// CheckVisibility checks if it is safe to read using startTS (the startTS should
// be greater than the current GC safepoint).
func (s *TiKVStore) CheckVisibility(startTS uint64) error {
s.spMutex.RLock()
cachedSafePoint := s.safePoint
cachedTime := s.spTime
s.spMutex.RUnlock()
diff := time.Since(cachedTime)
if diff > (GcSafePointCacheInterval - gcCPUTimeInaccuracyBound) {
return errors.Trace(ErrPDServerTimeout)
}
if startTS < cachedSafePoint {
return errors.Trace(ErrStartTSFallBehind)
}
return nil
}
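
A minimal sketch of constructing a TiKVStore, fetching a start timestamp, and checking its visibility against the GC safepoint. It assumes PD is reachable at 127.0.0.1:2379 with an empty security config; the helper name is illustrative, not part of this commit:

// newStoreAndCheck shows the typical read-path preamble: get a startTS from
// the oracle (with retry) and make sure it is still ahead of the GC safepoint.
func newStoreAndCheck() error {
	s, err := NewStore([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		return errors.Trace(err)
	}
	defer s.Close()
	bo := retry.NewBackoffer(context.Background(), retry.TsoMaxBackoff)
	startTS, err := s.GetTimestampWithRetry(bo)
	if err != nil {
		return errors.Trace(err)
	}
	return errors.Trace(s.CheckVisibility(startTS))
}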

View File

@ -0,0 +1,595 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bytes"
"context"
"math"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
"github.com/tikv/client-go/txnkv/kv"
)
type commitAction int
const (
actionPrewrite commitAction = 1
actionCommit commitAction = 2
actionCleanup commitAction = 3
)
func (ca commitAction) String() string {
switch ca {
case actionPrewrite:
return "prewrite"
case actionCommit:
return "commit"
case actionCleanup:
return "cleanup"
}
return "unknown"
}
// MetricsTag returns detail tag for metrics.
func (ca commitAction) MetricsTag() string {
return "2pc_" + ca.String()
}
// TxnCommitter executes a two-phase commit protocol.
type TxnCommitter struct {
Priority pb.CommandPri
SyncLog bool
ConnID uint64 // ConnID is used for log.
store *TiKVStore
startTS uint64
keys [][]byte
mutations map[string]*pb.Mutation
lockTTL uint64
commitTS uint64
mu struct {
sync.RWMutex
committed bool
undeterminedErr error // undeterminedErr saves the rpc error we encounter when committing the primary key.
}
cleanWg sync.WaitGroup
// maxTxnTimeUse represents the max time (in ms) a txn may use from its startTS to its commitTS.
// We use it to guarantee that the GC worker will not influence any active txn. The value
// should be less than the GC life time.
maxTxnTimeUse uint64
detail CommitDetails
}
// NewTxnCommitter creates a TxnCommitter.
func NewTxnCommitter(store *TiKVStore, startTS uint64, startTime time.Time, mutations map[string]*pb.Mutation) (*TxnCommitter, error) {
var (
keys [][]byte
size int
putCnt int
delCnt int
lockCnt int
)
for key, mut := range mutations {
switch mut.Op {
case pb.Op_Put, pb.Op_Insert:
putCnt++
case pb.Op_Del:
delCnt++
case pb.Op_Lock:
lockCnt++
}
keys = append(keys, []byte(key))
entrySize := len(mut.Key) + len(mut.Value)
if entrySize > config.TxnEntrySizeLimit {
return nil, kv.ErrEntryTooLarge
}
size += entrySize
}
if putCnt == 0 && delCnt == 0 {
return nil, nil
}
if len(keys) > int(config.TxnEntryCountLimit) || size > config.TxnTotalSizeLimit {
return nil, kv.ErrTxnTooLarge
}
// Convert from sec to ms
maxTxnTimeUse := uint64(config.MaxTxnTimeUse) * 1000
metrics.TxnWriteKVCountHistogram.Observe(float64(len(keys)))
metrics.TxnWriteSizeHistogram.Observe(float64(size))
return &TxnCommitter{
store: store,
startTS: startTS,
keys: keys,
mutations: mutations,
lockTTL: txnLockTTL(startTime, size),
maxTxnTimeUse: maxTxnTimeUse,
detail: CommitDetails{WriteSize: size, WriteKeys: len(keys)},
}, nil
}
func (c *TxnCommitter) primary() []byte {
return c.keys[0]
}
const bytesPerMiB = 1024 * 1024
func txnLockTTL(startTime time.Time, txnSize int) uint64 {
// Increase lockTTL for large transactions.
// The formula is `ttl = ttlFactor * sqrt(sizeInMiB)`.
// When writeSize is less than 256KB, the base ttl is defaultTTL (3s);
// when writeSize is 1MiB, 100MiB, or 400MiB, ttl is 6s, 60s, or 120s respectively.
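// Worked example (sketch, based on the figures above): with the 6s-at-1MiB base,
// a 100MiB transaction gets ttl = 6s * sqrt(100) = 60s, before the elapsed read
// time is added below.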
lockTTL := defaultLockTTL
if txnSize >= txnCommitBatchSize {
sizeMiB := float64(txnSize) / bytesPerMiB
lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB))
if lockTTL < defaultLockTTL {
lockTTL = defaultLockTTL
}
if lockTTL > maxLockTTL {
lockTTL = maxLockTTL
}
}
// Increase lockTTL by the transaction's read time.
// When resolving a lock, we compare the current ts and startTS+lockTTL to decide whether to clean up. If a txn
// takes a long time to read, increasing its TTL helps to prevent it from being aborted soon after prewrite.
elapsed := time.Since(startTime) / time.Millisecond
return lockTTL + uint64(elapsed)
}
// doActionOnKeys groups keys into a primary batch and secondary batches. If the primary key is among the keys,
// it does the action on the primary batch first, then on the secondary batches. If the action is commit, the
// secondary batches are done in a background goroutine.
func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, keys [][]byte) error {
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.GetRegionCache().GroupKeysByRegion(bo, keys)
if err != nil {
return errors.Trace(err)
}
metrics.TxnRegionsNumHistogram.WithLabelValues(action.MetricsTag()).Observe(float64(len(groups)))
var batches []batchKeys
var sizeFunc = c.keySize
if action == actionPrewrite {
sizeFunc = c.keyValueSize
atomic.AddInt32(&c.detail.PrewriteRegionNum, int32(len(groups)))
}
// Make sure the group that contains primary key goes first.
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, txnCommitBatchSize)
delete(groups, firstRegion)
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, sizeFunc, txnCommitBatchSize)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
// the primary batch should be committed/cleaned up first
err = c.doActionOnBatches(bo, action, batches[:1])
if err != nil {
return errors.Trace(err)
}
batches = batches[1:]
}
if action == actionCommit {
// Commit secondary batches in a background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// a potential data race in unit tests, since `CommitMaxBackoff` will be
// updated by test suites.
secondaryBo := retry.NewBackoffer(context.Background(), retry.CommitMaxBackoff)
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.ConnID, action, e)
metrics.SecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return errors.Trace(err)
}
// doActionOnBatches does action to batches in parallel.
func (c *TxnCommitter) doActionOnBatches(bo *retry.Backoffer, action commitAction, batches []batchKeys) error {
if len(batches) == 0 {
return nil
}
var singleBatchActionFunc func(bo *retry.Backoffer, batch batchKeys) error
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
singleBatchActionFunc = c.cleanupSingleBatch
}
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
}
return errors.Trace(e)
}
// For prewrite, stop sending other requests after receiving first error.
backoffer := bo
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}
// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch1 := range batches {
batch := batch1
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() when the foreground goroutine exits.
// Otherwise the background goroutines would be canceled unexpectedly.
// Here we make a new clone of the original backoffer for this goroutine
// exclusively, to avoid a data race when the same backoffer is used
// in concurrent goroutines.
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}()
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
// Cancel other requests and return the first error.
if cancel != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.ConnID, action, c.startTS)
cancel()
}
if err == nil {
err = e
}
}
}
return errors.Trace(err)
}
func (c *TxnCommitter) keyValueSize(key []byte) int {
size := len(key)
if mutation := c.mutations[string(key)]; mutation != nil {
size += len(mutation.Value)
}
return size
}
func (c *TxnCommitter) keySize(key []byte) int {
return len(key)
}
func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
}
req := &rpc.Request{
Type: rpc.CmdPrewrite,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
},
Context: pb.Context{
Priority: c.Priority,
SyncLog: c.SyncLog,
},
}
for {
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
err = c.prewriteKeys(bo, batch.keys)
return errors.Trace(err)
}
prewriteResp := resp.Prewrite
if prewriteResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
return nil
}
var locks []*Lock
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
return errors.Trace(ErrKeyAlreadyExist(alreadyExist.GetKey()))
}
// Extract lock from key error
lock, err1 := extractLockFromKeyErr(keyErr)
if err1 != nil {
return errors.Trace(err1)
}
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.ConnID, lock)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.GetLockResolver().ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(retry.BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
}
}
}
func (c *TxnCommitter) setUndeterminedErr(err error) {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.undeterminedErr = err
}
func (c *TxnCommitter) getUndeterminedErr() error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.mu.undeterminedErr
}
func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
req := &rpc.Request{
Type: rpc.CmdCommit,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
},
Context: pb.Context{
Priority: c.Priority,
SyncLog: c.SyncLog,
},
}
req.Context.Priority = c.Priority
sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient())
resp, err := sender.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
// If we fail to receive a response for the request that commits the primary key, it is undetermined whether this
// transaction has been successfully committed.
// Under this circumstance, we cannot declare the commit complete (which may lead to data loss), nor can we throw
// an error (which may lead to a duplicated-key error when the upper level restarts the transaction). Currently the
// best solution is to record this error and let the upper layer drop the connection to the corresponding mysql client.
isPrimary := bytes.Equal(batch.keys[0], c.primary())
if isPrimary && sender.RPCError() != nil {
c.setUndeterminedErr(errors.Trace(sender.RPCError()))
}
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
// re-split keys and commit again.
err = c.commitKeys(bo, batch.keys)
return errors.Trace(err)
}
commitResp := resp.Commit
if commitResp == nil {
return errors.Trace(rpc.ErrBodyMissing)
}
// Here we can be sure that TiKV has processed the commit-primary-key request,
// so we can clear the undetermined error.
if isPrimary {
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("con:%d 2PC commit failed: %v", c.ConnID, keyErr.String())
if c.mu.committed {
// No secondary key can be rolled back after its primary key is committed.
// There must be a serious bug somewhere.
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
return errors.Trace(err)
}
// The transaction may be rolled back by concurrent transactions.
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
return errors.Annotate(err, TxnRetryableMark)
}
c.mu.Lock()
defer c.mu.Unlock()
// The group that contains the primary key is always the first.
// We mark the transaction committed when we receive the first successful response.
c.mu.committed = true
return nil
}
func (c *TxnCommitter) cleanupSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
req := &rpc.Request{
Type: rpc.CmdBatchRollback,
BatchRollback: &pb.BatchRollbackRequest{
Keys: batch.keys,
StartVersion: c.startTS,
},
Context: pb.Context{
Priority: c.Priority,
SyncLog: c.SyncLog,
},
}
resp, err := c.store.SendReq(bo, req, batch.region, rpc.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
err = c.cleanupKeys(bo, batch.keys)
return errors.Trace(err)
}
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.ConnID, keyErr)
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
return errors.Trace(err)
}
return nil
}
func (c *TxnCommitter) prewriteKeys(bo *retry.Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPrewrite, keys)
}
func (c *TxnCommitter) commitKeys(bo *retry.Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCommit, keys)
}
func (c *TxnCommitter) cleanupKeys(bo *retry.Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCleanup, keys)
}
// Execute executes the two-phase commit protocol.
func (c *TxnCommitter) Execute(ctx context.Context) error {
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
committed := c.mu.committed
undetermined := c.mu.undeterminedErr != nil
c.mu.RUnlock()
if !committed && !undetermined {
c.cleanWg.Add(1)
go func() {
err := c.cleanupKeys(retry.NewBackoffer(context.Background(), retry.CleanupMaxBackoff), c.keys)
if err != nil {
metrics.SecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.ConnID, err, c.startTS)
} else {
log.Infof("con:%d 2PC clean up done, tid: %d", c.ConnID, c.startTS)
}
c.cleanWg.Done()
}()
}
}()
prewriteBo := retry.NewBackoffer(ctx, retry.PrewriteMaxBackoff)
start := time.Now()
err := c.prewriteKeys(prewriteBo, c.keys)
c.detail.PrewriteTime = time.Since(start)
c.detail.TotalBackoffTime += prewriteBo.TotalSleep()
if err != nil {
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.ConnID, err, c.startTS)
return errors.Trace(err)
}
start = time.Now()
commitTS, err := c.store.GetTimestampWithRetry(retry.NewBackoffer(ctx, retry.TsoMaxBackoff))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.ConnID, err, c.startTS)
return errors.Trace(err)
}
c.detail.GetCommitTsTime = time.Since(start)
// check commitTS
if commitTS <= c.startTS {
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
c.ConnID, c.startTS, commitTS)
log.Error(err)
return errors.Trace(err)
}
c.commitTS = commitTS
if c.store.GetOracle().IsExpired(c.startTS, c.maxTxnTimeUse) {
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.ConnID, c.startTS, c.commitTS)
return errors.Annotate(err, TxnRetryableMark)
}
start = time.Now()
commitBo := retry.NewBackoffer(ctx, retry.CommitMaxBackoff)
err = c.commitKeys(commitBo, c.keys)
c.detail.CommitTime = time.Since(start)
c.detail.TotalBackoffTime += commitBo.TotalSleep()
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.ConnID, err, undeterminedErr, c.startTS)
log.Error(err)
err = errors.Trace(ErrResultUndetermined)
}
if !c.mu.committed {
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.ConnID, err, c.startTS)
return errors.Trace(err)
}
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.ConnID, err, c.startTS)
}
return nil
}
// GetKeys returns all keys of the committer.
func (c *TxnCommitter) GetKeys() [][]byte {
return c.keys
}
// GetCommitTS returns the commit timestamp of the transaction.
func (c *TxnCommitter) GetCommitTS() uint64 {
return c.commitTS
}
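
A minimal sketch of driving the committer directly from inside the store package, assuming the mutations map has already been built (as Transaction.Commit in txnkv/txn.go does below); the helper name commitOnce is illustrative, not part of this commit:

// commitOnce runs a single two-phase commit for a prepared set of mutations.
// NewTxnCommitter returns (nil, nil) when there is nothing to write, so the
// nil-committer case is treated as a successful no-op.
func commitOnce(s *TiKVStore, startTS uint64, startTime time.Time, muts map[string]*pb.Mutation) error {
	committer, err := NewTxnCommitter(s, startTS, startTime, muts)
	if err != nil || committer == nil {
		return errors.Trace(err)
	}
	return errors.Trace(committer.Execute(context.Background()))
}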

278
txnkv/txn.go Normal file
View File

@ -0,0 +1,278 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package txnkv
import (
"context"
"fmt"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/prometheus/common/log"
"github.com/tikv/client-go/key"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/txnkv/kv"
"github.com/tikv/client-go/txnkv/store"
)
// Transaction is a TiKV transaction: reads go through a snapshot taken at
// startTS plus a local memory buffer, and writes are buffered locally until
// Commit runs the two-phase commit protocol.
type Transaction struct {
tikvStore *store.TiKVStore
snapshot *store.TiKVSnapshot
us kv.UnionStore
startTS uint64
startTime time.Time // Monotonic timestamp for measuring how long the txn takes.
commitTS uint64
valid bool
lockKeys [][]byte
setCnt int64
}
func newTransaction(tikvStore *store.TiKVStore, ts uint64) *Transaction {
snapshot := tikvStore.GetSnapshot(ts)
us := kv.NewUnionStore(snapshot)
return &Transaction{
tikvStore: tikvStore,
snapshot: snapshot,
us: us,
startTS: ts,
startTime: time.Now(),
valid: true,
}
}
// Get implements transaction interface.
func (txn *Transaction) Get(k key.Key) ([]byte, error) {
metrics.TxnCmdCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { metrics.TxnCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
ret, err := txn.us.Get(k)
if kv.IsErrNotFound(err) {
return nil, err
}
if err != nil {
return nil, errors.Trace(err)
}
err = txn.tikvStore.CheckVisibility(txn.startTS)
if err != nil {
return nil, errors.Trace(err)
}
return ret, nil
}
// BatchGet reads the given keys, merging values buffered in this transaction
// with those read from the underlying snapshot.
func (txn *Transaction) BatchGet(keys []key.Key) (map[string][]byte, error) {
if txn.IsReadOnly() {
return txn.snapshot.BatchGet(keys)
}
bufferValues := make([][]byte, len(keys))
shrinkKeys := make([]key.Key, 0, len(keys))
for i, key := range keys {
val, err := txn.us.GetMemBuffer().Get(key)
if kv.IsErrNotFound(err) {
shrinkKeys = append(shrinkKeys, key)
continue
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) != 0 {
bufferValues[i] = val
}
}
storageValues, err := txn.snapshot.BatchGet(shrinkKeys)
if err != nil {
return nil, errors.Trace(err)
}
for i, key := range keys {
if bufferValues[i] == nil {
continue
}
storageValues[string(key)] = bufferValues[i]
}
return storageValues, nil
}
// Set buffers a put of k => v in the transaction's memory buffer; nothing is
// sent to TiKV until Commit.
func (txn *Transaction) Set(k key.Key, v []byte) error {
txn.setCnt++
return txn.us.Set(k, v)
}
func (txn *Transaction) String() string {
return fmt.Sprintf("txn-%d", txn.startTS)
}
func (txn *Transaction) Iter(k key.Key, upperBound key.Key) (kv.Iterator, error) {
metrics.TxnCmdCounter.WithLabelValues("seek").Inc()
start := time.Now()
defer func() { metrics.TxnCmdHistogram.WithLabelValues("seek").Observe(time.Since(start).Seconds()) }()
return txn.us.Iter(k, upperBound)
}
// IterReverse creates a reversed Iterator positioned on the first entry whose key is less than k.
func (txn *Transaction) IterReverse(k key.Key) (kv.Iterator, error) {
metrics.TxnCmdCounter.WithLabelValues("seek_reverse").Inc()
start := time.Now()
defer func() {
metrics.TxnCmdHistogram.WithLabelValues("seek_reverse").Observe(time.Since(start).Seconds())
}()
return txn.us.IterReverse(k)
}
func (txn *Transaction) IsReadOnly() bool {
return txn.us.GetMemBuffer().Len() == 0 && len(txn.lockKeys) == 0
}
func (txn *Transaction) Delete(k key.Key) error {
metrics.TxnCmdCounter.WithLabelValues("delete").Inc()
return txn.us.Delete(k)
}
func (txn *Transaction) SetOption(opt kv.Option, val interface{}) {
txn.us.SetOption(opt, val)
switch opt {
case kv.Priority:
txn.snapshot.SetPriority(val.(int))
case kv.NotFillCache:
txn.snapshot.NotFillCache = val.(bool)
case kv.SyncLog:
txn.snapshot.SyncLog = val.(bool)
case kv.KeyOnly:
txn.snapshot.KeyOnly = val.(bool)
}
}
func (txn *Transaction) DelOption(opt kv.Option) {
txn.us.DelOption(opt)
}
func (txn *Transaction) close() {
txn.valid = false
}
// Commit commits the transaction's buffered mutations to TiKV using two-phase
// commit, going through the local latch scheduler when it is enabled.
func (txn *Transaction) Commit(ctx context.Context) error {
if !txn.valid {
return kv.ErrInvalidTxn
}
defer txn.close()
// gofail: var mockCommitError bool
// if mockCommitError && kv.IsMockCommitErrorEnable() {
// kv.MockCommitErrorDisable()
// return errors.New("mock commit error")
// }
metrics.TxnCmdCounter.WithLabelValues("set").Add(float64(txn.setCnt))
metrics.TxnCmdCounter.WithLabelValues("commit").Inc()
start := time.Now()
defer func() { metrics.TxnCmdHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds()) }()
mutations := make(map[string]*kvrpcpb.Mutation)
err := txn.us.WalkBuffer(func(k key.Key, v []byte) error {
op := kvrpcpb.Op_Put
if c := txn.us.LookupConditionPair(k); c != nil && c.ShouldNotExist() {
op = kvrpcpb.Op_Insert
}
mutations[string(k)] = &kvrpcpb.Mutation{
Op: op,
Key: k,
Value: v,
}
return nil
})
if err != nil {
return errors.Trace(err)
}
for _, lockKey := range txn.lockKeys {
if _, ok := mutations[string(lockKey)]; !ok {
mutations[string(lockKey)] = &kvrpcpb.Mutation{
Op: kvrpcpb.Op_Lock,
Key: lockKey,
}
}
}
if len(mutations) == 0 {
return nil
}
committer, err := store.NewTxnCommitter(txn.tikvStore, txn.startTS, txn.startTime, mutations)
if err != nil || committer == nil {
return errors.Trace(err)
}
// latches disabled
if txn.tikvStore.GetTxnLatches() == nil {
err = committer.Execute(ctx)
log.Debug("[kv]", txn.startTS, " txnLatches disabled, 2pc directly:", err)
return errors.Trace(err)
}
// latches enabled: for transactions that need to acquire latches
start = time.Now()
lock := txn.tikvStore.GetTxnLatches().Lock(txn.startTS, committer.GetKeys())
localLatchTime := time.Since(start)
if localLatchTime > 0 {
metrics.LocalLatchWaitTimeHistogram.Observe(localLatchTime.Seconds())
}
defer txn.tikvStore.GetTxnLatches().UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
return errors.Annotate(err, store.TxnRetryableMark)
}
err = committer.Execute(ctx)
if err == nil {
lock.SetCommitTS(committer.GetCommitTS())
}
log.Debug("[kv]", txn.startTS, " txnLatches enabled while txn retryable:", err)
return errors.Trace(err)
}
// Rollback discards the transaction's buffered writes and marks it invalid.
func (txn *Transaction) Rollback() error {
if !txn.valid {
return kv.ErrInvalidTxn
}
txn.close()
log.Debugf("[kv] Rollback txn %d", txn.startTS)
metrics.TxnCmdCounter.WithLabelValues("rollback").Inc()
return nil
}
// LockKeys records keys that will be locked (Op_Lock) at commit time.
func (txn *Transaction) LockKeys(keys ...key.Key) error {
metrics.TxnCmdCounter.WithLabelValues("lock_keys").Inc()
for _, key := range keys {
txn.lockKeys = append(txn.lockKeys, key)
}
return nil
}
func (txn *Transaction) Valid() bool {
return txn.valid
}
func (txn *Transaction) Len() int {
return txn.us.Len()
}
func (txn *Transaction) Size() int {
return txn.us.Size()
}
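
For workloads with many conflicting transactions, Commit above can go through the local latch scheduler instead of issuing 2PC directly. A hedged sketch of enabling it on the client side, assuming the client-go config and txnkv packages are imported and that NewTxnClient builds its TiKVStore via NewStore, which reads these flags once at construction time:

// newLatchedClient is an illustrative helper, not part of this commit.
func newLatchedClient(pdAddrs []string) (*txnkv.TxnClient, error) {
	// Enable local latches before the store is created; NewStore consults
	// these settings exactly once.
	config.EnableTxnLocalLatch = true
	config.TxnLocalLatchCapacity = 2048000
	return txnkv.NewTxnClient(pdAddrs, config.Security{})
}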