diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md new file mode 100644 index 000000000..35c88dd86 --- /dev/null +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md @@ -0,0 +1,285 @@ +--- +type: docs +title: "Publish and subscribe to bulk messages" +linkTitle: "Publish and subscribe to bulk messages" +weight: 7100 +description: "Learn how to use the bulk publish and subscribe APIs in Dapr." +--- + +{{% alert title="alpha" color="warning" %}} +The bulk publish and subscribe APIs are in **alpha** stage. +{{% /alert %}} + +With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker. + +## Publishing messages in bulk + +### Restrictions when publishing messages in bulk + +The bulk publish API allows you to publish multiple messages to a topic in a single request. It is *non-transactional*, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages. + +The bulk publish operation also does not guarantee any ordering of messages. + +### Example + +{{< tabs Java Javascript Dotnet Python Go "HTTP API (Bash)" "HTTP API (PowerShell)" >}} + +{{% codetab %}} + +```java +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.BulkPublishResponse; +import io.dapr.client.domain.BulkPublishResponseFailedEntry; +import java.util.ArrayList; +import java.util.List; + +class BulkPublisher { + private static final String PUBSUB_NAME = "my-pubsub-name"; + private static final String TOPIC_NAME = "topic-a"; + + public void publishMessages() { + try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { + // Create a list of messages to publish + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String message = String.format("This is message #%d", i); + messages.add(message); + } + + // Publish list of messages using the bulk publish API + BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block(); + } + } +} +``` + +{{% /codetab %}} + +{{% codetab %}} + +```typescript + +import { DaprClient } from "@dapr/dapr"; + +const pubSubName = "my-pubsub-name"; +const topic = "topic-a"; + +async function start() { + const client = new DaprClient(); + + // Publish multiple messages to a topic. + await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]); + + // Publish multiple messages to a topic with explicit bulk publish messages. + const bulkPublishMessages = [ + { + entryID: "entry-1", + contentType: "application/json", + event: { hello: "foo message 1" }, + }, + { + entryID: "entry-2", + contentType: "application/cloudevents+json", + event: { + specversion: "1.0", + source: "/some/source", + type: "example", + id: "1234", + data: "foo message 2", + datacontenttype: "text/plain" + }, + }, + { + entryID: "entry-3", + contentType: "text/plain", + event: "foo message 3", + }, + ]; + await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages); +} + +start().catch((e) => { + console.error(e); + process.exit(1); +}); +``` + +{{% /codetab %}} + +{{% codetab %}} + +```csharp +using System; +using System.Collections.Generic; +using Dapr.Client; + +const string PubsubName = "my-pubsub-name"; +const string TopicName = "topic-a"; +IReadOnlyList BulkPublishData = new List() { + new { Id = "17", Amount = 10m }, + new { Id = "18", Amount = 20m }, + new { Id = "19", Amount = 30m } +}; + +using var client = new DaprClientBuilder().Build(); + +var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData); +if (res == null) { + throw new Exception("null response from dapr"); +} +if (res.FailedEntries.Count > 0) +{ + Console.WriteLine("Some events failed to be published!"); + foreach (var failedEntry in res.FailedEntries) + { + Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " + + failedEntry.ErrorMessage); + } +} +else +{ + Console.WriteLine("Published all events!"); +} +``` + +{{% /codetab %}} + +{{% codetab %}} + +```python +import requests +import json + +base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}" +pubsub_name = "my-pubsub-name" +topic_name = "topic-a" +payload = [ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + } +] + +response = requests.post(base_url.format(pubsub_name, topic_name), json=payload) +print(response.status_code) +``` + +{{% /codetab %}} + +{{% codetab %}} + +```go +package main + +import ( + "fmt" + "strings" + "net/http" + "io/ioutil" +) + +const ( + pubsubName = "my-pubsub-name" + topicName = "topic-a" + baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s" +) + +func main() { + url := fmt.Sprintf(baseUrl, pubsubName, topicName) + method := "POST" + payload := strings.NewReader(`[ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + } +]`) + + client := &http.Client {} + req, _ := http.NewRequest(method, url, payload) + + req.Header.Add("Content-Type", "application/json") + res, err := client.Do(req) + // ... +} +``` + +{{% /codetab %}} + +{{% codetab %}} + +```bash +curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \ + -H 'Content-Type: application/json' \ + -d '[ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + }, + ]' +``` + +{{% /codetab %}} + +{{% codetab %}} + +```powershell +Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' ` +-Body '[ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + }, + ]' +``` + +{{% /codetab %}} + +{{< /tabs >}} + +## How components handle publishing and subscribing to bulk messages + +Some pub/sub brokers support sending and receiving multiple messages in a single request. When a component supports bulk publish or subscribe operations, Dapr runtime uses them to further optimize the communication between the Dapr sidecar and the underlying pub/sub broker. + +For components that do not have bulk publish or subscribe support, Dapr runtime uses the regular publish and subscribe APIs to send and receive messages one by one. This is still more efficient than directly using the regular publish or subscribe APIs, because applications can still send/receive multiple messages in a single request to/from Dapr. + +## Supported components + +Refer to the [component reference]({{< ref supported-pubsub >}}) to see which components support bulk publish and subscribe operations. + +## Related links + +- List of [supported pub/sub components]({{< ref supported-pubsub >}}) +- Read the [API reference]({{< ref pubsub_api.md >}}) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md index 2b90adda6..b0015ee2e 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md @@ -119,6 +119,10 @@ By default, all topic messages associated with an instance of a pub/sub componen Dapr can set a timeout message on a per-message basis, meaning that if the message is not read from the pub/sub component, then the message is discarded. This timeout message prevents a build up of unread messages. If a message has been in the queue longer than the configured TTL, it is marked as dead. For more information, read [pub/sub message TTL]({{< ref pubsub-message-ttl.md >}}). +### Publish and subscribe to bulk messages + +Dapr supports sending and receiving multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests. For more information, read [pub/sub bulk messages]({{< ref pubsub-bulk.md >}}). + ## Try out pub/sub ### Quickstarts and tutorials diff --git a/daprdocs/content/en/developing-applications/building-blocks/service-invocation/howto-invoke-discover-services.md b/daprdocs/content/en/developing-applications/building-blocks/service-invocation/howto-invoke-discover-services.md index e73456d4b..183b537f6 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/service-invocation/howto-invoke-discover-services.md +++ b/daprdocs/content/en/developing-applications/building-blocks/service-invocation/howto-invoke-discover-services.md @@ -191,25 +191,24 @@ using System.Threading; //code namespace EventService { - class Program - { - static async Task Main(string[] args) - { - while(true) { - System.Threading.Thread.Sleep(5000); - Random random = new Random(); - int orderId = random.Next(1,1000); - CancellationTokenSource source = new CancellationTokenSource(); - CancellationToken cancellationToken = source.Token; - using var client = new DaprClientBuilder().Build(); - //Using Dapr SDK to invoke a method - var result = client.CreateInvokeMethodRequest(HttpMethod.Get, "checkout", "checkout/" + orderId, cancellationToken); - await client.InvokeMethodAsync(result); - Console.WriteLine("Order requested: " + orderId); - Console.WriteLine("Result: " + result); - } - } - } + class Program + { + static async Task Main(string[] args) + { + while(true) { + System.Threading.Thread.Sleep(5000); + Random random = new Random(); + int orderId = random.Next(1,1000); + using var client = new DaprClientBuilder().Build(); + + //Using Dapr SDK to invoke a method + var result = client.CreateInvokeMethodRequest(HttpMethod.Get, "checkout", "checkout/" + orderId); + await client.InvokeMethodAsync(result); + Console.WriteLine("Order requested: " + orderId); + Console.WriteLine("Result: " + result); + } + } + } } ``` diff --git a/daprdocs/content/en/reference/api/pubsub_api.md b/daprdocs/content/en/reference/api/pubsub_api.md index f4d8e47fa..198f71f10 100644 --- a/daprdocs/content/en/reference/api/pubsub_api.md +++ b/daprdocs/content/en/reference/api/pubsub_api.md @@ -64,6 +64,90 @@ Parameter | Description > Additional metadata parameters are available based on each pubsub component. +## Publish multiple messages to a given topic + +This endpoint lets you publish multiple messages to consumers who are listening on a `topic`. + +### HTTP Request + +``` +POST http://localhost:/v1.0-alpha1/publish/bulk//[?] +``` + +The request body should contain a JSON array of entries with: +- Unique entry IDs +- The event to publish +- The content type of the event + +If the content type for an event is not `application/cloudevents+json`, it is auto-wrapped as a CloudEvent (unless `metadata.rawPayload` is set to `true`). + +Example: + +```bash +curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/pubsubName/deathStarStatus \ + -H 'Content-Type: application/json' \ + -d '[ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + }, + ]' +``` + +### Headers + +The `Content-Type` header should always be set to `application/json` since the request body is a JSON array. + +### URL Parameters + +|**Parameter**|**Description**| +|--|--| +|`daprPort`|The Dapr port| +|`pubsubname`|The name of pub/sub component| +|`topic`|The name of the topic| +|`metadata`|Query parameters for [metadata]({{< ref "pubsub_api.md#metadata" >}})| + +### Metadata + +Metadata can be sent via query parameters in the request's URL. It must be prefixed with `metadata.`, as shown in the table below. + +|**Parameter**|**Description**| +|--|--| +|`metadata.rawPayload`|Boolean to determine if Dapr should publish the messages without wrapping them as CloudEvent.| +|`metadata.maxBulkPubBytes`|Maximum bytes to publish in a bulk publish request.| + + +#### HTTP Response + +|**HTTP Status**|**Description**| +|--|--| +|204|All messages delivered| +|400|Pub/sub does not exist| +|403|Forbidden by access controls| +|500|At least one message failed to be delivered| + +In case of a 500 status code, the response body will contain a JSON object containing a list of entries that failed to be delivered. For example from our request above, if the entry with event `"first text message"` failed to be delivered, the response would contain its entry ID and an error message from the underlying pub/sub component. + +```json +{ + "failedEntries": [ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "error": "some error message" + }, + ], + "errorCode": "ERR_PUBSUB_PUBLISH_MESSAGE" +} +``` + ## Optional Application (User Code) Routes ### Provide a route for Dapr to discover topic subscriptions diff --git a/daprdocs/data/components/pubsub/aws.yaml b/daprdocs/data/components/pubsub/aws.yaml index 682428c38..3f937b51d 100644 --- a/daprdocs/data/components/pubsub/aws.yaml +++ b/daprdocs/data/components/pubsub/aws.yaml @@ -2,4 +2,7 @@ link: setup-aws-snssqs state: Stable version: v1 - since: "1.10" + since: "1.6" + features: + bulkPublish: false + bulkSubscribe: false diff --git a/daprdocs/data/components/pubsub/azure.yaml b/daprdocs/data/components/pubsub/azure.yaml index 3bb3c279a..e278829af 100644 --- a/daprdocs/data/components/pubsub/azure.yaml +++ b/daprdocs/data/components/pubsub/azure.yaml @@ -3,8 +3,14 @@ state: Stable version: v1 since: "1.8" + features: + bulkPublish: true + bulkSubscribe: false - component: Azure Service Bus link: setup-azure-servicebus state: Stable version: v1 since: "1.0" + features: + bulkPublish: true + bulkSubscribe: true diff --git a/daprdocs/data/components/pubsub/gcp.yaml b/daprdocs/data/components/pubsub/gcp.yaml index 8fd547af9..815ced19b 100644 --- a/daprdocs/data/components/pubsub/gcp.yaml +++ b/daprdocs/data/components/pubsub/gcp.yaml @@ -3,3 +3,6 @@ state: Alpha version: v1 since: "1.0" + features: + bulkPublish: false + bulkSubscribe: false diff --git a/daprdocs/data/components/pubsub/generic.yaml b/daprdocs/data/components/pubsub/generic.yaml index ce5ed8f34..a2f751ea8 100644 --- a/daprdocs/data/components/pubsub/generic.yaml +++ b/daprdocs/data/components/pubsub/generic.yaml @@ -3,53 +3,86 @@ state: Deprecated version: v1 since: "1.9" + features: + bulkPublish: false + bulkSubscribe: false - component: In-memory link: setup-inmemory state: Beta version: v1 since: "1.7" + features: + bulkPublish: false + bulkSubscribe: false - component: Apache Kafka link: setup-apache-kafka state: Stable version: v1 since: "1.5" + features: + bulkPublish: true + bulkSubscribe: true - component: Redis Streams link: setup-redis-pubsub state: Stable version: v1 since: "1.0" + features: + bulkPublish: false + bulkSubscribe: false - component: JetStream link: setup-jetstream state: Beta version: v1 since: "1.10" + features: + bulkPublish: false + bulkSubscribe: false - component: Pulsar link: setup-pulsar state: Stable version: v1 since: "1.10" + features: + bulkPublish: false + bulkSubscribe: false - component: MQTT3 link: setup-mqtt3 state: Stable version: v1 since: "1.7" + features: + bulkPublish: false + bulkSubscribe: false - component: NATS Streaming link: setup-nats-streaming state: Beta version: v1 since: "1.0" + features: + bulkPublish: false + bulkSubscribe: false - component: RabbitMQ link: setup-rabbitmq state: Stable version: v1 since: "1.7" + features: + bulkPublish: false + bulkSubscribe: false - component: RocketMQ link: setup-rocketmq state: Alpha version: v1 since: "1.8" + features: + bulkPublish: false + bulkSubscribe: false - component: Solace-AMQP link: setup-solace-amqp state: Beta version: v1 since: "1.10" + features: + bulkPublish: false + bulkSubscribe: false diff --git a/daprdocs/layouts/partials/components/pubsub.html b/daprdocs/layouts/partials/components/pubsub.html index 01c80e7ce..98d7cb106 100644 --- a/daprdocs/layouts/partials/components/pubsub.html +++ b/daprdocs/layouts/partials/components/pubsub.html @@ -10,6 +10,8 @@ + + @@ -18,6 +20,8 @@ + + diff --git a/sdkdocs/dotnet b/sdkdocs/dotnet index e87b9ad6e..52b82d7ce 160000 --- a/sdkdocs/dotnet +++ b/sdkdocs/dotnet @@ -1 +1 @@ -Subproject commit e87b9ad6eefaa05390144d82642df13c5b4bed17 +Subproject commit 52b82d7ce6599822a37d2528379f5ca146e286bb
ComponentBulk PublishBulk Subscribe Status Component version Since runtime version
{{ .component }} {{ if .features.bulkPublish }}✅{{else}}{{ end }}{{ if .features.bulkSubscribe }}✅{{else}}{{ end }} {{ .state }} {{ .version }} {{ .since }}