diff --git a/api/api.go b/api/api.go index b0845d015c..43fab26b4d 100644 --- a/api/api.go +++ b/api/api.go @@ -15,7 +15,6 @@ import ( log "github.com/Sirupsen/logrus" dockerfilters "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/pkg/units" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/filter" "github.com/docker/swarm/scheduler" "github.com/docker/swarm/version" @@ -26,7 +25,6 @@ import ( const APIVERSION = "1.16" type context struct { - cluster *cluster.Cluster scheduler scheduler.Scheduler eventsHandler *eventsHandler debug bool @@ -37,7 +35,7 @@ type handler func(c *context, w http.ResponseWriter, r *http.Request) // GET /info func getInfo(c *context, w http.ResponseWriter, r *http.Request) { - nodes := c.cluster.Nodes() + nodes := c.scheduler.Nodes() driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}} for _, node := range nodes { @@ -52,7 +50,7 @@ func getInfo(c *context, w http.ResponseWriter, r *http.Request) { NEventsListener int Debug bool }{ - len(c.cluster.Containers()), + len(c.scheduler.Containers()), driverStatus, c.eventsHandler.Size(), c.debug, @@ -100,7 +98,7 @@ func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) { accepteds, _ := filters["node"] images := []*dockerclient.Image{} - for _, node := range c.cluster.Nodes() { + for _, node := range c.scheduler.Nodes() { if len(accepteds) != 0 { found := false for _, accepted := range accepteds { @@ -134,7 +132,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { all := r.Form.Get("all") == "1" out := []*dockerclient.Container{} - for _, container := range c.cluster.Containers() { + for _, container := range c.scheduler.Containers() { tmp := (*container).Container // Skip stopped containers unless -a was specified. if !strings.Contains(tmp.Status, "Up") && !all { @@ -172,7 +170,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { // GET /containers/{name:.*}/json func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - container := c.cluster.Container(name) + container := c.scheduler.Container(name) if container == nil { httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound) return @@ -224,7 +222,7 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { return } - if container := c.cluster.Container(name); container != nil { + if container := c.scheduler.Container(name); container != nil { httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict) return } @@ -250,7 +248,7 @@ func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] force := r.Form.Get("force") == "1" - container := c.cluster.Container(name) + container := c.scheduler.Container(name) if container == nil { httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound) return @@ -318,7 +316,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { name := mux.Vars(r)["name"] - for _, node := range c.cluster.Nodes() { + for _, node := range c.scheduler.Nodes() { if node.Image(name) != nil { proxy(c.tlsConfig, node.Addr, w, r) return @@ -329,7 +327,7 @@ func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { // Proxy a request to a random node func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) { - candidates := c.cluster.Nodes() + candidates := c.scheduler.Nodes() healthFilter := &filter.HealthFilter{} accepted, err := healthFilter.Filter(nil, candidates) diff --git a/api/api.go~ b/api/api.go~ new file mode 100644 index 0000000000..e7c08536b9 --- /dev/null +++ b/api/api.go~ @@ -0,0 +1,464 @@ +package api + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "runtime" + "sort" + "strings" + + log "github.com/Sirupsen/logrus" + dockerfilters "github.com/docker/docker/pkg/parsers/filters" +<<<<<<< HEAD + "github.com/docker/docker/pkg/units" + "github.com/docker/swarm/cluster" +======= +>>>>>>> initial mesos.go file full fo TODOs + "github.com/docker/swarm/filter" + "github.com/docker/swarm/scheduler" + "github.com/docker/swarm/version" + "github.com/gorilla/mux" + "github.com/samalba/dockerclient" +) + +const APIVERSION = "1.16" + +type context struct { + scheduler scheduler.Scheduler + eventsHandler *eventsHandler + debug bool + tlsConfig *tls.Config +} + +type handler func(c *context, w http.ResponseWriter, r *http.Request) + +// GET /info +func getInfo(c *context, w http.ResponseWriter, r *http.Request) { + nodes := c.scheduler.Nodes() + driverStatus := [][2]string{{"\bNodes", fmt.Sprintf("%d", len(nodes))}} + + for _, node := range nodes { + driverStatus = append(driverStatus, [2]string{node.Name, node.Addr}) + driverStatus = append(driverStatus, [2]string{" └ Containers", fmt.Sprintf("%d", len(node.Containers()))}) + driverStatus = append(driverStatus, [2]string{" └ Reserved CPUs", fmt.Sprintf("%d / %d", node.ReservedCpus(), node.Cpus)}) + driverStatus = append(driverStatus, [2]string{" └ Reserved Memory", fmt.Sprintf("%s / %s", units.BytesSize(float64(node.ReservedMemory())), units.BytesSize(float64(node.Memory)))}) + } + info := struct { + Containers int + DriverStatus [][2]string + NEventsListener int + Debug bool + }{ + len(c.scheduler.Containers()), + driverStatus, + c.eventsHandler.Size(), + c.debug, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(info) +} + +// GET /version +func getVersion(c *context, w http.ResponseWriter, r *http.Request) { + version := struct { + Version string + ApiVersion string + GoVersion string + GitCommit string + Os string + Arch string + }{ + Version: "swarm/" + version.VERSION, + ApiVersion: APIVERSION, + GoVersion: runtime.Version(), + GitCommit: version.GITCOMMIT, + Os: runtime.GOOS, + Arch: runtime.GOARCH, + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(version) +} + +// GET /images/json +func getImagesJSON(c *context, w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + filters, err := dockerfilters.FromParam(r.Form.Get("filters")) + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + accepteds, _ := filters["node"] + images := []*dockerclient.Image{} + + for _, node := range c.scheduler.Nodes() { + if len(accepteds) != 0 { + found := false + for _, accepted := range accepteds { + if accepted == node.Name || accepted == node.ID { + found = true + break + } + } + if !found { + continue + } + } + + for _, image := range node.Images() { + images = append(images, image) + } + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(images) +} + +// GET /containers/ps +// GET /containers/json +func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + all := r.Form.Get("all") == "1" + + out := []*dockerclient.Container{} + for _, container := range c.scheduler.Containers() { + tmp := (*container).Container + // Skip stopped containers unless -a was specified. + if !strings.Contains(tmp.Status, "Up") && !all { + continue + } + // Skip swarm containers unless -a was specified. + if strings.Split(tmp.Image, ":")[0] == "swarm" && !all { + continue + } + if !container.Node.IsHealthy() { + tmp.Status = "Pending" + } + // TODO remove the Node Name in the name when we have a good solution + tmp.Names = make([]string, len(container.Names)) + for i, name := range container.Names { + tmp.Names[i] = "/" + container.Node.Name + name + } + // insert node IP + tmp.Ports = make([]dockerclient.Port, len(container.Ports)) + for i, port := range container.Ports { + tmp.Ports[i] = port + if port.IP == "0.0.0.0" { + tmp.Ports[i].IP = container.Node.IP + } + } + out = append(out, &tmp) + } + + sort.Sort(sort.Reverse(ContainerSorter(out))) + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(out) +} + +// GET /containers/{name:.*}/json +func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { + name := mux.Vars(r)["name"] + container := c.scheduler.Container(name) + if container == nil { + httpError(w, fmt.Sprintf("No such container %s", name), http.StatusNotFound) + return + } + client, scheme := newClientAndScheme(c.tlsConfig) + + resp, err := client.Get(scheme + "://" + container.Node.Addr + "/containers/" + container.Id + "/json") + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + // cleanup + defer resp.Body.Close() + defer closeIdleConnections(client) + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + n, err := json.Marshal(container.Node) + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + // insert Node field + data = bytes.Replace(data, []byte("\"Name\":\"/"), []byte(fmt.Sprintf("\"Node\":%s,\"Name\":\"/", n)), -1) + + // insert node IP + data = bytes.Replace(data, []byte("\"HostIp\":\"0.0.0.0\""), []byte(fmt.Sprintf("\"HostIp\":%q", container.Node.IP)), -1) + + w.Header().Set("Content-Type", "application/json") + w.Write(data) +} + +// POST /containers/create +func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + var ( + config dockerclient.ContainerConfig + name = r.Form.Get("name") + ) + + if err := json.NewDecoder(r.Body).Decode(&config); err != nil { + httpError(w, err.Error(), http.StatusBadRequest) + return + } + + if container := c.scheduler.Container(name); container != nil { + httpError(w, fmt.Sprintf("Conflict, The name %s is already assigned to %s. You have to delete (or rename) that container to be able to assign %s to a container again.", name, container.Id, name), http.StatusConflict) + return + } + + container, err := c.scheduler.CreateContainer(&config, name) + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + fmt.Fprintf(w, "{%q:%q}", "Id", container.Id) + return +} + +// DELETE /containers/{name:.*} +func deleteContainer(c *context, w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + name := mux.Vars(r)["name"] + force := r.Form.Get("force") == "1" + container := c.scheduler.Container(name) + if container == nil { + httpError(w, fmt.Sprintf("Container %s not found", name), http.StatusNotFound) + return + } + if err := c.scheduler.RemoveContainer(container, force); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} + +// GET /events +func getEvents(c *context, w http.ResponseWriter, r *http.Request) { + c.eventsHandler.Add(r.RemoteAddr, w) + + w.Header().Set("Content-Type", "application/json") + + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + c.eventsHandler.Wait(r.RemoteAddr) +} + +// GET /_ping +func ping(c *context, w http.ResponseWriter, r *http.Request) { + w.Write([]byte{'O', 'K'}) +} + +// Proxy a request to the right node and do a force refresh +func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Request) { + container, err := getContainerFromVars(c, mux.Vars(r)) + if err != nil { + httpError(w, err.Error(), http.StatusNotFound) + return + } + + cb := func(resp *http.Response) { + if resp.StatusCode == http.StatusCreated { + log.Debugf("[REFRESH CONTAINER] --> %s", container.Id) + container.Node.RefreshContainer(container.Id, true) + } + } + + if err := proxyAsync(c.tlsConfig, container.Node.Addr, w, r, cb); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + } + +} + +// Proxy a request to the right node +func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { + container, err := getContainerFromVars(c, mux.Vars(r)) + if err != nil { + httpError(w, err.Error(), http.StatusNotFound) + return + } + + if err := proxy(c.tlsConfig, container.Node.Addr, w, r); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + } +} + +// Proxy a request to the right node +func proxyImage(c *context, w http.ResponseWriter, r *http.Request) { + name := mux.Vars(r)["name"] + + for _, node := range c.scheduler.Nodes() { + if node.Image(name) != nil { + proxy(c.tlsConfig, node.Addr, w, r) + return + } + } + httpError(w, fmt.Sprintf("No such image: %s", name), http.StatusNotFound) +} + +// Proxy a request to a random node +func proxyRandom(c *context, w http.ResponseWriter, r *http.Request) { + candidates := c.scheduler.Nodes() + + healthFilter := &filter.HealthFilter{} + accepted, err := healthFilter.Filter(nil, candidates) + + if err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := proxy(c.tlsConfig, accepted[rand.Intn(len(accepted))].Addr, w, r); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + } +} + +// Proxy a hijack request to the right node +func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) { + container, err := getContainerFromVars(c, mux.Vars(r)) + if err != nil { + httpError(w, err.Error(), http.StatusNotFound) + return + } + + if err := hijack(c.tlsConfig, container.Node.Addr, w, r); err != nil { + httpError(w, err.Error(), http.StatusInternalServerError) + } +} + +// Default handler for methods not supported by clustering. +func notImplementedHandler(c *context, w http.ResponseWriter, r *http.Request) { + httpError(w, "Not supported in clustering mode.", http.StatusNotImplemented) +} + +func optionsHandler(c *context, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) +} + +func writeCorsHeaders(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Access-Control-Allow-Origin", "*") + w.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept") + w.Header().Add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS") +} + +func httpError(w http.ResponseWriter, err string, status int) { + log.WithField("status", status).Errorf("HTTP error: %v", err) + http.Error(w, err, status) +} + +func createRouter(c *context, enableCors bool) *mux.Router { + r := mux.NewRouter() + m := map[string]map[string]handler{ + "GET": { + "/_ping": ping, + "/events": getEvents, + "/info": getInfo, + "/version": getVersion, + "/images/json": getImagesJSON, + "/images/viz": notImplementedHandler, + "/images/search": proxyRandom, + "/images/get": notImplementedHandler, + "/images/{name:.*}/get": notImplementedHandler, + "/images/{name:.*}/history": proxyImage, + "/images/{name:.*}/json": proxyImage, + "/containers/ps": getContainersJSON, + "/containers/json": getContainersJSON, + "/containers/{name:.*}/export": proxyContainer, + "/containers/{name:.*}/changes": proxyContainer, + "/containers/{name:.*}/json": getContainerJSON, + "/containers/{name:.*}/top": proxyContainer, + "/containers/{name:.*}/logs": proxyContainer, + "/containers/{name:.*}/stats": proxyContainer, + "/containers/{name:.*}/attach/ws": notImplementedHandler, + "/exec/{execid:.*}/json": proxyContainer, + }, + "POST": { + "/auth": proxyRandom, + "/commit": notImplementedHandler, + "/build": notImplementedHandler, + "/images/create": notImplementedHandler, + "/images/load": notImplementedHandler, + "/images/{name:.*}/push": notImplementedHandler, + "/images/{name:.*}/tag": notImplementedHandler, + "/containers/create": postContainersCreate, + "/containers/{name:.*}/kill": proxyContainer, + "/containers/{name:.*}/pause": proxyContainer, + "/containers/{name:.*}/unpause": proxyContainer, + "/containers/{name:.*}/rename": proxyContainer, + "/containers/{name:.*}/restart": proxyContainer, + "/containers/{name:.*}/start": proxyContainer, + "/containers/{name:.*}/stop": proxyContainer, + "/containers/{name:.*}/wait": proxyContainer, + "/containers/{name:.*}/resize": proxyContainer, + "/containers/{name:.*}/attach": proxyHijack, + "/containers/{name:.*}/copy": proxyContainer, + "/containers/{name:.*}/exec": proxyContainerAndForceRefresh, + "/exec/{execid:.*}/start": proxyHijack, + "/exec/{execid:.*}/resize": proxyContainer, + }, + "DELETE": { + "/containers/{name:.*}": deleteContainer, + "/images/{name:.*}": notImplementedHandler, + }, + "OPTIONS": { + "": optionsHandler, + }, + } + + for method, routes := range m { + for route, fct := range routes { + log.WithFields(log.Fields{"method": method, "route": route}).Debug("Registering HTTP route") + + // NOTE: scope issue, make sure the variables are local and won't be changed + localRoute := route + localFct := fct + wrap := func(w http.ResponseWriter, r *http.Request) { + log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Info("HTTP request received") + if enableCors { + writeCorsHeaders(w, r) + } + localFct(c, w, r) + } + localMethod := method + + // add the new route + r.Path("/v{version:[0-9.]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap) + r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap) + } + } + + return r +} diff --git a/api/api_test.go b/api/api_test.go index 770415c453..747ea42119 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -6,15 +6,13 @@ import ( "net/http/httptest" "testing" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/scheduler" "github.com/docker/swarm/version" "github.com/stretchr/testify/assert" ) -func serveRequest(c *cluster.Cluster, s scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error { +func serveRequest(s scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error { context := &context{ - cluster: c, scheduler: s, } @@ -28,7 +26,7 @@ func TestGetVersion(t *testing.T) { req, err := http.NewRequest("GET", "/version", nil) assert.NoError(t, err) - assert.NoError(t, serveRequest(nil, nil, r, req)) + assert.NoError(t, serveRequest(nil, r, req)) assert.Equal(t, r.Code, http.StatusOK) v := struct { diff --git a/api/server.go b/api/server.go index 20a996f443..ae8c7887c0 100644 --- a/api/server.go +++ b/api/server.go @@ -8,7 +8,6 @@ import ( "strings" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/cluster" "github.com/docker/swarm/scheduler" ) @@ -29,14 +28,13 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error return l, nil } -func ListenAndServe(c *cluster.Cluster, s scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error { +func ListenAndServe(s scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error { context := &context{ - cluster: c, scheduler: s, eventsHandler: NewEventsHandler(), tlsConfig: tlsConfig, } - c.Events(context.eventsHandler) + s.Events(context.eventsHandler) r := createRouter(context, enableCors) chErrors := make(chan error, len(hosts)) diff --git a/api/utils.go b/api/utils.go index feb094a212..72e218f170 100644 --- a/api/utils.go +++ b/api/utils.go @@ -22,14 +22,14 @@ func newClientAndScheme(tlsConfig *tls.Config) (*http.Client, string) { func getContainerFromVars(c *context, vars map[string]string) (*cluster.Container, error) { if name, ok := vars["name"]; ok { - if container := c.cluster.Container(name); container != nil { + if container := c.scheduler.Container(name); container != nil { return container, nil } return nil, fmt.Errorf("No such container: %s", name) } if ID, ok := vars["execid"]; ok { - for _, container := range c.cluster.Containers() { + for _, container := range c.scheduler.Containers() { for _, execID := range container.Info.ExecIDs { if ID == execID { return container, nil diff --git a/cluster/cluster.go b/cluster/cluster.go index f9c51c6aa6..7334e4ea57 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -6,7 +6,6 @@ import ( "sync" log "github.com/Sirupsen/logrus" - "github.com/docker/swarm/discovery" "github.com/docker/swarm/state" "github.com/samalba/dockerclient" ) @@ -19,18 +18,18 @@ var ( type Cluster struct { sync.RWMutex store *state.Store - tlsConfig *tls.Config + TLSConfig *tls.Config eventHandlers []EventHandler nodes map[string]*Node - overcommitRatio float64 + OvercommitRatio float64 } func NewCluster(store *state.Store, tlsConfig *tls.Config, overcommitRatio float64) *Cluster { return &Cluster{ - tlsConfig: tlsConfig, + TLSConfig: tlsConfig, nodes: make(map[string]*Node), store: store, - overcommitRatio: overcommitRatio, + OvercommitRatio: overcommitRatio, } } @@ -98,24 +97,6 @@ func (c *Cluster) AddNode(n *Node) error { return n.Events(c) } -func (c *Cluster) UpdateNodes(entries []*discovery.Entry) { - for _, entry := range entries { - go func(m *discovery.Entry) { - if c.Node(m.String()) == nil { - n := NewNode(m.String(), c.overcommitRatio) - if err := n.Connect(c.tlsConfig); err != nil { - log.Error(err) - return - } - if err := c.AddNode(n); err != nil { - log.Error(err) - return - } - } - }(entry) - } -} - // Containers returns all the containers in the cluster. func (c *Cluster) Containers() []*Container { c.RLock() diff --git a/main.go b/main.go index 4399b45581..a3829d779e 100644 --- a/main.go +++ b/main.go @@ -102,7 +102,7 @@ func main() { ShortName: "m", Usage: "manage a docker cluster", Flags: []cli.Flag{ - flStore, + flStore, flScheduler, flStrategy, flFilter, flHosts, flHeartBeat, flOverCommit, flTls, flTlsCaCert, flTlsCert, flTlsKey, flTlsVerify, diff --git a/manage.go b/manage.go index d4b8a8729e..4de23a7cb3 100644 --- a/manage.go +++ b/manage.go @@ -120,37 +120,36 @@ func manage(c *cli.Context) { log.Fatal(err) } - // get the list of nodes from the discovery service + sched, err := scheduler.New(c.String("scheduler"), + cluster, + s, + fs, + ) + if err != nil { + log.Fatal(err) + } + + // get the list of entries from the discovery service go func() { d, err := discovery.New(dflag, c.Int("heartbeat")) if err != nil { log.Fatal(err) } - nodes, err := d.Fetch() + entries, err := d.Fetch() if err != nil { log.Fatal(err) } - cluster.UpdateNodes(nodes) + sched.NewEntries(entries) - go d.Watch(cluster.UpdateNodes) + go d.Watch(sched.NewEntries) }() - sched, err := scheduler.New(c.String("scheduler"), - cluster, - s, - fs, - ) - - if err != nil { - log.Fatal(err) - } - // see https://github.com/codegangsta/cli/issues/160 hosts := c.StringSlice("host") if c.IsSet("host") || c.IsSet("H") { hosts = hosts[1:] } - log.Fatal(api.ListenAndServe(cluster, sched, hosts, c.Bool("cors"), tlsConfig)) + log.Fatal(api.ListenAndServe(sched, hosts, c.Bool("cors"), tlsConfig)) } diff --git a/scheduler/builtin/builtin.go b/scheduler/builtin/builtin.go index 69327b29c4..155a59e234 100644 --- a/scheduler/builtin/builtin.go +++ b/scheduler/builtin/builtin.go @@ -3,7 +3,9 @@ package builtin import ( "sync" + log "github.com/Sirupsen/logrus" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/discovery" "github.com/docker/swarm/filter" "github.com/docker/swarm/strategy" "github.com/samalba/dockerclient" @@ -37,11 +39,6 @@ func (s *BuiltinScheduler) selectNodeForContainer(config *dockerclient.Container // Schedule a brand new container into the cluster. func (s *BuiltinScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { - /*Disable for now - if config.Memory == 0 || config.CpuShares == 0 { - return nil, fmt.Errorf("Creating containers in clustering mode requires resource constraints (-c and -m) to be set") - } - */ s.Lock() defer s.Unlock() @@ -61,3 +58,38 @@ func (s *BuiltinScheduler) RemoveContainer(container *cluster.Container, force b return s.cluster.DestroyContainer(container, force) } + +// Entries are Docker Nodes +func (s *BuiltinScheduler) NewEntries(entries []*discovery.Entry) { + for _, entry := range entries { + go func(m *discovery.Entry) { + if s.cluster.Node(m.String()) == nil { + n := cluster.NewNode(m.String(), s.cluster.OvercommitRatio) + if err := n.Connect(s.cluster.TLSConfig); err != nil { + log.Error(err) + return + } + if err := s.cluster.AddNode(n); err != nil { + log.Error(err) + return + } + } + }(entry) + } +} + +func (s *BuiltinScheduler) Events(eventsHandler cluster.EventHandler) { + s.cluster.Events(eventsHandler) +} + +func (s *BuiltinScheduler) Nodes() []*cluster.Node { + return s.cluster.Nodes() +} + +func (s *BuiltinScheduler) Containers() []*cluster.Container { + return s.cluster.Containers() +} + +func (s *BuiltinScheduler) Container(IdOrName string) *cluster.Container { + return s.cluster.Container(IdOrName) +} diff --git a/scheduler/mesos/mesos.go b/scheduler/mesos/mesos.go new file mode 100644 index 0000000000..3bb62b0ec2 --- /dev/null +++ b/scheduler/mesos/mesos.go @@ -0,0 +1,74 @@ +package mesos + +import ( + "errors" + + "github.com/docker/swarm/cluster" + "github.com/docker/swarm/discovery" + "github.com/docker/swarm/filter" + "github.com/docker/swarm/strategy" + "github.com/samalba/dockerclient" +) + +var ErrNotImplemented = errors.New("not implemented in the mesos scheduler") + +type MesosScheduler struct { + cluster *cluster.Cluster + strategy strategy.PlacementStrategy + filters []filter.Filter +} + +func (s *MesosScheduler) Initialize(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) { + s.cluster = cluster + s.strategy = strategy + s.filters = filters +} + +// Schedule a brand new container into the cluster. +func (s *MesosScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) { + + //TODO: RequestOffers from mesos master + + //TODO: pick the right offer (using strategy & filters ???) + + //TODO: LaunchTask on the Mesos cluster + + return nil, ErrNotImplemented +} + +// Remove a container from the cluster. Containers should always be destroyed +// through the scheduler to guarantee atomicity. +func (s *MesosScheduler) RemoveContainer(container *cluster.Container, force bool) error { + + //TODO: KillTask + + //TODO: remove container + + return ErrNotImplemented +} + +// Entries are Mesos masters +func (s *MesosScheduler) NewEntries(entries []*discovery.Entry) { + + //TODO: get list of actual docker nodes from mesos masters + + //TODO: create direct connection to those nodes + + //TODO: add them to the cluster +} + +func (s *MesosScheduler) Events(eventsHandler cluster.EventHandler) { + s.cluster.Events(eventsHandler) +} + +func (s *MesosScheduler) Nodes() []*cluster.Node { + return s.cluster.Nodes() +} + +func (s *MesosScheduler) Containers() []*cluster.Container { + return s.cluster.Containers() +} + +func (s *MesosScheduler) Container(IdOrName string) *cluster.Container { + return s.cluster.Container(IdOrName) +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 9c3f8a219c..e647b1e7d8 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -1,12 +1,14 @@ package scheduler import ( - "errors" + "fmt" log "github.com/Sirupsen/logrus" "github.com/docker/swarm/cluster" + "github.com/docker/swarm/discovery" "github.com/docker/swarm/filter" "github.com/docker/swarm/scheduler/builtin" + "github.com/docker/swarm/scheduler/mesos" "github.com/docker/swarm/strategy" "github.com/samalba/dockerclient" ) @@ -15,16 +17,21 @@ type Scheduler interface { Initialize(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) RemoveContainer(container *cluster.Container, force bool) error + + Events(eventsHandler cluster.EventHandler) + Nodes() []*cluster.Node + Containers() []*cluster.Container + Container(IdOrName string) *cluster.Container + + NewEntries(entries []*discovery.Entry) } -var ( - schedulers map[string]Scheduler - ErrNotSupported = errors.New("scheduler not supported") -) +var schedulers map[string]Scheduler func init() { schedulers = map[string]Scheduler{ "builtin": &builtin.BuiltinScheduler{}, + "mesos": &mesos.MesosScheduler{}, } } @@ -34,5 +41,5 @@ func New(name string, cluster *cluster.Cluster, strategy strategy.PlacementStrat scheduler.Initialize(cluster, strategy, filters) return scheduler, nil } - return nil, ErrNotSupported + return nil, fmt.Errorf("scheduler %q not supported", name) }