feat(pubsub): add bulk subscribe support (#791)

This commit is contained in:
Shubham Sharma 2023-01-19 07:28:03 +05:30 committed by GitHub
parent b83661d109
commit eb8565cca0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1048 additions and 47 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
# IDE generated files and directories
*.iml
.idea/
.run/
.vs/
.vscode/

View File

@ -148,7 +148,13 @@ try (DaprClient client = (new DaprClientBuilder()).build()) {
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@ -186,6 +192,33 @@ public class SubscriberController {
});
}
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```

View File

@ -37,7 +37,7 @@ public class Publisher {
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
@ -48,15 +48,17 @@ public class Publisher {
* @throws Exception A startup Exception.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
// Publishing messages
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
topicName,
message,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
System.out.println("Published message: " + message);
try {
@ -73,4 +75,17 @@ public class Publisher {
System.out.println("Done.");
}
}
/**
* If a topic is specified in args, use that.
* Else, fallback to the default topic.
* @param args program arguments
* @return name of the topic to publish messages to.
*/
private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
}
return DEFAULT_TOPIC_NAME;
}
}

View File

@ -1,6 +1,6 @@
# Dapr Pub-Sub Sample
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subcribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while subscriber will listen for messages of specific topic. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.
@ -59,7 +59,7 @@ This Spring Controller handles the message endpoint, printing the message which
The subscription's topic in Dapr is handled automatically via the `@Topic` annotation - which also supports the same expressions in
[Spring's @Value annotations](https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-value-annotations).
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
The code snippet below shows how to create a subscription using the `@Topic` annotation showcasing expression support. In this case, `myAppProperty` is a Java property that does not exist, so the expression resolves to the default value (`messagebus`).
```java
@RestController
@ -83,13 +83,49 @@ public class SubscriberController {
}
}
```
The `@BulkSubscribe` annotation can be used with `@Topic` to receive multiple messages at once. See the example on how to handle the bulk messages and respond correctly.
```java
@RestController
public class SubscriberController {
///...
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
```
Execute the follow script in order to run the Subscriber example:
<!-- STEP
name: Run Subscriber
match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Bulk Subscriber got: This is message #1'
- '== APP == Bulk Subscriber got: This is message #2'
background: true
sleep: 5
-->
@ -104,22 +140,28 @@ dapr run --components-path ./components/pubsub --app-id subscriber --app-port 30
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and received objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber.
```java
public class Publisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic";
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";
///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
String topicName = getTopicName(args); // Topic can be configured by args.
// Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
// Publishing messages
client.publishEvent(
PUBSUB_NAME,
topicName,
message,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
System.out.println("Published message: " + message);
//...
}
@ -246,6 +288,50 @@ Once running, the Subscriber should print the output as follows:
Messages have been retrieved from the topic.
You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription.
<!-- STEP
name: Run Publisher on bulk topic
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher testingtopicbulk
```
<!-- END_STEP -->
Once running, the Publisher should print the same output as above. The Subscriber should print the output as follows:
```txt
== APP == Bulk Subscriber got 10 messages.
== APP == Bulk Subscriber message has entry ID: d4d81c57-d75c-4a22-a747-e907099ca135
== APP == Bulk Subscriber got: This is message #0
== APP == Bulk Subscriber message has entry ID: f109c837-f7c8-4839-8d71-2df9c467875c
== APP == Bulk Subscriber got: This is message #1
== APP == Bulk Subscriber message has entry ID: d735044f-1320-43e1-bd41-787ad9d26427
== APP == Bulk Subscriber got: This is message #2
== APP == Bulk Subscriber message has entry ID: afe74e5a-1a2b-498a-beca-7a6383141ccf
== APP == Bulk Subscriber got: This is message #3
== APP == Bulk Subscriber message has entry ID: 1df3fa51-d137-4749-891d-973ce58f1e1c
== APP == Bulk Subscriber got: This is message #4
== APP == Bulk Subscriber message has entry ID: ecab82bd-77be-40a1-8b62-2dbb3388d726
== APP == Bulk Subscriber got: This is message #5
== APP == Bulk Subscriber message has entry ID: 49a63916-ed09-4101-969e-13a860e35c55
== APP == Bulk Subscriber got: This is message #6
== APP == Bulk Subscriber message has entry ID: 897ec32c-ad74-4512-8979-ee0a455433e8
== APP == Bulk Subscriber got: This is message #7
== APP == Bulk Subscriber message has entry ID: 67367edc-27a6-4c8c-9e39-31caa0f74b2d
== APP == Bulk Subscriber got: This is message #8
== APP == Bulk Subscriber message has entry ID: f134d21f-0a05-408d-977c-1397b999e908
== APP == Bulk Subscriber got: This is message #9
```
### Tracing
Dapr handles tracing in PubSub automatically. Open Zipkin on [http://localhost:9411/zipkin](http://localhost:9411/zipkin). You should see a screen like the one below:

View File

@ -16,12 +16,21 @@ package io.dapr.examples.pubsub.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
/**
* SpringBoot Controller to handle input binding.
*/
@ -32,6 +41,7 @@ public class SubscriberController {
/**
* Handles a registered publish endpoint on this app.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@ -49,7 +59,9 @@ public class SubscriberController {
}
/**
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
* Handles a registered publish endpoint on this app (version 2 of a cloud
* event).
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@ -67,4 +79,37 @@ public class SubscriberController {
});
}
/**
* Handles a registered subscribe endpoint on this app using bulk subscribe.
*
* @param bulkMessage The bulk pubSub message received.
* @return A list of responses for each event.
*/
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
}
System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}

View File

@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.42.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.9.0-rc.3/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/a8c698ad897e42d6624f5fc6ccfd0630e2a8fd00/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -18,11 +18,13 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import org.springframework.stereotype.Component;
import org.springframework.util.StringValueResolver;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@ -40,7 +42,6 @@ import java.util.Map;
public class DaprBeanPostProcessor implements BeanPostProcessor {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EmbeddedValueResolver embeddedValueResolver;
DaprBeanPostProcessor(ConfigurableBeanFactory beanFactory) {
@ -56,7 +57,7 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
return null;
}
subscribeToTopics(bean.getClass(), embeddedValueResolver);
subscribeToTopics(bean.getClass(), embeddedValueResolver, DaprRuntime.getInstance());
return bean;
}
@ -70,35 +71,52 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
}
/**
* Subscribe to topics based on {@link Topic} annotations on the given class and any of ancestor classes.
* Subscribe to topics based on {@link Topic} annotations on the given class and
* any of ancestor classes.
*
* @param clazz Controller class where {@link Topic} is expected.
*/
private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {
private static void subscribeToTopics(
Class clazz, StringValueResolver stringValueResolver, DaprRuntime daprRuntime) {
if (clazz == null) {
return;
}
subscribeToTopics(clazz.getSuperclass(), embeddedValueResolver);
subscribeToTopics(clazz.getSuperclass(), stringValueResolver, daprRuntime);
for (Method method : clazz.getDeclaredMethods()) {
Topic topic = method.getAnnotation(Topic.class);
if (topic == null) {
continue;
}
DaprTopicBulkSubscribe bulkSubscribe = null;
BulkSubscribe bulkSubscribeAnnotation = method.getAnnotation(BulkSubscribe.class);
if (bulkSubscribeAnnotation != null) {
bulkSubscribe = new DaprTopicBulkSubscribe(true);
int maxMessagesCount = bulkSubscribeAnnotation.maxMessagesCount();
if (maxMessagesCount != -1) {
bulkSubscribe.setMaxMessagesCount(maxMessagesCount);
}
int maxAwaitDurationMs = bulkSubscribeAnnotation.maxAwaitDurationMs();
if (maxAwaitDurationMs != -1) {
bulkSubscribe.setMaxAwaitDurationMs(maxAwaitDurationMs);
}
}
Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
String match = embeddedValueResolver.resolveStringValue(rule.match());
String topicName = stringValueResolver.resolveStringValue(topic.name());
String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName());
String match = stringValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
TypeReference<HashMap<String, String>> typeRef = new TypeReference<HashMap<String, String>>() {
};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
daprRuntime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e);
@ -108,7 +126,8 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
}
/**
* Method to provide all possible complete routes list fos this post method present in this controller class,
* Method to provide all possible complete routes list fos this post method
* present in this controller class,
* for mentioned topic.
*
* @param clazz Controller class
@ -118,8 +137,7 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
*/
private static List<String> getAllCompleteRoutesForPost(Class clazz, Method method, String topicName) {
List<String> routesList = new ArrayList<>();
RequestMapping clazzRequestMapping =
(RequestMapping) clazz.getAnnotation(RequestMapping.class);
RequestMapping clazzRequestMapping = (RequestMapping) clazz.getAnnotation(RequestMapping.class);
String[] clazzLevelRoute = null;
if (clazzRequestMapping != null) {
clazzLevelRoute = clazzRequestMapping.value();
@ -141,7 +159,7 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
}
private static String[] getRoutesForPost(Method method, String topicName) {
String[] postValueArray = new String[] {topicName};
String[] postValueArray = new String[] { topicName };
PostMapping postMapping = method.getAnnotation(PostMapping.class);
if (postMapping != null) {
if (postMapping.path() != null && postMapping.path().length >= 1) {

View File

@ -33,7 +33,8 @@ class DaprRuntime {
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();
/**
* Private constructor to make this singleton.
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}.
* The constructor's default scope is available for unit tests only.
*/
private DaprRuntime() {
}
@ -58,24 +59,45 @@ class DaprRuntime {
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubsubName Pubsub name to subcribe to.
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubsubName,
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null);
}
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName);
DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
builder = new DaprSubscriptionBuilder(pubSubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}
@ -88,6 +110,10 @@ class DaprRuntime {
if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
if (bulkSubscribe != null) {
builder.setBulkSubscribe(bulkSubscribe);
}
}
public synchronized DaprTopicSubscription[] listSubscribedTopics() {

View File

@ -27,6 +27,8 @@ class DaprSubscriptionBuilder {
private String defaultPath;
private Map<String, String> metadata;
private DaprTopicBulkSubscribe bulkSubscribe;
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
@ -86,6 +88,16 @@ class DaprSubscriptionBuilder {
return this;
}
/**
* Sets the bulkSubscribe configuration for the subscription.
* @param bulkSubscribe The bulk subscribe configuration.
* @return this instance.
*/
public DaprSubscriptionBuilder setBulkSubscribe(DaprTopicBulkSubscribe bulkSubscribe) {
this.bulkSubscribe = bulkSubscribe;
return this;
}
/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
* @return The DaprTopicSubscription.
@ -103,7 +115,7 @@ class DaprSubscriptionBuilder {
route = defaultPath;
}
return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata);
return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe);
}
private static class TopicRule {

View File

@ -0,0 +1,54 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
class DaprTopicBulkSubscribe {
private boolean enabled;
private Integer maxMessagesCount;
private Integer maxAwaitDurationMs;
DaprTopicBulkSubscribe(boolean enabled) {
this.enabled = enabled;
}
public boolean isEnabled() {
return enabled;
}
public Integer getMaxAwaitDurationMs() {
return maxAwaitDurationMs;
}
public Integer getMaxMessagesCount() {
return maxMessagesCount;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public void setMaxAwaitDurationMs(int maxAwaitDurationMs) {
if (maxAwaitDurationMs < 0) {
throw new IllegalArgumentException("maxAwaitDurationMs cannot be negative");
}
this.maxAwaitDurationMs = maxAwaitDurationMs;
}
public void setMaxMessagesCount(int maxMessagesCount) {
if (maxMessagesCount < 1) {
throw new IllegalArgumentException("maxMessagesCount must be greater than 0");
}
this.maxMessagesCount = maxMessagesCount;
}
}

View File

@ -25,6 +25,7 @@ class DaprTopicSubscription {
private final String route;
private final DaprTopicRoutes routes;
private final Map<String, String> metadata;
private final DaprTopicBulkSubscribe bulkSubscribe;
/**
* Create a subscription topic.
@ -34,7 +35,19 @@ class DaprTopicSubscription {
* @param metadata Metdata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
this(pubsubName, topic, route, null, metadata);
this(pubsubName, topic, route, metadata, null);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, null, metadata, bulkSubscribe);
}
/**
@ -43,15 +56,30 @@ class DaprTopicSubscription {
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metdata for extended subscription functionality.
* @param metadata Metadata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
DaprTopicRoutes routes, Map<String, String> metadata) {
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes,
Map<String, String> metadata) {
this(pubsubName, topic, route, routes, metadata, null);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this.pubsubName = pubsubName;
this.topic = topic;
this.route = route;
this.routes = routes;
this.metadata = Collections.unmodifiableMap(metadata);
this.bulkSubscribe = bulkSubscribe;
}
public String getPubsubName() {
@ -73,4 +101,8 @@ class DaprTopicSubscription {
public Map<String, String> getMetadata() {
return metadata;
}
public DaprTopicBulkSubscribe getBulkSubscribe() {
return bulkSubscribe;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot.annotations;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeMessage;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* BulkSubscribe annotation should be applied with {@link Topic @Topic} when
* the topic should be subscribed to using the Bulk Subscribe API.
* This will require handling multiple messages using {@link BulkSubscribeMessage
* DaprBulkMessage}
* and returning a {@link BulkSubscribeAppResponse DaprBulkAppResponse}.
*/
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface BulkSubscribe {
/**
* Maximum number of messages in a bulk message from the message bus.
*
* @return number of messages.
*/
int maxMessagesCount() default -1;
/**
* Maximum duration to wait for maxBulkSubCount messages by the message bus
* before sending the messages to Dapr.
*
* @return time to await in milliseconds.
*/
int maxAwaitDurationMs() default -1;
}

View File

@ -0,0 +1,102 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.util.StringValueResolver;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
public class DaprBeanPostProcessorSubscribeTest {
@Test
public void testPostProcessBeforeInitialization() throws NoSuchMethodException {
Method subscribeToTopicsMethod = DaprBeanPostProcessor.class.getDeclaredMethod(
"subscribeToTopics", Class.class, StringValueResolver.class, DaprRuntime.class);
subscribeToTopicsMethod.setAccessible(true);
DaprRuntime runtime = getDaprRuntime();
try {
subscribeToTopicsMethod.invoke(DaprBeanPostProcessor.class, MockControllerWithSubscribe.class,
new MockStringValueResolver(), runtime);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
DaprTopicSubscription[] topicSubscriptions = runtime.listSubscribedTopics();
// There should be three subscriptions.
Assert.assertEquals(2, topicSubscriptions.length);
DaprTopicSubscription[] expectedDaprTopicSubscriptions = getTestDaprTopicSubscriptions();
// Subscription without BulkSubscribe.
this.assertTopicSubscriptionEquality(expectedDaprTopicSubscriptions[0], topicSubscriptions[0]);
// Subscription with BulkSubscribe.
// This should correctly set the bulkSubscribe field.
this.assertTopicSubscriptionEquality(expectedDaprTopicSubscriptions[1], topicSubscriptions[1]);
}
private void assertTopicSubscriptionEquality(DaprTopicSubscription s1, DaprTopicSubscription s2) {
Assert.assertEquals(s1.getPubsubName(), s2.getPubsubName());
Assert.assertEquals(s1.getTopic(), s2.getTopic());
Assert.assertEquals(s1.getRoute(), s2.getRoute());
Assert.assertEquals(s1.getMetadata(), s2.getMetadata());
if (s1.getBulkSubscribe() == null) {
Assert.assertNull(s2.getBulkSubscribe());
} else {
Assert.assertEquals(s1.getBulkSubscribe().isEnabled(), s2.getBulkSubscribe().isEnabled());
Assert.assertEquals(s1.getBulkSubscribe().getMaxAwaitDurationMs(), s2.getBulkSubscribe().getMaxAwaitDurationMs());
Assert.assertEquals(s1.getBulkSubscribe().getMaxMessagesCount(), s2.getBulkSubscribe().getMaxMessagesCount());
}
}
private DaprTopicSubscription[] getTestDaprTopicSubscriptions() {
DaprTopicSubscription[] daprTopicSubscriptions = new DaprTopicSubscription[3];
daprTopicSubscriptions[0] = new DaprTopicSubscription(
MockControllerWithSubscribe.pubSubName,
MockControllerWithSubscribe.topicName,
MockControllerWithSubscribe.subscribeRoute,
new HashMap<>());
DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true);
bulkSubscribe.setMaxMessagesCount(MockControllerWithSubscribe.maxMessagesCount);
bulkSubscribe.setMaxAwaitDurationMs(MockControllerWithSubscribe.maxAwaitDurationMs);
daprTopicSubscriptions[1] = new DaprTopicSubscription(
MockControllerWithSubscribe.pubSubName,
MockControllerWithSubscribe.bulkTopicName,
MockControllerWithSubscribe.bulkSubscribeRoute,
new HashMap<>(),
bulkSubscribe);
return daprTopicSubscriptions;
}
private DaprRuntime getDaprRuntime() {
try {
Constructor<DaprRuntime> constructor = DaprRuntime.class.getDeclaredConstructor();
constructor.setAccessible(true);
return constructor.newInstance();
}catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e.getMessage());
}
}
}

View File

@ -1,3 +1,16 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import org.junit.Assert;
@ -65,5 +78,4 @@ public class DaprBeanPostProcessorTest {
private boolean testingListForOrderAgnosticEquality(List<?> first, List<?> second) {
return (first.size() == second.size() && first.containsAll(second) && second.containsAll(first));
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class DaprTopicBulkSubscribeTest {
@Test
public void testGettersAndSetters() {
DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true);
bulkSubscribe.setMaxMessagesCount(100);
bulkSubscribe.setMaxAwaitDurationMs(200);
Assert.assertTrue(bulkSubscribe.isEnabled());
Assert.assertEquals(100, bulkSubscribe.getMaxMessagesCount().longValue());
Assert.assertEquals(200, bulkSubscribe.getMaxAwaitDurationMs().longValue());
bulkSubscribe.setEnabled(false);
Assert.assertFalse(bulkSubscribe.isEnabled());
}
@Test
public void testSetMaxMessagesCount() {
DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true);
// "value to be put" vs "should throw exception"
Map<Integer, Boolean> testCases = new HashMap<Integer, Boolean>() {{
put(-1, true);
put(0, true);
put(1, false);
}};
for (Map.Entry<Integer, Boolean> testCase: testCases.entrySet()) {
try {
bulkSubscribe.setMaxMessagesCount(testCase.getKey());
Assert.assertFalse(testCase.getValue());
} catch (IllegalArgumentException e) {
Assert.assertTrue(testCase.getValue());
}
}
}
@Test
public void testSetMaxAwaitDurationMs() {
DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true);
// "value to be put" vs "should throw exception"
Map<Integer, Boolean> testCases = new HashMap<Integer, Boolean>() {{
put(-1, true);
put(0, false);
put(1, false);
}};
for (Map.Entry<Integer, Boolean> testCase: testCases.entrySet()) {
try {
bulkSubscribe.setMaxAwaitDurationMs(testCase.getKey());
Assert.assertFalse(testCase.getValue());
} catch (IllegalArgumentException e) {
Assert.assertTrue(testCase.getValue());
}
}
}
}

View File

@ -1,3 +1,16 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import org.springframework.web.bind.annotation.*;
@ -25,11 +38,8 @@ public class MockController {
// Do nothing
}
@RequestMapping(path = {"/bar", "bar1"}, method = {RequestMethod.GET})
public void testMethod5() {
// Do nothing
}
}

View File

@ -1,3 +1,16 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
import org.springframework.web.bind.annotation.PostMapping;
@ -26,11 +39,8 @@ public class MockControllerNoClazzAnnotation {
// Do nothing
}
@RequestMapping(path = {"bar", "bar1"}, method = {RequestMethod.GET})
public void testMethod5() {
// Do nothing
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import io.dapr.Topic;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
public class MockControllerWithSubscribe {
public static final String pubSubName = "mockPubSub";
public static final String topicName = "mockTopic";
public static final String bulkTopicName = "mockBulkTopic";
public static final String bulkTopicNameV2 = "mockBulkTopicV2";
public static final String subscribeRoute = "mockRoute";
public static final String bulkSubscribeRoute = "mockBulkRoute";
public static final int maxMessagesCount = 500;
public static final int maxAwaitDurationMs = 1000;
@Topic(name = topicName, pubsubName = pubSubName)
@PostMapping(path = subscribeRoute)
public void handleMessages() {}
@BulkSubscribe(maxMessagesCount = maxMessagesCount, maxAwaitDurationMs = maxAwaitDurationMs)
@Topic(name = bulkTopicName, pubsubName = pubSubName)
@PostMapping(path = bulkSubscribeRoute)
public void handleBulkMessages() {}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.springboot;
import org.springframework.util.StringValueResolver;
/**
* MockStringValueResolver resolves a string value to return itself.
*/
public class MockStringValueResolver implements StringValueResolver {
@Override
public String resolveStringValue(String s) {
return s;
}
}

View File

@ -17,6 +17,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Metadata;
@ -75,7 +78,10 @@ public class PubSubIT extends BaseIT {
// Topic to test binary data
private static final String BINARY_TOPIC_NAME = "binarytopic";
private static final String LONG_TOPIC_NAME = "testinglongvalues";
private static final String LONG_TOPIC_NAME = "testinglongvalues";
// Topic to test bulk subscribe.
private static final String BULK_SUB_TOPIC_NAME = "topicBulkSub";
/**
* Parameters for this test.
@ -465,6 +471,68 @@ public class PubSubIT extends BaseIT {
daprRun.stop();
}
@Test
public void testPubSubBulkSubscribe() throws Exception {
DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000));
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
// Send a batch of messages on one topic.
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, BULK_SUB_TOPIC_NAME);
// Publishing messages
client.publishEvent(PUBSUB_NAME, BULK_SUB_TOPIC_NAME, message).block();
System.out.printf("Published message: '%s' to topic '%s' pubSub_name '%s'\n",
message, BULK_SUB_TOPIC_NAME, PUBSUB_NAME);
}
}
// Sleeps for five seconds to give subscriber a chance to receive messages.
Thread.sleep(5000);
final String appId = daprRun.getAppName();
try (DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + BULK_SUB_TOPIC_NAME);
@SuppressWarnings("unchecked")
Class<List<BulkSubscribeAppResponse>> clazz = (Class) List.class;
final List<BulkSubscribeAppResponse> messages = client.invokeMethod(
appId,
"messages/" + BULK_SUB_TOPIC_NAME,
null,
HttpExtension.GET,
clazz).block();
assertNotNull(messages);
BulkSubscribeAppResponse response = OBJECT_MAPPER.convertValue(messages.get(0), BulkSubscribeAppResponse.class);
// There should be a single bulk response.
assertEquals(1, messages.size());
// The bulk response should contain NUM_MESSAGES entries.
assertEquals(NUM_MESSAGES, response.getStatuses().size());
// All the entries should be SUCCESS.
for (BulkSubscribeAppResponseEntry entry : response.getStatuses()) {
assertEquals(entry.getStatus(), BulkSubscribeAppResponseStatus.SUCCESS);
}
}, 2000);
}
daprRun.stop();
}
@Test
public void testLongValues() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(

View File

@ -15,7 +15,8 @@ package io.dapr.it.pubsub.http;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.*;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
@ -47,6 +48,8 @@ public class SubscriberController {
private static final List<CloudEvent> messagesReceivedTestingTopicV2 = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV3 = new ArrayList();
private static final List<BulkSubscribeAppResponse> responsesReceivedTestingTopicBulk = new ArrayList<>();
@GetMapping(path = "/messages/testingtopic")
public List<CloudEvent> getMessagesReceivedTestingTopic() {
return messagesReceivedTestingTopic;
@ -62,6 +65,11 @@ public class SubscriberController {
return messagesReceivedTestingTopicV3;
}
@GetMapping(path = "/messages/topicBulkSub")
public List<BulkSubscribeAppResponse> getMessagesReceivedTestingTopicBulkSub() {
return responsesReceivedTestingTopicBulk;
}
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping("/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent envelope) {
@ -181,6 +189,41 @@ public class SubscriberController {
});
}
/**
* Receive messages using the bulk subscribe API.
* The maxBulkSubCount and maxBulkSubAwaitDurationMs are adjusted to ensure
* that all the test messages arrive in a single batch.
*
* @param bulkMessage incoming bulk of messages from the message bus.
* @return status for each message received.
*/
@BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 5000)
@Topic(name = "topicBulkSub", pubsubName = "messagebus")
@PostMapping(path = "/routeBulkSub")
public Mono<BulkSubscribeAppResponse> handleMessageBulk(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>());
responsesReceivedTestingTopicBulk.add(response);
return response;
}
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<>();
for (BulkSubscribeMessageEntry<?> entry: bulkMessage.getEntries()) {
try {
System.out.printf("Bulk Subscriber got entry ID: %s\n", entry.getEntryId());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries);
responsesReceivedTestingTopicBulk.add(response);
return response;
});
}
private BiFunction<String, List<CloudEvent<?>>, List<CloudEvent<?>>> merge(final CloudEvent<?> item) {
return (key, value) -> {
final List<CloudEvent<?>> list = value == null ? new ArrayList<>() : value;

View File

@ -0,0 +1,42 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.ArrayList;
import java.util.List;
/**
* Response from the application containing status for each entry from the bulk publish message.
*/
public final class BulkSubscribeAppResponse {
private final List<BulkSubscribeAppResponseEntry> statuses;
/**
* Instantiate a BulkSubscribeAppResponse.
* @param statuses list of statuses.
*/
@JsonCreator
public BulkSubscribeAppResponse(
@JsonProperty("statuses") List<BulkSubscribeAppResponseEntry> statuses) {
this.statuses = new ArrayList<>();
this.statuses.addAll(statuses);
}
public List<BulkSubscribeAppResponseEntry> getStatuses() {
return statuses;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Maps a {@link BulkSubscribeMessageEntry} to a {@link BulkSubscribeAppResponseStatus}.
*/
public final class BulkSubscribeAppResponseEntry {
private final String entryId;
private final BulkSubscribeAppResponseStatus status;
/**
* Instantiate a BulkSubscribeAppResponseEntry.
* @param entryId entry ID of the event.
* @param status status of the event processing in application.
*/
@JsonCreator
public BulkSubscribeAppResponseEntry(
@JsonProperty("entryId") String entryId,
@JsonProperty("status") BulkSubscribeAppResponseStatus status) {
this.entryId = entryId;
this.status = status;
}
public String getEntryId() {
return entryId;
}
public BulkSubscribeAppResponseStatus getStatus() {
return status;
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
/**
* Status of the message handled in bulk subscribe handler.
*/
public enum BulkSubscribeAppResponseStatus {
SUCCESS,
RETRY,
DROP
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Represents a bulk of messages received from the message bus.
*/
public final class BulkSubscribeMessage<T> {
private final List<BulkSubscribeMessageEntry<T>> entries;
private final String topic;
private final Map<String, String> metadata;
/**
* Instantiate a BulkSubscribeMessage.
* @param entries maps a {@link BulkSubscribeMessageEntry} to a {@link BulkSubscribeAppResponseStatus}.
* @param topic pubSub topic.
* @param metadata metadata for the {@link BulkSubscribeMessage}
*/
@JsonCreator
public BulkSubscribeMessage(
@JsonProperty("entries") List<BulkSubscribeMessageEntry<T>> entries,
@JsonProperty("topic") String topic,
@JsonProperty("metadata") Map<String, String> metadata) {
this.entries = Collections.unmodifiableList(entries);
this.topic = topic;
this.metadata = Collections.unmodifiableMap(metadata);
}
public List<BulkSubscribeMessageEntry<T>> getEntries() {
return entries;
}
public String getTopic() {
return topic;
}
public Map<String, String> getMetadata() {
return metadata;
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Represents a single event from a {@link BulkSubscribeMessage}.
* @param <T> Type of event.
*/
public final class BulkSubscribeMessageEntry<T> {
private final String entryId;
private final T event;
private final String contentType;
private final Map<String, String> metadata;
/**
* Instantiate a BulkPubSubMessageEntry.
* @param entryId unique identifier for the event.
* @param event pubSub event.
* @param contentType content type of the event.
* @param metadata metadata for the event.
*/
@JsonCreator
public BulkSubscribeMessageEntry(
@JsonProperty("entryId") String entryId,
@JsonProperty("event") T event,
@JsonProperty("contentType") String contentType,
@JsonProperty("metadata") Map<String, String> metadata) {
this.entryId = entryId;
this.event = event;
this.contentType = contentType;
if (metadata == null) {
metadata = new HashMap<>();
}
this.metadata = Collections.unmodifiableMap(metadata);
}
public String getEntryId() {
return entryId;
}
public T getEvent() {
return event;
}
public String getContentType() {
return contentType;
}
public Map<String, String> getMetadata() {
return metadata;
}
}