diff --git a/cmd/client.go b/cmd/client.go index 88b6ae4b..c6793a69 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -78,7 +78,7 @@ func NewClient(cfg ClientConfig, options ...fn.Option) (*fn.Client, func()) { fn.WithRemover(knative.NewRemover(cfg.Namespace, cfg.Verbose)), fn.WithDescriber(knative.NewDescriber(cfg.Namespace, cfg.Verbose)), fn.WithLister(knative.NewLister(cfg.Namespace, cfg.Verbose)), - fn.WithRunner(docker.NewRunner(cfg.Verbose)), + fn.WithRunner(docker.NewRunner(cfg.Verbose, os.Stdout, os.Stderr)), fn.WithDeployer(d), fn.WithPipelinesProvider(pp), fn.WithPusher(docker.NewPusher( diff --git a/docker/runner.go b/docker/runner.go index a173eeb3..5bde3bf9 100644 --- a/docker/runner.go +++ b/docker/runner.go @@ -3,6 +3,7 @@ package docker import ( "context" "fmt" + "io" "net" "os" "time" @@ -35,11 +36,17 @@ const ( // Runner starts and stops functions as local containers. type Runner struct { verbose bool // Verbose logging + out io.Writer + errOut io.Writer } // NewRunner creates an instance of a docker-backed runner. -func NewRunner(verbose bool) *Runner { - return &Runner{verbose: verbose} +func NewRunner(verbose bool, out, errOut io.Writer) *Runner { + return &Runner{ + verbose: verbose, + out: out, + errOut: errOut, + } } // Run the function. @@ -69,7 +76,7 @@ func (n *Runner) Run(ctx context.Context, f fn.Function) (job *fn.Job, err error if id, err = newContainer(ctx, c, f, port, n.verbose); err != nil { return job, errors.Wrap(err, "runner unable to create container") } - if conn, err = copyStdio(ctx, c, id, copyErrCh); err != nil { + if conn, err = copyStdio(ctx, c, id, copyErrCh, n.out, n.errOut); err != nil { return } @@ -225,7 +232,7 @@ func newHostConfig(port string) (c container.HostConfig, err error) { // copy stdin and stdout from the container of the given ID. Errors encountered // during copy are communicated via a provided errs channel. -func copyStdio(ctx context.Context, c client.CommonAPIClient, id string, errs chan error) (conn net.Conn, err error) { +func copyStdio(ctx context.Context, c client.CommonAPIClient, id string, errs chan error, out, errOut io.Writer) (conn net.Conn, err error) { var ( res types.HijackedResponse opt = types.ContainerAttachOptions{ @@ -239,7 +246,7 @@ func copyStdio(ctx context.Context, c client.CommonAPIClient, id string, errs ch return conn, errors.Wrap(err, "runner unable to attach to container's stdio") } go func() { - _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, res.Reader) + _, err := stdcopy.StdCopy(out, errOut, res.Reader) errs <- err }() return res.Conn, nil diff --git a/docker/runner_int_test.go b/docker/runner_int_test.go new file mode 100644 index 00000000..2007ad10 --- /dev/null +++ b/docker/runner_int_test.go @@ -0,0 +1,99 @@ +//go:build integration +// +build integration + +package docker_test + +import ( + "bytes" + "context" + "errors" + "io" + "net" + "strings" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/docker/docker/api/types" + dockerClient "github.com/docker/docker/client" + + fn "knative.dev/func" + "knative.dev/func/docker" +) + +const displayEventImg = "gcr.io/knative-releases/knative.dev/eventing/cmd/event_display@sha256:610234e4319b767b187398085971d881956da660a4e0fab65a763e0f81881d82" + +func TestRun(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + t.Cleanup(cancel) + + prePullTestImages(t) + + // deliberately try to seize 8080 + l, err := net.Listen("tcp", "localhost:8080") + if err == nil { + t.Cleanup(func() { _ = l.Close() }) + } + + var out, errOut bytes.Buffer + runner := docker.NewRunner(true, &out, &errOut) + + j, err := runner.Run(ctx, fn.Function{ + Image: displayEventImg, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(j.Stop) + time.Sleep(time.Second * 5) + + var ( + id = "runner-test-id" + src = "runner-test-src" + typ = "runner-test-type" + ) + + event := cloudevents.NewEvent() + event.SetID(id) + event.SetSource(src) + event.SetType(typ) + + c, err := cloudevents.NewClientHTTP(cloudevents.WithTarget("http://localhost:" + j.Port)) + if err != nil { + t.Fatal(err) + } + + var httpErr *http.Result + res := c.Send(ctx, event) + if ok := errors.As(res, &httpErr); ok { + if httpErr.StatusCode < 200 || httpErr.StatusCode > 299 { + t.Fatal("non 2XX code") + } + } else { + t.Error("expected http.Result type") + } + time.Sleep(time.Second * 5) + + t.Log("out: ", out.String()) + t.Log("errOut: ", errOut.String()) + + outStr := out.String() + + if !(strings.Contains(outStr, id) && strings.Contains(outStr, src) && strings.Contains(outStr, typ)) { + t.Error("output doesn't contain invocation info") + } +} + +func prePullTestImages(t *testing.T) { + t.Helper() + c, _, err := docker.NewClient(dockerClient.DefaultDockerHost) + if err != nil { + t.Fatal(err) + } + resp, err := c.ImagePull(context.Background(), displayEventImg, types.ImagePullOptions{}) + if err != nil { + t.Fatal(err) + } + _, _ = io.Copy(io.Discard, resp) +} diff --git a/docker/runner_test.go b/docker/runner_test.go index 54a3b4a4..80cceafd 100644 --- a/docker/runner_test.go +++ b/docker/runner_test.go @@ -35,7 +35,7 @@ func TestDockerRun(t *testing.T) { // NOTE: test requires that the image be built already. - runner := docker.NewRunner(true) + runner := docker.NewRunner(true, os.Stdout, os.Stdout) if _, err = runner.Run(context.Background(), f); err != nil { t.Fatal(err) } @@ -47,7 +47,7 @@ func TestDockerRun(t *testing.T) { } func TestDockerRunImagelessError(t *testing.T) { - runner := docker.NewRunner(true) + runner := docker.NewRunner(true, os.Stdout, os.Stderr) f := fn.NewFunctionWith(fn.Function{}) _, err := runner.Run(context.Background(), f)