mirror of https://github.com/knative/func.git
183 lines
5.5 KiB
Go
183 lines
5.5 KiB
Go
package function
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"time"
|
|
|
|
cloudevents "github.com/cloudevents/sdk-go/v2"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
const (
|
|
DefaultInvokeSource = "/boson/fn"
|
|
DefaultInvokeType = "boson.fn"
|
|
DefaultInvokeContentType = "text/plain"
|
|
DefaultInvokeData = "Hello World"
|
|
DefaultInvokeFormat = "http"
|
|
)
|
|
|
|
// InvokeMesage is the message used by the convenience method Invoke to provide
|
|
// a simple way to trigger the execution of a Function during development.
|
|
type InvokeMessage struct {
|
|
ID string
|
|
Source string
|
|
Type string
|
|
ContentType string
|
|
Data string
|
|
Format string //optional override for Function-defined message format
|
|
}
|
|
|
|
// NewInvokeMessage creates a new InvokeMessage with fields populated
|
|
func NewInvokeMessage() InvokeMessage {
|
|
return InvokeMessage{
|
|
ID: uuid.NewString(),
|
|
Source: DefaultInvokeSource,
|
|
Type: DefaultInvokeType,
|
|
ContentType: DefaultInvokeContentType,
|
|
Data: DefaultInvokeData,
|
|
// Format override not set by default: value from Function being preferred.
|
|
}
|
|
}
|
|
|
|
// invoke the Function instance in the target environment with the
|
|
// invocation message.
|
|
func invoke(ctx context.Context, c *Client, f Function, target string, m InvokeMessage) (string, error) {
|
|
|
|
// Get the first available route from 'local', 'remote', a named environment
|
|
// or treat target
|
|
route, err := invocationRoute(ctx, c, f, target) // choose instance to invoke
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Format" either 'http' or 'cloudevent'
|
|
// TODO: discuss if providing a Format on Message should a) update the
|
|
// Function to use the new format if none is defined already (backwards
|
|
// compatibility fix) or b) always update the Function, even if it was already
|
|
// set. Once decided, codify in a test.
|
|
format := DefaultInvokeFormat
|
|
if f.Invocation.Format != "" {
|
|
// Prefer the format set during Function creation if defined.
|
|
format = f.Invocation.Format
|
|
}
|
|
if m.Format != "" {
|
|
// Use the override specified on the message if provided
|
|
format = m.Format
|
|
}
|
|
|
|
switch format {
|
|
case "http":
|
|
return sendPost(ctx, route, m, c.transport)
|
|
case "cloudevent":
|
|
return sendEvent(ctx, route, m, c.transport)
|
|
default:
|
|
return "", fmt.Errorf("format '%v' not supported.", format)
|
|
}
|
|
}
|
|
|
|
// invocationRoute returns a route to the named target instance of a Func:
|
|
// 'local': local environment; locally running Function (error if not running)
|
|
// 'remote': remote environment; first available instance (error if none)
|
|
// '<environment>': A valid alternate target which contains instances.
|
|
// '<url>': An explicit URL
|
|
// '': Default if no target is passed is to first use local, then remote.
|
|
// errors if neither are available.
|
|
func invocationRoute(ctx context.Context, c *Client, f Function, target string) (string, error) {
|
|
// TODO: this function has code-smell; will de-smellify it in next pass.
|
|
if target == EnvironmentLocal {
|
|
instance, err := c.Instances().Get(ctx, f, target)
|
|
if err != nil {
|
|
if errors.Is(err, ErrEnvironmentNotFound) {
|
|
return "", errors.New("not running locally")
|
|
}
|
|
return "", err
|
|
}
|
|
return instance.Route, nil
|
|
|
|
} else if target == EnvironmentRemote {
|
|
instance, err := c.Instances().Get(ctx, f, target)
|
|
if err != nil {
|
|
if errors.Is(err, ErrEnvironmentNotFound) {
|
|
return "", errors.New("not running in remote")
|
|
}
|
|
return "", err
|
|
}
|
|
return instance.Route, nil
|
|
|
|
} else if target == "" { // target blank, check local first then remote.
|
|
instance, err := c.Instances().Get(ctx, f, EnvironmentLocal)
|
|
if err != nil && !errors.Is(err, ErrNotRunning) {
|
|
return "", err // unexpected errors are anything other than ErrNotRunning
|
|
}
|
|
if err == nil {
|
|
return instance.Route, nil // found instance in local environment
|
|
}
|
|
instance, err = c.Instances().Get(ctx, f, EnvironmentRemote)
|
|
if errors.Is(err, ErrNotRunning) {
|
|
return "", errors.New("not running locally or in the remote")
|
|
}
|
|
if err != nil {
|
|
return "", err // unexpected error
|
|
}
|
|
return instance.Route, nil
|
|
} else { // treat an unrecognized target as an ad-hoc verbatim endpoint
|
|
return target, nil
|
|
}
|
|
}
|
|
|
|
// sendEvent to the route populated with data in the invoke message.
|
|
func sendEvent(ctx context.Context, route string, m InvokeMessage, t http.RoundTripper) (resp string, err error) {
|
|
event := cloudevents.NewEvent()
|
|
event.SetID(m.ID)
|
|
event.SetSource(m.Source)
|
|
event.SetType(m.Type)
|
|
if err = event.SetData(m.ContentType, m.Data); err != nil {
|
|
return
|
|
}
|
|
|
|
c, err := cloudevents.NewClientHTTP(
|
|
cloudevents.WithTarget(route),
|
|
cloudevents.WithRoundTripper(t))
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
evt, result := c.Request(cloudevents.ContextWithTarget(ctx, route), event)
|
|
if cloudevents.IsUndelivered(result) {
|
|
err = fmt.Errorf("unable to invoke: %v", result)
|
|
} else if evt != nil { // Check for nil in case no event is returned
|
|
resp = evt.String()
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// sendPost to the route populated with data in the invoke message.
|
|
func sendPost(ctx context.Context, route string, m InvokeMessage, t http.RoundTripper) (string, error) {
|
|
client := http.Client{
|
|
Transport: t,
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
resp, err := client.PostForm(route, url.Values{
|
|
"ID": {m.ID},
|
|
"Source": {m.Source},
|
|
"Type": {m.Type},
|
|
"ContentType": {m.ContentType},
|
|
"Data": {m.Data},
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode > 299 {
|
|
return "", fmt.Errorf("failure invoking '%v' (HTTP %v)", route, resp.StatusCode)
|
|
}
|
|
b, err := io.ReadAll(resp.Body)
|
|
return string(b), err
|
|
}
|