Separate serializer for state. (#135)

This commit is contained in:
Artur Souza 2020-01-23 11:39:23 -08:00 committed by GitHub
parent d5506f38b6
commit 103b407aeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 527 additions and 222 deletions

View File

@ -8,7 +8,7 @@ package io.dapr.examples.actors.http;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import java.util.ArrayList;
import java.util.List;

View File

@ -6,7 +6,7 @@
package io.dapr.examples.actors.http;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.springboot.DaprApplication;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@ -36,7 +36,8 @@ public class DemoActorService {
int port = Integer.parseInt(cmd.getOptionValue("port"));
// Register the Actor class.
ActorRuntime.getInstance().registerActor(DemoActorImpl.class, new DefaultObjectSerializer());
ActorRuntime.getInstance().registerActor(
DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
// Start Dapr's callback endpoint.
DaprApplication.start(port);

View File

@ -7,7 +7,7 @@ package io.dapr.examples.bindings.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
/**
* Service for output binding example.
@ -24,8 +24,8 @@ public class OutputBindingExample {
public String message;
}
public static void main(String[] args) throws Exception {
DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer()).build();
public static void main(String[] args) {
DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
final String BINDING_NAME = "sample123";

View File

@ -7,7 +7,7 @@ package io.dapr.examples.invoke.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.client.domain.Verb;
/**
@ -28,7 +28,7 @@ public class InvokeClient {
* @param args Messages to be sent as request for the invoke API.
*/
public static void main(String[] args) {
DaprClient client = (new DaprClientBuilder(new DefaultObjectSerializer())).build();
DaprClient client = (new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer())).build();
for (String message : args) {
client.invokeService(Verb.POST, SERVICE_APP_ID, "say", message, null, String.class).block();
}

View File

@ -7,7 +7,7 @@ package io.dapr.examples.pubsub.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import java.util.Collections;
@ -25,7 +25,7 @@ public class Publisher {
private static final String TOPIC_NAME = "message";
public static void main(String[] args) throws Exception {
DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient client = new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
client.publishEvent(TOPIC_NAME, message).block();

View File

@ -5,7 +5,7 @@
package io.dapr.examples.pubsub.http;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

View File

@ -2,6 +2,9 @@ package io.dapr.examples.state.http;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.serializer.DefaultObjectSerializer;
import org.json.JSONArray;
import org.json.JSONObject;
@ -43,35 +46,27 @@ import static java.lang.System.out;
*/
public class OrderManager {
static HttpClient httpClient;
public static void main(String[] args) throws IOException {
int httpPort = 3000;
String daprPort = Optional.ofNullable(System.getenv("DAPR_HTTP_PORT")).orElse("3500");
String stateUrl = String.format("http://localhost:%s/v1.0/state", daprPort);
int httpPort = 3001;
HttpServer httpServer = HttpServer.create(new InetSocketAddress(httpPort), 0);
httpClient = HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(2)).build();
DaprClient daprClient =
(new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer())).build();
httpServer.createContext("/order").setHandler(e -> {
out.println("Fetching order!");
fetch(stateUrl + "/order").thenAccept(response -> {
int resCode = response.statusCode() == 200 ? 200 : 500;
String body = (response.statusCode() == 200) || (response.statusCode() == 201) ? response.body() : "Could not get state.";
try {
e.sendResponseHeaders(resCode, body.getBytes().length);
OutputStream os = e.getResponseBody();
try {
os.write(body.getBytes());
} finally {
os.close();
}
byte[] data = daprClient.getState("order", String.class).block().getValue().getBytes();
e.getResponseHeaders().set("content-type", "application/json");
e.sendResponseHeaders(200, data.length);
e.getResponseBody().write(data);
e.getResponseBody().close();
} catch (IOException ioerror) {
out.println(ioerror);
e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length);
e.getResponseBody().write(ioerror.getMessage().getBytes());
e.getResponseBody().close();
}
});
});
httpServer.createContext("/neworder").setHandler(e -> {
@ -83,30 +78,17 @@ public class OrderManager {
String orderId = data.getString("orderId");
out.printf("Got a new order! Order ID: %s\n", orderId);
JSONObject item = new JSONObject();
item.put("key", "order");
item.put("value", data);
JSONArray state = new JSONArray();
state.put(item);
out.printf("Writing to state: %s\n", state.toString());
daprClient.saveState("order", data.toString()).block();
post(stateUrl, state.toString()).thenAccept(response -> {
int resCode = (response.statusCode() == 200) || (response.statusCode() == 201) ? 201 : 500;
String body = response.body();
try {
e.sendResponseHeaders(resCode, body.getBytes().length);
OutputStream os = e.getResponseBody();
try {
os.write(body.getBytes());
} finally {
os.close();
}
} catch (IOException ioerror) {
out.println(ioerror);
}
});
out.printf("Saved state: %s\n", data.toString());
e.sendResponseHeaders(200, 0);
e.getResponseBody().write(new byte[0]);
e.getResponseBody().close();
} catch (IOException ioerror) {
out.println(ioerror);
e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length);
e.getResponseBody().write(ioerror.getMessage().getBytes());
e.getResponseBody().close();
}
});
@ -114,18 +96,6 @@ public class OrderManager {
out.printf("Java App listening on port %s.", httpPort);
}
private static CompletableFuture<HttpResponse<String>> fetch(String url) {
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
return httpClient.sendAsync(request, BodyHandlers.ofString());
}
private static CompletableFuture<HttpResponse<String>> post(String url, String body) {
HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url))
.header("Content-Type", "application/json; charset=UTF-8").POST(BodyPublishers.ofString(body)).build();
return httpClient.sendAsync(request, BodyHandlers.ofString());
}
private static String readBody(HttpExchange t) throws IOException {
// retrieve the request json data
InputStream is = t.getRequestBody();

View File

@ -2,7 +2,7 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
/**
* Builder to generate an ActorProxy instance. Builder can be reused for multiple instances.
@ -22,24 +22,24 @@ public class ActorProxyBuilder {
/**
* Dapr's object serializer.
*/
private final DaprObjectSerializer serializer;
private final DaprObjectSerializer objectSerializer;
/**
* Instantiates a new builder for a given Actor type.
*
* @param actorType Actor's type.
* @param serializer Serializer for objects sent/received. Use null for default (not recommended).
* @param actorType Actor's type.
* @param objectSerializer Serializer for objects sent/received.
*/
public ActorProxyBuilder(String actorType, DaprObjectSerializer serializer) {
public ActorProxyBuilder(String actorType, DaprObjectSerializer objectSerializer) {
if ((actorType == null) || actorType.isEmpty()) {
throw new IllegalArgumentException("ActorType is required.");
}
if (serializer == null) {
if (objectSerializer == null) {
throw new IllegalArgumentException("Serializer is required.");
}
this.actorType = actorType;
this.serializer = serializer;
this.objectSerializer = objectSerializer;
}
/**
@ -56,7 +56,7 @@ public class ActorProxyBuilder {
return new ActorProxyImpl(
this.actorType,
actorId,
this.serializer,
this.objectSerializer,
new DaprHttpClient(this.daprHttpBuilder.build()));
}

View File

@ -2,7 +2,7 @@ package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.ObjectSerializer;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;

View File

@ -7,7 +7,7 @@ package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import io.dapr.client.DaprHttpBuilder;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
@ -112,11 +112,13 @@ public class ActorRuntime {
* Registers an actor with the runtime.
*
* @param clazz The type of actor.
* @param objectSerializer Serializer for Actor's state and transient objects.
* @param objectSerializer Serializer for Actor's request and response objects.
* @param stateSerializer Serializer for Actor's state objects.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(Class<T> clazz, DaprObjectSerializer objectSerializer) {
registerActor(clazz, null, objectSerializer);
public <T extends AbstractActor> void registerActor(
Class<T> clazz, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
registerActor(clazz, null, objectSerializer, stateSerializer);
}
/**
@ -124,17 +126,23 @@ public class ActorRuntime {
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors. This can be used for dependency injection.
* @param serializer Serializer for Actor's state and transient objects.
* @param objectSerializer Serializer for Actor's request and response objects.
* @param stateSerializer Serializer for Actor's state objects.
* @param <T> Actor class type.
*/
public <T extends AbstractActor> void registerActor(
Class<T> clazz, ActorFactory<T> actorFactory, DaprObjectSerializer serializer) {
Class<T> clazz, ActorFactory<T> actorFactory,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
if (clazz == null) {
throw new IllegalArgumentException("Class is required.");
}
if (serializer == null) {
if (objectSerializer == null) {
throw new IllegalArgumentException("Serializer is required.");
}
if (stateSerializer == null) {
throw new IllegalArgumentException("State objectSerializer is required.");
}
ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz);
@ -142,11 +150,11 @@ public class ActorRuntime {
ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
this,
serializer,
objectSerializer,
actualActorFactory,
actorTypeInfo,
this.daprClient,
new DaprStateAsyncProvider(this.daprClient, serializer));
new DaprStateAsyncProvider(this.daprClient, stateSerializer));
// Create ActorManagers, override existing entry if registered again.
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context));

View File

@ -5,7 +5,7 @@
package io.dapr.actors.runtime;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.actors.ActorTrace;
/**
@ -21,7 +21,7 @@ public class ActorRuntimeContext<T extends AbstractActor> {
private final ActorRuntime actorRuntime;
/**
* Serializer.
* Serializer for transient objects.
*/
private final DaprObjectSerializer objectSerializer;
@ -54,7 +54,7 @@ public class ActorRuntimeContext<T extends AbstractActor> {
* Instantiates a new runtime context for the Actor type.
*
* @param actorRuntime Runtime.
* @param objectSerializer Serializer.
* @param objectSerializer Serializer for transient objects.
* @param actorFactory Factory for Actors.
* @param actorTypeInformation Information for Actor's type.
* @param daprClient Client to communicate to Dapr.
@ -85,9 +85,9 @@ public class ActorRuntimeContext<T extends AbstractActor> {
}
/**
* Gets the Actor's serializer.
* Gets the Actor's serializer for transient objects.
*
* @return Actor's serializer.
* @return Actor's serializer for transient objects.
*/
DaprObjectSerializer getObjectSerializer() {
return this.objectSerializer;

View File

@ -8,7 +8,8 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.StringContentType;
import reactor.core.publisher.Mono;
import java.io.ByteArrayOutputStream;
@ -24,13 +25,30 @@ class DaprStateAsyncProvider {
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Dapr's client for Actor runtime.
*/
private final DaprClient daprClient;
private final DaprObjectSerializer serializer;
/**
* Serializer for state objects.
*/
private final DaprObjectSerializer stateSerializer;
DaprStateAsyncProvider(DaprClient daprClient, DaprObjectSerializer serializer) {
/**
* Flag determining if serializer's input and output contains a valid String.
*/
private final boolean isStateString;
/**
* Instantiates a new Actor's state provider.
* @param daprClient Dapr client for Actor runtime.
* @param stateSerializer Serializer for state objects.
*/
DaprStateAsyncProvider(DaprClient daprClient, DaprObjectSerializer stateSerializer) {
this.daprClient = daprClient;
this.serializer = serializer;
this.stateSerializer = stateSerializer;
this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null;
}
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, Class<T> clazz) {
@ -38,7 +56,7 @@ class DaprStateAsyncProvider {
return result.flatMap(s -> {
try {
T response = this.serializer.deserialize(s, clazz);
T response = this.stateSerializer.deserialize(s, clazz);
if (response == null) {
return Mono.empty();
}
@ -110,8 +128,16 @@ class DaprStateAsyncProvider {
// Start request object.
generator.writeObjectFieldStart("request");
generator.writeStringField("key", stateChange.getStateName());
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
generator.writeBinaryField("value", this.serializer.serialize(stateChange.getValue()));
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) ||
(stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
byte[] data = this.stateSerializer.serialize(stateChange.getValue());
if (data != null) {
if (this.isStateString) {
generator.writeStringField("value", new String(data));
} else {
generator.writeBinaryField("value", data);
}
}
}
// End request object.
generator.writeEndObject();

View File

@ -1,12 +1,10 @@
package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.*;
public class ActorProxyBuilderTest {
@Test(expected = IllegalArgumentException.class)

View File

@ -6,7 +6,7 @@
package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
public class ActorProxyForTestsImpl extends ActorProxyImpl {

View File

@ -1,8 +1,8 @@
package io.dapr.actors.client;
import io.dapr.actors.ActorId;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

View File

@ -0,0 +1,176 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyForTestsImpl;
import io.dapr.actors.client.DaprClientStub;
import io.dapr.serializer.DaprObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ActorCustomSerializerTest {
private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer();
private static final DaprObjectSerializer CUSTOM_SERIALIZER = new JavaSerializer();
private static final AtomicInteger ACTOR_ID_COUNT = new AtomicInteger();
private final ActorRuntimeContext context = createContext();
private ActorManager<ActorImpl> manager = new ActorManager<>(context);
public interface MyActor {
Mono<Integer> intInIntOut(int input);
Mono<String> stringInStringOut(String input);
Mono<MyData> classInClassOut(MyData input);
}
@ActorType(name = "MyActor")
public static class ActorImpl extends AbstractActor implements MyActor {
//public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
public ActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public Mono<Integer> intInIntOut(int input) {
return Mono.fromSupplier(() -> input + input);
}
@Override
public Mono<String> stringInStringOut(String input) {
return Mono.fromSupplier(() -> input + input);
}
@Override
public Mono<MyData> classInClassOut(MyData input) {
return Mono.fromSupplier(() -> new MyData(
input.getName() + input.getName(),
input.getNum() + input.getNum())
);
}
}
static class MyData implements Serializable {
private String name;
private int num;
public MyData() {
this.name = "";
this.num = 0;
}
public MyData(String name, int num) {
this.name = name;
this.num = num;
}
public String getName() {
return this.name;
}
public int getNum() {
return this.num;
}
}
@Test
public void classInClassOut() {
ActorProxy actorProxy = createActorProxy();
MyData d = new MyData("hi", 3);
MyData response = actorProxy.invokeActorMethod("classInClassOut", d, MyData.class).block();
Assert.assertEquals("hihi", response.getName());
Assert.assertEquals(6, response.getNum());
}
@Test
public void stringInStringOut() {
ActorProxy actorProxy = createActorProxy();
String response = actorProxy.invokeActorMethod("stringInStringOut", "oi", String.class).block();
Assert.assertEquals("oioi", response);
}
@Test
public void intInIntOut() {
ActorProxy actorProxy = createActorProxy();
int response = actorProxy.invokeActorMethod("intInIntOut", 2, int.class).block();
Assert.assertEquals(4, response);
}
private static ActorId newActorId() {
return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet()));
}
private ActorProxy createActorProxy() {
ActorId actorId = newActorId();
// Mock daprClient for ActorProxy only, not for runtime.
DaprClientStub daprClient = mock(DaprClientStub.class);
when(daprClient.invokeActorMethod(
eq(context.getActorTypeInformation().getName()),
eq(actorId.toString()),
any(),
any()))
.thenAnswer(invocationOnMock ->
this.manager.invokeMethod(
new ActorId(invocationOnMock.getArgument(1, String.class)),
invocationOnMock.getArgument(2, String.class),
INTERNAL_SERIALIZER.unwrapData(invocationOnMock.getArgument(3, byte[].class)))
.map(s -> {
try {
return INTERNAL_SERIALIZER.wrapData(s);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
this.manager.activateActor(actorId).block();
return new ActorProxyForTestsImpl(
context.getActorTypeInformation().getName(),
actorId,
CUSTOM_SERIALIZER,
daprClient);
}
private static <T extends AbstractActor> ActorRuntimeContext createContext() {
DaprClient daprClient = mock(DaprClient.class);
when(daprClient.registerActorTimer(any(), any(), any(), any())).thenReturn(Mono.empty());
when(daprClient.registerActorReminder(any(), any(), any(), any())).thenReturn(Mono.empty());
when(daprClient.unregisterActorTimer(any(), any(), any())).thenReturn(Mono.empty());
when(daprClient.unregisterActorReminder(any(), any(), any())).thenReturn(Mono.empty());
return new ActorRuntimeContext(
mock(ActorRuntime.class),
CUSTOM_SERIALIZER,
new DefaultActorFactory<T>(),
ActorTypeInformation.create(ActorImpl.class),
daprClient,
mock(DaprStateAsyncProvider.class)
);
}
}

View File

@ -6,7 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;

View File

@ -9,7 +9,7 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyForTestsImpl;
import io.dapr.actors.client.DaprClientStub;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;

View File

@ -6,7 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@ -67,21 +67,21 @@ public class ActorRuntimeTest {
@Test
public void registerActor() throws Exception {
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Assert.assertTrue(new String(this.runtime.serializeConfig()).contains(ACTOR_NAME));
}
@Test
public void activateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.activate(ACTOR_NAME, actorId).block();
}
@Test
public void invokeActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.activate(ACTOR_NAME, actorId).block();
byte[] response = this.runtime.invoke(ACTOR_NAME, actorId, "say", null).block();
@ -92,7 +92,7 @@ public class ActorRuntimeTest {
@Test
public void activateThendeactivateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.activate(ACTOR_NAME, actorId).block();
this.runtime.deactivate(ACTOR_NAME, actorId).block();
}
@ -100,14 +100,14 @@ public class ActorRuntimeTest {
@Test
public void deactivateActor() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.deactivate(ACTOR_NAME, actorId).block();
}
@Test
public void lazyActivate() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.activate(ACTOR_NAME, actorId).block();
this.runtime.invoke(ACTOR_NAME, actorId, "say", null)
@ -120,7 +120,7 @@ public class ActorRuntimeTest {
@Test
public void lazyDeactivate() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
this.runtime.activate(ACTOR_NAME, actorId).block();
Mono<Void> deacticateCall = this.runtime.deactivate(ACTOR_NAME, actorId);
@ -139,7 +139,7 @@ public class ActorRuntimeTest {
@Test
public void lazyInvoke() throws Exception {
String actorId = UUID.randomUUID().toString();
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer());
this.runtime.registerActor(MyActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
Mono<byte[]> invokeCall = this.runtime.invoke(ACTOR_NAME, actorId, "say", null);

View File

@ -9,12 +9,13 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyForTestsImpl;
import io.dapr.actors.client.DaprClientStub;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.IllegalCharsetNameException;
import java.time.Duration;
import java.util.ArrayList;
@ -257,7 +258,7 @@ public class ActorStatefulTest {
}
// Class used to validate serialization/deserialization
public static class MyMethodContext {
public static class MyMethodContext implements Serializable {
private String type;
@ -658,7 +659,7 @@ public class ActorStatefulTest {
new DefaultActorFactory<T>(),
ActorTypeInformation.create(MyActorImpl.class),
daprClient,
new DaprInMemoryStateProvider(new ObjectSerializer())
new DaprInMemoryStateProvider(new JavaSerializer())
);
}
}

View File

@ -6,6 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.serializer.DaprObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
@ -19,10 +20,10 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
private static final Map<String, byte[]> stateStore = new HashMap<>();
private final ObjectSerializer serializer;
private final DaprObjectSerializer serializer;
DaprInMemoryStateProvider(ObjectSerializer serializer) {
super(null, null);
DaprInMemoryStateProvider(DaprObjectSerializer serializer) {
super(null, serializer /* just to avoid NPE */);
this.serializer = serializer;
}

View File

@ -8,8 +8,8 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
@ -108,7 +108,7 @@ public class DaprStateAsyncProviderTest {
String key = operation.get("request").get("key").asText();
JsonNode valueNode = operation.get("request").get("value");
byte[] value = (valueNode == null) ? null : valueNode.binaryValue();
byte[] value = (valueNode == null) ? null : valueNode.textValue().getBytes();
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
Arrays.equals(SERIALIZER.serialize("Jon Doe"), value);

View File

@ -6,7 +6,7 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.client.DaprObjectSerializer;
import io.dapr.serializer.DaprObjectSerializer;
import org.junit.Assert;
import org.junit.Test;

View File

@ -9,7 +9,7 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyForTestsImpl;
import io.dapr.actors.client.DaprClientStub;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.serializer.DaprObjectSerializer;
import java.io.*;
/**
* Class used to test different serializer implementations.
*/
public class JavaSerializer implements DaprObjectSerializer {
/**
* {@inheritDoc}
*/
@Override
public byte[] serialize(Object o) throws IOException {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(o);
oos.flush();
return bos.toByteArray();
}
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(data)) {
try (ObjectInputStream ois = new ObjectInputStream(bis)) {
try {
return (T) ois.readObject();
} catch (Exception e) {
throw new IOException("Could not deserialize Java object.", e);
}
}
}
}
}

View File

@ -9,7 +9,7 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyForTestsImpl;
import io.dapr.actors.client.DaprClientStub;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;

View File

@ -5,14 +5,13 @@
package io.dapr.client;
import io.dapr.DaprGrpc;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Constants;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import okhttp3.OkHttpClient;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
/**
* A builder for the DaprClient,
* Currently only and HTTP Client will be supported.
@ -32,14 +31,14 @@ public class DaprClientBuilder {
Constants.ENV_DAPR_GRPC_PORT, Constants.DEFAULT_GRPC_PORT);
/**
* Default serializer.
* Serializer used for request and response objects in DaprClient.
*/
private static final DaprObjectSerializer DEFAULT_SERIALIZER = new DefaultObjectSerializer();
private final DaprObjectSerializer objectSerializer;
/**
* Serializer used for objects in DaprClient.
* Serializer used for state objects in DaprClient.
*/
private final DaprObjectSerializer serializer;
private final DaprObjectSerializer stateSerializer;
/**
* Finds the port defined by env variable or sticks to default.
@ -66,14 +65,21 @@ public class DaprClientBuilder {
/**
* Creates a constructor for DaprClient.
*
* @param serializer Serializer for objects to be sent and received from Dapr.
* See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
*
* @param objectSerializer Serializer for objects to be sent and received from Dapr.
* @param stateSerializer Serializer for objects to be persisted.
*/
public DaprClientBuilder(DaprObjectSerializer serializer) {
if (serializer == null) {
public DaprClientBuilder(DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
if (objectSerializer == null) {
throw new IllegalArgumentException("Serializer is required");
}
if (stateSerializer == null) {
throw new IllegalArgumentException("State serializer is required");
}
this.serializer = serializer;
this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer;
}
/**
@ -97,7 +103,7 @@ public class DaprClientBuilder {
throw new IllegalStateException("Invalid port.");
}
ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, GRPC_PORT).usePlaintext().build();
return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel), new DefaultObjectSerializer());
return new DaprClientGrpcAdapter(DaprGrpc.newFutureStub(channel), this.objectSerializer, this.stateSerializer);
}
/**
@ -111,6 +117,6 @@ public class DaprClientBuilder {
}
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
DaprHttp daprHttp = new DaprHttp(HTTP_PORT, okHttpClient);
return new DaprClientHttpAdapter(daprHttp, this.serializer);
return new DaprClientHttpAdapter(daprHttp, this.objectSerializer, this.stateSerializer);
}
}

View File

@ -14,10 +14,13 @@ import io.dapr.DaprProtos;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.serializer.DaprObjectSerializer;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* An adapter for the GRPC Client.
@ -27,11 +30,6 @@ import java.util.*;
*/
class DaprClientGrpcAdapter implements DaprClient {
/**
* Serializer for internal objects.
*/
private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer();
/**
* The GRPC client to be used
*
@ -40,19 +38,30 @@ class DaprClientGrpcAdapter implements DaprClient {
private DaprGrpc.DaprFutureStub client;
/**
* A utitlity class for serialize and deserialize the messages sent and retrieved by the client.
* A utitlity class for serialize and deserialize the transient objects.
*/
private DaprObjectSerializer objectSerializer;
/**
* A utitlity class for serialize and deserialize state objects.
*/
private DaprObjectSerializer stateSerializer;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param futureClient
* @see io.dapr.client.DaprClientBuilder
* @param futureClient GRPC client
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder
*/
DaprClientGrpcAdapter(DaprGrpc.DaprFutureStub futureClient, DaprObjectSerializer serializer) {
client = futureClient;
objectSerializer = serializer;
DaprClientGrpcAdapter(
DaprGrpc.DaprFutureStub futureClient,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
this.client = futureClient;
this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer;
}
/**
@ -207,7 +216,7 @@ class DaprClientGrpcAdapter implements DaprClient {
Class<T> clazz) throws IOException {
ByteString payload = response.getData().getValue();
byte[] data = payload == null ? null : payload.toByteArray();
T value = objectSerializer.deserialize(data, clazz);
T value = stateSerializer.deserialize(data, clazz);
String etag = response.getEtag();
String key = requestedKey;
return new State<>(value, key, etag, stateOptions);
@ -240,7 +249,7 @@ class DaprClientGrpcAdapter implements DaprClient {
}
private <T> DaprProtos.StateRequest.Builder buildStateRequest(State<T> state) throws IOException {
byte[] bytes = objectSerializer.serialize(state.getValue());
byte[] bytes = stateSerializer.serialize(state.getValue());
Any data = Any.newBuilder().setValue(ByteString.copyFrom(bytes)).build();
DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder();
if (state.getEtag() != null) {

View File

@ -7,6 +7,9 @@ package io.dapr.client;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.serializer.StringContentType;
import io.dapr.utils.Constants;
import reactor.core.publisher.Mono;
@ -34,20 +37,34 @@ public class DaprClientHttpAdapter implements DaprClient {
private final DaprHttp client;
/**
* A utility class for serialize and deserialize customer's objects.
* A utility class for serialize and deserialize customer's transient objects.
*/
private final DaprObjectSerializer objectSerializer;
/**
* A utility class for serialize and deserialize customer's state objects.
*/
private final DaprObjectSerializer stateSerializer;
/**
* Flag determining if serializer's input and output contains a valid String.
*/
private final boolean isStateString;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param client Dapr's http client.
* @param serializer Dapr's object serializer.
* @see io.dapr.client.DaprClientBuilder
* @param client Dapr's http client.
* @param objectSerializer Dapr's serializer for transient request/response objects.
* @param stateSerializer Dapr's serializer for state objects.
* @see DaprClientBuilder
* @see DefaultObjectSerializer
*/
DaprClientHttpAdapter(DaprHttp client, DaprObjectSerializer serializer) {
DaprClientHttpAdapter(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
this.client = client;
this.objectSerializer = serializer;
this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer;
this.isStateString = stateSerializer.getClass().getAnnotation(StringContentType.class) != null;
}
/**
@ -55,9 +72,10 @@ public class DaprClientHttpAdapter implements DaprClient {
*
* @param client Dapr's http client.
* @see io.dapr.client.DaprClientBuilder
* @see DefaultObjectSerializer
*/
DaprClientHttpAdapter(DaprHttp client) {
this(client, new DefaultObjectSerializer());
this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
/**
@ -246,13 +264,18 @@ public class DaprClientHttpAdapter implements DaprClient {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
}
final String url = Constants.STATE_PATH;
List<State<byte[]>> internalStateObjects = new ArrayList<>(states.size());
List<State<Object>> internalStateObjects = new ArrayList<>(states.size());
for (State state : states) {
if (state == null) {
continue;
}
byte[] data = this.objectSerializer.serialize(state.getValue());
internalStateObjects.add(new State<>(data, state.getKey(), state.getEtag(), state.getOptions()));
byte[] data = this.stateSerializer.serialize(state.getValue());
if (this.isStateString) {
internalStateObjects.add(
new State<>(data == null ? null : new String(data), state.getKey(), state.getEtag(), state.getOptions()));
} else {
internalStateObjects.add(new State<>(data, state.getKey(), state.getEtag(), state.getOptions()));
}
}
byte[] serializedStateBody = INTERNAL_SERIALIZER.serialize(states);
return this.client.invokeAPI(
@ -275,7 +298,7 @@ public class DaprClientHttpAdapter implements DaprClient {
*/
@Override
public Mono<Void> saveState(String key, String etag, Object value, StateOptions options) {
return Mono.fromSupplier(() -> new State<Object>(value, key, etag, options))
return Mono.fromSupplier(() -> new State<>(value, key, etag, options))
.flatMap(state -> saveStates(Arrays.asList(state)));
}
@ -320,7 +343,8 @@ public class DaprClientHttpAdapter implements DaprClient {
*/
private <T> State<T> buildStateKeyValue(
DaprHttp.Response response, String requestedKey, StateOptions stateOptions, Class<T> clazz) throws IOException {
T value = objectSerializer.deserialize(response.getBody(), clazz);
// The state is in the body directly, so we use the state serializer here.
T value = stateSerializer.deserialize(response.getBody(), clazz);
String key = requestedKey;
String etag = null;
if (response.getHeaders() != null && response.getHeaders().containsKey("Etag")) {

View File

@ -1,37 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client;
import java.io.IOException;
/**
* Default serializer/deserializer for actor state.
*
* WARNING: for production systems, it is recommended for users to provide their own serializer instead.
*/
public class DefaultObjectSerializer implements DaprObjectSerializer {
/**
* Shared serializer for all instances of the default state serializer.
*/
public static final ObjectSerializer SERIALIZER = new ObjectSerializer();
/**
* {@inheritDoc}
*/
@Override
public byte[] serialize(Object o) throws IOException {
return SERIALIZER.serialize(o);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) throws IOException {
return SERIALIZER.deserialize(data, clazz);
}
}

View File

@ -42,11 +42,20 @@ public class ObjectSerializer {
return null;
}
if (state.getClass() == Void.class) {
return null;
}
// Have this check here to be consistent with deserialization (see deserialize() method below).
if (state instanceof byte[]) {
return (byte[])state;
}
// This avoids string to be quoted in the state store.
if (state instanceof String) {
return ((String) state).getBytes();
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsBytes(state);
}
@ -61,7 +70,7 @@ public class ObjectSerializer {
* @throws IOException In case content cannot be deserialized.
*/
public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
if (clazz == null) {
if ((clazz == null) || (clazz == Void.class)) {
return null;
}
@ -69,15 +78,23 @@ public class ObjectSerializer {
return deserializePrimitives(content, clazz);
}
if ((content == null) || (content.length == 0)) {
if (content == null) {
return (T) null;
}
if (clazz == String.class) {
return (T) new String(content);
}
// Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
if (clazz == byte[].class) {
return (T) content;
}
if (content.length == 0) {
return (T) null;
}
if (clazz == CloudEvent.class) {
return (T) CloudEvent.deserialize(content);
}

View File

@ -3,7 +3,7 @@
* Licensed under the MIT License.
*/
package io.dapr.client;
package io.dapr.serializer;
import java.io.IOException;

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.serializer;
import io.dapr.client.ObjectSerializer;
import java.io.IOException;
/**
* Default serializer/deserializer for request/response objects and for state objects too.
*/
@StringContentType
public class DefaultObjectSerializer extends ObjectSerializer implements DaprObjectSerializer {
/**
* {@inheritDoc}
*/
@Override
public byte[] serialize(Object o) throws IOException {
return super.serialize(o);
}
/**
* {@inheritDoc}
*/
@Override
public <T> T deserialize(byte[] data, Class<T> clazz) throws IOException {
return super.deserialize(data, clazz);
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.serializer;
import java.lang.annotation.*;
/**
* Flags a serializer indicating that byte[] contains String for both input and output.
*
* This information can be used to at the state store, for example, to save serialized data as plain text.
*/
@Documented
@Target({ElementType.TYPE_USE})
@Retention(RetentionPolicy.RUNTIME)
public @interface StringContentType {
}

View File

@ -10,6 +10,7 @@ import io.dapr.DaprProtos;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.serializer.DefaultObjectSerializer;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.junit.Before;
import org.junit.Test;
@ -37,7 +38,7 @@ public class DaprClientGrpcAdapterTest {
@Before
public void setup() {
client = mock(DaprGrpc.DaprFutureStub.class);
adapter = new DaprClientGrpcAdapter(client, new DefaultObjectSerializer());
adapter = new DaprClientGrpcAdapter(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
serializer = new ObjectSerializer();
}

View File

@ -128,7 +128,7 @@ public class DaprClientHttpAdapterTest {
public void invokeService() {
mockInterceptor.addRule()
.get("http://localhost:3000/v1.0/invoke/41/method/neworder")
.respond("\"hello world\"");
.respond("hello world");
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttpAdapter = new DaprClientHttpAdapter(daprHttp);
Mono<String> mono = daprClientHttpAdapter.invokeService(Verb.GET, "41", "neworder", null, null, String.class);

View File

@ -184,7 +184,7 @@ public class DaprHttpTest {
assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class));
Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeAPI("GET", urlDeleteState, null, null);
Mono<DaprHttp.Response> responseDeleteKey = daprHttp.invokeAPI("DELETE", urlDeleteState, null, null);
assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class));
assertEquals("", serializer.deserialize(responseDeleteKey.block().getBody(), String.class));
mockInterceptor.reset();
mockInterceptor.addRule()
.get("http://localhost:3500/" + urlDeleteState)

View File

@ -7,7 +7,7 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT;
@ -44,7 +44,7 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "myKey";
//create the http client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//creation of a dummy data
MyData data = new MyData();
@ -77,7 +77,7 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "keyToBeUpdated";
//create http DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -112,7 +112,7 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "myeKeyToBeDeleted";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
@ -152,7 +152,7 @@ public class HttpStateClientIT extends BaseIT {
//The key use to store the state and be updated using etags
final String stateKey = "keyToBeUpdatedWithEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -205,7 +205,7 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -256,7 +256,7 @@ public class HttpStateClientIT extends BaseIT {
public void saveAndDeleteStateWithEtag() {
final String stateKey = "myeKeyToBeDeletedWithEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -296,7 +296,7 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -338,7 +338,7 @@ public class HttpStateClientIT extends BaseIT {
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null);
//create dapr client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -396,7 +396,7 @@ public class HttpStateClientIT extends BaseIT {
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null);
//create dapr client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -453,7 +453,7 @@ public class HttpStateClientIT extends BaseIT {
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -501,7 +501,7 @@ public class HttpStateClientIT extends BaseIT {
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
//create DAPR client
DaprClient daprClient = new DaprClientBuilder(new DefaultObjectSerializer()).build();
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
@ -542,4 +542,8 @@ public class HttpStateClientIT extends BaseIT {
}
private static DaprClient buildDaprClient() {
return new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer()).build();
}
}

View File

@ -10,6 +10,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.*;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.Verb;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.Constants;
import org.junit.Assert;
import org.junit.Before;

View File

@ -3,10 +3,10 @@
* Licensed under the MIT License.
*/
package io.dapr.utils;
package io.dapr.serializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DefaultObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.client.domain.CloudEvent;
import org.junit.Assert;
import org.junit.Test;
@ -250,7 +250,7 @@ public class DefaultObjectSerializerTest {
byte [] byteValue;
try {
serializedValue = new String(SERIALIZER.serialize(valueToSerialize));
assertEquals("\"" + valueToSerialize + "\"", serializedValue);
assertEquals(valueToSerialize, serializedValue);
byteValue = SERIALIZER.serialize(valueToSerialize);
assertNotNull(byteValue);
String deserializedValue = SERIALIZER.deserialize(byteValue, String.class);
@ -730,8 +730,8 @@ public class DefaultObjectSerializerTest {
}
});
Assert.assertTrue(check.apply(null));
Assert.assertTrue(check.apply(
assertTrue(check.apply(null));
assertTrue(check.apply(
new CloudEvent(
"1",
"mysource",
@ -739,7 +739,7 @@ public class DefaultObjectSerializerTest {
"v2",
"XML",
"<root></root>")));
Assert.assertTrue(check.apply(
assertTrue(check.apply(
new CloudEvent(
"1234-65432",
"myother",