diff --git a/daprdocs/content/en/java-sdk-docs/_index.md b/daprdocs/content/en/java-sdk-docs/_index.md index 438e29738..d4fb7be7b 100644 --- a/daprdocs/content/en/java-sdk-docs/_index.md +++ b/daprdocs/content/en/java-sdk-docs/_index.md @@ -172,6 +172,20 @@ public class SubscriberController { }); } + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", + rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1)) + @PostMapping(path = "/testingtopicV2") + public Mono handleMessageV2(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + System.out.println("Subscriber got: " + cloudEvent.getData()); + System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } ``` diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index fc0f09527..19cb26a91 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -14,6 +14,7 @@ limitations under the License. package io.dapr.examples.pubsub.http; import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.Rule; import io.dapr.Topic; import io.dapr.client.domain.CloudEvent; import org.springframework.web.bind.annotation.PostMapping; @@ -47,4 +48,23 @@ public class SubscriberController { }); } + /** + * Handles a registered publish endpoint on this app (version 2 of a cloud event). + * @param cloudEvent The cloud event received. + * @return A message containing the time. + */ + @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}", + rule = @Rule(match = "event.type == \"v2\"", priority = 1)) + @PostMapping(path = "/testingtopicV2") + public Mono handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) { + return Mono.fromRunnable(() -> { + try { + System.out.println("Subscriber got: " + cloudEvent.getData()); + System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java index cc591184a..6afd1ba9a 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java @@ -16,6 +16,7 @@ package io.dapr.springboot; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.Rule; import io.dapr.Topic; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; @@ -26,7 +27,6 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; -import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -85,8 +85,11 @@ public class DaprBeanPostProcessor implements BeanPostProcessor { continue; } + + Rule rule = topic.rule(); String topicName = embeddedValueResolver.resolveStringValue(topic.name()); String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName()); + String match = embeddedValueResolver.resolveStringValue(rule.match()); if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) { try { TypeReference> typeRef @@ -94,12 +97,11 @@ public class DaprBeanPostProcessor implements BeanPostProcessor { Map metadata = MAPPER.readValue(topic.metadata(), typeRef); List routes = getAllCompleteRoutesForPost(clazz, method, topicName); for (String route : routes) { - DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, - metadata); + DaprRuntime.getInstance().addSubscribedTopic( + pubSubName, topicName, match, rule.priority(), route, metadata); } - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("Error while parsing metadata: " + e.toString()); + throw new IllegalArgumentException("Error while parsing metadata: " + e); } } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index abb5898d2..298b8bca3 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -13,11 +13,12 @@ limitations under the License. package io.dapr.springboot; -import java.util.ArrayList; -import java.util.HashSet; +import io.dapr.Rule; + +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.stream.Collectors; /** * Internal Singleton to handle Dapr configuration. @@ -30,14 +31,9 @@ class DaprRuntime { private static volatile DaprRuntime instance; /** - * List of subscribed topics. + * Map of subscription builders. */ - private final Set subscribedTopics = new HashSet<>(); - - /** - * List of subscriptions. - */ - private final List subscriptions = new ArrayList<>(); + private final Map subscriptionBuilders = new HashMap<>(); /** * Private constructor to make this singleton. @@ -67,20 +63,38 @@ class DaprRuntime { * * @param pubsubName Pubsub name to subcribe to. * @param topicName Name of the topic being subscribed to. + * @param rule The optional rule for this route. * @param route Destination route for requests. * @param metadata Metadata for extended subscription functionality. */ public synchronized void addSubscribedTopic(String pubsubName, String topicName, + String match, + int priority, String route, Map metadata) { - if (!this.subscribedTopics.contains(topicName)) { - this.subscribedTopics.add(topicName); - this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata)); + DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName); + + DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey); + if (builder == null) { + builder = new DaprSubscriptionBuilder(pubsubName, topicName); + subscriptionBuilders.put(topicKey, builder); + } + + if (match.length() > 0) { + builder.addRule(route, match, priority); + } else { + builder.setDefaultPath(route); + } + + if (metadata != null && !metadata.isEmpty()) { + builder.setMetadata(metadata); } } public synchronized DaprTopicSubscription[] listSubscribedTopics() { - return this.subscriptions.toArray(new DaprTopicSubscription[0]); + List values = subscriptionBuilders.values().stream() + .map(b -> b.build()).collect(Collectors.toList()); + return values.toArray(new DaprTopicSubscription[0]); } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java new file mode 100644 index 000000000..9ed65508e --- /dev/null +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprSubscriptionBuilder.java @@ -0,0 +1,122 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +class DaprSubscriptionBuilder { + private final String pubsubName; + private final String topic; + private final List rules; + private String defaultPath; + private Map metadata; + + /** + * Create a subscription topic. + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + */ + DaprSubscriptionBuilder(String pubsubName, String topic) { + this.pubsubName = pubsubName; + this.topic = topic; + this.rules = new ArrayList<>(); + this.defaultPath = null; + this.metadata = Collections.emptyMap(); + } + + /** + * Sets the default path for the subscription. + * @param path The default path. + * @return this instance. + */ + DaprSubscriptionBuilder setDefaultPath(String path) { + if (defaultPath != null) { + throw new RuntimeException( + String.format( + "a default route is already set for topic %s on pubsub %s", + this.topic, this.pubsubName)); + } + defaultPath = path; + return this; + } + + /** + * Adds a rule to the subscription. + * @param path The path to route to. + * @param match The CEL expression the event must match. + * @param priority The priority of the rule. + * @return this instance. + */ + public DaprSubscriptionBuilder addRule(String path, String match, int priority) { + if (rules.stream().anyMatch(e -> e.getPriority() == priority)) { + throw new RuntimeException( + String.format( + "a rule priority of %d is already used for topic %s on pubsub %s", + priority, this.topic, this.pubsubName)); + } + rules.add(new TopicRule(path, match, priority)); + return this; + } + + /** + * Sets the metadata for the subscription. + * @param metadata The metadata. + * @return this instance. + */ + public DaprSubscriptionBuilder setMetadata(Map metadata) { + this.metadata = metadata; + return this; + } + + /** + * Builds the DaprTopicSubscription that is returned by the application to Dapr. + * @return The DaprTopicSubscription. + */ + public DaprTopicSubscription build() { + String route = null; + DaprTopicRoutes routes = null; + + if (!rules.isEmpty()) { + Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority)); + List topicRules = rules.stream() + .map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList()); + routes = new DaprTopicRoutes(topicRules, defaultPath); + } else { + route = defaultPath; + } + + return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata); + } + + private static class TopicRule { + private final String path; + private final String match; + private final int priority; + + public TopicRule(String path, String match, int priority) { + this.path = path; + this.match = match; + this.priority = priority; + } + + public int getPriority() { + return priority; + } + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicKey.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicKey.java new file mode 100644 index 000000000..e2a3ee6b6 --- /dev/null +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicKey.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot; + +import java.util.Objects; + +class DaprTopicKey { + private final String pubsubName; + private final String topic; + + DaprTopicKey(String pubsubName, String topic) { + this.pubsubName = pubsubName; + this.topic = topic; + } + + public String getPubsubName() { + return pubsubName; + } + + public String getTopic() { + return topic; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DaprTopicKey that = (DaprTopicKey) o; + return pubsubName.equals(that.pubsubName) && topic.equals(that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(pubsubName, topic); + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRoutes.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRoutes.java new file mode 100644 index 000000000..fcb1bbb0f --- /dev/null +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRoutes.java @@ -0,0 +1,38 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Optional; + +class DaprTopicRoutes { + private final List rules; + @JsonProperty("default") + private final String defaultRoute; + + DaprTopicRoutes(List rules, String defaultRoute) { + this.rules = rules; + this.defaultRoute = defaultRoute; + } + + public List getRules() { + return rules; + } + + public String getDefaultRoute() { + return defaultRoute; + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRule.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRule.java new file mode 100644 index 000000000..4342e242a --- /dev/null +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicRule.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot; + +import java.util.Objects; + +class DaprTopicRule { + private final String match; + private final String path; + + DaprTopicRule(String match, String path) { + this.match = match; + this.path = path; + } + + public String getMatch() { + return match; + } + + public String getPath() { + return path; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DaprTopicRule that = (DaprTopicRule) o; + return match.equals(that.match) && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(match, path); + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java index b55d586f3..1612bc141 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java @@ -19,10 +19,11 @@ import java.util.Map; /** * Class to represent a subscription topic along with its metadata. */ -public class DaprTopicSubscription { +class DaprTopicSubscription { private final String pubsubName; private final String topic; private final String route; + private final DaprTopicRoutes routes; private final Map metadata; /** @@ -32,10 +33,24 @@ public class DaprTopicSubscription { * @param route Destination route for messages. * @param metadata Metdata for extended subscription functionality. */ - public DaprTopicSubscription(String pubsubName, String topic, String route, Map metadata) { + DaprTopicSubscription(String pubsubName, String topic, String route, Map metadata) { + this(pubsubName, topic, route, null, metadata); + } + + /** + * Create a subscription topic. + * @param pubsubName The pubsub name to subscribe to. + * @param topic The topic to subscribe to. + * @param route Destination route for messages. + * @param routes Destination routes with rules for messages. + * @param metadata Metdata for extended subscription functionality. + */ + DaprTopicSubscription(String pubsubName, String topic, String route, + DaprTopicRoutes routes, Map metadata) { this.pubsubName = pubsubName; this.topic = topic; this.route = route; + this.routes = routes; this.metadata = Collections.unmodifiableMap(metadata); } @@ -51,6 +66,10 @@ public class DaprTopicSubscription { return route; } + public DaprTopicRoutes getRoutes() { + return routes; + } + public Map getMetadata() { return metadata; } diff --git a/sdk-tests/components/kafka_bindings.yaml b/sdk-tests/components/kafka_bindings.yaml index 0ced72076..574ce4140 100644 --- a/sdk-tests/components/kafka_bindings.yaml +++ b/sdk-tests/components/kafka_bindings.yaml @@ -19,3 +19,5 @@ spec: value: sample - name: authRequired value: "false" + - name: initialOffset + value: oldest diff --git a/sdk-tests/configurations/configuration.yaml b/sdk-tests/configurations/configuration.yaml new file mode 100644 index 000000000..e2d316154 --- /dev/null +++ b/sdk-tests/configurations/configuration.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: testconfiguration +spec: + tracing: + samplingRate: "1" + zipkin: + endpointAddress: http://localhost:9411/api/v2/spans + features: + - name: PubSub.Routing + enabled: true \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index cdf1d10d7..13b0f901c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -28,7 +28,9 @@ public class DaprRun implements Stoppable { private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; - private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s --components-path ./components"; + private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " + + "--config ./configurations/configuration.yaml " + + "--components-path ./components"; // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index b7d5f6583..84f81239e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -210,7 +210,35 @@ public class PubSubIT extends BaseIT { .setContentType("application/cloudevents+json")).block(); System.out.println("Published one cloud event."); - Thread.sleep(3000); + { + CloudEvent cloudEventV2 = new CloudEvent(); + cloudEventV2.setId("2222"); + cloudEventV2.setData("message from cloudevent v2"); + cloudEventV2.setSource("test"); + cloudEventV2.setSpecversion("1"); + cloudEventV2.setType("myevent.v2"); + cloudEventV2.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV2) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v2."); + } + + { + CloudEvent cloudEventV3 = new CloudEvent(); + cloudEventV3.setId("3333"); + cloudEventV3.setData("message from cloudevent v3"); + cloudEventV3.setSource("test"); + cloudEventV3.setSpecversion("1"); + cloudEventV3.setType("myevent.v3"); + cloudEventV3.setDatacontenttype("text/plain"); + client.publishEvent( + new PublishEventRequest(PUBSUB_NAME, TOPIC_NAME, cloudEventV3) + .setContentType("application/cloudevents+json")).block(); + System.out.println("Published one cloud event for v3."); + } + + Thread.sleep(2000); callWithRetry(() -> { System.out.println("Checking results for topic " + TOPIC_NAME); @@ -258,6 +286,30 @@ public class PubSubIT extends BaseIT { .count() == 1); }, 2000); + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V2"); + // Validate text payload. + final List messages = client.invokeMethod( + daprRun.getAppName(), + "messages/testingtopicV2", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF).block(); + assertEquals(1, messages.size()); + }, 2000); + + callWithRetry(() -> { + System.out.println("Checking results for topic " + TOPIC_NAME + " V3"); + // Validate text payload. + final List messages = client.invokeMethod( + daprRun.getAppName(), + "messages/testingtopicV3", + null, + HttpExtension.GET, + CLOUD_EVENT_LIST_TYPE_REF).block(); + assertEquals(1, messages.size()); + }, 2000); + callWithRetry(() -> { System.out.println("Checking results for topic " + TYPED_TOPIC_NAME); // Validate object payload. diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java index 726e8990b..dc6c7d2af 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -13,6 +13,7 @@ limitations under the License. package io.dapr.it.pubsub.http; +import io.dapr.Rule; import io.dapr.Topic; import io.dapr.client.domain.CloudEvent; import org.springframework.web.bind.annotation.GetMapping; @@ -42,6 +43,25 @@ public class SubscriberController { return messagesByTopic.getOrDefault(topic, Collections.emptyList()); } + private static final List messagesReceivedTestingTopic = new ArrayList(); + private static final List messagesReceivedTestingTopicV2 = new ArrayList(); + private static final List messagesReceivedTestingTopicV3 = new ArrayList(); + + @GetMapping(path = "/messages/testingtopic") + public List getMessagesReceivedTestingTopic() { + return messagesReceivedTestingTopic; + } + + @GetMapping(path = "/messages/testingtopicV2") + public List getMessagesReceivedTestingTopicV2() { + return messagesReceivedTestingTopicV2; + } + + @GetMapping(path = "/messages/testingtopicV3") + public List getMessagesReceivedTestingTopicV3() { + return messagesReceivedTestingTopicV3; + } + @Topic(name = "testingtopic", pubsubName = "messagebus") @PostMapping("/route1") public Mono handleMessage(@RequestBody(required = false) CloudEvent envelope) { @@ -50,7 +70,39 @@ public class SubscriberController { String message = envelope.getData() == null ? "" : envelope.getData().toString(); String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); - messagesByTopic.compute("testingtopic", merge(envelope)); + messagesReceivedTestingTopic.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "messagebus", + rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2)) + @PostMapping(path = "/route1_v2") + public Mono handleMessageV2(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV2.add(envelope); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + @Topic(name = "testingtopic", pubsubName = "messagebus", + rule = @Rule(match = "event.type == 'myevent.v3'", priority = 1)) + @PostMapping(path = "/route1_v3") + public Mono handleMessageV3(@RequestBody(required = false) CloudEvent envelope) { + return Mono.fromRunnable(() -> { + try { + String message = envelope.getData() == null ? "" : envelope.getData().toString(); + String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype(); + System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType); + messagesReceivedTestingTopicV3.add(envelope); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java b/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java index e7a175583..0f6a7867e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java +++ b/sdk-tests/src/test/java/io/dapr/it/tracing/Validation.java @@ -46,12 +46,13 @@ public final class Validation { public static void validate(String spanName, String sleepSpanName) throws Exception { // Must wait for some time to make sure Zipkin receives all spans. - Thread.sleep(5000); + Thread.sleep(10000); HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); urlBuilder.scheme("http") .host("localhost") .port(9411) - .addPathSegments("api/v2/traces"); + .addPathSegments("api/v2/traces") + .addQueryParameter("limit", "100"); Request.Builder requestBuilder = new Request.Builder() .url(urlBuilder.build()); requestBuilder.method("GET", null); diff --git a/sdk/src/main/java/io/dapr/Rule.java b/sdk/src/main/java/io/dapr/Rule.java new file mode 100644 index 000000000..894e5d4b0 --- /dev/null +++ b/sdk/src/main/java/io/dapr/Rule.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Target(ElementType.ANNOTATION_TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Rule { + /** + * The Common Expression Language (CEL) expression to use + * to match the incoming cloud event. + * Examples. + * @return the CEL expression. + */ + String match(); + + /** + * Priority of the rule used for ordering. Lowest numbers have higher priority. + * @return the rule's priority. + */ + int priority(); +} diff --git a/sdk/src/main/java/io/dapr/Topic.java b/sdk/src/main/java/io/dapr/Topic.java index 665aab949..2cd7624b0 100644 --- a/sdk/src/main/java/io/dapr/Topic.java +++ b/sdk/src/main/java/io/dapr/Topic.java @@ -36,6 +36,12 @@ public @interface Topic { */ String pubsubName(); + /** + * The rules used to match the incoming cloud event. + * @return the CEL expression. + */ + Rule rule() default @Rule(match = "", priority = 0); + /** * Metadata in the form of a json object. * {