diff --git a/bindings/Readme.md b/bindings/Readme.md index f77663e0c..fb12a284c 100644 --- a/bindings/Readme.md +++ b/bindings/Readme.md @@ -18,7 +18,7 @@ Input binding: ```go type InputBinding interface { Init(metadata Metadata) error - Read(handler func(*ReadResponse) error) error + Read(handler func(*ReadResponse) ([]byte, error) error } ``` diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 1282b4144..209dd21fc 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -67,12 +67,12 @@ const ( // recordProcessorFactory type recordProcessorFactory struct { logger logger.Logger - handler func(*bindings.ReadResponse) error + handler func(*bindings.ReadResponse) ([]byte, error) } type recordProcessor struct { logger logger.Logger - handler func(*bindings.ReadResponse) error + handler func(*bindings.ReadResponse) ([]byte, error) } // NewAWSKinesis returns a new AWS Kinesis instance @@ -140,7 +140,7 @@ func (a *AWSKinesis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeRespon return nil, err } -func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { if a.metadata.KinesisConsumerMode == SharedThroughput { a.worker = worker.NewWorker(a.recordProcessorFactory(handler), a.workerConfig, nil) err := a.worker.Start() @@ -170,7 +170,7 @@ func (a *AWSKinesis) Read(handler func(*bindings.ReadResponse) error) error { } // Subscribe to all shards -func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler func(*bindings.ReadResponse) error) error { +func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler func(*bindings.ReadResponse) ([]byte, error)) error { consumerARN, err := a.ensureConsumer(streamDesc.StreamARN) if err != nil { a.logger.Error(err) @@ -320,7 +320,7 @@ func (a *AWSKinesis) parseMetadata(metadata bindings.Metadata) (*kinesisMetadata return &m, nil } -func (a *AWSKinesis) recordProcessorFactory(handler func(*bindings.ReadResponse) error) interfaces.IRecordProcessorFactory { +func (a *AWSKinesis) recordProcessorFactory(handler func(*bindings.ReadResponse) ([]byte, error)) interfaces.IRecordProcessorFactory { return &recordProcessorFactory{logger: a.logger, handler: handler} } diff --git a/bindings/aws/sqs/sqs.go b/bindings/aws/sqs/sqs.go index b414ddf7b..71c8d1e84 100644 --- a/bindings/aws/sqs/sqs.go +++ b/bindings/aws/sqs/sqs.go @@ -78,7 +78,7 @@ func (a *AWSSQS) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, return nil, err } -func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { for { result, err := a.Client.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: a.QueueURL, @@ -101,7 +101,7 @@ func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error { res := bindings.ReadResponse{ Data: []byte(*body), } - err := handler(&res) + _, err := handler(&res) if err == nil { msgHandle := m.ReceiptHandle diff --git a/bindings/azure/eventgrid/eventgrid.go b/bindings/azure/eventgrid/eventgrid.go index 72ebcb03f..909f88058 100644 --- a/bindings/azure/eventgrid/eventgrid.go +++ b/bindings/azure/eventgrid/eventgrid.go @@ -63,7 +63,7 @@ func (a *AzureEventGrid) Init(metadata bindings.Metadata) error { return nil } -func (a *AzureEventGrid) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AzureEventGrid) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { err := a.ensureInputBindingMetadata() if err != nil { return err @@ -88,7 +88,7 @@ func (a *AzureEventGrid) Read(handler func(*bindings.ReadResponse) error) error case "POST": bodyBytes := ctx.PostBody() - err = handler(&bindings.ReadResponse{ + _, err = handler(&bindings.ReadResponse{ Data: bodyBytes, }) if err != nil { diff --git a/bindings/azure/eventhubs/eventhubs.go b/bindings/azure/eventhubs/eventhubs.go index 374581d70..231d37e1f 100644 --- a/bindings/azure/eventhubs/eventhubs.go +++ b/bindings/azure/eventhubs/eventhubs.go @@ -168,7 +168,7 @@ func (a *AzureEventHubs) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeRe } // Read gets messages from eventhubs in a non-blocking fashion -func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { if !a.metadata.partitioned() { if err := a.RegisterEventProcessor(handler); err != nil { return err @@ -190,7 +190,7 @@ func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error } // RegisterPartitionedEventProcessor - receive eventhub messages by partitionID -func (a *AzureEventHubs) RegisterPartitionedEventProcessor(handler func(*bindings.ReadResponse) error) error { +func (a *AzureEventHubs) RegisterPartitionedEventProcessor(handler func(*bindings.ReadResponse) ([]byte, error)) error { ctx := context.Background() runtimeInfo, err := a.hub.GetRuntimeInformation(ctx) @@ -241,7 +241,7 @@ func contains(arr []string, str string) bool { // RegisterEventProcessor - receive eventhub messages by eventprocessor // host by balancing partitions -func (a *AzureEventHubs) RegisterEventProcessor(handler func(*bindings.ReadResponse) error) error { +func (a *AzureEventHubs) RegisterEventProcessor(handler func(*bindings.ReadResponse) ([]byte, error)) error { cred, err := azblob.NewSharedKeyCredential(a.metadata.storageAccountName, a.metadata.storageAccountKey) if err != nil { return err @@ -259,7 +259,9 @@ func (a *AzureEventHubs) RegisterEventProcessor(handler func(*bindings.ReadRespo _, err = processor.RegisterHandler(context.Background(), func(c context.Context, e *eventhub.Event) error { - return handler(&bindings.ReadResponse{Data: e.Data}) + _, err = handler(&bindings.ReadResponse{Data: e.Data}) + + return err }) if err != nil { return err diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 62c1a2ca9..cee89dad5 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -158,9 +158,9 @@ func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.I return nil, a.client.Send(ctx, msg) } -func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { var sbHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error { - err := handler(&bindings.ReadResponse{ + _, err := handler(&bindings.ReadResponse{ Data: msg.Data, Metadata: map[string]string{id: msg.ID, correlationID: msg.CorrelationID, label: msg.Label}, }) diff --git a/bindings/azure/storagequeues/storagequeues.go b/bindings/azure/storagequeues/storagequeues.go index 81de8db74..a953b1edb 100644 --- a/bindings/azure/storagequeues/storagequeues.go +++ b/bindings/azure/storagequeues/storagequeues.go @@ -29,7 +29,7 @@ const ( ) type consumer struct { - callback func(*bindings.ReadResponse) error + callback func(*bindings.ReadResponse) ([]byte, error) } // QueueHelper enables injection for testnig @@ -107,7 +107,7 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error { data = []byte(mt) } - err = consumer.callback(&bindings.ReadResponse{ + _, err = consumer.callback(&bindings.ReadResponse{ Data: data, Metadata: map[string]string{}, }) @@ -220,7 +220,7 @@ func (a *AzureStorageQueues) Invoke(req *bindings.InvokeRequest) (*bindings.Invo return nil, nil } -func (a *AzureStorageQueues) Read(handler func(*bindings.ReadResponse) error) error { +func (a *AzureStorageQueues) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { c := consumer{ callback: handler, } diff --git a/bindings/azure/storagequeues/storagequeues_test.go b/bindings/azure/storagequeues/storagequeues_test.go index 18ecdb403..a32aafe75 100644 --- a/bindings/azure/storagequeues/storagequeues_test.go +++ b/bindings/azure/storagequeues/storagequeues_test.go @@ -144,11 +144,11 @@ func TestReadQueue(t *testing.T) { assert.Nil(t, err) - handler := func(data *bindings.ReadResponse) error { + handler := func(data *bindings.ReadResponse) ([]byte, error) { s := string(data.Data) assert.Equal(t, s, "This is my message") - return nil + return nil, nil } go a.Read(handler) @@ -179,11 +179,11 @@ func TestReadQueueDecode(t *testing.T) { assert.Nil(t, err) - handler := func(data *bindings.ReadResponse) error { + handler := func(data *bindings.ReadResponse) ([]byte, error) { s := string(data.Data) assert.Equal(t, s, "This is my message") - return nil + return nil, nil } go a.Read(handler) @@ -211,10 +211,10 @@ func TestReadQueueDecode(t *testing.T) { assert.Nil(t, err) - var handler = func(data *bindings.ReadResponse) error { + var handler = func(data *bindings.ReadResponse) ([]byte, error) { s := string(data.Data) assert.Equal(t, s, "This is my message") - return nil + return nil, nil } _ = a.Read(handler) @@ -236,11 +236,11 @@ func TestReadQueueNoMessage(t *testing.T) { err := a.Init(m) assert.Nil(t, err) - handler := func(data *bindings.ReadResponse) error { + handler := func(data *bindings.ReadResponse) ([]byte, error) { s := string(data.Data) assert.Equal(t, s, "This is my message") - return nil + return nil, nil } go a.Read(handler) diff --git a/bindings/cron/cron.go b/bindings/cron/cron.go index a2c2e47f1..07a3af049 100644 --- a/bindings/cron/cron.go +++ b/bindings/cron/cron.go @@ -55,7 +55,7 @@ func (b *Binding) Init(metadata bindings.Metadata) error { } // Read triggers the Cron scheduler -func (b *Binding) Read(handler func(*bindings.ReadResponse) error) error { +func (b *Binding) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { c := cron.New(cron.WithParser(b.parser)) id, err := c.AddFunc(b.schedule, func() { b.logger.Debugf("schedule fired: %v", time.Now()) diff --git a/bindings/cron/cron_test.go b/bindings/cron/cron_test.go index ce1dab5cf..7206694fa 100644 --- a/bindings/cron/cron_test.go +++ b/bindings/cron/cron_test.go @@ -59,7 +59,7 @@ func TestCronReadWithDeleteInvoke(t *testing.T) { assert.NoErrorf(t, c.Init(getTestMetadata(schedule)), "error initializing valid schedule") testsNum := 3 i := 0 - err := c.Read(func(res *bindings.ReadResponse) error { + err := c.Read(func(res *bindings.ReadResponse) ([]byte, error) { assert.NotNil(t, res) assert.LessOrEqualf(t, i, testsNum, "Invoke didn't stop the schedule") i++ @@ -73,7 +73,7 @@ func TestCronReadWithDeleteInvoke(t *testing.T) { assert.Equal(t, schedule, scheduleVal) } - return nil + return nil, nil }) assert.NoErrorf(t, err, "error on read") } diff --git a/bindings/gcp/pubsub/pubsub.go b/bindings/gcp/pubsub/pubsub.go index 64e34035c..226e0a5fa 100644 --- a/bindings/gcp/pubsub/pubsub.go +++ b/bindings/gcp/pubsub/pubsub.go @@ -80,10 +80,10 @@ func (g *GCPPubSub) parseMetadata(metadata bindings.Metadata) ([]byte, error) { return b, err } -func (g *GCPPubSub) Read(handler func(*bindings.ReadResponse) error) error { +func (g *GCPPubSub) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { sub := g.client.Subscription(g.metadata.Subscription) err := sub.Receive(context.Background(), func(ctx context.Context, m *pubsub.Message) { - err := handler(&bindings.ReadResponse{ + _, err := handler(&bindings.ReadResponse{ Data: m.Data, Metadata: map[string]string{id: m.ID, publishTime: m.PublishTime.String()}, }) diff --git a/bindings/input_binding.go b/bindings/input_binding.go index 4749f3d13..f947ee16c 100644 --- a/bindings/input_binding.go +++ b/bindings/input_binding.go @@ -10,5 +10,5 @@ type InputBinding interface { // Init passes connection and properties metadata to the binding implementation Init(metadata Metadata) error // Read is a blocking method that triggers the callback function whenever an event arrives - Read(handler func(*ReadResponse) error) error + Read(handler func(*ReadResponse) ([]byte, error)) error } diff --git a/bindings/kafka/kafka.go b/bindings/kafka/kafka.go index 7321e9225..b91fb56d2 100644 --- a/bindings/kafka/kafka.go +++ b/bindings/kafka/kafka.go @@ -52,13 +52,13 @@ type kafkaMetadata struct { type consumer struct { ready chan bool - callback func(*bindings.ReadResponse) error + callback func(*bindings.ReadResponse) ([]byte, error) } func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { if consumer.callback != nil { - err := consumer.callback(&bindings.ReadResponse{ + _, err := consumer.callback(&bindings.ReadResponse{ Data: message.Value, }) if err == nil { @@ -207,7 +207,7 @@ func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error return producer, nil } -func (k *Kafka) Read(handler func(*bindings.ReadResponse) error) error { +func (k *Kafka) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { config := sarama.NewConfig() config.Version = sarama.V1_0_0_0 // ignore SASL properties if authRequired is false diff --git a/bindings/kubernetes/kubernetes.go b/bindings/kubernetes/kubernetes.go index cb9b0e8dd..2a18b2f36 100644 --- a/bindings/kubernetes/kubernetes.go +++ b/bindings/kubernetes/kubernetes.go @@ -72,7 +72,7 @@ func (k *kubernetesInput) parseMetadata(metadata bindings.Metadata) error { return nil } -func (k *kubernetesInput) Read(handler func(*bindings.ReadResponse) error) error { +func (k *kubernetesInput) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { watchlist := cache.NewListWatchFromClient( k.kubeClient.CoreV1().RESTClient(), "events", diff --git a/bindings/mqtt/mqtt.go b/bindings/mqtt/mqtt.go index ffba19fbd..0d91a3932 100644 --- a/bindings/mqtt/mqtt.go +++ b/bindings/mqtt/mqtt.go @@ -95,7 +95,7 @@ func (m *MQTT) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, er return nil, nil } -func (m *MQTT) Read(handler func(*bindings.ReadResponse) error) error { +func (m *MQTT) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index 993849af8..2f3ab4e98 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -220,7 +220,7 @@ func (r *RabbitMQ) declareQueue() (amqp.Queue, error) { return r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, r.metadata.Exclusive, false, args) } -func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) error) error { +func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { msgs, err := r.channel.Consume( r.queue.Name, "", @@ -238,7 +238,7 @@ func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) error) error { go func() { for d := range msgs { - err := handler(&bindings.ReadResponse{ + _, err := handler(&bindings.ReadResponse{ Data: d.Body, }) if err == nil { diff --git a/bindings/responses.go b/bindings/responses.go index 2ff828c60..619a1ff00 100644 --- a/bindings/responses.go +++ b/bindings/responses.go @@ -9,7 +9,7 @@ import ( "github.com/dapr/components-contrib/state" ) -// ReadResponse is an the return object from an dapr input binding +// ReadResponse is the return object from an dapr input binding type ReadResponse struct { Data []byte `json:"data"` Metadata map[string]string `json:"metadata"` diff --git a/bindings/rethinkdb/statechange/statechange.go b/bindings/rethinkdb/statechange/statechange.go index e7a664469..81c49f9ed 100644 --- a/bindings/rethinkdb/statechange/statechange.go +++ b/bindings/rethinkdb/statechange/statechange.go @@ -60,7 +60,7 @@ func (b *Binding) Init(metadata bindings.Metadata) error { } // Read triggers the RethinkDB scheduler -func (b *Binding) Read(handler func(*bindings.ReadResponse) error) error { +func (b *Binding) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { b.logger.Infof("subscribing to state changes in %s.%s...", b.config.Database, b.config.Table) cursor, err := r.DB(b.config.Database).Table(b.config.Table).Changes(r.ChangesOpts{ IncludeTypes: true, @@ -94,7 +94,7 @@ func (b *Binding) Read(handler func(*bindings.ReadResponse) error) error { }, } - if err := handler(resp); err != nil { + if _, err := handler(resp); err != nil { b.logger.Errorf("error invoking change handler: %v", err) continue diff --git a/bindings/rethinkdb/statechange/statechange_test.go b/bindings/rethinkdb/statechange/statechange_test.go index f8038cf47..b78b4c75b 100644 --- a/bindings/rethinkdb/statechange/statechange_test.go +++ b/bindings/rethinkdb/statechange/statechange_test.go @@ -64,11 +64,11 @@ func TestBinding(t *testing.T) { assert.NoErrorf(t, err, "error initializing") go func() { - err = b.Read(func(res *bindings.ReadResponse) error { + err = b.Read(func(res *bindings.ReadResponse) ([]byte, error) { assert.NotNil(t, res) t.Logf("state change event:\n%s", string(res.Data)) - return nil + return nil, nil }) assert.NoErrorf(t, err, "error on read") }() diff --git a/bindings/twitter/twitter.go b/bindings/twitter/twitter.go index bbc78671f..002a1092d 100644 --- a/bindings/twitter/twitter.go +++ b/bindings/twitter/twitter.go @@ -76,7 +76,7 @@ func (t *Binding) Operations() []bindings.OperationKind { } // Read triggers the Twitter search and events on each result tweet -func (t *Binding) Read(handler func(*bindings.ReadResponse) error) error { +func (t *Binding) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { if t.query == "" { return nil } diff --git a/bindings/twitter/twitter_test.go b/bindings/twitter/twitter_test.go index 1550f1513..76ad9b035 100644 --- a/bindings/twitter/twitter_test.go +++ b/bindings/twitter/twitter_test.go @@ -60,11 +60,11 @@ func TestReadError(t *testing.T) { err := tw.Init(m) assert.Nilf(t, err, "error initializing valid metadata properties") - tw.Read(func(res *bindings.ReadResponse) error { + tw.Read(func(res *bindings.ReadResponse) ([]byte, error) { t.Logf("result: %+v", res) assert.NotNilf(t, err, "no error on read with invalid credentials") - return nil + return nil, nil }) } @@ -84,7 +84,7 @@ func TestReed(t *testing.T) { assert.Nilf(t, err, "error initializing read") counter := 0 - err = tw.Read(func(res *bindings.ReadResponse) error { + err = tw.Read(func(res *bindings.ReadResponse) ([]byte, error) { counter++ t.Logf("tweet[%d]", counter) var tweet twitter.Tweet @@ -92,7 +92,7 @@ func TestReed(t *testing.T) { assert.NotEmpty(t, tweet.IDStr, "tweet should have an ID") os.Exit(0) - return nil + return nil, nil }) assert.Nilf(t, err, "error on read") } diff --git a/state/redis/redis.go b/state/redis/redis.go index 4f57b4e12..6e22779be 100644 --- a/state/redis/redis.go +++ b/state/redis/redis.go @@ -102,7 +102,7 @@ func parseRedisMetadata(meta state.Metadata) (metadata, error) { if val, ok := meta.Properties[maxRetryBackoff]; ok && val != "" { parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) if err != nil { - return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err) + return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err) } m.maxRetryBackoff = time.Duration(parsedVal) } diff --git a/tests/conformance/bindings/bindings.go b/tests/conformance/bindings/bindings.go index 908b336bd..774b10ab3 100644 --- a/tests/conformance/bindings/bindings.go +++ b/tests/conformance/bindings/bindings.go @@ -217,10 +217,10 @@ func ConformanceTests(t *testing.T, props map[string]string, inputBinding bindin } func readFromInputBinding(binding bindings.InputBinding, reads *int, readChan chan int) { - binding.Read(func(r *bindings.ReadResponse) error { + binding.Read(func(r *bindings.ReadResponse) ([]byte, error) { (*reads)++ readChan <- (*reads) - return nil + return nil, nil }) }