leader election: Display replica status in docker info.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-06-01 13:30:43 -07:00
parent 3e2fb13d82
commit d63de2da48
6 changed files with 63 additions and 15 deletions

View File

@ -27,7 +27,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
info := dockerclient.Info{ info := dockerclient.Info{
Containers: int64(len(c.cluster.Containers())), Containers: int64(len(c.cluster.Containers())),
Images: int64(len(c.cluster.Images())), Images: int64(len(c.cluster.Images())),
DriverStatus: c.cluster.Info(), DriverStatus: c.statusHandler.Status(),
NEventsListener: int64(c.eventsHandler.Size()), NEventsListener: int64(c.eventsHandler.Size()),
Debug: c.debug, Debug: c.debug,
MemoryLimit: true, MemoryLimit: true,

View File

@ -2,18 +2,24 @@ package api
import ( import (
"crypto/tls" "crypto/tls"
"fmt"
"net/http" "net/http"
"strings"
) )
var localRoutes = []string{"/info", "/_ping"}
// ReverseProxy is a Docker reverse proxy. // ReverseProxy is a Docker reverse proxy.
type ReverseProxy struct { type ReverseProxy struct {
api http.Handler
tlsConfig *tls.Config tlsConfig *tls.Config
dest string dest string
} }
// NewReverseProxy creates a new reverse proxy. // NewReverseProxy creates a new reverse proxy.
func NewReverseProxy(tlsConfig *tls.Config) *ReverseProxy { func NewReverseProxy(api http.Handler, tlsConfig *tls.Config) *ReverseProxy {
return &ReverseProxy{ return &ReverseProxy{
api: api,
tlsConfig: tlsConfig, tlsConfig: tlsConfig,
} }
} }
@ -26,12 +32,21 @@ func (p *ReverseProxy) SetDestination(dest string) {
// ServeHTTP is the http.Handler. // ServeHTTP is the http.Handler.
func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Check whether we should handle this request locally.
for _, route := range localRoutes {
if strings.HasSuffix(r.URL.Path, route) {
p.api.ServeHTTP(w, r)
return
}
}
// Otherwise, forward.
if p.dest == "" { if p.dest == "" {
httpError(w, "No cluster leader", http.StatusInternalServerError) httpError(w, "No cluster leader", http.StatusInternalServerError)
return return
} }
if err := hijack(p.tlsConfig, p.dest, w, r); err != nil { if err := hijack(p.tlsConfig, p.dest, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError) httpError(w, fmt.Sprintf("Unable to reach cluster leader: %v", err), http.StatusInternalServerError)
} }
} }

View File

@ -13,6 +13,7 @@ import (
type context struct { type context struct {
cluster cluster.Cluster cluster cluster.Cluster
eventsHandler *eventsHandler eventsHandler *eventsHandler
statusHandler StatusHandler
debug bool debug bool
tlsConfig *tls.Config tlsConfig *tls.Config
} }
@ -83,7 +84,7 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request) {
} }
// NewRouter creates a new API router. // NewRouter creates a new API router.
func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) *mux.Router { func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, enableCors bool) *mux.Router {
// Register the API events handler in the cluster. // Register the API events handler in the cluster.
eventsHandler := newEventsHandler() eventsHandler := newEventsHandler()
cluster.RegisterEventHandler(eventsHandler) cluster.RegisterEventHandler(eventsHandler)
@ -91,6 +92,7 @@ func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool)
context := &context{ context := &context{
cluster: cluster, cluster: cluster,
eventsHandler: eventsHandler, eventsHandler: eventsHandler,
statusHandler: status,
tlsConfig: tlsConfig, tlsConfig: tlsConfig,
} }

7
api/status.go Normal file
View File

@ -0,0 +1,7 @@
package api
// StatusHandler allows the API to display extra information on docker info.
type StatusHandler interface {
// Info provides key/values to be added to docker info.
Status() [][]string
}

View File

@ -119,7 +119,7 @@ var (
} }
flLeaderElection = cli.BoolFlag{ flLeaderElection = cli.BoolFlag{
Name: "leader-election", Name: "replication",
Usage: "Enable cluster leader election between Swarm managers", Usage: "Enable Swarm manager replication",
} }
) )

View File

@ -5,7 +5,6 @@ import (
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http"
"path" "path"
"time" "time"
@ -41,6 +40,30 @@ func (h *logHandler) Handle(e *cluster.Event) error {
return nil return nil
} }
type statusHandler struct {
cluster cluster.Cluster
candidate *leadership.Candidate
follower *leadership.Follower
}
func (h *statusHandler) Status() [][]string {
var status [][]string
if h.candidate != nil && !h.candidate.IsLeader() {
status = [][]string{
{"\bRole", "replica"},
{"\bPrimary", h.follower.Leader()},
}
} else {
status = [][]string{
{"\bRole", "primary"},
}
}
status = append(status, h.cluster.Info()...)
return status
}
// Load the TLS certificates/keys and, if verify is true, the CA. // Load the TLS certificates/keys and, if verify is true, the CA.
func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) { func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) {
c, err := tls.LoadX509KeyPair(cert, key) c, err := tls.LoadX509KeyPair(cert, key)
@ -91,7 +114,7 @@ func createDiscovery(uri string, c *cli.Context) discovery.Discovery {
return discovery return discovery
} }
func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) { func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) {
kvDiscovery, ok := discovery.(*kvdiscovery.Discovery) kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
if !ok { if !ok {
log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.") log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
@ -101,7 +124,8 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
candidate := leadership.NewCandidate(client, leaderElectionPath, addr) candidate := leadership.NewCandidate(client, leaderElectionPath, addr)
follower := leadership.NewFollower(client, leaderElectionPath) follower := leadership.NewFollower(client, leaderElectionPath)
proxy := api.NewReverseProxy(tlsConfig) router := api.NewRouter(cluster, tlsConfig, &statusHandler{cluster, candidate, follower}, c.Bool("cors"))
proxy := api.NewReverseProxy(router, tlsConfig)
go func() { go func() {
candidate.RunForElection() candidate.RunForElection()
@ -109,7 +133,7 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
for isElected := range electedCh { for isElected := range electedCh {
if isElected { if isElected {
log.Info("Cluster leadership acquired") log.Info("Cluster leadership acquired")
server.SetHandler(apiHandler) server.SetHandler(router)
} else { } else {
log.Info("Cluster leadership lost") log.Info("Cluster leadership lost")
server.SetHandler(proxy) server.SetHandler(proxy)
@ -129,6 +153,8 @@ func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery
} }
} }
}() }()
server.SetHandler(router)
} }
func manage(c *cli.Context) { func manage(c *cli.Context) {
@ -208,9 +234,7 @@ func manage(c *cli.Context) {
} }
server := api.NewServer(hosts, tlsConfig) server := api.NewServer(hosts, tlsConfig)
router := api.NewRouter(cl, tlsConfig, c.Bool("cors")) if c.Bool("replication") {
if c.Bool("leader-election") {
addr := c.String("advertise") addr := c.String("advertise")
if addr == "" { if addr == "" {
log.Fatal("--advertise address must be provided when using --leader-election") log.Fatal("--advertise address must be provided when using --leader-election")
@ -219,9 +243,9 @@ func manage(c *cli.Context) {
log.Fatal("--advertise should be of the form ip:port or hostname:port") log.Fatal("--advertise should be of the form ip:port or hostname:port")
} }
setupLeaderElection(server, router, discovery, addr, tlsConfig) setupReplication(c, cl, server, discovery, addr, tlsConfig)
} else { } else {
server.SetHandler(router) server.SetHandler(api.NewRouter(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.Bool("cors")))
} }
log.Fatal(server.ListenAndServe()) log.Fatal(server.ListenAndServe())