*: add oracles, latch, kv utils (#5)

* *: add oracles, latch, kv utils

Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
disksing 2019-03-15 02:28:54 +08:00 committed by siddontang
parent af8a9cd0b8
commit 445fc2e42e
28 changed files with 2640 additions and 49 deletions

View File

@ -13,6 +13,8 @@
package codec
import "github.com/tikv/client-go/key"
var (
tablePrefix = []byte{'t'}
recordPrefixSep = []byte("_r")
@ -36,7 +38,7 @@ func appendTableRecordPrefix(buf []byte, tableID int64) []byte {
}
// GenTableRecordPrefix composes record prefix with tableID: "t[tableID]_r".
func GenTableRecordPrefix(tableID int64) Key {
func GenTableRecordPrefix(tableID int64) key.Key {
buf := make([]byte, 0, len(tablePrefix)+8+len(recordPrefixSep))
return appendTableRecordPrefix(buf, tableID)
}
@ -50,13 +52,13 @@ func appendTableIndexPrefix(buf []byte, tableID int64) []byte {
}
// GenTableIndexPrefix composes index prefix with tableID: "t[tableID]_i".
func GenTableIndexPrefix(tableID int64) Key {
func GenTableIndexPrefix(tableID int64) key.Key {
buf := make([]byte, 0, len(tablePrefix)+8+len(indexPrefixSep))
return appendTableIndexPrefix(buf, tableID)
}
// EncodeTableIndexPrefix encodes index prefix with tableID and idxID.
func EncodeTableIndexPrefix(tableID, idxID int64) Key {
func EncodeTableIndexPrefix(tableID, idxID int64) key.Key {
key := make([]byte, 0, prefixLen)
key = appendTableIndexPrefix(key, tableID)
key = EncodeInt(key, idxID)

View File

@ -78,3 +78,13 @@ var (
// MaxBatchWaitTime in nanosecond is the max wait time for batch.
MaxBatchWaitTime time.Duration
)
// Those limits are enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
TxnEntrySizeLimit = 6 * 1024 * 1024
// TxnEntryCountLimit is a limit of the number of entries in the MemBuffer.
TxnEntryCountLimit uint64 = 300 * 1000
// TxnTotalSizeLimit is limit of the sum of all entry size.
TxnTotalSizeLimit = 100 * 1024 * 1024
)

41
go.mod
View File

@ -1,21 +1,50 @@
module github.com/tikv/client-go
require (
github.com/coreos/bbolt v1.3.2 // indirect
github.com/coreos/etcd v3.3.12+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.2.0
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/gorilla/mux v1.7.0 // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.8.1 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/montanaflynn/stats v0.5.0 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opentracing/opentracing-go v1.0.2 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/errors v0.11.0
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c // indirect
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 // indirect
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591
github.com/pingcap/pd v2.1.2+incompatible
github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9
github.com/pingcap/pd v2.1.5+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.2
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 // indirect
github.com/sirupsen/logrus v1.3.0
google.golang.org/grpc v1.18.0
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43 // indirect
github.com/unrolled/render v1.0.0 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.2 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/grpc v1.19.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)

131
go.sum
View File

@ -1,43 +1,94 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.12+incompatible h1:pAWNwdf7QiT1zfaWyqCtNZQWCLByQyA3JrSQyuYAqnQ=
github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 h1:FE783w8WFh+Rvg+7bZ5g8p7gP4SeVS4AoNwkvazlsBg=
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso=
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9 h1:EGUd+AQfZoi1OwZAoqekLbl4kq6tafFtKQSiN8nL21Y=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/gorilla/mux v1.7.0 h1:tOSd0UKHQd6urX6ApfOn4XdBMY6Sh1MfxV3kmaazO+U=
github.com/gorilla/mux v1.7.0/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/grpc-ecosystem/grpc-gateway v1.8.1 h1:VNUuLKyFcJ5IektwBKcZU4J5GJKEt+Odb8dl1d61BGQ=
github.com/grpc-ecosystem/grpc-gateway v1.8.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk=
github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8 h1:USx2/E1bX46VG32FIw034Au6seQ2fY9NEILmNh/UlQg=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3 h1:04yuCf5NMvLU8rB2m4Qs3rynH7EYpMno3lHkewIOdMo=
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241 h1:fzec5WdYKkEtxONmqAw8A67nBoYkawCryamFw5D6oIY=
github.com/pingcap/kvproto v0.0.0-20190121084144-be0b43ee9241/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591 h1:JM8Hc82qb34a0/vDc5pWmeX/wShWWoEaiV6oCnENxzg=
github.com/pingcap/parser v0.0.0-20190118050330-3c9ff121c591/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.2+incompatible h1:VQmYV7B/7ZdPmbDUHcz2jSswTgInrgWhAfF0YuPAlLw=
github.com/pingcap/pd v2.1.2+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9 h1:EsTt42btov+tFchxOFKnxBNmXOWyPKiddOwvr/WO90g=
github.com/pingcap/kvproto v0.0.0-20190305055742-ab7debc182d9/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/pd v2.1.5+incompatible h1:vOLV2tSQdRjjmxaTXtJULoC94dYQOd+6fzn2yChODHc=
github.com/pingcap/pd v2.1.5+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
@ -47,32 +98,78 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 h1:/NRJ5vAYoqz+7sG51ubIDHXeWO8DlTSrToPu6q11ziA=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.2 h1:JON3E2/GPW2iDNGoSAusl1KDf5TRQ8k8q7Tp097pZGs=
github.com/ugorji/go v1.1.2/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43 h1:BasDe+IErOQKrMVXab7UayvSlIpiyGwRvuX3EKYY7UA=
github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43/go.mod h1:iT03XoTwV7xq/+UGwKO3UbC1nNNlopQiY61beSdrtOA=
github.com/unrolled/render v1.0.0 h1:XYtvhA3UkpB7PqkvhUFYmpKD55OudoIeygcfus4vcd4=
github.com/unrolled/render v1.0.0/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8 h1:YoY1wS6JYVRpIfFngRf2HHo9R9dAne3xbkGOQ5rJXjU=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.18.0 h1:IZl7mfBGfbhYx2p2rKRtYgDFw6SBz+kclmxYrCksPPA=
google.golang.org/grpc v1.18.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -11,7 +11,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
package key
import "bytes"
// Key represents high-level Key type.
type Key []byte
@ -51,3 +53,19 @@ func (k Key) PrefixNext() Key {
}
return buf
}
// Cmp returns the comparison result of two keys.
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
func (k Key) Cmp(another Key) int {
return bytes.Compare(k, another)
}
// Clone returns a copy of the Key.
func (k Key) Clone() Key {
return append([]byte(nil), k...)
}
// HasPrefix tests whether the Key begins with prefix.
func (k Key) HasPrefix(prefix Key) bool {
return bytes.HasPrefix(k, prefix)
}

View File

@ -211,6 +211,15 @@ var (
Buckets: prometheus.ExponentialBuckets(1, 2, 30),
Help: "batch wait duration",
})
TSFutureWaitDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "pdclient",
Name: "ts_future_wait_seconds",
Help: "Bucketed histogram of seconds cost for waiting timestamp future.",
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 18), // 5us ~ 128 ms
})
)
// RetLabel returns "ok" when err == nil and "err" when err != nil.
@ -244,4 +253,5 @@ func init() {
prometheus.MustRegister(RegionCacheCounter)
prometheus.MustRegister(PendingBatchRequests)
prometheus.MustRegister(BatchWaitDuration)
prometheus.MustRegister(TSFutureWaitDuration)
}

View File

@ -25,7 +25,6 @@ import (
"github.com/pingcap/goleveldb/leveldb/storage"
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/codec"
)
@ -421,7 +420,7 @@ func (mvcc *MVCCLevelDB) ReverseScan(startKey, endKey []byte, limit int, startTS
succ := iter.Last()
currKey, _, err := mvccDecode(iter.Key())
// TODO: return error.
terror.Log(errors.Trace(err))
log.Error(err)
helper := reverseScanHelper{
startTS: startTS,
isoLevel: isoLevel,
@ -890,7 +889,7 @@ func (mvcc *MVCCLevelDB) RawPut(key, value []byte) {
if value == nil {
value = []byte{}
}
terror.Log(mvcc.db.Put(key, value, nil))
log.Error(mvcc.db.Put(key, value, nil))
}
// RawBatchPut implements the RawKV interface
@ -906,7 +905,7 @@ func (mvcc *MVCCLevelDB) RawBatchPut(keys, values [][]byte) {
}
batch.Put(key, value)
}
terror.Log(mvcc.db.Write(batch, nil))
log.Error(mvcc.db.Write(batch, nil))
}
// RawGet implements the RawKV interface.
@ -915,7 +914,7 @@ func (mvcc *MVCCLevelDB) RawGet(key []byte) []byte {
defer mvcc.mu.Unlock()
ret, err := mvcc.db.Get(key, nil)
terror.Log(err)
log.Error(err)
return ret
}
@ -927,7 +926,7 @@ func (mvcc *MVCCLevelDB) RawBatchGet(keys [][]byte) [][]byte {
var values [][]byte
for _, key := range keys {
value, err := mvcc.db.Get(key, nil)
terror.Log(err)
log.Error(err)
values = append(values, value)
}
return values
@ -938,7 +937,7 @@ func (mvcc *MVCCLevelDB) RawDelete(key []byte) {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()
terror.Log(mvcc.db.Delete(key, nil))
log.Error(mvcc.db.Delete(key, nil))
}
// RawBatchDelete implements the RawKV interface.
@ -950,7 +949,7 @@ func (mvcc *MVCCLevelDB) RawBatchDelete(keys [][]byte) {
for _, key := range keys {
batch.Delete(key)
}
terror.Log(mvcc.db.Write(batch, nil))
log.Error(mvcc.db.Write(batch, nil))
}
// RawScan implements the RawKV interface.
@ -981,7 +980,7 @@ func (mvcc *MVCCLevelDB) RawScan(startKey, endKey []byte, limit int) []Pair {
// RawDeleteRange implements the RawKV interface.
func (mvcc *MVCCLevelDB) RawDeleteRange(startKey, endKey []byte) {
terror.Log(mvcc.doRawDeleteRange(startKey, endKey))
log.Error(mvcc.doRawDeleteRange(startKey, endKey))
}
// doRawDeleteRange deletes all keys in a range and return the error if any.

View File

@ -98,7 +98,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
return store, nil
}
func (c *pdClient) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
func (c *pdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
panic(errors.New("unimplemented"))
}

View File

@ -24,12 +24,11 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/terror"
"github.com/tikv/client-go/rpc"
)
// For gofail injection.
var undeterminedErr = terror.ErrResultUndetermined
var errUndeterminedErr = errors.New("undetermined")
const requestMaxSize = 8 * 1024 * 1024
@ -168,14 +167,14 @@ func (h *rpcHandler) checkRequestContext(ctx *kvrpcpb.Context) *errorpb.Error {
// Region epoch does not match.
if !proto.Equal(region.GetRegionEpoch(), ctx.GetRegionEpoch()) {
nextRegion, _ := h.cluster.GetRegionByKey(region.GetEndKey())
newRegions := []*metapb.Region{region}
currentRegions := []*metapb.Region{region}
if nextRegion != nil {
newRegions = append(newRegions, nextRegion)
currentRegions = append(currentRegions, nextRegion)
}
return &errorpb.Error{
Message: *proto.String("stale epoch"),
StaleEpoch: &errorpb.StaleEpoch{
NewRegions: newRegions,
EpochNotMatch: &errorpb.EpochNotMatch{
CurrentRegions: currentRegions,
},
}
}
@ -606,7 +605,7 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *rpc.Reque
resp.Commit = handler.handleKvCommit(r)
// gofail: var rpcCommitTimeout bool
// if rpcCommitTimeout {
// return nil, undeterminedErr
// return nil, errUndeterminedErr
// }
case rpc.CmdCleanup:
r := req.Cleanup

View File

@ -92,7 +92,7 @@ func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionI
// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like StaleEpoch, which means to re-split the request and retry.
return GenRegionErrorResp(req, &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}})
return GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
}
s.storeAddr = ctx.Addr
@ -174,8 +174,8 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "region_not_found"
} else if e.GetKeyNotInRegion() != nil {
return "key_not_in_region"
} else if e.GetStaleEpoch() != nil {
return "stale_epoch"
} else if e.GetEpochNotMatch() != nil {
return "epoch_not_match"
} else if e.GetServerIsBusy() != nil {
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
@ -214,9 +214,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *locate.RPC
return true, nil
}
if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil {
if epochNotMatch := regionErr.GetEpochNotMatch(); epochNotMatch != nil {
log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx)
err = s.regionCache.OnRegionStale(ctx, staleEpoch.NewRegions)
err = s.regionCache.OnRegionStale(ctx, epochNotMatch.CurrentRegions)
return false, errors.Trace(err)
}
if regionErr.GetServerIsBusy() != nil {

118
txnkv/kv/buffer_store.go Normal file
View File

@ -0,0 +1,118 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"github.com/pingcap/errors"
"github.com/tikv/client-go/key"
)
var (
// DefaultTxnMembufCap is the default transaction membuf capability.
DefaultTxnMembufCap = 4 * 1024
// ImportingTxnMembufCap is the capability of tidb importing data situation.
ImportingTxnMembufCap = 32 * 1024
// TempTxnMemBufCap is the capability of temporary membuf.
TempTxnMemBufCap = 64
)
// BufferStore wraps a Retriever for read and a MemBuffer for buffered write.
// Common usage pattern:
// bs := NewBufferStore(r) // use BufferStore to wrap a Retriever
// // ...
// // read/write on bs
// // ...
// bs.SaveTo(m) // save above operations to a Mutator
type BufferStore struct {
MemBuffer
r Retriever
}
// NewBufferStore creates a BufferStore using r for read.
func NewBufferStore(r Retriever, cap int) *BufferStore {
if cap <= 0 {
cap = DefaultTxnMembufCap
}
return &BufferStore{
r: r,
MemBuffer: &lazyMemBuffer{cap: cap},
}
}
// Reset resets s.MemBuffer.
func (s *BufferStore) Reset() {
s.MemBuffer.Reset()
}
// SetCap sets the MemBuffer capability.
func (s *BufferStore) SetCap(cap int) {
s.MemBuffer.SetCap(cap)
}
// Get implements the Retriever interface.
func (s *BufferStore) Get(k key.Key) ([]byte, error) {
val, err := s.MemBuffer.Get(k)
if IsErrNotFound(err) {
val, err = s.r.Get(k)
}
if err != nil {
return nil, errors.Trace(err)
}
if len(val) == 0 {
return nil, ErrNotExist
}
return val, nil
}
// Iter implements the Retriever interface.
func (s *BufferStore) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.Iter(k, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, false)
}
// IterReverse implements the Retriever interface.
func (s *BufferStore) IterReverse(k key.Key) (Iterator, error) {
bufferIt, err := s.MemBuffer.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
retrieverIt, err := s.r.IterReverse(k)
if err != nil {
return nil, errors.Trace(err)
}
return NewUnionIter(bufferIt, retrieverIt, true)
}
// WalkBuffer iterates all buffered kv pairs.
func (s *BufferStore) WalkBuffer(f func(k key.Key, v []byte) error) error {
return errors.Trace(WalkMemBuffer(s.MemBuffer, f))
}
// SaveTo saves all buffered kv pairs into a Mutator.
func (s *BufferStore) SaveTo(m Mutator) error {
err := s.WalkBuffer(func(k key.Key, v []byte) error {
if len(v) == 0 {
return errors.Trace(m.Delete(k))
}
return errors.Trace(m.Set(k, v))
})
return errors.Trace(err)
}

View File

@ -0,0 +1,69 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"bytes"
"fmt"
"testing"
. "github.com/pingcap/check"
"github.com/tikv/client-go/key"
)
func TestT(t *testing.T) {
TestingT(t)
}
type testBufferStoreSuite struct{}
var _ = Suite(testBufferStoreSuite{})
func (s testBufferStoreSuite) TestGetSet(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
key := key.Key("key")
_, err := bs.Get(key)
c.Check(err, NotNil)
err = bs.Set(key, []byte("value"))
c.Check(err, IsNil)
value, err := bs.Get(key)
c.Check(err, IsNil)
c.Check(bytes.Compare(value, []byte("value")), Equals, 0)
}
func (s testBufferStoreSuite) TestSaveTo(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
var buf bytes.Buffer
for i := 0; i < 10; i++ {
fmt.Fprint(&buf, i)
err := bs.Set(buf.Bytes(), buf.Bytes())
c.Check(err, IsNil)
buf.Reset()
}
bs.Set(key.Key("novalue"), nil)
mutator := NewMemDbBuffer(DefaultTxnMembufCap)
err := bs.SaveTo(mutator)
c.Check(err, IsNil)
iter, err := mutator.Iter(nil, nil)
c.Check(err, IsNil)
for iter.Valid() {
cmp := bytes.Compare(iter.Key(), iter.Value())
c.Check(cmp, Equals, 0)
iter.Next()
}
}

36
txnkv/kv/error.go Normal file
View File

@ -0,0 +1,36 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"github.com/pingcap/errors"
)
var (
// ErrNotExist is used when try to get an entry with an unexist key from KV store.
ErrNotExist = errors.New("key not exist")
// ErrCannotSetNilValue is the error when sets an empty value.
ErrCannotSetNilValue = errors.New("can not set nil value")
// ErrTxnTooLarge is the error when transaction is too large, lock time reached the maximum value.
ErrTxnTooLarge = errors.New("transaction is too large")
// ErrEntryTooLarge is the error when a key value entry is too large.
ErrEntryTooLarge = errors.New("entry is too large")
// ErrKeyExists returns when key is already exist.
ErrKeyExists = errors.New("key already exist")
)
// IsErrNotFound checks if err is a kind of NotFound error.
func IsErrNotFound(err error) bool {
return errors.Cause(err) == ErrNotExist
}

124
txnkv/kv/kv.go Normal file
View File

@ -0,0 +1,124 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import "github.com/tikv/client-go/key"
// Priority value for transaction priority.
const (
PriorityNormal = iota
PriorityLow
PriorityHigh
)
// IsoLevel is the transaction's isolation level.
type IsoLevel int
const (
// SI stands for 'snapshot isolation'.
SI IsoLevel = iota
// RC stands for 'read committed'.
RC
)
// Retriever is the interface wraps the basic Get and Seek methods.
type Retriever interface {
// Get gets the value for key k from kv store.
// If corresponding kv pair does not exist, it returns nil and ErrNotExist.
Get(k key.Key) ([]byte, error)
// Iter creates an Iterator positioned on the first entry that k <= entry's key.
// If such entry is not found, it returns an invalid Iterator with no error.
// It yields only keys that < upperBound. If upperBound is nil, it means the upperBound is unbounded.
// The Iterator must be Closed after use.
Iter(k key.Key, upperBound key.Key) (Iterator, error)
// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
// The returned iterator will iterate from greater key to smaller key.
// If k is nil, the returned iterator will be positioned at the last key.
// TODO: Add lower bound limit
IterReverse(k key.Key) (Iterator, error)
}
// Mutator is the interface wraps the basic Set and Delete methods.
type Mutator interface {
// Set sets the value for key k as v into kv store.
// v must NOT be nil or empty, otherwise it returns ErrCannotSetNilValue.
Set(k key.Key, v []byte) error
// Delete removes the entry for key k from kv store.
Delete(k key.Key) error
}
// RetrieverMutator is the interface that groups Retriever and Mutator interfaces.
type RetrieverMutator interface {
Retriever
Mutator
}
// MemBuffer is an in-memory kv collection, can be used to buffer write operations.
type MemBuffer interface {
RetrieverMutator
// Size returns sum of keys and values length.
Size() int
// Len returns the number of entries in the DB.
Len() int
// Reset cleanup the MemBuffer
Reset()
// SetCap sets the MemBuffer capability, to reduce memory allocations.
// Please call it before you use the MemBuffer, otherwise it will not works.
SetCap(cap int)
}
// Snapshot defines the interface for the snapshot fetched from KV store.
type Snapshot interface {
Retriever
// BatchGet gets a batch of values from snapshot.
BatchGet(keys []key.Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)
}
// Iterator is the interface for a iterator on KV store.
type Iterator interface {
Valid() bool
Key() key.Key
Value() []byte
Next() error
Close()
}
// Transaction options
const (
// PresumeKeyNotExists indicates that when dealing with a Get operation but failing to read data from cache,
// we presume that the key does not exist in Store. The actual existence will be checked before the
// transaction's commit.
// This option is an optimization for frequent checks during a transaction, e.g. batch inserts.
PresumeKeyNotExists Option = iota + 1
// PresumeKeyNotExistsError is the option key for error.
// When PresumeKeyNotExists is set and condition is not match, should throw the error.
PresumeKeyNotExistsError
// BinlogInfo contains the binlog data and client.
BinlogInfo
// SchemaChecker is used for checking schema-validity.
SchemaChecker
// IsolationLevel sets isolation level for current transaction. The default level is SI.
IsolationLevel
// Priority marks the priority of this transaction.
Priority
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog
// KeyOnly retrieve only keys, it can be used in scan now.
KeyOnly
)

280
txnkv/kv/mem_buffer_test.go Normal file
View File

@ -0,0 +1,280 @@
// Copyright 2015 PingCAP, Inc.
//
// Copyright 2015 Wenbin Xiao
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"fmt"
"math/rand"
"testing"
. "github.com/pingcap/check"
)
const (
startIndex = 0
testCount = 2
indexStep = 2
)
var _ = Suite(&testKVSuite{})
type testKVSuite struct {
bs []MemBuffer
}
func (s *testKVSuite) SetUpSuite(c *C) {
s.bs = make([]MemBuffer, 1)
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
}
func (s *testKVSuite) ResetMembuffers() {
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
}
func insertData(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
err := buffer.Set(val, val)
c.Assert(err, IsNil)
}
}
func encodeInt(n int) []byte {
return []byte(fmt.Sprintf("%010d", n))
}
func decodeInt(s []byte) int {
var n int
fmt.Sscanf(string(s), "%010d", &n)
return n
}
func valToStr(c *C, iter Iterator) string {
val := iter.Value()
return string(val)
}
func checkNewIterator(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(decodeInt([]byte(valToStr(c, iter))), Equals, i*indexStep)
iter.Close()
}
// Test iterator Next()
for i := startIndex; i < testCount-1; i++ {
val := encodeInt(i * indexStep)
iter, err := buffer.Iter(val, nil)
c.Assert(err, IsNil)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
err = iter.Next()
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
val = encodeInt((i + 1) * indexStep)
c.Assert([]byte(iter.Key()), BytesEquals, val)
c.Assert(valToStr(c, iter), Equals, string(val))
iter.Close()
}
// Non exist and beyond maximum seek test
iter, err := buffer.Iter(encodeInt(testCount*indexStep), nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)
// Non exist but between existing keys seek test,
// it returns the smallest key that larger than the one we are seeking
inBetween := encodeInt((testCount-1)*indexStep - 1)
last := encodeInt((testCount - 1) * indexStep)
iter, err = buffer.Iter(inBetween, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), Not(BytesEquals), inBetween)
c.Assert([]byte(iter.Key()), BytesEquals, last)
iter.Close()
}
func mustGet(c *C, buffer MemBuffer) {
for i := startIndex; i < testCount; i++ {
s := encodeInt(i * indexStep)
val, err := buffer.Get(s)
c.Assert(err, IsNil)
c.Assert(string(val), Equals, string(s))
}
}
func (s *testKVSuite) TestGetSet(c *C) {
for _, buffer := range s.bs {
insertData(c, buffer)
mustGet(c, buffer)
}
s.ResetMembuffers()
}
func (s *testKVSuite) TestNewIterator(c *C) {
for _, buffer := range s.bs {
// should be invalid
iter, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
c.Assert(iter.Valid(), IsFalse)
insertData(c, buffer)
checkNewIterator(c, buffer)
}
s.ResetMembuffers()
}
func (s *testKVSuite) TestBasicNewIterator(c *C) {
for _, buffer := range s.bs {
it, err := buffer.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsFalse)
}
}
func (s *testKVSuite) TestNewIteratorMin(c *C) {
kvs := []struct {
key string
value string
}{
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000001", "lock-version"},
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000001_0002", "1"},
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000001_0003", "hello"},
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000002", "lock-version"},
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000002_0002", "2"},
{"DATA_test_main_db_tbl_tbl_test_record__00000000000000000002_0003", "hello"},
}
for _, buffer := range s.bs {
for _, kv := range kvs {
buffer.Set([]byte(kv.key), []byte(kv.value))
}
cnt := 0
it, err := buffer.Iter(nil, nil)
c.Assert(err, IsNil)
for it.Valid() {
cnt++
it.Next()
}
c.Assert(cnt, Equals, 6)
it, err = buffer.Iter([]byte("DATA_test_main_db_tbl_tbl_test_record__00000000000000000000"), nil)
c.Assert(err, IsNil)
c.Assert(string(it.Key()), Equals, "DATA_test_main_db_tbl_tbl_test_record__00000000000000000001")
}
s.ResetMembuffers()
}
func (s *testKVSuite) TestBufferLimit(c *C) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferSizeLimit = 1000
buffer.entrySizeLimit = 500
err := buffer.Set([]byte("x"), make([]byte, 500))
c.Assert(err, NotNil) // entry size limit
err = buffer.Set([]byte("x"), make([]byte, 499))
c.Assert(err, IsNil)
err = buffer.Set([]byte("yz"), make([]byte, 499))
c.Assert(err, NotNil) // buffer size limit
buffer = NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferLenLimit = 10
for i := 0; i < 10; i++ {
err = buffer.Set([]byte{byte(i)}, []byte{byte(i)})
c.Assert(err, IsNil)
}
err = buffer.Set([]byte("x"), []byte("y"))
c.Assert(err, NotNil) // buffer len limit
}
var opCnt = 100000
func BenchmarkMemDbBufferSequential(b *testing.B) {
data := make([][]byte, opCnt)
for i := 0; i < opCnt; i++ {
data[i] = encodeInt(i)
}
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
func BenchmarkMemDbBufferRandom(b *testing.B) {
data := make([][]byte, opCnt)
for i := 0; i < opCnt; i++ {
data[i] = encodeInt(i)
}
shuffle(data)
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
func BenchmarkMemDbIter(b *testing.B) {
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchIterator(b, buffer)
b.ReportAllocs()
}
func BenchmarkMemDbCreation(b *testing.B) {
for i := 0; i < b.N; i++ {
NewMemDbBuffer(DefaultTxnMembufCap)
}
b.ReportAllocs()
}
func shuffle(slc [][]byte) {
N := len(slc)
for i := 0; i < N; i++ {
// choose index uniformly in [i, N-1]
r := i + rand.Intn(N-i)
slc[r], slc[i] = slc[i], slc[r]
}
}
func benchmarkSetGet(b *testing.B, buffer MemBuffer, data [][]byte) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, k := range data {
buffer.Set(k, k)
}
for _, k := range data {
buffer.Get(k)
}
}
}
func benchIterator(b *testing.B, buffer MemBuffer) {
for k := 0; k < opCnt; k++ {
buffer.Set(encodeInt(k), encodeInt(k))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
iter, err := buffer.Iter(nil, nil)
if err != nil {
b.Error(err)
}
for iter.Valid() {
iter.Next()
}
iter.Close()
}
}

179
txnkv/kv/memdb_buffer.go Normal file
View File

@ -0,0 +1,179 @@
// Copyright 2015 PingCAP, Inc.
//
// Copyright 2015 Wenbin Xiao
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"fmt"
"sync/atomic"
"github.com/pingcap/errors"
"github.com/pingcap/goleveldb/leveldb"
"github.com/pingcap/goleveldb/leveldb/comparer"
"github.com/pingcap/goleveldb/leveldb/iterator"
"github.com/pingcap/goleveldb/leveldb/memdb"
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/key"
)
// memDbBuffer implements the MemBuffer interface.
type memDbBuffer struct {
db *memdb.DB
entrySizeLimit int
bufferLenLimit uint64
bufferSizeLimit int
}
type memDbIter struct {
iter iterator.Iterator
reverse bool
}
// NewMemDbBuffer creates a new memDbBuffer.
func NewMemDbBuffer(cap int) MemBuffer {
return &memDbBuffer{
db: memdb.New(comparer.DefaultComparer, cap),
entrySizeLimit: config.TxnEntrySizeLimit,
bufferLenLimit: atomic.LoadUint64(&config.TxnEntryCountLimit),
bufferSizeLimit: config.TxnTotalSizeLimit,
}
}
// Iter creates an Iterator.
func (m *memDbBuffer) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
i := &memDbIter{iter: m.db.NewIterator(&util.Range{Start: []byte(k), Limit: []byte(upperBound)}), reverse: false}
err := i.Next()
if err != nil {
return nil, errors.Trace(err)
}
return i, nil
}
func (m *memDbBuffer) SetCap(cap int) {
}
func (m *memDbBuffer) IterReverse(k key.Key) (Iterator, error) {
var i *memDbIter
if k == nil {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{}), reverse: true}
} else {
i = &memDbIter{iter: m.db.NewIterator(&util.Range{Limit: []byte(k)}), reverse: true}
}
i.iter.Last()
return i, nil
}
// Get returns the value associated with key.
func (m *memDbBuffer) Get(k key.Key) ([]byte, error) {
v, err := m.db.Get(k)
if err == leveldb.ErrNotFound {
return nil, ErrNotExist
}
return v, nil
}
// Set associates key with value.
func (m *memDbBuffer) Set(k key.Key, v []byte) error {
if len(v) == 0 {
return errors.Trace(ErrCannotSetNilValue)
}
if len(k)+len(v) > m.entrySizeLimit {
return errors.WithMessage(ErrEntryTooLarge, fmt.Sprintf("entry too large, size: %d", len(k)+len(v)))
}
err := m.db.Put(k, v)
if m.Size() > m.bufferSizeLimit {
return errors.WithMessage(ErrTxnTooLarge, fmt.Sprintf("transaction too large, size:%d", m.Size()))
}
if m.Len() > int(m.bufferLenLimit) {
return errors.WithMessage(ErrTxnTooLarge, fmt.Sprintf("transaction too large, size:%d", m.Size()))
}
return errors.Trace(err)
}
// Delete removes the entry from buffer with provided key.
func (m *memDbBuffer) Delete(k key.Key) error {
err := m.db.Put(k, nil)
return errors.Trace(err)
}
// Size returns sum of keys and values length.
func (m *memDbBuffer) Size() int {
return m.db.Size()
}
// Len returns the number of entries in the DB.
func (m *memDbBuffer) Len() int {
return m.db.Len()
}
// Reset cleanup the MemBuffer.
func (m *memDbBuffer) Reset() {
m.db.Reset()
}
// Next implements the Iterator Next.
func (i *memDbIter) Next() error {
if i.reverse {
i.iter.Prev()
} else {
i.iter.Next()
}
return nil
}
// Valid implements the Iterator Valid.
func (i *memDbIter) Valid() bool {
return i.iter.Valid()
}
// Key implements the Iterator Key.
func (i *memDbIter) Key() key.Key {
return i.iter.Key()
}
// Value implements the Iterator Value.
func (i *memDbIter) Value() []byte {
return i.iter.Value()
}
// Close Implements the Iterator Close.
func (i *memDbIter) Close() {
i.iter.Release()
}
// WalkMemBuffer iterates all buffered kv pairs in memBuf
func WalkMemBuffer(memBuf MemBuffer, f func(k key.Key, v []byte) error) error {
iter, err := memBuf.Iter(nil, nil)
if err != nil {
return errors.Trace(err)
}
defer iter.Close()
for iter.Valid() {
if err = f(iter.Key(), iter.Value()); err != nil {
return errors.Trace(err)
}
err = iter.Next()
if err != nil {
return errors.Trace(err)
}
}
return nil
}

54
txnkv/kv/mock.go Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"github.com/pingcap/errors"
"github.com/tikv/client-go/key"
)
type mockSnapshot struct {
store MemBuffer
}
func (s *mockSnapshot) Get(k key.Key) ([]byte, error) {
return s.store.Get(k)
}
func (s *mockSnapshot) SetPriority(priority int) {
}
func (s *mockSnapshot) BatchGet(keys []key.Key) (map[string][]byte, error) {
m := make(map[string][]byte)
for _, k := range keys {
v, err := s.store.Get(k)
if IsErrNotFound(err) {
continue
}
if err != nil {
return nil, errors.Trace(err)
}
m[string(k)] = v
}
return m, nil
}
func (s *mockSnapshot) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
return s.store.Iter(k, upperBound)
}
func (s *mockSnapshot) IterReverse(k key.Key) (Iterator, error) {
return s.store.IterReverse(k)
}

185
txnkv/kv/union_iter.go Normal file
View File

@ -0,0 +1,185 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"github.com/pingcap/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/key"
)
// UnionIter is the iterator on an UnionStore.
type UnionIter struct {
dirtyIt Iterator
snapshotIt Iterator
dirtyValid bool
snapshotValid bool
curIsDirty bool
isValid bool
reverse bool
}
// NewUnionIter returns a union iterator for BufferStore.
func NewUnionIter(dirtyIt Iterator, snapshotIt Iterator, reverse bool) (*UnionIter, error) {
it := &UnionIter{
dirtyIt: dirtyIt,
snapshotIt: snapshotIt,
dirtyValid: dirtyIt.Valid(),
snapshotValid: snapshotIt.Valid(),
reverse: reverse,
}
err := it.updateCur()
if err != nil {
return nil, errors.Trace(err)
}
return it, nil
}
// dirtyNext makes iter.dirtyIt go and update valid status.
func (iter *UnionIter) dirtyNext() error {
err := iter.dirtyIt.Next()
iter.dirtyValid = iter.dirtyIt.Valid()
return errors.Trace(err)
}
// snapshotNext makes iter.snapshotIt go and update valid status.
func (iter *UnionIter) snapshotNext() error {
err := iter.snapshotIt.Next()
iter.snapshotValid = iter.snapshotIt.Valid()
return errors.Trace(err)
}
func (iter *UnionIter) updateCur() error {
iter.isValid = true
for {
if !iter.dirtyValid && !iter.snapshotValid {
iter.isValid = false
break
}
if !iter.dirtyValid {
iter.curIsDirty = false
break
}
if !iter.snapshotValid {
iter.curIsDirty = true
// if delete it
if len(iter.dirtyIt.Value()) == 0 {
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
continue
}
break
}
// both valid
if iter.snapshotValid && iter.dirtyValid {
snapshotKey := iter.snapshotIt.Key()
dirtyKey := iter.dirtyIt.Key()
cmp := dirtyKey.Cmp(snapshotKey)
if iter.reverse {
cmp = -cmp
}
// if equal, means both have value
if cmp == 0 {
if len(iter.dirtyIt.Value()) == 0 {
// snapshot has a record, but txn says we have deleted it
// just go next
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
if err := iter.snapshotNext(); err != nil {
return errors.Trace(err)
}
continue
}
if err := iter.snapshotNext(); err != nil {
return errors.Trace(err)
}
iter.curIsDirty = true
break
} else if cmp > 0 {
// record from snapshot comes first
iter.curIsDirty = false
break
} else {
// record from dirty comes first
if len(iter.dirtyIt.Value()) == 0 {
log.Warnf("[kv] delete a record not exists? k = %q", iter.dirtyIt.Key())
// jump over this deletion
if err := iter.dirtyNext(); err != nil {
return errors.Trace(err)
}
continue
}
iter.curIsDirty = true
break
}
}
}
return nil
}
// Next implements the Iterator Next interface.
func (iter *UnionIter) Next() error {
var err error
if !iter.curIsDirty {
err = iter.snapshotNext()
} else {
err = iter.dirtyNext()
}
if err != nil {
return errors.Trace(err)
}
err = iter.updateCur()
return errors.Trace(err)
}
// Value implements the Iterator Value interface.
// Multi columns
func (iter *UnionIter) Value() []byte {
if !iter.curIsDirty {
return iter.snapshotIt.Value()
}
return iter.dirtyIt.Value()
}
// Key implements the Iterator Key interface.
func (iter *UnionIter) Key() key.Key {
if !iter.curIsDirty {
return iter.snapshotIt.Key()
}
return iter.dirtyIt.Key()
}
// Valid implements the Iterator Valid interface.
func (iter *UnionIter) Valid() bool {
return iter.isValid
}
// Close implements the Iterator Close interface.
func (iter *UnionIter) Close() {
if iter.snapshotIt != nil {
iter.snapshotIt.Close()
iter.snapshotIt = nil
}
if iter.dirtyIt != nil {
iter.dirtyIt.Close()
iter.dirtyIt = nil
}
}

249
txnkv/kv/union_store.go Normal file
View File

@ -0,0 +1,249 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
"github.com/pingcap/errors"
"github.com/tikv/client-go/key"
)
// UnionStore is a store that wraps a snapshot for read and a BufferStore for buffered write.
// Also, it provides some transaction related utilities.
type UnionStore interface {
MemBuffer
// Returns related condition pair
LookupConditionPair(k key.Key) *conditionPair
// WalkBuffer iterates all buffered kv pairs.
WalkBuffer(f func(k key.Key, v []byte) error) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
// GetOption gets an option.
GetOption(opt Option) interface{}
// GetMemBuffer return the MemBuffer binding to this UnionStore.
GetMemBuffer() MemBuffer
}
// Option is used for customizing kv store's behaviors during a transaction.
type Option int
// Options is an interface of a set of options. Each option is associated with a value.
type Options interface {
// Get gets an option value.
Get(opt Option) (v interface{}, ok bool)
}
// conditionPair is used to store lazy check condition.
// If condition not match (value is not equal as expected one), returns err.
type conditionPair struct {
key key.Key
value []byte
err error
}
func (c *conditionPair) ShouldNotExist() bool {
return len(c.value) == 0
}
func (c *conditionPair) Err() error {
return c.err
}
// unionStore is an in-memory Store which contains a buffer for write and a
// snapshot for read.
type unionStore struct {
*BufferStore
snapshot Snapshot // for read
lazyConditionPairs map[string]*conditionPair // for delay check
opts options
}
// NewUnionStore builds a new UnionStore.
func NewUnionStore(snapshot Snapshot) UnionStore {
return &unionStore{
BufferStore: NewBufferStore(snapshot, DefaultTxnMembufCap),
snapshot: snapshot,
lazyConditionPairs: make(map[string]*conditionPair),
opts: make(map[Option]interface{}),
}
}
// invalidIterator implements Iterator interface.
// It is used for read-only transaction which has no data written, the iterator is always invalid.
type invalidIterator struct{}
func (it invalidIterator) Valid() bool {
return false
}
func (it invalidIterator) Next() error {
return nil
}
func (it invalidIterator) Key() key.Key {
return nil
}
func (it invalidIterator) Value() []byte {
return nil
}
func (it invalidIterator) Close() {}
// lazyMemBuffer wraps a MemBuffer which is to be initialized when it is modified.
type lazyMemBuffer struct {
mb MemBuffer
cap int
}
func (lmb *lazyMemBuffer) Get(k key.Key) ([]byte, error) {
if lmb.mb == nil {
return nil, ErrNotExist
}
return lmb.mb.Get(k)
}
func (lmb *lazyMemBuffer) Set(key key.Key, value []byte) error {
if lmb.mb == nil {
lmb.mb = NewMemDbBuffer(lmb.cap)
}
return lmb.mb.Set(key, value)
}
func (lmb *lazyMemBuffer) Delete(k key.Key) error {
if lmb.mb == nil {
lmb.mb = NewMemDbBuffer(lmb.cap)
}
return lmb.mb.Delete(k)
}
func (lmb *lazyMemBuffer) Iter(k key.Key, upperBound key.Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.Iter(k, upperBound)
}
func (lmb *lazyMemBuffer) IterReverse(k key.Key) (Iterator, error) {
if lmb.mb == nil {
return invalidIterator{}, nil
}
return lmb.mb.IterReverse(k)
}
func (lmb *lazyMemBuffer) Size() int {
if lmb.mb == nil {
return 0
}
return lmb.mb.Size()
}
func (lmb *lazyMemBuffer) Len() int {
if lmb.mb == nil {
return 0
}
return lmb.mb.Len()
}
func (lmb *lazyMemBuffer) Reset() {
if lmb.mb != nil {
lmb.mb.Reset()
}
}
func (lmb *lazyMemBuffer) SetCap(cap int) {
lmb.cap = cap
}
// Get implements the Retriever interface.
func (us *unionStore) Get(k key.Key) ([]byte, error) {
v, err := us.MemBuffer.Get(k)
if IsErrNotFound(err) {
if _, ok := us.opts.Get(PresumeKeyNotExists); ok {
e, ok := us.opts.Get(PresumeKeyNotExistsError)
if ok && e != nil {
us.markLazyConditionPair(k, nil, e.(error))
} else {
us.markLazyConditionPair(k, nil, ErrKeyExists)
}
return nil, ErrNotExist
}
v, err = us.BufferStore.r.Get(k)
}
if err != nil {
return v, errors.Trace(err)
}
if len(v) == 0 {
return nil, ErrNotExist
}
return v, nil
}
// markLazyConditionPair marks a kv pair for later check.
// If condition not match, should return e as error.
func (us *unionStore) markLazyConditionPair(k key.Key, v []byte, e error) {
us.lazyConditionPairs[string(k)] = &conditionPair{
key: k.Clone(),
value: v,
err: e,
}
}
func (us *unionStore) LookupConditionPair(k key.Key) *conditionPair {
if c, ok := us.lazyConditionPairs[string(k)]; ok {
return c
}
return nil
}
// SetOption implements the UnionStore SetOption interface.
func (us *unionStore) SetOption(opt Option, val interface{}) {
us.opts[opt] = val
}
// DelOption implements the UnionStore DelOption interface.
func (us *unionStore) DelOption(opt Option) {
delete(us.opts, opt)
}
// GetOption implements the UnionStore GetOption interface.
func (us *unionStore) GetOption(opt Option) interface{} {
return us.opts[opt]
}
// GetMemBuffer return the MemBuffer binding to this UnionStore.
func (us *unionStore) GetMemBuffer() MemBuffer {
return us.BufferStore.MemBuffer
}
// SetCap sets membuffer capability.
func (us *unionStore) SetCap(cap int) {
us.BufferStore.SetCap(cap)
}
func (us *unionStore) Reset() {
us.BufferStore.Reset()
}
type options map[Option]interface{}
func (opts options) Get(opt Option) (interface{}, bool) {
v, ok := opts[opt]
return v, ok
}

View File

@ -0,0 +1,142 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package kv
import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
)
var _ = Suite(&testUnionStoreSuite{})
type testUnionStoreSuite struct {
store MemBuffer
us UnionStore
}
func (s *testUnionStoreSuite) SetUpTest(c *C) {
s.store = NewMemDbBuffer(DefaultTxnMembufCap)
s.us = NewUnionStore(&mockSnapshot{s.store})
}
func (s *testUnionStoreSuite) TestGetSet(c *C) {
s.store.Set([]byte("1"), []byte("1"))
v, err := s.us.Get([]byte("1"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("1"))
s.us.Set([]byte("1"), []byte("2"))
v, err = s.us.Get([]byte("1"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("2"))
}
func (s *testUnionStoreSuite) TestDelete(c *C) {
s.store.Set([]byte("1"), []byte("1"))
err := s.us.Delete([]byte("1"))
c.Assert(err, IsNil)
_, err = s.us.Get([]byte("1"))
c.Assert(IsErrNotFound(err), IsTrue)
s.us.Set([]byte("1"), []byte("2"))
v, err := s.us.Get([]byte("1"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("2"))
}
func (s *testUnionStoreSuite) TestSeek(c *C) {
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))
iter, err := s.us.Iter(nil, nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("1"), []byte("2"), []byte("3")}, [][]byte{[]byte("1"), []byte("2"), []byte("3")})
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3")}, [][]byte{[]byte("2"), []byte("3")})
s.us.Set([]byte("4"), []byte("4"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("3"), []byte("4")}, [][]byte{[]byte("2"), []byte("3"), []byte("4")})
s.us.Delete([]byte("3"))
iter, err = s.us.Iter([]byte("2"), nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("4")}, [][]byte{[]byte("2"), []byte("4")})
}
func (s *testUnionStoreSuite) TestIterReverse(c *C) {
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
s.store.Set([]byte("3"), []byte("3"))
iter, err := s.us.IterReverse(nil)
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("3"), []byte("2"), []byte("1")}, [][]byte{[]byte("3"), []byte("2"), []byte("1")})
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1")}, [][]byte{[]byte("2"), []byte("1")})
s.us.Set([]byte("0"), []byte("0"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("1"), []byte("0")}, [][]byte{[]byte("2"), []byte("1"), []byte("0")})
s.us.Delete([]byte("1"))
iter, err = s.us.IterReverse([]byte("3"))
c.Assert(err, IsNil)
checkIterator(c, iter, [][]byte{[]byte("2"), []byte("0")}, [][]byte{[]byte("2"), []byte("0")})
}
func (s *testUnionStoreSuite) TestLazyConditionCheck(c *C) {
s.store.Set([]byte("1"), []byte("1"))
s.store.Set([]byte("2"), []byte("2"))
v, err := s.us.Get([]byte("1"))
c.Assert(err, IsNil)
c.Assert(v, BytesEquals, []byte("1"))
s.us.SetOption(PresumeKeyNotExists, nil)
s.us.SetOption(PresumeKeyNotExistsError, ErrNotExist)
_, err = s.us.Get([]byte("2"))
c.Assert(errors.Cause(err) == ErrNotExist, IsTrue, Commentf("err %v", err))
condionPair1 := s.us.LookupConditionPair([]byte("1"))
c.Assert(condionPair1, IsNil)
condionPair2 := s.us.LookupConditionPair([]byte("2"))
c.Assert(condionPair2, NotNil)
c.Assert(condionPair2.ShouldNotExist(), IsTrue)
err2 := s.us.LookupConditionPair([]byte("2")).Err()
c.Assert(errors.Cause(err) == ErrNotExist, IsTrue, Commentf("err %v", err2))
}
func checkIterator(c *C, iter Iterator, keys [][]byte, values [][]byte) {
defer iter.Close()
c.Assert(len(keys), Equals, len(values))
for i, k := range keys {
v := values[i]
c.Assert(iter.Valid(), IsTrue)
c.Assert([]byte(iter.Key()), BytesEquals, k)
c.Assert(iter.Value(), BytesEquals, v)
c.Assert(iter.Next(), IsNil)
}
c.Assert(iter.Valid(), IsFalse)
}

301
txnkv/latch/latch.go Normal file
View File

@ -0,0 +1,301 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package latch
import (
"bytes"
"math/bits"
"sort"
"sync"
"time"
"github.com/cznic/mathutil"
log "github.com/sirupsen/logrus"
"github.com/spaolacci/murmur3"
)
type node struct {
slotID int
key []byte
maxCommitTS uint64
value *Lock
next *node
}
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
count int
waiting []*Lock
sync.Mutex
}
// Lock is the locks' information required for a transaction.
type Lock struct {
keys [][]byte
// requiredSlots represents required slots.
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// acquiredCount represents the number of latches that the transaction has acquired.
// For status is stale, it includes the latch whose front is current lock already.
acquiredCount int
// startTS represents current transaction's.
startTS uint64
// commitTS represents current transaction's.
commitTS uint64
wg sync.WaitGroup
isStale bool
}
// acquireResult is the result type for acquire()
type acquireResult int32
const (
// acquireSuccess is a type constant for acquireResult.
// which means acquired success
acquireSuccess acquireResult = iota
// acquireLocked is a type constant for acquireResult
// which means still locked by other Lock.
acquireLocked
// acquireStale is a type constant for acquireResult
// which means current Lock's startTS is stale.
acquireStale
)
// IsStale returns whether the status is stale.
func (l *Lock) IsStale() bool {
return l.isStale
}
func (l *Lock) isLocked() bool {
return !l.isStale && l.acquiredCount != len(l.requiredSlots)
}
// SetCommitTS sets the lock's commitTS.
func (l *Lock) SetCommitTS(commitTS uint64) {
l.commitTS = commitTS
}
// Latches which are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable,
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []latch
}
type bytesSlice [][]byte
func (s bytesSlice) Len() int {
return len(s)
}
func (s bytesSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s bytesSlice) Less(i, j int) bool {
return bytes.Compare(s[i], s[j]) < 0
}
// NewLatches create a Latches with fixed length,
// the size will be rounded up to the power of 2.
func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
slots: slots,
}
}
// genLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
sort.Sort(bytesSlice(keys))
return &Lock{
keys: keys,
requiredSlots: latches.genSlotIDs(keys),
acquiredCount: 0,
startTS: startTS,
}
}
func (latches *Latches) genSlotIDs(keys [][]byte) []int {
slots := make([]int, 0, len(keys))
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
return slots
}
// slotID return slotID for current key.
func (latches *Latches) slotID(key []byte) int {
return int(murmur3.Sum32(key)) & (len(latches.slots) - 1)
}
// acquire tries to acquire the lock for a transaction.
func (latches *Latches) acquire(lock *Lock) acquireResult {
if lock.IsStale() {
return acquireStale
}
for lock.acquiredCount < len(lock.requiredSlots) {
status := latches.acquireSlot(lock)
if status != acquireSuccess {
return status
}
}
return acquireSuccess
}
// release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction's status is not locked.
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
wakeupList = wakeupList[:0]
for lock.acquiredCount > 0 {
if nextLock := latches.releaseSlot(lock); nextLock != nil {
wakeupList = append(wakeupList, nextLock)
}
}
return wakeupList
}
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
latch := &latches.slots[slotID]
lock.acquiredCount--
latch.Lock()
defer latch.Unlock()
find := findNode(latch.queue, key)
if find.value != lock {
panic("releaseSlot wrong")
}
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
find.value = nil
// Make a copy of the key, so latch does not reference the transaction's memory.
// If we do not do it, transaction memory can't be recycle by GC and there will
// be a leak.
copyKey := make([]byte, len(find.key))
copy(copyKey, find.key)
find.key = copyKey
if len(latch.waiting) == 0 {
return nil
}
var idx int
for idx = 0; idx < len(latch.waiting); idx++ {
waiting := latch.waiting[idx]
if bytes.Equal(waiting.keys[waiting.acquiredCount], key) {
break
}
}
// Wake up the first one in waiting queue.
if idx < len(latch.waiting) {
nextLock = latch.waiting[idx]
// Delete element latch.waiting[idx] from the array.
copy(latch.waiting[idx:], latch.waiting[idx+1:])
latch.waiting[len(latch.waiting)-1] = nil
latch.waiting = latch.waiting[:len(latch.waiting)-1]
if find.maxCommitTS > nextLock.startTS {
find.value = nextLock
nextLock.acquiredCount++
nextLock.isStale = true
}
}
return
}
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
key := lock.keys[lock.acquiredCount]
slotID := lock.requiredSlots[lock.acquiredCount]
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
// Try to recycle to limit the memory usage.
if latch.count >= latchListCount {
latch.recycle(lock.startTS)
}
find := findNode(latch.queue, key)
if find == nil {
tmp := &node{
slotID: slotID,
key: key,
value: lock,
}
tmp.next = latch.queue
latch.queue = tmp
latch.count++
lock.acquiredCount++
return acquireSuccess
}
if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
}
if find.value == nil {
find.value = lock
lock.acquiredCount++
return acquireSuccess
}
// Push the current transaction into waitingQueue.
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}
// recycle is not thread safe, the latch should acquire its lock before executing this function.
func (l *latch) recycle(currentTS uint64) int {
total := 0
fakeHead := node{next: l.queue}
prev := &fakeHead
for curr := prev.next; curr != nil; curr = curr.next {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
l.count--
prev.next = curr.next
total++
} else {
prev = curr
}
}
l.queue = fakeHead.next
return total
}
func (latches *Latches) recycle(currentTS uint64) {
total := 0
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.Lock()
total += latch.recycle(currentTS)
latch.Unlock()
}
log.Debugf("recycle run at %v, recycle count = %d...\n", time.Now(), total)
}
func findNode(list *node, key []byte) *node {
for n := list; n != nil; n = n.next {
if bytes.Equal(n.key, key) {
return n
}
}
return nil
}

152
txnkv/latch/latch_test.go Normal file
View File

@ -0,0 +1,152 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package latch
import (
"sync/atomic"
"testing"
"time"
. "github.com/pingcap/check"
"github.com/tikv/client-go/txnkv/oracle"
)
func TestT(t *testing.T) {
TestingT(t)
}
var _ = Suite(&testLatchSuite{})
var baseTso uint64
type testLatchSuite struct {
latches *Latches
}
func (s *testLatchSuite) SetUpTest(c *C) {
s.latches = NewLatches(256)
}
func (s *testLatchSuite) newLock(keys [][]byte) (startTS uint64, lock *Lock) {
startTS = getTso()
lock = s.latches.genLock(startTS, keys)
return
}
func getTso() uint64 {
return atomic.AddUint64(&baseTso, uint64(1))
}
func (s *testLatchSuite) TestWakeUp(c *C) {
keysA := [][]byte{
[]byte("a"), []byte("b"), []byte("c")}
_, lockA := s.newLock(keysA)
keysB := [][]byte{[]byte("d"), []byte("e"), []byte("a"), []byte("c")}
startTSB, lockB := s.newLock(keysB)
// A acquire lock success.
result := s.latches.acquire(lockA)
c.Assert(result, Equals, acquireSuccess)
// B acquire lock failed.
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireLocked)
// A release lock, and get wakeup list.
commitTSA := getTso()
wakeupList := make([]*Lock, 0)
lockA.SetCommitTS(commitTSA)
wakeupList = s.latches.release(lockA, wakeupList)
c.Assert(wakeupList[0].startTS, Equals, startTSB)
// B acquire failed since startTSB has stale for some keys.
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireStale)
// B release lock since it received a stale.
wakeupList = s.latches.release(lockB, wakeupList)
c.Assert(wakeupList, HasLen, 0)
// B restart:get a new startTS.
startTSB = getTso()
lockB = s.latches.genLock(startTSB, keysB)
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireSuccess)
}
func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
keys := [][]byte{
[]byte("a"), []byte("b"), []byte("c")}
_, lockA := s.newLock(keys)
startTSB, lockB := s.newLock(keys)
// acquire lockA success
result := s.latches.acquire(lockA)
c.Assert(result, Equals, acquireSuccess)
// release lockA
commitTSA := getTso()
wakeupList := make([]*Lock, 0)
lockA.SetCommitTS(commitTSA)
s.latches.release(lockA, wakeupList)
c.Assert(commitTSA, Greater, startTSB)
// acquire lockB first time, should be failed with stale since commitTSA > startTSB
result = s.latches.acquire(lockB)
c.Assert(result, Equals, acquireStale)
s.latches.release(lockB, wakeupList)
}
func (s *testLatchSuite) TestRecycle(c *C) {
latches := NewLatches(8)
now := time.Now()
startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0)
lock := latches.genLock(startTS, [][]byte{
[]byte("a"), []byte("b"),
})
lock1 := latches.genLock(startTS, [][]byte{
[]byte("b"), []byte("c"),
})
c.Assert(latches.acquire(lock), Equals, acquireSuccess)
c.Assert(latches.acquire(lock1), Equals, acquireLocked)
lock.SetCommitTS(startTS + 1)
var wakeupList []*Lock
latches.release(lock, wakeupList)
// Release lock will grant latch to lock1 automatically,
// so release lock1 is called here.
latches.release(lock1, wakeupList)
lock2 := latches.genLock(startTS+3, [][]byte{
[]byte("b"), []byte("c"),
})
c.Assert(latches.acquire(lock2), Equals, acquireSuccess)
wakeupList = wakeupList[:0]
latches.release(lock2, wakeupList)
allEmpty := true
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
if latch.queue != nil {
allEmpty = false
}
}
c.Assert(allEmpty, IsFalse)
currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
latches.recycle(currentTS)
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
c.Assert(latch.queue, IsNil)
}
}

120
txnkv/latch/scheduler.go Normal file
View File

@ -0,0 +1,120 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package latch
import (
"sync"
"time"
"github.com/tikv/client-go/txnkv/oracle"
)
const lockChanSize = 100
// LatchesScheduler is used to schedule latches for transactions.
type LatchesScheduler struct {
latches *Latches
unlockCh chan *Lock
closed bool
lastRecycleTime uint64
sync.RWMutex
}
// NewScheduler create the LatchesScheduler.
func NewScheduler(size uint) *LatchesScheduler {
latches := NewLatches(size)
unlockCh := make(chan *Lock, lockChanSize)
scheduler := &LatchesScheduler{
latches: latches,
unlockCh: unlockCh,
closed: false,
}
go scheduler.run()
return scheduler
}
const expireDuration = 2 * time.Minute
const checkInterval = 1 * time.Minute
const checkCounter = 50000
const latchListCount = 5
func (scheduler *LatchesScheduler) run() {
var counter int
wakeupList := make([]*Lock, 0)
for lock := range scheduler.unlockCh {
wakeupList = scheduler.latches.release(lock, wakeupList)
if len(wakeupList) > 0 {
scheduler.wakeup(wakeupList)
}
if lock.commitTS > lock.startTS {
currentTS := lock.commitTS
elapsed := tsoSub(currentTS, scheduler.lastRecycleTime)
if elapsed > checkInterval || counter > checkCounter {
go scheduler.latches.recycle(lock.commitTS)
scheduler.lastRecycleTime = currentTS
counter = 0
}
}
counter++
}
}
func (scheduler *LatchesScheduler) wakeup(wakeupList []*Lock) {
for _, lock := range wakeupList {
if scheduler.latches.acquire(lock) != acquireLocked {
lock.wg.Done()
}
}
}
// Close closes LatchesScheduler.
func (scheduler *LatchesScheduler) Close() {
scheduler.RWMutex.Lock()
defer scheduler.RWMutex.Unlock()
if !scheduler.closed {
close(scheduler.unlockCh)
scheduler.closed = true
}
}
// Lock acquire the lock for transaction with startTS and keys. The caller goroutine
// would be blocked if the lock can't be obtained now. When this function returns,
// the lock state would be either success or stale(call lock.IsStale)
func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock {
lock := scheduler.latches.genLock(startTS, keys)
lock.wg.Add(1)
if scheduler.latches.acquire(lock) == acquireLocked {
lock.wg.Wait()
}
if lock.isLocked() {
panic("should never run here")
}
return lock
}
// UnLock unlocks a lock.
func (scheduler *LatchesScheduler) UnLock(lock *Lock) {
scheduler.RLock()
defer scheduler.RUnlock()
if !scheduler.closed {
scheduler.unlockCh <- lock
}
}
func tsoSub(ts1, ts2 uint64) time.Duration {
t1 := oracle.GetTimeFromTS(ts1)
t2 := oracle.GetTimeFromTS(ts2)
return t1.Sub(t2)
}

View File

@ -0,0 +1,94 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package latch
import (
"bytes"
"math/rand"
"sync"
"time"
. "github.com/pingcap/check"
)
var _ = Suite(&testSchedulerSuite{})
type testSchedulerSuite struct {
}
func (s *testSchedulerSuite) SetUpTest(c *C) {
}
func (s *testSchedulerSuite) TestWithConcurrency(c *C) {
sched := NewScheduler(7)
defer sched.Close()
rand.Seed(time.Now().Unix())
ch := make(chan [][]byte, 100)
const workerCount = 10
var wg sync.WaitGroup
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func(ch <-chan [][]byte, wg *sync.WaitGroup) {
for txn := range ch {
lock := sched.Lock(getTso(), txn)
if lock.IsStale() {
// Should restart the transaction or return error
} else {
lock.SetCommitTS(getTso())
// Do 2pc
}
sched.UnLock(lock)
}
wg.Done()
}(ch, &wg)
}
for i := 0; i < 999; i++ {
ch <- generate()
}
close(ch)
wg.Wait()
}
// generate generates something like:
// {[]byte("a"), []byte("b"), []byte("c")}
// {[]byte("a"), []byte("d"), []byte("e"), []byte("f")}
// {[]byte("e"), []byte("f"), []byte("g"), []byte("h")}
// The data should not repeat in the sequence.
func generate() [][]byte {
table := []byte{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'}
ret := make([][]byte, 0, 5)
chance := []int{100, 60, 40, 20}
for i := 0; i < len(chance); i++ {
needMore := rand.Intn(100) < chance[i]
if needMore {
randBytes := []byte{table[rand.Intn(len(table))]}
if !contains(randBytes, ret) {
ret = append(ret, randBytes)
}
}
}
return ret
}
func contains(x []byte, set [][]byte) bool {
for _, y := range set {
if bytes.Equal(x, y) {
return true
}
}
return false
}

60
txnkv/oracle/oracle.go Normal file
View File

@ -0,0 +1,60 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package oracle
import (
"context"
"time"
)
// Oracle is the interface that provides strictly ascending timestamps.
type Oracle interface {
GetTimestamp(ctx context.Context) (uint64, error)
GetTimestampAsync(ctx context.Context) Future
IsExpired(lockTimestamp uint64, TTL uint64) bool
Close()
}
// Future is a future which promises to return a timestamp.
type Future interface {
Wait() (uint64, error)
}
const physicalShiftBits = 18
// ComposeTS creates a ts from physical and logical parts.
func ComposeTS(physical, logical int64) uint64 {
return uint64((physical << physicalShiftBits) + logical)
}
// ExtractPhysical returns a ts's physical part.
func ExtractPhysical(ts uint64) int64 {
return int64(ts >> physicalShiftBits)
}
// GetPhysical returns physical from an instant time with millisecond precision.
func GetPhysical(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}
// EncodeTSO encodes a millisecond into tso.
func EncodeTSO(ts int64) uint64 {
return uint64(ts) << physicalShiftBits
}
// GetTimeFromTS extracts time.Time from a timestamp.
func GetTimeFromTS(ts uint64) time.Time {
ms := ExtractPhysical(ts)
return time.Unix(ms/1e3, (ms%1e3)*1e6)
}

View File

@ -0,0 +1,72 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package oracles
import (
"context"
"sync"
"time"
"github.com/tikv/client-go/txnkv/oracle"
)
var _ oracle.Oracle = &localOracle{}
type localOracle struct {
sync.Mutex
lastTimeStampTS uint64
n uint64
}
// NewLocalOracle creates an Oracle that uses local time as data source.
func NewLocalOracle() oracle.Oracle {
return &localOracle{}
}
func (l *localOracle) IsExpired(lockTS uint64, TTL uint64) bool {
return oracle.GetPhysical(time.Now()) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
}
func (l *localOracle) GetTimestamp(context.Context) (uint64, error) {
l.Lock()
defer l.Unlock()
physical := oracle.GetPhysical(time.Now())
ts := oracle.ComposeTS(physical, 0)
if l.lastTimeStampTS == ts {
l.n++
return ts + l.n, nil
}
l.lastTimeStampTS = ts
l.n = 0
return ts, nil
}
func (l *localOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
return &future{
ctx: ctx,
l: l,
}
}
type future struct {
ctx context.Context
l *localOracle
}
func (f *future) Wait() (uint64, error) {
return f.l.GetTimestamp(f.ctx)
}
func (l *localOracle) Close() {
}

View File

@ -0,0 +1,52 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package oracles
import (
"context"
"testing"
"time"
)
func TestLocalOracle(t *testing.T) {
l := NewLocalOracle()
defer l.Close()
m := map[uint64]struct{}{}
for i := 0; i < 100000; i++ {
ts, err := l.GetTimestamp(context.Background())
if err != nil {
t.Error(err)
}
m[ts] = struct{}{}
}
if len(m) != 100000 {
t.Error("generated same ts")
}
}
func TestIsExpired(t *testing.T) {
o := NewLocalOracle()
defer o.Close()
ts, _ := o.GetTimestamp(context.Background())
time.Sleep(50 * time.Millisecond)
expire := o.IsExpired(uint64(ts), 40)
if !expire {
t.Error("should expired")
}
expire = o.IsExpired(uint64(ts), 200)
if expire {
t.Error("should not expired")
}
}

140
txnkv/oracle/oracles/pd.go Normal file
View File

@ -0,0 +1,140 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package oracles
import (
"context"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/pd/client"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/txnkv/oracle"
)
var _ oracle.Oracle = &pdOracle{}
const slowDist = 30 * time.Millisecond
// pdOracle is an Oracle that uses a placement driver client as source.
type pdOracle struct {
c pd.Client
lastTS uint64
quit chan struct{}
}
// NewPdOracle create an Oracle that uses a pd client source.
// Refer https://github.com/pingcap/pd/blob/master/client/client.go for more details.
// PdOracle mantains `lastTS` to store the last timestamp got from PD server. If
// `GetTimestamp()` is not called after `updateInterval`, it will be called by
// itself to keep up with the timestamp on PD server.
func NewPdOracle(pdClient pd.Client, updateInterval time.Duration) (oracle.Oracle, error) {
o := &pdOracle{
c: pdClient,
quit: make(chan struct{}),
}
ctx := context.TODO()
go o.updateTS(ctx, updateInterval)
// Initialize lastTS by Get.
_, err := o.GetTimestamp(ctx)
if err != nil {
o.Close()
return nil, errors.Trace(err)
}
return o, nil
}
// IsExpired returns whether lockTS+TTL is expired, both are ms. It uses `lastTS`
// to compare, may return false negative result temporarily.
func (o *pdOracle) IsExpired(lockTS, TTL uint64) bool {
lastTS := atomic.LoadUint64(&o.lastTS)
return oracle.ExtractPhysical(lastTS) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
}
// GetTimestamp gets a new increasing time.
func (o *pdOracle) GetTimestamp(ctx context.Context) (uint64, error) {
ts, err := o.getTimestamp(ctx)
if err != nil {
return 0, errors.Trace(err)
}
o.setLastTS(ts)
return ts, nil
}
type tsFuture struct {
pd.TSFuture
o *pdOracle
}
// Wait implements the oracle.Future interface.
func (f *tsFuture) Wait() (uint64, error) {
now := time.Now()
physical, logical, err := f.TSFuture.Wait()
metrics.TSFutureWaitDuration.Observe(time.Since(now).Seconds())
if err != nil {
return 0, errors.Trace(err)
}
ts := oracle.ComposeTS(physical, logical)
f.o.setLastTS(ts)
return ts, nil
}
func (o *pdOracle) GetTimestampAsync(ctx context.Context) oracle.Future {
ts := o.c.GetTSAsync(ctx)
return &tsFuture{ts, o}
}
func (o *pdOracle) getTimestamp(ctx context.Context) (uint64, error) {
now := time.Now()
physical, logical, err := o.c.GetTS(ctx)
if err != nil {
return 0, errors.Trace(err)
}
dist := time.Since(now)
if dist > slowDist {
log.Warnf("get timestamp too slow: %s", dist)
}
return oracle.ComposeTS(physical, logical), nil
}
func (o *pdOracle) setLastTS(ts uint64) {
lastTS := atomic.LoadUint64(&o.lastTS)
if ts > lastTS {
atomic.CompareAndSwapUint64(&o.lastTS, lastTS, ts)
}
}
func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
ts, err := o.getTimestamp(ctx)
if err != nil {
log.Errorf("updateTS error: %v", err)
break
}
o.setLastTS(ts)
case <-o.quit:
ticker.Stop()
return
}
}
}
func (o *pdOracle) Close() {
close(o.quit)
}