diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java index 9387b0be2..a3f370c57 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java @@ -8,7 +8,7 @@ package io.dapr.examples.actors.http; import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyBuilder; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import java.util.ArrayList; import java.util.List; diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java index be8539e20..031be77fc 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java @@ -6,7 +6,7 @@ package io.dapr.examples.actors.http; import io.dapr.actors.runtime.ActorRuntime; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.springboot.DaprApplication; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -36,7 +36,8 @@ public class DemoActorService { int port = Integer.parseInt(cmd.getOptionValue("port")); // Register the Actor class. - ActorRuntime.getInstance().registerActor(DemoActorImpl.class, new DefaultObjectSerializer()); + ActorRuntime.getInstance().registerActor( + DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); // Start Dapr's callback endpoint. DaprApplication.start(port); diff --git a/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java index d91dcac11..40b64de52 100644 --- a/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java +++ b/examples/src/main/java/io/dapr/examples/bindings/http/OutputBindingExample.java @@ -7,7 +7,7 @@ package io.dapr.examples.bindings.http; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; /** * Service for output binding example. @@ -24,8 +24,8 @@ public class OutputBindingExample { public String message; } - public static void main(String[] args) throws Exception { - DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + public static void main(String[] args) { + DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build(); final String BINDING_NAME = "sample123"; diff --git a/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java b/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java index 7037439fe..ced9ab84b 100644 --- a/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java @@ -7,7 +7,7 @@ package io.dapr.examples.invoke.http; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.client.domain.Verb; /** @@ -28,7 +28,7 @@ public class InvokeClient { * @param args Messages to be sent as request for the invoke API. */ public static void main(String[] args) { - DaprClient client = (new DaprClientBuilder(new DefaultObjectSerializer())).build(); + DaprClient client = (new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer())).build(); for (String message : args) { client.invokeService(Verb.POST, SERVICE_APP_ID, "say", message, null, String.class).block(); } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java b/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java index 0da5802a8..dfbb1f300 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/Publisher.java @@ -7,7 +7,7 @@ package io.dapr.examples.pubsub.http; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import java.util.Collections; @@ -25,7 +25,7 @@ public class Publisher { private static final String TOPIC_NAME = "message"; public static void main(String[] args) throws Exception { - DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build(); for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d", i); client.publishEvent(TOPIC_NAME, message).block(); diff --git a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java index e9ca90007..94272331b 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/http/SubscriberController.java @@ -5,7 +5,7 @@ package io.dapr.examples.pubsub.http; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.client.domain.CloudEvent; import org.springframework.web.bind.annotation.*; import reactor.core.publisher.Mono; diff --git a/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java b/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java index dd05ee4f1..df2650f7e 100644 --- a/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java +++ b/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java @@ -2,6 +2,9 @@ package io.dapr.examples.state.http; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.serializer.DefaultObjectSerializer; import org.json.JSONArray; import org.json.JSONObject; @@ -43,35 +46,27 @@ import static java.lang.System.out; */ public class OrderManager { - static HttpClient httpClient; - public static void main(String[] args) throws IOException { - int httpPort = 3000; - String daprPort = Optional.ofNullable(System.getenv("DAPR_HTTP_PORT")).orElse("3500"); - String stateUrl = String.format("http://localhost:%s/v1.0/state", daprPort); + int httpPort = 3001; HttpServer httpServer = HttpServer.create(new InetSocketAddress(httpPort), 0); - httpClient = HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL) - .connectTimeout(Duration.ofSeconds(2)).build(); + DaprClient daprClient = + (new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer())).build(); httpServer.createContext("/order").setHandler(e -> { out.println("Fetching order!"); - fetch(stateUrl + "/order").thenAccept(response -> { - int resCode = response.statusCode() == 200 ? 200 : 500; - String body = (response.statusCode() == 200) || (response.statusCode() == 201) ? response.body() : "Could not get state."; - try { - e.sendResponseHeaders(resCode, body.getBytes().length); - OutputStream os = e.getResponseBody(); - try { - os.write(body.getBytes()); - } finally { - os.close(); - } + byte[] data = daprClient.getState("order", String.class).block().getValue().getBytes(); + e.getResponseHeaders().set("content-type", "application/json"); + e.sendResponseHeaders(200, data.length); + e.getResponseBody().write(data); + e.getResponseBody().close(); } catch (IOException ioerror) { out.println(ioerror); + e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length); + e.getResponseBody().write(ioerror.getMessage().getBytes()); + e.getResponseBody().close(); } - }); }); httpServer.createContext("/neworder").setHandler(e -> { @@ -83,30 +78,17 @@ public class OrderManager { String orderId = data.getString("orderId"); out.printf("Got a new order! Order ID: %s\n", orderId); - JSONObject item = new JSONObject(); - item.put("key", "order"); - item.put("value", data); - JSONArray state = new JSONArray(); - state.put(item); - out.printf("Writing to state: %s\n", state.toString()); + daprClient.saveState("order", data.toString()).block(); - post(stateUrl, state.toString()).thenAccept(response -> { - int resCode = (response.statusCode() == 200) || (response.statusCode() == 201) ? 201 : 500; - String body = response.body(); - try { - e.sendResponseHeaders(resCode, body.getBytes().length); - OutputStream os = e.getResponseBody(); - try { - os.write(body.getBytes()); - } finally { - os.close(); - } - } catch (IOException ioerror) { - out.println(ioerror); - } - }); + out.printf("Saved state: %s\n", data.toString()); + e.sendResponseHeaders(200, 0); + e.getResponseBody().write(new byte[0]); + e.getResponseBody().close(); } catch (IOException ioerror) { out.println(ioerror); + e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length); + e.getResponseBody().write(ioerror.getMessage().getBytes()); + e.getResponseBody().close(); } }); @@ -114,18 +96,6 @@ public class OrderManager { out.printf("Java App listening on port %s.", httpPort); } - private static CompletableFuture> fetch(String url) { - HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build(); - return httpClient.sendAsync(request, BodyHandlers.ofString()); - } - - private static CompletableFuture> post(String url, String body) { - HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)) - .header("Content-Type", "application/json; charset=UTF-8").POST(BodyPublishers.ofString(body)).build(); - - return httpClient.sendAsync(request, BodyHandlers.ofString()); - } - private static String readBody(HttpExchange t) throws IOException { // retrieve the request json data InputStream is = t.getRequestBody(); diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java index 0623403cf..6ddf5f4ad 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java @@ -2,7 +2,7 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; import io.dapr.client.DaprHttpBuilder; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; /** * Builder to generate an ActorProxy instance. Builder can be reused for multiple instances. @@ -22,24 +22,24 @@ public class ActorProxyBuilder { /** * Dapr's object serializer. */ - private final DaprObjectSerializer serializer; + private final DaprObjectSerializer objectSerializer; /** * Instantiates a new builder for a given Actor type. * - * @param actorType Actor's type. - * @param serializer Serializer for objects sent/received. Use null for default (not recommended). + * @param actorType Actor's type. + * @param objectSerializer Serializer for objects sent/received. */ - public ActorProxyBuilder(String actorType, DaprObjectSerializer serializer) { + public ActorProxyBuilder(String actorType, DaprObjectSerializer objectSerializer) { if ((actorType == null) || actorType.isEmpty()) { throw new IllegalArgumentException("ActorType is required."); } - if (serializer == null) { + if (objectSerializer == null) { throw new IllegalArgumentException("Serializer is required."); } this.actorType = actorType; - this.serializer = serializer; + this.objectSerializer = objectSerializer; } /** @@ -56,7 +56,7 @@ public class ActorProxyBuilder { return new ActorProxyImpl( this.actorType, actorId, - this.serializer, + this.objectSerializer, new DaprHttpClient(this.daprHttpBuilder.build())); } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java index 59665c12b..97a69950c 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java @@ -2,7 +2,7 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; import io.dapr.actors.runtime.ObjectSerializer; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; import reactor.core.publisher.Mono; import java.io.IOException; diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 20b37d851..c849a1974 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -7,7 +7,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; import io.dapr.actors.ActorTrace; import io.dapr.client.DaprHttpBuilder; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; import reactor.core.publisher.Mono; import java.io.IOException; @@ -112,11 +112,13 @@ public class ActorRuntime { * Registers an actor with the runtime. * * @param clazz The type of actor. - * @param objectSerializer Serializer for Actor's state and transient objects. + * @param objectSerializer Serializer for Actor's request and response objects. + * @param stateSerializer Serializer for Actor's state objects. * @param Actor class type. */ - public void registerActor(Class clazz, DaprObjectSerializer objectSerializer) { - registerActor(clazz, null, objectSerializer); + public void registerActor( + Class clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { + registerActor(clazz, null, objectSerializer, stateSerializer); } /** @@ -124,17 +126,23 @@ public class ActorRuntime { * * @param clazz The type of actor. * @param actorFactory An optional factory to create actors. This can be used for dependency injection. - * @param serializer Serializer for Actor's state and transient objects. + * @param objectSerializer Serializer for Actor's request and response objects. + * @param stateSerializer Serializer for Actor's state objects. * @param Actor class type. */ public void registerActor( - Class clazz, ActorFactory actorFactory, DaprObjectSerializer serializer) { + Class clazz, ActorFactory actorFactory, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer) { if (clazz == null) { throw new IllegalArgumentException("Class is required."); } - if (serializer == null) { + if (objectSerializer == null) { throw new IllegalArgumentException("Serializer is required."); } + if (stateSerializer == null) { + throw new IllegalArgumentException("State objectSerializer is required."); + } ActorTypeInformation actorTypeInfo = ActorTypeInformation.create(clazz); @@ -142,11 +150,11 @@ public class ActorRuntime { ActorRuntimeContext context = new ActorRuntimeContext<>( this, - serializer, + objectSerializer, actualActorFactory, actorTypeInfo, this.daprClient, - new DaprStateAsyncProvider(this.daprClient, serializer)); + new DaprStateAsyncProvider(this.daprClient, stateSerializer)); // Create ActorManagers, override existing entry if registered again. this.actorManagers.put(actorTypeInfo.getName(), new ActorManager(context)); diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeContext.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeContext.java index be8e3e413..29e409afe 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeContext.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeContext.java @@ -5,7 +5,7 @@ package io.dapr.actors.runtime; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; import io.dapr.actors.ActorTrace; /** @@ -21,7 +21,7 @@ public class ActorRuntimeContext { private final ActorRuntime actorRuntime; /** - * Serializer. + * Serializer for transient objects. */ private final DaprObjectSerializer objectSerializer; @@ -54,7 +54,7 @@ public class ActorRuntimeContext { * Instantiates a new runtime context for the Actor type. * * @param actorRuntime Runtime. - * @param objectSerializer Serializer. + * @param objectSerializer Serializer for transient objects. * @param actorFactory Factory for Actors. * @param actorTypeInformation Information for Actor's type. * @param daprClient Client to communicate to Dapr. @@ -85,9 +85,9 @@ public class ActorRuntimeContext { } /** - * Gets the Actor's serializer. + * Gets the Actor's serializer for transient objects. * - * @return Actor's serializer. + * @return Actor's serializer for transient objects. */ DaprObjectSerializer getObjectSerializer() { return this.objectSerializer; diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java index 8250b65b6..d10a07a20 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java @@ -8,7 +8,8 @@ package io.dapr.actors.runtime; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import io.dapr.actors.ActorId; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.StringContentType; import reactor.core.publisher.Mono; import java.io.ByteArrayOutputStream; @@ -24,13 +25,30 @@ class DaprStateAsyncProvider { */ private static final JsonFactory JSON_FACTORY = new JsonFactory(); + /** + * Dapr's client for Actor runtime. + */ private final DaprClient daprClient; - private final DaprObjectSerializer serializer; + /** + * Serializer for state objects. + */ + private final DaprObjectSerializer stateSerializer; - DaprStateAsyncProvider(DaprClient daprClient, DaprObjectSerializer serializer) { + /** + * Flag determining if serializer's input and output contains a valid String. + */ + private final boolean isStateString; + + /** + * Instantiates a new Actor's state provider. + * @param daprClient Dapr client for Actor runtime. + * @param stateSerializer Serializer for state objects. + */ + DaprStateAsyncProvider(DaprClient daprClient, DaprObjectSerializer stateSerializer) { this.daprClient = daprClient; - this.serializer = serializer; + this.stateSerializer = stateSerializer; + this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null; } Mono load(String actorType, ActorId actorId, String stateName, Class clazz) { @@ -38,7 +56,7 @@ class DaprStateAsyncProvider { return result.flatMap(s -> { try { - T response = this.serializer.deserialize(s, clazz); + T response = this.stateSerializer.deserialize(s, clazz); if (response == null) { return Mono.empty(); } @@ -110,8 +128,16 @@ class DaprStateAsyncProvider { // Start request object. generator.writeObjectFieldStart("request"); generator.writeStringField("key", stateChange.getStateName()); - if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { - generator.writeBinaryField("value", this.serializer.serialize(stateChange.getValue())); + if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || + (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) { + byte[] data = this.stateSerializer.serialize(stateChange.getValue()); + if (data != null) { + if (this.isStateString) { + generator.writeStringField("value", new String(data)); + } else { + generator.writeBinaryField("value", data); + } + } } // End request object. generator.writeEndObject(); diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyBuilderTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyBuilderTest.java index 8902b2eac..c04d68298 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyBuilderTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyBuilderTest.java @@ -1,12 +1,10 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; - public class ActorProxyBuilderTest { @Test(expected = IllegalArgumentException.class) diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyForTestsImpl.java b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyForTestsImpl.java index b3dbfec96..faa237136 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyForTestsImpl.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyForTestsImpl.java @@ -6,7 +6,7 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; public class ActorProxyForTestsImpl extends ActorProxyImpl { diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java index 18a3746ff..ac5cf3674 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/ActorProxyImplTest.java @@ -1,8 +1,8 @@ package io.dapr.actors.client; import io.dapr.actors.ActorId; -import io.dapr.client.DefaultObjectSerializer; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorCustomSerializerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorCustomSerializerTest.java new file mode 100644 index 000000000..ffdd1c9d9 --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorCustomSerializerTest.java @@ -0,0 +1,176 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +import io.dapr.actors.ActorId; +import io.dapr.actors.client.ActorProxy; +import io.dapr.actors.client.ActorProxyForTestsImpl; +import io.dapr.actors.client.DaprClientStub; +import io.dapr.serializer.DaprObjectSerializer; +import org.junit.Assert; +import org.junit.Test; +import reactor.core.publisher.Mono; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ActorCustomSerializerTest { + + private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer(); + + private static final DaprObjectSerializer CUSTOM_SERIALIZER = new JavaSerializer(); + + private static final AtomicInteger ACTOR_ID_COUNT = new AtomicInteger(); + + private final ActorRuntimeContext context = createContext(); + + private ActorManager manager = new ActorManager<>(context); + + public interface MyActor { + Mono intInIntOut(int input); + + Mono stringInStringOut(String input); + + Mono classInClassOut(MyData input); + } + + @ActorType(name = "MyActor") + public static class ActorImpl extends AbstractActor implements MyActor { + + //public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { + public ActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { + super(runtimeContext, id); + } + + @Override + public Mono intInIntOut(int input) { + return Mono.fromSupplier(() -> input + input); + } + + @Override + public Mono stringInStringOut(String input) { + return Mono.fromSupplier(() -> input + input); + } + + @Override + public Mono classInClassOut(MyData input) { + return Mono.fromSupplier(() -> new MyData( + input.getName() + input.getName(), + input.getNum() + input.getNum()) + ); + } + } + + static class MyData implements Serializable { + private String name; + private int num; + + public MyData() { + this.name = ""; + this.num = 0; + } + + public MyData(String name, int num) { + this.name = name; + this.num = num; + } + + public String getName() { + return this.name; + } + + public int getNum() { + return this.num; + } + } + + @Test + public void classInClassOut() { + ActorProxy actorProxy = createActorProxy(); + MyData d = new MyData("hi", 3); + + MyData response = actorProxy.invokeActorMethod("classInClassOut", d, MyData.class).block(); + + Assert.assertEquals("hihi", response.getName()); + Assert.assertEquals(6, response.getNum()); + } + + @Test + public void stringInStringOut() { + ActorProxy actorProxy = createActorProxy(); + String response = actorProxy.invokeActorMethod("stringInStringOut", "oi", String.class).block(); + + Assert.assertEquals("oioi", response); + } + + @Test + public void intInIntOut() { + ActorProxy actorProxy = createActorProxy(); + int response = actorProxy.invokeActorMethod("intInIntOut", 2, int.class).block(); + + Assert.assertEquals(4, response); + } + + private static ActorId newActorId() { + return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet())); + } + + private ActorProxy createActorProxy() { + ActorId actorId = newActorId(); + + // Mock daprClient for ActorProxy only, not for runtime. + DaprClientStub daprClient = mock(DaprClientStub.class); + + when(daprClient.invokeActorMethod( + eq(context.getActorTypeInformation().getName()), + eq(actorId.toString()), + any(), + any())) + .thenAnswer(invocationOnMock -> + this.manager.invokeMethod( + new ActorId(invocationOnMock.getArgument(1, String.class)), + invocationOnMock.getArgument(2, String.class), + INTERNAL_SERIALIZER.unwrapData(invocationOnMock.getArgument(3, byte[].class))) + .map(s -> { + try { + return INTERNAL_SERIALIZER.wrapData(s); + } catch (Exception e) { + throw new RuntimeException(e); + } + })); + + this.manager.activateActor(actorId).block(); + + return new ActorProxyForTestsImpl( + context.getActorTypeInformation().getName(), + actorId, + CUSTOM_SERIALIZER, + daprClient); + } + + private static ActorRuntimeContext createContext() { + DaprClient daprClient = mock(DaprClient.class); + + when(daprClient.registerActorTimer(any(), any(), any(), any())).thenReturn(Mono.empty()); + when(daprClient.registerActorReminder(any(), any(), any(), any())).thenReturn(Mono.empty()); + when(daprClient.unregisterActorTimer(any(), any(), any())).thenReturn(Mono.empty()); + when(daprClient.unregisterActorReminder(any(), any(), any())).thenReturn(Mono.empty()); + + return new ActorRuntimeContext( + mock(ActorRuntime.class), + CUSTOM_SERIALIZER, + new DefaultActorFactory(), + ActorTypeInformation.create(ActorImpl.class), + daprClient, + mock(DaprStateAsyncProvider.class) + ); + } +} diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java index 76a22387b..9dbc50df1 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java @@ -6,7 +6,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java index 07cffa327..2360b2fa1 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java @@ -9,7 +9,7 @@ import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyForTestsImpl; import io.dapr.actors.client.DaprClientStub; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java index 004fbf2a0..ae6fc5839 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorRuntimeTest.java @@ -6,7 +6,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -67,21 +67,21 @@ public class ActorRuntimeTest { @Test public void registerActor() throws Exception { - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME)); } @Test public void activateActor() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.activate(ACTOR_NAME, actorId).block(); } @Test public void invokeActor() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.activate(ACTOR_NAME, actorId).block(); byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block(); @@ -92,7 +92,7 @@ public class ActorRuntimeTest { @Test public void activateThendeactivateActor() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.activate(ACTOR_NAME, actorId).block(); this.runtime.deactivate(ACTOR_NAME, actorId).block(); } @@ -100,14 +100,14 @@ public class ActorRuntimeTest { @Test public void deactivateActor() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.deactivate(ACTOR_NAME, actorId).block(); } @Test public void lazyActivate() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.activate(ACTOR_NAME, actorId).block(); this.runtime.invoke(ACTOR_NAME, actorId, "say", null) @@ -120,7 +120,7 @@ public class ActorRuntimeTest { @Test public void lazyDeactivate() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); this.runtime.activate(ACTOR_NAME, actorId).block(); Mono deacticateCall = this.runtime.deactivate(ACTOR_NAME, actorId); @@ -139,7 +139,7 @@ public class ActorRuntimeTest { @Test public void lazyInvoke() throws Exception { String actorId = UUID.randomUUID().toString(); - this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer()); + this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); Mono invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java index 89b33bd3b..05f945103 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java @@ -9,12 +9,13 @@ import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyForTestsImpl; import io.dapr.actors.client.DaprClientStub; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; import java.io.IOException; +import java.io.Serializable; import java.nio.charset.IllegalCharsetNameException; import java.time.Duration; import java.util.ArrayList; @@ -257,7 +258,7 @@ public class ActorStatefulTest { } // Class used to validate serialization/deserialization - public static class MyMethodContext { + public static class MyMethodContext implements Serializable { private String type; @@ -658,7 +659,7 @@ public class ActorStatefulTest { new DefaultActorFactory(), ActorTypeInformation.create(MyActorImpl.class), daprClient, - new DaprInMemoryStateProvider(new ObjectSerializer()) + new DaprInMemoryStateProvider(new JavaSerializer()) ); } } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java index 621f23dc3..9fde33352 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprInMemoryStateProvider.java @@ -6,6 +6,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; +import io.dapr.serializer.DaprObjectSerializer; import reactor.core.publisher.Mono; import java.io.IOException; @@ -19,10 +20,10 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider { private static final Map stateStore = new HashMap<>(); - private final ObjectSerializer serializer; + private final DaprObjectSerializer serializer; - DaprInMemoryStateProvider(ObjectSerializer serializer) { - super(null, null); + DaprInMemoryStateProvider(DaprObjectSerializer serializer) { + super(null, serializer /* just to avoid NPE */); this.serializer = serializer; } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java index 562cd87d2..79a761832 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java @@ -8,8 +8,8 @@ package io.dapr.actors.runtime; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.actors.ActorId; -import io.dapr.client.DaprObjectSerializer; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; @@ -108,7 +108,7 @@ public class DaprStateAsyncProviderTest { String key = operation.get("request").get("key").asText(); JsonNode valueNode = operation.get("request").get("value"); - byte[] value = (valueNode == null) ? null : valueNode.binaryValue(); + byte[] value = (valueNode == null) ? null : valueNode.textValue().getBytes(); foundInsertName |= "upsert".equals(opName) && "name".equals(key) && Arrays.equals(SERIALIZER.serialize("Jon Doe"), value); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DefaultActorFactoryTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DefaultActorFactoryTest.java index 290cfa539..0a14a1a86 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DefaultActorFactoryTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DefaultActorFactoryTest.java @@ -6,7 +6,7 @@ package io.dapr.actors.runtime; import io.dapr.actors.ActorId; -import io.dapr.client.DaprObjectSerializer; +import io.dapr.serializer.DaprObjectSerializer; import org.junit.Assert; import org.junit.Test; diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/DerivedActorTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/DerivedActorTest.java index f040554eb..d0b1c64e9 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/DerivedActorTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/DerivedActorTest.java @@ -9,7 +9,7 @@ import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyForTestsImpl; import io.dapr.actors.client.DaprClientStub; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java new file mode 100644 index 000000000..b62afc0d0 --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/JavaSerializer.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +import io.dapr.serializer.DaprObjectSerializer; + +import java.io.*; + +/** + * Class used to test different serializer implementations. + */ +public class JavaSerializer implements DaprObjectSerializer { + + /** + * {@inheritDoc} + */ + @Override + public byte[] serialize(Object o) throws IOException { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(o); + oos.flush(); + return bos.toByteArray(); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public T deserialize(byte[] data, Class clazz) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) { + try (ObjectInputStream ois = new ObjectInputStream(bis)) { + try { + return (T) ois.readObject(); + } catch (Exception e) { + throw new IOException("Could not deserialize Java object.", e); + } + } + } + } +} diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ThrowFromPreAndPostActorMethodsTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ThrowFromPreAndPostActorMethodsTest.java index c287307a5..6b2c4a106 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ThrowFromPreAndPostActorMethodsTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ThrowFromPreAndPostActorMethodsTest.java @@ -9,7 +9,7 @@ import io.dapr.actors.ActorId; import io.dapr.actors.client.ActorProxy; import io.dapr.actors.client.ActorProxyForTestsImpl; import io.dapr.actors.client.DaprClientStub; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java index 9385eb30f..c19cda913 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java @@ -5,14 +5,13 @@ package io.dapr.client; import io.dapr.DaprGrpc; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.Constants; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import okhttp3.OkHttpClient; -import java.time.Duration; -import java.time.temporal.TemporalUnit; - /** * A builder for the DaprClient, * Currently only and HTTP Client will be supported. @@ -32,14 +31,14 @@ public class DaprClientBuilder { Constants.ENV_DAPR_GRPC_PORT, Constants.DEFAULT_GRPC_PORT); /** - * Default serializer. + * Serializer used for request and response objects in DaprClient. */ - private static final DaprObjectSerializer DEFAULT_SERIALIZER = new DefaultObjectSerializer(); + private final DaprObjectSerializer objectSerializer; /** - * Serializer used for objects in DaprClient. + * Serializer used for state objects in DaprClient. */ - private final DaprObjectSerializer serializer; + private final DaprObjectSerializer stateSerializer; /** * Finds the port defined by env variable or sticks to default. @@ -66,14 +65,21 @@ public class DaprClientBuilder { /** * Creates a constructor for DaprClient. * - * @param serializer Serializer for objects to be sent and received from Dapr. + * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios. + * + * @param objectSerializer Serializer for objects to be sent and received from Dapr. + * @param stateSerializer Serializer for objects to be persisted. */ - public DaprClientBuilder(DaprObjectSerializer serializer) { - if (serializer == null) { + public DaprClientBuilder(DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { + if (objectSerializer == null) { throw new IllegalArgumentException("Serializer is required"); } + if (stateSerializer == null) { + throw new IllegalArgumentException("State serializer is required"); + } - this.serializer = serializer; + this.objectSerializer = objectSerializer; + this.stateSerializer = stateSerializer; } /** @@ -97,7 +103,7 @@ public class DaprClientBuilder { throw new IllegalStateException("Invalid port."); } ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, GRPC_PORT).usePlaintext().build(); - return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel), new DefaultObjectSerializer()); + return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel), this.objectSerializer, this.stateSerializer); } /** @@ -111,6 +117,6 @@ public class DaprClientBuilder { } OkHttpClient okHttpClient = new OkHttpClient.Builder().build(); DaprHttp daprHttp = new DaprHttp(HTTP_PORT, okHttpClient); - return new DaprClientHttpAdapter(daprHttp, this.serializer); + return new DaprClientHttpAdapter(daprHttp, this.objectSerializer, this.stateSerializer); } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java index 3a2694f2f..8bbf2887e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java @@ -14,10 +14,13 @@ import io.dapr.DaprProtos; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.Verb; +import io.dapr.serializer.DaprObjectSerializer; import reactor.core.publisher.Mono; import java.io.IOException; -import java.util.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; /** * An adapter for the GRPC Client. @@ -27,11 +30,6 @@ import java.util.*; */ class DaprClientGrpcAdapter implements DaprClient { - /** - * Serializer for internal objects. - */ - private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer(); - /** * The GRPC client to be used * @@ -40,19 +38,30 @@ class DaprClientGrpcAdapter implements DaprClient { private DaprGrpc.DaprFutureStub client; /** - * A utitlity class for serialize and deserialize the messages sent and retrieved by the client. + * A utitlity class for serialize and deserialize the transient objects. */ private DaprObjectSerializer objectSerializer; + /** + * A utitlity class for serialize and deserialize state objects. + */ + private DaprObjectSerializer stateSerializer; + /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * - * @param futureClient - * @see io.dapr.client.DaprClientBuilder + * @param futureClient GRPC client + * @param objectSerializer Serializer for transient request/response objects. + * @param stateSerializer Serializer for state objects. + * @see DaprClientBuilder */ - DaprClientGrpcAdapter(DaprGrpc.DaprFutureStub futureClient, DaprObjectSerializer serializer) { - client = futureClient; - objectSerializer = serializer; + DaprClientGrpcAdapter( + DaprGrpc.DaprFutureStub futureClient, + DaprObjectSerializer objectSerializer, + DaprObjectSerializer stateSerializer) { + this.client = futureClient; + this.objectSerializer = objectSerializer; + this.stateSerializer = stateSerializer; } /** @@ -207,7 +216,7 @@ class DaprClientGrpcAdapter implements DaprClient { Class clazz) throws IOException { ByteString payload = response.getData().getValue(); byte[] data = payload == null ? null : payload.toByteArray(); - T value = objectSerializer.deserialize(data, clazz); + T value = stateSerializer.deserialize(data, clazz); String etag = response.getEtag(); String key = requestedKey; return new State<>(value, key, etag, stateOptions); @@ -240,7 +249,7 @@ class DaprClientGrpcAdapter implements DaprClient { } private DaprProtos.StateRequest.Builder buildStateRequest(State state) throws IOException { - byte[] bytes = objectSerializer.serialize(state.getValue()); + byte[] bytes = stateSerializer.serialize(state.getValue()); Any data = Any.newBuilder().setValue(ByteString.copyFrom(bytes)).build(); DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder(); if (state.getEtag() != null) { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java index 6894057dd..f964ba08d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java @@ -7,6 +7,9 @@ package io.dapr.client; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.Verb; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.serializer.StringContentType; import io.dapr.utils.Constants; import reactor.core.publisher.Mono; @@ -34,20 +37,34 @@ public class DaprClientHttpAdapter implements DaprClient { private final DaprHttp client; /** - * A utility class for serialize and deserialize customer's objects. + * A utility class for serialize and deserialize customer's transient objects. */ private final DaprObjectSerializer objectSerializer; + /** + * A utility class for serialize and deserialize customer's state objects. + */ + private final DaprObjectSerializer stateSerializer; + + /** + * Flag determining if serializer's input and output contains a valid String. + */ + private final boolean isStateString; + /** * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * - * @param client Dapr's http client. - * @param serializer Dapr's object serializer. - * @see io.dapr.client.DaprClientBuilder + * @param client Dapr's http client. + * @param objectSerializer Dapr's serializer for transient request/response objects. + * @param stateSerializer Dapr's serializer for state objects. + * @see DaprClientBuilder + * @see DefaultObjectSerializer */ - DaprClientHttpAdapter(DaprHttp client, DaprObjectSerializer serializer) { + DaprClientHttpAdapter(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) { this.client = client; - this.objectSerializer = serializer; + this.objectSerializer = objectSerializer; + this.stateSerializer = stateSerializer; + this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null; } /** @@ -55,9 +72,10 @@ public class DaprClientHttpAdapter implements DaprClient { * * @param client Dapr's http client. * @see io.dapr.client.DaprClientBuilder + * @see DefaultObjectSerializer */ DaprClientHttpAdapter(DaprHttp client) { - this(client, new DefaultObjectSerializer()); + this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer()); } /** @@ -246,13 +264,18 @@ public class DaprClientHttpAdapter implements DaprClient { headers.put(Constants.HEADER_HTTP_ETAG_ID, etag); } final String url = Constants.STATE_PATH; - List> internalStateObjects = new ArrayList<>(states.size()); + List> internalStateObjects = new ArrayList<>(states.size()); for (State state : states) { if (state == null) { continue; } - byte[] data = this.objectSerializer.serialize(state.getValue()); - internalStateObjects.add(new State<>(data, state.getKey(), state.getEtag(), state.getOptions())); + byte[] data = this.stateSerializer.serialize(state.getValue()); + if (this.isStateString) { + internalStateObjects.add( + new State<>(data == null ? null : new String(data), state.getKey(), state.getEtag(), state.getOptions())); + } else { + internalStateObjects.add(new State<>(data, state.getKey(), state.getEtag(), state.getOptions())); + } } byte[] serializedStateBody = INTERNAL_SERIALIZER.serialize(states); return this.client.invokeAPI( @@ -275,7 +298,7 @@ public class DaprClientHttpAdapter implements DaprClient { */ @Override public Mono saveState(String key, String etag, Object value, StateOptions options) { - return Mono.fromSupplier(() -> new State(value, key, etag, options)) + return Mono.fromSupplier(() -> new State<>(value, key, etag, options)) .flatMap(state -> saveStates(Arrays.asList(state))); } @@ -320,7 +343,8 @@ public class DaprClientHttpAdapter implements DaprClient { */ private State buildStateKeyValue( DaprHttp.Response response, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { - T value = objectSerializer.deserialize(response.getBody(), clazz); + // The state is in the body directly, so we use the state serializer here. + T value = stateSerializer.deserialize(response.getBody(), clazz); String key = requestedKey; String etag = null; if (response.getHeaders() != null && response.getHeaders().containsKey("Etag")) { diff --git a/sdk/src/main/java/io/dapr/client/DefaultObjectSerializer.java b/sdk/src/main/java/io/dapr/client/DefaultObjectSerializer.java deleted file mode 100644 index ef95985d5..000000000 --- a/sdk/src/main/java/io/dapr/client/DefaultObjectSerializer.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. - * Licensed under the MIT License. - */ - -package io.dapr.client; - -import java.io.IOException; - -/** - * Default serializer/deserializer for actor state. - * - * WARNING: for production systems, it is recommended for users to provide their own serializer instead. - */ -public class DefaultObjectSerializer implements DaprObjectSerializer { - - /** - * Shared serializer for all instances of the default state serializer. - */ - public static final ObjectSerializer SERIALIZER = new ObjectSerializer(); - - /** - * {@inheritDoc} - */ - @Override - public byte[] serialize(Object o) throws IOException { - return SERIALIZER.serialize(o); - } - - /** - * {@inheritDoc} - */ - @Override - public T deserialize(byte[] data, Class clazz) throws IOException { - return SERIALIZER.deserialize(data, clazz); - } -} diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index 1b40a7f63..d95198118 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -42,11 +42,20 @@ public class ObjectSerializer { return null; } + if (state.getClass() == Void.class) { + return null; + } + // Have this check here to be consistent with deserialization (see deserialize() method below). if (state instanceof byte[]) { return (byte[])state; } + // This avoids string to be quoted in the state store. + if (state instanceof String) { + return ((String) state).getBytes(); + } + // Not string, not primitive, so it is a complex type: we use JSON for that. return OBJECT_MAPPER.writeValueAsBytes(state); } @@ -61,7 +70,7 @@ public class ObjectSerializer { * @throws IOException In case content cannot be deserialized. */ public T deserialize(byte[] content, Class clazz) throws IOException { - if (clazz == null) { + if ((clazz == null) || (clazz == Void.class)) { return null; } @@ -69,15 +78,23 @@ public class ObjectSerializer { return deserializePrimitives(content, clazz); } - if ((content == null) || (content.length == 0)) { + if (content == null) { return (T) null; } + if (clazz == String.class) { + return (T) new String(content); + } + // Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[]. if (clazz == byte[].class) { return (T) content; } + if (content.length == 0) { + return (T) null; + } + if (clazz == CloudEvent.class) { return (T) CloudEvent.deserialize(content); } diff --git a/sdk/src/main/java/io/dapr/client/DaprObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java similarity index 96% rename from sdk/src/main/java/io/dapr/client/DaprObjectSerializer.java rename to sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java index 14cea0833..9c11efd17 100644 --- a/sdk/src/main/java/io/dapr/client/DaprObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/serializer/DaprObjectSerializer.java @@ -3,7 +3,7 @@ * Licensed under the MIT License. */ -package io.dapr.client; +package io.dapr.serializer; import java.io.IOException; diff --git a/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java new file mode 100644 index 000000000..39da7bb42 --- /dev/null +++ b/sdk/src/main/java/io/dapr/serializer/DefaultObjectSerializer.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.serializer; + +import io.dapr.client.ObjectSerializer; + +import java.io.IOException; + +/** + * Default serializer/deserializer for request/response objects and for state objects too. + */ +@StringContentType +public class DefaultObjectSerializer extends ObjectSerializer implements DaprObjectSerializer { + + /** + * {@inheritDoc} + */ + @Override + public byte[] serialize(Object o) throws IOException { + return super.serialize(o); + } + + /** + * {@inheritDoc} + */ + @Override + public T deserialize(byte[] data, Class clazz) throws IOException { + return super.deserialize(data, clazz); + } +} diff --git a/sdk/src/main/java/io/dapr/serializer/StringContentType.java b/sdk/src/main/java/io/dapr/serializer/StringContentType.java new file mode 100644 index 000000000..ef604dd75 --- /dev/null +++ b/sdk/src/main/java/io/dapr/serializer/StringContentType.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.serializer; + +import java.lang.annotation.*; + +/** + * Flags a serializer indicating that byte[] contains String for both input and output. + * + * This information can be used to at the state store, for example, to save serialized data as plain text. + */ +@Documented +@Target({ElementType.TYPE_USE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface StringContentType { +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java index 5217e9e1d..46850d087 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java @@ -10,6 +10,7 @@ import io.dapr.DaprProtos; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.Verb; +import io.dapr.serializer.DefaultObjectSerializer; import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.junit.Before; import org.junit.Test; @@ -37,7 +38,7 @@ public class DaprClientGrpcAdapterTest { @Before public void setup() { client = mock(DaprGrpc.DaprFutureStub.class); - adapter = new DaprClientGrpcAdapter(client, new DefaultObjectSerializer()); + adapter = new DaprClientGrpcAdapter(client, new DefaultObjectSerializer(), new DefaultObjectSerializer()); serializer = new ObjectSerializer(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpAdapterTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpAdapterTest.java index 54370a15d..65be1634f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpAdapterTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpAdapterTest.java @@ -128,7 +128,7 @@ public class DaprClientHttpAdapterTest { public void invokeService() { mockInterceptor.addRule() .get("http://localhost:3000/v1.0/invoke/41/method/neworder") - .respond("\"hello world\""); + .respond("hello world"); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttpAdapter = new DaprClientHttpAdapter(daprHttp); Mono mono = daprClientHttpAdapter.invokeService(Verb.GET, "41", "neworder", null, null, String.class); diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java index 85faf5453..c0d49ccb2 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java @@ -184,7 +184,7 @@ public class DaprHttpTest { assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class)); Mono responseDeleted = daprHttp.invokeAPI("GET", urlDeleteState, null, null); Mono responseDeleteKey = daprHttp.invokeAPI("DELETE", urlDeleteState, null, null); - assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class)); + assertEquals("", serializer.deserialize(responseDeleteKey.block().getBody(), String.class)); mockInterceptor.reset(); mockInterceptor.addRule() .get("http://localhost:3500/" + urlDeleteState) diff --git a/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java b/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java index 197e053bd..3a6022030 100644 --- a/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java +++ b/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java @@ -7,7 +7,7 @@ package io.dapr.it.state; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.it.BaseIT; @@ -44,7 +44,7 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "myKey"; //create the http client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //creation of a dummy data MyData data = new MyData(); @@ -77,7 +77,7 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "keyToBeUpdated"; //create http DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -112,7 +112,7 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "myeKeyToBeDeleted"; //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); @@ -152,7 +152,7 @@ public class HttpStateClientIT extends BaseIT { //The key use to store the state and be updated using etags final String stateKey = "keyToBeUpdatedWithEtag"; //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -205,7 +205,7 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "keyToBeUpdatedWithWrongEtag"; //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -256,7 +256,7 @@ public class HttpStateClientIT extends BaseIT { public void saveAndDeleteStateWithEtag() { final String stateKey = "myeKeyToBeDeletedWithEtag"; //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -296,7 +296,7 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "myeKeyToBeDeletedWithWrongEtag"; //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -338,7 +338,7 @@ public class HttpStateClientIT extends BaseIT { StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null); //create dapr client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //create Dummy data MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -396,7 +396,7 @@ public class HttpStateClientIT extends BaseIT { StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null); //create dapr client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //create Dummy data MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -453,7 +453,7 @@ public class HttpStateClientIT extends BaseIT { StateOptions stateOptions = new StateOptions(null, null, retryPolicy); //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -501,7 +501,7 @@ public class HttpStateClientIT extends BaseIT { StateOptions stateOptions = new StateOptions(null, null, retryPolicy); //create DAPR client - DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build(); + DaprClient daprClient = buildDaprClient(); //Create dummy data to be store MyData data = new MyData(); data.setPropertyA("data in property A"); @@ -542,4 +542,8 @@ public class HttpStateClientIT extends BaseIT { } + private static DaprClient buildDaprClient() { + return new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build(); + } + } diff --git a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java index b3039dec6..bcc23ffd8 100644 --- a/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java +++ b/sdk/src/test/java/io/dapr/runtime/DaprRuntimeTest.java @@ -10,6 +10,8 @@ import com.fasterxml.jackson.core.JsonGenerator; import io.dapr.client.*; import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.Verb; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.Constants; import org.junit.Assert; import org.junit.Before; diff --git a/sdk/src/test/java/io/dapr/utils/DefaultObjectSerializerTest.java b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java similarity index 99% rename from sdk/src/test/java/io/dapr/utils/DefaultObjectSerializerTest.java rename to sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java index 4e61d641e..d680a750a 100644 --- a/sdk/src/test/java/io/dapr/utils/DefaultObjectSerializerTest.java +++ b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java @@ -3,10 +3,10 @@ * Licensed under the MIT License. */ -package io.dapr.utils; +package io.dapr.serializer; import com.fasterxml.jackson.databind.ObjectMapper; -import io.dapr.client.DefaultObjectSerializer; +import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.client.domain.CloudEvent; import org.junit.Assert; import org.junit.Test; @@ -250,7 +250,7 @@ public class DefaultObjectSerializerTest { byte [] byteValue; try { serializedValue = new String(SERIALIZER.serialize(valueToSerialize)); - assertEquals("\"" + valueToSerialize + "\"", serializedValue); + assertEquals(valueToSerialize, serializedValue); byteValue = SERIALIZER.serialize(valueToSerialize); assertNotNull(byteValue); String deserializedValue = SERIALIZER.deserialize(byteValue, String.class); @@ -730,8 +730,8 @@ public class DefaultObjectSerializerTest { } }); - Assert.assertTrue(check.apply(null)); - Assert.assertTrue(check.apply( + assertTrue(check.apply(null)); + assertTrue(check.apply( new CloudEvent( "1", "mysource", @@ -739,7 +739,7 @@ public class DefaultObjectSerializerTest { "v2", "XML", ""))); - Assert.assertTrue(check.apply( + assertTrue(check.apply( new CloudEvent( "1234-65432", "myother",