mirror of https://github.com/etcd-io/dbtester.git
vendor: update
This commit is contained in:
parent
815abb126b
commit
a8bdcc9db2
|
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -35,6 +36,7 @@ import (
|
|||
|
||||
var (
|
||||
ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
|
||||
ErrOldCluster = errors.New("etcdclient: old cluster version")
|
||||
)
|
||||
|
||||
// Client provides and manages an etcd v3 client session.
|
||||
|
|
@ -272,7 +274,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
|
|||
tokenMu: &sync.RWMutex{},
|
||||
}
|
||||
|
||||
err := c.getToken(context.TODO())
|
||||
err := c.getToken(c.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -307,7 +309,12 @@ func newClient(cfg *Config) (*Client, error) {
|
|||
}
|
||||
|
||||
// use a temporary skeleton client to bootstrap first connection
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
baseCtx := context.TODO()
|
||||
if cfg.Context != nil {
|
||||
baseCtx = cfg.Context
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(baseCtx)
|
||||
client := &Client{
|
||||
conn: nil,
|
||||
cfg: *cfg,
|
||||
|
|
@ -353,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) {
|
|||
client.Auth = NewAuth(client)
|
||||
client.Maintenance = NewMaintenance(client)
|
||||
|
||||
if cfg.RejectOldCluster {
|
||||
if err := client.checkVersion(); err != nil {
|
||||
client.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
go client.autoSync()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (c *Client) checkVersion() (err error) {
|
||||
var wg sync.WaitGroup
|
||||
errc := make(chan error, len(c.cfg.Endpoints))
|
||||
ctx, cancel := context.WithCancel(c.ctx)
|
||||
if c.cfg.DialTimeout > 0 {
|
||||
ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
|
||||
}
|
||||
wg.Add(len(c.cfg.Endpoints))
|
||||
for _, ep := range c.cfg.Endpoints {
|
||||
// if cluster is current, any endpoint gives a recent version
|
||||
go func(e string) {
|
||||
defer wg.Done()
|
||||
resp, rerr := c.Status(ctx, e)
|
||||
if rerr != nil {
|
||||
errc <- rerr
|
||||
return
|
||||
}
|
||||
vs := strings.Split(resp.Version, ".")
|
||||
maj, min := 0, 0
|
||||
if len(vs) >= 2 {
|
||||
maj, rerr = strconv.Atoi(vs[0])
|
||||
min, rerr = strconv.Atoi(vs[1])
|
||||
}
|
||||
if maj < 3 || (maj == 3 && min < 2) {
|
||||
rerr = ErrOldCluster
|
||||
}
|
||||
errc <- rerr
|
||||
}(ep)
|
||||
}
|
||||
// wait for success
|
||||
for i := 0; i < len(c.cfg.Endpoints); i++ {
|
||||
if err = <-errc; err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
wg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
// ActiveConnection returns the current in-use connection
|
||||
func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"crypto/tls"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
|
|
@ -41,6 +42,13 @@ type Config struct {
|
|||
// Password is a password for authentication.
|
||||
Password string `json:"password"`
|
||||
|
||||
// RejectOldCluster when set will refuse to create a client against an outdated cluster.
|
||||
RejectOldCluster bool `json:"reject-old-cluster"`
|
||||
|
||||
// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
|
||||
DialOptions []grpc.DialOption
|
||||
|
||||
// Context is the default client context; it can be used to cancel grpc dial out and
|
||||
// other operations that do not have an explicit context.
|
||||
Context context.Context
|
||||
}
|
||||
|
|
|
|||
|
|
@ -144,16 +144,19 @@ type keepAlive struct {
|
|||
}
|
||||
|
||||
func NewLease(c *Client) Lease {
|
||||
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second)
|
||||
}
|
||||
|
||||
func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease {
|
||||
l := &lessor{
|
||||
donec: make(chan struct{}),
|
||||
keepAlives: make(map[LeaseID]*keepAlive),
|
||||
remote: RetryLeaseClient(c),
|
||||
firstKeepAliveTimeout: c.cfg.DialTimeout + time.Second,
|
||||
remote: remote,
|
||||
firstKeepAliveTimeout: keepAliveTimeout,
|
||||
}
|
||||
if l.firstKeepAliveTimeout == time.Second {
|
||||
l.firstKeepAliveTimeout = defaultTTL
|
||||
}
|
||||
|
||||
l.stopCtx, l.stopCancel = context.WithCancel(context.Background())
|
||||
return l
|
||||
}
|
||||
|
|
@ -407,7 +410,7 @@ func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
|||
}
|
||||
|
||||
// send update to all channels
|
||||
nextKeepAlive := time.Now().Add(1 + time.Duration(karesp.TTL/3)*time.Second)
|
||||
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
|
||||
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
|
||||
for _, ch := range ka.chs {
|
||||
select {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
"syscall"
|
||||
|
||||
"github.com/coreos/etcd/pkg/cpuutil"
|
||||
|
|
@ -38,37 +39,58 @@ func GetDefaultHost() (string, error) {
|
|||
return "", rerr
|
||||
}
|
||||
|
||||
for family, rmsg := range rmsgs {
|
||||
host, oif, err := parsePREFSRC(rmsg)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if host != "" {
|
||||
return host, nil
|
||||
// prioritize IPv4
|
||||
if rmsg, ok := rmsgs[syscall.AF_INET]; ok {
|
||||
if host, err := chooseHost(syscall.AF_INET, rmsg); host != "" || err != nil {
|
||||
return host, err
|
||||
}
|
||||
delete(rmsgs, syscall.AF_INET)
|
||||
}
|
||||
|
||||
// prefsrc not detected, fall back to getting address from iface
|
||||
ifmsg, ierr := getIfaceAddr(oif, family)
|
||||
if ierr != nil {
|
||||
return "", ierr
|
||||
}
|
||||
// sort so choice is deterministic
|
||||
var families []int
|
||||
for family := range rmsgs {
|
||||
families = append(families, int(family))
|
||||
}
|
||||
sort.Ints(families)
|
||||
|
||||
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
|
||||
if aerr != nil {
|
||||
return "", aerr
|
||||
}
|
||||
|
||||
for _, attr := range attrs {
|
||||
// search for RTA_DST because ipv6 doesn't have RTA_SRC
|
||||
if attr.Attr.Type == syscall.RTA_DST {
|
||||
return net.IP(attr.Value).String(), nil
|
||||
}
|
||||
for _, f := range families {
|
||||
family := uint8(f)
|
||||
if host, err := chooseHost(family, rmsgs[family]); host != "" || err != nil {
|
||||
return host, err
|
||||
}
|
||||
}
|
||||
|
||||
return "", errNoDefaultHost
|
||||
}
|
||||
|
||||
func chooseHost(family uint8, rmsg *syscall.NetlinkMessage) (string, error) {
|
||||
host, oif, err := parsePREFSRC(rmsg)
|
||||
if host != "" || err != nil {
|
||||
return host, err
|
||||
}
|
||||
|
||||
// prefsrc not detected, fall back to getting address from iface
|
||||
ifmsg, ierr := getIfaceAddr(oif, family)
|
||||
if ierr != nil {
|
||||
return "", ierr
|
||||
}
|
||||
|
||||
attrs, aerr := syscall.ParseNetlinkRouteAttr(ifmsg)
|
||||
if aerr != nil {
|
||||
return "", aerr
|
||||
}
|
||||
|
||||
for _, attr := range attrs {
|
||||
// search for RTA_DST because ipv6 doesn't have RTA_SRC
|
||||
if attr.Attr.Type == syscall.RTA_DST {
|
||||
return net.IP(attr.Value).String(), nil
|
||||
}
|
||||
}
|
||||
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func getDefaultRoutes() (map[uint8]*syscall.NetlinkMessage, error) {
|
||||
dat, err := syscall.NetlinkRIB(syscall.RTM_GETROUTE, syscall.AF_UNSPEC)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ type horizontalAxis struct {
|
|||
|
||||
// size returns the height of the axis.
|
||||
func (a *horizontalAxis) size() (h vg.Length) {
|
||||
if a.Label.Text != "" {
|
||||
if a.Label.Text != "" { // We assume that the label isn't rotated.
|
||||
h -= a.Label.Font.Extents().Descent
|
||||
h += a.Label.Height(a.Label.Text)
|
||||
}
|
||||
|
|
@ -258,13 +258,9 @@ func (a *horizontalAxis) GlyphBoxes(*Plot) (boxes []GlyphBox) {
|
|||
if t.IsMinor() {
|
||||
continue
|
||||
}
|
||||
w := a.Tick.Label.Width(t.Label)
|
||||
box := GlyphBox{
|
||||
X: a.Norm(t.Value),
|
||||
Rectangle: vg.Rectangle{
|
||||
Min: vg.Point{X: -w / 2},
|
||||
Max: vg.Point{X: w / 2},
|
||||
},
|
||||
X: a.Norm(t.Value),
|
||||
Rectangle: a.Tick.Label.Rectangle(t.Label),
|
||||
}
|
||||
boxes = append(boxes, box)
|
||||
}
|
||||
|
|
@ -278,7 +274,7 @@ type verticalAxis struct {
|
|||
|
||||
// size returns the width of the axis.
|
||||
func (a *verticalAxis) size() (w vg.Length) {
|
||||
if a.Label.Text != "" {
|
||||
if a.Label.Text != "" { // We assume that the label isn't rotated.
|
||||
w -= a.Label.Font.Extents().Descent
|
||||
w += a.Label.Height(a.Label.Text)
|
||||
}
|
||||
|
|
@ -343,13 +339,9 @@ func (a *verticalAxis) GlyphBoxes(*Plot) (boxes []GlyphBox) {
|
|||
if t.IsMinor() {
|
||||
continue
|
||||
}
|
||||
h := a.Tick.Label.Height(t.Label)
|
||||
box := GlyphBox{
|
||||
Y: a.Norm(t.Value),
|
||||
Rectangle: vg.Rectangle{
|
||||
Min: vg.Point{Y: -h / 2},
|
||||
Max: vg.Point{Y: h / 2},
|
||||
},
|
||||
Y: a.Norm(t.Value),
|
||||
Rectangle: a.Tick.Label.Rectangle(t.Label),
|
||||
}
|
||||
boxes = append(boxes, box)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package api
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
|
|
@ -79,6 +80,11 @@ type QueryOptions struct {
|
|||
// metadata key/value pairs. Currently, only one key/value pair can
|
||||
// be provided for filtering.
|
||||
NodeMeta map[string]string
|
||||
|
||||
// RelayFactor is used in keyring operations to cause reponses to be
|
||||
// relayed back to the sender through N other random nodes. Must be
|
||||
// a value from 0 to 5 (inclusive).
|
||||
RelayFactor uint8
|
||||
}
|
||||
|
||||
// WriteOptions are used to parameterize a write
|
||||
|
|
@ -90,6 +96,11 @@ type WriteOptions struct {
|
|||
// Token is used to provide a per-request ACL token
|
||||
// which overrides the agent's default token.
|
||||
Token string
|
||||
|
||||
// RelayFactor is used in keyring operations to cause reponses to be
|
||||
// relayed back to the sender through N other random nodes. Must be
|
||||
// a value from 0 to 5 (inclusive).
|
||||
RelayFactor uint8
|
||||
}
|
||||
|
||||
// QueryMeta is used to return meta data about a query
|
||||
|
|
@ -336,13 +347,22 @@ func NewClient(config *Config) (*Client, error) {
|
|||
config.HttpClient = defConfig.HttpClient
|
||||
}
|
||||
|
||||
if parts := strings.SplitN(config.Address, "unix://", 2); len(parts) == 2 {
|
||||
trans := cleanhttp.DefaultTransport()
|
||||
trans.Dial = func(_, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", parts[1])
|
||||
}
|
||||
config.HttpClient = &http.Client{
|
||||
Transport: trans,
|
||||
parts := strings.SplitN(config.Address, "://", 2)
|
||||
if len(parts) == 2 {
|
||||
switch parts[0] {
|
||||
case "http":
|
||||
case "https":
|
||||
config.Scheme = "https"
|
||||
case "unix":
|
||||
trans := cleanhttp.DefaultTransport()
|
||||
trans.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
|
||||
return net.Dial("unix", parts[1])
|
||||
}
|
||||
config.HttpClient = &http.Client{
|
||||
Transport: trans,
|
||||
}
|
||||
default:
|
||||
return nil, fmt.Errorf("Unknown protocol scheme: %s", parts[0])
|
||||
}
|
||||
config.Address = parts[1]
|
||||
}
|
||||
|
|
@ -396,6 +416,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
|
|||
r.params.Add("node-meta", key+":"+value)
|
||||
}
|
||||
}
|
||||
if q.RelayFactor != 0 {
|
||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||
}
|
||||
}
|
||||
|
||||
// durToMsec converts a duration to a millisecond specified string. If the
|
||||
|
|
@ -437,6 +460,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
|
|||
if q.Token != "" {
|
||||
r.header.Set("X-Consul-Token", q.Token)
|
||||
}
|
||||
if q.RelayFactor != 0 {
|
||||
r.params.Set("relay-factor", strconv.Itoa(int(q.RelayFactor)))
|
||||
}
|
||||
}
|
||||
|
||||
// toHTTP converts the request to an HTTP request
|
||||
|
|
|
|||
Loading…
Reference in New Issue