diff --git a/go.mod b/go.mod index 890fa1416..2103cdd57 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0-rc6 github.com/otiai10/copy v1.14.0 + github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 @@ -169,6 +170,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/api v0.26.7 // indirect diff --git a/go.sum b/go.sum index c23181627..cfb3e697b 100644 --- a/go.sum +++ b/go.sum @@ -425,6 +425,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -562,6 +564,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -672,6 +675,8 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/cenkalti/backoff.v2 v2.2.1 h1:eJ9UAg01/HIHG987TwxvnzK2MgxXq97YY6rYDpY9aII= gopkg.in/cenkalti/backoff.v2 v2.2.1/go.mod h1:S0QdOvT2AlerfSBkp0O+dk+bbIMaNbEmVk876gPCthU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/desktop/client.go b/internal/desktop/client.go index 718c7889e..9d99040bf 100644 --- a/internal/desktop/client.go +++ b/internal/desktop/client.go @@ -17,14 +17,18 @@ package desktop import ( + "bytes" "context" "encoding/json" + "errors" "fmt" + "io" "net" "net/http" "strings" "github.com/docker/compose/v2/internal/memnet" + "github.com/r3labs/sse" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) @@ -119,6 +123,175 @@ func (c *Client) FeatureFlags(ctx context.Context) (FeatureFlagResponse, error) return ret, nil } +type CreateFileShareRequest struct { + HostPath string `json:"hostPath"` + Labels map[string]string `json:"labels,omitempty"` +} + +type CreateFileShareResponse struct { + FileShareID string `json:"fileShareID"` +} + +func (c *Client) CreateFileShare(ctx context.Context, r CreateFileShareRequest) (*CreateFileShareResponse, error) { + rawBody, _ := json.Marshal(r) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, backendURL("/mutagen/file-shares"), bytes.NewReader(rawBody)) + req.Header.Set("Content-Type", "application/json") + if err != nil { + return nil, err + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + errBody, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(errBody)) + } + var ret CreateFileShareResponse + if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil { + return nil, err + } + return &ret, nil +} + +type FileShareReceiverState struct { + TotalReceivedSize uint64 `json:"totalReceivedSize"` +} + +type FileShareEndpoint struct { + Path string `json:"path"` + TotalFileSize uint64 `json:"totalFileSize,omitempty"` + StagingProgress *FileShareReceiverState `json:"stagingProgress"` +} + +type FileShareSession struct { + SessionID string `json:"identifier"` + Alpha FileShareEndpoint `json:"alpha"` + Beta FileShareEndpoint `json:"beta"` + Labels map[string]string `json:"labels"` + Status string `json:"status"` +} + +func (c *Client) ListFileShares(ctx context.Context) ([]FileShareSession, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares"), http.NoBody) + if err != nil { + return nil, err + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + defer func() { + _ = resp.Body.Close() + }() + + if resp.StatusCode != http.StatusOK { + return nil, newHTTPStatusCodeError(resp) + } + + var ret []FileShareSession + if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil { + return nil, err + } + return ret, nil +} + +func (c *Client) DeleteFileShare(ctx context.Context, id string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, backendURL("/mutagen/file-shares/"+id), http.NoBody) + if err != nil { + return err + } + resp, err := c.client.Do(req) + if err != nil { + return err + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return newHTTPStatusCodeError(resp) + } + return nil +} + +type EventMessage[T any] struct { + Value T + Error error +} + +func newHTTPStatusCodeError(resp *http.Response) error { + r := io.LimitReader(resp.Body, 2048) + body, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("http status code %d", resp.StatusCode) + } + return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body)) +} + +func (c *Client) StreamFileShares(ctx context.Context) (<-chan EventMessage[[]FileShareSession], error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares/stream"), http.NoBody) + if err != nil { + return nil, err + } + resp, err := c.client.Do(req) + if err != nil { + return nil, err + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + defer func() { + _ = resp.Body.Close() + }() + return nil, newHTTPStatusCodeError(resp) + } + + events := make(chan EventMessage[[]FileShareSession]) + go func(ctx context.Context) { + defer func() { + _ = resp.Body.Close() + for range events { + // drain the channel + } + close(events) + }() + if err := readEvents(ctx, resp.Body, events); err != nil { + select { + case <-ctx.Done(): + case events <- EventMessage[[]FileShareSession]{Error: err}: + } + } + }(ctx) + return events, nil +} + +func readEvents[T any](ctx context.Context, r io.Reader, events chan<- EventMessage[T]) error { + eventReader := sse.NewEventStreamReader(r) + for { + msg, err := eventReader.ReadEvent() + if errors.Is(err, io.EOF) { + return nil + } else if err != nil { + return fmt.Errorf("reading events: %w", err) + } + msg = bytes.TrimPrefix(msg, []byte("data: ")) + + var event T + if err := json.Unmarshal(msg, &event); err != nil { + return err + } + select { + case <-ctx.Done(): + return context.Cause(ctx) + case events <- EventMessage[T]{Value: event}: + // event was sent to channel, read next + } + } +} + // backendURL generates a URL for the given API path. // // NOTE: Custom transport handles communication. The host is to create a valid diff --git a/internal/desktop/client_test.go b/internal/desktop/client_test.go new file mode 100644 index 000000000..0355dd501 --- /dev/null +++ b/internal/desktop/client_test.go @@ -0,0 +1,52 @@ +/* + Copyright 2024 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package desktop + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestClientPing(t *testing.T) { + if testing.Short() { + t.Skip("Skipped in short mode - test connects to Docker Desktop") + } + desktopEndpoint := os.Getenv("COMPOSE_TEST_DESKTOP_ENDPOINT") + if desktopEndpoint == "" { + t.Skip("Skipping - COMPOSE_TEST_DESKTOP_ENDPOINT not defined") + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + client := NewClient(desktopEndpoint) + t.Cleanup(func() { + _ = client.Close() + }) + + now := time.Now() + + ret, err := client.Ping(ctx) + require.NoError(t, err) + + serverTime := time.Unix(0, ret.ServerTime) + require.True(t, now.Before(serverTime)) +} diff --git a/internal/desktop/file_shares.go b/internal/desktop/file_shares.go new file mode 100644 index 000000000..cf4af09c3 --- /dev/null +++ b/internal/desktop/file_shares.go @@ -0,0 +1,387 @@ +/* + Copyright 2024 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package desktop + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + "github.com/docker/compose/v2/internal/paths" + "github.com/docker/compose/v2/pkg/api" + "github.com/docker/compose/v2/pkg/progress" + "github.com/docker/go-units" + "github.com/sirupsen/logrus" +) + +// fileShareProgressID is the identifier used for the root grouping of file +// share events in the progress writer. +const fileShareProgressID = "Synchronized File Shares" + +// RemoveFileSharesForProject removes any Synchronized File Shares that were +// created by Compose for this project in the past if possible. +// +// Errors are not propagated; they are only sent to the progress writer. +func RemoveFileSharesForProject(ctx context.Context, c *Client, projectName string) { + w := progress.ContextWriter(ctx) + + existing, err := c.ListFileShares(ctx) + if err != nil { + w.TailMsgf("Synchronized File Shares not removed due to error: %v", err) + return + } + // filter the list first, so we can early return and not show the event if + // there's no sessions to clean up + var toRemove []FileShareSession + for _, share := range existing { + if share.Labels["com.docker.compose.project"] == projectName { + toRemove = append(toRemove, share) + } + } + if len(toRemove) == 0 { + return + } + + w.Event(progress.NewEvent(fileShareProgressID, progress.Working, "Removing")) + rootResult := progress.Done + defer func() { + w.Event(progress.NewEvent(fileShareProgressID, rootResult, "")) + }() + for _, share := range toRemove { + shareID := share.Labels["com.docker.desktop.mutagen.file-share.id"] + if shareID == "" { + w.Event(progress.Event{ + ID: share.Alpha.Path, + ParentID: fileShareProgressID, + Status: progress.Warning, + StatusText: "Invalid", + }) + continue + } + + w.Event(progress.Event{ + ID: share.Alpha.Path, + ParentID: fileShareProgressID, + Status: progress.Working, + }) + + var status progress.EventStatus + var statusText string + if err := c.DeleteFileShare(ctx, shareID); err != nil { + // TODO(milas): Docker Desktop is doing weird things with error responses, + // once fixed, we can return proper error types from the client + if strings.Contains(err.Error(), "file share in use") { + status = progress.Warning + statusText = "Resource is still in use" + if rootResult != progress.Error { + // error takes precedence over warning + rootResult = progress.Warning + } + } else { + logrus.Debugf("Error deleting file share %q: %v", shareID, err) + status = progress.Error + rootResult = progress.Error + } + } else { + logrus.Debugf("Deleted file share: %s", shareID) + status = progress.Done + } + + w.Event(progress.Event{ + ID: share.Alpha.Path, + ParentID: fileShareProgressID, + Status: status, + StatusText: statusText, + }) + } +} + +// FileShareManager maps between Compose bind mounts and Desktop File Shares +// state. +type FileShareManager struct { + mu sync.Mutex + cli *Client + projectName string + hostPaths []string + // state holds session status keyed by file share ID. + state map[string]*FileShareSession +} + +func NewFileShareManager(cli *Client, projectName string, hostPaths []string) *FileShareManager { + return &FileShareManager{ + cli: cli, + projectName: projectName, + hostPaths: hostPaths, + state: make(map[string]*FileShareSession), + } +} + +// EnsureExists looks for existing File Shares or creates new ones for the +// host paths. +// +// This function blocks until each share reaches steady state, at which point +// flow can continue. +func (m *FileShareManager) EnsureExists(ctx context.Context) (err error) { + w := progress.ContextWriter(ctx) + // TODO(milas): this should be a per-node option, not global + w.HasMore(false) + + w.Event(progress.NewEvent(fileShareProgressID, progress.Working, "")) + defer func() { + if err != nil { + w.Event(progress.NewEvent(fileShareProgressID, progress.Error, "")) + } else { + w.Event(progress.NewEvent(fileShareProgressID, progress.Done, "")) + } + }() + + wait := &waiter{ + shareIDs: make(map[string]struct{}), + done: make(chan struct{}), + } + + handler := m.eventHandler(w, wait) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // stream session events to update internal state for project + monitorErr := make(chan error, 1) + go func() { + defer close(monitorErr) + if err := m.watch(ctx, handler); err != nil && ctx.Err() == nil { + monitorErr <- err + } + }() + + if err := m.initialize(ctx, wait, handler); err != nil { + return err + } + + waitCh := wait.start() + if waitCh != nil { + select { + case <-ctx.Done(): + return context.Cause(ctx) + case err := <-monitorErr: + if err != nil { + return fmt.Errorf("watching file share sessions: %w", err) + } else if ctx.Err() == nil { + // this indicates a bug - it should not stop w/o an error if the context is still active + return errors.New("file share session watch stopped unexpectedly") + } + case <-wait.start(): + // everything is done + } + } + + return nil +} + +// initialize finds existing shares or creates new ones for the host paths. +// +// Once a share is found/created, its progress is monitored via the watch. +func (m *FileShareManager) initialize(ctx context.Context, wait *waiter, handler func(FileShareSession)) error { + // the watch is already running in the background, so the lock is taken + // throughout to prevent interleaving writes + m.mu.Lock() + defer m.mu.Unlock() + + existing, err := m.cli.ListFileShares(ctx) + if err != nil { + return err + } + + for _, path := range m.hostPaths { + var fileShareID string + var fss *FileShareSession + + if fss = findExistingShare(path, existing); fss != nil { + fileShareID = fss.Beta.Path + logrus.Debugf("Found existing suitable file share %s for path %q [%s]", fileShareID, path, fss.Alpha.Path) + wait.addShare(fileShareID) + handler(*fss) + continue + } else { + req := CreateFileShareRequest{ + HostPath: path, + Labels: map[string]string{ + "com.docker.compose.project": m.projectName, + }, + } + createResp, err := m.cli.CreateFileShare(ctx, req) + if err != nil { + return fmt.Errorf("creating file share: %w", err) + } + fileShareID = createResp.FileShareID + fss = m.state[fileShareID] + logrus.Debugf("Created file share %s for path %q", fileShareID, path) + } + wait.addShare(fileShareID) + if fss != nil { + handler(*fss) + } + } + + return nil +} + +func (m *FileShareManager) watch(ctx context.Context, handler func(FileShareSession)) error { + events, err := m.cli.StreamFileShares(ctx) + if err != nil { + return fmt.Errorf("streaming file shares: %w", err) + } + + for { + select { + case <-ctx.Done(): + return nil + case event := <-events: + if event.Error != nil { + return fmt.Errorf("reading file share events: %w", event.Error) + } + // closure for lock + func() { + m.mu.Lock() + defer m.mu.Unlock() + for _, fss := range event.Value { + handler(fss) + } + }() + } + } +} + +// eventHandler updates internal state, keeps track of in-progress syncs, and +// prints relevant events to progress. +func (m *FileShareManager) eventHandler(w progress.Writer, wait *waiter) func(fss FileShareSession) { + return func(fss FileShareSession) { + fileShareID := fss.Beta.Path + + shouldPrint := wait.isWatching(fileShareID) + forProject := fss.Labels[api.ProjectLabel] == m.projectName + + if shouldPrint || forProject { + m.state[fileShareID] = &fss + } + + var percent int + var current, total int64 + if fss.Beta.StagingProgress != nil { + current = int64(fss.Beta.StagingProgress.TotalReceivedSize) + } else { + current = int64(fss.Beta.TotalFileSize) + } + total = int64(fss.Alpha.TotalFileSize) + if total != 0 { + percent = int(current * 100 / total) + } + + var status progress.EventStatus + var text string + + switch { + case strings.HasPrefix(fss.Status, "halted"): + wait.shareDone(fileShareID) + status = progress.Error + case fss.Status == "watching": + wait.shareDone(fileShareID) + status = progress.Done + percent = 100 + case fss.Status == "staging-beta": + status = progress.Working + // TODO(milas): the printer doesn't style statuses for children nicely + text = fmt.Sprintf(" Syncing (%7s / %-7s)", + units.HumanSize(float64(current)), + units.HumanSize(float64(total)), + ) + default: + // catch-all for various other transitional statuses + status = progress.Working + } + + evt := progress.Event{ + ID: fss.Alpha.Path, + Status: status, + Text: text, + ParentID: fileShareProgressID, + Current: current, + Total: total, + Percent: percent, + } + + if shouldPrint { + w.Event(evt) + } + } +} + +func findExistingShare(path string, existing []FileShareSession) *FileShareSession { + for _, share := range existing { + if paths.IsChild(share.Alpha.Path, path) { + return &share + } + } + return nil +} + +type waiter struct { + mu sync.Mutex + shareIDs map[string]struct{} + done chan struct{} +} + +func (w *waiter) addShare(fileShareID string) { + w.mu.Lock() + defer w.mu.Unlock() + w.shareIDs[fileShareID] = struct{}{} +} + +func (w *waiter) isWatching(fileShareID string) bool { + w.mu.Lock() + defer w.mu.Unlock() + _, ok := w.shareIDs[fileShareID] + return ok +} + +// start returns a channel to wait for any outstanding shares to be ready. +// +// If no shares are registered when this is called, nil is returned. +func (w *waiter) start() <-chan struct{} { + w.mu.Lock() + defer w.mu.Unlock() + if len(w.shareIDs) == 0 { + return nil + } + if w.done == nil { + w.done = make(chan struct{}) + } + return w.done +} + +func (w *waiter) shareDone(fileShareID string) { + w.mu.Lock() + defer w.mu.Unlock() + + delete(w.shareIDs, fileShareID) + if len(w.shareIDs) == 0 && w.done != nil { + close(w.done) + w.done = nil + } +} diff --git a/internal/experimental/experimental.go b/internal/experimental/experimental.go index 878757b6d..d69609d97 100644 --- a/internal/experimental/experimental.go +++ b/internal/experimental/experimental.go @@ -76,7 +76,7 @@ func (s *State) AutoFileShares() bool { } func (s *State) determineFeatureState(name string) bool { - if !s.active || s.desktopValues == nil { + if s == nil || !s.active || s.desktopValues == nil { return false } // TODO(milas): we should add individual environment variable overrides diff --git a/internal/paths/paths.go b/internal/paths/paths.go new file mode 100644 index 000000000..4e4c01b8c --- /dev/null +++ b/internal/paths/paths.go @@ -0,0 +1,120 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package paths + +import ( + "os" + "path/filepath" + "strings" +) + +func IsChild(dir string, file string) bool { + if dir == "" { + return false + } + + dir = filepath.Clean(dir) + current := filepath.Clean(file) + child := "." + for { + if strings.EqualFold(dir, current) { + // If the two paths are exactly equal, then they must be the same. + if dir == current { + return true + } + + // If the two paths are equal under case-folding, but not exactly equal, + // then the only way to check if they're truly "equal" is to check + // to see if we're on a case-insensitive file system. + // + // This is a notoriously tricky problem. See how dep solves it here: + // https://github.com/golang/dep/blob/v0.5.4/internal/fs/fs.go#L33 + // + // because you can mount case-sensitive filesystems onto case-insensitive + // file-systems, and vice versa :scream: + // + // We want to do as much of this check as possible with strings-only + // (to avoid a file system read and error handling), so we only + // do this check if we have no other choice. + dirInfo, err := os.Stat(dir) + if err != nil { + return false + } + + currentInfo, err := os.Stat(current) + if err != nil { + return false + } + + if !os.SameFile(dirInfo, currentInfo) { + return false + } + return true + } + + if len(current) <= len(dir) || current == "." { + return false + } + + cDir := filepath.Dir(current) + cBase := filepath.Base(current) + child = filepath.Join(cBase, child) + current = cDir + } +} + +// EncompassingPaths returns the minimal set of paths that root all paths +// from the original collection. +// +// For example, ["/foo", "/foo/bar", "/foo", "/baz"] -> ["/foo", "/baz]. +func EncompassingPaths(paths []string) []string { + result := []string{} + for _, current := range paths { + isCovered := false + hasRemovals := false + + for i, existing := range result { + if IsChild(existing, current) { + // The path is already covered, so there's no need to include it + isCovered = true + break + } + + if IsChild(current, existing) { + // Mark the element empty for removal. + result[i] = "" + hasRemovals = true + } + } + + if !isCovered { + result = append(result, current) + } + + if hasRemovals { + // Remove all the empties + newResult := []string{} + for _, r := range result { + if r != "" { + newResult = append(newResult, r) + } + } + result = newResult + } + } + return result +} diff --git a/pkg/compose/create.go b/pkg/compose/create.go index c034b1466..241ab1f40 100644 --- a/pkg/compose/create.go +++ b/pkg/compose/create.go @@ -22,12 +22,17 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" "os" "path" "path/filepath" + "sort" "strconv" "strings" + "github.com/compose-spec/compose-go/v2/types" + "github.com/docker/compose/v2/internal/desktop" + pathutil "github.com/docker/compose/v2/internal/paths" moby "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/blkiodev" "github.com/docker/docker/api/types/container" @@ -42,8 +47,6 @@ import ( "github.com/docker/go-units" "github.com/sirupsen/logrus" - "github.com/compose-spec/compose-go/v2/types" - "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/progress" "github.com/docker/compose/v2/pkg/utils" @@ -65,11 +68,11 @@ type createConfigs struct { func (s *composeService) Create(ctx context.Context, project *types.Project, createOpts api.CreateOptions) error { return progress.RunWithTitle(ctx, func(ctx context.Context) error { - return s.create(ctx, project, createOpts) + return s.create(ctx, project, createOpts, false) }, s.stdinfo(), "Creating") } -func (s *composeService) create(ctx context.Context, project *types.Project, options api.CreateOptions) error { +func (s *composeService) create(ctx context.Context, project *types.Project, options api.CreateOptions, willAttach bool) error { if len(options.Services) == 0 { options.Services = project.ServiceNames() } @@ -111,6 +114,9 @@ func (s *composeService) create(ctx context.Context, project *types.Project, opt } } + if willAttach { + progress.ContextWriter(ctx).HasMore(willAttach) + } return newConvergence(options.Services, observedState, s).apply(ctx, project, options) } @@ -144,6 +150,49 @@ func (s *composeService) ensureProjectVolumes(ctx context.Context, project *type return err } } + + err := func() error { + if s.experiments.AutoFileShares() && s.desktopCli != nil { + // collect all the bind mount paths and try to set up file shares in + // Docker Desktop for them + var paths []string + for _, svcName := range project.ServiceNames() { + svc := project.Services[svcName] + for _, vol := range svc.Volumes { + if vol.Type != string(mount.TypeBind) { + continue + } + p := filepath.Clean(vol.Source) + if !filepath.IsAbs(p) { + return fmt.Errorf("file share path is not absolute: %s", p) + } + if _, err := os.Stat(p); errors.Is(err, fs.ErrNotExist) { + if vol.Bind != nil && !vol.Bind.CreateHostPath { + return fmt.Errorf("service %s: host path %q does not exist and `create_host_path` is false", svcName, vol.Source) + } + if err := os.MkdirAll(p, 0o755); err != nil { + return fmt.Errorf("creating host path: %w", err) + } + } + paths = append(paths, p) + } + } + + // remove duplicate/unnecessary child paths and sort them for predictability + paths = pathutil.EncompassingPaths(paths) + sort.Strings(paths) + + fileShareManager := desktop.NewFileShareManager(s.desktopCli, project.Name, paths) + if err := fileShareManager.EnsureExists(ctx); err != nil { + return fmt.Errorf("initializing: %w", err) + } + } + return nil + }() + + if err != nil { + progress.ContextWriter(ctx).TailMsgf("Failed to prepare Synchronized File Shares: %v", err) + } return nil } diff --git a/pkg/compose/down.go b/pkg/compose/down.go index 7c344b078..010eb6f6c 100644 --- a/pkg/compose/down.go +++ b/pkg/compose/down.go @@ -22,9 +22,9 @@ import ( "strings" "time" - "github.com/docker/compose/v2/pkg/utils" - "github.com/compose-spec/compose-go/v2/types" + "github.com/docker/compose/v2/internal/desktop" + "github.com/docker/compose/v2/pkg/utils" moby "github.com/docker/docker/api/types" containerType "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" @@ -144,6 +144,14 @@ func (s *composeService) ensureVolumesDown(ctx context.Context, project *types.P return s.removeVolume(ctx, volumeName, w) }) } + + if s.experiments.AutoFileShares() && s.desktopCli != nil { + ops = append(ops, func() error { + desktop.RemoveFileSharesForProject(ctx, s.desktopCli, project.Name) + return nil + }) + } + return ops } diff --git a/pkg/compose/scale.go b/pkg/compose/scale.go index 985cc129d..b10aea095 100644 --- a/pkg/compose/scale.go +++ b/pkg/compose/scale.go @@ -26,7 +26,7 @@ import ( func (s *composeService) Scale(ctx context.Context, project *types.Project, options api.ScaleOptions) error { return progress.Run(ctx, tracing.SpanWrapFunc("project/scale", tracing.ProjectOptions(ctx, project), func(ctx context.Context) error { - err := s.create(ctx, project, api.CreateOptions{Services: options.Services}) + err := s.create(ctx, project, api.CreateOptions{Services: options.Services}, true) if err != nil { return err } diff --git a/pkg/compose/up.go b/pkg/compose/up.go index ab22de48b..94b37a976 100644 --- a/pkg/compose/up.go +++ b/pkg/compose/up.go @@ -34,8 +34,7 @@ import ( func (s *composeService) Up(ctx context.Context, project *types.Project, options api.UpOptions) error { //nolint:gocyclo err := progress.Run(ctx, tracing.SpanWrapFunc("project/up", tracing.ProjectOptions(ctx, project), func(ctx context.Context) error { w := progress.ContextWriter(ctx) - w.HasMore(options.Start.Attach == nil) - err := s.create(ctx, project, options.Create) + err := s.create(ctx, project, options.Create, options.Start.Attach != nil) if err != nil { return err } diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index 2a651baac..bc5469f88 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -28,6 +28,7 @@ import ( "time" "github.com/compose-spec/compose-go/v2/types" + pathutil "github.com/docker/compose/v2/internal/paths" "github.com/docker/compose/v2/internal/sync" "github.com/docker/compose/v2/pkg/api" "github.com/docker/compose/v2/pkg/watch" @@ -228,7 +229,7 @@ func (s *composeService) watch(ctx context.Context, project *types.Project, name // // Any errors are logged as warnings and nil (no file event) is returned. func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent { - if !watch.IsChild(trigger.Path, hostPath) { + if !pathutil.IsChild(trigger.Path, hostPath) { return nil } isIgnored, err := ignore.Matches(hostPath) @@ -456,7 +457,7 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr Services: []string{serviceName}, Inherit: true, Recreate: api.RecreateForce, - }) + }, true) if err != nil { options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err)) return err diff --git a/pkg/progress/tty.go b/pkg/progress/tty.go index 20322636d..0675beede 100644 --- a/pkg/progress/tty.go +++ b/pkg/progress/tty.go @@ -104,9 +104,16 @@ func (w *ttyWriter) event(e Event) { last.Status = e.Status last.Text = e.Text last.StatusText = e.StatusText - last.Total = e.Total - last.Current = e.Current - last.Percent = e.Percent + // progress can only go up + if e.Total > last.Total { + last.Total = e.Total + } + if e.Current > last.Current { + last.Current = e.Current + } + if e.Percent > last.Percent { + last.Percent = e.Percent + } // allow set/unset of parent, but not swapping otherwise prompt is flickering if last.ParentID == "" || e.ParentID == "" { last.ParentID = e.ParentID @@ -236,28 +243,46 @@ func (w *ttyWriter) lineText(event Event, pad string, terminalWidth, statusPaddi elapsed := endTime.Sub(event.startTime).Seconds() var ( - total int64 - current int64 - completion []string + hideDetails bool + total int64 + current int64 + completion []string ) - for _, v := range w.eventIDs { - ev := w.events[v] - if ev.ParentID == event.ID { - total += ev.Total - current += ev.Current - completion = append(completion, percentChars[(len(percentChars)-1)*ev.Percent/100]) + // only show the aggregated progress while the root operation is in-progress + if parent := event; parent.Status == Working { + for _, v := range w.eventIDs { + child := w.events[v] + if child.ParentID == parent.ID { + if child.Status == Working && child.Total == 0 { + // we don't have totals available for all the child events + // so don't show the total progress yet + hideDetails = true + } + total += child.Total + current += child.Current + completion = append(completion, percentChars[(len(percentChars)-1)*child.Percent/100]) + } } } + // don't try to show detailed progress if we don't have any idea + if total == 0 { + hideDetails = true + } + var txt string if len(completion) > 0 { - txt = fmt.Sprintf("%s %s [%s] %7s/%-7s %s", + var details string + if !hideDetails { + details = fmt.Sprintf(" %7s / %-7s", units.HumanSize(float64(current)), units.HumanSize(float64(total))) + } + txt = fmt.Sprintf("%s [%s]%s %s", event.ID, - CountColor(fmt.Sprintf("%d layers", len(completion))), SuccessColor(strings.Join(completion, "")), - units.HumanSize(float64(current)), units.HumanSize(float64(total)), - event.Text) + details, + event.Text, + ) } else { txt = fmt.Sprintf("%s %s", event.ID, event.Text) } diff --git a/pkg/watch/dockerignore.go b/pkg/watch/dockerignore.go index 35c18499b..3f6bd0742 100644 --- a/pkg/watch/dockerignore.go +++ b/pkg/watch/dockerignore.go @@ -22,6 +22,7 @@ import ( "path/filepath" "strings" + "github.com/docker/compose/v2/internal/paths" "github.com/moby/patternmatcher" "github.com/moby/patternmatcher/ignorefile" ) @@ -50,7 +51,7 @@ func (i dockerPathMatcher) MatchesEntireDir(f string) (bool, error) { if !pattern.Exclusion() { continue } - if IsChild(f, pattern.String()) { + if paths.IsChild(f, pattern.String()) { // Found an exclusion match -- we don't match this whole dir return false, nil } diff --git a/pkg/watch/paths.go b/pkg/watch/paths.go index 1496a06dd..c0c893cd9 100644 --- a/pkg/watch/paths.go +++ b/pkg/watch/paths.go @@ -20,7 +20,6 @@ import ( "fmt" "os" "path/filepath" - "strings" ) func greatestExistingAncestor(path string) (string, error) { @@ -40,98 +39,3 @@ func greatestExistingAncestor(path string) (string, error) { return path, nil } - -// If we're recursively watching a path, it doesn't -// make sense to watch any of its descendants. -func dedupePathsForRecursiveWatcher(paths []string) []string { - result := []string{} - for _, current := range paths { - isCovered := false - hasRemovals := false - - for i, existing := range result { - if IsChild(existing, current) { - // The path is already covered, so there's no need to include it - isCovered = true - break - } - - if IsChild(current, existing) { - // Mark the element empty for removal. - result[i] = "" - hasRemovals = true - } - } - - if !isCovered { - result = append(result, current) - } - - if hasRemovals { - // Remove all the empties - newResult := []string{} - for _, r := range result { - if r != "" { - newResult = append(newResult, r) - } - } - result = newResult - } - } - return result -} - -func IsChild(dir string, file string) bool { - if dir == "" { - return false - } - - dir = filepath.Clean(dir) - current := filepath.Clean(file) - child := "." - for { - if strings.EqualFold(dir, current) { - // If the two paths are exactly equal, then they must be the same. - if dir == current { - return true - } - - // If the two paths are equal under case-folding, but not exactly equal, - // then the only way to check if they're truly "equal" is to check - // to see if we're on a case-insensitive file system. - // - // This is a notoriously tricky problem. See how dep solves it here: - // https://github.com/golang/dep/blob/v0.5.4/internal/fs/fs.go#L33 - // - // because you can mount case-sensitive filesystems onto case-insensitive - // file-systems, and vice versa :scream: - // - // We want to do as much of this check as possible with strings-only - // (to avoid a file system read and error handling), so we only - // do this check if we have no other choice. - dirInfo, err := os.Stat(dir) - if err != nil { - return false - } - - currentInfo, err := os.Stat(current) - if err != nil { - return false - } - - if !os.SameFile(dirInfo, currentInfo) { - return false - } - return true - } - - if len(current) <= len(dir) || current == "." { - return false - } - - cDir := filepath.Dir(current) - cBase := filepath.Base(current) - child = filepath.Join(cBase, child) - current = cDir - } -} diff --git a/pkg/watch/watcher_darwin.go b/pkg/watch/watcher_darwin.go index 37aac3a31..aef5da1b4 100644 --- a/pkg/watch/watcher_darwin.go +++ b/pkg/watch/watcher_darwin.go @@ -25,6 +25,7 @@ import ( "path/filepath" "time" + pathutil "github.com/docker/compose/v2/internal/paths" "github.com/fsnotify/fsevents" "github.com/sirupsen/logrus" ) @@ -132,7 +133,7 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { stop: make(chan struct{}), } - paths = dedupePathsForRecursiveWatcher(paths) + paths = pathutil.EncompassingPaths(paths) for _, path := range paths { path, err := filepath.Abs(path) if err != nil { diff --git a/pkg/watch/watcher_naive.go b/pkg/watch/watcher_naive.go index e681d5a96..5ee0b0536 100644 --- a/pkg/watch/watcher_naive.go +++ b/pkg/watch/watcher_naive.go @@ -27,8 +27,8 @@ import ( "runtime" "strings" + pathutil "github.com/docker/compose/v2/internal/paths" "github.com/sirupsen/logrus" - "github.com/tilt-dev/fsnotify" ) @@ -71,7 +71,7 @@ func (d *naiveNotify) Start() error { return err } if d.isWatcherRecursive { - pathsToWatch = dedupePathsForRecursiveWatcher(pathsToWatch) + pathsToWatch = pathutil.EncompassingPaths(pathsToWatch) } for _, name := range pathsToWatch { @@ -246,7 +246,7 @@ func (d *naiveNotify) shouldNotify(path string) bool { } for root := range d.notifyList { - if IsChild(root, path) { + if pathutil.IsChild(root, path) { return true } } @@ -281,7 +281,7 @@ func (d *naiveNotify) shouldSkipDir(path string) (bool, error) { // - A parent of a directory that's in our notify list // (i.e., to cover the "path doesn't exist" case). for root := range d.notifyList { - if IsChild(root, path) || IsChild(path, root) { + if pathutil.IsChild(root, path) || pathutil.IsChild(path, root) { return false, nil } } @@ -320,7 +320,7 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) { wrappedEvents := make(chan FileEvent) notifyList := make(map[string]bool, len(paths)) if isWatcherRecursive { - paths = dedupePathsForRecursiveWatcher(paths) + paths = pathutil.EncompassingPaths(paths) } for _, path := range paths { path, err := filepath.Abs(path)