diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md index e7ebc89ef..afaff5430 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md @@ -1,7 +1,7 @@ --- type: docs -title: "Send and receive messages in bulk" -linkTitle: "Send and receive messages in bulk" +title: "Publish and subscribe to bulk messages" +linkTitle: "Publish and subscribe to bulk messages" weight: 7100 description: "Learn how to use the bulk publish and subscribe APIs in Dapr." --- @@ -10,7 +10,8 @@ description: "Learn how to use the bulk publish and subscribe APIs in Dapr." The bulk publish and subscribe APIs are in **alpha** stage. {{% /alert %}} -With the bulk publish and subscribe APIs, you can send and receive multiple messages in a single request. +With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. + ## Native bulk publish and subscribe support @@ -28,7 +29,7 @@ The bulk publish API allows you to publish multiple messages to a topic in a sin ### Example -{{< tabs Java Javascript "HTTP API (Bash)" "HTTP API (PowerShell)" >}} +{{< tabs Java Javascript Dotnet Python Go "HTTP API (Bash)" "HTTP API (PowerShell)" >}} {{% codetab %}} @@ -41,6 +42,9 @@ import java.util.ArrayList; import java.util.List; class BulkPublisher { + private static final String PUBSUB_NAME = "my-pubsub-name"; + private static final String TOPIC_NAME = "topic-a"; + public void publishMessages() { try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { // Create a list of messages to publish @@ -112,19 +116,134 @@ start().catch((e) => { {{% codetab %}} +```csharp +using System; +using System.Collections.Generic; +using Dapr.Client; + +const string PubsubName = "my-pubsub-name"; +const string TopicName = "topic-a"; +IReadOnlyList BulkPublishData = new List() { + new { Id = "17", Amount = 10m }, + new { Id = "18", Amount = 20m }, + new { Id = "19", Amount = 30m } +}; + +using var client = new DaprClientBuilder().Build(); + +var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData); +if (res == null) { + throw new Exception("null response from dapr"); +} +if (res.FailedEntries.Count > 0) +{ + Console.WriteLine("Some events failed to be published!"); + foreach (var failedEntry in res.FailedEntries) + { + Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " + + failedEntry.ErrorMessage); + } +} +else +{ + Console.WriteLine("Published all events!"); +} +``` + +{{% /codetab %}} + +{{% codetab %}} + +```python +import requests +import json + +base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}" +pubsub_name = "my-pubsub-name" +topic_name = "topic-a" +payload = [ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + } +] + +response = requests.post(base_url.format(pubsub_name, topic_name), json=payload) +print(response.status_code) +``` + +{{% /codetab %}} + +{{% codetab %}} + +```go +package main + +import ( + "fmt" + "strings" + "net/http" + "io/ioutil" +) + +const ( + pubsubName = "my-pubsub-name" + topicName = "topic-a" + baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s" +) + +func main() { + url := fmt.Sprintf(baseUrl, pubsubName, topicName) + method := "POST" + payload := strings.NewReader(`[ + { + "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", + "event": "first text message", + "contentType": "text/plain" + }, + { + "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", + "event": { + "message": "second JSON message" + }, + "contentType": "application/json" + } +]`) + + client := &http.Client {} + req, _ := http.NewRequest(method, url, payload) + + req.Header.Add("Content-Type", "application/json") + res, err := client.Do(req) + // ... +} +``` + +{{% /codetab %}} + +{{% codetab %}} + ```bash curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \ -H 'Content-Type: application/json' \ -d '[ { "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", - "event": "first", + "event": "first text message", "contentType": "text/plain" }, { "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", "event": { - "message": "second" + "message": "second JSON message" }, "contentType": "application/json" }, @@ -140,13 +259,13 @@ Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://loca -Body '[ { "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", - "event": "first", + "event": "first text message", "contentType": "text/plain" }, { "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", "event": { - "message": "second" + "message": "second JSON message" }, "contentType": "application/json" }, diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md index 93bc84314..b7c999a94 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-overview.md @@ -119,7 +119,7 @@ By default, all topic messages associated with an instance of a pub/sub componen Dapr can set a timeout message on a per-message basis, meaning that if the message is not read from the pub/sub component, then the message is discarded. This timeout message prevents a build up of unread messages. If a message has been in the queue longer than the configured TTL, it is marked as dead. For more information, read [pub/sub message TTL]({{< ref pubsub-message-ttl.md >}}). -### Send and receive messages in bulk +### Publish and subscribe to bulk messages Dapr supports sending and receiving multiple messages in a single request. This is useful for applications that require a high throughput. For more information, read [pub/sub bulk messages]({{< ref pubsub-bulk.md >}}). diff --git a/daprdocs/content/en/reference/api/pubsub_api.md b/daprdocs/content/en/reference/api/pubsub_api.md index 3bdbe09aa..198f71f10 100644 --- a/daprdocs/content/en/reference/api/pubsub_api.md +++ b/daprdocs/content/en/reference/api/pubsub_api.md @@ -89,13 +89,13 @@ curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/pubsubName/deathStar -d '[ { "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", - "event": "first", + "event": "first text message", "contentType": "text/plain" }, { "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002", "event": { - "message": "second" + "message": "second JSON message" }, "contentType": "application/json" }, @@ -104,7 +104,7 @@ curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/pubsubName/deathStar ### Headers -The `Content-Type` header should always be set to `application/json`. +The `Content-Type` header should always be set to `application/json` since the request body is a JSON array. ### URL Parameters @@ -134,14 +134,14 @@ Metadata can be sent via query parameters in the request's URL. It must be prefi |403|Forbidden by access controls| |500|At least one message failed to be delivered| -The response body is a JSON containing a list of failed entries. Example: +In case of a 500 status code, the response body will contain a JSON object containing a list of entries that failed to be delivered. For example from our request above, if the entry with event `"first text message"` failed to be delivered, the response would contain its entry ID and an error message from the underlying pub/sub component. ```json { "failedEntries": [ { "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002", - "error": "error message" + "error": "some error message" }, ], "errorCode": "ERR_PUBSUB_PUBLISH_MESSAGE"