api: Integrate leader election.

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
This commit is contained in:
Andrea Luzzardi 2015-05-22 20:07:41 -07:00
parent d8a0ba5d88
commit b4efc08dfc
10 changed files with 201 additions and 52 deletions

37
api/proxy.go Normal file
View File

@ -0,0 +1,37 @@
package api
import (
"crypto/tls"
"net/http"
)
// ReverseProxy is a Docker reverse proxy.
type ReverseProxy struct {
tlsConfig *tls.Config
dest string
}
// NewReverseProxy creates a new reverse proxy.
func NewReverseProxy(tlsConfig *tls.Config) *ReverseProxy {
return &ReverseProxy{
tlsConfig: tlsConfig,
}
}
// SetDestination sets the HTTP destination of the Docker endpoint.
func (p *ReverseProxy) SetDestination(dest string) {
// FIXME: We have to kill current connections before doing this.
p.dest = dest
}
// ServeHTTP is the http.Handler.
func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if p.dest == "" {
httpError(w, "No cluster leader", http.StatusInternalServerError)
return
}
if err := hijack(p.tlsConfig, p.dest, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}

View File

@ -82,7 +82,18 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
}
func createRouter(c *context, enableCors bool) *mux.Router {
// NewRouter creates a new API router.
func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) *mux.Router {
// Register the API events handler in the cluster.
eventsHandler := newEventsHandler()
cluster.RegisterEventHandler(eventsHandler)
context := &context{
cluster: cluster,
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
r := mux.NewRouter()
for method, mappings := range routes {
for route, fct := range mappings {
@ -95,7 +106,7 @@ func createRouter(c *context, enableCors bool) *mux.Router {
if enableCors {
writeCorsHeaders(w, r)
}
localFct(c, w, r)
localFct(context, w, r)
}
localMethod := method

View File

@ -8,12 +8,52 @@ import (
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
)
// The default port to listen on for incoming connections
const DefaultDockerPort = ":2375"
// Dispatcher is a meta http.Handler. It acts as an http.Handler and forwards
// requests to another http.Handler that can be changed at runtime.
type dispatcher struct {
handler http.Handler
}
// SetHandler changes the underlying handler.
func (d *dispatcher) SetHandler(handler http.Handler) {
d.handler = handler
}
// ServeHTTP forwards requests to the underlying handler.
func (d *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if d.handler == nil {
httpError(w, "No dispatcher defined", http.StatusInternalServerError)
}
d.handler.ServeHTTP(w, r)
}
// Server is a Docker API server.
type Server struct {
hosts []string
tlsConfig *tls.Config
dispatcher *dispatcher
}
// NewServer creates an api.Server.
func NewServer(hosts []string, tlsConfig *tls.Config) *Server {
return &Server{
hosts: hosts,
tlsConfig: tlsConfig,
dispatcher: &dispatcher{},
}
}
// SetHandler is used to overwrite the HTTP handler for the API.
// This can be the api router or a reverse proxy.
func (s *Server) SetHandler(handler http.Handler) {
s.dispatcher.SetHandler(handler)
}
func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error) {
l, err := net.Listen(proto, addr)
if err != nil {
@ -35,20 +75,10 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
//
// The expected format for a host string is [protocol://]address. The protocol
// must be either "tcp" or "unix", with "tcp" used by default if not specified.
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
// Register the API events handler in the cluster.
eventsHandler := newEventsHandler()
c.RegisterEventHandler(eventsHandler)
func (s *Server) ListenAndServe() error {
chErrors := make(chan error, len(s.hosts))
context := &context{
cluster: c,
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
r := createRouter(context, enableCors)
chErrors := make(chan error, len(hosts))
for _, host := range hosts {
for _, host := range s.hosts {
protoAddrParts := strings.SplitN(host, "://", 2)
if len(protoAddrParts) == 1 {
protoAddrParts = append([]string{"tcp"}, protoAddrParts...)
@ -62,15 +92,15 @@ func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfi
err error
server = &http.Server{
Addr: protoAddrParts[1],
Handler: r,
Handler: s.dispatcher,
}
)
switch protoAddrParts[0] {
case "unix":
l, err = newUnixListener(protoAddrParts[1], tlsConfig)
l, err = newUnixListener(protoAddrParts[1], s.tlsConfig)
case "tcp":
l, err = newListener("tcp", protoAddrParts[1], tlsConfig)
l, err = newListener("tcp", protoAddrParts[1], s.tlsConfig)
default:
err = fmt.Errorf("unsupported protocol: %q", protoAddrParts[0])
}
@ -83,7 +113,7 @@ func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfi
}()
}
for i := 0; i < len(hosts); i++ {
for i := 0; i < len(s.hosts); i++ {
err := <-chErrors
if err != nil {
return err

View File

@ -114,7 +114,9 @@ func Run() {
flStore,
flStrategy, flFilter,
flHosts,
flLeaderElection, flAddr,
flTLS, flTLSCaCert, flTLSCert, flTLSKey, flTLSVerify,
flHeartBeat,
flEnableCors,
flCluster, flClusterOpt},
Action: manage,

View File

@ -114,4 +114,9 @@ var (
Usage: "cluster driver options",
Value: &cli.StringSlice{},
}
flLeaderElection = cli.BoolFlag{
Name: "leader-election",
Usage: "Enable cluster leader election between Swarm managers",
}
)

View File

@ -44,7 +44,7 @@ Arguments:
Options:
{{range .Flags}}{{.}}
{{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}}
{{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }}
{{end}}{{ end }}
`
}

View File

@ -5,19 +5,28 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"path"
"time"
log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/docker/swarm/api"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/discovery"
kvdiscovery "github.com/docker/swarm/discovery/kv"
"github.com/docker/swarm/leadership"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state"
)
const (
leaderElectionPath = "docker/swarm/leader"
)
type logHandler struct {
}
@ -62,6 +71,70 @@ func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) {
return config, nil
}
// Initialize the discovery service.
func createDiscovery(c *cli.Context) discovery.Discovery {
uri := getDiscovery(c)
if uri == "" {
log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
}
hb, err := time.ParseDuration(c.String("heartbeat"))
if err != nil {
log.Fatalf("invalid --heartbeat: %v", err)
}
if hb < 1*time.Second {
log.Fatal("--heartbeat should be at least one second")
}
// Set up discovery.
discovery, err := discovery.New(uri, hb, 0)
if err != nil {
log.Fatal(err)
}
return discovery
}
func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) {
kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
if !ok {
log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
}
client := kvDiscovery.Store()
candidate := leadership.NewCandidate(client, leaderElectionPath, addr)
follower := leadership.NewFollower(client, leaderElectionPath)
proxy := api.NewReverseProxy(tlsConfig)
go func() {
candidate.RunForElection()
electedCh := candidate.ElectedCh()
for isElected := range electedCh {
if isElected {
log.Info("Cluster leadership acquired")
server.SetHandler(apiHandler)
} else {
log.Info("Cluster leadership lost")
server.SetHandler(proxy)
}
}
}()
go func() {
follower.FollowElection()
leaderCh := follower.LeaderCh()
for leader := range leaderCh {
log.Infof("New leader elected: %s", leader)
if leader == addr {
proxy.SetDestination("")
} else {
proxy.SetDestination(leader)
}
}
}()
}
func manage(c *cli.Context) {
var (
tlsConfig *tls.Config
@ -97,10 +170,7 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
dflag := getDiscovery(c)
if dflag == "" {
log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
}
discovery := createDiscovery(c)
s, err := strategy.New(c.String("strategy"))
if err != nil {
@ -119,7 +189,7 @@ func manage(c *cli.Context) {
sched := scheduler.New(s, fs)
cluster, err := swarm.NewCluster(sched, store, tlsConfig, dflag, c.StringSlice("cluster-opt"))
cluster, err := swarm.NewCluster(sched, store, tlsConfig, discovery, c.StringSlice("cluster-opt"))
if err != nil {
log.Fatal(err)
}
@ -129,5 +199,15 @@ func manage(c *cli.Context) {
if c.IsSet("host") || c.IsSet("H") {
hosts = hosts[1:]
}
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig))
server := api.NewServer(hosts, tlsConfig)
router := api.NewRouter(cluster, tlsConfig, c.Bool("cors"))
if c.Bool("leader-election") {
setupLeaderElection(server, router, discovery, c.String("addr"), tlsConfig)
} else {
server.SetHandler(router)
}
log.Fatal(server.ListenAndServe())
}

View File

@ -8,7 +8,6 @@ import (
"sort"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
@ -36,42 +35,22 @@ type Cluster struct {
}
// NewCluster is exported
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, dflag string, options cluster.DriverOpts) (cluster.Cluster, error) {
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts) (cluster.Cluster, error) {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
var (
err error
)
cluster := &Cluster{
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
store: store,
overcommitRatio: 0.05,
TLSConfig: TLSConfig,
discovery: discovery,
overcommitRatio: 0.05,
}
if val, ok := options.Float("swarm.overcommit", ""); ok {
cluster.overcommitRatio = val
}
heartbeat := 25 * time.Second
if opt, ok := options.String("swarm.discovery.heartbeat", ""); ok {
h, err := time.ParseDuration(opt)
if err != nil {
return nil, err
}
if h < 1*time.Second {
return nil, fmt.Errorf("invalid heartbeat %s: must be at least 1s", opt)
}
heartbeat = h
}
// Set up discovery.
cluster.discovery, err = discovery.New(dflag, heartbeat, 0)
if err != nil {
log.Fatal(err)
}
discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)

View File

@ -131,3 +131,8 @@ func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat}
return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
}
// Store returns the underlying store used by KV discovery.
func (s *Discovery) Store() store.Store {
return s.store
}

View File

@ -94,7 +94,7 @@ function swarm_manage() {
discovery="$@"
fi
"$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" &
"$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --heartbeat=1s "$discovery" &
SWARM_PID=$!
wait_until_reachable "$SWARM_HOST"
}