Merge pull request #323 from dapr/multi-pubsub

Add support for multi-pubsub (#309)
This commit is contained in:
Charlie Stanley 2020-08-19 10:19:48 -07:00 committed by GitHub
commit c0c3bc552b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 89 additions and 53 deletions

View File

@ -22,10 +22,10 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_RUNTIME_VER: 0.9.0-rc.1
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e498de9f7dd92c30aa592d6e6761dc924bb53cc2/install/install.sh
DAPR_CLI_REF: 35b9a824d2fd9e2dcf8e75d1d49ce59a39c7cf5b
DAPR_REF: b53586d1e5a880a3f87044975ace25b9ae51daec
DAPR_RUNTIME_VER: 0.10.0-rc.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180
DAPR_REF: 98365b1b9ade55e7cf46cbc2313f0625318c0977
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}
@ -66,15 +66,15 @@ jobs:
cd ..
- name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VER }}
run: |
sudo dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
echo "Showing dapr version..."
dapr --version
dapr uninstall --all
dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
- name: Build and override daprd with referenced commit.
if: env.DAPR_REF != ''
run: |
cd dapr
make
sudo cp dist/linux_amd64/release/daprd /usr/local/bin/daprd
mkdir -p $HOME/.dapr/bin/
cp dist/linux_amd64/release/daprd $HOME/.dapr/bin/daprd
cd ..
- name: Override placement service.
if: env.DAPR_REF != ''

View File

@ -26,6 +26,9 @@ public class Publisher {
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
//The name of the pubseb
private static final String PUBSUB_NAME = "messagebus";
/**
* This is the entry point of the publisher app example.
* @param args Args, unused.
@ -37,7 +40,7 @@ public class Publisher {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block();
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
System.out.println("Published message: " + message);
try {
@ -51,6 +54,7 @@ public class Publisher {
//Publishing a single bite: Example of non-string based content published
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
new byte[]{1},
Collections.singletonMap("content-type", "application/octet-stream")).block();

View File

@ -57,7 +57,7 @@ This Spring Controller handles the message endpoint, Printing the message which
@RestController
public class SubscriberController {
///...
@Topic(name = "testingtopic")
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
@ -89,6 +89,8 @@ In the `Publisher.java` file, you will find the `Publisher` class, containing th
public class Publisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";
///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
@ -96,7 +98,7 @@ public class Publisher {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block();
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
System.out.println("Published message: " + message);
//...
}

View File

@ -28,7 +28,7 @@ public class SubscriberController {
* @param headers The headers of the http message.
* @return A message containing the time.
*/
@Topic(name = "testingtopic")
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {

View File

@ -17,7 +17,7 @@
<grpc.version>1.25.0</grpc.version>
<protobuf.version>3.11.0</protobuf.version>
<protoc.version>3.10.0</protoc.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/b59f7895191db87195ea24667bc96902ad2a1b98/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/98365b1b9ade55e7cf46cbc2313f0625318c0977/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -75,12 +75,13 @@ public class DaprBeanPostProcessor implements BeanPostProcessor {
}
String topicName = topic.name();
if ((topicName != null) && (topicName.length() > 0)) {
String pubSubName = topic.pubsubName();
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
DaprRuntime.getInstance().addSubscribedTopic(topicName, route, metadata);
DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, metadata);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
}

View File

@ -5,6 +5,7 @@
package io.dapr.springboot;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.serializer.DefaultObjectSerializer;
import org.springframework.http.MediaType;

View File

@ -57,14 +57,18 @@ class DaprRuntime {
/**
* Adds a topic to the list of subscribed topics.
*
* @param pubsubName Pubsub name to subcribe to.
* @param topicName Name of the topic being subscribed to.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String topicName, String route, Map<String,String> metadata) {
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String route,
Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(topicName, route, metadata));
this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
}
}

View File

@ -12,22 +12,29 @@ import java.util.Map;
* Class to represent a subscription topic along with its metadata.
*/
public class DaprTopicSubscription {
private final String pubsubName;
private final String topic;
private final String route;
private final Map<String, String> metadata;
/**
* Create a subscription topic.
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param route Destination route for messages.
* @param metadata Metdata for extended subscription functionality.
*/
public DaprTopicSubscription(String topic, String route, Map<String, String> metadata) {
public DaprTopicSubscription(String pubsubName, String topic, String route, Map<String, String> metadata) {
this.pubsubName = pubsubName;
this.topic = topic;
this.route = route;
this.metadata = Collections.unmodifiableMap(metadata);
}
public String getPubsubName() {
return pubsubName;
}
public String getTopic() {
return topic;
}

View File

@ -30,6 +30,7 @@ public class PubSubIT extends BaseIT {
//Number of messages to be sent: 10
private static final int NUM_MESSAGES = 10;
private static final String PUBSUB_NAME = "messagebus";
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
@ -70,20 +71,21 @@ public class PubSubIT extends BaseIT {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block();
System.out.println(String.format("Published message: '%s' to topic '%s'", message, TOPIC_NAME));
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME));
}
// Send a batch of different messages on the other.
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME);
//Publishing messages
client.publishEvent(ANOTHER_TOPIC_NAME, message).block();
System.out.println(String.format("Published message: '%s' to topic '%s'", message, ANOTHER_TOPIC_NAME));
client.publishEvent(PUBSUB_NAME, ANOTHER_TOPIC_NAME, message).block();
System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME));
}
//Publishing a single byte: Example of non-string based content published
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
new byte[]{1},
Collections.singletonMap("content-type", "application/octet-stream")).block();
@ -96,7 +98,7 @@ public class PubSubIT extends BaseIT {
final List<String> messages = client.invokeService(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block();
assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
assertTrue(messages.toString(), messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
}
boolean foundByte = false;

View File

@ -34,7 +34,7 @@ public class SubscriberController {
return messagesReceivedAnotherTopic;
}
@Topic(name = "testingtopic")
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping(path = "/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
@ -44,7 +44,7 @@ public class SubscriberController {
CloudEvent envelope = CloudEvent.deserialize(body);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
System.out.println("Testing topic Subscriber got message: " + message);
messagesReceivedTestingTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);
@ -52,7 +52,7 @@ public class SubscriberController {
});
}
@Topic(name = "anothertopic")
@Topic(name = "anothertopic", pubsubName = "messagebus")
@PostMapping(path = "/route2")
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> headers) {
@ -62,7 +62,7 @@ public class SubscriberController {
CloudEvent envelope = CloudEvent.deserialize(body);
String message = envelope.getData() == null ? "" : envelope.getData();
System.out.println("Subscriber got message: " + message);
System.out.println("Another topic Subscriber got message: " + message);
messagesReceivedAnotherTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);

View File

@ -22,6 +22,12 @@ public @interface Topic {
*/
String name();
/**
* Name of the pubsub bus to be subscribed to.
* @return pubsub bus's name.
*/
String pubsubName();
/**
* Metadata in the form of a json object.
* {

View File

@ -24,21 +24,23 @@ public interface DaprClient extends Closeable {
/**
* Publish an event.
*
* @param pubsubName the pubsub name we will publish the event to
* @param topic the topic where the event will be published.
* @param data the event's data to be published, use byte[] for skipping serialization.
* @return a Mono plan of type Void.
*/
Mono<Void> publishEvent(String topic, Object data);
Mono<Void> publishEvent(String pubsubName, String topic, Object data);
/**
* Publish an event.
*
* @param pubsubName the pubsub name we will publish the event to
* @param topic the topic where the event will be published.
* @param data the event's data to be published, use byte[] for skipping serialization.
* @param metadata The metadata for the published event.
* @return a Mono plan of type Void.
*/
Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata);
Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata);
/**
* Invoke a service with all possible parameters, using serialization.

View File

@ -100,19 +100,21 @@ public class DaprClientGrpc implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> publishEvent(String topic, Object data) {
return this.publishEvent(topic, data, null);
public Mono<Void> publishEvent(String pubsubName, String topic, Object data) {
return this.publishEvent(pubsubName, topic, data, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata) {
public Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
try {
// TODO: handle metadata.
DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic).setData(ByteString.copyFrom(objectSerializer.serialize(data))).build();
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data))).build();
return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);

View File

@ -14,6 +14,7 @@ import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -95,22 +96,24 @@ public class DaprClientHttp implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> publishEvent(String topic, Object data) {
return this.publishEvent(topic, data, null);
public Mono<Void> publishEvent(String pubsubName, String topic, Object data) {
return this.publishEvent(pubsubName, topic, data, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> publishEvent(String topic, Object data, Map<String, String> metadata) {
public Mono<Void> publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
try {
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty.");
}
StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH)
.append("/").append(pubsubName)
.append("/").append(topic);
byte[] serializedEvent = objectSerializer.serialize(data);
StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic);
return this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), url.toString(), null, serializedEvent, metadata).then();
} catch (Exception ex) {

View File

@ -69,7 +69,7 @@ public class DaprClientGrpcTest {
public void publishEventExceptionThrownTest() {
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenThrow(RuntimeException.class);
Mono<Void> result = adapter.publishEvent("topic", "object");
Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
result.block();
}
@ -81,7 +81,7 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.publishEvent("topic", "object");
Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
settableFuture.setException(ex);
result.block();
}
@ -93,7 +93,7 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.publishEvent("topic", "object");
Mono<Void> result = adapter.publishEvent("pubsubname","topic", "object");
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -109,7 +109,7 @@ public class DaprClientGrpcTest {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
adapter.publishEvent("topic", "object");
adapter.publishEvent("pubsubname", "topic", "object");
// Do not call block() on the mono above, so nothing should happen.
assertFalse(callback.wasCalled);
}
@ -122,7 +122,7 @@ public class DaprClientGrpcTest {
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))
.thenReturn(settableFuture);
MyObject event = new MyObject(1, "Event");
Mono<Void> result = adapter.publishEvent("topic", event);
Mono<Void> result = adapter.publishEvent("pubsubname", "topic", event);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);

View File

@ -45,48 +45,48 @@ public class DaprClientHttpTest {
@Test
public void publishEventInvokation() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/A")
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(3000, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.publishEvent("A", event, null);
Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname", "A", event, null);
assertNull(mono.block());
}
@Test
public void publishEvent() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/A")
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.publishEvent("A", event);
Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname","A", event);
assertNull(mono.block());
}
@Test(expected = IllegalArgumentException.class)
public void publishEventIfTopicIsNull() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/A")
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.publishEvent("", event);
Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname", "", event);
assertNull(mono.block());
}
@Test
public void publishEventNoHotMono() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/A")
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.publishEvent("", event);
daprClientHttp.publishEvent("mypubsubname", "", event);
// Should not throw exception because did not call block() on mono above.
}

View File

@ -33,6 +33,8 @@ public class DaprRuntimeTest {
private static final String TYPE_PLAIN_TEXT = "plain/text";
private static final String PUBSUB_NAME = "mypubsubname";
private static final String TOPIC_NAME = "mytopic";
private static final String APP_ID = "myappid";
@ -111,7 +113,7 @@ public class DaprRuntimeTest {
for (Message message : messages) {
when(daprHttp.invokeApi(
eq("POST"),
eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME),
eq(Constants.PUBLISH_PATH + "/" + PUBSUB_NAME + "/" + TOPIC_NAME),
any(),
eq(serializer.serialize(message.data)),
eq(null)))
@ -120,7 +122,7 @@ public class DaprRuntimeTest {
this.serialize(message),
message.metadata).then());
client.publishEvent(TOPIC_NAME, message.data).block();
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message.data).block();
CloudEvent envelope = new CloudEvent(
message.id,