Add bulk subscriber over gRPC to examples (#871)

* original version of bulk subscriber over gPRC

Signed-off-by: MregXN <mregxn@gmail.com>

* cover validation to grpc subscriber

Signed-off-by: MregXN <mregxn@gmail.com>

* modify README of pubsub example

Signed-off-by: MregXN <mregxn@gmail.com>

* re-trigger validation to pass flaky test

Signed-off-by: MregXN <mregxn@gmail.com>

* Update BulkSubscriberGrpcService.java

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* modify README.md

Signed-off-by: MregXN <mregxn@gmail.com>

---------

Signed-off-by: MregXN <mregxn@gmail.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Cassie Coyle <cassie@diagrid.io>
This commit is contained in:
MregXN 2024-02-17 07:54:57 +08:00 committed by GitHub
parent 4458f4f20d
commit 14cc3f8549
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 380 additions and 167 deletions

View File

@ -37,168 +37,10 @@ Then get into the examples directory:
```sh ```sh
cd examples cd examples
``` ```
### Initialize Dapr ### Initialize Dapr
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
### Running the subscriber
The subscriber will subscribe to the topic to be used by the publisher and read the messages published. The subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There is a gRPC version and HTTP version of the subscriber in the grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below:
```java
public class Subscriber {
public static void main(String[] args) throws Exception {
///...
// Start Dapr's callback endpoint.
DaprApplication.start([PROTOCAL],port);
}
}
```
`DaprApplication.start()` Method will run a Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implement the grpc methods that the sidecar will call.
**HTTP Version**
The Dapr sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body.
The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
```java
@RestController
public class SubscriberController {
///...
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
return Mono.fromRunnable(() -> {
try {
// Dapr's event is compliant to CloudEvent.
CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
```
The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example below on how to handle the bulk messages and respond correctly.
```java
@RestController
public class SubscriberController {
///...
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```
Execute the following command to run the HTTP Subscriber example:
<!-- STEP
name: Run Subscriber
match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #3'
- '== APP == Bulk Subscriber got: This is message #1'
- '== APP == Bulk Subscriber got: This is message #2'
background: true
sleep: 5
-->
```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
```
<!-- END_STEP -->
**gRPC Version**
The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar.
The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get the topic and pubsub name that are contained in the response before the subscription starts. After the pubsub component in the sidecar subscribes successfully to the specified topic, a message will be sent to the method `onTopicEvent` in the request parameter.
```java
@GrpcService
public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
private final List<DaprAppCallbackProtos.TopicSubscription> topicSubscriptionList = new ArrayList<>();
private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer();
@Override
public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus","testingtopic");
try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos.ListTopicSubscriptionsResponse
.newBuilder();
topicSubscriptionList.forEach(builder::addSubscriptions);
DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build();
responseObserver.onNext(response);
} catch (Throwable e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}
@Override
public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
StreamObserver<DaprAppCallbackProtos.TopicEventResponse> responseObserver) {
try {
System.out.println("Subscriber got: " + request.getData());
DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder()
.setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}
///...
}
```
Execute the following command to run the gRPC Subscriber example:
```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000
```
### Running the publisher ### Running the publisher
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
@ -477,7 +319,6 @@ match_order: sequential
expected_stdout_lines: expected_stdout_lines:
- '== APP == Published the set of messages in a single call to Dapr' - '== APP == Published the set of messages in a single call to Dapr'
background: true background: true
sleep: 20
--> -->
```bash ```bash
@ -539,9 +380,7 @@ The Subscriber started previously [here](#running-the-subscriber) should print t
Messages have been retrieved from the topic. Messages have been retrieved from the topic.
### Bulk Subscription You can also run the publisher to publish messages to `testingtopicbulk` topic, so that it can receive messages using the bulk subscription.
You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription.
<!-- STEP <!-- STEP
name: Run Publisher on bulk topic name: Run Publisher on bulk topic
@ -549,7 +388,6 @@ expected_stdout_lines:
- '== APP == Published message: This is message #0' - '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1' - '== APP == Published message: This is message #1'
background: true background: true
sleep: 15
--> -->
```bash ```bash
@ -585,6 +423,306 @@ Once running, the Publisher should print the same output as seen [above](#runnin
``` ```
### Running the subscriber
The subscriber will subscribe to the topic to be used by the publisher and read the messages published. The subscriber uses the Spring Boot´s DaprApplication class for initializing the `SubscriberController`. There is a gRPC version and HTTP version of the subscriber in the grpc and http folders. In `Subscriber.java` file, you will find the `Subscriber` class and the `main` method. See the code snippet below:
```java
public class Subscriber {
public static void main(String[] args) throws Exception {
///...
// Start Dapr's callback endpoint.
DaprApplication.start([PROTOCAL],port);
}
}
```
`DaprApplication.start()` Method will run a Spring Boot application that registers the `SubscriberController`, which exposes the message retrieval as a POST request, or the `SubscriberGrpcService`, which implement the grpc methods that the sidecar will call.
**HTTP Version**
The Dapr sidecar is the one that performs the actual call to the controller, based on the pubsub features. This Spring Controller handles the message endpoint, printing the message which is received as the POST body.
The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
```java
@RestController
public class SubscriberController {
///...
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
return Mono.fromRunnable(() -> {
try {
// Dapr's event is compliant to CloudEvent.
CloudEventEnvelope envelope = SERIALIZER.deserialize(body, CloudEventEnvelope.class);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
```
The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example below on how to handle the bulk messages and respond correctly.
```java
@RestController
public class SubscriberController {
///...
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```
Execute the following command to run the HTTP Subscriber example:
<!-- STEP
name: Run Http Subscriber
match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #3'
- '== APP == Bulk Subscriber got: This is message #1'
- '== APP == Bulk Subscriber got: This is message #2'
background: true
sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol http -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
```
<!-- END_STEP -->
**gRPC Version**
The Spring GrpcService implements the methods required for gRPC communication with Dapr\`s sidecar.
The `SubscriberGrpcService.java` snippet below shows the details. Dapr\`s sidecar will call `listTopicSubscriptions` to get the topic and pubsub name that are contained in the response before the subscription starts. After the pubsub component in the sidecar subscribes successfully to the specified topic, a message will be sent to the method `onTopicEvent` in the request parameter.
```java
@GrpcService
public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
private final List<DaprAppCallbackProtos.TopicSubscription> topicSubscriptionList = new ArrayList<>();
private final DaprObjectSerializer objectSerializer = new DefaultObjectSerializer();
@Override
public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus", "testingtopic", false);
registerConsumer("messagebus", "testingtopicbulk", true);
try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos
.ListTopicSubscriptionsResponse.newBuilder();
topicSubscriptionList.forEach(builder::addSubscriptions);
DaprAppCallbackProtos.ListTopicSubscriptionsResponse response = builder.build();
responseObserver.onNext(response);
} catch (Throwable e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}
@Override
public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
StreamObserver<DaprAppCallbackProtos.TopicEventResponse> responseObserver) {
try {
String data = request.getData().toStringUtf8().replace("\"", "");
System.out.println("Subscriber got: " + data);
DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder()
.setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.SUCCESS)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}
///...
}
```
The `BulkSubscriberGrpcService.java` file is responsible for implementing the processing of bulk message subscriptions. When Dapr's sidecar successfully subscribes to bulk messages, it will call `onBulkTopicEventAlpha1` and pass them as a request parameter. You can refer to the example on how to handle bulk messages and respond correctly over gPRC.
```java
public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase {
@Override
public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request,
io.grpc.stub.StreamObserver<io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse> responseObserver) {
try {
TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder();
if (request.getEntriesCount() == 0) {
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages.");
for (TopicEventBulkRequestEntry entry : request.getEntriesList()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8());
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
} catch (Throwable e) {
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.RETRY_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
}
}
TopicEventBulkResponse response = responseBuilder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}
}
```
Execute the following command to run the gRPC Subscriber example:
<!-- STEP
name: Run gRPC Subscriber
match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Bulk Subscriber got: "This is message #1"'
- '== APP == Bulk Subscriber got: "This is message #2"'
background: true
sleep: 15
-->
```bash
// stop http subscriber if you have started one.
dapr stop --app-id subscriber
// start a grpc subscriber
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000
```
<!-- END_STEP -->
Use the follow command to start a publisher
<!-- STEP
name: Run Publisher
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
```
<!-- END_STEP -->
Messages will be subscribed by the gRPC Subscriber and will print the output as follows:
```
== APP == Subscriber got: This is message #0
== APP == Subscriber got: This is message #1
== APP == Subscriber got: This is message #2
== APP == Subscriber got: This is message #3
== APP == Subscriber got: This is message #4
== APP == Subscriber got: This is message #5
== APP == Subscriber got: This is message #6
== APP == Subscriber got: This is message #7
== APP == Subscriber got: This is message #8
== APP == Subscriber got: This is message #9
```
If you run a bulk publisher using following command
<!-- STEP
name: Run Publisher on bulk topic
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher testingtopicbulk
```
<!-- END_STEP -->
The console will print the output:
```
== APP == Bulk Subscriber received 2 messages.
== APP == Bulk Subscriber message has entry ID: 0c917871-557c-41db-b038-a250d23ee07c
== APP == Bulk Subscriber got: "This is message #0"
== APP == Bulk Subscriber message has entry ID: df114819-0db9-4a04-b06b-d5844d3ff731
== APP == Bulk Subscriber got: "This is message #1"
== APP == Bulk Subscriber received 2 messages.
== APP == Bulk Subscriber message has entry ID: 8161f1e2-caf0-446a-81f7-0b40e7350e19
== APP == Bulk Subscriber got: "This is message #2"
== APP == Bulk Subscriber message has entry ID: 173fafa1-d187-4b2d-83bf-b4da00616a3a
== APP == Bulk Subscriber got: "This is message #3"
== APP == Bulk Subscriber received 2 messages.
== APP == Bulk Subscriber message has entry ID: 94f89996-155d-4b52-8a8c-c268662ff3a2
== APP == Bulk Subscriber got: "This is message #4"
== APP == Bulk Subscriber message has entry ID: 497c47a4-e4f7-4aeb-abde-099068dea30c
== APP == Bulk Subscriber got: "This is message #5"
== APP == Bulk Subscriber received 1 messages.
== APP == Bulk Subscriber message has entry ID: 892fb1ae-b027-457a-860e-52b9c3219270
== APP == Bulk Subscriber got: "This is message #6"
== APP == Bulk Subscriber received 2 messages.
== APP == Bulk Subscriber message has entry ID: 95abf8cc-e033-483d-9a55-e0491bd97930
== APP == Bulk Subscriber got: "This is message #7"
== APP == Bulk Subscriber message has entry ID: eaf75a31-335e-4c1a-b19a-6aa1710f625a
== APP == Bulk Subscriber got: "This is message #8"
== APP == Bulk Subscriber received 1 messages.
== APP == Bulk Subscriber message has entry ID: 9836ef69-6d3c-4738-ba99-1d0ce68ec06b
== APP == Bulk Subscriber got: "This is message #9"
```
>The order of the events that are published are not guaranteed if use redis.
### Tracing ### Tracing
Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below: Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below:
@ -684,11 +822,16 @@ For more details on Dapr Spring Boot integration, please refer to [Dapr Spring B
<!-- STEP <!-- STEP
name: Cleanup name: Cleanup
--> -->
Use the following command to stop your running http subscriber or gRPC subscriber.
```bash
dapr stop --app-id subscriber
```
<!-- END_STEP -->
After completing publish , the application will automatically exit. However, you can still use the following command to stop your running publisher.
```bash ```bash
dapr stop --app-id publisher dapr stop --app-id publisher
dapr stop --app-id bulk-publisher dapr stop --app-id bulk-publisher
dapr stop --app-id subscriber
``` ```
<!-- END_STEP -->

View File

@ -0,0 +1,65 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.pubsub.grpc;
import io.dapr.v1.AppCallbackAlphaGrpc;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequestEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponseEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus;
/**
* Class that encapsulates all client-side logic for Grpc.
*/
public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase {
@Override
public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request,
io.grpc.stub.StreamObserver<io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse> responseObserver) {
try {
TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder();
if (request.getEntriesCount() == 0) {
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages.");
for (TopicEventBulkRequestEntry entry : request.getEntriesList()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8());
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
} catch (Throwable e) {
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.RETRY_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
}
}
TopicEventBulkResponse response = responseBuilder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}
}

View File

@ -50,6 +50,7 @@ public class Subscriber {
//start a grpc server //start a grpc server
Server server = ServerBuilder.forPort(port) Server server = ServerBuilder.forPort(port)
.addService(new SubscriberGrpcService()) .addService(new SubscriberGrpcService())
.addService(new BulkSubscriberGrpcService())
.build(); .build();
server.start(); server.start();
server.awaitTermination(); server.awaitTermination();

View File

@ -30,7 +30,9 @@ public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
@Override @Override
public void listTopicSubscriptions(Empty request, public void listTopicSubscriptions(Empty request,
StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) { StreamObserver<DaprAppCallbackProtos.ListTopicSubscriptionsResponse> responseObserver) {
registerConsumer("messagebus","testingtopic"); registerConsumer("messagebus", "testingtopic", false);
registerConsumer("messagebus", "bulkpublishtesting", false);
registerConsumer("messagebus", "testingtopicbulk", true);
try { try {
DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos DaprAppCallbackProtos.ListTopicSubscriptionsResponse.Builder builder = DaprAppCallbackProtos
.ListTopicSubscriptionsResponse.newBuilder(); .ListTopicSubscriptionsResponse.newBuilder();
@ -65,12 +67,14 @@ public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
* *
* @param topic the topic * @param topic the topic
* @param pubsubName the pubsub name * @param pubsubName the pubsub name
* @param isBulkMessage flag to enable/disable bulk subscribe
*/ */
public void registerConsumer(String pubsubName, String topic) { public void registerConsumer(String pubsubName, String topic, boolean isBulkMessage) {
topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription topicSubscriptionList.add(DaprAppCallbackProtos.TopicSubscription
.newBuilder() .newBuilder()
.setPubsubName(pubsubName) .setPubsubName(pubsubName)
.setTopic(topic) .setTopic(topic)
.setBulkSubscribe(DaprAppCallbackProtos.BulkSubscribeConfig.newBuilder().setEnabled(isBulkMessage))
.build()); .build());
} }
} }