feat: host-based scaffolded function runner (#1733)

* feat: host runner

* improve error message wording
This commit is contained in:
Luke Kingland 2023-05-25 06:26:25 +09:00 committed by GitHub
parent f155c9c7ae
commit b82a5a4eac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 317 additions and 27 deletions

View File

@ -49,7 +49,7 @@ func TestInvoke(t *testing.T) {
_, port, _ := net.SplitHostPort(l.Addr().String())
errs := make(chan error, 10)
stop := func() error { _ = s.Close(); return nil }
return fn.NewJob(f, port, errs, stop, false)
return fn.NewJob(f, "127.0.0.1", port, errs, stop, false)
}
// Run the mock http service function interloper

View File

@ -128,7 +128,7 @@ func (n *Runner) Run(ctx context.Context, f fn.Function) (job *fn.Job, err error
}
// Job reporting port, runtime errors and provides a mechanism for stopping.
return fn.NewJob(f, port, runtimeErrCh, stop, n.verbose)
return fn.NewJob(f, DefaultHost, port, runtimeErrCh, stop, n.verbose)
}
// Dial the given (tcp) port on the given interface, returning an error if it is

View File

@ -155,7 +155,7 @@ type Describer interface {
// there is a one to many relationship between a given route and processes.
// By default the system creates the 'local' and 'remote' named instances
// when a function is run (locally) and deployed, respectively.
// See the .Instances(f) accessor for the map of named environments to these
// See the .InstanceRefs(f) accessor for the map of named environments to these
// function information structures.
type Instance struct {
// Route is the primary route of a function instance.
@ -197,7 +197,6 @@ func New(options ...Option) *Client {
builder: &noopBuilder{output: os.Stdout},
pusher: &noopPusher{output: os.Stdout},
deployer: &noopDeployer{output: os.Stdout},
runner: &noopRunner{output: os.Stdout},
remover: &noopRemover{output: os.Stdout},
lister: &noopLister{output: os.Stdout},
describer: &noopDescriber{output: os.Stdout},
@ -206,6 +205,7 @@ func New(options ...Option) *Client {
pipelinesProvider: &noopPipelinesProvider{},
transport: http.DefaultTransport,
}
c.runner = newDefaultRunner(c, os.Stdout, os.Stderr)
for _, o := range options {
o(c)
}
@ -561,10 +561,7 @@ func (c *Client) Init(cfg Function) (Function, error) {
}
// Write out the new function's Template files.
// Templates contain values which may result in the function being mutated
// (default builders, etc)
err = c.Templates().Write(&f)
if err != nil {
if err = c.Templates().Write(&f); err != nil {
return f, err
}
@ -1216,13 +1213,6 @@ func (n *noopDeployer) Deploy(ctx context.Context, _ Function) (DeploymentResult
return DeploymentResult{}, nil
}
// Runner
type noopRunner struct{ output io.Writer }
func (n *noopRunner) Run(context.Context, Function) (job *Job, err error) {
return nil, errors.New("no runner available")
}
// Remover
type noopRemover struct{ output io.Writer }

View File

@ -594,8 +594,9 @@ func TestClient_New_Delegation(t *testing.T) {
}
}
// TestClient_Run ensures that the runner is invoked with the absolute path requested.
// TestClient_Run ensures that the runner is invoked with the path requested.
// Implicitly checks that the stop fn returned also is respected.
// See TestRunner for the unit test for the default runner implementation.
func TestClient_Run(t *testing.T) {
// Create the root function directory
root := "testdata/example.com/testRun"
@ -1474,7 +1475,7 @@ func TestClient_Invoke_HTTP(t *testing.T) {
_, p, _ := net.SplitHostPort(l.Addr().String())
errs := make(chan error, 10)
stop := func() error { return nil }
return fn.NewJob(f, p, errs, stop, false)
return fn.NewJob(f, "127.0.0.1", p, errs, stop, false)
}
client := fn.New(fn.WithRegistry(TestRegistry), fn.WithRunner(runner))
@ -1572,7 +1573,7 @@ func TestClient_Invoke_CloudEvent(t *testing.T) {
_, p, _ := net.SplitHostPort(l.Addr().String())
errs := make(chan error, 10)
stop := func() error { return nil }
return fn.NewJob(f, p, errs, stop, false)
return fn.NewJob(f, "127.0.0.1", p, errs, stop, false)
}
client := fn.New(fn.WithRegistry(TestRegistry), fn.WithRunner(runner))
@ -1621,7 +1622,7 @@ func TestClient_Instances(t *testing.T) {
runner.RunFn = func(_ context.Context, f fn.Function) (*fn.Job, error) {
errs := make(chan error, 10)
stop := func() error { return nil }
return fn.NewJob(f, "8080", errs, stop, false)
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
}
// Client with the mock runner

View File

@ -1,3 +1,6 @@
//go:build !integration
// +build !integration
package functions
import (

View File

@ -1,3 +1,6 @@
//go:build !integration
// +build !integration
package functions
import (

View File

@ -17,6 +17,7 @@ const runsDir = "runs"
// the zero value of the struct is set up to noop without errors.
type Job struct {
Function Function
Host string
Port string
Errors chan error
onStop func() error
@ -26,9 +27,10 @@ type Job struct {
// Create a new Job which represents a running function task by providing
// the port on which it was started, a channel on which runtime errors can
// be received, and a stop function.
func NewJob(f Function, port string, errs chan error, onStop func() error, verbose bool) (j *Job, err error) {
func NewJob(f Function, host, port string, errs chan error, onStop func() error, verbose bool) (j *Job, err error) {
j = &Job{
Function: f,
Host: host,
Port: port,
Errors: errs,
onStop: onStop,

View File

@ -1,3 +1,6 @@
//go:build !integration
// +build !integration
package functions
import (
@ -31,15 +34,15 @@ func TestJob_New(t *testing.T) {
// Assert that an initialized function and port are required
onStop := func() error { return nil }
if _, err := NewJob(Function{}, "", nil, onStop, false); err == nil {
if _, err := NewJob(Function{}, "127.0.0.1", "8080", nil, onStop, false); err == nil {
t.Fatal("expected NewJob to require an initialized functoin")
}
if _, err := NewJob(f, "", nil, onStop, false); err == nil {
if _, err := NewJob(f, "127.0.0.1", "", nil, onStop, false); err == nil {
t.Fatal("expected NewJob to require a port")
}
// Assert creating a Job with the required arguments succeeds.
_, err = NewJob(f, "8080", nil, onStop, false)
_, err = NewJob(f, "127.0.0.1", "8080", nil, onStop, false)
if err != nil {
t.Fatalf("creating job failed. %s", err)
}
@ -75,7 +78,7 @@ func TestJob_Stop(t *testing.T) {
onStop := func() error { onStopInvoked = true; return nil }
// Assert creating a Job with the required arguments succeeds.
j, err := NewJob(f, "8080", nil, onStop, false)
j, err := NewJob(f, "127.0.0.1", "8080", nil, onStop, false)
if err != nil {
t.Fatalf("creating job failed. %s", err)
}

223
pkg/functions/runner.go Normal file
View File

@ -0,0 +1,223 @@
package functions
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"os"
"os/exec"
"path/filepath"
"time"
)
const (
defaultRunHost = "127.0.0.1" // TODO allow to be altered via a runOpt
defaultRunPort = "8080"
defaultRunDialTimeout = 2 * time.Second
defaultRunStopTimeout = 10 * time.Second
readinessEndpoint = "/health/readiness"
// defaultRunTimeout is long to allow for slow-starting functions by default
// TODO: allow to be shortened as-needed using a runOption.
defaultRunTimeout = 5 * time.Minute
)
type defaultRunner struct {
client *Client
out io.Writer
err io.Writer
}
func newDefaultRunner(client *Client, out, err io.Writer) *defaultRunner {
return &defaultRunner{
client: client,
out: out,
err: err,
}
}
func (r *defaultRunner) Run(ctx context.Context, f Function) (job *Job, err error) {
var (
port = choosePort(defaultRunHost, defaultRunPort, defaultRunDialTimeout)
runFn func() error
verbose = r.client.verbose
)
// Job contains metadata and references for the running function.
job, err = NewJob(f, defaultRunHost, port, nil, nil, verbose)
if err != nil {
return
}
// Scaffold the function such that it can be run.
if err = r.client.Scaffold(ctx, f, job.Dir()); err != nil {
return
}
// Runner for the Function's runtime.
if runFn, err = runFunc(ctx, job); err != nil {
return
}
// Run the scaffolded function asynchronously.
if err = runFn(); err != nil {
return
}
// Wait for it to become available before returning the metadata.
err = waitFor(job, defaultRunTimeout)
return
}
// runFunc returns a function which will run the user's Function based on
// the jobs runtime.
func runFunc(ctx context.Context, job *Job) (runFn func() error, err error) {
runtime := job.Function.Runtime
switch runtime {
case "go":
runFn = func() error { return runGo(ctx, job) }
case "python":
err = runnerNotImplemented{runtime}
case "java":
err = runnerNotImplemented{runtime}
case "node":
err = runnerNotImplemented{runtime}
case "typescript":
err = runnerNotImplemented{runtime}
case "rust":
err = runnerNotImplemented{runtime}
case "":
err = fmt.Errorf("runner requires the function have runtime set")
default:
err = fmt.Errorf("the %q runtime is not supported", runtime)
}
return
}
type runnerNotImplemented struct {
Runtime string
}
func (e runnerNotImplemented) Error() string {
return fmt.Sprintf("the %q runtime may only be run containerized.", e.Runtime)
}
func runGo(ctx context.Context, job *Job) (err error) {
// TODO: extract the build command code from the OCI Container Builder
// and have both the runner and OCI Container Builder use the same.
if job.verbose {
fmt.Printf("cd %v && go build -o f.bin\n", job.Dir())
}
// Build
args := []string{"build", "-o", "f.bin"}
if job.verbose {
args = append(args, "-v")
}
cmd := exec.CommandContext(ctx, "go", args...)
cmd.Dir = job.Dir()
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err = cmd.Run()
if err != nil {
return
}
// Run
bin := filepath.Join(job.Dir(), "f.bin")
if job.verbose {
fmt.Printf("cd %v && PORT=%v %v\n", job.Function.Root, job.Port, bin)
}
cmd = exec.CommandContext(ctx, bin)
cmd.Dir = job.Function.Root
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// cmd.Cancel = stop // TODO: use when we upgrade to go 1.20
if job.Host != "127.0.0.1" {
// TODO: Update the functions go runtime to accept LISTEN_ADDRESS rather
// than just port
fmt.Fprintf(os.Stderr, "Warning: the Go functions runtime currently only supports localhost '127.0.0.1'. Requested listen interface '%v' will be ignored.", job.Host)
}
// See the 1.19 [release notes](https://tip.golang.org/doc/go1.19) which state:
// A Cmd with a non-empty Dir field and nil Env now implicitly sets the PWD environment variable for the subprocess to match Dir.
// The new method Cmd.Environ reports the environment that would be used to run the command, including the implicitly set PWD variable.
// cmd.Env = append(cmd.Environ(), "PORT="+job.Port) // requires go 1.19
cmd.Env = append(cmd.Env, "PORT="+job.Port, "PWD="+cmd.Dir)
// Running asynchronously allows for the client Run method to return
// metadata about the running function such as its chosen port.
go func() {
job.Errors <- cmd.Run()
}()
return
}
func waitFor(job *Job, timeout time.Duration) error {
var (
url = fmt.Sprintf("http://%s:%s/%s", job.Host, job.Port, readinessEndpoint)
tick = time.NewTicker(200 * time.Millisecond)
)
defer tick.Stop()
if job.verbose {
fmt.Printf("Waiting for %v\n", url)
}
for {
select {
case <-time.After(timeout):
return errors.New("timed out waiting for function to be ready")
case <-tick.C:
resp, err := http.Get(url)
defer resp.Body.Close()
if err != nil {
if job.verbose {
fmt.Printf("Not ready (%v)\n", err)
}
continue
} else if resp.StatusCode != 200 {
if job.verbose {
fmt.Printf("Endpoint returned HTTP %v.\n", resp.StatusCode)
dump, _ := httputil.DumpResponse(resp, true)
fmt.Println(dump)
}
continue
}
return nil // no err and status code 200
}
}
}
// choosePort returns an unused port on the given interface (host)
// Note this is not fool-proof becase of a race with any other processes
// looking for a port at the same time. If that is important, we can implement
// a check-lock-check via the filesystem.
// Also note that TCP is presumed.
func choosePort(iface, preferredPort string, dialTimeout time.Duration) string {
var (
port = defaultRunPort
c net.Conn
l net.Listener
err error
)
// Try preferreed
if c, err = net.DialTimeout("tcp", net.JoinHostPort(iface, port), dialTimeout); err == nil {
c.Close() // note err==nil
return preferredPort
}
// OS-chosen
if l, err = net.Listen("tcp", net.JoinHostPort(iface, "")); err != nil {
fmt.Fprintf(os.Stderr, "unable to check for open ports. using fallback %v. %v", defaultRunPort, err)
return port
}
l.Close() // begins aforementioned race
if _, port, err = net.SplitHostPort(l.Addr().String()); err != nil {
fmt.Fprintf(os.Stderr, "error isolating port from '%v'. %v", l.Addr(), err)
}
return port
}

View File

@ -0,0 +1,62 @@
//go:build !integration
// +build !integration
package functions_test
import (
"context"
"fmt"
"net/http"
"testing"
fn "knative.dev/func/pkg/functions"
"knative.dev/func/pkg/oci"
. "knative.dev/func/pkg/testing"
)
// TestRunner ensures that the default internal runner correctly executes
// a scaffolded function.
func TestRunner(t *testing.T) {
// This integration test explicitly requires the "host" builder due to its
// lack of a dependency on a container runtime, and the other builders not
// taking advantage of Scaffolding (expected by this runner).
// See E2E tests for testing of running functions built using Pack or S2I and
// which are dependent on Podman or Docker.
// Currently only a Go function is tested because other runtimes do not yet
// have scaffolding.
root, cleanup := Mktemp(t)
defer cleanup()
ctx, cancel := context.WithCancel(context.Background())
client := fn.New(fn.WithBuilder(oci.NewBuilder("", true)), fn.WithVerbose(true))
// Initialize
f, err := client.Init(fn.Function{Root: root, Runtime: "go", Registry: TestRegistry})
if err != nil {
t.Fatal(err)
}
// Build
if f, err = client.Build(ctx, f); err != nil {
t.Fatal(err)
}
// Run
job, err := client.Run(ctx, f)
if err != nil {
t.Fatal(err)
}
// Invoke
resp, err := http.Get(fmt.Sprintf("http://%s:%s", job.Host, job.Port))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("unexpected response code: %v", resp.StatusCode)
}
cancel()
}

View File

@ -1,3 +1,6 @@
//go:build !integration
// +build !integration
package functions
import (

View File

@ -83,10 +83,10 @@ func (t template) Write(ctx context.Context, f *Function) error {
f.Invoke = t.config.Invoke
}
isManifest := func(p string) bool {
mask := func(p string) bool {
_, f := path.Split(p)
return f == templateManifest
}
return filesystem.CopyFromFS(".", f.Root, filesystem.NewMaskingFS(isManifest, t.fs)) // copy everything but manifest.yaml
return filesystem.CopyFromFS(".", f.Root, filesystem.NewMaskingFS(mask, t.fs)) // copy everything but manifest.yaml
}

View File

@ -21,7 +21,7 @@ func NewRunner() *Runner {
RunFn: func(ctx context.Context, f fn.Function) (*fn.Job, error) {
errs := make(chan error, 1)
stop := func() error { return nil }
return fn.NewJob(f, "8080", errs, stop, false)
return fn.NewJob(f, "127.0.0.1", "8080", errs, stop, false)
},
}
}