Merge pull request #516 from artursouza/fix_state_trx (#517)

Fix HTTP save state transaction.
This commit is contained in:
Artur Souza 2021-03-17 11:08:16 -07:00 committed by GitHub
parent 69606d0639
commit a080e44467
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 101 additions and 1 deletions

View File

@ -62,6 +62,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.11.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>

View File

@ -395,7 +395,7 @@ public class DaprClientHttp extends AbstractDaprClient {
}
byte[] data = this.stateSerializer.serialize(state.getValue());
// Custom serializer, so everything is byte[].
operations.add(new TransactionalStateOperation<>(operation.getOperation(),
internalOperationObjects.add(new TransactionalStateOperation<>(operation.getOperation(),
new State<>(state.getKey(), data, state.getEtag(), state.getMetadata(), state.getOptions())));
}
TransactionalStateRequest<Object> req = new TransactionalStateRequest<>(internalOperationObjects, metadata);

View File

@ -5,6 +5,8 @@
package io.dapr.client;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import io.dapr.client.domain.DeleteStateRequestBuilder;
import io.dapr.client.domain.GetBulkStateRequestBuilder;
import io.dapr.client.domain.GetStateRequestBuilder;
@ -15,12 +17,17 @@ import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.ResponseBody;
import okhttp3.mock.Behavior;
import okhttp3.mock.MediaTypes;
import okhttp3.mock.MockInterceptor;
import okhttp3.mock.matchers.Matcher;
import okio.Buffer;
import okio.BufferedSink;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -57,6 +64,8 @@ public class DaprClientHttpTest {
private DaprClient daprClientHttp;
private DaprClient daprClientHttpXML;
private DaprHttp daprHttp;
private OkHttpClient okHttpClient;
@ -69,6 +78,7 @@ public class DaprClientHttpTest {
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientProxy(new DaprClientHttp(daprHttp));
daprClientHttpXML = new DaprClientProxy(new DaprClientHttp(daprHttp, new XmlSerializer(), new XmlSerializer()));
}
@Test
@ -848,6 +858,10 @@ public class DaprClientHttpTest {
public void simpleExecuteTransaction() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/transaction")
.matches(new BodyMatcher(
"{\"operations\":[{\"operation\":\"upsert\",\"request\":{\"value\":\"my data\",\"key\":\"key1\"," +
"\"etag\":\"ETag1\",\"options\":{}}},{\"operation\":\"delete\",\"request\":{\"key\":\"deleteKey\"}}]}"
))
.respond(EXPECTED_RESULT);
String etag = "ETag1";
String key = "key1";
@ -867,6 +881,33 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void simpleExecuteTransactionXMLData() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/transaction")
.matches(new BodyMatcher("{\"operations\":[{\"operation\":\"upsert\"," +
"\"request\":{\"value\":\"PFN0cmluZz5teSBkYXRhPC9TdHJpbmc+\",\"key\":\"key1\",\"etag\":\"ETag1\"," +
"\"options\":{}}},{\"operation\":\"delete\",\"request\":{\"value\":\"PG51bGwvPg==\"," +
"\"key\":\"deleteKey\"}}]}"))
.respond(EXPECTED_RESULT);
String etag = "ETag1";
String key = "key1";
String data = "my data";
StateOptions stateOptions = mock(StateOptions.class);
State<String> stateKey = new State<>(key, data, etag, stateOptions);
TransactionalStateOperation<String> upsertOperation = new TransactionalStateOperation<>(
TransactionalStateOperation.OperationType.UPSERT,
stateKey);
TransactionalStateOperation<String> deleteOperation = new TransactionalStateOperation<>(
TransactionalStateOperation.OperationType.DELETE,
new State<>("deleteKey"));
Mono<Void> mono = daprClientHttpXML.executeStateTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation,
deleteOperation));
assertNull(mono.block());
}
@Test
public void simpleExecuteTransactionNullEtag() {
mockInterceptor.addRule()
@ -1226,6 +1267,59 @@ public class DaprClientHttpTest {
daprClientHttp.close();
}
private static final class BodyMatcher implements Matcher {
private final String expected;
private BodyMatcher(String expected) {
this.expected = expected;
}
@Override
public boolean matches(Request request) {
BufferedSink sink = new Buffer();
try {
request.body().writeTo(sink);
} catch (IOException e) {
return false;
}
String body = sink.getBuffer().readByteString().utf8();
return expected.equals(body);
}
@Override
public String failReason(Request request) {
BufferedSink sink = new Buffer();
try {
request.body().writeTo(sink);
} catch (IOException e) {
throw new RuntimeException(e);
}
String body = sink.getBuffer().readByteString().utf8();
return String.format("Body does not match expected:\n%s\nvs actual\n%s", expected, body);
}
}
private static class XmlSerializer implements DaprObjectSerializer {
private static final XmlMapper XML_MAPPER = new XmlMapper();
@Override
public byte[] serialize(Object o) throws IOException {
return XML_MAPPER.writeValueAsBytes(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return XML_MAPPER.readValue(data, new TypeReference<T>() {});
}
@Override
public String getContentType() {
return "application/xml";
}
}
public static class MyObject {
private Integer id;
private String value;