Merge pull request #835 from aluzzardi/leader-election

Leader election
Victor Vieux 2015-05-26 10:11:19 -07:00
commit ad8a9724e2
16 changed files with 246 additions and 120 deletions

View File

@ -1,37 +0,0 @@
package api
import (
"encoding/json"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)
func serveRequest(c cluster.Cluster, w http.ResponseWriter, req *http.Request) error {
context := &context{
cluster: c,
}
r := createRouter(context, false)
r.ServeHTTP(w, req)
return nil
}
func TestGetVersion(t *testing.T) {
r := httptest.NewRecorder()
req, err := http.NewRequest("GET", "/version", nil)
assert.NoError(t, err)
assert.NoError(t, serveRequest(nil, r, req))
assert.Equal(t, r.Code, http.StatusOK)
v := struct {
Version string
}{}
json.NewDecoder(r.Body).Decode(&v)
assert.Equal(t, v.Version, "swarm/"+version.VERSION)
}

api/proxy.go Normal file
View File

@ -0,0 +1,37 @@
package api
import (
"crypto/tls"
"net/http"
)
// ReverseProxy is a Docker reverse proxy.
type ReverseProxy struct {
tlsConfig *tls.Config
dest string
}
// NewReverseProxy creates a new reverse proxy.
func NewReverseProxy(tlsConfig *tls.Config) *ReverseProxy {
return &ReverseProxy{
tlsConfig: tlsConfig,
}
}
// SetDestination sets the HTTP destination of the Docker endpoint.
func (p *ReverseProxy) SetDestination(dest string) {
// FIXME: We have to kill current connections before doing this.
p.dest = dest
}
// ServeHTTP is the http.Handler.
func (p *ReverseProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if p.dest == "" {
httpError(w, "No cluster leader", http.StatusInternalServerError)
return
}
if err := hijack(p.tlsConfig, p.dest, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
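
A rough usage sketch for the proxy above, assuming only the api package from this diff (the addresses are placeholders, and TLS is left out):

```go
package main

import (
	"log"
	"net/http"

	"github.com/docker/swarm/api"
)

func main() {
	// Pass a *tls.Config instead of nil to reach a TLS-enabled leader.
	proxy := api.NewReverseProxy(nil)

	// Point the proxy at the current leader. Until a destination is set,
	// every request is answered with "No cluster leader".
	proxy.SetDestination("10.0.0.10:2375")

	// All requests hitting this manager are hijacked and forwarded to the leader.
	log.Fatal(http.ListenAndServe(":2375", proxy))
}
```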

View File

@ -82,7 +82,18 @@ func writeCorsHeaders(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
}
func createRouter(c *context, enableCors bool) *mux.Router {
// NewRouter creates a new API router.
func NewRouter(cluster cluster.Cluster, tlsConfig *tls.Config, enableCors bool) *mux.Router {
// Register the API events handler in the cluster.
eventsHandler := newEventsHandler()
cluster.RegisterEventHandler(eventsHandler)
context := &context{
cluster: cluster,
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
r := mux.NewRouter()
for method, mappings := range routes {
for route, fct := range mappings {
@ -95,7 +106,7 @@ func createRouter(c *context, enableCors bool) *mux.Router {
if enableCors {
writeCorsHeaders(w, r)
}
localFct(c, w, r)
localFct(context, w, r)
}
localMethod := method

View File

@ -8,12 +8,52 @@ import (
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
)
// The default port to listen on for incoming connections
const DefaultDockerPort = ":2375"
// Dispatcher is a meta http.Handler. It acts as an http.Handler and forwards
// requests to another http.Handler that can be changed at runtime.
type dispatcher struct {
handler http.Handler
}
// SetHandler changes the underlying handler.
func (d *dispatcher) SetHandler(handler http.Handler) {
d.handler = handler
}
// ServeHTTP forwards requests to the underlying handler.
func (d *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if d.handler == nil {
httpError(w, "No dispatcher defined", http.StatusInternalServerError)
return
}
d.handler.ServeHTTP(w, r)
}
// Server is a Docker API server.
type Server struct {
hosts []string
tlsConfig *tls.Config
dispatcher *dispatcher
}
// NewServer creates an api.Server.
func NewServer(hosts []string, tlsConfig *tls.Config) *Server {
return &Server{
hosts: hosts,
tlsConfig: tlsConfig,
dispatcher: &dispatcher{},
}
}
// SetHandler is used to overwrite the HTTP handler for the API.
// This can be the api router or a reverse proxy.
func (s *Server) SetHandler(handler http.Handler) {
s.dispatcher.SetHandler(handler)
}
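
The dispatcher indirection above is what lets a running manager swap what it serves without re-binding its listeners. A minimal sketch of the intended call pattern, assuming only the api package from this diff (the handlers and address are placeholders):

```go
package main

import (
	"log"
	"net/http"
	"time"

	"github.com/docker/swarm/api"
)

func main() {
	server := api.NewServer([]string{"tcp://0.0.0.0:2375"}, nil)

	// Start out answering requests directly.
	server.SetHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("served locally\n"))
	}))

	// The handler can be replaced at any time; the listeners stay up and
	// later requests simply flow through the new handler.
	go func() {
		time.Sleep(10 * time.Second)
		server.SetHandler(api.NewReverseProxy(nil))
	}()

	log.Fatal(server.ListenAndServe())
}
```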
func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error) {
l, err := net.Listen(proto, addr)
if err != nil {
@ -35,20 +75,10 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
//
// The expected format for a host string is [protocol://]address. The protocol
// must be either "tcp" or "unix", with "tcp" used by default if not specified.
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
// Register the API events handler in the cluster.
eventsHandler := newEventsHandler()
c.RegisterEventHandler(eventsHandler)
func (s *Server) ListenAndServe() error {
chErrors := make(chan error, len(s.hosts))
context := &context{
cluster: c,
eventsHandler: eventsHandler,
tlsConfig: tlsConfig,
}
r := createRouter(context, enableCors)
chErrors := make(chan error, len(hosts))
for _, host := range hosts {
for _, host := range s.hosts {
protoAddrParts := strings.SplitN(host, "://", 2)
if len(protoAddrParts) == 1 {
protoAddrParts = append([]string{"tcp"}, protoAddrParts...)
@ -62,15 +92,15 @@ func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfi
err error
server = &http.Server{
Addr: protoAddrParts[1],
Handler: r,
Handler: s.dispatcher,
}
)
switch protoAddrParts[0] {
case "unix":
l, err = newUnixListener(protoAddrParts[1], tlsConfig)
l, err = newUnixListener(protoAddrParts[1], s.tlsConfig)
case "tcp":
l, err = newListener("tcp", protoAddrParts[1], tlsConfig)
l, err = newListener("tcp", protoAddrParts[1], s.tlsConfig)
default:
err = fmt.Errorf("unsupported protocol: %q", protoAddrParts[0])
}
@ -83,7 +113,7 @@ func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfi
}()
}
for i := 0; i < len(hosts); i++ {
for i := 0; i < len(s.hosts); i++ {
err := <-chErrors
if err != nil {
return err

View File

@ -114,7 +114,9 @@ func Run() {
flStore,
flStrategy, flFilter,
flHosts,
flLeaderElection, flAddr,
flTLS, flTLSCaCert, flTLSCert, flTLSKey, flTLSVerify,
flHeartBeat,
flEnableCors,
flCluster, flClusterOpt},
Action: manage,

View File

@ -114,4 +114,9 @@ var (
Usage: "cluster driver options",
Value: &cli.StringSlice{},
}
flLeaderElection = cli.BoolFlag{
Name: "leader-election",
Usage: "Enable cluster leader election between Swarm managers",
}
)

View File

@ -44,7 +44,7 @@ Arguments:
Options:
{{range .Flags}}{{.}}
{{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}}
{{printf "\t * swarm.discovery.heartbeat=25s\tperiod between each heartbeat"}}{{end}}{{ end }}
{{end}}{{ end }}
`
}

View File

@ -5,19 +5,28 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"net/http"
"path"
"time"
log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
"github.com/docker/swarm/api"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/discovery"
kvdiscovery "github.com/docker/swarm/discovery/kv"
"github.com/docker/swarm/leadership"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state"
)
const (
leaderElectionPath = "docker/swarm/leader"
)
type logHandler struct {
}
@ -62,6 +71,70 @@ func loadTLSConfig(ca, cert, key string, verify bool) (*tls.Config, error) {
return config, nil
}
// Initialize the discovery service.
func createDiscovery(c *cli.Context) discovery.Discovery {
uri := getDiscovery(c)
if uri == "" {
log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
}
hb, err := time.ParseDuration(c.String("heartbeat"))
if err != nil {
log.Fatalf("invalid --heartbeat: %v", err)
}
if hb < 1*time.Second {
log.Fatal("--heartbeat should be at least one second")
}
// Set up discovery.
discovery, err := discovery.New(uri, hb, 0)
if err != nil {
log.Fatal(err)
}
return discovery
}
func setupLeaderElection(server *api.Server, apiHandler http.Handler, discovery discovery.Discovery, addr string, tlsConfig *tls.Config) {
kvDiscovery, ok := discovery.(*kvdiscovery.Discovery)
if !ok {
log.Fatal("Leader election is only supported with consul, etcd and zookeeper discovery.")
}
client := kvDiscovery.Store()
candidate := leadership.NewCandidate(client, leaderElectionPath, addr)
follower := leadership.NewFollower(client, leaderElectionPath)
proxy := api.NewReverseProxy(tlsConfig)
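// Candidate: watch our own leadership status. When elected, serve the API
// locally; otherwise hand requests to the reverse proxy.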
go func() {
candidate.RunForElection()
electedCh := candidate.ElectedCh()
for isElected := range electedCh {
if isElected {
log.Info("Cluster leadership acquired")
server.SetHandler(apiHandler)
} else {
log.Info("Cluster leadership lost")
server.SetHandler(proxy)
}
}
}()
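// Follower: track the current leader and keep the proxy pointed at it. An
// empty destination means this manager is the leader and the proxy is unused.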
go func() {
follower.FollowElection()
leaderCh := follower.LeaderCh()
for leader := range leaderCh {
log.Infof("New leader elected: %s", leader)
if leader == addr {
proxy.SetDestination("")
} else {
proxy.SetDestination(leader)
}
}
}()
}
func manage(c *cli.Context) {
var (
tlsConfig *tls.Config
@ -97,10 +170,7 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
dflag := getDiscovery(c)
if dflag == "" {
log.Fatalf("discovery required to manage a cluster. See '%s manage --help'.", c.App.Name)
}
discovery := createDiscovery(c)
s, err := strategy.New(c.String("strategy"))
if err != nil {
@ -119,7 +189,7 @@ func manage(c *cli.Context) {
sched := scheduler.New(s, fs)
cluster, err := swarm.NewCluster(sched, store, tlsConfig, dflag, c.StringSlice("cluster-opt"))
cluster, err := swarm.NewCluster(sched, store, tlsConfig, discovery, c.StringSlice("cluster-opt"))
if err != nil {
log.Fatal(err)
}
@ -129,5 +199,15 @@ func manage(c *cli.Context) {
if c.IsSet("host") || c.IsSet("H") {
hosts = hosts[1:]
}
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig))
server := api.NewServer(hosts, tlsConfig)
router := api.NewRouter(cluster, tlsConfig, c.Bool("cors"))
if c.Bool("leader-election") {
setupLeaderElection(server, router, discovery, c.String("addr"), tlsConfig)
} else {
server.SetHandler(router)
}
log.Fatal(server.ListenAndServe())
}

View File

@ -8,7 +8,6 @@ import (
"sort"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/stringid"
@ -36,42 +35,22 @@ type Cluster struct {
}
// NewCluster is exported
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, dflag string, options cluster.DriverOpts) (cluster.Cluster, error) {
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, discovery discovery.Discovery, options cluster.DriverOpts) (cluster.Cluster, error) {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
var (
err error
)
cluster := &Cluster{
engines: make(map[string]*cluster.Engine),
scheduler: scheduler,
store: store,
overcommitRatio: 0.05,
TLSConfig: TLSConfig,
discovery: discovery,
overcommitRatio: 0.05,
}
if val, ok := options.Float("swarm.overcommit", ""); ok {
cluster.overcommitRatio = val
}
heartbeat := 25 * time.Second
if opt, ok := options.String("swarm.discovery.heartbeat", ""); ok {
h, err := time.ParseDuration(opt)
if err != nil {
return nil, err
}
if h < 1*time.Second {
return nil, fmt.Errorf("invalid heartbeat %s: must be at least 1s", opt)
}
heartbeat = h
}
// Set up discovery.
cluster.discovery, err = discovery.New(dflag, heartbeat, 0)
if err != nil {
log.Fatal(err)
}
discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)

View File

@ -131,3 +131,8 @@ func (s *Discovery) Register(addr string) error {
opts := &store.WriteOptions{Ephemeral: true, Heartbeat: s.heartbeat}
return s.store.Put(path.Join(s.prefix, addr), []byte(addr), opts)
}
// Store returns the underlying store used by KV discovery.
func (s *Discovery) Store() store.Store {
return s.store
}
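
A hedged sketch of what the new Store() accessor enables, mirroring setupLeaderElection above (the consul URI and node address are placeholders):

```go
package main

import (
	"log"
	"time"

	"github.com/docker/swarm/discovery"
	kvdiscovery "github.com/docker/swarm/discovery/kv"
	"github.com/docker/swarm/leadership"
)

func main() {
	// Only KV-backed discovery (consul, etcd, zookeeper) exposes a store.
	d, err := discovery.New("consul://127.0.0.1:8500/swarm", 25*time.Second, 0)
	if err != nil {
		log.Fatal(err)
	}
	kv, ok := d.(*kvdiscovery.Discovery)
	if !ok {
		log.Fatal("leader election requires a KV discovery backend")
	}

	// The underlying store is what the leadership package locks and watches.
	candidate := leadership.NewCandidate(kv.Store(), "docker/swarm/leader", "192.168.0.42:2375")
	candidate.RunForElection()
	for isElected := range candidate.ElectedCh() {
		log.Printf("elected: %v", isElected)
	}
}
```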

View File

@ -19,11 +19,12 @@ if err != nil {
underwood := leadership.NewCandidate(client, "service/swarm/leader", "underwood")
underwood.RunForElection()
for elected := range underwood.ElectedCh {
electedCh := underwood.ElectedCh()
for isElected := range electedCh {
// This loop will run every time there is a change in our leadership
// status.
if elected {
if isElected {
// We won the election - we are now the leader.
// Let's do leader stuff, for example, sleep for a while.
log.Printf("I won the election! I'm now the leader")
@ -48,7 +49,8 @@ there is a change in leadership:
```go
follower := leadership.NewFollower(client, "service/swarm/leader")
follower.FollowElection()
for leader := <-follower.LeaderCh {
leaderCh := follower.LeaderCh()
for leader := range leaderCh {
// Leader is a string containing the value passed to `NewCandidate`.
log.Printf("%s is now the leader", leader)
}
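
// A combined sketch (hypothetical wiring, not prescribed by this diff): one
// process can run a Candidate and a Follower on the same key, which is how
// Swarm's setupLeaderElection uses the package: the Candidate decides whether
// to serve requests locally, the Follower tracks who to forward them to.
// `client` is the store handle created earlier in this README.
candidate := leadership.NewCandidate(client, "service/swarm/leader", "node1")
follower := leadership.NewFollower(client, "service/swarm/leader")

go func() {
	candidate.RunForElection()
	for isElected := range candidate.ElectedCh() {
		log.Printf("leadership changed: leader = %v", isElected)
	}
}()

go func() {
	follower.FollowElection()
	for leader := range follower.LeaderCh() {
		log.Printf("current leader: %s", leader)
	}
}()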

View File

@ -9,16 +9,15 @@ import (
// Candidate runs the leader election algorithm asynchronously
type Candidate struct {
ElectedCh chan bool
client store.Store
key string
node string
lock sync.Mutex
leader bool
stopCh chan struct{}
resignCh chan bool
electedCh chan bool
lock sync.Mutex
leader bool
stopCh chan struct{}
resignCh chan bool
}
// NewCandidate creates a new Candidate
@ -28,13 +27,20 @@ func NewCandidate(client store.Store, key, node string) *Candidate {
key: key,
node: node,
ElectedCh: make(chan bool),
electedCh: make(chan bool),
leader: false,
resignCh: make(chan bool),
stopCh: make(chan struct{}),
}
}
// ElectedCh is used to get a channel which delivers signals on
// acquiring or losing leadership. It sends true if we become
// the leader, and false if we lose it.
func (c *Candidate) ElectedCh() <-chan bool {
return c.electedCh
}
// RunForElection starts the leader election algorithm. Updates in status are
// pushed through the ElectedCh channel.
func (c *Candidate) RunForElection() error {
@ -70,12 +76,12 @@ func (c *Candidate) update(status bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.ElectedCh <- status
c.electedCh <- status
c.leader = status
}
func (c *Candidate) campaign(lock store.Locker) {
defer close(c.ElectedCh)
defer close(c.electedCh)
for {
// Start as a follower.

View File

@ -24,30 +24,31 @@ func TestCandidate(t *testing.T) {
candidate := NewCandidate(store, "test_key", "test_node")
candidate.RunForElection()
electedCh := candidate.ElectedCh()
// Should issue a false upon start, no matter what.
assert.False(t, <-candidate.ElectedCh)
assert.False(t, <-electedCh)
// Since the lock always succeeds, we should get elected.
assert.True(t, <-candidate.ElectedCh)
assert.True(t, <-electedCh)
// Signaling a lost lock should get us de-elected...
close(lostCh)
assert.False(t, <-candidate.ElectedCh)
assert.False(t, <-electedCh)
// And we should attempt to get re-elected again.
assert.True(t, <-candidate.ElectedCh)
assert.True(t, <-electedCh)
// When we resign, unlock will get called, we'll be notified of the
// de-election and we'll try to get the lock again.
go candidate.Resign()
assert.False(t, <-candidate.ElectedCh)
assert.True(t, <-candidate.ElectedCh)
assert.False(t, <-electedCh)
assert.True(t, <-electedCh)
// After stopping the candidate, the ElectedCh should be closed.
candidate.Stop()
select {
case <-candidate.ElectedCh:
case <-electedCh:
assert.True(t, false) // we should not get here.
default:
assert.True(t, true)

View File

@ -5,26 +5,30 @@ import "github.com/docker/swarm/pkg/store"
// Follower can follow an election in real-time and push notifications whenever
// there is a change in leadership.
type Follower struct {
LeaderCh chan string
client store.Store
key string
stopCh chan struct{}
leaderCh chan string
stopCh chan struct{}
}
// NewFollower creates a new follower.
func NewFollower(client store.Store, key string) *Follower {
return &Follower{
LeaderCh: make(chan string),
client: client,
key: key,
leaderCh: make(chan string),
stopCh: make(chan struct{}),
}
}
// FollowElection starts monitoring the election. The current leader is updated
// in real-time and pushed through `LeaderCh`.
// LeaderCh is used to get a channel which delivers the currently elected
// leader.
func (f *Follower) LeaderCh() <-chan string {
return f.leaderCh
}
// FollowElection starts monitoring the election.
func (f *Follower) FollowElection() error {
ch, err := f.client.Watch(f.key, f.stopCh)
if err != nil {
@ -42,7 +46,7 @@ func (f *Follower) Stop() {
}
func (f *Follower) follow(<-chan *store.KVPair) {
defer close(f.LeaderCh)
defer close(f.leaderCh)
// FIXME: We should pass `RequireConsistent: true` to Consul.
ch, err := f.client.Watch(f.key, f.stopCh)
@ -57,6 +61,6 @@ func (f *Follower) follow(<-chan *store.KVPair) {
continue
}
prev = curr
f.LeaderCh <- string(curr)
f.leaderCh <- string(curr)
}
}

View File

@ -20,6 +20,7 @@ func TestFollower(t *testing.T) {
follower := NewFollower(store, "test_key")
follower.FollowElection()
leaderCh := follower.LeaderCh()
// Simulate leader updates
go func() {
@ -30,14 +31,14 @@ func TestFollower(t *testing.T) {
}()
// We shouldn't see duplicate events.
assert.Equal(t, <-follower.LeaderCh, "leader1")
assert.Equal(t, <-follower.LeaderCh, "leader2")
assert.Equal(t, <-follower.LeaderCh, "leader1")
assert.Equal(t, <-leaderCh, "leader1")
assert.Equal(t, <-leaderCh, "leader2")
assert.Equal(t, <-leaderCh, "leader1")
// Once stopped, iteration over the leader channel should stop.
follower.Stop()
close(kvCh)
assert.Equal(t, "", <-follower.LeaderCh)
assert.Equal(t, "", <-leaderCh)
mockStore.AssertExpectations(t)
}

View File

@ -94,7 +94,7 @@ function swarm_manage() {
discovery="$@"
fi
"$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1s" "$discovery" &
"$SWARM_BINARY" -l debug manage -H "$SWARM_HOST" --heartbeat=1s "$discovery" &
SWARM_PID=$!
wait_until_reachable "$SWARM_HOST"
}