diff --git a/Makefile b/Makefile index ae84b73..d74a98f 100644 --- a/Makefile +++ b/Makefile @@ -13,20 +13,21 @@ test: mod ## Tests the entire project cover: mod ## Displays test coverage in the client and service packages go test -coverprofile=cover-client.out ./client && go tool cover -html=cover-client.out - go test -coverprofile=cover-service.out ./service/grpc && go tool cover -html=cover-service.out + go test -coverprofile=cover-grpc.out ./service/grpc && go tool cover -html=cover-grpc.out + go test -coverprofile=cover-http.out ./service/http && go tool cover -html=cover-http.out -service: mod ## Runs the uncompiled example service code +service-http: mod ## Runs the uncompiled HTTP example service code dapr run --app-id serving \ - --app-protocol grpc \ - --app-port 50001 \ + --app-protocol http \ + --app-port 8080 \ --port 3500 \ --log-level debug \ - --components-path example/serving/grpc/config \ - go run example/serving/grpc/main.go + --components-path example/serving/http/config \ + go run example/serving/http/main.go -service09: mod ## Runs the uncompiled example service code using the Dapr v0.9 flags +service-grpc: mod ## Runs the uncompiled gRPC example service code dapr run --app-id serving \ - --protocol grpc \ + --app-protocol grpc \ --app-port 50001 \ --port 3500 \ --log-level debug \ @@ -42,13 +43,13 @@ client: mod ## Runs the uncompiled example client code pubsub: ## Submits pub/sub events in different cotnent types curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \ -H "Content-type: application/json" \ - "http://localhost:3500/v1.0/publish/messages" + "http://localhost:3500/v1.0/publish/messages/topic1" curl -d 'JohnLary' \ -H "Content-type: application/xml" \ - "http://localhost:3500/v1.0/publish/messages" + "http://localhost:3500/v1.0/publish/messages/topic1" curl -d '0x18, 0x2d, 0x44, 0x54, 0xfb, 0x21, 0x09, 0x40' \ -H "Content-type: application/octet-stream" \ - "http://localhost:3500/v1.0/publish/messages" + "http://localhost:3500/v1.0/publish/messages/topic1" invoke: ## Invokes service method with different operations curl -d '{ "from": "John", "to": "Lary", "message": "hi" }' \ diff --git a/Readme.md b/Readme.md index cd256bf..19a3c50 100644 --- a/Readme.md +++ b/Readme.md @@ -169,29 +169,37 @@ resp, err = client.InvokeService(ctx, "service-name", "method-name") And to invoke a service with data: ```go -data := []byte(`{ "id": "a123", "value": "abcdefg", "valid": true }`) -resp, err := client.InvokeServiceWithContent(ctx, "service-name", "method-name", "application/json", data) +content := &DataContent{ + ContentType: "application/json", + Data: []byte(`{ "id": "a123", "value": "demo", "valid": true }`) +} +resp, err := client.InvokeServiceWithContent(ctx, "service-name", "method-name", content) ``` ##### Bindings -Similarly to Service, Dapr client provides two methods to invoke an operation on a [Dapr-defined binding](https://github.com/dapr/docs/tree/master/concepts/bindings). Dapr supports input, output, and bidirectional bindings so the first methods supports all of them along with metadata options: +Similarly to Service, Dapr client provides two methods to invoke an operation on a [Dapr-defined binding](https://github.com/dapr/docs/tree/master/concepts/bindings). Dapr supports input, output, and bidirectional bindings. + +For simple, output only biding: ```go -data := []byte("hello") -opt := map[string]string{ - "opt1": "value1", - "opt2": "value2", +in := &BindingInvocation{ Name: "binding-name", Operation: "operation-name" } +err = client.InvokeOutputBinding(ctx, in) +``` + +To invoke method with content and metadata: + +```go +in := &BindingInvocation{ + Name: "binding-name", + Operation: "operation-name", + Data: []byte("hello"), + Metadata: map[string]string{"k1": "v1", "k2": "v2"} } -resp, meta, err := client.InvokeBinding(ctx, "binding-name", "operation-name", data, opt) +out, err := client.InvokeBinding(ctx, in) ``` -And for simple, output only biding: -```go -data := []byte("hello") -err = client.InvokeOutputBinding(ctx, "binding-name", "operation-name", data) -``` ##### Secrets @@ -206,7 +214,7 @@ secret, err = client.GetSecret(ctx, "store-name", "secret-name", opt) ## Service (callback) -In addition to a an easy to use client, Dapr go package also provides implementation for `service`. Instructions on how to use it are located [here](./service/grpc/Readme.md) +In addition to a an easy to use client, Dapr go package also provides implementation for `service`. Instructions on how to use it are located [here](./service/Readme.md) ## Contributing to Dapr go client diff --git a/client/binding.go b/client/binding.go index ef1d6ab..f1c7cdb 100644 --- a/client/binding.go +++ b/client/binding.go @@ -7,37 +7,65 @@ import ( "github.com/pkg/errors" ) +// BindingInvocation represents binding invocation request +type BindingInvocation struct { + // Name is name of binding to invoke. + Name string + // Operation is the name of the operation type for the binding to invoke + Operation string + // Data is the input bindings sent + Data []byte + // Metadata is the input binding metadata + Metadata map[string]string +} + +// BindingEvent represents the binding event handler input +type BindingEvent struct { + // Data is the input bindings sent + Data []byte + // Metadata is the input binding metadata + Metadata map[string]string +} + // InvokeBinding invokes specific operation on the configured Dapr binding. // This method covers input, output, and bi-directional bindings. -func (c *GRPCClient) InvokeBinding(ctx context.Context, name, op string, in []byte, min map[string]string) (out []byte, mout map[string]string, err error) { - if name == "" { - return nil, nil, errors.New("nil topic") +func (c *GRPCClient) InvokeBinding(ctx context.Context, in *BindingInvocation) (out *BindingEvent, err error) { + if in == nil { + return nil, errors.New("binding invocation required") + } + if in.Name == "" { + return nil, errors.New("binding invocation name required") + } + if in.Operation == "" { + return nil, errors.New("binding invocation operation required") } req := &pb.InvokeBindingRequest{ - Name: name, - Operation: op, - Data: in, - Metadata: min, + Name: in.Name, + Operation: in.Operation, + Data: in.Data, + Metadata: in.Metadata, } resp, err := c.protoClient.InvokeBinding(authContext(ctx), req) if err != nil { - return nil, nil, errors.Wrapf(err, "error invoking binding %s", name) + return nil, errors.Wrapf(err, "error invoking binding %s/%s", in.Name, in.Operation) } + out = &BindingEvent{} + if resp != nil { - return resp.Data, resp.Metadata, nil + out.Data = resp.Data + out.Metadata = resp.Metadata } - return nil, nil, nil + return } // InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding // This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method. -func (c *GRPCClient) InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error { - _, _, err := c.InvokeBinding(ctx, name, operation, data, nil) - if err != nil { +func (c *GRPCClient) InvokeOutputBinding(ctx context.Context, in *BindingInvocation) error { + if _, err := c.InvokeBinding(ctx, in); err != nil { return errors.Wrap(err, "error invoking output binding") } return nil diff --git a/client/binding_test.go b/client/binding_test.go index a0458a2..8fdb7c4 100644 --- a/client/binding_test.go +++ b/client/binding_test.go @@ -11,32 +11,36 @@ import ( func TestInvokeBinding(t *testing.T) { ctx := context.Background() - data := "ping" + in := &BindingInvocation{ + Name: "test", + Operation: "fn", + } - t.Run("output binding", func(t *testing.T) { - err := testClient.InvokeOutputBinding(ctx, "test", "fn", []byte(data)) + t.Run("output binding without data", func(t *testing.T) { + err := testClient.InvokeOutputBinding(ctx, in) assert.Nil(t, err) }) - t.Run("output binding without data", func(t *testing.T) { - err := testClient.InvokeOutputBinding(ctx, "test", "fn", []byte(data)) + t.Run("output binding", func(t *testing.T) { + in.Data = []byte("test") + err := testClient.InvokeOutputBinding(ctx, in) assert.Nil(t, err) }) t.Run("binding without data", func(t *testing.T) { - out, mOut, err := testClient.InvokeBinding(ctx, "test", "fn", nil, nil) + in.Data = nil + out, err := testClient.InvokeBinding(ctx, in) assert.Nil(t, err) - assert.NotNil(t, mOut) assert.NotNil(t, out) }) t.Run("binding with data and meta", func(t *testing.T) { - mIn := map[string]string{"k1": "v1", "k2": "v2"} - out, mOut, err := testClient.InvokeBinding(ctx, "test", "fn", []byte(data), mIn) + in.Data = []byte("test") + in.Metadata = map[string]string{"k1": "v1", "k2": "v2"} + out, err := testClient.InvokeBinding(ctx, in) assert.Nil(t, err) - assert.NotNil(t, mOut) assert.NotNil(t, out) - assert.Equal(t, data, string(out)) + assert.Equal(t, "test", string(out.Data)) }) } diff --git a/client/client.go b/client/client.go index 6400696..4e98de8 100644 --- a/client/client.go +++ b/client/client.go @@ -30,17 +30,17 @@ var ( type Client interface { // InvokeBinding invokes specific operation on the configured Dapr binding. // This method covers input, output, and bi-directional bindings. - InvokeBinding(ctx context.Context, name, op string, in []byte, min map[string]string) (out []byte, mout map[string]string, err error) + InvokeBinding(ctx context.Context, in *BindingInvocation) (out *BindingEvent, err error) - // InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding + // InvokeOutputBinding invokes configured Dapr binding with data.InvokeOutputBinding // This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method. - InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error + InvokeOutputBinding(ctx context.Context, in *BindingInvocation) error - // InvokeService invokes service without raw data ([]byte). + // InvokeService invokes service without raw data InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) - // InvokeServiceWithContent invokes service without content (data + content type). - InvokeServiceWithContent(ctx context.Context, serviceID, method, contentType string, data []byte) (out []byte, err error) + // InvokeServiceWithContent invokes service with content + InvokeServiceWithContent(ctx context.Context, serviceID, method string, content *DataContent) (out []byte, err error) // PublishEvent pubishes data onto topic in specific pubsub component. PublishEvent(ctx context.Context, component, topic string, in []byte) error diff --git a/client/invoke.go b/client/invoke.go index 7eded5b..8d48920 100644 --- a/client/invoke.go +++ b/client/invoke.go @@ -9,6 +9,14 @@ import ( "github.com/pkg/errors" ) +// DataContent the service invocation content +type DataContent struct { + // Data is the input data + Data []byte + // ContentType is the type of the data content + ContentType string +} + func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) { if req == nil { return nil, errors.New("nil request") @@ -50,23 +58,23 @@ func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string } // InvokeServiceWithContent invokes service without content (data + content type). -func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method, contentType string, data []byte) (out []byte, err error) { +func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method string, content *DataContent) (out []byte, err error) { if serviceID == "" { - return nil, errors.New("nil serviceID") + return nil, errors.New("serviceID is required") } if method == "" { - return nil, errors.New("nil method") + return nil, errors.New("method name is required") } - if contentType == "" { - return nil, errors.New("nil contentType") + if content == nil { + return nil, errors.New("content required") } req := &pb.InvokeServiceRequest{ Id: serviceID, Message: &v1.InvokeRequest{ Method: method, - Data: &anypb.Any{Value: data}, - ContentType: contentType, + Data: &anypb.Any{Value: content.Data}, + ContentType: content.ContentType, HttpExtension: &v1.HTTPExtension{ Verb: v1.HTTPExtension_POST, }, diff --git a/client/invoke_test.go b/client/invoke_test.go index 79dbb21..f55c966 100644 --- a/client/invoke_test.go +++ b/client/invoke_test.go @@ -14,7 +14,11 @@ func TestInvokeServiceWithContent(t *testing.T) { data := "ping" t.Run("with content", func(t *testing.T) { - resp, err := testClient.InvokeServiceWithContent(ctx, "test", "fn", "text/plain", []byte(data)) + content := &DataContent{ + ContentType: "text/plain", + Data: []byte(data), + } + resp, err := testClient.InvokeServiceWithContent(ctx, "test", "fn", content) assert.Nil(t, err) assert.NotNil(t, resp) assert.Equal(t, string(resp), data) diff --git a/client/pubsub_test.go b/client/pubsub_test.go index f1a87a9..2de7bf0 100644 --- a/client/pubsub_test.go +++ b/client/pubsub_test.go @@ -16,13 +16,13 @@ func TestPublishEvent(t *testing.T) { assert.Nil(t, err) }) + t.Run("without data", func(t *testing.T) { + err := testClient.PublishEvent(ctx, "messagebus", "test", nil) + assert.Nil(t, err) + }) + t.Run("with empty topic name", func(t *testing.T) { err := testClient.PublishEvent(ctx, "messagebus", "", []byte("ping")) assert.NotNil(t, err) }) - - t.Run("without data", func(t *testing.T) { - err := testClient.PublishEvent(ctx, "messagebus", "test", nil) - assert.NotNil(t, err) - }) } diff --git a/example/Readme.md b/example/Readme.md index 3c0c475..b5f93ea 100644 --- a/example/Readme.md +++ b/example/Readme.md @@ -2,7 +2,22 @@ The `example` folder contains a Dapr enabled `serving` app and a `client` app that uses this SDK to invoke Dapr API for state and events, The `serving` app is available as HTTP or gRPC. The `client` app can target either one of these for service to service and binding invocations. -To run this example, start by first launching the service: +To run this example, start by first launching the service in ether HTTP or gRPC: + +### HTTP + +``` +cd example/serving/http +dapr run --app-id serving \ + --app-protocol http \ + --app-port 8080 \ + --port 3500 \ + --log-level debug \ + --components-path ./config \ + go run main.go +``` + +### gRPC ``` cd example/serving/grpc diff --git a/example/client/main.go b/example/client/main.go index 24527ad..0d664b5 100644 --- a/example/client/main.go +++ b/example/client/main.go @@ -56,6 +56,7 @@ func main() { if err := client.SaveStateItems(ctx, store, item2); err != nil { panic(err) } + fmt.Println("data item saved") // delete state for key key1 if err := client.DeleteState(ctx, store, "key1"); err != nil { @@ -64,13 +65,21 @@ func main() { fmt.Println("data deleted") // invoke a method called EchoMethod on another dapr enabled service - resp, err := client.InvokeServiceWithContent(ctx, "serving", "echo", "text/plain", data) + content := &dapr.DataContent{ + ContentType: "text/plain", + Data: []byte(data), + } + resp, err := client.InvokeServiceWithContent(ctx, "serving", "echo", content) if err != nil { panic(err) } fmt.Printf("service method invoked, response: %s", string(resp)) - if err := client.InvokeOutputBinding(ctx, "example-http-binding", "create", nil); err != nil { + in := &dapr.BindingInvocation{ + Name: "example-http-binding", + Operation: "create", + } + if err := client.InvokeOutputBinding(ctx, in); err != nil { panic(err) } fmt.Println("output binding invoked") diff --git a/example/serving/grpc/main.go b/example/serving/grpc/main.go index f23f857..e6b5d3c 100644 --- a/example/serving/grpc/main.go +++ b/example/serving/grpc/main.go @@ -5,6 +5,7 @@ import ( "errors" "log" + "github.com/dapr/go-sdk/service/common" daprd "github.com/dapr/go-sdk/service/grpc" ) @@ -16,20 +17,21 @@ func main() { } // add some topic subscriptions - err = s.AddTopicEventHandler("messages", "demo", eventHandler) - if err != nil { + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + } + if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { log.Fatalf("error adding topic subscription: %v", err) } // add a service to service invocation handler - err = s.AddServiceInvocationHandler("echo", echoHandler) - if err != nil { + if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil { log.Fatalf("error adding invocation handler: %v", err) } // add a binding invocation handler - err = s.AddBindingInvocationHandler("run", runHandler) - if err != nil { + if err := s.AddBindingInvocationHandler("run", runHandler); err != nil { log.Fatalf("error adding binding handler: %v", err) } @@ -39,12 +41,12 @@ func main() { } } -func eventHandler(ctx context.Context, e *daprd.TopicEvent) error { +func eventHandler(ctx context.Context, e *common.TopicEvent) error { log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) return nil } -func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Content, err error) { +func echoHandler(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { if in == nil { err = errors.New("nil invocation parameter") return @@ -53,7 +55,7 @@ func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Con "echo - ContentType:%s, Verb:%s, QueryString:%s, %+v", in.ContentType, in.Verb, in.QueryString, string(in.Data), ) - out = &daprd.Content{ + out = &common.Content{ Data: in.Data, ContentType: in.ContentType, DataTypeURL: in.DataTypeURL, @@ -61,7 +63,7 @@ func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Con return } -func runHandler(ctx context.Context, in *daprd.BindingEvent) (out []byte, err error) { +func runHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata) return nil, nil } diff --git a/example/serving/http/config/cron.yaml b/example/serving/http/config/cron.yaml new file mode 100644 index 0000000..a195799 --- /dev/null +++ b/example/serving/http/config/cron.yaml @@ -0,0 +1,9 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: run +spec: + type: bindings.cron + metadata: + - name: schedule + value: "@every 10s" \ No newline at end of file diff --git a/example/serving/http/config/pubsub.yaml b/example/serving/http/config/pubsub.yaml new file mode 100644 index 0000000..cefe200 --- /dev/null +++ b/example/serving/http/config/pubsub.yaml @@ -0,0 +1,11 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: messages +spec: + type: pubsub.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/example/serving/http/config/statestore.yaml b/example/serving/http/config/statestore.yaml new file mode 100644 index 0000000..0649229 --- /dev/null +++ b/example/serving/http/config/statestore.yaml @@ -0,0 +1,13 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" diff --git a/example/serving/http/main.go b/example/serving/http/main.go new file mode 100644 index 0000000..f1893de --- /dev/null +++ b/example/serving/http/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + "errors" + "log" + "net/http" + + "github.com/dapr/go-sdk/service/common" + daprd "github.com/dapr/go-sdk/service/http" +) + +func main() { + // create a Dapr service (e.g. ":8080", "0.0.0.0:8080", "10.1.1.1:8080" ) + s := daprd.NewService(":8080") + + // add some topic subscriptions + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + Route: "/events", + } + if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { + log.Fatalf("error adding topic subscription: %v", err) + } + + // add a service to service invocation handler + if err := s.AddServiceInvocationHandler("/echo", echoHandler); err != nil { + log.Fatalf("error adding invocation handler: %v", err) + } + + // add an input binding invocation handler + if err := s.AddBindingInvocationHandler("/run", runHandler); err != nil { + log.Fatalf("error adding binding handler: %v", err) + } + + if err := s.Start(); err != nil && err != http.ErrServerClosed { + log.Fatalf("error listenning: %v", err) + } +} + +func eventHandler(ctx context.Context, e *common.TopicEvent) error { + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) + return nil +} + +func echoHandler(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + if in == nil { + err = errors.New("invocation parameter required") + return + } + log.Printf( + "echo - ContentType:%s, Verb:%s, QueryString:%s, %+v", + in.ContentType, in.Verb, in.QueryString, string(in.Data), + ) + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + return +} + +func runHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata) + return nil, nil +} diff --git a/service/Readme.md b/service/Readme.md new file mode 100644 index 0000000..b91ce05 --- /dev/null +++ b/service/Readme.md @@ -0,0 +1,11 @@ +# Dapr Service (Callback) SDK for Go + +In addition to a an easy to use client, Dapr go package also provides implementation for `service` or `callback` in both HTTP and gRPC protocols: + +* [HTTP Service](./http/Readme.md) +* [gRPC Service](./grpc/Readme.md) + + +## Contributing + +See the [Contribution Guide](../CONTRIBUTING.md) to get started with building and developing. diff --git a/service/common/service.go b/service/common/service.go new file mode 100755 index 0000000..dc460fd --- /dev/null +++ b/service/common/service.go @@ -0,0 +1,17 @@ +package common + +import "context" + +// Service represents Dapr callback service +type Service interface { + // AddServiceInvocationHandler appends provided service invocation handler with its name to the service. + AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error + // AddTopicEventHandler appends provided event handler with it's topic and optional metadata to the service. + AddTopicEventHandler(sub *Subscription, fn func(ctx context.Context, e *TopicEvent) error) error + // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. + AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error + // Start starts service. + Start() error + // Stop stops the previously started service. + Stop() error +} diff --git a/service/common/type.go b/service/common/type.go new file mode 100755 index 0000000..0112fad --- /dev/null +++ b/service/common/type.go @@ -0,0 +1,68 @@ +package common + +// TopicEvent is the content of the inbound topic message +type TopicEvent struct { + // ID identifies the event. + ID string `json:"id"` + // The version of the CloudEvents specification. + SpecVersion string `json:"specversion"` + // The type of event related to the originating occurrence. + Type string `json:"type"` + // Source identifies the context in which an event happened. + Source string `json:"source"` + // The content type of data value. + DataContentType string `json:"datacontenttype"` + // The content of the event. + // Note, this is why the gRPC and HTTP implementations need separate structs for cloud events. + Data interface{} `json:"data"` + // Cloud event subject + Subject string `json:"subject"` + // The pubsub topic which publisher sent to. + Topic string `json:"topic"` + // PubsubName is name of the pub/sub this message came from + PubsubName string `json:"pubsubname"` +} + +// InvocationEvent represents the input and output of binding invocation +type InvocationEvent struct { + // Data is the payload that the input bindings sent. + Data []byte `json:"data"` + // ContentType of the Data + ContentType string `json:"contentType"` + // DataTypeURL is the resource URL that uniquely identifies the type of the serialized + DataTypeURL string `json:"typeUrl,omitempty"` + // Verb is the HTTP verb that was used to invoke this service. + Verb string `json:"-"` + // QueryString is the HTTP query string that was used to invoke this service. + QueryString map[string]string `json:"-"` +} + +// Content is a generic data content +type Content struct { + // Data is the payload that the input bindings sent. + Data []byte `json:"data"` + // ContentType of the Data + ContentType string `json:"contentType"` + // DataTypeURL is the resource URL that uniquely identifies the type of the serialized + DataTypeURL string `json:"typeUrl,omitempty"` +} + +// BindingEvent represents the binding event handler input +type BindingEvent struct { + // Data is the input bindings sent + Data []byte `json:"data"` + // Metadata is the input binding metadata + Metadata map[string]string `json:"metadata,omitempty"` +} + +// Subscription represents single topic subscription +type Subscription struct { + // PubsubName is name of the pub/sub this message came from + PubsubName string `json:"pubsubname"` + // Topic is the name of the topic + Topic string `json:"topic"` + // Route is the route of the handler where HTTP topic events should be published (not used in gRPC) + Route string `json:"route"` + // Metadata is the subscription metadata + Metadata map[string]string `json:"metadata,omitempty"` +} diff --git a/service/grpc/Readme.md b/service/grpc/Readme.md index 3919a85..cbe2be8 100644 --- a/service/grpc/Readme.md +++ b/service/grpc/Readme.md @@ -1,6 +1,6 @@ -# Dapr Service SDK for Go +# Dapr gRPC Service SDK for Go -Start by importing Dapr go `service` package: +Start by importing Dapr go `service/grpc` package: ```go daprd "github.com/dapr/go-sdk/service/grpc" @@ -25,7 +25,7 @@ if err := s.Start(); err != nil { } func eventHandler(ctx context.Context, e *daprd.TopicEvent) error { - log.Printf("event - Topic:%s, ID:%s, Data: %v", e.Topic, e.ID, e.Data) + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) return nil } ``` @@ -48,16 +48,16 @@ if err := s.Start(); err != nil { log.Fatalf("server error: %v", err) } -func echoHandler(ctx context.Context, in *daprd.InvocationEvent) (out *daprd.Content, err error) { +func echoHandler(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { if in == nil { - err = errors.New("nil invocation parameter") + err = errors.New("invocation parameter required") return } log.Printf( "echo - ContentType:%s, Verb:%s, QueryString:%s, %+v", in.ContentType, in.Verb, in.QueryString, string(in.Data), ) - out = &daprd.Content{ + out = &common.Content{ Data: in.Data, ContentType: in.ContentType, DataTypeURL: in.DataTypeURL, @@ -84,7 +84,7 @@ if err := s.Start(); err != nil { log.Fatalf("server error: %v", err) } -func runHandler(ctx context.Context, in *daprd.BindingEvent) (out []byte, err error) { +func runHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata) return nil, nil } @@ -93,4 +93,4 @@ func runHandler(ctx context.Context, in *daprd.BindingEvent) (out []byte, err er ## Contributing to Dapr go client -See the [Contribution Guide](../CONTRIBUTING.md) to get started with building and developing. +See the [Contribution Guide](../../CONTRIBUTING.md) to get started with building and developing. diff --git a/service/grpc/binding.go b/service/grpc/binding.go index ceee86f..dec1f3c 100644 --- a/service/grpc/binding.go +++ b/service/grpc/binding.go @@ -4,14 +4,14 @@ import ( "context" "fmt" + pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" - - pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" ) // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service -func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error { +func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error { if name == "" { return fmt.Errorf("binding name required") } @@ -38,7 +38,7 @@ func (s *Server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) return nil, errors.New("nil binding event request") } if fn, ok := s.bindingHandlers[in.Name]; ok { - e := &BindingEvent{ + e := &common.BindingEvent{ Data: in.Data, Metadata: in.Metadata, } diff --git a/service/grpc/binding_test.go b/service/grpc/binding_test.go index b44fb9f..76c1657 100644 --- a/service/grpc/binding_test.go +++ b/service/grpc/binding_test.go @@ -6,10 +6,11 @@ import ( "testing" "github.com/dapr/go-sdk/dapr/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" "github.com/stretchr/testify/assert" ) -func testBindingHandler(ctx context.Context, in *BindingEvent) (out []byte, err error) { +func testBindingHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { if in == nil { return nil, errors.New("nil event") } diff --git a/service/grpc/invoke.go b/service/grpc/invoke.go index 05bca80..a34133f 100644 --- a/service/grpc/invoke.go +++ b/service/grpc/invoke.go @@ -4,14 +4,14 @@ import ( "context" "fmt" + cpb "github.com/dapr/go-sdk/dapr/proto/common/v1" + cc "github.com/dapr/go-sdk/service/common" "github.com/golang/protobuf/ptypes/any" "github.com/pkg/errors" - - cpb "github.com/dapr/go-sdk/dapr/proto/common/v1" ) // AddServiceInvocationHandler appends provided service invocation handler with its method to the service -func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *InvocationEvent) (our *Content, err error)) error { +func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error { if method == "" { return fmt.Errorf("servie name required") } @@ -25,7 +25,7 @@ func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.Invo return nil, errors.New("nil invoke request") } if fn, ok := s.invokeHandlers[in.Method]; ok { - e := &InvocationEvent{} + e := &cc.InvocationEvent{} if in != nil { e.ContentType = in.ContentType diff --git a/service/grpc/invoke_test.go b/service/grpc/invoke_test.go index 896c0f7..66319f2 100644 --- a/service/grpc/invoke_test.go +++ b/service/grpc/invoke_test.go @@ -5,15 +5,16 @@ import ( "testing" "github.com/dapr/go-sdk/dapr/proto/common/v1" + cc "github.com/dapr/go-sdk/service/common" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/anypb" ) -func testInvokeHandler(ctx context.Context, in *InvocationEvent) (out *Content, err error) { +func testInvokeHandler(ctx context.Context, in *cc.InvocationEvent) (out *cc.Content, err error) { if in == nil { return } - out = &Content{ + out = &cc.Content{ ContentType: in.ContentType, Data: in.Data, } diff --git a/service/grpc/service.go b/service/grpc/service.go index c2de473..c093474 100644 --- a/service/grpc/service.go +++ b/service/grpc/service.go @@ -7,89 +7,12 @@ import ( "github.com/pkg/errors" pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" "google.golang.org/grpc" ) -// Service represents Dapr callback service -type Service interface { - // AddServiceInvocationHandler appends provided service invocation handler with its name to the service. - AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error - // AddTopicEventHandler appends provided event handler with it's topic and optional metadata to the service. - AddTopicEventHandler(component, topic string, fn func(ctx context.Context, e *TopicEvent) error) error - // AddTopicEventHandlerWithMetadata appends provided event handler with topic name and metadata to the service. - AddTopicEventHandlerWithMetadata(component, topic string, m map[string]string, fn func(ctx context.Context, e *TopicEvent) error) error - // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. - AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error - // Start starts service. - Start() error - // Stop stops the previously started service. - Stop() error -} - -// TopicEvent is the content of the inbound topic message. -type TopicEvent struct { - // ID identifies the event. - ID string - // The version of the CloudEvents specification. - SpecVersion string - // The type of event related to the originating occurrence. - Type string - // Source identifies the context in which an event happened. - Source string - // The content type of data value. - DataContentType string - // The content of the event. - Data interface{} - // Cloud event subject - Subject string - // The pubsub topic which publisher sent to. - Topic string - // PubsubName is the pubsub topic which publisher sent to. - PubsubName string -} - -// InvocationEvent represents the input and output of binding invocation. -type InvocationEvent struct { - // Data is the payload that the input bindings sent. - Data []byte - // ContentType of the Data - ContentType string - // DataTypeURL is the resource URL that uniquely identifies the type of the serialized. - DataTypeURL string - // Verb is the HTTP verb that was used to invoke this service. - Verb string - // QueryString is the HTTP query string that was used to invoke this service. - QueryString map[string]string -} - -// Content is a generic data content. -type Content struct { - // Data is the payload that the input bindings sent. - Data []byte - // ContentType of the Data - ContentType string - // DataTypeURL is the resource URL that uniquely identifies the type of the serialized. - DataTypeURL string -} - -// BindingEvent represents the binding event handler input. -type BindingEvent struct { - // Data is the input bindings sent. - Data []byte - // Metadata is the input binging components. - Metadata map[string]string -} - -// Subscription represents single topic subscription. -type Subscription struct { - // Topic is the name of the topic. - Topic string - // Route is the route of the handler where topic events should be published. - Route string -} - // NewService creates new Service. -func NewService(address string) (s Service, err error) { +func NewService(address string) (s common.Service, err error) { if address == "" { return nil, errors.New("nil address") } @@ -103,31 +26,31 @@ func NewService(address string) (s Service, err error) { } // NewServiceWithListener creates new Service with specific listener. -func NewServiceWithListener(lis net.Listener) Service { +func NewServiceWithListener(lis net.Listener) common.Service { return newService(lis) } func newService(lis net.Listener) *Server { return &Server{ listener: lis, - invokeHandlers: make(map[string]func(ctx context.Context, in *InvocationEvent) (out *Content, err error)), + invokeHandlers: make(map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)), topicSubscriptions: make(map[string]*topicEventHandler), - bindingHandlers: make(map[string]func(ctx context.Context, in *BindingEvent) (out []byte, err error)), + bindingHandlers: make(map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)), } } // Server is the gRPC service implementation for Dapr. type Server struct { listener net.Listener - invokeHandlers map[string]func(ctx context.Context, in *InvocationEvent) (out *Content, err error) + invokeHandlers map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) topicSubscriptions map[string]*topicEventHandler - bindingHandlers map[string]func(ctx context.Context, in *BindingEvent) (out []byte, err error) + bindingHandlers map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error) } type topicEventHandler struct { component string topic string - fn func(ctx context.Context, e *TopicEvent) error + fn func(ctx context.Context, e *common.TopicEvent) error meta map[string]string } diff --git a/service/grpc/topic.go b/service/grpc/topic.go index e44f7d3..746bc4e 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -4,49 +4,34 @@ import ( "context" "fmt" + pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" - - pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1" ) // AddTopicEventHandler appends provided event handler with topic name to the service -func (s *Server) AddTopicEventHandler(component, topic string, fn func(ctx context.Context, e *TopicEvent) error) error { - if topic == "" { - return fmt.Errorf("topic name required") +func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) error) error { + if sub == nil { + return errors.New("subscription required") } - if component == "" { - return fmt.Errorf("component name required") + if sub.Topic == "" { + return errors.New("topic name required") } - key := fmt.Sprintf("%s-%s", component, topic) + if sub.PubsubName == "" { + return errors.New("pub/sub name required") + } + key := fmt.Sprintf("%s-%s", sub.PubsubName, sub.Topic) s.topicSubscriptions[key] = &topicEventHandler{ - component: component, - topic: topic, + component: sub.PubsubName, + topic: sub.Topic, fn: fn, - meta: map[string]string{}, + meta: sub.Metadata, } return nil } -// AddTopicEventHandlerWithMetadata appends provided event handler with topic name and metadata to the service -func (s *Server) AddTopicEventHandlerWithMetadata(component, topic string, m map[string]string, fn func(ctx context.Context, e *TopicEvent) error) error { - if topic == "" { - return fmt.Errorf("topic name required") - } - if component == "" { - return fmt.Errorf("component name required") - } - key := fmt.Sprintf("%s-%s", component, topic) - s.topicSubscriptions[key] = &topicEventHandler{ - component: component, - topic: topic, - fn: fn, - meta: m, - } - return nil -} - -// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. +// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) { subs := make([]*pb.TopicSubscription, 0) for _, v := range s.topicSubscriptions { @@ -63,7 +48,8 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (* }, nil } -// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope. +// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. +// Dapr sends published messages in a CloudEvents 0.3 envelope. func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) { if in == nil { return nil, errors.New("nil event request") @@ -76,7 +62,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p } key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic) if h, ok := s.topicSubscriptions[key]; ok { - e := &TopicEvent{ + e := &common.TopicEvent{ ID: in.Id, Source: in.Source, Type: in.Type, diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index e6612c5..40d6bbf 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -6,10 +6,11 @@ import ( "testing" "github.com/dapr/go-sdk/dapr/proto/runtime/v1" + "github.com/dapr/go-sdk/service/common" "github.com/stretchr/testify/assert" ) -func eventHandler(ctx context.Context, event *TopicEvent) error { +func eventHandler(ctx context.Context, event *common.TopicEvent) error { if event == nil { return errors.New("nil event") } @@ -18,12 +19,14 @@ func eventHandler(ctx context.Context, event *TopicEvent) error { // go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$ func TestTopic(t *testing.T) { - topicName := "test" - componentName := "messages" ctx := context.Background() - + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + } server := getTestServer() - err := server.AddTopicEventHandler(componentName, topicName, eventHandler) + + err := server.AddTopicEventHandler(sub, eventHandler) assert.Nil(t, err) startTestServer(server) @@ -48,8 +51,8 @@ func TestTopic(t *testing.T) { SpecVersion: "v0.3", DataContentType: "text/plain", Data: []byte("test"), - Topic: topicName, - PubsubName: componentName, + Topic: sub.Topic, + PubsubName: sub.PubsubName, } _, err := server.OnTopicEvent(ctx, in) assert.NoError(t, err) diff --git a/service/http/Readme.md b/service/http/Readme.md new file mode 100644 index 0000000..f87215c --- /dev/null +++ b/service/http/Readme.md @@ -0,0 +1,89 @@ +# Dapr HTTP Service SDK for Go + +Start by importing Dapr go `service/http` package: + +```go +daprd "github.com/dapr/go-sdk/service/http" +``` + +## Event Handling + +To handle events from specific topic, first create a Dapr service, add topic event handler, and start the service: + +```go +s := daprd.NewService(":8080") + +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + Route: "/events", +} +err := s.AddTopicEventHandler(sub, eventHandler) +if err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} + +if err = s.Start(); err != nil && err != http.ErrServerClosed { + log.Fatalf("error listening: %v", err) +} + +func eventHandler(ctx context.Context, e *common.TopicEvent) error { + log.Printf("event - PubsubName:%s, Topic:%s, ID:%s, Data: %v", e.PubsubName, e.Topic, e.ID, e.Data) + return nil +} +``` + +## Service Invocation Handler + +To handle service invocations, create and start the Dapr service as in the above example. In this case though add the handler for service invocation: + +```go +s := daprd.NewService(":8080") + +if err := s.AddServiceInvocationHandler("/echo", echoHandler); err != nil { + log.Fatalf("error adding invocation handler: %v", err) +} + +if err := s.Start(); err != nil { + log.Fatalf("server error: %v", err) +} + +func echoHandler(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + if in == nil { + err = errors.New("invocation parameter required") + return + } + log.Printf( + "echo - ContentType:%s, Verb:%s, QueryString:%s, %+v", + in.ContentType, in.Verb, in.QueryString, string(in.Data), + ) + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + return +} +``` + +## Binding Invocation Handler + +To handle binding invocations, create and start the Dapr service as in the above examples. In this case though add the handler for binding invocation: + +```go +s := daprd.NewService(":8080") + +if err := s.AddBindingInvocationHandler("/run", runHandler); err != nil { + log.Fatalf("error adding binding handler: %v", err) +} + +func runHandler(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + log.Printf("binding - Data:%v, Meta:%v", in.Data, in.Metadata) + return nil, nil +} +``` + + +## Contributing to Dapr go client + +See the [Contribution Guide](../../CONTRIBUTING.md) to get started with building and developing. diff --git a/service/http/binding.go b/service/http/binding.go new file mode 100755 index 0000000..f754934 --- /dev/null +++ b/service/http/binding.go @@ -0,0 +1,69 @@ +package http + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "strings" + + "github.com/dapr/go-sdk/service/common" +) + +// AddBindingInvocationHandler appends provided binding invocation handler with its route to the service +func (s *Server) AddBindingInvocationHandler(route string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error { + if route == "" { + return fmt.Errorf("binding route required") + } + + if !strings.HasPrefix(route, "/") { + route = fmt.Sprintf("/%s", route) + } + + s.mux.Handle(route, optionsHandler(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + var content []byte + if r.ContentLength > 0 { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + content = body + } + + // assuming Dapr doesn't pass multiple values for key + meta := map[string]string{} + for k, values := range r.Header { + // TODO: Need to figure out how to parse out only the headers set in the binding + Traceparent + // if k == "raceparent" || strings.HasPrefix(k, "dapr") { + for _, v := range values { + meta[k] = v + } + // } + } + + // execute handler + in := &common.BindingEvent{ + Data: content, + Metadata: meta, + } + out, err := fn(r.Context(), in) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if out == nil { + out = []byte("{}") + } + + w.Header().Add("Content-Type", "application/json") + if _, err := w.Write(out); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }))) + + return nil +} diff --git a/service/http/binding_test.go b/service/http/binding_test.go new file mode 100755 index 0000000..86bff7c --- /dev/null +++ b/service/http/binding_test.go @@ -0,0 +1,57 @@ +package http + +import ( + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/dapr/go-sdk/service/common" + "github.com/stretchr/testify/assert" +) + +func TestBindingHandlerWithoutData(t *testing.T) { + s := newService("") + err := s.AddBindingInvocationHandler("/", func(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + if in == nil { + return nil, errors.New("nil input") + } + if in.Data != nil { + return nil, errors.New("invalid input data") + } + return nil, nil + }) + assert.NoErrorf(t, err, "error adding binding event handler") + + req, err := http.NewRequest(http.MethodPost, "/", nil) + assert.NoErrorf(t, err, "error creating request") + req.Header.Set("Content-Type", "application/json") + + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + assert.Equal(t, "{}", resp.Body.String()) +} + +func TestBindingHandlerWithData(t *testing.T) { + data := `{"name": "test"}` + s := newService("") + err := s.AddBindingInvocationHandler("/", func(ctx context.Context, in *common.BindingEvent) (out []byte, err error) { + if in == nil { + return nil, errors.New("nil input") + } + return []byte("test"), nil + }) + assert.NoErrorf(t, err, "error adding binding event handler") + + req, err := http.NewRequest(http.MethodPost, "/", strings.NewReader(data)) + assert.NoErrorf(t, err, "error creating request") + req.Header.Set("Content-Type", "application/json") + + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + assert.Equal(t, "test", resp.Body.String()) +} diff --git a/service/http/invoke.go b/service/http/invoke.go new file mode 100755 index 0000000..ee40b1f --- /dev/null +++ b/service/http/invoke.go @@ -0,0 +1,71 @@ +package http + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strings" + + "github.com/dapr/go-sdk/service/common" +) + +// AddServiceInvocationHandler appends provided service invocation handler with its route to the service +func (s *Server) AddServiceInvocationHandler(route string, fn func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)) error { + if route == "" { + return fmt.Errorf("service route required") + } + + if !strings.HasPrefix(route, "/") { + route = fmt.Sprintf("/%s", route) + } + + s.mux.Handle(route, optionsHandler(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + // capture http args + e := &common.InvocationEvent{ + Verb: r.Method, + QueryString: valuesToMap(r.URL.Query()), + ContentType: r.Header.Get("Content-type"), + } + + // check for post with no data + if r.ContentLength > 0 { + content, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + e.Data = content + } + + // execute handler + o, err := fn(r.Context(), e) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // write to response if handler returned data + if o != nil && o.Data != nil { + if _, err := w.Write(o.Data); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if o.ContentType != "" { + w.Header().Set("Content-type", o.ContentType) + } + } + }))) + + return nil +} + +func valuesToMap(in url.Values) map[string]string { + out := map[string]string{} + for k := range in { + out[k] = in.Get(k) + } + return out +} diff --git a/service/http/invoke_test.go b/service/http/invoke_test.go new file mode 100755 index 0000000..2cdc39c --- /dev/null +++ b/service/http/invoke_test.go @@ -0,0 +1,84 @@ +package http + +import ( + "context" + "errors" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/dapr/go-sdk/service/common" + "github.com/stretchr/testify/assert" +) + +func TestInvocationHandlerWithData(t *testing.T) { + data := `{"name": "test", "data": hellow}` + s := newService("") + err := s.AddServiceInvocationHandler("/", func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + if in == nil || in.Data == nil || in.ContentType == "" { + err = errors.New("nil input") + return + } + out = &common.Content{ + Data: in.Data, + ContentType: in.ContentType, + DataTypeURL: in.DataTypeURL, + } + return + }) + assert.NoErrorf(t, err, "error adding event handler") + + req, err := http.NewRequest(http.MethodPost, "/", strings.NewReader(data)) + assert.NoErrorf(t, err, "error creating request") + req.Header.Set("Content-Type", "application/json") + + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + b, err := ioutil.ReadAll(resp.Body) + assert.NoErrorf(t, err, "error reading response body") + assert.Equal(t, data, string(b)) +} + +func TestInvocationHandlerWithoutInputData(t *testing.T) { + s := newService("") + err := s.AddServiceInvocationHandler("/", func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + if in == nil || in.Data != nil { + err = errors.New("nil input") + return + } + return &common.Content{}, nil + }) + assert.NoErrorf(t, err, "error adding event handler") + + req, err := http.NewRequest(http.MethodPost, "/", nil) + assert.NoErrorf(t, err, "error creating request") + req.Header.Set("Content-Type", "application/json") + + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + b, err := ioutil.ReadAll(resp.Body) + assert.NoErrorf(t, err, "error reading response body") + assert.NotNil(t, b) + assert.Equal(t, "", string(b)) +} + +func TestInvocationHandlerWithInvalidRoute(t *testing.T) { + s := newService("") + err := s.AddServiceInvocationHandler("/a", func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) { + return nil, nil + }) + assert.NoErrorf(t, err, "error adding event handler") + + req, err := http.NewRequest(http.MethodPost, "/b", nil) + assert.NoErrorf(t, err, "error creating request") + + resp := httptest.NewRecorder() + s.mux.ServeHTTP(resp, req) + assert.Equal(t, http.StatusNotFound, resp.Code) +} diff --git a/service/http/service.go b/service/http/service.go new file mode 100755 index 0000000..a1de057 --- /dev/null +++ b/service/http/service.go @@ -0,0 +1,56 @@ +package http + +import ( + "net/http" + + "github.com/dapr/go-sdk/service/common" +) + +// NewService creates new Service +func NewService(address string) common.Service { + return newService(address) +} + +func newService(address string) *Server { + return &Server{ + address: address, + mux: http.NewServeMux(), + topicSubscriptions: make([]*common.Subscription, 0), + } +} + +// Server is the HTTP server wrapping mux many Dapr helpers +type Server struct { + address string + mux *http.ServeMux + topicSubscriptions []*common.Subscription +} + +// Start starts the HTTP handler. Blocks while serving +func (s *Server) Start() error { + s.registerSubscribeHandler() + server := http.Server{ + Addr: s.address, + Handler: s.mux, + } + return server.ListenAndServe() +} + +// Stop stops previously started HTTP service +func (s *Server) Stop() error { + // TODO: implement service stop + return nil +} + +func optionsHandler(h http.Handler) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodOptions { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST,OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "authorization, origin, content-type, accept") + w.Header().Set("Allow", "POST,OPTIONS") + } else { + h.ServeHTTP(w, r) + } + } +} diff --git a/service/http/service_test.go b/service/http/service_test.go new file mode 100644 index 0000000..4cbc941 --- /dev/null +++ b/service/http/service_test.go @@ -0,0 +1,12 @@ +package http + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStoppingUnstartedService(t *testing.T) { + s := newService("") + assert.NotNil(t, s) +} diff --git a/service/http/topic.go b/service/http/topic.go new file mode 100755 index 0000000..7057c5f --- /dev/null +++ b/service/http/topic.go @@ -0,0 +1,77 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/pkg/errors" + + "github.com/dapr/go-sdk/service/common" +) + +func (s *Server) registerSubscribeHandler() { + f := func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(s.topicSubscriptions); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + s.mux.HandleFunc("/dapr/subscribe", f) +} + +// AddTopicEventHandler appends provided event handler with it's name to the service +func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) error) error { + if sub == nil { + return errors.New("subscription required") + } + if sub.Topic == "" { + return errors.New("topic name required") + } + if sub.PubsubName == "" { + return errors.New("pub/sub name required") + } + if sub.Route == "" { + return errors.New("handler route name") + } + + if !strings.HasPrefix(sub.Route, "/") { + sub.Route = fmt.Sprintf("/%s", sub.Route) + } + + s.topicSubscriptions = append(s.topicSubscriptions, sub) + + s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + // check for post with no data + if r.ContentLength == 0 { + http.Error(w, "nil content", http.StatusBadRequest) + return + } + + // deserialize the event + var in common.TopicEvent + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + fmt.Println(err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if in.Topic == "" { + in.Topic = sub.Topic + } + + if err := fn(r.Context(), &in); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + }))) + + return nil +} diff --git a/service/http/topic_test.go b/service/http/topic_test.go new file mode 100755 index 0000000..0af08b4 --- /dev/null +++ b/service/http/topic_test.go @@ -0,0 +1,60 @@ +package http + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/dapr/go-sdk/service/common" + "github.com/stretchr/testify/assert" +) + +func TestEventHandler(t *testing.T) { + data := `{ + "specversion" : "1.0", + "type" : "com.github.pull.create", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "A234-1234-1234", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ==" + }` + + s := newService("") + + sub := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/", + Metadata: map[string]string{}, + } + err := s.AddTopicEventHandler(sub, func(ctx context.Context, e *common.TopicEvent) error { + if e == nil { + return errors.New("nil content") + } + if e.DataContentType != "application/json" { + return fmt.Errorf("invalid content type: %s", e.DataContentType) + } + if e.Data == nil { + return errors.New("nil data") + } + return nil + }) + assert.NoErrorf(t, err, "error adding event handler") + + req, err := http.NewRequest(http.MethodPost, "/", strings.NewReader(data)) + assert.NoErrorf(t, err, "error creating request") + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + s.registerSubscribeHandler() + s.mux.ServeHTTP(rr, req) + assert.Equal(t, http.StatusOK, rr.Code) +}