mirror of https://github.com/knative/func.git
feat: add --address option to func run (#2887)
This commit is contained in:
parent
ffd997c448
commit
e8ccb1bdcf
|
@ -30,7 +30,7 @@ func TestInvoke(t *testing.T) {
|
||||||
// Mock Runner
|
// Mock Runner
|
||||||
// Starts a service which sets invoked=1 on any request
|
// Starts a service which sets invoked=1 on any request
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
runner.RunFn = func(ctx context.Context, f fn.Function, _ time.Duration) (job *fn.Job, err error) {
|
runner.RunFn = func(ctx context.Context, f fn.Function, _ string, _ time.Duration) (job *fn.Job, err error) {
|
||||||
var (
|
var (
|
||||||
l net.Listener
|
l net.Listener
|
||||||
h = http.NewServeMux()
|
h = http.NewServeMux()
|
||||||
|
|
15
cmd/run.go
15
cmd/run.go
|
@ -28,7 +28,7 @@ NAME
|
||||||
SYNOPSIS
|
SYNOPSIS
|
||||||
{{rootCmdUse}} run [-t|--container] [-r|--registry] [-i|--image] [-e|--env]
|
{{rootCmdUse}} run [-t|--container] [-r|--registry] [-i|--image] [-e|--env]
|
||||||
[--build] [-b|--builder] [--builder-image] [-c|--confirm]
|
[--build] [-b|--builder] [--builder-image] [-c|--confirm]
|
||||||
[-v|--verbose]
|
[--address] [-v|--verbose]
|
||||||
|
|
||||||
DESCRIPTION
|
DESCRIPTION
|
||||||
Run the function locally.
|
Run the function locally.
|
||||||
|
@ -68,9 +68,12 @@ EXAMPLES
|
||||||
|
|
||||||
o Run the function locally on the host with no containerization (Go only).
|
o Run the function locally on the host with no containerization (Go only).
|
||||||
$ {{rootCmdUse}} run --container=false
|
$ {{rootCmdUse}} run --container=false
|
||||||
|
|
||||||
|
o Run the function locally on a specific address.
|
||||||
|
$ {{rootCmdUse}} run --address=0.0.0.0:8081
|
||||||
`,
|
`,
|
||||||
SuggestFor: []string{"rnu"},
|
SuggestFor: []string{"rnu"},
|
||||||
PreRunE: bindEnv("build", "builder", "builder-image", "confirm", "container", "env", "image", "path", "registry", "start-timeout", "verbose"),
|
PreRunE: bindEnv("address", "build", "builder", "builder-image", "confirm", "container", "env", "image", "path", "registry", "start-timeout", "verbose"),
|
||||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||||
return runRun(cmd, newClient)
|
return runRun(cmd, newClient)
|
||||||
},
|
},
|
||||||
|
@ -124,6 +127,8 @@ EXAMPLES
|
||||||
cmd.Flags().String("build", "auto",
|
cmd.Flags().String("build", "auto",
|
||||||
"Build the function. [auto|true|false]. ($FUNC_BUILD)")
|
"Build the function. [auto|true|false]. ($FUNC_BUILD)")
|
||||||
cmd.Flags().Lookup("build").NoOptDefVal = "true" // register `--build` as equivalient to `--build=true`
|
cmd.Flags().Lookup("build").NoOptDefVal = "true" // register `--build` as equivalient to `--build=true`
|
||||||
|
cmd.Flags().String("address", "",
|
||||||
|
"Interface and port on which to bind and listen. Default is 127.0.0.1:8080, or an available port if 8080 is not available. ($FUNC_ADDRESS)")
|
||||||
|
|
||||||
// Oft-shared flags:
|
// Oft-shared flags:
|
||||||
addConfirmFlag(cmd, cfg.Confirm)
|
addConfirmFlag(cmd, cfg.Confirm)
|
||||||
|
@ -234,7 +239,7 @@ func runRun(cmd *cobra.Command, newClient ClientFactory) (err error) {
|
||||||
// For the former, build is required and a container runtime. For the
|
// For the former, build is required and a container runtime. For the
|
||||||
// latter, scaffolding is first applied and the local host must be
|
// latter, scaffolding is first applied and the local host must be
|
||||||
// configured to build/run the language of the function.
|
// configured to build/run the language of the function.
|
||||||
job, err := client.Run(cmd.Context(), f)
|
job, err := client.Run(cmd.Context(), f, fn.RunWithAddress(cfg.Address))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -285,6 +290,9 @@ type runConfig struct {
|
||||||
// StartTimeout optionally adjusts the startup timeout from the client's
|
// StartTimeout optionally adjusts the startup timeout from the client's
|
||||||
// default of fn.DefaultStartTimeout.
|
// default of fn.DefaultStartTimeout.
|
||||||
StartTimeout time.Duration
|
StartTimeout time.Duration
|
||||||
|
|
||||||
|
// Address is the interface and port to bind (e.g. "0.0.0.0:8081")
|
||||||
|
Address string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRunConfig(cmd *cobra.Command) (c runConfig) {
|
func newRunConfig(cmd *cobra.Command) (c runConfig) {
|
||||||
|
@ -294,6 +302,7 @@ func newRunConfig(cmd *cobra.Command) (c runConfig) {
|
||||||
Env: viper.GetStringSlice("env"),
|
Env: viper.GetStringSlice("env"),
|
||||||
Container: viper.GetBool("container"),
|
Container: viper.GetBool("container"),
|
||||||
StartTimeout: viper.GetDuration("start-timeout"),
|
StartTimeout: viper.GetDuration("start-timeout"),
|
||||||
|
Address: viper.GetString("address"),
|
||||||
}
|
}
|
||||||
// NOTE: .Env should be viper.GetStringSlice, but this returns unparsed
|
// NOTE: .Env should be viper.GetStringSlice, but this returns unparsed
|
||||||
// results and appears to be an open issue since 2017:
|
// results and appears to be an open issue since 2017:
|
||||||
|
|
|
@ -107,7 +107,7 @@ func TestRun_Run(t *testing.T) {
|
||||||
|
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
if tt.runError != nil {
|
if tt.runError != nil {
|
||||||
runner.RunFn = func(context.Context, fn.Function, time.Duration) (*fn.Job, error) { return nil, tt.runError }
|
runner.RunFn = func(context.Context, fn.Function, string, time.Duration) (*fn.Job, error) { return nil, tt.runError }
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := mock.NewBuilder()
|
builder := mock.NewBuilder()
|
||||||
|
@ -220,7 +220,7 @@ func TestRun_Images(t *testing.T) {
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
|
|
||||||
if tt.runError != nil {
|
if tt.runError != nil {
|
||||||
runner.RunFn = func(context.Context, fn.Function, time.Duration) (*fn.Job, error) { return nil, tt.runError }
|
runner.RunFn = func(context.Context, fn.Function, string, time.Duration) (*fn.Job, error) { return nil, tt.runError }
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := mock.NewBuilder()
|
builder := mock.NewBuilder()
|
||||||
|
@ -324,7 +324,7 @@ func TestRun_CorrectImage(t *testing.T) {
|
||||||
root := FromTempDirectory(t)
|
root := FromTempDirectory(t)
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
|
|
||||||
runner.RunFn = func(_ context.Context, f fn.Function, _ time.Duration) (*fn.Job, error) {
|
runner.RunFn = func(_ context.Context, f fn.Function, _ string, _ time.Duration) (*fn.Job, error) {
|
||||||
// TODO: add if for empty image? -- should fail beforehand
|
// TODO: add if for empty image? -- should fail beforehand
|
||||||
if f.Build.Image != tt.image {
|
if f.Build.Image != tt.image {
|
||||||
return nil, fmt.Errorf("Expected image: %v but got: %v", tt.image, f.Build.Image)
|
return nil, fmt.Errorf("Expected image: %v but got: %v", tt.image, f.Build.Image)
|
||||||
|
@ -394,7 +394,7 @@ func TestRun_DirectOverride(t *testing.T) {
|
||||||
root := FromTempDirectory(t)
|
root := FromTempDirectory(t)
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
|
|
||||||
runner.RunFn = func(_ context.Context, f fn.Function, _ time.Duration) (*fn.Job, error) {
|
runner.RunFn = func(_ context.Context, f fn.Function, _ string, _ time.Duration) (*fn.Job, error) {
|
||||||
if f.Build.Image != overrideImage {
|
if f.Build.Image != overrideImage {
|
||||||
return nil, fmt.Errorf("Expected image to be overridden with '%v' but got: '%v'", overrideImage, f.Build.Image)
|
return nil, fmt.Errorf("Expected image to be overridden with '%v' but got: '%v'", overrideImage, f.Build.Image)
|
||||||
}
|
}
|
||||||
|
@ -462,3 +462,47 @@ func TestRun_DirectOverride(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestRun_Address ensures that the --address flag is passed to the runner.
|
||||||
|
func TestRun_Address(t *testing.T) {
|
||||||
|
root := FromTempDirectory(t)
|
||||||
|
_, err := fn.New().Init(fn.Function{Root: root, Runtime: "go"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
testAddr := "0.0.0.0:1234"
|
||||||
|
|
||||||
|
runner := mock.NewRunner()
|
||||||
|
runner.RunFn = func(_ context.Context, f fn.Function, addr string, _ time.Duration) (*fn.Job, error) {
|
||||||
|
if addr != testAddr {
|
||||||
|
return nil, fmt.Errorf("Expected address '%v' but got: '%v'", testAddr, addr)
|
||||||
|
}
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
stop := func() error { return nil }
|
||||||
|
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RUN THE ACTUAL TESTED COMMAND
|
||||||
|
cmd := NewRunCmd(NewTestClient(
|
||||||
|
fn.WithRunner(runner),
|
||||||
|
fn.WithRegistry("ghcr.com/reg"),
|
||||||
|
))
|
||||||
|
cmd.SetArgs([]string{"--address", testAddr})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
runErrCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, err := cmd.ExecuteContextC(ctx)
|
||||||
|
if err != nil {
|
||||||
|
runErrCh <- err // error was not expected
|
||||||
|
return
|
||||||
|
}
|
||||||
|
close(runErrCh) // release the waiting parent process
|
||||||
|
}()
|
||||||
|
cancel() // trigger the return of cmd.ExecuteContextC in the routine
|
||||||
|
<-ctx.Done()
|
||||||
|
if err := <-runErrCh; err != nil { // wait for completion of assertions
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ NAME
|
||||||
SYNOPSIS
|
SYNOPSIS
|
||||||
func run [-t|--container] [-r|--registry] [-i|--image] [-e|--env]
|
func run [-t|--container] [-r|--registry] [-i|--image] [-e|--env]
|
||||||
[--build] [-b|--builder] [--builder-image] [-c|--confirm]
|
[--build] [-b|--builder] [--builder-image] [-c|--confirm]
|
||||||
[-v|--verbose]
|
[--address] [-v|--verbose]
|
||||||
|
|
||||||
DESCRIPTION
|
DESCRIPTION
|
||||||
Run the function locally.
|
Run the function locally.
|
||||||
|
@ -52,6 +52,9 @@ EXAMPLES
|
||||||
o Run the function locally on the host with no containerization (Go only).
|
o Run the function locally on the host with no containerization (Go only).
|
||||||
$ func run --container=false
|
$ func run --container=false
|
||||||
|
|
||||||
|
o Run the function locally on a specific address.
|
||||||
|
$ func run --address=0.0.0.0:8081
|
||||||
|
|
||||||
|
|
||||||
```
|
```
|
||||||
func run
|
func run
|
||||||
|
@ -60,6 +63,7 @@ func run
|
||||||
### Options
|
### Options
|
||||||
|
|
||||||
```
|
```
|
||||||
|
--address string Interface and port on which to bind and listen. Default is 127.0.0.1:8080, or an available port if 8080 is not available. ($FUNC_ADDRESS)
|
||||||
--build string[="true"] Build the function. [auto|true|false]. ($FUNC_BUILD) (default "auto")
|
--build string[="true"] Build the function. [auto|true|false]. ($FUNC_BUILD) (default "auto")
|
||||||
-b, --builder string Builder to use when creating the function's container. Currently supported builders are "host", "pack" and "s2i". (default "pack")
|
-b, --builder string Builder to use when creating the function's container. Currently supported builders are "host", "pack" and "s2i". (default "pack")
|
||||||
--builder-image string Specify a custom builder image for use by the builder other than its default. ($FUNC_BUILDER_IMAGE)
|
--builder-image string Specify a custom builder image for use by the builder other than its default. ($FUNC_BUILDER_IMAGE)
|
||||||
|
|
|
@ -50,10 +50,11 @@ func NewRunner(verbose bool, out, errOut io.Writer) *Runner {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run the function.
|
// Run the function.
|
||||||
func (n *Runner) Run(ctx context.Context, f fn.Function, startTimeout time.Duration) (job *fn.Job, err error) {
|
func (n *Runner) Run(ctx context.Context, f fn.Function, address string, startTimeout time.Duration) (job *fn.Job, err error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
port = choosePort(DefaultHost, DefaultPort, DefaultDialTimeout)
|
host = DefaultHost
|
||||||
|
port = DefaultPort
|
||||||
c client.CommonAPIClient // Docker client
|
c client.CommonAPIClient // Docker client
|
||||||
id string // ID of running container
|
id string // ID of running container
|
||||||
conn net.Conn // Connection to container's stdio
|
conn net.Conn // Connection to container's stdio
|
||||||
|
@ -67,13 +68,25 @@ func (n *Runner) Run(ctx context.Context, f fn.Function, startTimeout time.Durat
|
||||||
runtimeErrCh = make(chan error, 10)
|
runtimeErrCh = make(chan error, 10)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Parse address if provided
|
||||||
|
if address != "" {
|
||||||
|
var err error
|
||||||
|
host, port, err = net.SplitHostPort(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid address format '%s': %w", address, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Choose an available port
|
||||||
|
port = choosePort(host, port, DefaultDialTimeout)
|
||||||
|
|
||||||
if f.Build.Image == "" {
|
if f.Build.Image == "" {
|
||||||
return job, errors.New("Function has no associated image. Has it been built?")
|
return job, errors.New("Function has no associated image. Has it been built?")
|
||||||
}
|
}
|
||||||
if c, _, err = NewClient(client.DefaultDockerHost); err != nil {
|
if c, _, err = NewClient(client.DefaultDockerHost); err != nil {
|
||||||
return job, errors.Wrap(err, "failed to create Docker API client")
|
return job, errors.Wrap(err, "failed to create Docker API client")
|
||||||
}
|
}
|
||||||
if id, err = newContainer(ctx, c, f, port, n.verbose); err != nil {
|
if id, err = newContainer(ctx, c, f, host, port, n.verbose); err != nil {
|
||||||
return job, errors.Wrap(err, "runner unable to create container")
|
return job, errors.Wrap(err, "runner unable to create container")
|
||||||
}
|
}
|
||||||
if conn, err = copyStdio(ctx, c, id, copyErrCh, n.out, n.errOut); err != nil {
|
if conn, err = copyStdio(ctx, c, id, copyErrCh, n.out, n.errOut); err != nil {
|
||||||
|
@ -136,7 +149,7 @@ func (n *Runner) Run(ctx context.Context, f fn.Function, startTimeout time.Durat
|
||||||
}
|
}
|
||||||
|
|
||||||
// Job reporting port, runtime errors and provides a mechanism for stopping.
|
// Job reporting port, runtime errors and provides a mechanism for stopping.
|
||||||
return fn.NewJob(f, DefaultHost, port, runtimeErrCh, stop, n.verbose)
|
return fn.NewJob(f, host, port, runtimeErrCh, stop, n.verbose)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dial the given (tcp) port on the given interface, returning an error if it is
|
// Dial the given (tcp) port on the given interface, returning an error if it is
|
||||||
|
@ -178,7 +191,7 @@ func choosePort(host, preferredPort string, dialTimeout time.Duration) string {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newContainer(ctx context.Context, c client.CommonAPIClient, f fn.Function, port string, verbose bool) (id string, err error) {
|
func newContainer(ctx context.Context, c client.CommonAPIClient, f fn.Function, host, port string, verbose bool) (id string, err error) {
|
||||||
var (
|
var (
|
||||||
containerCfg container.Config
|
containerCfg container.Config
|
||||||
hostCfg container.HostConfig
|
hostCfg container.HostConfig
|
||||||
|
@ -186,7 +199,7 @@ func newContainer(ctx context.Context, c client.CommonAPIClient, f fn.Function,
|
||||||
if containerCfg, err = newContainerConfig(f, port, verbose); err != nil {
|
if containerCfg, err = newContainerConfig(f, port, verbose); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if hostCfg, err = newHostConfig(port); err != nil {
|
if hostCfg, err = newHostConfig(host, port); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t, err := c.ContainerCreate(ctx, &containerCfg, &hostCfg, nil, nil, "")
|
t, err := c.ContainerCreate(ctx, &containerCfg, &hostCfg, nil, nil, "")
|
||||||
|
@ -225,14 +238,14 @@ func newContainerConfig(f fn.Function, _ string, verbose bool) (c container.Conf
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHostConfig(port string) (c container.HostConfig, err error) {
|
func newHostConfig(host, port string) (c container.HostConfig, err error) {
|
||||||
// httpPort := nat.Port(fmt.Sprintf("%v/tcp", port))
|
// httpPort := nat.Port(fmt.Sprintf("%v/tcp", port))
|
||||||
httpPort := nat.Port("8080/tcp")
|
httpPort := nat.Port("8080/tcp")
|
||||||
ports := map[nat.Port][]nat.PortBinding{
|
ports := map[nat.Port][]nat.PortBinding{
|
||||||
httpPort: {
|
httpPort: {
|
||||||
nat.PortBinding{
|
nat.PortBinding{
|
||||||
HostPort: port,
|
HostPort: port,
|
||||||
HostIP: "127.0.0.1",
|
HostIP: host,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ func TestRun(t *testing.T) {
|
||||||
// Run the function using a docker runner
|
// Run the function using a docker runner
|
||||||
var out, errOut bytes.Buffer
|
var out, errOut bytes.Buffer
|
||||||
runner := docker.NewRunner(true, &out, &errOut)
|
runner := docker.NewRunner(true, &out, &errOut)
|
||||||
j, err := runner.Run(ctx, f, fn.DefaultStartTimeout)
|
j, err := runner.Run(ctx, f, "", fn.DefaultStartTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ func TestDockerRun(t *testing.T) {
|
||||||
// NOTE: test requires that the image be built already.
|
// NOTE: test requires that the image be built already.
|
||||||
|
|
||||||
runner := docker.NewRunner(true, os.Stdout, os.Stdout)
|
runner := docker.NewRunner(true, os.Stdout, os.Stdout)
|
||||||
if _, err = runner.Run(context.Background(), f, fn.DefaultStartTimeout); err != nil {
|
if _, err = runner.Run(context.Background(), f, "", fn.DefaultStartTimeout); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
/* TODO
|
/* TODO
|
||||||
|
@ -50,7 +50,7 @@ func TestDockerRunImagelessError(t *testing.T) {
|
||||||
runner := docker.NewRunner(true, os.Stdout, os.Stderr)
|
runner := docker.NewRunner(true, os.Stdout, os.Stderr)
|
||||||
f := fn.NewFunctionWith(fn.Function{})
|
f := fn.NewFunctionWith(fn.Function{})
|
||||||
|
|
||||||
_, err := runner.Run(context.Background(), f, fn.DefaultStartTimeout)
|
_, err := runner.Run(context.Background(), f, "", fn.DefaultStartTimeout)
|
||||||
// TODO: switch to typed error:
|
// TODO: switch to typed error:
|
||||||
expectedErrorMessage := "Function has no associated image. Has it been built?"
|
expectedErrorMessage := "Function has no associated image. Has it been built?"
|
||||||
if err == nil || err.Error() != expectedErrorMessage {
|
if err == nil || err.Error() != expectedErrorMessage {
|
||||||
|
|
|
@ -133,7 +133,7 @@ type Runner interface {
|
||||||
// a stop function. The process can be stopped by running the returned stop
|
// a stop function. The process can be stopped by running the returned stop
|
||||||
// function, either on context cancellation or in a defer.
|
// function, either on context cancellation or in a defer.
|
||||||
// The duration is the time to wait for the job to start.
|
// The duration is the time to wait for the job to start.
|
||||||
Run(context.Context, Function, time.Duration) (*Job, error)
|
Run(context.Context, Function, string, time.Duration) (*Job, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remover of deployed services.
|
// Remover of deployed services.
|
||||||
|
@ -176,11 +176,12 @@ type Instance struct {
|
||||||
Route string
|
Route string
|
||||||
// Routes is the primary route plus any other route at which the function
|
// Routes is the primary route plus any other route at which the function
|
||||||
// can be contacted.
|
// can be contacted.
|
||||||
Routes []string `json:"routes" yaml:"routes"`
|
Routes []string `json:"routes" yaml:"routes"`
|
||||||
Name string `json:"name" yaml:"name"`
|
Name string `json:"name" yaml:"name"`
|
||||||
Image string `json:"image" yaml:"image"`
|
Image string `json:"image" yaml:"image"`
|
||||||
Namespace string `json:"namespace" yaml:"namespace"`
|
Namespace string `json:"namespace" yaml:"namespace"`
|
||||||
Subscriptions []Subscription `json:"subscriptions" yaml:"subscriptions"`
|
Subscriptions []Subscription `json:"subscriptions" yaml:"subscriptions"`
|
||||||
|
Labels map[string]string `json:"labels" yaml:"labels"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscriptions currently active to event sources
|
// Subscriptions currently active to event sources
|
||||||
|
@ -906,6 +907,7 @@ func (c *Client) Route(ctx context.Context, f Function) (string, Function, error
|
||||||
|
|
||||||
type RunOptions struct {
|
type RunOptions struct {
|
||||||
StartTimeout time.Duration
|
StartTimeout time.Duration
|
||||||
|
Address string
|
||||||
}
|
}
|
||||||
|
|
||||||
type RunOption func(c *RunOptions)
|
type RunOption func(c *RunOptions)
|
||||||
|
@ -920,6 +922,12 @@ func RunWithStartTimeout(t time.Duration) RunOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunWithAddress(address string) RunOption {
|
||||||
|
return func(c *RunOptions) {
|
||||||
|
c.Address = address
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Run the function whose code resides at root.
|
// Run the function whose code resides at root.
|
||||||
// On start, the chosen port is sent to the provided started channel
|
// On start, the chosen port is sent to the provided started channel
|
||||||
func (c *Client) Run(ctx context.Context, f Function, options ...RunOption) (job *Job, err error) {
|
func (c *Client) Run(ctx context.Context, f Function, options ...RunOption) (job *Job, err error) {
|
||||||
|
@ -944,7 +952,7 @@ func (c *Client) Run(ctx context.Context, f Function, options ...RunOption) (job
|
||||||
|
|
||||||
// Run the function, which returns a Job for use interacting (at arms length)
|
// Run the function, which returns a Job for use interacting (at arms length)
|
||||||
// with that running task (which is likely inside a container process).
|
// with that running task (which is likely inside a container process).
|
||||||
if job, err = c.runner.Run(ctx, f, timeout); err != nil {
|
if job, err = c.runner.Run(ctx, f, oo.Address, timeout); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1740,7 +1740,7 @@ func TestClient_Invoke_HTTP(t *testing.T) {
|
||||||
// Create a client with a mock runner which will report the port at which the
|
// Create a client with a mock runner which will report the port at which the
|
||||||
// interloping function is listening.
|
// interloping function is listening.
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
runner.RunFn = func(ctx context.Context, f fn.Function, _ time.Duration) (*fn.Job, error) {
|
runner.RunFn = func(ctx context.Context, f fn.Function, _ string, _ time.Duration) (*fn.Job, error) {
|
||||||
_, p, _ := net.SplitHostPort(l.Addr().String())
|
_, p, _ := net.SplitHostPort(l.Addr().String())
|
||||||
errs := make(chan error, 10)
|
errs := make(chan error, 10)
|
||||||
stop := func() error { return nil }
|
stop := func() error { return nil }
|
||||||
|
@ -1842,7 +1842,7 @@ func TestClient_Invoke_CloudEvent(t *testing.T) {
|
||||||
|
|
||||||
// Create a client with a mock Runner which returns its address.
|
// Create a client with a mock Runner which returns its address.
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
runner.RunFn = func(ctx context.Context, f fn.Function, _ time.Duration) (*fn.Job, error) {
|
runner.RunFn = func(ctx context.Context, f fn.Function, _ string, _ time.Duration) (*fn.Job, error) {
|
||||||
_, p, _ := net.SplitHostPort(l.Addr().String())
|
_, p, _ := net.SplitHostPort(l.Addr().String())
|
||||||
errs := make(chan error, 10)
|
errs := make(chan error, 10)
|
||||||
stop := func() error { return nil }
|
stop := func() error { return nil }
|
||||||
|
@ -1896,7 +1896,7 @@ func TestClient_Instances(t *testing.T) {
|
||||||
|
|
||||||
// A mock runner
|
// A mock runner
|
||||||
runner := mock.NewRunner()
|
runner := mock.NewRunner()
|
||||||
runner.RunFn = func(_ context.Context, f fn.Function, _ time.Duration) (*fn.Job, error) {
|
runner.RunFn = func(_ context.Context, f fn.Function, _ string, _ time.Duration) (*fn.Job, error) {
|
||||||
errs := make(chan error, 10)
|
errs := make(chan error, 10)
|
||||||
stop := func() error { return nil }
|
stop := func() error { return nil }
|
||||||
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
|
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
|
||||||
|
|
|
@ -36,20 +36,31 @@ func newDefaultRunner(client *Client, out, err io.Writer) *defaultRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *defaultRunner) Run(ctx context.Context, f Function, startTimeout time.Duration) (job *Job, err error) {
|
func (r *defaultRunner) Run(ctx context.Context, f Function, address string, startTimeout time.Duration) (job *Job, err error) {
|
||||||
var (
|
var (
|
||||||
port string
|
|
||||||
runFn func() error
|
runFn func() error
|
||||||
verbose = r.client.verbose
|
verbose = r.client.verbose
|
||||||
)
|
)
|
||||||
|
|
||||||
port, err = choosePort(defaultRunHost, defaultRunPort)
|
// Parse address if provided, otherwise use defaults
|
||||||
|
host := defaultRunHost
|
||||||
|
port := defaultRunPort
|
||||||
|
|
||||||
|
if address != "" {
|
||||||
|
var err error
|
||||||
|
host, port, err = net.SplitHostPort(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid address format '%s': %w", address, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
port, err = choosePort(host, port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("cannot choose port: %w", err)
|
return nil, fmt.Errorf("cannot choose port: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Job contains metadata and references for the running function.
|
// Job contains metadata and references for the running function.
|
||||||
job, err = NewJob(f, defaultRunHost, port, nil, nil, verbose)
|
job, err = NewJob(f, host, port, nil, nil, verbose)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,13 +13,13 @@ import (
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
RunInvoked bool
|
RunInvoked bool
|
||||||
RootRequested string
|
RootRequested string
|
||||||
RunFn func(context.Context, fn.Function, time.Duration) (*fn.Job, error)
|
RunFn func(context.Context, fn.Function, string, time.Duration) (*fn.Job, error)
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunner() *Runner {
|
func NewRunner() *Runner {
|
||||||
return &Runner{
|
return &Runner{
|
||||||
RunFn: func(ctx context.Context, f fn.Function, t time.Duration) (*fn.Job, error) {
|
RunFn: func(ctx context.Context, f fn.Function, addr string, t time.Duration) (*fn.Job, error) {
|
||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
stop := func() error { return nil }
|
stop := func() error { return nil }
|
||||||
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
|
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
|
||||||
|
@ -27,11 +27,11 @@ func NewRunner() *Runner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runner) Run(ctx context.Context, f fn.Function, t time.Duration) (*fn.Job, error) {
|
func (r *Runner) Run(ctx context.Context, f fn.Function, addr string, t time.Duration) (*fn.Job, error) {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
r.RunInvoked = true
|
r.RunInvoked = true
|
||||||
r.RootRequested = f.Root
|
r.RootRequested = f.Root
|
||||||
|
|
||||||
return r.RunFn(ctx, f, t)
|
return r.RunFn(ctx, f, addr, t)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue