diff --git a/aci/compose.go b/aci/compose.go index e77d3a6b7..444f643b9 100644 --- a/aci/compose.go +++ b/aci/compose.go @@ -56,7 +56,7 @@ func (cs *aciComposeService) Pull(ctx context.Context, project *types.Project) e return errdefs.ErrNotImplemented } -func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool) error { +func (cs *aciComposeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { logrus.Debugf("Up on project with name %q", project.Name) if err := autocreateFileshares(ctx, project); err != nil { diff --git a/api/client/compose.go b/api/client/compose.go index bc22be484..9e5ca09ee 100644 --- a/api/client/compose.go +++ b/api/client/compose.go @@ -41,7 +41,7 @@ func (c *composeService) Pull(ctx context.Context, project *types.Project) error return errdefs.ErrNotImplemented } -func (c *composeService) Up(context.Context, *types.Project, bool) error { +func (c *composeService) Up(context.Context, *types.Project, bool, io.Writer) error { return errdefs.ErrNotImplemented } diff --git a/api/compose/api.go b/api/compose/api.go index 6ac4ac454..9295fae95 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -32,7 +32,7 @@ type Service interface { // Pull executes the equivalent of a `compose pull` Pull(ctx context.Context, project *types.Project) error // Up executes the equivalent to a `compose up` - Up(ctx context.Context, project *types.Project, detach bool) error + Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error // Down executes the equivalent to a `compose down` Down(ctx context.Context, projectName string) error // Logs executes the equivalent to a `compose logs` diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index 907209da9..d02606aa0 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -18,13 +18,14 @@ package compose import ( "context" + "github.com/docker/compose-cli/progress" + "os" "github.com/compose-spec/compose-go/cli" "github.com/spf13/cobra" "github.com/docker/compose-cli/api/client" "github.com/docker/compose-cli/context/store" - "github.com/docker/compose-cli/progress" ) func upCommand(contextType string) *cobra.Command { @@ -54,25 +55,26 @@ func runUp(ctx context.Context, opts composeOptions, services []string) error { return err } - _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { - options, err := opts.toProjectOptions() - if err != nil { - return "", err - } - project, err := cli.ProjectFromOptions(options) - if err != nil { - return "", err - } - if opts.DomainName != "" { - // arbitrarily set the domain name on the first service ; ACI backend will expose the entire project - project.Services[0].DomainName = opts.DomainName - } + options, err := opts.toProjectOptions() + if err != nil { + return err + } + project, err := cli.ProjectFromOptions(options) + if err != nil { + return err + } + if opts.DomainName != "" { + // arbitrarily set the domain name on the first service ; ACI backend will expose the entire project + project.Services[0].DomainName = opts.DomainName + } - err = filter(project, services) - if err != nil { - return "", err - } - return "", c.ComposeService().Up(ctx, project, opts.Detach) + err = filter(project, services) + if err != nil { + return err + } + + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Up(ctx, project, opts.Detach, os.Stdout) }) return err } diff --git a/ecs/local/compose.go b/ecs/local/compose.go index c1d8b8cdf..8d7130bb0 100644 --- a/ecs/local/compose.go +++ b/ecs/local/compose.go @@ -53,7 +53,7 @@ func (e ecsLocalSimulation) Pull(ctx context.Context, project *types.Project) er return errdefs.ErrNotImplemented } -func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool) error { +func (e ecsLocalSimulation) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { cmd := exec.Command("docker-compose", "version", "--short") b := bytes.Buffer{} b.WriteString("v") diff --git a/ecs/logs.go b/ecs/logs.go index 5d9e98470..bdda7451a 100644 --- a/ecs/logs.go +++ b/ecs/logs.go @@ -24,7 +24,7 @@ import ( ) func (b *ecsAPIService) Logs(ctx context.Context, project string, w io.Writer) error { - consumer := formatter.NewLogConsumer(w) + consumer := formatter.NewLogConsumer(ctx, w) err := b.aws.GetLogs(ctx, project, consumer.Log) return err } diff --git a/ecs/up.go b/ecs/up.go index a5c1ce3af..b1147f7e0 100644 --- a/ecs/up.go +++ b/ecs/up.go @@ -19,6 +19,7 @@ package ecs import ( "context" "fmt" + "io" "os" "os/signal" "syscall" @@ -39,7 +40,7 @@ func (b *ecsAPIService) Pull(ctx context.Context, project *types.Project) error return errdefs.ErrNotImplemented } -func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool) error { +func (b *ecsAPIService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { err := b.aws.CheckRequirements(ctx, b.Region) if err != nil { return err diff --git a/example/backend.go b/example/backend.go index 480783177..533ace820 100644 --- a/example/backend.go +++ b/example/backend.go @@ -151,7 +151,7 @@ func (cs *composeService) Pull(ctx context.Context, project *types.Project) erro return errdefs.ErrNotImplemented } -func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool) error { +func (cs *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { fmt.Printf("Up command on project %q", project.Name) return nil } diff --git a/formatter/logs.go b/formatter/logs.go index 3f68b7a26..8ad062252 100644 --- a/formatter/logs.go +++ b/formatter/logs.go @@ -18,6 +18,7 @@ package formatter import ( "bytes" + "context" "fmt" "io" "strconv" @@ -25,8 +26,9 @@ import ( ) // NewLogConsumer creates a new LogConsumer -func NewLogConsumer(w io.Writer) LogConsumer { +func NewLogConsumer(ctx context.Context, w io.Writer) LogConsumer { return LogConsumer{ + ctx: ctx, colors: map[string]colorFunc{}, width: 0, writer: w, @@ -35,6 +37,9 @@ func NewLogConsumer(w io.Writer) LogConsumer { // Log formats a log message as received from service/container func (l *LogConsumer) Log(service, container, message string) { + if l.ctx.Err() == context.Canceled { + return + } cf, ok := l.colors[service] if !ok { cf = <-loop @@ -70,6 +75,7 @@ func (l *LogConsumer) computeWidth() { // LogConsumer consume logs from services and format them type LogConsumer struct { + ctx context.Context colors map[string]colorFunc width int writer io.Writer diff --git a/local/compose.go b/local/compose.go index 790fb5dba..c736c8ed7 100644 --- a/local/compose.go +++ b/local/compose.go @@ -293,13 +293,14 @@ func toProgressEvent(prefix string, jm jsonmessage.JSONMessage, w progress.Write }) } -func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool) error { +func (s *composeService) Up(ctx context.Context, project *types.Project, detach bool, w io.Writer) error { err := s.ensureImagesExists(ctx, project) if err != nil { return err } + for k, network := range project.Networks { - if !network.External.External && network.Name == k { + if !network.External.External && network.Name != "" { network.Name = fmt.Sprintf("%s_%s", project.Name, k) project.Networks[k] = network } @@ -329,9 +330,117 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, detach err = InDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { return s.ensureService(c, project, service) }) + if err != nil { + return err + } + + if detach { + err = inDependencyOrder(ctx, project, func(c context.Context, service types.ServiceConfig) error { + return s.startService(ctx, project, service) + }) + return err + } + + if detach { + return nil + } + + progress.ContextWriter(ctx).Stop() + return s.attach(ctx, project, w) +} + +func (s *composeService) attach(ctx context.Context, project *types.Project, w io.Writer) error { + consumer := formatter.NewLogConsumer(ctx, w) + containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ + Filters: filters.NewArgs( + projectFilter(project.Name), + ), + All: true, + }) + if err != nil { + return err + } + + var names []string + for _, c := range containers { + names = append(names, getContainerName(c)) + } + fmt.Printf("Attaching to %s\n", strings.Join(names, ", ")) + + eg, ctx := errgroup.WithContext(ctx) + for _, c := range containers { + container := c + + eg.Go(func() error { + return s.attachContainer(ctx, container, project, consumer) + }) + } + + eg.Go(func() error { + <-ctx.Done() + fmt.Println("Gracefully stopping...") + ctx = context.Background() + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", s.Down(ctx, project.Name) + }) + return nil + }) + + return eg.Wait() +} + +func (s *composeService) attachContainer(ctx context.Context, container moby.Container, project *types.Project, consumer formatter.LogConsumer) error { + serviceName := container.Labels[serviceLabel] + service, err := project.GetService(serviceName) + if err != nil { + return err + } + reader, err := s.getContainerStdout(ctx, container) + if err != nil { + return err + } + + w := consumer.GetWriter(serviceName, container.ID) + if service.Tty { + _, err = io.Copy(w, reader) + } else { + _, err = stdcopy.StdCopy(w, w, reader) + } return err } +func (s *composeService) getContainerStdout(ctx context.Context, container moby.Container) (io.Reader, error) { + var reader io.Reader + if container.State == containerRunning { + logs, err := s.apiClient.ContainerLogs(ctx, container.ID, moby.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + }) + if err != nil { + return nil, err + } + reader = logs + } else { + cnx, err := s.apiClient.ContainerAttach(ctx, container.ID, moby.ContainerAttachOptions{ + Stream: true, + Stdin: true, + Stdout: true, + Stderr: true, + }) + if err != nil { + return nil, err + } + reader = cnx.Reader + + err = s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{}) + if err != nil { + return nil, err + } + } + return reader, nil +} + func getContainerName(c moby.Container) string { // Names return container canonical name /foo + link aliases /linked_by/foo for _, name := range c.Names { @@ -367,6 +476,7 @@ func (s *composeService) Down(ctx context.Context, projectName string) error { Filters: filters.NewArgs( projectFilter(projectName), ), + All: true, }) if err != nil { return err @@ -467,7 +577,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, w io.Writ if err != nil { return err } - consumer := formatter.NewLogConsumer(w) + consumer := formatter.NewLogConsumer(ctx, w) eg, ctx := errgroup.WithContext(ctx) for _, c := range list { service := c.Labels[serviceLabel] @@ -521,7 +631,7 @@ func containersToServiceStatus(containers []moby.Container) ([]compose.ServiceSt containers := containersByLabel[service] runnningContainers := []moby.Container{} for _, container := range containers { - if container.State == "running" { + if container.State == containerRunning { runnningContainers = append(runnningContainers, container) } } diff --git a/local/container.go b/local/container.go new file mode 100644 index 000000000..d8a2d1332 --- /dev/null +++ b/local/container.go @@ -0,0 +1,29 @@ +// +build local + +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +const ( + containerCreated = "created" + containerRestarting = "restarting" + containerRunning = "running" + containerRemoving = "removing" //nolint + containerPaused = "paused" //nolint + containerExited = "exited" //nolint + containerDead = "dead" //nolint +) diff --git a/local/convergence.go b/local/convergence.go index f692f6d58..21a51f791 100644 --- a/local/convergence.go +++ b/local/convergence.go @@ -39,16 +39,12 @@ const ( ) func (s *composeService) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig) error { - err := s.waitDependencies(ctx, project, service) - if err != nil { - return err - } - actual, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ Filters: filters.NewArgs( - filters.Arg("label", fmt.Sprintf("%s=%s", projectLabel, project.Name)), - filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, service.Name)), + projectFilter(project.Name), + serviceFilter(service.Name), ), + All: true, }) if err != nil { return err @@ -93,6 +89,8 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje for _, container := range actual { container := container + name := getContainerName(container) + diverged := container.Labels[configHashLabel] != expected if diverged || service.Extensions[extLifecycle] == forceRecreate { eg.Go(func() error { @@ -101,14 +99,18 @@ func (s *composeService) ensureService(ctx context.Context, project *types.Proje continue } - if container.State == "running" { - // already running, skip - continue + w := progress.ContextWriter(ctx) + switch container.State { + case containerRunning: + w.Event(progress.RunningEvent(name)) + case containerCreated: + case containerRestarting: + w.Event(progress.CreatedEvent(name)) + default: + eg.Go(func() error { + return s.restartContainer(ctx, container) + }) } - - eg.Go(func() error { - return s.restartContainer(ctx, service, container) - }) } return eg.Wait() } @@ -163,21 +165,19 @@ func getScale(config types.ServiceConfig) int { } func (s *composeService) createContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int) error { - eventName := fmt.Sprintf("Service %q", service.Name) w := progress.ContextWriter(ctx) - w.Event(progress.CreatingEvent(eventName)) + w.Event(progress.CreatingEvent(name)) err := s.runContainer(ctx, project, service, name, number, nil) if err != nil { return err } - w.Event(progress.CreatedEvent(eventName)) + w.Event(progress.CreatedEvent(name)) return nil } func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, container moby.Container) error { w := progress.ContextWriter(ctx) - eventName := fmt.Sprintf("Service %q", service.Name) - w.Event(progress.NewEvent(eventName, progress.Working, "Recreate")) + w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Recreate")) err := s.apiClient.ContainerStop(ctx, container.ID, nil) if err != nil { return err @@ -200,7 +200,7 @@ func (s *composeService) recreateContainer(ctx context.Context, project *types.P if err != nil { return err } - w.Event(progress.NewEvent(eventName, progress.Done, "Recreated")) + w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Recreated")) setDependentLifecycle(project, service.Name, forceRecreate) return nil } @@ -218,15 +218,14 @@ func setDependentLifecycle(project *types.Project, service string, strategy stri } } -func (s *composeService) restartContainer(ctx context.Context, service types.ServiceConfig, container moby.Container) error { +func (s *composeService) restartContainer(ctx context.Context, container moby.Container) error { w := progress.ContextWriter(ctx) - eventName := fmt.Sprintf("Service %q", service.Name) - w.Event(progress.NewEvent(eventName, progress.Working, "Restart")) + w.Event(progress.NewEvent(getContainerName(container), progress.Working, "Restart")) err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{}) if err != nil { return err } - w.Event(progress.NewEvent(eventName, progress.Done, "Restarted")) + w.Event(progress.NewEvent(getContainerName(container), progress.Done, "Restarted")) return nil } @@ -247,10 +246,6 @@ func (s *composeService) runContainer(ctx context.Context, project *types.Projec return err } } - err = s.apiClient.ContainerStart(ctx, id, moby.ContainerStartOptions{}) - if err != nil { - return err - } return nil } @@ -291,5 +286,38 @@ func (s *composeService) isServiceHealthy(ctx context.Context, project *types.Pr } } return true, nil - +} + +func (s *composeService) startService(ctx context.Context, project *types.Project, service types.ServiceConfig) error { + err := s.waitDependencies(ctx, project, service) + if err != nil { + return err + } + containers, err := s.apiClient.ContainerList(ctx, moby.ContainerListOptions{ + Filters: filters.NewArgs( + projectFilter(project.Name), + serviceFilter(service.Name), + ), + All: true, + }) + if err != nil { + return err + } + eg, ctx := errgroup.WithContext(ctx) + for _, c := range containers { + container := c + if container.State == containerRunning { + continue + } + eg.Go(func() error { + w := progress.ContextWriter(ctx) + w.Event(progress.StartingEvent(getContainerName(container))) + err := s.apiClient.ContainerStart(ctx, container.ID, moby.ContainerStartOptions{}) + if err == nil { + w.Event(progress.StartedEvent(getContainerName(container))) + } + return err + }) + } + return eg.Wait() } diff --git a/local/labels.go b/local/labels.go index e0ca60100..28bb68ea4 100644 --- a/local/labels.go +++ b/local/labels.go @@ -51,3 +51,7 @@ func serviceFilter(serviceName string) filters.KeyValuePair { func hasProjectLabelFilter() filters.KeyValuePair { return filters.Arg("label", projectLabel) } + +func serviceFilter(serviceName string) filters.KeyValuePair { + return filters.Arg("label", fmt.Sprintf("%s=%s", serviceLabel, serviceName)) +} diff --git a/progress/event.go b/progress/event.go index 05ea7ff12..f78508152 100644 --- a/progress/event.go +++ b/progress/event.go @@ -58,6 +58,21 @@ func CreatingEvent(ID string) Event { return NewEvent(ID, Working, "Creating") } +// StartingEvent creates a new Starting in progress Event +func StartingEvent(ID string) Event { + return NewEvent(ID, Working, "Starting") +} + +// StartedEvent creates a new Started in progress Event +func StartedEvent(ID string) Event { + return NewEvent(ID, Done, "Started") +} + +// RunningEvent creates a new Running in progress Event +func RunningEvent(ID string) Event { + return NewEvent(ID, Done, "Running") +} + // CreatedEvent creates a new Created (done) Event func CreatedEvent(ID string) Event { return NewEvent(ID, Done, "Created") diff --git a/server/proxy/compose.go b/server/proxy/compose.go index 59afcf556..9d21eb70d 100644 --- a/server/proxy/compose.go +++ b/server/proxy/compose.go @@ -30,7 +30,7 @@ func (p *proxy) Up(ctx context.Context, request *composev1.ComposeUpRequest) (*c if err != nil { return nil, err } - return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true) + return &composev1.ComposeUpResponse{ProjectName: project.Name}, Client(ctx).ComposeService().Up(ctx, project, true, nil) } func (p *proxy) Down(ctx context.Context, request *composev1.ComposeDownRequest) (*composev1.ComposeDownResponse, error) {