mirror of https://github.com/dapr/java-sdk.git
Fix save bytes (#353)
* IT for save byte error in actor state. * Update proto version. * Fixes bug on read/write byte[] in actor state. * Addresses comments round 1.
This commit is contained in:
parent
b1691a6138
commit
9076d9f60e
2
pom.xml
2
pom.xml
|
@ -17,7 +17,7 @@
|
|||
<grpc.version>1.25.0</grpc.version>
|
||||
<protobuf.version>3.11.0</protobuf.version>
|
||||
<protoc.version>3.10.0</protoc.version>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/1660773c3da6740377f4440cae89dea93479c9f5/dapr/proto</dapr.proto.baseurl>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/67c4553aea13acef634540469f4a287b759a288e/dapr/proto</dapr.proto.baseurl>
|
||||
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
|
||||
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
|
||||
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
|
||||
|
|
|
@ -7,6 +7,7 @@ package io.dapr.actors.runtime;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dapr.actors.ActorId;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
|
@ -28,6 +29,11 @@ class DaprStateAsyncProvider {
|
|||
*/
|
||||
private static final Charset CHARSET = Properties.STRING_CHARSET.get();
|
||||
|
||||
/**
|
||||
* Handles special serialization cases.
|
||||
*/
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
* Shared Json Factory as per Jackson's documentation, used only for this class.
|
||||
*/
|
||||
|
@ -66,6 +72,10 @@ class DaprStateAsyncProvider {
|
|||
return result.flatMap(s -> {
|
||||
try {
|
||||
T response = this.stateSerializer.deserialize(s, type);
|
||||
if (this.isStateSerializerDefault && (response instanceof byte[])) {
|
||||
// Default serializer just passes through byte arrays, so we need to decode it here.
|
||||
response = (T) OBJECT_MAPPER.readValue(s, byte[].class);
|
||||
}
|
||||
if (response == null) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
@ -141,12 +151,13 @@ class DaprStateAsyncProvider {
|
|||
|| (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
|
||||
byte[] data = this.stateSerializer.serialize(stateChange.getValue());
|
||||
if (data != null) {
|
||||
if (this.isStateSerializerDefault) {
|
||||
if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) {
|
||||
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
|
||||
generator.writeFieldName("value");
|
||||
generator.writeRawValue(new String(data, CHARSET));
|
||||
} else {
|
||||
// Custom serializer uses byte[].
|
||||
// DefaultObjectSerializer is just a passthrough for byte[], so we handle it here too.
|
||||
generator.writeBinaryField("value", data);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,13 +90,14 @@ public class DaprStateAsyncProviderTest {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (node.size() != 3) {
|
||||
if (node.size() != 4) {
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean foundInsertName = false;
|
||||
boolean foundUpdateZipcode = false;
|
||||
boolean foundDeleteFlag = false;
|
||||
boolean foundUpdateBytes = false;
|
||||
for (JsonNode operation : node) {
|
||||
if (operation.get("operation") == null) {
|
||||
return false;
|
||||
|
@ -119,9 +120,12 @@ public class DaprStateAsyncProviderTest {
|
|||
foundDeleteFlag |= "delete".equals(opName) &&
|
||||
"flag".equals(key) &&
|
||||
(value == null);
|
||||
foundUpdateBytes |= "upsert".equals(opName) &&
|
||||
"bytes".equals(key) &&
|
||||
"AQ==".equals(value);
|
||||
}
|
||||
|
||||
return foundInsertName && foundUpdateZipcode && foundDeleteFlag;
|
||||
return foundInsertName && foundUpdateZipcode && foundDeleteFlag && foundUpdateBytes;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
|
@ -134,7 +138,8 @@ public class DaprStateAsyncProviderTest {
|
|||
new ActorId("123"),
|
||||
createInsertChange("name", "Jon Doe"),
|
||||
createUpdateChange("zipcode", 98011),
|
||||
createDeleteChange("flag"))
|
||||
createDeleteChange("flag"),
|
||||
createUpdateChange("bytes", new byte[] {0x1}))
|
||||
.block();
|
||||
|
||||
verify(daprClient).saveActorStateTransactionally(eq("MyActor"), eq("123"), any());
|
||||
|
@ -167,6 +172,9 @@ public class DaprStateAsyncProviderTest {
|
|||
when(daprClient
|
||||
.getActorState(any(), any(), eq("nullCustomer")))
|
||||
.thenReturn(Mono.empty());
|
||||
when(daprClient
|
||||
.getActorState(any(), any(), eq("bytes")))
|
||||
.thenReturn(Mono.just("\"QQ==\"".getBytes()));
|
||||
|
||||
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
|
||||
|
||||
|
@ -189,6 +197,8 @@ public class DaprStateAsyncProviderTest {
|
|||
provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block());
|
||||
Assert.assertNull(
|
||||
provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block());
|
||||
Assert.assertArrayEquals("A".getBytes(),
|
||||
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -36,6 +36,7 @@ public class ActorStateIT extends BaseIT {
|
|||
|
||||
String message = "This is a message to be saved and retrieved.";
|
||||
String name = "Jon Doe";
|
||||
byte[] bytes = new byte[] { 0x1 };
|
||||
ActorId actorId = new ActorId(Long.toString(System.currentTimeMillis()));
|
||||
String actorType = "StatefulActorTest";
|
||||
logger.debug("Building proxy ...");
|
||||
|
@ -96,6 +97,17 @@ public class ActorStateIT extends BaseIT {
|
|||
assertEquals("", result);
|
||||
}, 5000);
|
||||
|
||||
callWithRetry(() -> {
|
||||
logger.debug("Invoking writeBytes ... ");
|
||||
proxy.invokeActorMethod("writeBytes", bytes).block();
|
||||
}, 5000);
|
||||
|
||||
callWithRetry(() -> {
|
||||
logger.debug("Invoking readBytes where data is probably still cached ... ");
|
||||
byte[] result = proxy.invokeActorMethod("readBytes", byte[].class).block();
|
||||
assertArrayEquals(bytes, result);
|
||||
}, 5000);
|
||||
|
||||
logger.debug("Waiting, so actor can be deactivated ...");
|
||||
Thread.sleep(10000);
|
||||
|
||||
|
@ -130,5 +142,11 @@ public class ActorStateIT extends BaseIT {
|
|||
String result = newProxy.invokeActorMethod("readName", String.class).block();
|
||||
assertEquals("", result);
|
||||
}, 5000);
|
||||
|
||||
callWithRetry(() -> {
|
||||
logger.debug("Invoking readBytes where content is not cached ... ");
|
||||
byte[] result = newProxy.invokeActorMethod("readBytes", byte[].class).block();
|
||||
assertArrayEquals(bytes, result);
|
||||
}, 5000);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,10 @@ public interface StatefulActor {
|
|||
|
||||
MyData readData();
|
||||
|
||||
void writeBytes(byte[] something);
|
||||
|
||||
byte[] readBytes();
|
||||
|
||||
class MyData {
|
||||
public String value;
|
||||
}
|
||||
|
|
|
@ -58,4 +58,18 @@ public class StatefulActorImpl extends AbstractActor implements StatefulActor {
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(byte[] something) {
|
||||
super.getActorStateManager().set("bytes", something).block();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] readBytes() {
|
||||
if (super.getActorStateManager().contains("bytes").block()) {
|
||||
return super.getActorStateManager().get("bytes", byte[].class).block();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue