mirror of https://github.com/dapr/docs.git
Added pub sub documentation
This commit is contained in:
parent
4a8fbf10da
commit
830015d3eb
|
@ -20,33 +20,48 @@ When publishing a message, it's important to specify the content type of the dat
|
|||
Unless specified, Dapr will assume `text/plain`. When using Dapr's HTTP API, the content type can be set in a `Content-Type` header.
|
||||
gRPC clients and SDKs have a dedicated content type parameter.
|
||||
|
||||
## Step 1: Setup the Pub/Sub component
|
||||
The following example creates applications to publish and subscribe to a topic called `deathStarStatus`.
|
||||
## Example:
|
||||
|
||||
<img src="/images/pubsub-publish-subscribe-example.png" width=1000>
|
||||
<br></br>
|
||||
The below code examples loosely describes an application that processes orders. In the examples, there are two services - an order processing service and a checkout service. Both services have Dapr sidecars. The order processing service uses Dapr to publish message and the checkout service subscribes to the message in Rabbit mq.
|
||||
|
||||
<img src="/images/building-block-pub-sub-example.png" width=1000 alt="Diagram showing state management of example service">
|
||||
|
||||
## Step 1: Setup the Pub/Sub component
|
||||
The following example creates applications to publish and subscribe to a topic called `orders`.
|
||||
|
||||
The first step is to setup the Pub/Sub component:
|
||||
|
||||
{{< tabs "Self-Hosted (CLI)" Kubernetes >}}
|
||||
|
||||
{{% codetab %}}
|
||||
Redis Streams is installed by default on a local machine when running `dapr init`.
|
||||
pubsub.yaml is created by default on a local machine when running `dapr init`. Verify by opening your components file under `%UserProfile%\.dapr\components\pubsub.yaml` on Windows or `~/.dapr/components/pubsub.yaml` on Linux/MacOS.
|
||||
|
||||
In this example, rabbit mq is used for publish and subscribe. Replace pubsub.yaml file contents with the below contents.
|
||||
|
||||
Verify by opening your components file under `%UserProfile%\.dapr\components\pubsub.yaml` on Windows or `~/.dapr/components/pubsub.yaml` on Linux/MacOS:
|
||||
```yaml
|
||||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: pubsub
|
||||
name: order_pub_sub
|
||||
spec:
|
||||
type: pubsub.redis
|
||||
type: pubsub.rabbitmq
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: host
|
||||
value: "amqp://localhost:5672"
|
||||
- name: durable
|
||||
value: "false"
|
||||
- name: deletedWhenUnused
|
||||
value: "false"
|
||||
- name: autoAck
|
||||
value: "false"
|
||||
- name: reconnectWait
|
||||
value: "0"
|
||||
- name: concurrency
|
||||
value: parallel
|
||||
scopes:
|
||||
- orderprocessing
|
||||
- checkout
|
||||
```
|
||||
|
||||
You can override this file with another Redis instance or another [pubsub component]({{< ref setup-pubsub >}}) by creating a `components` directory containing the file and using the flag `--components-path` with the `dapr run` CLI command.
|
||||
|
@ -59,16 +74,27 @@ To deploy this into a Kubernetes cluster, fill in the `metadata` connection deta
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: pubsub
|
||||
name: order_pub_sub
|
||||
namespace: default
|
||||
spec:
|
||||
type: pubsub.redis
|
||||
type: pubsub.rabbitmq
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: host
|
||||
value: "amqp://localhost:5672"
|
||||
- name: durable
|
||||
value: "false"
|
||||
- name: deletedWhenUnused
|
||||
value: "false"
|
||||
- name: autoAck
|
||||
value: "false"
|
||||
- name: reconnectWait
|
||||
value: "0"
|
||||
- name: concurrency
|
||||
value: parallel
|
||||
scopes:
|
||||
- orderprocessing
|
||||
- checkout
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
|
@ -95,19 +121,19 @@ You can subscribe to a topic using the following Custom Resources Definition (CR
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Subscription
|
||||
metadata:
|
||||
name: myevent-subscription
|
||||
name: order_pub_sub
|
||||
spec:
|
||||
topic: deathStarStatus
|
||||
route: /dsstatus
|
||||
pubsubname: pubsub
|
||||
topic: orders
|
||||
route: /checkout
|
||||
pubsubname: order_pub_sub
|
||||
scopes:
|
||||
- app1
|
||||
- app2
|
||||
- orderprocessing
|
||||
- checkout
|
||||
```
|
||||
|
||||
The example above shows an event subscription to topic `deathStarStatus`, for the pubsub component `pubsub`.
|
||||
- The `route` field tells Dapr to send all topic messages to the `/dsstatus` endpoint in the app.
|
||||
- The `scopes` field enables this subscription for apps with IDs `app1` and `app2`.
|
||||
The example above shows an event subscription to topic `orders`, for the pubsub component `order_pub_sub`.
|
||||
- The `route` field tells Dapr to send all topic messages to the `/checkout` endpoint in the app.
|
||||
- The `scopes` field enables this subscription for apps with IDs `orderprocessing` and `checkout`.
|
||||
|
||||
Set the component with:
|
||||
{{< tabs "Self-Hosted (CLI)" Kubernetes>}}
|
||||
|
@ -120,7 +146,7 @@ Note: By default, Dapr loads components from `$HOME/.dapr/components` on MacOS/L
|
|||
You can also override the default directory by pointing the Dapr CLI to a components path:
|
||||
|
||||
```bash
|
||||
dapr run --app-id myapp --components-path ./myComponents -- python3 app1.py
|
||||
dapr run --app-id myapp --components-path ./myComponents -- <language_specific_command>
|
||||
```
|
||||
|
||||
*Note: If you place the subscription in a custom components path, make sure the Pub/Sub component is present also.*
|
||||
|
@ -137,251 +163,240 @@ kubectl apply -f subscription.yaml
|
|||
|
||||
{{< /tabs >}}
|
||||
|
||||
#### Example
|
||||
Below are code examples that leverage Dapr SDKs to subscribe to a topic.
|
||||
|
||||
{{< tabs Python Node PHP>}}
|
||||
|
||||
{{% codetab %}}
|
||||
Create a file named `app1.py` and paste in the following:
|
||||
```python
|
||||
import flask
|
||||
from flask import request, jsonify
|
||||
from flask_cors import CORS
|
||||
import json
|
||||
import sys
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
CORS(app)
|
||||
|
||||
@app.route('/dsstatus', methods=['POST'])
|
||||
def ds_subscriber():
|
||||
print(request.json, flush=True)
|
||||
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
|
||||
|
||||
app.run()
|
||||
```
|
||||
After creating `app1.py` ensure flask and flask_cors are installed:
|
||||
|
||||
```bash
|
||||
pip install flask
|
||||
pip install flask_cors
|
||||
```
|
||||
|
||||
Then run:
|
||||
|
||||
```bash
|
||||
dapr --app-id app1 --app-port 5000 run python app1.py
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
After setting up the subscription above, download this javascript (Node > 4.16) into a `app2.js` file:
|
||||
|
||||
```javascript
|
||||
const express = require('express')
|
||||
const bodyParser = require('body-parser')
|
||||
const app = express()
|
||||
app.use(bodyParser.json({ type: 'application/*+json' }));
|
||||
|
||||
const port = 3000
|
||||
|
||||
app.post('/dsstatus', (req, res) => {
|
||||
console.log(req.body);
|
||||
res.sendStatus(200);
|
||||
});
|
||||
|
||||
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
|
||||
```
|
||||
Run this app with:
|
||||
|
||||
```bash
|
||||
dapr --app-id app2 --app-port 3000 run node app2.js
|
||||
```
|
||||
{{% /codetab %}}
|
||||
{{< tabs Dotnet Java Python Go Javascript>}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
Create a file named `app1.php` and paste in the following:
|
||||
```csharp
|
||||
|
||||
```php
|
||||
<?php
|
||||
//dependencies
|
||||
using System.Collections.Generic;
|
||||
using System.Threading.Tasks;
|
||||
using System;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Dapr;
|
||||
using Dapr.Client;
|
||||
|
||||
require_once __DIR__.'/vendor/autoload.php';
|
||||
|
||||
$app = \Dapr\App::create();
|
||||
$app->post('/dsstatus', function(
|
||||
#[\Dapr\Attributes\FromBody]
|
||||
\Dapr\PubSub\CloudEvent $cloudEvent,
|
||||
\Psr\Log\LoggerInterface $logger
|
||||
) {
|
||||
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
|
||||
return ['status' => 'SUCCESS'];
|
||||
}
|
||||
);
|
||||
$app->start();
|
||||
```
|
||||
|
||||
After creating `app1.php`, and with the [SDK installed](https://docs.dapr.io/developing-applications/sdks/php/),
|
||||
go ahead and start the app:
|
||||
|
||||
```bash
|
||||
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
### Programmatic subscriptions
|
||||
|
||||
To subscribe to topics, start a web server in the programming language of your choice and listen on the following `GET` endpoint: `/dapr/subscribe`.
|
||||
The Dapr instance calls into your app at startup and expect a JSON response for the topic subscriptions with:
|
||||
- `pubsubname`: Which pub/sub component Dapr should use.
|
||||
- `topic`: Which topic to subscribe to.
|
||||
- `route`: Which endpoint for Dapr to call on when a message comes to that topic.
|
||||
|
||||
#### Example
|
||||
|
||||
{{< tabs Python Node PHP>}}
|
||||
|
||||
{{% codetab %}}
|
||||
```python
|
||||
import flask
|
||||
from flask import request, jsonify
|
||||
from flask_cors import CORS
|
||||
import json
|
||||
import sys
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
CORS(app)
|
||||
|
||||
@app.route('/dapr/subscribe', methods=['GET'])
|
||||
def subscribe():
|
||||
subscriptions = [{'pubsubname': 'pubsub',
|
||||
'topic': 'deathStarStatus',
|
||||
'route': 'dsstatus'}]
|
||||
return jsonify(subscriptions)
|
||||
|
||||
@app.route('/dsstatus', methods=['POST'])
|
||||
def ds_subscriber():
|
||||
print(request.json, flush=True)
|
||||
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
|
||||
app.run()
|
||||
```
|
||||
After creating `app1.py` ensure flask and flask_cors are installed:
|
||||
|
||||
```bash
|
||||
pip install flask
|
||||
pip install flask_cors
|
||||
```
|
||||
|
||||
Then run:
|
||||
|
||||
```bash
|
||||
dapr --app-id app1 --app-port 5000 run python app1.py
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
```javascript
|
||||
const express = require('express')
|
||||
const bodyParser = require('body-parser')
|
||||
const app = express()
|
||||
app.use(bodyParser.json({ type: 'application/*+json' }));
|
||||
|
||||
const port = 3000
|
||||
|
||||
app.get('/dapr/subscribe', (req, res) => {
|
||||
res.json([
|
||||
//code
|
||||
namespace CheckoutService.controller
|
||||
{
|
||||
[ApiController]
|
||||
public class CheckoutServiceController : Controller
|
||||
{
|
||||
//Subscribe to a topic
|
||||
[Topic("order_pub_sub", "orders")]
|
||||
[HttpPost("checkout")]
|
||||
public void getCheckout([FromBody] int orderId)
|
||||
{
|
||||
pubsubname: "pubsub",
|
||||
topic: "deathStarStatus",
|
||||
route: "dsstatus"
|
||||
Console.WriteLine("Subscriber received : " + orderId);
|
||||
}
|
||||
]);
|
||||
})
|
||||
|
||||
app.post('/dsstatus', (req, res) => {
|
||||
console.log(req.body);
|
||||
res.sendStatus(200);
|
||||
});
|
||||
|
||||
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
|
||||
}
|
||||
}
|
||||
```
|
||||
Run this app with:
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr --app-id app2 --app-port 3000 run node app2.js
|
||||
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-ssl dotnet run
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
Update `app1.php` with the following:
|
||||
```java
|
||||
//dependencies
|
||||
import io.dapr.Topic;
|
||||
import io.dapr.client.domain.CloudEvent;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
```php
|
||||
<?php
|
||||
//code
|
||||
@RestController
|
||||
public class CheckoutServiceController {
|
||||
|
||||
require_once __DIR__.'/vendor/autoload.php';
|
||||
|
||||
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
|
||||
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus'),
|
||||
]]));
|
||||
$app->post('/dsstatus', function(
|
||||
#[\Dapr\Attributes\FromBody]
|
||||
\Dapr\PubSub\CloudEvent $cloudEvent,
|
||||
\Psr\Log\LoggerInterface $logger
|
||||
) {
|
||||
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
|
||||
return ['status' => 'SUCCESS'];
|
||||
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
|
||||
//Subscribe to a topic
|
||||
@Topic(name = "orders", pubsubName = "order_pub_sub")
|
||||
@PostMapping(path = "/checkout")
|
||||
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
try {
|
||||
log.info("Subscriber received: " + cloudEvent.getData());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
);
|
||||
$app->start();
|
||||
}
|
||||
```
|
||||
|
||||
Run this app with:
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr --app-id app1 --app-port 3000 run -- php -S 0.0.0.0:3000 app1.php
|
||||
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```python
|
||||
#dependencies
|
||||
from cloudevents.sdk.event import v1
|
||||
from dapr.ext.grpc import App
|
||||
import logging
|
||||
import json
|
||||
|
||||
#code
|
||||
app = App()
|
||||
logging.basicConfig(level = logging.INFO)
|
||||
|
||||
#Subscribe to a topic
|
||||
@app.subscribe(pubsub_name='order_pub_sub', topic='orders')
|
||||
def mytopic(event: v1.Event) -> None:
|
||||
data = json.loads(event.Data())
|
||||
logging.info('Subscriber received: ' + data)
|
||||
|
||||
app.run(60002)
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 -- python3 CheckoutService.py
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```go
|
||||
//dependencies
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"context"
|
||||
|
||||
"github.com/dapr/go-sdk/service/common"
|
||||
daprd "github.com/dapr/go-sdk/service/http"
|
||||
)
|
||||
|
||||
//code
|
||||
var sub = &common.Subscription{
|
||||
PubsubName: "order_pub_sub",
|
||||
Topic: "orders",
|
||||
Route: "/checkout",
|
||||
}
|
||||
|
||||
func main() {
|
||||
s := daprd.NewService(":6002")
|
||||
//Subscribe to a topic
|
||||
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
|
||||
log.Fatalf("error adding topic subscription: %v", err)
|
||||
}
|
||||
if err := s.Start(); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatalf("error listenning: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
|
||||
log.Printf("Subscriber received: %s", e.Data)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```javascript
|
||||
|
||||
//dependencies
|
||||
import { DaprServer, CommunicationProtocolEnum } from 'dapr-client';
|
||||
|
||||
//code
|
||||
const daprHost = "127.0.0.1";
|
||||
const serverHost = "127.0.0.1";
|
||||
const serverPort = "6002";
|
||||
|
||||
start().catch((e) => {
|
||||
console.error(e);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
async function start(orderId) {
|
||||
const server = new DaprServer(
|
||||
serverHost,
|
||||
serverPort,
|
||||
daprHost,
|
||||
process.env.DAPR_HTTP_PORT,
|
||||
CommunicationProtocolEnum.HTTP
|
||||
);
|
||||
//Subscribe to a topic
|
||||
await server.pubsub.subscribe("order_pub_sub", "orders", async (orderId) => {
|
||||
console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
|
||||
});
|
||||
await server.startServer();
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
The `/dsstatus` endpoint matches the `route` defined in the subscriptions and this is where Dapr will send all topic messages to.
|
||||
The `/checkout` endpoint matches the `route` defined in the subscriptions and this is where Dapr will send all topic messages to.
|
||||
|
||||
## Step 3: Publish a topic
|
||||
|
||||
To publish a topic you need to run an instance of a Dapr sidecar to use the pubsub Redis component. You can use the default Redis component installed into your local environment.
|
||||
|
||||
Start an instance of Dapr with an app-id called `testpubsub`:
|
||||
Start an instance of Dapr with an app-id called `orderprocessing`:
|
||||
|
||||
```bash
|
||||
dapr run --app-id testpubsub --dapr-http-port 3500
|
||||
dapr run --app-id orderprocessing --dapr-http-port 3500
|
||||
```
|
||||
{{< tabs "Dapr CLI" "HTTP API (Bash)" "HTTP API (PowerShell)">}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
Then publish a message to the `deathStarStatus` topic:
|
||||
Then publish a message to the `orders` topic:
|
||||
|
||||
```bash
|
||||
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"status": "completed"}'
|
||||
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic orders --data '{"orderId": "100"}'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
Then publish a message to the `deathStarStatus` topic:
|
||||
Then publish a message to the `orders` topic:
|
||||
```bash
|
||||
curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/json" -d '{"status": "completed"}'
|
||||
curl -X POST http://localhost:3601/v1.0/publish/pubsub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
Then publish a message to the `deathStarStatus` topic:
|
||||
Then publish a message to the `orders` topic:
|
||||
```powershell
|
||||
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status": "completed"}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'
|
||||
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/pubsub/orders'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
|
@ -389,96 +404,217 @@ Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"status":
|
|||
|
||||
Dapr automatically wraps the user payload in a Cloud Events v1.0 compliant envelope, using `Content-Type` header value for `datacontenttype` attribute.
|
||||
|
||||
Below are code examples that leverage Dapr SDKs to publish a topic.
|
||||
|
||||
{{< tabs Dotnet Java Python Go Javascript>}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```csharp
|
||||
|
||||
//dependencies
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Threading.Tasks;
|
||||
using Dapr.Client;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using System.Threading;
|
||||
|
||||
//code
|
||||
namespace EventService
|
||||
{
|
||||
class Program
|
||||
{
|
||||
static async Task Main(string[] args)
|
||||
{
|
||||
string PUBSUB_NAME = "order_pub_sub";
|
||||
string TOPIC_NAME = "orders";
|
||||
int orderId = 100;
|
||||
CancellationTokenSource source = new CancellationTokenSource();
|
||||
CancellationToken cancellationToken = source.Token;
|
||||
using var client = new DaprClientBuilder().Build();
|
||||
//Using Dapr SDK to publish to a topic
|
||||
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
|
||||
Console.WriteLine("Published data: " + orderId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-ssl dotnet run
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```java
|
||||
//dependencies
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.Metadata;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
|
||||
//code
|
||||
@SpringBootApplication
|
||||
public class OrderProcessingServiceApplication {
|
||||
|
||||
public static void main(String[] args) throws InterruptedException{
|
||||
String MESSAGE_TTL_IN_SECONDS = "1000";
|
||||
String TOPIC_NAME = "orders";
|
||||
String PUBSUB_NAME = "order_pub_sub";
|
||||
|
||||
int orderId = 100;
|
||||
DaprClient client = new DaprClientBuilder().build();
|
||||
//Using Dapr SDK to publish to a topic
|
||||
client.publishEvent(
|
||||
PUBSUB_NAME,
|
||||
TOPIC_NAME,
|
||||
orderId,
|
||||
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
|
||||
log.info("Published data:" + orderId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```python
|
||||
#dependencies
|
||||
import random
|
||||
from time import sleep
|
||||
import requests
|
||||
import logging
|
||||
import json
|
||||
from dapr.clients import DaprClient
|
||||
|
||||
#code
|
||||
logging.basicConfig(level = logging.INFO)
|
||||
|
||||
orderId = 100
|
||||
with DaprClient() as client:
|
||||
#Using Dapr SDK to publish to a topic
|
||||
result = client.publish_event(
|
||||
pubsub_name='order_pub_sub',
|
||||
topic_name='orders',
|
||||
data=json.dumps(orderId),
|
||||
data_content_type='application/json',
|
||||
)
|
||||
logging.info('Published data: ' + str(orderId))
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 -- python3 OrderProcessingService.py
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```go
|
||||
//dependencies
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
dapr "github.com/dapr/go-sdk/client"
|
||||
|
||||
)
|
||||
|
||||
//code
|
||||
var (
|
||||
PUBSUB_NAME = "order_pub_sub"
|
||||
TOPIC_NAME = "orders"
|
||||
)
|
||||
|
||||
func main() {
|
||||
orderId := 100
|
||||
client, err := dapr.NewClient()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer client.Close()
|
||||
ctx := context.Background()
|
||||
//Using Dapr SDK to publish to a topic
|
||||
if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)));
|
||||
err != nil {
|
||||
panic(err)
|
||||
}
|
||||
log.Println("Published data: " + strconv.Itoa(orderId))
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```javascript
|
||||
//dependencies
|
||||
import { DaprServer, DaprClient, CommunicationProtocolEnum } from 'dapr-client';
|
||||
|
||||
//code
|
||||
const daprHost = "127.0.0.1";
|
||||
|
||||
var main = function() {
|
||||
var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
|
||||
start(orderId).catch((e) => {
|
||||
console.error(e);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
async function start(orderId) {
|
||||
const client = new DaprClient(daprHost, process.env.DAPR_HTTP_PORT, CommunicationProtocolEnum.HTTP);
|
||||
console.log("Published data:" + orderId)
|
||||
//Using Dapr SDK to publish to a topic
|
||||
await client.pubsub.publish("order_pub_sub", "orders", orderId);
|
||||
}
|
||||
|
||||
main();
|
||||
|
||||
```
|
||||
|
||||
Navigate to the directory containing the above code, then run the following command to launch a Dapr sidecar and run the application:
|
||||
|
||||
```bash
|
||||
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
## Step 4: ACK-ing a message
|
||||
|
||||
In order to tell Dapr that a message was processed successfully, return a `200 OK` response. If Dapr receives any other return status code than `200`, or if your app crashes, Dapr will attempt to redeliver the message following At-Least-Once semantics.
|
||||
|
||||
#### Example
|
||||
|
||||
{{< tabs Python Node>}}
|
||||
|
||||
{{% codetab %}}
|
||||
```python
|
||||
@app.route('/dsstatus', methods=['POST'])
|
||||
def ds_subscriber():
|
||||
print(request.json, flush=True)
|
||||
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
```javascript
|
||||
app.post('/dsstatus', (req, res) => {
|
||||
res.sendStatus(200);
|
||||
});
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
{{% alert title="Note on message redelivery" color="primary" %}}
|
||||
Some pubsub components (e.g. Redis) will redeliver a message if a response is not sent back within a specified time window. Make sure to configure metadata such as `processingTimeout` to customize this behavior. For more information refer to the respective [component references]({{< ref supported-pubsub >}}).
|
||||
{{% /alert %}}
|
||||
|
||||
## (Optional) Step 5: Publishing a topic with code
|
||||
|
||||
{{< tabs Node PHP>}}
|
||||
|
||||
{{% codetab %}}
|
||||
If you prefer publishing a topic using code, here is an example.
|
||||
|
||||
```javascript
|
||||
const express = require('express');
|
||||
const path = require('path');
|
||||
const request = require('request');
|
||||
const bodyParser = require('body-parser');
|
||||
|
||||
const app = express();
|
||||
app.use(bodyParser.json());
|
||||
|
||||
const daprPort = process.env.DAPR_HTTP_PORT || 3500;
|
||||
const daprUrl = `http://localhost:${daprPort}/v1.0`;
|
||||
const port = 8080;
|
||||
const pubsubName = 'pubsub';
|
||||
|
||||
app.post('/publish', (req, res) => {
|
||||
console.log("Publishing: ", req.body);
|
||||
const publishUrl = `${daprUrl}/publish/${pubsubName}/deathStarStatus`;
|
||||
request( { uri: publishUrl, method: 'POST', json: req.body } );
|
||||
res.sendStatus(200);
|
||||
});
|
||||
|
||||
app.listen(process.env.PORT || port, () => console.log(`Listening on port ${port}!`));
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
If you prefer publishing a topic using code, here is an example.
|
||||
|
||||
```php
|
||||
<?php
|
||||
|
||||
require_once __DIR__.'/vendor/autoload.php';
|
||||
|
||||
$app = \Dapr\App::create();
|
||||
$app->run(function(\DI\FactoryInterface $factory, \Psr\Log\LoggerInterface $logger) {
|
||||
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
|
||||
$publisher->topic('deathStarStatus')->publish('operational');
|
||||
$logger->alert('published!');
|
||||
});
|
||||
```
|
||||
|
||||
You can save this to `app2.php` and while `app1` is running in another terminal, execute:
|
||||
|
||||
```bash
|
||||
dapr --app-id app2 run -- php app2.php
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
## Sending a custom CloudEvent
|
||||
|
||||
Dapr automatically takes the data sent on the publish request and wraps it in a CloudEvent 1.0 envelope.
|
||||
|
@ -491,23 +627,23 @@ Read about content types [here](#content-types), and about the [Cloud Events mes
|
|||
{{< tabs "Dapr CLI" "HTTP API (Bash)" "HTTP API (PowerShell)">}}
|
||||
|
||||
{{% codetab %}}
|
||||
Publish a custom CloudEvent to the `deathStarStatus` topic:
|
||||
Publish a custom CloudEvent to the `orders` topic:
|
||||
```bash
|
||||
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic deathStarStatus --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"status": "completed"}}'
|
||||
dapr publish --publish-app-id testpubsub --pubsub pubsub --topic orders --data '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
Publish a custom CloudEvent to the `deathStarStatus` topic:
|
||||
Publish a custom CloudEvent to the `orders` topic:
|
||||
```bash
|
||||
curl -X POST http://localhost:3500/v1.0/publish/pubsub/deathStarStatus -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"status": "completed"}}'
|
||||
curl -X POST http://localhost:3601/v1.0/publish/pubsub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
Publish a custom CloudEvent to the `deathStarStatus` topic:
|
||||
Publish a custom CloudEvent to the `orders` topic:
|
||||
```powershell
|
||||
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"status": "completed"}}' -Uri 'http://localhost:3500/v1.0/publish/pubsub/deathStarStatus'
|
||||
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/pubsub/orders'
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 174 KiB |
Loading…
Reference in New Issue