diff --git a/README.md b/README.md index 85aeb85aa5..8406ebf551 100644 --- a/README.md +++ b/README.md @@ -80,7 +80,7 @@ other discovery services. ## Advanced Scheduling -See [filters](filter) and [strategies](strategy) to learn +See [filters](scheduler/filter) and [strategies](scheduler/strategy) to learn more about advanced scheduling. ## TLS diff --git a/api/api.go b/api/api.go index 43fab26b4d..415d3832ca 100644 --- a/api/api.go +++ b/api/api.go @@ -15,8 +15,8 @@ import ( log "github.com/Sirupsen/logrus" dockerfilters "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/pkg/units" - "github.com/docker/swarm/filter" - "github.com/docker/swarm/scheduler" + "github.com/docker/swarm/cluster" + "github.com/docker/swarm/scheduler/filter" "github.com/docker/swarm/version" "github.com/gorilla/mux" "github.com/samalba/dockerclient" @@ -25,7 +25,7 @@ import ( const APIVERSION = "1.16" type context struct { - scheduler scheduler.Scheduler + cluster cluster.Cluster eventsHandler *eventsHandler debug bool tlsConfig *tls.Config @@ -35,7 +35,7 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request) // GET /info func getInfo(c *context, w http.ResponseWriter, r *http.Request) { - nodes := c.scheduler.Nodes() + nodes := c.cluster.Nodes() driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}} for _, node := range nodes { @@ -50,7 +50,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) { NEventsListener int Debug bool }{ - len(c.scheduler.Containers()), + len(c.cluster.Containers()), driverStatus, c.eventsHandler.Size(), c.debug, @@ -98,7 +98,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) { accepteds, _ := filters["node"] images := []*dockerclient.Image{} - for _, node := range c.scheduler.Nodes() { + for _, node := range c.cluster.Nodes() { if len(accepteds) != 0 { found := false for _, accepted := range accepteds { @@ -132,7 +132,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { all := r.Form.Get("all") == "1" out := []*dockerclient.Container{} - for _, container := range c.scheduler.Containers() { + for _, container := range c.cluster.Containers() { tmp := (*container).Container // Skip stopped containers unless -a was specified. if !strings.Contains(tmp.Status, "Up") && !all { @@ -170,7 +170,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { // GET /containers/{name:.*}/json func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - container := c.scheduler.Container(name) + container := c.cluster.Container(name) if container == nil { httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound) return @@ -222,12 +222,12 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { return } - if container := c.scheduler.Container(name); container != nil { + if container := c.cluster.Container(name); container != nil { httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict) return } - container, err := c.scheduler.CreateContainer(&config, name) + container, err := c.cluster.CreateContainer(&config, name) if err != nil { httpError(w, err.Error(), http.StatusInternalServerError) return @@ -248,12 +248,12 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] force := r.Form.Get("force") == "1" - container := c.scheduler.Container(name) + container := c.cluster.Container(name) if container == nil { httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound) return } - if err := c.scheduler.RemoveContainer(container, force); err != nil { + if err := c.cluster.RemoveContainer(container, force); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) return } @@ -316,7 +316,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - for _, node := range c.scheduler.Nodes() { + for _, node := range c.cluster.Nodes() { if node.Image(name) != nil { proxy(c.tlsConfig, node.Addr, w, r) return @@ -327,7 +327,7 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { // Proxy a request to a random node func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) { - candidates := c.scheduler.Nodes() + candidates := c.cluster.Nodes() healthFilter := &filter.HealthFilter{} accepted, err := healthFilter.Filter(nil, candidates) diff --git a/api/api.go~ b/api/api.go~ deleted file mode 100644 index e7c08536b9..0000000000 --- a/api/api.go~ +++ /dev/null @@ -1,464 +0,0 @@ -package api - -import ( - "bytes" - "crypto/tls" - "encoding/json" - "fmt" - "io/ioutil" - "math/rand" - "net/http" - "runtime" - "sort" - "strings" - - log "github.com/Sirupsen/logrus" - dockerfilters "github.com/docker/docker/pkg/parsers/filters" -<<<<<<< HEAD - "github.com/docker/docker/pkg/units" - "github.com/docker/swarm/cluster" -======= ->>>>>>> initial mesos.go file full fo TODOs - "github.com/docker/swarm/filter" - "github.com/docker/swarm/scheduler" - "github.com/docker/swarm/version" - "github.com/gorilla/mux" - "github.com/samalba/dockerclient" -) - -const APIVERSION = "1.16" - -type context struct { - scheduler scheduler.Scheduler - eventsHandler *eventsHandler - debug bool - tlsConfig *tls.Config -} - -type handler func(c *context, w http.ResponseWriter, r *http.Request) - -// GET /info -func getInfo(c *context, w http.ResponseWriter, r *http.Request) { - nodes := c.scheduler.Nodes() - driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}} - - for _, node := range nodes { - driverStatus = append(driverStatus, [2]string{node.Name, node.Addr}) - driverStatus = append(driverStatus, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))}) - driverStatus = append(driverStatus, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.ReservedCpus(), node.Cpus)}) - driverStatus = append(driverStatus, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.ReservedMemory())), units.BytesSize(float64(node.Memory)))}) - } - info := struct { - Containers int - DriverStatus [][2]string - NEventsListener int - Debug bool - }{ - len(c.scheduler.Containers()), - driverStatus, - c.eventsHandler.Size(), - c.debug, - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(info) -} - -// GET /version -func getVersion(c *context, w http.ResponseWriter, r *http.Request) { - version := struct { - Version string - ApiVersion string - GoVersion string - GitCommit string - Os string - Arch string - }{ - Version: "swarm/" + version.VERSION, - ApiVersion: APIVERSION, - GoVersion: runtime.Version(), - GitCommit: version.GITCOMMIT, - Os: runtime.GOOS, - Arch: runtime.GOARCH, - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(version) -} - -// GET /images/json -func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - filters, err := dockerfilters.FromParam(r.Form.Get("filters")) - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - accepteds, _ := filters["node"] - images := []*dockerclient.Image{} - - for _, node := range c.scheduler.Nodes() { - if len(accepteds) != 0 { - found := false - for _, accepted := range accepteds { - if accepted == node.Name || accepted == node.ID { - found = true - break - } - } - if !found { - continue - } - } - - for _, image := range node.Images() { - images = append(images, image) - } - } - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(images) -} - -// GET /containers/ps -// GET /containers/json -func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - all := r.Form.Get("all") == "1" - - out := []*dockerclient.Container{} - for _, container := range c.scheduler.Containers() { - tmp := (*container).Container - // Skip stopped containers unless -a was specified. - if !strings.Contains(tmp.Status, "Up") && !all { - continue - } - // Skip swarm containers unless -a was specified. - if strings.Split(tmp.Image, ":")[0] == "swarm" && !all { - continue - } - if !container.Node.IsHealthy() { - tmp.Status = "Pending" - } - // TODO remove the Node Name in the name when we have a good solution - tmp.Names = make([]string, len(container.Names)) - for i, name := range container.Names { - tmp.Names[i] = "/" + container.Node.Name + name - } - // insert node IP - tmp.Ports = make([]dockerclient.Port, len(container.Ports)) - for i, port := range container.Ports { - tmp.Ports[i] = port - if port.IP == "0.0.0.0" { - tmp.Ports[i].IP = container.Node.IP - } - } - out = append(out, &tmp) - } - - sort.Sort(sort.Reverse(ContainerSorter(out))) - - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(out) -} - -// GET /containers/{name:.*}/json -func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { - name := mux.Vars(r)["name"] - container := c.scheduler.Container(name) - if container == nil { - httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound) - return - } - client, scheme := newClientAndScheme(c.tlsConfig) - - resp, err := client.Get(scheme + "://" + container.Node.Addr + "/containers/" + container.Id + "/json") - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - // cleanup - defer resp.Body.Close() - defer closeIdleConnections(client) - - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - n, err := json.Marshal(container.Node) - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - // insert Node field - data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", n)), -1) - - // insert node IP - data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP)), -1) - - w.Header().Set("Content-Type", "application/json") - w.Write(data) -} - -// POST /containers/create -func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { - r.ParseForm() - var ( - config dockerclient.ContainerConfig - name = r.Form.Get("name") - ) - - if err := json.NewDecoder(r.Body).Decode(&config); err != nil { - httpError(w, err.Error(), http.StatusBadRequest) - return - } - - if container := c.scheduler.Container(name); container != nil { - httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict) - return - } - - container, err := c.scheduler.CreateContainer(&config, name) - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusCreated) - fmt.Fprintf(w, "{%q:%q}", "Id", container.Id) - return -} - -// DELETE /containers/{name:.*} -func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - name := mux.Vars(r)["name"] - force := r.Form.Get("force") == "1" - container := c.scheduler.Container(name) - if container == nil { - httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound) - return - } - if err := c.scheduler.RemoveContainer(container, force); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - w.WriteHeader(http.StatusNoContent) -} - -// GET /events -func getEvents(c *context, w http.ResponseWriter, r *http.Request) { - c.eventsHandler.Add(r.RemoteAddr, w) - - w.Header().Set("Content-Type", "application/json") - - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - - c.eventsHandler.Wait(r.RemoteAddr) -} - -// GET /_ping -func ping(c *context, w http.ResponseWriter, r *http.Request) { - w.Write([]byte{'O', 'K'}) -} - -// Proxy a request to the right node and do a force refresh -func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Request) { - container, err := getContainerFromVars(c, mux.Vars(r)) - if err != nil { - httpError(w, err.Error(), http.StatusNotFound) - return - } - - cb := func(resp *http.Response) { - if resp.StatusCode == http.StatusCreated { - log.Debugf("[REFRESH CONTAINER] --> %s", container.Id) - container.Node.RefreshContainer(container.Id, true) - } - } - - if err := proxyAsync(c.tlsConfig, container.Node.Addr, w, r, cb); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } - -} - -// Proxy a request to the right node -func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { - container, err := getContainerFromVars(c, mux.Vars(r)) - if err != nil { - httpError(w, err.Error(), http.StatusNotFound) - return - } - - if err := proxy(c.tlsConfig, container.Node.Addr, w, r); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } -} - -// Proxy a request to the right node -func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { - name := mux.Vars(r)["name"] - - for _, node := range c.scheduler.Nodes() { - if node.Image(name) != nil { - proxy(c.tlsConfig, node.Addr, w, r) - return - } - } - httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound) -} - -// Proxy a request to a random node -func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) { - candidates := c.scheduler.Nodes() - - healthFilter := &filter.HealthFilter{} - accepted, err := healthFilter.Filter(nil, candidates) - - if err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - return - } - - if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } -} - -// Proxy a hijack request to the right node -func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) { - container, err := getContainerFromVars(c, mux.Vars(r)) - if err != nil { - httpError(w, err.Error(), http.StatusNotFound) - return - } - - if err := hijack(c.tlsConfig, container.Node.Addr, w, r); err != nil { - httpError(w, err.Error(), http.StatusInternalServerError) - } -} - -// Default handler for methods not supported by clustering. -func notImplementedHandler(c *context, w http.ResponseWriter, r *http.Request) { - httpError(w, "Not supported in clustering mode.", http.StatusNotImplemented) -} - -func optionsHandler(c *context, w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) -} - -func writeCorsHeaders(w http.ResponseWriter, r *http.Request) { - w.Header().Add("Access-Control-Allow-Origin", "*") - w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept") - w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS") -} - -func httpError(w http.ResponseWriter, err string, status int) { - log.WithField("status", status).Errorf("HTTP error: %v", err) - http.Error(w, err, status) -} - -func createRouter(c *context, enableCors bool) *mux.Router { - r := mux.NewRouter() - m := map[string]map[string]handler{ - "GET": { - "/_ping": ping, - "/events": getEvents, - "/info": getInfo, - "/version": getVersion, - "/images/json": getImagesJSON, - "/images/viz": notImplementedHandler, - "/images/search": proxyRandom, - "/images/get": notImplementedHandler, - "/images/{name:.*}/get": notImplementedHandler, - "/images/{name:.*}/history": proxyImage, - "/images/{name:.*}/json": proxyImage, - "/containers/ps": getContainersJSON, - "/containers/json": getContainersJSON, - "/containers/{name:.*}/export": proxyContainer, - "/containers/{name:.*}/changes": proxyContainer, - "/containers/{name:.*}/json": getContainerJSON, - "/containers/{name:.*}/top": proxyContainer, - "/containers/{name:.*}/logs": proxyContainer, - "/containers/{name:.*}/stats": proxyContainer, - "/containers/{name:.*}/attach/ws": notImplementedHandler, - "/exec/{execid:.*}/json": proxyContainer, - }, - "POST": { - "/auth": proxyRandom, - "/commit": notImplementedHandler, - "/build": notImplementedHandler, - "/images/create": notImplementedHandler, - "/images/load": notImplementedHandler, - "/images/{name:.*}/push": notImplementedHandler, - "/images/{name:.*}/tag": notImplementedHandler, - "/containers/create": postContainersCreate, - "/containers/{name:.*}/kill": proxyContainer, - "/containers/{name:.*}/pause": proxyContainer, - "/containers/{name:.*}/unpause": proxyContainer, - "/containers/{name:.*}/rename": proxyContainer, - "/containers/{name:.*}/restart": proxyContainer, - "/containers/{name:.*}/start": proxyContainer, - "/containers/{name:.*}/stop": proxyContainer, - "/containers/{name:.*}/wait": proxyContainer, - "/containers/{name:.*}/resize": proxyContainer, - "/containers/{name:.*}/attach": proxyHijack, - "/containers/{name:.*}/copy": proxyContainer, - "/containers/{name:.*}/exec": proxyContainerAndForceRefresh, - "/exec/{execid:.*}/start": proxyHijack, - "/exec/{execid:.*}/resize": proxyContainer, - }, - "DELETE": { - "/containers/{name:.*}": deleteContainer, - "/images/{name:.*}": notImplementedHandler, - }, - "OPTIONS": { - "": optionsHandler, - }, - } - - for method, routes := range m { - for route, fct := range routes { - log.WithFields(log.Fields{"method": method, "route": route}).Debug("Registering HTTP route") - - // NOTE: scope issue, make sure the variables are local and won't be changed - localRoute := route - localFct := fct - wrap := func(w http.ResponseWriter, r *http.Request) { - log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Info("HTTP request received") - if enableCors { - writeCorsHeaders(w, r) - } - localFct(c, w, r) - } - localMethod := method - - // add the new route - r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap) - r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap) - } - } - - return r -} diff --git a/api/api_test.go b/api/api_test.go index 747ea42119..4087111813 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -2,18 +2,17 @@ package api import ( "encoding/json" + "github.com/docker/swarm/cluster" + "github.com/docker/swarm/version" + "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" "testing" - - "github.com/docker/swarm/scheduler" - "github.com/docker/swarm/version" - "github.com/stretchr/testify/assert" ) -func serveRequest(s scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error { +func serveRequest(c cluster.Cluster, w http.ResponseWriter, req *http.Request) error { context := &context{ - scheduler: s, + cluster: c, } r := createRouter(context, false) diff --git a/api/server.go b/api/server.go index ae8c7887c0..e3ea4e4b22 100644 --- a/api/server.go +++ b/api/server.go @@ -8,7 +8,7 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/scheduler" + "github.com/docker/swarm/cluster" ) const DefaultDockerPort = ":2375" @@ -28,13 +28,13 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error return l, nil } -func ListenAndServe(s scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error { +func ListenAndServe(c cluster.Cluster, hosts []string, enableCors bool, tlsConfig *tls.Config) error { context := &context{ - scheduler: s, + cluster: c, eventsHandler: NewEventsHandler(), tlsConfig: tlsConfig, } - s.Events(context.eventsHandler) + c.Events(context.eventsHandler) r := createRouter(context, enableCors) chErrors := make(chan error, len(hosts)) diff --git a/api/utils.go b/api/utils.go index 72e218f170..feb094a212 100644 --- a/api/utils.go +++ b/api/utils.go @@ -22,14 +22,14 @@ func newClientAndScheme(tlsConfig *tls.Config) (*http.Client, string) { func getContainerFromVars(c *context, vars map[string]string) (*cluster.Container, error) { if name, ok := vars["name"]; ok { - if container := c.scheduler.Container(name); container != nil { + if container := c.cluster.Container(name); container != nil { return container, nil } return nil, fmt.Errorf("No such container: %s", name) } if ID, ok := vars["execid"]; ok { - for _, container := range c.scheduler.Containers() { + for _, container := range c.cluster.Containers() { for _, execID := range container.Info.ExecIDs { if ID == execID { return container, nil diff --git a/cluster/cluster.go b/cluster/cluster.go index f8940f37e1..933235a65b 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,141 +1,17 @@ package cluster import ( - "errors" - "sync" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/state" + "github.com/docker/swarm/discovery" "github.com/samalba/dockerclient" ) -var ( - ErrNodeNotConnected = errors.New("node is not connected to docker's REST API") - ErrNodeAlreadyRegistered = errors.New("node was already added to the cluster") -) +type Cluster interface { + CreateContainer(config *dockerclient.ContainerConfig, name string) (*Container, error) + RemoveContainer(container *Container, force bool) error -type Cluster struct { - sync.RWMutex - store *state.Store - eventHandlers []EventHandler - nodes map[string]*Node -} - -func NewCluster(store *state.Store) *Cluster { - return &Cluster{ - nodes: make(map[string]*Node), - store: store, - } -} - -// Commit the requested state in the store. -func (c *Cluster) CommitContainerInStore(Id string, config *dockerclient.ContainerConfig, name string) error { - st := &state.RequestedState{ - ID: Id, - Name: name, - Config: config, - } - return c.store.Add(Id, st) -} - -// Remove a container from the store. -func (c *Cluster) RemoveContainerFromStore(Id string, force bool) error { - if err := c.store.Remove(Id); err != nil { - if err == state.ErrNotFound { - log.Debugf("Container %s not found in the store", Id) - return nil - } - return err - } - return nil -} - -func (c *Cluster) Handle(e *Event) error { - for _, eventHandler := range c.eventHandlers { - if err := eventHandler.Handle(e); err != nil { - log.Error(err) - } - } - return nil -} - -// Register a node within the cluster. The node must have been already -// initialized. -func (c *Cluster) AddNode(n *Node) error { - if !n.IsConnected() { - return ErrNodeNotConnected - } - - c.Lock() - defer c.Unlock() - - if old, exists := c.nodes[n.ID]; exists { - if old.IP != n.IP { - log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP) - } - return ErrNodeAlreadyRegistered - } - - c.nodes[n.ID] = n - return n.Events(c) -} - -// Containers returns all the containers in the cluster. -func (c *Cluster) Containers() []*Container { - c.RLock() - defer c.RUnlock() - - out := []*Container{} - for _, n := range c.nodes { - containers := n.Containers() - for _, container := range containers { - out = append(out, container) - } - } - - return out -} - -// Container returns the container with IdOrName in the cluster -func (c *Cluster) Container(IdOrName string) *Container { - // Abort immediately if the name is empty. - if len(IdOrName) == 0 { - return nil - } - - c.RLock() - defer c.RUnlock() - - for _, n := range c.nodes { - if container := n.Container(IdOrName); container != nil { - return container - } - } - - return nil -} - -// Nodes returns the list of nodes in the cluster -func (c *Cluster) Nodes() []*Node { - nodes := []*Node{} - c.RLock() - for _, node := range c.nodes { - nodes = append(nodes, node) - } - c.RUnlock() - return nodes -} - -func (c *Cluster) Node(addr string) *Node { - for _, node := range c.nodes { - if node.Addr == addr { - return node - } - } - return nil -} - -func (c *Cluster) Events(h EventHandler) error { - c.eventHandlers = append(c.eventHandlers, h) - return nil + Events(eventsHandler EventHandler) + Nodes() []*Node + Containers() []*Container + Container(IdOrName string) *Container + NewEntries(entries []*discovery.Entry) } diff --git a/cluster/mesos/mesos.go b/cluster/mesos/mesos.go new file mode 100644 index 0000000000..d3ff9028b4 --- /dev/null +++ b/cluster/mesos/mesos.go @@ -0,0 +1,87 @@ +package mesos + +import ( + "errors" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/cluster" + "github.com/docker/swarm/discovery" + "github.com/samalba/dockerclient" +) + +var ErrNotImplemented = errors.New("not implemented in the mesos cluster") + +type MesosCluster struct { + sync.Mutex + + //TODO: list of mesos masters + //TODO: list of offers + nodes *cluster.Nodes + options *cluster.Options +} + +func NewCluster(options *cluster.Options) cluster.Cluster { + log.WithFields(log.Fields{"name": "mesos"}).Debug("Initializing cluster") + return &MesosCluster{ + nodes: cluster.NewNodes(), + options: options, + } +} + +// Schedule a brand new container into the cluster. +func (s *MesosCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { + + s.Lock() + defer s.Unlock() + + //TODO: pick the right offer (using strategy & filters ???) + + //TODO: LaunchTask on the Mesos cluster and get container + + //TODO: Store container in store ?? + + return nil, ErrNotImplemented +} + +// Remove a container from the cluster. Containers should always be destroyed +// through the scheduler to guarantee atomicity. +func (s *MesosCluster) RemoveContainer(container *cluster.Container, force bool) error { + s.Lock() + defer s.Unlock() + + //TODO: KillTask + + //TODO: remove container from store ?? + + return ErrNotImplemented +} + +// Entries are Mesos masters +func (s *MesosCluster) NewEntries(entries []*discovery.Entry) { + + //TODO: get list of actual docker nodes from mesos masters + // - cluster.NewNode(m.String(), s.options.OvercommitRatio) + + //TODO: create direct connection to those nodes + // - n.Connect(s.options.TLSConfig) + + //TODO: add them to the cluster + // - s.nodes.Add(n) +} + +func (s *MesosCluster) Events(eventsHandler cluster.EventHandler) { + s.nodes.Events(eventsHandler) +} + +func (s *MesosCluster) Nodes() []*cluster.Node { + return s.nodes.List() +} + +func (s *MesosCluster) Containers() []*cluster.Container { + return s.nodes.Containers() +} + +func (s *MesosCluster) Container(IdOrName string) *cluster.Container { + return s.nodes.Container(IdOrName) +} diff --git a/cluster/nodes.go b/cluster/nodes.go new file mode 100644 index 0000000000..04660c9241 --- /dev/null +++ b/cluster/nodes.go @@ -0,0 +1,115 @@ +package cluster + +import ( + "errors" + "sync" + + log "github.com/Sirupsen/logrus" +) + +var ( + ErrNodeNotConnected = errors.New("node is not connected to docker's REST API") + ErrNodeAlreadyRegistered = errors.New("node was already") +) + +type Nodes struct { + sync.RWMutex + + eventHandlers []EventHandler + nodes map[string]*Node +} + +func NewNodes() *Nodes { + return &Nodes{ + nodes: make(map[string]*Node), + } +} + +func (c *Nodes) Handle(e *Event) error { + for _, eventHandler := range c.eventHandlers { + if err := eventHandler.Handle(e); err != nil { + log.Error(err) + } + } + return nil +} + +// Register a node within the cluster. The node must have been already +// initialized. +func (c *Nodes) Add(n *Node) error { + if !n.IsConnected() { + return ErrNodeNotConnected + } + + c.Lock() + defer c.Unlock() + + if old, exists := c.nodes[n.ID]; exists { + if old.IP != n.IP { + log.Errorf("ID duplicated. %s shared by %s and %s", n.ID, old.IP, n.IP) + } + return ErrNodeAlreadyRegistered + } + + c.nodes[n.ID] = n + return n.Events(c) +} + +// Containers returns all the containers in the cluster. +func (c *Nodes) Containers() []*Container { + c.Lock() + defer c.Unlock() + + out := []*Container{} + for _, n := range c.nodes { + containers := n.Containers() + for _, container := range containers { + out = append(out, container) + } + } + + return out +} + +// Container returns the container with IdOrName in the cluster +func (c *Nodes) Container(IdOrName string) *Container { + // Abort immediately if the name is empty. + if len(IdOrName) == 0 { + return nil + } + + c.RLock() + defer c.RUnlock() + for _, n := range c.nodes { + if container := n.Container(IdOrName); container != nil { + return container + } + } + + return nil +} + +// Nodes returns the list of nodes in the cluster +func (c *Nodes) List() []*Node { + nodes := []*Node{} + c.RLock() + for _, node := range c.nodes { + nodes = append(nodes, node) + } + c.RUnlock() + return nodes +} + +func (c *Nodes) Get(addr string) *Node { + for _, node := range c.nodes { + if node.Addr == addr { + return node + } + } + return nil +} + +func (c *Nodes) Events(h EventHandler) error { + c.eventHandlers = append(c.eventHandlers, h) + return nil +} diff --git a/cluster/cluster_test.go b/cluster/nodes_test.go similarity index 68% rename from cluster/cluster_test.go rename to cluster/nodes_test.go index d45bf79e8a..f35d5dc2de 100644 --- a/cluster/cluster_test.go +++ b/cluster/nodes_test.go @@ -1,10 +1,8 @@ package cluster import ( - "io/ioutil" "testing" - "github.com/docker/swarm/state" "github.com/samalba/dockerclient" "github.com/samalba/dockerclient/mockclient" "github.com/stretchr/testify/assert" @@ -34,39 +32,33 @@ func createNode(t *testing.T, ID string, containers ...dockerclient.Container) * return node } -func newCluster(t *testing.T) *Cluster { - dir, err := ioutil.TempDir("", "store-test") - assert.NoError(t, err) - return NewCluster(state.NewStore(dir)) -} +func TestAdd(t *testing.T) { + c := NewNodes() + assert.Equal(t, len(c.List()), 0) + assert.Nil(t, c.Get("test")) + assert.Nil(t, c.Get("test2")) -func TestAddNode(t *testing.T) { - c := newCluster(t) - assert.Equal(t, len(c.Nodes()), 0) - assert.Nil(t, c.Node("test")) - assert.Nil(t, c.Node("test2")) + assert.NoError(t, c.Add(createNode(t, "test"))) + assert.Equal(t, len(c.List()), 1) + assert.NotNil(t, c.Get("test")) - assert.NoError(t, c.AddNode(createNode(t, "test"))) - assert.Equal(t, len(c.Nodes()), 1) - assert.NotNil(t, c.Node("test")) + assert.Error(t, c.Add(createNode(t, "test"))) + assert.Equal(t, len(c.List()), 1) + assert.NotNil(t, c.Get("test")) - assert.Error(t, c.AddNode(createNode(t, "test"))) - assert.Equal(t, len(c.Nodes()), 1) - assert.NotNil(t, c.Node("test")) - - assert.NoError(t, c.AddNode(createNode(t, "test2"))) - assert.Equal(t, len(c.Nodes()), 2) - assert.NotNil(t, c.Node("test2")) + assert.NoError(t, c.Add(createNode(t, "test2"))) + assert.Equal(t, len(c.List()), 2) + assert.NotNil(t, c.Get("test2")) } func TestContainerLookup(t *testing.T) { - c := newCluster(t) + c := NewNodes() container := dockerclient.Container{ Id: "container-id", Names: []string{"/container-name1", "/container-name2"}, } node := createNode(t, "test-node", container) - assert.NoError(t, c.AddNode(node)) + assert.NoError(t, c.Add(node)) // Invalid lookup assert.Nil(t, c.Container("invalid-id")) diff --git a/cluster/options.go b/cluster/options.go new file mode 100644 index 0000000000..cc6468b8fb --- /dev/null +++ b/cluster/options.go @@ -0,0 +1,8 @@ +package cluster + +import "crypto/tls" + +type Options struct { + TLSConfig *tls.Config + OvercommitRatio float64 +} diff --git a/cluster/swarm/swarm.go b/cluster/swarm/swarm.go new file mode 100644 index 0000000000..cddfd55c9a --- /dev/null +++ b/cluster/swarm/swarm.go @@ -0,0 +1,110 @@ +package swarm + +import ( + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/docker/swarm/cluster" + "github.com/docker/swarm/discovery" + "github.com/docker/swarm/scheduler" + "github.com/docker/swarm/state" + "github.com/samalba/dockerclient" +) + +type SwarmCluster struct { + sync.RWMutex + + nodes *cluster.Nodes + scheduler *scheduler.Scheduler + options *cluster.Options + store *state.Store +} + +func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster { + log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") + return &SwarmCluster{ + nodes: cluster.NewNodes(), + scheduler: scheduler, + options: options, + store: store, + } +} + +// Schedule a brand new container into the cluster. +func (s *SwarmCluster) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { + + s.RLock() + defer s.RUnlock() + + node, err := s.scheduler.SelectNodeForContainer(s.nodes.List(), config) + if err != nil { + return nil, err + } + + container, err := node.Create(config, name, true) + if err != nil { + return nil, err + } + + st := &state.RequestedState{ + ID: container.Id, + Name: name, + Config: config, + } + return container, s.store.Add(container.Id, st) +} + +// Remove a container from the cluster. Containers should always be destroyed +// through the scheduler to guarantee atomicity. +func (s *SwarmCluster) RemoveContainer(container *cluster.Container, force bool) error { + s.Lock() + defer s.Unlock() + + if err := container.Node.Destroy(container, force); err != nil { + return err + } + + if err := s.store.Remove(container.Id); err != nil { + if err == state.ErrNotFound { + log.Debugf("Container %s not found in the store", container.Id) + return nil + } + return err + } + return nil +} + +// Entries are Docker Nodes +func (s *SwarmCluster) NewEntries(entries []*discovery.Entry) { + for _, entry := range entries { + go func(m *discovery.Entry) { + if s.nodes.Get(m.String()) == nil { + n := cluster.NewNode(m.String(), s.options.OvercommitRatio) + if err := n.Connect(s.options.TLSConfig); err != nil { + log.Error(err) + return + } + if err := s.nodes.Add(n); err != nil { + log.Error(err) + return + } + } + }(entry) + } +} + +func (s *SwarmCluster) Events(eventsHandler cluster.EventHandler) { + s.nodes.Events(eventsHandler) +} + +func (s *SwarmCluster) Nodes() []*cluster.Node { + return s.nodes.List() +} + +func (s *SwarmCluster) Containers() []*cluster.Container { + return s.nodes.Containers() +} + +func (s *SwarmCluster) Container(IdOrName string) *cluster.Container { + return s.nodes.Container(IdOrName) +} diff --git a/flags.go b/flags.go index 091d79e2f3..4a4f971f0f 100644 --- a/flags.go +++ b/flags.go @@ -94,9 +94,9 @@ var ( Usage: "filter to use [constraint, affinity, health, port, dependency]", Value: &flFilterValue, } - flScheduler = cli.StringFlag{ - Name: "scheduler, s", - Usage: "scheduler to use [builtin, mesos]", - Value: "builtin", + flCluster = cli.StringFlag{ + Name: "cluster, c", + Usage: "cluster to use [swarm, mesos]", + Value: "swarm", } ) diff --git a/main.go b/main.go index a3829d779e..6639f46bfe 100644 --- a/main.go +++ b/main.go @@ -102,7 +102,7 @@ func main() { ShortName: "m", Usage: "manage a docker cluster", Flags: []cli.Flag{ - flStore, flScheduler, + flStore, flCluster, flStrategy, flFilter, flHosts, flHeartBeat, flOverCommit, flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify, diff --git a/manage.go b/manage.go index 7ccbd197d4..9dbc403fe8 100644 --- a/manage.go +++ b/manage.go @@ -11,12 +11,13 @@ import ( "github.com/codegangsta/cli" "github.com/docker/swarm/api" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/cluster/mesos" + "github.com/docker/swarm/cluster/swarm" "github.com/docker/swarm/discovery" - "github.com/docker/swarm/filter" "github.com/docker/swarm/scheduler" - "github.com/docker/swarm/scheduler/options" + "github.com/docker/swarm/scheduler/filter" + "github.com/docker/swarm/scheduler/strategy" "github.com/docker/swarm/state" - "github.com/docker/swarm/strategy" ) type logHandler struct { @@ -118,17 +119,22 @@ func manage(c *cli.Context) { log.Fatal(err) } - options := &options.SchedulerOptions{ - Strategy: s, - Filters: fs, - Store: store, + sched := scheduler.New(s, fs) + + options := &cluster.Options{ TLSConfig: tlsConfig, OvercommitRatio: c.Float64("overcommit"), } - sched, err := scheduler.New(c.String("scheduler"), options) - if err != nil { - log.Fatal(err) + var cluster cluster.Cluster + + switch c.String("cluster") { + case "swarm": + cluster = swarm.NewCluster(sched, store, options) + case "mesos": + cluster = mesos.NewCluster(options) + default: + log.Fatalf("cluster %q not supported", c.String("cluster")) } // get the list of entries from the discovery service @@ -143,9 +149,9 @@ func manage(c *cli.Context) { log.Fatal(err) } - sched.NewEntries(entries) + cluster.NewEntries(entries) - go d.Watch(sched.NewEntries) + go d.Watch(cluster.NewEntries) }() // see https://github.com/codegangsta/cli/issues/160 @@ -153,5 +159,5 @@ func manage(c *cli.Context) { if c.IsSet("host") || c.IsSet("H") { hosts = hosts[1:] } - log.Fatal(api.ListenAndServe(sched, hosts, c.Bool("cors"), tlsConfig)) + log.Fatal(api.ListenAndServe(cluster, hosts, c.Bool("cors"), tlsConfig)) } diff --git a/scheduler/builtin/builtin.go b/scheduler/builtin/builtin.go deleted file mode 100644 index 0840c094c6..0000000000 --- a/scheduler/builtin/builtin.go +++ /dev/null @@ -1,100 +0,0 @@ -package builtin - -import ( - "sync" - - log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/discovery" - "github.com/docker/swarm/filter" - "github.com/docker/swarm/scheduler/options" - "github.com/samalba/dockerclient" -) - -type BuiltinScheduler struct { - sync.Mutex - - cluster *cluster.Cluster - options *options.SchedulerOptions -} - -func (s *BuiltinScheduler) Initialize(options *options.SchedulerOptions) { - s.options = options - - s.cluster = cluster.NewCluster(s.options.Store) -} - -// Schedule a brand new container into the cluster. -func (s *BuiltinScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { - - s.Lock() - defer s.Unlock() - - candidates := s.cluster.Nodes() - - // Find a nice home for our container. - accepted, err := filter.ApplyFilters(s.options.Filters, config, candidates) - if err != nil { - return nil, err - } - - node, err := s.options.Strategy.PlaceContainer(config, accepted) - if err != nil { - return nil, err - } - - container, err := node.Create(config, name, true) - if err != nil { - return nil, err - } - - return container, s.cluster.CommitContainerInStore(container.Id, config, name) -} - -// Remove a container from the cluster. Containers should always be destroyed -// through the scheduler to guarantee atomicity. -func (s *BuiltinScheduler) RemoveContainer(container *cluster.Container, force bool) error { - s.Lock() - defer s.Unlock() - - if err := container.Node.Destroy(container, force); err != nil { - return err - } - - return s.cluster.RemoveContainerFromStore(container.Id, force) -} - -// Entries are Docker Nodes -func (s *BuiltinScheduler) NewEntries(entries []*discovery.Entry) { - for _, entry := range entries { - go func(m *discovery.Entry) { - if s.cluster.Node(m.String()) == nil { - n := cluster.NewNode(m.String(), s.options.OvercommitRatio) - if err := n.Connect(s.options.TLSConfig); err != nil { - log.Error(err) - return - } - if err := s.cluster.AddNode(n); err != nil { - log.Error(err) - return - } - } - }(entry) - } -} - -func (s *BuiltinScheduler) Events(eventsHandler cluster.EventHandler) { - s.cluster.Events(eventsHandler) -} - -func (s *BuiltinScheduler) Nodes() []*cluster.Node { - return s.cluster.Nodes() -} - -func (s *BuiltinScheduler) Containers() []*cluster.Container { - return s.cluster.Containers() -} - -func (s *BuiltinScheduler) Container(IdOrName string) *cluster.Container { - return s.cluster.Container(IdOrName) -} diff --git a/filter/README.md b/scheduler/filter/README.md similarity index 100% rename from filter/README.md rename to scheduler/filter/README.md diff --git a/filter/affinity.go b/scheduler/filter/affinity.go similarity index 100% rename from filter/affinity.go rename to scheduler/filter/affinity.go diff --git a/filter/affinity_test.go b/scheduler/filter/affinity_test.go similarity index 100% rename from filter/affinity_test.go rename to scheduler/filter/affinity_test.go diff --git a/filter/constraint.go b/scheduler/filter/constraint.go similarity index 100% rename from filter/constraint.go rename to scheduler/filter/constraint.go diff --git a/filter/constraint_test.go b/scheduler/filter/constraint_test.go similarity index 100% rename from filter/constraint_test.go rename to scheduler/filter/constraint_test.go diff --git a/filter/expr.go b/scheduler/filter/expr.go similarity index 100% rename from filter/expr.go rename to scheduler/filter/expr.go diff --git a/filter/expr_test.go b/scheduler/filter/expr_test.go similarity index 100% rename from filter/expr_test.go rename to scheduler/filter/expr_test.go diff --git a/filter/filter.go b/scheduler/filter/filter.go similarity index 100% rename from filter/filter.go rename to scheduler/filter/filter.go diff --git a/filter/health.go b/scheduler/filter/health.go similarity index 100% rename from filter/health.go rename to scheduler/filter/health.go diff --git a/filter/port.go b/scheduler/filter/port.go similarity index 100% rename from filter/port.go rename to scheduler/filter/port.go diff --git a/filter/port_test.go b/scheduler/filter/port_test.go similarity index 100% rename from filter/port_test.go rename to scheduler/filter/port_test.go diff --git a/scheduler/mesos/mesos.go b/scheduler/mesos/mesos.go deleted file mode 100644 index c68c6f36cc..0000000000 --- a/scheduler/mesos/mesos.go +++ /dev/null @@ -1,88 +0,0 @@ -package mesos - -import ( - "errors" - "sync" - - "github.com/docker/swarm/cluster" - "github.com/docker/swarm/discovery" - "github.com/docker/swarm/scheduler/options" - "github.com/samalba/dockerclient" -) - -var ErrNotImplemented = errors.New("not implemented in the mesos scheduler") - -type MesosScheduler struct { - sync.Mutex - - //TODO: list of mesos masters - cluster *cluster.Cluster - options *options.SchedulerOptions -} - -func (s *MesosScheduler) Initialize(options *options.SchedulerOptions) { - s.options = options - - s.cluster = cluster.NewCluster(s.options.Store) -} - -// Schedule a brand new container into the cluster. -func (s *MesosScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { - - s.Lock() - defer s.Unlock() - - //TODO: RequestOffers from mesos master - - //TODO: pick the right offer (using strategy & filters ???) - - //TODO: LaunchTask on the Mesos cluster and get container - - //TODO: Store container in store - // - s.cluster.CommitContainerInStore(container.Id, config, name) - - return nil, ErrNotImplemented -} - -// Remove a container from the cluster. Containers should always be destroyed -// through the scheduler to guarantee atomicity. -func (s *MesosScheduler) RemoveContainer(container *cluster.Container, force bool) error { - s.Lock() - defer s.Unlock() - - //TODO: KillTask - - //TODO: remove container - // - s.cluster.RemoveContainerFromStore(container.Id, force) - - return ErrNotImplemented -} - -// Entries are Mesos masters -func (s *MesosScheduler) NewEntries(entries []*discovery.Entry) { - - //TODO: get list of actual docker nodes from mesos masters - // - cluster.NewNode(m.String(), s.options.OvercommitRatio) - - //TODO: create direct connection to those nodes - // - n.Connect(s.options.TLSConfig) - - //TODO: add them to the cluster - // - s.cluster.AddNode(n) -} - -func (s *MesosScheduler) Events(eventsHandler cluster.EventHandler) { - s.cluster.Events(eventsHandler) -} - -func (s *MesosScheduler) Nodes() []*cluster.Node { - return s.cluster.Nodes() -} - -func (s *MesosScheduler) Containers() []*cluster.Container { - return s.cluster.Containers() -} - -func (s *MesosScheduler) Container(IdOrName string) *cluster.Container { - return s.cluster.Container(IdOrName) -} diff --git a/scheduler/options/options.go b/scheduler/options/options.go deleted file mode 100644 index aca913afe5..0000000000 --- a/scheduler/options/options.go +++ /dev/null @@ -1,18 +0,0 @@ -package options - -import ( - "crypto/tls" - - "github.com/docker/swarm/filter" - "github.com/docker/swarm/state" - "github.com/docker/swarm/strategy" -) - -type SchedulerOptions struct { - Strategy strategy.PlacementStrategy - Filters []filter.Filter - - Store *state.Store - TLSConfig *tls.Config - OvercommitRatio float64 -} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ef4c028291..056e35188a 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,44 +1,30 @@ package scheduler import ( - "fmt" - - log "github.com/Sirupsen/logrus" "github.com/docker/swarm/cluster" - "github.com/docker/swarm/discovery" - "github.com/docker/swarm/scheduler/builtin" - "github.com/docker/swarm/scheduler/mesos" - "github.com/docker/swarm/scheduler/options" + "github.com/docker/swarm/scheduler/filter" + "github.com/docker/swarm/scheduler/strategy" "github.com/samalba/dockerclient" ) -type Scheduler interface { - Initialize(options *options.SchedulerOptions) - CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) - RemoveContainer(container *cluster.Container, force bool) error - - Events(eventsHandler cluster.EventHandler) - Nodes() []*cluster.Node - Containers() []*cluster.Container - Container(IdOrName string) *cluster.Container - - NewEntries(entries []*discovery.Entry) +type Scheduler struct { + strategy strategy.PlacementStrategy + filters []filter.Filter } -var schedulers map[string]Scheduler - -func init() { - schedulers = map[string]Scheduler{ - "builtin": &builtin.BuiltinScheduler{}, - "mesos": &mesos.MesosScheduler{}, +func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler { + return &Scheduler{ + strategy: strategy, + filters: filters, } } -func New(name string, options *options.SchedulerOptions) (Scheduler, error) { - if scheduler, exists := schedulers[name]; exists { - log.WithField("name", name).Debug("Initializing scheduler") - scheduler.Initialize(options) - return scheduler, nil +// Find a nice home for our container. +func (s *Scheduler) SelectNodeForContainer(nodes []*cluster.Node, config *dockerclient.ContainerConfig) (*cluster.Node, error) { + accepted, err := filter.ApplyFilters(s.filters, config, nodes) + if err != nil { + return nil, err } - return nil, fmt.Errorf("scheduler %q not supported", name) + + return s.strategy.PlaceContainer(config, accepted) } diff --git a/strategy/README.md b/scheduler/strategy/README.md similarity index 100% rename from strategy/README.md rename to scheduler/strategy/README.md diff --git a/strategy/binpacking.go b/scheduler/strategy/binpacking.go similarity index 100% rename from strategy/binpacking.go rename to scheduler/strategy/binpacking.go diff --git a/strategy/binpacking_test.go b/scheduler/strategy/binpacking_test.go similarity index 100% rename from strategy/binpacking_test.go rename to scheduler/strategy/binpacking_test.go diff --git a/strategy/random.go b/scheduler/strategy/random.go similarity index 100% rename from strategy/random.go rename to scheduler/strategy/random.go diff --git a/strategy/strategy.go b/scheduler/strategy/strategy.go similarity index 100% rename from strategy/strategy.go rename to scheduler/strategy/strategy.go