From 79575f3df6589839c10f4dd0d099e37890dbc185 Mon Sep 17 00:00:00 2001 From: Nishant Totla Date: Thu, 10 Mar 2016 14:10:37 -0800 Subject: [PATCH] Updating ContainerConfig to use engine-api Signed-off-by: Nishant Totla --- api/handlers.go | 33 +++++++----- cluster/config.go | 52 ++++--------------- cluster/container.go | 12 ++--- cluster/engine.go | 80 ++++++++++++++++------------- cluster/watchdog.go | 14 ++--- scheduler/node/node.go | 2 +- scheduler/strategy/weighted_node.go | 6 +-- 7 files changed, 90 insertions(+), 109 deletions(-) diff --git a/api/handlers.go b/api/handlers.go index 7bfd8febc8..d39bf3d7ea 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -388,7 +388,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { } else if len(filters.Get("name")) > 0 { continue } - if !filters.Match("id", container.Id) { + if !filters.Match("id", container.ID) { continue } if !filters.MatchKVList("label", container.Config.Labels) { @@ -427,7 +427,7 @@ func getContainersJSON(c *context, w http.ResponseWriter, r *http.Request) { out := []*dockerclient.Container{} for _, container := range candidates { if before != nil { - if container.Id == before.Id { + if container.ID == before.ID { before = nil } continue @@ -490,7 +490,7 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { return } - resp, err := client.Get(scheme + "://" + container.Engine.Addr + "/containers/" + container.Id + "/json") + resp, err := client.Get(scheme + "://" + container.Engine.Addr + "/containers/" + container.ID + "/json") container.Engine.CheckConnectionErr(err) if err != nil { httpError(w, err.Error(), http.StatusInternalServerError) @@ -526,12 +526,17 @@ func getContainerJSON(c *context, w http.ResponseWriter, r *http.Request) { func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { r.ParseForm() var ( - config = dockerclient.ContainerConfig{ + oldConfig = dockerclient.ContainerConfig{ HostConfig: dockerclient.HostConfig{ MemorySwappiness: -1, }, } - name = r.Form.Get("name") + name = r.Form.Get("name") + config = cluster.ContainerConfig{ + HostConfig: containertypes.HostConfig{ + MemorySwappiness: -1, + }, + } ) if err := json.NewDecoder(r.Body).Decode(&config); err != nil { @@ -539,13 +544,13 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { return } // Pass auth information along if present - var authConfig *dockerclient.AuthConfig + var authConfig *apitypes.AuthConfig buf, err := base64.URLEncoding.DecodeString(r.Header.Get("X-Registry-Auth")) if err == nil { - authConfig = &dockerclient.AuthConfig{} + authConfig = &apitypes.AuthConfig{} json.Unmarshal(buf, authConfig) } - containerConfig := cluster.BuildContainerConfig(config) + containerConfig := cluster.BuildContainerConfig(config.Config, config.HostConfig, config.NetworkingConfig) if err := containerConfig.Validate(); err != nil { httpError(w, err.Error(), http.StatusInternalServerError) return @@ -563,7 +568,7 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) - fmt.Fprintf(w, "{%q:%q}", "Id", container.Id) + fmt.Fprintf(w, "{%q:%q}", "Id", container.ID) return } @@ -820,7 +825,7 @@ func postContainersExec(c *context, w http.ResponseWriter, r *http.Request) { return } - resp, err := client.Post(scheme+"://"+container.Engine.Addr+"/containers/"+container.Id+"/exec", "application/json", r.Body) + resp, err := client.Post(scheme+"://"+container.Engine.Addr+"/containers/"+container.ID+"/exec", "application/json", r.Body) container.Engine.CheckConnectionErr(err) if err != nil { httpError(w, err.Error(), http.StatusInternalServerError) @@ -1044,7 +1049,7 @@ func proxyContainer(c *context, w http.ResponseWriter, r *http.Request) { // Set the full container ID in the proxied URL path. if name != "" { - r.URL.Path = strings.Replace(r.URL.Path, name, container.Id, 1) + r.URL.Path = strings.Replace(r.URL.Path, name, container.ID, 1) } err = proxy(container.Engine, w, r) @@ -1067,7 +1072,7 @@ func proxyContainerAndForceRefresh(c *context, w http.ResponseWriter, r *http.Re // Set the full container ID in the proxied URL path. if name != "" { - r.URL.Path = strings.Replace(r.URL.Path, name, container.Id, 1) + r.URL.Path = strings.Replace(r.URL.Path, name, container.ID, 1) } cb := func(resp *http.Response) { @@ -1200,7 +1205,7 @@ func postCommit(c *context, w http.ResponseWriter, r *http.Request) { } // Set the full container ID in the proxied URL path. if name != "" { - r.URL.RawQuery = strings.Replace(r.URL.RawQuery, name, container.Id, 1) + r.URL.RawQuery = strings.Replace(r.URL.RawQuery, name, container.ID, 1) } cb := func(resp *http.Response) { @@ -1317,7 +1322,7 @@ func proxyHijack(c *context, w http.ResponseWriter, r *http.Request) { } // Set the full container ID in the proxied URL path. if name != "" { - r.URL.Path = strings.Replace(r.URL.Path, name, container.Id, 1) + r.URL.Path = strings.Replace(r.URL.Path, name, container.ID, 1) } err = hijack(c.tlsConfig, container.Engine.Addr, w, r) diff --git a/cluster/config.go b/cluster/config.go index f68a85dbbf..2cc384ade6 100644 --- a/cluster/config.go +++ b/cluster/config.go @@ -6,7 +6,8 @@ import ( "fmt" "strings" - "github.com/samalba/dockerclient" + "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/network" ) // SwarmLabelNamespace defines the key prefix in all custom labels @@ -15,7 +16,10 @@ const SwarmLabelNamespace = "com.docker.swarm" // ContainerConfig is exported // TODO store affinities and constraints in their own fields type ContainerConfig struct { - dockerclient.ContainerConfig + // dockerclient.ContainerConfig + container.Config + container.HostConfig + network.NetworkingConfig } func parseEnv(e string) (bool, string, string) { @@ -26,44 +30,8 @@ func parseEnv(e string) (bool, string, string) { return false, "", "" } -// FIXME: Temporary fix to handle forward/backward compatibility between Docker <1.6 and >=1.7 -// ContainerConfig should be handling converting to/from different docker versions -func consolidateResourceFields(c *dockerclient.ContainerConfig) { - if c.Memory != c.HostConfig.Memory { - if c.Memory != 0 { - c.HostConfig.Memory = c.Memory - } else { - c.Memory = c.HostConfig.Memory - } - } - - if c.MemorySwap != c.HostConfig.MemorySwap { - if c.MemorySwap != 0 { - c.HostConfig.MemorySwap = c.MemorySwap - } else { - c.MemorySwap = c.HostConfig.MemorySwap - } - } - - if c.CpuShares != c.HostConfig.CpuShares { - if c.CpuShares != 0 { - c.HostConfig.CpuShares = c.CpuShares - } else { - c.CpuShares = c.HostConfig.CpuShares - } - } - - if c.Cpuset != c.HostConfig.CpusetCpus { - if c.Cpuset != "" { - c.HostConfig.CpusetCpus = c.Cpuset - } else { - c.Cpuset = c.HostConfig.CpusetCpus - } - } -} - -// BuildContainerConfig creates a cluster.ContainerConfig from a dockerclient.ContainerConfig -func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig { +// BuildContainerConfig creates a cluster.ContainerConfig from a Config, HostConfig, and NetworkingConfig +func BuildContainerConfig(c container.Config, h container.HostConfig, n network.NetworkingConfig) *ContainerConfig { var ( affinities []string constraints []string @@ -128,9 +96,7 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig { } } - consolidateResourceFields(&c) - - return &ContainerConfig{c} + return &ContainerConfig{c, h, n} } func (c *ContainerConfig) extractExprs(key string) []string { diff --git a/cluster/container.go b/cluster/container.go index 447c3628b8..645f63a805 100644 --- a/cluster/container.go +++ b/cluster/container.go @@ -4,21 +4,21 @@ import ( "strings" "github.com/docker/docker/pkg/stringid" - "github.com/samalba/dockerclient" + "github.com/docker/engine-api/types" ) // Container is exported type Container struct { - dockerclient.Container + types.Container Config *ContainerConfig - Info dockerclient.ContainerInfo + Info types.ContainerJSON Engine *Engine } // Refresh container func (c *Container) Refresh() (*Container, error) { - return c.Engine.refreshContainer(c.Id, true) + return c.Engine.refreshContainer(c.ID, true) } // Containers represents a list of containers @@ -33,7 +33,7 @@ func (containers Containers) Get(IDOrName string) *Container { // Match exact or short Container ID. for _, container := range containers { - if container.Id == IDOrName || stringid.TruncateID(container.Id) == IDOrName { + if container.ID == IDOrName || stringid.TruncateID(container.ID) == IDOrName { return container } } @@ -68,7 +68,7 @@ func (containers Containers) Get(IDOrName string) *Container { // Match Container ID prefix. for _, container := range containers { - if strings.HasPrefix(container.Id, IDOrName) { + if strings.HasPrefix(container.ID, IDOrName) { candidates = append(candidates, container) } } diff --git a/cluster/engine.go b/cluster/engine.go index b20c3404ad..5e235da3b9 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -20,6 +20,7 @@ import ( engineapi "github.com/docker/engine-api/client" "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/filters" + networktypes "github.com/docker/engine-api/types/network" engineapinop "github.com/docker/swarm/api/nopclient" "github.com/samalba/dockerclient" "github.com/samalba/dockerclient/nopclient" @@ -627,7 +628,11 @@ func (e *Engine) RefreshVolumes() error { // true, each container will be inspected. // FIXME: unexport this method after mesos scheduler stops using it directly func (e *Engine) RefreshContainers(full bool) error { - containers, err := e.client.ListContainers(true, false, "") + opts := types.ContainerListOptions{ + All: true, + Size: false, + } + containers, err := e.apiClient.ContainerList(opts) e.CheckConnectionErr(err) if err != nil { return err @@ -637,7 +642,7 @@ func (e *Engine) RefreshContainers(full bool) error { for _, c := range containers { mergedUpdate, err := e.updateContainer(c, merged, full) if err != nil { - log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Unable to update state of container %q: %v", c.Id, err) + log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Errorf("Unable to update state of container %q: %v", c.ID, err) } else { merged = mergedUpdate } @@ -653,7 +658,14 @@ func (e *Engine) RefreshContainers(full bool) error { // Refresh the status of a container running on the engine. If `full` is true, // the container will be inspected. func (e *Engine) refreshContainer(ID string, full bool) (*Container, error) { - containers, err := e.client.ListContainers(true, false, fmt.Sprintf("{%q:[%q]}", "id", ID)) + filterArgs := filters.NewArgs() + filterArgs.Add("id", ID) + opts := types.ContainerListOptions{ + All: true, + Size: false, + Filter: filterArgs, + } + containers, err := e.apiClient.ContainerList(opts) e.CheckConnectionErr(err) if err != nil { return nil, err @@ -676,16 +688,16 @@ func (e *Engine) refreshContainer(ID string, full bool) (*Container, error) { _, err = e.updateContainer(containers[0], e.containers, full) e.RLock() - container := e.containers[containers[0].Id] + container := e.containers[containers[0].ID] e.RUnlock() return container, err } -func (e *Engine) updateContainer(c dockerclient.Container, containers map[string]*Container, full bool) (map[string]*Container, error) { +func (e *Engine) updateContainer(c types.Container, containers map[string]*Container, full bool) (map[string]*Container, error) { var container *Container e.RLock() - if current, exists := e.containers[c.Id]; exists { + if current, exists := e.containers[c.ID]; exists { // The container is already known. container = current // Restarting is a transit state. Unfortunately Docker doesn't always emit @@ -708,30 +720,30 @@ func (e *Engine) updateContainer(c dockerclient.Container, containers map[string // Update ContainerInfo. if full { - info, err := e.client.InspectContainer(c.Id) + info, err := e.apiClient.ContainerInspect(c.ID) e.CheckConnectionErr(err) if err != nil { return nil, err } // Convert the ContainerConfig from inspect into our own // cluster.ContainerConfig. - if info.HostConfig != nil { - info.Config.HostConfig = *info.HostConfig - } - container.Config = BuildContainerConfig(*info.Config) - // FIXME remove "duplicate" lines and move this to cluster/config.go - container.Config.CpuShares = container.Config.CpuShares * int64(e.Cpus) / 1024.0 - container.Config.HostConfig.CpuShares = container.Config.CpuShares + info.HostConfig.CPUShares = info.HostConfig.CPUShares * int64(e.Cpus) / 1024.0 + networkingConfig := networktypes.NetworkingConfig{ + EndpointsConfig: info.NetworkSettings.Networks, + } + container.Config = BuildContainerConfig(*info.Config, *info.HostConfig, networkingConfig) + // FIXME remove "duplicate" line and move this to cluster/config.go + container.Config.CPUShares = container.Config.CPUShares * int64(e.Cpus) / 1024.0 // Save the entire inspect back into the container. - container.Info = *info + container.Info = info } // Update its internal state. e.Lock() container.Container = c - containers[container.Id] = container + containers[container.ID] = container e.Unlock() return containers, nil @@ -829,7 +841,7 @@ func (e *Engine) UsedCpus() int64 { var r int64 e.RLock() for _, c := range e.containers { - r += c.Config.CpuShares + r += c.Config.CPUShares } e.RUnlock() return r @@ -848,23 +860,21 @@ func (e *Engine) TotalCpus() int { // Create a new container func (e *Engine) Create(config *ContainerConfig, name string, pullImage bool, authConfig *dockerclient.AuthConfig) (*Container, error) { var ( - err error - id string - client = e.client + err error + createResp types.ContainerCreateResponse ) // Convert our internal ContainerConfig into something Docker will // understand. Start by making a copy of the internal ContainerConfig as // we don't want to mess with the original. - dockerConfig := config.ContainerConfig + dockerConfig := config // nb of CPUs -> real CpuShares // FIXME remove "duplicate" lines and move this to cluster/config.go - dockerConfig.CpuShares = int64(math.Ceil(float64(config.CpuShares*1024) / float64(e.Cpus))) - dockerConfig.HostConfig.CpuShares = dockerConfig.CpuShares + dockerConfig.CPUShares = int64(math.Ceil(float64(config.CPUShares*1024) / float64(e.Cpus))) - id, err = client.CreateContainer(&dockerConfig, name, nil) + createResp, err = e.apiClient.ContainerCreate(&dockerConfig.Config, &dockerConfig.HostConfig, &dockerConfig.NetworkingConfig, name) e.CheckConnectionErr(err) if err != nil { // If the error is other than not found, abort immediately. @@ -876,7 +886,7 @@ func (e *Engine) Create(config *ContainerConfig, name string, pullImage bool, au return nil, err } // ...And try again. - id, err = client.CreateContainer(&dockerConfig, name, nil) + createResp, err = e.apiClient.ContainerCreate(&dockerConfig.Config, &dockerConfig.HostConfig, &dockerConfig.NetworkingConfig, name) e.CheckConnectionErr(err) if err != nil { return nil, err @@ -885,12 +895,12 @@ func (e *Engine) Create(config *ContainerConfig, name string, pullImage bool, au // Register the container immediately while waiting for a state refresh. // Force a state refresh to pick up the newly created container. - e.refreshContainer(id, true) + e.refreshContainer(createResp.ID, true) e.RefreshVolumes() e.RefreshNetworks() e.Lock() - container := e.containers[id] + container := e.containers[createResp.ID] e.Unlock() if container == nil { @@ -901,7 +911,7 @@ func (e *Engine) Create(config *ContainerConfig, name string, pullImage bool, au // RemoveContainer removes a container from the engine. func (e *Engine) RemoveContainer(container *Container, force, volumes bool) error { - err := e.client.RemoveContainer(container.Id, force, volumes) + err := e.client.RemoveContainer(container.ID, force, volumes) e.CheckConnectionErr(err) if err != nil { return err @@ -911,7 +921,7 @@ func (e *Engine) RemoveContainer(container *Container, force, volumes bool) erro // will rewrite this. e.Lock() defer e.Unlock() - delete(e.containers, container.Id) + delete(e.containers, container.ID) return nil } @@ -1112,10 +1122,10 @@ func (e *Engine) AddContainer(container *Container) error { e.Lock() defer e.Unlock() - if _, ok := e.containers[container.Id]; ok { + if _, ok := e.containers[container.ID]; ok { return errors.New("container already exists") } - e.containers[container.Id] = container + e.containers[container.ID] = container return nil } @@ -1132,10 +1142,10 @@ func (e *Engine) removeContainer(container *Container) error { e.Lock() defer e.Unlock() - if _, ok := e.containers[container.Id]; !ok { + if _, ok := e.containers[container.ID]; !ok { return errors.New("container not found") } - delete(e.containers, container.Id) + delete(e.containers, container.ID) return nil } @@ -1162,14 +1172,14 @@ func (e *Engine) StartContainer(id string, hostConfig *dockerclient.HostConfig) // RenameContainer renames a container func (e *Engine) RenameContainer(container *Container, newName string) error { // send rename request - err := e.client.RenameContainer(container.Id, newName) + err := e.client.RenameContainer(container.ID, newName) e.CheckConnectionErr(err) if err != nil { return err } // refresh container - _, err = e.refreshContainer(container.Id, true) + _, err = e.refreshContainer(container.ID, true) return err } diff --git a/cluster/watchdog.go b/cluster/watchdog.go index c208d2db79..aa43227a6c 100644 --- a/cluster/watchdog.go +++ b/cluster/watchdog.go @@ -45,10 +45,10 @@ func (w *Watchdog) removeDuplicateContainers(e *Engine) { for _, containerInCluster := range w.cluster.Containers() { if containerInCluster.Config.SwarmID() == container.Config.SwarmID() && containerInCluster.Engine.ID != container.Engine.ID { - log.Debugf("container %s was rescheduled on node %s, removing it", container.Id, containerInCluster.Engine.Name) + log.Debugf("container %s was rescheduled on node %s, removing it", container.ID, containerInCluster.Engine.Name) // container already exists in the cluster, destroy it if err := e.RemoveContainer(container, true, true); err != nil { - log.Errorf("Failed to remove duplicate container %s on node %s: %v", container.Id, containerInCluster.Engine.Name, err) + log.Errorf("Failed to remove duplicate container %s on node %s: %v", container.ID, containerInCluster.Engine.Name, err) } } } @@ -65,7 +65,7 @@ func (w *Watchdog) rescheduleContainers(e *Engine) { // Skip containers which don't have an "on-node-failure" reschedule policy. if !c.Config.HasReschedulePolicy("on-node-failure") { - log.Debugf("Skipping rescheduling of %s based on rescheduling policies", c.Id) + log.Debugf("Skipping rescheduling of %s based on rescheduling policies", c.ID) continue } @@ -78,15 +78,15 @@ func (w *Watchdog) rescheduleContainers(e *Engine) { newContainer, err := w.cluster.CreateContainer(c.Config, c.Info.Name, nil) if err != nil { - log.Errorf("Failed to reschedule container %s: %v", c.Id, err) + log.Errorf("Failed to reschedule container %s: %v", c.ID, err) // add the container back, so we can retry later c.Engine.AddContainer(c) } else { - log.Infof("Rescheduled container %s from %s to %s as %s", c.Id, c.Engine.Name, newContainer.Engine.Name, newContainer.Id) + log.Infof("Rescheduled container %s from %s to %s as %s", c.ID, c.Engine.Name, newContainer.Engine.Name, newContainer.ID) if c.Info.State.Running { - log.Infof("Container %s was running, starting container %s", c.Id, newContainer.Id) + log.Infof("Container %s was running, starting container %s", c.ID, newContainer.ID) if err := w.cluster.StartContainer(newContainer, nil); err != nil { - log.Errorf("Failed to start rescheduled container %s: %v", newContainer.Id, err) + log.Errorf("Failed to start rescheduled container %s: %v", newContainer.ID, err) } } } diff --git a/scheduler/node/node.go b/scheduler/node/node.go index 01c4ba1cde..0fec4065b2 100644 --- a/scheduler/node/node.go +++ b/scheduler/node/node.go @@ -56,7 +56,7 @@ func (n *Node) Container(IDOrName string) *cluster.Container { func (n *Node) AddContainer(container *cluster.Container) error { if container.Config != nil { memory := container.Config.Memory - cpus := container.Config.CpuShares + cpus := container.Config.CPUShares if n.TotalMemory-memory < 0 || n.TotalCpus-cpus < 0 { return errors.New("not enough resources") } diff --git a/scheduler/strategy/weighted_node.go b/scheduler/strategy/weighted_node.go index 5b5560e917..2bda25a5c4 100644 --- a/scheduler/strategy/weighted_node.go +++ b/scheduler/strategy/weighted_node.go @@ -44,7 +44,7 @@ func weighNodes(config *cluster.ContainerConfig, nodes []*node.Node, healthiness nodeCpus := node.TotalCpus // Skip nodes that are smaller than the requested resources. - if nodeMemory < int64(config.Memory) || nodeCpus < config.CpuShares { + if nodeMemory < int64(config.Memory) || nodeCpus < config.CPUShares { continue } @@ -53,8 +53,8 @@ func weighNodes(config *cluster.ContainerConfig, nodes []*node.Node, healthiness memoryScore int64 = 100 ) - if config.CpuShares > 0 { - cpuScore = (node.UsedCpus + config.CpuShares) * 100 / nodeCpus + if config.CPUShares > 0 { + cpuScore = (node.UsedCpus + config.CPUShares) * 100 / nodeCpus } if config.Memory > 0 { memoryScore = (node.UsedMemory + config.Memory) * 100 / nodeMemory