Allows override of content-type in pubsub publish. (#492)

This commit is contained in:
Artur Souza 2021-02-16 00:03:08 -08:00 committed by GitHub
parent 99c3794a62
commit ce7ecb9965
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 264 additions and 31 deletions

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.examples.pubsub.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.Metadata;
import io.dapr.client.domain.PublishEventRequestBuilder;
import java.util.UUID;
import static java.util.Collections.singletonMap;
/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.CloudEventPublisher
*/
public class CloudEventPublisher {
//Number of messages to be sent.
private static final int NUM_MESSAGES = 10;
//Time-to-live for messages published.
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
/**
* This is the entry point of the publisher app example.
* @param args Args, unused.
* @throws Exception A startup Exception.
*/
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent cloudEvent = new CloudEvent();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("text/plain");
cloudEvent.setData(String.format("This is message #%d", i));
//Publishing messages
client.publishEvent(
new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
.withContentType(CloudEvent.CONTENT_TYPE)
.withMetadata(singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS))
.build()).block();
System.out.println("Published cloud event with message: " + cloudEvent.getData());
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done.");
}
}
}

View File

@ -40,7 +40,6 @@ public class Publisher {
* @throws Exception A startup Exception.
*/
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);

View File

@ -96,14 +96,15 @@ dapr run --components-path ./components/pubsub --app-id subscriber --app-port 30
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
Dapr sidecar will automatically wrap the payload received into a CloudEvent object, which will later on parsed by the subscriber.
```java
public class Publisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic";
private static final String PUBSUB_NAME = "messagebus";
///...
///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
@ -114,28 +115,41 @@ public class Publisher {
System.out.println("Published message: " + message);
//...
}
///...
}
}
///...
}
```
This example also pushes a non-string content event, the follow code in same `Publisher` main method publishes a bite:
The `CloudEventPublisher.java` file shows how the same can be accomplished if the application must send a CloudEvent object instead of relying on Dapr's automatic CloudEvent "wrapping".
In this case, the app MUST override the content-type parameter via `withContentType()`, so Dapr sidecar knows that the payload is already a CloudEvent object.
```java
public class Publisher {
///...
public static void main(String[] args) throws Exception {
///...
//Publishing a single bite: Example of non-string based content published
client.publishEvent(
TOPIC_NAME,
new byte[] { 1 },
Collections.singletonMap("content-type", "application/octet-stream")).block();
System.out.println("Published one byte.");
System.out.println("Done.");
///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent cloudEvent = new CloudEvent();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("text/plain");
cloudEvent.setData(String.format("This is message #%d", i));
//Publishing messages
client.publishEvent(
new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
.withContentType(CloudEvent.CONTENT_TYPE)
.withMetadata(singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS))
.build()).block();
System.out.println("Published cloud event with message: " + cloudEvent.getData());
//...
}
//...
}
}
///...
}
```

View File

@ -12,6 +12,7 @@ import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Metadata;
import io.dapr.client.domain.PublishEventRequestBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.serializer.DaprObjectSerializer;
@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
@ -158,6 +160,12 @@ public class PubSubIT extends BaseIT {
System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, ANOTHER_TOPIC_NAME, PUBSUB_NAME));
}
//Publishing an object.
MyObject object = new MyObject();
object.setId("123");
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, object).block();
System.out.println("Published one object.");
//Publishing a single byte: Example of non-string based content published
client.publishEvent(
PUBSUB_NAME,
@ -165,17 +173,32 @@ public class PubSubIT extends BaseIT {
new byte[]{1}).block();
System.out.println("Published one byte.");
CloudEvent cloudEvent = new CloudEvent();
cloudEvent.setId("1234");
cloudEvent.setData("message from cloudevent");
cloudEvent.setSource("test");
cloudEvent.setSpecversion("1");
cloudEvent.setType("myevent");
cloudEvent.setDatacontenttype("text/plain");
//Publishing a cloud event.
client.publishEvent(new PublishEventRequestBuilder(PUBSUB_NAME, TOPIC_NAME, cloudEvent)
.withContentType("application/cloudevents+json")
.build()).block();
System.out.println("Published one cloud event.");
Thread.sleep(3000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME);
// Validate text payload.
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/testingtopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(11, messages.size());
assertEquals(13, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
final int messageId = i;
assertTrue(messages
@ -186,6 +209,16 @@ public class PubSubIT extends BaseIT {
.count() == 1);
}
// Validate object payload.
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
.filter(m -> m.getData() instanceof LinkedHashMap)
.map(m -> (LinkedHashMap)m.getData())
.filter(m -> "123".equals(m.get("id")))
.count() == 1);
// Validate byte payload.
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
@ -193,6 +226,13 @@ public class PubSubIT extends BaseIT {
.filter(m -> "AQ==".equals(m))
.count() == 1);
// Validate cloudevent payload.
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> "message from cloudevent".equals(m))
.count() == 1);
}, 2000);
callWithRetry(() -> {
@ -330,4 +370,16 @@ public class PubSubIT extends BaseIT {
daprRun.stop();
}
public static class MyObject {
private String id;
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
}
}

View File

@ -40,7 +40,6 @@ import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
import okhttp3.HttpUrl;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
@ -48,10 +47,8 @@ import reactor.util.context.Context;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -143,7 +140,15 @@ public class DaprClientGrpc extends AbstractDaprClient {
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
envelopeBuilder.setDataContentType(objectSerializer.getContentType());
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
envelopeBuilder.setDataContentType(contentType);
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);

View File

@ -137,7 +137,13 @@ public class DaprClientHttp extends AbstractDaprClient {
}
byte[] serializedEvent = objectSerializer.serialize(data);
Map<String, String> headers = Collections.singletonMap("content-type", objectSerializer.getContentType());
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
Map<String, String> headers = Collections.singletonMap("content-type", contentType);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };

View File

@ -10,7 +10,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.PropertyKey;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@ -20,6 +19,11 @@ import java.util.Objects;
*/
public final class CloudEvent {
/**
* Mime type used for CloudEvent.
*/
public static final String CONTENT_TYPE = "application/cloudevents+json";
/**
* Shared Json serializer/deserializer as per Jackson's documentation.
*/

View File

@ -18,6 +18,8 @@ public class PublishEventRequest {
private Object data;
private String contentType;
private Map<String, String> metadata;
public String getPubsubName() {
@ -44,6 +46,14 @@ public class PublishEventRequest {
this.data = data;
}
public String getContentType() {
return this.contentType;
}
void setContentType(String contentType) {
this.contentType = contentType;
}
public Map<String, String> getMetadata() {
return metadata;
}

View File

@ -20,6 +20,8 @@ public class PublishEventRequestBuilder {
private final Object data;
private String contentType;
private Map<String, String> metadata = new HashMap<>();
/**
@ -34,6 +36,11 @@ public class PublishEventRequestBuilder {
this.data = data;
}
public PublishEventRequestBuilder withContentType(String contentType) {
this.contentType = contentType;
return this;
}
public PublishEventRequestBuilder withMetadata(Map<String, String> metadata) {
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
return this;
@ -48,6 +55,7 @@ public class PublishEventRequestBuilder {
request.setPubsubName(this.pubsubName);
request.setTopic(this.topic);
request.setData(this.data);
request.setContentType(this.contentType);
request.setMetadata(this.metadata);
return request;
}

View File

@ -17,6 +17,8 @@ import io.dapr.client.domain.GetBulkStateRequestBuilder;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.PublishEventRequestBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
@ -35,6 +37,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
@ -212,13 +215,49 @@ public class DaprClientGrpcTest {
observer.onNext(Empty.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).publishEvent(any(DaprProtos.PublishEventRequest.class), any());
}).when(daprStub).publishEvent(ArgumentMatchers.argThat(publishEventRequest -> {
if (!"application/json".equals(publishEventRequest.getDataContentType())) {
return false;
}
if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(publishEventRequest.getData().toByteArray())) &&
!"{\"value\":\"Event\",\"id\":1}".equals(new String(publishEventRequest.getData().toByteArray()))) {
return false;
}
return true;
}), any());
MyObject event = new MyObject(1, "Event");
Mono<Void> result = client.publishEvent("pubsubname", "topic", event);
result.block();
}
@Test
public void publishEventContentTypeOverrideTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
observer.onNext(Empty.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).publishEvent(ArgumentMatchers.argThat(publishEventRequest -> {
if (!"text/plain".equals(publishEventRequest.getDataContentType())) {
return false;
}
if (!"\"hello\"".equals(new String(publishEventRequest.getData().toByteArray()))) {
return false;
}
return true;
}), any());
Mono<Void> result = client.publishEvent(
new PublishEventRequestBuilder("pubsubname", "topic", "hello")
.withContentType("text/plain")
.build());
result.block();
}
@Test
public void invokeBindingIllegalArgumentExceptionTest() {
assertThrows(IllegalArgumentException.class, () -> {

View File

@ -9,6 +9,7 @@ import io.dapr.client.domain.DeleteStateRequestBuilder;
import io.dapr.client.domain.GetBulkStateRequestBuilder;
import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.PublishEventRequestBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
@ -101,8 +102,8 @@ public class DaprClientHttpTest {
@Test
public void publishEventInvokation() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
@ -113,14 +114,30 @@ public class DaprClientHttpTest {
@Test
public void publishEvent() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.header("content-type", "application/json")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
Mono<Void> mono = daprClientHttp.publishEvent("mypubsubname","A", event);
assertNull(mono.block());
}
@Test
public void publishEventContentTypeOverride() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.header("content-type", "text/plain")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
Mono<Void> mono = daprClientHttp.publishEvent(
new PublishEventRequestBuilder("mypubsubname","A", event)
.withContentType("text/plain")
.build());
assertNull(mono.block());
}
@Test
public void publishEventIfTopicIsNullOrEmpty() {
String event = "{ \"message\": \"This is a test\" }";