From b2083187df8b480a752689849254911b69b2005f Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 31 Jan 2020 15:20:22 -0800 Subject: [PATCH] =?UTF-8?q?Add=20support=20for=20metadata=20in=20Bindings?= =?UTF-8?q?=20+=20fix=20serialization=20of=20output=20bi=E2=80=A6=20(#186)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add support for metadata in Bindings + fix serialization of output binding." * Remove generics from new bindings method. --- .../bindings/http/InputBindingController.java | 2 +- .../bindings/http/InputBindingExample.java | 2 +- .../bindings/http/OutputBindingExample.java | 42 +++++++++-------- .../io/dapr/examples/bindings/http/README.md | 7 ++- .../io/dapr/it/binding/http/BindingIT.java | 17 +++++-- .../binding/http/InputBindingController.java | 2 +- .../it/binding/http/InputBindingService.java | 2 +- .../main/java/io/dapr/client/DaprClient.java | 14 +++++- .../java/io/dapr/client/DaprClientGrpc.java | 19 ++++++-- .../java/io/dapr/client/DaprClientHttp.java | 45 +++++++++++++++++-- 10 files changed, 112 insertions(+), 40 deletions(-) diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingController.java b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingController.java index 0183c25e2..ec6f7a40b 100644 --- a/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingController.java +++ b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingController.java @@ -16,7 +16,7 @@ import reactor.core.publisher.Mono; @RestController public class InputBindingController { - @PostMapping(path = "/bindingSample") + @PostMapping(path = "/sample123") public Mono handleInputBinding(@RequestBody(required = false) byte[] body) { return Mono.fromRunnable(() -> System.out.println("Received message through binding: " + (body == null ? "" : new String(body)))); diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java index 9e1b04be1..181bd2177 100644 --- a/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java +++ b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java @@ -18,7 +18,7 @@ import org.apache.commons.cli.Options; * 2. cd to [repo-root]/examples * 3. Run : * dapr run --app-id inputbinding --app-port 3000 --port 3005 \ - * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.InputBindingExample -D exec.args="-p 3000" + * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.http.InputBindingExample -D exec.args="-p 3000" */ public class InputBindingExample { diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java index 0310011da..37fd4b49a 100644 --- a/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java +++ b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java @@ -15,7 +15,7 @@ import io.dapr.client.DaprClientBuilder; * 2. cd to [repo-root]/examples * 3. Run the program: * dapr run --app-id outputbinding --port 3006 \ - * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.OutputBindingExample + * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.http.OutputBindingExample */ public class OutputBindingExample { @@ -26,7 +26,7 @@ public class OutputBindingExample { public String message; } - static final String BINDING_NAME = "bindingSample"; + static final String BINDING_NAME = "sample123"; /** * The main method of this app. @@ -37,26 +37,30 @@ public class OutputBindingExample { public static void main(String[] args) { DaprClient client = new DaprClientBuilder().build(); - // This is an example of sending data in a user-defined object. The input binding will receive: - // {"message":"hello"} - MyClass myClass = new MyClass(); - myClass.message = "hello"; + int count = 0; + while (!Thread.currentThread().isInterrupted()) { + String message = "Message #" + (count++); - System.out.println("sending a class with message: " + myClass.message); - client.invokeBinding(BINDING_NAME, myClass).block(); + // Randomly decides between a class type or string type to be sent. + if (Math.random() >= 0.5) { + // This is an example of sending data in a user-defined object. The input binding will receive: + // {"message":"hello"} + MyClass myClass = new MyClass(); + myClass.message = message; - // This is an example of sending a plain string. The input binding will receive: - // "cat" - final String m = "cat"; - System.out.println("sending a plain string: " + m); - client.invokeBinding(BINDING_NAME, m).block(); + System.out.println("sending a class with message: " + myClass.message); + client.invokeBinding(BINDING_NAME, myClass).block(); + } else { + System.out.println("sending a plain string: " + message); + client.invokeBinding(BINDING_NAME, message).block(); + } - try { - Thread.sleep((long) (10000 * Math.random())); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - return; + try { + Thread.sleep((long) (10000 * Math.random())); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } } System.out.println("Done."); diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/README.md b/examples/src/main/java/io/dapr/examples/bindings/http/README.md index 130e95dfc..9b96bf830 100644 --- a/examples/src/main/java/io/dapr/examples/bindings/http/README.md +++ b/examples/src/main/java/io/dapr/examples/bindings/http/README.md @@ -105,14 +105,13 @@ public class OutputBindingExample { final String BINDING_NAME = "bindingSample"; ///... MyClass myClass = new MyClass(); - myClass.message = "hello"; + myClass.message = message; System.out.println("sending an object instance with message: " + myClass.message); client.invokeBinding(BINDING_NAME, myClass); //Binding a data object - ///.. - final String m = "cat"; + ///... System.out.println("sending a plain string: " + m); - client.invokeBinding(BINDING_NAME, m); //Binding a plain string text + client.invokeBinding(BINDING_NAME, message); //Binding a plain string text } ///... } diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java index 7261a1da5..537c6ccfa 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java @@ -17,6 +17,7 @@ import org.junit.Ignore; import org.junit.Test; import java.util.Base64; +import java.util.Collections; import java.util.List; import static io.dapr.it.Retry.callWithRetry; @@ -45,7 +46,7 @@ public class BindingIT extends BaseIT { InputBindingService.class, true, 60000); - // At this point, it is guaranteed that the service aboce is running and all ports being listened to. + // At this point, it is guaranteed that the service above is running and all ports being listened to. // TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet? Thread.sleep(120000); @@ -59,18 +60,26 @@ public class BindingIT extends BaseIT { myClass.message = "hello"; System.out.println("sending first message"); - client.invokeBinding(BINDING_NAME, myClass).block(); + client.invokeBinding(BINDING_NAME, myClass, Collections.singletonMap("MyMetadata", "MyValue")).block(); // This is an example of sending a plain string. The input binding will receive // cat final String m = "cat"; System.out.println("sending " + m); - client.invokeBinding(BINDING_NAME, m).block(); + client.invokeBinding(BINDING_NAME, m, Collections.singletonMap("MyMetadata", "MyValue")).block(); + // Metadata is not used by Kafka component, so it is not possible to validate. callWithRetry(() -> { System.out.println("Checking results ..."); - final List messages = client.invokeService(Verb.GET, daprRunInputBinding.getAppName(), "messages", null, List.class).block(); + final List messages = + client.invokeService( + Verb.GET, + daprRunInputBinding.getAppName(), + "messages", + null, + List.class).block(); assertEquals(2, messages.size()); + MyClass resultClass = null; try { resultClass = new ObjectMapper().readValue(messages.get(0), MyClass.class); diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingController.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingController.java index 5ce12eaf2..1fd35f23a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingController.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingController.java @@ -24,7 +24,7 @@ public class InputBindingController { @GetMapping("/dapr/config") public String daprConfig() throws Exception { - return "{\"actorIdleTimeout\":\"5s\",\"actorScanInterval\":\"2s\",\"drainOngoingCallTimeout\":\"1s\",\"drainBalancedActors\":true,\"entities\":[]}"; + return "{}"; } @PostMapping(path = "/sample123") diff --git a/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingService.java b/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingService.java index 7ecef637d..7f1e55775 100644 --- a/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingService.java +++ b/sdk-tests/src/test/java/io/dapr/it/binding/http/InputBindingService.java @@ -8,7 +8,7 @@ package io.dapr.it.binding.http; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication(scanBasePackages = {"io.dapr.it.binding.http"}) +@SpringBootApplication public class InputBindingService { public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed"; diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 5247df1cd..a46511c76 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -127,14 +127,24 @@ public interface DaprClient { Mono invokeService(Verb verb, String appId, String method, byte[] request, Map metadata); /** - * Creating a Binding. + * Invokes a Binding. * * @param name The name of the biding to call. * @param request The request needed for the binding, use byte[] to skip serialization. - * @return a Mono plan of type Void + * @return a Mono plan of type Void. */ Mono invokeBinding(String name, Object request); + /** + * Invokes a Binding with metadata. + * + * @param name The name of the biding to call. + * @param request The request needed for the binding, use byte[] to skip serialization. + * @param metadata The metadata map. + * @return a Mono plan of type Void. + */ + Mono invokeBinding(String name, Object request, Map metadata); + /** * Retrieve a State based on their key. * diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index ce25e878c..9eaed0a09 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -176,12 +176,25 @@ public class DaprClientGrpc implements DaprClient { */ @Override public Mono invokeBinding(String name, Object request) { + return this.invokeBinding(name, request, null); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String name, Object request, Map metadata) { try { byte[] byteRequest = objectSerializer.serialize(request); - Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build(); DaprProtos.InvokeBindingEnvelope.Builder builder = DaprProtos.InvokeBindingEnvelope.newBuilder() - .setName(name) - .setData(data); + .setName(name); + if (byteRequest != null) { + Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build(); + builder.setData(data); + } + if (metadata != null) { + builder.getMetadataMap().putAll(metadata); + } DaprProtos.InvokeBindingEnvelope envelope = builder.build(); return Mono.fromCallable(() -> { ListenableFuture futureEmpty = client.invokeBinding(envelope); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 1099ee08e..d1c154c26 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -53,7 +53,12 @@ public class DaprClientHttp implements DaprClient { private final DaprObjectSerializer stateSerializer; /** - * Flag determining if serializer's input and output contains a valid String. + * Flag determining if object serializer's input and output is Dapr's default. + */ + private final boolean isDefaultObjectSerializer; + + /** + * Flag determining if state serializer's input and output contains a valid String. */ private final boolean isStateString; @@ -70,6 +75,7 @@ public class DaprClientHttp implements DaprClient { this.client = client; this.objectSerializer = objectSerializer; this.stateSerializer = stateSerializer; + this.isDefaultObjectSerializer = objectSerializer instanceof DefaultObjectSerializer; this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null; } @@ -205,13 +211,44 @@ public class DaprClientHttp implements DaprClient { */ @Override public Mono invokeBinding(String name, Object request) { + return this.invokeBinding(name, request, null); + } + + /** + * {@inheritDoc} + */ + @Override + public Mono invokeBinding(String name, Object request, Map metadata) { try { if (name == null || name.trim().isEmpty()) { - throw new IllegalArgumentException("Name to bind cannot be null or empty."); + throw new IllegalArgumentException("Binding name cannot be null or empty."); } Map jsonMap = new HashMap<>(); - jsonMap.put("data", request); + if (metadata != null) { + jsonMap.put("metadata", metadata); + } + + if (request != null) { + if (this.isDefaultObjectSerializer) { + // If we are using Dapr's default serializer, we pass the object directly and skip objectSerializer. + // This allows binding to receive JSON directly without having to extract it from a quoted string. + // Example of output binding vs body in the input binding: + // This logic DOES this: + // Output Binding: { "data" : { "mykey": "myvalue" } } + // Input Binding: { "mykey": "myvalue" } + // This logic AVOIDS this: + // Output Binding: { "data" : "{ \"mykey\": \"myvalue\" }" } + // Input Binding: "{ \"mykey\": \"myvalue\" }" + jsonMap.put("data", request); + } else { + // When customer provides a custom serializer, he will get a Base64 encoded String back - always. + // Example of body in the input binding resulting from this logic: + // { "data" : "eyJrZXkiOiAidmFsdWUifQ==" } + jsonMap.put("data", objectSerializer.serialize(request)); + } + } + StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name); return this.client @@ -219,7 +256,7 @@ public class DaprClientHttp implements DaprClient { DaprHttp.HttpMethods.POST.name(), url.toString(), null, - objectSerializer.serialize(jsonMap), + INTERNAL_SERIALIZER.serialize(jsonMap), null) .then(); } catch (Exception ex) {