add mockstore and rawkv tests (#2)

* add mockstore and rawkv tests

Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2019-03-08 16:21:50 +08:00 committed by GitHub
parent 3c11273160
commit af8a9cd0b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 4158 additions and 146 deletions

View File

@ -1,2 +1,5 @@
default:
GO111MODULE=on go build ./...
GO111MODULE=on go build ./...
test:
GO111MODULE=on go test ./...

View File

@ -28,11 +28,11 @@ const (
var pads = make([]byte, encGroupSize)
// DecodeBytes decodes a TiDB encoded byte slice.
func DecodeBytes(b []byte) ([]byte, error) {
func DecodeBytes(b []byte) ([]byte, []byte, error) {
buf := make([]byte, 0, len(b)/(encGroupSize+1)*encGroupSize)
for {
if len(b) < encGroupSize+1 {
return nil, errors.New("insufficient bytes to decode value")
return nil, nil, errors.New("insufficient bytes to decode value")
}
groupBytes := b[:encGroupSize+1]
@ -42,7 +42,7 @@ func DecodeBytes(b []byte) ([]byte, error) {
padCount := encMarker - marker
if padCount > encGroupSize {
return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
return nil, nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}
realGroupSize := encGroupSize - padCount
@ -52,12 +52,12 @@ func DecodeBytes(b []byte) ([]byte, error) {
if padCount != 0 {
// Check validity of padding bytes.
if !bytes.Equal(group[realGroupSize:], pads[:padCount]) {
return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
return nil, nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
break
}
}
return buf, nil
return b, buf, nil
}
// EncodeBytes encodes a byte slice into TiDB's encoded form.

53
codec/key.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
// Key represents high-level Key type.
type Key []byte

// Next returns the next key in byte-order.
func (k Key) Next() Key {
	// Appending a single zero byte produces the immediate successor
	// of k in byte-order; the input key is left untouched.
	next := make([]byte, len(k)+1)
	copy(next, k)
	return next
}

// PrefixNext returns the next prefix key.
//
// Assume there are keys like:
//
//   rowkey1
//   rowkey1_column1
//   rowkey1_column2
//   rowKey2
//
// If we seek 'rowkey1' Next, we will get 'rowkey1_column1'.
// If we seek 'rowkey1' PrefixNext, we will get 'rowkey2'.
func (k Key) PrefixNext() Key {
	// Work on a private copy so the receiver is never mutated.
	next := append(Key{}, k...)
	// Increment from the last byte, carrying over any 0xFF -> 0x00 overflow.
	for i := len(next) - 1; i >= 0; i-- {
		next[i]++
		if next[i] != 0 {
			return next
		}
	}
	// Every byte overflowed (the key was all 0xFF, or empty): restore the
	// original bytes and append a zero byte, matching Next() semantics.
	copy(next, k)
	return append(next, 0)
}

View File

@ -21,14 +21,14 @@ import (
// DecodeRegionMetaKey translates a region meta from encoded form to unencoded form.
func DecodeRegionMetaKey(r *metapb.Region) error {
if len(r.StartKey) != 0 {
decoded, err := DecodeBytes(r.StartKey)
_, decoded, err := DecodeBytes(r.StartKey)
if err != nil {
return errors.Trace(err)
}
r.StartKey = decoded
}
if len(r.EndKey) != 0 {
decoded, err := DecodeBytes(r.EndKey)
_, decoded, err := DecodeBytes(r.EndKey)
if err != nil {
return errors.Trace(err)
}

57
codec/numbers.go Normal file
View File

@ -0,0 +1,57 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
import (
"encoding/binary"
"github.com/pingcap/errors"
)
const signMask uint64 = 0x8000000000000000

// EncodeIntToCmpUint make int v to comparable uint type
func EncodeIntToCmpUint(v int64) uint64 {
	// Flipping the sign bit maps the int64 ordering onto the uint64
	// ordering: negative values sort below non-negative ones.
	return signMask ^ uint64(v)
}

// EncodeInt appends the encoded value to slice b and returns the appended slice.
// EncodeInt guarantees that the encoded value is in ascending order for comparison.
func EncodeInt(b []byte, v int64) []byte {
	var scratch [8]byte
	binary.BigEndian.PutUint64(scratch[:], EncodeIntToCmpUint(v))
	return append(b, scratch[:]...)
}

// EncodeUintDesc appends the encoded value to slice b and returns the appended slice.
// EncodeUintDesc guarantees that the encoded value is in descending order for comparison.
func EncodeUintDesc(b []byte, v uint64) []byte {
	var scratch [8]byte
	// Bitwise complement inverts the ordering, yielding descending sort.
	binary.BigEndian.PutUint64(scratch[:], ^v)
	return append(b, scratch[:]...)
}

// DecodeUintDesc decodes value encoded by EncodeUintDesc before.
// It returns the leftover un-decoded slice, decoded value if no error.
func DecodeUintDesc(b []byte) ([]byte, uint64, error) {
	if len(b) < 8 {
		return nil, 0, errors.New("insufficient bytes to decode value")
	}
	// Undo the complement applied by EncodeUintDesc.
	v := ^binary.BigEndian.Uint64(b[:8])
	return b[8:], v, nil
}

64
codec/table.go Normal file
View File

@ -0,0 +1,64 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
var (
	tablePrefix     = []byte{'t'}
	recordPrefixSep = []byte("_r")
	indexPrefixSep  = []byte("_i")
)

const (
	idLen                 = 8
	prefixLen             = 1 + idLen /*tableID*/ + 2
	recordRowKeyLen       = prefixLen + idLen /*handle*/
	tablePrefixLength     = 1
	recordPrefixSepLength = 2
)

// appendTableRecordPrefix appends table record prefix "t[tableID]_r".
func appendTableRecordPrefix(buf []byte, tableID int64) []byte {
	buf = append(buf, tablePrefix...)
	buf = EncodeInt(buf, tableID)
	buf = append(buf, recordPrefixSep...)
	return buf
}

// GenTableRecordPrefix composes record prefix with tableID: "t[tableID]_r".
func GenTableRecordPrefix(tableID int64) Key {
	// Capacity uses idLen (not a magic 8) so it stays in sync with the
	// encoded-integer width declared above.
	buf := make([]byte, 0, len(tablePrefix)+idLen+len(recordPrefixSep))
	return appendTableRecordPrefix(buf, tableID)
}

// appendTableIndexPrefix appends table index prefix "t[tableID]_i".
func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
	buf = append(buf, tablePrefix...)
	buf = EncodeInt(buf, tableID)
	buf = append(buf, indexPrefixSep...)
	return buf
}

// GenTableIndexPrefix composes index prefix with tableID: "t[tableID]_i".
func GenTableIndexPrefix(tableID int64) Key {
	buf := make([]byte, 0, len(tablePrefix)+idLen+len(indexPrefixSep))
	return appendTableIndexPrefix(buf, tableID)
}

// EncodeTableIndexPrefix encodes index prefix with tableID and idxID:
// "t[tableID]_i[idxID]".
func EncodeTableIndexPrefix(tableID, idxID int64) Key {
	key := make([]byte, 0, prefixLen)
	key = appendTableIndexPrefix(key, tableID)
	key = EncodeInt(key, idxID)
	return key
}

51
go.mod
View File

@ -1,54 +1,21 @@
module github.com/tikv/client-go
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/coreos/bbolt v1.3.1-coreos.6 // indirect
github.com/coreos/etcd v3.3.10+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gogo/protobuf v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/golang/protobuf v1.2.0
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2 // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.6.3 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f // indirect
github.com/opentracing/opentracing-go v1.0.2 // indirect
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.0
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c // indirect
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591
github.com/pingcap/pd v2.1.2+incompatible
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 // indirect
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a // indirect
github.com/sirupsen/logrus v1.2.0
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f // indirect
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f // indirect
google.golang.org/grpc v1.17.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.3.0
google.golang.org/grpc v1.18.0
)

115
go.sum
View File

@ -1,145 +1,78 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.1-coreos.6 h1:uTXKg9gY70s9jMAKdfljFQcuh4e/BXOM+V+d00KFj3A=
github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible h1:KjVWqrZ5U0wa3CxY2AxlH6/UcB+PK2td1DcsYhA+HRs=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 h1:3jFq2xL4ZajGK4aZY8jz+DAF0FHjI51BXjjSwCzS1Dk=
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9 h1:EGUd+AQfZoi1OwZAoqekLbl4kq6tafFtKQSiN8nL21Y=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.6.3 h1:oQ+8y59SMDn8Ita1Sh4f94XCUVp8AB84sppXP8Qgiow=
github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f h1:r//C+RGlxxi1gPODiDj/Y/uvv3OaZlZPSFz2SuwIees=
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db h1:yg93sLvBszRnzcd+Z5gkCUdYgud2scHYYxnRwljvRAM=
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c h1:nYlcWGAYxDMdiRLjyhNJB9tMSuGaqu2M/CVd2RJx4QQ=
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241 h1:fzec5WdYKkEtxONmqAw8A67nBoYkawCryamFw5D6oIY=
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591 h1:JM8Hc82qb34a0/vDc5pWmeX/wShWWoEaiV6oCnENxzg=
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.2+incompatible h1:VQmYV7B/7ZdPmbDUHcz2jSswTgInrgWhAfF0YuPAlLw=
github.com/pingcap/pd v2.1.2+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1 h1:K47Rk0v/fkEfwfQet2KWhscE0cJzjgCCDBG2KHZoVno=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a h1:Z2GBQ7wAiTCixJhSGK4sMO/FHYlvFvUBBK0M0FSsxeU=
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w=
github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f h1:+feYJlxPM00jEkdybexHiwIIOVuClwTEbh1WLiNr0mk=
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f h1:eT3B0O2ghdSPzjAOznr3oOLyN1HFeYUncYl7FRwg4VI=
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
google.golang.org/grpc v1.18.0 h1:IZl7mfBGfbhYx2p2rKRtYgDFw6SBz+kclmxYrCksPPA=
google.golang.org/grpc v1.18.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -0,0 +1,549 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"bytes"
"context"
"math"
"sync"
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/codec"
)
// Cluster simulates a TiKV cluster. It focuses on management and the change of
// meta data. A Cluster mainly includes following 3 kinds of meta data:
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
//    The data of a Region is duplicated to multiple Peers and distributed in
//    multiple Stores.
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
//    a group, each group elects a Leader to provide services.
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
//    process. Only the store with request's Region's leader Peer could respond
//    to client's request.
type Cluster struct {
	// The embedded RWMutex guards all fields below; read accessors take
	// RLock, mutators take Lock.
	sync.RWMutex
	// id is the last cluster-wide ID handed out by allocID.
	id uint64
	// stores maps store ID to its mock Store.
	stores map[uint64]*Store
	// regions maps region ID to its mock Region (see GetRegion/Bootstrap).
	regions map[uint64]*Region
}
// NewCluster creates an empty cluster. It needs to be bootstrapped before
// providing service.
func NewCluster() *Cluster {
	c := &Cluster{}
	c.stores = make(map[uint64]*Store)
	c.regions = make(map[uint64]*Region)
	return c
}
// AllocID creates an unique ID in cluster. The ID could be used as either
// StoreID, RegionID, or PeerID.
func (c *Cluster) AllocID() uint64 {
	c.Lock()
	id := c.allocID()
	c.Unlock()
	return id
}
// AllocIDs creates multiple IDs. It returns nil when n <= 0.
func (c *Cluster) AllocIDs(n int) []uint64 {
	c.Lock()
	defer c.Unlock()
	if n <= 0 {
		// Preserve the original behavior of returning a nil slice,
		// and avoid make() panicking on a negative capacity.
		return nil
	}
	// n is known up front, so pre-size the slice to avoid repeated growth.
	ids := make([]uint64, 0, n)
	for i := 0; i < n; i++ {
		ids = append(ids, c.allocID())
	}
	return ids
}
// allocID increments and returns the cluster-wide ID counter.
// The caller must hold c's write lock.
func (c *Cluster) allocID() uint64 {
	next := c.id + 1
	c.id = next
	return next
}
// GetAllRegions gets all the regions in the cluster.
func (c *Cluster) GetAllRegions() []*Region {
	// Read-only access: a read lock is sufficient, and is consistent with
	// the other read accessors (GetStore, GetRegion, GetRegionByKey, ...).
	c.RLock()
	defer c.RUnlock()
	regions := make([]*Region, 0, len(c.regions))
	for _, region := range c.regions {
		regions = append(regions, region)
	}
	return regions
}
// GetStore returns a Store's meta, or nil if the store is unknown.
func (c *Cluster) GetStore(storeID uint64) *metapb.Store {
	c.RLock()
	defer c.RUnlock()
	store, ok := c.stores[storeID]
	if !ok || store == nil {
		return nil
	}
	// Return a deep copy so callers cannot mutate cluster state.
	return proto.Clone(store.meta).(*metapb.Store)
}
// StopStore stops a store with storeID.
func (c *Cluster) StopStore(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	store, ok := c.stores[storeID]
	if !ok || store == nil {
		return
	}
	store.meta.State = metapb.StoreState_Offline
}
// StartStore starts a store with storeID.
func (c *Cluster) StartStore(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	store, ok := c.stores[storeID]
	if !ok || store == nil {
		return
	}
	store.meta.State = metapb.StoreState_Up
}
// CancelStore makes the store with cancel state true.
func (c *Cluster) CancelStore(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	// A store with cancel set returns context.Canceled for requests.
	store, ok := c.stores[storeID]
	if !ok || store == nil {
		return
	}
	store.cancel = true
}
// UnCancelStore makes the store with cancel state false.
func (c *Cluster) UnCancelStore(storeID uint64) {
	c.Lock()
	defer c.Unlock()
	store, ok := c.stores[storeID]
	if !ok || store == nil {
		return
	}
	store.cancel = false
}
// GetStoreByAddr returns a Store's meta by an addr, or nil if no store
// matches.
func (c *Cluster) GetStoreByAddr(addr string) *metapb.Store {
	c.RLock()
	defer c.RUnlock()
	for _, store := range c.stores {
		if store.meta.GetAddress() != addr {
			continue
		}
		// Return a deep copy so callers cannot mutate cluster state.
		return proto.Clone(store.meta).(*metapb.Store)
	}
	return nil
}
// GetAndCheckStoreByAddr checks and returns a Store's meta by an addr.
// It returns context.Canceled when it encounters a store whose cancel
// flag is set, and (nil, nil) when no store matches the address.
func (c *Cluster) GetAndCheckStoreByAddr(addr string) (*metapb.Store, error) {
	c.RLock()
	defer c.RUnlock()
	for _, store := range c.stores {
		// The cancel check happens before the address match, so any
		// canceled store seen during iteration fails the lookup.
		if store.cancel {
			return nil, context.Canceled
		}
		if store.meta.GetAddress() == addr {
			return proto.Clone(store.meta).(*metapb.Store), nil
		}
	}
	return nil, nil
}
// AddStore adds a new Store to the cluster.
func (c *Cluster) AddStore(storeID uint64, addr string) {
	c.Lock()
	c.stores[storeID] = newStore(storeID, addr)
	c.Unlock()
}
// RemoveStore removes a Store from the cluster.
func (c *Cluster) RemoveStore(storeID uint64) {
	c.Lock()
	delete(c.stores, storeID)
	c.Unlock()
}
// UpdateStoreAddr updates store address for cluster.
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string) {
	c.Lock()
	defer c.Unlock()
	// NOTE(review): this replaces the whole store entry wholesale —
	// presumably any previous cancel state is reset; confirm against
	// newStore's defaults.
	c.stores[storeID] = newStore(storeID, addr)
}
// GetRegion returns a Region's meta and leader ID.
// The meta is a deep clone; (nil, 0) when the Region does not exist.
func (c *Cluster) GetRegion(regionID uint64) (*metapb.Region, uint64) {
	c.RLock()
	defer c.RUnlock()
	r, ok := c.regions[regionID]
	if !ok {
		return nil, 0
	}
	return proto.Clone(r.Meta).(*metapb.Region), r.leader
}
// GetRegionByKey returns the Region and its leader whose range contains the key.
// Both return values are deep clones; (nil, nil) when no Region covers key.
func (c *Cluster) GetRegionByKey(key []byte) (*metapb.Region, *metapb.Peer) {
	c.RLock()
	defer c.RUnlock()
	for _, r := range c.regions {
		if regionContains(r.Meta.StartKey, r.Meta.EndKey, key) {
			return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer)
		}
	}
	return nil, nil
}
// GetPrevRegionByKey returns the previous Region and its leader whose range
// contains the key. It returns (nil, nil) when no Region contains the key, or
// when the containing Region is the first one (empty start key) and therefore
// has no predecessor.
func (c *Cluster) GetPrevRegionByKey(key []byte) (*metapb.Region, *metapb.Peer) {
	c.RLock()
	defer c.RUnlock()
	// Locate the containing Region inline rather than calling GetRegionByKey:
	// re-acquiring the read lock while already holding it can deadlock if a
	// writer is queued in between (sync.RWMutex documented caveat).
	var current *Region
	for _, r := range c.regions {
		if regionContains(r.Meta.StartKey, r.Meta.EndKey, key) {
			current = r
			break
		}
	}
	// Guard against keys outside every Region (the original dereferenced a
	// nil Region here) and against the leftmost Region.
	if current == nil || len(current.Meta.StartKey) == 0 {
		return nil, nil
	}
	for _, r := range c.regions {
		if bytes.Equal(r.Meta.EndKey, current.Meta.StartKey) {
			return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer)
		}
	}
	return nil, nil
}
// GetRegionByID returns the Region and its leader whose ID is regionID.
// Both return values are deep clones; (nil, nil) when the Region is unknown.
func (c *Cluster) GetRegionByID(regionID uint64) (*metapb.Region, *metapb.Peer) {
	c.RLock()
	defer c.RUnlock()
	// c.regions is keyed by region ID (see Bootstrap, SplitRaw and
	// createNewRegions), so a direct map lookup replaces the original
	// linear scan over all regions.
	if r, ok := c.regions[regionID]; ok {
		return proto.Clone(r.Meta).(*metapb.Region), proto.Clone(r.leaderPeer()).(*metapb.Peer)
	}
	return nil, nil
}
// Bootstrap creates the first Region. The Stores should be in the Cluster before
// bootstrap.
func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
	c.Lock()
	defer c.Unlock()
	c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}

// AddPeer adds a new Peer for the Region on the Store.
// NOTE(review): an unknown regionID yields a nil *Region and a panic in
// addPeer; callers are expected to pass an existing Region.
func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
	c.Lock()
	defer c.Unlock()
	c.regions[regionID].addPeer(peerID, storeID)
}
// RemovePeer removes the Peer with ID peerID from the Region. Note that if
// the Peer is leader, the Region will have no leader before calling
// ChangeLeader().
func (c *Cluster) RemovePeer(regionID, peerID uint64) {
	c.Lock()
	defer c.Unlock()
	// The second parameter was misleadingly named storeID; Region.removePeer
	// matches on the peer ID, not the store ID. Renamed for accuracy
	// (callers pass positionally, so this is backward-compatible).
	c.regions[regionID].removePeer(peerID)
}
// ChangeLeader sets the Region's leader Peer. Caller should guarantee the Peer
// exists.
func (c *Cluster) ChangeLeader(regionID, leaderPeerID uint64) {
	c.Lock()
	defer c.Unlock()
	c.regions[regionID].changeLeader(leaderPeerID)
}

// GiveUpLeader sets the Region's leader to 0. The Region will have no leader
// before calling ChangeLeader().
func (c *Cluster) GiveUpLeader(regionID uint64) {
	c.ChangeLeader(regionID, 0)
}
// Split splits a Region at the key (encoded) and creates new Region.
func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint64, leaderPeerID uint64) {
	c.SplitRaw(regionID, newRegionID, NewMvccKey(key), peerIDs, leaderPeerID)
}

// SplitRaw splits a Region at the key (not encoded) and creates new Region.
// The new Region takes the upper half [rawKey, oldEnd); the old Region keeps
// [oldStart, rawKey).
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
	c.Lock()
	defer c.Unlock()
	newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
	c.regions[newRegionID] = newRegion
}
// Merge merges 2 regions, their key ranges should be adjacent.
// regionID1 absorbs regionID2's end key and regionID2 is deleted, so
// regionID2 must be the right-hand neighbour of regionID1.
func (c *Cluster) Merge(regionID1, regionID2 uint64) {
	c.Lock()
	defer c.Unlock()
	c.regions[regionID1].merge(c.regions[regionID2].Meta.GetEndKey())
	delete(c.regions, regionID2)
}
// SplitTable evenly splits the data in table into count regions.
// Only works for single store.
func (c *Cluster) SplitTable(mvccStore MVCCStore, tableID int64, count int) {
	tableStart := codec.GenTableRecordPrefix(tableID)
	tableEnd := tableStart.PrefixNext()
	c.splitRange(mvccStore, NewMvccKey(tableStart), NewMvccKey(tableEnd), count)
}

// SplitIndex evenly splits the data in index into count regions.
// Only works for single store.
func (c *Cluster) SplitIndex(mvccStore MVCCStore, tableID, indexID int64, count int) {
	indexStart := codec.EncodeTableIndexPrefix(tableID, indexID)
	indexEnd := indexStart.PrefixNext()
	c.splitRange(mvccStore, NewMvccKey(indexStart), NewMvccKey(indexEnd), count)
}
// splitRange rebuilds the Regions covering [start, end) so the data there is
// spread over count Regions: overlapping old Regions are evacuated first,
// then new Regions are derived from the scanned entries.
func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
	c.Lock()
	defer c.Unlock()
	c.evacuateOldRegionRanges(start, end)
	regionPairs := c.getEntriesGroupByRegions(mvccStore, start, end, count)
	c.createNewRegions(regionPairs, start, end)
}
// getEntriesGroupByRegions groups the key value pairs into splitted regions.
// It scans every pair visible at the newest timestamp in [start, end) and
// partitions them into contiguous groups: the first len(pairs)%count groups
// get one extra entry, so group sizes differ by at most one.
func (c *Cluster) getEntriesGroupByRegions(mvccStore MVCCStore, start, end MvccKey, count int) [][]Pair {
	startTS := uint64(math.MaxUint64)
	limit := int(math.MaxInt32)
	pairs := mvccStore.Scan(start.Raw(), end.Raw(), limit, startTS, kvrpcpb.IsolationLevel_SI)
	regionEntriesSlice := make([][]Pair, 0, count)
	quotient := len(pairs) / count
	remainder := len(pairs) % count
	i := 0
	for i < len(pairs) {
		regionEntryCount := quotient
		if remainder > 0 {
			remainder--
			regionEntryCount++
		}
		regionEntries := pairs[i : i+regionEntryCount]
		regionEntriesSlice = append(regionEntriesSlice, regionEntries)
		i += regionEntryCount
	}
	return regionEntriesSlice
}
// createNewRegions materializes one Region per entry group on the first
// store. The first Region starts at start and the last ends at end; interior
// boundaries come from each following group's first key. Caller must hold
// the cluster lock.
func (c *Cluster) createNewRegions(regionPairs [][]Pair, start, end MvccKey) {
	for i := range regionPairs {
		peerID := c.allocID()
		newRegion := newRegion(c.allocID(), []uint64{c.firstStoreID()}, []uint64{peerID}, peerID)
		var regionStartKey, regionEndKey MvccKey
		if i == 0 {
			regionStartKey = start
		} else {
			regionStartKey = NewMvccKey(regionPairs[i][0].Key)
		}
		if i == len(regionPairs)-1 {
			regionEndKey = end
		} else {
			// Use the next region's first key as region end key.
			regionEndKey = NewMvccKey(regionPairs[i+1][0].Key)
		}
		newRegion.updateKeyRange(regionStartKey, regionEndKey)
		c.regions[newRegion.Meta.Id] = newRegion
	}
}
// evacuateOldRegionRanges evacuates the range [start, end).
// Old regions intersecting [start, end) are deleted (fully contained),
// truncated (partial overlap), or split into two flanking regions (fully
// covering the range). Caller must hold the cluster lock.
func (c *Cluster) evacuateOldRegionRanges(start, end MvccKey) {
	oldRegions := c.getRegionsCoverRange(start, end)
	for _, oldRegion := range oldRegions {
		startCmp := bytes.Compare(oldRegion.Meta.StartKey, start)
		endCmp := bytes.Compare(oldRegion.Meta.EndKey, end)
		// An empty end key means the region is unbounded on the right,
		// which sorts after any concrete end key.
		if len(oldRegion.Meta.EndKey) == 0 {
			endCmp = 1
		}
		if startCmp >= 0 && endCmp <= 0 {
			// The region is within table data, it will be replaced by new regions.
			delete(c.regions, oldRegion.Meta.Id)
		} else if startCmp < 0 && endCmp > 0 {
			// A single Region covers table data, split into two regions that do not overlap table data.
			oldEnd := oldRegion.Meta.EndKey
			oldRegion.updateKeyRange(oldRegion.Meta.StartKey, start)
			peerID := c.allocID()
			newRegion := newRegion(c.allocID(), []uint64{c.firstStoreID()}, []uint64{peerID}, peerID)
			newRegion.updateKeyRange(end, oldEnd)
			c.regions[newRegion.Meta.Id] = newRegion
		} else if startCmp < 0 {
			// Overlaps on the left of [start, end): keep only the part before start.
			oldRegion.updateKeyRange(oldRegion.Meta.StartKey, start)
		} else {
			// Overlaps on the right: keep only the part from end onwards.
			oldRegion.updateKeyRange(end, oldRegion.Meta.EndKey)
		}
	}
}
// firstStoreID returns an arbitrary store ID (Go map iteration order is
// unspecified), or 0 when the cluster has no stores.
func (c *Cluster) firstStoreID() uint64 {
	for id := range c.stores {
		return id
	}
	return 0
}
// getRegionsCoverRange gets regions in the cluster that has intersection with
// [start, end). An empty region end key is treated as unbounded on the right.
// Caller must hold the cluster lock.
func (c *Cluster) getRegionsCoverRange(start, end MvccKey) []*Region {
	var regions []*Region
	for _, region := range c.regions {
		// Region lies entirely at or after end — no overlap.
		onRight := bytes.Compare(end, region.Meta.StartKey) <= 0
		// Region lies entirely before start — no overlap.
		onLeft := bytes.Compare(region.Meta.EndKey, start) <= 0
		if len(region.Meta.EndKey) == 0 {
			onLeft = false
		}
		if onLeft || onRight {
			continue
		}
		regions = append(regions, region)
	}
	return regions
}
// Region is the Region meta data.
type Region struct {
	// Meta is the protobuf meta: key range, peers and epoch.
	Meta *metapb.Region
	// leader is the peer ID of the current leader; 0 means no leader.
	leader uint64
}

// newPeerMeta builds a Peer proto for the given peer/store IDs.
func newPeerMeta(peerID, storeID uint64) *metapb.Peer {
	return &metapb.Peer{
		Id: peerID,
		StoreId: storeID,
	}
}
// newRegion builds a Region with one peer per (storeID, peerID) pair and the
// given leader. Panics when the two ID slices differ in length.
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region {
	if len(storeIDs) != len(peerIDs) {
		panic("length of storeIDs and peerIDs mismatch")
	}
	peers := make([]*metapb.Peer, len(storeIDs))
	for i, storeID := range storeIDs {
		peers[i] = newPeerMeta(peerIDs[i], storeID)
	}
	return &Region{
		Meta: &metapb.Region{
			Id:    regionID,
			Peers: peers,
		},
		leader: leaderPeerID,
	}
}
// addPeer appends a new Peer on the given store and bumps the conf version.
func (r *Region) addPeer(peerID, storeID uint64) {
	r.Meta.Peers = append(r.Meta.Peers, newPeerMeta(peerID, storeID))
	r.incConfVer()
}

// removePeer drops the Peer whose ID is peerID (if present) and bumps the
// conf version. If the removed Peer was the leader, the Region is left
// leaderless until changeLeader is called.
func (r *Region) removePeer(peerID uint64) {
	peers := r.Meta.Peers
	for i := range peers {
		if peers[i].GetId() == peerID {
			r.Meta.Peers = append(peers[:i], peers[i+1:]...)
			break
		}
	}
	if peerID == r.leader {
		r.leader = 0
	}
	r.incConfVer()
}
// changeLeader records leaderID as the Region's current leader peer ID.
func (r *Region) changeLeader(leaderID uint64) {
	r.leader = leaderID
}

// leaderPeer returns the Peer matching the recorded leader ID, or nil when
// the Region currently has no matching leader.
func (r *Region) leaderPeer() *metapb.Peer {
	for i := range r.Meta.Peers {
		if p := r.Meta.Peers[i]; p.GetId() == r.leader {
			return p
		}
	}
	return nil
}
// split carves [key, r.Meta.EndKey) out of r into a brand-new Region with the
// given ID, peer IDs and leader; r keeps [StartKey, key). The new Region is
// placed on the same stores as r; panics if len(peerIDs) does not match the
// number of existing peers.
func (r *Region) split(newRegionID uint64, key MvccKey, peerIDs []uint64, leaderPeerID uint64) *Region {
	if len(r.Meta.Peers) != len(peerIDs) {
		panic("length of storeIDs and peerIDs mismatch")
	}
	storeIDs := make([]uint64, 0, len(r.Meta.Peers))
	for _, peer := range r.Meta.Peers {
		storeIDs = append(storeIDs, peer.GetStoreId())
	}
	region := newRegion(newRegionID, storeIDs, peerIDs, leaderPeerID)
	region.updateKeyRange(key, r.Meta.EndKey)
	r.updateKeyRange(r.Meta.StartKey, key)
	return region
}
// merge extends r's range to endKey (the right neighbour's end key) and bumps
// the Region version.
func (r *Region) merge(endKey MvccKey) {
	r.Meta.EndKey = endKey
	r.incVersion()
}

// updateKeyRange resets r's key range to [start, end) and bumps the version.
func (r *Region) updateKeyRange(start, end MvccKey) {
	r.Meta.StartKey = start
	r.Meta.EndKey = end
	r.incVersion()
}

// incConfVer bumps the configuration version (peer membership changed).
func (r *Region) incConfVer() {
	r.Meta.RegionEpoch = &metapb.RegionEpoch{
		ConfVer: r.Meta.GetRegionEpoch().GetConfVer() + 1,
		Version: r.Meta.GetRegionEpoch().GetVersion(),
	}
}

// incVersion bumps the range version (key range changed by split/merge).
func (r *Region) incVersion() {
	r.Meta.RegionEpoch = &metapb.RegionEpoch{
		ConfVer: r.Meta.GetRegionEpoch().GetConfVer(),
		Version: r.Meta.GetRegionEpoch().GetVersion() + 1,
	}
}
// Store is the Store's meta data.
type Store struct {
	meta *metapb.Store
	cancel bool // return context.Cancelled error when cancel is true.
}

// newStore builds a Store whose meta has the given ID and address.
// The cancel flag starts false (zero value).
func newStore(storeID uint64, addr string) *Store {
	return &Store{
		meta: &metapb.Store{
			Id: storeID,
			Address: addr,
		},
	}
}

View File

@ -0,0 +1,51 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import "fmt"
// BootstrapWithSingleStore initializes a Cluster with 1 Region and 1 Store.
// The store's address is "store<id>"; the returned IDs identify the new
// store, its sole peer, and the first region.
func BootstrapWithSingleStore(cluster *Cluster) (storeID, peerID, regionID uint64) {
	ids := cluster.AllocIDs(3)
	storeID, peerID, regionID = ids[0], ids[1], ids[2]
	cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
	cluster.Bootstrap(regionID, []uint64{storeID}, []uint64{peerID}, peerID)
	return
}
// BootstrapWithMultiStores initializes a Cluster with 1 Region and n Stores.
// One peer is placed on each store; the peer on the first store is leader.
func BootstrapWithMultiStores(cluster *Cluster, n int) (storeIDs, peerIDs []uint64, regionID uint64, leaderPeer uint64) {
	storeIDs = cluster.AllocIDs(n)
	peerIDs = cluster.AllocIDs(n)
	leaderPeer = peerIDs[0]
	regionID = cluster.AllocID()
	for _, storeID := range storeIDs {
		cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
	}
	cluster.Bootstrap(regionID, storeIDs, peerIDs, leaderPeer)
	return
}
// BootstrapWithMultiRegions initializes a Cluster with multiple Regions and 1
// Store. The number of Regions will be len(splitKeys) + 1, and regionIDs[i]
// is served by peerIDs[i].
func BootstrapWithMultiRegions(cluster *Cluster, splitKeys ...[]byte) (storeID uint64, regionIDs, peerIDs []uint64) {
	var firstRegionID, firstPeerID uint64
	storeID, firstPeerID, firstRegionID = BootstrapWithSingleStore(cluster)
	regionIDs = append([]uint64{firstRegionID}, cluster.AllocIDs(len(splitKeys))...)
	peerIDs = append([]uint64{firstPeerID}, cluster.AllocIDs(len(splitKeys))...)
	for i, k := range splitKeys {
		// The Region created by the split is regionIDs[i+1], so it must get
		// the freshly allocated peer peerIDs[i+1]. The original passed
		// peerIDs[i], duplicating the previous Region's peer ID and leaving
		// the newly allocated peer IDs unused.
		cluster.Split(regionIDs[i], regionIDs[i+1], k, []uint64{peerIDs[i+1]}, peerIDs[i+1])
	}
	return
}

View File

@ -0,0 +1,52 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import "fmt"
// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
type ErrLocked struct {
	Key MvccKey // the locked key
	Primary []byte // primary key of the locking transaction
	StartTS uint64 // start timestamp of the locking transaction
	TTL uint64 // lock time-to-live — unit not visible here; presumably ms, confirm with callers
}

// Error formats the lock to a string.
// Note: TTL is not included in the message.
func (e *ErrLocked) Error() string {
	return fmt.Sprintf("key is locked, key: %q, primary: %q, startTS: %v", e.Key, e.Primary, e.StartTS)
}
// ErrRetryable suggests that client may restart the txn. e.g. write conflict.
type ErrRetryable string

// Error implements the error interface.
func (e ErrRetryable) Error() string {
	return "retryable: " + string(e)
}
// ErrAbort means something is wrong and client should abort the txn.
type ErrAbort string

// Error implements the error interface.
func (e ErrAbort) Error() string {
	return "abort: " + string(e)
}
// ErrAlreadyCommitted is returned specially when client tries to rollback a
// committed lock. The value is the commit timestamp of the transaction.
type ErrAlreadyCommitted uint64

// Error implements the error interface.
func (e ErrAlreadyCommitted) Error() string {
	// fmt.Sprint on a single string literal was a no-op wrapper; return the
	// literal directly (staticcheck S1039).
	return "txn already committed"
}

View File

@ -0,0 +1,37 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"github.com/pingcap/errors"
"github.com/pingcap/pd/client"
)
// NewTiKVAndPDClient creates a TiKV client and PD client from options.
// A nil cluster is replaced by a fresh single-store bootstrapped Cluster;
// a nil mvccStore is replaced by a new MVCCLevelDB opened at path
// (an empty path presumably selects an in-memory backend — confirm in
// NewMVCCLevelDB).
func NewTiKVAndPDClient(cluster *Cluster, mvccStore MVCCStore, path string) (*RPCClient, pd.Client, error) {
	if cluster == nil {
		cluster = NewCluster()
		BootstrapWithSingleStore(cluster)
	}
	if mvccStore == nil {
		var err error
		mvccStore, err = NewMVCCLevelDB(path)
		if err != nil {
			return nil, nil, errors.Trace(err)
		}
	}
	return NewRPCClient(cluster, mvccStore), NewPDClient(cluster), nil
}

View File

@ -0,0 +1,566 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"math"
"strings"
"testing"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
)
// TestT hooks the registered go-check suites into the standard "go test" runner.
func TestT(t *testing.T) {
	TestingT(t)
}
// testMockTiKVSuite tests MVCCStore interface.
// SetUpTest should set specific MVCCStore implementation.
type testMockTiKVSuite struct {
	store MVCCStore // implementation under test, assigned in SetUpTest
}

// testMarshal groups the binary (un)marshalling round-trip tests.
type testMarshal struct{}

// testMVCCLevelDB is used to test MVCCLevelDB implementation.
type testMVCCLevelDB struct {
	testMockTiKVSuite
}

// Register the suites with go-check.
var (
	_ = Suite(&testMVCCLevelDB{})
	_ = Suite(testMarshal{})
)
// SetUpTest creates a fresh MVCCLevelDB before every test. The empty path
// presumably selects an in-memory backend — confirm in NewMVCCLevelDB.
func (s *testMockTiKVSuite) SetUpTest(c *C) {
	var err error
	s.store, err = NewMVCCLevelDB("")
	c.Assert(err, IsNil)
}
// putMutations builds Op_Put mutations from alternating key/value strings:
// kvpairs[0]/kvpairs[1] form the first pair, and so on. kvpairs must have
// even length (an odd count panics on the missing value, as before).
func putMutations(kvpairs ...string) []*kvrpcpb.Mutation {
	var mutations []*kvrpcpb.Mutation
	rest := kvpairs
	for len(rest) > 0 {
		mutations = append(mutations, &kvrpcpb.Mutation{
			Op:    kvrpcpb.Op_Put,
			Key:   []byte(rest[0]),
			Value: []byte(rest[1]),
		})
		rest = rest[2:]
	}
	return mutations
}
// lock builds the expected LockInfo for key, locked by the transaction whose
// primary key is primary and whose start timestamp is ts.
func lock(key, primary string, ts uint64) *kvrpcpb.LockInfo {
	return &kvrpcpb.LockInfo{
		Key: []byte(key),
		PrimaryLock: []byte(primary),
		LockVersion: ts,
	}
}
func (s *testMockTiKVSuite) mustGetNone(c *C, key string, ts uint64) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
c.Assert(err, IsNil)
c.Assert(val, IsNil)
}
func (s *testMockTiKVSuite) mustGetErr(c *C, key string, ts uint64) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
c.Assert(err, NotNil)
c.Assert(val, IsNil)
}
func (s *testMockTiKVSuite) mustGetOK(c *C, key string, ts uint64, expect string) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_SI)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}
func (s *testMockTiKVSuite) mustGetRC(c *C, key string, ts uint64, expect string) {
val, err := s.store.Get([]byte(key), ts, kvrpcpb.IsolationLevel_RC)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, expect)
}
func (s *testMockTiKVSuite) mustPutOK(c *C, key, value string, startTS, commitTS uint64) {
errs := s.store.Prewrite(putMutations(key, value), []byte(key), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
err := s.store.Commit([][]byte{[]byte(key)}, startTS, commitTS)
c.Assert(err, IsNil)
}
func (s *testMockTiKVSuite) mustDeleteOK(c *C, key string, startTS, commitTS uint64) {
mutations := []*kvrpcpb.Mutation{
{
Op: kvrpcpb.Op_Del,
Key: []byte(key),
},
}
errs := s.store.Prewrite(mutations, []byte(key), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
err := s.store.Commit([][]byte{[]byte(key)}, startTS, commitTS)
c.Assert(err, IsNil)
}
func (s *testMockTiKVSuite) mustScanOK(c *C, start string, limit int, ts uint64, expect ...string) {
s.mustRangeScanOK(c, start, "", limit, ts, expect...)
}
func (s *testMockTiKVSuite) mustRangeScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.Scan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
c.Assert(pairs[i].Key, BytesEquals, []byte(expect[i*2]))
c.Assert(string(pairs[i].Value), Equals, expect[i*2+1])
}
}
func (s *testMockTiKVSuite) mustReverseScanOK(c *C, end string, limit int, ts uint64, expect ...string) {
s.mustRangeReverseScanOK(c, "", end, limit, ts, expect...)
}
func (s *testMockTiKVSuite) mustRangeReverseScanOK(c *C, start, end string, limit int, ts uint64, expect ...string) {
pairs := s.store.ReverseScan([]byte(start), []byte(end), limit, ts, kvrpcpb.IsolationLevel_SI)
c.Assert(len(pairs)*2, Equals, len(expect))
for i := 0; i < len(pairs); i++ {
c.Assert(pairs[i].Err, IsNil)
c.Assert(pairs[i].Key, BytesEquals, []byte(expect[i*2]))
c.Assert(string(pairs[i].Value), Equals, expect[i*2+1])
}
}
func (s *testMockTiKVSuite) mustPrewriteOK(c *C, mutations []*kvrpcpb.Mutation, primary string, startTS uint64) {
errs := s.store.Prewrite(mutations, []byte(primary), startTS, 0)
for _, err := range errs {
c.Assert(err, IsNil)
}
}
func (s *testMockTiKVSuite) mustCommitOK(c *C, keys [][]byte, startTS, commitTS uint64) {
err := s.store.Commit(keys, startTS, commitTS)
c.Assert(err, IsNil)
}
func (s *testMockTiKVSuite) mustCommitErr(c *C, keys [][]byte, startTS, commitTS uint64) {
err := s.store.Commit(keys, startTS, commitTS)
c.Assert(err, NotNil)
}
func (s *testMockTiKVSuite) mustRollbackOK(c *C, keys [][]byte, startTS uint64) {
err := s.store.Rollback(keys, startTS)
c.Assert(err, IsNil)
}
func (s *testMockTiKVSuite) mustRollbackErr(c *C, keys [][]byte, startTS uint64) {
err := s.store.Rollback(keys, startTS)
c.Assert(err, NotNil)
}
func (s *testMockTiKVSuite) mustScanLock(c *C, maxTs uint64, expect []*kvrpcpb.LockInfo) {
locks, err := s.store.ScanLock(nil, nil, maxTs)
c.Assert(err, IsNil)
c.Assert(locks, DeepEquals, expect)
}
func (s *testMockTiKVSuite) mustResolveLock(c *C, startTS, commitTS uint64) {
c.Assert(s.store.ResolveLock(nil, nil, startTS, commitTS), IsNil)
}
func (s *testMockTiKVSuite) mustBatchResolveLock(c *C, txnInfos map[uint64]uint64) {
c.Assert(s.store.BatchResolveLock(nil, nil, txnInfos), IsNil)
}
func (s *testMockTiKVSuite) mustDeleteRange(c *C, startKey, endKey string) {
err := s.store.DeleteRange([]byte(startKey), []byte(endKey))
c.Assert(err, IsNil)
}
func (s *testMockTiKVSuite) TestGet(c *C) {
s.mustGetNone(c, "x", 10)
s.mustPutOK(c, "x", "x", 5, 10)
s.mustGetNone(c, "x", 9)
s.mustGetOK(c, "x", 10, "x")
s.mustGetOK(c, "x", 11, "x")
}
func (s *testMockTiKVSuite) TestGetWithLock(c *C) {
key := "key"
value := "value"
s.mustPutOK(c, key, value, 5, 10)
mutations := []*kvrpcpb.Mutation{{
Op: kvrpcpb.Op_Lock,
Key: []byte(key),
},
}
// test with lock's type is lock
s.mustPrewriteOK(c, mutations, key, 20)
s.mustGetOK(c, key, 25, value)
s.mustCommitOK(c, [][]byte{[]byte(key)}, 20, 30)
// test get with lock's max ts and primary key
s.mustPrewriteOK(c, putMutations(key, "value2", "key2", "v5"), key, 40)
s.mustGetErr(c, key, 41)
s.mustGetErr(c, "key2", math.MaxUint64)
s.mustGetOK(c, key, math.MaxUint64, "value")
}
func (s *testMockTiKVSuite) TestDelete(c *C) {
s.mustPutOK(c, "x", "x5-10", 5, 10)
s.mustDeleteOK(c, "x", 15, 20)
s.mustGetNone(c, "x", 5)
s.mustGetNone(c, "x", 9)
s.mustGetOK(c, "x", 10, "x5-10")
s.mustGetOK(c, "x", 19, "x5-10")
s.mustGetNone(c, "x", 20)
s.mustGetNone(c, "x", 21)
}
func (s *testMockTiKVSuite) TestCleanupRollback(c *C) {
s.mustPutOK(c, "secondary", "s-0", 1, 2)
s.mustPrewriteOK(c, putMutations("primary", "p-5", "secondary", "s-5"), "primary", 5)
s.mustGetErr(c, "secondary", 8)
s.mustGetErr(c, "secondary", 12)
s.mustCommitOK(c, [][]byte{[]byte("primary")}, 5, 10)
s.mustRollbackErr(c, [][]byte{[]byte("primary")}, 5)
}
func (s *testMockTiKVSuite) TestReverseScan(c *C) {
// ver10: A(10) - B(_) - C(10) - D(_) - E(10)
s.mustPutOK(c, "A", "A10", 5, 10)
s.mustPutOK(c, "C", "C10", 5, 10)
s.mustPutOK(c, "E", "E10", 5, 10)
checkV10 := func() {
s.mustReverseScanOK(c, "Z", 0, 10)
s.mustReverseScanOK(c, "Z", 1, 10, "E", "E10")
s.mustReverseScanOK(c, "Z", 2, 10, "E", "E10", "C", "C10")
s.mustReverseScanOK(c, "Z", 3, 10, "E", "E10", "C", "C10", "A", "A10")
s.mustReverseScanOK(c, "Z", 4, 10, "E", "E10", "C", "C10", "A", "A10")
s.mustReverseScanOK(c, "E\x00", 3, 10, "E", "E10", "C", "C10", "A", "A10")
s.mustReverseScanOK(c, "C\x00", 3, 10, "C", "C10", "A", "A10")
s.mustReverseScanOK(c, "C\x00", 4, 10, "C", "C10", "A", "A10")
s.mustReverseScanOK(c, "B", 1, 10, "A", "A10")
s.mustRangeReverseScanOK(c, "", "E", 5, 10, "C", "C10", "A", "A10")
s.mustRangeReverseScanOK(c, "", "C\x00", 5, 10, "C", "C10", "A", "A10")
s.mustRangeReverseScanOK(c, "A\x00", "C", 5, 10)
}
checkV10()
// ver20: A(10) - B(20) - C(10) - D(20) - E(10)
s.mustPutOK(c, "B", "B20", 15, 20)
s.mustPutOK(c, "D", "D20", 15, 20)
checkV20 := func() {
s.mustReverseScanOK(c, "Z", 5, 20, "E", "E10", "D", "D20", "C", "C10", "B", "B20", "A", "A10")
s.mustReverseScanOK(c, "C\x00", 5, 20, "C", "C10", "B", "B20", "A", "A10")
s.mustReverseScanOK(c, "A\x00", 1, 20, "A", "A10")
s.mustRangeReverseScanOK(c, "B", "D", 5, 20, "C", "C10", "B", "B20")
s.mustRangeReverseScanOK(c, "B", "D\x00", 5, 20, "D", "D20", "C", "C10", "B", "B20")
s.mustRangeReverseScanOK(c, "B\x00", "D\x00", 5, 20, "D", "D20", "C", "C10")
}
checkV10()
checkV20()
// ver30: A(_) - B(20) - C(10) - D(_) - E(10)
s.mustDeleteOK(c, "A", 25, 30)
s.mustDeleteOK(c, "D", 25, 30)
checkV30 := func() {
s.mustReverseScanOK(c, "Z", 5, 30, "E", "E10", "C", "C10", "B", "B20")
s.mustReverseScanOK(c, "C", 1, 30, "B", "B20")
s.mustReverseScanOK(c, "C\x00", 5, 30, "C", "C10", "B", "B20")
}
checkV10()
checkV20()
checkV30()
// ver40: A(_) - B(_) - C(40) - D(40) - E(10)
s.mustDeleteOK(c, "B", 35, 40)
s.mustPutOK(c, "C", "C40", 35, 40)
s.mustPutOK(c, "D", "D40", 35, 40)
checkV40 := func() {
s.mustReverseScanOK(c, "Z", 5, 40, "E", "E10", "D", "D40", "C", "C40")
s.mustReverseScanOK(c, "Z", 5, 100, "E", "E10", "D", "D40", "C", "C40")
}
checkV10()
checkV20()
checkV30()
checkV40()
}
func (s *testMockTiKVSuite) TestScan(c *C) {
// ver10: A(10) - B(_) - C(10) - D(_) - E(10)
s.mustPutOK(c, "A", "A10", 5, 10)
s.mustPutOK(c, "C", "C10", 5, 10)
s.mustPutOK(c, "E", "E10", 5, 10)
checkV10 := func() {
s.mustScanOK(c, "", 0, 10)
s.mustScanOK(c, "", 1, 10, "A", "A10")
s.mustScanOK(c, "", 2, 10, "A", "A10", "C", "C10")
s.mustScanOK(c, "", 3, 10, "A", "A10", "C", "C10", "E", "E10")
s.mustScanOK(c, "", 4, 10, "A", "A10", "C", "C10", "E", "E10")
s.mustScanOK(c, "A", 3, 10, "A", "A10", "C", "C10", "E", "E10")
s.mustScanOK(c, "A\x00", 3, 10, "C", "C10", "E", "E10")
s.mustScanOK(c, "C", 4, 10, "C", "C10", "E", "E10")
s.mustScanOK(c, "F", 1, 10)
s.mustRangeScanOK(c, "", "E", 5, 10, "A", "A10", "C", "C10")
s.mustRangeScanOK(c, "", "C\x00", 5, 10, "A", "A10", "C", "C10")
s.mustRangeScanOK(c, "A\x00", "C", 5, 10)
}
checkV10()
// ver20: A(10) - B(20) - C(10) - D(20) - E(10)
s.mustPutOK(c, "B", "B20", 15, 20)
s.mustPutOK(c, "D", "D20", 15, 20)
checkV20 := func() {
s.mustScanOK(c, "", 5, 20, "A", "A10", "B", "B20", "C", "C10", "D", "D20", "E", "E10")
s.mustScanOK(c, "C", 5, 20, "C", "C10", "D", "D20", "E", "E10")
s.mustScanOK(c, "D\x00", 1, 20, "E", "E10")
s.mustRangeScanOK(c, "B", "D", 5, 20, "B", "B20", "C", "C10")
s.mustRangeScanOK(c, "B", "D\x00", 5, 20, "B", "B20", "C", "C10", "D", "D20")
s.mustRangeScanOK(c, "B\x00", "D\x00", 5, 20, "C", "C10", "D", "D20")
}
checkV10()
checkV20()
// ver30: A(_) - B(20) - C(10) - D(_) - E(10)
s.mustDeleteOK(c, "A", 25, 30)
s.mustDeleteOK(c, "D", 25, 30)
checkV30 := func() {
s.mustScanOK(c, "", 5, 30, "B", "B20", "C", "C10", "E", "E10")
s.mustScanOK(c, "A", 1, 30, "B", "B20")
s.mustScanOK(c, "C\x00", 5, 30, "E", "E10")
}
checkV10()
checkV20()
checkV30()
// ver40: A(_) - B(_) - C(40) - D(40) - E(10)
s.mustDeleteOK(c, "B", 35, 40)
s.mustPutOK(c, "C", "C40", 35, 40)
s.mustPutOK(c, "D", "D40", 35, 40)
checkV40 := func() {
s.mustScanOK(c, "", 5, 40, "C", "C40", "D", "D40", "E", "E10")
s.mustScanOK(c, "", 5, 100, "C", "C40", "D", "D40", "E", "E10")
}
checkV10()
checkV20()
checkV30()
checkV40()
}
func (s *testMockTiKVSuite) TestBatchGet(c *C) {
s.mustPutOK(c, "k1", "v1", 1, 2)
s.mustPutOK(c, "k2", "v2", 1, 2)
s.mustPutOK(c, "k2", "v2", 3, 4)
s.mustPutOK(c, "k3", "v3", 1, 2)
batchKeys := [][]byte{[]byte("k1"), []byte("k2"), []byte("k3")}
pairs := s.store.BatchGet(batchKeys, 5, kvrpcpb.IsolationLevel_SI)
for _, pair := range pairs {
c.Assert(pair.Err, IsNil)
}
c.Assert(string(pairs[0].Value), Equals, "v1")
c.Assert(string(pairs[1].Value), Equals, "v2")
c.Assert(string(pairs[2].Value), Equals, "v3")
}
func (s *testMockTiKVSuite) TestScanLock(c *C) {
s.mustPutOK(c, "k1", "v1", 1, 2)
s.mustPrewriteOK(c, putMutations("p1", "v5", "s1", "v5"), "p1", 5)
s.mustPrewriteOK(c, putMutations("p2", "v10", "s2", "v10"), "p2", 10)
s.mustPrewriteOK(c, putMutations("p3", "v20", "s3", "v20"), "p3", 20)
locks, err := s.store.ScanLock([]byte("a"), []byte("r"), 12)
c.Assert(err, IsNil)
c.Assert(locks, DeepEquals, []*kvrpcpb.LockInfo{
lock("p1", "p1", 5),
lock("p2", "p2", 10),
})
s.mustScanLock(c, 10, []*kvrpcpb.LockInfo{
lock("p1", "p1", 5),
lock("p2", "p2", 10),
lock("s1", "p1", 5),
lock("s2", "p2", 10),
})
}
func (s *testMockTiKVSuite) TestCommitConflict(c *C) {
// txn A want set x to A
// txn B want set x to B
// A prewrite.
s.mustPrewriteOK(c, putMutations("x", "A"), "x", 5)
// B prewrite and find A's lock.
errs := s.store.Prewrite(putMutations("x", "B"), []byte("x"), 10, 0)
c.Assert(errs[0], NotNil)
// B find rollback A because A exist too long.
s.mustRollbackOK(c, [][]byte{[]byte("x")}, 5)
// if A commit here, it would find its lock removed, report error txn not found.
s.mustCommitErr(c, [][]byte{[]byte("x")}, 5, 10)
// B prewrite itself after it rollback A.
s.mustPrewriteOK(c, putMutations("x", "B"), "x", 10)
// if A commit here, it would find its lock replaced by others and commit fail.
s.mustCommitErr(c, [][]byte{[]byte("x")}, 5, 20)
// B commit success.
s.mustCommitOK(c, [][]byte{[]byte("x")}, 10, 20)
// if B commit again, it will success because the key already committed.
s.mustCommitOK(c, [][]byte{[]byte("x")}, 10, 20)
}
func (s *testMockTiKVSuite) TestResolveLock(c *C) {
s.mustPrewriteOK(c, putMutations("p1", "v5", "s1", "v5"), "p1", 5)
s.mustPrewriteOK(c, putMutations("p2", "v10", "s2", "v10"), "p2", 10)
s.mustResolveLock(c, 5, 0)
s.mustResolveLock(c, 10, 20)
s.mustGetNone(c, "p1", 20)
s.mustGetNone(c, "s1", 30)
s.mustGetOK(c, "p2", 20, "v10")
s.mustGetOK(c, "s2", 30, "v10")
s.mustScanLock(c, 30, nil)
}
func (s *testMockTiKVSuite) TestBatchResolveLock(c *C) {
s.mustPrewriteOK(c, putMutations("p1", "v11", "s1", "v11"), "p1", 11)
s.mustPrewriteOK(c, putMutations("p2", "v12", "s2", "v12"), "p2", 12)
s.mustPrewriteOK(c, putMutations("p3", "v13"), "p3", 13)
s.mustPrewriteOK(c, putMutations("p4", "v14", "s3", "v14", "s4", "v14"), "p4", 14)
s.mustPrewriteOK(c, putMutations("p5", "v15", "s5", "v15"), "p5", 15)
txnInfos := map[uint64]uint64{
11: 0,
12: 22,
13: 0,
14: 24,
}
s.mustBatchResolveLock(c, txnInfos)
s.mustGetNone(c, "p1", 20)
s.mustGetNone(c, "p3", 30)
s.mustGetOK(c, "p2", 30, "v12")
s.mustGetOK(c, "s4", 30, "v14")
s.mustScanLock(c, 30, []*kvrpcpb.LockInfo{
lock("p5", "p5", 15),
lock("s5", "p5", 15),
})
txnInfos = map[uint64]uint64{
15: 0,
}
s.mustBatchResolveLock(c, txnInfos)
s.mustScanLock(c, 30, nil)
}
func (s *testMockTiKVSuite) TestRollbackAndWriteConflict(c *C) {
s.mustPutOK(c, "test", "test", 1, 3)
errs := s.store.Prewrite(putMutations("lock", "lock", "test", "test1"), []byte("test"), 2, 2)
s.mustWriteWriteConflict(c, errs, 1)
s.mustPutOK(c, "test", "test2", 5, 8)
// simulate `getTxnStatus` for txn 2.
err := s.store.Cleanup([]byte("test"), 2)
c.Assert(err, IsNil)
errs = s.store.Prewrite(putMutations("test", "test3"), []byte("test"), 6, 1)
s.mustWriteWriteConflict(c, errs, 0)
}
func (s *testMockTiKVSuite) TestDeleteRange(c *C) {
for i := 1; i <= 5; i++ {
key := string(byte(i) + byte('0'))
value := "v" + key
s.mustPutOK(c, key, value, uint64(1+2*i), uint64(2+2*i))
}
s.mustScanOK(c, "0", 10, 20, "1", "v1", "2", "v2", "3", "v3", "4", "v4", "5", "v5")
s.mustDeleteRange(c, "2", "4")
s.mustScanOK(c, "0", 10, 30, "1", "v1", "4", "v4", "5", "v5")
s.mustDeleteRange(c, "5", "5")
s.mustScanOK(c, "0", 10, 40, "1", "v1", "4", "v4", "5", "v5")
s.mustDeleteRange(c, "41", "42")
s.mustScanOK(c, "0", 10, 50, "1", "v1", "4", "v4", "5", "v5")
s.mustDeleteRange(c, "4\x00", "5\x00")
s.mustScanOK(c, "0", 10, 60, "1", "v1", "4", "v4")
s.mustDeleteRange(c, "0", "9")
s.mustScanOK(c, "0", 10, 70)
}
func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) {
c.Assert(errs[i], NotNil)
c.Assert(strings.Contains(errs[i].Error(), "write conflict"), IsTrue)
}
func (s *testMockTiKVSuite) TestRC(c *C) {
s.mustPutOK(c, "key", "v1", 5, 10)
s.mustPrewriteOK(c, putMutations("key", "v2"), "key", 15)
s.mustGetErr(c, "key", 20)
s.mustGetRC(c, "key", 12, "v1")
s.mustGetRC(c, "key", 20, "v1")
}
// TestMarshalmvccLock checks that mvccLock round-trips through
// MarshalBinary/UnmarshalBinary without losing any field.
func (s testMarshal) TestMarshalmvccLock(c *C) {
	l := mvccLock{
		startTS: 47,
		primary: []byte{'a', 'b', 'c'},
		value: []byte{'d', 'e'},
		op: kvrpcpb.Op_Put,
		ttl: 444,
	}
	bin, err := l.MarshalBinary()
	c.Assert(err, IsNil)
	var l1 mvccLock
	err = l1.UnmarshalBinary(bin)
	c.Assert(err, IsNil)
	c.Assert(l.startTS, Equals, l1.startTS)
	c.Assert(l.op, Equals, l1.op)
	c.Assert(l.ttl, Equals, l1.ttl)
	c.Assert(string(l.primary), Equals, string(l1.primary))
	c.Assert(string(l.value), Equals, string(l1.value))
}
// TestMarshalmvccValue checks that mvccValue round-trips through
// MarshalBinary/UnmarshalBinary without losing any field.
func (s testMarshal) TestMarshalmvccValue(c *C) {
	v := mvccValue{
		valueType: typePut,
		startTS: 42,
		commitTS: 55,
		value: []byte{'d', 'e'},
	}
	bin, err := v.MarshalBinary()
	c.Assert(err, IsNil)
	var v1 mvccValue
	err = v1.UnmarshalBinary(bin)
	c.Assert(err, IsNil)
	c.Assert(v.valueType, Equals, v1.valueType)
	c.Assert(v.startTS, Equals, v1.startTS)
	c.Assert(v.commitTS, Equals, v1.commitTS)
	// The original compared v.value with itself, which can never fail;
	// compare against the decoded copy instead.
	c.Assert(string(v.value), Equals, string(v1.value))
}

489
mockstore/mocktikv/mvcc.go Normal file
View File

@ -0,0 +1,489 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"bytes"
"encoding/binary"
"io"
"math"
"sort"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/codec"
)
// mvccValueType enumerates the kinds of version records stored for a key.
type mvccValueType int

const (
	typePut      mvccValueType = iota // committed put
	typeDelete                       // committed delete
	typeRollback                     // rollback marker for an aborted txn
)

// mvccValue is one committed version record (or rollback marker) of a key.
type mvccValue struct {
	valueType mvccValueType // record kind
	startTS   uint64        // start timestamp of the writing transaction
	commitTS  uint64        // commit timestamp of the writing transaction
	value     []byte        // user value carried by the record
}

// mvccLock is an uncommitted prewrite lock held on a key.
type mvccLock struct {
	startTS uint64     // start timestamp of the locking transaction
	primary []byte     // the transaction's primary key
	value   []byte     // value to be written on commit
	op      kvrpcpb.Op // mutation operation (Put/Del/Lock)
	ttl     uint64     // lock time-to-live, reported in ErrLocked
}

// mvccEntry bundles everything stored for a single key: its version records
// (kept sorted by commitTS descending, see addValue) and the current lock.
type mvccEntry struct {
	key    MvccKey
	values []mvccValue
	lock   *mvccLock
}
// MarshalBinary implements encoding.BinaryMarshaler interface.
// Fields are written in a fixed order; UnmarshalBinary must read them back
// in exactly the same order.
func (l *mvccLock) MarshalBinary() ([]byte, error) {
	var (
		mh  marshalHelper
		buf bytes.Buffer
	)
	mh.WriteNumber(&buf, l.startTS)
	mh.WriteSlice(&buf, l.primary)
	mh.WriteSlice(&buf, l.value)
	mh.WriteNumber(&buf, l.op)
	mh.WriteNumber(&buf, l.ttl)
	// mh.err holds the first error from the writes above, if any.
	return buf.Bytes(), errors.Trace(mh.err)
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
// It mirrors the field order used by MarshalBinary.
func (l *mvccLock) UnmarshalBinary(data []byte) error {
	var mh marshalHelper
	buf := bytes.NewBuffer(data)
	mh.ReadNumber(buf, &l.startTS)
	mh.ReadSlice(buf, &l.primary)
	mh.ReadSlice(buf, &l.value)
	mh.ReadNumber(buf, &l.op)
	mh.ReadNumber(buf, &l.ttl)
	return errors.Trace(mh.err)
}
// MarshalBinary implements encoding.BinaryMarshaler interface.
// valueType is widened to int64 so its encoded size does not depend on the
// platform's int width.
func (v mvccValue) MarshalBinary() ([]byte, error) {
	var (
		mh  marshalHelper
		buf bytes.Buffer
	)
	mh.WriteNumber(&buf, int64(v.valueType))
	mh.WriteNumber(&buf, v.startTS)
	mh.WriteNumber(&buf, v.commitTS)
	mh.WriteSlice(&buf, v.value)
	return buf.Bytes(), errors.Trace(mh.err)
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler interface.
// It mirrors the field order used by MarshalBinary.
func (v *mvccValue) UnmarshalBinary(data []byte) error {
	var mh marshalHelper
	buf := bytes.NewBuffer(data)
	var vt int64
	mh.ReadNumber(buf, &vt)
	v.valueType = mvccValueType(vt)
	mh.ReadNumber(buf, &v.startTS)
	mh.ReadNumber(buf, &v.commitTS)
	mh.ReadSlice(buf, &v.value)
	return errors.Trace(mh.err)
}
// marshalHelper sequences binary reads/writes, capturing the first error so
// callers can chain several operations and check err once at the end.
type marshalHelper struct {
	err error
}

// WriteSlice writes a uvarint length prefix followed by the slice bytes.
// It is a no-op if a previous operation already failed.
func (mh *marshalHelper) WriteSlice(buf io.Writer, slice []byte) {
	if mh.err != nil {
		return
	}
	var tmp [binary.MaxVarintLen64]byte
	off := binary.PutUvarint(tmp[:], uint64(len(slice)))
	if err := writeFull(buf, tmp[:off]); err != nil {
		mh.err = errors.Trace(err)
		return
	}
	if err := writeFull(buf, slice); err != nil {
		mh.err = errors.Trace(err)
	}
}

// WriteNumber writes n in little-endian binary form.
// It is a no-op if a previous operation already failed.
func (mh *marshalHelper) WriteNumber(buf io.Writer, n interface{}) {
	if mh.err != nil {
		return
	}
	err := binary.Write(buf, binary.LittleEndian, n)
	if err != nil {
		mh.err = errors.Trace(err)
	}
}
// writeFull writes all of data to w, retrying on short writes until every
// byte has been accepted or the writer reports an error.
func writeFull(w io.Writer, data []byte) error {
	for len(data) > 0 {
		n, err := w.Write(data)
		if err != nil {
			return errors.Trace(err)
		}
		data = data[n:]
	}
	return nil
}
// ReadNumber reads a little-endian binary value into n.
// It is a no-op if a previous operation already failed.
func (mh *marshalHelper) ReadNumber(r io.Reader, n interface{}) {
	if mh.err != nil {
		return
	}
	err := binary.Read(r, binary.LittleEndian, n)
	if err != nil {
		mh.err = errors.Trace(err)
	}
}

// ReadSlice reads a uvarint-length-prefixed byte slice written by WriteSlice.
// It is a no-op if a previous operation already failed.
func (mh *marshalHelper) ReadSlice(r *bytes.Buffer, slice *[]byte) {
	if mh.err != nil {
		return
	}
	sz, err := binary.ReadUvarint(r)
	if err != nil {
		mh.err = errors.Trace(err)
		return
	}
	// Reject absurd lengths to avoid huge allocations on corrupt input.
	const c10M = 10 * 1024 * 1024
	if sz > c10M {
		mh.err = errors.New("too large slice, maybe something wrong")
		return
	}
	data := make([]byte, sz)
	if _, err := io.ReadFull(r, data); err != nil {
		mh.err = errors.Trace(err)
		return
	}
	*slice = data
}
// newEntry creates an empty mvccEntry for key (no versions, no lock).
func newEntry(key MvccKey) *mvccEntry {
	return &mvccEntry{
		key: key,
	}
}

// lockErr returns ErrLocked.
// Note that parameter key is raw key, while key in ErrLocked is mvcc key.
func (l *mvccLock) lockErr(key []byte) error {
	return &ErrLocked{
		Key:     mvccEncode(key, lockVer),
		Primary: l.primary,
		StartTS: l.startTS,
		TTL:     l.ttl,
	}
}

// check decides whether a read at ts may proceed despite this lock.
// It returns the (possibly adjusted) read timestamp, or an ErrLocked error.
func (l *mvccLock) check(ts uint64, key []byte) (uint64, error) {
	// ignore when ts is older than lock or lock's type is Lock.
	if l.startTS > ts || l.op == kvrpcpb.Op_Lock {
		return ts, nil
	}
	// for point get latest version.
	// MaxUint64 means "read latest"; reading just below the lock's startTS
	// on the primary key sidesteps the lock.
	if ts == math.MaxUint64 && bytes.Equal(l.primary, key) {
		return l.startTS - 1, nil
	}
	return 0, l.lockErr(key)
}
// Clone returns a deep copy of the entry: key, every version's value bytes,
// and the lock (if any) are copied, so mutating the clone never affects the
// original.
func (e *mvccEntry) Clone() *mvccEntry {
	var entry mvccEntry
	entry.key = append([]byte(nil), e.key...)
	for _, v := range e.values {
		entry.values = append(entry.values, mvccValue{
			valueType: v.valueType,
			startTS:   v.startTS,
			commitTS:  v.commitTS,
			value:     append([]byte(nil), v.value...),
		})
	}
	if e.lock != nil {
		entry.lock = &mvccLock{
			startTS: e.lock.startTS,
			primary: append([]byte(nil), e.lock.primary...),
			value:   append([]byte(nil), e.lock.value...),
			op:      e.lock.op,
			ttl:     e.lock.ttl,
		}
	}
	return &entry
}

// Less orders entries by key; required by the btree.Item interface.
func (e *mvccEntry) Less(than btree.Item) bool {
	return bytes.Compare(e.key, than.(*mvccEntry).key) < 0
}
// Get returns the newest non-rollback version visible at ts, or nil if none
// exists. Under SI isolation a conflicting lock may fail the read (or lower
// ts, see mvccLock.check); other isolation levels ignore the lock.
func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error) {
	if isoLevel == kvrpcpb.IsolationLevel_SI && e.lock != nil {
		var err error
		ts, err = e.lock.check(ts, e.key.Raw())
		if err != nil {
			return nil, err
		}
	}
	// values is sorted by commitTS descending, so the first record with
	// commitTS <= ts is the newest visible version.
	for _, v := range e.values {
		if v.commitTS <= ts && v.valueType != typeRollback {
			return v.value, nil
		}
	}
	return nil, nil
}
// Prewrite attempts to lock the key for a transaction starting at startTS.
// It returns a retryable error on write conflict, the existing lock's error
// if another transaction holds the lock, and succeeds idempotently when the
// same transaction prewrites twice.
func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
	if len(e.values) > 0 {
		// values[0] is the newest commit; commitTS >= startTS means another
		// transaction committed after this one started.
		if e.values[0].commitTS >= startTS {
			return ErrRetryable("write conflict")
		}
	}
	if e.lock != nil {
		if e.lock.startTS != startTS {
			return e.lock.lockErr(e.key.Raw())
		}
		// Same transaction retrying prewrite: nothing to do.
		return nil
	}
	e.lock = &mvccLock{
		startTS: startTS,
		primary: primary,
		value:   mutation.Value,
		op:      mutation.GetOp(),
		ttl:     ttl,
	}
	return nil
}
// getTxnCommitInfo returns the version record (commit or rollback marker)
// written by the transaction with the given startTS, or nil if none exists.
func (e *mvccEntry) getTxnCommitInfo(startTS uint64) *mvccValue {
	// Index the slice directly: the original returned the address of a
	// range-loop copy, so the pointer aliased a detached copy rather than
	// the stored record. Current callers only read through the pointer, so
	// behavior is unchanged, but this form is safe for future writers too.
	for i := range e.values {
		if e.values[i].startTS == startTS {
			return &e.values[i]
		}
	}
	return nil
}
// Commit commits the transaction identified by startTS at commitTS, turning
// its lock into a version record. Committing without a matching lock
// succeeds only if the transaction already committed (idempotent retry);
// otherwise a retryable "txn not found" error is returned.
func (e *mvccEntry) Commit(startTS, commitTS uint64) error {
	if e.lock == nil || e.lock.startTS != startTS {
		if c := e.getTxnCommitInfo(startTS); c != nil && c.valueType != typeRollback {
			return nil
		}
		return ErrRetryable("txn not found")
	}
	// Op_Lock leaves no version record; only Put/Del write one.
	if e.lock.op != kvrpcpb.Op_Lock {
		var valueType mvccValueType
		if e.lock.op == kvrpcpb.Op_Put {
			valueType = typePut
		} else {
			valueType = typeDelete
		}
		e.addValue(mvccValue{
			valueType: valueType,
			startTS:   startTS,
			commitTS:  commitTS,
			value:     e.lock.value,
		})
	}
	e.lock = nil
	return nil
}

// Rollback aborts the transaction identified by startTS: it releases the
// transaction's lock (if held) and records a rollback marker so a late
// prewrite from the same transaction cannot succeed. Rolling back an
// already-committed transaction returns ErrAlreadyCommitted.
func (e *mvccEntry) Rollback(startTS uint64) error {
	// If current transaction's lock exist.
	if e.lock != nil && e.lock.startTS == startTS {
		e.lock = nil
		e.addValue(mvccValue{
			valueType: typeRollback,
			startTS:   startTS,
			commitTS:  startTS,
		})
		return nil
	}
	// If current transaction's lock not exist.
	// If commit info of current transaction exist.
	if c := e.getTxnCommitInfo(startTS); c != nil {
		// If current transaction is already committed.
		if c.valueType != typeRollback {
			return ErrAlreadyCommitted(c.commitTS)
		}
		// If current transaction is already rollback.
		return nil
	}
	// If current transaction is not prewritted before.
	// Write the rollback marker anyway to block a future late prewrite.
	e.addValue(mvccValue{
		valueType: typeRollback,
		startTS:   startTS,
		commitTS:  startTS,
	})
	return nil
}
// addValue inserts v into e.values, keeping the slice sorted by commitTS in
// descending order (newest first).
func (e *mvccEntry) addValue(v mvccValue) {
	i := sort.Search(len(e.values), func(i int) bool { return e.values[i].commitTS <= v.commitTS })
	if i >= len(e.values) {
		e.values = append(e.values, v)
	} else {
		// Shift the tail right by one slot and place v at position i.
		e.values = append(e.values[:i+1], e.values[i:]...)
		e.values[i] = v
	}
}

// containsStartTS reports whether the transaction with startTS touched this
// entry, either via the current lock or a recorded version. The scan stops
// early once records are older than startTS, since any record written by
// that transaction would have commitTS >= startTS.
func (e *mvccEntry) containsStartTS(startTS uint64) bool {
	if e.lock != nil && e.lock.startTS == startTS {
		return true
	}
	for _, item := range e.values {
		if item.startTS == startTS {
			return true
		}
		if item.commitTS < startTS {
			return false
		}
	}
	return false
}
// dumpMvccInfo exports the entry's lock and full version history in the
// protobuf debug format served by the MvccGetBy* debugger RPCs.
func (e *mvccEntry) dumpMvccInfo() *kvrpcpb.MvccInfo {
	info := &kvrpcpb.MvccInfo{}
	if e.lock != nil {
		info.Lock = &kvrpcpb.MvccLock{
			Type:       e.lock.op,
			StartTs:    e.lock.startTS,
			Primary:    e.lock.primary,
			ShortValue: e.lock.value,
		}
	}
	// Writes[i] and Values[i] describe the same version record.
	info.Writes = make([]*kvrpcpb.MvccWrite, len(e.values))
	info.Values = make([]*kvrpcpb.MvccValue, len(e.values))
	for id, item := range e.values {
		var tp kvrpcpb.Op
		switch item.valueType {
		case typePut:
			tp = kvrpcpb.Op_Put
		case typeDelete:
			tp = kvrpcpb.Op_Del
		case typeRollback:
			tp = kvrpcpb.Op_Rollback
		}
		info.Writes[id] = &kvrpcpb.MvccWrite{
			Type:     tp,
			StartTs:  item.startTS,
			CommitTs: item.commitTS,
		}
		info.Values[id] = &kvrpcpb.MvccValue{
			Value:   item.value,
			StartTs: item.startTS,
		}
	}
	return info
}
// rawEntry is a plain key-value pair for the raw (non-transactional) store.
type rawEntry struct {
	key   []byte
	value []byte
}

// newRawEntry creates a rawEntry with only the key set; usable both as a
// btree lookup probe and as a fresh entry to insert.
func newRawEntry(key []byte) *rawEntry {
	return &rawEntry{
		key: key,
	}
}

// Less orders raw entries by key; required by the btree.Item interface.
func (e *rawEntry) Less(than btree.Item) bool {
	return bytes.Compare(e.key, than.(*rawEntry).key) < 0
}
// MVCCStore is a mvcc key-value storage.
// Timestamps are supplied by the caller; keys are raw (unencoded).
type MVCCStore interface {
	Get(key []byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, error)
	Scan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
	ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
	BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
	Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error
	Commit(keys [][]byte, startTS, commitTS uint64) error
	Rollback(keys [][]byte, startTS uint64) error
	Cleanup(key []byte, startTS uint64) error
	ScanLock(startKey, endKey []byte, maxTS uint64) ([]*kvrpcpb.LockInfo, error)
	ResolveLock(startKey, endKey []byte, startTS, commitTS uint64) error
	BatchResolveLock(startKey, endKey []byte, txnInfos map[uint64]uint64) error
	DeleteRange(startKey, endKey []byte) error
	Close() error
}

// RawKV is a key-value storage. MVCCStore can be implemented upon it with timestamp encoded into key.
type RawKV interface {
	RawGet(key []byte) []byte
	RawBatchGet(keys [][]byte) [][]byte
	RawScan(startKey, endKey []byte, limit int) []Pair
	RawPut(key, value []byte)
	RawBatchPut(keys, values [][]byte)
	RawDelete(key []byte)
	RawBatchDelete(keys [][]byte)
	RawDeleteRange(startKey, endKey []byte)
}

// MVCCDebugger is for debugging.
// Implemented optionally; the rpc handlers type-assert for it.
type MVCCDebugger interface {
	MvccGetByStartTS(startKey, endKey []byte, starTS uint64) (*kvrpcpb.MvccInfo, []byte)
	MvccGetByKey(key []byte) *kvrpcpb.MvccInfo
}

// Pair is a KV pair read from MvccStore or an error if any occurs.
type Pair struct {
	Key   []byte
	Value []byte
	Err   error
}
// regionContains reports whether key lies in the half-open range
// [startKey, endKey). An empty endKey means the range is unbounded on
// the right.
func regionContains(startKey []byte, endKey []byte, key []byte) bool {
	if bytes.Compare(key, startKey) < 0 {
		return false
	}
	if len(endKey) == 0 {
		return true
	}
	return bytes.Compare(key, endKey) < 0
}
// MvccKey is the encoded key type.
// On TiKV, keys are encoded before they are saved into storage engine.
type MvccKey []byte

// NewMvccKey encodes a key into MvccKey.
// An empty key stays nil so that range boundaries keep their "unbounded"
// meaning after encoding.
func NewMvccKey(key []byte) MvccKey {
	if len(key) == 0 {
		return nil
	}
	return codec.EncodeBytes(key)
}

// Raw decodes a MvccKey to original key.
// It panics on malformed input, which would indicate a bug in the mock store.
func (key MvccKey) Raw() []byte {
	if len(key) == 0 {
		return nil
	}
	_, k, err := codec.DecodeBytes(key)
	if err != nil {
		panic(err)
	}
	return k
}

File diff suppressed because it is too large Load Diff

110
mockstore/mocktikv/pd.go Normal file
View File

@ -0,0 +1,110 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"context"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
)
// Use global variables to prevent pdClients from creating duplicate timestamps.
var tsMu = struct {
	sync.Mutex
	physicalTS int64 // last issued physical timestamp (milliseconds)
	logicalTS  int64 // logical counter within the same physical tick
}{}

// pdClient is a mock pd.Client backed by a local Cluster.
type pdClient struct {
	cluster *Cluster
}

// NewPDClient creates a mock pd.Client that uses local timestamp and meta data
// from a Cluster.
func NewPDClient(cluster *Cluster) pd.Client {
	return &pdClient{
		cluster: cluster,
	}
}

// GetClusterID returns a fixed cluster ID for the mock.
func (c *pdClient) GetClusterID(ctx context.Context) uint64 {
	return 1
}
// GetTS returns a strictly increasing (physical, logical) timestamp pair.
// The physical part is wall-clock milliseconds; if the clock has not
// advanced since the last call, the logical counter is bumped instead.
func (c *pdClient) GetTS(context.Context) (int64, int64, error) {
	tsMu.Lock()
	defer tsMu.Unlock()
	ts := time.Now().UnixNano() / int64(time.Millisecond)
	if tsMu.physicalTS >= ts {
		tsMu.logicalTS++
	} else {
		tsMu.physicalTS = ts
		tsMu.logicalTS = 0
	}
	return tsMu.physicalTS, tsMu.logicalTS, nil
}
// GetTSAsync returns a future whose Wait performs a synchronous GetTS.
func (c *pdClient) GetTSAsync(ctx context.Context) pd.TSFuture {
	return &mockTSFuture{c, ctx}
}

// mockTSFuture adapts the synchronous GetTS call to the pd.TSFuture
// interface.
type mockTSFuture struct {
	pdc *pdClient
	ctx context.Context
}

// Wait fetches the timestamp.
func (m *mockTSFuture) Wait() (int64, int64, error) {
	return m.pdc.GetTS(m.ctx)
}
// GetRegion returns the region covering key and its leader peer.
func (c *pdClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
	region, peer := c.cluster.GetRegionByKey(key)
	return region, peer, nil
}

// GetPrevRegion returns the region immediately preceding the one covering
// key, with its leader peer.
func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
	region, peer := c.cluster.GetPrevRegionByKey(key)
	return region, peer, nil
}

// GetRegionByID returns the region with the given ID and its leader peer.
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
	region, peer := c.cluster.GetRegionByID(regionID)
	return region, peer, nil
}

// GetStore returns the store's meta data, honoring context cancellation.
func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) {
	// Non-blocking cancellation check before touching the cluster.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}
	store := c.cluster.GetStore(storeID)
	return store, nil
}
// GetAllStores is not supported by the mock client.
func (c *pdClient) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
	panic(errors.New("unimplemented"))
}

// UpdateGCSafePoint is not supported by the mock client.
func (c *pdClient) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint64, error) {
	panic("unimplemented")
}

// Close is a no-op; the mock client holds no external resources.
func (c *pdClient) Close() {
}

808
mockstore/mocktikv/rpc.go Normal file
View File

@ -0,0 +1,808 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package mocktikv
import (
"bytes"
"context"
"io"
"time"
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/terror"
"github.com/tikv/client-go/rpc"
)
// For gofail injection.
var undeterminedErr = terror.ErrResultUndetermined

// requestMaxSize mirrors TiKV's raft entry size limit; enforced in
// checkRequestSize since mocktikv has no raft layer.
const requestMaxSize = 8 * 1024 * 1024
func checkGoContext(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
// convertToKeyError maps a storage error to the protobuf KeyError the client
// protocol expects: lock and retryable errors keep their category, anything
// else becomes an abort.
func convertToKeyError(err error) *kvrpcpb.KeyError {
	if locked, ok := errors.Cause(err).(*ErrLocked); ok {
		return &kvrpcpb.KeyError{
			Locked: &kvrpcpb.LockInfo{
				Key:         locked.Key.Raw(),
				PrimaryLock: locked.Primary,
				LockVersion: locked.StartTS,
				LockTtl:     locked.TTL,
			},
		}
	}
	if retryable, ok := errors.Cause(err).(ErrRetryable); ok {
		return &kvrpcpb.KeyError{
			Retryable: retryable.Error(),
		}
	}
	return &kvrpcpb.KeyError{
		Abort: err.Error(),
	}
}
// convertToKeyErrors converts every non-nil error in errs into a protobuf
// KeyError, skipping nil entries. The result is always non-nil.
func convertToKeyErrors(errs []error) []*kvrpcpb.KeyError {
	keyErrors := make([]*kvrpcpb.KeyError, 0)
	for _, e := range errs {
		if e == nil {
			continue
		}
		keyErrors = append(keyErrors, convertToKeyError(e))
	}
	return keyErrors
}
// convertToPbPairs converts Pairs to protobuf KvPairs; a pair carrying an
// error is encoded with only its Error field set.
func convertToPbPairs(pairs []Pair) []*kvrpcpb.KvPair {
	kvPairs := make([]*kvrpcpb.KvPair, 0, len(pairs))
	for _, p := range pairs {
		var kvPair *kvrpcpb.KvPair
		if p.Err == nil {
			kvPair = &kvrpcpb.KvPair{
				Key:   p.Key,
				Value: p.Value,
			}
		} else {
			kvPair = &kvrpcpb.KvPair{
				Error: convertToKeyError(p.Err),
			}
		}
		kvPairs = append(kvPairs, kvPair)
	}
	return kvPairs
}
// rpcHandler mocks tikv's side handler behavior. In general, you may assume
// TiKV just translate the logic from Go to Rust.
// A fresh rpcHandler is built for each request (see RPCClient.checkArgs);
// the per-request fields below are filled in by checkRequestContext.
type rpcHandler struct {
	cluster   *Cluster
	mvccStore MVCCStore
	// store id for current request
	storeID uint64
	// Used for handling normal request.
	startKey []byte
	endKey   []byte
	// Used for handling coprocessor request.
	rawStartKey []byte
	rawEndKey   []byte
	// Used for current request.
	isolationLevel kvrpcpb.IsolationLevel
}
// checkRequestContext validates the request's region context against the
// cluster state: the target store matches, the region exists, this store
// holds a peer, that peer is the leader, and the region epoch is current.
// On success it caches the region's key range and the isolation level on
// the handler for use by the individual request handlers.
func (h *rpcHandler) checkRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
	ctxPeer := ctx.GetPeer()
	if ctxPeer != nil && ctxPeer.GetStoreId() != h.storeID {
		return &errorpb.Error{
			Message:       *proto.String("store not match"),
			StoreNotMatch: &errorpb.StoreNotMatch{},
		}
	}
	region, leaderID := h.cluster.GetRegion(ctx.GetRegionId())
	// No region found.
	if region == nil {
		return &errorpb.Error{
			Message: *proto.String("region not found"),
			RegionNotFound: &errorpb.RegionNotFound{
				RegionId: *proto.Uint64(ctx.GetRegionId()),
			},
		}
	}
	// Locate this store's peer and the leader peer in one pass.
	var storePeer, leaderPeer *metapb.Peer
	for _, p := range region.Peers {
		if p.GetStoreId() == h.storeID {
			storePeer = p
		}
		if p.GetId() == leaderID {
			leaderPeer = p
		}
	}
	// The Store does not contain a Peer of the Region.
	if storePeer == nil {
		return &errorpb.Error{
			Message: *proto.String("region not found"),
			RegionNotFound: &errorpb.RegionNotFound{
				RegionId: *proto.Uint64(ctx.GetRegionId()),
			},
		}
	}
	// No leader.
	if leaderPeer == nil {
		return &errorpb.Error{
			Message: *proto.String("no leader"),
			NotLeader: &errorpb.NotLeader{
				RegionId: *proto.Uint64(ctx.GetRegionId()),
			},
		}
	}
	// The Peer on the Store is not leader.
	if storePeer.GetId() != leaderPeer.GetId() {
		return &errorpb.Error{
			Message: *proto.String("not leader"),
			NotLeader: &errorpb.NotLeader{
				RegionId: *proto.Uint64(ctx.GetRegionId()),
				Leader:   leaderPeer,
			},
		}
	}
	// Region epoch does not match.
	if !proto.Equal(region.GetRegionEpoch(), ctx.GetRegionEpoch()) {
		// Report the current region and its right neighbour back to the
		// client — presumably so it can refresh its region cache after a
		// split; confirm against the client-side cache logic.
		nextRegion, _ := h.cluster.GetRegionByKey(region.GetEndKey())
		newRegions := []*metapb.Region{region}
		if nextRegion != nil {
			newRegions = append(newRegions, nextRegion)
		}
		return &errorpb.Error{
			Message: *proto.String("stale epoch"),
			StaleEpoch: &errorpb.StaleEpoch{
				NewRegions: newRegions,
			},
		}
	}
	h.startKey, h.endKey = region.StartKey, region.EndKey
	h.isolationLevel = ctx.IsolationLevel
	return nil
}
// checkRequestSize rejects requests at or above requestMaxSize.
func (h *rpcHandler) checkRequestSize(size int) *errorpb.Error {
	// TiKV has a limitation on raft log size.
	// mocktikv has no raft inside, so we check the request's size instead.
	if size >= requestMaxSize {
		return &errorpb.Error{
			RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{},
		}
	}
	return nil
}

// checkRequest validates the region context and the request size in one call.
func (h *rpcHandler) checkRequest(ctx *kvrpcpb.Context, size int) *errorpb.Error {
	if err := h.checkRequestContext(ctx); err != nil {
		return err
	}
	return h.checkRequestSize(size)
}

// checkKeyInRegion reports whether the key (after MVCC encoding) lies inside
// the region range cached by checkRequestContext.
func (h *rpcHandler) checkKeyInRegion(key []byte) bool {
	return regionContains(h.startKey, h.endKey, []byte(NewMvccKey(key)))
}
// handleKvGet serves a transactional point get at the request's version.
// A key outside the region indicates a routing bug, hence the panic.
func (h *rpcHandler) handleKvGet(req *kvrpcpb.GetRequest) *kvrpcpb.GetResponse {
	if !h.checkKeyInRegion(req.Key) {
		panic("KvGet: key not in region")
	}
	val, err := h.mvccStore.Get(req.Key, req.GetVersion(), h.isolationLevel)
	if err != nil {
		return &kvrpcpb.GetResponse{
			Error: convertToKeyError(err),
		}
	}
	return &kvrpcpb.GetResponse{
		Value: val,
	}
}

// handleKvScan serves a transactional scan, clamping the end key to the
// region boundary.
func (h *rpcHandler) handleKvScan(req *kvrpcpb.ScanRequest) *kvrpcpb.ScanResponse {
	if !h.checkKeyInRegion(req.GetStartKey()) {
		panic("KvScan: startKey not in region")
	}
	// Use the request's end key only if it falls before the region's end.
	endKey := h.endKey
	if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
		endKey = req.EndKey
	}
	pairs := h.mvccStore.Scan(req.GetStartKey(), endKey, int(req.GetLimit()), req.GetVersion(), h.isolationLevel)
	return &kvrpcpb.ScanResponse{
		Pairs: convertToPbPairs(pairs),
	}
}
// handleKvPrewrite locks the mutations' keys for the first phase of 2PC;
// per-key failures are returned as KeyErrors.
func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.PrewriteResponse {
	for _, m := range req.Mutations {
		if !h.checkKeyInRegion(m.Key) {
			panic("KvPrewrite: key not in region")
		}
	}
	errs := h.mvccStore.Prewrite(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetLockTtl())
	return &kvrpcpb.PrewriteResponse{
		Errors: convertToKeyErrors(errs),
	}
}

// handleKvCommit commits previously prewritten keys (second phase of 2PC).
func (h *rpcHandler) handleKvCommit(req *kvrpcpb.CommitRequest) *kvrpcpb.CommitResponse {
	for _, k := range req.Keys {
		if !h.checkKeyInRegion(k) {
			panic("KvCommit: key not in region")
		}
	}
	var resp kvrpcpb.CommitResponse
	err := h.mvccStore.Commit(req.Keys, req.GetStartVersion(), req.GetCommitVersion())
	if err != nil {
		resp.Error = convertToKeyError(err)
	}
	return &resp
}

// handleKvCleanup rolls back a single key's lock. If the transaction turns
// out to be committed already, the commit version is reported instead of an
// error.
func (h *rpcHandler) handleKvCleanup(req *kvrpcpb.CleanupRequest) *kvrpcpb.CleanupResponse {
	if !h.checkKeyInRegion(req.Key) {
		panic("KvCleanup: key not in region")
	}
	var resp kvrpcpb.CleanupResponse
	err := h.mvccStore.Cleanup(req.Key, req.GetStartVersion())
	if err != nil {
		if commitTS, ok := errors.Cause(err).(ErrAlreadyCommitted); ok {
			resp.CommitVersion = uint64(commitTS)
		} else {
			resp.Error = convertToKeyError(err)
		}
	}
	return &resp
}
// handleKvBatchGet serves a multi-key transactional read at one version.
func (h *rpcHandler) handleKvBatchGet(req *kvrpcpb.BatchGetRequest) *kvrpcpb.BatchGetResponse {
	for _, k := range req.Keys {
		if !h.checkKeyInRegion(k) {
			panic("KvBatchGet: key not in region")
		}
	}
	pairs := h.mvccStore.BatchGet(req.Keys, req.GetVersion(), h.isolationLevel)
	return &kvrpcpb.BatchGetResponse{
		Pairs: convertToPbPairs(pairs),
	}
}

// handleMvccGetByKey exposes a key's MVCC history for debugging, if the
// store implements MVCCDebugger.
// NOTE(review): the "not implement" message differs from the raw handlers'
// "not implemented" — likely a typo, but confirm no caller matches on the
// exact string before changing it.
func (h *rpcHandler) handleMvccGetByKey(req *kvrpcpb.MvccGetByKeyRequest) *kvrpcpb.MvccGetByKeyResponse {
	debugger, ok := h.mvccStore.(MVCCDebugger)
	if !ok {
		return &kvrpcpb.MvccGetByKeyResponse{
			Error: "not implement",
		}
	}
	if !h.checkKeyInRegion(req.Key) {
		panic("MvccGetByKey: key not in region")
	}
	var resp kvrpcpb.MvccGetByKeyResponse
	resp.Info = debugger.MvccGetByKey(req.Key)
	return &resp
}

// handleMvccGetByStartTS finds, within this region's range, the MVCC info
// and key touched by the transaction with the given start timestamp.
func (h *rpcHandler) handleMvccGetByStartTS(req *kvrpcpb.MvccGetByStartTsRequest) *kvrpcpb.MvccGetByStartTsResponse {
	debugger, ok := h.mvccStore.(MVCCDebugger)
	if !ok {
		return &kvrpcpb.MvccGetByStartTsResponse{
			Error: "not implement",
		}
	}
	var resp kvrpcpb.MvccGetByStartTsResponse
	resp.Info, resp.Key = debugger.MvccGetByStartTS(h.startKey, h.endKey, req.StartTs)
	return &resp
}
// handleKvBatchRollback rolls back the given keys for one transaction.
func (h *rpcHandler) handleKvBatchRollback(req *kvrpcpb.BatchRollbackRequest) *kvrpcpb.BatchRollbackResponse {
	err := h.mvccStore.Rollback(req.Keys, req.StartVersion)
	if err != nil {
		return &kvrpcpb.BatchRollbackResponse{
			Error: convertToKeyError(err),
		}
	}
	return &kvrpcpb.BatchRollbackResponse{}
}

// handleKvScanLock lists locks within the region older than MaxVersion.
// The region boundaries are decoded back to raw keys for the store.
func (h *rpcHandler) handleKvScanLock(req *kvrpcpb.ScanLockRequest) *kvrpcpb.ScanLockResponse {
	startKey := MvccKey(h.startKey).Raw()
	endKey := MvccKey(h.endKey).Raw()
	locks, err := h.mvccStore.ScanLock(startKey, endKey, req.GetMaxVersion())
	if err != nil {
		return &kvrpcpb.ScanLockResponse{
			Error: convertToKeyError(err),
		}
	}
	return &kvrpcpb.ScanLockResponse{
		Locks: locks,
	}
}

// handleKvResolveLock commits or rolls back one transaction's locks across
// the whole region range.
func (h *rpcHandler) handleKvResolveLock(req *kvrpcpb.ResolveLockRequest) *kvrpcpb.ResolveLockResponse {
	startKey := MvccKey(h.startKey).Raw()
	endKey := MvccKey(h.endKey).Raw()
	err := h.mvccStore.ResolveLock(startKey, endKey, req.GetStartVersion(), req.GetCommitVersion())
	if err != nil {
		return &kvrpcpb.ResolveLockResponse{
			Error: convertToKeyError(err),
		}
	}
	return &kvrpcpb.ResolveLockResponse{}
}

// handleKvDeleteRange removes all keys in [StartKey, EndKey).
func (h *rpcHandler) handleKvDeleteRange(req *kvrpcpb.DeleteRangeRequest) *kvrpcpb.DeleteRangeResponse {
	if !h.checkKeyInRegion(req.StartKey) {
		panic("KvDeleteRange: key not in region")
	}
	var resp kvrpcpb.DeleteRangeResponse
	err := h.mvccStore.DeleteRange(req.StartKey, req.EndKey)
	if err != nil {
		resp.Error = err.Error()
	}
	return &resp
}
// handleKvRawGet serves a raw (non-transactional) point get. All raw
// handlers below type-assert the store for RawKV and report "not
// implemented" if the store doesn't support raw operations.
func (h *rpcHandler) handleKvRawGet(req *kvrpcpb.RawGetRequest) *kvrpcpb.RawGetResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawGetResponse{
			Error: "not implemented",
		}
	}
	return &kvrpcpb.RawGetResponse{
		Value: rawKV.RawGet(req.GetKey()),
	}
}

// handleKvRawBatchGet serves a raw multi-key get; RawBatchGet returns values
// positionally aligned with req.Keys.
func (h *rpcHandler) handleKvRawBatchGet(req *kvrpcpb.RawBatchGetRequest) *kvrpcpb.RawBatchGetResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		// TODO should we add error ?
		return &kvrpcpb.RawBatchGetResponse{
			RegionError: &errorpb.Error{
				Message: "not implemented",
			},
		}
	}
	values := rawKV.RawBatchGet(req.Keys)
	kvPairs := make([]*kvrpcpb.KvPair, len(values))
	for i, key := range req.Keys {
		kvPairs[i] = &kvrpcpb.KvPair{
			Key:   key,
			Value: values[i],
		}
	}
	return &kvrpcpb.RawBatchGetResponse{
		Pairs: kvPairs,
	}
}

// handleKvRawPut serves a raw single-key put.
func (h *rpcHandler) handleKvRawPut(req *kvrpcpb.RawPutRequest) *kvrpcpb.RawPutResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawPutResponse{
			Error: "not implemented",
		}
	}
	rawKV.RawPut(req.GetKey(), req.GetValue())
	return &kvrpcpb.RawPutResponse{}
}

// handleKvRawBatchPut unzips the request's pairs into parallel key/value
// slices for RawBatchPut.
func (h *rpcHandler) handleKvRawBatchPut(req *kvrpcpb.RawBatchPutRequest) *kvrpcpb.RawBatchPutResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawBatchPutResponse{
			Error: "not implemented",
		}
	}
	keys := make([][]byte, 0, len(req.Pairs))
	values := make([][]byte, 0, len(req.Pairs))
	for _, pair := range req.Pairs {
		keys = append(keys, pair.Key)
		values = append(values, pair.Value)
	}
	rawKV.RawBatchPut(keys, values)
	return &kvrpcpb.RawBatchPutResponse{}
}

// handleKvRawDelete serves a raw single-key delete.
func (h *rpcHandler) handleKvRawDelete(req *kvrpcpb.RawDeleteRequest) *kvrpcpb.RawDeleteResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawDeleteResponse{
			Error: "not implemented",
		}
	}
	rawKV.RawDelete(req.GetKey())
	return &kvrpcpb.RawDeleteResponse{}
}

// handleKvRawBatchDelete serves a raw multi-key delete.
func (h *rpcHandler) handleKvRawBatchDelete(req *kvrpcpb.RawBatchDeleteRequest) *kvrpcpb.RawBatchDeleteResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawBatchDeleteResponse{
			Error: "not implemented",
		}
	}
	rawKV.RawBatchDelete(req.Keys)
	return &kvrpcpb.RawBatchDeleteResponse{}
}

// handleKvRawDeleteRange deletes all raw keys in [StartKey, EndKey).
func (h *rpcHandler) handleKvRawDeleteRange(req *kvrpcpb.RawDeleteRangeRequest) *kvrpcpb.RawDeleteRangeResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		return &kvrpcpb.RawDeleteRangeResponse{
			Error: "not implemented",
		}
	}
	rawKV.RawDeleteRange(req.GetStartKey(), req.GetEndKey())
	return &kvrpcpb.RawDeleteRangeResponse{}
}

// handleKvRawScan serves a raw scan, clamping the end key to the region
// boundary like handleKvScan.
func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawScanResponse {
	rawKV, ok := h.mvccStore.(RawKV)
	if !ok {
		errStr := "not implemented"
		return &kvrpcpb.RawScanResponse{
			RegionError: &errorpb.Error{
				Message: errStr,
			},
		}
	}
	// Use the request's end key only if it falls before the region's end.
	endKey := h.endKey
	if len(req.EndKey) > 0 && (len(endKey) == 0 || bytes.Compare(req.EndKey, endKey) < 0) {
		endKey = req.EndKey
	}
	pairs := rawKV.RawScan(req.GetStartKey(), endKey, int(req.GetLimit()))
	return &kvrpcpb.RawScanResponse{
		Kvs: convertToPbPairs(pairs),
	}
}
// handleSplitRegion splits the region covering SplitKey at that key.
// Splitting exactly on an existing region boundary is a no-op.
func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
	key := NewMvccKey(req.GetSplitKey())
	region, _ := h.cluster.GetRegionByKey(key)
	if bytes.Equal(region.GetStartKey(), key) {
		return &kvrpcpb.SplitRegionResponse{}
	}
	// Allocate a region ID plus one peer ID per existing peer, and make the
	// first new peer the leader of the new region.
	newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
	h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
	return &kvrpcpb.SplitRegionResponse{}
}
// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
// a rpc client at tikv's side.
type RPCClient struct {
	Cluster       *Cluster
	MvccStore     MVCCStore
	streamTimeout chan *rpc.Lease // leases consumed by CheckStreamTimeoutLoop
}

// NewRPCClient creates an RPCClient.
// Note that close the RPCClient may close the underlying MvccStore.
func NewRPCClient(cluster *Cluster, mvccStore MVCCStore) *RPCClient {
	// Background loop that expires stream leases sent on ch.
	ch := make(chan *rpc.Lease)
	go rpc.CheckStreamTimeoutLoop(ch)
	return &RPCClient{
		Cluster:       cluster,
		MvccStore:     mvccStore,
		streamTimeout: ch,
	}
}
// getAndCheckStoreByAddr resolves a store by its address and rejects stores
// that are missing, offline, or tombstoned.
func (c *RPCClient) getAndCheckStoreByAddr(addr string) (*metapb.Store, error) {
	store, err := c.Cluster.GetAndCheckStoreByAddr(addr)
	if err != nil {
		return nil, err
	}
	if store == nil {
		return nil, errors.New("connect fail")
	}
	if store.GetState() == metapb.StoreState_Offline ||
		store.GetState() == metapb.StoreState_Tombstone {
		return nil, errors.New("connection refused")
	}
	return store, nil
}

// checkArgs validates the context and target address, then builds a
// per-request rpcHandler bound to the resolved store.
func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, error) {
	if err := checkGoContext(ctx); err != nil {
		return nil, err
	}
	store, err := c.getAndCheckStoreByAddr(addr)
	if err != nil {
		return nil, err
	}
	handler := &rpcHandler{
		cluster:   c.Cluster,
		mvccStore: c.MvccStore,
		// set store id for current request
		storeID: store.GetId(),
	}
	return handler, nil
}
// SendRequest sends a request to mock cluster.
// It dispatches on req.Type to the matching mock handler. For every command
// it first validates the request context/size via checkRequest; on a region
// error it fills the per-command response's RegionError and returns early,
// mirroring how a real TiKV reports recoverable errors in-band rather than
// as transport errors.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *rpc.Request, timeout time.Duration) (*rpc.Response, error) {
	// gofail: var rpcServerBusy bool
	// if rpcServerBusy {
	// 	return rpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})
	// }
	handler, err := c.checkArgs(ctx, addr)
	if err != nil {
		return nil, err
	}
	reqCtx := &req.Context
	resp := &rpc.Response{}
	resp.Type = req.Type
	switch req.Type {
	case rpc.CmdGet:
		r := req.Get
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.Get = &kvrpcpb.GetResponse{RegionError: err}
			return resp, nil
		}
		resp.Get = handler.handleKvGet(r)
	case rpc.CmdScan:
		r := req.Scan
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.Scan = &kvrpcpb.ScanResponse{RegionError: err}
			return resp, nil
		}
		resp.Scan = handler.handleKvScan(r)
	case rpc.CmdPrewrite:
		r := req.Prewrite
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.Prewrite = &kvrpcpb.PrewriteResponse{RegionError: err}
			return resp, nil
		}
		resp.Prewrite = handler.handleKvPrewrite(r)
	case rpc.CmdCommit:
		// gofail: var rpcCommitResult string
		// switch rpcCommitResult {
		// case "timeout":
		// 	return nil, errors.New("timeout")
		// case "notLeader":
		// 	return &rpc.Response{
		// 		Type:   rpc.CmdCommit,
		// 		Commit: &kvrpcpb.CommitResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
		// 	}, nil
		// case "keyError":
		// 	return &rpc.Response{
		// 		Type:   rpc.CmdCommit,
		// 		Commit: &kvrpcpb.CommitResponse{Error: &kvrpcpb.KeyError{}},
		// 	}, nil
		// }
		r := req.Commit
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.Commit = &kvrpcpb.CommitResponse{RegionError: err}
			return resp, nil
		}
		resp.Commit = handler.handleKvCommit(r)
		// gofail: var rpcCommitTimeout bool
		// if rpcCommitTimeout {
		// 	return nil, undeterminedErr
		// }
	case rpc.CmdCleanup:
		r := req.Cleanup
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.Cleanup = &kvrpcpb.CleanupResponse{RegionError: err}
			return resp, nil
		}
		resp.Cleanup = handler.handleKvCleanup(r)
	case rpc.CmdBatchGet:
		r := req.BatchGet
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.BatchGet = &kvrpcpb.BatchGetResponse{RegionError: err}
			return resp, nil
		}
		resp.BatchGet = handler.handleKvBatchGet(r)
	case rpc.CmdBatchRollback:
		r := req.BatchRollback
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.BatchRollback = &kvrpcpb.BatchRollbackResponse{RegionError: err}
			return resp, nil
		}
		resp.BatchRollback = handler.handleKvBatchRollback(r)
	case rpc.CmdScanLock:
		r := req.ScanLock
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.ScanLock = &kvrpcpb.ScanLockResponse{RegionError: err}
			return resp, nil
		}
		resp.ScanLock = handler.handleKvScanLock(r)
	case rpc.CmdResolveLock:
		r := req.ResolveLock
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.ResolveLock = &kvrpcpb.ResolveLockResponse{RegionError: err}
			return resp, nil
		}
		resp.ResolveLock = handler.handleKvResolveLock(r)
	case rpc.CmdGC:
		r := req.GC
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.GC = &kvrpcpb.GCResponse{RegionError: err}
			return resp, nil
		}
		// GC is a no-op in the mock store; return an empty success response.
		resp.GC = &kvrpcpb.GCResponse{}
	case rpc.CmdDeleteRange:
		r := req.DeleteRange
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.DeleteRange = &kvrpcpb.DeleteRangeResponse{RegionError: err}
			return resp, nil
		}
		resp.DeleteRange = handler.handleKvDeleteRange(r)
	case rpc.CmdRawGet:
		r := req.RawGet
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawGet = &kvrpcpb.RawGetResponse{RegionError: err}
			return resp, nil
		}
		resp.RawGet = handler.handleKvRawGet(r)
	case rpc.CmdRawBatchGet:
		r := req.RawBatchGet
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawBatchGet = &kvrpcpb.RawBatchGetResponse{RegionError: err}
			return resp, nil
		}
		resp.RawBatchGet = handler.handleKvRawBatchGet(r)
	case rpc.CmdRawPut:
		r := req.RawPut
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawPut = &kvrpcpb.RawPutResponse{RegionError: err}
			return resp, nil
		}
		resp.RawPut = handler.handleKvRawPut(r)
	case rpc.CmdRawBatchPut:
		r := req.RawBatchPut
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{RegionError: err}
			return resp, nil
		}
		resp.RawBatchPut = handler.handleKvRawBatchPut(r)
	case rpc.CmdRawDelete:
		r := req.RawDelete
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawDelete = &kvrpcpb.RawDeleteResponse{RegionError: err}
			return resp, nil
		}
		resp.RawDelete = handler.handleKvRawDelete(r)
	case rpc.CmdRawBatchDelete:
		r := req.RawBatchDelete
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawBatchDelete = &kvrpcpb.RawBatchDeleteResponse{RegionError: err}
			// Bug fix: this case previously fell through and still executed
			// handleKvRawBatchDelete after a region error; return early like
			// every other command.
			return resp, nil
		}
		resp.RawBatchDelete = handler.handleKvRawBatchDelete(r)
	case rpc.CmdRawDeleteRange:
		r := req.RawDeleteRange
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{RegionError: err}
			return resp, nil
		}
		resp.RawDeleteRange = handler.handleKvRawDeleteRange(r)
	case rpc.CmdRawScan:
		r := req.RawScan
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.RawScan = &kvrpcpb.RawScanResponse{RegionError: err}
			return resp, nil
		}
		resp.RawScan = handler.handleKvRawScan(r)
	case rpc.CmdUnsafeDestroyRange:
		panic("unimplemented")
	case rpc.CmdCop:
		// TODO: support register cop handler.
		panic("unimplemented")
		// r := req.Cop
		// if err := handler.checkRequestContext(reqCtx); err != nil {
		// 	resp.Cop = &coprocessor.Response{RegionError: err}
		// 	return resp, nil
		// }
		// handler.rawStartKey = MvccKey(handler.startKey).Raw()
		// handler.rawEndKey = MvccKey(handler.endKey).Raw()
		// var res *coprocessor.Response
		// switch r.GetTp() {
		// case kv.ReqTypeDAG:
		// 	res = handler.handleCopDAGRequest(r)
		// case kv.ReqTypeAnalyze:
		// 	res = handler.handleCopAnalyzeRequest(r)
		// case kv.ReqTypeChecksum:
		// 	res = handler.handleCopChecksumRequest(r)
		// default:
		// 	panic(fmt.Sprintf("unknown coprocessor request type: %v", r.GetTp()))
		// }
		// resp.Cop = res
	case rpc.CmdCopStream:
		// TODO: support register copStream handler.
		panic("unimplemented")
		// r := req.Cop
		// if err := handler.checkRequestContext(reqCtx); err != nil {
		// 	resp.CopStream = &rpc.CopStreamResponse{
		// 		Tikv_CoprocessorStreamClient: &mockCopStreamErrClient{Error: err},
		// 		Response: &coprocessor.Response{
		// 			RegionError: err,
		// 		},
		// 	}
		// 	return resp, nil
		// }
		// handler.rawStartKey = MvccKey(handler.startKey).Raw()
		// handler.rawEndKey = MvccKey(handler.endKey).Raw()
		// ctx1, cancel := context.WithCancel(ctx)
		// copStream, err := handler.handleCopStream(ctx1, r)
		// if err != nil {
		// 	cancel()
		// 	return nil, errors.Trace(err)
		// }
		// streamResp := &rpc.CopStreamResponse{
		// 	Tikv_CoprocessorStreamClient: copStream,
		// }
		// streamResp.Lease.Cancel = cancel
		// streamResp.Timeout = timeout
		// c.streamTimeout <- &streamResp.Lease
		// first, err := streamResp.Recv()
		// if err != nil {
		// 	return nil, errors.Trace(err)
		// }
		// streamResp.Response = first
		// resp.CopStream = streamResp
	case rpc.CmdMvccGetByKey:
		r := req.MvccGetByKey
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.MvccGetByKey = &kvrpcpb.MvccGetByKeyResponse{RegionError: err}
			return resp, nil
		}
		resp.MvccGetByKey = handler.handleMvccGetByKey(r)
	case rpc.CmdMvccGetByStartTs:
		r := req.MvccGetByStartTs
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.MvccGetByStartTS = &kvrpcpb.MvccGetByStartTsResponse{RegionError: err}
			return resp, nil
		}
		resp.MvccGetByStartTS = handler.handleMvccGetByStartTS(r)
	case rpc.CmdSplitRegion:
		r := req.SplitRegion
		if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
			resp.SplitRegion = &kvrpcpb.SplitRegionResponse{RegionError: err}
			return resp, nil
		}
		resp.SplitRegion = handler.handleSplitRegion(r)
	default:
		return nil, errors.Errorf("unsupport this request type %v", req.Type)
	}
	return resp, nil
}
// Close closes the client. It shuts down the stream-timeout channel and,
// when the underlying MVCC store holds closable resources, closes it too.
func (c *RPCClient) Close() error {
	close(c.streamTimeout)
	closer, ok := c.MvccStore.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
package rawkv
import "github.com/pingcap/errors"

View File

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
package rawkv
import (
"bytes"
@ -270,9 +270,12 @@ func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
return nil
}
// Scan queries continuous kv pairs, starts from startKey, up to limit pairs.
// If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
// Scan queries continuous kv pairs in range [startKey, endKey), up to limit pairs.
// If endKey is empty, it means unbounded.
// If you want to exclude the startKey or include the endKey, append a '\0' to the key. For example, to scan
// (startKey, endKey], you can write:
// `Scan(append(startKey, '\0'), append(endKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey, endKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
@ -285,6 +288,7 @@ func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values []
Type: rpc.CmdRawScan,
RawScan: &kvrpcpb.RawScanRequest{
StartKey: startKey,
EndKey: endKey,
Limit: uint32(limit - len(keys)),
},
}

266
rawkv/rawkv_test.go Normal file
View File

@ -0,0 +1,266 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package rawkv
import (
"bytes"
"context"
"fmt"
"testing"
. "github.com/pingcap/check"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/mockstore/mocktikv"
"github.com/tikv/client-go/retry"
)
// TestT hooks the gocheck suites registered in this package into the
// standard "go test" runner.
func TestT(t *testing.T) {
	TestingT(t)
}
// testRawKVSuite runs the RawKVClient tests against an in-memory mock TiKV
// cluster; SetUpTest rebuilds the cluster so every test starts from an
// empty store.
type testRawKVSuite struct {
	// cluster is the mock cluster topology (regions, stores, peers).
	cluster *mocktikv.Cluster
	// client is the client under test, wired directly to the mock cluster.
	client *RawKVClient
	// bo is the backoffer used by helpers for region-cache lookups.
	bo *retry.Backoffer
}

var _ = Suite(&testRawKVSuite{})
// SetUpTest builds a fresh single-store mock cluster and a RawKVClient wired
// straight to it, bypassing the normal constructor (which would dial PD).
func (s *testRawKVSuite) SetUpTest(c *C) {
	s.cluster = mocktikv.NewCluster()
	mocktikv.BootstrapWithSingleStore(s.cluster)
	pdClient := mocktikv.NewPDClient(s.cluster)
	mvccStore := mocktikv.MustNewMVCCStore()
	s.client = &RawKVClient{
		clusterID:   0,
		regionCache: locate.NewRegionCache(pdClient),
		pdClient:    pdClient,
		rpcClient:   mocktikv.NewRPCClient(s.cluster, mvccStore),
	}
	// 5000ms total backoff budget for region lookups in test helpers.
	s.bo = retry.NewBackoffer(context.Background(), 5000)
}
// TearDownTest releases the client created in SetUpTest.
func (s *testRawKVSuite) TearDownTest(c *C) {
	s.client.Close()
}
// mustNotExist asserts that key has no value in the store.
func (s *testRawKVSuite) mustNotExist(c *C, key []byte) {
	v, err := s.client.Get(key)
	c.Assert(err, IsNil)
	c.Assert(v, IsNil)
}
// mustBatchNotExist asserts that none of keys is present: BatchGet must
// return exactly one empty value per requested key.
func (s *testRawKVSuite) mustBatchNotExist(c *C, keys [][]byte) {
	got, err := s.client.BatchGet(keys)
	c.Assert(err, IsNil)
	c.Assert(got, NotNil)
	c.Assert(len(keys), Equals, len(got))
	for i := range got {
		c.Assert([]byte{}, BytesEquals, got[i])
	}
}
// mustGet asserts that key exists and holds exactly value.
func (s *testRawKVSuite) mustGet(c *C, key, value []byte) {
	v, err := s.client.Get(key)
	c.Assert(err, IsNil)
	c.Assert(v, NotNil)
	c.Assert(v, BytesEquals, value)
}
// mustBatchGet asserts that BatchGet(keys) returns values, matched position
// by position.
func (s *testRawKVSuite) mustBatchGet(c *C, keys, values [][]byte) {
	got, err := s.client.BatchGet(keys)
	c.Assert(err, IsNil)
	c.Assert(got, NotNil)
	c.Assert(len(keys), Equals, len(got))
	for i := 0; i < len(keys); i++ {
		c.Check(values[i], BytesEquals, got[i])
	}
}
// mustPut writes key=value and asserts success.
func (s *testRawKVSuite) mustPut(c *C, key, value []byte) {
	err := s.client.Put(key, value)
	c.Assert(err, IsNil)
}
// mustBatchPut writes all key/value pairs in one batch and asserts success.
func (s *testRawKVSuite) mustBatchPut(c *C, keys, values [][]byte) {
	err := s.client.BatchPut(keys, values)
	c.Assert(err, IsNil)
}
// mustDelete removes key and asserts success.
func (s *testRawKVSuite) mustDelete(c *C, key []byte) {
	err := s.client.Delete(key)
	c.Assert(err, IsNil)
}
// mustBatchDelete removes all keys in one batch and asserts success.
func (s *testRawKVSuite) mustBatchDelete(c *C, keys [][]byte) {
	err := s.client.BatchDelete(keys)
	c.Assert(err, IsNil)
}
// mustScan scans from startKey with no upper bound and asserts the result
// equals expect, given as alternating key/value strings.
func (s *testRawKVSuite) mustScan(c *C, startKey string, limit int, expect ...string) {
	keys, values, err := s.client.Scan([]byte(startKey), nil, limit)
	c.Assert(err, IsNil)
	c.Assert(len(keys)*2, Equals, len(expect))
	for i := 0; i < len(keys); i++ {
		c.Assert(string(keys[i]), Equals, expect[2*i])
		c.Assert(string(values[i]), Equals, expect[2*i+1])
	}
}
// mustScanRange scans [startKey, endKey) and asserts the result equals
// expect, given as alternating key/value strings.
func (s *testRawKVSuite) mustScanRange(c *C, startKey string, endKey string, limit int, expect ...string) {
	keys, values, err := s.client.Scan([]byte(startKey), []byte(endKey), limit)
	c.Assert(err, IsNil)
	c.Assert(len(keys)*2, Equals, len(expect))
	for i := 0; i < len(keys); i++ {
		c.Assert(string(keys[i]), Equals, expect[2*i])
		c.Assert(string(values[i]), Equals, expect[2*i+1])
	}
}
// mustDeleteRange deletes [startKey, endKey), prunes the deleted keys from
// the expected map, and verifies the store now matches exactly the survivors.
// Note: expected is mutated in place on purpose so callers can chain ranges.
func (s *testRawKVSuite) mustDeleteRange(c *C, startKey, endKey []byte, expected map[string]string) {
	err := s.client.DeleteRange(startKey, endKey)
	c.Assert(err, IsNil)
	for keyStr := range expected {
		k := []byte(keyStr)
		// Deleting map entries while ranging is safe in Go.
		inRange := bytes.Compare(startKey, k) <= 0 && bytes.Compare(k, endKey) < 0
		if inRange {
			delete(expected, keyStr)
		}
	}
	s.checkData(c, expected)
}
// checkData scans the whole store (limit len(expected)+1 so an extra key
// would be detected) and asserts its contents equal expected exactly.
func (s *testRawKVSuite) checkData(c *C, expected map[string]string) {
	keys, values, err := s.client.Scan([]byte(""), nil, len(expected)+1)
	c.Assert(err, IsNil)
	c.Assert(len(expected), Equals, len(keys))
	for i := 0; i < len(keys); i++ {
		c.Assert(expected[string(keys[i])], Equals, string(values[i]))
	}
}
// split splits the region containing regionKey at splitKey, allocating a
// fresh region ID and peer ID from the mock cluster.
func (s *testRawKVSuite) split(c *C, regionKey, splitKey string) error {
	loc, err := s.client.regionCache.LocateKey(s.bo, []byte(regionKey))
	if err != nil {
		return err
	}
	newRegionID := s.cluster.AllocID()
	peerID := s.cluster.AllocID()
	s.cluster.SplitRaw(loc.Region.GetID(), newRegionID, []byte(splitKey), []uint64{peerID}, peerID)
	return nil
}
// TestSimple covers the single-key lifecycle: miss, put, get, delete, miss
// again, and rejection of an empty value.
func (s *testRawKVSuite) TestSimple(c *C) {
	s.mustNotExist(c, []byte("key"))
	s.mustPut(c, []byte("key"), []byte("value"))
	s.mustGet(c, []byte("key"), []byte("value"))
	s.mustDelete(c, []byte("key"))
	s.mustNotExist(c, []byte("key"))
	// Empty values must be rejected by Put.
	err := s.client.Put([]byte("key"), []byte(""))
	c.Assert(err, NotNil)
}
// TestRawBatch exercises the batch APIs with enough data to force the client
// to split batches internally (total size ≥ 4 * rawBatchPutSize) and across
// a region split.
func (s *testRawKVSuite) TestRawBatch(c *C) {
	testNum := 0
	size := 0
	var testKeys [][]byte
	var testValues [][]byte
	// Generate key/value pairs until the accumulated size spans several
	// rawBatchPutSize chunks.
	for i := 0; size/rawBatchPutSize < 4; i++ {
		key := fmt.Sprint("key", i)
		size += len(key)
		testKeys = append(testKeys, []byte(key))
		value := fmt.Sprint("value", i)
		size += len(value)
		testValues = append(testValues, []byte(value))
		s.mustNotExist(c, []byte(key))
		testNum = i
	}
	// Split roughly in the middle so batches cross a region boundary.
	err := s.split(c, "", fmt.Sprint("key", testNum/2))
	c.Assert(err, IsNil)
	s.mustBatchPut(c, testKeys, testValues)
	s.mustBatchGet(c, testKeys, testValues)
	s.mustBatchDelete(c, testKeys)
	s.mustBatchNotExist(c, testKeys)
}
// TestSplit verifies that keys remain readable after the region containing
// them is split between them.
func (s *testRawKVSuite) TestSplit(c *C) {
	s.mustPut(c, []byte("k1"), []byte("v1"))
	s.mustPut(c, []byte("k3"), []byte("v3"))
	err := s.split(c, "k", "k2")
	c.Assert(err, IsNil)
	s.mustGet(c, []byte("k1"), []byte("v1"))
	s.mustGet(c, []byte("k3"), []byte("v3"))
}
// TestScan checks unbounded and range-bounded scans — limits, exclusive end
// keys, and the '\x00'-suffix trick for inclusive bounds — and re-runs the
// same checks after successive region splits to cover multi-region scans.
func (s *testRawKVSuite) TestScan(c *C) {
	s.mustPut(c, []byte("k1"), []byte("v1"))
	s.mustPut(c, []byte("k3"), []byte("v3"))
	s.mustPut(c, []byte("k5"), []byte("v5"))
	s.mustPut(c, []byte("k7"), []byte("v7"))
	check := func() {
		s.mustScan(c, "", 1, "k1", "v1")
		s.mustScan(c, "k1", 2, "k1", "v1", "k3", "v3")
		s.mustScan(c, "", 10, "k1", "v1", "k3", "v3", "k5", "v5", "k7", "v7")
		s.mustScan(c, "k2", 2, "k3", "v3", "k5", "v5")
		s.mustScan(c, "k2", 3, "k3", "v3", "k5", "v5", "k7", "v7")
		s.mustScanRange(c, "", "k1", 1)
		s.mustScanRange(c, "k1", "k3", 2, "k1", "v1")
		s.mustScanRange(c, "k1", "k5", 10, "k1", "v1", "k3", "v3")
		// Appending '\x00' to the end key makes the bound inclusive.
		s.mustScanRange(c, "k1", "k5\x00", 10, "k1", "v1", "k3", "v3", "k5", "v5")
		s.mustScanRange(c, "k5\x00", "k5\x00\x00", 10)
	}
	check()
	// The same expectations must hold as the data gets split across regions.
	err := s.split(c, "k", "k2")
	c.Assert(err, IsNil)
	check()
	err = s.split(c, "k2", "k5")
	c.Assert(err, IsNil)
	check()
}
// TestDeleteRange seeds a grid of keys ("a0".."d9"), splits them over four
// regions, then deletes in-region, empty, cross-region, and whole-store
// ranges, verifying the surviving data after each step.
func (s *testRawKVSuite) TestDeleteRange(c *C) {
	// Init data
	testData := map[string]string{}
	for _, i := range []byte("abcd") {
		for j := byte('0'); j <= byte('9'); j++ {
			key := []byte{i, j}
			value := []byte{'v', i, j}
			s.mustPut(c, key, value)
			testData[string(key)] = string(value)
		}
	}
	// One region per letter prefix.
	err := s.split(c, "b", "b")
	c.Assert(err, IsNil)
	err = s.split(c, "c", "c")
	c.Assert(err, IsNil)
	err = s.split(c, "d", "d")
	c.Assert(err, IsNil)
	s.checkData(c, testData)
	s.mustDeleteRange(c, []byte("b"), []byte("c0"), testData)
	// Range with no matching keys.
	s.mustDeleteRange(c, []byte("c11"), []byte("c12"), testData)
	// Empty range (start == end) deletes nothing.
	s.mustDeleteRange(c, []byte("d0"), []byte("d0"), testData)
	// Range spanning a region boundary.
	s.mustDeleteRange(c, []byte("c5"), []byte("d5"), testData)
	// Range covering everything that remains.
	s.mustDeleteRange(c, []byte("a"), []byte("z"), testData)
}