mirror of https://github.com/dapr/java-sdk.git
Support TTL in PubSub. Fixes + refactor of CloudEvent. Fix handling content-type in PubSub over GRPC (#431)
This commit is contained in:
parent
2714402a0e
commit
1a6675add8
|
|
@ -7,10 +7,13 @@ package io.dapr.examples.pubsub.http;
|
|||
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.Metadata;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
|
||||
/**
|
||||
* Message publisher.
|
||||
* 1. Build and install jars:
|
||||
|
|
@ -22,12 +25,16 @@ import java.util.Collections;
|
|||
*/
|
||||
public class Publisher {
|
||||
|
||||
//Number of messages to be sent: 10
|
||||
//Number of messages to be sent.
|
||||
private static final int NUM_MESSAGES = 10;
|
||||
|
||||
//Time-to-live for messages published.
|
||||
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
|
||||
|
||||
//The title of the topic to be used for publishing
|
||||
private static final String TOPIC_NAME = "testingtopic";
|
||||
|
||||
//The name of the pubseb
|
||||
//The name of the pubsub
|
||||
private static final String PUBSUB_NAME = "messagebus";
|
||||
|
||||
/**
|
||||
|
|
@ -41,7 +48,11 @@ public class Publisher {
|
|||
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||
String message = String.format("This is message #%d", i);
|
||||
//Publishing messages
|
||||
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
|
||||
client.publishEvent(
|
||||
PUBSUB_NAME,
|
||||
TOPIC_NAME,
|
||||
message,
|
||||
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
|
||||
System.out.println("Published message: " + message);
|
||||
|
||||
try {
|
||||
|
|
@ -53,14 +64,6 @@ public class Publisher {
|
|||
}
|
||||
}
|
||||
|
||||
//Publishing a single bite: Example of non-string based content published
|
||||
client.publishEvent(
|
||||
PUBSUB_NAME,
|
||||
TOPIC_NAME,
|
||||
new byte[]{1},
|
||||
Collections.singletonMap("content-type", "application/octet-stream")).block();
|
||||
System.out.println("Published one byte.");
|
||||
|
||||
// This is an example, so for simplicity we are just exiting here.
|
||||
// Normally a dapr app would be a web service and not exit main.
|
||||
System.out.println("Done.");
|
||||
|
|
|
|||
|
|
@ -135,14 +135,112 @@ dapr run --components-path ./components/pubsub --app-id publisher -- java -jar t
|
|||
|
||||
Once running, the Publisher should print the output as follows:
|
||||
|
||||

|
||||
```txt
|
||||
✅ You're up and running! Both Dapr and your app logs will appear here.
|
||||
|
||||
== APP == Published message: This is message #0
|
||||
|
||||
== APP == Published message: This is message #1
|
||||
|
||||
== APP == Published message: This is message #2
|
||||
|
||||
== APP == Published message: This is message #3
|
||||
|
||||
== APP == Published message: This is message #4
|
||||
|
||||
== APP == Published message: This is message #5
|
||||
|
||||
== APP == Published message: This is message #6
|
||||
|
||||
== APP == Published message: This is message #7
|
||||
|
||||
== APP == Published message: This is message #8
|
||||
|
||||
== APP == Published message: This is message #9
|
||||
|
||||
== APP == Done.
|
||||
|
||||
```
|
||||
|
||||
Messages have been published in the topic.
|
||||
|
||||
Once running, the Subscriber should print the output as follows:
|
||||
|
||||

|
||||
```txt
|
||||
== APP == Subscriber got: {"id":"1f646657-0032-4797-b59b-c57b4f40743b","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #3","expiration":"2020-12-24T05:29:12Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"a22b97ce-9008-4fba-8b57-c3c3e1f031b6","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #8","expiration":"2020-12-24T05:29:15Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"abb2f110-6862-49f7-8c8d-189f6dcd177d","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #0","expiration":"2020-12-24T05:29:11Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"043f31d3-c13a-4a02-ac89-64ecca946598","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #7","expiration":"2020-12-24T05:29:14Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"acc554f4-7109-4c31-9374-0e5936b90180","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #2","expiration":"2020-12-24T05:29:12Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"8b3ad160-368d-4b0f-9925-8fa2a2fbf5ca","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #9","expiration":"2020-12-24T05:29:15Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"e41d4512-511a-4a2b-80f3-a0a4d091c9a5","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #1","expiration":"2020-12-24T05:29:11Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"33e21664-128e-4fc4-b5c4-ed257f758336","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #4","expiration":"2020-12-24T05:29:13Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"bd14f1ee-ca6b-47f7-8130-dd1e6de5b03c","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #6","expiration":"2020-12-24T05:29:14Z"}
|
||||
|
||||
== APP == Subscriber got: {"id":"acc57cd6-71da-4ba3-9a12-9c921ca49af7","source":"publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"application/json","data":"This is message #5","expiration":"2020-12-24T05:29:13Z"}
|
||||
|
||||
```
|
||||
|
||||
Messages have been retrieved from the topic.
|
||||
|
||||
### Message expiration (Optional)
|
||||
|
||||
Optionally, you can see how Dapr can automatically drop expired messages on behalf of the subscriber.
|
||||
First, make sure the publisher and the subscriber applications are stopped.
|
||||
Then, change the TTL constant in the `Publisher.java` file from:
|
||||
```java
|
||||
private static final String MESSAGE_TTL_IN_SECONDS = "1000";
|
||||
```
|
||||
To:
|
||||
```java
|
||||
private static final String MESSAGE_TTL_IN_SECONDS = "1";
|
||||
```
|
||||
|
||||
Now rebuild the example:
|
||||
```sh
|
||||
mvn install
|
||||
```
|
||||
|
||||
Run the publisher app:
|
||||
```sh
|
||||
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher
|
||||
```
|
||||
|
||||
Wait until all 10 messages are published like before, then wait for a few more seconds and run the subscriber app:
|
||||
```sh
|
||||
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
|
||||
```
|
||||
|
||||
No message is consumed by the subscriber app and warnings messages are emitted from Dapr sidecar:
|
||||
```txt
|
||||
== DAPR == time="2020-12-23T21:21:59.085797-08:00" level=warning msg="dropping expired pub/sub event 461546c1-d2df-42bd-a6b8-3beeb952fe1e as of 2020-12-24T05:21:50Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085841-08:00" level=warning msg="dropping expired pub/sub event 2d8cf9a6-4019-4dda-95fd-59218a19381b as of 2020-12-24T05:21:48Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085871-08:00" level=warning msg="dropping expired pub/sub event d2a199e0-a4b8-4067-9618-6688391ad68f as of 2020-12-24T05:21:53Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085894-08:00" level=warning msg="dropping expired pub/sub event 30719f17-ad8f-4dea-91b5-b77958f360d4 as of 2020-12-24T05:21:49Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085797-08:00" level=warning msg="dropping expired pub/sub event d136d5ae-5561-418c-a850-9d1698bc8840 as of 2020-12-24T05:21:51Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085958-08:00" level=warning msg="dropping expired pub/sub event 82b334a2-e295-48ea-8c6c-c45b1c4fcd2d as of 2020-12-24T05:21:50Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085973-08:00" level=warning msg="dropping expired pub/sub event f6eb3f9f-185f-492f-9df9-45af8c91932b as of 2020-12-24T05:21:53Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.086041-08:00" level=warning msg="dropping expired pub/sub event a536eb9f-34e0-49fc-ba29-a34854398d96 as of 2020-12-24T05:21:52Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085995-08:00" level=warning msg="dropping expired pub/sub event 52cc9528-f9d4-44f4-8f78-8f32341a743a as of 2020-12-24T05:21:49Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
== DAPR == time="2020-12-23T21:21:59.085797-08:00" level=warning msg="dropping expired pub/sub event 7cf927e8-e832-4f8a-911a-1cae5a1369d2 as of 2020-12-24T05:21:48Z" app_id=subscriber instance=myhost scope=dapr.runtime type=log ver=edge
|
||||
|
||||
```
|
||||
|
||||
For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../../../springboot/DaprApplication.java) Application implementation.
|
||||
|
|
|
|||
|
|
@ -5,40 +5,33 @@
|
|||
|
||||
package io.dapr.examples.pubsub.http;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.Topic;
|
||||
import io.dapr.client.domain.CloudEvent;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestHeader;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* SpringBoot Controller to handle input binding.
|
||||
*/
|
||||
@RestController
|
||||
public class SubscriberController {
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
* Handles a registered publish endpoint on this app.
|
||||
* @param body The body of the http message.
|
||||
* @param headers The headers of the http message.
|
||||
* @param cloudEvent The cloud event received.
|
||||
* @return A message containing the time.
|
||||
*/
|
||||
@Topic(name = "testingtopic", pubsubName = "messagebus")
|
||||
@PostMapping(path = "/testingtopic")
|
||||
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
|
||||
@RequestHeader Map<String, String> headers) {
|
||||
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent cloudEvent) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
try {
|
||||
// Dapr's event is compliant to CloudEvent.
|
||||
CloudEvent envelope = CloudEvent.deserialize(body);
|
||||
|
||||
String message = envelope.getData() == null ? "" : envelope.getData();
|
||||
System.out.println("Subscriber got message: " + message);
|
||||
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import io.opentelemetry.api.OpenTelemetry;
|
|||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.api.trace.propagation.HttpTraceContext;
|
||||
import io.opentelemetry.context.propagation.DefaultContextPropagators;
|
||||
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
|
||||
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
|
||||
import io.opentelemetry.sdk.OpenTelemetrySdk;
|
||||
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
|
||||
|
|
@ -64,10 +63,6 @@ public class OpenTelemetryConfig {
|
|||
System.out.println("WARNING: Zipkin is not available.");
|
||||
}
|
||||
|
||||
final LoggingSpanExporter loggingExporter = new LoggingSpanExporter();
|
||||
OpenTelemetrySdk.getGlobalTracerManagement()
|
||||
.addSpanProcessor(SimpleSpanProcessor.builder(loggingExporter).build());
|
||||
|
||||
return tracer;
|
||||
}
|
||||
|
||||
|
|
|
|||
Binary file not shown.
|
Before Width: | Height: | Size: 405 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 341 KiB |
2
pom.xml
2
pom.xml
|
|
@ -16,7 +16,7 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<grpc.version>1.33.1</grpc.version>
|
||||
<protobuf.version>3.13.0</protobuf.version>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/3cca3cc1567f1cb955ae69b8fd784f075f62ad42/dapr/proto</dapr.proto.baseurl>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/33dc3361e99f75e88628e0abfc8b04d3d7e8bc5f/dapr/proto</dapr.proto.baseurl>
|
||||
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
|
||||
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
|
||||
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
|
||||
|
|
|
|||
|
|
@ -21,29 +21,25 @@ import java.util.Map;
|
|||
@RestController
|
||||
public class SubscriberController {
|
||||
|
||||
private static final List<String> messagesReceivedTestingTopic = new ArrayList();
|
||||
private static final List<String> messagesReceivedAnotherTopic = new ArrayList();
|
||||
private static final List<Object> messagesReceivedTestingTopic = new ArrayList();
|
||||
private static final List<Object> messagesReceivedAnotherTopic = new ArrayList();
|
||||
|
||||
@GetMapping(path = "/messages/testingtopic")
|
||||
public List<String> getMessagesReceivedTestingTopic() {
|
||||
public List<Object> getMessagesReceivedTestingTopic() {
|
||||
return messagesReceivedTestingTopic;
|
||||
}
|
||||
|
||||
@GetMapping(path = "/messages/anothertopic")
|
||||
public List<String> getMessagesReceivedAnotherTopic() {
|
||||
public List<Object> getMessagesReceivedAnotherTopic() {
|
||||
return messagesReceivedAnotherTopic;
|
||||
}
|
||||
|
||||
@Topic(name = "testingtopic", pubsubName = "messagebus")
|
||||
@PostMapping(path = "/route1")
|
||||
public Mono<Void> handleMessage(@RequestBody(required = false) byte[] body,
|
||||
@RequestHeader Map<String, String> headers) {
|
||||
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent envelope) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
try {
|
||||
// Dapr's event is compliant to CloudEvent.
|
||||
CloudEvent envelope = CloudEvent.deserialize(body);
|
||||
|
||||
String message = envelope.getData() == null ? "" : envelope.getData();
|
||||
String message = envelope.getData() == null ? "" : envelope.getData().toString();
|
||||
System.out.println("Testing topic Subscriber got message: " + message);
|
||||
messagesReceivedTestingTopic.add(envelope.getData());
|
||||
} catch (Exception e) {
|
||||
|
|
@ -54,14 +50,10 @@ public class SubscriberController {
|
|||
|
||||
@Topic(name = "anothertopic", pubsubName = "messagebus")
|
||||
@PostMapping(path = "/route2")
|
||||
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) byte[] body,
|
||||
@RequestHeader Map<String, String> headers) {
|
||||
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) {
|
||||
return Mono.fromRunnable(() -> {
|
||||
try {
|
||||
// Dapr's event is compliant to CloudEvent.
|
||||
CloudEvent envelope = CloudEvent.deserialize(body);
|
||||
|
||||
String message = envelope.getData() == null ? "" : envelope.getData();
|
||||
String message = envelope.getData() == null ? "" : envelope.getData().toString();
|
||||
System.out.println("Another topic Subscriber got message: " + message);
|
||||
messagesReceivedAnotherTopic.add(envelope.getData());
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -140,16 +140,22 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
String pubsubName = request.getPubsubName();
|
||||
String topic = request.getTopic();
|
||||
Object data = request.getData();
|
||||
// TODO(artursouza): handle metadata.
|
||||
// Map<String, String> metadata = request.getMetadata();
|
||||
Context context = request.getContext();
|
||||
DaprProtos.PublishEventRequest envelope = DaprProtos.PublishEventRequest.newBuilder()
|
||||
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
|
||||
.setTopic(topic)
|
||||
.setPubsubName(pubsubName)
|
||||
.setData(ByteString.copyFrom(objectSerializer.serialize(data))).build();
|
||||
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
|
||||
Map<String, String> metadata = request.getMetadata();
|
||||
if (metadata != null) {
|
||||
envelopeBuilder.putAllMetadata(metadata);
|
||||
String contentType = metadata.get(io.dapr.client.domain.Metadata.CONTENT_TYPE);
|
||||
if (contentType != null) {
|
||||
envelopeBuilder.setDataContentType(contentType);
|
||||
}
|
||||
}
|
||||
|
||||
return Mono.fromCallable(wrap(context, () -> {
|
||||
get(client.publishEvent(envelope));
|
||||
get(client.publishEvent(envelopeBuilder.build()));
|
||||
return null;
|
||||
}));
|
||||
} catch (Exception ex) {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package io.dapr.client;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.client.domain.Metadata;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.exceptions.DaprError;
|
||||
import io.dapr.exceptions.DaprException;
|
||||
|
|
@ -242,7 +243,7 @@ public class DaprHttp implements AutoCloseable {
|
|||
final String requestId = UUID.randomUUID().toString();
|
||||
RequestBody body;
|
||||
|
||||
String contentType = headers != null ? headers.get("content-type") : null;
|
||||
String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
|
||||
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
|
||||
if (content == null) {
|
||||
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ package io.dapr.client.domain;
|
|||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -28,36 +27,41 @@ public final class CloudEvent {
|
|||
/**
|
||||
* Identifier of the message being processed.
|
||||
*/
|
||||
private final String id;
|
||||
private String id;
|
||||
|
||||
/**
|
||||
* Event's source.
|
||||
*/
|
||||
private final String source;
|
||||
private String source;
|
||||
|
||||
/**
|
||||
* Envelope type.
|
||||
*/
|
||||
private final String type;
|
||||
private String type;
|
||||
|
||||
/**
|
||||
* Version of the specification.
|
||||
*/
|
||||
private final String specversion;
|
||||
private String specversion;
|
||||
|
||||
/**
|
||||
* Type of the data's content.
|
||||
*/
|
||||
private final String datacontenttype;
|
||||
private String datacontenttype;
|
||||
|
||||
/**
|
||||
* Cloud event specs says data can be a JSON object or string.
|
||||
*/
|
||||
private final String data;
|
||||
private Object data;
|
||||
|
||||
/**
|
||||
* Instantiates a new input request.
|
||||
*
|
||||
* Instantiates a CloudEvent.
|
||||
*/
|
||||
public CloudEvent() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a CloudEvent.
|
||||
* @param id Identifier of the message being processed.
|
||||
* @param source Source for this event.
|
||||
* @param type Type of event.
|
||||
|
|
@ -71,7 +75,7 @@ public final class CloudEvent {
|
|||
String type,
|
||||
String specversion,
|
||||
String datacontenttype,
|
||||
String data) {
|
||||
Object data) {
|
||||
this.id = id;
|
||||
this.source = source;
|
||||
this.type = type;
|
||||
|
|
@ -80,86 +84,9 @@ public final class CloudEvent {
|
|||
this.data = data;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets the identifier of the message being processed.
|
||||
*
|
||||
* @return Identifier of the message being processed.
|
||||
*/
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the source for this event.
|
||||
*
|
||||
* @return Source for this event.
|
||||
*/
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the type of event.
|
||||
*
|
||||
* @return Type of event.
|
||||
*/
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the version of the event spec.
|
||||
*
|
||||
* @return Version of the event spec.
|
||||
*/
|
||||
public String getSpecversion() {
|
||||
return specversion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the type of the payload.
|
||||
*
|
||||
* @return Type of the payload.
|
||||
*/
|
||||
public String getDatacontenttype() {
|
||||
return datacontenttype;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the payload.
|
||||
*
|
||||
* @return Payload
|
||||
*/
|
||||
public String getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CloudEvent that = (CloudEvent) o;
|
||||
return Objects.equals(id, that.id)
|
||||
&& Objects.equals(source, that.source)
|
||||
&& Objects.equals(type, that.type)
|
||||
&& Objects.equals(specversion, that.specversion)
|
||||
&& Objects.equals(datacontenttype, that.datacontenttype)
|
||||
&& Objects.equals(data, that.data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, source, type, specversion, datacontenttype, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserialized a message topic from Dapr.
|
||||
* Deserialize a message topic from Dapr.
|
||||
*
|
||||
* @param payload Payload sent from Dapr.
|
||||
* @return Message (can be null if input is null)
|
||||
|
|
@ -170,47 +97,130 @@ public final class CloudEvent {
|
|||
return null;
|
||||
}
|
||||
|
||||
JsonNode node = OBJECT_MAPPER.readTree(payload);
|
||||
return OBJECT_MAPPER.readValue(payload, CloudEvent.class);
|
||||
}
|
||||
|
||||
if (node == null) {
|
||||
return null;
|
||||
/**
|
||||
* Gets the identifier of the message being processed.
|
||||
* @return Identifier of the message being processed.
|
||||
*/
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the identifier of the message being processed.
|
||||
* @param id Identifier of the message being processed.
|
||||
*/
|
||||
public void setId(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the event's source.
|
||||
* @return Event's source.
|
||||
*/
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the event's source.
|
||||
* @param source Event's source.
|
||||
*/
|
||||
public void setSource(String source) {
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the envelope type.
|
||||
* @return Envelope type.
|
||||
*/
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the envelope type.
|
||||
* @param type Envelope type.
|
||||
*/
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the version of the specification.
|
||||
* @return Version of the specification.
|
||||
*/
|
||||
public String getSpecversion() {
|
||||
return specversion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the version of the specification.
|
||||
* @param specversion Version of the specification.
|
||||
*/
|
||||
public void setSpecversion(String specversion) {
|
||||
this.specversion = specversion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the type of the data's content.
|
||||
* @return Type of the data's content.
|
||||
*/
|
||||
public String getDatacontenttype() {
|
||||
return datacontenttype;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the type of the data's content.
|
||||
* @param datacontenttype Type of the data's content.
|
||||
*/
|
||||
public void setDatacontenttype(String datacontenttype) {
|
||||
this.datacontenttype = datacontenttype;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the cloud event data.
|
||||
* @return Cloud event's data. As per specs, data can be a JSON object or string.
|
||||
*/
|
||||
public Object getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the cloud event data. As per specs, data can be a JSON object or string.
|
||||
* @param data Cloud event's data. As per specs, data can be a JSON object or string.
|
||||
*/
|
||||
public void setData(Object data) {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
String id = null;
|
||||
if (node.has("id") && !node.get("id").isNull()) {
|
||||
id = node.get("id").asText();
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
CloudEvent that = (CloudEvent) o;
|
||||
return Objects.equals(id, that.id)
|
||||
&& Objects.equals(source, that.source)
|
||||
&& Objects.equals(type, that.type)
|
||||
&& Objects.equals(specversion, that.specversion)
|
||||
&& Objects.equals(datacontenttype, that.datacontenttype)
|
||||
&& Objects.equals(data, that.data);
|
||||
}
|
||||
|
||||
String source = null;
|
||||
if (node.has("source") && !node.get("source").isNull()) {
|
||||
source = node.get("source").asText();
|
||||
}
|
||||
|
||||
String type = null;
|
||||
if (node.has("type") && !node.get("type").isNull()) {
|
||||
type = node.get("type").asText();
|
||||
}
|
||||
|
||||
String specversion = null;
|
||||
if (node.has("specversion") && !node.get("specversion").isNull()) {
|
||||
specversion = node.get("specversion").asText();
|
||||
}
|
||||
|
||||
String datacontenttype = null;
|
||||
if (node.has("datacontenttype") && !node.get("datacontenttype").isNull()) {
|
||||
datacontenttype = node.get("datacontenttype").asText();
|
||||
}
|
||||
|
||||
String data = null;
|
||||
if (node.has("data") && !node.get("data").isNull()) {
|
||||
JsonNode dataNode = node.get("data");
|
||||
if (dataNode.isTextual()) {
|
||||
data = dataNode.textValue();
|
||||
} else {
|
||||
data = node.get("data").toString();
|
||||
}
|
||||
}
|
||||
|
||||
return new CloudEvent(id, source, type, specversion, datacontenttype, data);
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, source, type, specversion, datacontenttype, data);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
/**
|
||||
* Enumerates commonly used metadata attributes.
|
||||
*/
|
||||
public final class Metadata {
|
||||
|
||||
public static final String CONTENT_TYPE = "content-type";
|
||||
|
||||
public static final String TTL_IN_SECONDS = "ttlInSeconds";
|
||||
|
||||
private Metadata() {
|
||||
}
|
||||
}
|
||||
|
|
@ -14,7 +14,9 @@ import java.io.IOException;
|
|||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -780,7 +782,7 @@ public class DefaultObjectSerializerTest {
|
|||
public void deserializeCloudEventEnvelopeData() throws Exception {
|
||||
|
||||
|
||||
Function<String, String> deserializeData = (jsonData -> {
|
||||
Function<String, Object> deserializeData = (jsonData -> {
|
||||
try {
|
||||
String payload = String.format("{\"data\": %s}", jsonData);
|
||||
return CloudEvent.deserialize(payload.getBytes()).getData();
|
||||
|
|
@ -789,26 +791,28 @@ public class DefaultObjectSerializerTest {
|
|||
}
|
||||
});
|
||||
|
||||
assertEquals("123",
|
||||
deserializeData.apply("123"));
|
||||
assertEquals("true",
|
||||
deserializeData.apply("true"));
|
||||
assertEquals("123.45",
|
||||
deserializeData.apply("123.45"));
|
||||
assertEquals(123,
|
||||
deserializeData.apply("123"));
|
||||
assertEquals(true,
|
||||
deserializeData.apply("true"));
|
||||
assertEquals(123.45,
|
||||
deserializeData.apply("123.45"));
|
||||
assertEquals("AAEI",
|
||||
deserializeData.apply(quote(Base64.getEncoder().encodeToString(new byte[] { 0, 1, 8}))));
|
||||
deserializeData.apply(quote(Base64.getEncoder().encodeToString(new byte[]{0, 1, 8}))));
|
||||
assertEquals("hello world",
|
||||
deserializeData.apply(quote("hello world")));
|
||||
deserializeData.apply(quote("hello world")));
|
||||
assertEquals("\"hello world\"",
|
||||
deserializeData.apply(quote("\\\"hello world\\\"")));
|
||||
deserializeData.apply(quote("\\\"hello world\\\"")));
|
||||
assertEquals("\"hello world\"",
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("\"hello world\"")));
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("\"hello world\"")));
|
||||
assertEquals("hello world",
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("hello world")));
|
||||
assertEquals("{\"id\":\"123:\",\"name\":\"Jon Doe\"}",
|
||||
deserializeData.apply("{\"id\": \"123:\", \"name\": \"Jon Doe\"}"));
|
||||
assertEquals("{\"id\": \"123:\", \"name\": \"Jon Doe\"}",
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("{\"id\": \"123:\", \"name\": \"Jon Doe\"}")));
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("hello world")));
|
||||
assertEquals(new TreeMap<String, String>() {{
|
||||
put("id", "123");
|
||||
put("name", "Jon Doe");
|
||||
}}, deserializeData.apply("{\"id\": \"123\", \"name\": \"Jon Doe\"}"));
|
||||
assertEquals("{\"id\": \"123\", \"name\": \"Jon Doe\"}",
|
||||
deserializeData.apply(new ObjectMapper().writeValueAsString("{\"id\": \"123\", \"name\": \"Jon Doe\"}")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in New Issue