Invoke service + unit tests. (#78)

This commit is contained in:
Artur Souza 2020-01-10 14:46:57 -08:00 committed by GitHub
parent d9c60441fd
commit 9dd89da7e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 754 additions and 283 deletions

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.examples.invoke.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.dapr.DaprClientGrpc;
import io.dapr.DaprClientProtos;
import io.dapr.runtime.Dapr;
import io.dapr.runtime.MethodListener;
import io.dapr.springboot.DaprApplication;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Map;
import java.util.TimeZone;
import static io.dapr.examples.DaprExamplesProtos.SayRequest;
import static io.dapr.examples.DaprExamplesProtos.SayResponse;
/**
* 1. Build and install jars:
* mvn clean install
* 2. Run in server mode:
* dapr run --app-id invokedemo --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.http.DemoService -Dexec.args="-p 3000"
*/
public class DemoService {
/**
* Shared Json serializer/deserializer.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Format to output date and time.
*/
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* Starts the service.
* @param args Expects the port: -p PORT
* @throws Exception If cannot start service.
*/
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "Port to listen to.");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
Dapr.getInstance().registerServiceMethod("say", (data, metadata) -> {
String message = data == null ? "" : new String(data, StandardCharsets.UTF_8);
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());
String metadataString = metadata == null ? "" : OBJECT_MAPPER.writeValueAsString(metadata);
// Handles the request by printing message.
System.out.println(
"Server: " + message + " @ " + utcNowAsString + " and metadata: " + metadataString);
return Mono.just(utcNowAsString.getBytes(StandardCharsets.UTF_8));
});
DaprApplication.start(port);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.examples.invoke.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Verb;
/**
* 1. Build and install jars:
* mvn clean install
* 2. Send messages to the server:
* dapr run --port 3006 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.http.InvokeClient -Dexec.args="'message one' 'message two'"
*/
public class InvokeClient {
/**
* Identifier in Dapr for the service this client will invoke.
*/
private static final String SERVICE_APP_ID = "invokedemo";
/**
* Starts the invoke client.
* @param args Messages to be sent as request for the invoke API.
*/
public static void main(String[] args) {
DaprClient client = (new DaprClientBuilder()).build();
for (String message : args) {
client.invokeService(Verb.POST, SERVICE_APP_ID, "say", message, null, String.class).block();
}
}
}

View File

@ -71,14 +71,10 @@ public class DaprController {
}
@PostMapping(path = "/{name}")
public Mono<byte[]> invokeMethodOrTopic(@PathVariable("name") String name,
@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> header) {
return Dapr.getInstance().handleInvocation(name, body, header);
}
@PutMapping(path = "/{name}")
public Mono<byte[]> invokeMethodOrTopicViaPut(@PathVariable("name") String name,
@DeleteMapping(path = "/{name}")
@GetMapping(path = "/{name}")
public Mono<byte[]> invokeMethodOrTopic(@PathVariable("name") String name,
@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> header) {
return Dapr.getInstance().handleInvocation(name, body, header);

View File

@ -6,6 +6,7 @@ package io.dapr.client;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import reactor.core.publisher.Mono;
import java.util.List;
@ -21,55 +22,93 @@ public interface DaprClient {
/**
* Publish an event.
*
* @param topic the topic where the event will be published
* @param event the event to be published
* @param <T> The type of event to be published.
* @return a Mono plan of type Void
* @param topic the topic where the event will be published.
* @param event the event to be published.
* @param <T> The type of event to be published, use byte[] for skipping serialization.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> publishEvent(String topic, T event);
/**
* Publish an event.
*
* @param topic the topic where the event will be published
* @param event the event to be published
* @param topic the topic where the event will be published.
* @param event the event to be published.
* @param metadata The metadata for the published event.
* @param <T> The type of event to be published.
* @return a Mono plan of type Void
* @param <T> The type of event to be published, use byte[] for skipping serialization.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> publishEvent(String topic, T event, Map<String, String> metadata);
/**
* Invoke a service
* Invoke a service with all possible parameters, using serialization.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service
* @param clazz the Type needed as return for the call
* @param <T> the Type of the return
* @param <K> The Type of the request.
* @return A Mono Plan of type clazz
* @param request The request to be sent to invoke the service.
* @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request.
* @param clazz the Type needed as return for the call.
* @param <T> the Type of the return, use byte[] to skip serialization.
* @param <R> The Type of the request, use byte[] to skip serialization.
* @return A Mono Plan of type clazz.
*/
<T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz);
<T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz);
/**
* Invoke a service
* Invoke a service without input, using serialization for response.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service
* @param <T> the Type of the requet
* @return a Mono plan of type Void
* @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request.
* @param clazz the Type needed as return for the call.
* @param <T> the Type of the return, use byte[] to skip serialization.
* @return A Mono plan of type clazz.
*/
<T> Mono<Void> invokeService(String verb, String appId, String method, T request);
<T> Mono<T> invokeService(Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz);
/**
* Creating a Binding
* Invoke a service with void response, using serialization.
*
* @param name The name of the biding to call
* @param request The request needed for the binding
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service.
* @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request.
* @param <R> The Type of the request, use byte[] to skip serialization.
* @return A Mono plan for Void.
*/
<R> Mono<Void> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata);
/**
* Invoke a service without input and void response.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request.
* @return A Mono plan for Void.
*/
Mono<Void> invokeService(Verb verb, String appId, String method, Map<String, String> metadata);
/**
* Invoke a service without serialization.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service
* @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request.
* @return A Mono plan of byte[].
*/
Mono<byte[]> invokeService(Verb verb, String appId, String method, byte[] request, Map<String, String> metadata);
/**
* Creating a Binding.
*
* @param name The name of the biding to call.
* @param request The request needed for the binding.
* @param <T> The type of the request.
* @return a Mono plan of type Void
*/
@ -78,44 +117,44 @@ public interface DaprClient {
/**
* Retrieve a State based on their key.
*
* @param state The key of the State to be retrieved
* @param state The key of the State to be retrieved.
* @param stateOptions
* @param clazz the Type of State needed as return.
* @param <T> the Type of the return
* @param <K> The Type of the key of the State
* @return A Mono Plan for the requested State
* @param <T> the Type of the return.
* @param <K> The Type of the key of the State.
* @return A Mono Plan for the requested State.
*/
<T, K> Mono<T> getState(StateKeyValue<K> state, StateOptions stateOptions, Class<T> clazz);
/**
* Save/Update a list of states.
*
* @param states the States to be saved
* @param options the Options to use for each state
* @param <T> the Type of the State
* @return a Mono plan of type Void
* @param states the States to be saved.
* @param options the Options to use for each state.
* @param <T> the Type of the State.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options);
/**
* Save/Update a state
* Save/Update a state.
*
* @param key the key of the state
* @param etag the etag to be used
* @param value the value of the state
* @param options the Options to use for each state
* @param <T> the Type of the State
* @return a Mono plan of type Void
* @param key the key of the state.
* @param etag the etag to be used.
* @param value the value of the state.
* @param options the Options to use for each state.
* @param <T> the Type of the State.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> saveState(String key, String etag, T value, StateOptions options);
/**
* Delete a state
* Delete a state.
*
* @param state The key of the State to be removed
* @param options The options of the state
* @param <T> The Type of the key of the State
* @return a Mono plan of type Void
* @param state The key of the State to be removed.
* @param options The options of the state.
* @param <T> The Type of the key of the State.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options);

View File

@ -5,16 +5,19 @@
package io.dapr.client;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import io.dapr.DaprGrpc;
import io.dapr.DaprProtos;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.utils.ObjectSerializer;
import reactor.core.publisher.Mono;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
* An adapter for the GRPC Client.
@ -30,6 +33,7 @@ class DaprClientGrpcAdapter implements DaprClient {
* @see io.dapr.DaprGrpc.DaprFutureStub
*/
private DaprGrpc.DaprFutureStub client;
/**
* A utitlity class for serialize and deserialize the messages sent and retrived by the client.
*/
@ -87,20 +91,20 @@ class DaprClientGrpcAdapter implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) {
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz) {
try {
Map<String, String> mapMessage = new HashMap<>();
mapMessage.put("Id", appId);
mapMessage.put("Method", verb);
mapMessage.put("Data", objectSerializer.serializeString(request));
DaprProtos.InvokeServiceEnvelope.Builder envelopeBuilder = DaprProtos.InvokeServiceEnvelope.newBuilder();
envelopeBuilder.setId(appId);
envelopeBuilder.setMethod(verb.toString());
envelopeBuilder.setData(Any.parseFrom(objectSerializer.serialize(request)));
envelopeBuilder.getMetadataMap().putAll(metadata);
DaprProtos.InvokeServiceEnvelope envelope =
DaprProtos.InvokeServiceEnvelope.parseFrom(objectSerializer.serialize(mapMessage));
DaprProtos.InvokeServiceEnvelope envelope = envelopeBuilder.build();
ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse =
client.invokeService(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz));
return Mono.just(objectSerializer.deserialize(f.get().getData().toByteArray(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
@ -112,13 +116,35 @@ class DaprClientGrpcAdapter implements DaprClient {
}
/**
* Operation not supported for GRPC
*
* TODO: Implement this since this IS supported.
* @throws UnsupportedOperationException every time is called.
* {@inheritDoc}
*/
public <T> Mono<Void> invokeService(String verb, String appId, String method, T request) {
return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC"));
@Override
public <T> Mono<T> invokeService(Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz) {
return this.invokeService(verb, appId, method, null, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <R> Mono<Void> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> invokeService(Verb verb, String appId, String method, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, null, metadata, byte[].class).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> invokeService(Verb verb, String appId, String method, byte[] request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class);
}
/**

View File

@ -1,7 +1,12 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import io.dapr.utils.ObjectSerializer;
@ -14,7 +19,7 @@ import java.util.List;
import java.util.Map;
/**
* An adapter for the GRPC Client.
* An adapter for the HTTP Client.
*
* @see io.dapr.client.DaprHttp
* @see io.dapr.client.DaprClient
@ -75,24 +80,21 @@ public class DaprClientHttpAdapter implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) {
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz) {
try {
if (verb == null || verb.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
DaprHttp.HttpMethods httMethod = DaprHttp.HttpMethods.valueOf(verb.toUpperCase());
if (httMethod == null) {
throw new DaprException("405", "HTTP Method not allowed.");
if (verb == null) {
throw new DaprException("500", "Verb cannot be null.");
}
String httMethod = verb.toString();
if (appId == null || appId.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
throw new DaprException("500", "Method name cannot be null or empty.");
}
String path = String.format("/invoke/%s/method/%s", appId, method);
String path = String.format("%s/%s/method/%s", Constants.INVOKE_PATH, appId, method);
byte[] serializedRequestBody = objectSerializer.serialize(request);
return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null)
return this.client.invokeAPI(httMethod, path, serializedRequestBody, metadata)
.flatMap(r -> {
try {
return Mono.just(objectSerializer.deserialize(r, clazz));
@ -109,27 +111,32 @@ public class DaprClientHttpAdapter implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> invokeService(String verb, String appId, String method, T request) {
try {
if (verb == null || verb.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
DaprHttp.HttpMethods httMethod = DaprHttp.HttpMethods.valueOf(verb.toUpperCase());
if (httMethod == null) {
throw new DaprException("405", "HTTP Method not allowed.");
}
if (appId == null || appId.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "Method to invoke cannot be null or empty.");
}
String path = String.format("/invoke/%s/method/%s", appId, method);
byte[] serializedRequestBody = objectSerializer.serialize(request);
return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null).then();
} catch (Exception ex) {
return Mono.error(ex);
}
public <T> Mono<T> invokeService(Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz) {
return this.invokeService(verb, appId, method, null, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <R> Mono<Void> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> invokeService(Verb verb, String appId, String method, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, null, metadata, byte[].class).then();
}
/**
* {@inheritDoc}
*/
@Override
public Mono<byte[]> invokeService(Verb verb, String appId, String method, byte[] request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class);
}
/**

View File

@ -0,0 +1,137 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
import java.util.Arrays;
import java.util.Objects;
/**
* A cloud event in Dapr.
*/
public final class CloudEventEnvelope {
/**
* Identifier of the message being processed.
*/
private final String id;
/**
* Event's source.
*/
private final String source;
/**
* Envelope type.
*/
private final String type;
/**
* Version of the specification.
*/
private final String specversion;
/**
* Type of the data's content.
*/
private final String datacontenttype;
/**
* Raw input payload.
*/
private final byte[] data;
/**
* Instantiates a new input request.
* @param id Identifier of the message being processed.
* @param source Source for this event.
* @param type Type of event.
* @param specversion Version of the event spec.
* @param datacontenttype Type of the payload.
* @param data Payload.
*/
public CloudEventEnvelope(
String id,
String source,
String type,
String specversion,
String datacontenttype,
byte[] data) {
this.id = id;
this.source = source;
this.type = type;
this.specversion = specversion;
this.datacontenttype = datacontenttype;
this.data = data;
}
/**
* Gets the identifier of the message being processed.
* @return Identifier of the message being processed.
*/
public String getId() {
return id;
}
/**
* Gets the source for this event.
* @return Source for this event.
*/
public String getSource() {
return source;
}
/**
* Gets the type of event.
* @return Type of event.
*/
public String getType() {
return type;
}
/**
* Gets the version of the event spec.
* @return Version of the event spec.
*/
public String getSpecversion() {
return specversion;
}
/**
* Gets the type of the payload.
* @return Type of the payload.
*/
public String getDatacontenttype() {
return datacontenttype;
}
/**
* Gets the payload
* @return Payload
*/
public byte[] getData() {
return data;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CloudEventEnvelope that = (CloudEventEnvelope) o;
return Objects.equals(id, that.id) &&
Objects.equals(source, that.source) &&
Objects.equals(type, that.type) &&
Objects.equals(specversion, that.specversion) &&
Objects.equals(datacontenttype, that.datacontenttype) &&
Arrays.equals(data, that.data);
}
@Override
public int hashCode() {
int result = Objects.hash(id, source, type, specversion, datacontenttype);
result = 31 * result + Arrays.hashCode(data);
return result;
}
}

View File

@ -1,3 +1,7 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
public class StateKeyValue<T> {

View File

@ -1,3 +1,7 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
public class StateOptions {

View File

@ -0,0 +1,15 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
/**
* Verbs used to invoke methods in Dapr.
*/
public enum Verb {
GET,
PUT,
POST,
DELETE
}

View File

@ -4,8 +4,6 @@
*/
package io.dapr.exceptions;
import java.io.IOException;
/**
* A Dapr's specific exception.
*/

View File

@ -5,17 +5,11 @@
package io.dapr.runtime;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.CloudEventEnvelope;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.ObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -27,11 +21,6 @@ import java.util.stream.Collectors;
*/
public final class Dapr implements DaprRuntime {
/**
* Empty response.
*/
private static final Mono<byte[]> EMPTY_BYTES_ASYNC = Mono.just(new byte[0]);
/**
* Singleton instance for this class.
*/
@ -45,7 +34,7 @@ public final class Dapr implements DaprRuntime {
/**
* Topics, methods and binding handles.
*/
private final Map<String, Function<Message, Mono<byte[]>>> handlers = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Function<HandleRequest, Mono<byte[]>>> handlers = Collections.synchronizedMap(new HashMap<>());
/**
* Private constructor to keep it singleton.
@ -87,7 +76,7 @@ public final class Dapr implements DaprRuntime {
*/
@Override
public void subscribeToTopic(String topic, TopicListener listener) {
this.handlers.putIfAbsent(topic, new TopicHandler(listener));
this.handlers.putIfAbsent(topic, new TopicHandler(this.serializer, listener));
}
/**
@ -103,19 +92,16 @@ public final class Dapr implements DaprRuntime {
*/
@Override
public Mono<byte[]> handleInvocation(String name, byte[] payload, Map<String, String> metadata) {
Function<Message, Mono<byte[]>> handler = this.handlers.get(name);
Function<HandleRequest, Mono<byte[]>> handler = this.handlers.get(name);
if (handler == null) {
return Mono.error(new DaprException("INVALID_METHOD_OR_TOPIC", "Did not find handler for : " + (name == null ? "" : name)));
}
try {
Message message = this.serializer.deserialize(payload, Message.class);
if (message == null) {
return EMPTY_BYTES_ASYNC;
}
message.setMetadata(metadata);
return handler.apply(message).switchIfEmpty(EMPTY_BYTES_ASYNC);
HandleRequest request = new HandleRequest(name, payload, metadata);
Mono<byte[]> response = handler.apply(request);
return response;
} catch (Exception e) {
// Handling exception in user code by propagating up via Mono.
return Mono.error(e);
@ -123,87 +109,42 @@ public final class Dapr implements DaprRuntime {
}
/**
* Internal class to encapsulate a request message.
* Class used to pass-through the handler input.
*/
public static final class Message {
private static final class HandleRequest {
/**
* Identifier of the message being processed.
* Name of topic/method/binding being handled.
*/
private String id;
private final String name;
/**
* Type of the input payload.
* Payload received.
*/
private String datacontenttype;
private final byte[] payload;
/**
* Raw input payload.
* Metadata received.
*/
private byte[] data;
private final Map<String, String> metadata;
/**
* Headers (or metadata).
* Instantiates a new handle request.
* @param name Name of topic/method/binding being handled.
* @param payload Payload received.
* @param metadata Metadata received.
*/
private Map<String, String> metadata;
/**
* Instantiates a new input request (useful for JSON deserialization).
*/
public Message() {
}
/**
* Instantiates a new input request.
* @param id Identifier of the message being processed.
* @param datacontenttype Type of the input payload.
* @param data Input body.
* @param metadata Headers (or metadata) for the call.
*/
public Message(String id, String datacontenttype, byte[] data, Map<String, String> metadata) {
this.id = id;
this.datacontenttype = datacontenttype;
this.data = data;
private HandleRequest(String name, byte[] payload, Map<String, String> metadata) {
this.name = name;
this.payload = payload;
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public String getDatacontenttype() {
return datacontenttype;
}
public void setDatacontenttype(String datacontenttype) {
this.datacontenttype = datacontenttype;
}
public Map<String, String> getMetadata() {
return metadata;
}
public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
}
/**
* Internal class to handle a method call.
*/
private static final class MethodHandler implements Function<Message, Mono<byte[]>> {
private static final class MethodHandler implements Function<HandleRequest, Mono<byte[]>> {
/**
* User-provided listener.
@ -224,26 +165,37 @@ public final class Dapr implements DaprRuntime {
* @return Raw output payload or empty.
*/
@Override
public Mono<byte[]> apply(Message r) {
return listener.process(r.data, r.metadata);
public Mono<byte[]> apply(HandleRequest r) {
try {
return listener.process(r.payload, r.metadata);
} catch (Exception e) {
return Mono.error(e);
}
}
}
/**
* Internal class to handle a topic message delivery.
*/
private static final class TopicHandler implements Function<Message, Mono<byte[]>> {
private static final class TopicHandler implements Function<HandleRequest, Mono<byte[]>> {
/**
* User-provided listener.
*/
private final TopicListener listener;
/**
* Serializer/deserializer.
*/
private final ObjectSerializer serializer;
/**
* Instantiates a new topic handler.
* @param serializer Useful for object serialization.
* @param listener Callback to be executed on a given message.
*/
private TopicHandler(TopicListener listener) {
private TopicHandler(ObjectSerializer serializer, TopicListener listener) {
this.serializer = serializer;
this.listener = listener;
}
@ -253,8 +205,17 @@ public final class Dapr implements DaprRuntime {
* @return Always empty response.
*/
@Override
public Mono<byte[]> apply(Message r) {
return listener.process(r.id, r.datacontenttype, r.data, r.metadata).then(Mono.empty());
public Mono<byte[]> apply(HandleRequest r) {
try {
CloudEventEnvelope message = this.serializer.deserialize(r.payload, CloudEventEnvelope.class);
if (message == null) {
return Mono.empty();
}
return listener.process(message, r.metadata).then(Mono.empty());
} catch (Exception e) {
return Mono.error(e);
}
}
}
}

View File

@ -5,6 +5,7 @@
package io.dapr.runtime;
import com.fasterxml.jackson.core.JsonProcessingException;
import reactor.core.publisher.Mono;
import java.util.Map;
@ -19,7 +20,8 @@ public interface MethodListener {
* @param data Raw input payload.
* @param metadata Header (or metadata).
* @return Raw response or empty.
* @throws Exception Any exception from user code.
*/
Mono<byte[]> process(byte[] data, Map<String, String> metadata);
Mono<byte[]> process(byte[] data, Map<String, String> metadata) throws Exception;
}

View File

@ -5,6 +5,7 @@
package io.dapr.runtime;
import io.dapr.client.domain.CloudEventEnvelope;
import reactor.core.publisher.Mono;
import java.util.Map;
@ -16,12 +17,11 @@ public interface TopicListener {
/**
* Processes a given topic's message delivery.
* @param messageId Message's identifier.
* @param dataType Type of the input data.
* @param data Input data.
* @param message Message event to be processed.
* @param metadata Headers (or metadata).
* @return Empty response.
* @throws Exception Any exception from user code.
*/
Mono<Void> process(String messageId, String dataType, byte[] data, Map<String, String> metadata);
Mono<Void> process(CloudEventEnvelope message, Map<String, String> metadata) throws Exception;
}

View File

@ -74,6 +74,11 @@ public final class Constants {
*/
public static final String ACTOR_TIMER_RELATIVE_URL_FORMAT = ACTORS_BASE_URL + "/%s/%s/timers/%s";
/**
* Base path to invoke methods.
*/
public static final String INVOKE_PATH = API_VERSION + "/invoke";
/**
* Invoke Publish Path
*/

View File

@ -6,12 +6,10 @@ package io.dapr.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.runtime.Dapr;
import io.dapr.client.domain.CloudEventEnvelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -25,7 +23,7 @@ import java.util.Map;
public class ObjectSerializer {
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
* Shared Json Factory as per Jackson's documentation.
*/
protected static final JsonFactory JSON_FACTORY = new JsonFactory();
@ -113,8 +111,8 @@ public class ObjectSerializer {
return (T) null;
}
if (clazz == Dapr.Message.class) {
return (T) this.deserializeTopicMessage(value);
if (clazz == CloudEventEnvelope.class) {
return (T) this.deserializeCloudEventEnvelope(value);
}
if (clazz == String.class) {
@ -145,7 +143,7 @@ public class ObjectSerializer {
* @return Message (can be null if input is null)
* @throws IOException If cannot parse.
*/
private Dapr.Message deserializeTopicMessage(Object payload) throws IOException {
private CloudEventEnvelope deserializeCloudEventEnvelope(Object payload) throws IOException {
if (payload == null) {
return null;
}
@ -166,9 +164,24 @@ public class ObjectSerializer {
id = node.get("id").asText();
}
String dataType = null;
String source = null;
if (node.has("source") && !node.get("source").isNull()) {
source = node.get("source").asText();
}
String type = null;
if (node.has("type") && !node.get("type").isNull()) {
type = node.get("type").asText();
}
String specversion = null;
if (node.has("specversion") && !node.get("specversion").isNull()) {
specversion = node.get("specversion").asText();
}
String datacontenttype = null;
if (node.has("datacontenttype") && !node.get("datacontenttype").isNull()) {
dataType = node.get("datacontenttype").asText();
datacontenttype = node.get("datacontenttype").asText();
}
byte[] data = null;
@ -180,7 +193,7 @@ public class ObjectSerializer {
}
}
return new Dapr.Message(id, dataType, data, null);
return new CloudEventEnvelope(id, source, type, specversion, datacontenttype, data);
}
/**

View File

@ -5,17 +5,21 @@
package io.dapr.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.*;
import io.dapr.runtime.Dapr;
import io.dapr.runtime.DaprRuntime;
import io.dapr.runtime.TopicListener;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientTestBuilder;
import io.dapr.client.DaprHttpStub;
import io.dapr.client.domain.CloudEventEnvelope;
import io.dapr.client.domain.Verb;
import io.dapr.utils.Constants;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@ -27,12 +31,16 @@ import static org.mockito.Mockito.*;
public class DaprRuntimeTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected static final JsonFactory JSON_FACTORY = new JsonFactory();
private static final String TYPE_PLAIN_TEXT = "plain/text";
private static final String TOPIC_NAME = "mytopic";
private static final String APP_ID = "myappid";
private static final String METHOD_NAME = "mymethod";
private final DaprRuntime daprRuntime = Dapr.getInstance();
@Before
@ -49,75 +57,166 @@ public class DaprRuntimeTest {
Assert.assertTrue(this.daprRuntime.getSubscribedTopics().isEmpty());
TopicListener listener = mock(TopicListener.class);
when(listener.process(any(), any())).thenReturn(Mono.empty());
this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener);
verify(listener, never()).process(any(), any(), any(), any());
verify(listener, never()).process(any(), any());
Dapr.Message[] messages = new Dapr.Message[]{
new Dapr.Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Dapr.Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
new byte[0],
generateSingleMetadata()),
new Dapr.Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
null,
generateSingleMetadata()),
new Dapr.Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
null),
new Dapr.Message(
"",
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Dapr.Message(
null,
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Dapr.Message(
generateMessageId(),
"",
generatePayload(),
generateSingleMetadata()),
new Dapr.Message(
generateMessageId(),
null,
generatePayload(),
generateSingleMetadata())
Message[] messages = new Message[]{
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
new byte[0],
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
null,
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
null),
new Message(
"",
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
null,
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
"",
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
null,
generatePayload(),
generateSingleMetadata())
};
DaprHttpStub daprHttp = mock(DaprHttpStub.class);
DaprClient client = DaprClientTestBuilder.buildHttpClient(daprHttp);
for (Dapr.Message message : messages) {
for (Message message : messages) {
when(daprHttp.invokeAPI(
eq("POST"),
eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME),
eq(message.getData()),
eq(null)))
.thenAnswer(invocationOnMock -> {
this.daprRuntime.handleInvocation(
TOPIC_NAME, OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata());
return Mono.empty();
});
eq("POST"),
eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME),
eq(message.data),
eq(null)))
.thenAnswer(invocationOnMock -> this.daprRuntime.handleInvocation(
TOPIC_NAME,
this.serialize(message),
message.metadata).then());
client.publishEvent(TOPIC_NAME, message.getData()).block();
client.publishEvent(TOPIC_NAME, message.data).block();
verify(listener, times(1))
.process(eq(message.getId()), eq(message.getDatacontenttype()), eq(message.getData()), eq(message.getMetadata()));
CloudEventEnvelope envelope = new CloudEventEnvelope(
message.id,
null,
null,
null,
message.datacontenttype,
message.data
);
verify(listener, times(1)).process(eq(envelope), eq(message.metadata));
}
verify(listener, times(messages.length)).process(any(), any(), any(), any());
verify(listener, times(messages.length)).process(any(), any());
}
@Test
public void invokeHappyCase() throws Exception {
MethodListener listener = mock(MethodListener.class);
this.daprRuntime.registerServiceMethod(METHOD_NAME, listener);
verify(listener, never()).process(any(), any());
Message[] messages = new Message[]{
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
new byte[0],
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
null,
generateSingleMetadata()),
new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
null),
new Message(
"",
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
null,
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
"",
generatePayload(),
generateSingleMetadata()),
new Message(
generateMessageId(),
null,
generatePayload(),
generateSingleMetadata())
};
DaprHttpStub daprHttp = mock(DaprHttpStub.class);
DaprClient client = DaprClientTestBuilder.buildHttpClient(daprHttp);
for (Message message : messages) {
byte[] expectedResponse = message.id == null ? new byte[0] : message.id.getBytes(StandardCharsets.UTF_8);
when(listener.process(eq(message.data), eq(message.metadata)))
.then(x -> Mono.just(expectedResponse));
when(daprHttp.invokeAPI(
eq("POST"),
eq(Constants.INVOKE_PATH + "/" + APP_ID + "/method/" + METHOD_NAME),
eq(message.data),
any()))
.thenAnswer(x ->
this.daprRuntime.handleInvocation(
METHOD_NAME,
message.data,
message.metadata)
.map(r -> new String(r, StandardCharsets.UTF_8)));
Mono<byte[]> response = client.invokeService(Verb.POST, APP_ID, METHOD_NAME, message.data, message.metadata);
Assert.assertEquals(
new String(expectedResponse, StandardCharsets.UTF_8),
new String(response.block(), StandardCharsets.UTF_8));
verify(listener, times(1))
.process(eq(message.data), eq(message.metadata));
}
verify(listener, times(messages.length)).process(any(), any());
}
@Test(expected = RuntimeException.class)
@ -126,27 +225,29 @@ public class DaprRuntimeTest {
Assert.assertTrue(this.daprRuntime.getSubscribedTopics().isEmpty());
TopicListener listener = mock(TopicListener.class);
when(listener.process(any(), any(), any(), any()))
when(listener.process(any(), any()))
.thenReturn(Mono.error(new RuntimeException()));
this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener);
Dapr.Message message = new Dapr.Message(
Message message = new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata());
Mono<byte[]> result = this.daprRuntime
.handleInvocation(TOPIC_NAME, OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata());
verify(listener, times(1))
.process(
eq(message.getId()),
eq(message.getDatacontenttype()),
eq(message.getData()),
eq(message.getMetadata()));
.handleInvocation(TOPIC_NAME, this.serialize(message), message.metadata);
CloudEventEnvelope envelope = new CloudEventEnvelope(
message.id,
null,
null,
null,
message.datacontenttype,
message.data
);
verify(listener, times(1)).process(eq(envelope), eq(message.metadata));
result.block();
}
@ -159,21 +260,16 @@ public class DaprRuntimeTest {
this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener);
Dapr.Message message = new Dapr.Message(
Message message = new Message(
generateMessageId(),
TYPE_PLAIN_TEXT,
generatePayload(),
generateSingleMetadata());
Mono<byte[]> result = this.daprRuntime
.handleInvocation("UNKNOWN", OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata());
.handleInvocation("UNKNOWN", serialize(message), message.metadata);
verify(listener, never())
.process(
eq(message.getId()),
eq(message.getDatacontenttype()),
eq(message.getData()),
eq(message.getMetadata()));
verify(listener, never()).process(any(), any());
result.block();
}
@ -189,4 +285,46 @@ public class DaprRuntimeTest {
private static final Map<String, String> generateSingleMetadata() {
return Collections.singletonMap(UUID.randomUUID().toString(), UUID.randomUUID().toString());
}
private static final class Message {
private final String id;
private final String datacontenttype;
private final byte[] data;
private final Map<String, String> metadata;
private Message(String id, String datacontenttype, byte[] data, Map<String, String> metadata) {
this.id = id;
this.datacontenttype = datacontenttype;
this.data = data;
this.metadata = metadata;
}
}
private byte[] serialize(Message message) throws IOException {
if (message == null) {
return null;
}
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(bos);
generator.writeStartObject();
if (message.id != null) {
generator.writeStringField("id", message.id);
}
if (message.datacontenttype != null) {
generator.writeStringField("datacontenttype", message.datacontenttype);
}
if (message.data != null) {
generator.writeBinaryField("data", message.data);
}
generator.writeEndObject();
generator.close();
bos.flush();
return bos.toByteArray();
}
}
}

View File

@ -1,3 +1,8 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.utils;
import org.junit.Test;