mirror of https://github.com/knative/func.git
217 lines
6.5 KiB
Go
217 lines
6.5 KiB
Go
package functions
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
const (
|
|
DefaultInvokeSource = "/boson/fn"
|
|
DefaultInvokeType = "boson.fn"
|
|
DefaultInvokeContentType = "application/json"
|
|
DefaultInvokeData = `{"message":"Hello World"}`
|
|
DefaultInvokeFormat = "http"
|
|
)
|
|
|
|
// InvokeMesage is the message used by the convenience method Invoke to provide
|
|
// a simple way to trigger the execution of a function during development.
|
|
type InvokeMessage struct {
|
|
ID string
|
|
Source string
|
|
Type string
|
|
ContentType string
|
|
Data []byte
|
|
Format string // optional override for function-defined message format
|
|
}
|
|
|
|
// NewInvokeMessage creates a new InvokeMessage with fields populated
|
|
func NewInvokeMessage() InvokeMessage {
|
|
return InvokeMessage{
|
|
ID: uuid.NewString(),
|
|
Source: DefaultInvokeSource,
|
|
Type: DefaultInvokeType,
|
|
ContentType: DefaultInvokeContentType,
|
|
Data: []byte(DefaultInvokeData),
|
|
// Format override not set by default: value from function being preferred.
|
|
}
|
|
}
|
|
|
|
// invoke the function instance in the target environment with the
|
|
// invocation message. Returned is metadata (such as HTTP headers or
|
|
// CloudEvent fields) and a stringified version of the payload.
|
|
func invoke(ctx context.Context, c *Client, f Function, target string, m InvokeMessage, verbose bool) (metadata map[string][]string, body string, err error) {
|
|
// Get the first available route from 'local', 'remote', a named environment
|
|
// or treat target
|
|
route, err := invocationRoute(ctx, c, f, target) // choose instance to invoke
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Format" either 'http' or 'cloudevent'
|
|
// TODO: discuss if providing a Format on Message should a) update the
|
|
// function to use the new format if none is defined already (backwards
|
|
// compatibility fix) or b) always update the function, even if it was already
|
|
// set. Once decided, codify in a test.
|
|
format := DefaultInvokeFormat
|
|
|
|
if verbose {
|
|
fmt.Printf("Invoking '%v' function at %v\n", f.Invoke, route)
|
|
}
|
|
|
|
if f.Invoke != "" {
|
|
// Prefer the format set during function creation if defined.
|
|
format = f.Invoke
|
|
}
|
|
if m.Format != "" {
|
|
// Use the override specified on the message if provided
|
|
format = m.Format
|
|
if verbose {
|
|
fmt.Printf("Invoking '%v' function using '%v' format\n", f.Invoke, m.Format)
|
|
}
|
|
}
|
|
|
|
switch format {
|
|
case "http":
|
|
return sendPost(ctx, route, m, c.transport, verbose)
|
|
case "cloudevent":
|
|
// CouldEvents return a string which always includes a fairly verbose
|
|
// summation of fields, so metadata is not applicable
|
|
meta := make(map[string][]string)
|
|
body, err = sendEvent(ctx, route, m, c.transport, verbose)
|
|
return meta, body, err
|
|
default:
|
|
err = fmt.Errorf("format '%v' not supported", format)
|
|
return
|
|
}
|
|
}
|
|
|
|
// invocationRoute returns a route to the named target instance of a func:
|
|
// 'local': local environment; locally running function (error if not running)
|
|
// 'remote': remote environment; first available instance (error if none)
|
|
// '<environment>': A valid alternate target which contains instances.
|
|
// '<url>': An explicit URL
|
|
// ”: Default if no target is passed is to first use local, then remote.
|
|
//
|
|
// errors if neither are available.
|
|
func invocationRoute(ctx context.Context, c *Client, f Function, target string) (string, error) {
|
|
// TODO: this function has code-smell; will de-smellify it in next pass.
|
|
switch target {
|
|
case EnvironmentLocal:
|
|
instance, err := c.Instances().Get(ctx, f, target)
|
|
if err != nil {
|
|
if errors.Is(err, ErrEnvironmentNotFound) {
|
|
return "", errors.New("not running locally")
|
|
}
|
|
return "", err
|
|
}
|
|
return instance.Route, nil
|
|
case EnvironmentRemote:
|
|
instance, err := c.Instances().Get(ctx, f, target)
|
|
if err != nil {
|
|
if errors.Is(err, ErrEnvironmentNotFound) {
|
|
return "", errors.New("not running in remote")
|
|
}
|
|
return "", err
|
|
}
|
|
return instance.Route, nil
|
|
case "":
|
|
instance, err := c.Instances().Get(ctx, f, EnvironmentLocal)
|
|
if err != nil && !errors.Is(err, ErrNotRunning) {
|
|
return "", err // unexpected errors are anything other than ErrNotRunning
|
|
}
|
|
if err == nil {
|
|
return instance.Route, nil // found instance in local environment
|
|
}
|
|
instance, err = c.Instances().Get(ctx, f, EnvironmentRemote)
|
|
if errors.Is(err, ErrNotRunning) {
|
|
return "", errors.New("not running locally or in the remote")
|
|
}
|
|
if err != nil {
|
|
return "", err // unexpected error
|
|
}
|
|
return instance.Route, nil
|
|
default:
|
|
return target, nil
|
|
}
|
|
}
|
|
|
|
// sendEvent to the route populated with data in the invoke message.
|
|
func sendEvent(ctx context.Context, route string, m InvokeMessage, t http.RoundTripper, verbose bool) (resp string, err error) {
|
|
event := cloudevents.NewEvent()
|
|
event.SetID(m.ID)
|
|
event.SetSource(m.Source)
|
|
event.SetType(m.Type)
|
|
err = event.SetData(m.ContentType, (m.Data))
|
|
if err != nil {
|
|
return "", fmt.Errorf("cannot set data: %w", err)
|
|
}
|
|
c, err := cloudevents.NewClientHTTP(
|
|
cloudevents.WithTarget(route),
|
|
cloudevents.WithRoundTripper(t))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if verbose {
|
|
fmt.Printf("Sending event\n%v", event)
|
|
// note event's stringification already includes a trailing linebreak.
|
|
}
|
|
|
|
evt, result := c.Request(cloudevents.ContextWithTarget(ctx, route), event)
|
|
if cloudevents.IsUndelivered(result) {
|
|
err = fmt.Errorf("unable to invoke: %v", result)
|
|
} else if evt != nil { // Check for nil in case no event is returned
|
|
resp = evt.String()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// sendPost to the route populated with data in the invoke message.
|
|
func sendPost(ctx context.Context, route string, m InvokeMessage, t http.RoundTripper, verbose bool) (map[string][]string, string, error) {
|
|
client := http.Client{
|
|
Transport: t,
|
|
Timeout: time.Minute,
|
|
}
|
|
values := url.Values{
|
|
"ID": {m.ID},
|
|
"Source": {m.Source},
|
|
"Type": {m.Type},
|
|
"ContentType": {m.ContentType},
|
|
"Data": {string(m.Data)},
|
|
}
|
|
if verbose {
|
|
fmt.Println("Sending values")
|
|
for k, v := range values {
|
|
fmt.Printf(" %v: %v\n", k, v[0]) // NOTE len==1 value slices assumed
|
|
}
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", route, bytes.NewReader(m.Data))
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failure to create request: %w", err)
|
|
}
|
|
req.Header.Add("Content-Type", m.ContentType)
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode > 299 {
|
|
return nil, "", fmt.Errorf("failure invoking '%v' (HTTP %v)", route, resp.StatusCode)
|
|
}
|
|
b, err := io.ReadAll(resp.Body)
|
|
return resp.Header, string(b), err
|
|
}
|