Merge branch 'v1.10' into stable-components

Signed-off-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com>
This commit is contained in:
Hannah Hunter 2023-02-01 18:24:53 -06:00 committed by GitHub
commit b8fc7e35c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 442 additions and 21 deletions

View File

@ -0,0 +1,285 @@
---
type: docs
title: "Publish and subscribe to bulk messages"
linkTitle: "Publish and subscribe to bulk messages"
weight: 7100
description: "Learn how to use the bulk publish and subscribe APIs in Dapr."
---
{{% alert title="alpha" color="warning" %}}
The bulk publish and subscribe APIs are in **alpha** stage.
{{% /alert %}}
With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker.
## Publishing messages in bulk
### Restrictions when publishing messages in bulk
The bulk publish API allows you to publish multiple messages to a topic in a single request. It is *non-transactional*, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages.
The bulk publish operation also does not guarantee any ordering of messages.
### Example
{{< tabs Java Javascript Dotnet Python Go "HTTP API (Bash)" "HTTP API (PowerShell)" >}}
{{% codetab %}}
```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class BulkPublisher {
private static final String PUBSUB_NAME = "my-pubsub-name";
private static final String TOPIC_NAME = "topic-a";
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
}
// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
}
}
}
```
{{% /codetab %}}
{{% codetab %}}
```typescript
import { DaprClient } from "@dapr/dapr";
const pubSubName = "my-pubsub-name";
const topic = "topic-a";
async function start() {
const client = new DaprClient();
// Publish multiple messages to a topic.
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
// Publish multiple messages to a topic with explicit bulk publish messages.
const bulkPublishMessages = [
{
entryID: "entry-1",
contentType: "application/json",
event: { hello: "foo message 1" },
},
{
entryID: "entry-2",
contentType: "application/cloudevents+json",
event: {
specversion: "1.0",
source: "/some/source",
type: "example",
id: "1234",
data: "foo message 2",
datacontenttype: "text/plain"
},
},
{
entryID: "entry-3",
contentType: "text/plain",
event: "foo message 3",
},
];
await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
```csharp
using System;
using System.Collections.Generic;
using Dapr.Client;
const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
new { Id = "17", Amount = 10m },
new { Id = "18", Amount = 20m },
new { Id = "19", Amount = 30m }
};
using var client = new DaprClientBuilder().Build();
var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
throw new Exception("null response from dapr");
}
if (res.FailedEntries.Count > 0)
{
Console.WriteLine("Some events failed to be published!");
foreach (var failedEntry in res.FailedEntries)
{
Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
failedEntry.ErrorMessage);
}
}
else
{
Console.WriteLine("Published all events!");
}
```
{{% /codetab %}}
{{% codetab %}}
```python
import requests
import json
base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]
response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
```
{{% /codetab %}}
{{% codetab %}}
```go
package main
import (
"fmt"
"strings"
"net/http"
"io/ioutil"
)
const (
pubsubName = "my-pubsub-name"
topicName = "topic-a"
baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)
func main() {
url := fmt.Sprintf(baseUrl, pubsubName, topicName)
method := "POST"
payload := strings.NewReader(`[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]`)
client := &http.Client {}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
res, err := client.Do(req)
// ...
}
```
{{% /codetab %}}
{{% codetab %}}
```bash
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
```
{{% /codetab %}}
{{% codetab %}}
```powershell
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
```
{{% /codetab %}}
{{< /tabs >}}
## How components handle publishing and subscribing to bulk messages
Some pub/sub brokers support sending and receiving multiple messages in a single request. When a component supports bulk publish or subscribe operations, Dapr runtime uses them to further optimize the communication between the Dapr sidecar and the underlying pub/sub broker.
For components that do not have bulk publish or subscribe support, Dapr runtime uses the regular publish and subscribe APIs to send and receive messages one by one. This is still more efficient than directly using the regular publish or subscribe APIs, because applications can still send/receive multiple messages in a single request to/from Dapr.
## Supported components
Refer to the [component reference]({{< ref supported-pubsub >}}) to see which components support bulk publish and subscribe operations.
## Related links
- List of [supported pub/sub components]({{< ref supported-pubsub >}})
- Read the [API reference]({{< ref pubsub_api.md >}})

View File

@ -119,6 +119,10 @@ By default, all topic messages associated with an instance of a pub/sub componen
Dapr can set a timeout message on a per-message basis, meaning that if the message is not read from the pub/sub component, then the message is discarded. This timeout message prevents a build up of unread messages. If a message has been in the queue longer than the configured TTL, it is marked as dead. For more information, read [pub/sub message TTL]({{< ref pubsub-message-ttl.md >}}).
### Publish and subscribe to bulk messages
Dapr supports sending and receiving multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests. For more information, read [pub/sub bulk messages]({{< ref pubsub-bulk.md >}}).
## Try out pub/sub
### Quickstarts and tutorials

View File

@ -191,25 +191,24 @@ using System.Threading;
//code
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
while(true) {
System.Threading.Thread.Sleep(5000);
Random random = new Random();
int orderId = random.Next(1,1000);
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using var client = new DaprClientBuilder().Build();
//Using Dapr SDK to invoke a method
var result = client.CreateInvokeMethodRequest(HttpMethod.Get, "checkout", "checkout/" + orderId, cancellationToken);
await client.InvokeMethodAsync(result);
Console.WriteLine("Order requested: " + orderId);
Console.WriteLine("Result: " + result);
}
}
}
class Program
{
static async Task Main(string[] args)
{
while(true) {
System.Threading.Thread.Sleep(5000);
Random random = new Random();
int orderId = random.Next(1,1000);
using var client = new DaprClientBuilder().Build();
//Using Dapr SDK to invoke a method
var result = client.CreateInvokeMethodRequest(HttpMethod.Get, "checkout", "checkout/" + orderId);
await client.InvokeMethodAsync(result);
Console.WriteLine("Order requested: " + orderId);
Console.WriteLine("Result: " + result);
}
}
}
}
```

View File

@ -64,6 +64,90 @@ Parameter | Description
> Additional metadata parameters are available based on each pubsub component.
## Publish multiple messages to a given topic
This endpoint lets you publish multiple messages to consumers who are listening on a `topic`.
### HTTP Request
```
POST http://localhost:<daprPort>/v1.0-alpha1/publish/bulk/<pubsubname>/<topic>[?<metadata>]
```
The request body should contain a JSON array of entries with:
- Unique entry IDs
- The event to publish
- The content type of the event
If the content type for an event is not `application/cloudevents+json`, it is auto-wrapped as a CloudEvent (unless `metadata.rawPayload` is set to `true`).
Example:
```bash
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/pubsubName/deathStarStatus \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
```
### Headers
The `Content-Type` header should always be set to `application/json` since the request body is a JSON array.
### URL Parameters
|**Parameter**|**Description**|
|--|--|
|`daprPort`|The Dapr port|
|`pubsubname`|The name of pub/sub component|
|`topic`|The name of the topic|
|`metadata`|Query parameters for [metadata]({{< ref "pubsub_api.md#metadata" >}})|
### Metadata
Metadata can be sent via query parameters in the request's URL. It must be prefixed with `metadata.`, as shown in the table below.
|**Parameter**|**Description**|
|--|--|
|`metadata.rawPayload`|Boolean to determine if Dapr should publish the messages without wrapping them as CloudEvent.|
|`metadata.maxBulkPubBytes`|Maximum bytes to publish in a bulk publish request.|
#### HTTP Response
|**HTTP Status**|**Description**|
|--|--|
|204|All messages delivered|
|400|Pub/sub does not exist|
|403|Forbidden by access controls|
|500|At least one message failed to be delivered|
In case of a 500 status code, the response body will contain a JSON object containing a list of entries that failed to be delivered. For example from our request above, if the entry with event `"first text message"` failed to be delivered, the response would contain its entry ID and an error message from the underlying pub/sub component.
```json
{
"failedEntries": [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"error": "some error message"
},
],
"errorCode": "ERR_PUBSUB_PUBLISH_MESSAGE"
}
```
## Optional Application (User Code) Routes
### Provide a route for Dapr to discover topic subscriptions

View File

@ -2,4 +2,7 @@
link: setup-aws-snssqs
state: Stable
version: v1
since: "1.10"
since: "1.6"
features:
bulkPublish: false
bulkSubscribe: false

View File

@ -3,8 +3,14 @@
state: Stable
version: v1
since: "1.8"
features:
bulkPublish: true
bulkSubscribe: false
- component: Azure Service Bus
link: setup-azure-servicebus
state: Stable
version: v1
since: "1.0"
features:
bulkPublish: true
bulkSubscribe: true

View File

@ -3,3 +3,6 @@
state: Alpha
version: v1
since: "1.0"
features:
bulkPublish: false
bulkSubscribe: false

View File

@ -3,53 +3,86 @@
state: Deprecated
version: v1
since: "1.9"
features:
bulkPublish: false
bulkSubscribe: false
- component: In-memory
link: setup-inmemory
state: Beta
version: v1
since: "1.7"
features:
bulkPublish: false
bulkSubscribe: false
- component: Apache Kafka
link: setup-apache-kafka
state: Stable
version: v1
since: "1.5"
features:
bulkPublish: true
bulkSubscribe: true
- component: Redis Streams
link: setup-redis-pubsub
state: Stable
version: v1
since: "1.0"
features:
bulkPublish: false
bulkSubscribe: false
- component: JetStream
link: setup-jetstream
state: Beta
version: v1
since: "1.10"
features:
bulkPublish: false
bulkSubscribe: false
- component: Pulsar
link: setup-pulsar
state: Stable
version: v1
since: "1.10"
features:
bulkPublish: false
bulkSubscribe: false
- component: MQTT3
link: setup-mqtt3
state: Stable
version: v1
since: "1.7"
features:
bulkPublish: false
bulkSubscribe: false
- component: NATS Streaming
link: setup-nats-streaming
state: Beta
version: v1
since: "1.0"
features:
bulkPublish: false
bulkSubscribe: false
- component: RabbitMQ
link: setup-rabbitmq
state: Stable
version: v1
since: "1.7"
features:
bulkPublish: false
bulkSubscribe: false
- component: RocketMQ
link: setup-rocketmq
state: Alpha
version: v1
since: "1.8"
features:
bulkPublish: false
bulkSubscribe: false
- component: Solace-AMQP
link: setup-solace-amqp
state: Beta
version: v1
since: "1.10"
features:
bulkPublish: false
bulkSubscribe: false

View File

@ -10,6 +10,8 @@
<table width="100%">
<tr>
<th>Component</th>
<th>Bulk Publish</th>
<th>Bulk Subscribe</th>
<th>Status</th>
<th>Component version</th>
<th>Since runtime version</th>
@ -18,6 +20,8 @@
<tr>
<td><a href="/reference/components-reference/supported-pubsub/{{ .link }}/">{{ .component }}</a>
</td>
<td align="center">{{ if .features.bulkPublish }}✅{{else}}<img src="/images/emptybox.png">{{ end }}</td>
<td align="center">{{ if .features.bulkSubscribe }}✅{{else}}<img src="/images/emptybox.png">{{ end }}</td>
<td>{{ .state }}</td>
<td>{{ .version }}</td>
<td>{{ .since }}</td>

@ -1 +1 @@
Subproject commit e87b9ad6eefaa05390144d82642df13c5b4bed17
Subproject commit 52b82d7ce6599822a37d2528379f5ca146e286bb