diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..5657f6ea --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..644d5626 --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +default: + GO111MODULE=on go build ./... \ No newline at end of file diff --git a/codec/bytes.go b/codec/bytes.go new file mode 100644 index 00000000..70850245 --- /dev/null +++ b/codec/bytes.go @@ -0,0 +1,83 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package codec + +import ( + "bytes" + + "github.com/pingcap/errors" +) + +const ( + encGroupSize = 8 + encMarker = byte(0xFF) + encPad = byte(0x0) +) + +var pads = make([]byte, encGroupSize) + +// DecodeBytes decodes a TiDB encoded byte slice. +func DecodeBytes(b []byte) ([]byte, error) { + buf := make([]byte, 0, len(b)/(encGroupSize+1)*encGroupSize) + for { + if len(b) < encGroupSize+1 { + return nil, errors.New("insufficient bytes to decode value") + } + + groupBytes := b[:encGroupSize+1] + + group := groupBytes[:encGroupSize] + marker := groupBytes[encGroupSize] + + padCount := encMarker - marker + if padCount > encGroupSize { + return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes) + } + + realGroupSize := encGroupSize - padCount + buf = append(buf, group[:realGroupSize]...) + b = b[encGroupSize+1:] + + if padCount != 0 { + // Check validity of padding bytes. + if !bytes.Equal(group[realGroupSize:], pads[:padCount]) { + return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes) + } + break + } + } + return buf, nil +} + +// EncodeBytes encodes a byte slice into TiDB's encoded form. +func EncodeBytes(b []byte) []byte { + dLen := len(b) + reallocSize := (dLen/encGroupSize + 1) * (encGroupSize + 1) + result := make([]byte, 0, reallocSize) + for idx := 0; idx <= dLen; idx += encGroupSize { + remain := dLen - idx + padCount := 0 + if remain >= encGroupSize { + result = append(result, b[idx:idx+encGroupSize]...) + } else { + padCount = encGroupSize - remain + result = append(result, b[idx:]...) + result = append(result, pads[:padCount]...) + } + + marker := encMarker - byte(padCount) + result = append(result, marker) + } + return result +} diff --git a/codec/meta.go b/codec/meta.go new file mode 100644 index 00000000..aa76299b --- /dev/null +++ b/codec/meta.go @@ -0,0 +1,38 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
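The bytes codec above implements TiDB's memcomparable key encoding: the input is cut into 8-byte groups, each group is zero-padded to 8 bytes and followed by a marker byte equal to 0xFF minus the pad count. A minimal round-trip sketch, assuming the module path github.com/tikv/client-go from go.mod (the main package and printed values are only for illustration):

package main

import (
	"bytes"
	"fmt"

	"github.com/tikv/client-go/codec"
)

func main() {
	key := []byte("test_key")
	enc := codec.EncodeBytes(key)
	// One marker byte per 8-byte group, so the encoded form grows to
	// (len(key)/8+1)*9 bytes: 18 bytes for this 8-byte key.
	fmt.Println(len(enc))

	dec, err := codec.DecodeBytes(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(key, dec)) // true
}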
+ +package codec + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" +) + +// DecodeRegionMetaKey translates a region meta from encoded form to unencoded form. +func DecodeRegionMetaKey(r *metapb.Region) error { + if len(r.StartKey) != 0 { + decoded, err := DecodeBytes(r.StartKey) + if err != nil { + return errors.Trace(err) + } + r.StartKey = decoded + } + if len(r.EndKey) != 0 { + decoded, err := DecodeBytes(r.EndKey) + if err != nil { + return errors.Trace(err) + } + r.EndKey = decoded + } + return nil +} diff --git a/config/config.go b/config/config.go new file mode 100644 index 00000000..ee2f992a --- /dev/null +++ b/config/config.go @@ -0,0 +1,67 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + + "github.com/pkg/errors" +) + +// Security is SSL configuration. +type Security struct { + SSLCA string `toml:"ssl-ca" json:"ssl-ca"` + SSLCert string `toml:"ssl-cert" json:"ssl-cert"` + SSLKey string `toml:"ssl-key" json:"ssl-key"` +} + +// ToTLSConfig generates tls's config based on security section of the config. +func (s *Security) ToTLSConfig() (*tls.Config, error) { + var tlsConfig *tls.Config + if len(s.SSLCA) != 0 { + var certificates = make([]tls.Certificate, 0) + if len(s.SSLCert) != 0 && len(s.SSLKey) != 0 { + // Load the client certificates from disk + certificate, err := tls.LoadX509KeyPair(s.SSLCert, s.SSLKey) + if err != nil { + return nil, errors.Errorf("could not load client key pair: %s", err) + } + certificates = append(certificates, certificate) + } + + // Create a certificate pool from the certificate authority + certPool := x509.NewCertPool() + ca, err := ioutil.ReadFile(s.SSLCA) + if err != nil { + return nil, errors.Errorf("could not read ca certificate: %s", err) + } + + // Append the certificates from the CA + if !certPool.AppendCertsFromPEM(ca) { + return nil, errors.New("failed to append ca certs") + } + + tlsConfig = &tls.Config{ + Certificates: certificates, + RootCAs: certPool, + } + } + + return tlsConfig, nil +} + +// EnableOpenTracing is the flag to enable open tracing. 
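ToTLSConfig only builds a *tls.Config when ssl-ca is set; otherwise it returns nil and connections stay plaintext. A usage sketch with placeholder certificate paths:

package main

import (
	"log"

	"github.com/tikv/client-go/config"
)

func main() {
	// The paths below are placeholders; ToTLSConfig returns (nil, nil)
	// when SSLCA is left empty.
	sec := config.Security{
		SSLCA:   "/etc/tikv/ca.pem",
		SSLCert: "/etc/tikv/client.pem",
		SSLKey:  "/etc/tikv/client-key.pem",
	}
	tlsCfg, err := sec.ToTLSConfig()
	if err != nil {
		log.Fatal(err)
	}
	// A nil config means "no TLS"; otherwise hand it to the gRPC dialer.
	_ = tlsCfg
}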
+var EnableOpenTracing = false diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..41d58f2b --- /dev/null +++ b/go.mod @@ -0,0 +1,54 @@ +module github.com/tikv/client-go + +require ( + github.com/BurntSushi/toml v0.3.1 // indirect + github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect + github.com/coreos/bbolt v1.3.1-coreos.6 // indirect + github.com/coreos/etcd v3.3.10+incompatible // indirect + github.com/coreos/go-semver v0.2.0 // indirect + github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect + github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/dustin/go-humanize v1.0.0 // indirect + github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect + github.com/ghodss/yaml v1.0.0 // indirect + github.com/gogo/protobuf v1.2.0 // indirect + github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c + github.com/gorilla/context v1.1.1 // indirect + github.com/gorilla/mux v1.6.2 // indirect + github.com/gorilla/websocket v1.4.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/grpc-ecosystem/grpc-gateway v1.6.3 // indirect + github.com/jonboulle/clockwork v0.1.0 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect + github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f // indirect + github.com/opentracing/opentracing-go v1.0.2 // indirect + github.com/pingcap/check v0.0.0-20181222140913-41d022e836db // indirect + github.com/pingcap/errors v0.11.0 + github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c // indirect + github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d + github.com/pingcap/pd v2.1.2+incompatible + github.com/pkg/errors v0.8.0 + github.com/prometheus/client_golang v0.9.1 + github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect + github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 // indirect + github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a // indirect + github.com/sirupsen/logrus v1.2.0 + github.com/soheilhy/cmux v0.1.4 // indirect + github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect + github.com/ugorji/go v1.1.1 // indirect + github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f // indirect + github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect + go.uber.org/atomic v1.3.2 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.9.1 // indirect + golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect + golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect + golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect + google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f // indirect + google.golang.org/grpc v1.17.0 + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..10747438 --- /dev/null +++ b/go.sum @@ -0,0 +1,146 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 
h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.1-coreos.6 h1:uTXKg9gY70s9jMAKdfljFQcuh4e/BXOM+V+d00KFj3A= +github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible h1:KjVWqrZ5U0wa3CxY2AxlH6/UcB+PK2td1DcsYhA+HRs= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 h1:3jFq2xL4ZajGK4aZY8jz+DAF0FHjI51BXjjSwCzS1Dk= +github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= +github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg= +github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= 
+github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.6.3 h1:oQ+8y59SMDn8Ita1Sh4f94XCUVp8AB84sppXP8Qgiow= +github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f h1:r//C+RGlxxi1gPODiDj/Y/uvv3OaZlZPSFz2SuwIees= +github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pingcap/check v0.0.0-20181222140913-41d022e836db h1:yg93sLvBszRnzcd+Z5gkCUdYgud2scHYYxnRwljvRAM= +github.com/pingcap/check v0.0.0-20181222140913-41d022e836db/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ= +github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4= +github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= +github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c h1:nYlcWGAYxDMdiRLjyhNJB9tMSuGaqu2M/CVd2RJx4QQ= +github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns= +github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d h1:zjLW4mSw3fHsBXoDa01e9UROGnhA2Fm1Usu0oqlBIFk= +github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/pd v2.1.2+incompatible h1:VQmYV7B/7ZdPmbDUHcz2jSswTgInrgWhAfF0YuPAlLw= +github.com/pingcap/pd v2.1.2+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= +github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1 h1:K47Rk0v/fkEfwfQet2KWhscE0cJzjgCCDBG2KHZoVno= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a h1:Z2GBQ7wAiTCixJhSGK4sMO/FHYlvFvUBBK0M0FSsxeU= +github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc= +github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w= +github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ= +github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f h1:+feYJlxPM00jEkdybexHiwIIOVuClwTEbh1WLiNr0mk= +github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg= +github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs= +github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net 
v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE= +golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo= +golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI= +google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f h1:eT3B0O2ghdSPzjAOznr3oOLyN1HFeYUncYl7FRwg4VI= +google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= +google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= +google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk= +google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools 
v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/locate/codec.go b/locate/codec.go new file mode 100644 index 00000000..39318a73 --- /dev/null +++ b/locate/codec.go @@ -0,0 +1,56 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package locate + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" + pd "github.com/pingcap/pd/client" + "github.com/tikv/client-go/codec" +) + +type codecPDClient struct { + pd.Client +} + +// GetRegion encodes the key before send requests to pd-server and decodes the +// returned StartKey && EndKey from pd-server. +func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) { + encodedKey := codec.EncodeBytes(key) + region, peer, err := c.Client.GetRegion(ctx, encodedKey) + return processRegionResult(region, peer, err) +} + +// GetRegion encodes the key before send requests to pd-server and decodes the +// returned StartKey && EndKey from pd-server. +func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) { + region, peer, err := c.Client.GetRegionByID(ctx, regionID) + return processRegionResult(region, peer, err) +} + +func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) { + if err != nil { + return nil, nil, errors.Trace(err) + } + if region == nil { + return nil, nil, nil + } + err = codec.DecodeRegionMetaKey(region) + if err != nil { + return nil, nil, errors.Trace(err) + } + return region, peer, nil +} diff --git a/locate/region_cache.go b/locate/region_cache.go new file mode 100644 index 00000000..5ef204e3 --- /dev/null +++ b/locate/region_cache.go @@ -0,0 +1,613 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
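DecodeRegionMetaKey and the codecPDClient wrapper above keep keys sent to PD in encoded form and hand decoded region boundaries back to callers. A small sketch of the decode step on a region whose boundaries were produced with EncodeBytes (the literal keys are only for illustration):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/client-go/codec"
)

func main() {
	// PD stores and returns region boundaries in encoded form.
	region := &metapb.Region{
		StartKey: codec.EncodeBytes([]byte("a")),
		EndKey:   codec.EncodeBytes([]byte("z")),
	}
	// DecodeRegionMetaKey rewrites StartKey/EndKey in place; empty
	// boundaries (first/last region) are left untouched.
	if err := codec.DecodeRegionMetaKey(region); err != nil {
		panic(err)
	}
	fmt.Printf("%q %q\n", region.StartKey, region.EndKey) // "a" "z"
}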
+ +package locate + +import ( + "bytes" + "context" + "sync" + "sync/atomic" + "time" + + "github.com/google/btree" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/client" + log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/codec" + "github.com/tikv/client-go/metrics" + "github.com/tikv/client-go/retry" +) + +const ( + btreeDegree = 32 + rcDefaultRegionCacheTTL = time.Minute * 10 +) + +// CachedRegion encapsulates {Region, TTL} +type CachedRegion struct { + region *Region + lastAccess int64 +} + +func (c *CachedRegion) isValid() bool { + lastAccess := atomic.LoadInt64(&c.lastAccess) + lastAccessTime := time.Unix(lastAccess, 0) + return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL +} + +// RegionCache caches Regions loaded from PD. +type RegionCache struct { + pdClient pd.Client + + mu struct { + sync.RWMutex + regions map[RegionVerID]*CachedRegion + sorted *btree.BTree + } + storeMu struct { + sync.RWMutex + stores map[uint64]*Store + } +} + +// NewRegionCache creates a RegionCache. +func NewRegionCache(pdClient pd.Client) *RegionCache { + c := &RegionCache{ + pdClient: pdClient, + } + c.mu.regions = make(map[RegionVerID]*CachedRegion) + c.mu.sorted = btree.New(btreeDegree) + c.storeMu.stores = make(map[uint64]*Store) + return c +} + +// RPCContext contains data that is needed to send RPC to a region. +type RPCContext struct { + Region RegionVerID + Meta *metapb.Region + Peer *metapb.Peer + Addr string +} + +// GetStoreID returns StoreID. +func (c *RPCContext) GetStoreID() uint64 { + if c.Peer != nil { + return c.Peer.StoreId + } + return 0 +} + +// GetRPCContext returns RPCContext for a region. If it returns nil, the region +// must be out of date and already dropped from cache. +func (c *RegionCache) GetRPCContext(bo *retry.Backoffer, id RegionVerID) (*RPCContext, error) { + c.mu.RLock() + region := c.getCachedRegion(id) + if region == nil { + c.mu.RUnlock() + return nil, nil + } + // Note: it is safe to use region.meta and region.peer without clone after + // unlock, because region cache will never update the content of region's meta + // or peer. On the contrary, if we want to use `region` after unlock, then we + // need to clone it to avoid data race. + meta, peer := region.meta, region.peer + c.mu.RUnlock() + + addr, err := c.GetStoreAddr(bo, peer.GetStoreId()) + if err != nil { + return nil, errors.Trace(err) + } + if addr == "" { + // Store not found, region must be out of date. + c.DropRegion(id) + return nil, nil + } + return &RPCContext{ + Region: id, + Meta: meta, + Peer: peer, + Addr: addr, + }, nil +} + +// KeyLocation is the region and range that a key is located. +type KeyLocation struct { + Region RegionVerID + StartKey []byte + EndKey []byte +} + +// Contains checks if key is in [StartKey, EndKey). +func (l *KeyLocation) Contains(key []byte) bool { + return bytes.Compare(l.StartKey, key) <= 0 && + (bytes.Compare(key, l.EndKey) < 0 || len(l.EndKey) == 0) +} + +// LocateKey searches for the region and range that the key is located. 
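KeyLocation.Contains treats the range as [StartKey, EndKey) and interprets an empty EndKey as "up to +infinity", matching Region.Contains further down. A quick illustration:

package main

import (
	"fmt"

	"github.com/tikv/client-go/locate"
)

func main() {
	loc := &locate.KeyLocation{StartKey: []byte("b"), EndKey: []byte("d")}
	fmt.Println(loc.Contains([]byte("b"))) // true: StartKey is inclusive
	fmt.Println(loc.Contains([]byte("d"))) // false: EndKey is exclusive
	// The last region of the keyspace has an empty EndKey and contains
	// everything from its StartKey onwards.
	last := &locate.KeyLocation{StartKey: []byte("x")}
	fmt.Println(last.Contains([]byte("zzz"))) // true
}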
+func (c *RegionCache) LocateKey(bo *retry.Backoffer, key []byte) (*KeyLocation, error) { + c.mu.RLock() + r := c.searchCachedRegion(key) + if r != nil { + loc := &KeyLocation{ + Region: r.VerID(), + StartKey: r.StartKey(), + EndKey: r.EndKey(), + } + c.mu.RUnlock() + return loc, nil + } + c.mu.RUnlock() + + r, err := c.loadRegion(bo, key) + if err != nil { + return nil, errors.Trace(err) + } + + c.mu.Lock() + defer c.mu.Unlock() + c.insertRegionToCache(r) + + return &KeyLocation{ + Region: r.VerID(), + StartKey: r.StartKey(), + EndKey: r.EndKey(), + }, nil +} + +// LocateRegionByID searches for the region with ID. +func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) { + c.mu.RLock() + r := c.getRegionByIDFromCache(regionID) + if r != nil { + loc := &KeyLocation{ + Region: r.VerID(), + StartKey: r.StartKey(), + EndKey: r.EndKey(), + } + c.mu.RUnlock() + return loc, nil + } + c.mu.RUnlock() + + r, err := c.loadRegionByID(bo, regionID) + if err != nil { + return nil, errors.Trace(err) + } + + c.mu.Lock() + defer c.mu.Unlock() + c.insertRegionToCache(r) + return &KeyLocation{ + Region: r.VerID(), + StartKey: r.StartKey(), + EndKey: r.EndKey(), + }, nil +} + +// GroupKeysByRegion separates keys into groups by their belonging Regions. +// Specially it also returns the first key's region which may be used as the +// 'PrimaryLockKey' and should be committed ahead of others. +func (c *RegionCache) GroupKeysByRegion(bo *retry.Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { + groups := make(map[RegionVerID][][]byte) + var first RegionVerID + var lastLoc *KeyLocation + for i, k := range keys { + if lastLoc == nil || !lastLoc.Contains(k) { + var err error + lastLoc, err = c.LocateKey(bo, k) + if err != nil { + return nil, first, errors.Trace(err) + } + } + id := lastLoc.Region + if i == 0 { + first = id + } + groups[id] = append(groups[id], k) + } + return groups, first, nil +} + +// ListRegionIDsInKeyRange lists ids of regions in [start_key,end_key]. +func (c *RegionCache) ListRegionIDsInKeyRange(bo *retry.Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error) { + for { + curRegion, err := c.LocateKey(bo, startKey) + if err != nil { + return nil, errors.Trace(err) + } + regionIDs = append(regionIDs, curRegion.Region.id) + if curRegion.Contains(endKey) { + break + } + startKey = curRegion.EndKey + } + return regionIDs, nil +} + +// DropRegion removes a cached Region. +func (c *RegionCache) DropRegion(id RegionVerID) { + c.mu.Lock() + defer c.mu.Unlock() + c.dropRegionFromCache(id) +} + +// UpdateLeader update some region cache with newer leader info. +func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) { + c.mu.Lock() + defer c.mu.Unlock() + + r := c.getCachedRegion(regionID) + if r == nil { + log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID) + return + } + + if !r.SwitchPeer(leaderStoreID) { + log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID) + c.dropRegionFromCache(r.VerID()) + } +} + +// insertRegionToCache tries to insert the Region to cache. +func (c *RegionCache) insertRegionToCache(r *Region) { + old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(r)) + if old != nil { + delete(c.mu.regions, old.(*btreeItem).region.VerID()) + } + c.mu.regions[r.VerID()] = &CachedRegion{ + region: r, + lastAccess: time.Now().Unix(), + } +} + +// getCachedRegion loads a region from cache. 
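A usage sketch for the public RegionCache API above, assuming a reachable PD at a placeholder address; retry.NewBackoffer and retry.RawkvMaxBackoff are taken from how the raw package builds its backoffers later in this change:

package main

import (
	"context"
	"fmt"

	pd "github.com/pingcap/pd/client"
	"github.com/tikv/client-go/locate"
	"github.com/tikv/client-go/retry"
)

func main() {
	// PD address is a placeholder.
	pdCli, err := pd.NewClient([]string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		panic(err)
	}
	cache := locate.NewRegionCache(pdCli)
	bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)

	// The first lookup goes to PD; later lookups for keys in the same range
	// are answered from the btree until the 10 minute TTL expires.
	loc, err := cache.LocateKey(bo, []byte("foo"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("key foo -> region %d\n", loc.Region.GetID())

	// Split a batch of keys by owning region; the first key's region is
	// returned separately as the candidate primary.
	groups, first, err := cache.GroupKeysByRegion(bo, [][]byte{[]byte("a"), []byte("b")})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(groups), first.GetID())
}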
It also checks if the region has +// not been accessed for a long time (maybe out of date). In this case, it +// returns nil so the region will be loaded from PD again. +// Note that it should be called with c.mu.RLock(), and the returned Region +// should not be used after c.mu is RUnlock(). +func (c *RegionCache) getCachedRegion(id RegionVerID) *Region { + cachedRegion, ok := c.mu.regions[id] + if !ok { + return nil + } + if cachedRegion.isValid() { + atomic.StoreInt64(&cachedRegion.lastAccess, time.Now().Unix()) + return cachedRegion.region + } + return nil +} + +// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`, +// it should be called with c.mu.RLock(), and the returned Region should not be +// used after c.mu is RUnlock(). +func (c *RegionCache) searchCachedRegion(key []byte) *Region { + var r *Region + c.mu.sorted.DescendLessOrEqual(newBtreeSearchItem(key), func(item btree.Item) bool { + r = item.(*btreeItem).region + return false + }) + if r != nil && r.Contains(key) { + return c.getCachedRegion(r.VerID()) + } + return nil +} + +// getRegionByIDFromCache tries to get region by regionID from cache. Like +// `getCachedRegion`, it should be called with c.mu.RLock(), and the returned +// Region should not be used after c.mu is RUnlock(). +func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { + for v, r := range c.mu.regions { + if v.id == regionID { + return r.region + } + } + return nil +} + +func (c *RegionCache) dropRegionFromCache(verID RegionVerID) { + r, ok := c.mu.regions[verID] + if !ok { + return + } + metrics.RegionCacheCounter.WithLabelValues("drop_region_from_cache", "ok").Inc() + c.mu.sorted.Delete(newBtreeItem(r.region)) + delete(c.mu.regions, verID) +} + +// loadRegion loads region from pd client, and picks the first peer as leader. +func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte) (*Region, error) { + var backoffErr error + for { + if backoffErr != nil { + err := bo.Backoff(retry.BoPDRPC, backoffErr) + if err != nil { + return nil, errors.Trace(err) + } + } + meta, leader, err := c.pdClient.GetRegion(bo.GetContext(), key) + metrics.RegionCacheCounter.WithLabelValues("get_region", metrics.RetLabel(err)).Inc() + if err != nil { + backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err) + continue + } + if meta == nil { + backoffErr = errors.Errorf("region not found for key %q", key) + continue + } + if len(meta.Peers) == 0 { + return nil, errors.New("receive Region with no peer") + } + region := &Region{ + meta: meta, + peer: meta.Peers[0], + } + if leader != nil { + region.SwitchPeer(leader.GetStoreId()) + } + return region, nil + } +} + +// loadRegionByID loads region from pd client, and picks the first peer as leader. 
+func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Region, error) { + var backoffErr error + for { + if backoffErr != nil { + err := bo.Backoff(retry.BoPDRPC, backoffErr) + if err != nil { + return nil, errors.Trace(err) + } + } + meta, leader, err := c.pdClient.GetRegionByID(bo.GetContext(), regionID) + metrics.RegionCacheCounter.WithLabelValues("get_region_by_id", metrics.RetLabel(err)).Inc() + if err != nil { + backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err) + continue + } + if meta == nil { + backoffErr = errors.Errorf("region not found for regionID %q", regionID) + continue + } + if len(meta.Peers) == 0 { + return nil, errors.New("receive Region with no peer") + } + region := &Region{ + meta: meta, + peer: meta.Peers[0], + } + if leader != nil { + region.SwitchPeer(leader.GetStoreId()) + } + return region, nil + } +} + +// GetStoreAddr returns a tikv server's address by its storeID. It checks cache +// first, sends request to pd server when necessary. +func (c *RegionCache) GetStoreAddr(bo *retry.Backoffer, id uint64) (string, error) { + c.storeMu.RLock() + if store, ok := c.storeMu.stores[id]; ok { + c.storeMu.RUnlock() + return store.Addr, nil + } + c.storeMu.RUnlock() + return c.ReloadStoreAddr(bo, id) +} + +// ReloadStoreAddr reloads store's address. +func (c *RegionCache) ReloadStoreAddr(bo *retry.Backoffer, id uint64) (string, error) { + addr, err := c.loadStoreAddr(bo, id) + if err != nil || addr == "" { + return "", errors.Trace(err) + } + + c.storeMu.Lock() + defer c.storeMu.Unlock() + c.storeMu.stores[id] = &Store{ + ID: id, + Addr: addr, + } + return addr, nil +} + +// ClearStoreByID clears store from cache with storeID. +func (c *RegionCache) ClearStoreByID(id uint64) { + c.storeMu.Lock() + defer c.storeMu.Unlock() + delete(c.storeMu.stores, id) +} + +func (c *RegionCache) loadStoreAddr(bo *retry.Backoffer, id uint64) (string, error) { + for { + store, err := c.pdClient.GetStore(bo.GetContext(), id) + metrics.RegionCacheCounter.WithLabelValues("get_store", metrics.RetLabel(err)).Inc() + if err != nil { + if errors.Cause(err) == context.Canceled { + return "", errors.Trace(err) + } + err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", id, err) + if err = bo.Backoff(retry.BoPDRPC, err); err != nil { + return "", errors.Trace(err) + } + continue + } + if store == nil { + return "", nil + } + return store.GetAddress(), nil + } +} + +// DropStoreOnSendRequestFail is used for clearing cache when a tikv server does not respond. +func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) { + // We need to drop the store only when the request is the first one failed on this store. + // Because too many concurrently requests trying to drop the store will be blocked on the lock. + failedRegionID := ctx.Region + failedStoreID := ctx.Peer.StoreId + c.mu.Lock() + _, ok := c.mu.regions[failedRegionID] + if !ok { + // The failed region is dropped already by another request, we don't need to iterate the regions + // and find regions on the failed store to drop. + c.mu.Unlock() + return + } + for id, r := range c.mu.regions { + if r.region.peer.GetStoreId() == failedStoreID { + c.dropRegionFromCache(id) + } + } + c.mu.Unlock() + + // Store's meta may be out of date. 
+ var failedStoreAddr string + c.storeMu.Lock() + store, ok := c.storeMu.stores[failedStoreID] + if ok { + failedStoreAddr = store.Addr + delete(c.storeMu.stores, failedStoreID) + } + c.storeMu.Unlock() + log.Infof("drop regions that on the store %d(%s) due to send request fail, err: %v", + failedStoreID, failedStoreAddr, err) +} + +// OnRegionStale removes the old region and inserts new regions into the cache. +func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region) error { + c.mu.Lock() + defer c.mu.Unlock() + + c.dropRegionFromCache(ctx.Region) + + for _, meta := range newRegions { + if _, ok := c.pdClient.(*codecPDClient); ok { + if err := codec.DecodeRegionMetaKey(meta); err != nil { + return errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err) + } + } + region := &Region{ + meta: meta, + peer: meta.Peers[0], + } + region.SwitchPeer(ctx.Peer.GetStoreId()) + c.insertRegionToCache(region) + } + return nil +} + +// PDClient returns the pd.Client in RegionCache. +func (c *RegionCache) PDClient() pd.Client { + return c.pdClient +} + +// btreeItem is BTree's Item that uses []byte to compare. +type btreeItem struct { + key []byte + region *Region +} + +func newBtreeItem(r *Region) *btreeItem { + return &btreeItem{ + key: r.StartKey(), + region: r, + } +} + +func newBtreeSearchItem(key []byte) *btreeItem { + return &btreeItem{ + key: key, + } +} + +func (item *btreeItem) Less(other btree.Item) bool { + return bytes.Compare(item.key, other.(*btreeItem).key) < 0 +} + +// Region stores region's meta and its leader peer. +type Region struct { + meta *metapb.Region + peer *metapb.Peer +} + +// GetID returns id. +func (r *Region) GetID() uint64 { + return r.meta.GetId() +} + +// RegionVerID is a unique ID that can identify a Region at a specific version. +type RegionVerID struct { + id uint64 + confVer uint64 + ver uint64 +} + +// GetID returns the id of the region +func (r *RegionVerID) GetID() uint64 { + return r.id +} + +// VerID returns the Region's RegionVerID. +func (r *Region) VerID() RegionVerID { + return RegionVerID{ + id: r.meta.GetId(), + confVer: r.meta.GetRegionEpoch().GetConfVer(), + ver: r.meta.GetRegionEpoch().GetVersion(), + } +} + +// StartKey returns StartKey. +func (r *Region) StartKey() []byte { + return r.meta.StartKey +} + +// EndKey returns EndKey. +func (r *Region) EndKey() []byte { + return r.meta.EndKey +} + +// GetContext constructs kvprotopb.Context from region info. +func (r *Region) GetContext() *kvrpcpb.Context { + return &kvrpcpb.Context{ + RegionId: r.meta.Id, + RegionEpoch: r.meta.RegionEpoch, + Peer: r.peer, + } +} + +// SwitchPeer switches current peer to the one on specific store. It returns +// false if no peer matches the storeID. +func (r *Region) SwitchPeer(storeID uint64) bool { + for _, p := range r.meta.Peers { + if p.GetStoreId() == storeID { + r.peer = p + return true + } + } + return false +} + +// Contains checks whether the key is in the region, for the maximum region endKey is empty. +// startKey <= key < endKey. +func (r *Region) Contains(key []byte) bool { + return bytes.Compare(r.meta.GetStartKey(), key) <= 0 && + (bytes.Compare(key, r.meta.GetEndKey()) < 0 || len(r.meta.GetEndKey()) == 0) +} + +// Store contains a tikv server's address. +type Store struct { + ID uint64 + Addr string +} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..b8587667 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,221 @@ +// Copyright 2018 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +// Client metrics. +// TODO: Create new grafana page for the metrics. +var ( + TxnCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_total", + Help: "Counter of created txns.", + }) + + SnapshotCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "snapshot_total", + Help: "Counter of snapshots.", + }) + + TxnCmdCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_cmd_total", + Help: "Counter of txn commands.", + }, []string{"type"}) + + TxnCmdHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_cmd_duration_seconds", + Help: "Bucketed histogram of processing time of txn cmds.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }, []string{"type"}) + + BackoffCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "backoff_total", + Help: "Counter of backoff.", + }, []string{"type"}) + + BackoffHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "backoff_seconds", + Help: "total backoff seconds of a single backoffer.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }) + + ConnPoolHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "get_conn_seconds", + Help: "Bucketed histogram of taking conn from conn pool.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }, []string{"type"}) + + SendReqHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "request_seconds", + Help: "Bucketed histogram of sending request duration.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }, []string{"type", "store"}) + + CoprocessorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "cop_actions_total", + Help: "Counter of coprocessor actions.", + }, []string{"type"}) + + CoprocessorHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "cop_duration_seconds", + Help: "Run duration of a single coprocessor task, includes backoff time.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }) + + LockResolverCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "lock_resolver_actions_total", + Help: "Counter of lock resolver actions.", + }, []string{"type"}) + + RegionErrorCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "region_err_total", + Help: "Counter of region errors.", + }, []string{"type"}) + + 
TxnWriteKVCountHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_write_kv_num", + Help: "Count of kv pairs to write in a transaction.", + Buckets: prometheus.ExponentialBuckets(1, 2, 21), + }) + + TxnWriteSizeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_write_size_bytes", + Help: "Size of kv pairs to write in a transaction.", + Buckets: prometheus.ExponentialBuckets(1, 2, 21), + }) + + RawkvCmdHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "rawkv_cmd_seconds", + Help: "Bucketed histogram of processing time of rawkv cmds.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), + }, []string{"type"}) + + RawkvSizeHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "rawkv_kv_size_bytes", + Help: "Size of key/value to put, in bytes.", + Buckets: prometheus.ExponentialBuckets(1, 2, 21), + }, []string{"type"}) + + TxnRegionsNumHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "txn_regions_num", + Help: "Number of regions in a transaction.", + Buckets: prometheus.ExponentialBuckets(1, 2, 20), + }, []string{"type"}) + + LoadSafepointCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "load_safepoint_total", + Help: "Counter of load safepoint.", + }, []string{"type"}) + + SecondaryLockCleanupFailureCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "lock_cleanup_task_total", + Help: "failure statistic of secondary lock cleanup task.", + }, []string{"type"}) + + RegionCacheCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv", + Subsystem: "client_go", + Name: "region_cache_operations_total", + Help: "Counter of region cache.", + }, []string{"type", "result"}) +) + +// RetLabel returns "ok" when err == nil and "err" when err != nil. +// This could be useful when you need to observe the operation result. +func RetLabel(err error) string { + if err == nil { + return "ok" + } + return "err" +} + +func init() { + prometheus.MustRegister(TxnCounter) + prometheus.MustRegister(SnapshotCounter) + prometheus.MustRegister(TxnCmdHistogram) + prometheus.MustRegister(BackoffCounter) + prometheus.MustRegister(BackoffHistogram) + prometheus.MustRegister(SendReqHistogram) + prometheus.MustRegister(ConnPoolHistogram) + prometheus.MustRegister(CoprocessorCounter) + prometheus.MustRegister(CoprocessorHistogram) + prometheus.MustRegister(LockResolverCounter) + prometheus.MustRegister(RegionErrorCounter) + prometheus.MustRegister(TxnWriteKVCountHistogram) + prometheus.MustRegister(TxnWriteSizeHistogram) + prometheus.MustRegister(RawkvCmdHistogram) + prometheus.MustRegister(RawkvSizeHistogram) + prometheus.MustRegister(TxnRegionsNumHistogram) + prometheus.MustRegister(LoadSafepointCounter) + prometheus.MustRegister(SecondaryLockCleanupFailureCounter) + prometheus.MustRegister(RegionCacheCounter) +} diff --git a/raw/errors.go b/raw/errors.go new file mode 100644 index 00000000..d1452282 --- /dev/null +++ b/raw/errors.go @@ -0,0 +1,18 @@ +// Copyright 2018 PingCAP, Inc. 
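All of the collectors above are registered on the default Prometheus registry by init(), so an application embedding this client can expose them with the stock promhttp handler. A sketch; the listen address is a placeholder and the blank import stands in for any real use of the client packages:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	_ "github.com/tikv/client-go/metrics" // init() registers the collectors
)

func main() {
	http.Handle("/metrics", promhttp.Handler())
	// Error handling omitted for brevity.
	http.ListenAndServe(":9090", nil)
}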
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package raw + +import "github.com/pingcap/errors" + +var ErrBodyMissing = errors.New("response body is missing") diff --git a/raw/rawkv.go b/raw/rawkv.go new file mode 100644 index 00000000..060525a1 --- /dev/null +++ b/raw/rawkv.go @@ -0,0 +1,619 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package raw + +import ( + "bytes" + "context" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + pd "github.com/pingcap/pd/client" + "github.com/tikv/client-go/config" + "github.com/tikv/client-go/locate" + "github.com/tikv/client-go/metrics" + "github.com/tikv/client-go/retry" + "github.com/tikv/client-go/rpc" +) + +var ( + // MaxRawKVScanLimit is the maximum scan limit for rawkv Scan. + MaxRawKVScanLimit = 10240 + // ErrMaxScanLimitExceeded is returned when the limit for rawkv Scan is to large. + ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit") +) + +const ( + // rawBatchPutSize is the maximum size limit for rawkv each batch put request. + rawBatchPutSize = 16 * 1024 + // rawBatchPairCount is the maximum limit for rawkv each batch get/delete request. + rawBatchPairCount = 512 +) + +// RawKVClient is a client of TiKV server which is used as a key-value storage, +// only GET/PUT/DELETE commands are supported. +type RawKVClient struct { + clusterID uint64 + regionCache *locate.RegionCache + pdClient pd.Client + rpcClient rpc.Client +} + +// NewRawKVClient creates a client with PD cluster addrs. +func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, error) { + pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{ + CAPath: security.SSLCA, + CertPath: security.SSLCert, + KeyPath: security.SSLKey, + }) + if err != nil { + return nil, errors.Trace(err) + } + return &RawKVClient{ + clusterID: pdCli.GetClusterID(context.TODO()), + regionCache: locate.NewRegionCache(pdCli), + pdClient: pdCli, + rpcClient: rpc.NewRPCClient(security), + }, nil +} + +// Close closes the client. +func (c *RawKVClient) Close() error { + c.pdClient.Close() + return c.rpcClient.Close() +} + +// ClusterID returns the TiKV cluster ID. +func (c *RawKVClient) ClusterID() uint64 { + return c.clusterID +} + +// Get queries value with the key. When the key does not exist, it returns `nil, nil`. 
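Constructing the client only needs the PD endpoints plus a config.Security; a sketch with a placeholder address (pass populated Security fields to enable TLS for both the PD and TiKV connections):

package main

import (
	"fmt"

	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/raw"
)

func main() {
	// PD endpoints are placeholders; an empty Security means plaintext.
	cli, err := raw.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	fmt.Println("connected to cluster", cli.ClusterID())
}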
+func (c *RawKVClient) Get(key []byte) ([]byte, error) { + start := time.Now() + defer func() { metrics.RawkvCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }() + + req := &rpc.Request{ + Type: rpc.CmdRawGet, + RawGet: &kvrpcpb.RawGetRequest{ + Key: key, + }, + } + resp, _, err := c.sendReq(key, req) + if err != nil { + return nil, errors.Trace(err) + } + cmdResp := resp.RawGet + if cmdResp == nil { + return nil, errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return nil, errors.New(cmdResp.GetError()) + } + if len(cmdResp.Value) == 0 { + return nil, nil + } + return cmdResp.Value, nil +} + +// BatchGet queries values with the keys. +func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) { + start := time.Now() + defer func() { + metrics.RawkvCmdHistogram.WithLabelValues("batch_get").Observe(time.Since(start).Seconds()) + }() + + bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) + resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchGet) + if err != nil { + return nil, errors.Trace(err) + } + + cmdResp := resp.RawBatchGet + if cmdResp == nil { + return nil, errors.Trace(ErrBodyMissing) + } + + keyToValue := make(map[string][]byte, len(keys)) + for _, pair := range cmdResp.Pairs { + keyToValue[string(pair.Key)] = pair.Value + } + + values := make([][]byte, len(keys)) + for i, key := range keys { + values[i] = keyToValue[string(key)] + } + return values, nil +} + +// Put stores a key-value pair to TiKV. +func (c *RawKVClient) Put(key, value []byte) error { + start := time.Now() + defer func() { metrics.RawkvCmdHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) }() + metrics.RawkvSizeHistogram.WithLabelValues("key").Observe(float64(len(key))) + metrics.RawkvSizeHistogram.WithLabelValues("value").Observe(float64(len(value))) + + if len(value) == 0 { + return errors.New("empty value is not supported") + } + + req := &rpc.Request{ + Type: rpc.CmdRawPut, + RawPut: &kvrpcpb.RawPutRequest{ + Key: key, + Value: value, + }, + } + resp, _, err := c.sendReq(key, req) + if err != nil { + return errors.Trace(err) + } + cmdResp := resp.RawPut + if cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil +} + +// BatchPut stores key-value pairs to TiKV. +func (c *RawKVClient) BatchPut(keys, values [][]byte) error { + start := time.Now() + defer func() { + metrics.RawkvCmdHistogram.WithLabelValues("batch_put").Observe(time.Since(start).Seconds()) + }() + + if len(keys) != len(values) { + return errors.New("the len of keys is not equal to the len of values") + } + for _, value := range values { + if len(value) == 0 { + return errors.New("empty value is not supported") + } + } + bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) + err := c.sendBatchPut(bo, keys, values) + return errors.Trace(err) +} + +// Delete deletes a key-value pair from TiKV. 
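A sketch of the single-key and batch paths above, written against an already constructed client in a hypothetical package example (note that Put/BatchPut reject empty values, and a missing key comes back as a nil value rather than an error):

package example

import (
	"fmt"

	"github.com/tikv/client-go/raw"
)

// putAndGet demonstrates Put/Get/BatchGet on an existing client; the keys
// and values are placeholders.
func putAndGet(cli *raw.RawKVClient) error {
	if err := cli.Put([]byte("k1"), []byte("v1")); err != nil {
		return err
	}
	if err := cli.BatchPut(
		[][]byte{[]byte("k2"), []byte("k3")},
		[][]byte{[]byte("v2"), []byte("v3")},
	); err != nil {
		return err
	}

	val, err := cli.Get([]byte("k1"))
	if err != nil {
		return err
	}
	fmt.Printf("k1 = %q\n", val) // "v1"

	// BatchGet keeps the order of the requested keys; missing keys yield nil.
	vals, err := cli.BatchGet([][]byte{[]byte("k2"), []byte("missing")})
	if err != nil {
		return err
	}
	fmt.Println(len(vals), vals[1] == nil) // 2 true
	return nil
}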
+func (c *RawKVClient) Delete(key []byte) error { + start := time.Now() + defer func() { metrics.RawkvCmdHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }() + + req := &rpc.Request{ + Type: rpc.CmdRawDelete, + RawDelete: &kvrpcpb.RawDeleteRequest{ + Key: key, + }, + } + resp, _, err := c.sendReq(key, req) + if err != nil { + return errors.Trace(err) + } + cmdResp := resp.RawDelete + if cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil +} + +// BatchDelete deletes key-value pairs from TiKV +func (c *RawKVClient) BatchDelete(keys [][]byte) error { + start := time.Now() + defer func() { + metrics.RawkvCmdHistogram.WithLabelValues("batch_delete").Observe(time.Since(start).Seconds()) + }() + + bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) + resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchDelete) + if err != nil { + return errors.Trace(err) + } + cmdResp := resp.RawBatchDelete + if cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil +} + +// DeleteRange deletes all key-value pairs in a range from TiKV +func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error { + start := time.Now() + var err error + defer func() { + var label = "delete_range" + if err != nil { + label += "_error" + } + metrics.RawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds()) + }() + + // Process each affected region respectively + for !bytes.Equal(startKey, endKey) { + var resp *rpc.Response + var actualEndKey []byte + resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey) + if err != nil { + return errors.Trace(err) + } + cmdResp := resp.RawDeleteRange + if cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + startKey = actualEndKey + } + + return nil +} + +// Scan queries continuous kv pairs, starts from startKey, up to limit pairs. +// If you want to exclude the startKey, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`. 
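Scan walks regions from startKey until it has collected up to limit pairs (limit must not exceed MaxRawKVScanLimit), and DeleteRange issues one RawDeleteRange request per affected region. A sketch against an existing client, again in a hypothetical package example with placeholder keys:

package example

import (
	"fmt"

	"github.com/tikv/client-go/raw"
)

// scanAndClean demonstrates Scan and DeleteRange on an existing client.
func scanAndClean(cli *raw.RawKVClient) error {
	// Scan is start-inclusive; append '\x00' to the start key to skip it.
	keys, values, err := cli.Scan([]byte("k"), 64)
	if err != nil {
		return err
	}
	for i := range keys {
		fmt.Printf("%q -> %q\n", keys[i], values[i])
	}

	// Remove the keys in ["k1", "k9").
	return cli.DeleteRange([]byte("k1"), []byte("k9"))
}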
+func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) { + start := time.Now() + defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }() + + if limit > MaxRawKVScanLimit { + return nil, nil, errors.Trace(ErrMaxScanLimitExceeded) + } + + for len(keys) < limit { + req := &rpc.Request{ + Type: rpc.CmdRawScan, + RawScan: &kvrpcpb.RawScanRequest{ + StartKey: startKey, + Limit: uint32(limit - len(keys)), + }, + } + resp, loc, err := c.sendReq(startKey, req) + if err != nil { + return nil, nil, errors.Trace(err) + } + cmdResp := resp.RawScan + if cmdResp == nil { + return nil, nil, errors.Trace(ErrBodyMissing) + } + for _, pair := range cmdResp.Kvs { + keys = append(keys, pair.Key) + values = append(values, pair.Value) + } + startKey = loc.EndKey + if len(startKey) == 0 { + break + } + } + return +} + +func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) { + bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) + sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) + for { + loc, err := c.regionCache.LocateKey(bo, key) + if err != nil { + return nil, nil, errors.Trace(err) + } + resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + if err != nil { + return nil, nil, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return nil, nil, errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, nil, errors.Trace(err) + } + continue + } + return resp, loc, nil + } +} + +func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.CmdType) (*rpc.Response, error) { // split the keys + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + if err != nil { + return nil, errors.Trace(err) + } + + var batches []batch + for regionID, groupKeys := range groups { + batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + } + bo, cancel := bo.Fork() + ches := make(chan singleBatchResp, len(batches)) + for _, batch := range batches { + batch1 := batch + go func() { + singleBatchBackoffer, singleBatchCancel := bo.Fork() + defer singleBatchCancel() + ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType) + }() + } + + var firstError error + var resp *rpc.Response + switch cmdType { + case rpc.CmdRawBatchGet: + resp = &rpc.Response{Type: rpc.CmdRawBatchGet, RawBatchGet: &kvrpcpb.RawBatchGetResponse{}} + case rpc.CmdRawBatchDelete: + resp = &rpc.Response{Type: rpc.CmdRawBatchDelete, RawBatchDelete: &kvrpcpb.RawBatchDeleteResponse{}} + } + for i := 0; i < len(batches); i++ { + singleResp, ok := <-ches + if ok { + if singleResp.err != nil { + cancel() + if firstError == nil { + firstError = singleResp.err + } + } else if cmdType == rpc.CmdRawBatchGet { + cmdResp := singleResp.resp.RawBatchGet + resp.RawBatchGet.Pairs = append(resp.RawBatchGet.Pairs, cmdResp.Pairs...) 
+ } + } + } + + return resp, firstError +} + +func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.CmdType) singleBatchResp { + var req *rpc.Request + switch cmdType { + case rpc.CmdRawBatchGet: + req = &rpc.Request{ + Type: cmdType, + RawBatchGet: &kvrpcpb.RawBatchGetRequest{ + Keys: batch.keys, + }, + } + case rpc.CmdRawBatchDelete: + req = &rpc.Request{ + Type: cmdType, + RawBatchDelete: &kvrpcpb.RawBatchDeleteRequest{ + Keys: batch.keys, + }, + } + } + + sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) + resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort) + + batchResp := singleBatchResp{} + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + regionErr, err := resp.GetRegionError() + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + resp, err = c.sendBatchReq(bo, batch.keys, cmdType) + batchResp.resp = resp + batchResp.err = err + return batchResp + } + + switch cmdType { + case rpc.CmdRawBatchGet: + batchResp.resp = resp + case rpc.CmdRawBatchDelete: + cmdResp := resp.RawBatchDelete + if cmdResp == nil { + batchResp.err = errors.Trace(ErrBodyMissing) + return batchResp + } + if cmdResp.GetError() != "" { + batchResp.err = errors.New(cmdResp.GetError()) + return batchResp + } + batchResp.resp = resp + } + return batchResp +} + +// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey. +// If the given range spans over more than one regions, the actual endKey is the end of the first region. +// We can't use sendReq directly, because we need to know the end of the region before we send the request +// TODO: Is there any better way to avoid duplicating code with func `sendReq` ? 
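+// The caller (DeleteRange above) advances startKey to the returned actualEndKey and
+// calls this function again until the whole range has been covered.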
+func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.Response, []byte, error) { + bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff) + sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) + for { + loc, err := c.regionCache.LocateKey(bo, startKey) + if err != nil { + return nil, nil, errors.Trace(err) + } + + actualEndKey := endKey + if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 { + actualEndKey = loc.EndKey + } + + req := &rpc.Request{ + Type: rpc.CmdRawDeleteRange, + RawDeleteRange: &kvrpcpb.RawDeleteRangeRequest{ + StartKey: startKey, + EndKey: actualEndKey, + }, + } + + resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort) + if err != nil { + return nil, nil, errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return nil, nil, errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return nil, nil, errors.Trace(err) + } + continue + } + return resp, actualEndKey, nil + } +} + +func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error { + keyToValue := make(map[string][]byte) + for i, key := range keys { + keyToValue[string(key)] = values[i] + } + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + if err != nil { + return errors.Trace(err) + } + var batches []batch + // split the keys by size and RegionVerID + for regionID, groupKeys := range groups { + batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize) + } + bo, cancel := bo.Fork() + ch := make(chan error, len(batches)) + for _, batch := range batches { + batch1 := batch + go func() { + singleBatchBackoffer, singleBatchCancel := bo.Fork() + defer singleBatchCancel() + ch <- c.doBatchPut(singleBatchBackoffer, batch1) + }() + } + + for i := 0; i < len(batches); i++ { + if e := <-ch; e != nil { + cancel() + // catch the first error + if err == nil { + err = e + } + } + } + return errors.Trace(err) +} + +func appendKeyBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]byte, limit int) []batch { + var keys [][]byte + for start, count := 0, 0; start < len(groupKeys); start++ { + if count > limit { + batches = append(batches, batch{regionID: regionID, keys: keys}) + keys = make([][]byte, 0, limit) + count = 0 + } + keys = append(keys, groupKeys[start]) + count++ + } + if len(keys) != 0 { + batches = append(batches, batch{regionID: regionID, keys: keys}) + } + return batches +} + +func appendBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]byte, keyToValue map[string][]byte, limit int) []batch { + var start, size int + var keys, values [][]byte + for start = 0; start < len(groupKeys); start++ { + if size >= limit { + batches = append(batches, batch{regionID: regionID, keys: keys, values: values}) + keys = make([][]byte, 0) + values = make([][]byte, 0) + size = 0 + } + key := groupKeys[start] + value := keyToValue[string(key)] + keys = append(keys, key) + values = append(values, value) + size += len(key) + size += len(value) + } + if len(keys) != 0 { + batches = append(batches, batch{regionID: regionID, keys: keys, values: values}) + } + return batches +} + +func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error { + kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.keys)) + for i, key := range batch.keys { + kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.values[i]}) + } + + req := &rpc.Request{ + Type: 
rpc.CmdRawBatchPut, + RawBatchPut: &kvrpcpb.RawBatchPutRequest{ + Pairs: kvPair, + }, + } + + sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient) + resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort) + if err != nil { + return errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return errors.Trace(err) + } + if regionErr != nil { + err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String())) + if err != nil { + return errors.Trace(err) + } + // recursive call + return c.sendBatchPut(bo, batch.keys, batch.values) + } + + cmdResp := resp.RawBatchPut + if cmdResp == nil { + return errors.Trace(ErrBodyMissing) + } + if cmdResp.GetError() != "" { + return errors.New(cmdResp.GetError()) + } + return nil +} + +type batch struct { + regionID locate.RegionVerID + keys [][]byte + values [][]byte +} + +type singleBatchResp struct { + resp *rpc.Response + err error +} diff --git a/retry/backoff.go b/retry/backoff.go new file mode 100644 index 00000000..ef332363 --- /dev/null +++ b/retry/backoff.go @@ -0,0 +1,254 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "fmt" + "math" + "math/rand" + "time" + + "github.com/pingcap/errors" + log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/metrics" +) + +const ( + // NoJitter makes the backoff sequence strict exponential. + NoJitter = 1 + iota + // FullJitter applies random factors to strict exponential. + FullJitter + // EqualJitter is also randomized, but prevents very short sleeps. + EqualJitter + // DecorrJitter increases the maximum jitter based on the last random value. + DecorrJitter +) + +// NewBackoffFn creates a backoff func which implements exponential backoff with +// optional jitters. +// See http://www.awsarchitectureblog.com/2015/03/backoff.html +func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int { + if base < 2 { + // Top prevent panic in 'rand.Intn'. + base = 2 + } + attempts := 0 + lastSleep := base + return func(ctx context.Context) int { + var sleep int + switch jitter { + case NoJitter: + sleep = expo(base, cap, attempts) + case FullJitter: + v := expo(base, cap, attempts) + sleep = rand.Intn(v) + case EqualJitter: + v := expo(base, cap, attempts) + sleep = v/2 + rand.Intn(v/2) + case DecorrJitter: + sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base)))) + } + log.Debugf("backoff base %d, sleep %d", base, sleep) + select { + case <-time.After(time.Duration(sleep) * time.Millisecond): + case <-ctx.Done(): + } + + attempts++ + lastSleep = sleep + return lastSleep + } +} + +func expo(base, cap, n int) int { + return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) +} + +// BackoffType is the retryable error type. +type BackoffType int + +// Back off types. 
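+// Each type maps to one NewBackoffFn configuration in createFn below. As a rough
+// reading of NewBackoffFn above (illustrative, not a documented guarantee),
+// NewBackoffFn(100, 2000, NoJitter) sleeps about 100, 200, 400, ... ms capped at
+// 2000 ms, while EqualJitter draws each sleep from roughly [v/2, v) for the same v.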
+const ( + BoTiKVRPC BackoffType = iota + BoTxnLock + BoTxnLockFast + BoPDRPC + BoRegionMiss + BoUpdateLeader + BoServerBusy +) + +func (t BackoffType) createFn() func(context.Context) int { + switch t { + case BoTiKVRPC: + return NewBackoffFn(100, 2000, EqualJitter) + case BoTxnLock: + return NewBackoffFn(200, 3000, EqualJitter) + case BoTxnLockFast: + return NewBackoffFn(50, 3000, EqualJitter) + case BoPDRPC: + return NewBackoffFn(500, 3000, EqualJitter) + case BoRegionMiss: + // change base time to 2ms, because it may recover soon. + return NewBackoffFn(2, 500, NoJitter) + case BoUpdateLeader: + return NewBackoffFn(1, 10, NoJitter) + case BoServerBusy: + return NewBackoffFn(2000, 10000, EqualJitter) + } + return nil +} + +func (t BackoffType) String() string { + switch t { + case BoTiKVRPC: + return "tikvRPC" + case BoTxnLock: + return "txnLock" + case BoTxnLockFast: + return "txnLockFast" + case BoPDRPC: + return "pdRPC" + case BoRegionMiss: + return "regionMiss" + case BoUpdateLeader: + return "updateLeader" + case BoServerBusy: + return "serverBusy" + } + return "" +} + +// Maximum total sleep time(in ms) for kv/cop commands. +const ( + copBuildTaskMaxBackoff = 5000 + tsoMaxBackoff = 15000 + scannerNextMaxBackoff = 20000 + batchGetMaxBackoff = 20000 + copNextMaxBackoff = 20000 + getMaxBackoff = 20000 + prewriteMaxBackoff = 20000 + cleanupMaxBackoff = 20000 + GcOneRegionMaxBackoff = 20000 + GcResolveLockMaxBackoff = 100000 + deleteRangeOneRegionMaxBackoff = 100000 + RawkvMaxBackoff = 20000 + splitRegionBackoff = 20000 +) + +// CommitMaxBackoff is max sleep time of the 'commit' command +var CommitMaxBackoff = 41000 + +// Backoffer is a utility for retrying queries. +type Backoffer struct { + ctx context.Context + + fn map[BackoffType]func(context.Context) int + maxSleep int + totalSleep int + errors []error + types []BackoffType +} + +// txnStartKey is a key for transaction start_ts info in context.Context. +const txnStartKey = "_txn_start_key" + +// NewBackoffer creates a Backoffer with maximum sleep time(in ms). +func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer { + return &Backoffer{ + ctx: ctx, + maxSleep: maxSleep, + } +} + +// Backoff sleeps a while base on the BackoffType and records the error message. +// It returns a retryable error if total sleep time exceeds maxSleep. +func (b *Backoffer) Backoff(typ BackoffType, err error) error { + select { + case <-b.ctx.Done(): + return errors.Trace(err) + default: + } + + metrics.BackoffCounter.WithLabelValues(typ.String()).Inc() + // Lazy initialize. + if b.fn == nil { + b.fn = make(map[BackoffType]func(context.Context) int) + } + f, ok := b.fn[typ] + if !ok { + f = typ.createFn() + b.fn[typ] = f + } + + b.totalSleep += f(b.ctx) + b.types = append(b.types, typ) + + var startTs interface{} = "" + if ts := b.ctx.Value(txnStartKey); ts != nil { + startTs = ts + } + log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms), type: %s, txn_start_ts: %v", err, b.totalSleep, b.maxSleep, typ.String(), startTs) + + b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) + if b.maxSleep > 0 && b.totalSleep >= b.maxSleep { + errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep) + for i, err := range b.errors { + // Print only last 3 errors for non-DEBUG log levels. + if log.GetLevel() == log.DebugLevel || i >= len(b.errors)-3 { + errMsg += "\n" + err.Error() + } + } + log.Warn(errMsg) + // Use the first backoff type to generate a MySQL error. 
+ return errors.New(b.types[0].String()) + } + return nil +} + +func (b *Backoffer) String() string { + if b.totalSleep == 0 { + return "" + } + return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.types) +} + +// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares +// current Backoffer's context. +func (b *Backoffer) Clone() *Backoffer { + return &Backoffer{ + ctx: b.ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + errors: b.errors, + } +} + +// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds +// a child context of current Backoffer's context. +func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) { + ctx, cancel := context.WithCancel(b.ctx) + return &Backoffer{ + ctx: ctx, + maxSleep: b.maxSleep, + totalSleep: b.totalSleep, + errors: b.errors[:len(b.errors):len(b.errors)], + }, cancel +} + +// GetContext returns the associated context. +func (b *Backoffer) GetContext() context.Context { + return b.ctx +} diff --git a/rpc/calls.go b/rpc/calls.go new file mode 100644 index 00000000..4b4dadc0 --- /dev/null +++ b/rpc/calls.go @@ -0,0 +1,558 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpc + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/tikvpb" +) + +// CmdType represents the concrete request type in Request or response type in Response. +type CmdType uint16 + +// CmdType values. 
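+// The constants share one const block, so iota keeps counting across the groups;
+// the offsets (1, 256, 512, 1024) simply keep transactional, raw, coprocessor, and
+// debug commands in distinct numeric ranges.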
+const ( + CmdGet CmdType = 1 + iota + CmdScan + CmdPrewrite + CmdCommit + CmdCleanup + CmdBatchGet + CmdBatchRollback + CmdScanLock + CmdResolveLock + CmdGC + CmdDeleteRange + + CmdRawGet CmdType = 256 + iota + CmdRawBatchGet + CmdRawPut + CmdRawBatchPut + CmdRawDelete + CmdRawBatchDelete + CmdRawDeleteRange + CmdRawScan + + CmdUnsafeDestroyRange + + CmdCop CmdType = 512 + iota + CmdCopStream + + CmdMvccGetByKey CmdType = 1024 + iota + CmdMvccGetByStartTs + CmdSplitRegion +) + +func (t CmdType) String() string { + switch t { + case CmdGet: + return "Get" + case CmdScan: + return "Scan" + case CmdPrewrite: + return "Prewrite" + case CmdCommit: + return "Commit" + case CmdCleanup: + return "Cleanup" + case CmdBatchGet: + return "BatchGet" + case CmdBatchRollback: + return "BatchRollback" + case CmdScanLock: + return "ScanLock" + case CmdResolveLock: + return "ResolveLock" + case CmdGC: + return "GC" + case CmdDeleteRange: + return "DeleteRange" + case CmdRawGet: + return "RawGet" + case CmdRawBatchGet: + return "RawBatchGet" + case CmdRawPut: + return "RawPut" + case CmdRawBatchPut: + return "RawBatchPut" + case CmdRawDelete: + return "RawDelete" + case CmdRawBatchDelete: + return "RawBatchDelete" + case CmdRawDeleteRange: + return "RawDeleteRange" + case CmdRawScan: + return "RawScan" + case CmdUnsafeDestroyRange: + return "UnsafeDestroyRange" + case CmdCop: + return "Cop" + case CmdCopStream: + return "CopStream" + case CmdMvccGetByKey: + return "MvccGetByKey" + case CmdMvccGetByStartTs: + return "MvccGetByStartTS" + case CmdSplitRegion: + return "SplitRegion" + } + return "Unknown" +} + +// Request wraps all kv/coprocessor requests. +type Request struct { + kvrpcpb.Context + Type CmdType + Get *kvrpcpb.GetRequest + Scan *kvrpcpb.ScanRequest + Prewrite *kvrpcpb.PrewriteRequest + Commit *kvrpcpb.CommitRequest + Cleanup *kvrpcpb.CleanupRequest + BatchGet *kvrpcpb.BatchGetRequest + BatchRollback *kvrpcpb.BatchRollbackRequest + ScanLock *kvrpcpb.ScanLockRequest + ResolveLock *kvrpcpb.ResolveLockRequest + GC *kvrpcpb.GCRequest + DeleteRange *kvrpcpb.DeleteRangeRequest + RawGet *kvrpcpb.RawGetRequest + RawBatchGet *kvrpcpb.RawBatchGetRequest + RawPut *kvrpcpb.RawPutRequest + RawBatchPut *kvrpcpb.RawBatchPutRequest + RawDelete *kvrpcpb.RawDeleteRequest + RawBatchDelete *kvrpcpb.RawBatchDeleteRequest + RawDeleteRange *kvrpcpb.RawDeleteRangeRequest + RawScan *kvrpcpb.RawScanRequest + UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeRequest + Cop *coprocessor.Request + MvccGetByKey *kvrpcpb.MvccGetByKeyRequest + MvccGetByStartTs *kvrpcpb.MvccGetByStartTsRequest + SplitRegion *kvrpcpb.SplitRegionRequest +} + +// Response wraps all kv/coprocessor responses. 
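+// Like Request, it is a union-style wrapper: Type indicates which of the pointer
+// fields below is populated for a given response.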
+type Response struct { + Type CmdType + Get *kvrpcpb.GetResponse + Scan *kvrpcpb.ScanResponse + Prewrite *kvrpcpb.PrewriteResponse + Commit *kvrpcpb.CommitResponse + Cleanup *kvrpcpb.CleanupResponse + BatchGet *kvrpcpb.BatchGetResponse + BatchRollback *kvrpcpb.BatchRollbackResponse + ScanLock *kvrpcpb.ScanLockResponse + ResolveLock *kvrpcpb.ResolveLockResponse + GC *kvrpcpb.GCResponse + DeleteRange *kvrpcpb.DeleteRangeResponse + RawGet *kvrpcpb.RawGetResponse + RawBatchGet *kvrpcpb.RawBatchGetResponse + RawPut *kvrpcpb.RawPutResponse + RawBatchPut *kvrpcpb.RawBatchPutResponse + RawDelete *kvrpcpb.RawDeleteResponse + RawBatchDelete *kvrpcpb.RawBatchDeleteResponse + RawDeleteRange *kvrpcpb.RawDeleteRangeResponse + RawScan *kvrpcpb.RawScanResponse + UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeResponse + Cop *coprocessor.Response + CopStream *CopStreamResponse + MvccGetByKey *kvrpcpb.MvccGetByKeyResponse + MvccGetByStartTS *kvrpcpb.MvccGetByStartTsResponse + SplitRegion *kvrpcpb.SplitRegionResponse +} + +// CopStreamResponse combinates tikvpb.Tikv_CoprocessorStreamClient and the first Recv() result together. +// In streaming API, get grpc stream client may not involve any network packet, then region error have +// to be handled in Recv() function. This struct facilitates the error handling. +type CopStreamResponse struct { + tikvpb.Tikv_CoprocessorStreamClient + *coprocessor.Response // The first result of Recv() + Timeout time.Duration + Lease // Shared by this object and a background goroutine. +} + +// SetContext set the Context field for the given req to the specified ctx. +func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { + ctx := &req.Context + ctx.RegionId = region.Id + ctx.RegionEpoch = region.RegionEpoch + ctx.Peer = peer + + switch req.Type { + case CmdGet: + req.Get.Context = ctx + case CmdScan: + req.Scan.Context = ctx + case CmdPrewrite: + req.Prewrite.Context = ctx + case CmdCommit: + req.Commit.Context = ctx + case CmdCleanup: + req.Cleanup.Context = ctx + case CmdBatchGet: + req.BatchGet.Context = ctx + case CmdBatchRollback: + req.BatchRollback.Context = ctx + case CmdScanLock: + req.ScanLock.Context = ctx + case CmdResolveLock: + req.ResolveLock.Context = ctx + case CmdGC: + req.GC.Context = ctx + case CmdDeleteRange: + req.DeleteRange.Context = ctx + case CmdRawGet: + req.RawGet.Context = ctx + case CmdRawBatchGet: + req.RawBatchGet.Context = ctx + case CmdRawPut: + req.RawPut.Context = ctx + case CmdRawBatchPut: + req.RawBatchPut.Context = ctx + case CmdRawDelete: + req.RawDelete.Context = ctx + case CmdRawBatchDelete: + req.RawBatchDelete.Context = ctx + case CmdRawDeleteRange: + req.RawDeleteRange.Context = ctx + case CmdRawScan: + req.RawScan.Context = ctx + case CmdUnsafeDestroyRange: + req.UnsafeDestroyRange.Context = ctx + case CmdCop: + req.Cop.Context = ctx + case CmdCopStream: + req.Cop.Context = ctx + case CmdMvccGetByKey: + req.MvccGetByKey.Context = ctx + case CmdMvccGetByStartTs: + req.MvccGetByStartTs.Context = ctx + case CmdSplitRegion: + req.SplitRegion.Context = ctx + default: + return fmt.Errorf("invalid request type %v", req.Type) + } + return nil +} + +// GenRegionErrorResp returns corresponding Response with specified RegionError +// according to the given req. 
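+// It is used, for example, by RegionRequestSender.SendReq to synthesize a response
+// when the target region is missing from the cache, so callers handle that case
+// through the normal RegionError path.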
+func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { + resp := &Response{} + resp.Type = req.Type + switch req.Type { + case CmdGet: + resp.Get = &kvrpcpb.GetResponse{ + RegionError: e, + } + case CmdScan: + resp.Scan = &kvrpcpb.ScanResponse{ + RegionError: e, + } + case CmdPrewrite: + resp.Prewrite = &kvrpcpb.PrewriteResponse{ + RegionError: e, + } + case CmdCommit: + resp.Commit = &kvrpcpb.CommitResponse{ + RegionError: e, + } + case CmdCleanup: + resp.Cleanup = &kvrpcpb.CleanupResponse{ + RegionError: e, + } + case CmdBatchGet: + resp.BatchGet = &kvrpcpb.BatchGetResponse{ + RegionError: e, + } + case CmdBatchRollback: + resp.BatchRollback = &kvrpcpb.BatchRollbackResponse{ + RegionError: e, + } + case CmdScanLock: + resp.ScanLock = &kvrpcpb.ScanLockResponse{ + RegionError: e, + } + case CmdResolveLock: + resp.ResolveLock = &kvrpcpb.ResolveLockResponse{ + RegionError: e, + } + case CmdGC: + resp.GC = &kvrpcpb.GCResponse{ + RegionError: e, + } + case CmdDeleteRange: + resp.DeleteRange = &kvrpcpb.DeleteRangeResponse{ + RegionError: e, + } + case CmdRawGet: + resp.RawGet = &kvrpcpb.RawGetResponse{ + RegionError: e, + } + case CmdRawBatchGet: + resp.RawBatchGet = &kvrpcpb.RawBatchGetResponse{ + RegionError: e, + } + case CmdRawPut: + resp.RawPut = &kvrpcpb.RawPutResponse{ + RegionError: e, + } + case CmdRawBatchPut: + resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{ + RegionError: e, + } + case CmdRawDelete: + resp.RawDelete = &kvrpcpb.RawDeleteResponse{ + RegionError: e, + } + case CmdRawBatchDelete: + resp.RawBatchDelete = &kvrpcpb.RawBatchDeleteResponse{ + RegionError: e, + } + case CmdRawDeleteRange: + resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{ + RegionError: e, + } + case CmdRawScan: + resp.RawScan = &kvrpcpb.RawScanResponse{ + RegionError: e, + } + case CmdUnsafeDestroyRange: + resp.UnsafeDestroyRange = &kvrpcpb.UnsafeDestroyRangeResponse{ + RegionError: e, + } + case CmdCop: + resp.Cop = &coprocessor.Response{ + RegionError: e, + } + case CmdCopStream: + resp.CopStream = &CopStreamResponse{ + Response: &coprocessor.Response{ + RegionError: e, + }, + } + case CmdMvccGetByKey: + resp.MvccGetByKey = &kvrpcpb.MvccGetByKeyResponse{ + RegionError: e, + } + case CmdMvccGetByStartTs: + resp.MvccGetByStartTS = &kvrpcpb.MvccGetByStartTsResponse{ + RegionError: e, + } + case CmdSplitRegion: + resp.SplitRegion = &kvrpcpb.SplitRegionResponse{ + RegionError: e, + } + default: + return nil, fmt.Errorf("invalid request type %v", req.Type) + } + return resp, nil +} + +// GetRegionError returns the RegionError of the underlying concrete response. 
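+// A nil *errorpb.Error together with a nil error means the response carries no
+// region error.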
+func (resp *Response) GetRegionError() (*errorpb.Error, error) {
+	var e *errorpb.Error
+	switch resp.Type {
+	case CmdGet:
+		e = resp.Get.GetRegionError()
+	case CmdScan:
+		e = resp.Scan.GetRegionError()
+	case CmdPrewrite:
+		e = resp.Prewrite.GetRegionError()
+	case CmdCommit:
+		e = resp.Commit.GetRegionError()
+	case CmdCleanup:
+		e = resp.Cleanup.GetRegionError()
+	case CmdBatchGet:
+		e = resp.BatchGet.GetRegionError()
+	case CmdBatchRollback:
+		e = resp.BatchRollback.GetRegionError()
+	case CmdScanLock:
+		e = resp.ScanLock.GetRegionError()
+	case CmdResolveLock:
+		e = resp.ResolveLock.GetRegionError()
+	case CmdGC:
+		e = resp.GC.GetRegionError()
+	case CmdDeleteRange:
+		e = resp.DeleteRange.GetRegionError()
+	case CmdRawGet:
+		e = resp.RawGet.GetRegionError()
+	case CmdRawBatchGet:
+		e = resp.RawBatchGet.GetRegionError()
+	case CmdRawPut:
+		e = resp.RawPut.GetRegionError()
+	case CmdRawBatchPut:
+		e = resp.RawBatchPut.GetRegionError()
+	case CmdRawDelete:
+		e = resp.RawDelete.GetRegionError()
+	case CmdRawBatchDelete:
+		e = resp.RawBatchDelete.GetRegionError()
+	case CmdRawDeleteRange:
+		e = resp.RawDeleteRange.GetRegionError()
+	case CmdRawScan:
+		e = resp.RawScan.GetRegionError()
+	case CmdUnsafeDestroyRange:
+		e = resp.UnsafeDestroyRange.GetRegionError()
+	case CmdCop:
+		e = resp.Cop.GetRegionError()
+	case CmdCopStream:
+		e = resp.CopStream.Response.GetRegionError()
+	case CmdMvccGetByKey:
+		e = resp.MvccGetByKey.GetRegionError()
+	case CmdMvccGetByStartTs:
+		e = resp.MvccGetByStartTS.GetRegionError()
+	case CmdSplitRegion:
+		e = resp.SplitRegion.GetRegionError()
+	default:
+		return nil, fmt.Errorf("invalid response type %v", resp.Type)
+	}
+	return e, nil
+}
+
+// CallRPC launches an RPC call.
+// For coprocessor streaming requests, the stream object's cancel function is sent to a channel,
+// together with a lease that a background goroutine checks, to implement the stream timeout.
+func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Response, error) { + resp := &Response{} + resp.Type = req.Type + var err error + switch req.Type { + case CmdGet: + resp.Get, err = client.KvGet(ctx, req.Get) + case CmdScan: + resp.Scan, err = client.KvScan(ctx, req.Scan) + case CmdPrewrite: + resp.Prewrite, err = client.KvPrewrite(ctx, req.Prewrite) + case CmdCommit: + resp.Commit, err = client.KvCommit(ctx, req.Commit) + case CmdCleanup: + resp.Cleanup, err = client.KvCleanup(ctx, req.Cleanup) + case CmdBatchGet: + resp.BatchGet, err = client.KvBatchGet(ctx, req.BatchGet) + case CmdBatchRollback: + resp.BatchRollback, err = client.KvBatchRollback(ctx, req.BatchRollback) + case CmdScanLock: + resp.ScanLock, err = client.KvScanLock(ctx, req.ScanLock) + case CmdResolveLock: + resp.ResolveLock, err = client.KvResolveLock(ctx, req.ResolveLock) + case CmdGC: + resp.GC, err = client.KvGC(ctx, req.GC) + case CmdDeleteRange: + resp.DeleteRange, err = client.KvDeleteRange(ctx, req.DeleteRange) + case CmdRawGet: + resp.RawGet, err = client.RawGet(ctx, req.RawGet) + case CmdRawBatchGet: + resp.RawBatchGet, err = client.RawBatchGet(ctx, req.RawBatchGet) + case CmdRawPut: + resp.RawPut, err = client.RawPut(ctx, req.RawPut) + case CmdRawBatchPut: + resp.RawBatchPut, err = client.RawBatchPut(ctx, req.RawBatchPut) + case CmdRawDelete: + resp.RawDelete, err = client.RawDelete(ctx, req.RawDelete) + case CmdRawBatchDelete: + resp.RawBatchDelete, err = client.RawBatchDelete(ctx, req.RawBatchDelete) + case CmdRawDeleteRange: + resp.RawDeleteRange, err = client.RawDeleteRange(ctx, req.RawDeleteRange) + case CmdRawScan: + resp.RawScan, err = client.RawScan(ctx, req.RawScan) + case CmdUnsafeDestroyRange: + resp.UnsafeDestroyRange, err = client.UnsafeDestroyRange(ctx, req.UnsafeDestroyRange) + case CmdCop: + resp.Cop, err = client.Coprocessor(ctx, req.Cop) + case CmdCopStream: + var streamClient tikvpb.Tikv_CoprocessorStreamClient + streamClient, err = client.CoprocessorStream(ctx, req.Cop) + resp.CopStream = &CopStreamResponse{ + Tikv_CoprocessorStreamClient: streamClient, + } + case CmdMvccGetByKey: + resp.MvccGetByKey, err = client.MvccGetByKey(ctx, req.MvccGetByKey) + case CmdMvccGetByStartTs: + resp.MvccGetByStartTS, err = client.MvccGetByStartTs(ctx, req.MvccGetByStartTs) + case CmdSplitRegion: + resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion) + default: + return nil, errors.Errorf("invalid request type: %v", req.Type) + } + if err != nil { + return nil, errors.Trace(err) + } + return resp, nil +} + +// Lease is used to implement grpc stream timeout. +type Lease struct { + Cancel context.CancelFunc + deadline int64 // A time.UnixNano value, if time.Now().UnixNano() > deadline, cancel() would be called. +} + +// Recv overrides the stream client Recv() function. +func (resp *CopStreamResponse) Recv() (*coprocessor.Response, error) { + deadline := time.Now().Add(resp.Timeout).UnixNano() + atomic.StoreInt64(&resp.Lease.deadline, deadline) + + ret, err := resp.Tikv_CoprocessorStreamClient.Recv() + + atomic.StoreInt64(&resp.Lease.deadline, 0) // Stop the lease check. + return ret, errors.Trace(err) +} + +// Close closes the CopStreamResponse object. +func (resp *CopStreamResponse) Close() { + atomic.StoreInt64(&resp.Lease.deadline, 1) +} + +// CheckStreamTimeoutLoop runs periodically to check is there any stream request timeouted. 
+// Lease is an object to track stream requests, call this function with "go CheckStreamTimeoutLoop()" +func CheckStreamTimeoutLoop(ch <-chan *Lease) { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + array := make([]*Lease, 0, 1024) + + for { + select { + case item, ok := <-ch: + if !ok { + // This channel close means goroutine should return. + return + } + array = append(array, item) + case now := <-ticker.C: + array = keepOnlyActive(array, now.UnixNano()) + } + } +} + +// keepOnlyActive removes completed items, call cancel function for timeout items. +func keepOnlyActive(array []*Lease, now int64) []*Lease { + idx := 0 + for i := 0; i < len(array); i++ { + item := array[i] + deadline := atomic.LoadInt64(&item.deadline) + if deadline == 0 || deadline > now { + array[idx] = array[i] + idx++ + } else { + item.Cancel() + } + } + return array[:idx] +} diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 00000000..7e4f3d62 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,290 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpc + +import ( + "context" + "io" + "strconv" + "sync" + "sync/atomic" + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/kvproto/pkg/tikvpb" + log "github.com/sirupsen/logrus" + "github.com/tikv/client-go/config" + "github.com/tikv/client-go/metrics" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" +) + +// MaxConnectionCount is the max gRPC connections that will be established with +// each tikv-server. +var MaxConnectionCount uint = 16 + +// GrpcKeepAliveTime is the duration of time after which if the client doesn't see +// any activity it pings the server to see if the transport is still alive. +var GrpcKeepAliveTime = time.Duration(10) * time.Second + +// GrpcKeepAliveTimeout is the duration of time for which the client waits after having +// pinged for keepalive check and if no activity is seen even after that the connection +// is closed. +var GrpcKeepAliveTimeout = time.Duration(3) * time.Second + +// MaxSendMsgSize set max gRPC request message size sent to server. If any request message size is larger than +// current value, an error will be reported from gRPC. +var MaxSendMsgSize = 1<<31 - 1 + +// MaxCallMsgSize set max gRPC receive message size received from server. If any message size is larger than +// current value, an error will be reported from gRPC. +var MaxCallMsgSize = 1<<31 - 1 + +// Timeout durations. +const ( + dialTimeout = 5 * time.Second + ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values. + ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region. + ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times. 
+ GCTimeout = 5 * time.Minute + UnsafeDestroyRangeTimeout = 5 * time.Minute + + grpcInitialWindowSize = 1 << 30 + grpcInitialConnWindowSize = 1 << 30 +) + +// Client is a client that sends RPC. +// It should not be used after calling Close(). +type Client interface { + // Close should release all data. + Close() error + // SendRequest sends Request. + SendRequest(ctx context.Context, addr string, req *Request, timeout time.Duration) (*Response, error) +} + +type connArray struct { + index uint32 + v []*grpc.ClientConn + // Bind with a background goroutine to process coprocessor streaming timeout. + streamTimeout chan *Lease +} + +func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) { + a := &connArray{ + index: 0, + v: make([]*grpc.ClientConn, maxSize), + streamTimeout: make(chan *Lease, 1024), + } + if err := a.Init(addr, security); err != nil { + return nil, err + } + return a, nil +} + +func (a *connArray) Init(addr string, security config.Security) error { + opt := grpc.WithInsecure() + if len(security.SSLCA) != 0 { + tlsConfig, err := security.ToTLSConfig() + if err != nil { + return errors.Trace(err) + } + opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + } + + unaryInterceptor := grpc_prometheus.UnaryClientInterceptor + streamInterceptor := grpc_prometheus.StreamClientInterceptor + if config.EnableOpenTracing { + unaryInterceptor = grpc_middleware.ChainUnaryClient( + unaryInterceptor, + grpc_opentracing.UnaryClientInterceptor(), + ) + streamInterceptor = grpc_middleware.ChainStreamClient( + streamInterceptor, + grpc_opentracing.StreamClientInterceptor(), + ) + } + + for i := range a.v { + ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) + conn, err := grpc.DialContext( + ctx, + addr, + opt, + grpc.WithInitialWindowSize(grpcInitialWindowSize), + grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize), + grpc.WithUnaryInterceptor(unaryInterceptor), + grpc.WithStreamInterceptor(streamInterceptor), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)), + grpc.WithBackoffMaxDelay(time.Second*3), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: GrpcKeepAliveTime, + Timeout: GrpcKeepAliveTimeout, + PermitWithoutStream: true, + }), + ) + cancel() + if err != nil { + // Cleanup if the initialization fails. + a.Close() + return errors.Trace(err) + } + a.v[i] = conn + } + go CheckStreamTimeoutLoop(a.streamTimeout) + + return nil +} + +func (a *connArray) Get() *grpc.ClientConn { + next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v)) + return a.v[next] +} + +func (a *connArray) Close() { + for i, c := range a.v { + if c != nil { + c.Close() + a.v[i] = nil + } + } + close(a.streamTimeout) +} + +// rpcClient is RPC client struct. +// TODO: Add flow control between RPC clients in TiDB ond RPC servers in TiKV. +// Since we use shared client connection to communicate to the same TiKV, it's possible +// that there are too many concurrent requests which overload the service of TiKV. +// TODO: Implement background cleanup. It adds a background goroutine to periodically check +// whether there is any connection is idle and then close and remove these idle connections. +type rpcClient struct { + sync.RWMutex + isClosed bool + conns map[string]*connArray + security config.Security +} + +// NewRPCClient manages connections and rpc calls with tikv-servers. 
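+//
+// A typical call site looks roughly like this (informal sketch; addr and req are
+// assumed to come from the region cache and request construction elsewhere):
+//
+//	client := NewRPCClient(config.Security{})
+//	defer client.Close()
+//	resp, err := client.SendRequest(ctx, addr, req, ReadTimeoutShort)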
+func NewRPCClient(security config.Security) Client { + return &rpcClient{ + conns: make(map[string]*connArray), + security: security, + } +} + +func (c *rpcClient) getConnArray(addr string) (*connArray, error) { + c.RLock() + if c.isClosed { + c.RUnlock() + return nil, errors.Errorf("rpcClient is closed") + } + array, ok := c.conns[addr] + c.RUnlock() + if !ok { + var err error + array, err = c.createConnArray(addr) + if err != nil { + return nil, err + } + } + return array, nil +} + +func (c *rpcClient) createConnArray(addr string) (*connArray, error) { + c.Lock() + defer c.Unlock() + array, ok := c.conns[addr] + if !ok { + var err error + array, err = newConnArray(MaxConnectionCount, addr, c.security) + if err != nil { + return nil, err + } + c.conns[addr] = array + } + return array, nil +} + +func (c *rpcClient) closeConns() { + c.Lock() + if !c.isClosed { + c.isClosed = true + // close all connections + for _, array := range c.conns { + array.Close() + } + } + c.Unlock() +} + +// SendRequest sends a Request to server and receives Response. +func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request, timeout time.Duration) (*Response, error) { + start := time.Now() + reqType := req.Type.String() + storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) + defer func() { + metrics.SendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds()) + }() + + connArray, err := c.getConnArray(addr) + if err != nil { + return nil, errors.Trace(err) + } + client := tikvpb.NewTikvClient(connArray.Get()) + + if req.Type != CmdCopStream { + ctx1, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return CallRPC(ctx1, client, req) + } + + // Coprocessor streaming request. + // Use context to support timeout for grpc streaming client. + ctx1, cancel := context.WithCancel(ctx) + defer cancel() + resp, err := CallRPC(ctx1, client, req) + if err != nil { + return nil, errors.Trace(err) + } + + // Put the lease object to the timeout channel, so it would be checked periodically. + copStream := resp.CopStream + copStream.Timeout = timeout + copStream.Lease.Cancel = cancel + connArray.streamTimeout <- &copStream.Lease + + // Read the first streaming response to get CopStreamResponse. + // This can make error handling much easier, because SendReq() retry on + // region error automatically. + var first *coprocessor.Response + first, err = copStream.Recv() + if err != nil { + if errors.Cause(err) != io.EOF { + return nil, errors.Trace(err) + } + log.Debug("copstream returns nothing for the request.") + } + copStream.Response = first + return resp, nil +} + +func (c *rpcClient) Close() error { + c.closeConns() + return nil +} diff --git a/rpc/region_request.go b/rpc/region_request.go new file mode 100644 index 00000000..7eab6598 --- /dev/null +++ b/rpc/region_request.go @@ -0,0 +1,243 @@ +// Copyright 2018 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package rpc
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/errorpb"
+	log "github.com/sirupsen/logrus"
+	"github.com/tikv/client-go/locate"
+	"github.com/tikv/client-go/metrics"
+	"github.com/tikv/client-go/retry"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// RegionRequestSender sends KV/Cop requests to the tikv server. It handles
+// network errors and some region errors internally.
+//
+// Typically, a KV/Cop request is bound to a region; all keys involved in the
+// request should be located in that region.
+// The sending process begins by looking up the address of the target region's
+// leader store in the cache; the request is then sent to the destination tikv
+// server over a TCP connection.
+// If the region is updated, which can be caused by leader transfer, region
+// split, region merge, or region balance, the tikv server may not be able to
+// process the request and sends back a RegionError.
+// RegionRequestSender takes care of errors that are not relevant to the region
+// range, such as 'I/O timeout', 'NotLeader', and 'ServerIsBusy'. For other
+// errors, since the region range has changed, the request may need to be
+// split, so we simply return the error to the caller.
+type RegionRequestSender struct {
+	regionCache *locate.RegionCache
+	client      Client
+	storeAddr   string
+	rpcError    error
+}
+
+// NewRegionRequestSender creates a new sender.
+func NewRegionRequestSender(regionCache *locate.RegionCache, client Client) *RegionRequestSender {
+	return &RegionRequestSender{
+		regionCache: regionCache,
+		client:      client,
+	}
+}
+
+// SendReq sends a request to the tikv server.
+func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionID locate.RegionVerID, timeout time.Duration) (*Response, error) {
+
+	// gofail: var tikvStoreSendReqResult string
+	// switch tikvStoreSendReqResult {
+	// case "timeout":
+	// 	return nil, errors.New("timeout")
+	// case "GCNotLeader":
+	// 	if req.Type == CmdGC {
+	// 		return &Response{
+	// 			Type: CmdGC,
+	// 			GC:   &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
+	// 		}, nil
+	// 	}
+	// case "GCServerIsBusy":
+	// 	if req.Type == CmdGC {
+	// 		return &Response{
+	// 			Type: CmdGC,
+	// 			GC:   &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
+	// 		}, nil
+	// 	}
+	// }
+
+	for {
+		ctx, err := s.regionCache.GetRPCContext(bo, regionID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		if ctx == nil {
+			// If the region is not found in cache, it must be out
+			// of date and already be cleaned up. We can skip the
+			// RPC by returning RegionError directly.
+
+			// TODO: Change the returned error to something like "region missing in cache",
+			// and handle this error like StaleEpoch, which means to re-split the request and retry.
+ return GenRegionErrorResp(req, &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}}) + } + + s.storeAddr = ctx.Addr + resp, retry, err := s.sendReqToRegion(bo, ctx, req, timeout) + if err != nil { + return nil, errors.Trace(err) + } + if retry { + continue + } + + regionErr, err := resp.GetRegionError() + if err != nil { + return nil, errors.Trace(err) + } + if regionErr != nil { + retry, err := s.onRegionError(bo, ctx, regionErr) + if err != nil { + return nil, errors.Trace(err) + } + if retry { + continue + } + } + return resp, nil + } +} + +func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, ctx *locate.RPCContext, req *Request, timeout time.Duration) (resp *Response, retry bool, err error) { + if e := SetContext(req, ctx.Meta, ctx.Peer); e != nil { + return nil, false, errors.Trace(e) + } + resp, err = s.client.SendRequest(bo.GetContext(), ctx.Addr, req, timeout) + if err != nil { + s.rpcError = err + if e := s.onSendFail(bo, ctx, err); e != nil { + return nil, false, errors.Trace(e) + } + return nil, true, nil + } + return +} + +func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *locate.RPCContext, err error) error { + // If it failed because the context is cancelled by ourself, don't retry. + if errors.Cause(err) == context.Canceled { + return errors.Trace(err) + } + code := codes.Unknown + if s, ok := status.FromError(errors.Cause(err)); ok { + code = s.Code() + } + if code == codes.Canceled { + select { + case <-bo.GetContext().Done(): + return errors.Trace(err) + default: + // If we don't cancel, but the error code is Canceled, it must be from grpc remote. + // This may happen when tikv is killed and exiting. + // Backoff and retry in this case. + log.Warn("receive a grpc cancel signal from remote:", errors.ErrorStack(err)) + } + } + + s.regionCache.DropStoreOnSendRequestFail(ctx, err) + + // Retry on send request failure when it's not canceled. + // When a store is not available, the leader of related region should be elected quickly. + // TODO: the number of retry time should be limited:since region may be unavailable + // when some unrecoverable disaster happened. + err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) + return errors.Trace(err) +} + +func regionErrorToLabel(e *errorpb.Error) string { + if e.GetNotLeader() != nil { + return "not_leader" + } else if e.GetRegionNotFound() != nil { + return "region_not_found" + } else if e.GetKeyNotInRegion() != nil { + return "key_not_in_region" + } else if e.GetStaleEpoch() != nil { + return "stale_epoch" + } else if e.GetServerIsBusy() != nil { + return "server_is_busy" + } else if e.GetStaleCommand() != nil { + return "stale_command" + } else if e.GetStoreNotMatch() != nil { + return "store_not_match" + } + return "unknown" +} + +func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *locate.RPCContext, regionErr *errorpb.Error) (retryable bool, err error) { + metrics.RegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc() + if notLeader := regionErr.GetNotLeader(); notLeader != nil { + // Retry if error is `NotLeader`. 
+ log.Debugf("tikv reports `NotLeader`: %s, ctx: %v, retry later", notLeader, ctx) + s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId()) + + var boType retry.BackoffType + if notLeader.GetLeader() != nil { + boType = retry.BoUpdateLeader + } else { + boType = retry.BoRegionMiss + } + + if err = bo.Backoff(boType, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil { + return false, errors.Trace(err) + } + + return true, nil + } + + if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil { + // store not match + log.Warnf("tikv reports `StoreNotMatch`: %s, ctx: %v, retry later", storeNotMatch, ctx) + s.regionCache.ClearStoreByID(ctx.GetStoreID()) + return true, nil + } + + if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil { + log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx) + err = s.regionCache.OnRegionStale(ctx, staleEpoch.NewRegions) + return false, errors.Trace(err) + } + if regionErr.GetServerIsBusy() != nil { + log.Warnf("tikv reports `ServerIsBusy`, reason: %s, ctx: %v, retry later", regionErr.GetServerIsBusy().GetReason(), ctx) + err = bo.Backoff(retry.BoServerBusy, errors.Errorf("server is busy, ctx: %v", ctx)) + if err != nil { + return false, errors.Trace(err) + } + return true, nil + } + if regionErr.GetStaleCommand() != nil { + log.Debugf("tikv reports `StaleCommand`, ctx: %v", ctx) + return true, nil + } + if regionErr.GetRaftEntryTooLarge() != nil { + log.Warnf("tikv reports `RaftEntryTooLarge`, ctx: %v", ctx) + return false, errors.New(regionErr.String()) + } + // For other errors, we only drop cache here. + // Because caller may need to re-split the request. + log.Debugf("tikv reports region error: %s, ctx: %v", regionErr, ctx) + s.regionCache.DropRegion(ctx.Region) + return false, nil +}