mirror of https://github.com/docker/docs.git

commit 8c39e16ddb

Merge pull request #938 from aluzzardi/leadership-status

Replication Status
@@ -27,7 +27,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
 	info := dockerclient.Info{
 		Containers:      int64(len(c.cluster.Containers())),
 		Images:          int64(len(c.cluster.Images())),
-		DriverStatus:    c.cluster.Info(),
+		DriverStatus:    c.statusHandler.Status(),
 		NEventsListener: int64(c.eventsHandler.Size()),
 		Debug:           c.debug,
 		MemoryLimit:     true,
@@ -9,10 +9,11 @@ import (
 	"github.com/gorilla/mux"
 )
 
-// Router context, used by handlers.
+// Primary router context, used by handlers.
 type context struct {
 	cluster       cluster.Cluster
 	eventsHandler *eventsHandler
+	statusHandler StatusHandler
 	debug         bool
 	tlsConfig     *tls.Config
 }
@@ -82,8 +83,8 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request) {
 	w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
 }
 
-// NewRouter creates a new API router.
-func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) *mux.Router {
+// NewPrimary creates a new API router.
+func NewPrimary(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, enableCors bool) *mux.Router {
 	// Register the API events handler in the cluster.
 	eventsHandler := newEventsHandler()
 	cluster.RegisterEventHandler(eventsHandler)
@@ -91,6 +92,7 @@ func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool)
 	context := &context{
 		cluster:       cluster,
 		eventsHandler: eventsHandler,
+		statusHandler: status,
 		tlsConfig:     tlsConfig,
 	}
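For orientation, the renamed constructor is wired roughly as below. This is a minimal sketch, not part of the commit: the import paths and the listen address are assumptions.

package sketch

// Sketch only: import paths are assumed; NewPrimary returns a
// *mux.Router, which satisfies http.Handler.
import (
	"net/http"

	"github.com/docker/swarm/api"
	"github.com/docker/swarm/cluster"
)

func serve(cl cluster.Cluster, status api.StatusHandler) error {
	// NewPrimary now also takes the StatusHandler that feeds the
	// DriverStatus section of `docker info` (see getInfo above).
	primary := api.NewPrimary(cl, nil, status, false)
	return http.ListenAndServe(":2375", primary)
}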
api/proxy.go (deleted, 37 lines)

@@ -1,37 +0,0 @@
-package api
-
-import (
-	"crypto/tls"
-	"net/http"
-)
-
-// ReverseProxy is a Docker reverse proxy.
-type ReverseProxy struct {
-	tlsConfig *tls.Config
-	dest      string
-}
-
-// NewReverseProxy creates a new reverse proxy.
-func NewReverseProxy(tlsConfig *tls.Config) *ReverseProxy {
-	return &ReverseProxy{
-		tlsConfig: tlsConfig,
-	}
-}
-
-// SetDestination sets the HTTP destination of the Docker endpoint.
-func (p *ReverseProxy) SetDestination(dest string) {
-	// FIXME: We have to kill current connections before doing this.
-	p.dest = dest
-}
-
-// ServeHTTP is the http.Handler.
-func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	if p.dest == "" {
-		httpError(w, "No cluster leader", http.StatusInternalServerError)
-		return
-	}
-
-	if err := hijack(p.tlsConfig, p.dest, w, r); err != nil {
-		httpError(w, err.Error(), http.StatusInternalServerError)
-	}
-}
@@ -0,0 +1,52 @@
+package api
+
+import (
+	"crypto/tls"
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+var localRoutes = []string{"/info", "/_ping"}
+
+// Replica is an API replica that acts as a reverse proxy to the primary.
+type Replica struct {
+	handler   http.Handler
+	tlsConfig *tls.Config
+	primary   string
+}
+
+// NewReplica creates a new API replica.
+func NewReplica(handler http.Handler, tlsConfig *tls.Config) *Replica {
+	return &Replica{
+		handler:   handler,
+		tlsConfig: tlsConfig,
+	}
+}
+
+// SetPrimary sets the address of the primary Swarm manager.
+func (p *Replica) SetPrimary(primary string) {
+	// FIXME: We have to kill current connections before doing this.
+	p.primary = primary
+}
+
+// ServeHTTP is the http.Handler.
+func (p *Replica) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// Check whether we should handle this request locally.
+	for _, route := range localRoutes {
+		if strings.HasSuffix(r.URL.Path, route) {
+			p.handler.ServeHTTP(w, r)
+			return
+		}
+	}
+
+	// Otherwise, forward.
+	if p.primary == "" {
+		httpError(w, "No elected primary cluster manager", http.StatusInternalServerError)
+		return
+	}
+
+	if err := hijack(p.tlsConfig, p.primary, w, r); err != nil {
+		httpError(w, fmt.Sprintf("Unable to reach primary cluster manager (%s): %v", p.primary, err), http.StatusInternalServerError)
+	}
+}
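To show how a replica behaves, here is a minimal sketch wiring a Replica around a stand-in local handler; the import path, addresses, and stub handler are assumptions for illustration:

package main

import (
	"log"
	"net/http"

	"github.com/docker/swarm/api" // assumed import path
)

func main() {
	// Stand-in for the primary router: /info and /_ping are answered
	// locally, everything else is hijacked and forwarded.
	local := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("served locally\n"))
	})

	replica := api.NewReplica(local, nil)
	replica.SetPrimary("10.0.0.1:2375") // hypothetical primary address

	log.Fatal(http.ListenAndServe(":2375", replica))
}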
@@ -0,0 +1,7 @@
+package api
+
+// StatusHandler allows the API to display extra information on docker info.
+type StatusHandler interface {
+	// Status provides key/values to be added to docker info.
+	Status() [][]string
+}
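Any type that returns rows of key/value pairs satisfies the interface. A minimal sketch (this type and its rows are illustrative, not from the commit):

package api

// staticStatus is a hypothetical StatusHandler that reports fixed rows.
type staticStatus struct{}

func (staticStatus) Status() [][]string {
	return [][]string{
		{"Role", "primary"},
		{"Nodes", "3"},
	}
}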
@@ -119,7 +119,7 @@ var (
 	}
 
 	flLeaderElection = cli.BoolFlag{
-		Name:  "leader-election",
-		Usage: "Enable cluster leader election between Swarm managers",
+		Name:  "replication",
+		Usage: "Enable Swarm manager replication",
 	}
)
@@ -5,7 +5,6 @@ import (
 	"crypto/x509"
 	"fmt"
 	"io/ioutil"
-	"net/http"
 	"path"
 	"time"
@@ -41,6 +40,30 @@ func (h *logHandler) Handle(e *cluster.Event) error {
 	return nil
 }
 
+type statusHandler struct {
+	cluster   cluster.Cluster
+	candidate *leadership.Candidate
+	follower  *leadership.Follower
+}
+
+func (h *statusHandler) Status() [][]string {
+	var status [][]string
+
+	if h.candidate != nil && !h.candidate.IsLeader() {
+		status = [][]string{
+			{"\bRole", "replica"},
+			{"\bPrimary", h.follower.Leader()},
+		}
+	} else {
+		status = [][]string{
+			{"\bRole", "primary"},
+		}
+	}
+
+	status = append(status, h.cluster.Info()...)
+	return status
+}
+
 // Load the TLS certificates/keys and, if verify is true, the CA.
 func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) {
 	c, err := tls.LoadX509KeyPair(cert, key)
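The \b (backspace) prefix on the keys looks intended to make the docker client, which indents DriverStatus entries, print Role and Primary flush with the top-level info fields; that reading is an inference, not stated in the commit. A toy illustration of the rows a replica reports (address made up; the cluster.Info() rows would follow these):

package main

import "fmt"

func main() {
	// Rows as Status() would shape them on a replica.
	rows := [][]string{
		{"\bRole", "replica"},
		{"\bPrimary", "10.0.0.1:2375"},
	}
	for _, kv := range rows {
		fmt.Printf("%s: %s\n", kv[0], kv[1])
	}
}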
@@ -91,7 +114,7 @@ func createDiscovery(uri string, c *cli.Context) discovery.Discovery {
 	return discovery
 }
 
-func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) {
+func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) {
 	kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
 	if !ok {
 		log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
@@ -101,7 +124,8 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
 	candidate := leadership.NewCandidate(client, leaderElectionPath, addr)
 	follower := leadership.NewFollower(client, leaderElectionPath)
 
-	proxy := api.NewReverseProxy(tlsConfig)
+	primary := api.NewPrimary(cluster, tlsConfig, &statusHandler{cluster, candidate, follower}, c.Bool("cors"))
+	replica := api.NewReplica(primary, tlsConfig)
 
 	go func() {
 		candidate.RunForElection()
@@ -109,10 +133,10 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
 		for isElected := range electedCh {
 			if isElected {
 				log.Info("Cluster leadership acquired")
-				server.SetHandler(apiHandler)
+				server.SetHandler(primary)
 			} else {
 				log.Info("Cluster leadership lost")
-				server.SetHandler(proxy)
+				server.SetHandler(replica)
 			}
 		}
 	}()
@@ -123,12 +147,14 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
 		for leader := range leaderCh {
 			log.Infof("New leader elected: %s", leader)
 			if leader == addr {
-				proxy.SetDestination("")
+				replica.SetPrimary("")
 			} else {
-				proxy.SetDestination(leader)
+				replica.SetPrimary(leader)
 			}
 		}
 	}()
 
+	server.SetHandler(primary)
 }
 
 func manage(c *cli.Context) {
@@ -208,9 +234,7 @@ func manage(c *cli.Context) {
 	}
 
 	server := api.NewServer(hosts, tlsConfig)
-	router := api.NewRouter(cl, tlsConfig, c.Bool("cors"))
-
-	if c.Bool("leader-election") {
+	if c.Bool("replication") {
 		addr := c.String("advertise")
 		if addr == "" {
 			log.Fatal("--advertise address must be provided when using --leader-election")
@@ -219,9 +243,9 @@ func manage(c *cli.Context) {
 			log.Fatal("--advertise should be of the form ip:port or hostname:port")
 		}
 
-		setupLeaderElection(server, router, discovery, addr, tlsConfig)
+		setupReplication(c, cl, server, discovery, addr, tlsConfig)
 	} else {
-		server.SetHandler(router)
+		server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.Bool("cors")))
 	}
 
 	log.Fatal(server.ListenAndServe())
@@ -41,6 +41,11 @@ func (c *Candidate) ElectedCh() <-chan bool {
 	return c.electedCh
 }
 
+// IsLeader returns true if the candidate is currently a leader.
+func (c *Candidate) IsLeader() bool {
+	return c.leader
+}
+
 // RunForElection starts the leader election algorithm. Updates in status are
 // pushed through the ElectedCh channel.
 func (c *Candidate) RunForElection() error {
@@ -76,8 +81,8 @@ func (c *Candidate) update(status bool) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	c.electedCh <- status
 	c.leader = status
+	c.electedCh <- status
 }
 
 func (c *Candidate) campaign(lock store.Locker) {
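The swap in update is easy to miss: c.leader is stored before the notification is sent, so a consumer that receives from ElectedCh() and then calls the new IsLeader() sees a matching value (the test added below relies on exactly this). A consumer sketch, assuming the swarm leadership package path:

package sketch

// Sketch only: the import path is an assumption.
import (
	"log"

	"github.com/docker/swarm/leadership"
)

func watch(candidate *leadership.Candidate) {
	for elected := range candidate.ElectedCh() {
		// update() sets c.leader before sending on the channel, so
		// IsLeader() agrees with the event just received.
		log.Printf("elected=%v IsLeader()=%v", elected, candidate.IsLeader())
	}
}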
@@ -31,6 +31,7 @@ func TestCandidate(t *testing.T) {
 
 	// Since the lock always succeeds, we should get elected.
 	assert.True(t, <-electedCh)
+	assert.True(t, candidate.IsLeader())
 
 	// Signaling a lost lock should get us de-elected...
 	close(lostCh)
@@ -8,6 +8,7 @@ type Follower struct {
 	client store.Store
 	key    string
 
+	leader   string
 	leaderCh chan string
 	stopCh   chan struct{}
 }
@@ -28,6 +29,11 @@ func (f *Follower) LeaderCh() <-chan string {
 	return f.leaderCh
 }
 
+// Leader returns the current leader.
+func (f *Follower) Leader() string {
+	return f.leader
+}
+
 // FollowElection starts monitoring the election.
 func (f *Follower) FollowElection() error {
 	ch, err := f.client.Watch(f.key, f.stopCh)
@@ -54,13 +60,13 @@ func (f *Follower) follow(<-chan *store.KVPair) {
 		return
 	}
 
-	prev := ""
+	f.leader = ""
 	for kv := range ch {
 		curr := string(kv.Value)
-		if curr == prev {
+		if curr == f.leader {
 			continue
 		}
-		prev = curr
-		f.leaderCh <- string(curr)
+		f.leader = curr
+		f.leaderCh <- f.leader
 	}
 }
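Symmetrically on the follower side, the cached f.leader is written before each send, so Leader() can be polled between events. A sketch under the same import-path assumption:

package sketch

// Sketch only: store setup and error handling are omitted.
import (
	"log"

	"github.com/docker/swarm/leadership"
)

func follow(f *leadership.Follower) {
	go f.FollowElection()
	for leader := range f.LeaderCh() {
		// f.leader was updated just before this send, so Leader()
		// reports the same value here.
		log.Printf("new leader: %s (Leader()=%s)", leader, f.Leader())
	}
}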
@@ -34,6 +34,7 @@ func TestFollower(t *testing.T) {
 	assert.Equal(t, <-leaderCh, "leader1")
 	assert.Equal(t, <-leaderCh, "leader2")
 	assert.Equal(t, <-leaderCh, "leader1")
+	assert.Equal(t, follower.Leader(), "leader1")
 
 	// Once stopped, iteration over the leader channel should stop.
 	follower.Stop()