*: sync with etcd, Consul master

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyu-Ho Lee 2017-03-15 14:09:21 -07:00
parent 2008a31aa7
commit a403d227a7
10 changed files with 280 additions and 59 deletions

14
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 11e6bd98c99ecd78b59d905847f46dd54ff2306035536eaa0754c8c4868870ed hash: 48a2f456909edfb095bcd4d2ed59c220569bbbc2c9ab6fcc5bd94ed8bf6cb802
updated: 2017-03-06T13:03:02.042405087-08:00 updated: 2017-03-15T14:01:44.943466204-07:00
imports: imports:
- name: bitbucket.org/zombiezen/gopdf - name: bitbucket.org/zombiezen/gopdf
version: 1c63dc69751bc45441c2ce1f56b631c55294b4d5 version: 1c63dc69751bc45441c2ce1f56b631c55294b4d5
@ -21,7 +21,7 @@ imports:
- name: github.com/cheggaaa/pb - name: github.com/cheggaaa/pb
version: d7e6ca3010b6f084d8056847f55d7f572f180678 version: d7e6ca3010b6f084d8056847f55d7f572f180678
- name: github.com/coreos/etcd - name: github.com/coreos/etcd
version: 317f3571ff9407c17ad6fd68877c4e673954d25a version: 5856c8bce9778a12d79038fdb1f3fba9416bd297
subpackages: subpackages:
- auth/authpb - auth/authpb
- client - client
@ -104,13 +104,13 @@ imports:
- psn - psn
- schema - schema
- name: github.com/hashicorp/consul - name: github.com/hashicorp/consul
version: db4f40f6f05d6ab6712951ad4e5f045f8e9e1c9c version: 2f0650a987ea7c251bd58cb896bcbbdbceeec2f4
subpackages: subpackages:
- api - api
- name: github.com/hashicorp/go-cleanhttp - name: github.com/hashicorp/go-cleanhttp
version: 3573b8b52aa7b37b9358d966a898feb387f62437 version: 3573b8b52aa7b37b9358d966a898feb387f62437
- name: github.com/hashicorp/serf - name: github.com/hashicorp/serf
version: d787b2e8f72b5da48c43b84c2617234401084050 version: 19f2c401e122352c047a84d6584dd51e2fb8fcc4
subpackages: subpackages:
- coordinate - coordinate
- name: github.com/inconshreveable/mousetrap - name: github.com/inconshreveable/mousetrap
@ -139,7 +139,7 @@ imports:
subpackages: subpackages:
- codec - codec
- name: golang.org/x/image - name: golang.org/x/image
version: e6cbe778da3cea914ea6436d0f99309605c3dfc4 version: 793f3be7dac93749dec06ae3fbe7d0ded4bdcf3d
subpackages: subpackages:
- draw - draw
- font - font
@ -181,7 +181,7 @@ imports:
- storage/v1 - storage/v1
- transport - transport
- name: google.golang.org/appengine - name: google.golang.org/appengine
version: 5403c08c6e8fb3b2dc1209d2d833d8e8ac8240de version: b79c28f0197795b4050bfcd7c4c2209136c594b1
subpackages: subpackages:
- internal - internal
- internal/app_identity - internal/app_identity

View File

@ -9,7 +9,7 @@ import:
- package: github.com/cheggaaa/pb - package: github.com/cheggaaa/pb
version: d7e6ca3010b6f084d8056847f55d7f572f180678 version: d7e6ca3010b6f084d8056847f55d7f572f180678
- package: github.com/coreos/etcd - package: github.com/coreos/etcd
version: 317f3571ff9407c17ad6fd68877c4e673954d25a version: 5856c8bce9778a12d79038fdb1f3fba9416bd297
subpackages: subpackages:
- auth/authpb - auth/authpb
- client - client
@ -60,7 +60,7 @@ import:
subpackages: subpackages:
- psn - psn
- package: github.com/hashicorp/consul - package: github.com/hashicorp/consul
version: db4f40f6f05d6ab6712951ad4e5f045f8e9e1c9c version: 2f0650a987ea7c251bd58cb896bcbbdbceeec2f4
subpackages: subpackages:
- api - api
- package: github.com/samuel/go-zookeeper - package: github.com/samuel/go-zookeeper

View File

@ -3,7 +3,7 @@ test_description: |
- Google Cloud Compute Engine - Google Cloud Compute Engine
- 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client) - 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client)
- Ubuntu 16.10 - Ubuntu 16.10
- etcd tip (Go 1.8.0) - etcd tip (Go 1.8.0, git SHA 5856c8bce9778a12d79038fdb1f3fba9416bd297)
- Zookeeper r3.5.2-alpha - Zookeeper r3.5.2-alpha
- Java 8 - Java 8
- javac 1.8.0_121 - javac 1.8.0_121

View File

@ -3,7 +3,7 @@ test_description: |
- Google Cloud Compute Engine - Google Cloud Compute Engine
- 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client) - 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client)
- Ubuntu 16.10 - Ubuntu 16.10
- etcd tip (Go 1.8.0) - etcd tip (Go 1.8.0, git SHA 5856c8bce9778a12d79038fdb1f3fba9416bd297)
- Zookeeper r3.5.2-alpha - Zookeeper r3.5.2-alpha
- Java 8 - Java 8
- javac 1.8.0_121 - javac 1.8.0_121

View File

@ -3,7 +3,7 @@ test_description: |
- Google Cloud Compute Engine - Google Cloud Compute Engine
- 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client) - 4 machines of 16 vCPUs + 60 GB Memory + 300 GB SSD (1 for client)
- Ubuntu 16.10 - Ubuntu 16.10
- etcd tip (Go 1.8.0) - etcd tip (Go 1.8.0, git SHA 5856c8bce9778a12d79038fdb1f3fba9416bd297)
- Zookeeper r3.5.2-alpha - Zookeeper r3.5.2-alpha
- Java 8 - Java 8
- javac 1.8.0_121 - javac 1.8.0_121

View File

@ -2,37 +2,6 @@
set -e set -e
<<COMMENT <<COMMENT
GIT_PATH=github.com/coreos/etcd
USER_NAME=coreos
BRANCH_NAME=release-3.1
rm -rf ${GOPATH}/src/${GIT_PATH}
mkdir -p ${GOPATH}/src/github.com/coreos
git clone https://github.com/${USER_NAME}/etcd \
--branch ${BRANCH_NAME} \
${GOPATH}/src/${GIT_PATH}
cd ${GOPATH}/src/${GIT_PATH}
# git reset --hard faeeb2fc7514c5caf7a9a0cc03ac9ee2ff94438b
./build
# FAILPOINTS=1 ./build
# https://github.com/coreos/etcd/commits/master?after=N38ZsAMfnAqv4q7Ci2%2BQGTEfUvkrMTExOQ%3D%3D
${GOPATH}/src/${GIT_PATH}/bin/etcd --version
${GOPATH}/src/${GIT_PATH}/bin/etcdctl --version
cp ${GOPATH}/src/${GIT_PATH}/bin/etcd ${GOPATH}/bin/etcd
sudo cp ${GOPATH}/src/${GIT_PATH}/bin/etcd /etcd
cp ${GOPATH}/src/${GIT_PATH}/bin/etcdctl ${GOPATH}/bin/etcdctl
sudo cp ${GOPATH}/src/${GIT_PATH}/bin/etcdctl /etcdctl
COMMENT
ETCD_VER=v3.1.1 ETCD_VER=v3.1.1
GOOGLE_URL=https://storage.googleapis.com/etcd GOOGLE_URL=https://storage.googleapis.com/etcd
@ -47,6 +16,37 @@ curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/test-etcd-${ETCD_VER} --strip-components=1 tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/test-etcd-${ETCD_VER} --strip-components=1
sudo cp /tmp/test-etcd-${ETCD_VER}/etcd* $GOPATH/bin sudo cp /tmp/test-etcd-${ETCD_VER}/etcd* $GOPATH/bin
COMMENT
GIT_PATH=github.com/coreos/etcd
USER_NAME=coreos
BRANCH_NAME=master
rm -rf ${GOPATH}/src/${GIT_PATH}
mkdir -p ${GOPATH}/src/github.com/coreos
git clone https://github.com/${USER_NAME}/etcd \
--branch ${BRANCH_NAME} \
${GOPATH}/src/${GIT_PATH}
cd ${GOPATH}/src/${GIT_PATH}
git reset --hard 5856c8bce9778a12d79038fdb1f3fba9416bd297
./build
# FAILPOINTS=1 ./build
# https://github.com/coreos/etcd/commits/master?after=N38ZsAMfnAqv4q7Ci2%2BQGTEfUvkrMTExOQ%3D%3D
${GOPATH}/src/${GIT_PATH}/bin/etcd --version
${GOPATH}/src/${GIT_PATH}/bin/etcdctl --version
cp ${GOPATH}/src/${GIT_PATH}/bin/etcd ${GOPATH}/bin/etcd
sudo cp ${GOPATH}/src/${GIT_PATH}/bin/etcd /etcd
cp ${GOPATH}/src/${GIT_PATH}/bin/etcdctl ${GOPATH}/bin/etcdctl
sudo cp ${GOPATH}/src/${GIT_PATH}/bin/etcdctl /etcdctl
$GOPATH/bin/etcd --version $GOPATH/bin/etcd --version
$GOPATH/bin/etcdctl --version $GOPATH/bin/etcdctl --version

View File

@ -47,6 +47,18 @@ type simpleBalancer struct {
// upc closes when upEps transitions from empty to non-zero or the balancer closes. // upc closes when upEps transitions from empty to non-zero or the balancer closes.
upc chan struct{} upc chan struct{}
// downc closes when grpc calls down() on pinAddr
downc chan struct{}
// stopc is closed to signal updateNotifyLoop should stop.
stopc chan struct{}
// donec closes when all goroutines are exited
donec chan struct{}
// updateAddrsC notifies updateNotifyLoop to update addrs.
updateAddrsC chan struct{}
// grpc issues TLS cert checks using the string passed into dial so // grpc issues TLS cert checks using the string passed into dial so
// that string must be the host. To recover the full scheme://host URL, // that string must be the host. To recover the full scheme://host URL,
// have a map from hosts to the original endpoint. // have a map from hosts to the original endpoint.
@ -71,8 +83,13 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
notifyCh: notifyCh, notifyCh: notifyCh,
readyc: make(chan struct{}), readyc: make(chan struct{}),
upc: make(chan struct{}), upc: make(chan struct{}),
stopc: make(chan struct{}),
downc: make(chan struct{}),
donec: make(chan struct{}),
updateAddrsC: make(chan struct{}, 1),
host2ep: getHost2ep(eps), host2ep: getHost2ep(eps),
} }
go sb.updateNotifyLoop()
return sb return sb
} }
@ -103,7 +120,6 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
np := getHost2ep(eps) np := getHost2ep(eps)
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
match := len(np) == len(b.host2ep) match := len(np) == len(b.host2ep)
for k, v := range np { for k, v := range np {
@ -114,6 +130,7 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
} }
if match { if match {
// same endpoints, so no need to update address // same endpoints, so no need to update address
b.mu.Unlock()
return return
} }
@ -124,10 +141,78 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])}) addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
} }
b.addrs = addrs b.addrs = addrs
// updating notifyCh can trigger new connections, // updating notifyCh can trigger new connections,
// but balancer only expects new connections if all connections are down // only update addrs if all connections are down
if b.pinAddr == "" { // or addrs does not include pinAddr.
b.notifyCh <- addrs update := !hasAddr(addrs, b.pinAddr)
b.mu.Unlock()
if update {
select {
case b.updateAddrsC <- struct{}{}:
case <-b.stopc:
}
}
}
func hasAddr(addrs []grpc.Address, targetAddr string) bool {
for _, addr := range addrs {
if targetAddr == addr.Addr {
return true
}
}
return false
}
func (b *simpleBalancer) updateNotifyLoop() {
defer close(b.donec)
for {
b.mu.RLock()
upc := b.upc
b.mu.RUnlock()
var downc chan struct{}
select {
case <-upc:
var addr string
b.mu.RLock()
addr = b.pinAddr
// Up() sets pinAddr and downc as a pair under b.mu
downc = b.downc
b.mu.RUnlock()
if addr == "" {
break
}
// close opened connections that are not pinAddr
// this ensures only one connection is open per client
select {
case b.notifyCh <- []grpc.Address{{Addr: addr}}:
case <-b.stopc:
return
}
case <-b.updateAddrsC:
b.notifyAddrs()
continue
}
select {
case <-downc:
b.notifyAddrs()
case <-b.updateAddrsC:
b.notifyAddrs()
case <-b.stopc:
return
}
}
}
func (b *simpleBalancer) notifyAddrs() {
b.mu.RLock()
addrs := b.addrs
b.mu.RUnlock()
select {
case b.notifyCh <- addrs:
case <-b.stopc:
} }
} }
@ -141,24 +226,27 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
if b.closed { if b.closed {
return func(err error) {} return func(err error) {}
} }
// gRPC might call Up on a stale address.
// Prevent updating pinAddr with a stale address.
if !hasAddr(b.addrs, addr.Addr) {
return func(err error) {}
}
if b.pinAddr == "" { if b.pinAddr == "" {
// notify waiting Get()s and pin first connected address // notify waiting Get()s and pin first connected address
close(b.upc) close(b.upc)
b.downc = make(chan struct{})
b.pinAddr = addr.Addr b.pinAddr = addr.Addr
// notify client that a connection is up // notify client that a connection is up
b.readyOnce.Do(func() { close(b.readyc) }) b.readyOnce.Do(func() { close(b.readyc) })
// close opened connections that are not pinAddr
// this ensures only one connection is open per client
b.notifyCh <- []grpc.Address{addr}
} }
return func(err error) { return func(err error) {
b.mu.Lock() b.mu.Lock()
if b.pinAddr == addr.Addr { if b.pinAddr == addr.Addr {
b.upc = make(chan struct{}) b.upc = make(chan struct{})
close(b.downc)
b.pinAddr = "" b.pinAddr = ""
b.notifyCh <- b.addrs
} }
b.mu.Unlock() b.mu.Unlock()
} }
@ -214,14 +302,15 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
func (b *simpleBalancer) Close() error { func (b *simpleBalancer) Close() error {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
// In case gRPC calls close twice. TODO: remove the checking // In case gRPC calls close twice. TODO: remove the checking
// when we are sure that gRPC wont call close twice. // when we are sure that gRPC wont call close twice.
if b.closed { if b.closed {
b.mu.Unlock()
<-b.donec
return nil return nil
} }
b.closed = true b.closed = true
close(b.notifyCh) close(b.stopc)
b.pinAddr = "" b.pinAddr = ""
// In the case of following scenario: // In the case of following scenario:
@ -236,6 +325,13 @@ func (b *simpleBalancer) Close() error {
// terminate all waiting Get()s // terminate all waiting Get()s
close(b.upc) close(b.upc)
} }
b.mu.Unlock()
// wait for updateNotifyLoop to finish
<-b.donec
close(b.notifyCh)
return nil return nil
} }

View File

@ -77,6 +77,14 @@ func New(cfg Config) (*Client, error) {
return newClient(&cfg) return newClient(&cfg)
} }
// NewCtxClient creates a client with a context but no underlying grpc
// connection. This is useful for embedded cases that override the
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel}
}
// NewFromURL creates a new etcdv3 client from a URL. // NewFromURL creates a new etcdv3 client from a URL.
func NewFromURL(url string) (*Client, error) { func NewFromURL(url string) (*Client, error) {
return New(Config{Endpoints: []string{url}}) return New(Config{Endpoints: []string{url}})
@ -87,7 +95,10 @@ func (c *Client) Close() error {
c.cancel() c.cancel()
c.Watcher.Close() c.Watcher.Close()
c.Lease.Close() c.Lease.Close()
if c.conn != nil {
return toErr(c.ctx, c.conn.Close()) return toErr(c.ctx, c.conn.Close())
}
return c.ctx.Err()
} }
// Ctx is a context for "out of band" messages (e.g., for sending // Ctx is a context for "out of band" messages (e.g., for sending

View File

@ -694,6 +694,10 @@ func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan str
go func(ws *watcherStream) { go func(ws *watcherStream) {
defer wg.Done() defer wg.Done()
if ws.closing { if ws.closing {
if ws.initReq.ctx.Err() != nil && ws.outc != nil {
close(ws.outc)
ws.outc = nil
}
return return
} }
select { select {

View File

@ -6,6 +6,7 @@ import (
"io" "io"
"strconv" "strconv"
"strings" "strings"
"time"
) )
// Operator can be used to perform low-level operator tasks for Consul. // Operator can be used to perform low-level operator tasks for Consul.
@ -79,6 +80,19 @@ type AutopilotConfiguration struct {
// peer list when a new server joins // peer list when a new server joins
CleanupDeadServers bool CleanupDeadServers bool
// LastContactThreshold is the limit on the amount of time a server can go
// without leader contact before being considered unhealthy.
LastContactThreshold *ReadableDuration
// MaxTrailingLogs is the amount of entries in the Raft Log that a server can
// be behind before being considered unhealthy.
MaxTrailingLogs uint64
// ServerStabilizationTime is the minimum amount of time a server must be
// in a stable, healthy state before it can be added to the cluster. Only
// applicable with Raft protocol version 3 or higher.
ServerStabilizationTime *ReadableDuration
// CreateIndex holds the index corresponding the creation of this configuration. // CreateIndex holds the index corresponding the creation of this configuration.
// This is a read-only field. // This is a read-only field.
CreateIndex uint64 CreateIndex uint64
@ -90,6 +104,84 @@ type AutopilotConfiguration struct {
ModifyIndex uint64 ModifyIndex uint64
} }
// ServerHealth is the health (from the leader's point of view) of a server.
type ServerHealth struct {
// ID is the raft ID of the server.
ID string
// Name is the node name of the server.
Name string
// The status of the SerfHealth check for the server.
SerfStatus string
// LastContact is the time since this node's last contact with the leader.
LastContact *ReadableDuration
// LastTerm is the highest leader term this server has a record of in its Raft log.
LastTerm uint64
// LastIndex is the last log index this server has a record of in its Raft log.
LastIndex uint64
// Healthy is whether or not the server is healthy according to the current
// Autopilot config.
Healthy bool
// StableSince is the last time this server's Healthy value changed.
StableSince time.Time
}
// OperatorHealthReply is a representation of the overall health of the cluster
type OperatorHealthReply struct {
// Healthy is true if all the servers in the cluster are healthy.
Healthy bool
// FailureTolerance is the number of healthy servers that could be lost without
// an outage occurring.
FailureTolerance int
// Servers holds the health of each server.
Servers []ServerHealth
}
// ReadableDuration is a duration type that is serialized to JSON in human readable format.
type ReadableDuration time.Duration
func NewReadableDuration(dur time.Duration) *ReadableDuration {
d := ReadableDuration(dur)
return &d
}
func (d *ReadableDuration) String() string { return d.Duration().String() }
func (d *ReadableDuration) Duration() time.Duration {
if d == nil {
return time.Duration(0)
}
return time.Duration(*d)
}
func (d *ReadableDuration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration().String())), nil
}
func (d *ReadableDuration) UnmarshalJSON(raw []byte) error {
if d == nil {
return fmt.Errorf("cannot unmarshal to nil pointer")
}
str := string(raw)
if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' {
return fmt.Errorf("must be enclosed with quotes: %s", str)
}
dur, err := time.ParseDuration(str[1 : len(str)-1])
if err != nil {
return err
}
*d = ReadableDuration(dur)
return nil
}
// RaftGetConfiguration is used to query the current Raft peer set. // RaftGetConfiguration is used to query the current Raft peer set.
func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) { func (op *Operator) RaftGetConfiguration(q *QueryOptions) (*RaftConfiguration, error) {
r := op.c.newRequest("GET", "/v1/operator/raft/configuration") r := op.c.newRequest("GET", "/v1/operator/raft/configuration")
@ -203,6 +295,7 @@ func (op *Operator) AutopilotGetConfiguration(q *QueryOptions) (*AutopilotConfig
if err := decodeBody(resp, &out); err != nil { if err := decodeBody(resp, &out); err != nil {
return nil, err return nil, err
} }
return &out, nil return &out, nil
} }
@ -241,3 +334,20 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
return res, nil return res, nil
} }
// AutopilotServerHealth
func (op *Operator) AutopilotServerHealth(q *QueryOptions) (*OperatorHealthReply, error) {
r := op.c.newRequest("GET", "/v1/operator/autopilot/health")
r.setQueryOptions(q)
_, resp, err := requireOK(op.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out OperatorHealthReply
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return &out, nil
}