Update streaming subscription to understand new initial response (#601)

* Update streaming subscription to understand new initial response

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update dapr CLI to 1.14.0-rc.6

Signed-off-by: joshvanl <me@joshvanl.dev>

* Update streamsub name in validate examples

Signed-off-by: joshvanl <me@joshvanl.dev>

* Apply suggestions from code review

Co-authored-by: Mike Nguyen <hey@mike.ee>
Signed-off-by: Josh van Leeuwen <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
Signed-off-by: Josh van Leeuwen <me@joshvanl.dev>
Co-authored-by: Mike Nguyen <hey@mike.ee>
This commit is contained in:
Josh van Leeuwen 2024-07-23 21:53:33 +01:00 committed by GitHub
parent 7c03c7ce58
commit 9bc7d823cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 37 additions and 23 deletions

View File

@ -32,15 +32,15 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_REF: 19b9de05611ade540b06d2c061f32f6c37093a17
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
CHECKOUT_REPO: ${{ github.repository }}
CHECKOUT_REF: ${{ github.ref }}
outputs:
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
DAPR_CLI_VER: 1.14.0-rc.6
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: 1.14.0-rc.4
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
@ -150,7 +150,7 @@ jobs:
GOPROXY: https://proxy.golang.org
DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_RUNTIME_VER: ${{ needs.setup.outputs.DAPR_RUNTIME_VER }}
DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }}
DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }}
CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}
@ -169,7 +169,7 @@ jobs:
"grpc-service",
"hello-world",
"pubsub",
"bidipubsub",
"streamsub",
"service",
"socket",
"workflow",

View File

@ -54,9 +54,11 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (*
return nil, err
}
return &Subscription{
s := &Subscription{
stream: stream,
}, nil
}
return s, nil
}
func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) {
@ -99,10 +101,11 @@ func (s *Subscription) Close() error {
}
func (s *Subscription) Receive() (*SubscriptionMessage, error) {
event, err := s.stream.Recv()
resp, err := s.stream.Recv()
if err != nil {
return nil, err
}
event := resp.GetEventMessage()
data := any(event.GetData())
if len(event.GetData()) > 0 {
@ -181,8 +184,8 @@ func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventRes
defer s.sub.lock.Unlock()
return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{
EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{
EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{
Id: s.ID,
Status: &pb.TopicEventResponse{Status: status},
},
@ -206,7 +209,7 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript
err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{
SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{
InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{
InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{
PubsubName: opts.PubsubName, Topic: opts.Topic,
Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic,
},
@ -216,5 +219,16 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript
return nil, errors.Join(err, stream.CloseSend())
}
resp, err := stream.Recv()
if err != nil {
return nil, errors.Join(err, stream.CloseSend())
}
switch resp.GetSubscribeTopicEventsResponseType().(type) {
case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse:
default:
return nil, fmt.Errorf("unexpected initial response from server : %v", resp)
}
return stream, nil
}

View File

@ -18,7 +18,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.14.0-rc.2 // indirect
github.com/dapr/dapr v1.14.0-rc.5 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect

View File

@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -35,7 +35,7 @@ dapr run --app-id sub \
--dapr-http-port 3500 \
--log-level debug \
--resources-path ./config \
go run bidisub/bidisub.go
go run sub/sub.go
```
<!-- END_STEP -->

View File

@ -45,27 +45,27 @@ func main() {
eventHandler,
)
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription messages/sendorder\n")
// Another method of streaming subscriptions, this time for the topic "neworder".
// The returned `sub` object is used to receive messages.
// `sub` must be closed once it's no longer needed.
sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{
PubsubName: "messages",
Topic: "neworder",
DeadLetterTopic: &deadLetterTopic,
})
if err != nil {
log.Fatal(err)
log.Fatalf("failed to subscribe to topic: %v", err)
}
fmt.Printf(">>Created subscription\n")
fmt.Printf(">>Created subscription messages/neworder\n")
for i := 0; i < 3; i++ {
msg, err := sub.Receive()
if err != nil {
log.Fatalf("error receiving message: %v", err)
log.Fatalf("Error receiving message: %v", err)
}
log.Printf(">>Received message\n")
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData)

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/dapr/go-sdk
go 1.22.5
require (
github.com/dapr/dapr v1.14.0-rc.2
github.com/dapr/dapr v1.14.0-rc.5
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0

4
go.sum
View File

@ -1,8 +1,8 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY=
github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=