From e82164f4e7a9402c1fe0a201f354e05b044f98a9 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Tue, 26 May 2020 23:32:20 -0700 Subject: [PATCH] Add bi-directional bindings support (#350) * add bi-directional bindings * added invoke response * added explicit types * change operationtype to operationkind --- bindings/Readme.md | 6 ++++- bindings/alicloud/oss/oss.go | 12 ++++++---- bindings/aws/dynamodb/dynamodb.go | 14 +++++++---- bindings/aws/kinesis/kinesis.go | 8 +++++-- bindings/aws/s3/s3.go | 8 +++++-- bindings/aws/sns/sns.go | 12 ++++++---- bindings/aws/sqs/sqs.go | 8 +++++-- bindings/azure/blobstorage/blobstorage.go | 10 +++++--- bindings/azure/cosmosdb/cosmosdb.go | 14 +++++++---- bindings/azure/eventgrid/eventgrid.go | 14 +++++++---- bindings/azure/eventhubs/eventhubs.go | 10 +++++--- .../servicebusqueues/servicebusqueues.go | 10 +++++--- .../servicebusqueues_integration_test.go | 8 +++---- bindings/azure/signalr/signalr.go | 16 ++++++++----- bindings/azure/signalr/signalr_test.go | 10 ++++---- bindings/azure/storagequeues/storagequeues.go | 12 ++++++---- .../azure/storagequeues/storagequeues_test.go | 24 +++++++++---------- bindings/gcp/bucket/bucket.go | 10 +++++--- bindings/gcp/pubsub/pubsub.go | 8 +++++-- bindings/http/http.go | 10 +++++--- bindings/kafka/kafka.go | 10 +++++--- bindings/mqtt/mqtt.go | 8 +++++-- bindings/output_binding.go | 3 ++- bindings/rabbitmq/rabbitmq.go | 12 ++++++---- .../rabbitmq/rabbitmq_integration_test.go | 8 +++---- bindings/redis/redis.go | 12 ++++++---- bindings/requests.go | 20 ++++++++++++---- bindings/responses.go | 6 +++++ bindings/twilio/sendgrid/sendgrid.go | 18 ++++++++------ bindings/twilio/sms/sms.go | 16 ++++++++----- bindings/twilio/sms/sms_test.go | 8 +++---- 31 files changed, 228 insertions(+), 117 deletions(-) diff --git a/bindings/Readme.md b/bindings/Readme.md index 4c219a461..eb6689190 100644 --- a/bindings/Readme.md +++ b/bindings/Readme.md @@ -24,10 +24,14 @@ type InputBinding interface { Output binding: +An output binding can be used to invoke an external system and also to return data from it. +Each output binding can decide which operations it supports. This information is communicated to the caller via the `Operations()` method. + ```go type OutputBinding interface { Init(metadata Metadata) error - Write(req *WriteRequest) error + Invoke(req *InvokeRequest) error + Operations() []OperationKind } ``` A spec is also needed in [Dapr docs](https://github.com/dapr/docs/tree/master/reference/specs/bindings). \ No newline at end of file diff --git a/bindings/alicloud/oss/oss.go b/bindings/alicloud/oss/oss.go index 07bc4fba4..eca1c09f9 100644 --- a/bindings/alicloud/oss/oss.go +++ b/bindings/alicloud/oss/oss.go @@ -50,7 +50,11 @@ func (s *AliCloudOSS) Init(metadata bindings.Metadata) error { return nil } -func (s *AliCloudOSS) Write(req *bindings.WriteRequest) error { +func (s *AliCloudOSS) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (s *AliCloudOSS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { key := "" if val, ok := req.Metadata["key"]; ok && val != "" { key = val @@ -63,16 +67,16 @@ func (s *AliCloudOSS) Write(req *bindings.WriteRequest) error { bucket, err := s.client.Bucket(s.metadata.Bucket) if err != nil { - return err + return nil, err } // Upload a byte array. err = bucket.PutObject(key, bytes.NewReader(req.Data)) if err != nil { - return err + return nil, err } - return err + return nil, err } func (s *AliCloudOSS) parseMetadata(metadata bindings.Metadata) (*ossMetadata, error) { diff --git a/bindings/aws/dynamodb/dynamodb.go b/bindings/aws/dynamodb/dynamodb.go index 6eb8e2e96..591e3405d 100644 --- a/bindings/aws/dynamodb/dynamodb.go +++ b/bindings/aws/dynamodb/dynamodb.go @@ -55,16 +55,20 @@ func (d *DynamoDB) Init(metadata bindings.Metadata) error { return nil } -func (d *DynamoDB) Write(req *bindings.WriteRequest) error { +func (d *DynamoDB) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (d *DynamoDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var obj interface{} err := json.Unmarshal(req.Data, &obj) if err != nil { - return err + return nil, err } item, err := dynamodbattribute.MarshalMap(obj) if err != nil { - return err + return nil, err } input := &dynamodb.PutItemInput{ @@ -74,10 +78,10 @@ func (d *DynamoDB) Write(req *bindings.WriteRequest) error { _, err = d.client.PutItem(input) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (d *DynamoDB) getDynamoDBMetadata(spec bindings.Metadata) (*dynamoDBMetadata, error) { diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 32bd8cea4..13ab0a006 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -122,7 +122,11 @@ func (a *AWSKinesis) Init(metadata bindings.Metadata) error { return nil } -func (a *AWSKinesis) Write(req *bindings.WriteRequest) error { +func (a *AWSKinesis) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AWSKinesis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { partitionKey := req.Metadata[partitionKeyName] if partitionKey == "" { partitionKey = uuid.New().String() @@ -132,7 +136,7 @@ func (a *AWSKinesis) Write(req *bindings.WriteRequest) error { Data: req.Data, PartitionKey: &partitionKey, }) - return err + return nil, err } func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) error) error { diff --git a/bindings/aws/s3/s3.go b/bindings/aws/s3/s3.go index 2547599bb..49bedf6f4 100644 --- a/bindings/aws/s3/s3.go +++ b/bindings/aws/s3/s3.go @@ -54,7 +54,11 @@ func (s *AWSS3) Init(metadata bindings.Metadata) error { return nil } -func (s *AWSS3) Write(req *bindings.WriteRequest) error { +func (s *AWSS3) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (s *AWSS3) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { key := "" if val, ok := req.Metadata["key"]; ok && val != "" { key = val @@ -69,7 +73,7 @@ func (s *AWSS3) Write(req *bindings.WriteRequest) error { Key: aws.String(key), Body: r, }) - return err + return nil, err } func (s *AWSS3) parseMetadata(metadata bindings.Metadata) (*s3Metadata, error) { diff --git a/bindings/aws/sns/sns.go b/bindings/aws/sns/sns.go index fe424fd4b..005702534 100644 --- a/bindings/aws/sns/sns.go +++ b/bindings/aws/sns/sns.go @@ -81,11 +81,15 @@ func (a *AWSSNS) getClient(metadata *snsMetadata) (*sns.SNS, error) { return c, nil } -func (a *AWSSNS) Write(req *bindings.WriteRequest) error { +func (a *AWSSNS) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AWSSNS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload dataPayload err := json.Unmarshal(req.Data, &payload) if err != nil { - return err + return nil, err } msg := fmt.Sprintf("%v", payload.Message) @@ -99,7 +103,7 @@ func (a *AWSSNS) Write(req *bindings.WriteRequest) error { _, err = a.client.Publish(input) if err != nil { - return err + return nil, err } - return nil + return nil, nil } diff --git a/bindings/aws/sqs/sqs.go b/bindings/aws/sqs/sqs.go index 6099f5c63..a7c5e55f4 100644 --- a/bindings/aws/sqs/sqs.go +++ b/bindings/aws/sqs/sqs.go @@ -62,13 +62,17 @@ func (a *AWSSQS) Init(metadata bindings.Metadata) error { return nil } -func (a *AWSSQS) Write(req *bindings.WriteRequest) error { +func (a *AWSSQS) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AWSSQS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { msgBody := string(req.Data) _, err := a.Client.SendMessage(&sqs.SendMessageInput{ MessageBody: &msgBody, QueueUrl: a.QueueURL, }) - return err + return nil, err } func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error { diff --git a/bindings/azure/blobstorage/blobstorage.go b/bindings/azure/blobstorage/blobstorage.go index 16b1cbc4b..d789733e1 100644 --- a/bindings/azure/blobstorage/blobstorage.go +++ b/bindings/azure/blobstorage/blobstorage.go @@ -90,7 +90,11 @@ func (a *AzureBlobStorage) parseMetadata(metadata bindings.Metadata) (*blobStora return &m, nil } -func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error { +func (a *AzureBlobStorage) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AzureBlobStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { name := "" if val, ok := req.Metadata[blobName]; ok && val != "" { name = val @@ -108,7 +112,7 @@ func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error { if val, ok := req.Metadata[contentMD5]; ok && val != "" { sDec, err := b64.StdEncoding.DecodeString(val) if err != nil || len(sDec) != 16 { - return fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded") + return nil, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded") } blobHTTPHeaders.ContentMD5 = sDec delete(req.Metadata, contentMD5) @@ -138,5 +142,5 @@ func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error { Metadata: req.Metadata, BlobHTTPHeaders: blobHTTPHeaders, }) - return err + return nil, err } diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index 75b73bd68..d74ca192c 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -97,24 +97,28 @@ func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentia return &creds, nil } -func (c *CosmosDB) Write(req *bindings.WriteRequest) error { +func (c *CosmosDB) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (c *CosmosDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var obj interface{} err := json.Unmarshal(req.Data, &obj) if err != nil { - return err + return nil, err } val, err := c.getPartitionKeyValue(c.partitionKey, obj) if err != nil { - return err + return nil, err } _, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val)) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) { diff --git a/bindings/azure/eventgrid/eventgrid.go b/bindings/azure/eventgrid/eventgrid.go index 62ce30067..a5a75f3eb 100644 --- a/bindings/azure/eventgrid/eventgrid.go +++ b/bindings/azure/eventgrid/eventgrid.go @@ -107,11 +107,15 @@ func (a *AzureEventGrid) Read(handler func(*bindings.ReadResponse) error) error return nil } -func (a *AzureEventGrid) Write(req *bindings.WriteRequest) error { +func (a *AzureEventGrid) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AzureEventGrid) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { err := a.ensureOutputBindingMetadata() if err != nil { a.logger.Error(err.Error()) - return err + return nil, err } request := fasthttp.AcquireRequest() @@ -129,16 +133,16 @@ func (a *AzureEventGrid) Write(req *bindings.WriteRequest) error { err = client.Do(request, response) if err != nil { a.logger.Error(err.Error()) - return err + return nil, err } if response.StatusCode() != fasthttp.StatusOK { body := response.Body() a.logger.Error(string(body)) - return errors.New(string(body)) + return nil, errors.New(string(body)) } - return nil + return nil, nil } func (a *AzureEventGrid) ensureInputBindingMetadata() error { diff --git a/bindings/azure/eventhubs/eventhubs.go b/bindings/azure/eventhubs/eventhubs.go index e9b30fced..678995eea 100644 --- a/bindings/azure/eventhubs/eventhubs.go +++ b/bindings/azure/eventhubs/eventhubs.go @@ -138,8 +138,12 @@ func parseMetadata(meta bindings.Metadata) (*azureEventHubsMetadata, error) { return m, nil } +func (a *AzureEventHubs) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + // Write posts an event hubs message -func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error { +func (a *AzureEventHubs) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { event := &eventhub.Event{ Data: req.Data, } @@ -156,10 +160,10 @@ func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error { err := a.hub.Send(context.Background(), event) if err != nil { - return err + return nil, err } - return nil + return nil, nil } // Read gets messages from eventhubs in a non-blocking fashion diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 876ae0a80..5d79c77c3 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -127,7 +127,11 @@ func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serv return &m, nil } -func (a *AzureServiceBusQueues) Write(req *bindings.WriteRequest) error { +func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -141,14 +145,14 @@ func (a *AzureServiceBusQueues) Write(req *bindings.WriteRequest) error { ttl, ok, err := bindings.TryGetTTL(req.Metadata) if err != nil { - return err + return nil, err } if ok { msg.TTL = &ttl } - return a.client.Send(ctx, msg) + return nil, a.client.Send(ctx, msg) } func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) error) error { diff --git a/bindings/azure/servicebusqueues/servicebusqueues_integration_test.go b/bindings/azure/servicebusqueues/servicebusqueues_integration_test.go index b5c782614..5c65afb1b 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues_integration_test.go +++ b/bindings/azure/servicebusqueues/servicebusqueues_integration_test.go @@ -83,7 +83,7 @@ func TestQueueWithTTL(t *testing.T) { // Assert that if waited too long, we won't see any message const tooLateMsgContent = "too_late_msg" - err = a.Write(&bindings.WriteRequest{Data: []byte(tooLateMsgContent)}) + err = a.Write(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)}) assert.Nil(t, err) time.Sleep(time.Second * 2) @@ -97,7 +97,7 @@ func TestQueueWithTTL(t *testing.T) { // Getting before it is expired, should return it const testMsgContent = "test_msg" - err = a.Write(&bindings.WriteRequest{Data: []byte(testMsgContent)}) + err = a.Write(&bindings.InvokeRequest{Data: []byte(testMsgContent)}) assert.Nil(t, err) msg, ok, err := getMessageWithRetries(queue, maxGetDuration) @@ -136,7 +136,7 @@ func TestPublishingWithTTL(t *testing.T) { assert.Equal(t, defaultAzureServiceBusMessageTimeToLive, *queueEntity.DefaultMessageTimeToLive) const tooLateMsgContent = "too_late_msg" - writeRequest := bindings.WriteRequest{ + writeRequest := bindings.InvokeRequest{ Data: []byte(tooLateMsgContent), Metadata: map[string]string{ bindings.TTLMetadataKey: "1", @@ -160,7 +160,7 @@ func TestPublishingWithTTL(t *testing.T) { assert.Nil(t, err) const testMsgContent = "test_msg" - writeRequest = bindings.WriteRequest{ + writeRequest = bindings.InvokeRequest{ Data: []byte(testMsgContent), Metadata: map[string]string{ bindings.TTLMetadataKey: "1", diff --git a/bindings/azure/signalr/signalr.go b/bindings/azure/signalr/signalr.go index d55c0e195..9a145c822 100644 --- a/bindings/azure/signalr/signalr.go +++ b/bindings/azure/signalr/signalr.go @@ -95,7 +95,7 @@ func (s *SignalR) Init(metadata bindings.Metadata) error { return nil } -func (s *SignalR) resolveAPIURL(req *bindings.WriteRequest) (string, error) { +func (s *SignalR) resolveAPIURL(req *bindings.InvokeRequest) (string, error) { hub := s.hub if hub == "" { hubFromRequest, ok := req.Metadata[hubKey] @@ -148,23 +148,27 @@ func (s *SignalR) sendMessageToSignalR(url string, token string, data []byte) er return nil } -func (s *SignalR) Write(req *bindings.WriteRequest) error { +func (s *SignalR) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (s *SignalR) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { url, err := s.resolveAPIURL(req) if err != nil { - return err + return nil, err } token, err := s.ensureValidToken(url) if err != nil { - return err + return nil, err } err = s.sendMessageToSignalR(url, token, req.Data) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (s *SignalR) ensureValidToken(url string) (string, error) { diff --git a/bindings/azure/signalr/signalr_test.go b/bindings/azure/signalr/signalr_test.go index 413660574..9557742ea 100644 --- a/bindings/azure/signalr/signalr_test.go +++ b/bindings/azure/signalr/signalr_test.go @@ -177,7 +177,7 @@ func TestWriteShouldFail(t *testing.T) { t.Run("Missing hub should fail", func(t *testing.T) { httpTransport.reset() - err := s.Write(&bindings.WriteRequest{ + _, err := s.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{}, }) @@ -189,7 +189,7 @@ func TestWriteShouldFail(t *testing.T) { httpTransport.reset() httpErr := errors.New("fake error") httpTransport.errToReturn = httpErr - err := s.Write(&bindings.WriteRequest{ + _, err := s.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ hubKey: "testHub", @@ -203,7 +203,7 @@ func TestWriteShouldFail(t *testing.T) { t.Run("SignalR call returns status != [200, 202]", func(t *testing.T) { httpTransport.reset() httpTransport.response.StatusCode = 401 - err := s.Write(&bindings.WriteRequest{ + _, err := s.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ hubKey: "testHub", @@ -228,7 +228,7 @@ func TestWriteShouldSucceed(t *testing.T) { t.Run("Has authorization", func(t *testing.T) { httpTransport.reset() - err := s.Write(&bindings.WriteRequest{ + _, err := s.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ hubKey: "testHub", @@ -262,7 +262,7 @@ func TestWriteShouldSucceed(t *testing.T) { t.Run(tt.name, func(t *testing.T) { httpTransport.reset() s.hub = tt.hubInMetadata - err := s.Write(&bindings.WriteRequest{ + _, err := s.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ hubKey: tt.hubInWriteRequest, diff --git a/bindings/azure/storagequeues/storagequeues.go b/bindings/azure/storagequeues/storagequeues.go index 06cca262b..e5e089838 100644 --- a/bindings/azure/storagequeues/storagequeues.go +++ b/bindings/azure/storagequeues/storagequeues.go @@ -191,11 +191,15 @@ func (a *AzureStorageQueues) parseMetadata(metadata bindings.Metadata) (*storage return &m, nil } -func (a *AzureStorageQueues) Write(req *bindings.WriteRequest) error { +func (a *AzureStorageQueues) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (a *AzureStorageQueues) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { ttlToUse := a.metadata.ttl ttl, ok, err := bindings.TryGetTTL(req.Metadata) if err != nil { - return err + return nil, err } if ok { @@ -204,9 +208,9 @@ func (a *AzureStorageQueues) Write(req *bindings.WriteRequest) error { err = a.helper.Write(req.Data, ttlToUse) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (a *AzureStorageQueues) Read(handler func(*bindings.ReadResponse) error) error { diff --git a/bindings/azure/storagequeues/storagequeues_test.go b/bindings/azure/storagequeues/storagequeues_test.go index f3bdf21e5..810325ef6 100644 --- a/bindings/azure/storagequeues/storagequeues_test.go +++ b/bindings/azure/storagequeues/storagequeues_test.go @@ -51,9 +51,9 @@ func TestWriteQueue(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("This is my message")} + r := bindings.InvokeRequest{Data: []byte("This is my message")} - err = a.Write(&r) + _, err = a.Invoke(&r) assert.Nil(t, err) } @@ -73,9 +73,9 @@ func TestWriteWithTTLInQueue(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("This is my message")} + r := bindings.InvokeRequest{Data: []byte("This is my message")} - err = a.Write(&r) + _, err = a.Invoke(&r) assert.Nil(t, err) } @@ -95,12 +95,12 @@ func TestWriteWithTTLInWrite(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{ + r := bindings.InvokeRequest{ Data: []byte("This is my message"), Metadata: map[string]string{bindings.TTLMetadataKey: "1"}, } - err = a.Write(&r) + _, err = a.Invoke(&r) assert.Nil(t, err) } @@ -116,7 +116,7 @@ func TestWriteWithTTLInWrite(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("This is my message")} + r := bindings.InvokeRequest{Data: []byte("This is my message")} err = a.Write(&r) @@ -135,9 +135,9 @@ func TestReadQueue(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("This is my message")} + r := bindings.InvokeRequest{Data: []byte("This is my message")} - err = a.Write(&r) + _, err = a.Invoke(&r) assert.Nil(t, err) @@ -169,9 +169,9 @@ func TestReadQueueDecode(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("VGhpcyBpcyBteSBtZXNzYWdl")} + r := bindings.InvokeRequest{Data: []byte("VGhpcyBpcyBteSBtZXNzYWdl")} - err = a.Write(&r) + _, err = a.Invoke(&r) assert.Nil(t, err) @@ -200,7 +200,7 @@ func TestReadQueueDecode(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - r := bindings.WriteRequest{Data: []byte("This is my message")} + r := bindings.InvokeRequest{Data: []byte("This is my message")} err = a.Write(&r) diff --git a/bindings/gcp/bucket/bucket.go b/bindings/gcp/bucket/bucket.go index 80e27b6c4..2fd5ca714 100644 --- a/bindings/gcp/bucket/bucket.go +++ b/bindings/gcp/bucket/bucket.go @@ -74,7 +74,11 @@ func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) ([]byte, error) { return b, nil } -func (g *GCPStorage) Write(req *bindings.WriteRequest) error { +func (g *GCPStorage) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (g *GCPStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { name := "" if val, ok := req.Metadata["name"]; ok && val != "" { name = val @@ -84,7 +88,7 @@ func (g *GCPStorage) Write(req *bindings.WriteRequest) error { h := g.client.Bucket(g.metadata.Bucket).Object(name).NewWriter(context.Background()) defer h.Close() if _, err := h.Write(req.Data); err != nil { - return err + return nil, err } - return nil + return nil, nil } diff --git a/bindings/gcp/pubsub/pubsub.go b/bindings/gcp/pubsub/pubsub.go index 2745afd5c..b9611007a 100644 --- a/bindings/gcp/pubsub/pubsub.go +++ b/bindings/gcp/pubsub/pubsub.go @@ -92,7 +92,11 @@ func (g *GCPPubSub) Read(handler func(*bindings.ReadResponse) error) error { return err } -func (g *GCPPubSub) Write(req *bindings.WriteRequest) error { +func (g *GCPPubSub) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (g *GCPPubSub) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { topicName := g.metadata.Topic if val, ok := req.Metadata[topic]; ok && val != "" { topicName = val @@ -103,5 +107,5 @@ func (g *GCPPubSub) Write(req *bindings.WriteRequest) error { _, err := t.Publish(ctx, &pubsub.Message{ Data: req.Data, }).Get(ctx) - return err + return nil, err } diff --git a/bindings/http/http.go b/bindings/http/http.go index 028468c5b..4d6036eb4 100644 --- a/bindings/http/http.go +++ b/bindings/http/http.go @@ -81,14 +81,18 @@ func (h *HTTPSource) Read(handler func(*bindings.ReadResponse) error) error { return nil } -func (h *HTTPSource) Write(req *bindings.WriteRequest) error { +func (h *HTTPSource) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (h *HTTPSource) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { client := http.Client{Timeout: time.Second * 5} resp, err := client.Post(h.metadata.URL, "application/json; charset=utf-8", bytes.NewBuffer(req.Data)) if err != nil { - return err + return nil, err } if resp != nil && resp.Body != nil { resp.Body.Close() } - return nil + return nil, nil } diff --git a/bindings/kafka/kafka.go b/bindings/kafka/kafka.go index 19a6fbf52..e31b2d0bd 100644 --- a/bindings/kafka/kafka.go +++ b/bindings/kafka/kafka.go @@ -104,7 +104,11 @@ func (k *Kafka) Init(metadata bindings.Metadata) error { return nil } -func (k *Kafka) Write(req *bindings.WriteRequest) error { +func (k *Kafka) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (k *Kafka) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { msg := &sarama.ProducerMessage{ Topic: k.publishTopic, Value: sarama.ByteEncoder(req.Data), @@ -115,10 +119,10 @@ func (k *Kafka) Write(req *bindings.WriteRequest) error { _, _, err := k.producer.SendMessage(msg) if err != nil { - return err + return nil, err } - return nil + return nil, nil } // GetKafkaMetadata returns new Kafka metadata diff --git a/bindings/mqtt/mqtt.go b/bindings/mqtt/mqtt.go index 3a1ebb449..62208f430 100644 --- a/bindings/mqtt/mqtt.go +++ b/bindings/mqtt/mqtt.go @@ -83,10 +83,14 @@ func (m *MQTT) getMQTTMetadata(metadata bindings.Metadata) (*mqttMetadata, error return &mMetadata, nil } -func (m *MQTT) Write(req *bindings.WriteRequest) error { +func (m *MQTT) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (m *MQTT) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { m.client.Publish(m.metadata.Topic, 0, false, string(req.Data)) m.client.Disconnect(0) - return nil + return nil, nil } func (m *MQTT) Read(handler func(*bindings.ReadResponse) error) error { diff --git a/bindings/output_binding.go b/bindings/output_binding.go index 4e9bfd328..85ae5949f 100644 --- a/bindings/output_binding.go +++ b/bindings/output_binding.go @@ -8,5 +8,6 @@ package bindings // OutputBinding is the interface for an output binding, allowing users to invoke remote systems with optional payloads type OutputBinding interface { Init(metadata Metadata) error - Write(req *WriteRequest) error + Invoke(req *InvokeRequest) (*InvokeResponse, error) + Operations() []OperationKind } diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index afbaa54fc..71a9e49d5 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -72,7 +72,11 @@ func (r *RabbitMQ) Init(metadata bindings.Metadata) error { return nil } -func (r *RabbitMQ) Write(req *bindings.WriteRequest) error { +func (r *RabbitMQ) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (r *RabbitMQ) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { pub := amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", @@ -81,7 +85,7 @@ func (r *RabbitMQ) Write(req *bindings.WriteRequest) error { ttl, ok, err := bindings.TryGetTTL(req.Metadata) if err != nil { - return err + return nil, err } // The default time to live has been set in the queue @@ -94,10 +98,10 @@ func (r *RabbitMQ) Write(req *bindings.WriteRequest) error { err = r.channel.Publish("", r.metadata.QueueName, false, false, pub) if err != nil { - return err + return nil, err } - return nil + return nil, nil } func (r *RabbitMQ) parseMetadata(metadata bindings.Metadata) error { diff --git a/bindings/rabbitmq/rabbitmq_integration_test.go b/bindings/rabbitmq/rabbitmq_integration_test.go index 6a8c6d364..89c2de160 100644 --- a/bindings/rabbitmq/rabbitmq_integration_test.go +++ b/bindings/rabbitmq/rabbitmq_integration_test.go @@ -83,7 +83,7 @@ func TestQueuesWithTTL(t *testing.T) { defer ch.Close() const tooLateMsgContent = "too_late_msg" - err = r.Write(&bindings.WriteRequest{Data: []byte(tooLateMsgContent)}) + err = r.Write(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)}) assert.Nil(t, err) time.Sleep(time.Second + (ttlInSeconds * time.Second)) @@ -94,7 +94,7 @@ func TestQueuesWithTTL(t *testing.T) { // Getting before it is expired, should return it const testMsgContent = "test_msg" - err = r.Write(&bindings.WriteRequest{Data: []byte(testMsgContent)}) + err = r.Write(&bindings.InvokeRequest{Data: []byte(testMsgContent)}) assert.Nil(t, err) msg, ok, err := getMessageWithRetries(ch, queueName, maxGetDuration) @@ -140,7 +140,7 @@ func TestPublishingWithTTL(t *testing.T) { defer ch.Close() const tooLateMsgContent = "too_late_msg" - writeRequest := bindings.WriteRequest{ + writeRequest := bindings.InvokeRequest{ Data: []byte(tooLateMsgContent), Metadata: map[string]string{ bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds), @@ -162,7 +162,7 @@ func TestPublishingWithTTL(t *testing.T) { assert.Nil(t, err) const testMsgContent = "test_msg" - writeRequest = bindings.WriteRequest{ + writeRequest = bindings.InvokeRequest{ Data: []byte(testMsgContent), Metadata: map[string]string{ bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds * 1000), diff --git a/bindings/redis/redis.go b/bindings/redis/redis.go index 6af68d072..d986d3aa4 100644 --- a/bindings/redis/redis.go +++ b/bindings/redis/redis.go @@ -118,14 +118,18 @@ func (r *Redis) parseMetadata(meta bindings.Metadata) (metadata, error) { return m, nil } -func (r *Redis) Write(req *bindings.WriteRequest) error { +func (r *Redis) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { if val, ok := req.Metadata["key"]; ok && val != "" { key := val _, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result() if err != nil { - return err + return nil, err } - return nil + return nil, nil } - return errors.New("redis binding: missing key on write request metadata") + return nil, errors.New("redis binding: missing key on write request metadata") } diff --git a/bindings/requests.go b/bindings/requests.go index d7e442ce5..4b65b5dc5 100644 --- a/bindings/requests.go +++ b/bindings/requests.go @@ -5,8 +5,20 @@ package bindings -// WriteRequest is the object given to an dapr output binding -type WriteRequest struct { - Data []byte `json:"data"` - Metadata map[string]string `json:"metadata"` +// InvokeRequest is the object given to a dapr output binding +type InvokeRequest struct { + Data []byte `json:"data"` + Metadata map[string]string `json:"metadata"` + Operation OperationKind `json:"operation"` } + +// OperationKind defines an output binding operation +type OperationKind string + +// Non exhaustive list of operations. A binding can add operations that are not in this list. +const ( + GetOperation OperationKind = "get" + CreateOperation OperationKind = "create" + DeleteOperation OperationKind = "delete" + ListOperation OperationKind = "list" +) diff --git a/bindings/responses.go b/bindings/responses.go index ecbc093fa..2c3534167 100644 --- a/bindings/responses.go +++ b/bindings/responses.go @@ -23,3 +23,9 @@ type AppResponse struct { State []state.SetRequest `json:"state"` Concurrency string `json:"concurrency"` } + +// InvokeResponse is the response object returned from an output binding +type InvokeResponse struct { + Data []byte `json:"data"` + Metadata map[string]string `json:"metadata"` +} diff --git a/bindings/twilio/sendgrid/sendgrid.go b/bindings/twilio/sendgrid/sendgrid.go index 7607c49df..cf1b11553 100644 --- a/bindings/twilio/sendgrid/sendgrid.go +++ b/bindings/twilio/sendgrid/sendgrid.go @@ -82,8 +82,12 @@ func (sg *SendGrid) Init(metadata bindings.Metadata) error { return nil } +func (sg *SendGrid) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + // Write does the work of sending message to SendGrid API -func (sg *SendGrid) Write(req *bindings.WriteRequest) error { +func (sg *SendGrid) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { // We allow two possible sources of the properties we need, // the component metadata or request metadata, request takes priority if present @@ -96,7 +100,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error { fromAddress = mail.NewEmail("", req.Metadata["emailFrom"]) } if fromAddress == nil { - return fmt.Errorf("error SendGrid from email not supplied") + return nil, fmt.Errorf("error SendGrid from email not supplied") } // Build email to address, this is required @@ -108,7 +112,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error { toAddress = mail.NewEmail("", req.Metadata["emailTo"]) } if toAddress == nil { - return fmt.Errorf("error SendGrid to email not supplied") + return nil, fmt.Errorf("error SendGrid to email not supplied") } // Build email subject, this is required @@ -120,7 +124,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error { subject = req.Metadata["subject"] } if subject == "" { - return fmt.Errorf("error SendGrid subject not supplied") + return nil, fmt.Errorf("error SendGrid subject not supplied") } // Build email cc address, this is optional @@ -165,7 +169,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error { client := sendgrid.NewSendClient(sg.metadata.APIKey) resp, err := client.Send(email) if err != nil { - return fmt.Errorf("error from SendGrid, sending email failed: %+v", err) + return nil, fmt.Errorf("error from SendGrid, sending email failed: %+v", err) } // Check SendGrid response is OK @@ -174,9 +178,9 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error { sendGridError := sendGridRestError{} json.NewDecoder(strings.NewReader(resp.Body)).Decode(&sendGridError) // Pass it back to the caller, so they have some idea what went wrong - return fmt.Errorf("error from SendGrid, sending email failed: %d %+v", resp.StatusCode, sendGridError) + return nil, fmt.Errorf("error from SendGrid, sending email failed: %d %+v", resp.StatusCode, sendGridError) } sg.logger.Info("sent email with SendGrid") - return nil + return nil, nil } diff --git a/bindings/twilio/sms/sms.go b/bindings/twilio/sms/sms.go index d4116bd0b..e020b8742 100644 --- a/bindings/twilio/sms/sms.go +++ b/bindings/twilio/sms/sms.go @@ -75,12 +75,16 @@ func (t *SMS) Init(metadata bindings.Metadata) error { return nil } -func (t *SMS) Write(req *bindings.WriteRequest) error { +func (t *SMS) Operations() []bindings.OperationKind { + return []bindings.OperationKind{bindings.CreateOperation} +} + +func (t *SMS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { toNumberValue := t.metadata.toNumber if toNumberValue == "" { toNumberFromRequest, ok := req.Metadata[toNumber] if !ok || toNumberFromRequest == "" { - return errors.New("twilio missing \"toNumber\" field") + return nil, errors.New("twilio missing \"toNumber\" field") } toNumberValue = toNumberFromRequest } @@ -94,7 +98,7 @@ func (t *SMS) Write(req *bindings.WriteRequest) error { twilioURL := fmt.Sprintf("%s%s/Messages.json", twilioURLBase, t.metadata.accountSid) httpReq, err := http.NewRequest("POST", twilioURL, &vDr) if err != nil { - return err + return nil, err } httpReq.SetBasicAuth(t.metadata.accountSid, t.metadata.authToken) httpReq.Header.Add("Accept", "application/json") @@ -102,11 +106,11 @@ func (t *SMS) Write(req *bindings.WriteRequest) error { resp, err := t.httpClient.Do(httpReq) if err != nil { - return err + return nil, err } defer resp.Body.Close() if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { - return fmt.Errorf("error from Twilio: %s", resp.Status) + return nil, fmt.Errorf("error from Twilio: %s", resp.Status) } - return nil + return nil, nil } diff --git a/bindings/twilio/sms/sms_test.go b/bindings/twilio/sms/sms_test.go index 759ec9b58..fc919a269 100644 --- a/bindings/twilio/sms/sms_test.go +++ b/bindings/twilio/sms/sms_test.go @@ -69,7 +69,7 @@ func TestWriteShouldSucceed(t *testing.T) { t.Run("Should succeed with expected url and headers", func(t *testing.T) { httpTransport.reset() - err := tw.Write(&bindings.WriteRequest{ + _, err := tw.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ toNumber: "toNumber", @@ -104,7 +104,7 @@ func TestWriteShouldFail(t *testing.T) { t.Run("Missing 'to' should fail", func(t *testing.T) { httpTransport.reset() - err := tw.Write(&bindings.WriteRequest{ + _, err := tw.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{}, }) @@ -116,7 +116,7 @@ func TestWriteShouldFail(t *testing.T) { httpTransport.reset() httpErr := errors.New("twilio fake error") httpTransport.errToReturn = httpErr - err := tw.Write(&bindings.WriteRequest{ + _, err := tw.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ toNumber: "toNumber", @@ -130,7 +130,7 @@ func TestWriteShouldFail(t *testing.T) { t.Run("Twilio call returns status not >=200 and <300", func(t *testing.T) { httpTransport.reset() httpTransport.response.StatusCode = 401 - err := tw.Write(&bindings.WriteRequest{ + _, err := tw.Invoke(&bindings.InvokeRequest{ Data: []byte("hello world"), Metadata: map[string]string{ toNumber: "toNumber",