Fixing bug, where call should be deferred instead of async (#102)

* Fixing bug, where call should be deferred instead of async

* Removing unused pool property
Returning null as response body if empty response is found.
Adding Test case for validating callback is executed when expected.

* Fixing assertion based on previous changes

* Adding test case to verify that the call to grpc only happens when the requestor block the thread, instead of in parallel immediately after calling the methods in the adapter.
Documenting complex test case

* Fixing merge conflicts

* Documenting complex test case
This commit is contained in:
Andres Robles 2020-01-16 16:34:39 -06:00 committed by Leon Mai
parent 0fed4fe76e
commit c67d5678d0
4 changed files with 203 additions and 70 deletions

View File

@ -71,14 +71,11 @@ class DaprClientGrpcAdapter implements DaprClient {
DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.newBuilder() DaprProtos.PublishEventEnvelope envelope = DaprProtos.PublishEventEnvelope.newBuilder()
.setTopic(topic).setData(data).build(); .setTopic(topic).setData(data).build();
ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
return Mono.just(futureEmpty).flatMap(f -> { return Mono.fromCallable(() -> {
try { ListenableFuture<Empty> futureEmpty = client.publishEvent(envelope);
f.get(); futureEmpty.get();
} catch (Exception ex) { return null;
return Mono.error(ex);
}
return Mono.empty();
}); });
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
@ -92,16 +89,11 @@ class DaprClientGrpcAdapter implements DaprClient {
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz) { public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz) {
try { try {
DaprProtos.InvokeServiceEnvelope envelope = buildInvokeServiceEnvelope(verb.toString(), appId, method, request); DaprProtos.InvokeServiceEnvelope envelope = buildInvokeServiceEnvelope(verb.toString(), appId, method, request);
ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse = return Mono.fromCallable(() -> {
client.invokeService(envelope); ListenableFuture<DaprProtos.InvokeServiceResponseEnvelope> futureResponse =
return Mono.just(futureResponse).flatMap(f -> { client.invokeService(envelope);
try { return objectSerializer.deserialize(futureResponse.get().getData().getValue().toByteArray(), clazz);
return Mono.just(objectSerializer.deserialize(f.get().getData().getValue().toByteArray(), clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
}); });
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
@ -151,14 +143,10 @@ class DaprClientGrpcAdapter implements DaprClient {
.setName(name) .setName(name)
.setData(data); .setData(data);
DaprProtos.InvokeBindingEnvelope envelope = builder.build(); DaprProtos.InvokeBindingEnvelope envelope = builder.build();
ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope); return Mono.fromCallable(() -> {
return Mono.just(futureEmpty).flatMap(f -> { ListenableFuture<Empty> futureEmpty = client.invokeBinding(envelope);
try { futureEmpty.get();
f.get(); return null;
} catch (Exception ex) {
return Mono.error(ex);
}
return Mono.empty();
}); });
} catch (Exception ex) { } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
@ -167,7 +155,7 @@ class DaprClientGrpcAdapter implements DaprClient {
/** /**
* @return Returns an io.dapr.client.domain.StateKeyValue * @return Returns an io.dapr.client.domain.StateKeyValue
* * <p>
* {@inheritDoc} * {@inheritDoc}
*/ */
@Override @Override
@ -180,22 +168,23 @@ class DaprClientGrpcAdapter implements DaprClient {
} }
DaprProtos.GetStateEnvelope envelope = builder.build(); DaprProtos.GetStateEnvelope envelope = builder.build();
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope); return Mono.fromCallable(() -> {
return Mono.just(futureResponse).flatMap(f -> { ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
DaprProtos.GetStateResponseEnvelope response = null;
try { try {
return Mono.just(buildStateKeyValue(f.get(), state.getKey(), stateOptions, clazz)); response = futureResponse.get();
} catch (Exception ex) { } catch (NullPointerException npe) {
return Mono.error(ex); return null;
} }
}); return buildStateKeyValue(response, state.getKey(), stateOptions, clazz);
} catch (Exception ex) { }); } catch (Exception ex) {
return Mono.error(ex); return Mono.error(ex);
} }
} }
private <T> StateKeyValue<T> buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, StateOptions stateOptions, Class<T> clazz) throws IOException { private <T> StateKeyValue<T> buildStateKeyValue(DaprProtos.GetStateResponseEnvelope response, String requestedKey, StateOptions stateOptions, Class<T> clazz) throws IOException {
T value = objectSerializer.deserialize(resonse.getData().getValue().toByteArray(), clazz); T value = objectSerializer.deserialize(Optional.ofNullable(response.getData().getValue().toByteArray()).orElse(null), clazz);
String etag = resonse.getEtag(); String etag = response.getEtag();
String key = requestedKey; String key = requestedKey;
return new StateKeyValue<>(value, key, etag, stateOptions); return new StateKeyValue<>(value, key, etag, stateOptions);
} }
@ -208,8 +197,7 @@ class DaprClientGrpcAdapter implements DaprClient {
try { try {
DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder(); DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder();
for (StateKeyValue state : states) { for (StateKeyValue state : states) {
builder.addRequests(buildStateRequest(state).build()); builder.addRequests(buildStateRequest(state).build()); }
}
DaprProtos.SaveStateEnvelope envelope = builder.build(); DaprProtos.SaveStateEnvelope envelope = builder.build();
ListenableFuture<Empty> futureEmpty = client.saveState(envelope); ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
@ -282,7 +270,7 @@ class DaprClientGrpcAdapter implements DaprClient {
@Override @Override
public <T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options) { public <T> Mono<Void> deleteState(StateKeyValue<T> state, StateOptions options) {
try { try {
DaprProtos.StateOptions.Builder optionBuilder = null; DaprProtos.StateOptions.Builder optionBuilder = null;
if (options != null) { if (options != null) {
optionBuilder = DaprProtos.StateOptions.newBuilder(); optionBuilder = DaprProtos.StateOptions.newBuilder();
@ -407,12 +395,13 @@ class DaprClientGrpcAdapter implements DaprClient {
/** /**
* Builds the object io.dapr.{@link DaprProtos.InvokeServiceEnvelope} to be send based on the parameters. * Builds the object io.dapr.{@link DaprProtos.InvokeServiceEnvelope} to be send based on the parameters.
* @param verb String that must match HTTP Methods *
* @param appId The application id to be invoked * @param verb String that must match HTTP Methods
* @param method The application method to be invoked * @param appId The application id to be invoked
* @param request The body of the request to be send as part of the invokation * @param method The application method to be invoked
* @param <K> The Type of the Body * @param request The body of the request to be send as part of the invokation
* @return The object to be sent as part of the invokation. * @param <K> The Type of the Body
* @return The object to be sent as part of the invokation.
* @throws IOException If there's an issue serializing the request. * @throws IOException If there's an issue serializing the request.
*/ */
private <K> DaprProtos.InvokeServiceEnvelope buildInvokeServiceEnvelope( private <K> DaprProtos.InvokeServiceEnvelope buildInvokeServiceEnvelope(

View File

@ -83,11 +83,6 @@ class DaprHttp {
*/ */
private final OkHttpClient httpClient; private final OkHttpClient httpClient;
/**
* Thread-pool for HTTP calls.
*/
private final ExecutorService pool;
/** /**
* Creates a new instance of {@link DaprHttp}. * Creates a new instance of {@link DaprHttp}.
* *
@ -97,7 +92,6 @@ class DaprHttp {
DaprHttp(int port, OkHttpClient httpClient) { DaprHttp(int port, OkHttpClient httpClient) {
this.port = port; this.port = port;
this.httpClient = httpClient; this.httpClient = httpClient;
this.pool = Executors.newWorkStealingPool();
} }
/** /**
@ -132,7 +126,7 @@ class DaprHttp {
* @return Asynchronous text * @return Asynchronous text
*/ */
public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, byte[] content, Map<String, String> headers) { public Mono<Response> invokeAPI(String method, String urlString, Map<String, String> urlParameters, byte[] content, Map<String, String> headers) {
return Mono.fromFuture(CompletableFuture.supplyAsync( return Mono.fromCallable(
() -> { () -> {
try { try {
String requestId = UUID.randomUUID().toString(); String requestId = UUID.randomUUID().toString();
@ -185,12 +179,12 @@ class DaprHttp {
response.headers().forEach(pair -> { response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond()); mapHeaders.put(pair.getFirst(), pair.getSecond());
}); });
return new Response(result, mapHeaders, response.code()); return new Response(result.length > 0 ? result : null, mapHeaders, response.code());
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}, this.pool)); });
} }
/** /**

View File

@ -14,12 +14,15 @@ import io.dapr.utils.ObjectSerializer;
import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import static com.google.common.util.concurrent.Futures.addCallback; import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@ -42,13 +45,13 @@ public class DaprClientGrpcAdapterTest {
@Test(expected = UnsupportedOperationException.class) @Test(expected = UnsupportedOperationException.class)
public void unregisterActorTimerTest() { public void unregisterActorTimerTest() {
Mono<Void> result = adater.unregisterActorTimer("actorType", "actorId", "timerName"); Mono<Void> result = adater.unregisterActorTimer("actorType", "actorId", "timerName");
result.block(); result.block();
} }
@Test(expected = UnsupportedOperationException.class) @Test(expected = UnsupportedOperationException.class)
public void registerActorTimerTest() { public void registerActorTimerTest() {
Mono<Void> result = adater.registerActorTimer("actorType", "actorId", "timerName" , "DATA"); Mono<Void> result = adater.registerActorTimer("actorType", "actorId", "timerName", "DATA");
result.block(); result.block();
} }
@ -273,7 +276,7 @@ public class DaprClientGrpcAdapterTest {
} }
@Test @Test
public void invokeServiceObjectTest() throws Exception { public void invokeServiceObjectTest() throws Exception {
MyObject resultObj = new MyObject(1, "Value"); MyObject resultObj = new MyObject(1, "Value");
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create(); SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback = MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback =
@ -322,13 +325,13 @@ public class DaprClientGrpcAdapterTest {
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build()); settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build());
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenReturn(settableFuture); .thenReturn(settableFuture);
Mono<String> result = adater.invokeService(Verb.GET, "appId", "method",null, String.class); Mono<String> result = adater.invokeService(Verb.GET, "appId", "method", null, String.class);
String strOutput = result.block(); String strOutput = result.block();
assertEquals(expected, strOutput); assertEquals(expected, strOutput);
} }
@Test @Test
public void invokeServiceNoRequestBodyObjectTest() throws Exception { public void invokeServiceNoRequestBodyObjectTest() throws Exception {
MyObject resultObj = new MyObject(1, "Value"); MyObject resultObj = new MyObject(1, "Value");
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create(); SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
@ -339,7 +342,7 @@ public class DaprClientGrpcAdapterTest {
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(resultObj)).build()); settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(resultObj)).build());
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class))) when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenReturn(settableFuture); .thenReturn(settableFuture);
Mono<String> result = adater.invokeService(Verb.GET, "appId", "method",null, String.class); Mono<String> result = adater.invokeService(Verb.GET, "appId", "method", null, String.class);
String strOutput = result.block(); String strOutput = result.block();
assertEquals(serializer.serializeString(resultObj), strOutput); assertEquals(serializer.serializeString(resultObj), strOutput);
} }
@ -390,7 +393,7 @@ public class DaprClientGrpcAdapterTest {
} }
@Test @Test
public void invokeServiceByteRequestObjectTest() throws Exception { public void invokeServiceByteRequestObjectTest() throws Exception {
MyObject resultObj = new MyObject(1, "Value"); MyObject resultObj = new MyObject(1, "Value");
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create(); SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback = MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback =
@ -446,7 +449,7 @@ public class DaprClientGrpcAdapterTest {
} }
@Test @Test
public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception { public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception {
MyObject resultObj = new MyObject(1, "Value"); MyObject resultObj = new MyObject(1, "Value");
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create(); SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
@ -491,10 +494,7 @@ public class DaprClientGrpcAdapterTest {
String key = "key1"; String key = "key1";
String expectedValue = "Expected state"; String expectedValue = "Expected state";
StateKeyValue<String> expectedState = buildStateKey(expectedValue, key, etag, null); StateKeyValue<String> expectedState = buildStateKey(expectedValue, key, etag, null);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag);
.setData(getAny(expectedValue))
.setEtag(etag)
.build();
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create(); SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope); MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor()); addCallback(settableFuture, callback, directExecutor());
@ -880,6 +880,76 @@ public class DaprClientGrpcAdapterTest {
return new StateKeyValue(value, key, etag, options); return new StateKeyValue(value, key, etag, options);
} }
/**
* The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR
* will be done when the output Mono response call the Mono.block method.
* Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state
* you just retrived but block for the delete response, when later you block for the response of the getState, you will
* not found the state.
* <p>This test will execute the following flow:</p>
* <ol>
* <li>Exeucte client getState for Key=key1</li>
* <li>Block for result to the the state</li>
* <li>Assert the Returned State is the expected to key1</li>
* <li>Execute client getState for Key=key2</li>
* <li>Execute client deleteState for Key=key2</li>
* <li>Block for deleteState call.</li>
* <li>Block for getState for Key=key2 and Assert they 2 was not found.</li>
* </ol>
* @throws Exception
*/
@Test
public void getStateDeleteStateThenBlockDeleteThenBlockGet() throws Exception {
String etag = "ETag1";
String key1 = "key1";
String expectedValue1 = "Expected state 1";
String key2 = "key2";
String expectedValue2 = "Expected state 2";
StateKeyValue<String> expectedState1 = buildStateKey(expectedValue1, key1, etag, null);
Map<String, SettableFuture<DaprProtos.GetStateResponseEnvelope>> futuresMap = new HashMap<>();
futuresMap.put(key1, buildFutureGetStateEnvelop(expectedValue1, etag));
futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag));
when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1));
StateKeyValue<String> keyRequest1 = buildStateKey(null, key1, etag, null);
Mono<StateKeyValue<String>> resultGet1 = adater.getState(keyRequest1, null, String.class);
assertEquals(expectedState1, resultGet1.block());
StateKeyValue<String> keyRequest2 = buildStateKey(null, key2, etag, null);
Mono<StateKeyValue<String>> resultGet2 = adater.getState(keyRequest2, null, String.class);
SettableFuture<Empty> settableFutureDelete = SettableFuture.create();
MockCallback<Empty> callbackDelete = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFutureDelete, callbackDelete, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFutureDelete);
Mono<Void> resultDelete = adater.deleteState(keyRequest2, null);
settableFutureDelete.set(Empty.newBuilder().build());
resultDelete.block();
assertTrue(callbackDelete.wasCalled);
futuresMap.replace(key2, null);
when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key2)))).thenReturn(futuresMap.get(key2));
StateKeyValue<String> state2 = resultGet2.block();
assertNull(state2);
}
private <T> SettableFuture<DaprProtos.GetStateResponseEnvelope> buildFutureGetStateEnvelop(T value, String etag) throws IOException {
DaprProtos.GetStateResponseEnvelope envelope = buildGetStateResponseEnvelope(value, etag);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(envelope);
addCallback(settableFuture, callback, directExecutor());
settableFuture.set(envelope);
return settableFuture;
}
private <T> DaprProtos.GetStateResponseEnvelope buildGetStateResponseEnvelope(T value, String etag) throws IOException {
return DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(value))
.setEtag(etag)
.build();
}
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency, private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,
Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) { Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) {
@ -977,4 +1047,32 @@ public class DaprClientGrpcAdapterTest {
return result; return result;
} }
} }
private static class GetStateEnvelopeKeyMatcher implements ArgumentMatcher<DaprProtos.GetStateEnvelope> {
private final String propValue;
GetStateEnvelopeKeyMatcher(String propValue) {
this.propValue = propValue;
}
@Override
public boolean matches(DaprProtos.GetStateEnvelope argument) {
if (argument == null) {
return false;
}
if (propValue == null && argument.getKey() != null) {
return false;
}
if (propValue == null && argument.getKey() == null) {
return true;
}
return propValue.equals(argument.getKey());
}
@Override
public String toString() {
return "<Has property of certain value (propName: " + propValue + ") matcher>";
}
}
} }

View File

@ -4,6 +4,8 @@
*/ */
package io.dapr.client; package io.dapr.client;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.Constants;
import io.dapr.utils.ObjectSerializer; import io.dapr.utils.ObjectSerializer;
import okhttp3.*; import okhttp3.*;
import okhttp3.mock.Behavior; import okhttp3.mock.Behavior;
@ -16,7 +18,7 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
public class DaprHttpTest { public class DaprHttpTest {
@ -177,16 +179,66 @@ public class DaprHttpTest {
@Test @Test
public void getHeadersAndStatus(){ public void getHeadersAndStatus(){
mockInterceptor.addRule() mockInterceptor.addRule()
.post("http://localhost:3500/v1.0/state") .post("http://localhost:3500/v1.0/state")
.respond(500, ResponseBody.create(MediaType.parse("application/json"), .respond(500, ResponseBody.create(MediaType.parse("application/json"),
"{\"errorCode\":\"null\",\"message\":\"null\"}")); "{\"errorCode\":\"null\",\"message\":\"null\"}"));
DaprHttp daprHttp = new DaprHttp(3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(3500, okHttpClient);
System.out.println(daprHttp); System.out.println(daprHttp);
}
/**
* The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR
* will be done when the output Mono response call the Mono.block method.
* Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state
* you just retrived but block for the delete response, when later you block for the response of the getState, you will
* not found the state.
* <p>This test will execute the following flow:</p>
* <ol>
* <li>Exeucte client getState for Key=key1</li>
* <li>Block for result to the the state</li>
* <li>Assert the Returned State is the expected to key1</li>
* <li>Execute client getState for Key=key2</li>
* <li>Execute client deleteState for Key=key2</li>
* <li>Block for deleteState call.</li>
* <li>Block for getState for Key=key2 and Assert they 2 was not found.</li>
* </ol>
* @throws Exception
*/
@Test ()
public void testCallbackCalledAtTheExpectedTimeTest() throws IOException {
String deletedStateKey = "deletedKey";
String existingState = "existingState";
String urlDeleteState = Constants.STATE_PATH + "/" + deletedStateKey;
String urlExistingState = Constants.STATE_PATH + "/" + existingState;
mockInterceptor.addRule()
.get("http://localhost:3500/" + urlDeleteState)
.respond(200, ResponseBody.create(MediaType.parse("application/json"),
deletedStateKey));
mockInterceptor.addRule()
.delete("http://localhost:3500/" + urlDeleteState)
.respond(204);
mockInterceptor.addRule()
.get("http://localhost:3500/" +urlExistingState)
.respond(200, ResponseBody.create(MediaType.parse("application/json"),
existingState));
DaprHttp daprHttp = new DaprHttp(3500, okHttpClient);
Mono<DaprHttp.Response> response = daprHttp.invokeAPI("GET", urlExistingState, null, null);
assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class));
Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeAPI("GET", urlDeleteState, null, null);
Mono<DaprHttp.Response> responseDeleteKey = daprHttp.invokeAPI("DELETE", urlDeleteState, null, null);
assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class));
mockInterceptor.reset();
mockInterceptor.addRule()
.get("http://localhost:3500/" +urlDeleteState)
.respond(404, ResponseBody.create(MediaType.parse("application/json"),
"{\"errorCode\":\"404\",\"message\":\"State Not Fuund\"}"));
try {
responseDeleted.block();
fail("Expected DaprException");
} catch (Exception ex) {
assertEquals(DaprException.class, ex.getCause().getCause().getClass());
}
} }
} }