diff --git a/client.go b/client.go index a38153c42..fda65b4aa 100644 --- a/client.go +++ b/client.go @@ -32,6 +32,7 @@ type Client struct { templates string // path to extensible templates registry string // default registry for OCI image tags progressListener ProgressListener // progress listener + emitter Emitter // Emits CloudEvents to functions } // ErrNotBuilt indicates the Function has not yet been built. @@ -137,6 +138,11 @@ type DNSProvider interface { Provide(Function) error } +// Emit CloudEvents to functions +type Emitter interface { + Emit(ctx context.Context, endpoint string) error +} + // New client for Function management. func New(options ...Option) *Client { // Instantiate client with static defaults. @@ -149,6 +155,7 @@ func New(options ...Option) *Client { lister: &noopLister{output: os.Stdout}, dnsProvider: &noopDNSProvider{output: os.Stdout}, progressListener: &noopProgressListener{}, + emitter: &noopEmitter{}, } // Apply passed options, which take ultimate precidence. @@ -254,6 +261,14 @@ func WithRegistry(registry string) Option { } } +// WithEmitter sets a CloudEvent emitter on the client which is capable of sending +// a CloudEvent to an arbitrary function endpoint +func WithEmitter(e Emitter) Option { + return func(c *Client) { + c.emitter = e + } +} + // New Function. // Use Create, Build and Deploy independently for lower level control. func (c *Client) New(ctx context.Context, cfg Function) (err error) { @@ -529,6 +544,11 @@ func (c *Client) Remove(ctx context.Context, cfg Function) error { return c.remover.Remove(ctx, f.Name) } +// Emit a CloudEvent to a function endpoint +func (c *Client) Emit(ctx context.Context, endpoint string) error { + return c.emitter.Emit(ctx, endpoint) +} + // Manual implementations (noops) of required interfaces. // In practice, the user of this client package (for example the CLI) will // provide a concrete implementation for all of the interfaces. For testing or @@ -573,3 +593,7 @@ func (p *noopProgressListener) SetTotal(i int) {} func (p *noopProgressListener) Increment(m string) {} func (p *noopProgressListener) Complete(m string) {} func (p *noopProgressListener) Done() {} + +type noopEmitter struct{} + +func (p *noopEmitter) Emit(ctx context.Context, endpoint string) error { return nil } diff --git a/client_test.go b/client_test.go index 39cb37efa..8a4430b1b 100644 --- a/client_test.go +++ b/client_test.go @@ -704,6 +704,30 @@ func TestDeployUnbuilt(t *testing.T) { } } +func TestEmit(t *testing.T) { + sink := "http://testy.mctestface.com" + emitter := mock.NewEmitter() + + // Ensure sink passthrough from client + emitter.EmitFn = func(s string) error { + if s != sink { + t.Fatalf("Unexpected sink %v\n", s) + } + return nil + } + + // Instantiate in the current working directory, with no name. + client := bosonFunc.New(bosonFunc.WithEmitter(emitter)) + + if err := client.Emit(context.Background(), sink); err != nil { + t.Fatal(err) + } + if !emitter.EmitInvoked { + t.Fatal("Client did not invoke emitter.Emit()") + } + +} + // TODO: The tests which confirm an error is generated do not currently test // that the expected error is received; just that any error is generated. // This should be replaced with typed errors or at a minimum code prefixes diff --git a/cloudevents/emitter.go b/cloudevents/emitter.go new file mode 100644 index 000000000..956b96464 --- /dev/null +++ b/cloudevents/emitter.go @@ -0,0 +1,66 @@ +package cloudevents + +import ( + "context" + "fmt" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/client" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/cloudevents/sdk-go/v2/types" + "github.com/google/uuid" +) + +const ( + DefaultSource = "/boson/fn" + DefaultType = "boson.fn" +) + +type Emitter struct { + Endpoint string + Source string + Type string + Id string + Data string + ContentType string +} + +func NewEmitter() *Emitter { + return &Emitter{ + Source: DefaultSource, + Type: DefaultType, + Id: uuid.NewString(), + Data: "", + ContentType: event.TextPlain, + } +} + +func (e *Emitter) Emit(ctx context.Context, endpoint string) (err error) { + c, err := newClient(endpoint) + if err != nil { + return + } + evt := event.Event{ + Context: event.EventContextV1{ + Type: e.Type, + Source: *types.ParseURIRef(e.Source), + ID: e.Id, + }.AsV1(), + } + if err = evt.SetData(e.ContentType, e.Data); err != nil { + return + } + if result := c.Send(ctx, evt); cloudevents.IsUndelivered(result) { + return fmt.Errorf(result.Error()) + } + return nil +} + +func newClient(target string) (c client.Client, err error) { + p, err := http.New(http.WithTarget(target)) + if err != nil { + return + } + return client.New(p) +} diff --git a/cloudevents/emitter_test.go b/cloudevents/emitter_test.go new file mode 100644 index 000000000..31fbe6317 --- /dev/null +++ b/cloudevents/emitter_test.go @@ -0,0 +1,140 @@ +package cloudevents + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cloudevents/sdk-go/v2/client" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/google/go-cmp/cmp" +) + +func makeClient(t *testing.T) (c client.Client, p *http.Protocol) { + p, err := http.New() + if err != nil { + t.Fatal(err) + } + c, err = client.New(p) + if err != nil { + t.Errorf("failed to make client %s", err.Error()) + } + return +} + +func receiveEvents(t *testing.T, ctx context.Context, events chan<- event.Event) (p *http.Protocol) { + c, p := makeClient(t) + go func() { + err := c.StartReceiver(ctx, func(ctx context.Context, event event.Event) error { + go func() { + events <- event + }() + return nil + }) + if err != nil { + t.Errorf("failed to start receiver %s", err.Error()) + } + }() + time.Sleep(1 * time.Second) // let the server start + return +} + +func TestEmitterDefaults(t *testing.T) { + events := make(chan event.Event) + ctx, cancel := context.WithCancel(context.Background()) + + // start a cloudevent client that receives events + // and sends them to a channel + p := receiveEvents(t, ctx, events) + + emitter := NewEmitter() + if err := emitter.Emit(ctx, fmt.Sprintf("http://localhost:%v", p.GetListeningPort())); err != nil { + t.Fatalf("Error emitting event: %v\n", err) + } + + // received event + got := <-events + + cancel() // stop the client + time.Sleep(1 * time.Second) // let the server stop + + if got.Source() != "/boson/fn" { + t.Fatal("Expected /boson/fn as default source") + } + if got.Type() != "boson.fn" { + t.Fatal("Expected boson.fn as default type") + } +} + +func TestEmitter(t *testing.T) { + testCases := map[string]struct { + cesource string + cetype string + ceid string + cedata string + }{ + "with-source": { + cesource: "/my/source", + }, + "with-type": { + cetype: "my.type", + }, + "with-id": { + ceid: "11223344", + }, + "with-data": { + cedata: "Some event data", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + events := make(chan event.Event) + ctx, cancel := context.WithCancel(context.Background()) + + // start a cloudevent client that receives events + // and sends them to a channel + p := receiveEvents(t, ctx, events) + + emitter := NewEmitter() + + if tc.cesource != "" { + emitter.Source = tc.cesource + } + if tc.cetype != "" { + emitter.Type = tc.cetype + } + if tc.ceid != "" { + emitter.Id = tc.ceid + } + if tc.cedata != "" { + emitter.Data = tc.cedata + } + if err := emitter.Emit(ctx, fmt.Sprintf("http://localhost:%v", p.GetListeningPort())); err != nil { + t.Fatalf("Error emitting event: %v\n", err) + } + + // received event + got := <-events + + cancel() // stop the client + time.Sleep(100 * time.Millisecond) // let the server stop + + if tc.cesource != "" && got.Source() != tc.cesource { + t.Fatalf("%s: Expected %s as source, got %s", n, tc.cesource, got.Source()) + } + if tc.cetype != "" && got.Type() != tc.cetype { + t.Fatalf("%s: Expected %s as type, got %s", n, tc.cetype, got.Type()) + } + if tc.ceid != "" && got.ID() != tc.ceid { + t.Fatalf("%s: Expected %s as id, got %s", n, tc.ceid, got.ID()) + } + if tc.cedata != "" { + if diff := cmp.Diff(tc.cedata, string(got.Data())); diff != "" { + t.Errorf("Unexpected difference (-want, +got): %v", diff) + } + } + }) + } +} diff --git a/cmd/emit.go b/cmd/emit.go new file mode 100644 index 000000000..7cf9a43a6 --- /dev/null +++ b/cmd/emit.go @@ -0,0 +1,142 @@ +package cmd + +import ( + "fmt" + "io/ioutil" + + fn "github.com/boson-project/func" + "github.com/boson-project/func/cloudevents" + "github.com/boson-project/func/knative" + "github.com/google/uuid" + "github.com/ory/viper" + "github.com/spf13/cobra" +) + +func init() { + e := cloudevents.NewEmitter() + root.AddCommand(emitCmd) + // TODO: do these env vars make sense? + emitCmd.Flags().StringP("sink", "k", "", "Send the CloudEvent to the function running at [sink]. The special value \"local\" can be used to send the event to a function running on the local host. When provided, the --path flag is ignored (Env: $FUNC_SINK)") + emitCmd.Flags().StringP("source", "s", e.Source, "CloudEvent source (Env: $FUNC_SOURCE)") + emitCmd.Flags().StringP("type", "t", e.Type, "CloudEvent type (Env: $FUNC_TYPE)") + emitCmd.Flags().StringP("id", "i", uuid.NewString(), "CloudEvent ID (Env: $FUNC_ID)") + emitCmd.Flags().StringP("data", "d", "", "Any arbitrary string to be sent as the CloudEvent data. Ignored if --file is provided (Env: $FUNC_DATA)") + emitCmd.Flags().StringP("file", "f", "", "Path to a local file containing CloudEvent data to be sent (Env: $FUNC_FILE)") + emitCmd.Flags().StringP("content-type", "c", "application/json", "The MIME Content-Type for the CloudEvent data (Env: $FUNC_CONTENT_TYPE)") + emitCmd.Flags().StringP("path", "p", cwd(), "Path to the project directory. Ignored when --sink is provided (Env: $FUNC_PATH)") +} + +var emitCmd = &cobra.Command{ + Use: "emit", + Short: "Emit a CloudEvent to a function endpoint", + Long: `Emit event + +Emits a CloudEvent, sending it to the deployed function. +`, + Example: ` +# Send a CloudEvent to the deployed function with no data and default values +# for source, type and ID +kn func emit + +# Send a CloudEvent to the deployed function with the data found in ./test.json +kn func emit --file ./test.json + +# Send a CloudEvent to the function running locally with a CloudEvent containing +# "Hello World!" as the data field, with a content type of "text/plain" +kn func emit --data "Hello World!" --content-type "text/plain" -s local + +# Send a CloudEvent to the function running locally with an event type of "my.event" +kn func emit --type my.event --sink local + +# Send a CloudEvent to the deployed function found at /path/to/fn with an id of "fn.test" +kn func emit --path /path/to/fn -i fn.test + +# Send a CloudEvent to an arbitrary endpoint +kn func emit --sink "http://my.event.broker.com" +`, + SuggestFor: []string{"meit", "emti", "send"}, + PreRunE: bindEnv("source", "type", "id", "data", "file", "path", "sink", "content-type"), + RunE: runEmit, +} + +func runEmit(cmd *cobra.Command, args []string) (err error) { + config := newEmitConfig() + var endpoint string + if config.Sink != "" { + if config.Sink == "local" { + endpoint = "http://localhost:8080" + } else { + endpoint = config.Sink + } + } else { + var f fn.Function + f, err = fn.NewFunction(config.Path) + if err != nil { + return + } + // What happens if the function hasn't been deployed but they don't run with --local=true + // Maybe we should be thinking about saving the endpoint URL in func.yaml after each deploy + var d *knative.Describer + d, err = knative.NewDescriber("") + if err != nil { + return + } + var desc fn.Description + desc, err = d.Describe(f.Name) + if err != nil { + return + } + // Use the first available route + endpoint = desc.Routes[0] + } + + emitter := cloudevents.NewEmitter() + emitter.Source = config.Source + emitter.Type = config.Type + emitter.Id = config.Id + emitter.ContentType = config.ContentType + emitter.Data = config.Data + if config.File != "" { + var buf []byte + if emitter.Data != "" && config.Verbose { + // TODO: This made me wonder whether we should switch to a real logging library + fmt.Printf("WARN: Found both --data and --file. Using file: %v\n", config.File) + } + buf, err = ioutil.ReadFile(config.File) + if err != nil { + return + } + emitter.Data = string(buf) + } + + client := fn.New( + fn.WithEmitter(emitter), + ) + return client.Emit(cmd.Context(), endpoint) +} + +type emitConfig struct { + Path string + Source string + Type string + Id string + Data string + File string + ContentType string + Sink string + Verbose bool +} + +func newEmitConfig() emitConfig { + return emitConfig{ + Path: viper.GetString("path"), + Source: viper.GetString("source"), + Type: viper.GetString("type"), + Id: viper.GetString("id"), + Data: viper.GetString("data"), + File: viper.GetString("file"), + ContentType: viper.GetString("content-type"), + Sink: viper.GetString("sink"), + Verbose: viper.GetBool("verbose"), + } +} diff --git a/docs/guides/commands.md b/docs/guides/commands.md index bd08a78a7..fc39f521b 100644 --- a/docs/guides/commands.md +++ b/docs/guides/commands.md @@ -124,3 +124,38 @@ When run as a `kn` plugin. ```console kn func delete [-n namespace, -p path] ``` + +## `emit` + +Emits a CloudEvent, sending it to the deployed function. The user may specify the event type, source and ID, +and may provide event data on the command line or in a file on disk. By default, `event` works on the local +directory, assuming that it is a function project. Alternatively the user may provide a path to a project +directory using the `--path` flag, or send an event to an arbitrary endpoint using the `--sink` flag. The +`--sink` flag also accepts the special value `local` to send an event to the function running locally, for +example, when run via `func run`. + +Similar `kn` command when using the `kn-plgin-event`: `kn event send [FLAGS]` + +Examples: + +```console +# Send a CloudEvent to the deployed function with no data and default values +# for source, type and ID +kn func emit + +# Send a CloudEvent to the deployed function with the data found in ./test.json +kn func emit --file ./test.json + +# Send a CloudEvent to the function running locally with a CloudEvent containing +# "Hello World!" as the data field, with a content type of "text/plain" +kn func emit --data "Hello World!" --content-type "text/plain" -s local + +# Send a CloudEvent to the function running locally with an event type of "my.event" +kn func emit --type my.event --sink local + +# Send a CloudEvent to the deployed function found at /path/to/fn with an id of "fn.test" +kn func emit --path /path/to/fn -i fn.test + +# Send a CloudEvent to an arbitrary endpoint +kn func emit --sink "http://my.event.broker.com" +``` diff --git a/go.mod b/go.mod index f8b040ac7..04a2b1634 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,12 @@ go 1.14 require ( github.com/buildpacks/pack v0.18.0 + github.com/cloudevents/sdk-go/v2 v2.2.0 github.com/containers/image/v5 v5.10.5 github.com/docker/docker v20.10.2+incompatible github.com/docker/go-connections v0.4.0 + github.com/google/go-cmp v0.5.5 + github.com/google/uuid v1.2.0 github.com/markbates/pkger v0.17.1 github.com/mitchellh/go-homedir v1.1.0 github.com/ory/viper v1.7.4 diff --git a/mock/emitter.go b/mock/emitter.go new file mode 100644 index 000000000..1fdff63c5 --- /dev/null +++ b/mock/emitter.go @@ -0,0 +1,21 @@ +package mock + +import ( + "context" +) + +type Emitter struct { + EmitInvoked bool + EmitFn func(string) error +} + +func NewEmitter() *Emitter { + return &Emitter{ + EmitFn: func(string) error { return nil }, + } +} + +func (i *Emitter) Emit(ctx context.Context, s string) error { + i.EmitInvoked = true + return i.EmitFn(s) +}