diff --git a/examples/src/main/java/io/dapr/examples/invoke/http/DemoService.java b/examples/src/main/java/io/dapr/examples/invoke/http/DemoService.java new file mode 100644 index 000000000..f62d7d5fd --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/invoke/http/DemoService.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.examples.invoke.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import io.dapr.DaprClientGrpc; +import io.dapr.DaprClientProtos; +import io.dapr.runtime.Dapr; +import io.dapr.runtime.MethodListener; +import io.dapr.springboot.DaprApplication; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Map; +import java.util.TimeZone; + +import static io.dapr.examples.DaprExamplesProtos.SayRequest; +import static io.dapr.examples.DaprExamplesProtos.SayResponse; + +/** + * 1. Build and install jars: + * mvn clean install + * 2. Run in server mode: + * dapr run --app-id invokedemo --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.http.DemoService -Dexec.args="-p 3000" + */ +public class DemoService { + + /** + * Shared Json serializer/deserializer. + */ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Format to output date and time. + */ + private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + /** + * Starts the service. + * @param args Expects the port: -p PORT + * @throws Exception If cannot start service. + */ + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addRequiredOption("p", "port", true, "Port to listen to."); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + // If port string is not valid, it will throw an exception. + int port = Integer.parseInt(cmd.getOptionValue("port")); + + Dapr.getInstance().registerServiceMethod("say", (data, metadata) -> { + String message = data == null ? "" : new String(data, StandardCharsets.UTF_8); + + Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + String utcNowAsString = DATE_FORMAT.format(utcNow.getTime()); + + String metadataString = metadata == null ? "" : OBJECT_MAPPER.writeValueAsString(metadata); + + // Handles the request by printing message. + System.out.println( + "Server: " + message + " @ " + utcNowAsString + " and metadata: " + metadataString); + + return Mono.just(utcNowAsString.getBytes(StandardCharsets.UTF_8)); + }); + + DaprApplication.start(port); + } +} diff --git a/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java b/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java new file mode 100644 index 000000000..f9123594f --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.examples.invoke.http; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.Verb; + +/** + * 1. Build and install jars: + * mvn clean install + * 2. Send messages to the server: + * dapr run --port 3006 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.http.InvokeClient -Dexec.args="'message one' 'message two'" + */ +public class InvokeClient { + + /** + * Identifier in Dapr for the service this client will invoke. + */ + private static final String SERVICE_APP_ID = "invokedemo"; + + /** + * Starts the invoke client. + * @param args Messages to be sent as request for the invoke API. + */ + public static void main(String[] args) { + DaprClient client = (new DaprClientBuilder()).build(); + for (String message : args) { + client.invokeService(Verb.POST, SERVICE_APP_ID, "say", message, null, String.class).block(); + } + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java index 3e73900e6..6337a8a10 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java @@ -71,14 +71,10 @@ public class DaprController { } @PostMapping(path = "/{name}") - public Mono invokeMethodOrTopic(@PathVariable("name") String name, - @RequestBody(required = false) byte[] body, - @RequestHeader Map header) { - return Dapr.getInstance().handleInvocation(name, body, header); - } - @PutMapping(path = "/{name}") - public Mono invokeMethodOrTopicViaPut(@PathVariable("name") String name, + @DeleteMapping(path = "/{name}") + @GetMapping(path = "/{name}") + public Mono invokeMethodOrTopic(@PathVariable("name") String name, @RequestBody(required = false) byte[] body, @RequestHeader Map header) { return Dapr.getInstance().handleInvocation(name, body, header); diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index afb243be0..db3dd1fb3 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -6,6 +6,7 @@ package io.dapr.client; import io.dapr.client.domain.StateKeyValue; import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.Verb; import reactor.core.publisher.Mono; import java.util.List; @@ -21,55 +22,93 @@ public interface DaprClient { /** * Publish an event. * - * @param topic the topic where the event will be published - * @param event the event to be published - * @param The type of event to be published. - * @return a Mono plan of type Void + * @param topic the topic where the event will be published. + * @param event the event to be published. + * @param The type of event to be published, use byte[] for skipping serialization. + * @return a Mono plan of type Void. */ Mono publishEvent(String topic, T event); /** * Publish an event. * - * @param topic the topic where the event will be published - * @param event the event to be published + * @param topic the topic where the event will be published. + * @param event the event to be published. * @param metadata The metadata for the published event. - * @param The type of event to be published. - * @return a Mono plan of type Void + * @param The type of event to be published, use byte[] for skipping serialization. + * @return a Mono plan of type Void. */ Mono publishEvent(String topic, T event, Map metadata); /** - * Invoke a service + * Invoke a service with all possible parameters, using serialization. * * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. - * @param appId The Application ID where the service is + * @param appId The Application ID where the service is. * @param method The actual Method to be call in the application. - * @param request The request to be sent to invoke the service - * @param clazz the Type needed as return for the call - * @param the Type of the return - * @param The Type of the request. - * @return A Mono Plan of type clazz + * @param request The request to be sent to invoke the service. + * @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request. + * @param clazz the Type needed as return for the call. + * @param the Type of the return, use byte[] to skip serialization. + * @param The Type of the request, use byte[] to skip serialization. + * @return A Mono Plan of type clazz. */ - Mono invokeService(String verb, String appId, String method, K request, Class clazz); + Mono invokeService(Verb verb, String appId, String method, R request, Map metadata, Class clazz); /** - * Invoke a service + * Invoke a service without input, using serialization for response. * * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. - * @param appId The Application ID where the service is + * @param appId The Application ID where the service is. * @param method The actual Method to be call in the application. - * @param request The request to be sent to invoke the service - * @param the Type of the requet - * @return a Mono plan of type Void + * @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request. + * @param clazz the Type needed as return for the call. + * @param the Type of the return, use byte[] to skip serialization. + * @return A Mono plan of type clazz. */ - Mono invokeService(String verb, String appId, String method, T request); + Mono invokeService(Verb verb, String appId, String method, Map metadata, Class clazz); /** - * Creating a Binding + * Invoke a service with void response, using serialization. * - * @param name The name of the biding to call - * @param request The request needed for the binding + * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. + * @param appId The Application ID where the service is. + * @param method The actual Method to be call in the application. + * @param request The request to be sent to invoke the service. + * @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request. + * @param The Type of the request, use byte[] to skip serialization. + * @return A Mono plan for Void. + */ + Mono invokeService(Verb verb, String appId, String method, R request, Map metadata); + + /** + * Invoke a service without input and void response. + * + * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. + * @param appId The Application ID where the service is. + * @param method The actual Method to be call in the application. + * @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request. + * @return A Mono plan for Void. + */ + Mono invokeService(Verb verb, String appId, String method, Map metadata); + + /** + * Invoke a service without serialization. + * + * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. + * @param appId The Application ID where the service is. + * @param method The actual Method to be call in the application. + * @param request The request to be sent to invoke the service + * @param metadata Metadata (in GRPC) or headers (in HTTP) to be send in request. + * @return A Mono plan of byte[]. + */ + Mono invokeService(Verb verb, String appId, String method, byte[] request, Map metadata); + + /** + * Creating a Binding. + * + * @param name The name of the biding to call. + * @param request The request needed for the binding. * @param The type of the request. * @return a Mono plan of type Void */ @@ -78,44 +117,44 @@ public interface DaprClient { /** * Retrieve a State based on their key. * - * @param state The key of the State to be retrieved + * @param state The key of the State to be retrieved. * @param stateOptions * @param clazz the Type of State needed as return. - * @param the Type of the return - * @param The Type of the key of the State - * @return A Mono Plan for the requested State + * @param the Type of the return. + * @param The Type of the key of the State. + * @return A Mono Plan for the requested State. */ Mono getState(StateKeyValue state, StateOptions stateOptions, Class clazz); /** * Save/Update a list of states. * - * @param states the States to be saved - * @param options the Options to use for each state - * @param the Type of the State - * @return a Mono plan of type Void + * @param states the States to be saved. + * @param options the Options to use for each state. + * @param the Type of the State. + * @return a Mono plan of type Void. */ Mono saveStates(List> states, StateOptions options); /** - * Save/Update a state + * Save/Update a state. * - * @param key the key of the state - * @param etag the etag to be used - * @param value the value of the state - * @param options the Options to use for each state - * @param the Type of the State - * @return a Mono plan of type Void + * @param key the key of the state. + * @param etag the etag to be used. + * @param value the value of the state. + * @param options the Options to use for each state. + * @param the Type of the State. + * @return a Mono plan of type Void. */ Mono saveState(String key, String etag, T value, StateOptions options); /** - * Delete a state + * Delete a state. * - * @param state The key of the State to be removed - * @param options The options of the state - * @param The Type of the key of the State - * @return a Mono plan of type Void + * @param state The key of the State to be removed. + * @param options The options of the state. + * @param The Type of the key of the State. + * @return a Mono plan of type Void. */ Mono deleteState(StateKeyValue state, StateOptions options); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java index 526f6850b..a0eea0255 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java @@ -5,16 +5,19 @@ package io.dapr.client; import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.Any; import com.google.protobuf.Empty; import io.dapr.DaprGrpc; import io.dapr.DaprProtos; import io.dapr.client.domain.StateKeyValue; import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.Verb; import io.dapr.utils.ObjectSerializer; import reactor.core.publisher.Mono; import java.lang.reflect.Field; import java.util.*; +import java.util.concurrent.CompletableFuture; /** * An adapter for the GRPC Client. @@ -30,6 +33,7 @@ class DaprClientGrpcAdapter implements DaprClient { * @see io.dapr.DaprGrpc.DaprFutureStub */ private DaprGrpc.DaprFutureStub client; + /** * A utitlity class for serialize and deserialize the messages sent and retrived by the client. */ @@ -87,20 +91,20 @@ class DaprClientGrpcAdapter implements DaprClient { * {@inheritDoc} */ @Override - public Mono invokeService(String verb, String appId, String method, K request, Class clazz) { + public Mono invokeService(Verb verb, String appId, String method, R request, Map metadata, Class clazz) { try { - Map mapMessage = new HashMap<>(); - mapMessage.put("Id", appId); - mapMessage.put("Method", verb); - mapMessage.put("Data", objectSerializer.serializeString(request)); + DaprProtos.InvokeServiceEnvelope.Builder envelopeBuilder = DaprProtos.InvokeServiceEnvelope.newBuilder(); + envelopeBuilder.setId(appId); + envelopeBuilder.setMethod(verb.toString()); + envelopeBuilder.setData(Any.parseFrom(objectSerializer.serialize(request))); + envelopeBuilder.getMetadataMap().putAll(metadata); - DaprProtos.InvokeServiceEnvelope envelope = - DaprProtos.InvokeServiceEnvelope.parseFrom(objectSerializer.serialize(mapMessage)); + DaprProtos.InvokeServiceEnvelope envelope = envelopeBuilder.build(); ListenableFuture futureResponse = client.invokeService(envelope); return Mono.just(futureResponse).flatMap(f -> { try { - return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toStringUtf8(), clazz)); + return Mono.just(objectSerializer.deserialize(f.get().getData().toByteArray(), clazz)); } catch (Exception ex) { return Mono.error(ex); } @@ -112,13 +116,35 @@ class DaprClientGrpcAdapter implements DaprClient { } /** - * Operation not supported for GRPC - * - * TODO: Implement this since this IS supported. - * @throws UnsupportedOperationException every time is called. + * {@inheritDoc} */ - public Mono invokeService(String verb, String appId, String method, T request) { - return Mono.error(new UnsupportedOperationException("Operation not supported for GRPC")); + @Override + public Mono invokeService(Verb verb, String appId, String method, Map metadata, Class clazz) { + return this.invokeService(verb, appId, method, null, null, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, R request, Map metadata) { + return this.invokeService(verb, appId, method, request, metadata, byte[].class).then(); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, Map metadata) { + return this.invokeService(verb, appId, method, null, metadata, byte[].class).then(); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, byte[] request, Map metadata) { + return this.invokeService(verb, appId, method, request, metadata, byte[].class); } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java index 80b8d9940..99669d306 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java @@ -1,7 +1,12 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ package io.dapr.client; import io.dapr.client.domain.StateKeyValue; import io.dapr.client.domain.StateOptions; +import io.dapr.client.domain.Verb; import io.dapr.exceptions.DaprException; import io.dapr.utils.Constants; import io.dapr.utils.ObjectSerializer; @@ -14,7 +19,7 @@ import java.util.List; import java.util.Map; /** - * An adapter for the GRPC Client. + * An adapter for the HTTP Client. * * @see io.dapr.client.DaprHttp * @see io.dapr.client.DaprClient @@ -75,24 +80,21 @@ public class DaprClientHttpAdapter implements DaprClient { * {@inheritDoc} */ @Override - public Mono invokeService(String verb, String appId, String method, K request, Class clazz) { + public Mono invokeService(Verb verb, String appId, String method, R request, Map metadata, Class clazz) { try { - if (verb == null || verb.trim().isEmpty()) { - throw new DaprException("500", "App Id cannot be null or empty."); - } - DaprHttp.HttpMethods httMethod = DaprHttp.HttpMethods.valueOf(verb.toUpperCase()); - if (httMethod == null) { - throw new DaprException("405", "HTTP Method not allowed."); + if (verb == null) { + throw new DaprException("500", "Verb cannot be null."); } + String httMethod = verb.toString(); if (appId == null || appId.trim().isEmpty()) { throw new DaprException("500", "App Id cannot be null or empty."); } if (method == null || method.trim().isEmpty()) { - throw new DaprException("500", "App Id cannot be null or empty."); + throw new DaprException("500", "Method name cannot be null or empty."); } - String path = String.format("/invoke/%s/method/%s", appId, method); + String path = String.format("%s/%s/method/%s", Constants.INVOKE_PATH, appId, method); byte[] serializedRequestBody = objectSerializer.serialize(request); - return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null) + return this.client.invokeAPI(httMethod, path, serializedRequestBody, metadata) .flatMap(r -> { try { return Mono.just(objectSerializer.deserialize(r, clazz)); @@ -109,27 +111,32 @@ public class DaprClientHttpAdapter implements DaprClient { * {@inheritDoc} */ @Override - public Mono invokeService(String verb, String appId, String method, T request) { - try { - if (verb == null || verb.trim().isEmpty()) { - throw new DaprException("500", "App Id cannot be null or empty."); - } - DaprHttp.HttpMethods httMethod = DaprHttp.HttpMethods.valueOf(verb.toUpperCase()); - if (httMethod == null) { - throw new DaprException("405", "HTTP Method not allowed."); - } - if (appId == null || appId.trim().isEmpty()) { - throw new DaprException("500", "App Id cannot be null or empty."); - } - if (method == null || method.trim().isEmpty()) { - throw new DaprException("500", "Method to invoke cannot be null or empty."); - } - String path = String.format("/invoke/%s/method/%s", appId, method); - byte[] serializedRequestBody = objectSerializer.serialize(request); - return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null).then(); - } catch (Exception ex) { - return Mono.error(ex); - } + public Mono invokeService(Verb verb, String appId, String method, Map metadata, Class clazz) { + return this.invokeService(verb, appId, method, null, null, clazz); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, R request, Map metadata) { + return this.invokeService(verb, appId, method, request, metadata, byte[].class).then(); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, Map metadata) { + return this.invokeService(verb, appId, method, null, metadata, byte[].class).then(); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeService(Verb verb, String appId, String method, byte[] request, Map metadata) { + return this.invokeService(verb, appId, method, request, metadata, byte[].class); } /** diff --git a/sdk/src/main/java/io/dapr/client/domain/CloudEventEnvelope.java b/sdk/src/main/java/io/dapr/client/domain/CloudEventEnvelope.java new file mode 100644 index 000000000..6399fa4d2 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/CloudEventEnvelope.java @@ -0,0 +1,137 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.client.domain; + +import java.util.Arrays; +import java.util.Objects; + +/** + * A cloud event in Dapr. + */ +public final class CloudEventEnvelope { + + /** + * Identifier of the message being processed. + */ + private final String id; + + /** + * Event's source. + */ + private final String source; + + /** + * Envelope type. + */ + private final String type; + + /** + * Version of the specification. + */ + private final String specversion; + + /** + * Type of the data's content. + */ + private final String datacontenttype; + + /** + * Raw input payload. + */ + private final byte[] data; + + /** + * Instantiates a new input request. + * @param id Identifier of the message being processed. + * @param source Source for this event. + * @param type Type of event. + * @param specversion Version of the event spec. + * @param datacontenttype Type of the payload. + * @param data Payload. + */ + public CloudEventEnvelope( + String id, + String source, + String type, + String specversion, + String datacontenttype, + byte[] data) { + this.id = id; + this.source = source; + this.type = type; + this.specversion = specversion; + this.datacontenttype = datacontenttype; + this.data = data; + } + + /** + * Gets the identifier of the message being processed. + * @return Identifier of the message being processed. + */ + public String getId() { + return id; + } + + /** + * Gets the source for this event. + * @return Source for this event. + */ + public String getSource() { + return source; + } + + /** + * Gets the type of event. + * @return Type of event. + */ + public String getType() { + return type; + } + + /** + * Gets the version of the event spec. + * @return Version of the event spec. + */ + public String getSpecversion() { + return specversion; + } + + /** + * Gets the type of the payload. + * @return Type of the payload. + */ + public String getDatacontenttype() { + return datacontenttype; + } + + /** + * Gets the payload + * @return Payload + */ + public byte[] getData() { + return data; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CloudEventEnvelope that = (CloudEventEnvelope) o; + return Objects.equals(id, that.id) && + Objects.equals(source, that.source) && + Objects.equals(type, that.type) && + Objects.equals(specversion, that.specversion) && + Objects.equals(datacontenttype, that.datacontenttype) && + Arrays.equals(data, that.data); + } + + @Override + public int hashCode() { + int result = Objects.hash(id, source, type, specversion, datacontenttype); + result = 31 * result + Arrays.hashCode(data); + return result; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java b/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java index b39812df7..599045907 100644 --- a/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java +++ b/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ package io.dapr.client.domain; public class StateKeyValue { diff --git a/sdk/src/main/java/io/dapr/client/domain/StateOptions.java b/sdk/src/main/java/io/dapr/client/domain/StateOptions.java index 6987dc05b..834baaf98 100644 --- a/sdk/src/main/java/io/dapr/client/domain/StateOptions.java +++ b/sdk/src/main/java/io/dapr/client/domain/StateOptions.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ package io.dapr.client.domain; public class StateOptions { diff --git a/sdk/src/main/java/io/dapr/client/domain/Verb.java b/sdk/src/main/java/io/dapr/client/domain/Verb.java new file mode 100644 index 000000000..fff696f31 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/Verb.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ +package io.dapr.client.domain; + +/** + * Verbs used to invoke methods in Dapr. + */ +public enum Verb { + GET, + PUT, + POST, + DELETE +} diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprException.java b/sdk/src/main/java/io/dapr/exceptions/DaprException.java index 7b78279db..35325e465 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprException.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprException.java @@ -4,8 +4,6 @@ */ package io.dapr.exceptions; -import java.io.IOException; - /** * A Dapr's specific exception. */ diff --git a/sdk/src/main/java/io/dapr/runtime/Dapr.java b/sdk/src/main/java/io/dapr/runtime/Dapr.java index 025d37467..4681a357d 100644 --- a/sdk/src/main/java/io/dapr/runtime/Dapr.java +++ b/sdk/src/main/java/io/dapr/runtime/Dapr.java @@ -5,17 +5,11 @@ package io.dapr.runtime; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.client.domain.CloudEventEnvelope; import io.dapr.exceptions.DaprException; import io.dapr.utils.ObjectSerializer; import reactor.core.publisher.Mono; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -27,11 +21,6 @@ import java.util.stream.Collectors; */ public final class Dapr implements DaprRuntime { - /** - * Empty response. - */ - private static final Mono EMPTY_BYTES_ASYNC = Mono.just(new byte[0]); - /** * Singleton instance for this class. */ @@ -45,7 +34,7 @@ public final class Dapr implements DaprRuntime { /** * Topics, methods and binding handles. */ - private final Map>> handlers = Collections.synchronizedMap(new HashMap<>()); + private final Map>> handlers = Collections.synchronizedMap(new HashMap<>()); /** * Private constructor to keep it singleton. @@ -87,7 +76,7 @@ public final class Dapr implements DaprRuntime { */ @Override public void subscribeToTopic(String topic, TopicListener listener) { - this.handlers.putIfAbsent(topic, new TopicHandler(listener)); + this.handlers.putIfAbsent(topic, new TopicHandler(this.serializer, listener)); } /** @@ -103,19 +92,16 @@ public final class Dapr implements DaprRuntime { */ @Override public Mono handleInvocation(String name, byte[] payload, Map metadata) { - Function> handler = this.handlers.get(name); + Function> handler = this.handlers.get(name); if (handler == null) { return Mono.error(new DaprException("INVALID_METHOD_OR_TOPIC", "Did not find handler for : " + (name == null ? "" : name))); } try { - Message message = this.serializer.deserialize(payload, Message.class); - if (message == null) { - return EMPTY_BYTES_ASYNC; - } - message.setMetadata(metadata); - return handler.apply(message).switchIfEmpty(EMPTY_BYTES_ASYNC); + HandleRequest request = new HandleRequest(name, payload, metadata); + Mono response = handler.apply(request); + return response; } catch (Exception e) { // Handling exception in user code by propagating up via Mono. return Mono.error(e); @@ -123,87 +109,42 @@ public final class Dapr implements DaprRuntime { } /** - * Internal class to encapsulate a request message. + * Class used to pass-through the handler input. */ - public static final class Message { + private static final class HandleRequest { /** - * Identifier of the message being processed. + * Name of topic/method/binding being handled. */ - private String id; + private final String name; /** - * Type of the input payload. + * Payload received. */ - private String datacontenttype; + private final byte[] payload; /** - * Raw input payload. + * Metadata received. */ - private byte[] data; + private final Map metadata; /** - * Headers (or metadata). + * Instantiates a new handle request. + * @param name Name of topic/method/binding being handled. + * @param payload Payload received. + * @param metadata Metadata received. */ - private Map metadata; - - /** - * Instantiates a new input request (useful for JSON deserialization). - */ - public Message() { - } - - /** - * Instantiates a new input request. - * @param id Identifier of the message being processed. - * @param datacontenttype Type of the input payload. - * @param data Input body. - * @param metadata Headers (or metadata) for the call. - */ - public Message(String id, String datacontenttype, byte[] data, Map metadata) { - this.id = id; - this.datacontenttype = datacontenttype; - this.data = data; + private HandleRequest(String name, byte[] payload, Map metadata) { + this.name = name; + this.payload = payload; this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata); } - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public byte[] getData() { - return data; - } - - public void setData(byte[] data) { - this.data = data; - } - - public String getDatacontenttype() { - return datacontenttype; - } - - public void setDatacontenttype(String datacontenttype) { - this.datacontenttype = datacontenttype; - } - - public Map getMetadata() { - return metadata; - } - - public void setMetadata(Map metadata) { - this.metadata = metadata; - } } /** * Internal class to handle a method call. */ - private static final class MethodHandler implements Function> { + private static final class MethodHandler implements Function> { /** * User-provided listener. @@ -224,26 +165,37 @@ public final class Dapr implements DaprRuntime { * @return Raw output payload or empty. */ @Override - public Mono apply(Message r) { - return listener.process(r.data, r.metadata); + public Mono apply(HandleRequest r) { + try { + return listener.process(r.payload, r.metadata); + } catch (Exception e) { + return Mono.error(e); + } } } /** * Internal class to handle a topic message delivery. */ - private static final class TopicHandler implements Function> { + private static final class TopicHandler implements Function> { /** * User-provided listener. */ private final TopicListener listener; + /** + * Serializer/deserializer. + */ + private final ObjectSerializer serializer; + /** * Instantiates a new topic handler. + * @param serializer Useful for object serialization. * @param listener Callback to be executed on a given message. */ - private TopicHandler(TopicListener listener) { + private TopicHandler(ObjectSerializer serializer, TopicListener listener) { + this.serializer = serializer; this.listener = listener; } @@ -253,8 +205,17 @@ public final class Dapr implements DaprRuntime { * @return Always empty response. */ @Override - public Mono apply(Message r) { - return listener.process(r.id, r.datacontenttype, r.data, r.metadata).then(Mono.empty()); + public Mono apply(HandleRequest r) { + try { + CloudEventEnvelope message = this.serializer.deserialize(r.payload, CloudEventEnvelope.class); + if (message == null) { + return Mono.empty(); + } + + return listener.process(message, r.metadata).then(Mono.empty()); + } catch (Exception e) { + return Mono.error(e); + } } } } diff --git a/sdk/src/main/java/io/dapr/runtime/MethodListener.java b/sdk/src/main/java/io/dapr/runtime/MethodListener.java index 4b1b793ca..314164fea 100644 --- a/sdk/src/main/java/io/dapr/runtime/MethodListener.java +++ b/sdk/src/main/java/io/dapr/runtime/MethodListener.java @@ -5,6 +5,7 @@ package io.dapr.runtime; +import com.fasterxml.jackson.core.JsonProcessingException; import reactor.core.publisher.Mono; import java.util.Map; @@ -19,7 +20,8 @@ public interface MethodListener { * @param data Raw input payload. * @param metadata Header (or metadata). * @return Raw response or empty. + * @throws Exception Any exception from user code. */ - Mono process(byte[] data, Map metadata); + Mono process(byte[] data, Map metadata) throws Exception; } diff --git a/sdk/src/main/java/io/dapr/runtime/TopicListener.java b/sdk/src/main/java/io/dapr/runtime/TopicListener.java index 8325ab8fe..ec8713383 100644 --- a/sdk/src/main/java/io/dapr/runtime/TopicListener.java +++ b/sdk/src/main/java/io/dapr/runtime/TopicListener.java @@ -5,6 +5,7 @@ package io.dapr.runtime; +import io.dapr.client.domain.CloudEventEnvelope; import reactor.core.publisher.Mono; import java.util.Map; @@ -16,12 +17,11 @@ public interface TopicListener { /** * Processes a given topic's message delivery. - * @param messageId Message's identifier. - * @param dataType Type of the input data. - * @param data Input data. + * @param message Message event to be processed. * @param metadata Headers (or metadata). * @return Empty response. + * @throws Exception Any exception from user code. */ - Mono process(String messageId, String dataType, byte[] data, Map metadata); + Mono process(CloudEventEnvelope message, Map metadata) throws Exception; } diff --git a/sdk/src/main/java/io/dapr/utils/Constants.java b/sdk/src/main/java/io/dapr/utils/Constants.java index 32aea9f04..f3120f63d 100644 --- a/sdk/src/main/java/io/dapr/utils/Constants.java +++ b/sdk/src/main/java/io/dapr/utils/Constants.java @@ -74,6 +74,11 @@ public final class Constants { */ public static final String ACTOR_TIMER_RELATIVE_URL_FORMAT = ACTORS_BASE_URL + "/%s/%s/timers/%s"; + /** + * Base path to invoke methods. + */ + public static final String INVOKE_PATH = API_VERSION + "/invoke"; + /** * Invoke Publish Path */ diff --git a/sdk/src/main/java/io/dapr/utils/ObjectSerializer.java b/sdk/src/main/java/io/dapr/utils/ObjectSerializer.java index e35b0c30a..6446c392a 100644 --- a/sdk/src/main/java/io/dapr/utils/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/utils/ObjectSerializer.java @@ -6,12 +6,10 @@ package io.dapr.utils; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import io.dapr.runtime.Dapr; +import io.dapr.client.domain.CloudEventEnvelope; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -25,7 +23,7 @@ import java.util.Map; public class ObjectSerializer { /** - * Shared Json Factory as per Jackson's documentation, used only for this class. + * Shared Json Factory as per Jackson's documentation. */ protected static final JsonFactory JSON_FACTORY = new JsonFactory(); @@ -113,8 +111,8 @@ public class ObjectSerializer { return (T) null; } - if (clazz == Dapr.Message.class) { - return (T) this.deserializeTopicMessage(value); + if (clazz == CloudEventEnvelope.class) { + return (T) this.deserializeCloudEventEnvelope(value); } if (clazz == String.class) { @@ -145,7 +143,7 @@ public class ObjectSerializer { * @return Message (can be null if input is null) * @throws IOException If cannot parse. */ - private Dapr.Message deserializeTopicMessage(Object payload) throws IOException { + private CloudEventEnvelope deserializeCloudEventEnvelope(Object payload) throws IOException { if (payload == null) { return null; } @@ -166,9 +164,24 @@ public class ObjectSerializer { id = node.get("id").asText(); } - String dataType = null; + String source = null; + if (node.has("source") && !node.get("source").isNull()) { + source = node.get("source").asText(); + } + + String type = null; + if (node.has("type") && !node.get("type").isNull()) { + type = node.get("type").asText(); + } + + String specversion = null; + if (node.has("specversion") && !node.get("specversion").isNull()) { + specversion = node.get("specversion").asText(); + } + + String datacontenttype = null; if (node.has("datacontenttype") && !node.get("datacontenttype").isNull()) { - dataType = node.get("datacontenttype").asText(); + datacontenttype = node.get("datacontenttype").asText(); } byte[] data = null; @@ -180,7 +193,7 @@ public class ObjectSerializer { } } - return new Dapr.Message(id, dataType, data, null); + return new CloudEventEnvelope(id, source, type, specversion, datacontenttype, data); } /** diff --git a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java index 7c0c943fd..2ae8386cc 100644 --- a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java +++ b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java @@ -5,17 +5,21 @@ package io.dapr.runtime; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.dapr.client.*; -import io.dapr.runtime.Dapr; -import io.dapr.runtime.DaprRuntime; -import io.dapr.runtime.TopicListener; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientTestBuilder; +import io.dapr.client.DaprHttpStub; +import io.dapr.client.domain.CloudEventEnvelope; +import io.dapr.client.domain.Verb; import io.dapr.utils.Constants; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Mono; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -27,12 +31,16 @@ import static org.mockito.Mockito.*; public class DaprRuntimeTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + protected static final JsonFactory JSON_FACTORY = new JsonFactory(); private static final String TYPE_PLAIN_TEXT = "plain/text"; private static final String TOPIC_NAME = "mytopic"; + private static final String APP_ID = "myappid"; + + private static final String METHOD_NAME = "mymethod"; + private final DaprRuntime daprRuntime = Dapr.getInstance(); @Before @@ -49,75 +57,166 @@ public class DaprRuntimeTest { Assert.assertTrue(this.daprRuntime.getSubscribedTopics().isEmpty()); TopicListener listener = mock(TopicListener.class); + when(listener.process(any(), any())).thenReturn(Mono.empty()); + this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener); - verify(listener, never()).process(any(), any(), any(), any()); + verify(listener, never()).process(any(), any()); - Dapr.Message[] messages = new Dapr.Message[]{ - new Dapr.Message( - generateMessageId(), - TYPE_PLAIN_TEXT, - generatePayload(), - generateSingleMetadata()), - new Dapr.Message( - generateMessageId(), - TYPE_PLAIN_TEXT, - new byte[0], - generateSingleMetadata()), - new Dapr.Message( - generateMessageId(), - TYPE_PLAIN_TEXT, - null, - generateSingleMetadata()), - new Dapr.Message( - generateMessageId(), - TYPE_PLAIN_TEXT, - generatePayload(), - null), - new Dapr.Message( - "", - TYPE_PLAIN_TEXT, - generatePayload(), - generateSingleMetadata()), - new Dapr.Message( - null, - TYPE_PLAIN_TEXT, - generatePayload(), - generateSingleMetadata()), - new Dapr.Message( - generateMessageId(), - "", - generatePayload(), - generateSingleMetadata()), - new Dapr.Message( - generateMessageId(), - null, - generatePayload(), - generateSingleMetadata()) + Message[] messages = new Message[]{ + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + new byte[0], + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + null, + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + generatePayload(), + null), + new Message( + "", + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + null, + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + "", + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + null, + generatePayload(), + generateSingleMetadata()) }; DaprHttpStub daprHttp = mock(DaprHttpStub.class); DaprClient client = DaprClientTestBuilder.buildHttpClient(daprHttp); - for (Dapr.Message message : messages) { + for (Message message : messages) { when(daprHttp.invokeAPI( - eq("POST"), - eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME), - eq(message.getData()), - eq(null))) - .thenAnswer(invocationOnMock -> { - this.daprRuntime.handleInvocation( - TOPIC_NAME, OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata()); - return Mono.empty(); - }); + eq("POST"), + eq(Constants.PUBLISH_PATH + "/" + TOPIC_NAME), + eq(message.data), + eq(null))) + .thenAnswer(invocationOnMock -> this.daprRuntime.handleInvocation( + TOPIC_NAME, + this.serialize(message), + message.metadata).then()); - client.publishEvent(TOPIC_NAME, message.getData()).block(); + client.publishEvent(TOPIC_NAME, message.data).block(); - verify(listener, times(1)) - .process(eq(message.getId()), eq(message.getDatacontenttype()), eq(message.getData()), eq(message.getMetadata())); + CloudEventEnvelope envelope = new CloudEventEnvelope( + message.id, + null, + null, + null, + message.datacontenttype, + message.data + ); + verify(listener, times(1)).process(eq(envelope), eq(message.metadata)); } - verify(listener, times(messages.length)).process(any(), any(), any(), any()); + verify(listener, times(messages.length)).process(any(), any()); + } + + @Test + public void invokeHappyCase() throws Exception { + MethodListener listener = mock(MethodListener.class); + + this.daprRuntime.registerServiceMethod(METHOD_NAME, listener); + + verify(listener, never()).process(any(), any()); + + Message[] messages = new Message[]{ + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + new byte[0], + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + null, + generateSingleMetadata()), + new Message( + generateMessageId(), + TYPE_PLAIN_TEXT, + generatePayload(), + null), + new Message( + "", + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + null, + TYPE_PLAIN_TEXT, + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + "", + generatePayload(), + generateSingleMetadata()), + new Message( + generateMessageId(), + null, + generatePayload(), + generateSingleMetadata()) + }; + + DaprHttpStub daprHttp = mock(DaprHttpStub.class); + DaprClient client = DaprClientTestBuilder.buildHttpClient(daprHttp); + + for (Message message : messages) { + byte[] expectedResponse = message.id == null ? new byte[0] : message.id.getBytes(StandardCharsets.UTF_8); + when(listener.process(eq(message.data), eq(message.metadata))) + .then(x -> Mono.just(expectedResponse)); + + when(daprHttp.invokeAPI( + eq("POST"), + eq(Constants.INVOKE_PATH + "/" + APP_ID + "/method/" + METHOD_NAME), + eq(message.data), + any())) + .thenAnswer(x -> + this.daprRuntime.handleInvocation( + METHOD_NAME, + message.data, + message.metadata) + .map(r -> new String(r, StandardCharsets.UTF_8))); + + Mono response = client.invokeService(Verb.POST, APP_ID, METHOD_NAME, message.data, message.metadata); + Assert.assertEquals( + new String(expectedResponse, StandardCharsets.UTF_8), + new String(response.block(), StandardCharsets.UTF_8)); + + verify(listener, times(1)) + .process(eq(message.data), eq(message.metadata)); + } + + verify(listener, times(messages.length)).process(any(), any()); } @Test(expected = RuntimeException.class) @@ -126,27 +225,29 @@ public class DaprRuntimeTest { Assert.assertTrue(this.daprRuntime.getSubscribedTopics().isEmpty()); TopicListener listener = mock(TopicListener.class); - when(listener.process(any(), any(), any(), any())) + when(listener.process(any(), any())) .thenReturn(Mono.error(new RuntimeException())); this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener); - Dapr.Message message = new Dapr.Message( + Message message = new Message( generateMessageId(), TYPE_PLAIN_TEXT, generatePayload(), generateSingleMetadata()); Mono result = this.daprRuntime - .handleInvocation(TOPIC_NAME, OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata()); - - verify(listener, times(1)) - .process( - eq(message.getId()), - eq(message.getDatacontenttype()), - eq(message.getData()), - eq(message.getMetadata())); + .handleInvocation(TOPIC_NAME, this.serialize(message), message.metadata); + CloudEventEnvelope envelope = new CloudEventEnvelope( + message.id, + null, + null, + null, + message.datacontenttype, + message.data + ); + verify(listener, times(1)).process(eq(envelope), eq(message.metadata)); result.block(); } @@ -159,21 +260,16 @@ public class DaprRuntimeTest { this.daprRuntime.subscribeToTopic(TOPIC_NAME, listener); - Dapr.Message message = new Dapr.Message( + Message message = new Message( generateMessageId(), TYPE_PLAIN_TEXT, generatePayload(), generateSingleMetadata()); Mono result = this.daprRuntime - .handleInvocation("UNKNOWN", OBJECT_MAPPER.writeValueAsBytes(message), message.getMetadata()); + .handleInvocation("UNKNOWN", serialize(message), message.metadata); - verify(listener, never()) - .process( - eq(message.getId()), - eq(message.getDatacontenttype()), - eq(message.getData()), - eq(message.getMetadata())); + verify(listener, never()).process(any(), any()); result.block(); } @@ -189,4 +285,46 @@ public class DaprRuntimeTest { private static final Map generateSingleMetadata() { return Collections.singletonMap(UUID.randomUUID().toString(), UUID.randomUUID().toString()); } + + private static final class Message { + + private final String id; + + private final String datacontenttype; + + private final byte[] data; + + private final Map metadata; + + private Message(String id, String datacontenttype, byte[] data, Map metadata) { + this.id = id; + this.datacontenttype = datacontenttype; + this.data = data; + this.metadata = metadata; + } + } + + private byte[] serialize(Message message) throws IOException { + if (message == null) { + return null; + } + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + JsonGenerator generator = JSON_FACTORY.createGenerator(bos); + generator.writeStartObject(); + if (message.id != null) { + generator.writeStringField("id", message.id); + } + if (message.datacontenttype != null) { + generator.writeStringField("datacontenttype", message.datacontenttype); + } + if (message.data != null) { + generator.writeBinaryField("data", message.data); + } + generator.writeEndObject(); + generator.close(); + bos.flush(); + return bos.toByteArray(); + } + } } diff --git a/sdk/src/test/java/io/dapr/utils/ObjectSerializerTest.java b/sdk/src/test/java/io/dapr/utils/ObjectSerializerTest.java index 2c5662f80..e261ca1b1 100644 --- a/sdk/src/test/java/io/dapr/utils/ObjectSerializerTest.java +++ b/sdk/src/test/java/io/dapr/utils/ObjectSerializerTest.java @@ -1,3 +1,8 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + package io.dapr.utils; import org.junit.Test;