mirror of https://github.com/dapr/java-sdk.git
				
				
				
			Add support for multi-pubsub (#309)
This commit is contained in:
		
							parent
							
								
									b62fde00cd
								
							
						
					
					
						commit
						63f591e877
					
				| 
						 | 
				
			
			@ -22,10 +22,10 @@ jobs:
 | 
			
		|||
      GOARCH: amd64
 | 
			
		||||
      GOPROXY: https://proxy.golang.org
 | 
			
		||||
      JDK_VER: 13.0.x
 | 
			
		||||
      DAPR_RUNTIME_VER: 0.9.0-rc.1
 | 
			
		||||
      DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e498de9f7dd92c30aa592d6e6761dc924bb53cc2/install/install.sh
 | 
			
		||||
      DAPR_CLI_REF: 35b9a824d2fd9e2dcf8e75d1d49ce59a39c7cf5b
 | 
			
		||||
      DAPR_REF: b53586d1e5a880a3f87044975ace25b9ae51daec
 | 
			
		||||
      DAPR_RUNTIME_VER: 0.10.0-rc.0
 | 
			
		||||
      DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
 | 
			
		||||
      DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180
 | 
			
		||||
      DAPR_REF: 98365b1b9ade55e7cf46cbc2313f0625318c0977
 | 
			
		||||
      OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
 | 
			
		||||
      OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
 | 
			
		||||
      GPG_KEY: ${{ secrets.GPG_KEY }}
 | 
			
		||||
| 
						 | 
				
			
			@ -66,15 +66,15 @@ jobs:
 | 
			
		|||
        cd ..
 | 
			
		||||
    - name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VER }}
 | 
			
		||||
      run: |
 | 
			
		||||
        sudo dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
 | 
			
		||||
        echo "Showing dapr version..."
 | 
			
		||||
        dapr --version
 | 
			
		||||
        dapr uninstall --all
 | 
			
		||||
        dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
 | 
			
		||||
    - name: Build and override daprd with referenced commit.
 | 
			
		||||
      if: env.DAPR_REF != ''
 | 
			
		||||
      run: |
 | 
			
		||||
        cd dapr
 | 
			
		||||
        make
 | 
			
		||||
        sudo cp dist/linux_amd64/release/daprd /usr/local/bin/daprd
 | 
			
		||||
        mkdir -p $HOME/.dapr/bin/
 | 
			
		||||
        cp dist/linux_amd64/release/daprd $HOME/.dapr/bin/daprd
 | 
			
		||||
        cd ..
 | 
			
		||||
    - name: Override placement service.
 | 
			
		||||
      if: env.DAPR_REF != ''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,6 +26,9 @@ public class Publisher {
 | 
			
		|||
  //The title of the topic to be used for publishing
 | 
			
		||||
  private static final String TOPIC_NAME = "testingtopic";
 | 
			
		||||
 | 
			
		||||
  //The name of the pubseb
 | 
			
		||||
  private static final String PUBSUB_NAME = "messagebus";
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * This is the entry point of the publisher app example.
 | 
			
		||||
   * @param args Args, unused.
 | 
			
		||||
| 
						 | 
				
			
			@ -37,7 +40,7 @@ public class Publisher {
 | 
			
		|||
      for (int i = 0; i < NUM_MESSAGES; i++) {
 | 
			
		||||
        String message = String.format("This is message #%d", i);
 | 
			
		||||
        //Publishing messages
 | 
			
		||||
        client.publishEvent(TOPIC_NAME, message).block();
 | 
			
		||||
        client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
 | 
			
		||||
        System.out.println("Published message: " + message);
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
| 
						 | 
				
			
			@ -51,6 +54,7 @@ public class Publisher {
 | 
			
		|||
 | 
			
		||||
      //Publishing a single bite: Example of non-string based content published
 | 
			
		||||
      client.publishEvent(
 | 
			
		||||
          PUBSUB_NAME,
 | 
			
		||||
          TOPIC_NAME,
 | 
			
		||||
          new byte[]{1},
 | 
			
		||||
          Collections.singletonMap("content-type", "application/octet-stream")).block();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -57,7 +57,7 @@ This Spring Controller handles the message endpoint, Printing the message which
 | 
			
		|||
@RestController
 | 
			
		||||
public class SubscriberController {
 | 
			
		||||
  ///...
 | 
			
		||||
  @Topic(name = "testingtopic")
 | 
			
		||||
  @Topic(name = "testingtopic", pubsubName = "messagebus")
 | 
			
		||||
  @PostMapping(path = "/testingtopic")
 | 
			
		||||
  public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
 | 
			
		||||
                                   @RequestHeader Map<String, String> headers) {
 | 
			
		||||
| 
						 | 
				
			
			@ -89,6 +89,8 @@ In the `Publisher.java` file, you will find the `Publisher` class, containing th
 | 
			
		|||
public class Publisher {
 | 
			
		||||
    private static final int NUM_MESSAGES = 10;
 | 
			
		||||
    private static final String TOPIC_NAME = "testingtopic";
 | 
			
		||||
    private static final String PUBSUB_NAME = "messagebus";
 | 
			
		||||
 | 
			
		||||
///...
 | 
			
		||||
  public static void main(String[] args) throws Exception {
 | 
			
		||||
      //Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
 | 
			
		||||
| 
						 | 
				
			
			@ -96,7 +98,7 @@ public class Publisher {
 | 
			
		|||
        for (int i = 0; i < NUM_MESSAGES; i++) {
 | 
			
		||||
          String message = String.format("This is message #%d", i);
 | 
			
		||||
          //Publishing messages
 | 
			
		||||
          client.publishEvent(TOPIC_NAME, message).block();
 | 
			
		||||
          client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
 | 
			
		||||
          System.out.println("Published message: " + message);
 | 
			
		||||
          //...
 | 
			
		||||
        }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,7 +28,7 @@ public class SubscriberController {
 | 
			
		|||
   * @param headers The headers of the http message.
 | 
			
		||||
   * @return A message containing the time.
 | 
			
		||||
   */
 | 
			
		||||
  @Topic(name = "testingtopic")
 | 
			
		||||
  @Topic(name = "testingtopic", pubsubName = "messagebus")
 | 
			
		||||
  @PostMapping(path = "/testingtopic")
 | 
			
		||||
  public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
 | 
			
		||||
                                  @RequestHeader Map<String, String> headers) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										2
									
								
								pom.xml
								
								
								
								
							
							
						
						
									
										2
									
								
								pom.xml
								
								
								
								
							| 
						 | 
				
			
			@ -17,7 +17,7 @@
 | 
			
		|||
    <grpc.version>1.25.0</grpc.version>
 | 
			
		||||
    <protobuf.version>3.11.0</protobuf.version>
 | 
			
		||||
    <protoc.version>3.10.0</protoc.version>
 | 
			
		||||
    <dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/b59f7895191db87195ea24667bc96902ad2a1b98/dapr/proto</dapr.proto.baseurl>
 | 
			
		||||
    <dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/98365b1b9ade55e7cf46cbc2313f0625318c0977/dapr/proto</dapr.proto.baseurl>
 | 
			
		||||
    <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
 | 
			
		||||
    <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
 | 
			
		||||
    <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -75,12 +75,13 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
 | 
			
		|||
      }
 | 
			
		||||
 | 
			
		||||
      String topicName = topic.name();
 | 
			
		||||
      if ((topicName != null) && (topicName.length() > 0)) {
 | 
			
		||||
      String pubSubName = topic.pubsubName();
 | 
			
		||||
      if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
 | 
			
		||||
        try {
 | 
			
		||||
          TypeReference<HashMap<String, String>> typeRef
 | 
			
		||||
                  = new TypeReference<HashMap<String, String>>() {};
 | 
			
		||||
          Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
 | 
			
		||||
          DaprRuntime.getInstance().addSubscribedTopic(topicName, route, metadata);
 | 
			
		||||
          DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, metadata);
 | 
			
		||||
        } catch (JsonProcessingException e) {
 | 
			
		||||
          throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
 | 
			
		||||
        }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,6 +5,7 @@
 | 
			
		|||
 | 
			
		||||
package io.dapr.springboot;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
import io.dapr.actors.runtime.ActorRuntime;
 | 
			
		||||
import io.dapr.serializer.DefaultObjectSerializer;
 | 
			
		||||
import org.springframework.http.MediaType;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -57,14 +57,18 @@ class DaprRuntime {
 | 
			
		|||
  /**
 | 
			
		||||
   * Adds a topic to the list of subscribed topics.
 | 
			
		||||
   *
 | 
			
		||||
   * @param pubsubName Pubsub name to subcribe to.
 | 
			
		||||
   * @param topicName Name of the topic being subscribed to.
 | 
			
		||||
   * @param route Destination route for requests.
 | 
			
		||||
   * @param metadata Metadata for extended subscription functionality.
 | 
			
		||||
   */
 | 
			
		||||
  public synchronized void addSubscribedTopic(String topicName, String route, Map<String,String> metadata) {
 | 
			
		||||
  public synchronized void addSubscribedTopic(String pubsubName,
 | 
			
		||||
                                              String topicName,
 | 
			
		||||
                                              String route,
 | 
			
		||||
                                              Map<String,String> metadata) {
 | 
			
		||||
    if (!this.subscribedTopics.contains(topicName)) {
 | 
			
		||||
      this.subscribedTopics.add(topicName);
 | 
			
		||||
      this.subscriptions.add(new DaprTopicSubscription(topicName, route, metadata));
 | 
			
		||||
      this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,22 +12,29 @@ import java.util.Map;
 | 
			
		|||
 * Class to represent a subscription topic along with its metadata.
 | 
			
		||||
 */
 | 
			
		||||
public class DaprTopicSubscription {
 | 
			
		||||
  private final String pubsubName;
 | 
			
		||||
  private final String topic;
 | 
			
		||||
  private final String route;
 | 
			
		||||
  private final Map<String, String> metadata;
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Create a subscription topic.
 | 
			
		||||
   * @param pubsubName The pubsub name to subscribe to.
 | 
			
		||||
   * @param topic The topic to subscribe to.
 | 
			
		||||
   * @param route Destination route for messages.
 | 
			
		||||
   * @param metadata Metdata for extended subscription functionality.
 | 
			
		||||
   */
 | 
			
		||||
  public DaprTopicSubscription(String topic, String route, Map<String, String> metadata) {
 | 
			
		||||
  public DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
 | 
			
		||||
    this.pubsubName = pubsubName;
 | 
			
		||||
    this.topic = topic;
 | 
			
		||||
    this.route = route;
 | 
			
		||||
    this.metadata = Collections.unmodifiableMap(metadata);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public String getPubsubName() {
 | 
			
		||||
    return pubsubName;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  public String getTopic() {
 | 
			
		||||
    return topic;
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,6 +30,7 @@ public class PubSubIT extends BaseIT {
 | 
			
		|||
  //Number of messages to be sent: 10
 | 
			
		||||
  private static final int NUM_MESSAGES = 10;
 | 
			
		||||
 | 
			
		||||
  private static final String PUBSUB_NAME = "messagebus";
 | 
			
		||||
  //The title of the topic to be used for publishing
 | 
			
		||||
  private static final String TOPIC_NAME = "testingtopic";
 | 
			
		||||
  private static final String ANOTHER_TOPIC_NAME = "anothertopic";
 | 
			
		||||
| 
						 | 
				
			
			@ -70,20 +71,21 @@ public class PubSubIT extends BaseIT {
 | 
			
		|||
      for (int i = 0; i < NUM_MESSAGES; i++) {
 | 
			
		||||
        String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
 | 
			
		||||
        //Publishing messages
 | 
			
		||||
        client.publishEvent(TOPIC_NAME, message).block();
 | 
			
		||||
        System.out.println(String.format("Published message: '%s' to topic '%s'", message, TOPIC_NAME));
 | 
			
		||||
        client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
 | 
			
		||||
        System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // Send a batch of different messages on the other.
 | 
			
		||||
      for (int i = 0; i < NUM_MESSAGES; i++) {
 | 
			
		||||
        String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME);
 | 
			
		||||
        //Publishing messages
 | 
			
		||||
        client.publishEvent(ANOTHER_TOPIC_NAME, message).block();
 | 
			
		||||
        System.out.println(String.format("Published message: '%s' to topic '%s'", message, ANOTHER_TOPIC_NAME));
 | 
			
		||||
        client.publishEvent(PUBSUB_NAME, ANOTHER_TOPIC_NAME, message).block();
 | 
			
		||||
        System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      //Publishing a single byte: Example of non-string based content published
 | 
			
		||||
      client.publishEvent(
 | 
			
		||||
          PUBSUB_NAME,
 | 
			
		||||
          TOPIC_NAME,
 | 
			
		||||
          new byte[]{1},
 | 
			
		||||
          Collections.singletonMap("content-type", "application/octet-stream")).block();
 | 
			
		||||
| 
						 | 
				
			
			@ -96,7 +98,7 @@ public class PubSubIT extends BaseIT {
 | 
			
		|||
        final List<String> messages = client.invokeService(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block();
 | 
			
		||||
        assertEquals(11, messages.size());
 | 
			
		||||
        for (int i = 0; i < NUM_MESSAGES; i++) {
 | 
			
		||||
          assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
 | 
			
		||||
          assertTrue(messages.toString(), messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        boolean foundByte = false;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,7 +34,7 @@ public class SubscriberController {
 | 
			
		|||
    return messagesReceivedAnotherTopic;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Topic(name = "testingtopic")
 | 
			
		||||
  @Topic(name = "testingtopic", pubsubName = "messagebus")
 | 
			
		||||
  @PostMapping(path = "/route1")
 | 
			
		||||
  public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
 | 
			
		||||
                                  @RequestHeader Map<String, String> headers) {
 | 
			
		||||
| 
						 | 
				
			
			@ -44,7 +44,7 @@ public class SubscriberController {
 | 
			
		|||
        CloudEvent envelope = CloudEvent.deserialize(body);
 | 
			
		||||
 | 
			
		||||
        String message = envelope.getData() == null ? "" : envelope.getData();
 | 
			
		||||
        System.out.println("Subscriber got message: " + message);
 | 
			
		||||
        System.out.println("Testing topic Subscriber got message: " + message);
 | 
			
		||||
        messagesReceivedTestingTopic.add(envelope.getData());
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        throw new RuntimeException(e);
 | 
			
		||||
| 
						 | 
				
			
			@ -52,7 +52,7 @@ public class SubscriberController {
 | 
			
		|||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Topic(name = "anothertopic")
 | 
			
		||||
  @Topic(name = "anothertopic", pubsubName = "messagebus")
 | 
			
		||||
  @PostMapping(path = "/route2")
 | 
			
		||||
  public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) byte[] body,
 | 
			
		||||
                                  @RequestHeader Map<String, String> headers) {
 | 
			
		||||
| 
						 | 
				
			
			@ -62,7 +62,7 @@ public class SubscriberController {
 | 
			
		|||
        CloudEvent envelope = CloudEvent.deserialize(body);
 | 
			
		||||
 | 
			
		||||
        String message = envelope.getData() == null ? "" : envelope.getData();
 | 
			
		||||
        System.out.println("Subscriber got message: " + message);
 | 
			
		||||
        System.out.println("Another topic Subscriber got message: " + message);
 | 
			
		||||
        messagesReceivedAnotherTopic.add(envelope.getData());
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        throw new RuntimeException(e);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,6 +22,12 @@ public @interface Topic {
 | 
			
		|||
   */
 | 
			
		||||
  String name();
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Name of the pubsub bus to be subscribed to.
 | 
			
		||||
   * @return pubsub bus's name.
 | 
			
		||||
   */
 | 
			
		||||
  String pubsubName();
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Metadata in the form of a json object.
 | 
			
		||||
   * {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -24,21 +24,23 @@ public interface DaprClient extends Closeable {
 | 
			
		|||
  /**
 | 
			
		||||
   * Publish an event.
 | 
			
		||||
   *
 | 
			
		||||
   * @param pubsubName the pubsub name we will publish the event to
 | 
			
		||||
   * @param topic the topic where the event will be published.
 | 
			
		||||
   * @param data the event's data to be published, use byte[] for skipping serialization.
 | 
			
		||||
   * @return a Mono plan of type Void.
 | 
			
		||||
   */
 | 
			
		||||
  Mono<Void> publishEvent(String topic, Object data);
 | 
			
		||||
  Mono<Void> publishEvent(String pubsubName, String topic, Object data);
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Publish an event.
 | 
			
		||||
   *
 | 
			
		||||
   * @param pubsubName the pubsub name we will publish the event to
 | 
			
		||||
   * @param topic    the topic where the event will be published.
 | 
			
		||||
   * @param data    the event's data to be published, use byte[] for skipping serialization.
 | 
			
		||||
   * @param metadata The metadata for the published event.
 | 
			
		||||
   * @return a Mono plan of type Void.
 | 
			
		||||
   */
 | 
			
		||||
  Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata);
 | 
			
		||||
  Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata);
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Invoke a service with all possible parameters, using serialization.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -100,19 +100,21 @@ public class DaprClientGrpc implements DaprClient {
 | 
			
		|||
   * {@inheritDoc}
 | 
			
		||||
   */
 | 
			
		||||
  @Override
 | 
			
		||||
  public Mono<Void> publishEvent(String topic, Object data) {
 | 
			
		||||
    return this.publishEvent(topic, data, null);
 | 
			
		||||
  public Mono<Void> publishEvent(String pubsubName, String topic, Object data) {
 | 
			
		||||
    return this.publishEvent(pubsubName, topic, data, null);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * {@inheritDoc}
 | 
			
		||||
   */
 | 
			
		||||
  @Override
 | 
			
		||||
  public Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata) {
 | 
			
		||||
  public Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
 | 
			
		||||
    try {
 | 
			
		||||
      // TODO: handle metadata.
 | 
			
		||||
      DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder()
 | 
			
		||||
          .setTopic(topic).setData(ByteString.copyFrom(objectSerializer.serialize(data))).build();
 | 
			
		||||
          .setTopic(topic)
 | 
			
		||||
          .setPubsubName(pubsubName)
 | 
			
		||||
          .setData(ByteString.copyFrom(objectSerializer.serialize(data))).build();
 | 
			
		||||
 | 
			
		||||
      return Mono.fromCallable(() -> {
 | 
			
		||||
        ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -14,6 +14,7 @@ import io.dapr.utils.TypeRef;
 | 
			
		|||
import reactor.core.publisher.Mono;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
import java.nio.charset.Charset;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
| 
						 | 
				
			
			@ -95,22 +96,24 @@ public class DaprClientHttp implements DaprClient {
 | 
			
		|||
   * {@inheritDoc}
 | 
			
		||||
   */
 | 
			
		||||
  @Override
 | 
			
		||||
  public Mono<Void> publishEvent(String topic, Object data) {
 | 
			
		||||
    return this.publishEvent(topic, data, null);
 | 
			
		||||
  public Mono<Void> publishEvent(String pubsubName, String topic, Object data) {
 | 
			
		||||
    return this.publishEvent(pubsubName, topic, data, null);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * {@inheritDoc}
 | 
			
		||||
   */
 | 
			
		||||
  @Override
 | 
			
		||||
  public Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata) {
 | 
			
		||||
  public Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
 | 
			
		||||
    try {
 | 
			
		||||
      if (topic == null || topic.trim().isEmpty()) {
 | 
			
		||||
        throw new IllegalArgumentException("Topic name cannot be null or empty.");
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH)
 | 
			
		||||
              .append("/").append(pubsubName)
 | 
			
		||||
              .append("/").append(topic);
 | 
			
		||||
      byte[] serializedEvent = objectSerializer.serialize(data);
 | 
			
		||||
      StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic);
 | 
			
		||||
      return this.client.invokeApi(
 | 
			
		||||
          DaprHttp.HttpMethods.POST.name(), url.toString(), null, serializedEvent, metadata).then();
 | 
			
		||||
    } catch (Exception ex) {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -69,7 +69,7 @@ public class DaprClientGrpcTest {
 | 
			
		|||
  public void publishEventExceptionThrownTest() {
 | 
			
		||||
    when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
 | 
			
		||||
        .thenThrow(RuntimeException.class);
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("topic", "object");
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
 | 
			
		||||
    result.block();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -81,7 +81,7 @@ public class DaprClientGrpcTest {
 | 
			
		|||
    addCallback(settableFuture, callback, directExecutor());
 | 
			
		||||
    when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
 | 
			
		||||
        .thenReturn(settableFuture);
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("topic", "object");
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
 | 
			
		||||
    settableFuture.setException(ex);
 | 
			
		||||
    result.block();
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -93,7 +93,7 @@ public class DaprClientGrpcTest {
 | 
			
		|||
    addCallback(settableFuture, callback, directExecutor());
 | 
			
		||||
    when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
 | 
			
		||||
        .thenReturn(settableFuture);
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("topic", "object");
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
 | 
			
		||||
    settableFuture.set(Empty.newBuilder().build());
 | 
			
		||||
    result.block();
 | 
			
		||||
    assertTrue(callback.wasCalled);
 | 
			
		||||
| 
						 | 
				
			
			@ -109,7 +109,7 @@ public class DaprClientGrpcTest {
 | 
			
		|||
          settableFuture.set(Empty.newBuilder().build());
 | 
			
		||||
          return settableFuture;
 | 
			
		||||
        });
 | 
			
		||||
    adapter.publishEvent("topic", "object");
 | 
			
		||||
    adapter.publishEvent("pubsubname", "topic", "object");
 | 
			
		||||
    // Do not call block() on the mono above, so nothing should happen.
 | 
			
		||||
    assertFalse(callback.wasCalled);
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			@ -122,7 +122,7 @@ public class DaprClientGrpcTest {
 | 
			
		|||
    when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
 | 
			
		||||
        .thenReturn(settableFuture);
 | 
			
		||||
    MyObject event = new MyObject(1, "Event");
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("topic", event);
 | 
			
		||||
    Mono<Void> result = adapter.publishEvent("pubsubname", "topic", event);
 | 
			
		||||
    settableFuture.set(Empty.newBuilder().build());
 | 
			
		||||
    result.block();
 | 
			
		||||
    assertTrue(callback.wasCalled);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,48 +45,48 @@ public class DaprClientHttpTest {
 | 
			
		|||
  @Test
 | 
			
		||||
  public void publishEventInvokation() {
 | 
			
		||||
    mockInterceptor.addRule()
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/A")
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
 | 
			
		||||
      .respond(EXPECTED_RESULT);
 | 
			
		||||
    String event = "{ \"message\": \"This is a test\" }";
 | 
			
		||||
    daprHttp = new DaprHttp(3000, okHttpClient);
 | 
			
		||||
    DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("A", event, null);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname", "A", event, null);
 | 
			
		||||
    assertNull(mono.block());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void publishEvent() {
 | 
			
		||||
    mockInterceptor.addRule()
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/A")
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
 | 
			
		||||
      .respond(EXPECTED_RESULT);
 | 
			
		||||
    String event = "{ \"message\": \"This is a test\" }";
 | 
			
		||||
    daprHttp = new DaprHttp(3000, okHttpClient);
 | 
			
		||||
    daprClientHttp = new DaprClientHttp(daprHttp);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("A", event);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname","A", event);
 | 
			
		||||
    assertNull(mono.block());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test(expected = IllegalArgumentException.class)
 | 
			
		||||
  public void publishEventIfTopicIsNull() {
 | 
			
		||||
    mockInterceptor.addRule()
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/A")
 | 
			
		||||
      .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
 | 
			
		||||
      .respond(EXPECTED_RESULT);
 | 
			
		||||
    String event = "{ \"message\": \"This is a test\" }";
 | 
			
		||||
    daprHttp = new DaprHttp(3000, okHttpClient);
 | 
			
		||||
    daprClientHttp = new DaprClientHttp(daprHttp);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("", event);
 | 
			
		||||
    Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname", "", event);
 | 
			
		||||
    assertNull(mono.block());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Test
 | 
			
		||||
  public void publishEventNoHotMono() {
 | 
			
		||||
    mockInterceptor.addRule()
 | 
			
		||||
        .post("http://127.0.0.1:3000/v1.0/publish/A")
 | 
			
		||||
        .post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
 | 
			
		||||
        .respond(EXPECTED_RESULT);
 | 
			
		||||
    String event = "{ \"message\": \"This is a test\" }";
 | 
			
		||||
    daprHttp = new DaprHttp(3000, okHttpClient);
 | 
			
		||||
    daprClientHttp = new DaprClientHttp(daprHttp);
 | 
			
		||||
    daprClientHttp.publishEvent("", event);
 | 
			
		||||
    daprClientHttp.publishEvent("mypubsubname", "", event);
 | 
			
		||||
    // Should not throw exception because did not call block() on mono above.
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -33,6 +33,8 @@ public class DaprRuntimeTest {
 | 
			
		|||
 | 
			
		||||
  private static final String TYPE_PLAIN_TEXT = "plain/text";
 | 
			
		||||
 | 
			
		||||
  private static final String PUBSUB_NAME = "mypubsubname";
 | 
			
		||||
 | 
			
		||||
  private static final String TOPIC_NAME = "mytopic";
 | 
			
		||||
 | 
			
		||||
  private static final String APP_ID = "myappid";
 | 
			
		||||
| 
						 | 
				
			
			@ -111,7 +113,7 @@ public class DaprRuntimeTest {
 | 
			
		|||
    for (Message message : messages) {
 | 
			
		||||
      when(daprHttp.invokeApi(
 | 
			
		||||
          eq("POST"),
 | 
			
		||||
          eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME),
 | 
			
		||||
          eq(Constants.PUBLISH_PATH + "/" + PUBSUB_NAME + "/" + TOPIC_NAME),
 | 
			
		||||
          any(),
 | 
			
		||||
          eq(serializer.serialize(message.data)),
 | 
			
		||||
          eq(null)))
 | 
			
		||||
| 
						 | 
				
			
			@ -120,7 +122,7 @@ public class DaprRuntimeTest {
 | 
			
		|||
              this.serialize(message),
 | 
			
		||||
              message.metadata).then());
 | 
			
		||||
 | 
			
		||||
      client.publishEvent(TOPIC_NAME, message.data).block();
 | 
			
		||||
      client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message.data).block();
 | 
			
		||||
 | 
			
		||||
      CloudEvent envelope = new CloudEvent(
 | 
			
		||||
        message.id,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue