diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index a39d954..de1563d 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -32,15 +32,15 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh - DAPR_CLI_REF: 19b9de05611ade540b06d2c061f32f6c37093a17 + DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }} DAPR_REF: ${{ github.event.inputs.daprdapr_commit }} CHECKOUT_REPO: ${{ github.repository }} CHECKOUT_REF: ${{ github.ref }} outputs: DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }} - DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }} + DAPR_CLI_VER: 1.14.0-rc.6 DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }} - DAPR_RUNTIME_VER: 1.14.0-rc.2 + DAPR_RUNTIME_VER: 1.14.0-rc.4 CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }} CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }} DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }} @@ -150,7 +150,7 @@ jobs: GOPROXY: https://proxy.golang.org DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }} DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }} - DAPR_RUNTIME_VER: 1.14.0-rc.2 + DAPR_RUNTIME_VER: ${{ needs.setup.outputs.DAPR_RUNTIME_VER }} DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }} DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }} CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }} @@ -169,7 +169,7 @@ jobs: "grpc-service", "hello-world", "pubsub", - "bidipubsub", + "streamsub", "service", "socket", "workflow", diff --git a/client/subscribe.go b/client/subscribe.go index 31931c9..e59da80 100644 --- a/client/subscribe.go +++ b/client/subscribe.go @@ -54,9 +54,11 @@ func (c *GRPCClient) Subscribe(ctx context.Context, opts SubscriptionOptions) (* return nil, err } - return &Subscription{ + s := &Subscription{ stream: stream, - }, nil + } + + return s, nil } func (c *GRPCClient) SubscribeWithHandler(ctx context.Context, opts SubscriptionOptions, handler SubscriptionHandleFunction) (func() error, error) { @@ -99,10 +101,11 @@ func (s *Subscription) Close() error { } func (s *Subscription) Receive() (*SubscriptionMessage, error) { - event, err := s.stream.Recv() + resp, err := s.stream.Recv() if err != nil { return nil, err } + event := resp.GetEventMessage() data := any(event.GetData()) if len(event.GetData()) > 0 { @@ -181,8 +184,8 @@ func (s *SubscriptionMessage) respond(status pb.TopicEventResponse_TopicEventRes defer s.sub.lock.Unlock() return s.sub.stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ - SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventResponse{ - EventResponse: &pb.SubscribeTopicEventsResponseAlpha1{ + SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_EventProcessed{ + EventProcessed: &pb.SubscribeTopicEventsRequestProcessedAlpha1{ Id: s.ID, Status: &pb.TopicEventResponse{Status: status}, }, @@ -206,7 +209,7 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript err = stream.Send(&pb.SubscribeTopicEventsRequestAlpha1{ SubscribeTopicEventsRequestType: &pb.SubscribeTopicEventsRequestAlpha1_InitialRequest{ - InitialRequest: &pb.SubscribeTopicEventsInitialRequestAlpha1{ + InitialRequest: &pb.SubscribeTopicEventsRequestInitialAlpha1{ PubsubName: opts.PubsubName, Topic: opts.Topic, Metadata: opts.Metadata, DeadLetterTopic: opts.DeadLetterTopic, }, @@ -216,5 +219,16 @@ func (c *GRPCClient) subscribeInitialRequest(ctx context.Context, opts Subscript return nil, errors.Join(err, stream.CloseSend()) } + resp, err := stream.Recv() + if err != nil { + return nil, errors.Join(err, stream.CloseSend()) + } + + switch resp.GetSubscribeTopicEventsResponseType().(type) { + case *pb.SubscribeTopicEventsResponseAlpha1_InitialResponse: + default: + return nil, fmt.Errorf("unexpected initial response from server : %v", resp) + } + return stream, nil } diff --git a/examples/go.mod b/examples/go.mod index df4eb30..ca5a44a 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -18,7 +18,7 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dapr/dapr v1.14.0-rc.2 // indirect + github.com/dapr/dapr v1.14.0-rc.5 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect diff --git a/examples/go.sum b/examples/go.sum index e2b13bf..0e25080 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= -github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY= +github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/examples/bidipubsub/README.md b/examples/streamsub/README.md similarity index 97% rename from examples/bidipubsub/README.md rename to examples/streamsub/README.md index 2bde2b7..5555bd8 100644 --- a/examples/bidipubsub/README.md +++ b/examples/streamsub/README.md @@ -35,7 +35,7 @@ dapr run --app-id sub \ --dapr-http-port 3500 \ --log-level debug \ --resources-path ./config \ - go run bidisub/bidisub.go + go run sub/sub.go ``` diff --git a/examples/bidipubsub/config/pubsub.yaml b/examples/streamsub/config/pubsub.yaml similarity index 100% rename from examples/bidipubsub/config/pubsub.yaml rename to examples/streamsub/config/pubsub.yaml diff --git a/examples/bidipubsub/pub/pub.go b/examples/streamsub/pub/pub.go similarity index 100% rename from examples/bidipubsub/pub/pub.go rename to examples/streamsub/pub/pub.go diff --git a/examples/bidipubsub/bidisub/bidisub.go b/examples/streamsub/sub/sub.go similarity index 90% rename from examples/bidipubsub/bidisub/bidisub.go rename to examples/streamsub/sub/sub.go index 8d51db3..92211b3 100644 --- a/examples/bidipubsub/bidisub/bidisub.go +++ b/examples/streamsub/sub/sub.go @@ -45,27 +45,27 @@ func main() { eventHandler, ) if err != nil { - log.Fatal(err) + log.Fatalf("failed to subscribe to topic: %v", err) } + fmt.Printf(">>Created subscription messages/sendorder\n") // Another method of streaming subscriptions, this time for the topic "neworder". // The returned `sub` object is used to receive messages. // `sub` must be closed once it's no longer needed. - sub, err := client.Subscribe(context.Background(), daprd.SubscriptionOptions{ PubsubName: "messages", Topic: "neworder", DeadLetterTopic: &deadLetterTopic, }) if err != nil { - log.Fatal(err) + log.Fatalf("failed to subscribe to topic: %v", err) } - fmt.Printf(">>Created subscription\n") + fmt.Printf(">>Created subscription messages/neworder\n") for i := 0; i < 3; i++ { msg, err := sub.Receive() if err != nil { - log.Fatalf("error receiving message: %v", err) + log.Fatalf("Error receiving message: %v", err) } log.Printf(">>Received message\n") log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s\n", msg.PubsubName, msg.Topic, msg.ID, msg.RawData) diff --git a/go.mod b/go.mod index 3817e3b..0530f1a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/dapr/go-sdk go 1.22.5 require ( - github.com/dapr/dapr v1.14.0-rc.2 + github.com/dapr/dapr v1.14.0-rc.5 github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index cd8f26c..95e128f 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8= -github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8= +github.com/dapr/dapr v1.14.0-rc.5 h1:oTZPcT5fwda6bCMxrfenem6tOyeqW1nastxTwWInBCY= +github.com/dapr/dapr v1.14.0-rc.5/go.mod h1:IQWNthXF/I+qqlW4I0T+F4hCu74eKon4vjhpNvoBl8A= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=