Add bi-directional bindings support (#350)

* add bi-directional bindings

* added invoke response

* added explicit types

* change operationtype to operationkind
This commit is contained in:
Yaron Schneider 2020-05-26 23:32:20 -07:00 committed by GitHub
parent d3e9a7b3a7
commit e82164f4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 228 additions and 117 deletions

View File

@ -24,10 +24,14 @@ type InputBinding interface {
Output binding: Output binding:
An output binding can be used to invoke an external system and also to return data from it.
Each output binding can decide which operations it supports. This information is communicated to the caller via the `Operations()` method.
```go ```go
type OutputBinding interface { type OutputBinding interface {
Init(metadata Metadata) error Init(metadata Metadata) error
Write(req *WriteRequest) error Invoke(req *InvokeRequest) error
Operations() []OperationKind
} }
``` ```
A spec is also needed in [Dapr docs](https://github.com/dapr/docs/tree/master/reference/specs/bindings). A spec is also needed in [Dapr docs](https://github.com/dapr/docs/tree/master/reference/specs/bindings).

View File

@ -50,7 +50,11 @@ func (s *AliCloudOSS) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (s *AliCloudOSS) Write(req *bindings.WriteRequest) error { func (s *AliCloudOSS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (s *AliCloudOSS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
key := "" key := ""
if val, ok := req.Metadata["key"]; ok && val != "" { if val, ok := req.Metadata["key"]; ok && val != "" {
key = val key = val
@ -63,16 +67,16 @@ func (s *AliCloudOSS) Write(req *bindings.WriteRequest) error {
bucket, err := s.client.Bucket(s.metadata.Bucket) bucket, err := s.client.Bucket(s.metadata.Bucket)
if err != nil { if err != nil {
return err return nil, err
} }
// Upload a byte array. // Upload a byte array.
err = bucket.PutObject(key, bytes.NewReader(req.Data)) err = bucket.PutObject(key, bytes.NewReader(req.Data))
if err != nil { if err != nil {
return err return nil, err
} }
return err return nil, err
} }
func (s *AliCloudOSS) parseMetadata(metadata bindings.Metadata) (*ossMetadata, error) { func (s *AliCloudOSS) parseMetadata(metadata bindings.Metadata) (*ossMetadata, error) {

View File

@ -55,16 +55,20 @@ func (d *DynamoDB) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (d *DynamoDB) Write(req *bindings.WriteRequest) error { func (d *DynamoDB) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (d *DynamoDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var obj interface{} var obj interface{}
err := json.Unmarshal(req.Data, &obj) err := json.Unmarshal(req.Data, &obj)
if err != nil { if err != nil {
return err return nil, err
} }
item, err := dynamodbattribute.MarshalMap(obj) item, err := dynamodbattribute.MarshalMap(obj)
if err != nil { if err != nil {
return err return nil, err
} }
input := &dynamodb.PutItemInput{ input := &dynamodb.PutItemInput{
@ -74,10 +78,10 @@ func (d *DynamoDB) Write(req *bindings.WriteRequest) error {
_, err = d.client.PutItem(input) _, err = d.client.PutItem(input)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
func (d *DynamoDB) getDynamoDBMetadata(spec bindings.Metadata) (*dynamoDBMetadata, error) { func (d *DynamoDB) getDynamoDBMetadata(spec bindings.Metadata) (*dynamoDBMetadata, error) {

View File

@ -122,7 +122,11 @@ func (a *AWSKinesis) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (a *AWSKinesis) Write(req *bindings.WriteRequest) error { func (a *AWSKinesis) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AWSKinesis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
partitionKey := req.Metadata[partitionKeyName] partitionKey := req.Metadata[partitionKeyName]
if partitionKey == "" { if partitionKey == "" {
partitionKey = uuid.New().String() partitionKey = uuid.New().String()
@ -132,7 +136,7 @@ func (a *AWSKinesis) Write(req *bindings.WriteRequest) error {
Data: req.Data, Data: req.Data,
PartitionKey: &partitionKey, PartitionKey: &partitionKey,
}) })
return err return nil, err
} }
func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) error) error { func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) error) error {

View File

@ -54,7 +54,11 @@ func (s *AWSS3) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (s *AWSS3) Write(req *bindings.WriteRequest) error { func (s *AWSS3) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (s *AWSS3) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
key := "" key := ""
if val, ok := req.Metadata["key"]; ok && val != "" { if val, ok := req.Metadata["key"]; ok && val != "" {
key = val key = val
@ -69,7 +73,7 @@ func (s *AWSS3) Write(req *bindings.WriteRequest) error {
Key: aws.String(key), Key: aws.String(key),
Body: r, Body: r,
}) })
return err return nil, err
} }
func (s *AWSS3) parseMetadata(metadata bindings.Metadata) (*s3Metadata, error) { func (s *AWSS3) parseMetadata(metadata bindings.Metadata) (*s3Metadata, error) {

View File

@ -81,11 +81,15 @@ func (a *AWSSNS) getClient(metadata *snsMetadata) (*sns.SNS, error) {
return c, nil return c, nil
} }
func (a *AWSSNS) Write(req *bindings.WriteRequest) error { func (a *AWSSNS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AWSSNS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload dataPayload var payload dataPayload
err := json.Unmarshal(req.Data, &payload) err := json.Unmarshal(req.Data, &payload)
if err != nil { if err != nil {
return err return nil, err
} }
msg := fmt.Sprintf("%v", payload.Message) msg := fmt.Sprintf("%v", payload.Message)
@ -99,7 +103,7 @@ func (a *AWSSNS) Write(req *bindings.WriteRequest) error {
_, err = a.client.Publish(input) _, err = a.client.Publish(input)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }

View File

@ -62,13 +62,17 @@ func (a *AWSSQS) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (a *AWSSQS) Write(req *bindings.WriteRequest) error { func (a *AWSSQS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AWSSQS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
msgBody := string(req.Data) msgBody := string(req.Data)
_, err := a.Client.SendMessage(&sqs.SendMessageInput{ _, err := a.Client.SendMessage(&sqs.SendMessageInput{
MessageBody: &msgBody, MessageBody: &msgBody,
QueueUrl: a.QueueURL, QueueUrl: a.QueueURL,
}) })
return err return nil, err
} }
func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error { func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error {

View File

@ -90,7 +90,11 @@ func (a *AzureBlobStorage) parseMetadata(metadata bindings.Metadata) (*blobStora
return &m, nil return &m, nil
} }
func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error { func (a *AzureBlobStorage) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AzureBlobStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
name := "" name := ""
if val, ok := req.Metadata[blobName]; ok && val != "" { if val, ok := req.Metadata[blobName]; ok && val != "" {
name = val name = val
@ -108,7 +112,7 @@ func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error {
if val, ok := req.Metadata[contentMD5]; ok && val != "" { if val, ok := req.Metadata[contentMD5]; ok && val != "" {
sDec, err := b64.StdEncoding.DecodeString(val) sDec, err := b64.StdEncoding.DecodeString(val)
if err != nil || len(sDec) != 16 { if err != nil || len(sDec) != 16 {
return fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded") return nil, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
} }
blobHTTPHeaders.ContentMD5 = sDec blobHTTPHeaders.ContentMD5 = sDec
delete(req.Metadata, contentMD5) delete(req.Metadata, contentMD5)
@ -138,5 +142,5 @@ func (a *AzureBlobStorage) Write(req *bindings.WriteRequest) error {
Metadata: req.Metadata, Metadata: req.Metadata,
BlobHTTPHeaders: blobHTTPHeaders, BlobHTTPHeaders: blobHTTPHeaders,
}) })
return err return nil, err
} }

View File

@ -97,24 +97,28 @@ func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentia
return &creds, nil return &creds, nil
} }
func (c *CosmosDB) Write(req *bindings.WriteRequest) error { func (c *CosmosDB) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (c *CosmosDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var obj interface{} var obj interface{}
err := json.Unmarshal(req.Data, &obj) err := json.Unmarshal(req.Data, &obj)
if err != nil { if err != nil {
return err return nil, err
} }
val, err := c.getPartitionKeyValue(c.partitionKey, obj) val, err := c.getPartitionKeyValue(c.partitionKey, obj)
if err != nil { if err != nil {
return err return nil, err
} }
_, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val)) _, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val))
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) { func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {

View File

@ -107,11 +107,15 @@ func (a *AzureEventGrid) Read(handler func(*bindings.ReadResponse) error) error
return nil return nil
} }
func (a *AzureEventGrid) Write(req *bindings.WriteRequest) error { func (a *AzureEventGrid) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AzureEventGrid) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
err := a.ensureOutputBindingMetadata() err := a.ensureOutputBindingMetadata()
if err != nil { if err != nil {
a.logger.Error(err.Error()) a.logger.Error(err.Error())
return err return nil, err
} }
request := fasthttp.AcquireRequest() request := fasthttp.AcquireRequest()
@ -129,16 +133,16 @@ func (a *AzureEventGrid) Write(req *bindings.WriteRequest) error {
err = client.Do(request, response) err = client.Do(request, response)
if err != nil { if err != nil {
a.logger.Error(err.Error()) a.logger.Error(err.Error())
return err return nil, err
} }
if response.StatusCode() != fasthttp.StatusOK { if response.StatusCode() != fasthttp.StatusOK {
body := response.Body() body := response.Body()
a.logger.Error(string(body)) a.logger.Error(string(body))
return errors.New(string(body)) return nil, errors.New(string(body))
} }
return nil return nil, nil
} }
func (a *AzureEventGrid) ensureInputBindingMetadata() error { func (a *AzureEventGrid) ensureInputBindingMetadata() error {

View File

@ -138,8 +138,12 @@ func parseMetadata(meta bindings.Metadata) (*azureEventHubsMetadata, error) {
return m, nil return m, nil
} }
func (a *AzureEventHubs) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
// Write posts an event hubs message // Write posts an event hubs message
func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error { func (a *AzureEventHubs) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
event := &eventhub.Event{ event := &eventhub.Event{
Data: req.Data, Data: req.Data,
} }
@ -156,10 +160,10 @@ func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error {
err := a.hub.Send(context.Background(), event) err := a.hub.Send(context.Background(), event)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
// Read gets messages from eventhubs in a non-blocking fashion // Read gets messages from eventhubs in a non-blocking fashion

View File

@ -127,7 +127,11 @@ func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serv
return &m, nil return &m, nil
} }
func (a *AzureServiceBusQueues) Write(req *bindings.WriteRequest) error { func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
@ -141,14 +145,14 @@ func (a *AzureServiceBusQueues) Write(req *bindings.WriteRequest) error {
ttl, ok, err := bindings.TryGetTTL(req.Metadata) ttl, ok, err := bindings.TryGetTTL(req.Metadata)
if err != nil { if err != nil {
return err return nil, err
} }
if ok { if ok {
msg.TTL = &ttl msg.TTL = &ttl
} }
return a.client.Send(ctx, msg) return nil, a.client.Send(ctx, msg)
} }
func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) error) error { func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) error) error {

View File

@ -83,7 +83,7 @@ func TestQueueWithTTL(t *testing.T) {
// Assert that if waited too long, we won't see any message // Assert that if waited too long, we won't see any message
const tooLateMsgContent = "too_late_msg" const tooLateMsgContent = "too_late_msg"
err = a.Write(&bindings.WriteRequest{Data: []byte(tooLateMsgContent)}) err = a.Write(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)})
assert.Nil(t, err) assert.Nil(t, err)
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
@ -97,7 +97,7 @@ func TestQueueWithTTL(t *testing.T) {
// Getting before it is expired, should return it // Getting before it is expired, should return it
const testMsgContent = "test_msg" const testMsgContent = "test_msg"
err = a.Write(&bindings.WriteRequest{Data: []byte(testMsgContent)}) err = a.Write(&bindings.InvokeRequest{Data: []byte(testMsgContent)})
assert.Nil(t, err) assert.Nil(t, err)
msg, ok, err := getMessageWithRetries(queue, maxGetDuration) msg, ok, err := getMessageWithRetries(queue, maxGetDuration)
@ -136,7 +136,7 @@ func TestPublishingWithTTL(t *testing.T) {
assert.Equal(t, defaultAzureServiceBusMessageTimeToLive, *queueEntity.DefaultMessageTimeToLive) assert.Equal(t, defaultAzureServiceBusMessageTimeToLive, *queueEntity.DefaultMessageTimeToLive)
const tooLateMsgContent = "too_late_msg" const tooLateMsgContent = "too_late_msg"
writeRequest := bindings.WriteRequest{ writeRequest := bindings.InvokeRequest{
Data: []byte(tooLateMsgContent), Data: []byte(tooLateMsgContent),
Metadata: map[string]string{ Metadata: map[string]string{
bindings.TTLMetadataKey: "1", bindings.TTLMetadataKey: "1",
@ -160,7 +160,7 @@ func TestPublishingWithTTL(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
const testMsgContent = "test_msg" const testMsgContent = "test_msg"
writeRequest = bindings.WriteRequest{ writeRequest = bindings.InvokeRequest{
Data: []byte(testMsgContent), Data: []byte(testMsgContent),
Metadata: map[string]string{ Metadata: map[string]string{
bindings.TTLMetadataKey: "1", bindings.TTLMetadataKey: "1",

View File

@ -95,7 +95,7 @@ func (s *SignalR) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (s *SignalR) resolveAPIURL(req *bindings.WriteRequest) (string, error) { func (s *SignalR) resolveAPIURL(req *bindings.InvokeRequest) (string, error) {
hub := s.hub hub := s.hub
if hub == "" { if hub == "" {
hubFromRequest, ok := req.Metadata[hubKey] hubFromRequest, ok := req.Metadata[hubKey]
@ -148,23 +148,27 @@ func (s *SignalR) sendMessageToSignalR(url string, token string, data []byte) er
return nil return nil
} }
func (s *SignalR) Write(req *bindings.WriteRequest) error { func (s *SignalR) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (s *SignalR) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
url, err := s.resolveAPIURL(req) url, err := s.resolveAPIURL(req)
if err != nil { if err != nil {
return err return nil, err
} }
token, err := s.ensureValidToken(url) token, err := s.ensureValidToken(url)
if err != nil { if err != nil {
return err return nil, err
} }
err = s.sendMessageToSignalR(url, token, req.Data) err = s.sendMessageToSignalR(url, token, req.Data)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
func (s *SignalR) ensureValidToken(url string) (string, error) { func (s *SignalR) ensureValidToken(url string) (string, error) {

View File

@ -177,7 +177,7 @@ func TestWriteShouldFail(t *testing.T) {
t.Run("Missing hub should fail", func(t *testing.T) { t.Run("Missing hub should fail", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
err := s.Write(&bindings.WriteRequest{ _, err := s.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{}, Metadata: map[string]string{},
}) })
@ -189,7 +189,7 @@ func TestWriteShouldFail(t *testing.T) {
httpTransport.reset() httpTransport.reset()
httpErr := errors.New("fake error") httpErr := errors.New("fake error")
httpTransport.errToReturn = httpErr httpTransport.errToReturn = httpErr
err := s.Write(&bindings.WriteRequest{ _, err := s.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
hubKey: "testHub", hubKey: "testHub",
@ -203,7 +203,7 @@ func TestWriteShouldFail(t *testing.T) {
t.Run("SignalR call returns status != [200, 202]", func(t *testing.T) { t.Run("SignalR call returns status != [200, 202]", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
httpTransport.response.StatusCode = 401 httpTransport.response.StatusCode = 401
err := s.Write(&bindings.WriteRequest{ _, err := s.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
hubKey: "testHub", hubKey: "testHub",
@ -228,7 +228,7 @@ func TestWriteShouldSucceed(t *testing.T) {
t.Run("Has authorization", func(t *testing.T) { t.Run("Has authorization", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
err := s.Write(&bindings.WriteRequest{ _, err := s.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
hubKey: "testHub", hubKey: "testHub",
@ -262,7 +262,7 @@ func TestWriteShouldSucceed(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
s.hub = tt.hubInMetadata s.hub = tt.hubInMetadata
err := s.Write(&bindings.WriteRequest{ _, err := s.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
hubKey: tt.hubInWriteRequest, hubKey: tt.hubInWriteRequest,

View File

@ -191,11 +191,15 @@ func (a *AzureStorageQueues) parseMetadata(metadata bindings.Metadata) (*storage
return &m, nil return &m, nil
} }
func (a *AzureStorageQueues) Write(req *bindings.WriteRequest) error { func (a *AzureStorageQueues) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (a *AzureStorageQueues) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
ttlToUse := a.metadata.ttl ttlToUse := a.metadata.ttl
ttl, ok, err := bindings.TryGetTTL(req.Metadata) ttl, ok, err := bindings.TryGetTTL(req.Metadata)
if err != nil { if err != nil {
return err return nil, err
} }
if ok { if ok {
@ -204,9 +208,9 @@ func (a *AzureStorageQueues) Write(req *bindings.WriteRequest) error {
err = a.helper.Write(req.Data, ttlToUse) err = a.helper.Write(req.Data, ttlToUse)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
func (a *AzureStorageQueues) Read(handler func(*bindings.ReadResponse) error) error { func (a *AzureStorageQueues) Read(handler func(*bindings.ReadResponse) error) error {

View File

@ -51,9 +51,9 @@ func TestWriteQueue(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("This is my message")} r := bindings.InvokeRequest{Data: []byte("This is my message")}
err = a.Write(&r) _, err = a.Invoke(&r)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -73,9 +73,9 @@ func TestWriteWithTTLInQueue(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("This is my message")} r := bindings.InvokeRequest{Data: []byte("This is my message")}
err = a.Write(&r) _, err = a.Invoke(&r)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -95,12 +95,12 @@ func TestWriteWithTTLInWrite(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{ r := bindings.InvokeRequest{
Data: []byte("This is my message"), Data: []byte("This is my message"),
Metadata: map[string]string{bindings.TTLMetadataKey: "1"}, Metadata: map[string]string{bindings.TTLMetadataKey: "1"},
} }
err = a.Write(&r) _, err = a.Invoke(&r)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -116,7 +116,7 @@ func TestWriteWithTTLInWrite(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("This is my message")} r := bindings.InvokeRequest{Data: []byte("This is my message")}
err = a.Write(&r) err = a.Write(&r)
@ -135,9 +135,9 @@ func TestReadQueue(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("This is my message")} r := bindings.InvokeRequest{Data: []byte("This is my message")}
err = a.Write(&r) _, err = a.Invoke(&r)
assert.Nil(t, err) assert.Nil(t, err)
@ -169,9 +169,9 @@ func TestReadQueueDecode(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("VGhpcyBpcyBteSBtZXNzYWdl")} r := bindings.InvokeRequest{Data: []byte("VGhpcyBpcyBteSBtZXNzYWdl")}
err = a.Write(&r) _, err = a.Invoke(&r)
assert.Nil(t, err) assert.Nil(t, err)
@ -200,7 +200,7 @@ func TestReadQueueDecode(t *testing.T) {
err := a.Init(m) err := a.Init(m)
assert.Nil(t, err) assert.Nil(t, err)
r := bindings.WriteRequest{Data: []byte("This is my message")} r := bindings.InvokeRequest{Data: []byte("This is my message")}
err = a.Write(&r) err = a.Write(&r)

View File

@ -74,7 +74,11 @@ func (g *GCPStorage) parseMetadata(metadata bindings.Metadata) ([]byte, error) {
return b, nil return b, nil
} }
func (g *GCPStorage) Write(req *bindings.WriteRequest) error { func (g *GCPStorage) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (g *GCPStorage) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
name := "" name := ""
if val, ok := req.Metadata["name"]; ok && val != "" { if val, ok := req.Metadata["name"]; ok && val != "" {
name = val name = val
@ -84,7 +88,7 @@ func (g *GCPStorage) Write(req *bindings.WriteRequest) error {
h := g.client.Bucket(g.metadata.Bucket).Object(name).NewWriter(context.Background()) h := g.client.Bucket(g.metadata.Bucket).Object(name).NewWriter(context.Background())
defer h.Close() defer h.Close()
if _, err := h.Write(req.Data); err != nil { if _, err := h.Write(req.Data); err != nil {
return err return nil, err
} }
return nil return nil, nil
} }

View File

@ -92,7 +92,11 @@ func (g *GCPPubSub) Read(handler func(*bindings.ReadResponse) error) error {
return err return err
} }
func (g *GCPPubSub) Write(req *bindings.WriteRequest) error { func (g *GCPPubSub) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (g *GCPPubSub) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
topicName := g.metadata.Topic topicName := g.metadata.Topic
if val, ok := req.Metadata[topic]; ok && val != "" { if val, ok := req.Metadata[topic]; ok && val != "" {
topicName = val topicName = val
@ -103,5 +107,5 @@ func (g *GCPPubSub) Write(req *bindings.WriteRequest) error {
_, err := t.Publish(ctx, &pubsub.Message{ _, err := t.Publish(ctx, &pubsub.Message{
Data: req.Data, Data: req.Data,
}).Get(ctx) }).Get(ctx)
return err return nil, err
} }

View File

@ -81,14 +81,18 @@ func (h *HTTPSource) Read(handler func(*bindings.ReadResponse) error) error {
return nil return nil
} }
func (h *HTTPSource) Write(req *bindings.WriteRequest) error { func (h *HTTPSource) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (h *HTTPSource) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
client := http.Client{Timeout: time.Second * 5} client := http.Client{Timeout: time.Second * 5}
resp, err := client.Post(h.metadata.URL, "application/json; charset=utf-8", bytes.NewBuffer(req.Data)) resp, err := client.Post(h.metadata.URL, "application/json; charset=utf-8", bytes.NewBuffer(req.Data))
if err != nil { if err != nil {
return err return nil, err
} }
if resp != nil && resp.Body != nil { if resp != nil && resp.Body != nil {
resp.Body.Close() resp.Body.Close()
} }
return nil return nil, nil
} }

View File

@ -104,7 +104,11 @@ func (k *Kafka) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (k *Kafka) Write(req *bindings.WriteRequest) error { func (k *Kafka) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (k *Kafka) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
msg := &sarama.ProducerMessage{ msg := &sarama.ProducerMessage{
Topic: k.publishTopic, Topic: k.publishTopic,
Value: sarama.ByteEncoder(req.Data), Value: sarama.ByteEncoder(req.Data),
@ -115,10 +119,10 @@ func (k *Kafka) Write(req *bindings.WriteRequest) error {
_, _, err := k.producer.SendMessage(msg) _, _, err := k.producer.SendMessage(msg)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
// GetKafkaMetadata returns new Kafka metadata // GetKafkaMetadata returns new Kafka metadata

View File

@ -83,10 +83,14 @@ func (m *MQTT) getMQTTMetadata(metadata bindings.Metadata) (*mqttMetadata, error
return &mMetadata, nil return &mMetadata, nil
} }
func (m *MQTT) Write(req *bindings.WriteRequest) error { func (m *MQTT) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (m *MQTT) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
m.client.Publish(m.metadata.Topic, 0, false, string(req.Data)) m.client.Publish(m.metadata.Topic, 0, false, string(req.Data))
m.client.Disconnect(0) m.client.Disconnect(0)
return nil return nil, nil
} }
func (m *MQTT) Read(handler func(*bindings.ReadResponse) error) error { func (m *MQTT) Read(handler func(*bindings.ReadResponse) error) error {

View File

@ -8,5 +8,6 @@ package bindings
// OutputBinding is the interface for an output binding, allowing users to invoke remote systems with optional payloads // OutputBinding is the interface for an output binding, allowing users to invoke remote systems with optional payloads
type OutputBinding interface { type OutputBinding interface {
Init(metadata Metadata) error Init(metadata Metadata) error
Write(req *WriteRequest) error Invoke(req *InvokeRequest) (*InvokeResponse, error)
Operations() []OperationKind
} }

View File

@ -72,7 +72,11 @@ func (r *RabbitMQ) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (r *RabbitMQ) Write(req *bindings.WriteRequest) error { func (r *RabbitMQ) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (r *RabbitMQ) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
pub := amqp.Publishing{ pub := amqp.Publishing{
DeliveryMode: amqp.Persistent, DeliveryMode: amqp.Persistent,
ContentType: "text/plain", ContentType: "text/plain",
@ -81,7 +85,7 @@ func (r *RabbitMQ) Write(req *bindings.WriteRequest) error {
ttl, ok, err := bindings.TryGetTTL(req.Metadata) ttl, ok, err := bindings.TryGetTTL(req.Metadata)
if err != nil { if err != nil {
return err return nil, err
} }
// The default time to live has been set in the queue // The default time to live has been set in the queue
@ -94,10 +98,10 @@ func (r *RabbitMQ) Write(req *bindings.WriteRequest) error {
err = r.channel.Publish("", r.metadata.QueueName, false, false, pub) err = r.channel.Publish("", r.metadata.QueueName, false, false, pub)
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
func (r *RabbitMQ) parseMetadata(metadata bindings.Metadata) error { func (r *RabbitMQ) parseMetadata(metadata bindings.Metadata) error {

View File

@ -83,7 +83,7 @@ func TestQueuesWithTTL(t *testing.T) {
defer ch.Close() defer ch.Close()
const tooLateMsgContent = "too_late_msg" const tooLateMsgContent = "too_late_msg"
err = r.Write(&bindings.WriteRequest{Data: []byte(tooLateMsgContent)}) err = r.Write(&bindings.InvokeRequest{Data: []byte(tooLateMsgContent)})
assert.Nil(t, err) assert.Nil(t, err)
time.Sleep(time.Second + (ttlInSeconds * time.Second)) time.Sleep(time.Second + (ttlInSeconds * time.Second))
@ -94,7 +94,7 @@ func TestQueuesWithTTL(t *testing.T) {
// Getting before it is expired, should return it // Getting before it is expired, should return it
const testMsgContent = "test_msg" const testMsgContent = "test_msg"
err = r.Write(&bindings.WriteRequest{Data: []byte(testMsgContent)}) err = r.Write(&bindings.InvokeRequest{Data: []byte(testMsgContent)})
assert.Nil(t, err) assert.Nil(t, err)
msg, ok, err := getMessageWithRetries(ch, queueName, maxGetDuration) msg, ok, err := getMessageWithRetries(ch, queueName, maxGetDuration)
@ -140,7 +140,7 @@ func TestPublishingWithTTL(t *testing.T) {
defer ch.Close() defer ch.Close()
const tooLateMsgContent = "too_late_msg" const tooLateMsgContent = "too_late_msg"
writeRequest := bindings.WriteRequest{ writeRequest := bindings.InvokeRequest{
Data: []byte(tooLateMsgContent), Data: []byte(tooLateMsgContent),
Metadata: map[string]string{ Metadata: map[string]string{
bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds), bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds),
@ -162,7 +162,7 @@ func TestPublishingWithTTL(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
const testMsgContent = "test_msg" const testMsgContent = "test_msg"
writeRequest = bindings.WriteRequest{ writeRequest = bindings.InvokeRequest{
Data: []byte(testMsgContent), Data: []byte(testMsgContent),
Metadata: map[string]string{ Metadata: map[string]string{
bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds * 1000), bindings.TTLMetadataKey: strconv.Itoa(ttlInSeconds * 1000),

View File

@ -118,14 +118,18 @@ func (r *Redis) parseMetadata(meta bindings.Metadata) (metadata, error) {
return m, nil return m, nil
} }
func (r *Redis) Write(req *bindings.WriteRequest) error { func (r *Redis) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
if val, ok := req.Metadata["key"]; ok && val != "" { if val, ok := req.Metadata["key"]; ok && val != "" {
key := val key := val
_, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result() _, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result()
if err != nil { if err != nil {
return err return nil, err
} }
return nil return nil, nil
} }
return errors.New("redis binding: missing key on write request metadata") return nil, errors.New("redis binding: missing key on write request metadata")
} }

View File

@ -5,8 +5,20 @@
package bindings package bindings
// WriteRequest is the object given to an dapr output binding // InvokeRequest is the object given to a dapr output binding
type WriteRequest struct { type InvokeRequest struct {
Data []byte `json:"data"` Data []byte `json:"data"`
Metadata map[string]string `json:"metadata"` Metadata map[string]string `json:"metadata"`
Operation OperationKind `json:"operation"`
} }
// OperationKind defines an output binding operation
type OperationKind string
// Non exhaustive list of operations. A binding can add operations that are not in this list.
const (
GetOperation OperationKind = "get"
CreateOperation OperationKind = "create"
DeleteOperation OperationKind = "delete"
ListOperation OperationKind = "list"
)

View File

@ -23,3 +23,9 @@ type AppResponse struct {
State []state.SetRequest `json:"state"` State []state.SetRequest `json:"state"`
Concurrency string `json:"concurrency"` Concurrency string `json:"concurrency"`
} }
// InvokeResponse is the response object returned from an output binding
type InvokeResponse struct {
Data []byte `json:"data"`
Metadata map[string]string `json:"metadata"`
}

View File

@ -82,8 +82,12 @@ func (sg *SendGrid) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (sg *SendGrid) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
// Write does the work of sending message to SendGrid API // Write does the work of sending message to SendGrid API
func (sg *SendGrid) Write(req *bindings.WriteRequest) error { func (sg *SendGrid) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
// We allow two possible sources of the properties we need, // We allow two possible sources of the properties we need,
// the component metadata or request metadata, request takes priority if present // the component metadata or request metadata, request takes priority if present
@ -96,7 +100,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error {
fromAddress = mail.NewEmail("", req.Metadata["emailFrom"]) fromAddress = mail.NewEmail("", req.Metadata["emailFrom"])
} }
if fromAddress == nil { if fromAddress == nil {
return fmt.Errorf("error SendGrid from email not supplied") return nil, fmt.Errorf("error SendGrid from email not supplied")
} }
// Build email to address, this is required // Build email to address, this is required
@ -108,7 +112,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error {
toAddress = mail.NewEmail("", req.Metadata["emailTo"]) toAddress = mail.NewEmail("", req.Metadata["emailTo"])
} }
if toAddress == nil { if toAddress == nil {
return fmt.Errorf("error SendGrid to email not supplied") return nil, fmt.Errorf("error SendGrid to email not supplied")
} }
// Build email subject, this is required // Build email subject, this is required
@ -120,7 +124,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error {
subject = req.Metadata["subject"] subject = req.Metadata["subject"]
} }
if subject == "" { if subject == "" {
return fmt.Errorf("error SendGrid subject not supplied") return nil, fmt.Errorf("error SendGrid subject not supplied")
} }
// Build email cc address, this is optional // Build email cc address, this is optional
@ -165,7 +169,7 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error {
client := sendgrid.NewSendClient(sg.metadata.APIKey) client := sendgrid.NewSendClient(sg.metadata.APIKey)
resp, err := client.Send(email) resp, err := client.Send(email)
if err != nil { if err != nil {
return fmt.Errorf("error from SendGrid, sending email failed: %+v", err) return nil, fmt.Errorf("error from SendGrid, sending email failed: %+v", err)
} }
// Check SendGrid response is OK // Check SendGrid response is OK
@ -174,9 +178,9 @@ func (sg *SendGrid) Write(req *bindings.WriteRequest) error {
sendGridError := sendGridRestError{} sendGridError := sendGridRestError{}
json.NewDecoder(strings.NewReader(resp.Body)).Decode(&sendGridError) json.NewDecoder(strings.NewReader(resp.Body)).Decode(&sendGridError)
// Pass it back to the caller, so they have some idea what went wrong // Pass it back to the caller, so they have some idea what went wrong
return fmt.Errorf("error from SendGrid, sending email failed: %d %+v", resp.StatusCode, sendGridError) return nil, fmt.Errorf("error from SendGrid, sending email failed: %d %+v", resp.StatusCode, sendGridError)
} }
sg.logger.Info("sent email with SendGrid") sg.logger.Info("sent email with SendGrid")
return nil return nil, nil
} }

View File

@ -75,12 +75,16 @@ func (t *SMS) Init(metadata bindings.Metadata) error {
return nil return nil
} }
func (t *SMS) Write(req *bindings.WriteRequest) error { func (t *SMS) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.CreateOperation}
}
func (t *SMS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
toNumberValue := t.metadata.toNumber toNumberValue := t.metadata.toNumber
if toNumberValue == "" { if toNumberValue == "" {
toNumberFromRequest, ok := req.Metadata[toNumber] toNumberFromRequest, ok := req.Metadata[toNumber]
if !ok || toNumberFromRequest == "" { if !ok || toNumberFromRequest == "" {
return errors.New("twilio missing \"toNumber\" field") return nil, errors.New("twilio missing \"toNumber\" field")
} }
toNumberValue = toNumberFromRequest toNumberValue = toNumberFromRequest
} }
@ -94,7 +98,7 @@ func (t *SMS) Write(req *bindings.WriteRequest) error {
twilioURL := fmt.Sprintf("%s%s/Messages.json", twilioURLBase, t.metadata.accountSid) twilioURL := fmt.Sprintf("%s%s/Messages.json", twilioURLBase, t.metadata.accountSid)
httpReq, err := http.NewRequest("POST", twilioURL, &vDr) httpReq, err := http.NewRequest("POST", twilioURL, &vDr)
if err != nil { if err != nil {
return err return nil, err
} }
httpReq.SetBasicAuth(t.metadata.accountSid, t.metadata.authToken) httpReq.SetBasicAuth(t.metadata.accountSid, t.metadata.authToken)
httpReq.Header.Add("Accept", "application/json") httpReq.Header.Add("Accept", "application/json")
@ -102,11 +106,11 @@ func (t *SMS) Write(req *bindings.WriteRequest) error {
resp, err := t.httpClient.Do(httpReq) resp, err := t.httpClient.Do(httpReq)
if err != nil { if err != nil {
return err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { if !(resp.StatusCode >= 200 && resp.StatusCode < 300) {
return fmt.Errorf("error from Twilio: %s", resp.Status) return nil, fmt.Errorf("error from Twilio: %s", resp.Status)
} }
return nil return nil, nil
} }

View File

@ -69,7 +69,7 @@ func TestWriteShouldSucceed(t *testing.T) {
t.Run("Should succeed with expected url and headers", func(t *testing.T) { t.Run("Should succeed with expected url and headers", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
err := tw.Write(&bindings.WriteRequest{ _, err := tw.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
toNumber: "toNumber", toNumber: "toNumber",
@ -104,7 +104,7 @@ func TestWriteShouldFail(t *testing.T) {
t.Run("Missing 'to' should fail", func(t *testing.T) { t.Run("Missing 'to' should fail", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
err := tw.Write(&bindings.WriteRequest{ _, err := tw.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{}, Metadata: map[string]string{},
}) })
@ -116,7 +116,7 @@ func TestWriteShouldFail(t *testing.T) {
httpTransport.reset() httpTransport.reset()
httpErr := errors.New("twilio fake error") httpErr := errors.New("twilio fake error")
httpTransport.errToReturn = httpErr httpTransport.errToReturn = httpErr
err := tw.Write(&bindings.WriteRequest{ _, err := tw.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
toNumber: "toNumber", toNumber: "toNumber",
@ -130,7 +130,7 @@ func TestWriteShouldFail(t *testing.T) {
t.Run("Twilio call returns status not >=200 and <300", func(t *testing.T) { t.Run("Twilio call returns status not >=200 and <300", func(t *testing.T) {
httpTransport.reset() httpTransport.reset()
httpTransport.response.StatusCode = 401 httpTransport.response.StatusCode = 401
err := tw.Write(&bindings.WriteRequest{ _, err := tw.Invoke(&bindings.InvokeRequest{
Data: []byte("hello world"), Data: []byte("hello world"),
Metadata: map[string]string{ Metadata: map[string]string{
toNumber: "toNumber", toNumber: "toNumber",