Added full code snippets of pub sub

Signed-off-by: Amulya Varote <amulyavarote@Amulyas-MacBook-Pro.local>
This commit is contained in:
Amulya Varote 2021-12-05 07:59:52 -08:00
parent 1f879395a9
commit 5cb62329b0
1 changed files with 157 additions and 24 deletions

View File

@ -304,8 +304,11 @@ Below are code examples that leverage Dapr SDKs to subscribe to a topic.
{{% codetab %}} {{% codetab %}}
```csharp ```csharp
//dependencies //dependencies
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using Dapr; using Dapr;
using Dapr.Client; using Dapr.Client;
@ -347,14 +350,20 @@ import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.CloudEvent;
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD
======= =======
=======
>>>>>>> f30a0acb (Added full code snippets of pub sub)
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
<<<<<<< HEAD
>>>>>>> 0e83af7a (Added pub sub documentation) >>>>>>> 0e83af7a (Added pub sub documentation)
======= =======
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
=======
>>>>>>> f30a0acb (Added full code snippets of pub sub)
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
//code //code
@ -362,7 +371,7 @@ import reactor.core.publisher.Mono;
public class CheckoutServiceController { public class CheckoutServiceController {
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class); private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
//Subscribe to a topic //Subscribe to a topic
@Topic(name = "orders", pubsubName = "order_pub_sub") @Topic(name = "orders", pubsubName = "order_pub_sub")
@PostMapping(path = "/checkout") @PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) { public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
@ -393,17 +402,21 @@ from cloudevents.sdk.event import v1
from dapr.ext.grpc import App from dapr.ext.grpc import App
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD
======= =======
import logging import logging
import json import json
>>>>>>> 0e83af7a (Added pub sub documentation) >>>>>>> 0e83af7a (Added pub sub documentation)
======= =======
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
=======
import logging
import json
>>>>>>> f30a0acb (Added full code snippets of pub sub)
#code #code
app = App() app = App()
logging.basicConfig(level = logging.INFO) logging.basicConfig(level = logging.INFO)
#Subscribe to a topic #Subscribe to a topic
@app.subscribe(pubsub_name='order_pub_sub', topic='orders') @app.subscribe(pubsub_name='order_pub_sub', topic='orders')
def mytopic(event: v1.Event) -> None: def mytopic(event: v1.Event) -> None:
@ -463,7 +476,7 @@ var sub = &common.Subscription{
func main() { func main() {
s := daprd.NewService(":6002") s := daprd.NewService(":6002")
//Subscribe to a topic //Subscribe to a topic
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err) log.Fatalf("error adding topic subscription: %v", err)
} }
@ -476,7 +489,6 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
log.Printf("Subscriber received: %s", e.Data) log.Printf("Subscriber received: %s", e.Data)
return false, nil return false, nil
} }
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
@ -490,7 +502,6 @@ dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-por
{{% codetab %}} {{% codetab %}}
```javascript ```javascript
//dependencies //dependencies
import { DaprServer, CommunicationProtocolEnum } from 'dapr-client'; import { DaprServer, CommunicationProtocolEnum } from 'dapr-client';
@ -511,14 +522,13 @@ async function start(orderId) {
daprHost, daprHost,
process.env.DAPR_HTTP_PORT, process.env.DAPR_HTTP_PORT,
CommunicationProtocolEnum.HTTP CommunicationProtocolEnum.HTTP
); );
//Subscribe to a topic //Subscribe to a topic
await server.pubsub.subscribe("order_pub_sub", "orders", async (orderId) => { await server.pubsub.subscribe("order_pub_sub", "orders", async (orderId) => {
console.log(`Subscriber received: ${JSON.stringify(orderId)}`) console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
}); });
await server.startServer(); await server.startServer();
} }
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
@ -610,10 +620,10 @@ Below are code examples that leverage Dapr SDKs to publish a topic.
{{% codetab %}} {{% codetab %}}
```csharp ```csharp
//dependencies //dependencies
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD
using Dapr.Client; using Dapr.Client;
======= =======
using System; using System;
@ -628,6 +638,16 @@ using System.Threading;
======= =======
using Dapr.Client; using Dapr.Client;
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
=======
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;
>>>>>>> f30a0acb (Added full code snippets of pub sub)
//code //code
namespace EventService namespace EventService
@ -636,6 +656,7 @@ namespace EventService
{ {
static async Task Main(string[] args) static async Task Main(string[] args)
{ {
<<<<<<< HEAD
string PUBSUB_NAME = "order_pub_sub"; string PUBSUB_NAME = "order_pub_sub";
string TOPIC_NAME = "orders"; string TOPIC_NAME = "orders";
int orderId = 100; int orderId = 100;
@ -653,10 +674,24 @@ namespace EventService
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken); await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
Console.WriteLine("Published data: " + orderId); Console.WriteLine("Published data: " + orderId);
=======
string PUBSUB_NAME = "order_pub_sub";
string TOPIC_NAME = "orders";
while(true) {
System.Threading.Thread.Sleep(5000);
Random random = new Random();
int orderId = random.Next(1,1000);
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using var client = new DaprClientBuilder().Build();
//Using Dapr SDK to publish a topic
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
Console.WriteLine("Published data: " + orderId);
}
>>>>>>> f30a0acb (Added full code snippets of pub sub)
} }
} }
} }
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
@ -677,16 +712,25 @@ import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD
======= =======
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
>>>>>>> 0e83af7a (Added pub sub documentation) >>>>>>> 0e83af7a (Added pub sub documentation)
======= =======
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
=======
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;
>>>>>>> f30a0acb (Added full code snippets of pub sub)
//code //code
@SpringBootApplication @SpringBootApplication
public class OrderProcessingServiceApplication { public class OrderProcessingServiceApplication {
<<<<<<< HEAD
public static void main(String[] args) throws InterruptedException{ public static void main(String[] args) throws InterruptedException{
String MESSAGE_TTL_IN_SECONDS = "1000"; String MESSAGE_TTL_IN_SECONDS = "1000";
String TOPIC_NAME = "orders"; String TOPIC_NAME = "orders";
@ -720,6 +764,30 @@ Navigate to the directory containing the above code, then run the following comm
======= =======
=======
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
public static void main(String[] args) throws InterruptedException{
String MESSAGE_TTL_IN_SECONDS = "1000";
String TOPIC_NAME = "orders";
String PUBSUB_NAME = "order_pub_sub";
while(true) {
TimeUnit.MILLISECONDS.sleep(5000);
Random random = new Random();
int orderId = random.nextInt(1000-1) + 1;
DaprClient client = new DaprClientBuilder().build();
//Using Dapr SDK to publish a topic
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
orderId,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
log.info("Published data:" + orderId);
}
}
}
>>>>>>> f30a0acb (Added full code snippets of pub sub)
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
@ -737,21 +805,28 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
#dependencies #dependencies
<<<<<<< HEAD
======= =======
#dependencies #dependencies
=======
>>>>>>> f30a0acb (Added full code snippets of pub sub)
import random import random
from time import sleep from time import sleep
import requests import requests
import logging import logging
import json import json
<<<<<<< HEAD
>>>>>>> 0e83af7a (Added pub sub documentation) >>>>>>> 0e83af7a (Added pub sub documentation)
======= =======
#dependencies #dependencies
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
=======
>>>>>>> f30a0acb (Added full code snippets of pub sub)
from dapr.clients import DaprClient from dapr.clients import DaprClient
#code #code
logging.basicConfig(level = logging.INFO) logging.basicConfig(level = logging.INFO)
<<<<<<< HEAD
orderId = 100 orderId = 100
<<<<<<< HEAD <<<<<<< HEAD
@ -769,6 +844,22 @@ with DaprClient() as client:
logging.info('Published data: ' + str(orderId)) logging.info('Published data: ' + str(orderId))
<<<<<<< HEAD <<<<<<< HEAD
=======
while True:
sleep(random.randrange(50, 5000) / 1000)
orderId = random.randint(1, 1000)
PUBSUB_NAME = 'order_pub_sub'
TOPIC_NAME = 'orders'
with DaprClient() as client:
#Using Dapr SDK to publish a topic
result = client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=json.dumps(orderId),
data_content_type='application/json',
)
logging.info('Published data: ' + str(orderId))
>>>>>>> f30a0acb (Added full code snippets of pub sub)
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
@ -808,20 +899,22 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-pr
```go ```go
//dependencies //dependencies
import ( import (
"context" "context"
"log" "log"
"strconv" "math/rand"
dapr "github.com/dapr/go-sdk/client" "time"
"strconv"
dapr "github.com/dapr/go-sdk/client"
) )
//code //code
var ( var (
PUBSUB_NAME = "order_pub_sub" PUBSUB_NAME = "order_pub_sub"
TOPIC_NAME = "orders" TOPIC_NAME = "orders"
) )
func main() { func main() {
<<<<<<< HEAD
orderId := 100 orderId := 100
client, err := dapr.NewClient() client, err := dapr.NewClient()
if err != nil { if err != nil {
@ -847,8 +940,26 @@ func main() {
panic(err) panic(err)
} }
log.Println("Published data: " + strconv.Itoa(orderId)) log.Println("Published data: " + strconv.Itoa(orderId))
} =======
for i := 0; i < 10; i++ {
time.Sleep(5000)
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
//Using Dapr SDK to publish a topic
if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)));
err != nil {
panic(err)
}
log.Println("Published data: " + strconv.Itoa(orderId))
}
>>>>>>> f30a0acb (Added full code snippets of pub sub)
}
``` ```
<<<<<<< HEAD <<<<<<< HEAD
@ -875,11 +986,14 @@ dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-g
```javascript ```javascript
//dependencies //dependencies
import { DaprServer, DaprClient, CommunicationProtocolEnum } from 'dapr-client'; import { DaprServer, CommunicationProtocolEnum } from 'dapr-client';
//code //code
const daprHost = "127.0.0.1"; const daprHost = "127.0.0.1";
const serverHost = "127.0.0.1";
const serverPort = "6002";
<<<<<<< HEAD
var main = function() { var main = function() {
<<<<<<< HEAD <<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
@ -912,10 +1026,29 @@ async function start(orderId) {
//Using Dapr SDK to publish a topic //Using Dapr SDK to publish a topic
>>>>>>> fbadd23a (Modified based on the review comments - 1) >>>>>>> fbadd23a (Modified based on the review comments - 1)
await client.pubsub.publish("order_pub_sub", "orders", orderId); await client.pubsub.publish("order_pub_sub", "orders", orderId);
=======
start().catch((e) => {
console.error(e);
process.exit(1);
});
async function start(orderId) {
const PUBSUB_NAME = "order_pub_sub"
const TOPIC_NAME = "orders"
const server = new DaprServer(
serverHost,
serverPort,
daprHost,
process.env.DAPR_HTTP_PORT,
CommunicationProtocolEnum.HTTP
);
//Using Dapr SDK to publish a topic
await server.pubsub.subscribe(PUBSUB_NAME, TOPIC_NAME, async (orderId) => {
console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
});
await server.startServer();
>>>>>>> f30a0acb (Added full code snippets of pub sub)
} }
main();
``` ```
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application: