diff --git a/api/handlers.go b/api/handlers.go index e7468eefa4..f0258c71d8 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -27,7 +27,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) { info := dockerclient.Info{ Containers: int64(len(c.cluster.Containers())), Images: int64(len(c.cluster.Images())), - DriverStatus: c.cluster.Info(), + DriverStatus: c.statusHandler.Status(), NEventsListener: int64(c.eventsHandler.Size()), Debug: c.debug, MemoryLimit: true, diff --git a/api/router.go b/api/primary.go similarity index 94% rename from api/router.go rename to api/primary.go index ece90870e3..ca44462267 100644 --- a/api/router.go +++ b/api/primary.go @@ -9,10 +9,11 @@ import ( "github.com/gorilla/mux" ) -// Router context, used by handlers. +// Primary router context, used by handlers. type context struct { cluster cluster.Cluster eventsHandler *eventsHandler + statusHandler StatusHandler debug bool tlsConfig *tls.Config } @@ -82,8 +83,8 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request) { w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS") } -// NewRouter creates a new API router. -func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) *mux.Router { +// NewPrimary creates a new API router. +func NewPrimary(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, enableCors bool) *mux.Router { // Register the API events handler in the cluster. eventsHandler := newEventsHandler() cluster.RegisterEventHandler(eventsHandler) @@ -91,6 +92,7 @@ func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) context := &context{ cluster: cluster, eventsHandler: eventsHandler, + statusHandler: status, tlsConfig: tlsConfig, } diff --git a/api/proxy.go b/api/proxy.go deleted file mode 100644 index 3ff86f4ac5..0000000000 --- a/api/proxy.go +++ /dev/null @@ -1,37 +0,0 @@ -package api - -import ( - "crypto/tls" - "net/http" -) - -// ReverseProxy is a Docker reverse proxy. -type ReverseProxy struct { - tlsConfig *tls.Config - dest string -} - -// NewReverseProxy creates a new reverse proxy. -func NewReverseProxy(tlsConfig *tls.Config) *ReverseProxy { - return &ReverseProxy{ - tlsConfig: tlsConfig, - } -} - -// SetDestination sets the HTTP destination of the Docker endpoint. -func (p *ReverseProxy) SetDestination(dest string) { - // FIXME: We have to kill current connections before doing this. - p.dest = dest -} - -// ServeHTTP is the http.Handler. -func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if p.dest == "" { - httpError(w, "No cluster leader", http.StatusInternalServerError) - return - } - - if err := hijack(p.tlsConfig, p.dest, w, r); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } -} diff --git a/api/replica.go b/api/replica.go new file mode 100644 index 0000000000..b9a12e371a --- /dev/null +++ b/api/replica.go @@ -0,0 +1,52 @@ +package api + +import ( + "crypto/tls" + "fmt" + "net/http" + "strings" +) + +var localRoutes = []string{"/info", "/_ping"} + +// Replica is an API replica that reserves proxy to the primary. +type Replica struct { + handler http.Handler + tlsConfig *tls.Config + primary string +} + +// NewReplica creates a new API replica. +func NewReplica(handler http.Handler, tlsConfig *tls.Config) *Replica { + return &Replica{ + handler: handler, + tlsConfig: tlsConfig, + } +} + +// SetPrimary sets the address of the primary Swarm manager +func (p *Replica) SetPrimary(primary string) { + // FIXME: We have to kill current connections before doing this. + p.primary = primary +} + +// ServeHTTP is the http.Handler. +func (p *Replica) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Check whether we should handle this request locally. + for _, route := range localRoutes { + if strings.HasSuffix(r.URL.Path, route) { + p.handler.ServeHTTP(w, r) + return + } + } + + // Otherwise, forward. + if p.primary == "" { + httpError(w, "No elected primary cluster manager", http.StatusInternalServerError) + return + } + + if err := hijack(p.tlsConfig, p.primary, w, r); err != nil { + httpError(w, fmt.Sprintf("Unable to reach primary cluster manager (%s): %v", err, p.primary), http.StatusInternalServerError) + } +} diff --git a/api/status.go b/api/status.go new file mode 100644 index 0000000000..962e5c9ada --- /dev/null +++ b/api/status.go @@ -0,0 +1,7 @@ +package api + +// StatusHandler allows the API to display extra information on docker info. +type StatusHandler interface { + // Info provides key/values to be added to docker info. + Status() [][]string +} diff --git a/cli/flags.go b/cli/flags.go index 70b7be61ff..bec32bfe1d 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -119,7 +119,7 @@ var ( } flLeaderElection = cli.BoolFlag{ - Name: "leader-election", - Usage: "Enable cluster leader election between Swarm managers", + Name: "replication", + Usage: "Enable Swarm manager replication", } ) diff --git a/cli/manage.go b/cli/manage.go index 657710e925..cb0cec02ab 100644 --- a/cli/manage.go +++ b/cli/manage.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "fmt" "io/ioutil" - "net/http" "path" "time" @@ -41,6 +40,30 @@ func (h *logHandler) Handle(e *cluster.Event) error { return nil } +type statusHandler struct { + cluster cluster.Cluster + candidate *leadership.Candidate + follower *leadership.Follower +} + +func (h *statusHandler) Status() [][]string { + var status [][]string + + if h.candidate != nil && !h.candidate.IsLeader() { + status = [][]string{ + {"\bRole", "replica"}, + {"\bPrimary", h.follower.Leader()}, + } + } else { + status = [][]string{ + {"\bRole", "primary"}, + } + } + + status = append(status, h.cluster.Info()...) + return status +} + // Load the TLS certificates/keys and, if verify is true, the CA. func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) { c, err := tls.LoadX509KeyPair(cert, key) @@ -91,7 +114,7 @@ func createDiscovery(uri string, c *cli.Context) discovery.Discovery { return discovery } -func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) { +func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) { kvDiscovery, ok := discovery.(*kvdiscovery.Discovery) if !ok { log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.") @@ -101,7 +124,8 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery candidate := leadership.NewCandidate(client, leaderElectionPath, addr) follower := leadership.NewFollower(client, leaderElectionPath) - proxy := api.NewReverseProxy(tlsConfig) + primary := api.NewPrimary(cluster, tlsConfig, &statusHandler{cluster, candidate, follower}, c.Bool("cors")) + replica := api.NewReplica(primary, tlsConfig) go func() { candidate.RunForElection() @@ -109,10 +133,10 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery for isElected := range electedCh { if isElected { log.Info("Cluster leadership acquired") - server.SetHandler(apiHandler) + server.SetHandler(primary) } else { log.Info("Cluster leadership lost") - server.SetHandler(proxy) + server.SetHandler(replica) } } }() @@ -123,12 +147,14 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery for leader := range leaderCh { log.Infof("New leader elected: %s", leader) if leader == addr { - proxy.SetDestination("") + replica.SetPrimary("") } else { - proxy.SetDestination(leader) + replica.SetPrimary(leader) } } }() + + server.SetHandler(primary) } func manage(c *cli.Context) { @@ -208,9 +234,7 @@ func manage(c *cli.Context) { } server := api.NewServer(hosts, tlsConfig) - router := api.NewRouter(cl, tlsConfig, c.Bool("cors")) - - if c.Bool("leader-election") { + if c.Bool("replication") { addr := c.String("advertise") if addr == "" { log.Fatal("--advertise address must be provided when using --leader-election") @@ -219,9 +243,9 @@ func manage(c *cli.Context) { log.Fatal("--advertise should be of the form ip:port or hostname:port") } - setupLeaderElection(server, router, discovery, addr, tlsConfig) + setupReplication(c, cl, server, discovery, addr, tlsConfig) } else { - server.SetHandler(router) + server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.Bool("cors"))) } log.Fatal(server.ListenAndServe()) diff --git a/leadership/candidate.go b/leadership/candidate.go index 9e140b57c8..b7f7007208 100644 --- a/leadership/candidate.go +++ b/leadership/candidate.go @@ -41,6 +41,11 @@ func (c *Candidate) ElectedCh() <-chan bool { return c.electedCh } +// IsLeader returns true if the candidate is currently a leader. +func (c *Candidate) IsLeader() bool { + return c.leader +} + // RunForElection starts the leader election algorithm. Updates in status are // pushed through the ElectedCh channel. func (c *Candidate) RunForElection() error { @@ -76,8 +81,8 @@ func (c *Candidate) update(status bool) { c.lock.Lock() defer c.lock.Unlock() - c.electedCh <- status c.leader = status + c.electedCh <- status } func (c *Candidate) campaign(lock store.Locker) { diff --git a/leadership/candidate_test.go b/leadership/candidate_test.go index f2758cfbbf..bd63fcf353 100644 --- a/leadership/candidate_test.go +++ b/leadership/candidate_test.go @@ -31,6 +31,7 @@ func TestCandidate(t *testing.T) { // Since the lock always succeeeds, we should get elected. assert.True(t, <-electedCh) + assert.True(t, candidate.IsLeader()) // Signaling a lost lock should get us de-elected... close(lostCh) diff --git a/leadership/follower.go b/leadership/follower.go index 1a3a74483a..ff75a28cbc 100644 --- a/leadership/follower.go +++ b/leadership/follower.go @@ -8,6 +8,7 @@ type Follower struct { client store.Store key string + leader string leaderCh chan string stopCh chan struct{} } @@ -28,6 +29,11 @@ func (f *Follower) LeaderCh() <-chan string { return f.leaderCh } +// Leader returns the current leader. +func (f *Follower) Leader() string { + return f.leader +} + // FollowElection starts monitoring the election. func (f *Follower) FollowElection() error { ch, err := f.client.Watch(f.key, f.stopCh) @@ -54,13 +60,13 @@ func (f *Follower) follow(<-chan *store.KVPair) { return } - prev := "" + f.leader = "" for kv := range ch { curr := string(kv.Value) - if curr == prev { + if curr == f.leader { continue } - prev = curr - f.leaderCh <- string(curr) + f.leader = curr + f.leaderCh <- f.leader } } diff --git a/leadership/follower_test.go b/leadership/follower_test.go index bd0827b81c..943162ae77 100644 --- a/leadership/follower_test.go +++ b/leadership/follower_test.go @@ -34,6 +34,7 @@ func TestFollower(t *testing.T) { assert.Equal(t, <-leaderCh, "leader1") assert.Equal(t, <-leaderCh, "leader2") assert.Equal(t, <-leaderCh, "leader1") + assert.Equal(t, follower.Leader(), "leader1") // Once stopped, iteration over the leader channel should stop. follower.Stop()