Bidirectional Subscriptions (#578)

* Bidirectional Subscriptions

Adds support for bidirectional subscriptions to PubSubs. Adds two
methods for subscribing- one using a callback and one using an
imperative approach. Both giving support to different programming styles
or use cases.

Adds example with tests.

Signed-off-by: joshvanl <me@joshvanl.dev>

* Linting: Remove unused `closeCh`

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fixes comment order in bidisub.go

Signed-off-by: joshvanl <me@joshvanl.dev>

* Add comment about processing message

Signed-off-by: joshvanl <me@joshvanl.dev>

* Adds dead letter topic example

Signed-off-by: joshvanl <me@joshvanl.dev>

* chore: remove go.mod

Signed-off-by: mikeee <hey@mike.ee>

* Updates go mod to v1.14.0-rc.1

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Signed-off-by: mikeee <hey@mike.ee>
Signed-off-by: Mike Nguyen <hey@mike.ee>
Co-authored-by: mikeee <hey@mike.ee>
This commit is contained in:
Josh van Leeuwen 2024-07-11 15:43:51 +01:00 committed by GitHub
parent a1e723bd29
commit b7b90e3f8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 508 additions and 39 deletions

View File

@ -162,6 +162,7 @@ jobs:
"grpc-service",
"hello-world",
"pubsub",
"bidipubsub",
"service",
"socket",
"workflow",

View File

@ -162,6 +162,14 @@ type Client interface {
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error
// Subscribe subscribes to a pubsub topic and streams messages to the returned Subscription.
// Subscription must be closed after finishing with subscribing.
Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error)
// SubscribeWithHandler subscribes to a pubsub topic and calls the given handler on topic events.
// The returned cancel function must be called after finishing with subscribing.
SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error)
// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error

View File

@ -382,7 +382,8 @@ func (c *GRPCClient) GetBulkState(ctx context.Context, storeName string, keys []
// GetState retrieves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, storeName, key string, meta map[string]string) (item *StateItem, err error) {
return c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
i, err := c.GetStateWithConsistency(ctx, storeName, key, meta, StateConsistencyStrong)
return i, err
}
// GetStateWithConsistency retrieves state from specific store using provided state consistency.

220
client/subscribe.go Normal file
View File

@ -0,0 +1,220 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"mime"
"strings"
"sync"
"sync/atomic"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
)
type SubscriptionHandleFunction func(event *common.TopicEvent) common.SubscriptionResponseStatus
type SubscriptionOptions struct {
PubsubName string
Topic string
DeadLetterTopic *string
Metadata map[string]string
}
type Subscription struct {
stream pb.Dapr_SubscribeTopicEventsAlpha1Client
// lock locks concurrent writes to subscription stream.
lock sync.Mutex
closed atomic.Bool
}
type SubscriptionMessage struct {
*common.TopicEvent
sub *Subscription
}
func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*Subscription, error) {
stream, err := c.subscribeInitialRequest(ctx, opts)
if err != nil {
return nil, err
}
return &Subscription{
stream: stream,
}, nil
}
func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
s, err := c.Subscribe(ctx, opts)
if err != nil {
return nil, err
}
go func() {
defer s.Close()
for {
msg, err := s.Receive()
if err != nil {
if !s.closed.Load() {
logger.Printf("Error receiving messages from subscription pubsub=%s topic=%s, closing subscription: %s",
opts.PubsubName, opts.Topic, err)
}
return
}
go func() {
if err := msg.respondStatus(handler(msg.TopicEvent)); err != nil {
logger.Printf("Error responding to topic with event status pubsub=%s topic=%s message_id=%s: %s",
opts.PubsubName, opts.Topic, msg.ID, err)
}
}()
}
}()
return s.Close, nil
}
func (s *Subscription) Close() error {
if !s.closed.CompareAndSwap(false, true) {
return errors.New("subscription already closed")
}
return s.stream.CloseSend()
}
func (s *Subscription) Receive() (*SubscriptionMessage, error) {
event, err := s.stream.Recv()
if err != nil {
return nil, err
}
data := any(event.GetData())
if len(event.GetData()) > 0 {
mediaType, _, err := mime.ParseMediaType(event.GetDataContentType())
if err == nil {
var v interface{}
switch mediaType {
case "application/json":
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
case "text/plain":
// Assume UTF-8 encoded string.
data = string(event.GetData())
default:
if strings.HasPrefix(mediaType, "application/") &&
strings.HasSuffix(mediaType, "+json") {
if err := json.Unmarshal(event.GetData(), &v); err == nil {
data = v
}
}
}
}
}
topicEvent := &common.TopicEvent{
ID: event.GetId(),
Source: event.GetSource(),
Type: event.GetType(),
SpecVersion: event.GetSpecVersion(),
DataContentType: event.GetDataContentType(),
Data: data,
RawData: event.GetData(),
Topic: event.GetTopic(),
PubsubName: event.GetPubsubName(),
}
return &SubscriptionMessage{
sub: s,
TopicEvent: topicEvent,
}, nil
}
func (s *SubscriptionMessage) Success() error {
return s.respond(pb.TopicEventResponse_SUCCESS)
}
func (s *SubscriptionMessage) Retry() error {
return s.respond(pb.TopicEventResponse_RETRY)
}
func (s *SubscriptionMessage) Drop() error {
return s.respond(pb.TopicEventResponse_DROP)
}
func (s *SubscriptionMessage) respondStatus(status common.SubscriptionResponseStatus) error {
var statuspb pb.TopicEventResponse_TopicEventResponseStatus
switch status {
case common.SubscriptionResponseStatusSuccess:
statuspb = pb.TopicEventResponse_SUCCESS
case common.SubscriptionResponseStatusRetry:
statuspb = pb.TopicEventResponse_RETRY
case common.SubscriptionResponseStatusDrop:
statuspb = pb.TopicEventResponse_DROP
default:
return fmt.Errorf("unknown status, expected one of %s, %s, %s: %s",
common.SubscriptionResponseStatusSuccess, common.SubscriptionResponseStatusRetry,
common.SubscriptionResponseStatusDrop, status)
}
return s.respond(statuspb)
}
func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventResponseStatus) error {
s.sub.lock.Lock()
defer s.sub.lock.Unlock()
return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
},
})
}
func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts SubscriptionOptions) (pb.Dapr_SubscribeTopicEventsAlpha1Client, error) {
if len(opts.PubsubName) == 0 {
return nil, errors.New("pubsub name required")
}
if len(opts.Topic) == 0 {
return nil, errors.New("topic required")
}
stream, err := c.protoClient.SubscribeTopicEventsAlpha1(ctx)
if err != nil {
return nil, err
}
err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
},
})
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}
return stream, nil
}

View File

@ -0,0 +1,71 @@
# Dapr PubSub Example with go-sdk
This folder contains two Go files that use the Go SDK to invoke the Dapr Pub/Sub API.
## Diagram
![](https://i.loli.net/2020/08/23/5MBYgwqCZcXNUf2.jpg)
## Step
### Prepare
- Dapr installed
### Run Subscriber Server
<!-- STEP
name: Run Subscriber Server
output_match_mode: substring
match_order: none
expected_stdout_lines:
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: neworder'
- 'event - PubsubName: messages, Topic: sendorder'
- 'event - PubsubName: messages, Topic: sendorder'
- 'event - PubsubName: messages, Topic: sendorder'
expected_stderr_lines:
background: true
sleep: 15
-->
```bash
dapr run --app-id sub \
--dapr-http-port 3500 \
--log-level debug \
--resources-path ./config \
go run bidisub/bidisub.go
```
<!-- END_STEP -->
### Run Publisher
<!-- STEP
name: Run publisher
output_match_mode: substring
expected_stdout_lines:
- 'sending message'
- 'message published'
- 'sending multiple messages'
- 'multiple messages published'
expected_stderr_lines:
background: true
sleep: 15
-->
```bash
dapr run --app-id pub \
--log-level debug \
--resources-path ./config \
go run pub/pub.go
```
<!-- END_STEP -->
## Result
```shell
== APP == 2023/03/29 21:36:07 event - PubsubName: messages, Topic: neworder, ID: 82427280-1c18-4fab-b901-c7e68d295d31, Data: ping123
```

View File

@ -0,0 +1,92 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
daprd "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
)
func main() {
client, err := daprd.NewClient()
if err != nil {
log.Fatal(err)
}
var deadLetterTopic = "deadletter"
// Streaming subscription for topic "sendorder" on pubsub component
// "messages". The given subscription handler is called when a message is
// received. The returned `stop` function is used to stop the subscription
// and close the connection.
stop, err := client.SubscribeWithHandler(context.Background(),
daprd.SubscriptionOptions{
PubsubName: "messages",
Topic: "sendorder",
DeadLetterTopic: &deadLetterTopic,
},
eventHandler,
)
if err != nil {
log.Fatal(err)
}
// Another method of streaming subscriptions, this time for the topic "neworder".
// The returned `sub` object is used to receive messages.
// `sub` must be closed once it's no longer needed.
sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "neworder",
DeadLetterTopic: &deadLetterTopic,
})
if err != nil {
log.Fatal(err)
}
fmt.Printf(">>Created subscription\n")
for i := 0; i < 3; i++ {
msg, err := sub.Receive()
if err != nil {
log.Fatalf("error receiving message: %v", err)
}
log.Printf(">>Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData)
// Use _MUST_ always signal the result of processing the message, else the
// message will not be considered as processed and will be redelivered or
// dead lettered.
if err := msg.Success(); err != nil {
log.Fatalf("error sending message success: %v", err)
}
}
time.Sleep(time.Second * 5)
if err := errors.Join(stop(), sub.Close()); err != nil {
log.Fatal(err)
}
}
func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus {
log.Printf(">>Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", e.PubsubName, e.Topic, e.ID, e.Data)
return common.SubscriptionResponseStatusSuccess
}

View File

@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messages
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""

View File

@ -0,0 +1,60 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"log"
dapr "github.com/dapr/go-sdk/client"
)
var (
// set the environment as instructions.
pubsubName = "messages"
topicName1 = "sendorder"
topicName2 = "neworder"
)
func main() {
ctx := context.Background()
publishEventData := []byte("ping123")
publishEventsData := []interface{}{"multi-ping", "multi-pong"}
client, err := dapr.NewClient()
if err != nil {
log.Fatalf("error creating dapr client: %v", err)
}
defer client.Close()
// Publish a single event
log.Println("sending message")
if err := client.PublishEvent(ctx, pubsubName, topicName1, publishEventData); err != nil {
log.Fatalf("error publishing event: %v", err)
}
if err := client.PublishEvent(ctx, pubsubName, topicName2, publishEventData); err != nil {
log.Fatalf("error publishing event: %v", err)
}
log.Println("message published")
// Publish multiple events
log.Println("sending multiple messages")
if res := client.PublishEvents(ctx, pubsubName, topicName1, publishEventsData); res.Error != nil {
log.Fatalf("error publishing events: %v", res.Error)
}
if res := client.PublishEvents(ctx, pubsubName, topicName2, publishEventsData); res.Error != nil {
log.Fatalf("error publishing events: %v", res.Error)
}
log.Println("multiple messages published")
}

24
go.mod
View File

@ -1,34 +1,36 @@
module github.com/dapr/go-sdk
go 1.21.8
go 1.22.4
toolchain go1.22.5
require (
github.com/dapr/dapr v1.13.4
github.com/dapr/dapr v1.14.0-rc.1
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58
github.com/stretchr/testify v1.8.4
github.com/microsoft/durabletask-go v0.5.0
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.1
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
)
require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/otel v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.opentelemetry.io/otel/trace v1.23.1 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

44
go.sum
View File

@ -1,15 +1,15 @@
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.13.4 h1:cwWA8qx7ALbx8YLOSI0ZJ6dSqxGSeHkqU5f/CVcrfvE=
github.com/dapr/dapr v1.13.4/go.mod h1:v7xjV+3dP8zKaSlvUJRKoPsBby2CosobCBTZzHbahcs=
github.com/dapr/dapr v1.14.0-rc.1 h1:4P376+PIU66hMtLz5TiF41IJ6Lh5FNY1DiwaNNYZv/8=
github.com/dapr/dapr v1.14.0-rc.1/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
@ -28,23 +28,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58 h1:+HZ6RzZz6YBfA+Chtn0SnMU2OgY6nafl2sGbZ9FmerY=
github.com/microsoft/durabletask-go v0.4.1-0.20240621011625-bfcc3331ca58/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/otel v1.23.1 h1:Za4UzOqJYS+MUczKI320AtqZHZb7EqxO00jAHE0jmQY=
go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA=
go.opentelemetry.io/otel/metric v1.23.1 h1:PQJmqJ9u2QaJLBOELl1cxIdPcpbwzbkjfEyelTl2rlo=
go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI=
go.opentelemetry.io/otel/trace v1.23.1 h1:4LrmmEd8AU2rFvU1zegmvqW7+kWarxtNOPyeL6HmYY8=
go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA=
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -71,12 +71,12 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@ -105,16 +105,18 @@ type Subscription struct {
DisableTopicValidation bool `json:"disableTopicValidation"`
}
type SubscriptionResponseStatus string
const (
// SubscriptionResponseStatusSuccess means message is processed successfully.
SubscriptionResponseStatusSuccess = "SUCCESS"
SubscriptionResponseStatusSuccess SubscriptionResponseStatus = "SUCCESS"
// SubscriptionResponseStatusRetry means message to be retried by Dapr.
SubscriptionResponseStatusRetry = "RETRY"
SubscriptionResponseStatusRetry SubscriptionResponseStatus = "RETRY"
// SubscriptionResponseStatusDrop means warning is logged and message is dropped.
SubscriptionResponseStatusDrop = "DROP"
SubscriptionResponseStatusDrop SubscriptionResponseStatus = "DROP"
)
// SubscriptionResponse represents the response handling hint from subscriber to Dapr.
type SubscriptionResponse struct {
Status string `json:"status"`
Status SubscriptionResponseStatus `json:"status"`
}

View File

@ -327,7 +327,7 @@ func getCustomMetdataFromHeaders(r *http.Request) map[string]string {
return md
}
func writeStatus(w http.ResponseWriter, s string) {
func writeStatus(w http.ResponseWriter, s common.SubscriptionResponseStatus) {
status := &common.SubscriptionResponse{Status: s}
if err := json.NewEncoder(w).Encode(status); err != nil {
http.Error(w, err.Error(), PubSubHandlerRetryStatusCode)