PubSub + related fixes. (#71)

This commit is contained in:
Artur Souza 2020-01-08 12:08:29 -08:00 committed by GitHub
parent 5e3dc74003
commit 858f28a34b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 597 additions and 264 deletions

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.examples.pubsub.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import java.util.Collections;
/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. Run the program:
* dapr run --app-id publisher --port 3006 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.pubsub.http.Publisher
*/
public class Publisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "message";
public static void main(String[] args) throws Exception {
DaprClient client = new DaprClientBuilder().build();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
client.publishEvent(TOPIC_NAME, message).block();
System.out.println("Published message: " + message);
try {
Thread.sleep((long)(1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
client.publishEvent(
TOPIC_NAME,
new byte[] { 1 },
Collections.singletonMap("content-type", "application/octet-stream")).block();
System.out.println("Published one byte.");
System.out.println("Done.");
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.examples.pubsub.http;
import io.dapr.runtime.Dapr;
import io.dapr.springboot.DaprApplication;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.core.publisher.Mono;
/**
* Service for subscriber.
* 1. Build and install jars:
* mvn clean install
* 2. Run the server:
* dapr run --app-id subscriber --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.pubsub.http.Subscriber -Dexec.args="-p 3000"
*/
@SpringBootApplication
public class Subscriber {
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "Port Dapr will listen to.");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
// Subscribe to topic.
Dapr.getInstance().subscribeToTopic("message", (id, dataType, data, metadata) -> Mono
.fromSupplier(() -> {
System.out.println("Subscriber got message (" + id + "): " + (data == null ? "" : new String(data)));
return Boolean.TRUE;
})
.then(Mono.empty()));
// Start Dapr's callback endpoint.
DaprApplication.start(port);
}
}

View File

@ -33,7 +33,7 @@ public class DaprApplication {
* @param args Command line arguments. * @param args Command line arguments.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(DaprApplication.class, args); DaprApplication.start(3000);
} }
} }

View File

@ -10,6 +10,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.runtime.ActorRuntime; import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.runtime.Dapr;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@ -17,6 +18,7 @@ import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.io.Writer; import java.io.Writer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
/** /**
* SpringBoot Controller to handle callback APIs for Dapr. * SpringBoot Controller to handle callback APIs for Dapr.
@ -30,12 +32,12 @@ public class DaprController {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@RequestMapping("/") @GetMapping("/")
public String index() { public String index() {
return "Greetings from Dapr!"; return "Greetings from Dapr!";
} }
@RequestMapping("/dapr/config") @GetMapping("/dapr/config")
public String daprConfig() throws Exception { public String daprConfig() throws Exception {
try (Writer writer = new StringWriter()) { try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer); JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
@ -45,10 +47,7 @@ public class DaprController {
generator.writeString(actorClass); generator.writeString(actorClass);
} }
generator.writeEndArray(); generator.writeEndArray();
generator.writeStringField("actorIdleTimeout", "10s"); // TODO: handle configuration.
generator.writeStringField("actorScanInterval", "1s");
generator.writeStringField("drainOngoingCallTimeout", "1s");
generator.writeBooleanField("drainBalancedActors", true);
generator.writeEndObject(); generator.writeEndObject();
generator.close(); generator.close();
writer.flush(); writer.flush();
@ -56,19 +55,48 @@ public class DaprController {
} }
} }
@RequestMapping(method = RequestMethod.POST, path = "/actors/{type}/{id}") @GetMapping("/dapr/subscribe")
public String daprSubscribe() throws Exception {
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartArray();
for (String topic : Dapr.getInstance().getSubscribedTopics()) {
generator.writeString(topic);
}
generator.writeEndArray();
generator.close();
writer.flush();
return writer.toString();
}
}
@PostMapping(path = "/{name}")
public Mono<byte[]> invokeMethodOrTopic(@PathVariable("name") String name,
@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> header) {
return Dapr.getInstance().handleInvocation(name, body, header);
}
@PutMapping(path = "/{name}")
public Mono<byte[]> invokeMethodOrTopicViaPut(@PathVariable("name") String name,
@RequestBody(required = false) byte[] body,
@RequestHeader Map<String, String> header) {
return Dapr.getInstance().handleInvocation(name, body, header);
}
@PostMapping(path = "/actors/{type}/{id}")
public Mono<Void> activateActor(@PathVariable("type") String type, public Mono<Void> activateActor(@PathVariable("type") String type,
@PathVariable("id") String id) throws Exception { @PathVariable("id") String id) throws Exception {
return ActorRuntime.getInstance().activate(type, id); return ActorRuntime.getInstance().activate(type, id);
} }
@RequestMapping(method = RequestMethod.DELETE, path = "/actors/{type}/{id}") @DeleteMapping(path = "/actors/{type}/{id}")
public Mono<Void> deactivateActor(@PathVariable("type") String type, public Mono<Void> deactivateActor(@PathVariable("type") String type,
@PathVariable("id") String id) throws Exception { @PathVariable("id") String id) throws Exception {
return ActorRuntime.getInstance().deactivate(type, id); return ActorRuntime.getInstance().deactivate(type, id);
} }
@RequestMapping(method = RequestMethod.PUT, path = "/actors/{type}/{id}/method/{method}") @PutMapping(path = "/actors/{type}/{id}/method/{method}")
public Mono<String> invokeActorMethod(@PathVariable("type") String type, public Mono<String> invokeActorMethod(@PathVariable("type") String type,
@PathVariable("id") String id, @PathVariable("id") String id,
@PathVariable("method") String method, @PathVariable("method") String method,
@ -81,14 +109,14 @@ public class DaprController {
} }
} }
@RequestMapping(method = RequestMethod.PUT, path = "/actors/{type}/{id}/method/timer/{timer}") @PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
public Mono<Void> invokeActorTimer(@PathVariable("type") String type, public Mono<Void> invokeActorTimer(@PathVariable("type") String type,
@PathVariable("id") String id, @PathVariable("id") String id,
@PathVariable("timer") String timer) { @PathVariable("timer") String timer) {
return ActorRuntime.getInstance().invokeTimer(type, id, timer); return ActorRuntime.getInstance().invokeTimer(type, id, timer);
} }
@RequestMapping(method = RequestMethod.PUT, path = "/actors/{type}/{id}/method/remind/{reminder}") @PutMapping(path = "/actors/{type}/{id}/method/remind/{reminder}")
public Mono<Void> invokeActorReminder(@PathVariable("type") String type, public Mono<Void> invokeActorReminder(@PathVariable("type") String type,
@PathVariable("id") String id, @PathVariable("id") String id,
@PathVariable("reminder") String reminder, @PathVariable("reminder") String reminder,

View File

@ -95,9 +95,9 @@ public abstract class AbstractActor {
Duration dueTime, Duration dueTime,
Duration period) { Duration period) {
try { try {
String data = this.actorRuntimeContext.getActorSerializer().serialize(state); String data = this.actorRuntimeContext.getActorSerializer().serializeString(state);
ActorReminderParams params = new ActorReminderParams(data, dueTime, period); ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
String serialized = this.actorRuntimeContext.getActorSerializer().serialize(params); String serialized = this.actorRuntimeContext.getActorSerializer().serializeString(params);
return this.actorRuntimeContext.getDaprClient().registerActorReminder( return this.actorRuntimeContext.getDaprClient().registerActorReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(), this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(), this.id.toString(),
@ -139,7 +139,7 @@ public abstract class AbstractActor {
try { try {
ActorTimer actorTimer = new ActorTimer(this, name, callback, state, dueTime, period); ActorTimer actorTimer = new ActorTimer(this, name, callback, state, dueTime, period);
String serializedTimer = this.actorRuntimeContext.getActorSerializer().serialize(actorTimer); String serializedTimer = this.actorRuntimeContext.getActorSerializer().serializeString(actorTimer);
this.timers.put(name, actorTimer); this.timers.put(name, actorTimer);
return this.actorRuntimeContext.getDaprClient().registerActorTimer( return this.actorRuntimeContext.getDaprClient().registerActorTimer(

View File

@ -226,7 +226,7 @@ class ActorManager<T extends AbstractActor> {
if (response instanceof Mono) { if (response instanceof Mono) {
return ((Mono<Object>) response).map(r -> { return ((Mono<Object>) response).map(r -> {
try { try {
return this.runtimeContext.getActorSerializer().serialize(r); return this.runtimeContext.getActorSerializer().serializeString(r);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -234,7 +234,7 @@ class ActorManager<T extends AbstractActor> {
} }
// Method was not Mono, so we serialize response. // Method was not Mono, so we serialize response.
return Mono.just(this.runtimeContext.getActorSerializer().serialize(response)); return Mono.just(this.runtimeContext.getActorSerializer().serializeString(response));
} catch (Exception e) { } catch (Exception e) {
return Mono.error(e); return Mono.error(e);
} }

View File

@ -103,10 +103,9 @@ public class ActorRuntime {
* *
* @param clazz The type of actor. * @param clazz The type of actor.
* @param <T> Actor class type. * @param <T> Actor class type.
* @return Async void task.
*/ */
public <T extends AbstractActor> Mono<Void> registerActor(Class<T> clazz) { public <T extends AbstractActor> void registerActor(Class<T> clazz) {
return registerActor(clazz, null); registerActor(clazz, null);
} }
/** /**
@ -115,10 +114,9 @@ public class ActorRuntime {
* @param clazz The type of actor. * @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. * @param actorFactory An optional factory to create actors.
* @param <T> Actor class type. * @param <T> Actor class type.
* @return Async void task.
* This can be used for dependency injection into actors. * This can be used for dependency injection into actors.
*/ */
public <T extends AbstractActor> Mono<Void> registerActor(Class<T> clazz, ActorFactory<T> actorFactory) { public <T extends AbstractActor> void registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz); ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz);
ActorFactory<T> actualActorFactory = actorFactory != null ? actorFactory : new DefaultActorFactory<T>(); ActorFactory<T> actualActorFactory = actorFactory != null ? actorFactory : new DefaultActorFactory<T>();
@ -133,7 +131,6 @@ public class ActorRuntime {
// Create ActorManagers, override existing entry if registered again. // Create ActorManagers, override existing entry if registered again.
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context)); this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context));
return Mono.empty();
} }
/** /**

View File

@ -23,7 +23,7 @@ public class ActorStateSerializer extends ObjectSerializer {
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
public <T> String serialize(T state) throws IOException { public <T> String serializeString(T state) throws IOException {
if (state == null) { if (state == null) {
return null; return null;
} }
@ -39,7 +39,7 @@ public class ActorStateSerializer extends ObjectSerializer {
} }
// Is not an special case. // Is not an special case.
return super.serialize(state); return super.serializeString(state);
} }
/** /**
@ -105,13 +105,13 @@ public class ActorStateSerializer extends ObjectSerializer {
return null; return null;
} }
String json = this.serialize(request); byte[] data = this.serialize(request);
try (Writer writer = new StringWriter()) { try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer); JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject(); generator.writeStartObject();
if (json != null) { if (data != null) {
generator.writeBinaryField("data", json.getBytes()); generator.writeBinaryField("data", data);
} }
generator.writeEndObject(); generator.writeEndObject();
generator.close(); generator.close();
@ -139,7 +139,7 @@ public class ActorStateSerializer extends ObjectSerializer {
generator.writeStringField("period", DurationUtils.ConvertDurationToDaprFormat(timer.getPeriod())); generator.writeStringField("period", DurationUtils.ConvertDurationToDaprFormat(timer.getPeriod()));
generator.writeStringField("callback", timer.getCallback()); generator.writeStringField("callback", timer.getCallback());
if (timer.getState() != null) { if (timer.getState() != null) {
generator.writeStringField("data", this.serialize(timer.getState())); generator.writeStringField("data", this.serializeString(timer.getState()));
} }
generator.writeEndObject(); generator.writeEndObject();
generator.close(); generator.close();

View File

@ -112,7 +112,7 @@ class DaprStateAsyncProvider {
generator.writeObjectFieldStart("request"); generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateChange.getStateName()); generator.writeStringField("key", stateChange.getStateName());
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
generator.writeStringField("value", this.serializer.serialize(stateChange.getValue())); generator.writeStringField("value", this.serializer.serializeString(stateChange.getValue()));
} }
// End request object. // End request object.
generator.writeEndObject(); generator.writeEndObject();

View File

@ -9,6 +9,7 @@ import io.dapr.client.domain.StateOptions;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required. * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
@ -19,15 +20,28 @@ public interface DaprClient {
/** /**
* Publish an event. * Publish an event.
*
* @param topic the topic where the event will be published * @param topic the topic where the event will be published
* @param event the event to be published * @param event the event to be published
* @param <T> The type of event to be publishded. * @param <T> The type of event to be published.
* @return a Mono plan of type Void * @return a Mono plan of type Void
*/ */
<T> Mono<Void> publishEvent(String topic, T event); <T> Mono<Void> publishEvent(String topic, T event);
/**
* Publish an event.
*
* @param topic the topic where the event will be published
* @param event the event to be published
* @param metadata The metadata for the published event.
* @param <T> The type of event to be published.
* @return a Mono plan of type Void
*/
<T> Mono<Void> publishEvent(String topic, T event, Map<String, String> metadata);
/** /**
* Invoke a service * Invoke a service
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is * @param appId The Application ID where the service is
* @param method The actual Method to be call in the application. * @param method The actual Method to be call in the application.
@ -41,6 +55,7 @@ public interface DaprClient {
/** /**
* Invoke a service * Invoke a service
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value. * @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is * @param appId The Application ID where the service is
* @param method The actual Method to be call in the application. * @param method The actual Method to be call in the application.
@ -52,6 +67,7 @@ public interface DaprClient {
/** /**
* Creating a Binding * Creating a Binding
*
* @param name The name of the biding to call * @param name The name of the biding to call
* @param request The request needed for the binding * @param request The request needed for the binding
* @param <T> The type of the request. * @param <T> The type of the request.
@ -62,11 +78,11 @@ public interface DaprClient {
/** /**
* Retrieve a State based on their key. * Retrieve a State based on their key.
* *
* @param state The key of the State to be retrieved * @param state The key of the State to be retrieved
* @param stateOptions * @param stateOptions
* @param clazz the Type of State needed as return. * @param clazz the Type of State needed as return.
* @param <T> the Type of the return * @param <T> the Type of the return
* @param <K> The Type of the key of the State * @param <K> The Type of the key of the State
* @return A Mono Plan for the requested State * @return A Mono Plan for the requested State
*/ */
<T, K> Mono<T> getState(StateKeyValue<K> state, StateOptions stateOptions, Class<T> clazz); <T, K> Mono<T> getState(StateKeyValue<K> state, StateOptions stateOptions, Class<T> clazz);
@ -74,20 +90,21 @@ public interface DaprClient {
/** /**
* Save/Update a list of states. * Save/Update a list of states.
* *
* @param states the States to be saved * @param states the States to be saved
* @param options the Options to use for each state * @param options the Options to use for each state
* @param <T> the Type of the State * @param <T> the Type of the State
* @return a Mono plan of type Void * @return a Mono plan of type Void
*/ */
<T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options); <T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options);
/** /**
* Save/Update a state * Save/Update a state
* @param key the key of the state *
* @param etag the etag to be used * @param key the key of the state
* @param value the value of the state * @param etag the etag to be used
* @param value the value of the state
* @param options the Options to use for each state * @param options the Options to use for each state
* @param <T> the Type of the State * @param <T> the Type of the State
* @return a Mono plan of type Void * @return a Mono plan of type Void
*/ */
<T> Mono<Void> saveState(String key, String etag, T value, StateOptions options); <T> Mono<Void> saveState(String key, String etag, T value, StateOptions options);
@ -95,9 +112,9 @@ public interface DaprClient {
/** /**
* Delete a state * Delete a state
* *
* @param state The key of the State to be removed * @param state The key of the State to be removed
* @param options The options of the state * @param options The options of the state
* @param <T> The Type of the key of the State * @param <T> The Type of the key of the State
* @return a Mono plan of type Void * @return a Mono plan of type Void
*/ */
<T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options); <T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options);

View File

@ -22,16 +22,9 @@ public class DaprClientBuilder {
private static final int port = DaprClientBuilder.getEnvPortOrDefault(); private static final int port = DaprClientBuilder.getEnvPortOrDefault();
/** /**
* Default host for Dapr after checking environment variable. * Unique instance of httpClient to be shared.
*/ */
private static final String host = Constants.DEFAULT_HOSTNAME; private static volatile DaprClientHttpAdapter daprHttClient;
/**
* The HTTP Client that will be used to injectto connect to Dapr
*/
private static OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
private static DaprClientHttpAdapter daprHttClient;
/** /**
* Tries to get a valid port from environment variable or returns default. * Tries to get a valid port from environment variable or returns default.
@ -70,13 +63,10 @@ public class DaprClientBuilder {
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number. * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
*/ */
private DaprClient buildDaprClientGrpc() { private DaprClient buildDaprClientGrpc() {
if (null == this.host || "".equals(this.host.trim())) {
throw new IllegalStateException("Host must is required.");
}
if (port <= 0) { if (port <= 0) {
throw new IllegalStateException("Invalid port."); throw new IllegalStateException("Invalid port.");
} }
ManagedChannel channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, port).usePlaintext().build();
return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel)); return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel));
} }
@ -87,21 +77,19 @@ public class DaprClientBuilder {
* @return * @return
*/ */
private DaprClient buildDaprClientHttp() { private DaprClient buildDaprClientHttp() {
if (null == this.host || "".equals(this.host.trim())) {
throw new IllegalStateException("Host must is required.");
}
if (port <= 0) { if (port <= 0) {
throw new IllegalStateException("Invalid port."); throw new IllegalStateException("Invalid port.");
} }
if (daprHttClient == null) { if (this.daprHttClient == null) {
synchronized (okHttpClient) { synchronized (DaprClientBuilder.class) {
if (daprHttClient == null) { if (this.daprHttClient == null) {
DaprHttp daprHtt = new DaprHttp(Constants.DEFAULT_HTTP_PROTOCOL_IDENTIFIED+host, port, okHttpClient); OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
daprHttClient = new DaprClientHttpAdapter(daprHtt); DaprHttp daprHtt = new DaprHttp(Constants.DEFAULT_BASE_HTTP_URL, port, okHttpClient);
this.daprHttClient = new DaprClientHttpAdapter(daprHtt);
} }
} }
} }
return daprHttClient; return this.daprHttClient;
} }
} }

View File

@ -51,13 +51,22 @@ class DaprClientGrpcAdapter implements DaprClient {
*/ */
@Override @Override
public <T> Mono<Void> publishEvent(String topic, T event) { public <T> Mono<Void> publishEvent(String topic, T event) {
return this.publishEvent(topic, event, null);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> publishEvent(String topic, T event, Map<String, String> metadata) {
try { try {
String serializedEvent = objectSerializer.serialize(event); String serializedEvent = objectSerializer.serializeString(event);
Map<String, String> mapEvent = new HashMap<>(); Map<String, String> mapEvent = new HashMap<>();
mapEvent.put("Topic", topic); mapEvent.put("Topic", topic);
mapEvent.put("Data", serializedEvent); mapEvent.put("Data", serializedEvent);
// TODO: handle metadata.
byte[] byteEvent = objectSerializer.serialize(mapEvent).getBytes(); byte[] byteEvent = objectSerializer.serialize(mapEvent);
DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.parseFrom(byteEvent); DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.parseFrom(byteEvent);
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope); ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
@ -81,12 +90,12 @@ class DaprClientGrpcAdapter implements DaprClient {
public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) { public <T, K> Mono<T> invokeService(String verb, String appId, String method, K request, Class<T> clazz) {
try { try {
Map<String, String> mapMessage = new HashMap<>(); Map<String, String> mapMessage = new HashMap<>();
mapMessage.put("Id", objectSerializer.serialize(appId)); mapMessage.put("Id", appId);
mapMessage.put("Method", objectSerializer.serialize(verb)); mapMessage.put("Method", verb);
mapMessage.put("Data", objectSerializer.serialize(request)); mapMessage.put("Data", objectSerializer.serializeString(request));
DaprProtos.InvokeServiceEnvelope envelope = DaprProtos.InvokeServiceEnvelope envelope =
DaprProtos.InvokeServiceEnvelope.parseFrom(objectSerializer.serialize(mapMessage).getBytes()); DaprProtos.InvokeServiceEnvelope.parseFrom(objectSerializer.serialize(mapMessage));
ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse = ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse =
client.invokeService(envelope); client.invokeService(envelope);
return Mono.just(futureResponse).flatMap(f -> { return Mono.just(futureResponse).flatMap(f -> {
@ -118,9 +127,9 @@ class DaprClientGrpcAdapter implements DaprClient {
try { try {
Map<String, String> mapMessage = new HashMap<>(); Map<String, String> mapMessage = new HashMap<>();
mapMessage.put("Name", name); mapMessage.put("Name", name);
mapMessage.put("Data", objectSerializer.serialize(request)); mapMessage.put("Data", objectSerializer.serializeString(request));
DaprProtos.InvokeBindingEnvelope envelope = DaprProtos.InvokeBindingEnvelope envelope =
DaprProtos.InvokeBindingEnvelope.parseFrom(objectSerializer.serialize(mapMessage).getBytes()); DaprProtos.InvokeBindingEnvelope.parseFrom(objectSerializer.serialize(mapMessage));
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope); ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);
return Mono.just(futureEmpty).flatMap(f -> { return Mono.just(futureEmpty).flatMap(f -> {
try { try {
@ -144,8 +153,8 @@ class DaprClientGrpcAdapter implements DaprClient {
Map<String, String> request = new HashMap<>(); Map<String, String> request = new HashMap<>();
request.put("Key", key.getKey()); request.put("Key", key.getKey());
request.put("Consistency", stateOptions.getConsistency()); request.put("Consistency", stateOptions.getConsistency());
String serializedRequest = objectSerializer.serialize(request); byte[] serializedRequest = objectSerializer.serialize(request);
DaprProtos.GetStateEnvelope envelope = DaprProtos.GetStateEnvelope.parseFrom(serializedRequest.getBytes()); DaprProtos.GetStateEnvelope envelope = DaprProtos.GetStateEnvelope.parseFrom(serializedRequest);
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope); ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
return Mono.just(futureResponse).flatMap(f -> { return Mono.just(futureResponse).flatMap(f -> {
try { try {
@ -173,7 +182,7 @@ class DaprClientGrpcAdapter implements DaprClient {
}; };
Map<String, Object> mapStates = new HashMap<>(); Map<String, Object> mapStates = new HashMap<>();
mapStates.put("Requests", listStates); mapStates.put("Requests", listStates);
byte[] byteRequests = objectSerializer.serialize(mapStates).getBytes(); byte[] byteRequests = objectSerializer.serialize(mapStates);
DaprProtos.SaveStateEnvelope envelope = DaprProtos.SaveStateEnvelope.parseFrom(byteRequests); DaprProtos.SaveStateEnvelope envelope = DaprProtos.SaveStateEnvelope.parseFrom(byteRequests);
ListenableFuture<Empty> futureEmpty = client.saveState(envelope); ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
return Mono.just(futureEmpty).flatMap(f -> { return Mono.just(futureEmpty).flatMap(f -> {
@ -204,8 +213,8 @@ class DaprClientGrpcAdapter implements DaprClient {
try { try {
Map<String, Object> mapOptions = transformStateOptionsToMap(options); Map<String, Object> mapOptions = transformStateOptionsToMap(options);
Map<String, Object> mapState = transformStateKeyValueToMap(state, mapOptions); Map<String, Object> mapState = transformStateKeyValueToMap(state, mapOptions);
String serializedState = objectSerializer.serialize(mapState); byte[] serializedState = objectSerializer.serialize(mapState);
DaprProtos.DeleteStateEnvelope envelope = DaprProtos.DeleteStateEnvelope.parseFrom(serializedState.getBytes()); DaprProtos.DeleteStateEnvelope envelope = DaprProtos.DeleteStateEnvelope.parseFrom(serializedState);
ListenableFuture<Empty> futureEmpty = client.deleteState(envelope); ListenableFuture<Empty> futureEmpty = client.deleteState(envelope);
return Mono.just(futureEmpty).flatMap(f -> { return Mono.just(futureEmpty).flatMap(f -> {
try { try {

View File

@ -28,22 +28,22 @@ public class DaprClientHttpAdapter implements DaprClient {
* *
* @see io.dapr.client.DaprHttp * @see io.dapr.client.DaprHttp
*/ */
private DaprHttp client; private final DaprHttp client;
/** /**
* A utitlity class for serialize and deserialize the messages sent and retrived by the client. * A utitlity class for serialize and deserialize the messages sent and retrived by the client.
*/ */
private ObjectSerializer objectSerializer; private final ObjectSerializer objectSerializer;
/** /**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
* *
* @param httpClient * @param client Dapr's http client.
* @see io.dapr.client.DaprClientBuilder * @see io.dapr.client.DaprClientBuilder
*/ */
DaprClientHttpAdapter(DaprHttp httpClient) { DaprClientHttpAdapter(DaprHttp client) {
this.client = client; this.client = client;
objectSerializer = new ObjectSerializer(); this.objectSerializer = new ObjectSerializer();
} }
/** /**
@ -51,22 +51,23 @@ public class DaprClientHttpAdapter implements DaprClient {
*/ */
@Override @Override
public <T> Mono<Void> publishEvent(String topic, T event) { public <T> Mono<Void> publishEvent(String topic, T event) {
return this.publishEvent(topic, event, null);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> publishEvent(String topic, T event, Map<String, String> metadata) {
try { try {
if (topic == null || topic.trim().isEmpty()) { if (topic == null || topic.trim().isEmpty()) {
throw new DaprException("500", "Name cannot be null or empty."); throw new DaprException("INVALID_TOPIC", "Topic name cannot be null or empty.");
} }
String serializedEvent = objectSerializer.serialize(event);
byte[] serializedEvent = objectSerializer.serialize(event);
StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic); StringBuilder url = new StringBuilder(Constants.PUBLISH_PATH).append("/").append(topic);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid( return this.client.invokeAPI(
Constants.defaultHttpMethodSupported.POST.name(), url.toString(), serializedEvent, null); Constants.defaultHttpMethodSupported.POST.name(), url.toString(), serializedEvent, metadata).then();
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -91,20 +92,16 @@ public class DaprClientHttpAdapter implements DaprClient {
if (method == null || method.trim().isEmpty()) { if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "App Id cannot be null or empty."); throw new DaprException("500", "App Id cannot be null or empty.");
} }
StringBuilder urlSB = new StringBuilder("/invoke/"); String path = String.format("/invoke/%s/method/%s", appId, method);
urlSB.append(objectSerializer.serialize(appId)); byte[] serializedRequestBody = objectSerializer.serialize(request);
urlSB.append("/method/"); return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null)
urlSB.append(objectSerializer.serialize(method)); .flatMap(r -> {
String serializedRequestBody = objectSerializer.serialize(request); try {
CompletableFuture<String> futureResponse = return Mono.just(objectSerializer.deserialize(r, clazz));
client.invokeAPI(httMethod.name(), urlSB.toString(), serializedRequestBody, null); } catch (Exception ex) {
return Mono.just(futureResponse).flatMap(f -> { return Mono.error(ex);
try { }
return Mono.just(objectSerializer.deserialize(f.get(), clazz)); });
} catch (Exception ex) {
return Mono.error(ex);
}
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -129,21 +126,9 @@ public class DaprClientHttpAdapter implements DaprClient {
if (method == null || method.trim().isEmpty()) { if (method == null || method.trim().isEmpty()) {
throw new DaprException("500", "Method to invoke cannot be null or empty."); throw new DaprException("500", "Method to invoke cannot be null or empty.");
} }
StringBuilder urlSB = new StringBuilder("/invoke/"); String path = String.format("/invoke/%s/method/%s", appId, method);
urlSB.append(objectSerializer.serialize(appId)); byte[] serializedRequestBody = objectSerializer.serialize(request);
urlSB.append("/method/"); return this.client.invokeAPI(httMethod.name(), path, serializedRequestBody, null).then();
urlSB.append(objectSerializer.serialize(method));
String serializedRequestBody = objectSerializer.serialize(request);
CompletableFuture<Void> futureVoid =
client.invokeAPIVoid(httMethod.name(), urlSB.toString(), serializedRequestBody, null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -159,21 +144,18 @@ public class DaprClientHttpAdapter implements DaprClient {
throw new DaprException("500", "Name to bind cannot be null or empty."); throw new DaprException("500", "Name to bind cannot be null or empty.");
} }
String serializedBidingRequestBody = objectSerializer.serialize(request); String serializedBidingRequestBody = objectSerializer.serializeString(request);
Map<String, String> jsonMap = new HashMap<>(); Map<String, String> jsonMap = new HashMap<>();
jsonMap.put("Data", serializedBidingRequestBody); jsonMap.put("Data", serializedBidingRequestBody);
StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name); StringBuilder url = new StringBuilder(Constants.BINDING_PATH).append("/").append(name);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid( return this.client
Constants.defaultHttpMethodSupported.POST.name(), url.toString(), objectSerializer.serialize(jsonMap), null); .invokeAPI(
return Mono.just(futureVoid).flatMap(f -> { Constants.defaultHttpMethodSupported.POST.name(),
try { url.toString(),
f.get(); objectSerializer.serialize(jsonMap),
} catch (Exception ex) { null)
return Mono.error(ex); .then();
}
return Mono.empty();
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -192,21 +174,20 @@ public class DaprClientHttpAdapter implements DaprClient {
if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) { if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag()); headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag());
} }
String serializedKeyBody = objectSerializer.serialize(state.getKey());
serializedKeyBody += getOptionsAsQueryParameter(options); StringBuilder url = new StringBuilder(Constants.STATE_PATH)
if (options.getConsistency() != null && !options.getConsistency().trim().isEmpty()) { .append("/")
serializedKeyBody += "?consistency=" + objectSerializer.serialize(options.getConsistency()); .append(state.getKey())
} .append(getOptionsAsQueryParameter(options));
StringBuilder url = new StringBuilder(Constants.STATE_PATH).append("/").append(serializedKeyBody); return this.client
CompletableFuture<String> futureResponse = .invokeAPI(Constants.defaultHttpMethodSupported.GET.name(), url.toString(), headers)
client.invokeAPI(Constants.defaultHttpMethodSupported.GET.name(), url.toString(), null, headers); .flatMap(s -> {
return Mono.just(futureResponse).flatMap(f -> { try {
try { return Mono.just(objectSerializer.deserialize(s, clazz));
return Mono.just(objectSerializer.deserialize(f.get(), clazz)); } catch (Exception ex) {
} catch (Exception ex) { return Mono.error(ex);
return Mono.error(ex); }
} });
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -228,17 +209,9 @@ public class DaprClientHttpAdapter implements DaprClient {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag); headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
} }
String url = Constants.STATE_PATH + getOptionsAsQueryParameter(options);; String url = Constants.STATE_PATH + getOptionsAsQueryParameter(options);;
String serializedStateBody = objectSerializer.serialize(states); byte[] serializedStateBody = objectSerializer.serialize(states);
CompletableFuture<Void> futureVoid = client.invokeAPIVoid( return this.client.invokeAPI(
Constants.defaultHttpMethodSupported.POST.name(), url, serializedStateBody, headers); Constants.defaultHttpMethodSupported.POST.name(), url, serializedStateBody, headers).then();
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -263,19 +236,8 @@ public class DaprClientHttpAdapter implements DaprClient {
if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) { if (state.getEtag() != null && !state.getEtag().trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag()); headers.put(Constants.HEADER_HTTP_ETAG_ID, state.getEtag());
} }
String serializedKey = objectSerializer.serialize(state.getKey()); String url = Constants.STATE_PATH + "/" + state.getKey() + getOptionsAsQueryParameter(options);
serializedKey += getOptionsAsQueryParameter(options); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.DELETE.name(), url, headers).then();
String url = Constants.STATE_PATH + "/" + serializedKey;
CompletableFuture<Void> futureVoid = client.invokeAPIVoid(
Constants.defaultHttpMethodSupported.DELETE.name(), url, null, headers);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -284,76 +246,43 @@ public class DaprClientHttpAdapter implements DaprClient {
@Override @Override
public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) { public Mono<String> invokeActorMethod(String actorType, String actorId, String methodName, String jsonPayload) {
String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName); String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName);
return actorActionString(Constants.defaultHttpMethodSupported.POST.name(), url, jsonPayload); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.POST.name(), url, jsonPayload, null);
} }
@Override @Override
public Mono<String> getActorState(String actorType, String actorId, String keyName) { public Mono<String> getActorState(String actorType, String actorId, String keyName) {
String url = String.format(Constants.ACTOR_STATE_KEY_RELATIVE_URL_FORMAT, actorType, actorId, keyName); String url = String.format(Constants.ACTOR_STATE_KEY_RELATIVE_URL_FORMAT, actorType, actorId, keyName);
return actorActionString(Constants.defaultHttpMethodSupported.GET.name(), url, null); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.GET.name(), url, "", null);
} }
@Override @Override
public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data) { public Mono<Void> saveActorStateTransactionally(String actorType, String actorId, String data) {
String url = String.format(Constants.ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId); String url = String.format(Constants.ACTOR_STATE_RELATIVE_URL_FORMAT, actorType, actorId);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.PUT.name(), url, data, null).then();
} }
@Override @Override
public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data) { public Mono<Void> registerActorReminder(String actorType, String actorId, String reminderName, String data) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName); String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.PUT.name(), url, data, null).then();
} }
@Override @Override
public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) { public Mono<Void> unregisterActorReminder(String actorType, String actorId, String reminderName) {
String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName); String url = String.format(Constants.ACTOR_REMINDER_RELATIVE_URL_FORMAT, actorType, actorId, reminderName);
return actorActionVoid(Constants.defaultHttpMethodSupported.DELETE.name(), url, null); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.DELETE.name(), url, null).then();
} }
@Override @Override
public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data) { public Mono<Void> registerActorTimer(String actorType, String actorId, String timerName, String data) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName); String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return actorActionVoid(Constants.defaultHttpMethodSupported.PUT.name(), url, data); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.PUT.name(), url, data, null).then();
} }
@Override @Override
public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) { public Mono<Void> unregisterActorTimer(String actorType, String actorId, String timerName) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName); String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return actorActionVoid(Constants.defaultHttpMethodSupported.DELETE.name(), url, null); return this.client.invokeAPI(Constants.defaultHttpMethodSupported.DELETE.name(), url, null).then();
}
private Mono<String> actorActionString(String httpVerb, String url, String payload) {
try {
CompletableFuture<String> futureResponse =
client.invokeAPI(httpVerb, url, objectSerializer.serialize(payload), null);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(objectSerializer.deserialize(f.get(), String.class));
} catch (Exception ex) {
return Mono.error(ex);
}
});
} catch (Exception ex) {
return Mono.error(ex);
}
}
private Mono<Void> actorActionVoid(String httpVerb, String url, String payload) {
try {
CompletableFuture<Void> futureVoid =
client.invokeAPIVoid(httpVerb, url, objectSerializer.serialize(payload), null);
return Mono.just(futureVoid).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
});
} catch (Exception ex) {
return Mono.error(ex);
}
} }
private String getOptionsAsQueryParameter(StateOptions options) private String getOptionsAsQueryParameter(StateOptions options)

View File

@ -18,6 +18,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -25,11 +26,6 @@ import java.util.concurrent.Executors;
class DaprHttp { class DaprHttp {
/**
* ObjectMapper to Serialize data
*/
private static final ObjectSerializer MAPPER = new ObjectSerializer();
/** /**
* Defines the standard application/json type for HTTP calls in Dapr. * Defines the standard application/json type for HTTP calls in Dapr.
*/ */
@ -42,6 +38,11 @@ class DaprHttp {
private static final RequestBody REQUEST_BODY_EMPTY_JSON = private static final RequestBody REQUEST_BODY_EMPTY_JSON =
RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON); RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON);
/**
* Empty input or output.
*/
private static final byte[] EMPTY_BYTES = new byte[0];
/** /**
* JSON Object Mapper. * JSON Object Mapper.
*/ */
@ -65,9 +66,9 @@ class DaprHttp {
/** /**
* Creates a new instance of {@link DaprHttp}. * Creates a new instance of {@link DaprHttp}.
* *
* @param baseUrl Base url calling Dapr (e.g. http://localhost) * @param baseUrl Base url calling Dapr (e.g. http://localhost)
* @param port Port for calling Dapr. (e.g. 3500) * @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance. * @param httpClient RestClient used for all API calls in this new instance.
*/ */
DaprHttp(String baseUrl, int port, OkHttpClient httpClient) { DaprHttp(String baseUrl, int port, OkHttpClient httpClient) {
this.baseUrl = String.format("%s:%d/", baseUrl, port); this.baseUrl = String.format("%s:%d/", baseUrl, port);
@ -76,16 +77,14 @@ class DaprHttp {
} }
/** /**
* Invokes an API asynchronously that returns Void. * Invokes an API asynchronously without payload that returns a text payload.
* *
* @param method HTTP method. * @param method HTTP method.
* @param urlString url as String. * @param urlString url as String.
* @param json JSON payload or null. * @return Asynchronous text
* @return Asynchronous Void
*/ */
protected final CompletableFuture<Void> invokeAPIVoid(String method, String urlString, String json, Map<String, String> headers) { public final Mono<String> invokeAPI(String method, String urlString, Map<String, String> headers) {
CompletableFuture<String> future = this.invokeAPI(method, urlString, json, headers); return this.invokeAPI(method, urlString, (byte[])null, headers);
return future.thenAcceptAsync(future::complete);
} }
/** /**
@ -93,24 +92,47 @@ class DaprHttp {
* *
* @param method HTTP method. * @param method HTTP method.
* @param urlString url as String. * @param urlString url as String.
* @param json JSON payload or null. * @param content payload to be posted.
* @return Asynchronous text * @return Asynchronous text
*/ */
public final CompletableFuture<String> invokeAPI(String method, String urlString, String json, Map<String, String> headers) { public final Mono<String> invokeAPI(String method, String urlString, String content, Map<String, String> headers) {
CompletableFuture<String> future = CompletableFuture.supplyAsync( return this.invokeAPI(method, urlString, content == null ? EMPTY_BYTES : content.getBytes(StandardCharsets.UTF_8), headers);
}
/**
* Invokes an API asynchronously that returns a text payload.
*
* @param method HTTP method.
* @param urlString url as String.
* @param content payload to be posted.
* @return Asynchronous text
*/
public final Mono<String> invokeAPI(String method, String urlString, byte[] content, Map<String, String> headers) {
return Mono.fromFuture(CompletableFuture.supplyAsync(
() -> { () -> {
try { try {
String requestId = UUID.randomUUID().toString(); String requestId = UUID.randomUUID().toString();
RequestBody body = RequestBody body = REQUEST_BODY_EMPTY_JSON;
json != null ? RequestBody.Companion.create(json, MEDIA_TYPE_APPLICATION_JSON) : REQUEST_BODY_EMPTY_JSON;
String contentType = headers != null ? headers.get("content-type") : null;
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
if (content == null) {
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON) ?
REQUEST_BODY_EMPTY_JSON : RequestBody.Companion.create(new byte[0], mediaType);
} else {
body = RequestBody.Companion.create(content, mediaType);
}
Request.Builder requestBuilder = new Request.Builder() Request.Builder requestBuilder = new Request.Builder()
.url(new URL(this.baseUrl + urlString)) .url(new URL(this.baseUrl + urlString))
.method(method, body) .method(method, body)
.addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId); .addHeader(Constants.HEADER_DAPR_REQUEST_ID, requestId);
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream().forEach(header ->{ if (headers != null) {
requestBuilder.addHeader(header.getKey(), header.getValue()); Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
}); .forEach(header -> {
requestBuilder.addHeader(header.getKey(), header.getValue());
});
}
Request request = requestBuilder.build(); Request request = requestBuilder.build();
@ -129,9 +151,7 @@ class DaprHttp {
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}, this.pool); }, this.pool));
return future;
} }
/** /**

View File

@ -0,0 +1,150 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.runtime;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.ObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public final class Dapr implements DaprRuntime {
/**
* Shared Json serializer/deserializer as per Jackson's documentation.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Mono<byte[]> EMPTY_BYTES_ASYNC = Mono.just(new byte[0]);
private static final byte[] EMPTY_BYTES = new byte[0];
private static volatile DaprRuntime instance;
private final ObjectSerializer serializer = new ObjectSerializer();
private final Map<String, Function<HandleRequest, Mono<byte[]>>> handlers = Collections.synchronizedMap(new HashMap<>());
private Dapr() {
}
/**
* Returns an DaprRuntime object.
*
* @return DaprRuntime object.
*/
public static DaprRuntime getInstance() {
if (instance == null) {
synchronized (Dapr.class) {
if (instance == null) {
instance = new Dapr();
}
}
}
return instance;
}
@Override
public Collection<String> getSubscribedTopics() {
return this.handlers
.entrySet()
.stream().filter(kv -> kv.getValue() instanceof TopicHandler)
.map(kv -> kv.getKey())
.collect(Collectors.toCollection(ArrayList::new));
}
@Override
public void subscribeToTopic(String topic, TopicListener listener) {
this.handlers.putIfAbsent(topic, new TopicHandler(listener));
}
@Override
public void registerServiceMethod(String name, MethodListener listener) {
this.handlers.putIfAbsent(name, new MethodHandler(listener));
}
@Override
public Mono<byte[]> handleInvocation(String name, byte[] payload, Map<String, String> metadata) {
Function<HandleRequest, Mono<byte[]>> handler = this.handlers.get(name);
if (handler == null) {
return Mono.error(new DaprException("INVALID_METHOD_OR_TOPIC", "Did not find handler for : " + (name == null ? "" : name)));
}
try {
Map<String, Object> map = parse(payload);
String messageId = map.getOrDefault("id", "").toString();
String dataType = map.getOrDefault("datacontenttype", "").toString();
byte[] data = this.serializer.deserialize(map.get("data"), byte[].class);
return handler.apply(new HandleRequest(messageId, dataType, data, metadata)).switchIfEmpty(EMPTY_BYTES_ASYNC);
} catch (Exception e) {
// Handling exception in user code by propagating up via Mono.
return Mono.error(e);
}
}
private static final class HandleRequest {
private final String messageId;
private final String dataType;
private final byte[] payload;
private final Map<String, String> metadata;
public HandleRequest(String messageId, String dataType, byte[] payload, Map<String, String> metadata) {
this.messageId = messageId;
this.dataType = dataType;
this.payload = payload;
this.metadata = Collections.unmodifiableMap(metadata);
}
}
private static final class MethodHandler implements Function<HandleRequest, Mono<byte[]>> {
private final MethodListener listener;
private MethodHandler(MethodListener listener) {
this.listener = listener;
}
@Override
public Mono<byte[]> apply(HandleRequest r) {
return listener.process(r.payload, r.metadata);
}
}
private static final class TopicHandler implements Function<HandleRequest, Mono<byte[]>> {
private final TopicListener listener;
private TopicHandler(TopicListener listener) {
this.listener = listener;
}
@Override
public Mono<byte[]> apply(HandleRequest r) {
return listener.process(r.messageId, r.dataType, r.payload, r.metadata).then(Mono.empty());
}
}
public Map<String, Object> parse(final byte[] response) throws IOException {
if (response == null) {
return new HashMap<String, Object>();
}
return OBJECT_MAPPER.readValue(response, new TypeReference<Map<String, Object>>() {});
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.runtime;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Map;
public interface DaprRuntime {
void subscribeToTopic(String topic, TopicListener listener);
Collection<String> getSubscribedTopics();
void registerServiceMethod(String name, MethodListener listener);
Mono<byte[]> handleInvocation(String name, byte[] payload, Map<String, String> metadata);
}

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.runtime;
import reactor.core.publisher.Mono;
import java.util.Map;
public interface MethodListener {
Mono<byte[]> process(byte[] data, Map<String, String> metadata);
}

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.runtime;
import reactor.core.publisher.Mono;
import java.util.Map;
public interface TopicListener {
Mono<Void> process(String messageId, String dataType, byte[] data, Map<String, String> metadata);
}

View File

@ -20,9 +20,9 @@ public final class Constants {
public static final String DEFAULT_HOSTNAME = "localhost"; public static final String DEFAULT_HOSTNAME = "localhost";
/** /**
* Dapr's default http protocol. * Dapr's default http base url.
*/ */
public static final String DEFAULT_HTTP_PROTOCOL_IDENTIFIED = "http://"; public static final String DEFAULT_BASE_HTTP_URL = "http://" + DEFAULT_HOSTNAME;
/** /**
* Dapr's default port. * Dapr's default port.

View File

@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets;
/** /**
* Serializes and deserializes an object. * Serializes and deserializes an object.
@ -33,13 +34,42 @@ public class ObjectSerializer {
* @return Array of bytes[] with the serialized content. * @return Array of bytes[] with the serialized content.
* @throws IOException In case state cannot be serialized. * @throws IOException In case state cannot be serialized.
*/ */
public <T> String serialize(T state) throws IOException { public <T> byte[] serialize(T state) throws IOException {
if (state == null) {
return null;
}
if (state instanceof byte[]) {
return (byte[])state;
}
if (state.getClass() == String.class) {
return ((String) state).getBytes(StandardCharsets.UTF_8);
}
if (isPrimitiveOrEquivalent(state.getClass())) {
return state.toString().getBytes(StandardCharsets.UTF_8);
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsBytes(state);
}
/**
* Serializes a given state object into String.
*
* @param state State object to be serialized.
* @param <T> Type of the state object.
* @return Array of bytes[] with the serialized content.
* @throws IOException In case state cannot be serialized.
*/
public <T> String serializeString(T state) throws IOException {
if (state == null) { if (state == null) {
return null; return null;
} }
if (state.getClass() == String.class) { if (state.getClass() == String.class) {
return state.toString(); return (String) state;
} }
if (isPrimitiveOrEquivalent(state.getClass())) { if (isPrimitiveOrEquivalent(state.getClass())) {
@ -60,10 +90,6 @@ public class ObjectSerializer {
* @throws IOException In case value cannot be deserialized. * @throws IOException In case value cannot be deserialized.
*/ */
public <T> T deserialize(Object value, Class<T> clazz) throws IOException { public <T> T deserialize(Object value, Class<T> clazz) throws IOException {
if (clazz == String.class) {
return (T) value;
}
if (isPrimitiveOrEquivalent(clazz)) { if (isPrimitiveOrEquivalent(clazz)) {
return parse(value, clazz); return parse(value, clazz);
} }
@ -72,7 +98,21 @@ public class ObjectSerializer {
return (T) null; return (T) null;
} }
// Not string, not primitive, so it is a complex type: we use JSON for that. if (clazz == String.class) {
return (value instanceof byte[])
? (T) new String((byte[])value, StandardCharsets.UTF_8) : (T) value.toString();
}
if (clazz == byte[].class) {
if (value instanceof String) {
return (T) value.toString().getBytes(StandardCharsets.UTF_8);
}
return (value instanceof byte[])
? (T) value : null;
}
// Not string, not primitive, not byte[], so it is a complex type: we use JSON for that.
if (value instanceof byte[]) { if (value instanceof byte[]) {
return OBJECT_MAPPER.readValue((byte[]) value, clazz); return OBJECT_MAPPER.readValue((byte[]) value, clazz);
} }
@ -134,13 +174,16 @@ public class ObjectSerializer {
if (isDoubleOrPrimitive(clazz) && isDoubleOrPrimitive(value.getClass())) return (T) value; if (isDoubleOrPrimitive(clazz) && isDoubleOrPrimitive(value.getClass())) return (T) value;
} }
if (isBooleanOrPrimitive(clazz)) return (T) Boolean.valueOf(value.toString()); String valueString = (value instanceof byte[]) ?
if (isByteOrPrimitive(clazz)) return (T) Byte.valueOf(value.toString()); new String((byte[])value, StandardCharsets.UTF_8) : value.toString();
if (isShortOrPrimitive(clazz)) return (T) Short.valueOf(value.toString());
if (isIntegerOrPrimitive(clazz)) return (T) Integer.valueOf(value.toString()); if (isBooleanOrPrimitive(clazz)) return (T) Boolean.valueOf(valueString);
if (isLongOrPrimitive(clazz)) return (T) Long.valueOf(value.toString()); if (isByteOrPrimitive(clazz)) return (T) Byte.valueOf(valueString);
if (isFloatOrPrimitive(clazz)) return (T) Float.valueOf(value.toString()); if (isShortOrPrimitive(clazz)) return (T) Short.valueOf(valueString);
if (isDoubleOrPrimitive(clazz)) return (T) Double.valueOf(value.toString()); if (isIntegerOrPrimitive(clazz)) return (T) Integer.valueOf(valueString);
if (isLongOrPrimitive(clazz)) return (T) Long.valueOf(valueString);
if (isFloatOrPrimitive(clazz)) return (T) Float.valueOf(valueString);
if (isDoubleOrPrimitive(clazz)) return (T) Double.valueOf(valueString);
return null; return null;
} }

View File

@ -177,7 +177,7 @@ public class ActorManagerTest {
private String createReminderParams(String data) throws IOException { private String createReminderParams(String data) throws IOException {
ActorReminderParams params = new ActorReminderParams(data, Duration.ofSeconds(1), Duration.ofSeconds(1)); ActorReminderParams params = new ActorReminderParams(data, Duration.ofSeconds(1), Duration.ofSeconds(1));
return this.context.getActorSerializer().serialize(params); return this.context.getActorSerializer().serializeString(params);
} }
private static ActorId newActorId() { private static ActorId newActorId() {

View File

@ -30,7 +30,7 @@ public class ActorReminderParamsTest {
ActorReminderParams original = new ActorReminderParams(null, Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5))); ActorReminderParams original = new ActorReminderParams(null, Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ActorReminderParams recreated = null; ActorReminderParams recreated = null;
try { try {
String serialized = SERIALIZER.serialize(original); String serialized = SERIALIZER.serializeString(original);
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class); recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
} }
catch(Exception e) { catch(Exception e) {
@ -48,7 +48,7 @@ public class ActorReminderParamsTest {
ActorReminderParams original = new ActorReminderParams("maru", Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5))); ActorReminderParams original = new ActorReminderParams("maru", Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ActorReminderParams recreated = null; ActorReminderParams recreated = null;
try { try {
String serialized = SERIALIZER.serialize(original); String serialized = SERIALIZER.serializeString(original);
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class); recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
} }
catch(Exception e) { catch(Exception e) {

View File

@ -28,7 +28,7 @@ public class ActorTimerTest {
null, null,
dueTime, dueTime,
period); period);
String s = new ActorStateSerializer().serialize(timer); String s = new ActorStateSerializer().serializeString(timer);
String expected = "{\"period\":\"1h0m3s0ms\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\"}"; String expected = "{\"period\":\"1h0m3s0ms\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\"}";
// Deep comparison via JsonNode.equals method. // Deep comparison via JsonNode.equals method.
@ -53,7 +53,7 @@ public class ActorTimerTest {
null, null,
dueTime, dueTime,
period); period);
String s = new ActorStateSerializer().serialize(timer); String s = new ActorStateSerializer().serializeString(timer);
// A negative period will be serialized to an empty string which is interpreted by Dapr to mean fire once only. // A negative period will be serialized to an empty string which is interpreted by Dapr to mean fire once only.
String expected = "{\"period\":\"\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\"}"; String expected = "{\"period\":\"\",\"dueTime\":\"0h7m17s0ms\", \"callback\": \"myfunction\"}";

View File

@ -185,7 +185,7 @@ public class ObjectSerializerTest {
ObjectSerializer serializer = new ObjectSerializer(); ObjectSerializer serializer = new ObjectSerializer();
String serializedValue; String serializedValue;
try { try {
serializedValue = serializer.serialize(obj); serializedValue = serializer.serializeString(obj);
assertEquals("FOUND:[[" + serializedValue + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, serializedValue); assertEquals("FOUND:[[" + serializedValue + "]] \n but was EXPECING: [[" + expectedResult + "]]", expectedResult, serializedValue);
} catch (IOException exception) { } catch (IOException exception) {
fail(exception.getMessage()); fail(exception.getMessage());
@ -197,7 +197,7 @@ public class ObjectSerializerTest {
ObjectSerializer serializer = new ObjectSerializer(); ObjectSerializer serializer = new ObjectSerializer();
String serializedValue; String serializedValue;
try { try {
serializedValue = serializer.serialize(null); serializedValue = serializer.serializeString(null);
assertNull("The expected result is null", serializedValue); assertNull("The expected result is null", serializedValue);
} catch (IOException exception) { } catch (IOException exception) {
fail(exception.getMessage()); fail(exception.getMessage());
@ -210,7 +210,7 @@ public class ObjectSerializerTest {
ObjectSerializer serializer = new ObjectSerializer(); ObjectSerializer serializer = new ObjectSerializer();
String serializedValue; String serializedValue;
try { try {
serializedValue = serializer.serialize(valueToSerialize); serializedValue = serializer.serializeString(valueToSerialize);
assertEquals(valueToSerialize, serializedValue); assertEquals(valueToSerialize, serializedValue);
} catch (IOException exception) { } catch (IOException exception) {
fail(exception.getMessage()); fail(exception.getMessage());
@ -224,7 +224,7 @@ public class ObjectSerializerTest {
ObjectSerializer serializer = new ObjectSerializer(); ObjectSerializer serializer = new ObjectSerializer();
String serializedValue; String serializedValue;
try { try {
serializedValue = serializer.serialize(valueToSerialize.intValue()); serializedValue = serializer.serializeString(valueToSerialize.intValue());
assertEquals(expectedResult, serializedValue); assertEquals(expectedResult, serializedValue);
} catch (IOException exception) { } catch (IOException exception) {
fail(exception.getMessage()); fail(exception.getMessage());