Merge pull request #1 from disksing/rawkv

Add rawkv support
Huachao Huang 2019-01-10 16:27:31 +08:00 committed by GitHub
commit a62678bf38
16 changed files with 3263 additions and 0 deletions

1
.gitignore vendored Normal file

@ -0,0 +1 @@
vendor

2
Makefile Normal file

@ -0,0 +1,2 @@
default:
GO111MODULE=on go build ./...

83
codec/bytes.go Normal file

@ -0,0 +1,83 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
import (
"bytes"
"github.com/pingcap/errors"
)
const (
encGroupSize = 8
encMarker = byte(0xFF)
encPad = byte(0x0)
)
var pads = make([]byte, encGroupSize)
// DecodeBytes decodes a TiDB encoded byte slice.
func DecodeBytes(b []byte) ([]byte, error) {
buf := make([]byte, 0, len(b)/(encGroupSize+1)*encGroupSize)
for {
if len(b) < encGroupSize+1 {
return nil, errors.New("insufficient bytes to decode value")
}
groupBytes := b[:encGroupSize+1]
group := groupBytes[:encGroupSize]
marker := groupBytes[encGroupSize]
padCount := encMarker - marker
if padCount > encGroupSize {
return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}
realGroupSize := encGroupSize - padCount
buf = append(buf, group[:realGroupSize]...)
b = b[encGroupSize+1:]
if padCount != 0 {
// Check validity of padding bytes.
if !bytes.Equal(group[realGroupSize:], pads[:padCount]) {
return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
break
}
}
return buf, nil
}
// EncodeBytes encodes a byte slice into TiDB's encoded form.
func EncodeBytes(b []byte) []byte {
dLen := len(b)
reallocSize := (dLen/encGroupSize + 1) * (encGroupSize + 1)
result := make([]byte, 0, reallocSize)
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, b[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, b[idx:]...)
result = append(result, pads[:padCount]...)
}
marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}
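
For reference, EncodeBytes splits the input into 8-byte groups, pads the final group with zero bytes, and appends a marker byte (0xFF minus the pad count) after every group, so encoded keys keep their byte-wise order under lexicographic comparison. A minimal round-trip sketch, illustrative only and assuming the module path declared in go.mod below:

package main

import (
    "fmt"

    "github.com/tikv/client-go/codec"
)

func main() {
    // "abc" is shorter than one 8-byte group, so it is padded with five
    // zero bytes and the marker byte becomes 0xFF-5 = 0xFA.
    encoded := codec.EncodeBytes([]byte("abc"))
    decoded, err := codec.DecodeBytes(encoded)
    if err != nil {
        panic(err)
    }
    fmt.Printf("encoded: % x\ndecoded: %q\n", encoded, decoded)
}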

38
codec/meta.go Normal file

@ -0,0 +1,38 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package codec
import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
)
// DecodeRegionMetaKey translates a region meta from encoded form to unencoded form.
func DecodeRegionMetaKey(r *metapb.Region) error {
if len(r.StartKey) != 0 {
decoded, err := DecodeBytes(r.StartKey)
if err != nil {
return errors.Trace(err)
}
r.StartKey = decoded
}
if len(r.EndKey) != 0 {
decoded, err := DecodeBytes(r.EndKey)
if err != nil {
return errors.Trace(err)
}
r.EndKey = decoded
}
return nil
}

67
config/config.go Normal file

@ -0,0 +1,67 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"github.com/pkg/errors"
)
// Security is SSL configuration.
type Security struct {
SSLCA string `toml:"ssl-ca" json:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key"`
}
// ToTLSConfig generates a TLS config based on the security section of the config.
func (s *Security) ToTLSConfig() (*tls.Config, error) {
var tlsConfig *tls.Config
if len(s.SSLCA) != 0 {
var certificates = make([]tls.Certificate, 0)
if len(s.SSLCert) != 0 && len(s.SSLKey) != 0 {
// Load the client certificates from disk
certificate, err := tls.LoadX509KeyPair(s.SSLCert, s.SSLKey)
if err != nil {
return nil, errors.Errorf("could not load client key pair: %s", err)
}
certificates = append(certificates, certificate)
}
// Create a certificate pool from the certificate authority
certPool := x509.NewCertPool()
ca, err := ioutil.ReadFile(s.SSLCA)
if err != nil {
return nil, errors.Errorf("could not read ca certificate: %s", err)
}
// Append the certificates from the CA
if !certPool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to append ca certs")
}
tlsConfig = &tls.Config{
Certificates: certificates,
RootCAs: certPool,
}
}
return tlsConfig, nil
}
// EnableOpenTracing is the flag to enable open tracing.
var EnableOpenTracing = false
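
As a rough illustration (not confirmed by this commit), the Security struct above could be wired into gRPC transport credentials as follows; the file paths are placeholders:

package main

import (
    "log"

    "google.golang.org/grpc/credentials"

    "github.com/tikv/client-go/config"
)

func main() {
    sec := config.Security{
        SSLCA:   "/path/to/ca.pem", // placeholder paths
        SSLCert: "/path/to/client.pem",
        SSLKey:  "/path/to/client-key.pem",
    }
    tlsConfig, err := sec.ToTLSConfig()
    if err != nil {
        log.Fatal(err)
    }
    // A nil tlsConfig means SSLCA was empty and plain connections are used.
    if tlsConfig != nil {
        _ = credentials.NewTLS(tlsConfig)
    }
}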

54
go.mod Normal file

@ -0,0 +1,54 @@
module github.com/tikv/client-go
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/coreos/bbolt v1.3.1-coreos.6 // indirect
github.com/coreos/etcd v3.3.10+incompatible // indirect
github.com/coreos/go-semver v0.2.0 // indirect
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gogo/protobuf v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2 // indirect
github.com/gorilla/websocket v1.4.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.6.3 // indirect
github.com/jonboulle/clockwork v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f // indirect
github.com/opentracing/opentracing-go v1.0.2 // indirect
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db // indirect
github.com/pingcap/errors v0.11.0
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c // indirect
github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d
github.com/pingcap/pd v2.1.2+incompatible
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v0.9.1
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 // indirect
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a // indirect
github.com/sirupsen/logrus v1.2.0
github.com/soheilhy/cmux v0.1.4 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/ugorji/go v1.1.1 // indirect
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f // indirect
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 // indirect
go.uber.org/atomic v1.3.2 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.9.1 // indirect
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 // indirect
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f // indirect
google.golang.org/grpc v1.17.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

146
go.sum Normal file

@ -0,0 +1,146 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.1-coreos.6 h1:uTXKg9gY70s9jMAKdfljFQcuh4e/BXOM+V+d00KFj3A=
github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible h1:KjVWqrZ5U0wa3CxY2AxlH6/UcB+PK2td1DcsYhA+HRs=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 h1:3jFq2xL4ZajGK4aZY8jz+DAF0FHjI51BXjjSwCzS1Dk=
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o=
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gogo/protobuf v0.0.0-20180717141946-636bf0302bc9/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff h1:kOkM9whyQYodu09SJ6W3NCsHG7crFaJILQ22Gozp3lg=
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v0.0.0-20180814211427-aa810b61a9c7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:BWIsLfhgKhV5g/oF34aRjniBHLTZe5DNekSjbAjIS6c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.6.3 h1:oQ+8y59SMDn8Ita1Sh4f94XCUVp8AB84sppXP8Qgiow=
github.com/grpc-ecosystem/grpc-gateway v1.6.3/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f h1:r//C+RGlxxi1gPODiDj/Y/uvv3OaZlZPSFz2SuwIees=
github.com/montanaflynn/stats v0.0.0-20181214052348-945b007cb92f/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db h1:yg93sLvBszRnzcd+Z5gkCUdYgud2scHYYxnRwljvRAM=
github.com/pingcap/check v0.0.0-20181222140913-41d022e836db/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/errors v0.11.0 h1:DCJQB8jrHbQ1VVlMFIrbj2ApScNNotVmkSNplu2yUt4=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c h1:nYlcWGAYxDMdiRLjyhNJB9tMSuGaqu2M/CVd2RJx4QQ=
github.com/pingcap/gofail v0.0.0-20181115114620-e47081505b9c/go.mod h1:DazNTg0PTldtpsQiT9I5tVJwV1onHMKBBgXzmJUlMns=
github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d h1:zjLW4mSw3fHsBXoDa01e9UROGnhA2Fm1Usu0oqlBIFk=
github.com/pingcap/kvproto v0.0.0-20181224100128-c884c24ef88d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/pd v2.1.2+incompatible h1:VQmYV7B/7ZdPmbDUHcz2jSswTgInrgWhAfF0YuPAlLw=
github.com/pingcap/pd v2.1.2+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1 h1:K47Rk0v/fkEfwfQet2KWhscE0cJzjgCCDBG2KHZoVno=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a h1:Z2GBQ7wAiTCixJhSGK4sMO/FHYlvFvUBBK0M0FSsxeU=
github.com/prometheus/procfs v0.0.0-20181129180645-aa55a523dc0a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w=
github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f h1:+feYJlxPM00jEkdybexHiwIIOVuClwTEbh1WLiNr0mk=
github.com/unrolled/render v0.0.0-20181210145518-4c664cb3ad2f/go.mod h1:tu82oB5W2ykJRVioYsB+IQKcft7ryBr7w12qMBUPyXg=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58 h1:otZG8yDCO4LVps5+9bxOeNiCvgmOyt96J3roHTYs7oE=
golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis=
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f h1:FU37niK8AQ59mHcskRyQL7H0ErSeNh650vdcj8HqdSI=
google.golang.org/genproto v0.0.0-20181004005441-af9cb2a35e7f/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f h1:eT3B0O2ghdSPzjAOznr3oOLyN1HFeYUncYl7FRwg4VI=
google.golang.org/genproto v0.0.0-20181221175505-bd9b4fb69e2f/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v0.0.0-20180607172857-7a6a684ca69e/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

56
locate/codec.go Normal file

@ -0,0 +1,56 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package locate
import (
"context"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/client"
"github.com/tikv/client-go/codec"
)
type codecPDClient struct {
pd.Client
}
// GetRegion encodes the key before sending the request to pd-server and decodes the
// returned StartKey and EndKey from pd-server.
func (c *codecPDClient) GetRegion(ctx context.Context, key []byte) (*metapb.Region, *metapb.Peer, error) {
encodedKey := codec.EncodeBytes(key)
region, peer, err := c.Client.GetRegion(ctx, encodedKey)
return processRegionResult(region, peer, err)
}
// GetRegionByID looks up the region by its ID from pd-server and decodes the
// returned StartKey and EndKey.
func (c *codecPDClient) GetRegionByID(ctx context.Context, regionID uint64) (*metapb.Region, *metapb.Peer, error) {
region, peer, err := c.Client.GetRegionByID(ctx, regionID)
return processRegionResult(region, peer, err)
}
func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*metapb.Region, *metapb.Peer, error) {
if err != nil {
return nil, nil, errors.Trace(err)
}
if region == nil {
return nil, nil, nil
}
err = codec.DecodeRegionMetaKey(region)
if err != nil {
return nil, nil, errors.Trace(err)
}
return region, peer, nil
}

613
locate/region_cache.go Normal file

@ -0,0 +1,613 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package locate
import (
"bytes"
"context"
"sync"
"sync/atomic"
"time"
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/client"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/codec"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
)
const (
btreeDegree = 32
rcDefaultRegionCacheTTL = time.Minute * 10
)
// CachedRegion encapsulates {Region, TTL}
type CachedRegion struct {
region *Region
lastAccess int64
}
func (c *CachedRegion) isValid() bool {
lastAccess := atomic.LoadInt64(&c.lastAccess)
lastAccessTime := time.Unix(lastAccess, 0)
return time.Since(lastAccessTime) < rcDefaultRegionCacheTTL
}
// RegionCache caches Regions loaded from PD.
type RegionCache struct {
pdClient pd.Client
mu struct {
sync.RWMutex
regions map[RegionVerID]*CachedRegion
sorted *btree.BTree
}
storeMu struct {
sync.RWMutex
stores map[uint64]*Store
}
}
// NewRegionCache creates a RegionCache.
func NewRegionCache(pdClient pd.Client) *RegionCache {
c := &RegionCache{
pdClient: pdClient,
}
c.mu.regions = make(map[RegionVerID]*CachedRegion)
c.mu.sorted = btree.New(btreeDegree)
c.storeMu.stores = make(map[uint64]*Store)
return c
}
// RPCContext contains data that is needed to send RPC to a region.
type RPCContext struct {
Region RegionVerID
Meta *metapb.Region
Peer *metapb.Peer
Addr string
}
// GetStoreID returns StoreID.
func (c *RPCContext) GetStoreID() uint64 {
if c.Peer != nil {
return c.Peer.StoreId
}
return 0
}
// GetRPCContext returns RPCContext for a region. If it returns nil, the region
// must be out of date and already dropped from cache.
func (c *RegionCache) GetRPCContext(bo *retry.Backoffer, id RegionVerID) (*RPCContext, error) {
c.mu.RLock()
region := c.getCachedRegion(id)
if region == nil {
c.mu.RUnlock()
return nil, nil
}
// Note: it is safe to use region.meta and region.peer without cloning after
// unlocking, because the region cache never updates the content of a region's
// meta or peer. By contrast, if we want to use `region` itself after unlocking,
// we need to clone it to avoid a data race.
meta, peer := region.meta, region.peer
c.mu.RUnlock()
addr, err := c.GetStoreAddr(bo, peer.GetStoreId())
if err != nil {
return nil, errors.Trace(err)
}
if addr == "" {
// Store not found, region must be out of date.
c.DropRegion(id)
return nil, nil
}
return &RPCContext{
Region: id,
Meta: meta,
Peer: peer,
Addr: addr,
}, nil
}
// KeyLocation is the region and key range in which a key is located.
type KeyLocation struct {
Region RegionVerID
StartKey []byte
EndKey []byte
}
// Contains checks if key is in [StartKey, EndKey).
func (l *KeyLocation) Contains(key []byte) bool {
return bytes.Compare(l.StartKey, key) <= 0 &&
(bytes.Compare(key, l.EndKey) < 0 || len(l.EndKey) == 0)
}
// LocateKey searches for the region and key range in which the key is located.
func (c *RegionCache) LocateKey(bo *retry.Backoffer, key []byte) (*KeyLocation, error) {
c.mu.RLock()
r := c.searchCachedRegion(key)
if r != nil {
loc := &KeyLocation{
Region: r.VerID(),
StartKey: r.StartKey(),
EndKey: r.EndKey(),
}
c.mu.RUnlock()
return loc, nil
}
c.mu.RUnlock()
r, err := c.loadRegion(bo, key)
if err != nil {
return nil, errors.Trace(err)
}
c.mu.Lock()
defer c.mu.Unlock()
c.insertRegionToCache(r)
return &KeyLocation{
Region: r.VerID(),
StartKey: r.StartKey(),
EndKey: r.EndKey(),
}, nil
}
// LocateRegionByID searches for the region with ID.
func (c *RegionCache) LocateRegionByID(bo *retry.Backoffer, regionID uint64) (*KeyLocation, error) {
c.mu.RLock()
r := c.getRegionByIDFromCache(regionID)
if r != nil {
loc := &KeyLocation{
Region: r.VerID(),
StartKey: r.StartKey(),
EndKey: r.EndKey(),
}
c.mu.RUnlock()
return loc, nil
}
c.mu.RUnlock()
r, err := c.loadRegionByID(bo, regionID)
if err != nil {
return nil, errors.Trace(err)
}
c.mu.Lock()
defer c.mu.Unlock()
c.insertRegionToCache(r)
return &KeyLocation{
Region: r.VerID(),
StartKey: r.StartKey(),
EndKey: r.EndKey(),
}, nil
}
// GroupKeysByRegion separates keys into groups according to the Region they belong to.
// It also returns the first key's region, which may be used as the
// 'PrimaryLockKey' and should be committed ahead of the others.
func (c *RegionCache) GroupKeysByRegion(bo *retry.Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
groups := make(map[RegionVerID][][]byte)
var first RegionVerID
var lastLoc *KeyLocation
for i, k := range keys {
if lastLoc == nil || !lastLoc.Contains(k) {
var err error
lastLoc, err = c.LocateKey(bo, k)
if err != nil {
return nil, first, errors.Trace(err)
}
}
id := lastLoc.Region
if i == 0 {
first = id
}
groups[id] = append(groups[id], k)
}
return groups, first, nil
}
// ListRegionIDsInKeyRange lists the IDs of regions in the range [startKey, endKey].
func (c *RegionCache) ListRegionIDsInKeyRange(bo *retry.Backoffer, startKey, endKey []byte) (regionIDs []uint64, err error) {
for {
curRegion, err := c.LocateKey(bo, startKey)
if err != nil {
return nil, errors.Trace(err)
}
regionIDs = append(regionIDs, curRegion.Region.id)
if curRegion.Contains(endKey) {
break
}
startKey = curRegion.EndKey
}
return regionIDs, nil
}
// DropRegion removes a cached Region.
func (c *RegionCache) DropRegion(id RegionVerID) {
c.mu.Lock()
defer c.mu.Unlock()
c.dropRegionFromCache(id)
}
// UpdateLeader updates the cached region with newer leader info.
func (c *RegionCache) UpdateLeader(regionID RegionVerID, leaderStoreID uint64) {
c.mu.Lock()
defer c.mu.Unlock()
r := c.getCachedRegion(regionID)
if r == nil {
log.Debugf("regionCache: cannot find region when updating leader %d,%d", regionID, leaderStoreID)
return
}
if !r.SwitchPeer(leaderStoreID) {
log.Debugf("regionCache: cannot find peer when updating leader %d,%d", regionID, leaderStoreID)
c.dropRegionFromCache(r.VerID())
}
}
// insertRegionToCache inserts the Region into the cache, replacing any stale entry with the same start key.
func (c *RegionCache) insertRegionToCache(r *Region) {
old := c.mu.sorted.ReplaceOrInsert(newBtreeItem(r))
if old != nil {
delete(c.mu.regions, old.(*btreeItem).region.VerID())
}
c.mu.regions[r.VerID()] = &CachedRegion{
region: r,
lastAccess: time.Now().Unix(),
}
}
// getCachedRegion loads a region from cache. It also checks if the region has
// not been accessed for a long time (maybe out of date). In this case, it
// returns nil so the region will be loaded from PD again.
// Note that it should be called with c.mu.RLock(), and the returned Region
// should not be used after c.mu is RUnlock().
func (c *RegionCache) getCachedRegion(id RegionVerID) *Region {
cachedRegion, ok := c.mu.regions[id]
if !ok {
return nil
}
if cachedRegion.isValid() {
atomic.StoreInt64(&cachedRegion.lastAccess, time.Now().Unix())
return cachedRegion.region
}
return nil
}
// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
func (c *RegionCache) searchCachedRegion(key []byte) *Region {
var r *Region
c.mu.sorted.DescendLessOrEqual(newBtreeSearchItem(key), func(item btree.Item) bool {
r = item.(*btreeItem).region
return false
})
if r != nil && r.Contains(key) {
return c.getCachedRegion(r.VerID())
}
return nil
}
// getRegionByIDFromCache tries to get region by regionID from cache. Like
// `getCachedRegion`, it should be called with c.mu.RLock(), and the returned
// Region should not be used after c.mu is RUnlock().
func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
for v, r := range c.mu.regions {
if v.id == regionID {
return r.region
}
}
return nil
}
func (c *RegionCache) dropRegionFromCache(verID RegionVerID) {
r, ok := c.mu.regions[verID]
if !ok {
return
}
metrics.RegionCacheCounter.WithLabelValues("drop_region_from_cache", "ok").Inc()
c.mu.sorted.Delete(newBtreeItem(r.region))
delete(c.mu.regions, verID)
}
// loadRegion loads a region from the PD client, defaulting to the first peer as leader when the actual leader is unknown.
func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte) (*Region, error) {
var backoffErr error
for {
if backoffErr != nil {
err := bo.Backoff(retry.BoPDRPC, backoffErr)
if err != nil {
return nil, errors.Trace(err)
}
}
meta, leader, err := c.pdClient.GetRegion(bo.GetContext(), key)
metrics.RegionCacheCounter.WithLabelValues("get_region", metrics.RetLabel(err)).Inc()
if err != nil {
backoffErr = errors.Errorf("loadRegion from PD failed, key: %q, err: %v", key, err)
continue
}
if meta == nil {
backoffErr = errors.Errorf("region not found for key %q", key)
continue
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
}
region := &Region{
meta: meta,
peer: meta.Peers[0],
}
if leader != nil {
region.SwitchPeer(leader.GetStoreId())
}
return region, nil
}
}
// loadRegionByID loads a region from the PD client by its ID, defaulting to the first peer as leader when the actual leader is unknown.
func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Region, error) {
var backoffErr error
for {
if backoffErr != nil {
err := bo.Backoff(retry.BoPDRPC, backoffErr)
if err != nil {
return nil, errors.Trace(err)
}
}
meta, leader, err := c.pdClient.GetRegionByID(bo.GetContext(), regionID)
metrics.RegionCacheCounter.WithLabelValues("get_region_by_id", metrics.RetLabel(err)).Inc()
if err != nil {
backoffErr = errors.Errorf("loadRegion from PD failed, regionID: %v, err: %v", regionID, err)
continue
}
if meta == nil {
backoffErr = errors.Errorf("region not found for regionID %q", regionID)
continue
}
if len(meta.Peers) == 0 {
return nil, errors.New("receive Region with no peer")
}
region := &Region{
meta: meta,
peer: meta.Peers[0],
}
if leader != nil {
region.SwitchPeer(leader.GetStoreId())
}
return region, nil
}
}
// GetStoreAddr returns a tikv server's address by its storeID. It checks the cache
// first and sends a request to the PD server when necessary.
func (c *RegionCache) GetStoreAddr(bo *retry.Backoffer, id uint64) (string, error) {
c.storeMu.RLock()
if store, ok := c.storeMu.stores[id]; ok {
c.storeMu.RUnlock()
return store.Addr, nil
}
c.storeMu.RUnlock()
return c.ReloadStoreAddr(bo, id)
}
// ReloadStoreAddr reloads store's address.
func (c *RegionCache) ReloadStoreAddr(bo *retry.Backoffer, id uint64) (string, error) {
addr, err := c.loadStoreAddr(bo, id)
if err != nil || addr == "" {
return "", errors.Trace(err)
}
c.storeMu.Lock()
defer c.storeMu.Unlock()
c.storeMu.stores[id] = &Store{
ID: id,
Addr: addr,
}
return addr, nil
}
// ClearStoreByID removes the store with the given storeID from the cache.
func (c *RegionCache) ClearStoreByID(id uint64) {
c.storeMu.Lock()
defer c.storeMu.Unlock()
delete(c.storeMu.stores, id)
}
func (c *RegionCache) loadStoreAddr(bo *retry.Backoffer, id uint64) (string, error) {
for {
store, err := c.pdClient.GetStore(bo.GetContext(), id)
metrics.RegionCacheCounter.WithLabelValues("get_store", metrics.RetLabel(err)).Inc()
if err != nil {
if errors.Cause(err) == context.Canceled {
return "", errors.Trace(err)
}
err = errors.Errorf("loadStore from PD failed, id: %d, err: %v", id, err)
if err = bo.Backoff(retry.BoPDRPC, err); err != nil {
return "", errors.Trace(err)
}
continue
}
if store == nil {
return "", nil
}
return store.GetAddress(), nil
}
}
// DropStoreOnSendRequestFail is used for clearing cache when a tikv server does not respond.
func (c *RegionCache) DropStoreOnSendRequestFail(ctx *RPCContext, err error) {
// We need to drop the store only when the request is the first one that failed on this store,
// because too many concurrent requests trying to drop the store would block on the lock.
failedRegionID := ctx.Region
failedStoreID := ctx.Peer.StoreId
c.mu.Lock()
_, ok := c.mu.regions[failedRegionID]
if !ok {
// The failed region has already been dropped by another request; there is no need to iterate
// the regions and drop the ones on the failed store.
c.mu.Unlock()
return
}
for id, r := range c.mu.regions {
if r.region.peer.GetStoreId() == failedStoreID {
c.dropRegionFromCache(id)
}
}
c.mu.Unlock()
// Store's meta may be out of date.
var failedStoreAddr string
c.storeMu.Lock()
store, ok := c.storeMu.stores[failedStoreID]
if ok {
failedStoreAddr = store.Addr
delete(c.storeMu.stores, failedStoreID)
}
c.storeMu.Unlock()
log.Infof("drop regions that on the store %d(%s) due to send request fail, err: %v",
failedStoreID, failedStoreAddr, err)
}
// OnRegionStale removes the old region and inserts new regions into the cache.
func (c *RegionCache) OnRegionStale(ctx *RPCContext, newRegions []*metapb.Region) error {
c.mu.Lock()
defer c.mu.Unlock()
c.dropRegionFromCache(ctx.Region)
for _, meta := range newRegions {
if _, ok := c.pdClient.(*codecPDClient); ok {
if err := codec.DecodeRegionMetaKey(meta); err != nil {
return errors.Errorf("newRegion's range key is not encoded: %v, %v", meta, err)
}
}
region := &Region{
meta: meta,
peer: meta.Peers[0],
}
region.SwitchPeer(ctx.Peer.GetStoreId())
c.insertRegionToCache(region)
}
return nil
}
// PDClient returns the pd.Client in RegionCache.
func (c *RegionCache) PDClient() pd.Client {
return c.pdClient
}
// btreeItem is BTree's Item that uses []byte to compare.
type btreeItem struct {
key []byte
region *Region
}
func newBtreeItem(r *Region) *btreeItem {
return &btreeItem{
key: r.StartKey(),
region: r,
}
}
func newBtreeSearchItem(key []byte) *btreeItem {
return &btreeItem{
key: key,
}
}
func (item *btreeItem) Less(other btree.Item) bool {
return bytes.Compare(item.key, other.(*btreeItem).key) < 0
}
// Region stores region's meta and its leader peer.
type Region struct {
meta *metapb.Region
peer *metapb.Peer
}
// GetID returns id.
func (r *Region) GetID() uint64 {
return r.meta.GetId()
}
// RegionVerID is a unique ID that can identify a Region at a specific version.
type RegionVerID struct {
id uint64
confVer uint64
ver uint64
}
// GetID returns the id of the region
func (r *RegionVerID) GetID() uint64 {
return r.id
}
// VerID returns the Region's RegionVerID.
func (r *Region) VerID() RegionVerID {
return RegionVerID{
id: r.meta.GetId(),
confVer: r.meta.GetRegionEpoch().GetConfVer(),
ver: r.meta.GetRegionEpoch().GetVersion(),
}
}
// StartKey returns StartKey.
func (r *Region) StartKey() []byte {
return r.meta.StartKey
}
// EndKey returns EndKey.
func (r *Region) EndKey() []byte {
return r.meta.EndKey
}
// GetContext constructs a kvrpcpb.Context from the region info.
func (r *Region) GetContext() *kvrpcpb.Context {
return &kvrpcpb.Context{
RegionId: r.meta.Id,
RegionEpoch: r.meta.RegionEpoch,
Peer: r.peer,
}
}
// SwitchPeer switches current peer to the one on specific store. It returns
// false if no peer matches the storeID.
func (r *Region) SwitchPeer(storeID uint64) bool {
for _, p := range r.meta.Peers {
if p.GetStoreId() == storeID {
r.peer = p
return true
}
}
return false
}
// Contains checks whether the key is in the region, i.e. startKey <= key < endKey.
// Note that the end key of the last region is empty, so that region contains all larger keys.
func (r *Region) Contains(key []byte) bool {
return bytes.Compare(r.meta.GetStartKey(), key) <= 0 &&
(bytes.Compare(key, r.meta.GetEndKey()) < 0 || len(r.meta.GetEndKey()) == 0)
}
// Store contains a tikv server's address.
type Store struct {
ID uint64
Addr string
}
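
To give a sense of how callers drive the cache, here is an illustrative sketch only, assuming a pd.Client has already been created and using the retry package the way the rest of this commit does:

package example

import (
    "context"
    "fmt"

    pd "github.com/pingcap/pd/client"

    "github.com/tikv/client-go/locate"
    "github.com/tikv/client-go/retry"
)

// LocateExample locates the region of the first key and groups all keys by
// the region they belong to. It assumes keys is non-empty.
func LocateExample(pdCli pd.Client, keys [][]byte) error {
    cache := locate.NewRegionCache(pdCli)
    bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)

    loc, err := cache.LocateKey(bo, keys[0])
    if err != nil {
        return err
    }
    fmt.Printf("key %q falls in region %d\n", keys[0], loc.Region.GetID())

    groups, _, err := cache.GroupKeysByRegion(bo, keys)
    if err != nil {
        return err
    }
    fmt.Printf("the keys span %d region(s)\n", len(groups))
    return nil
}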

221
metrics/metrics.go Normal file

@ -0,0 +1,221 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import "github.com/prometheus/client_golang/prometheus"
// Client metrics.
// TODO: Create new grafana page for the metrics.
var (
TxnCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_total",
Help: "Counter of created txns.",
})
SnapshotCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "snapshot_total",
Help: "Counter of snapshots.",
})
TxnCmdCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_cmd_total",
Help: "Counter of txn commands.",
}, []string{"type"})
TxnCmdHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_cmd_duration_seconds",
Help: "Bucketed histogram of processing time of txn cmds.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
}, []string{"type"})
BackoffCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "backoff_total",
Help: "Counter of backoff.",
}, []string{"type"})
BackoffHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "backoff_seconds",
Help: "total backoff seconds of a single backoffer.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
ConnPoolHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "get_conn_seconds",
Help: "Bucketed histogram of taking conn from conn pool.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
}, []string{"type"})
SendReqHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "request_seconds",
Help: "Bucketed histogram of sending request duration.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
}, []string{"type", "store"})
CoprocessorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "cop_actions_total",
Help: "Counter of coprocessor actions.",
}, []string{"type"})
CoprocessorHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "cop_duration_seconds",
Help: "Run duration of a single coprocessor task, includes backoff time.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
})
LockResolverCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "lock_resolver_actions_total",
Help: "Counter of lock resolver actions.",
}, []string{"type"})
RegionErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "region_err_total",
Help: "Counter of region errors.",
}, []string{"type"})
TxnWriteKVCountHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_write_kv_num",
Help: "Count of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 21),
})
TxnWriteSizeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_write_size_bytes",
Help: "Size of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 21),
})
RawkvCmdHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "rawkv_cmd_seconds",
Help: "Bucketed histogram of processing time of rawkv cmds.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20),
}, []string{"type"})
RawkvSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "rawkv_kv_size_bytes",
Help: "Size of key/value to put, in bytes.",
Buckets: prometheus.ExponentialBuckets(1, 2, 21),
}, []string{"type"})
TxnRegionsNumHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "txn_regions_num",
Help: "Number of regions in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 20),
}, []string{"type"})
LoadSafepointCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "load_safepoint_total",
Help: "Counter of load safepoint.",
}, []string{"type"})
SecondaryLockCleanupFailureCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "lock_cleanup_task_total",
Help: "failure statistic of secondary lock cleanup task.",
}, []string{"type"})
RegionCacheCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv",
Subsystem: "client_go",
Name: "region_cache_operations_total",
Help: "Counter of region cache.",
}, []string{"type", "result"})
)
// RetLabel returns "ok" when err == nil and "err" when err != nil.
// This could be useful when you need to observe the operation result.
func RetLabel(err error) string {
if err == nil {
return "ok"
}
return "err"
}
func init() {
prometheus.MustRegister(TxnCounter)
prometheus.MustRegister(SnapshotCounter)
prometheus.MustRegister(TxnCmdHistogram)
prometheus.MustRegister(BackoffCounter)
prometheus.MustRegister(BackoffHistogram)
prometheus.MustRegister(SendReqHistogram)
prometheus.MustRegister(ConnPoolHistogram)
prometheus.MustRegister(CoprocessorCounter)
prometheus.MustRegister(CoprocessorHistogram)
prometheus.MustRegister(LockResolverCounter)
prometheus.MustRegister(RegionErrorCounter)
prometheus.MustRegister(TxnWriteKVCountHistogram)
prometheus.MustRegister(TxnWriteSizeHistogram)
prometheus.MustRegister(RawkvCmdHistogram)
prometheus.MustRegister(RawkvSizeHistogram)
prometheus.MustRegister(TxnRegionsNumHistogram)
prometheus.MustRegister(LoadSafepointCounter)
prometheus.MustRegister(SecondaryLockCleanupFailureCounter)
prometheus.MustRegister(RegionCacheCounter)
}
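
Because init registers every collector with the default Prometheus registry, exposing the metrics only requires serving the default handler. A minimal sketch, assuming the standard promhttp handler from the same client_golang module:

package main

import (
    "log"
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"

    _ "github.com/tikv/client-go/metrics" // blank import runs init and registers the collectors
)

func main() {
    // Scrape http://localhost:2112/metrics to see the tikv_client_go_* series.
    http.Handle("/metrics", promhttp.Handler())
    log.Fatal(http.ListenAndServe(":2112", nil))
}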

18
raw/errors.go Normal file

@ -0,0 +1,18 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
import "github.com/pingcap/errors"
var ErrBodyMissing = errors.New("response body is missing")

619
raw/rawkv.go Normal file

@ -0,0 +1,619 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raw
import (
"bytes"
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
pd "github.com/pingcap/pd/client"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"github.com/tikv/client-go/rpc"
)
var (
// MaxRawKVScanLimit is the maximum scan limit for rawkv Scan.
MaxRawKVScanLimit = 10240
// ErrMaxScanLimitExceeded is returned when the limit for a rawkv Scan is too large.
ErrMaxScanLimitExceeded = errors.New("limit should be less than MaxRawKVScanLimit")
)
const (
// rawBatchPutSize is the maximum size limit for each rawkv batch put request.
rawBatchPutSize = 16 * 1024
// rawBatchPairCount is the maximum pair count for each rawkv batch get/delete request.
rawBatchPairCount = 512
)
// RawKVClient is a client of a TiKV cluster used as a key-value storage;
// only GET/PUT/DELETE commands are supported.
type RawKVClient struct {
clusterID uint64
regionCache *locate.RegionCache
pdClient pd.Client
rpcClient rpc.Client
}
// NewRawKVClient creates a client with PD cluster addrs.
func NewRawKVClient(pdAddrs []string, security config.Security) (*RawKVClient, error) {
pdCli, err := pd.NewClient(pdAddrs, pd.SecurityOption{
CAPath: security.SSLCA,
CertPath: security.SSLCert,
KeyPath: security.SSLKey,
})
if err != nil {
return nil, errors.Trace(err)
}
return &RawKVClient{
clusterID: pdCli.GetClusterID(context.TODO()),
regionCache: locate.NewRegionCache(pdCli),
pdClient: pdCli,
rpcClient: rpc.NewRPCClient(security),
}, nil
}
// Close closes the client.
func (c *RawKVClient) Close() error {
c.pdClient.Close()
return c.rpcClient.Close()
}
// ClusterID returns the TiKV cluster ID.
func (c *RawKVClient) ClusterID() uint64 {
return c.clusterID
}
// Get queries value with the key. When the key does not exist, it returns `nil, nil`.
func (c *RawKVClient) Get(key []byte) ([]byte, error) {
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
req := &rpc.Request{
Type: rpc.CmdRawGet,
RawGet: &kvrpcpb.RawGetRequest{
Key: key,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return nil, errors.Trace(err)
}
cmdResp := resp.RawGet
if cmdResp == nil {
return nil, errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return nil, errors.New(cmdResp.GetError())
}
if len(cmdResp.Value) == 0 {
return nil, nil
}
return cmdResp.Value, nil
}
// BatchGet queries values with the keys.
func (c *RawKVClient) BatchGet(keys [][]byte) ([][]byte, error) {
start := time.Now()
defer func() {
metrics.RawkvCmdHistogram.WithLabelValues("batch_get").Observe(time.Since(start).Seconds())
}()
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchGet)
if err != nil {
return nil, errors.Trace(err)
}
cmdResp := resp.RawBatchGet
if cmdResp == nil {
return nil, errors.Trace(ErrBodyMissing)
}
keyToValue := make(map[string][]byte, len(keys))
for _, pair := range cmdResp.Pairs {
keyToValue[string(pair.Key)] = pair.Value
}
values := make([][]byte, len(keys))
for i, key := range keys {
values[i] = keyToValue[string(key)]
}
return values, nil
}
// Put stores a key-value pair to TiKV.
func (c *RawKVClient) Put(key, value []byte) error {
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) }()
metrics.RawkvSizeHistogram.WithLabelValues("key").Observe(float64(len(key)))
metrics.RawkvSizeHistogram.WithLabelValues("value").Observe(float64(len(value)))
if len(value) == 0 {
return errors.New("empty value is not supported")
}
req := &rpc.Request{
Type: rpc.CmdRawPut,
RawPut: &kvrpcpb.RawPutRequest{
Key: key,
Value: value,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawPut
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
// BatchPut stores key-value pairs to TiKV.
func (c *RawKVClient) BatchPut(keys, values [][]byte) error {
start := time.Now()
defer func() {
metrics.RawkvCmdHistogram.WithLabelValues("batch_put").Observe(time.Since(start).Seconds())
}()
if len(keys) != len(values) {
return errors.New("the len of keys is not equal to the len of values")
}
for _, value := range values {
if len(value) == 0 {
return errors.New("empty value is not supported")
}
}
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
err := c.sendBatchPut(bo, keys, values)
return errors.Trace(err)
}
// Delete deletes a key-value pair from TiKV.
func (c *RawKVClient) Delete(key []byte) error {
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) }()
req := &rpc.Request{
Type: rpc.CmdRawDelete,
RawDelete: &kvrpcpb.RawDeleteRequest{
Key: key,
},
}
resp, _, err := c.sendReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawDelete
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
// BatchDelete deletes key-value pairs from TiKV.
func (c *RawKVClient) BatchDelete(keys [][]byte) error {
start := time.Now()
defer func() {
metrics.RawkvCmdHistogram.WithLabelValues("batch_delete").Observe(time.Since(start).Seconds())
}()
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
resp, err := c.sendBatchReq(bo, keys, rpc.CmdRawBatchDelete)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawBatchDelete
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
// DeleteRange deletes all key-value pairs in a range from TiKV.
func (c *RawKVClient) DeleteRange(startKey []byte, endKey []byte) error {
start := time.Now()
var err error
defer func() {
var label = "delete_range"
if err != nil {
label += "_error"
}
metrics.RawkvCmdHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
}()
// Process each affected region separately.
for !bytes.Equal(startKey, endKey) {
var resp *rpc.Response
var actualEndKey []byte
resp, actualEndKey, err = c.sendDeleteRangeReq(startKey, endKey)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.RawDeleteRange
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
startKey = actualEndKey
}
return nil
}
// Scan queries continuous kv pairs starting from startKey, returning at most limit pairs.
// To exclude startKey itself, append a '\0' to the key: `Scan(append(startKey, '\0'), limit)`.
func (c *RawKVClient) Scan(startKey []byte, limit int) (keys [][]byte, values [][]byte, err error) {
start := time.Now()
defer func() { metrics.RawkvCmdHistogram.WithLabelValues("raw_scan").Observe(time.Since(start).Seconds()) }()
if limit > MaxRawKVScanLimit {
return nil, nil, errors.Trace(ErrMaxScanLimitExceeded)
}
for len(keys) < limit {
req := &rpc.Request{
Type: rpc.CmdRawScan,
RawScan: &kvrpcpb.RawScanRequest{
StartKey: startKey,
Limit: uint32(limit - len(keys)),
},
}
resp, loc, err := c.sendReq(startKey, req)
if err != nil {
return nil, nil, errors.Trace(err)
}
cmdResp := resp.RawScan
if cmdResp == nil {
return nil, nil, errors.Trace(ErrBodyMissing)
}
for _, pair := range cmdResp.Kvs {
keys = append(keys, pair.Key)
values = append(values, pair.Value)
}
startKey = loc.EndKey
if len(startKey) == 0 {
break
}
}
return
}
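
Putting the public API above together, a minimal usage sketch might look as follows; it is illustrative only, the PD address is a placeholder, and error handling is kept crude for brevity:

package main

import (
    "fmt"
    "log"

    "github.com/tikv/client-go/config"
    "github.com/tikv/client-go/raw"
)

func main() {
    // "127.0.0.1:2379" is a placeholder PD address.
    cli, err := raw.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    if err := cli.Put([]byte("key1"), []byte("value1")); err != nil {
        log.Fatal(err)
    }
    val, err := cli.Get([]byte("key1"))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("key1 => %q\n", val)

    keys, values, err := cli.Scan([]byte("key"), 10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(keys), len(values))
}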
func (c *RawKVClient) sendReq(key []byte, req *rpc.Request) (*rpc.Response, *locate.KeyLocation, error) {
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
for {
loc, err := c.regionCache.LocateKey(bo, key)
if err != nil {
return nil, nil, errors.Trace(err)
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return nil, nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, nil, errors.Trace(err)
}
continue
}
return resp, loc, nil
}
}
func (c *RawKVClient) sendBatchReq(bo *retry.Backoffer, keys [][]byte, cmdType rpc.CmdType) (*rpc.Response, error) { // split the keys
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
if err != nil {
return nil, errors.Trace(err)
}
var batches []batch
for regionID, groupKeys := range groups {
batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount)
}
bo, cancel := bo.Fork()
ches := make(chan singleBatchResp, len(batches))
for _, batch := range batches {
batch1 := batch
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ches <- c.doBatchReq(singleBatchBackoffer, batch1, cmdType)
}()
}
var firstError error
var resp *rpc.Response
switch cmdType {
case rpc.CmdRawBatchGet:
resp = &rpc.Response{Type: rpc.CmdRawBatchGet, RawBatchGet: &kvrpcpb.RawBatchGetResponse{}}
case rpc.CmdRawBatchDelete:
resp = &rpc.Response{Type: rpc.CmdRawBatchDelete, RawBatchDelete: &kvrpcpb.RawBatchDeleteResponse{}}
}
for i := 0; i < len(batches); i++ {
singleResp, ok := <-ches
if ok {
if singleResp.err != nil {
cancel()
if firstError == nil {
firstError = singleResp.err
}
} else if cmdType == rpc.CmdRawBatchGet {
cmdResp := singleResp.resp.RawBatchGet
resp.RawBatchGet.Pairs = append(resp.RawBatchGet.Pairs, cmdResp.Pairs...)
}
}
}
return resp, firstError
}
func (c *RawKVClient) doBatchReq(bo *retry.Backoffer, batch batch, cmdType rpc.CmdType) singleBatchResp {
var req *rpc.Request
switch cmdType {
case rpc.CmdRawBatchGet:
req = &rpc.Request{
Type: cmdType,
RawBatchGet: &kvrpcpb.RawBatchGetRequest{
Keys: batch.keys,
},
}
case rpc.CmdRawBatchDelete:
req = &rpc.Request{
Type: cmdType,
RawBatchDelete: &kvrpcpb.RawBatchDeleteRequest{
Keys: batch.keys,
},
}
}
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
batchResp := singleBatchResp{}
if err != nil {
batchResp.err = errors.Trace(err)
return batchResp
}
regionErr, err := resp.GetRegionError()
if err != nil {
batchResp.err = errors.Trace(err)
return batchResp
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
batchResp.err = errors.Trace(err)
return batchResp
}
resp, err = c.sendBatchReq(bo, batch.keys, cmdType)
batchResp.resp = resp
batchResp.err = err
return batchResp
}
switch cmdType {
case rpc.CmdRawBatchGet:
batchResp.resp = resp
case rpc.CmdRawBatchDelete:
cmdResp := resp.RawBatchDelete
if cmdResp == nil {
batchResp.err = errors.Trace(ErrBodyMissing)
return batchResp
}
if cmdResp.GetError() != "" {
batchResp.err = errors.New(cmdResp.GetError())
return batchResp
}
batchResp.resp = resp
}
return batchResp
}
// sendDeleteRangeReq sends a raw delete range request and returns the response and the actual endKey.
// If the given range spans more than one region, the actual endKey is the end of the first region.
// We can't use sendReq directly, because we need to know the end of the region before sending the request.
// TODO: Is there a better way to avoid duplicating code with func `sendReq`?
func (c *RawKVClient) sendDeleteRangeReq(startKey []byte, endKey []byte) (*rpc.Response, []byte, error) {
bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
for {
loc, err := c.regionCache.LocateKey(bo, startKey)
if err != nil {
return nil, nil, errors.Trace(err)
}
actualEndKey := endKey
if len(loc.EndKey) > 0 && bytes.Compare(loc.EndKey, endKey) < 0 {
actualEndKey = loc.EndKey
}
req := &rpc.Request{
Type: rpc.CmdRawDeleteRange,
RawDeleteRange: &kvrpcpb.RawDeleteRangeRequest{
StartKey: startKey,
EndKey: actualEndKey,
},
}
resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
if err != nil {
return nil, nil, errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, nil, errors.Trace(err)
}
continue
}
return resp, actualEndKey, nil
}
}
func (c *RawKVClient) sendBatchPut(bo *retry.Backoffer, keys, values [][]byte) error {
keyToValue := make(map[string][]byte)
for i, key := range keys {
keyToValue[string(key)] = values[i]
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
if err != nil {
return errors.Trace(err)
}
var batches []batch
// split the keys by size and RegionVerID
for regionID, groupKeys := range groups {
batches = appendBatches(batches, regionID, groupKeys, keyToValue, rawBatchPutSize)
}
bo, cancel := bo.Fork()
ch := make(chan error, len(batches))
for _, batch := range batches {
batch1 := batch
go func() {
singleBatchBackoffer, singleBatchCancel := bo.Fork()
defer singleBatchCancel()
ch <- c.doBatchPut(singleBatchBackoffer, batch1)
}()
}
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
cancel()
// catch the first error
if err == nil {
err = e
}
}
}
return errors.Trace(err)
}
func appendKeyBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]byte, limit int) []batch {
var keys [][]byte
for start, count := 0, 0; start < len(groupKeys); start++ {
if count > limit {
batches = append(batches, batch{regionID: regionID, keys: keys})
keys = make([][]byte, 0, limit)
count = 0
}
keys = append(keys, groupKeys[start])
count++
}
if len(keys) != 0 {
batches = append(batches, batch{regionID: regionID, keys: keys})
}
return batches
}
func appendBatches(batches []batch, regionID locate.RegionVerID, groupKeys [][]byte, keyToValue map[string][]byte, limit int) []batch {
var start, size int
var keys, values [][]byte
for start = 0; start < len(groupKeys); start++ {
if size >= limit {
batches = append(batches, batch{regionID: regionID, keys: keys, values: values})
keys = make([][]byte, 0)
values = make([][]byte, 0)
size = 0
}
key := groupKeys[start]
value := keyToValue[string(key)]
keys = append(keys, key)
values = append(values, value)
size += len(key)
size += len(value)
}
if len(keys) != 0 {
batches = append(batches, batch{regionID: regionID, keys: keys, values: values})
}
return batches
}
func (c *RawKVClient) doBatchPut(bo *retry.Backoffer, batch batch) error {
kvPair := make([]*kvrpcpb.KvPair, 0, len(batch.keys))
for i, key := range batch.keys {
kvPair = append(kvPair, &kvrpcpb.KvPair{Key: key, Value: batch.values[i]})
}
req := &rpc.Request{
Type: rpc.CmdRawBatchPut,
RawBatchPut: &kvrpcpb.RawBatchPutRequest{
Pairs: kvPair,
},
}
sender := rpc.NewRegionRequestSender(c.regionCache, c.rpcClient)
resp, err := sender.SendReq(bo, req, batch.regionID, rpc.ReadTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err := bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
// The region info is stale; re-split the keys and retry through sendBatchPut.
return c.sendBatchPut(bo, batch.keys, batch.values)
}
cmdResp := resp.RawBatchPut
if cmdResp == nil {
return errors.Trace(ErrBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}
type batch struct {
regionID locate.RegionVerID
keys [][]byte
values [][]byte
}
type singleBatchResp struct {
resp *rpc.Response
err error
}
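
For reference, a minimal usage sketch of the raw client in this file. It assumes the NewRawKVClient(pdAddrs, security) constructor and Close method declared earlier in the file, a PD endpoint at 127.0.0.1:2379, and an import path of github.com/tikv/client-go/rawkv; all of these are assumptions to adjust against the real layout.

package main

import (
	"fmt"

	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/rawkv"
)

func main() {
	// Assumed constructor and import path; adjust to the actual package layout.
	cli, err := rawkv.NewRawKVClient([]string{"127.0.0.1:2379"}, config.Security{})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Remove a handful of keys in one round of batched, per-region requests.
	if err := cli.BatchDelete([][]byte{[]byte("k1"), []byte("k2")}); err != nil {
		panic(err)
	}
	// Clear everything in ["a", "z"), walking the range region by region.
	if err := cli.DeleteRange([]byte("a"), []byte("z")); err != nil {
		panic(err)
	}
	fmt.Println("raw keys deleted")
}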

254
retry/backoff.go Normal file
View File

@ -0,0 +1,254 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package retry
import (
"context"
"fmt"
"math"
"math/rand"
"time"
"github.com/pingcap/errors"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/metrics"
)
const (
// NoJitter makes the backoff sequence strictly exponential.
NoJitter = 1 + iota
// FullJitter applies random factors to strict exponential.
FullJitter
// EqualJitter is also randomized, but prevents very short sleeps.
EqualJitter
// DecorrJitter increases the maximum jitter based on the last random value.
DecorrJitter
)
// NewBackoffFn creates a backoff func which implements exponential backoff with
// optional jitters.
// See http://www.awsarchitectureblog.com/2015/03/backoff.html
func NewBackoffFn(base, cap, jitter int) func(ctx context.Context) int {
if base < 2 {
// To prevent panic in 'rand.Intn'.
base = 2
}
attempts := 0
lastSleep := base
return func(ctx context.Context) int {
var sleep int
switch jitter {
case NoJitter:
sleep = expo(base, cap, attempts)
case FullJitter:
v := expo(base, cap, attempts)
sleep = rand.Intn(v)
case EqualJitter:
v := expo(base, cap, attempts)
sleep = v/2 + rand.Intn(v/2)
case DecorrJitter:
sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
}
log.Debugf("backoff base %d, sleep %d", base, sleep)
select {
case <-time.After(time.Duration(sleep) * time.Millisecond):
case <-ctx.Done():
}
attempts++
lastSleep = sleep
return lastSleep
}
}
func expo(base, cap, n int) int {
return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n))))
}
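// For illustration: with NoJitter the sleep sequence is deterministic,
// base, 2*base, 4*base, ..., capped at `cap`. BoRegionMiss below uses
// NewBackoffFn(2, 500, NoJitter), so its successive sleeps are roughly
// 2, 4, 8, ..., 256, 500, 500, ... milliseconds; the jittered variants
// randomize each step but respect the same cap.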
// BackoffType is the retryable error type.
type BackoffType int
// Back off types.
const (
BoTiKVRPC BackoffType = iota
BoTxnLock
BoTxnLockFast
BoPDRPC
BoRegionMiss
BoUpdateLeader
BoServerBusy
)
func (t BackoffType) createFn() func(context.Context) int {
switch t {
case BoTiKVRPC:
return NewBackoffFn(100, 2000, EqualJitter)
case BoTxnLock:
return NewBackoffFn(200, 3000, EqualJitter)
case BoTxnLockFast:
return NewBackoffFn(50, 3000, EqualJitter)
case BoPDRPC:
return NewBackoffFn(500, 3000, EqualJitter)
case BoRegionMiss:
// change base time to 2ms, because it may recover soon.
return NewBackoffFn(2, 500, NoJitter)
case BoUpdateLeader:
return NewBackoffFn(1, 10, NoJitter)
case BoServerBusy:
return NewBackoffFn(2000, 10000, EqualJitter)
}
return nil
}
func (t BackoffType) String() string {
switch t {
case BoTiKVRPC:
return "tikvRPC"
case BoTxnLock:
return "txnLock"
case BoTxnLockFast:
return "txnLockFast"
case BoPDRPC:
return "pdRPC"
case BoRegionMiss:
return "regionMiss"
case BoUpdateLeader:
return "updateLeader"
case BoServerBusy:
return "serverBusy"
}
return ""
}
// Maximum total sleep time (in ms) for kv/cop commands.
const (
copBuildTaskMaxBackoff = 5000
tsoMaxBackoff = 15000
scannerNextMaxBackoff = 20000
batchGetMaxBackoff = 20000
copNextMaxBackoff = 20000
getMaxBackoff = 20000
prewriteMaxBackoff = 20000
cleanupMaxBackoff = 20000
GcOneRegionMaxBackoff = 20000
GcResolveLockMaxBackoff = 100000
deleteRangeOneRegionMaxBackoff = 100000
RawkvMaxBackoff = 20000
splitRegionBackoff = 20000
)
// CommitMaxBackoff is the max sleep time (in ms) of the 'commit' command.
var CommitMaxBackoff = 41000
// Backoffer is a utility for retrying queries.
type Backoffer struct {
ctx context.Context
fn map[BackoffType]func(context.Context) int
maxSleep int
totalSleep int
errors []error
types []BackoffType
}
// txnStartKey is a key for transaction start_ts info in context.Context.
const txnStartKey = "_txn_start_key"
// NewBackoffer creates a Backoffer with maximum sleep time (in ms).
func NewBackoffer(ctx context.Context, maxSleep int) *Backoffer {
return &Backoffer{
ctx: ctx,
maxSleep: maxSleep,
}
}
// Backoff sleeps a while based on the BackoffType and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ BackoffType, err error) error {
select {
case <-b.ctx.Done():
return errors.Trace(err)
default:
}
metrics.BackoffCounter.WithLabelValues(typ.String()).Inc()
// Lazy initialize.
if b.fn == nil {
b.fn = make(map[BackoffType]func(context.Context) int)
}
f, ok := b.fn[typ]
if !ok {
f = typ.createFn()
b.fn[typ] = f
}
b.totalSleep += f(b.ctx)
b.types = append(b.types, typ)
var startTs interface{} = ""
if ts := b.ctx.Value(txnStartKey); ts != nil {
startTs = ts
}
log.Debugf("%v, retry later(totalsleep %dms, maxsleep %dms), type: %s, txn_start_ts: %v", err, b.totalSleep, b.maxSleep, typ.String(), startTs)
b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano)))
if b.maxSleep > 0 && b.totalSleep >= b.maxSleep {
errMsg := fmt.Sprintf("backoffer.maxSleep %dms is exceeded, errors:", b.maxSleep)
for i, err := range b.errors {
// Print only last 3 errors for non-DEBUG log levels.
if log.GetLevel() == log.DebugLevel || i >= len(b.errors)-3 {
errMsg += "\n" + err.Error()
}
}
log.Warn(errMsg)
// Use the first backoff type to generate the returned error.
return errors.New(b.types[0].String())
}
return nil
}
func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""
}
return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.types)
}
// Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares
// current Backoffer's context.
func (b *Backoffer) Clone() *Backoffer {
return &Backoffer{
ctx: b.ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
errors: b.errors,
}
}
// Fork creates a new Backoffer which keeps current Backoffer's sleep time and errors, and holds
// a child context of current Backoffer's context.
func (b *Backoffer) Fork() (*Backoffer, context.CancelFunc) {
ctx, cancel := context.WithCancel(b.ctx)
return &Backoffer{
ctx: ctx,
maxSleep: b.maxSleep,
totalSleep: b.totalSleep,
errors: b.errors[:len(b.errors):len(b.errors)],
}, cancel
}
// GetContext returns the associated context.
func (b *Backoffer) GetContext() context.Context {
return b.ctx
}
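
Every retry loop in this change follows the same pattern; below is a minimal sketch of driving a Backoffer by hand, with the failing operation stubbed out as a placeholder.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/tikv/client-go/retry"
)

func main() {
	bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
	for {
		err := doRequest() // placeholder for a region-bound RPC
		if err == nil {
			break
		}
		// Sleep according to the region-miss policy; a non-nil return means the
		// total sleep exceeded maxSleep and the caller should give up.
		if berr := bo.Backoff(retry.BoRegionMiss, err); berr != nil {
			fmt.Println("giving up:", berr)
			return
		}
	}
}

// doRequest always fails here, so the loop ends when the backoffer gives up.
func doRequest() error { return errors.New("region miss (placeholder)") }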

558
rpc/calls.go Normal file
View File

@ -0,0 +1,558 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/tikvpb"
)
// CmdType represents the concrete request type in Request or response type in Response.
type CmdType uint16
// CmdType values.
const (
CmdGet CmdType = 1 + iota
CmdScan
CmdPrewrite
CmdCommit
CmdCleanup
CmdBatchGet
CmdBatchRollback
CmdScanLock
CmdResolveLock
CmdGC
CmdDeleteRange
CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
CmdRawPut
CmdRawBatchPut
CmdRawDelete
CmdRawBatchDelete
CmdRawDeleteRange
CmdRawScan
CmdUnsafeDestroyRange
CmdCop CmdType = 512 + iota
CmdCopStream
CmdMvccGetByKey CmdType = 1024 + iota
CmdMvccGetByStartTs
CmdSplitRegion
)
func (t CmdType) String() string {
switch t {
case CmdGet:
return "Get"
case CmdScan:
return "Scan"
case CmdPrewrite:
return "Prewrite"
case CmdCommit:
return "Commit"
case CmdCleanup:
return "Cleanup"
case CmdBatchGet:
return "BatchGet"
case CmdBatchRollback:
return "BatchRollback"
case CmdScanLock:
return "ScanLock"
case CmdResolveLock:
return "ResolveLock"
case CmdGC:
return "GC"
case CmdDeleteRange:
return "DeleteRange"
case CmdRawGet:
return "RawGet"
case CmdRawBatchGet:
return "RawBatchGet"
case CmdRawPut:
return "RawPut"
case CmdRawBatchPut:
return "RawBatchPut"
case CmdRawDelete:
return "RawDelete"
case CmdRawBatchDelete:
return "RawBatchDelete"
case CmdRawDeleteRange:
return "RawDeleteRange"
case CmdRawScan:
return "RawScan"
case CmdUnsafeDestroyRange:
return "UnsafeDestroyRange"
case CmdCop:
return "Cop"
case CmdCopStream:
return "CopStream"
case CmdMvccGetByKey:
return "MvccGetByKey"
case CmdMvccGetByStartTs:
return "MvccGetByStartTS"
case CmdSplitRegion:
return "SplitRegion"
}
return "Unknown"
}
// Request wraps all kv/coprocessor requests.
type Request struct {
kvrpcpb.Context
Type CmdType
Get *kvrpcpb.GetRequest
Scan *kvrpcpb.ScanRequest
Prewrite *kvrpcpb.PrewriteRequest
Commit *kvrpcpb.CommitRequest
Cleanup *kvrpcpb.CleanupRequest
BatchGet *kvrpcpb.BatchGetRequest
BatchRollback *kvrpcpb.BatchRollbackRequest
ScanLock *kvrpcpb.ScanLockRequest
ResolveLock *kvrpcpb.ResolveLockRequest
GC *kvrpcpb.GCRequest
DeleteRange *kvrpcpb.DeleteRangeRequest
RawGet *kvrpcpb.RawGetRequest
RawBatchGet *kvrpcpb.RawBatchGetRequest
RawPut *kvrpcpb.RawPutRequest
RawBatchPut *kvrpcpb.RawBatchPutRequest
RawDelete *kvrpcpb.RawDeleteRequest
RawBatchDelete *kvrpcpb.RawBatchDeleteRequest
RawDeleteRange *kvrpcpb.RawDeleteRangeRequest
RawScan *kvrpcpb.RawScanRequest
UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeRequest
Cop *coprocessor.Request
MvccGetByKey *kvrpcpb.MvccGetByKeyRequest
MvccGetByStartTs *kvrpcpb.MvccGetByStartTsRequest
SplitRegion *kvrpcpb.SplitRegionRequest
}
// Response wraps all kv/coprocessor responses.
type Response struct {
Type CmdType
Get *kvrpcpb.GetResponse
Scan *kvrpcpb.ScanResponse
Prewrite *kvrpcpb.PrewriteResponse
Commit *kvrpcpb.CommitResponse
Cleanup *kvrpcpb.CleanupResponse
BatchGet *kvrpcpb.BatchGetResponse
BatchRollback *kvrpcpb.BatchRollbackResponse
ScanLock *kvrpcpb.ScanLockResponse
ResolveLock *kvrpcpb.ResolveLockResponse
GC *kvrpcpb.GCResponse
DeleteRange *kvrpcpb.DeleteRangeResponse
RawGet *kvrpcpb.RawGetResponse
RawBatchGet *kvrpcpb.RawBatchGetResponse
RawPut *kvrpcpb.RawPutResponse
RawBatchPut *kvrpcpb.RawBatchPutResponse
RawDelete *kvrpcpb.RawDeleteResponse
RawBatchDelete *kvrpcpb.RawBatchDeleteResponse
RawDeleteRange *kvrpcpb.RawDeleteRangeResponse
RawScan *kvrpcpb.RawScanResponse
UnsafeDestroyRange *kvrpcpb.UnsafeDestroyRangeResponse
Cop *coprocessor.Response
CopStream *CopStreamResponse
MvccGetByKey *kvrpcpb.MvccGetByKeyResponse
MvccGetByStartTS *kvrpcpb.MvccGetByStartTsResponse
SplitRegion *kvrpcpb.SplitRegionResponse
}
// CopStreamResponse combines tikvpb.Tikv_CoprocessorStreamClient and the first Recv() result together.
// In the streaming API, getting the grpc stream client may not involve any network packet, so region
// errors have to be handled in the Recv() function. This struct facilitates that error handling.
type CopStreamResponse struct {
tikvpb.Tikv_CoprocessorStreamClient
*coprocessor.Response // The first result of Recv()
Timeout time.Duration
Lease // Shared by this object and a background goroutine.
}
// SetContext sets the Context field of the given req according to the region and peer.
func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
ctx := &req.Context
ctx.RegionId = region.Id
ctx.RegionEpoch = region.RegionEpoch
ctx.Peer = peer
switch req.Type {
case CmdGet:
req.Get.Context = ctx
case CmdScan:
req.Scan.Context = ctx
case CmdPrewrite:
req.Prewrite.Context = ctx
case CmdCommit:
req.Commit.Context = ctx
case CmdCleanup:
req.Cleanup.Context = ctx
case CmdBatchGet:
req.BatchGet.Context = ctx
case CmdBatchRollback:
req.BatchRollback.Context = ctx
case CmdScanLock:
req.ScanLock.Context = ctx
case CmdResolveLock:
req.ResolveLock.Context = ctx
case CmdGC:
req.GC.Context = ctx
case CmdDeleteRange:
req.DeleteRange.Context = ctx
case CmdRawGet:
req.RawGet.Context = ctx
case CmdRawBatchGet:
req.RawBatchGet.Context = ctx
case CmdRawPut:
req.RawPut.Context = ctx
case CmdRawBatchPut:
req.RawBatchPut.Context = ctx
case CmdRawDelete:
req.RawDelete.Context = ctx
case CmdRawBatchDelete:
req.RawBatchDelete.Context = ctx
case CmdRawDeleteRange:
req.RawDeleteRange.Context = ctx
case CmdRawScan:
req.RawScan.Context = ctx
case CmdUnsafeDestroyRange:
req.UnsafeDestroyRange.Context = ctx
case CmdCop:
req.Cop.Context = ctx
case CmdCopStream:
req.Cop.Context = ctx
case CmdMvccGetByKey:
req.MvccGetByKey.Context = ctx
case CmdMvccGetByStartTs:
req.MvccGetByStartTs.Context = ctx
case CmdSplitRegion:
req.SplitRegion.Context = ctx
default:
return fmt.Errorf("invalid request type %v", req.Type)
}
return nil
}
// GenRegionErrorResp returns the corresponding Response with the specified RegionError
// according to the given req.
func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
resp := &Response{}
resp.Type = req.Type
switch req.Type {
case CmdGet:
resp.Get = &kvrpcpb.GetResponse{
RegionError: e,
}
case CmdScan:
resp.Scan = &kvrpcpb.ScanResponse{
RegionError: e,
}
case CmdPrewrite:
resp.Prewrite = &kvrpcpb.PrewriteResponse{
RegionError: e,
}
case CmdCommit:
resp.Commit = &kvrpcpb.CommitResponse{
RegionError: e,
}
case CmdCleanup:
resp.Cleanup = &kvrpcpb.CleanupResponse{
RegionError: e,
}
case CmdBatchGet:
resp.BatchGet = &kvrpcpb.BatchGetResponse{
RegionError: e,
}
case CmdBatchRollback:
resp.BatchRollback = &kvrpcpb.BatchRollbackResponse{
RegionError: e,
}
case CmdScanLock:
resp.ScanLock = &kvrpcpb.ScanLockResponse{
RegionError: e,
}
case CmdResolveLock:
resp.ResolveLock = &kvrpcpb.ResolveLockResponse{
RegionError: e,
}
case CmdGC:
resp.GC = &kvrpcpb.GCResponse{
RegionError: e,
}
case CmdDeleteRange:
resp.DeleteRange = &kvrpcpb.DeleteRangeResponse{
RegionError: e,
}
case CmdRawGet:
resp.RawGet = &kvrpcpb.RawGetResponse{
RegionError: e,
}
case CmdRawBatchGet:
resp.RawBatchGet = &kvrpcpb.RawBatchGetResponse{
RegionError: e,
}
case CmdRawPut:
resp.RawPut = &kvrpcpb.RawPutResponse{
RegionError: e,
}
case CmdRawBatchPut:
resp.RawBatchPut = &kvrpcpb.RawBatchPutResponse{
RegionError: e,
}
case CmdRawDelete:
resp.RawDelete = &kvrpcpb.RawDeleteResponse{
RegionError: e,
}
case CmdRawBatchDelete:
resp.RawBatchDelete = &kvrpcpb.RawBatchDeleteResponse{
RegionError: e,
}
case CmdRawDeleteRange:
resp.RawDeleteRange = &kvrpcpb.RawDeleteRangeResponse{
RegionError: e,
}
case CmdRawScan:
resp.RawScan = &kvrpcpb.RawScanResponse{
RegionError: e,
}
case CmdUnsafeDestroyRange:
resp.UnsafeDestroyRange = &kvrpcpb.UnsafeDestroyRangeResponse{
RegionError: e,
}
case CmdCop:
resp.Cop = &coprocessor.Response{
RegionError: e,
}
case CmdCopStream:
resp.CopStream = &CopStreamResponse{
Response: &coprocessor.Response{
RegionError: e,
},
}
case CmdMvccGetByKey:
resp.MvccGetByKey = &kvrpcpb.MvccGetByKeyResponse{
RegionError: e,
}
case CmdMvccGetByStartTs:
resp.MvccGetByStartTS = &kvrpcpb.MvccGetByStartTsResponse{
RegionError: e,
}
case CmdSplitRegion:
resp.SplitRegion = &kvrpcpb.SplitRegionResponse{
RegionError: e,
}
default:
return nil, fmt.Errorf("invalid request type %v", req.Type)
}
return resp, nil
}
// GetRegionError returns the RegionError of the underlying concrete response.
func (resp *Response) GetRegionError() (*errorpb.Error, error) {
var e *errorpb.Error
switch resp.Type {
case CmdGet:
e = resp.Get.GetRegionError()
case CmdScan:
e = resp.Scan.GetRegionError()
case CmdPrewrite:
e = resp.Prewrite.GetRegionError()
case CmdCommit:
e = resp.Commit.GetRegionError()
case CmdCleanup:
e = resp.Cleanup.GetRegionError()
case CmdBatchGet:
e = resp.BatchGet.GetRegionError()
case CmdBatchRollback:
e = resp.BatchRollback.GetRegionError()
case CmdScanLock:
e = resp.ScanLock.GetRegionError()
case CmdResolveLock:
e = resp.ResolveLock.GetRegionError()
case CmdGC:
e = resp.GC.GetRegionError()
case CmdDeleteRange:
e = resp.DeleteRange.GetRegionError()
case CmdRawGet:
e = resp.RawGet.GetRegionError()
case CmdRawBatchGet:
e = resp.RawBatchGet.GetRegionError()
case CmdRawPut:
e = resp.RawPut.GetRegionError()
case CmdRawBatchPut:
e = resp.RawBatchPut.GetRegionError()
case CmdRawDelete:
e = resp.RawDelete.GetRegionError()
case CmdRawBatchDelete:
e = resp.RawBatchDelete.GetRegionError()
case CmdRawDeleteRange:
e = resp.RawDeleteRange.GetRegionError()
case CmdRawScan:
e = resp.RawScan.GetRegionError()
case CmdUnsafeDestroyRange:
e = resp.UnsafeDestroyRange.GetRegionError()
case CmdCop:
e = resp.Cop.GetRegionError()
case CmdCopStream:
e = resp.CopStream.Response.GetRegionError()
case CmdMvccGetByKey:
e = resp.MvccGetByKey.GetRegionError()
case CmdMvccGetByStartTs:
e = resp.MvccGetByStartTS.GetRegionError()
case CmdSplitRegion:
e = resp.SplitRegion.GetRegionError()
default:
return nil, fmt.Errorf("invalid response type %v", resp.Type)
}
return e, nil
}
// CallRPC launches an RPC call.
// For coprocessor streaming requests, the stream object's cancel function is sent to a channel,
// together with a lease that is checked by a background goroutine, to implement the timeout.
func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Response, error) {
resp := &Response{}
resp.Type = req.Type
var err error
switch req.Type {
case CmdGet:
resp.Get, err = client.KvGet(ctx, req.Get)
case CmdScan:
resp.Scan, err = client.KvScan(ctx, req.Scan)
case CmdPrewrite:
resp.Prewrite, err = client.KvPrewrite(ctx, req.Prewrite)
case CmdCommit:
resp.Commit, err = client.KvCommit(ctx, req.Commit)
case CmdCleanup:
resp.Cleanup, err = client.KvCleanup(ctx, req.Cleanup)
case CmdBatchGet:
resp.BatchGet, err = client.KvBatchGet(ctx, req.BatchGet)
case CmdBatchRollback:
resp.BatchRollback, err = client.KvBatchRollback(ctx, req.BatchRollback)
case CmdScanLock:
resp.ScanLock, err = client.KvScanLock(ctx, req.ScanLock)
case CmdResolveLock:
resp.ResolveLock, err = client.KvResolveLock(ctx, req.ResolveLock)
case CmdGC:
resp.GC, err = client.KvGC(ctx, req.GC)
case CmdDeleteRange:
resp.DeleteRange, err = client.KvDeleteRange(ctx, req.DeleteRange)
case CmdRawGet:
resp.RawGet, err = client.RawGet(ctx, req.RawGet)
case CmdRawBatchGet:
resp.RawBatchGet, err = client.RawBatchGet(ctx, req.RawBatchGet)
case CmdRawPut:
resp.RawPut, err = client.RawPut(ctx, req.RawPut)
case CmdRawBatchPut:
resp.RawBatchPut, err = client.RawBatchPut(ctx, req.RawBatchPut)
case CmdRawDelete:
resp.RawDelete, err = client.RawDelete(ctx, req.RawDelete)
case CmdRawBatchDelete:
resp.RawBatchDelete, err = client.RawBatchDelete(ctx, req.RawBatchDelete)
case CmdRawDeleteRange:
resp.RawDeleteRange, err = client.RawDeleteRange(ctx, req.RawDeleteRange)
case CmdRawScan:
resp.RawScan, err = client.RawScan(ctx, req.RawScan)
case CmdUnsafeDestroyRange:
resp.UnsafeDestroyRange, err = client.UnsafeDestroyRange(ctx, req.UnsafeDestroyRange)
case CmdCop:
resp.Cop, err = client.Coprocessor(ctx, req.Cop)
case CmdCopStream:
var streamClient tikvpb.Tikv_CoprocessorStreamClient
streamClient, err = client.CoprocessorStream(ctx, req.Cop)
resp.CopStream = &CopStreamResponse{
Tikv_CoprocessorStreamClient: streamClient,
}
case CmdMvccGetByKey:
resp.MvccGetByKey, err = client.MvccGetByKey(ctx, req.MvccGetByKey)
case CmdMvccGetByStartTs:
resp.MvccGetByStartTS, err = client.MvccGetByStartTs(ctx, req.MvccGetByStartTs)
case CmdSplitRegion:
resp.SplitRegion, err = client.SplitRegion(ctx, req.SplitRegion)
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
}
// Lease is used to implement grpc stream timeout.
type Lease struct {
Cancel context.CancelFunc
deadline int64 // A time.UnixNano value; if time.Now().UnixNano() > deadline, Cancel() is called.
}
// Recv overrides the stream client Recv() function.
func (resp *CopStreamResponse) Recv() (*coprocessor.Response, error) {
deadline := time.Now().Add(resp.Timeout).UnixNano()
atomic.StoreInt64(&resp.Lease.deadline, deadline)
ret, err := resp.Tikv_CoprocessorStreamClient.Recv()
atomic.StoreInt64(&resp.Lease.deadline, 0) // Stop the lease check.
return ret, errors.Trace(err)
}
// Close closes the CopStreamResponse object.
func (resp *CopStreamResponse) Close() {
atomic.StoreInt64(&resp.Lease.deadline, 1)
}
// CheckStreamTimeoutLoop runs periodically to check whether any stream request has timed out.
// Lease is the object used to track stream requests; call this function with "go CheckStreamTimeoutLoop()".
func CheckStreamTimeoutLoop(ch <-chan *Lease) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
array := make([]*Lease, 0, 1024)
for {
select {
case item, ok := <-ch:
if !ok {
// This channel close means goroutine should return.
return
}
array = append(array, item)
case now := <-ticker.C:
array = keepOnlyActive(array, now.UnixNano())
}
}
}
// keepOnlyActive removes completed items and calls the cancel function for timed-out items.
func keepOnlyActive(array []*Lease, now int64) []*Lease {
idx := 0
for i := 0; i < len(array); i++ {
item := array[i]
deadline := atomic.LoadInt64(&item.deadline)
if deadline == 0 || deadline > now {
array[idx] = array[i]
idx++
} else {
item.Cancel()
}
}
return array[:idx]
}
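
As a quick illustration of how the Request/Response wrappers pair up, the sketch below builds a RawGet request, fabricates a region-error response for it with GenRegionErrorResp, and reads the error back through GetRegionError; the import path for the rpc package is assumed from the module layout.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/rpc"
)

func main() {
	req := &rpc.Request{
		Type:   rpc.CmdRawGet,
		RawGet: &kvrpcpb.RawGetRequest{Key: []byte("k")},
	}
	// Simulate the "region missing in cache" path used by RegionRequestSender.
	resp, err := rpc.GenRegionErrorResp(req, &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}})
	if err != nil {
		panic(err)
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		panic(err)
	}
	fmt.Println(regionErr.GetStaleEpoch() != nil) // prints true
}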

290
rpc/client.go Normal file
View File

@ -0,0 +1,290 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"context"
"io"
"strconv"
"sync"
"sync/atomic"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/tikvpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/config"
"github.com/tikv/client-go/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
// MaxConnectionCount is the max number of gRPC connections that will be established with
// each tikv-server.
var MaxConnectionCount uint = 16
// GrpcKeepAliveTime is the duration of time after which if the client doesn't see
// any activity it pings the server to see if the transport is still alive.
var GrpcKeepAliveTime = time.Duration(10) * time.Second
// GrpcKeepAliveTimeout is the duration of time for which the client waits after having
// pinged for keepalive check and if no activity is seen even after that the connection
// is closed.
var GrpcKeepAliveTimeout = time.Duration(3) * time.Second
// MaxSendMsgSize sets the max gRPC request message size sent to the server. If any request message size is larger than
// the current value, an error will be reported by gRPC.
var MaxSendMsgSize = 1<<31 - 1
// MaxCallMsgSize sets the max gRPC receive message size received from the server. If any message size is larger than
// the current value, an error will be reported by gRPC.
var MaxCallMsgSize = 1<<31 - 1
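// Being package-level variables, these knobs can be tuned by the caller before
// any client is created, e.g.:
//
//	rpc.MaxConnectionCount = 4
//	rpc.GrpcKeepAliveTime = 30 * time.Second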
// Timeout durations.
const (
dialTimeout = 5 * time.Second
ReadTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute
grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
)
// Client is a client that sends RPC.
// It should not be used after calling Close().
type Client interface {
// Close should release all data.
Close() error
// SendRequest sends Request.
SendRequest(ctx context.Context, addr string, req *Request, timeout time.Duration) (*Response, error)
}
type connArray struct {
index uint32
v []*grpc.ClientConn
// Bound to a background goroutine that processes coprocessor streaming timeouts.
streamTimeout chan *Lease
}
func newConnArray(maxSize uint, addr string, security config.Security) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *Lease, 1024),
}
if err := a.Init(addr, security); err != nil {
return nil, err
}
return a, nil
}
func (a *connArray) Init(addr string, security config.Security) error {
opt := grpc.WithInsecure()
if len(security.SSLCA) != 0 {
tlsConfig, err := security.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
unaryInterceptor := grpc_prometheus.UnaryClientInterceptor
streamInterceptor := grpc_prometheus.StreamClientInterceptor
if config.EnableOpenTracing {
unaryInterceptor = grpc_middleware.ChainUnaryClient(
unaryInterceptor,
grpc_opentracing.UnaryClientInterceptor(),
)
streamInterceptor = grpc_middleware.ChainStreamClient(
streamInterceptor,
grpc_opentracing.StreamClientInterceptor(),
)
}
for i := range a.v {
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
conn, err := grpc.DialContext(
ctx,
addr,
opt,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithUnaryInterceptor(unaryInterceptor),
grpc.WithStreamInterceptor(streamInterceptor),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(MaxSendMsgSize)),
grpc.WithBackoffMaxDelay(time.Second*3),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: GrpcKeepAliveTime,
Timeout: GrpcKeepAliveTimeout,
PermitWithoutStream: true,
}),
)
cancel()
if err != nil {
// Cleanup if the initialization fails.
a.Close()
return errors.Trace(err)
}
a.v[i] = conn
}
go CheckStreamTimeoutLoop(a.streamTimeout)
return nil
}
func (a *connArray) Get() *grpc.ClientConn {
next := atomic.AddUint32(&a.index, 1) % uint32(len(a.v))
return a.v[next]
}
func (a *connArray) Close() {
for i, c := range a.v {
if c != nil {
c.Close()
a.v[i] = nil
}
}
close(a.streamTimeout)
}
// rpcClient is the RPC client struct.
// TODO: Add flow control between RPC clients and the RPC servers in TiKV.
// Since we use a shared client connection to communicate with the same TiKV, it's possible
// that there are too many concurrent requests which overload the service of TiKV.
// TODO: Implement background cleanup: add a background goroutine to periodically check
// whether any connection is idle, and then close and remove these idle connections.
type rpcClient struct {
sync.RWMutex
isClosed bool
conns map[string]*connArray
security config.Security
}
// NewRPCClient creates a client that manages connections and RPC calls to tikv-servers.
func NewRPCClient(security config.Security) Client {
return &rpcClient{
conns: make(map[string]*connArray),
security: security,
}
}
func (c *rpcClient) getConnArray(addr string) (*connArray, error) {
c.RLock()
if c.isClosed {
c.RUnlock()
return nil, errors.Errorf("rpcClient is closed")
}
array, ok := c.conns[addr]
c.RUnlock()
if !ok {
var err error
array, err = c.createConnArray(addr)
if err != nil {
return nil, err
}
}
return array, nil
}
func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
c.Lock()
defer c.Unlock()
array, ok := c.conns[addr]
if !ok {
var err error
array, err = newConnArray(MaxConnectionCount, addr, c.security)
if err != nil {
return nil, err
}
c.conns[addr] = array
}
return array, nil
}
func (c *rpcClient) closeConns() {
c.Lock()
if !c.isClosed {
c.isClosed = true
// close all connections
for _, array := range c.conns {
array.Close()
}
}
c.Unlock()
}
// SendRequest sends a Request to the server and receives its Response.
func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *Request, timeout time.Duration) (*Response, error) {
start := time.Now()
reqType := req.Type.String()
storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10)
defer func() {
metrics.SendReqHistogram.WithLabelValues(reqType, storeID).Observe(time.Since(start).Seconds())
}()
connArray, err := c.getConnArray(addr)
if err != nil {
return nil, errors.Trace(err)
}
client := tikvpb.NewTikvClient(connArray.Get())
if req.Type != CmdCopStream {
ctx1, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return CallRPC(ctx1, client, req)
}
// Coprocessor streaming request.
// Use context to support timeout for grpc streaming client.
ctx1, cancel := context.WithCancel(ctx)
defer cancel()
resp, err := CallRPC(ctx1, client, req)
if err != nil {
return nil, errors.Trace(err)
}
// Put the lease object into the timeout channel, so it will be checked periodically.
copStream := resp.CopStream
copStream.Timeout = timeout
copStream.Lease.Cancel = cancel
connArray.streamTimeout <- &copStream.Lease
// Read the first streaming response to get CopStreamResponse.
// This can make error handling much easier, because SendReq() retries on
// region errors automatically.
var first *coprocessor.Response
first, err = copStream.Recv()
if err != nil {
if errors.Cause(err) != io.EOF {
return nil, errors.Trace(err)
}
log.Debug("copstream returns nothing for the request.")
}
copStream.Response = first
return resp, nil
}
func (c *rpcClient) Close() error {
c.closeConns()
return nil
}
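
A direct use of the pooled client is sketched below, bypassing the region cache; the TiKV address and key are placeholders, and in real use SendReq fills in the request Context and resolves the address through the region cache.

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/config"
	"github.com/tikv/client-go/rpc"
)

func main() {
	client := rpc.NewRPCClient(config.Security{})
	defer client.Close()

	req := &rpc.Request{
		Type:   rpc.CmdRawGet,
		RawGet: &kvrpcpb.RawGetRequest{Key: []byte("k")},
	}
	// "127.0.0.1:20160" is a placeholder TiKV address.
	resp, err := client.SendRequest(context.Background(), "127.0.0.1:20160", req, rpc.ReadTimeoutShort)
	if err != nil {
		panic(err)
	}
	fmt.Printf("value: %q\n", resp.RawGet.GetValue())
}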

243
rpc/region_request.go Normal file
View File

@ -0,0 +1,243 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/errorpb"
log "github.com/sirupsen/logrus"
"github.com/tikv/client-go/locate"
"github.com/tikv/client-go/metrics"
"github.com/tikv/client-go/retry"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// RegionRequestSender sends KV/Cop requests to tikv server. It handles network
// errors and some region errors internally.
//
// Typically, a KV/Cop request is bound to a region: all keys involved in the
// request should be located in that region.
// The sending process begins by looking up the address of the target region's
// leader store in the cache, and the request is then sent to the destination
// tikv server over a TCP connection.
// If the region is updated, which can be caused by leader transfer, region split,
// region merge, or region balance, the tikv server may not be able to process the
// request and sends back a RegionError.
// RegionRequestSender takes care of errors that are not relevant to the region
// range, such as 'I/O timeout', 'NotLeader', and 'ServerIsBusy'. For other
// errors, since the region range has changed, the request may need to be split,
// so we simply return the error to the caller.
type RegionRequestSender struct {
regionCache *locate.RegionCache
client Client
storeAddr string
rpcError error
}
// NewRegionRequestSender creates a new sender.
func NewRegionRequestSender(regionCache *locate.RegionCache, client Client) *RegionRequestSender {
return &RegionRequestSender{
regionCache: regionCache,
client: client,
}
}
// SendReq sends a request to tikv server.
func (s *RegionRequestSender) SendReq(bo *retry.Backoffer, req *Request, regionID locate.RegionVerID, timeout time.Duration) (*Response, error) {
// gofail: var tikvStoreSendReqResult string
// switch tikvStoreSendReqResult {
// case "timeout":
// return nil, errors.New("timeout")
// case "GCNotLeader":
// if req.Type == CmdGC {
// return &Response{
// Type: CmdGC,
// GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
// }, nil
// }
// case "GCServerIsBusy":
// if req.Type == CmdGC {
// return &Response{
// Type: CmdGC,
// GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}},
// }, nil
// }
// }
for {
ctx, err := s.regionCache.GetRPCContext(bo, regionID)
if err != nil {
return nil, errors.Trace(err)
}
if ctx == nil {
// If the region is not found in the cache, it must be out
// of date and have already been cleaned up. We can skip the
// RPC by returning a RegionError directly.
// TODO: Change the returned error to something like "region missing in cache",
// and handle this error like StaleEpoch, which means to re-split the request and retry.
return GenRegionErrorResp(req, &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}})
}
s.storeAddr = ctx.Addr
resp, retry, err := s.sendReqToRegion(bo, ctx, req, timeout)
if err != nil {
return nil, errors.Trace(err)
}
if retry {
continue
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, errors.Trace(err)
}
if regionErr != nil {
retry, err := s.onRegionError(bo, ctx, regionErr)
if err != nil {
return nil, errors.Trace(err)
}
if retry {
continue
}
}
return resp, nil
}
}
func (s *RegionRequestSender) sendReqToRegion(bo *retry.Backoffer, ctx *locate.RPCContext, req *Request, timeout time.Duration) (resp *Response, retry bool, err error) {
if e := SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
resp, err = s.client.SendRequest(bo.GetContext(), ctx.Addr, req, timeout)
if err != nil {
s.rpcError = err
if e := s.onSendFail(bo, ctx, err); e != nil {
return nil, false, errors.Trace(e)
}
return nil, true, nil
}
return
}
func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *locate.RPCContext, err error) error {
// If it failed because the context was cancelled by ourselves, don't retry.
if errors.Cause(err) == context.Canceled {
return errors.Trace(err)
}
code := codes.Unknown
if s, ok := status.FromError(errors.Cause(err)); ok {
code = s.Code()
}
if code == codes.Canceled {
select {
case <-bo.GetContext().Done():
return errors.Trace(err)
default:
// If we don't cancel, but the error code is Canceled, it must be from grpc remote.
// This may happen when tikv is killed and exiting.
// Backoff and retry in this case.
log.Warn("receive a grpc cancel signal from remote:", errors.ErrorStack(err))
}
}
s.regionCache.DropStoreOnSendRequestFail(ctx, err)
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of the related region should be elected quickly.
// TODO: the number of retries should be limited, since the region may be unavailable
// when some unrecoverable disaster happens.
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
return errors.Trace(err)
}
func regionErrorToLabel(e *errorpb.Error) string {
if e.GetNotLeader() != nil {
return "not_leader"
} else if e.GetRegionNotFound() != nil {
return "region_not_found"
} else if e.GetKeyNotInRegion() != nil {
return "key_not_in_region"
} else if e.GetStaleEpoch() != nil {
return "stale_epoch"
} else if e.GetServerIsBusy() != nil {
return "server_is_busy"
} else if e.GetStaleCommand() != nil {
return "stale_command"
} else if e.GetStoreNotMatch() != nil {
return "store_not_match"
}
return "unknown"
}
func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *locate.RPCContext, regionErr *errorpb.Error) (retryable bool, err error) {
metrics.RegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.
log.Debugf("tikv reports `NotLeader`: %s, ctx: %v, retry later", notLeader, ctx)
s.regionCache.UpdateLeader(ctx.Region, notLeader.GetLeader().GetStoreId())
var boType retry.BackoffType
if notLeader.GetLeader() != nil {
boType = retry.BoUpdateLeader
} else {
boType = retry.BoRegionMiss
}
if err = bo.Backoff(boType, errors.Errorf("not leader: %v, ctx: %v", notLeader, ctx)); err != nil {
return false, errors.Trace(err)
}
return true, nil
}
if storeNotMatch := regionErr.GetStoreNotMatch(); storeNotMatch != nil {
// store not match
log.Warnf("tikv reports `StoreNotMatch`: %s, ctx: %v, retry later", storeNotMatch, ctx)
s.regionCache.ClearStoreByID(ctx.GetStoreID())
return true, nil
}
if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil {
log.Debugf("tikv reports `StaleEpoch`, ctx: %v, retry later", ctx)
err = s.regionCache.OnRegionStale(ctx, staleEpoch.NewRegions)
return false, errors.Trace(err)
}
if regionErr.GetServerIsBusy() != nil {
log.Warnf("tikv reports `ServerIsBusy`, reason: %s, ctx: %v, retry later", regionErr.GetServerIsBusy().GetReason(), ctx)
err = bo.Backoff(retry.BoServerBusy, errors.Errorf("server is busy, ctx: %v", ctx))
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}
if regionErr.GetStaleCommand() != nil {
log.Debugf("tikv reports `StaleCommand`, ctx: %v", ctx)
return true, nil
}
if regionErr.GetRaftEntryTooLarge() != nil {
log.Warnf("tikv reports `RaftEntryTooLarge`, ctx: %v", ctx)
return false, errors.New(regionErr.String())
}
// For other errors, we only drop the cache here,
// because the caller may need to re-split the request.
log.Debugf("tikv reports region error: %s, ctx: %v", regionErr, ctx)
s.regionCache.DropRegion(ctx.Region)
return false, nil
}
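
Tying it together, a sketch of a single raw get issued through the sender. It assumes a *locate.RegionCache and an rpc.Client were constructed elsewhere (building the region cache needs a PD client, which is out of scope here); rawGet itself is a hypothetical helper, and the import paths follow the ones used elsewhere in this change.

package example

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/locate"
	"github.com/tikv/client-go/retry"
	"github.com/tikv/client-go/rpc"
)

// rawGet is a sketch: it locates the region for key, then lets
// RegionRequestSender deal with the retryable errors described above.
func rawGet(cache *locate.RegionCache, client rpc.Client, key []byte) ([]byte, error) {
	bo := retry.NewBackoffer(context.Background(), retry.RawkvMaxBackoff)
	sender := rpc.NewRegionRequestSender(cache, client)

	loc, err := cache.LocateKey(bo, key)
	if err != nil {
		return nil, errors.Trace(err)
	}
	req := &rpc.Request{
		Type:   rpc.CmdRawGet,
		RawGet: &kvrpcpb.RawGetRequest{Key: key},
	}
	// NotLeader/ServerIsBusy and friends are retried inside SendReq; errors that
	// change the region range come back via the response and are left to the caller.
	resp, err := sender.SendReq(bo, req, loc.Region, rpc.ReadTimeoutShort)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if resp.RawGet == nil {
		return nil, errors.New("response body missing")
	}
	return resp.RawGet.GetValue(), nil
}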