cluster API instead of scheduler API

Signed-off-by: Victor Vieux <vieux@docker.com>
Victor Vieux 2015-02-13 01:44:08 +00:00
parent 126f550317
commit ce98e66c63
35 changed files with 411 additions and 902 deletions

View File

@ -80,7 +80,7 @@ other discovery services.
## Advanced Scheduling
See [filters](filter) and [strategies](strategy) to learn
See [filters](scheduler/filter) and [strategies](scheduler/strategy) to learn
more about advanced scheduling.
## TLS

View File

@ -15,8 +15,8 @@ import (
log "github.com/Sirupsen/logrus"
dockerfilters "github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/units"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/version"
"github.com/gorilla/mux"
"github.com/samalba/dockerclient"
@ -25,7 +25,7 @@ import (
const APIVERSION = "1.16"
type context struct {
scheduler scheduler.Scheduler
cluster cluster.Cluster
eventsHandler *eventsHandler
debug bool
tlsConfig *tls.Config
@ -35,7 +35,7 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request)
// GET /info
func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
nodes := c.scheduler.Nodes()
nodes := c.cluster.Nodes()
driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}}
for _, node := range nodes {
@ -50,7 +50,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
NEventsListener int
Debug bool
}{
len(c.scheduler.Containers()),
len(c.cluster.Containers()),
driverStatus,
c.eventsHandler.Size(),
c.debug,
@ -98,7 +98,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
accepteds, _ := filters["node"]
images := []*dockerclient.Image{}
for _, node := range c.scheduler.Nodes() {
for _, node := range c.cluster.Nodes() {
if len(accepteds) != 0 {
found := false
for _, accepted := range accepteds {
@ -132,7 +132,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
all := r.Form.Get("all") == "1"
out := []*dockerclient.Container{}
for _, container := range c.scheduler.Containers() {
for _, container := range c.cluster.Containers() {
tmp := (*container).Container
// Skip stopped containers unless -a was specified.
if !strings.Contains(tmp.Status, "Up") && !all {
@ -170,7 +170,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
// GET /containers/{name:.*}/json
func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
container := c.scheduler.Container(name)
container := c.cluster.Container(name)
if container == nil {
httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound)
return
@ -222,12 +222,12 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
return
}
if container := c.scheduler.Container(name); container != nil {
if container := c.cluster.Container(name); container != nil {
httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict)
return
}
container, err := c.scheduler.CreateContainer(&config, name)
container, err := c.cluster.CreateContainer(&config, name)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
@ -248,12 +248,12 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
force := r.Form.Get("force") == "1"
container := c.scheduler.Container(name)
container := c.cluster.Container(name)
if container == nil {
httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound)
return
}
if err := c.scheduler.RemoveContainer(container, force); err != nil {
if err := c.cluster.RemoveContainer(container, force); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
@ -316,7 +316,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
for _, node := range c.scheduler.Nodes() {
for _, node := range c.cluster.Nodes() {
if node.Image(name) != nil {
proxy(c.tlsConfig, node.Addr, w, r)
return
@ -327,7 +327,7 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
// Proxy a request to a random node
func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
candidates := c.scheduler.Nodes()
candidates := c.cluster.Nodes()
healthFilter := &filter.HealthFilter{}
accepted, err := healthFilter.Filter(nil, candidates)
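
The handlers themselves are otherwise unchanged: they program against the same method set, now supplied by whichever cluster driver is selected at startup. A hypothetical extra handler in the same style as the ones above (the route and function are illustrative, not part of this commit):

```go
// GET /nodecount (hypothetical route, for illustration only)
func getNodeCount(c *context, w http.ResponseWriter, r *http.Request) {
	// c.cluster is the new cluster.Cluster field of the handler context.
	fmt.Fprintf(w, "%d nodes, %d containers\n",
		len(c.cluster.Nodes()), len(c.cluster.Containers()))
}
```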

View File

@ -1,464 +0,0 @@
package api
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"runtime"
"sort"
"strings"
log "github.com/Sirupsen/logrus"
dockerfilters "github.com/docker/docker/pkg/parsers/filters"
<<<<<<< HEAD
"github.com/docker/docker/pkg/units"
"github.com/docker/swarm/cluster"
=======
>>>>>>> initial mesos.go file full fo TODOs
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/version"
"github.com/gorilla/mux"
"github.com/samalba/dockerclient"
)
const APIVERSION = "1.16"
type context struct {
scheduler scheduler.Scheduler
eventsHandler *eventsHandler
debug bool
tlsConfig *tls.Config
}
type handler func(c *context, w http.ResponseWriter, r *http.Request)
// GET /info
func getInfo(c *context, w http.ResponseWriter, r *http.Request) {
nodes := c.scheduler.Nodes()
driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}}
for _, node := range nodes {
driverStatus = append(driverStatus, [2]string{node.Name, node.Addr})
driverStatus = append(driverStatus, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))})
driverStatus = append(driverStatus, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.ReservedCpus(), node.Cpus)})
driverStatus = append(driverStatus, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.ReservedMemory())), units.BytesSize(float64(node.Memory)))})
}
info := struct {
Containers int
DriverStatus [][2]string
NEventsListener int
Debug bool
}{
len(c.scheduler.Containers()),
driverStatus,
c.eventsHandler.Size(),
c.debug,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(info)
}
// GET /version
func getVersion(c *context, w http.ResponseWriter, r *http.Request) {
version := struct {
Version string
ApiVersion string
GoVersion string
GitCommit string
Os string
Arch string
}{
Version: "swarm/" + version.VERSION,
ApiVersion: APIVERSION,
GoVersion: runtime.Version(),
GitCommit: version.GITCOMMIT,
Os: runtime.GOOS,
Arch: runtime.GOARCH,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(version)
}
// GET /images/json
func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
filters, err := dockerfilters.FromParam(r.Form.Get("filters"))
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
accepteds, _ := filters["node"]
images := []*dockerclient.Image{}
for _, node := range c.scheduler.Nodes() {
if len(accepteds) != 0 {
found := false
for _, accepted := range accepteds {
if accepted == node.Name || accepted == node.ID {
found = true
break
}
}
if !found {
continue
}
}
for _, image := range node.Images() {
images = append(images, image)
}
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(images)
}
// GET /containers/ps
// GET /containers/json
func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
all := r.Form.Get("all") == "1"
out := []*dockerclient.Container{}
for _, container := range c.scheduler.Containers() {
tmp := (*container).Container
// Skip stopped containers unless -a was specified.
if !strings.Contains(tmp.Status, "Up") && !all {
continue
}
// Skip swarm containers unless -a was specified.
if strings.Split(tmp.Image, ":")[0] == "swarm" && !all {
continue
}
if !container.Node.IsHealthy() {
tmp.Status = "Pending"
}
// TODO remove the Node Name in the name when we have a good solution
tmp.Names = make([]string, len(container.Names))
for i, name := range container.Names {
tmp.Names[i] = "/" + container.Node.Name + name
}
// insert node IP
tmp.Ports = make([]dockerclient.Port, len(container.Ports))
for i, port := range container.Ports {
tmp.Ports[i] = port
if port.IP == "0.0.0.0" {
tmp.Ports[i].IP = container.Node.IP
}
}
out = append(out, &tmp)
}
sort.Sort(sort.Reverse(ContainerSorter(out)))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(out)
}
// GET /containers/{name:.*}/json
func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
container := c.scheduler.Container(name)
if container == nil {
httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound)
return
}
client, scheme := newClientAndScheme(c.tlsConfig)
resp, err := client.Get(scheme + "://" + container.Node.Addr + "/containers/" + container.Id + "/json")
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
// cleanup
defer resp.Body.Close()
defer closeIdleConnections(client)
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
n, err := json.Marshal(container.Node)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
// insert Node field
data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", n)), -1)
// insert node IP
data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP)), -1)
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}
// POST /containers/create
func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
r.ParseForm()
var (
config dockerclient.ContainerConfig
name = r.Form.Get("name")
)
if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
httpError(w, err.Error(), http.StatusBadRequest)
return
}
if container := c.scheduler.Container(name); container != nil {
httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict)
return
}
container, err := c.scheduler.CreateContainer(&config, name)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
fmt.Fprintf(w, "{%q:%q}", "Id", container.Id)
return
}
// DELETE /containers/{name:.*}
func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
name := mux.Vars(r)["name"]
force := r.Form.Get("force") == "1"
container := c.scheduler.Container(name)
if container == nil {
httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound)
return
}
if err := c.scheduler.RemoveContainer(container, force); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
// GET /events
func getEvents(c *context, w http.ResponseWriter, r *http.Request) {
c.eventsHandler.Add(r.RemoteAddr, w)
w.Header().Set("Content-Type", "application/json")
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
c.eventsHandler.Wait(r.RemoteAddr)
}
// GET /_ping
func ping(c *context, w http.ResponseWriter, r *http.Request) {
w.Write([]byte{'O', 'K'})
}
// Proxy a request to the right node and do a force refresh
func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Request) {
container, err := getContainerFromVars(c, mux.Vars(r))
if err != nil {
httpError(w, err.Error(), http.StatusNotFound)
return
}
cb := func(resp *http.Response) {
if resp.StatusCode == http.StatusCreated {
log.Debugf("[REFRESH CONTAINER] --> %s", container.Id)
container.Node.RefreshContainer(container.Id, true)
}
}
if err := proxyAsync(c.tlsConfig, container.Node.Addr, w, r, cb); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// Proxy a request to the right node
func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) {
container, err := getContainerFromVars(c, mux.Vars(r))
if err != nil {
httpError(w, err.Error(), http.StatusNotFound)
return
}
if err := proxy(c.tlsConfig, container.Node.Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// Proxy a request to the right node
func proxyImage(c *context, w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
for _, node := range c.scheduler.Nodes() {
if node.Image(name) != nil {
proxy(c.tlsConfig, node.Addr, w, r)
return
}
}
httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound)
}
// Proxy a request to a random node
func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) {
candidates := c.scheduler.Nodes()
healthFilter := &filter.HealthFilter{}
accepted, err := healthFilter.Filter(nil, candidates)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// Proxy a hijack request to the right node
func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) {
container, err := getContainerFromVars(c, mux.Vars(r))
if err != nil {
httpError(w, err.Error(), http.StatusNotFound)
return
}
if err := hijack(c.tlsConfig, container.Node.Addr, w, r); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}
}
// Default handler for methods not supported by clustering.
func notImplementedHandler(c *context, w http.ResponseWriter, r *http.Request) {
httpError(w, "Not supported in clustering mode.", http.StatusNotImplemented)
}
func optionsHandler(c *context, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
func writeCorsHeaders(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Access-Control-Allow-Origin", "*")
w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept")
w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
}
func httpError(w http.ResponseWriter, err string, status int) {
log.WithField("status", status).Errorf("HTTP error: %v", err)
http.Error(w, err, status)
}
func createRouter(c *context, enableCors bool) *mux.Router {
r := mux.NewRouter()
m := map[string]map[string]handler{
"GET": {
"/_ping": ping,
"/events": getEvents,
"/info": getInfo,
"/version": getVersion,
"/images/json": getImagesJSON,
"/images/viz": notImplementedHandler,
"/images/search": proxyRandom,
"/images/get": notImplementedHandler,
"/images/{name:.*}/get": notImplementedHandler,
"/images/{name:.*}/history": proxyImage,
"/images/{name:.*}/json": proxyImage,
"/containers/ps": getContainersJSON,
"/containers/json": getContainersJSON,
"/containers/{name:.*}/export": proxyContainer,
"/containers/{name:.*}/changes": proxyContainer,
"/containers/{name:.*}/json": getContainerJSON,
"/containers/{name:.*}/top": proxyContainer,
"/containers/{name:.*}/logs": proxyContainer,
"/containers/{name:.*}/stats": proxyContainer,
"/containers/{name:.*}/attach/ws": notImplementedHandler,
"/exec/{execid:.*}/json": proxyContainer,
},
"POST": {
"/auth": proxyRandom,
"/commit": notImplementedHandler,
"/build": notImplementedHandler,
"/images/create": notImplementedHandler,
"/images/load": notImplementedHandler,
"/images/{name:.*}/push": notImplementedHandler,
"/images/{name:.*}/tag": notImplementedHandler,
"/containers/create": postContainersCreate,
"/containers/{name:.*}/kill": proxyContainer,
"/containers/{name:.*}/pause": proxyContainer,
"/containers/{name:.*}/unpause": proxyContainer,
"/containers/{name:.*}/rename": proxyContainer,
"/containers/{name:.*}/restart": proxyContainer,
"/containers/{name:.*}/start": proxyContainer,
"/containers/{name:.*}/stop": proxyContainer,
"/containers/{name:.*}/wait": proxyContainer,
"/containers/{name:.*}/resize": proxyContainer,
"/containers/{name:.*}/attach": proxyHijack,
"/containers/{name:.*}/copy": proxyContainer,
"/containers/{name:.*}/exec": proxyContainerAndForceRefresh,
"/exec/{execid:.*}/start": proxyHijack,
"/exec/{execid:.*}/resize": proxyContainer,
},
"DELETE": {
"/containers/{name:.*}": deleteContainer,
"/images/{name:.*}": notImplementedHandler,
},
"OPTIONS": {
"": optionsHandler,
},
}
for method, routes := range m {
for route, fct := range routes {
log.WithFields(log.Fields{"method": method, "route": route}).Debug("Registering HTTP route")
// NOTE: scope issue, make sure the variables are local and won't be changed
localRoute := route
localFct := fct
wrap := func(w http.ResponseWriter, r *http.Request) {
log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Info("HTTP request received")
if enableCors {
writeCorsHeaders(w, r)
}
localFct(c, w, r)
}
localMethod := method
// add the new route
r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap)
r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap)
}
}
return r
}

View File

@ -2,18 +2,17 @@ package api
import (
"encoding/json"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/version"
"github.com/stretchr/testify/assert"
)
func serveRequest(s scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error {
func serveRequest(c cluster.Cluster, w http.ResponseWriter, req *http.Request) error {
context := &context{
scheduler: s,
cluster: c,
}
r := createRouter(context, false)
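
With serveRequest now taking the interface, the router can be exercised against any stand-in that satisfies cluster.Cluster; a sketch of such a test (mockCluster is a hypothetical type implementing the interface):

```go
func TestPing(t *testing.T) {
	req, err := http.NewRequest("GET", "/_ping", nil)
	assert.NoError(t, err)

	rec := httptest.NewRecorder()
	// mockCluster is a hypothetical stand-in implementing cluster.Cluster.
	assert.NoError(t, serveRequest(&mockCluster{}, rec, req))
	assert.Equal(t, rec.Code, http.StatusOK)
	assert.Equal(t, rec.Body.String(), "OK") // the ping handler writes "OK"
}
```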

View File

@ -8,7 +8,7 @@ import (
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/cluster"
)
const DefaultDockerPort = ":2375"
@ -28,13 +28,13 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
return l, nil
}
func ListenAndServe(s scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
context := &context{
scheduler: s,
cluster: c,
eventsHandler: NewEventsHandler(),
tlsConfig: tlsConfig,
}
s.Events(context.eventsHandler)
c.Events(context.eventsHandler)
r := createRouter(context, enableCors)
chErrors := make(chan error, len(hosts))

View File

@ -22,14 +22,14 @@ func newClientAndScheme(tlsConfig *tls.Config) (*http.Client, string) {
func getContainerFromVars(c *context, vars map[string]string) (*cluster.Container, error) {
if name, ok := vars["name"]; ok {
if container := c.scheduler.Container(name); container != nil {
if container := c.cluster.Container(name); container != nil {
return container, nil
}
return nil, fmt.Errorf("No such container: %s", name)
}
if ID, ok := vars["execid"]; ok {
for _, container := range c.scheduler.Containers() {
for _, container := range c.cluster.Containers() {
for _, execID := range container.Info.ExecIDs {
if ID == execID {
return container, nil

View File

@ -1,141 +1,17 @@
package cluster
import (
"errors"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/state"
"github.com/docker/swarm/discovery"
"github.com/samalba/dockerclient"
)
var (
ErrNodeNotConnected = errors.New("node is not connected to docker's REST API")
ErrNodeAlreadyRegistered = errors.New("node was already added to the cluster")
)
type Cluster interface {
CreateContainer(config *dockerclient.ContainerConfig, name string) (*Container, error)
RemoveContainer(container *Container, force bool) error
type Cluster struct {
sync.RWMutex
store *state.Store
eventHandlers []EventHandler
nodes map[string]*Node
}
func NewCluster(store *state.Store) *Cluster {
return &Cluster{
nodes: make(map[string]*Node),
store: store,
}
}
// Commit the requested state in the store.
func (c *Cluster) CommitContainerInStore(Id string, config *dockerclient.ContainerConfig, name string) error {
st := &state.RequestedState{
ID: Id,
Name: name,
Config: config,
}
return c.store.Add(Id, st)
}
// Remove a container from the store.
func (c *Cluster) RemoveContainerFromStore(Id string, force bool) error {
if err := c.store.Remove(Id); err != nil {
if err == state.ErrNotFound {
log.Debugf("Container %s not found in the store", Id)
return nil
}
return err
}
return nil
}
func (c *Cluster) Handle(e *Event) error {
for _, eventHandler := range c.eventHandlers {
if err := eventHandler.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// Register a node within the cluster. The node must have been already
// initialized.
func (c *Cluster) AddNode(n *Node) error {
if !n.IsConnected() {
return ErrNodeNotConnected
}
c.Lock()
defer c.Unlock()
if old, exists := c.nodes[n.ID]; exists {
if old.IP != n.IP {
log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP)
}
return ErrNodeAlreadyRegistered
}
c.nodes[n.ID] = n
return n.Events(c)
}
// Containers returns all the containers in the cluster.
func (c *Cluster) Containers() []*Container {
c.RLock()
defer c.RUnlock()
out := []*Container{}
for _, n := range c.nodes {
containers := n.Containers()
for _, container := range containers {
out = append(out, container)
}
}
return out
}
// Container returns the container with IdOrName in the cluster
func (c *Cluster) Container(IdOrName string) *Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// Nodes returns the list of nodes in the cluster
func (c *Cluster) Nodes() []*Node {
nodes := []*Node{}
c.RLock()
for _, node := range c.nodes {
nodes = append(nodes, node)
}
c.RUnlock()
return nodes
}
func (c *Cluster) Node(addr string) *Node {
for _, node := range c.nodes {
if node.Addr == addr {
return node
}
}
return nil
}
func (c *Cluster) Events(h EventHandler) error {
c.eventHandlers = append(c.eventHandlers, h)
return nil
Events(eventsHandler EventHandler)
Nodes() []*Node
Containers() []*Container
Container(IdOrName string) *Container
NewEntries(entries []*discovery.Entry)
}
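
Cluster is now an interface rather than a concrete struct, so adding a backend means satisfying the seven methods above. A minimal no-op sketch of a hypothetical third driver (illustration only, not part of this commit):

```go
package noop

import (
	"errors"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/discovery"
	"github.com/samalba/dockerclient"
)

// noopCluster satisfies cluster.Cluster without doing any real work.
type noopCluster struct{}

func NewCluster() cluster.Cluster { return &noopCluster{} }

func (c *noopCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
	return nil, errors.New("not implemented")
}

func (c *noopCluster) RemoveContainer(container *cluster.Container, force bool) error {
	return errors.New("not implemented")
}

func (c *noopCluster) Events(eventsHandler cluster.EventHandler)    {}
func (c *noopCluster) Nodes() []*cluster.Node                       { return nil }
func (c *noopCluster) Containers() []*cluster.Container             { return nil }
func (c *noopCluster) Container(IdOrName string) *cluster.Container { return nil }
func (c *noopCluster) NewEntries(entries []*discovery.Entry)        {}
```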

cluster/mesos/mesos.go Normal file
View File

@ -0,0 +1,87 @@
package mesos
import (
"errors"
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/samalba/dockerclient"
)
var ErrNotImplemented = errors.New("not implemented in the mesos cluster")
type MesosCluster struct {
sync.Mutex
//TODO: list of mesos masters
//TODO: list of offers
nodes *cluster.Nodes
options *cluster.Options
}
func NewCluster(options *cluster.Options) cluster.Cluster {
log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster")
return &MesosCluster{
nodes: cluster.NewNodes(),
options: options,
}
}
// Schedule a brand new container into the cluster.
func (s *MesosCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.Lock()
defer s.Unlock()
//TODO: pick the right offer (using strategy & filters ???)
//TODO: LaunchTask on the Mesos cluster and get container
//TODO: Store container in store ??
return nil, ErrNotImplemented
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *MesosCluster) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
//TODO: KillTask
//TODO: remove container from store ??
return ErrNotImplemented
}
// Entries are Mesos masters
func (s *MesosCluster) NewEntries(entries []*discovery.Entry) {
//TODO: get list of actual docker nodes from mesos masters
// - cluster.NewNode(m.String(), s.options.OvercommitRatio)
//TODO: create direct connection to those nodes
// - n.Connect(s.options.TLSConfig)
//TODO: add them to the cluster
// - s.nodes.Add(n)
}
func (s *MesosCluster) Events(eventsHandler cluster.EventHandler) {
s.nodes.Events(eventsHandler)
}
func (s *MesosCluster) Nodes() []*cluster.Node {
return s.nodes.List()
}
func (s *MesosCluster) Containers() []*cluster.Container {
return s.nodes.Containers()
}
func (s *MesosCluster) Container(IdOrName string) *cluster.Container {
return s.nodes.Container(IdOrName)
}
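
From the caller's side the stub already wires up like any other driver; a sketch of standalone usage (the option values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/cluster/mesos"
	"github.com/samalba/dockerclient"
)

func main() {
	// manage.go fills these options from CLI flags; values here are made up.
	opts := &cluster.Options{OvercommitRatio: 0.05}
	c := mesos.NewCluster(opts)

	// Every scheduling call is still a stub and returns ErrNotImplemented.
	_, err := c.CreateContainer(&dockerclient.ContainerConfig{Image: "busybox"}, "test")
	fmt.Println(err) // not implemented in the mesos cluster
}
```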

cluster/nodes.go Normal file
View File

@ -0,0 +1,115 @@
package cluster
import (
"errors"
"sync"
log "github.com/Sirupsen/logrus"
)
var (
ErrNodeNotConnected = errors.New("node is not connected to docker's REST API")
ErrNodeAlreadyRegistered = errors.New("node was already added to the cluster")
)
type Nodes struct {
sync.RWMutex
eventHandlers []EventHandler
nodes map[string]*Node
}
func NewNodes() *Nodes {
return &Nodes{
nodes: make(map[string]*Node),
}
}
func (c *Nodes) Handle(e *Event) error {
for _, eventHandler := range c.eventHandlers {
if err := eventHandler.Handle(e); err != nil {
log.Error(err)
}
}
return nil
}
// Register a node within the cluster. The node must have been already
// initialized.
func (c *Nodes) Add(n *Node) error {
if !n.IsConnected() {
return ErrNodeNotConnected
}
c.Lock()
defer c.Unlock()
if old, exists := c.nodes[n.ID]; exists {
if old.IP != n.IP {
log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP)
}
return ErrNodeAlreadyRegistered
}
c.nodes[n.ID] = n
return n.Events(c)
}
// Containers returns all the containers in the cluster.
func (c *Nodes) Containers() []*Container {
c.Lock()
defer c.Unlock()
out := []*Container{}
for _, n := range c.nodes {
containers := n.Containers()
for _, container := range containers {
out = append(out, container)
}
}
return out
}
// Container returns the container with IdOrName in the cluster
func (c *Nodes) Container(IdOrName string) *Container {
// Abort immediately if the name is empty.
if len(IdOrName) == 0 {
return nil
}
c.RLock()
defer c.RUnlock()
for _, n := range c.nodes {
if container := n.Container(IdOrName); container != nil {
return container
}
}
return nil
}
// Nodes returns the list of nodes in the cluster
func (c *Nodes) List() []*Node {
nodes := []*Node{}
c.RLock()
for _, node := range c.nodes {
nodes = append(nodes, node)
}
c.RUnlock()
return nodes
}
func (c *Nodes) Get(addr string) *Node {
for _, node := range c.nodes {
if node.Addr == addr {
return node
}
}
return nil
}
func (c *Nodes) Events(h EventHandler) error {
c.eventHandlers = append(c.eventHandlers, h)
return nil
}
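
Nodes is the registry both drivers now share; a sketch of its lifecycle (the address and overcommit ratio are illustrative, and Connect only succeeds if a Docker daemon actually answers at that address):

```go
package main

import (
	"fmt"
	"log"

	"github.com/docker/swarm/cluster"
)

func main() {
	nodes := cluster.NewNodes()

	// Same NewNode/Connect calls that swarm.NewEntries makes below.
	n := cluster.NewNode("192.168.0.42:2375", 0.05)
	if err := n.Connect(nil); err != nil { // nil *tls.Config = plain TCP
		log.Fatal(err)
	}
	if err := nodes.Add(n); err != nil { // ErrNodeAlreadyRegistered on duplicate IDs
		log.Fatal(err)
	}

	fmt.Println(len(nodes.List()))              // 1
	fmt.Println(nodes.Get("192.168.0.42:2375")) // lookup by address
	fmt.Println(nodes.Container("some-name"))   // nil when nothing matches
}
```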

View File

@ -1,10 +1,8 @@
package cluster
import (
"io/ioutil"
"testing"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
"github.com/samalba/dockerclient/mockclient"
"github.com/stretchr/testify/assert"
@ -34,39 +32,33 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) *
return node
}
func newCluster(t *testing.T) *Cluster {
dir, err := ioutil.TempDir("", "store-test")
assert.NoError(t, err)
return NewCluster(state.NewStore(dir))
}
func TestAdd(t *testing.T) {
c := NewNodes()
assert.Equal(t, len(c.List()), 0)
assert.Nil(t, c.Get("test"))
assert.Nil(t, c.Get("test2"))
func TestAddNode(t *testing.T) {
c := newCluster(t)
assert.Equal(t, len(c.Nodes()), 0)
assert.Nil(t, c.Node("test"))
assert.Nil(t, c.Node("test2"))
assert.NoError(t, c.Add(createNode(t, "test")))
assert.Equal(t, len(c.List()), 1)
assert.NotNil(t, c.Get("test"))
assert.NoError(t, c.AddNode(createNode(t, "test")))
assert.Equal(t, len(c.Nodes()), 1)
assert.NotNil(t, c.Node("test"))
assert.Error(t, c.Add(createNode(t, "test")))
assert.Equal(t, len(c.List()), 1)
assert.NotNil(t, c.Get("test"))
assert.Error(t, c.AddNode(createNode(t, "test")))
assert.Equal(t, len(c.Nodes()), 1)
assert.NotNil(t, c.Node("test"))
assert.NoError(t, c.AddNode(createNode(t, "test2")))
assert.Equal(t, len(c.Nodes()), 2)
assert.NotNil(t, c.Node("test2"))
assert.NoError(t, c.Add(createNode(t, "test2")))
assert.Equal(t, len(c.List()), 2)
assert.NotNil(t, c.Get("test2"))
}
func TestContainerLookup(t *testing.T) {
c := newCluster(t)
c := NewNodes()
container := dockerclient.Container{
Id: "container-id",
Names: []string{"/container-name1", "/container-name2"},
}
node := createNode(t, "test-node", container)
assert.NoError(t, c.AddNode(node))
assert.NoError(t, c.Add(node))
// Invalid lookup
assert.Nil(t, c.Container("invalid-id"))

cluster/options.go Normal file
View File

@ -0,0 +1,8 @@
package cluster
import "crypto/tls"
type Options struct {
TLSConfig *tls.Config
OvercommitRatio float64
}

cluster/swarm/swarm.go Normal file
View File

@ -0,0 +1,110 @@
package swarm
import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/state"
"github.com/samalba/dockerclient"
)
type SwarmCluster struct {
sync.RWMutex
nodes *cluster.Nodes
scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store
}
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
return &SwarmCluster{
nodes: cluster.NewNodes(),
scheduler: scheduler,
options: options,
store: store,
}
}
// Schedule a brand new container into the cluster.
func (s *SwarmCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.RLock()
defer s.RUnlock()
node, err := s.scheduler.SelectNodeForContainer(s.nodes.List(), config)
if err != nil {
return nil, err
}
container, err := node.Create(config, name, true)
if err != nil {
return nil, err
}
st := &state.RequestedState{
ID: container.Id,
Name: name,
Config: config,
}
return container, s.store.Add(container.Id, st)
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *SwarmCluster) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
if err := container.Node.Destroy(container, force); err != nil {
return err
}
if err := s.store.Remove(container.Id); err != nil {
if err == state.ErrNotFound {
log.Debugf("Container %s not found in the store", container.Id)
return nil
}
return err
}
return nil
}
// Entries are Docker Nodes
func (s *SwarmCluster) NewEntries(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if s.nodes.Get(m.String()) == nil {
n := cluster.NewNode(m.String(), s.options.OvercommitRatio)
if err := n.Connect(s.options.TLSConfig); err != nil {
log.Error(err)
return
}
if err := s.nodes.Add(n); err != nil {
log.Error(err)
return
}
}
}(entry)
}
}
func (s *SwarmCluster) Events(eventsHandler cluster.EventHandler) {
s.nodes.Events(eventsHandler)
}
func (s *SwarmCluster) Nodes() []*cluster.Node {
return s.nodes.List()
}
func (s *SwarmCluster) Containers() []*cluster.Container {
return s.nodes.Containers()
}
func (s *SwarmCluster) Container(IdOrName string) *cluster.Container {
return s.nodes.Container(IdOrName)
}
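
End to end, the swarm driver is now assembled from three independently constructed pieces: a scheduler, a state store, and the options bag. A sketch of the wiring, mirroring what manage.go does below (the strategy and filters are assumed to come from the --strategy/--filter flags, which this diff does not show):

```go
package example

import (
	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/cluster/swarm"
	"github.com/docker/swarm/scheduler"
	"github.com/docker/swarm/scheduler/filter"
	"github.com/docker/swarm/scheduler/strategy"
	"github.com/docker/swarm/state"
)

// buildSwarmCluster mirrors the wiring in manage.go: build the scheduler
// first, then hand it to the cluster driver along with the store.
func buildSwarmCluster(s strategy.PlacementStrategy, fs []filter.Filter, stateDir string) cluster.Cluster {
	sched := scheduler.New(s, fs)
	opts := &cluster.Options{OvercommitRatio: 0.05} // illustrative value
	// The result is what api.ListenAndServe(c, hosts, cors, tlsConfig) receives.
	return swarm.NewCluster(sched, state.NewStore(stateDir), opts)
}
```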

View File

@ -94,9 +94,9 @@ var (
Usage: "filter to use [constraint, affinity, health, port, dependency]",
Value: &flFilterValue,
}
flScheduler = cli.StringFlag{
Name: "scheduler, s",
Usage: "scheduler to use [builtin, mesos]",
Value: "builtin",
flCluster = cli.StringFlag{
Name: "cluster, c",
Usage: "cluster to use [swarm, mesos]",
Value: "swarm",
}
)

View File

@ -102,7 +102,7 @@ func main() {
ShortName: "m",
Usage: "manage a docker cluster",
Flags: []cli.Flag{
flStore, flScheduler,
flStore, flCluster,
flStrategy, flFilter,
flHosts, flHeartBeat, flOverCommit,
flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify,

View File

@ -11,12 +11,13 @@ import (
"github.com/codegangsta/cli"
"github.com/docker/swarm/api"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/cluster/mesos"
"github.com/docker/swarm/cluster/swarm"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/options"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state"
"github.com/docker/swarm/strategy"
)
type logHandler struct {
@ -118,17 +119,22 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
options := &options.SchedulerOptions{
Strategy: s,
Filters: fs,
Store: store,
sched := scheduler.New(s, fs)
options := &cluster.Options{
TLSConfig: tlsConfig,
OvercommitRatio: c.Float64("overcommit"),
}
sched, err := scheduler.New(c.String("scheduler"), options)
if err != nil {
log.Fatal(err)
var cluster cluster.Cluster
switch c.String("cluster") {
case "swarm":
cluster = swarm.NewCluster(sched, store, options)
case "mesos":
cluster = mesos.NewCluster(options)
default:
log.Fatalf("cluster %q not supported", c.String("cluster"))
}
// get the list of entries from the discovery service
@ -143,9 +149,9 @@ func manage(c *cli.Context) {
log.Fatal(err)
}
sched.NewEntries(entries)
cluster.NewEntries(entries)
go d.Watch(sched.NewEntries)
go d.Watch(cluster.NewEntries)
}()
// see https://github.com/codegangsta/cli/issues/160
@ -153,5 +159,5 @@ func manage(c *cli.Context) {
if c.IsSet("host") || c.IsSet("H") {
hosts = hosts[1:]
}
log.Fatal(api.ListenAndServe(sched, hosts, c.Bool("cors"), tlsConfig))
log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig))
}

View File

@ -1,100 +0,0 @@
package builtin
import (
"sync"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler/options"
"github.com/samalba/dockerclient"
)
type BuiltinScheduler struct {
sync.Mutex
cluster *cluster.Cluster
options *options.SchedulerOptions
}
func (s *BuiltinScheduler) Initialize(options *options.SchedulerOptions) {
s.options = options
s.cluster = cluster.NewCluster(s.options.Store)
}
// Schedule a brand new container into the cluster.
func (s *BuiltinScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.Lock()
defer s.Unlock()
candidates := s.cluster.Nodes()
// Find a nice home for our container.
accepted, err := filter.ApplyFilters(s.options.Filters, config, candidates)
if err != nil {
return nil, err
}
node, err := s.options.Strategy.PlaceContainer(config, accepted)
if err != nil {
return nil, err
}
container, err := node.Create(config, name, true)
if err != nil {
return nil, err
}
return container, s.cluster.CommitContainerInStore(container.Id, config, name)
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *BuiltinScheduler) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
if err := container.Node.Destroy(container, force); err != nil {
return err
}
return s.cluster.RemoveContainerFromStore(container.Id, force)
}
// Entries are Docker Nodes
func (s *BuiltinScheduler) NewEntries(entries []*discovery.Entry) {
for _, entry := range entries {
go func(m *discovery.Entry) {
if s.cluster.Node(m.String()) == nil {
n := cluster.NewNode(m.String(), s.options.OvercommitRatio)
if err := n.Connect(s.options.TLSConfig); err != nil {
log.Error(err)
return
}
if err := s.cluster.AddNode(n); err != nil {
log.Error(err)
return
}
}
}(entry)
}
}
func (s *BuiltinScheduler) Events(eventsHandler cluster.EventHandler) {
s.cluster.Events(eventsHandler)
}
func (s *BuiltinScheduler) Nodes() []*cluster.Node {
return s.cluster.Nodes()
}
func (s *BuiltinScheduler) Containers() []*cluster.Container {
return s.cluster.Containers()
}
func (s *BuiltinScheduler) Container(IdOrName string) *cluster.Container {
return s.cluster.Container(IdOrName)
}

View File

@ -1,88 +0,0 @@
package mesos
import (
"errors"
"sync"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/scheduler/options"
"github.com/samalba/dockerclient"
)
var ErrNotImplemented = errors.New("not implemented in the mesos scheduler")
type MesosScheduler struct {
sync.Mutex
//TODO: list of mesos masters
cluster *cluster.Cluster
options *options.SchedulerOptions
}
func (s *MesosScheduler) Initialize(options *options.SchedulerOptions) {
s.options = options
s.cluster = cluster.NewCluster(s.options.Store)
}
// Schedule a brand new container into the cluster.
func (s *MesosScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
s.Lock()
defer s.Unlock()
//TODO: RequestOffers from mesos master
//TODO: pick the right offer (using strategy & filters ???)
//TODO: LaunchTask on the Mesos cluster and get container
//TODO: Store container in store
// - s.cluster.CommitContainerInStore(container.Id, config, name)
return nil, ErrNotImplemented
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *MesosScheduler) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
//TODO: KillTask
//TODO: remove container
// - s.cluster.RemoveContainerFromStore(container.Id, force)
return ErrNotImplemented
}
// Entries are Mesos masters
func (s *MesosScheduler) NewEntries(entries []*discovery.Entry) {
//TODO: get list of actual docker nodes from mesos masters
// - cluster.NewNode(m.String(), s.options.OvercommitRatio)
//TODO: create direct connection to those nodes
// - n.Connect(s.options.TLSConfig)
//TODO: add them to the cluster
// - s.cluster.AddNode(n)
}
func (s *MesosScheduler) Events(eventsHandler cluster.EventHandler) {
s.cluster.Events(eventsHandler)
}
func (s *MesosScheduler) Nodes() []*cluster.Node {
return s.cluster.Nodes()
}
func (s *MesosScheduler) Containers() []*cluster.Container {
return s.cluster.Containers()
}
func (s *MesosScheduler) Container(IdOrName string) *cluster.Container {
return s.cluster.Container(IdOrName)
}

View File

@ -1,18 +0,0 @@
package options
import (
"crypto/tls"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/state"
"github.com/docker/swarm/strategy"
)
type SchedulerOptions struct {
Strategy strategy.PlacementStrategy
Filters []filter.Filter
Store *state.Store
TLSConfig *tls.Config
OvercommitRatio float64
}

View File

@ -1,44 +1,30 @@
package scheduler
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery"
"github.com/docker/swarm/scheduler/builtin"
"github.com/docker/swarm/scheduler/mesos"
"github.com/docker/swarm/scheduler/options"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
"github.com/samalba/dockerclient"
)
type Scheduler interface {
Initialize(options *options.SchedulerOptions)
CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error)
RemoveContainer(container *cluster.Container, force bool) error
Events(eventsHandler cluster.EventHandler)
Nodes() []*cluster.Node
Containers() []*cluster.Container
Container(IdOrName string) *cluster.Container
NewEntries(entries []*discovery.Entry)
type Scheduler struct {
strategy strategy.PlacementStrategy
filters []filter.Filter
}
var schedulers map[string]Scheduler
func init() {
schedulers = map[string]Scheduler{
"builtin": &builtin.BuiltinScheduler{},
"mesos": &mesos.MesosScheduler{},
func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler {
return &Scheduler{
strategy: strategy,
filters: filters,
}
}
func New(name string, options *options.SchedulerOptions) (Scheduler, error) {
if scheduler, exists := schedulers[name]; exists {
log.WithField("name", name).Debug("Initializing scheduler")
scheduler.Initialize(options)
return scheduler, nil
// Find a nice home for our container.
func (s *Scheduler) SelectNodeForContainer(nodes []*cluster.Node, config *dockerclient.ContainerConfig) (*cluster.Node, error) {
accepted, err := filter.ApplyFilters(s.filters, config, nodes)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("scheduler %q not supported", name)
return s.strategy.PlaceContainer(config, accepted)
}
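
With membership gone, the scheduler reduces to a pure selection function over whatever node list the cluster hands it. A self-contained sketch with a hypothetical first-fit strategy (this assumes PlacementStrategy requires only the PlaceContainer method used above):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/docker/swarm/cluster"
	"github.com/docker/swarm/scheduler"
	"github.com/docker/swarm/scheduler/filter"
	"github.com/samalba/dockerclient"
)

// firstFit is a hypothetical strategy: pick the first acceptable node.
type firstFit struct{}

func (firstFit) PlaceContainer(config *dockerclient.ContainerConfig, nodes []*cluster.Node) (*cluster.Node, error) {
	if len(nodes) == 0 {
		return nil, errors.New("no nodes available")
	}
	return nodes[0], nil
}

func main() {
	sched := scheduler.New(firstFit{}, []filter.Filter{&filter.HealthFilter{}})

	// In real use the node list comes from cluster.Nodes(); empty here.
	config := &dockerclient.ContainerConfig{Image: "busybox"}
	node, err := sched.SelectNodeForContainer(nil, config)
	fmt.Println(node, err) // no node is selected from an empty list
}
```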