Fix hot monos in GRPC + unit tests. (#251)

This commit is contained in:
Artur Souza 2020-03-10 01:04:55 -07:00 committed by GitHub
parent eb27994e78
commit 98ddaf5841
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 189 additions and 10 deletions

View File

@ -287,8 +287,7 @@ public class DaprClientGrpc implements DaprClient {
}
DaprProtos.SaveStateEnvelope envelope = builder.build();
ListenableFuture<Empty> futureEmpty = client.saveState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
return Mono.fromCallable(() -> client.saveState(envelope)).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {
@ -432,8 +431,7 @@ public class DaprClientGrpc implements DaprClient {
}
DaprProtos.DeleteStateEnvelope envelope = builder.build();
ListenableFuture<Empty> futureEmpty = client.deleteState(envelope);
return Mono.just(futureEmpty).flatMap(f -> {
return Mono.fromCallable(() -> client.deleteState(envelope)).flatMap(f -> {
try {
f.get();
} catch (Exception ex) {

View File

@ -221,7 +221,7 @@ public class DaprHttp {
* @return DaprError or null if could not parse.
*/
private static DaprError parseDaprError(byte[] json) throws IOException {
if (json == null) {
if ((json == null) || (json.length == 0)) {
return null;
}
return OBJECT_MAPPER.readValue(json, DaprError.class);

View File

@ -82,6 +82,21 @@ public class DaprClientGrpcTest {
assertTrue(callback.wasCalled);
}
@Test
public void publishEventNoHotMono() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.publishEvent(any(DaprProtos.PublishEventEnvelope.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
adapter.publishEvent("topic", "object");
// Do not call block() on the mono above, so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test
public void publishEventObjectTest() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
@ -145,6 +160,22 @@ public class DaprClientGrpcTest {
assertTrue(callback.wasCalled);
}
@Test
public void invokeBindingObjectNoHotMono() {
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<Empty>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.invokeBinding(any(DaprProtos.InvokeBindingEnvelope.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
MyObject event = new MyObject(1, "Event");
adapter.invokeBinding("BindingName", event);
// Do not call block() on mono above, so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test(expected = RuntimeException.class)
public void invokeServiceVoidExceptionThrownTest() {
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
@ -415,6 +446,24 @@ public class DaprClientGrpcTest {
assertTrue(callback.wasCalled);
}
@Test
public void invokeServiceNoRequestNoHotMono() throws Exception {
String expected = "Value";
SettableFuture<DaprProtos.InvokeServiceResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.InvokeServiceResponseEnvelope> callback =
new MockCallback<DaprProtos.InvokeServiceResponseEnvelope>(DaprProtos.InvokeServiceResponseEnvelope.newBuilder()
.setData(getAny(expected)).build());
addCallback(settableFuture, callback, directExecutor());
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenAnswer(c -> {
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny(expected)).build());
return settableFuture;
});
adapter.invokeService(Verb.GET, "appId", "method", null);
// Do not call block() on mono above, so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test
public void invokeServiceNoRequestNoClassBodyObjectTest() throws Exception {
MyObject resultObj = new MyObject(1, "Value");
@ -466,13 +515,34 @@ public class DaprClientGrpcTest {
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
.thenReturn(settableFuture);
State<String> keyRequest = buildStateKey(null, key, etag, null);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
settableFuture.set(responseEnvelope);
assertEquals(expectedState, result.block());
}
@Test
public void getStateStringValueNoHotMono() throws IOException {
String etag = "ETag1";
String key = "key1";
String expectedValue = "Expected state";
State<String> expectedState = buildStateKey(expectedValue, key, etag, null);
DaprProtos.GetStateResponseEnvelope responseEnvelope = buildGetStateResponseEnvelope(expectedValue, etag);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
State<String> keyRequest = buildStateKey(null, key, etag, null);
adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
// block() on the mono above is not called, so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test
public void getStateObjectValueWithOptionsTest() throws IOException {
String etag = "ETag1";
@ -564,20 +634,41 @@ public class DaprClientGrpcTest {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
}
@Test
public void deleteStateTestNoHotMono() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
// Do not call result.block(), so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test
public void deleteStateNoConsistencyTest() {
String etag = "ETag1";
@ -742,13 +833,32 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
}
@Test
public void saveStateTestNoHotMono() {
String key = "key1";
String etag = "ETag1";
String value = "State value";
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenAnswer(c -> {
settableFuture.set(Empty.newBuilder().build());
return settableFuture;
});
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
// No call to result.block(), so nothing should happen.
assertFalse(callback.wasCalled);
}
@Test
public void saveStateNoConsistencyTest() {
String key = "key1";

View File

@ -77,6 +77,18 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void publishEventNoHotMono() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/A")
.respond(EXPECTED_RESULT);
String event = "{ \"message\": \"This is a test\" }";
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.publishEvent("", event);
// Should not throw exception because did not call block() on mono above.
}
@Test(expected = IllegalArgumentException.class)
public void invokeServiceVerbNull() {
mockInterceptor.addRule()
@ -187,6 +199,18 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void invokeServiceNoHotMono() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder")
.respond(500);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.invokeService(Verb.GET, "41", "neworder", "", map);
// No exception should be thrown because did not call block() on mono above.
}
@Test
public void invokeBinding() {
Map<String, String> map = new HashMap<>();
@ -211,6 +235,17 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void bindingNoHotMono() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.invokeBinding(null, "");
// No exception is thrown because did not call block() on mono above.
}
@Test
public void getStates() {
@ -253,6 +288,18 @@ public class DaprClientHttpTest {
assertEquals(monoNullEtag.block().getKey(), "key");
}
@Test
public void getStatesNoHotMono() {
State<String> stateNullEtag = new State("value", "key", null, null);
mockInterceptor.addRule()
.get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key")
.respond(500);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class);
// No exception should be thrown since did not call block() on mono above.
}
@Test
public void saveStates() {
State<String> stateKeyValue = new State("value", "key", "etag", null);
@ -319,6 +366,17 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void saveStatesNoHotMono() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore")
.respond(500);
StateOptions stateOptions = mock(StateOptions.class);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions);
// No exception should be thrown because we did not call block() on the mono above.
}
@Test
public void deleteState() {
@ -333,6 +391,19 @@ public class DaprClientHttpTest {
assertNull(mono.block());
}
@Test
public void deleteStateNoHotMono() {
StateOptions stateOptions = mock(StateOptions.class);
State<String> stateKeyValue = new State("value", "key", "etag", stateOptions);
mockInterceptor.addRule()
.delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key")
.respond(500);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions);
// No exception should be thrown because we did not call block() on the mono above.
}
@Test
public void deleteStateNullEtag() {
State<String> stateKeyValue = new State("value", "key", null, null);