mirror of https://github.com/dapr/docs.git
Add Bulk Subscribe Docs
Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This commit is contained in:
parent
22a3c40890
commit
b4d1e2b88f
|
@ -14,9 +14,9 @@ With the bulk publish and subscribe APIs, you can send and receive multiple mess
|
|||
|
||||
## Native bulk publish and subscribe support
|
||||
|
||||
When a pub/sub component supports the bulk publish API natively, Dapr also publishes messages to the underlying pub/sub component in bulk.
|
||||
When a pub/sub component supports the bulk publish/subscribe API natively, Dapr also publishes/subscribes messages to/from the underlying pub/sub component in bulk.
|
||||
|
||||
Otherwise, Dapr falls back to sending messages one by one to the underlying pub/sub component. This is still more efficient than using the regular publish API, because applications can still send multiple messages in a single request to Dapr.
|
||||
Otherwise, Dapr falls back to sending/receiving messages one by one to/from the underlying pub/sub component. This is still more efficient than using the regular publish/subscribe API, because applications can still send/receive multiple messages in a single request to/from Dapr.
|
||||
|
||||
## Supported components
|
||||
|
||||
|
@ -157,6 +157,128 @@ Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://loca
|
|||
|
||||
{{< /tabs >}}
|
||||
|
||||
## Subscribing messages in bulk
|
||||
|
||||
The bulk subscribe API allows you to subscribe multiple messages from a topic in a single request.
|
||||
As we know from [How to: Publish & Subscribe to topics]({{< ref howto-publish-subscribe.md >}}), there are two ways to subscribe to topic(s):
|
||||
|
||||
- **Declaratively**, where subscriptions are defined in an external file.
|
||||
- **Programmatically**, where subscriptions are defined in user code.
|
||||
|
||||
To Bulk Subscribe to topic(s), we just need to use `bulkSubscribe` spec attribute, something like following:
|
||||
|
||||
```yaml
|
||||
apiVersion: dapr.io/v2alpha1
|
||||
kind: Subscription
|
||||
metadata:
|
||||
name: order-pub-sub
|
||||
spec:
|
||||
topic: orders
|
||||
routes:
|
||||
default: /checkout
|
||||
pubsubname: order-pub-sub
|
||||
bulkSubscribe:
|
||||
enabled: true
|
||||
maxMessagesCount: 100
|
||||
maxAwaitDurationMs: 40
|
||||
scopes:
|
||||
- orderprocessing
|
||||
- checkout
|
||||
```
|
||||
|
||||
In the example above, `bulkSubscribe` altogether is Optional. But, if you decide to use `bulkSubscribe`, then:
|
||||
- `enabled` is mandatory to be given.
|
||||
- You can optionally configure max number of messages (`maxMessagesCount`) that should be delivered to it via Dapr in a bulk message
|
||||
- You can also optionally provide max duration to await (`maxAwaitDurationMs`) before a bulk message is sent via Dapr to App
|
||||
|
||||
In case, Application decides not to configure maxMessagesCount and/or maxAwaitDurationMs, defaults as per respective component in Dapr will be used.
|
||||
|
||||
Application receives an EntryId associated with each entry (individual message) in this bulk message and this EntryId has to be used by App to communicate back the status of that particular entry. In case, App misses to notify status of an EntryId, it will be considered as RETRY.
|
||||
|
||||
Status | Description
|
||||
--------- | -----------
|
||||
`SUCCESS` | Message is processed successfully
|
||||
`RETRY` | Message to be retried by Dapr
|
||||
`DROP` | Warning is logged and message is dropped
|
||||
|
||||
### Example
|
||||
|
||||
{{< tabs Java Javascript "HTTP API (Bash)" "HTTP API (PowerShell)" >}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```java
|
||||
import io.dapr.Topic;
|
||||
import io.dapr.client.domain.BulkSubscribeAppResponse;
|
||||
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
|
||||
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
|
||||
import io.dapr.client.domain.BulkSubscribeMessage;
|
||||
import io.dapr.client.domain.BulkSubscribeMessageEntry;
|
||||
import io.dapr.client.domain.CloudEvent;
|
||||
import io.dapr.springboot.annotations.BulkSubscribe;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
class BulkSubscriber {
|
||||
@BulkSubscribe()
|
||||
// @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
|
||||
@Topic(name = "topicbulk", pubsubName = "orderPubSub")
|
||||
@PostMapping(path = "/topicbulk")
|
||||
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
|
||||
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
|
||||
return Mono.fromCallable(() -> {
|
||||
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
|
||||
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
|
||||
try {
|
||||
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
|
||||
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
|
||||
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
|
||||
}
|
||||
}
|
||||
return new BulkSubscribeAppResponse(entries);
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
```typescript
|
||||
|
||||
import { DaprServer } from "@dapr/dapr";
|
||||
|
||||
const pubSubName = "orderPubSub";
|
||||
const topic = "topicbulk";
|
||||
|
||||
const DAPR_HOST = process.env.DAPR_HOST || "127.0.0.1";
|
||||
const DAPR_HTTP_PORT = process.env.DAPR_HTTP_PORT || "3502";
|
||||
const SERVER_HOST = process.env.SERVER_HOST || "127.0.0.1";
|
||||
const SERVER_PORT = process.env.APP_PORT || 5001;
|
||||
|
||||
async function start() {
|
||||
const server = new DaprServer(SERVER_HOST, SERVER_PORT, DAPR_HOST, DAPR_HTTP_PORT);
|
||||
|
||||
// Publish multiple messages to a topic with default config.
|
||||
await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));
|
||||
|
||||
// Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
|
||||
await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
||||
{{< /tabs >}}
|
||||
|
||||
## Related links
|
||||
|
||||
- List of [supported pub/sub components]({{< ref supported-pubsub >}})
|
||||
|
|
|
@ -245,6 +245,42 @@ HTTP Status | Description
|
|||
404 | error is logged and message is dropped
|
||||
other | warning is logged and message to be retried
|
||||
|
||||
## Subscribe multiple messages from a given topic
|
||||
|
||||
This allows you, as a consumer, to subscribe multiple messages from a broker to listen to a `topic`.
|
||||
|
||||
In order to receive messages in a bulk manner for a topic subscription, application needs to opt for `bulkSubscribe` while sending list of topics to be subscribed to, and optionally they can also configure `maxMessagesCount` and/or `maxAwaitDurationMs`.
|
||||
Please refer [Send and receive messages in bulk]({{< ref pubsub-bulk.md >}}) for more details on how to opt-in.
|
||||
|
||||
#### Expected HTTP Response for Bulk Subscribe
|
||||
|
||||
An HTTP 2xx response denotes that entries (individual messages) inside this bulk messgae have been processed by Application and Dapr will now check status pertaining to each entryId.
|
||||
A JSON-encoded payload body with the processing status against each entry needs to be sent:
|
||||
- Note: If status for an entryId is not found by Dapr in response received from Application, status for that entry will be considered as RETRY.
|
||||
|
||||
```json
|
||||
{
|
||||
"statuses": {
|
||||
"entryId": "<entryId>",
|
||||
"status": "<status>"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Status | Description
|
||||
--------- | -----------
|
||||
`SUCCESS` | Message is processed successfully
|
||||
`RETRY` | Message to be retried by Dapr
|
||||
`DROP` | Warning is logged and message is dropped
|
||||
|
||||
The HTTP response might be different from HTTP 2xx. The following are Dapr's behavior in different HTTP statuses:
|
||||
|
||||
HTTP Status | Description
|
||||
--------- | -----------
|
||||
2xx | message is processed as per status in payload.
|
||||
404 | error is logged and all messages are dropped
|
||||
other | warning is logged and all messages to be retried
|
||||
|
||||
## Message envelope
|
||||
|
||||
Dapr pub/sub adheres to version 1.0 of CloudEvents.
|
||||
|
|
Loading…
Reference in New Issue