Add support for metadata in Bindings + fix serialization of output bi… (#186)

* Add support for metadata in Bindings + fix serialization of output binding."

* Remove generics from new bindings method.
This commit is contained in:
Artur Souza 2020-01-31 15:20:22 -08:00 committed by GitHub
parent 576b5c52ab
commit b2083187df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 112 additions and 40 deletions

View File

@ -16,7 +16,7 @@ import reactor.core.publisher.Mono;
@RestController @RestController
public class InputBindingController { public class InputBindingController {
@PostMapping(path = "/bindingSample") @PostMapping(path = "/sample123")
public Mono<Void> handleInputBinding(@RequestBody(required = false) byte[] body) { public Mono<Void> handleInputBinding(@RequestBody(required = false) byte[] body) {
return Mono.fromRunnable(() -> return Mono.fromRunnable(() ->
System.out.println("Received message through binding: " + (body == null ? "" : new String(body)))); System.out.println("Received message through binding: " + (body == null ? "" : new String(body))));

View File

@ -18,7 +18,7 @@ import org.apache.commons.cli.Options;
* 2. cd to [repo-root]/examples * 2. cd to [repo-root]/examples
* 3. Run : * 3. Run :
* dapr run --app-id inputbinding --app-port 3000 --port 3005 \ * dapr run --app-id inputbinding --app-port 3000 --port 3005 \
* -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.InputBindingExample -D exec.args="-p 3000" * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.http.InputBindingExample -D exec.args="-p 3000"
*/ */
public class InputBindingExample { public class InputBindingExample {

View File

@ -15,7 +15,7 @@ import io.dapr.client.DaprClientBuilder;
* 2. cd to [repo-root]/examples * 2. cd to [repo-root]/examples
* 3. Run the program: * 3. Run the program:
* dapr run --app-id outputbinding --port 3006 \ * dapr run --app-id outputbinding --port 3006 \
* -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.OutputBindingExample * -- mvn exec:java -D exec.mainClass=io.dapr.examples.bindings.http.OutputBindingExample
*/ */
public class OutputBindingExample { public class OutputBindingExample {
@ -26,7 +26,7 @@ public class OutputBindingExample {
public String message; public String message;
} }
static final String BINDING_NAME = "bindingSample"; static final String BINDING_NAME = "sample123";
/** /**
* The main method of this app. * The main method of this app.
@ -37,26 +37,30 @@ public class OutputBindingExample {
public static void main(String[] args) { public static void main(String[] args) {
DaprClient client = new DaprClientBuilder().build(); DaprClient client = new DaprClientBuilder().build();
int count = 0;
while (!Thread.currentThread().isInterrupted()) {
String message = "Message #" + (count++);
// Randomly decides between a class type or string type to be sent.
if (Math.random() >= 0.5) {
// This is an example of sending data in a user-defined object. The input binding will receive: // This is an example of sending data in a user-defined object. The input binding will receive:
// {"message":"hello"} // {"message":"hello"}
MyClass myClass = new MyClass(); MyClass myClass = new MyClass();
myClass.message = "hello"; myClass.message = message;
System.out.println("sending a class with message: " + myClass.message); System.out.println("sending a class with message: " + myClass.message);
client.invokeBinding(BINDING_NAME, myClass).block(); client.invokeBinding(BINDING_NAME, myClass).block();
} else {
// This is an example of sending a plain string. The input binding will receive: System.out.println("sending a plain string: " + message);
// "cat" client.invokeBinding(BINDING_NAME, message).block();
final String m = "cat"; }
System.out.println("sending a plain string: " + m);
client.invokeBinding(BINDING_NAME, m).block();
try { try {
Thread.sleep((long) (10000 * Math.random())); Thread.sleep((long) (10000 * Math.random()));
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; }
} }
System.out.println("Done."); System.out.println("Done.");

View File

@ -105,14 +105,13 @@ public class OutputBindingExample {
final String BINDING_NAME = "bindingSample"; final String BINDING_NAME = "bindingSample";
///... ///...
MyClass myClass = new MyClass(); MyClass myClass = new MyClass();
myClass.message = "hello"; myClass.message = message;
System.out.println("sending an object instance with message: " + myClass.message); System.out.println("sending an object instance with message: " + myClass.message);
client.invokeBinding(BINDING_NAME, myClass); //Binding a data object client.invokeBinding(BINDING_NAME, myClass); //Binding a data object
///.. ///...
final String m = "cat";
System.out.println("sending a plain string: " + m); System.out.println("sending a plain string: " + m);
client.invokeBinding(BINDING_NAME, m); //Binding a plain string text client.invokeBinding(BINDING_NAME, message); //Binding a plain string text
} }
///... ///...
} }

View File

@ -17,6 +17,7 @@ import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.util.Base64; import java.util.Base64;
import java.util.Collections;
import java.util.List; import java.util.List;
import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.Retry.callWithRetry;
@ -45,7 +46,7 @@ public class BindingIT extends BaseIT {
InputBindingService.class, InputBindingService.class,
true, true,
60000); 60000);
// At this point, it is guaranteed that the service aboce is running and all ports being listened to. // At this point, it is guaranteed that the service above is running and all ports being listened to.
// TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet? // TODO: figure out why this wait is needed for this scenario to work end-to-end. Kafka not up yet?
Thread.sleep(120000); Thread.sleep(120000);
@ -59,18 +60,26 @@ public class BindingIT extends BaseIT {
myClass.message = "hello"; myClass.message = "hello";
System.out.println("sending first message"); System.out.println("sending first message");
client.invokeBinding(BINDING_NAME, myClass).block(); client.invokeBinding(BINDING_NAME, myClass, Collections.singletonMap("MyMetadata", "MyValue")).block();
// This is an example of sending a plain string. The input binding will receive // This is an example of sending a plain string. The input binding will receive
// cat // cat
final String m = "cat"; final String m = "cat";
System.out.println("sending " + m); System.out.println("sending " + m);
client.invokeBinding(BINDING_NAME, m).block(); client.invokeBinding(BINDING_NAME, m, Collections.singletonMap("MyMetadata", "MyValue")).block();
// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> { callWithRetry(() -> {
System.out.println("Checking results ..."); System.out.println("Checking results ...");
final List<String> messages = client.invokeService(Verb.GET, daprRunInputBinding.getAppName(), "messages", null, List.class).block(); final List<String> messages =
client.invokeService(
Verb.GET,
daprRunInputBinding.getAppName(),
"messages",
null,
List.class).block();
assertEquals(2, messages.size()); assertEquals(2, messages.size());
MyClass resultClass = null; MyClass resultClass = null;
try { try {
resultClass = new ObjectMapper().readValue(messages.get(0), MyClass.class); resultClass = new ObjectMapper().readValue(messages.get(0), MyClass.class);

View File

@ -24,7 +24,7 @@ public class InputBindingController {
@GetMapping("/dapr/config") @GetMapping("/dapr/config")
public String daprConfig() throws Exception { public String daprConfig() throws Exception {
return "{\"actorIdleTimeout\":\"5s\",\"actorScanInterval\":\"2s\",\"drainOngoingCallTimeout\":\"1s\",\"drainBalancedActors\":true,\"entities\":[]}"; return "{}";
} }
@PostMapping(path = "/sample123") @PostMapping(path = "/sample123")

View File

@ -8,7 +8,7 @@ package io.dapr.it.binding.http;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication(scanBasePackages = {"io.dapr.it.binding.http"}) @SpringBootApplication
public class InputBindingService { public class InputBindingService {
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed"; public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running. Init Elapsed";

View File

@ -127,14 +127,24 @@ public interface DaprClient {
Mono<byte[]> invokeService(Verb verb, String appId, String method, byte[] request, Map<String, String> metadata); Mono<byte[]> invokeService(Verb verb, String appId, String method, byte[] request, Map<String, String> metadata);
/** /**
* Creating a Binding. * Invokes a Binding.
* *
* @param name The name of the biding to call. * @param name The name of the biding to call.
* @param request The request needed for the binding, use byte[] to skip serialization. * @param request The request needed for the binding, use byte[] to skip serialization.
* @return a Mono plan of type Void * @return a Mono plan of type Void.
*/ */
Mono<Void> invokeBinding(String name, Object request); Mono<Void> invokeBinding(String name, Object request);
/**
* Invokes a Binding with metadata.
*
* @param name The name of the biding to call.
* @param request The request needed for the binding, use byte[] to skip serialization.
* @param metadata The metadata map.
* @return a Mono plan of type Void.
*/
Mono<Void> invokeBinding(String name, Object request, Map<String, String> metadata);
/** /**
* Retrieve a State based on their key. * Retrieve a State based on their key.
* *

View File

@ -176,12 +176,25 @@ public class DaprClientGrpc implements DaprClient {
*/ */
@Override @Override
public Mono<Void> invokeBinding(String name, Object request) { public Mono<Void> invokeBinding(String name, Object request) {
return this.invokeBinding(name, request, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> invokeBinding(String name, Object request, Map<String, String> metadata) {
try { try {
byte[] byteRequest = objectSerializer.serialize(request); byte[] byteRequest = objectSerializer.serialize(request);
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
DaprProtos.InvokeBindingEnvelope.Builder builder = DaprProtos.InvokeBindingEnvelope.newBuilder() DaprProtos.InvokeBindingEnvelope.Builder builder = DaprProtos.InvokeBindingEnvelope.newBuilder()
.setName(name) .setName(name);
.setData(data); if (byteRequest != null) {
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
builder.setData(data);
}
if (metadata != null) {
builder.getMetadataMap().putAll(metadata);
}
DaprProtos.InvokeBindingEnvelope envelope = builder.build(); DaprProtos.InvokeBindingEnvelope envelope = builder.build();
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope); ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);

View File

@ -53,7 +53,12 @@ public class DaprClientHttp implements DaprClient {
private final DaprObjectSerializer stateSerializer; private final DaprObjectSerializer stateSerializer;
/** /**
* Flag determining if serializer's input and output contains a valid String. * Flag determining if object serializer's input and output is Dapr's default.
*/
private final boolean isDefaultObjectSerializer;
/**
* Flag determining if state serializer's input and output contains a valid String.
*/ */
private final boolean isStateString; private final boolean isStateString;
@ -70,6 +75,7 @@ public class DaprClientHttp implements DaprClient {
this.client = client; this.client = client;
this.objectSerializer = objectSerializer; this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer; this.stateSerializer = stateSerializer;
this.isDefaultObjectSerializer = objectSerializer instanceof DefaultObjectSerializer;
this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null; this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null;
} }
@ -205,13 +211,44 @@ public class DaprClientHttp implements DaprClient {
*/ */
@Override @Override
public Mono<Void> invokeBinding(String name, Object request) { public Mono<Void> invokeBinding(String name, Object request) {
return this.invokeBinding(name, request, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> invokeBinding(String name, Object request, Map<String, String> metadata) {
try { try {
if (name == null || name.trim().isEmpty()) { if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Name to bind cannot be null or empty."); throw new IllegalArgumentException("Binding name cannot be null or empty.");
} }
Map<String, Object> jsonMap = new HashMap<>(); Map<String, Object> jsonMap = new HashMap<>();
if (metadata != null) {
jsonMap.put("metadata", metadata);
}
if (request != null) {
if (this.isDefaultObjectSerializer) {
// If we are using Dapr's default serializer, we pass the object directly and skip objectSerializer.
// This allows binding to receive JSON directly without having to extract it from a quoted string.
// Example of output binding vs body in the input binding:
// This logic DOES this:
// Output Binding: { "data" : { "mykey": "myvalue" } }
// Input Binding: { "mykey": "myvalue" }
// This logic AVOIDS this:
// Output Binding: { "data" : "{ \"mykey\": \"myvalue\" }" }
// Input Binding: "{ \"mykey\": \"myvalue\" }"
jsonMap.put("data", request); jsonMap.put("data", request);
} else {
// When customer provides a custom serializer, he will get a Base64 encoded String back - always.
// Example of body in the input binding resulting from this logic:
// { "data" : "eyJrZXkiOiAidmFsdWUifQ==" }
jsonMap.put("data", objectSerializer.serialize(request));
}
}
StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name); StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name);
return this.client return this.client
@ -219,7 +256,7 @@ public class DaprClientHttp implements DaprClient {
DaprHttp.HttpMethods.POST.name(), DaprHttp.HttpMethods.POST.name(),
url.toString(), url.toString(),
null, null,
objectSerializer.serialize(jsonMap), INTERNAL_SERIALIZER.serialize(jsonMap),
null) null)
.then(); .then();
} catch (Exception ex) { } catch (Exception ex) {