add metadata in get bulk state request (#401)

* add metadata in get bulk state request

* rollback DaprClientGrpcTest which should be comitted in another PR

* fix compile error after merge

* update unit test for metadata in getBulkState()

* re-trigger build again

* add test for withMetadata()

Co-authored-by: Mukundan Sundararajan <musundar@microsoft.com>
This commit is contained in:
Sky/敖小剑 2020-12-30 06:03:41 +08:00 committed by GitHub
parent 08f133dd1b
commit b5dd421142
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 0 deletions

View File

@ -283,6 +283,9 @@ public class DaprClientGrpc extends AbstractDaprClient {
.setStoreName(stateStoreName)
.addAllKeys(keys)
.setParallelism(parallelism);
if (request.getMetadata() != null) {
builder.putAllMetadata(request.getMetadata());
}
DaprProtos.GetBulkStateRequest envelope = builder.build();
return Mono.fromCallable(wrap(context, () -> {

View File

@ -8,6 +8,7 @@ package io.dapr.client.domain;
import io.opentelemetry.context.Context;
import java.util.List;
import java.util.Map;
/**
* A request to get bulk state by keys.
@ -18,6 +19,8 @@ public class GetBulkStateRequest {
private List<String> keys;
private Map<String, String> metadata;
private int parallelism;
private Context context;
@ -53,4 +56,12 @@ public class GetBulkStateRequest {
void setContext(Context context) {
this.context = context;
}
public Map<String, String> getMetadata() {
return metadata;
}
void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.context.Context;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Builds a request to request states.
@ -20,6 +21,8 @@ public class GetBulkStateRequestBuilder {
private final List<String> keys;
private Map<String, String> metadata;
private int parallelism = 1;
private Context context;
@ -34,6 +37,11 @@ public class GetBulkStateRequestBuilder {
this.keys = keys == null ? null : Collections.unmodifiableList(Arrays.asList(keys));
}
public GetBulkStateRequestBuilder withMetadata(Map<String, String> metadata) {
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
return this;
}
public GetBulkStateRequestBuilder withParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
@ -52,6 +60,7 @@ public class GetBulkStateRequestBuilder {
GetBulkStateRequest request = new GetBulkStateRequest();
request.setStoreName(this.storeName);
request.setKeys(this.keys);
request.setMetadata(this.metadata);
request.setParallelism(this.parallelism);
request.setContext(this.context);
return request;

View File

@ -951,6 +951,7 @@ public class DaprClientGrpcTest {
});
// negative parallelism
GetBulkStateRequest req = new GetBulkStateRequestBuilder(STATE_STORE_NAME, Collections.singletonList("100"))
.withMetadata(new HashMap<>())
.withParallelism(-1)
.build();
assertThrows(IllegalArgumentException.class, () -> adapter.getBulkState(req, TypeRef.BOOLEAN).block());
@ -958,10 +959,14 @@ public class DaprClientGrpcTest {
@Test
public void getStatesString() throws IOException {
Map<String, String> metadata = new HashMap<>();
metadata.put("meta1", "value1");
metadata.put("meta2", "value2");
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize("hello world"))
.setKey("100")
.putAllMetadata(metadata)
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
@ -983,6 +988,7 @@ public class DaprClientGrpcTest {
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals("hello world", result.stream().findFirst().get().getValue());
assertEquals(metadata, result.stream().findFirst().get().getMetadata());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
@ -993,10 +999,13 @@ public class DaprClientGrpcTest {
@Test
public void getStatesInteger() throws IOException {
Map<String, String> metadata = new HashMap<>();
metadata.put("meta1", "value1");
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(1234))
.setKey("100")
.putAllMetadata(metadata)
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
@ -1018,6 +1027,7 @@ public class DaprClientGrpcTest {
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(1234, (int)result.stream().findFirst().get().getValue());
assertEquals(metadata, result.stream().findFirst().get().getMetadata());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
@ -1028,10 +1038,13 @@ public class DaprClientGrpcTest {
@Test
public void getStatesBoolean() throws IOException {
Map<String, String> metadata = new HashMap<>();
metadata.put("meta1", "value1");
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(true))
.setKey("100")
.putAllMetadata(metadata)
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
@ -1053,6 +1066,7 @@ public class DaprClientGrpcTest {
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(true, result.stream().findFirst().get().getValue());
assertEquals(metadata, result.stream().findFirst().get().getMetadata());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
@ -1063,10 +1077,12 @@ public class DaprClientGrpcTest {
@Test
public void getStatesByteArray() throws IOException {
Map<String, String> metadata = new HashMap<>();
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(new byte[]{1, 2, 3}))
.setKey("100")
.putAllMetadata(metadata)
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
@ -1088,6 +1104,7 @@ public class DaprClientGrpcTest {
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertArrayEquals(new byte[]{1, 2, 3}, result.stream().findFirst().get().getValue());
assertEquals(0, result.stream().findFirst().get().getMetadata().size());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
@ -1124,6 +1141,7 @@ public class DaprClientGrpcTest {
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(object, result.stream().findFirst().get().getValue());
assertEquals(0, result.stream().findFirst().get().getMetadata().size());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());