From 63f591e87782ff71cdfb69a8f419ea85bc18f37f Mon Sep 17 00:00:00 2001 From: Charlie Stanley Date: Tue, 18 Aug 2020 17:21:52 -0700 Subject: [PATCH] Add support for multi-pubsub (#309) --- .github/workflows/build.yml | 16 ++++++++-------- .../io/dapr/examples/pubsub/http/Publisher.java | 6 +++++- .../java/io/dapr/examples/pubsub/http/README.md | 6 ++++-- .../pubsub/http/SubscriberController.java | 2 +- pom.xml | 2 +- .../dapr/springboot/DaprBeanPostProcessor.java | 5 +++-- .../java/io/dapr/springboot/DaprController.java | 1 + .../java/io/dapr/springboot/DaprRuntime.java | 8 ++++++-- .../dapr/springboot/DaprTopicSubscription.java | 9 ++++++++- .../java/io/dapr/it/pubsub/http/PubSubIT.java | 12 +++++++----- .../it/pubsub/http/SubscriberController.java | 8 ++++---- sdk/src/main/java/io/dapr/Topic.java | 6 ++++++ sdk/src/main/java/io/dapr/client/DaprClient.java | 6 ++++-- .../main/java/io/dapr/client/DaprClientGrpc.java | 12 +++++++----- .../main/java/io/dapr/client/DaprClientHttp.java | 11 +++++++---- .../java/io/dapr/client/DaprClientGrpcTest.java | 10 +++++----- .../java/io/dapr/client/DaprClientHttpTest.java | 16 ++++++++-------- .../java/io/dapr/runtime/DaprRuntimeTest.java | 6 ++++-- 18 files changed, 89 insertions(+), 53 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e8498f331..5e5717424 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,10 +22,10 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: 13.0.x - DAPR_RUNTIME_VER: 0.9.0-rc.1 - DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e498de9f7dd92c30aa592d6e6761dc924bb53cc2/install/install.sh - DAPR_CLI_REF: 35b9a824d2fd9e2dcf8e75d1d49ce59a39c7cf5b - DAPR_REF: b53586d1e5a880a3f87044975ace25b9ae51daec + DAPR_RUNTIME_VER: 0.10.0-rc.0 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh + DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180 + DAPR_REF: 98365b1b9ade55e7cf46cbc2313f0625318c0977 OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} @@ -66,15 +66,15 @@ jobs: cd .. - name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VER }} run: | - sudo dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }} - echo "Showing dapr version..." - dapr --version + dapr uninstall --all + dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }} - name: Build and override daprd with referenced commit. if: env.DAPR_REF != '' run: | cd dapr make - sudo cp dist/linux_amd64/release/daprd /usr/local/bin/daprd + mkdir -p $HOME/.dapr/bin/ + cp dist/linux_amd64/release/daprd $HOME/.dapr/bin/daprd cd .. - name: Override placement service. if: env.DAPR_REF != '' diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java b/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java index 4cca9730c..8cbcf39ad 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java @@ -26,6 +26,9 @@ public class Publisher { //The title of the topic to be used for publishing private static final String TOPIC_NAME = "testingtopic"; + //The name of the pubseb + private static final String PUBSUB_NAME = "messagebus"; + /** * This is the entry point of the publisher app example. * @param args Args, unused. @@ -37,7 +40,7 @@ public class Publisher { for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d", i); //Publishing messages - client.publishEvent(TOPIC_NAME, message).block(); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); System.out.println("Published message: " + message); try { @@ -51,6 +54,7 @@ public class Publisher { //Publishing a single bite: Example of non-string based content published client.publishEvent( + PUBSUB_NAME, TOPIC_NAME, new byte[]{1}, Collections.singletonMap("content-type", "application/octet-stream")).block(); diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/README.md b/examples/src/main/java/io/dapr/examples/pubsub/http/README.md index 6b25095ed..a33573804 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/README.md @@ -57,7 +57,7 @@ This Spring Controller handles the message endpoint, Printing the message which @RestController public class SubscriberController { ///... - @Topic(name = "testingtopic") + @Topic(name = "testingtopic", pubsubName = "messagebus") @PostMapping(path = "/testingtopic") public Mono handleMessage(@RequestBody(required = false) byte[] body, @RequestHeader Map headers) { @@ -89,6 +89,8 @@ In the `Publisher.java` file, you will find the `Publisher` class, containing th public class Publisher { private static final int NUM_MESSAGES = 10; private static final String TOPIC_NAME = "testingtopic"; + private static final String PUBSUB_NAME = "messagebus"; + ///... public static void main(String[] args) throws Exception { //Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client @@ -96,7 +98,7 @@ public class Publisher { for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d", i); //Publishing messages - client.publishEvent(TOPIC_NAME, message).block(); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); System.out.println("Published message: " + message); //... } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index 0c50ec611..18f32dcd8 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -28,7 +28,7 @@ public class SubscriberController { * @param headers The headers of the http message. * @return A message containing the time. */ - @Topic(name = "testingtopic") + @Topic(name = "testingtopic", pubsubName = "messagebus") @PostMapping(path = "/testingtopic") public Mono handleMessage(@RequestBody(required = false) byte[] body, @RequestHeader Map headers) { diff --git a/pom.xml b/pom.xml index 5bf82f665..01b7f1c9f 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 1.25.0 3.11.0 3.10.0 - https://raw.githubusercontent.com/dapr/dapr/b59f7895191db87195ea24667bc96902ad2a1b98/dapr/proto + https://raw.githubusercontent.com/dapr/dapr/98365b1b9ade55e7cf46cbc2313f0625318c0977/dapr/proto 1.6.2 3.1.1 1.8 diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java index 2d46ca2a0..8671a7c10 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java @@ -75,12 +75,13 @@ public class DaprBeanPostProcessor implements BeanPostProcessor { } String topicName = topic.name(); - if ((topicName != null) && (topicName.length() > 0)) { + String pubSubName = topic.pubsubName(); + if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) { try { TypeReference> typeRef = new TypeReference>() {}; Map metadata = MAPPER.readValue(topic.metadata(), typeRef); - DaprRuntime.getInstance().addSubscribedTopic(topicName, route, metadata); + DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, metadata); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Error while parsing metadata: " + e.toString()); } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java index e8de3ca51..e7894bab3 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java @@ -5,6 +5,7 @@ package io.dapr.springboot; + import io.dapr.actors.runtime.ActorRuntime; import io.dapr.serializer.DefaultObjectSerializer; import org.springframework.http.MediaType; diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index 38cebf0cf..6e7028829 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -57,14 +57,18 @@ class DaprRuntime { /** * Adds a topic to the list of subscribed topics. * + * @param pubsubName Pubsub name to subcribe to. * @param topicName Name of the topic being subscribed to. * @param route Destination route for requests. * @param metadata Metadata for extended subscription functionality. */ - public synchronized void addSubscribedTopic(String topicName, String route, Map metadata) { + public synchronized void addSubscribedTopic(String pubsubName, + String topicName, + String route, + Map metadata) { if (!this.subscribedTopics.contains(topicName)) { this.subscribedTopics.add(topicName); - this.subscriptions.add(new DaprTopicSubscription(topicName, route, metadata)); + this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata)); } } diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java index 3db3022c1..1f0632365 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprTopicSubscription.java @@ -12,22 +12,29 @@ import java.util.Map; * Class to represent a subscription topic along with its metadata. */ public class DaprTopicSubscription { + private final String pubsubName; private final String topic; private final String route; private final Map metadata; /** * Create a subscription topic. + * @param pubsubName The pubsub name to subscribe to. * @param topic The topic to subscribe to. * @param route Destination route for messages. * @param metadata Metdata for extended subscription functionality. */ - public DaprTopicSubscription(String topic, String route, Map metadata) { + public DaprTopicSubscription(String pubsubName, String topic, String route, Map metadata) { + this.pubsubName = pubsubName; this.topic = topic; this.route = route; this.metadata = Collections.unmodifiableMap(metadata); } + public String getPubsubName() { + return pubsubName; + } + public String getTopic() { return topic; } diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index 827c93952..49eb8ebe1 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -30,6 +30,7 @@ public class PubSubIT extends BaseIT { //Number of messages to be sent: 10 private static final int NUM_MESSAGES = 10; + private static final String PUBSUB_NAME = "messagebus"; //The title of the topic to be used for publishing private static final String TOPIC_NAME = "testingtopic"; private static final String ANOTHER_TOPIC_NAME = "anothertopic"; @@ -70,20 +71,21 @@ public class PubSubIT extends BaseIT { for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME); //Publishing messages - client.publishEvent(TOPIC_NAME, message).block(); - System.out.println(String.format("Published message: '%s' to topic '%s'", message, TOPIC_NAME)); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); + System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME)); } // Send a batch of different messages on the other. for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME); //Publishing messages - client.publishEvent(ANOTHER_TOPIC_NAME, message).block(); - System.out.println(String.format("Published message: '%s' to topic '%s'", message, ANOTHER_TOPIC_NAME)); + client.publishEvent(PUBSUB_NAME, ANOTHER_TOPIC_NAME, message).block(); + System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME)); } //Publishing a single byte: Example of non-string based content published client.publishEvent( + PUBSUB_NAME, TOPIC_NAME, new byte[]{1}, Collections.singletonMap("content-type", "application/octet-stream")).block(); @@ -96,7 +98,7 @@ public class PubSubIT extends BaseIT { final List messages = client.invokeService(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block(); assertEquals(11, messages.size()); for (int i = 0; i < NUM_MESSAGES; i++) { - assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME))); + assertTrue(messages.toString(), messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME))); } boolean foundByte = false; diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java index 27ec4d630..ed2fe08c7 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/SubscriberController.java @@ -34,7 +34,7 @@ public class SubscriberController { return messagesReceivedAnotherTopic; } - @Topic(name = "testingtopic") + @Topic(name = "testingtopic", pubsubName = "messagebus") @PostMapping(path = "/route1") public Mono handleMessage(@RequestBody(required = false) byte[] body, @RequestHeader Map headers) { @@ -44,7 +44,7 @@ public class SubscriberController { CloudEvent envelope = CloudEvent.deserialize(body); String message = envelope.getData() == null ? "" : envelope.getData(); - System.out.println("Subscriber got message: " + message); + System.out.println("Testing topic Subscriber got message: " + message); messagesReceivedTestingTopic.add(envelope.getData()); } catch (Exception e) { throw new RuntimeException(e); @@ -52,7 +52,7 @@ public class SubscriberController { }); } - @Topic(name = "anothertopic") + @Topic(name = "anothertopic", pubsubName = "messagebus") @PostMapping(path = "/route2") public Mono handleMessageAnotherTopic(@RequestBody(required = false) byte[] body, @RequestHeader Map headers) { @@ -62,7 +62,7 @@ public class SubscriberController { CloudEvent envelope = CloudEvent.deserialize(body); String message = envelope.getData() == null ? "" : envelope.getData(); - System.out.println("Subscriber got message: " + message); + System.out.println("Another topic Subscriber got message: " + message); messagesReceivedAnotherTopic.add(envelope.getData()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/sdk/src/main/java/io/dapr/Topic.java b/sdk/src/main/java/io/dapr/Topic.java index 95bd0c29a..08315c08f 100644 --- a/sdk/src/main/java/io/dapr/Topic.java +++ b/sdk/src/main/java/io/dapr/Topic.java @@ -22,6 +22,12 @@ public @interface Topic { */ String name(); + /** + * Name of the pubsub bus to be subscribed to. + * @return pubsub bus's name. + */ + String pubsubName(); + /** * Metadata in the form of a json object. * { diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index a2c7d3466..2f3487676 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -24,21 +24,23 @@ public interface DaprClient extends Closeable { /** * Publish an event. * + * @param pubsubName the pubsub name we will publish the event to * @param topic the topic where the event will be published. * @param data the event's data to be published, use byte[] for skipping serialization. * @return a Mono plan of type Void. */ - Mono publishEvent(String topic, Object data); + Mono publishEvent(String pubsubName, String topic, Object data); /** * Publish an event. * + * @param pubsubName the pubsub name we will publish the event to * @param topic the topic where the event will be published. * @param data the event's data to be published, use byte[] for skipping serialization. * @param metadata The metadata for the published event. * @return a Mono plan of type Void. */ - Mono publishEvent(String topic, Object data, Map metadata); + Mono publishEvent(String pubsubName, String topic, Object data, Map metadata); /** * Invoke a service with all possible parameters, using serialization. diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 163c4485a..9b3f0121e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -100,19 +100,21 @@ public class DaprClientGrpc implements DaprClient { * {@inheritDoc} */ @Override - public Mono publishEvent(String topic, Object data) { - return this.publishEvent(topic, data, null); + public Mono publishEvent(String pubsubName, String topic, Object data) { + return this.publishEvent(pubsubName, topic, data, null); } /** * {@inheritDoc} */ @Override - public Mono publishEvent(String topic, Object data, Map metadata) { + public Mono publishEvent(String pubsubName, String topic, Object data, Map metadata) { try { // TODO: handle metadata. DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder() - .setTopic(topic).setData(ByteString.copyFrom(objectSerializer.serialize(data))).build(); + .setTopic(topic) + .setPubsubName(pubsubName) + .setData(ByteString.copyFrom(objectSerializer.serialize(data))).build(); return Mono.fromCallable(() -> { ListenableFuture futureEmpty = client.publishEvent(envelope); @@ -607,4 +609,4 @@ public class DaprClientGrpc implements DaprClient { channel.close(); } } -} \ No newline at end of file +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 18cf2d00b..766a2ad3f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -14,6 +14,7 @@ import io.dapr.utils.TypeRef; import reactor.core.publisher.Mono; import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -95,22 +96,24 @@ public class DaprClientHttp implements DaprClient { * {@inheritDoc} */ @Override - public Mono publishEvent(String topic, Object data) { - return this.publishEvent(topic, data, null); + public Mono publishEvent(String pubsubName, String topic, Object data) { + return this.publishEvent(pubsubName, topic, data, null); } /** * {@inheritDoc} */ @Override - public Mono publishEvent(String topic, Object data, Map metadata) { + public Mono publishEvent(String pubsubName, String topic, Object data, Map metadata) { try { if (topic == null || topic.trim().isEmpty()) { throw new IllegalArgumentException("Topic name cannot be null or empty."); } + StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH) + .append("/").append(pubsubName) + .append("/").append(topic); byte[] serializedEvent = objectSerializer.serialize(data); - StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic); return this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), url.toString(), null, serializedEvent, metadata).then(); } catch (Exception ex) { diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 2b97115f6..d02735897 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -69,7 +69,7 @@ public class DaprClientGrpcTest { public void publishEventExceptionThrownTest() { when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenThrow(RuntimeException.class); - Mono result = adapter.publishEvent("topic", "object"); + Mono result = adapter.publishEvent("pubsubname","topic", "object"); result.block(); } @@ -81,7 +81,7 @@ public class DaprClientGrpcTest { addCallback(settableFuture, callback, directExecutor()); when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); - Mono result = adapter.publishEvent("topic", "object"); + Mono result = adapter.publishEvent("pubsubname","topic", "object"); settableFuture.setException(ex); result.block(); } @@ -93,7 +93,7 @@ public class DaprClientGrpcTest { addCallback(settableFuture, callback, directExecutor()); when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); - Mono result = adapter.publishEvent("topic", "object"); + Mono result = adapter.publishEvent("pubsubname","topic", "object"); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -109,7 +109,7 @@ public class DaprClientGrpcTest { settableFuture.set(Empty.newBuilder().build()); return settableFuture; }); - adapter.publishEvent("topic", "object"); + adapter.publishEvent("pubsubname", "topic", "object"); // Do not call block() on the mono above, so nothing should happen. assertFalse(callback.wasCalled); } @@ -122,7 +122,7 @@ public class DaprClientGrpcTest { when(client.publishEvent(any(DaprProtos.PublishEventRequest.class))) .thenReturn(settableFuture); MyObject event = new MyObject(1, "Event"); - Mono result = adapter.publishEvent("topic", event); + Mono result = adapter.publishEvent("pubsubname", "topic", event); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 4a04c9880..288746827 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -45,48 +45,48 @@ public class DaprClientHttpTest { @Test public void publishEventInvokation() { mockInterceptor.addRule() - .post("http://127.0.0.1:3000/v1.0/publish/A") + .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; daprHttp = new DaprHttp(3000, okHttpClient); DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.publishEvent("A", event, null); + Mono mono = daprClientHttp.publishEvent("mypubsubname", "A", event, null); assertNull(mono.block()); } @Test public void publishEvent() { mockInterceptor.addRule() - .post("http://127.0.0.1:3000/v1.0/publish/A") + .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.publishEvent("A", event); + Mono mono = daprClientHttp.publishEvent("mypubsubname","A", event); assertNull(mono.block()); } @Test(expected = IllegalArgumentException.class) public void publishEventIfTopicIsNull() { mockInterceptor.addRule() - .post("http://127.0.0.1:3000/v1.0/publish/A") + .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.publishEvent("", event); + Mono mono = daprClientHttp.publishEvent("mypubsubname", "", event); assertNull(mono.block()); } @Test public void publishEventNoHotMono() { mockInterceptor.addRule() - .post("http://127.0.0.1:3000/v1.0/publish/A") + .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A") .respond(EXPECTED_RESULT); String event = "{ \"message\": \"This is a test\" }"; daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - daprClientHttp.publishEvent("", event); + daprClientHttp.publishEvent("mypubsubname", "", event); // Should not throw exception because did not call block() on mono above. } diff --git a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java index 1363292e3..2c03bf048 100644 --- a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java +++ b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java @@ -33,6 +33,8 @@ public class DaprRuntimeTest { private static final String TYPE_PLAIN_TEXT = "plain/text"; + private static final String PUBSUB_NAME = "mypubsubname"; + private static final String TOPIC_NAME = "mytopic"; private static final String APP_ID = "myappid"; @@ -111,7 +113,7 @@ public class DaprRuntimeTest { for (Message message : messages) { when(daprHttp.invokeApi( eq("POST"), - eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME), + eq(Constants.PUBLISH_PATH + "/" + PUBSUB_NAME + "/" + TOPIC_NAME), any(), eq(serializer.serialize(message.data)), eq(null))) @@ -120,7 +122,7 @@ public class DaprRuntimeTest { this.serialize(message), message.metadata).then()); - client.publishEvent(TOPIC_NAME, message.data).block(); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message.data).block(); CloudEvent envelope = new CloudEvent( message.id,