From 5cb62329b0e7401a3649bce51223a18a73b47c26 Mon Sep 17 00:00:00 2001 From: Amulya Varote Date: Sun, 5 Dec 2021 07:59:52 -0800 Subject: [PATCH] Added full code snippets of pub sub Signed-off-by: Amulya Varote --- .../pubsub/howto-publish-subscribe.md | 181 +++++++++++++++--- 1 file changed, 157 insertions(+), 24 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md index 25f21ea85..17efe454a 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/howto-publish-subscribe.md @@ -304,8 +304,11 @@ Below are code examples that leverage Dapr SDKs to subscribe to a topic. {{% codetab %}} ```csharp - //dependencies +using System.Collections.Generic; +using System.Threading.Tasks; +using System; +using Microsoft.AspNetCore.Mvc; using Dapr; using Dapr.Client; @@ -315,7 +318,7 @@ namespace CheckoutService.controller [ApiController] public class CheckoutServiceController : Controller { - //Subscribe to a topic + //Subscribe to a topic [Topic("order_pub_sub", "orders")] [HttpPost("checkout")] public void getCheckout([FromBody] int orderId) @@ -347,14 +350,20 @@ import io.dapr.Topic; import io.dapr.client.domain.CloudEvent; <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD ======= +======= +>>>>>>> f30a0acb (Added full code snippets of pub sub) import org.springframework.web.bind.annotation.*; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +<<<<<<< HEAD >>>>>>> 0e83af7a (Added pub sub documentation) ======= >>>>>>> fbadd23a (Modified based on the review comments - 1) +======= +>>>>>>> f30a0acb (Added full code snippets of pub sub) import reactor.core.publisher.Mono; //code @@ -362,7 +371,7 @@ import reactor.core.publisher.Mono; public class CheckoutServiceController { private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class); - //Subscribe to a topic + //Subscribe to a topic @Topic(name = "orders", pubsubName = "order_pub_sub") @PostMapping(path = "/checkout") public Mono getCheckout(@RequestBody(required = false) CloudEvent cloudEvent) { @@ -393,17 +402,21 @@ from cloudevents.sdk.event import v1 from dapr.ext.grpc import App <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD ======= import logging import json >>>>>>> 0e83af7a (Added pub sub documentation) ======= >>>>>>> fbadd23a (Modified based on the review comments - 1) +======= +import logging +import json +>>>>>>> f30a0acb (Added full code snippets of pub sub) #code app = App() logging.basicConfig(level = logging.INFO) - #Subscribe to a topic @app.subscribe(pubsub_name='order_pub_sub', topic='orders') def mytopic(event: v1.Event) -> None: @@ -463,7 +476,7 @@ var sub = &common.Subscription{ func main() { s := daprd.NewService(":6002") - //Subscribe to a topic + //Subscribe to a topic if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { log.Fatalf("error adding topic subscription: %v", err) } @@ -476,7 +489,6 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er log.Printf("Subscriber received: %s", e.Data) return false, nil } - ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -490,7 +502,6 @@ dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-por {{% codetab %}} ```javascript - //dependencies import { DaprServer, CommunicationProtocolEnum } from 'dapr-client'; @@ -511,14 +522,13 @@ async function start(orderId) { daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.HTTP - ); - //Subscribe to a topic + ); + //Subscribe to a topic await server.pubsub.subscribe("order_pub_sub", "orders", async (orderId) => { console.log(`Subscriber received: ${JSON.stringify(orderId)}`) }); await server.startServer(); } - ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -610,10 +620,10 @@ Below are code examples that leverage Dapr SDKs to publish a topic. {{% codetab %}} ```csharp - //dependencies <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD using Dapr.Client; ======= using System; @@ -628,6 +638,16 @@ using System.Threading; ======= using Dapr.Client; >>>>>>> fbadd23a (Modified based on the review comments - 1) +======= +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Threading.Tasks; +using Dapr.Client; +using Microsoft.AspNetCore.Mvc; +using System.Threading; +>>>>>>> f30a0acb (Added full code snippets of pub sub) //code namespace EventService @@ -636,6 +656,7 @@ namespace EventService { static async Task Main(string[] args) { +<<<<<<< HEAD string PUBSUB_NAME = "order_pub_sub"; string TOPIC_NAME = "orders"; int orderId = 100; @@ -653,10 +674,24 @@ namespace EventService >>>>>>> fbadd23a (Modified based on the review comments - 1) await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken); Console.WriteLine("Published data: " + orderId); +======= + string PUBSUB_NAME = "order_pub_sub"; + string TOPIC_NAME = "orders"; + while(true) { + System.Threading.Thread.Sleep(5000); + Random random = new Random(); + int orderId = random.Next(1,1000); + CancellationTokenSource source = new CancellationTokenSource(); + CancellationToken cancellationToken = source.Token; + using var client = new DaprClientBuilder().Build(); + //Using Dapr SDK to publish a topic + await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken); + Console.WriteLine("Published data: " + orderId); + } +>>>>>>> f30a0acb (Added full code snippets of pub sub) } } } - ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -677,16 +712,25 @@ import io.dapr.client.domain.Metadata; import static java.util.Collections.singletonMap; <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD ======= import org.springframework.boot.autoconfigure.SpringBootApplication; >>>>>>> 0e83af7a (Added pub sub documentation) ======= >>>>>>> fbadd23a (Modified based on the review comments - 1) +======= +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Random; +import java.util.concurrent.TimeUnit; +>>>>>>> f30a0acb (Added full code snippets of pub sub) //code @SpringBootApplication public class OrderProcessingServiceApplication { +<<<<<<< HEAD public static void main(String[] args) throws InterruptedException{ String MESSAGE_TTL_IN_SECONDS = "1000"; String TOPIC_NAME = "orders"; @@ -720,6 +764,30 @@ Navigate to the directory containing the above code, then run the following comm ======= +======= + private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class); + + public static void main(String[] args) throws InterruptedException{ + String MESSAGE_TTL_IN_SECONDS = "1000"; + String TOPIC_NAME = "orders"; + String PUBSUB_NAME = "order_pub_sub"; + + while(true) { + TimeUnit.MILLISECONDS.sleep(5000); + Random random = new Random(); + int orderId = random.nextInt(1000-1) + 1; + DaprClient client = new DaprClientBuilder().build(); + //Using Dapr SDK to publish a topic + client.publishEvent( + PUBSUB_NAME, + TOPIC_NAME, + orderId, + singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block(); + log.info("Published data:" + orderId); + } + } +} +>>>>>>> f30a0acb (Added full code snippets of pub sub) ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -737,21 +805,28 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g <<<<<<< HEAD <<<<<<< HEAD #dependencies +<<<<<<< HEAD ======= #dependencies +======= +>>>>>>> f30a0acb (Added full code snippets of pub sub) import random from time import sleep import requests import logging import json +<<<<<<< HEAD >>>>>>> 0e83af7a (Added pub sub documentation) ======= #dependencies >>>>>>> fbadd23a (Modified based on the review comments - 1) +======= +>>>>>>> f30a0acb (Added full code snippets of pub sub) from dapr.clients import DaprClient #code logging.basicConfig(level = logging.INFO) +<<<<<<< HEAD orderId = 100 <<<<<<< HEAD @@ -769,6 +844,22 @@ with DaprClient() as client: logging.info('Published data: ' + str(orderId)) <<<<<<< HEAD +======= +while True: + sleep(random.randrange(50, 5000) / 1000) + orderId = random.randint(1, 1000) + PUBSUB_NAME = 'order_pub_sub' + TOPIC_NAME = 'orders' + with DaprClient() as client: + #Using Dapr SDK to publish a topic + result = client.publish_event( + pubsub_name=PUBSUB_NAME, + topic_name=TOPIC_NAME, + data=json.dumps(orderId), + data_content_type='application/json', + ) + logging.info('Published data: ' + str(orderId)) +>>>>>>> f30a0acb (Added full code snippets of pub sub) ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: @@ -808,20 +899,22 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-pr ```go //dependencies import ( - "context" - "log" - "strconv" - dapr "github.com/dapr/go-sdk/client" - + "context" + "log" + "math/rand" + "time" + "strconv" + dapr "github.com/dapr/go-sdk/client" ) //code var ( - PUBSUB_NAME = "order_pub_sub" - TOPIC_NAME = "orders" + PUBSUB_NAME = "order_pub_sub" + TOPIC_NAME = "orders" ) func main() { +<<<<<<< HEAD orderId := 100 client, err := dapr.NewClient() if err != nil { @@ -847,8 +940,26 @@ func main() { panic(err) } log.Println("Published data: " + strconv.Itoa(orderId)) +======= + for i := 0; i < 10; i++ { + time.Sleep(5000) + orderId := rand.Intn(1000-1) + 1 + client, err := dapr.NewClient() + if err != nil { + panic(err) + } + defer client.Close() + ctx := context.Background() + //Using Dapr SDK to publish a topic + if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); + err != nil { + panic(err) + } + + log.Println("Published data: " + strconv.Itoa(orderId)) + } +>>>>>>> f30a0acb (Added full code snippets of pub sub) } - ``` <<<<<<< HEAD @@ -875,11 +986,14 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g ```javascript //dependencies -import { DaprServer, DaprClient, CommunicationProtocolEnum } from 'dapr-client'; +import { DaprServer, CommunicationProtocolEnum } from 'dapr-client'; //code const daprHost = "127.0.0.1"; +const serverHost = "127.0.0.1"; +const serverPort = "6002"; +<<<<<<< HEAD var main = function() { <<<<<<< HEAD <<<<<<< HEAD @@ -912,10 +1026,29 @@ async function start(orderId) { //Using Dapr SDK to publish a topic >>>>>>> fbadd23a (Modified based on the review comments - 1) await client.pubsub.publish("order_pub_sub", "orders", orderId); -} +======= +start().catch((e) => { + console.error(e); + process.exit(1); +}); -main(); - +async function start(orderId) { + const PUBSUB_NAME = "order_pub_sub" + const TOPIC_NAME = "orders" + const server = new DaprServer( + serverHost, + serverPort, + daprHost, + process.env.DAPR_HTTP_PORT, + CommunicationProtocolEnum.HTTP + ); + //Using Dapr SDK to publish a topic + await server.pubsub.subscribe(PUBSUB_NAME, TOPIC_NAME, async (orderId) => { + console.log(`Subscriber received: ${JSON.stringify(orderId)}`) + }); + await server.startServer(); +>>>>>>> f30a0acb (Added full code snippets of pub sub) +} ``` Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: