Adding pubsub routing support (#631)

* Adding pubsub routing support

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* remove deprecated classes (#704)

* remove deprecated classes

Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com>

* remove builder class ref in ITs

Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com>

* fix setter in IT

Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com>
Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

* Address PR comments.

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Phil Kedy 2022-06-23 14:40:10 -04:00 committed by GitHub
parent a9672c4438
commit 2235e3d978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 523 additions and 26 deletions

View File

@ -172,6 +172,20 @@ public class SubscriberController {
});
}
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
```

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.examples.pubsub.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.PostMapping;
@ -47,4 +48,23 @@ public class SubscriberController {
});
}
/**
* Handles a registered publish endpoint on this app (version 2 of a cloud event).
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -16,6 +16,7 @@ package io.dapr.springboot;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
import io.dapr.Topic;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -26,7 +27,6 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
@ -85,8 +85,11 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
continue;
}
Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
String match = embeddedValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
@ -94,12 +97,11 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route,
metadata);
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
throw new IllegalArgumentException("Error while parsing metadata: " + e);
}
}
}

View File

@ -13,11 +13,12 @@ limitations under the License.
package io.dapr.springboot;
import java.util.ArrayList;
import java.util.HashSet;
import io.dapr.Rule;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Internal Singleton to handle Dapr configuration.
@ -30,14 +31,9 @@ class DaprRuntime {
private static volatile DaprRuntime instance;
/**
* List of subscribed topics.
* Map of subscription builders.
*/
private final Set<String> subscribedTopics = new HashSet<>();
/**
* List of subscriptions.
*/
private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();
/**
* Private constructor to make this singleton.
@ -67,20 +63,38 @@ class DaprRuntime {
*
* @param pubsubName Pubsub name to subcribe to.
* @param topicName Name of the topic being subscribed to.
* @param rule The optional rule for this route.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);
DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}
if (match.length() > 0) {
builder.addRule(route, match, priority);
} else {
builder.setDefaultPath(route);
}
if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
}
public synchronized DaprTopicSubscription[] listSubscribedTopics() {
return this.subscriptions.toArray(new DaprTopicSubscription[0]);
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
class DaprSubscriptionBuilder {
private final String pubsubName;
private final String topic;
private final List<TopicRule> rules;
private String defaultPath;
private Map<String, String> metadata;
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
*/
DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
this.rules = new ArrayList<>();
this.defaultPath = null;
this.metadata = Collections.emptyMap();
}
/**
* Sets the default path for the subscription.
* @param path The default path.
* @return this instance.
*/
DaprSubscriptionBuilder setDefaultPath(String path) {
if (defaultPath != null) {
throw new RuntimeException(
String.format(
"a default route is already set for topic %s on pubsub %s",
this.topic, this.pubsubName));
}
defaultPath = path;
return this;
}
/**
* Adds a rule to the subscription.
* @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule.
* @return this instance.
*/
public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException(
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
}
rules.add(new TopicRule(path, match, priority));
return this;
}
/**
* Sets the metadata for the subscription.
* @param metadata The metadata.
* @return this instance.
*/
public DaprSubscriptionBuilder setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
return this;
}
/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
* @return The DaprTopicSubscription.
*/
public DaprTopicSubscription build() {
String route = null;
DaprTopicRoutes routes = null;
if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath);
} else {
route = defaultPath;
}
return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata);
}
private static class TopicRule {
private final String path;
private final String match;
private final int priority;
public TopicRule(String path, String match, int priority) {
this.path = path;
this.match = match;
this.priority = priority;
}
public int getPriority() {
return priority;
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
import java.util.Objects;
class DaprTopicKey {
private final String pubsubName;
private final String topic;
DaprTopicKey(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
}
public String getPubsubName() {
return pubsubName;
}
public String getTopic() {
return topic;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DaprTopicKey that = (DaprTopicKey) o;
return pubsubName.equals(that.pubsubName) && topic.equals(that.topic);
}
@Override
public int hashCode() {
return Objects.hash(pubsubName, topic);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import java.util.Optional;
class DaprTopicRoutes {
private final List<DaprTopicRule> rules;
@JsonProperty("default")
private final String defaultRoute;
DaprTopicRoutes(List<DaprTopicRule> rules, String defaultRoute) {
this.rules = rules;
this.defaultRoute = defaultRoute;
}
public List<DaprTopicRule> getRules() {
return rules;
}
public String getDefaultRoute() {
return defaultRoute;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot;
import java.util.Objects;
class DaprTopicRule {
private final String match;
private final String path;
DaprTopicRule(String match, String path) {
this.match = match;
this.path = path;
}
public String getMatch() {
return match;
}
public String getPath() {
return path;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DaprTopicRule that = (DaprTopicRule) o;
return match.equals(that.match) && path.equals(that.path);
}
@Override
public int hashCode() {
return Objects.hash(match, path);
}
}

View File

@ -19,10 +19,11 @@ import java.util.Map;
/**
* Class to represent a subscription topic along with its metadata.
*/
public class DaprTopicSubscription {
class DaprTopicSubscription {
private final String pubsubName;
private final String topic;
private final String route;
private final DaprTopicRoutes routes;
private final Map<String, String> metadata;
/**
@ -32,10 +33,24 @@ public class DaprTopicSubscription {
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*/
public DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
this(pubsubName, topic, route, null, metadata);
}
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param routes Destination routes with rules for messages.
* @param metadata Metdata for extended subscription functionality.
*/
DaprTopicSubscription(String pubsubName, String topic, String route,
DaprTopicRoutes routes, Map<String, String> metadata) {
this.pubsubName = pubsubName;
this.topic = topic;
this.route = route;
this.routes = routes;
this.metadata = Collections.unmodifiableMap(metadata);
}
@ -51,6 +66,10 @@ public class DaprTopicSubscription {
return route;
}
public DaprTopicRoutes getRoutes() {
return routes;
}
public Map<String, String> getMetadata() {
return metadata;
}

View File

@ -19,3 +19,5 @@ spec:
value: sample
- name: authRequired
value: "false"
- name: initialOffset
value: oldest

View File

@ -0,0 +1,12 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: testconfiguration
spec:
tracing:
samplingRate: "1"
zipkin:
endpointAddress: http://localhost:9411/api/v2/spans
features:
- name: PubSub.Routing
enabled: true

View File

@ -28,7 +28,9 @@ public class DaprRun implements Stoppable {
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s --components-path ./components";
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " +
"--config ./configurations/configuration.yaml " +
"--components-path ./components";
// the arg in -Dexec.args is the app's port
private static final String DAPR_COMMAND =

View File

@ -210,7 +210,35 @@ public class PubSubIT extends BaseIT {
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event.");
Thread.sleep(3000);
{
CloudEvent cloudEventV2 = new CloudEvent();
cloudEventV2.setId("2222");
cloudEventV2.setData("message from cloudevent v2");
cloudEventV2.setSource("test");
cloudEventV2.setSpecversion("1");
cloudEventV2.setType("myevent.v2");
cloudEventV2.setDatacontenttype("text/plain");
client.publishEvent(
new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2)
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event for v2.");
}
{
CloudEvent cloudEventV3 = new CloudEvent();
cloudEventV3.setId("3333");
cloudEventV3.setData("message from cloudevent v3");
cloudEventV3.setSource("test");
cloudEventV3.setSpecversion("1");
cloudEventV3.setType("myevent.v3");
cloudEventV3.setDatacontenttype("text/plain");
client.publishEvent(
new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3)
.setContentType("application/cloudevents+json")).block();
System.out.println("Published one cloud event for v3.");
}
Thread.sleep(2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME);
@ -258,6 +286,30 @@ public class PubSubIT extends BaseIT {
.count() == 1);
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME + " V2");
// Validate text payload.
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/testingtopicV2",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(1, messages.size());
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME + " V3");
// Validate text payload.
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/testingtopicV3",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(1, messages.size());
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TYPED_TOPIC_NAME);
// Validate object payload.

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.it.pubsub.http;
import io.dapr.Rule;
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.GetMapping;
@ -42,6 +43,25 @@ public class SubscriberController {
return messagesByTopic.getOrDefault(topic, Collections.emptyList());
}
private static final List<CloudEvent> messagesReceivedTestingTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV2 = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV3 = new ArrayList();
@GetMapping(path = "/messages/testingtopic")
public List<CloudEvent> getMessagesReceivedTestingTopic() {
return messagesReceivedTestingTopic;
}
@GetMapping(path = "/messages/testingtopicV2")
public List<CloudEvent> getMessagesReceivedTestingTopicV2() {
return messagesReceivedTestingTopicV2;
}
@GetMapping(path = "/messages/testingtopicV3")
public List<CloudEvent> getMessagesReceivedTestingTopicV3() {
return messagesReceivedTestingTopicV3;
}
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping("/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent envelope) {
@ -50,7 +70,39 @@ public class SubscriberController {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesByTopic.compute("testingtopic", merge(envelope));
messagesReceivedTestingTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "messagebus",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2))
@PostMapping(path = "/route1_v2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopicV2.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "messagebus",
rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1))
@PostMapping(path = "/route1_v3")
public Mono<Void> handleMessageV3(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopicV3.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -46,12 +46,13 @@ public final class Validation {
public static void validate(String spanName, String sleepSpanName) throws Exception {
// Must wait for some time to make sure Zipkin receives all spans.
Thread.sleep(5000);
Thread.sleep(10000);
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme("http")
.host("localhost")
.port(9411)
.addPathSegments("api/v2/traces");
.addPathSegments("api/v2/traces")
.addQueryParameter("limit", "100");
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build());
requestBuilder.method("GET", null);

View File

@ -0,0 +1,39 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rule {
/**
* The Common Expression Language (CEL) expression to use
* to match the incoming cloud event.
* <a href="https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/">Examples</a>.
* @return the CEL expression.
*/
String match();
/**
* Priority of the rule used for ordering. Lowest numbers have higher priority.
* @return the rule's priority.
*/
int priority();
}

View File

@ -36,6 +36,12 @@ public @interface Topic {
*/
String pubsubName();
/**
* The rules used to match the incoming cloud event.
* @return the CEL expression.
*/
Rule rule() default @Rule(match = "", priority = 0);
/**
* Metadata in the form of a json object.
* {