diff --git a/examples/components/kafka_bindings.yaml b/examples/components/kafka_bindings.yaml new file mode 100644 index 000000000..280e241e1 --- /dev/null +++ b/examples/components/kafka_bindings.yaml @@ -0,0 +1,20 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: sample123 +spec: + type: bindings.kafka + metadata: + # Kafka broker connection setting + - name: brokers + value: localhost:9092 + # consumer configuration: topic and consumer group + - name: topics + value: sample + - name: consumerGroup + value: group1 + # publisher configuration: topic + - name: publishTopic + value: sample + - name: authRequired + value: "false" diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java new file mode 100644 index 000000000..57ae789fc --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/bindings/http/InputBindingExample.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.examples.bindings.http; + +import io.dapr.runtime.Dapr; +import io.dapr.springboot.DaprApplication; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import reactor.core.publisher.Mono; + +/** + * Service for input binding example. + * 1. From your repo root, build and install jars: + * mvn clean install + * 2. cd to [repo-root]/examples + * 3. Run : + * dapr run --app-id inputbinding --app-port 3000 --port 3005 -- mvn exec:java -Dexec.mainClass=io.dapr.examples.bindings.http.InputBindingExample -Dexec.args="-p 3000" + */ +@SpringBootApplication +public class InputBindingExample { + + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addRequiredOption("p", "port", true, "Port Dapr will listen to."); + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + // If port string is not valid, it will throw an exception. + int port = Integer.parseInt(cmd.getOptionValue("port")); + + final String BINDING_NAME = "sample123"; + + // "sample123" is the name of the binding. It will be received at url /v1.0/bindings/sample123 + Dapr.getInstance().registerInputBinding(BINDING_NAME, (message, metadata) -> Mono + .fromSupplier(() -> { + System.out.println("Received message through binding: " + (message == null ? "" : new String(message))); + return Boolean.TRUE; + }) + .then(Mono.empty())); + + // Start Dapr's callback endpoint. + DaprApplication.start(port); + } +} diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java new file mode 100644 index 000000000..70472fcc2 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.examples.bindings.http; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.utils.ObjectSerializer; + +/** + * Service for output binding example. + * 1. From your repo root, build and install jars: + * mvn clean install + * 2. cd to [repo-root]/examples + * 3. Run the program: + * dapr run --app-id outputbinding --port 3006 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.bindings.http.OutputBindingExample + */ +public class OutputBindingExample { + + public static class MyClass { + public MyClass(){} + public String message; + } + + public static void main(String[] args) throws Exception { + DaprClient client = new DaprClientBuilder().build(); + + final String BINDING_NAME = "sample123"; + + // This is an example of sending data in a user-defined object. The input binding will receive: + // {"message":"hello"} + MyClass myClass = new MyClass(); + myClass.message = "hello"; + + System.out.println("sending first message"); + client.invokeBinding(BINDING_NAME, myClass); + + // This is an example of sending a plain string. The input binding will receive + // cat + final String m = "cat"; + System.out.println("sending " + m); + client.invokeBinding(BINDING_NAME, m); + + try { + Thread.sleep((long) (10000 * Math.random())); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + return; + } + + System.out.println("Done."); + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java index 99669d306..96095c569 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java @@ -149,11 +149,10 @@ public class DaprClientHttpAdapter implements DaprClient { throw new DaprException("500", "Name to bind cannot be null or empty."); } - String serializedBidingRequestBody = objectSerializer.serializeString(request); - - Map jsonMap = new HashMap<>(); - jsonMap.put("Data", serializedBidingRequestBody); + Map jsonMap = new HashMap<>(); + jsonMap.put("data", request); StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name); + return this.client .invokeAPI( DaprHttp.HttpMethods.POST.name(), diff --git a/sdk/src/main/java/io/dapr/runtime/Dapr.java b/sdk/src/main/java/io/dapr/runtime/Dapr.java index 4681a357d..60de49198 100644 --- a/sdk/src/main/java/io/dapr/runtime/Dapr.java +++ b/sdk/src/main/java/io/dapr/runtime/Dapr.java @@ -87,6 +87,13 @@ public final class Dapr implements DaprRuntime { this.handlers.putIfAbsent(name, new MethodHandler(listener)); } + /** + * {@inheritDoc} + */ + public void registerInputBinding(String name, MethodListener listener) { + this.handlers.putIfAbsent(name, new MethodHandler(listener)); + } + /** * {@inheritDoc} */ diff --git a/sdk/src/main/java/io/dapr/runtime/DaprRuntime.java b/sdk/src/main/java/io/dapr/runtime/DaprRuntime.java index 829739e5f..86df3bda7 100644 --- a/sdk/src/main/java/io/dapr/runtime/DaprRuntime.java +++ b/sdk/src/main/java/io/dapr/runtime/DaprRuntime.java @@ -35,6 +35,14 @@ public interface DaprRuntime { */ void registerServiceMethod(String name, MethodListener listener); + /** + * Registers a method to be executed for an input binding. + * @param name The name of the input binding. + * @param listener The method to run when receiving a message on this binding. + */ + void registerInputBinding(String name, MethodListener listener); + + /** * Handles a given topic message or method API call. * @param name Name of topic or method. @@ -43,4 +51,6 @@ public interface DaprRuntime { * @return Response payload or empty. */ Mono handleInvocation(String name, byte[] payload, Map metadata); + + } diff --git a/sdk/src/main/java/io/dapr/utils/Constants.java b/sdk/src/main/java/io/dapr/utils/Constants.java index f3120f63d..0c99034da 100644 --- a/sdk/src/main/java/io/dapr/utils/Constants.java +++ b/sdk/src/main/java/io/dapr/utils/Constants.java @@ -87,7 +87,7 @@ public final class Constants { /** * Invoke Binding Path */ - public static final String BINDING_PATH = API_VERSION + "/binding"; + public static final String BINDING_PATH = API_VERSION + "/bindings"; /** * State Path