From 619ac7e5ad561c6880075dfdfcb73387697d44ff Mon Sep 17 00:00:00 2001 From: Amulya Varote Date: Thu, 2 Dec 2021 02:45:19 -0800 Subject: [PATCH] Added pub sub documentation Signed-off-by: Amulya Varote --- .../pubsub/howto-publish-subscribe.md | 194 ++++++++++++++++++ 1 file changed, 194 insertions(+) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md index d701e7f69..3a3ffc973 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md @@ -22,7 +22,11 @@ gRPC clients and SDKs have a dedicated content type parameter. ## Example: +<<<<<<< HEAD The below code example loosely describes an application that processes orders. In the example, there are two services - an order processing service and a checkout service. Both services have Dapr sidecars. The order processing service uses Dapr to publish a message to RabbitMQ and the checkout service subscribes to the topic in the message queue. +======= +The below code examples loosely describes an application that processes orders. In the examples, there are two services - an order processing service and a checkout service. Both services have Dapr sidecars. The order processing service uses Dapr to publish message and the checkout service subscribes to the message in Rabbit mq. +>>>>>>> 0e83af7a (Added pub sub documentation) Diagram showing state management of example service @@ -34,9 +38,15 @@ The first step is to setup the Pub/Sub component: {{< tabs "Self-Hosted (CLI)" Kubernetes >}} {{% codetab %}} +<<<<<<< HEAD The pubsub.yaml is created by default on your local machine when running `dapr init`. Verify by opening your components file under `%UserProfile%\.dapr\components\pubsub.yaml` on Windows or `~/.dapr/components/pubsub.yaml` on Linux/MacOS. In this example, RabbitMQ is used for publish and subscribe. Replace `pubsub.yaml` file contents with the below contents. +======= +pubsub.yaml is created by default on a local machine when running `dapr init`. Verify by opening your components file under `%UserProfile%\.dapr\components\pubsub.yaml` on Windows or `~/.dapr/components/pubsub.yaml` on Linux/MacOS. + +In this example, rabbit mq is used for publish and subscribe. Replace pubsub.yaml file contents with the below contents. +>>>>>>> 0e83af7a (Added pub sub documentation) ```yaml apiVersion: dapr.io/v1alpha1 @@ -148,7 +158,11 @@ You can also override the default directory by pointing the Dapr CLI to a compon {{% codetab %}} ```bash +<<<<<<< HEAD dapr run --app-id myapp --components-path ./myComponents -- dotnet run +======= +dapr run --app-id myapp --components-path ./myComponents -- +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} @@ -163,6 +177,7 @@ dapr run --app-id myapp --components-path ./myComponents -- mvn spring-boot:run {{% codetab %}} +<<<<<<< HEAD ```bash dapr run --app-id myapp --components-path ./myComponents -- python3 app.py ``` @@ -184,10 +199,47 @@ dapr run --app-id myapp --components-path ./myComponents -- npm start ``` {{% /codetab %}} +======= +Below are code examples that leverage Dapr SDKs to subscribe to a topic. + +{{< tabs Dotnet Java Python Go Javascript>}} + +{{% codetab %}} + +```csharp + +//dependencies +using System.Collections.Generic; +using System.Threading.Tasks; +using System; +using Microsoft.AspNetCore.Mvc; +using Dapr; +using Dapr.Client; + +//code +namespace CheckoutService.controller +{ + [ApiController] + public class CheckoutServiceController : Controller + { + //Subscribe to a topic + [Topic("order_pub_sub", "orders")] + [HttpPost("checkout")] + public void getCheckout([FromBody] int orderId) + { + Console.WriteLine("Subscriber received : " + orderId); + } + } +} +``` + +Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: +>>>>>>> 0e83af7a (Added pub sub documentation) {{% codetab %}} In Kubernetes, save the CRD to a file and apply it to the cluster: ```bash +<<<<<<< HEAD kubectl apply -f subscription.yaml ``` {{% /codetab %}} @@ -229,6 +281,11 @@ Navigate to the directory containing the above code, then run the following comm dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-ssl dotnet run ``` +======= +dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-ssl dotnet run +``` + +>>>>>>> 0e83af7a (Added pub sub documentation) {{% /codetab %}} {{% codetab %}} @@ -237,6 +294,13 @@ dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-por //dependencies import io.dapr.Topic; import io.dapr.client.domain.CloudEvent; +<<<<<<< HEAD +======= +import org.springframework.web.bind.annotation.*; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +>>>>>>> 0e83af7a (Added pub sub documentation) import reactor.core.publisher.Mono; //code @@ -273,6 +337,11 @@ dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-por #dependencies from cloudevents.sdk.event import v1 from dapr.ext.grpc import App +<<<<<<< HEAD +======= +import logging +import json +>>>>>>> 0e83af7a (Added pub sub documentation) #code app = App() @@ -282,15 +351,25 @@ logging.basicConfig(level = logging.INFO) @app.subscribe(pubsub_name='order_pub_sub', topic='orders') def mytopic(event: v1.Event) -> None: data = json.loads(event.Data()) +<<<<<<< HEAD logging.info('Subscriber received: ' + str(data)) app.run(6002) +======= + logging.info('Subscriber received: ' + data) + +app.run(60002) +>>>>>>> 0e83af7a (Added pub sub documentation) ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: ```bash +<<<<<<< HEAD dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py +======= +dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 -- python3 CheckoutService.py +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} @@ -394,7 +473,11 @@ To publish a topic you need to run an instance of a Dapr sidecar to use the pubs Start an instance of Dapr with an app-id called `orderprocessing`: ```bash +<<<<<<< HEAD dapr run --app-id orderprocessing --dapr-http-port 3601 +======= +dapr run --app-id orderprocessing --dapr-http-port 3500 +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{< tabs "Dapr CLI" "HTTP API (Bash)" "HTTP API (PowerShell)">}} @@ -403,21 +486,33 @@ dapr run --app-id orderprocessing --dapr-http-port 3601 Then publish a message to the `orders` topic: ```bash +<<<<<<< HEAD dapr publish --publish-app-id orderprocessing --pubsub order_pub_sub --topic orders --data '{"orderId": "100"}' +======= +dapr publish --publish-app-id testpubsub --pubsub pubsub --topic orders --data '{"orderId": "100"}' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} {{% codetab %}} Then publish a message to the `orders` topic: ```bash +<<<<<<< HEAD curl -X POST http://localhost:3601/v1.0/publish/order_pub_sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}' +======= +curl -X POST http://localhost:3601/v1.0/publish/pubsub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} {{% codetab %}} Then publish a message to the `orders` topic: ```powershell +<<<<<<< HEAD Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order_pub_sub/orders' +======= +Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/pubsub/orders' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} @@ -434,7 +529,18 @@ Below are code examples that leverage Dapr SDKs to publish a topic. ```csharp //dependencies +<<<<<<< HEAD using Dapr.Client; +======= +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading.Tasks; +using Dapr.Client; +using Microsoft.AspNetCore.Mvc; +using System.Threading; +>>>>>>> 0e83af7a (Added pub sub documentation) //code namespace EventService @@ -449,7 +555,11 @@ namespace EventService CancellationTokenSource source = new CancellationTokenSource(); CancellationToken cancellationToken = source.Token; using var client = new DaprClientBuilder().Build(); +<<<<<<< HEAD //Using Dapr SDK to publish a topic +======= + //Using Dapr SDK to publish to a topic +>>>>>>> 0e83af7a (Added pub sub documentation) await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken); Console.WriteLine("Published data: " + orderId); } @@ -474,6 +584,10 @@ import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.Metadata; import static java.util.Collections.singletonMap; +<<<<<<< HEAD +======= +import org.springframework.boot.autoconfigure.SpringBootApplication; +>>>>>>> 0e83af7a (Added pub sub documentation) //code @SpringBootApplication @@ -486,7 +600,11 @@ public class OrderProcessingServiceApplication { int orderId = 100; DaprClient client = new DaprClientBuilder().build(); +<<<<<<< HEAD //Using Dapr SDK to publish a topic +======= + //Using Dapr SDK to publish to a topic +>>>>>>> 0e83af7a (Added pub sub documentation) client.publishEvent( PUBSUB_NAME, TOPIC_NAME, @@ -496,11 +614,19 @@ public class OrderProcessingServiceApplication { } } +<<<<<<< HEAD ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: +======= + +``` + +Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: + +>>>>>>> 0e83af7a (Added pub sub documentation) ```bash dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run ``` @@ -510,13 +636,23 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g {{% codetab %}} ```python +<<<<<<< HEAD #dependencies +======= +#dependencies +import random +from time import sleep +import requests +import logging +import json +>>>>>>> 0e83af7a (Added pub sub documentation) from dapr.clients import DaprClient #code logging.basicConfig(level = logging.INFO) orderId = 100 +<<<<<<< HEAD with DaprClient() as client: #Using Dapr SDK to publish a topic result = client.publish_event( @@ -533,6 +669,24 @@ Navigate to the directory containing the above code, then run the following comm ```bash dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py +======= + with DaprClient() as client: + #Using Dapr SDK to publish to a topic + result = client.publish_event( + pubsub_name='order_pub_sub', + topic_name='orders', + data=json.dumps(orderId), + data_content_type='application/json', + ) + logging.info('Published data: ' + str(orderId)) + +``` + +Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: + +```bash +dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} @@ -563,7 +717,11 @@ func main() { } defer client.Close() ctx := context.Background() +<<<<<<< HEAD //Using Dapr SDK to publish a topic +======= + //Using Dapr SDK to publish to a topic +>>>>>>> 0e83af7a (Added pub sub documentation) if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil { panic(err) @@ -572,6 +730,7 @@ func main() { } ``` +<<<<<<< HEAD Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -581,6 +740,17 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g {{% /codetab %}} +======= + +Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: + +```bash +dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go +``` + +{{% /codetab %}} + +>>>>>>> 0e83af7a (Added pub sub documentation) {{% codetab %}} ```javascript @@ -591,7 +761,11 @@ import { DaprServer, DaprClient, CommunicationProtocolEnum } from 'dapr-client'; const daprHost = "127.0.0.1"; var main = function() { +<<<<<<< HEAD var orderId = 100; +======= + var orderId = Math.floor(Math.random() * (1000 - 1) + 1); +>>>>>>> 0e83af7a (Added pub sub documentation) start(orderId).catch((e) => { console.error(e); process.exit(1); @@ -601,7 +775,11 @@ var main = function() { async function start(orderId) { const client = new DaprClient(daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.HTTP); console.log("Published data:" + orderId) +<<<<<<< HEAD //Using Dapr SDK to publish a topic +======= + //Using Dapr SDK to publish to a topic +>>>>>>> 0e83af7a (Added pub sub documentation) await client.pubsub.publish("order_pub_sub", "orders", orderId); } @@ -621,7 +799,11 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g ## Step 4: ACK-ing a message +<<<<<<< HEAD In order to tell Dapr that a message was processed successfully, return a `200 OK` response. If Dapr receives any other return status code than `200`, or if your app crashes, Dapr will attempt to redeliver the message following at-least-once semantics. +======= +In order to tell Dapr that a message was processed successfully, return a `200 OK` response. If Dapr receives any other return status code than `200`, or if your app crashes, Dapr will attempt to redeliver the message following At-Least-Once semantics. +>>>>>>> 0e83af7a (Added pub sub documentation) ## Sending a custom CloudEvent @@ -637,21 +819,33 @@ Read about content types [here](#content-types), and about the [Cloud Events mes {{% codetab %}} Publish a custom CloudEvent to the `orders` topic: ```bash +<<<<<<< HEAD dapr publish --publish-app-id orderprocessing --pubsub order_pub_sub --topic orders --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' +======= +dapr publish --publish-app-id testpubsub --pubsub pubsub --topic orders --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} {{% codetab %}} Publish a custom CloudEvent to the `orders` topic: ```bash +<<<<<<< HEAD curl -X POST http://localhost:3601/v1.0/publish/order_pub_sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' +======= +curl -X POST http://localhost:3601/v1.0/publish/pubsub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}} {{% codetab %}} Publish a custom CloudEvent to the `orders` topic: ```powershell +<<<<<<< HEAD Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order_pub_sub/orders' +======= +Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/pubsub/orders' +>>>>>>> 0e83af7a (Added pub sub documentation) ``` {{% /codetab %}}