Updated enable dead letter topic (#913)

* feat: Add dead letter topic to a subscription topic.

Signed-off-by: Charlie Mk <carlosrb00@gmail.com>

* feat: Add endpoint examples

Signed-off-by: Charlie Mk <carlosrb00@gmail.com>

* feat: Add dead letter topic example to the docs.

Signed-off-by: Charlie Mk <carlosrb00@gmail.com>

* Create new methods with deadLetterTopic instead of overload existing ones.

Signed-off-by: Charlie Mk <carlosrb00@gmail.com>

* Update _index.md

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Create overloads for deadlettertopic.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* fix integration tests from names overlapping

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

---------

Signed-off-by: Charlie Mk <carlosrb00@gmail.com>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
Co-authored-by: Charlie Mk <carlosrb00@gmail.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Cassie Coyle 2023-09-14 19:28:29 -05:00 committed by GitHub
parent 056aed4d98
commit 0543011d59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 265 additions and 56 deletions

View File

@ -183,6 +183,26 @@ public class SubscriberController {
}); });
} }
/**
* Handles a registered publish endpoint on this app adding a topic which manage to forward undeliverable messages.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1)) rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2") @PostMapping(path = "/testingtopicV2")

View File

@ -39,9 +39,10 @@ public class SubscriberController {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/** /**
* Handles a registered publish endpoint on this app. * Handles a registered publish endpoint on this app.
* *
* @param cloudEvent The cloud event received. * @param cloudEvent The cloud event received.
* @return A message containing the time. * @return A message containing the time.
*/ */
@ -58,6 +59,27 @@ public class SubscriberController {
}); });
} }
/**
* Handles a registered publish endpoint on this app adding a topic which manage to forward undeliverable messages.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic1", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopic1")
public Mono<Void> handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
/** /**
* Handles a registered publish endpoint on this app (version 2 of a cloud * Handles a registered publish endpoint on this app (version 2 of a cloud
* event). * event).
@ -66,6 +88,7 @@ public class SubscriberController {
* @return A message containing the time. * @return A message containing the time.
*/ */
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1)) rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2") @PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) { public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
@ -84,7 +107,8 @@ public class SubscriberController {
* @param cloudEvent The cloud event received. * @param cloudEvent The cloud event received.
* @return A message containing the time. * @return A message containing the time.
*/ */
@Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}") @Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/bulkpublishtesting") @PostMapping(path = "/bulkpublishtesting")
public Mono<Void> handleBulkPublishMessage(@RequestBody(required = false) CloudEvent cloudEvent) { public Mono<Void> handleBulkPublishMessage(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> { return Mono.fromRunnable(() -> {
@ -104,7 +128,8 @@ public class SubscriberController {
* @return A list of responses for each event. * @return A list of responses for each event.
*/ */
@BulkSubscribe() @BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}") @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopicbulk") @PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage( public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) { @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {

View File

@ -107,6 +107,7 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
Rule rule = topic.rule(); Rule rule = topic.rule();
String topicName = stringValueResolver.resolveStringValue(topic.name()); String topicName = stringValueResolver.resolveStringValue(topic.name());
String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName()); String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName());
String deadLetterTopic = stringValueResolver.resolveStringValue(topic.deadLetterTopic());
String match = stringValueResolver.resolveStringValue(rule.match()); String match = stringValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) { if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try { try {
@ -116,7 +117,7 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName); List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) { for (String route : routes) {
daprRuntime.addSubscribedTopic( daprRuntime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe); pubSubName, topicName, match, rule.priority(), route, deadLetterTopic, metadata, bulkSubscribe);
} }
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e); throw new IllegalArgumentException("Error while parsing metadata: " + e);

View File

@ -22,6 +22,7 @@ import java.util.stream.Collectors;
* Internal Singleton to handle Dapr configuration. * Internal Singleton to handle Dapr configuration.
*/ */
class DaprRuntime { class DaprRuntime {
/** /**
* The singleton instance. * The singleton instance.
*/ */
@ -33,8 +34,8 @@ class DaprRuntime {
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>(); private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();
/** /**
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. * DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. The
* The constructor's default scope is available for unit tests only. * constructor's default scope is available for unit tests only.
*/ */
private DaprRuntime() { private DaprRuntime() {
} }
@ -60,30 +61,30 @@ class DaprRuntime {
* Adds a topic to the list of subscribed topics. * Adds a topic to the list of subscribed topics.
* *
* @param pubSubName PubSub name to subscribe to. * @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to. * @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route. * @param match Match expression for this route.
* @param priority Priority for this match relative to others. * @param priority Priority for this match relative to others.
* @param route Destination route for requests. * @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality. * @param metadata Metadata for extended subscription functionality.
*/ */
public synchronized void addSubscribedTopic(String pubSubName, public synchronized void addSubscribedTopic(String pubSubName,
String topicName, String topicName,
String match, String match,
int priority, int priority,
String route, String route,
Map<String,String> metadata) { Map<String, String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null); this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null);
} }
/** /**
* Adds a topic to the list of subscribed topics. * Adds a topic to the list of subscribed topics.
* *
* @param pubSubName PubSub name to subscribe to. * @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to. * @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route. * @param match Match expression for this route.
* @param priority Priority for this match relative to others. * @param priority Priority for this match relative to others.
* @param route Destination route for requests. * @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality. * @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration. * @param bulkSubscribe Bulk subscribe configuration.
*/ */
public synchronized void addSubscribedTopic(String pubSubName, public synchronized void addSubscribedTopic(String pubSubName,
@ -91,7 +92,53 @@ class DaprRuntime {
String match, String match,
int priority, int priority,
String route, String route,
Map<String,String> metadata, Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, null,
metadata, bulkSubscribe);
}
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, deadLetterTopic,
metadata, null);
}
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) { DaprTopicBulkSubscribe bulkSubscribe) {
DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName); DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName);
@ -111,6 +158,10 @@ class DaprRuntime {
builder.setMetadata(metadata); builder.setMetadata(metadata);
} }
if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
builder.setDeadLetterTopic(deadLetterTopic);
}
if (bulkSubscribe != null) { if (bulkSubscribe != null) {
builder.setBulkSubscribe(bulkSubscribe); builder.setBulkSubscribe(bulkSubscribe);
} }
@ -118,7 +169,7 @@ class DaprRuntime {
public synchronized DaprTopicSubscription[] listSubscribedTopics() { public synchronized DaprTopicSubscription[] listSubscribedTopics() {
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream() List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList()); .map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]); return values.toArray(new DaprTopicSubscription[0]);
} }
} }

View File

@ -21,9 +21,11 @@ import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
class DaprSubscriptionBuilder { class DaprSubscriptionBuilder {
private final String pubsubName; private final String pubsubName;
private final String topic; private final String topic;
private final List<TopicRule> rules; private final List<TopicRule> rules;
private String deadLetterTopic;
private String defaultPath; private String defaultPath;
private Map<String, String> metadata; private Map<String, String> metadata;
@ -31,19 +33,22 @@ class DaprSubscriptionBuilder {
/** /**
* Create a subscription topic. * Create a subscription topic.
*
* @param pubsubName The pubsub name to subscribe to. * @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to. * @param topic The topic to subscribe to.
*/ */
DaprSubscriptionBuilder(String pubsubName, String topic) { DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName; this.pubsubName = pubsubName;
this.topic = topic; this.topic = topic;
this.rules = new ArrayList<>(); this.rules = new ArrayList<>();
this.deadLetterTopic = null;
this.defaultPath = null; this.defaultPath = null;
this.metadata = Collections.emptyMap(); this.metadata = Collections.emptyMap();
} }
/** /**
* Sets the default path for the subscription. * Sets the default path for the subscription.
*
* @param path The default path. * @param path The default path.
* @return this instance. * @return this instance.
*/ */
@ -51,28 +56,48 @@ class DaprSubscriptionBuilder {
if (defaultPath != null) { if (defaultPath != null) {
if (!defaultPath.equals(path)) { if (!defaultPath.equals(path)) {
throw new RuntimeException( throw new RuntimeException(
String.format( String.format(
"a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')", "a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.defaultPath, path)); this.topic, this.pubsubName, this.defaultPath, path));
} }
} }
defaultPath = path; defaultPath = path;
return this; return this;
} }
/**
* Sets the dead letter topic for the subscription.
*
* @param deadLetterTopic Name of dead letter topic.
* @return this instance.
*/
DaprSubscriptionBuilder setDeadLetterTopic(String deadLetterTopic) {
if (this.deadLetterTopic != null) {
if (!this.deadLetterTopic.equals(deadLetterTopic)) {
throw new RuntimeException(
String.format(
"a default dead letter topic is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.deadLetterTopic, deadLetterTopic));
}
}
this.deadLetterTopic = deadLetterTopic;
return this;
}
/** /**
* Adds a rule to the subscription. * Adds a rule to the subscription.
* @param path The path to route to. *
* @param match The CEL expression the event must match. * @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule. * @param priority The priority of the rule.
* @return this instance. * @return this instance.
*/ */
public DaprSubscriptionBuilder addRule(String path, String match, int priority) { public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) { if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException( throw new RuntimeException(
String.format( String.format(
"a rule priority of %d is already used for topic %s on pubsub %s", "a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName)); priority, this.topic, this.pubsubName));
} }
rules.add(new TopicRule(path, match, priority)); rules.add(new TopicRule(path, match, priority));
return this; return this;
@ -80,6 +105,7 @@ class DaprSubscriptionBuilder {
/** /**
* Sets the metadata for the subscription. * Sets the metadata for the subscription.
*
* @param metadata The metadata. * @param metadata The metadata.
* @return this instance. * @return this instance.
*/ */
@ -90,6 +116,7 @@ class DaprSubscriptionBuilder {
/** /**
* Sets the bulkSubscribe configuration for the subscription. * Sets the bulkSubscribe configuration for the subscription.
*
* @param bulkSubscribe The bulk subscribe configuration. * @param bulkSubscribe The bulk subscribe configuration.
* @return this instance. * @return this instance.
*/ */
@ -100,6 +127,7 @@ class DaprSubscriptionBuilder {
/** /**
* Builds the DaprTopicSubscription that is returned by the application to Dapr. * Builds the DaprTopicSubscription that is returned by the application to Dapr.
*
* @return The DaprTopicSubscription. * @return The DaprTopicSubscription.
*/ */
public DaprTopicSubscription build() { public DaprTopicSubscription build() {
@ -109,16 +137,19 @@ class DaprSubscriptionBuilder {
if (!rules.isEmpty()) { if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority)); Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream() List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath); routes = new DaprTopicRoutes(topicRules, defaultPath);
} else { } else {
route = defaultPath; route = defaultPath;
} }
return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe); return new DaprTopicSubscription(this.pubsubName, this.topic, route, this.deadLetterTopic,
routes, metadata,
bulkSubscribe);
} }
private static class TopicRule { private static class TopicRule {
private final String path; private final String path;
private final String match; private final String match;
private final int priority; private final int priority;

View File

@ -23,6 +23,7 @@ class DaprTopicSubscription {
private final String pubsubName; private final String pubsubName;
private final String topic; private final String topic;
private final String route; private final String route;
private final String deadLetterTopic;
private final DaprTopicRoutes routes; private final DaprTopicRoutes routes;
private final Map<String, String> metadata; private final Map<String, String> metadata;
private final DaprTopicBulkSubscribe bulkSubscribe; private final DaprTopicBulkSubscribe bulkSubscribe;
@ -32,7 +33,7 @@ class DaprTopicSubscription {
* @param pubsubName The pubsub name to subscribe to. * @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to. * @param topic The topic to subscribe to.
* @param route Destination route for messages. * @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality. * @param metadata Metadata for extended subscription functionality.
*/ */
DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) { DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
this(pubsubName, topic, route, metadata, null); this(pubsubName, topic, route, metadata, null);
@ -43,24 +44,12 @@ class DaprTopicSubscription {
* @param pubsubName The pubsub name to subscribe to. * @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to. * @param topic The topic to subscribe to.
* @param route Destination route for messages. * @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality. * @param deadLetterTopic Name of topic to forward undeliverable messages.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, null, metadata, bulkSubscribe);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality. * @param metadata Metadata for extended subscription functionality.
*/ */
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
Map<String, String> metadata) { Map<String, String> metadata) {
this(pubsubName, topic, route, routes, metadata, null); this(pubsubName, topic, route, deadLetterTopic, null, metadata, null);
} }
/** /**
@ -68,16 +57,93 @@ class DaprTopicSubscription {
* @param pubsubName The pubsub name to subscribe to. * @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to. * @param topic The topic to subscribe to.
* @param route Destination route for messages. * @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality. * @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration. * @param bulkSubscribe Bulk subscribe configuration.
*/ */
DaprTopicSubscription(String pubsubName, String topic, String route,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, "", null, metadata, bulkSubscribe);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, deadLetterTopic, null, metadata, bulkSubscribe);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes, DaprTopicSubscription(String pubsubName, String topic, String route, DaprTopicRoutes routes,
Map<String, String> metadata, DaprTopicBulkSubscribe bulkSubscribe) { Map<String, String> metadata) {
this(pubsubName, topic, route, "", routes, metadata, null);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic, DaprTopicRoutes routes,
Map<String, String> metadata) {
this(pubsubName, topic, route, deadLetterTopic, routes, metadata, null);
}
/**
* Create a subscription topic.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
DaprTopicRoutes routes,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
this(pubsubName, topic, route, "", routes, metadata, bulkSubscribe);
}
/**
* Create a subscription topic.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
DaprTopicSubscription(String pubsubName, String topic, String route, String deadLetterTopic,
DaprTopicRoutes routes,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
this.pubsubName = pubsubName; this.pubsubName = pubsubName;
this.topic = topic; this.topic = topic;
this.route = route; this.route = route;
this.routes = routes; this.routes = routes;
this.deadLetterTopic = deadLetterTopic;
this.metadata = Collections.unmodifiableMap(metadata); this.metadata = Collections.unmodifiableMap(metadata);
this.bulkSubscribe = bulkSubscribe; this.bulkSubscribe = bulkSubscribe;
} }
@ -98,6 +164,10 @@ class DaprTopicSubscription {
return routes; return routes;
} }
public String getDeadLetterTopic() {
return deadLetterTopic;
}
public Map<String, String> getMetadata() { public Map<String, String> getMetadata() {
return metadata; return metadata;
} }

View File

@ -74,6 +74,7 @@ public class DaprBeanPostProcessorSubscribeTest {
MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.pubSubName,
MockControllerWithSubscribe.topicName, MockControllerWithSubscribe.topicName,
MockControllerWithSubscribe.subscribeRoute, MockControllerWithSubscribe.subscribeRoute,
MockControllerWithSubscribe.deadLetterTopic,
new HashMap<>()); new HashMap<>());
DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true); DaprTopicBulkSubscribe bulkSubscribe = new DaprTopicBulkSubscribe(true);
@ -84,6 +85,7 @@ public class DaprBeanPostProcessorSubscribeTest {
MockControllerWithSubscribe.pubSubName, MockControllerWithSubscribe.pubSubName,
MockControllerWithSubscribe.bulkTopicName, MockControllerWithSubscribe.bulkTopicName,
MockControllerWithSubscribe.bulkSubscribeRoute, MockControllerWithSubscribe.bulkSubscribeRoute,
MockControllerWithSubscribe.deadLetterTopic,
new HashMap<>(), new HashMap<>(),
bulkSubscribe); bulkSubscribe);

View File

@ -13,6 +13,7 @@ public class DaprRuntimeTest {
public void testPubsubDefaultPathDuplicateRegistration() { public void testPubsubDefaultPathDuplicateRegistration() {
String pubSubName = "pubsub"; String pubSubName = "pubsub";
String topicName = "topic"; String topicName = "topic";
String deadLetterTopic = "deadLetterTopic";
String match = ""; String match = "";
String route = String.format("%s/%s", pubSubName, topicName); String route = String.format("%s/%s", pubSubName, topicName);
HashMap<String, String> metadata = new HashMap<String, String>(); HashMap<String, String> metadata = new HashMap<String, String>();
@ -36,15 +37,16 @@ public class DaprRuntimeTest {
// We should be able to register the same route multiple times // We should be able to register the same route multiple times
runtime.addSubscribedTopic( runtime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata); pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata);
runtime.addSubscribedTopic( runtime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata); pubSubName, topicName, match, rule.priority(), route,deadLetterTopic, metadata);
} }
@Test(expected = RuntimeException.class) @Test(expected = RuntimeException.class)
public void testPubsubDefaultPathDifferentRegistration() { public void testPubsubDefaultPathDifferentRegistration() {
String pubSubName = "pubsub"; String pubSubName = "pubsub";
String topicName = "topic"; String topicName = "topic";
String deadLetterTopic = "deadLetterTopic";
String match = ""; String match = "";
String firstRoute = String.format("%s/%s", pubSubName, topicName); String firstRoute = String.format("%s/%s", pubSubName, topicName);
String secondRoute = String.format("%s/%s/subscribe", pubSubName, topicName); String secondRoute = String.format("%s/%s/subscribe", pubSubName, topicName);
@ -70,11 +72,11 @@ public class DaprRuntimeTest {
Assert.assertNotNull(runtime); Assert.assertNotNull(runtime);
runtime.addSubscribedTopic( runtime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), firstRoute, metadata); pubSubName, topicName, match, rule.priority(), firstRoute, deadLetterTopic, metadata);
// Supplying the same pubsub bits but a different route should fail // Supplying the same pubsub bits but a different route should fail
runtime.addSubscribedTopic( runtime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), secondRoute, metadata); pubSubName, topicName, match, rule.priority(), secondRoute, deadLetterTopic, metadata);
} }

View File

@ -20,6 +20,7 @@ import org.springframework.web.bind.annotation.PostMapping;
public class MockControllerWithSubscribe { public class MockControllerWithSubscribe {
public static final String pubSubName = "mockPubSub"; public static final String pubSubName = "mockPubSub";
public static final String topicName = "mockTopic"; public static final String topicName = "mockTopic";
public static final String deadLetterTopic = "deadLetterTopic";
public static final String bulkTopicName = "mockBulkTopic"; public static final String bulkTopicName = "mockBulkTopic";
public static final String bulkTopicNameV2 = "mockBulkTopicV2"; public static final String bulkTopicNameV2 = "mockBulkTopicV2";
public static final String subscribeRoute = "mockRoute"; public static final String subscribeRoute = "mockRoute";
@ -27,12 +28,12 @@ public class MockControllerWithSubscribe {
public static final int maxMessagesCount = 500; public static final int maxMessagesCount = 500;
public static final int maxAwaitDurationMs = 1000; public static final int maxAwaitDurationMs = 1000;
@Topic(name = topicName, pubsubName = pubSubName) @Topic(name = topicName, pubsubName = pubSubName, deadLetterTopic = deadLetterTopic)
@PostMapping(path = subscribeRoute) @PostMapping(path = subscribeRoute)
public void handleMessages() {} public void handleMessages() {}
@BulkSubscribe(maxMessagesCount = maxMessagesCount, maxAwaitDurationMs = maxAwaitDurationMs) @BulkSubscribe(maxMessagesCount = maxMessagesCount, maxAwaitDurationMs = maxAwaitDurationMs)
@Topic(name = bulkTopicName, pubsubName = pubSubName) @Topic(name = bulkTopicName, pubsubName = pubSubName,deadLetterTopic = deadLetterTopic)
@PostMapping(path = bulkSubscribeRoute) @PostMapping(path = bulkSubscribeRoute)
public void handleBulkMessages() {} public void handleBulkMessages() {}
} }

View File

@ -50,4 +50,10 @@ public @interface Topic {
* @return metadata object * @return metadata object
*/ */
String metadata() default "{}"; String metadata() default "{}";
/**
* Name of dead letter topic to forward undeliverable messages.
* @return dead letter topic's name.
*/
String deadLetterTopic() default "";
} }