Fix handling of metadata, headers. Removes etag from getState. (#440)

* Fix handling of metadata, headers. Removes etag from getState().

* Update CLI and runtime.

* Fix ITs.
This commit is contained in:
Artur Souza 2021-01-14 15:01:22 -08:00 committed by GitHub
parent 76d877c9c0
commit 019dee9a8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 270 additions and 148 deletions

View File

@ -22,11 +22,11 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_CLI_VER: 1.0.0-rc.2
DAPR_RUNTIME_VER: 1.0.0-rc.1
DAPR_CLI_VER: 1.0.0-rc.3
DAPR_RUNTIME_VER: 1.0.0-rc.2
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF: 3dacfb672d55f1436c249057aaebbe597e1066f3
DAPR_REF: 3cca3cc1567f1cb955ae69b8fd784f075f62ad42
DAPR_CLI_REF:
DAPR_REF: 4678e477562e7e35a1d6ba04a9b17b6c9c00a025
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -9,18 +9,22 @@ import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprHttp;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Metadata;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -34,6 +38,8 @@ public class PubSubIT extends BaseIT {
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
// Topic used for TTL test
private static final String TTL_TOPIC_NAME = "ttltopic";
/**
* Parameters for this test.
@ -49,16 +55,57 @@ public class PubSubIT extends BaseIT {
@Parameterized.Parameter
public boolean useGrpc;
private final List<DaprRun> runs = new ArrayList<>();
private DaprRun closeLater(DaprRun run) {
this.runs.add(run);
return run;
}
@After
public void tearDown() throws Exception {
for (DaprRun run : runs) {
run.stop();
}
}
@Test
public void publishPubSubNotFound() throws Exception {
DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
60000));
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
try (DaprClient client = new DaprClientBuilder().build()) {
if (this.useGrpc) {
assertThrowsDaprException(
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: pubsub unknown pubsub not found",
() -> client.publishEvent("unknown pubsub", "mytopic", "payload").block());
} else {
assertThrowsDaprException(
"ERR_PUBSUB_NOT_FOUND",
"ERR_PUBSUB_NOT_FOUND: pubsub unknown pubsub not found",
() -> client.publishEvent("unknown pubsub", "mytopic", "payload").block());
}
}
}
@Test
public void testPubSub() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final DaprRun daprRun = startDaprApp(
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000);
60000));
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (this.useGrpc) {
daprRun.switchToGRPC();
@ -123,4 +170,62 @@ public class PubSubIT extends BaseIT {
}
}
@Test
public void testPubSubTTLMetadata() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
60000));
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
// Send a batch of messages on one topic, all to be expired in 1 second.
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, TTL_TOPIC_NAME);
//Publishing messages
client.publishEvent(
PUBSUB_NAME,
TTL_TOPIC_NAME,
message,
Collections.singletonMap(Metadata.TTL_IN_SECONDS, "1")).block();
System.out.println(String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME));
}
}
daprRun.stop();
// Sleeps for two seconds to let them expire.
Thread.sleep(2000);
daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000));
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
// Sleeps for five seconds to give subscriber a chance to receive messages.
Thread.sleep(5000);
final String appId = daprRun.getAppName();
try (DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + TTL_TOPIC_NAME);
final List<String> messages = client.invokeMethod(appId, "messages/" + TTL_TOPIC_NAME, null, HttpExtension.GET, List.class).block();
assertEquals(0, messages.size());
}, 2000);
}
daprRun.stop();
}
}

View File

@ -23,6 +23,7 @@ public class SubscriberController {
private static final List<Object> messagesReceivedTestingTopic = new ArrayList();
private static final List<Object> messagesReceivedAnotherTopic = new ArrayList();
private static final List<Object> messagesReceivedTTLTopic = new ArrayList();
@GetMapping(path = "/messages/testingtopic")
public List<Object> getMessagesReceivedTestingTopic() {
@ -34,6 +35,11 @@ public class SubscriberController {
return messagesReceivedAnotherTopic;
}
@GetMapping(path = "/messages/ttltopic")
public List<Object> getMessagesReceivedTTLTopic() {
return messagesReceivedTTLTopic;
}
@Topic(name = "testingtopic", pubsubName = "messagebus")
@PostMapping(path = "/route1")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent envelope) {
@ -62,4 +68,18 @@ public class SubscriberController {
});
}
@Topic(name = "ttltopic", pubsubName = "messagebus")
@PostMapping(path = "/route3")
public Mono<Void> handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("TTL topic Subscriber got message: " + message);
messagesReceivedTTLTopic.add(envelope.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@ -79,14 +79,4 @@ public class GRPCStateClientIT extends AbstractStateClientIT {
byte[].class).block());
}
@Test
public void publishPubSubNotFound() {
DaprClient daprClient = buildDaprClient();
// DaprException is guaranteed in the Dapr SDK but getCause() is null in HTTP while present in GRPC implementation.
assertThrowsDaprException(
"NOT_FOUND",
"NOT_FOUND: pubsub 'unknown pubsub' not found",
() -> daprClient.publishEvent("unknown pubsub", "mytopic", "payload").block());
}
}

View File

@ -58,7 +58,7 @@ public class HttpStateClientIT extends AbstractStateClientIT {
// DaprException is guaranteed in the Dapr SDK but getCause() is null in HTTP while present in GRPC implementation.
assertThrowsDaprException(
"ERR_STATE_STORE_NOT_FOUND",
"ERR_STATE_STORE_NOT_FOUND: state store unknown%20state%20store is not found",
"ERR_STATE_STORE_NOT_FOUND: state store unknown state store is not found",
() -> daprClient.getState("unknown state store", new State(stateKey), byte[].class).block());
}
@ -71,7 +71,7 @@ public class HttpStateClientIT extends AbstractStateClientIT {
// DaprException is guaranteed in the Dapr SDK but getCause() is null in HTTP while present in GRPC implementation.
assertThrowsDaprException(
"ERR_STATE_STORE_NOT_FOUND",
"ERR_STATE_STORE_NOT_FOUND: state store unknown%20state%20store is not found",
"ERR_STATE_STORE_NOT_FOUND: state store unknown state store is not found",
() -> daprClient.getBulkState(
"unknown state store",
Collections.singletonList(stateKey),
@ -82,10 +82,6 @@ public class HttpStateClientIT extends AbstractStateClientIT {
public void publishPubSubNotFound() {
DaprClient daprClient = buildDaprClient();
// DaprException is guaranteed in the Dapr SDK but getCause() is null in HTTP while present in GRPC implementation.
assertThrowsDaprException(
"ERR_PUBSUB_NOT_FOUND",
"ERR_PUBSUB_NOT_FOUND: pubsub 'unknown%20pubsub' not found",
() -> daprClient.publishEvent("unknown pubsub", "mytopic", "payload").block());
}
}

View File

@ -100,7 +100,6 @@ abstract class AbstractDaprClient implements DaprClient {
InvokeServiceRequest req = builder
.withBody(data)
.withHttpExtension(httpExtension)
.withMetadata(metadata)
.withContentType(objectSerializer.getContentType())
.build();
@ -252,7 +251,7 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(String storeName, State<T> state, TypeRef<T> type) {
return this.getState(storeName, state.getKey(), state.getEtag(), state.getOptions(), type);
return this.getState(storeName, state.getKey(), state.getOptions(), type);
}
/**
@ -260,7 +259,7 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(String storeName, State<T> state, Class<T> clazz) {
return this.getState(storeName, state.getKey(), state.getEtag(), state.getOptions(), TypeRef.get(clazz));
return this.getState(storeName, state.getKey(), state.getOptions(), TypeRef.get(clazz));
}
/**
@ -268,7 +267,7 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(String storeName, String key, TypeRef<T> type) {
return this.getState(storeName, key, null, null, type);
return this.getState(storeName, key, null, type);
}
/**
@ -276,7 +275,7 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(String storeName, String key, Class<T> clazz) {
return this.getState(storeName, key, null, null, TypeRef.get(clazz));
return this.getState(storeName, key, null, TypeRef.get(clazz));
}
/**
@ -284,9 +283,8 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(
String storeName, String key, String etag, StateOptions options, TypeRef<T> type) {
String storeName, String key, StateOptions options, TypeRef<T> type) {
GetStateRequest request = new GetStateRequestBuilder(storeName, key)
.withEtag(etag)
.withStateOptions(options)
.build();
return this.getState(request, type).map(r -> r.getObject());
@ -298,8 +296,8 @@ abstract class AbstractDaprClient implements DaprClient {
*/
@Override
public <T> Mono<State<T>> getState(
String storeName, String key, String etag, StateOptions options, Class<T> clazz) {
return this.getState(storeName, key, etag, options, TypeRef.get(clazz));
String storeName, String key, StateOptions options, Class<T> clazz) {
return this.getState(storeName, key, options, TypeRef.get(clazz));
}
/**

View File

@ -354,26 +354,24 @@ public interface DaprClient extends AutoCloseable {
*
* @param storeName The name of the state store.
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param type The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String storeName, String key, String etag, StateOptions options, TypeRef<T> type);
<T> Mono<State<T>> getState(String storeName, String key, StateOptions options, TypeRef<T> type);
/**
* Retrieve a State based on their key.
*
* @param storeName The name of the state store.
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String storeName, String key, String etag, StateOptions options, Class<T> clazz);
<T> Mono<State<T>> getState(String storeName, String key, StateOptions options, Class<T> clazz);
/**
* Retrieve a State based on their key.

View File

@ -58,7 +58,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -163,6 +162,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
envelopeBuilder.setDataContentType(objectSerializer.getContentType());
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
@ -189,14 +189,17 @@ public class DaprClientGrpc extends AbstractDaprClient {
try {
String appId = invokeServiceRequest.getAppId();
String method = invokeServiceRequest.getMethod();
Object request = invokeServiceRequest.getBody();
Object body = invokeServiceRequest.getBody();
HttpExtension httpExtension = invokeServiceRequest.getHttpExtension();
Context context = invokeServiceRequest.getContext();
DaprProtos.InvokeServiceRequest envelope = buildInvokeServiceRequest(
httpExtension,
appId,
method,
request);
body);
// Regarding missing metadata in method invocation for gRPC:
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342
return this.<CommonProtos.InvokeResponse>createMono(
context,
@ -271,8 +274,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
final String stateStoreName = request.getStoreName();
final String key = request.getKey();
final StateOptions options = request.getStateOptions();
// TODO(artursouza): handle etag once available in proto.
// String etag = request.getEtag();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
@ -283,8 +285,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
DaprProtos.GetStateRequest.Builder builder = DaprProtos.GetStateRequest.newBuilder()
.setStoreName(stateStoreName)
.setKey(key)
.putAllMetadata(request.getMetadata());
.setKey(key);
if (metadata != null) {
builder.putAllMetadata(metadata);
}
if (options != null && options.getConsistency() != null) {
builder.setConsistency(getGrpcStateConsistency(options));
}
@ -317,6 +321,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
final String stateStoreName = request.getStoreName();
final List<String> keys = request.getKeys();
final int parallelism = request.getParallelism();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
@ -332,8 +337,8 @@ public class DaprClientGrpc extends AbstractDaprClient {
.setStoreName(stateStoreName)
.addAllKeys(keys)
.setParallelism(parallelism);
if (request.getMetadata() != null) {
builder.putAllMetadata(request.getMetadata());
if (metadata != null) {
builder.putAllMetadata(metadata);
}
DaprProtos.GetBulkStateRequest envelope = builder.build();
@ -500,6 +505,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
final String key = request.getKey();
final StateOptions options = request.getStateOptions();
final String etag = request.getEtag();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
@ -521,8 +527,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
DaprProtos.DeleteStateRequest.Builder builder = DaprProtos.DeleteStateRequest.newBuilder()
.setStoreName(stateStoreName)
.setKey(key)
.putAllMetadata(request.getMetadata());
.setKey(key);
if (metadata != null) {
builder.putAllMetadata(metadata);
}
if (etag != null) {
builder.setEtag(etag);
}
@ -548,7 +556,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
* @param httpExtension Object for HttpExtension
* @param appId The application id to be invoked
* @param method The application method to be invoked
* @param request The body of the request to be send as part of the invocation
* @param body The body of the request to be send as part of the invocation
* @param <K> The Type of the Body
* @return The object to be sent as part of the invocation.
* @throws IOException If there's an issue serializing the request.
@ -557,14 +565,14 @@ public class DaprClientGrpc extends AbstractDaprClient {
HttpExtension httpExtension,
String appId,
String method,
K request) throws IOException {
K body) throws IOException {
if (httpExtension == null) {
throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
}
CommonProtos.InvokeRequest.Builder requestBuilder = CommonProtos.InvokeRequest.newBuilder();
requestBuilder.setMethod(method);
if (request != null) {
byte[] byteRequest = objectSerializer.serialize(request);
if (body != null) {
byte[] byteRequest = objectSerializer.serialize(body);
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
requestBuilder.setData(data);
} else {

View File

@ -39,6 +39,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -53,6 +54,11 @@ public class DaprClientHttp extends AbstractDaprClient {
*/
private static final String HEADER_HTTP_ETAG_ID = "If-Match";
/**
* Metadata prefix in query params.
*/
private static final String METADATA_PREFIX = "metadata.";
/**
* Serializer for internal objects.
*/
@ -133,11 +139,13 @@ public class DaprClientHttp extends AbstractDaprClient {
}
byte[] serializedEvent = objectSerializer.serialize(data);
Map<String, String> headers = Collections.singletonMap("content-type", objectSerializer.getContentType());
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
return this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedEvent, metadata, context)
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context)
.thenReturn(new Response<>(context, null));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
@ -152,14 +160,14 @@ public class DaprClientHttp extends AbstractDaprClient {
final String appId = invokeServiceRequest.getAppId();
final String method = invokeServiceRequest.getMethod();
final Object request = invokeServiceRequest.getBody();
final Map<String, String> metadata = invokeServiceRequest.getMetadata();
final HttpExtension httpExtension = invokeServiceRequest.getHttpExtension();
final String contentType = invokeServiceRequest.getContentType();
final Context context = invokeServiceRequest.getContext();
if (httpExtension == null) {
throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
}
// If the httpExtension is not null, then the method will not be null based on checks in constructor
String httMethod = httpExtension.getMethod().toString();
final String httpMethod = httpExtension.getMethod().toString();
if (appId == null || appId.trim().isEmpty()) {
throw new IllegalArgumentException("App Id cannot be null or empty.");
}
@ -170,8 +178,13 @@ public class DaprClientHttp extends AbstractDaprClient {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "invoke", appId, "method", method };
Mono<DaprHttp.Response> response = this.client.invokeApi(httMethod, pathSegments,
httpExtension.getQueryString(), serializedRequestBody, metadata, context);
final Map<String, String> headers = new HashMap<>();
if (contentType != null && !contentType.isEmpty()) {
headers.put("content-type", contentType);
}
headers.putAll(httpExtension.getHeaders());
Mono<DaprHttp.Response> response = this.client.invokeApi(httpMethod, pathSegments,
httpExtension.getQueryString(), serializedRequestBody, headers, context);
return response.flatMap(r -> getMono(type, r)).map(r -> new Response<>(context, r));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
@ -258,6 +271,7 @@ public class DaprClientHttp extends AbstractDaprClient {
final String stateStoreName = request.getStoreName();
final List<String> keys = request.getKeys();
final int parallelism = request.getParallelism();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
@ -278,8 +292,9 @@ public class DaprClientHttp extends AbstractDaprClient {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk"};
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
return this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, requestBody, null, context)
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context)
.flatMap(s -> {
try {
return Mono.just(buildStates(s, type));
@ -304,7 +319,7 @@ public class DaprClientHttp extends AbstractDaprClient {
final String stateStoreName = request.getStoreName();
final String key = request.getKey();
final StateOptions options = request.getStateOptions();
final String etag = request.getEtag();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
@ -313,21 +328,18 @@ public class DaprClientHttp extends AbstractDaprClient {
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
Map<String, String> headers = new HashMap<>();
if (etag != null && !etag.trim().isEmpty()) {
headers.put(HEADER_HTTP_ETAG_ID, etag);
}
Map<String, String> urlParameters = Optional.ofNullable(options)
Map<String, String> optionsMap = Optional.ofNullable(options)
.map(o -> o.getStateOptionsAsMap())
.orElse(new HashMap<>());
.orElse(Collections.EMPTY_MAP);
request.getMetadata().forEach(urlParameters::put);
final Map<String, String> queryParams = new HashMap<>();
queryParams.putAll(metadataToQueryArgs(metadata));
queryParams.putAll(optionsMap);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key};
return this.client
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, urlParameters, headers, context)
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context)
.flatMap(s -> {
try {
return Mono.just(buildState(s, key, options, type));
@ -450,6 +462,7 @@ public class DaprClientHttp extends AbstractDaprClient {
final String key = request.getKey();
final StateOptions options = request.getStateOptions();
final String etag = request.getEtag();
final Map<String, String> metadata = request.getMetadata();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
@ -463,16 +476,18 @@ public class DaprClientHttp extends AbstractDaprClient {
headers.put(HEADER_HTTP_ETAG_ID, etag);
}
Map<String, String> urlParameters = Optional.ofNullable(options)
Map<String, String> optionsMap = Optional.ofNullable(options)
.map(stateOptions -> stateOptions.getStateOptionsAsMap())
.orElse(new HashMap<>());
.orElse(Collections.EMPTY_MAP);
request.getMetadata().forEach(urlParameters::put);
final Map<String, String> queryParams = new HashMap<>();
queryParams.putAll(metadataToQueryArgs(metadata));
queryParams.putAll(optionsMap);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key};
return this.client.invokeApi(
DaprHttp.HttpMethods.DELETE.name(), pathSegments, urlParameters, headers, context)
DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context)
.thenReturn(new Response<>(context, null));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
@ -556,9 +571,10 @@ public class DaprClientHttp extends AbstractDaprClient {
return DaprException.wrapMono(e);
}
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key};
return this.client
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, metadata, (String)null, null, context)
.invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String)null, null, context)
.flatMap(response -> {
try {
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
@ -575,8 +591,28 @@ public class DaprClientHttp extends AbstractDaprClient {
.map(m -> new Response<>(context, m));
}
/**
* {@inheritDoc}
*/
@Override
public void close() {
client.close();
}
/**
* Converts metadata map into HTTP headers.
* @param metadata metadata map
* @return HTTP headers
*/
private static Map<String, String> metadataToQueryArgs(Map<String, String> metadata) {
if (metadata == null) {
return Collections.EMPTY_MAP;
}
return metadata
.entrySet()
.stream()
.filter(e -> e.getKey() != null)
.collect(Collectors.toMap(e -> METADATA_PREFIX + e.getKey(), e -> e.getValue()));
}
}

View File

@ -21,8 +21,6 @@ public class GetStateRequest {
private Map<String, String> metadata;
private String etag;
private StateOptions stateOptions;
private Context context;
@ -43,14 +41,6 @@ public class GetStateRequest {
this.key = key;
}
public String getEtag() {
return etag;
}
void setEtag(String etag) {
this.etag = etag;
}
public StateOptions getStateOptions() {
return stateOptions;
}

View File

@ -20,8 +20,6 @@ public class GetStateRequestBuilder {
private Map<String, String> metadata;
private String etag;
private StateOptions stateOptions;
private Context context;
@ -36,11 +34,6 @@ public class GetStateRequestBuilder {
return this;
}
public GetStateRequestBuilder withEtag(String etag) {
this.etag = etag;
return this;
}
public GetStateRequestBuilder withStateOptions(StateOptions stateOptions) {
this.stateOptions = stateOptions;
return this;
@ -60,7 +53,6 @@ public class GetStateRequestBuilder {
request.setStoreName(this.storeName);
request.setKey(this.key);
request.setMetadata(this.metadata);
request.setEtag(this.etag);
request.setStateOptions(this.stateOptions);
request.setContext(this.context);
return request;

View File

@ -6,10 +6,8 @@
package io.dapr.client.domain;
import io.dapr.client.DaprHttp;
import io.dapr.exceptions.DaprException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
@ -22,39 +20,39 @@ public final class HttpExtension {
/**
* Convenience HttpExtension object for {@link io.dapr.client.DaprHttp.HttpMethods#NONE} with empty queryString.
*/
public static final HttpExtension NONE = new HttpExtension(DaprHttp.HttpMethods.NONE, new HashMap<>());
public static final HttpExtension NONE = new HttpExtension(DaprHttp.HttpMethods.NONE);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#GET} Verb with empty queryString.
*/
public static final HttpExtension GET = new HttpExtension(DaprHttp.HttpMethods.GET, new HashMap<>());
public static final HttpExtension GET = new HttpExtension(DaprHttp.HttpMethods.GET);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#PUT} Verb with empty queryString.
*/
public static final HttpExtension PUT = new HttpExtension(DaprHttp.HttpMethods.PUT, new HashMap<>());
public static final HttpExtension PUT = new HttpExtension(DaprHttp.HttpMethods.PUT);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#POST} Verb with empty queryString.
*/
public static final HttpExtension POST = new HttpExtension(DaprHttp.HttpMethods.POST, new HashMap<>());
public static final HttpExtension POST = new HttpExtension(DaprHttp.HttpMethods.POST);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#DELETE} Verb with empty queryString.
*/
public static final HttpExtension DELETE = new HttpExtension(DaprHttp.HttpMethods.DELETE, new HashMap<>());
public static final HttpExtension DELETE = new HttpExtension(DaprHttp.HttpMethods.DELETE);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#HEAD} Verb with empty queryString.
*/
public static final HttpExtension HEAD = new HttpExtension(DaprHttp.HttpMethods.HEAD, new HashMap<>());
public static final HttpExtension HEAD = new HttpExtension(DaprHttp.HttpMethods.HEAD);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#CONNECT} Verb with empty queryString.
*/
public static final HttpExtension CONNECT = new HttpExtension(DaprHttp.HttpMethods.CONNECT, new HashMap<>());
public static final HttpExtension CONNECT = new HttpExtension(DaprHttp.HttpMethods.CONNECT);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#OPTIONS} Verb with empty queryString.
*/
public static final HttpExtension OPTIONS = new HttpExtension(DaprHttp.HttpMethods.OPTIONS, new HashMap<>());
public static final HttpExtension OPTIONS = new HttpExtension(DaprHttp.HttpMethods.OPTIONS);
/**
* Convenience HttpExtension object for the {@link DaprHttp.HttpMethods#TRACE} Verb with empty queryString.
*/
public static final HttpExtension TRACE = new HttpExtension(DaprHttp.HttpMethods.TRACE, new HashMap<>());
public static final HttpExtension TRACE = new HttpExtension(DaprHttp.HttpMethods.TRACE);
/**
* HTTP verb.
@ -66,26 +64,37 @@ public final class HttpExtension {
*/
private Map<String, String> queryString;
/**
* HTTP headers.
*/
private Map<String, String> headers;
/**
* Construct a HttpExtension object.
* @param method Required value denoting the HttpMethod.
* @param queryString Non-null map value for the queryString for a HTTP listener.
* @param queryString map for the queryString the HTTP call.
* @param headers map to set HTTP headers.
* @see io.dapr.client.DaprHttp.HttpMethods for supported methods.
* @throws IllegalArgumentException on null method or queryString.
*/
public HttpExtension(DaprHttp.HttpMethods method, Map<String, String> queryString) {
try {
if (method == null) {
throw new IllegalArgumentException("HttpExtension method cannot be null");
} else if (queryString == null) {
throw new IllegalArgumentException("HttpExtension queryString map cannot be null");
}
this.method = method;
this.queryString = Collections.unmodifiableMap(queryString);
} catch (RuntimeException e) {
DaprException.wrap(e);
public HttpExtension(DaprHttp.HttpMethods method, Map<String, String> queryString, Map<String, String> headers) {
if (method == null) {
throw new IllegalArgumentException("HttpExtension method cannot be null");
}
this.method = method;
this.queryString = Collections.unmodifiableMap(queryString == null ? Collections.EMPTY_MAP : queryString);
this.headers = Collections.unmodifiableMap(headers == null ? Collections.EMPTY_MAP : headers);
}
/**
* Construct a HttpExtension object.
* @param method Required value denoting the HttpMethod.
* @see io.dapr.client.DaprHttp.HttpMethods for supported methods.
* @throws IllegalArgumentException on null method or queryString.
*/
public HttpExtension(DaprHttp.HttpMethods method) {
this(method, null, null);
}
public DaprHttp.HttpMethods getMethod() {
@ -95,4 +104,8 @@ public final class HttpExtension {
public Map<String, String> getQueryString() {
return queryString;
}
public Map<String, String> getHeaders() {
return headers;
}
}

View File

@ -20,8 +20,6 @@ public class InvokeServiceRequest {
private Object body;
private Map<String, String> metadata;
private HttpExtension httpExtension;
private Context context;
@ -52,14 +50,6 @@ public class InvokeServiceRequest {
this.body = body;
}
public Map<String, String> getMetadata() {
return metadata;
}
void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
public HttpExtension getHttpExtension() {
return httpExtension;
}

View File

@ -7,10 +7,6 @@ package io.dapr.client.domain;
import io.opentelemetry.context.Context;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Builds a request to invoke a service.
*/
@ -24,8 +20,6 @@ public class InvokeServiceRequestBuilder {
private Object body;
private Map<String, String> metadata = new HashMap<>();
private HttpExtension httpExtension = HttpExtension.NONE;
private Context context;
@ -45,11 +39,6 @@ public class InvokeServiceRequestBuilder {
return this;
}
public InvokeServiceRequestBuilder withMetadata(Map<String, String> metadata) {
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
return this;
}
public InvokeServiceRequestBuilder withHttpExtension(HttpExtension httpExtension) {
this.httpExtension = httpExtension;
return this;
@ -71,7 +60,6 @@ public class InvokeServiceRequestBuilder {
request.setMethod(this.method);
request.setBody(this.body);
request.setHttpExtension(this.httpExtension);
request.setMetadata(this.metadata);
request.setContext(this.context);
return request;
}

View File

@ -532,9 +532,8 @@ public class DaprClientGrpcTest {
@Test
public void invokeServiceWithHttpExtensionTest() throws IOException {
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, new HashMap<String, String>() {{
put("test", "1");
}});
HttpExtension httpExtension = new HttpExtension(
DaprHttp.HttpMethods.GET, Collections.singletonMap("test", "1"), null);
CommonProtos.InvokeRequest message = CommonProtos.InvokeRequest.newBuilder()
.setMethod("method")
.setData(getAny("request"))
@ -951,7 +950,7 @@ public class DaprClientGrpcTest {
.setEtag(etag)
.build();
GetStateRequestBuilder builder = new GetStateRequestBuilder(STATE_STORE_NAME, key);
builder.withMetadata(metadata).withEtag(etag).withStateOptions(options);
builder.withMetadata(metadata).withStateOptions(options);
GetStateRequest request = builder.build();
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.GetStateResponse> observer = (StreamObserver<DaprProtos.GetStateResponse>) invocation.getArguments()[1];

View File

@ -161,7 +161,7 @@ public class DaprClientHttpTest {
daprClientHttp = new DaprClientHttp(daprHttp);
assertThrows(IllegalArgumentException.class, () -> {
// null HttpMethod
daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null, null), null, (Class)null).block();
daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null), null, (Class)null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
// null HttpExtension
@ -282,7 +282,7 @@ public class DaprClientHttpTest {
daprClientHttp = new DaprClientHttp(daprHttp);
Map<String, String> queryString = new HashMap<>();
queryString.put("test", "1");
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString);
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null);
Mono<Void> mono = daprClientHttp.invokeMethod("41", "neworder", "", httpExtension, map);
assertNull(mono.block());
}
@ -679,12 +679,12 @@ public class DaprClientHttpTest {
Map<String, String> metadata = new HashMap<>();
metadata.put("key_1", "val_1");
mockInterceptor.addRule()
.get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?key_1=val_1")
.get("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?metadata.key_1=val_1")
.respond("\"" + EXPECTED_RESULT + "\"");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
GetStateRequestBuilder builder = new GetStateRequestBuilder(STATE_STORE_NAME, "key");
builder.withMetadata(metadata).withEtag("");
builder.withMetadata(metadata);
Mono<Response<State<String>>> monoMetadata = daprClientHttp.getState(builder.build(), TypeRef.get(String.class));
assertEquals(monoMetadata.block().getObject().getKey(), "key");
}
@ -956,7 +956,7 @@ public class DaprClientHttpTest {
StateOptions stateOptions = mock(StateOptions.class);
State<String> stateKeyValue = new State<>("value", "key", "etag", stateOptions);
mockInterceptor.addRule()
.delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?key_1=val_1")
.delete("http://127.0.0.1:3000/v1.0/state/MyStateStore/key?metadata.key_1=val_1")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
@ -1134,7 +1134,7 @@ public class DaprClientHttpTest {
.get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key")
.respond("{ \"mysecretkey\": \"mysecretvalue\"}");
mockInterceptor.addRule()
.get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key?metakey=metavalue")
.get("http://127.0.0.1:3000/v1.0/secrets/MySecretStore/key?metadata.metakey=metavalue")
.respond("{ \"mysecretkey2\": \"mysecretvalue2\"}");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);

View File

@ -8,7 +8,6 @@ package io.dapr.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientHttp;
import io.dapr.client.DaprClientTestBuilder;
import io.dapr.client.DaprHttp;
import io.dapr.client.DaprHttpStub;
@ -129,7 +128,7 @@ public class DaprRuntimeTest {
eq((PUBLISH_PATH + "/" + PUBSUB_NAME + "/" + TOPIC_NAME).split("/")),
any(),
eq(serializer.serialize(message.data)),
eq(null),
any(),
any()))
.thenAnswer(invocationOnMock -> this.daprRuntime.handleInvocation(
TOPIC_NAME,