Support many query strings in HTTP and binary data in Cloud Event for PubSub. (#485)

* Handles base64 data from CloudEvent.

* Support for multiple query strings.

* Addressing comments.
This commit is contained in:
Artur Souza 2021-02-09 19:38:13 -08:00 committed by GitHub
parent 1b3077071f
commit e46ef319cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 323 additions and 76 deletions

View File

@ -26,7 +26,7 @@ jobs:
DAPR_RUNTIME_VER: 1.0.0-rc.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
DAPR_REF: 4899aa8fb8f7537fb10432e6cc0cfc09c572cb54
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.33.1</grpc.version>
<protobuf.version>3.13.0</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/4a6369caaba9cf46eae9bfa4fa6e76b474854c89/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/4899aa8fb8f7537fb10432e6cc0cfc09c572cb54/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -4,7 +4,7 @@ metadata:
name: sample123
spec:
type: bindings.kafka
version: 1
version: v1
metadata:
# Kafka broker connection setting
- name: brokers

View File

@ -4,7 +4,7 @@ metadata:
name: messagebus
spec:
type: pubsub.redis
version: 1
version: v1
metadata:
- name: redisHost
value: localhost:6379

View File

@ -4,7 +4,7 @@ metadata:
name: statestore
spec:
type: state.redis
version: 1
version: v1
metadata:
- name: redisHost
value: localhost:6379

View File

@ -4,7 +4,7 @@ metadata:
name: vault
spec:
type: secretstores.hashicorp.vault
version: 1
version: v1
metadata:
- name: vaultAddr
value: "http://127.0.0.1:8200"

View File

@ -5,17 +5,23 @@
package io.dapr.it.pubsub.http;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Metadata;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -24,12 +30,18 @@ import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class PubSubIT extends BaseIT {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final TypeRef<List<CloudEvent>> CLOUD_EVENT_LIST_TYPE_REF = new TypeRef<>() {};
//Number of messages to be sent: 10
private static final int NUM_MESSAGES = 10;
@ -39,6 +51,8 @@ public class PubSubIT extends BaseIT {
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
// Topic used for TTL test
private static final String TTL_TOPIC_NAME = "ttltopic";
// Topic to test binary data
private static final String BINARY_TOPIC_NAME = "binarytopic";
/**
* Parameters for this test.
@ -97,8 +111,6 @@ public class PubSubIT extends BaseIT {
@Test
public void testPubSub() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
@ -112,8 +124,25 @@ public class PubSubIT extends BaseIT {
daprRun.switchToHTTP();
}
DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return (T) OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType()));
}
@Override
public String getContentType() {
return "application/json";
}
};
// Send a batch of messages on one topic
try (DaprClient client = new DaprClientBuilder().build()) {
try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
//Publishing messages
@ -140,38 +169,113 @@ public class PubSubIT extends BaseIT {
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_NAME);
final List<String> messages = client.invokeMethod(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block();
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/testingtopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.toString(), messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
final int messageId = i;
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, TOPIC_NAME)))
.count() == 1);
}
boolean foundByte = false;
for (String message : messages) {
if ((message.getBytes().length == 1) && (message.getBytes()[0] == 1)) {
foundByte = true;
}
}
assertTrue(foundByte);
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> "AQ==".equals(m))
.count() == 1);
}, 2000);
callWithRetry(() -> {
System.out.println("Checking results for topic " + ANOTHER_TOPIC_NAME);
final List<String> messages = client.invokeMethod(daprRun.getAppName(), "messages/anothertopic", null, HttpExtension.GET, List.class).block();
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/anothertopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(10, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, ANOTHER_TOPIC_NAME)));
final int messageId = i;
assertTrue(messages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, ANOTHER_TOPIC_NAME)))
.count() == 1);
}
}, 2000);
}
}
@Test
public void testPubSubTTLMetadata() throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
public void testPubSubBinary() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000));
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) {
return (byte[])o;
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) {
return (T) data;
}
@Override
public String getContentType() {
return "application/octet-stream";
}
};
try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build()) {
client.publishEvent(
PUBSUB_NAME,
BINARY_TOPIC_NAME,
new byte[]{1}).block();
System.out.println("Published one byte.");
}
Thread.sleep(3000);
try (DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking results for topic " + BINARY_TOPIC_NAME);
final List<CloudEvent> messages = client.invokeMethod(
daprRun.getAppName(),
"messages/binarytopic",
null,
HttpExtension.GET, CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals(1, messages.size());
assertNull(messages.get(0).getData());
assertArrayEquals(new byte[]{1}, messages.get(0).getBinaryData());
}, 2000);
}
}
@Test
public void testPubSubTTLMetadata() throws Exception {
DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
60000));

View File

@ -22,22 +22,28 @@ import java.util.List;
@RestController
public class SubscriberController {
private static final List<Object> messagesReceivedTestingTopic = new ArrayList();
private static final List<Object> messagesReceivedAnotherTopic = new ArrayList();
private static final List<Object> messagesReceivedTTLTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedBinaryTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedAnotherTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTTLTopic = new ArrayList();
@GetMapping(path = "/messages/testingtopic")
public List<Object> getMessagesReceivedTestingTopic() {
public List<CloudEvent> getMessagesReceivedTestingTopic() {
return messagesReceivedTestingTopic;
}
@GetMapping(path = "/messages/binarytopic")
public List<CloudEvent> getMessagesReceivedBinaryTopic() {
return messagesReceivedBinaryTopic;
}
@GetMapping(path = "/messages/anothertopic")
public List<Object> getMessagesReceivedAnotherTopic() {
public List<CloudEvent> getMessagesReceivedAnotherTopic() {
return messagesReceivedAnotherTopic;
}
@GetMapping(path = "/messages/ttltopic")
public List<Object> getMessagesReceivedTTLTopic() {
public List<CloudEvent> getMessagesReceivedTTLTopic() {
return messagesReceivedTTLTopic;
}
@ -49,7 +55,22 @@ public class SubscriberController {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedTestingTopic.add(envelope.getData());
messagesReceivedTestingTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "binarytopic", pubsubName = "messagebus")
@PostMapping(path = "/route2")
public Mono<Void> handleBinaryMessage(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Binary topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedBinaryTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -57,13 +78,13 @@ public class SubscriberController {
}
@Topic(name = "anothertopic", pubsubName = "messagebus")
@PostMapping(path = "/route2")
@PostMapping(path = "/route3")
public Mono<Void> handleMessageAnotherTopic(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("Another topic Subscriber got message: " + message);
messagesReceivedAnotherTopic.add(envelope.getData());
messagesReceivedAnotherTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
@ -71,13 +92,13 @@ public class SubscriberController {
}
@Topic(name = "ttltopic", pubsubName = "messagebus")
@PostMapping(path = "/route3")
@PostMapping(path = "/route4")
public Mono<Void> handleMessageTTLTopic(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
System.out.println("TTL topic Subscriber got message: " + message);
messagesReceivedTTLTopic.add(envelope.getData());
messagesReceivedTTLTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -40,6 +40,7 @@ import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
import okhttp3.HttpUrl;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
@ -47,8 +48,10 @@ import reactor.util.context.Context;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -550,8 +553,9 @@ public class DaprClientGrpc extends AbstractDaprClient {
requestBuilder.setData(Any.newBuilder().build());
}
CommonProtos.HTTPExtension.Builder httpExtensionBuilder = CommonProtos.HTTPExtension.newBuilder();
httpExtensionBuilder.setVerb(CommonProtos.HTTPExtension.Verb.valueOf(httpExtension.getMethod().toString()))
.putAllQuerystring(httpExtension.getQueryString());
.setQuerystring(httpExtension.encodeQueryString());
requestBuilder.setHttpExtension(httpExtensionBuilder.build());
requestBuilder.setContentType(objectSerializer.getContentType());

View File

@ -29,7 +29,6 @@ import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.io.IOException;
import java.util.ArrayList;
@ -142,7 +141,7 @@ public class DaprClientHttp extends AbstractDaprClient {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
@ -185,7 +184,7 @@ public class DaprClientHttp extends AbstractDaprClient {
headers.putAll(httpExtension.getHeaders());
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(httpMethod, pathSegments,
httpExtension.getQueryString(), serializedRequestBody, headers, context)
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
);
return response.flatMap(r -> getMono(type, r));
} catch (Exception ex) {
@ -294,7 +293,7 @@ public class DaprClientHttp extends AbstractDaprClient {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk"};
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
context -> this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context)
@ -333,9 +332,10 @@ public class DaprClientHttp extends AbstractDaprClient {
.map(o -> o.getStateOptionsAsMap())
.orElse(Collections.emptyMap());
final Map<String, String> queryParams = new HashMap<>();
final Map<String, List<String>> queryParams = new HashMap<>();
queryParams.putAll(metadataToQueryArgs(metadata));
queryParams.putAll(optionsMap);
queryParams.putAll(optionsMap.entrySet().stream().collect(
Collectors.toMap(kv -> kv.getKey(), kv -> Collections.singletonList(kv.getValue()))));
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key};
@ -478,12 +478,13 @@ public class DaprClientHttp extends AbstractDaprClient {
}
Map<String, String> optionsMap = Optional.ofNullable(options)
.map(stateOptions -> stateOptions.getStateOptionsAsMap())
.map(o -> o.getStateOptionsAsMap())
.orElse(Collections.emptyMap());
final Map<String, String> queryParams = new HashMap<>();
final Map<String, List<String>> queryParams = new HashMap<>();
queryParams.putAll(metadataToQueryArgs(metadata));
queryParams.putAll(optionsMap);
queryParams.putAll(optionsMap.entrySet().stream().collect(
Collectors.toMap(kv -> kv.getKey(), kv -> Collections.singletonList(kv.getValue()))));
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key};
@ -572,7 +573,7 @@ public class DaprClientHttp extends AbstractDaprClient {
return DaprException.wrapMono(e);
}
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key};
return Mono.subscriberContext().flatMap(
@ -608,7 +609,7 @@ public class DaprClientHttp extends AbstractDaprClient {
return DaprException.wrapMono(e);
}
Map<String, String> queryArgs = metadataToQueryArgs(metadata);
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk"};
return Mono.subscriberContext().flatMap(
@ -638,19 +639,19 @@ public class DaprClientHttp extends AbstractDaprClient {
}
/**
* Converts metadata map into HTTP headers.
* Converts metadata map into Query params.
* @param metadata metadata map
* @return HTTP headers
* @return Query params
*/
private static Map<String, String> metadataToQueryArgs(Map<String, String> metadata) {
private static Map<String, List<String>> metadataToQueryArgs(Map<String, String> metadata) {
if (metadata == null) {
return Collections.EMPTY_MAP;
return Collections.emptyMap();
}
return metadata
.entrySet()
.stream()
.filter(e -> e.getKey() != null)
.collect(Collectors.toMap(e -> METADATA_PREFIX + e.getKey(), e -> e.getValue()));
.collect(Collectors.toMap(e -> METADATA_PREFIX + e.getKey(), e -> Collections.singletonList(e.getValue())));
}
}

View File

@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@ -158,7 +159,7 @@ public class DaprHttp implements AutoCloseable {
public Mono<Response> invokeApi(
String method,
String[] pathSegments,
Map<String, String> urlParameters,
Map<String, List<String>> urlParameters,
Map<String, String> headers,
Context context) {
return this.invokeApi(method, pathSegments, urlParameters, (byte[]) null, headers, context);
@ -178,7 +179,7 @@ public class DaprHttp implements AutoCloseable {
public Mono<Response> invokeApi(
String method,
String[] pathSegments,
Map<String, String> urlParameters,
Map<String, List<String>> urlParameters,
String content,
Map<String, String> headers,
Context context) {
@ -203,7 +204,7 @@ public class DaprHttp implements AutoCloseable {
public Mono<Response> invokeApi(
String method,
String[] pathSegments,
Map<String, String> urlParameters,
Map<String, List<String>> urlParameters,
byte[] content,
Map<String, String> headers,
Context context) {
@ -234,7 +235,7 @@ public class DaprHttp implements AutoCloseable {
*/
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, String> urlParameters,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
final String requestId = UUID.randomUUID().toString();
@ -257,7 +258,10 @@ public class DaprHttp implements AutoCloseable {
urlBuilder.addPathSegment(pathSegment);
}
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter -> urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameter.getValue()));
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())

View File

@ -6,10 +6,13 @@
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.PropertyKey;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
/**
@ -54,6 +57,12 @@ public final class CloudEvent {
*/
private Object data;
/**
* Cloud event specs says binary data should be in data_base64.
*/
@JsonProperty("data_base64")
private byte[] binaryData;
/**
* Instantiates a CloudEvent.
*/
@ -84,6 +93,28 @@ public final class CloudEvent {
this.data = data;
}
/**
* Instantiates a CloudEvent.
* @param id Identifier of the message being processed.
* @param source Source for this event.
* @param type Type of event.
* @param specversion Version of the event spec.
* @param binaryData Payload.
*/
public CloudEvent(
String id,
String source,
String type,
String specversion,
byte[] binaryData) {
this.id = id;
this.source = source;
this.type = type;
this.specversion = specversion;
this.datacontenttype = "application/octet-stream";
this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);;
}
/**
* Deserialize a message topic from Dapr.
@ -196,6 +227,22 @@ public final class CloudEvent {
this.data = data;
}
/**
* Gets the cloud event's binary data.
* @return Cloud event's binary data.
*/
public byte[] getBinaryData() {
return this.binaryData == null ? null : Arrays.copyOf(this.binaryData, this.binaryData.length);
}
/**
* Sets the cloud event's binary data.
* @param binaryData Cloud event's binary data.
*/
public void setBinaryData(byte[] binaryData) {
this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);
}
/**
* {@inheritDoc}
*/
@ -213,7 +260,8 @@ public final class CloudEvent {
&& Objects.equals(type, that.type)
&& Objects.equals(specversion, that.specversion)
&& Objects.equals(datacontenttype, that.datacontenttype)
&& Objects.equals(data, that.data);
&& Objects.equals(data, that.data)
&& Arrays.equals(binaryData, that.binaryData);
}
/**
@ -221,6 +269,6 @@ public final class CloudEvent {
*/
@Override
public int hashCode() {
return Objects.hash(id, source, type, specversion, datacontenttype, data);
return Objects.hash(id, source, type, specversion, datacontenttype, data, binaryData);
}
}

View File

@ -6,9 +6,12 @@
package io.dapr.client.domain;
import io.dapr.client.DaprHttp;
import okhttp3.HttpUrl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* HTTP Extension class.
@ -60,9 +63,9 @@ public final class HttpExtension {
private DaprHttp.HttpMethods method;
/**
* HTTP querystring.
* HTTP query params.
*/
private Map<String, String> queryString;
private Map<String, List<String>> queryParams;
/**
* HTTP headers.
@ -72,19 +75,21 @@ public final class HttpExtension {
/**
* Construct a HttpExtension object.
* @param method Required value denoting the HttpMethod.
* @param queryString map for the queryString the HTTP call.
* @param queryParams map for the query parameters the HTTP call.
* @param headers map to set HTTP headers.
* @see io.dapr.client.DaprHttp.HttpMethods for supported methods.
* @throws IllegalArgumentException on null method or queryString.
*/
public HttpExtension(DaprHttp.HttpMethods method, Map<String, String> queryString, Map<String, String> headers) {
public HttpExtension(DaprHttp.HttpMethods method,
Map<String, List<String>> queryParams,
Map<String, String> headers) {
if (method == null) {
throw new IllegalArgumentException("HttpExtension method cannot be null");
}
this.method = method;
this.queryString = Collections.unmodifiableMap(queryString == null ? Collections.EMPTY_MAP : queryString);
this.headers = Collections.unmodifiableMap(headers == null ? Collections.EMPTY_MAP : headers);
this.queryParams = Collections.unmodifiableMap(queryParams == null ? Collections.emptyMap() : queryParams);
this.headers = Collections.unmodifiableMap(headers == null ? Collections.emptyMap() : headers);
}
/**
@ -101,11 +106,31 @@ public final class HttpExtension {
return method;
}
public Map<String, String> getQueryString() {
return queryString;
public Map<String, List<String>> getQueryParams() {
return queryParams;
}
public Map<String, String> getHeaders() {
return headers;
}
/**
* Encodes the query string for the HTTP request.
* @return Encoded HTTP query string.
*/
public String encodeQueryString() {
if ((this.queryParams == null) || (this.queryParams.isEmpty())) {
return "";
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
// Setting required values but we only need query params in the end.
urlBuilder.scheme("http").host("localhost");
Optional.ofNullable(this.queryParams).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
return urlBuilder.build().encodedQuery();
}
}

View File

@ -149,4 +149,26 @@ public class CloudEventTest {
assertEquals("AQI=", cloudEvent.getData());
assertArrayEquals(expected, OBJECT_MAPPER.convertValue(cloudEvent.getData(), byte[].class));
}
@Test
public void deserializeBinaryData() throws Exception {
String content = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"com.github.pull_request.opened\",\n" +
" \"source\" : \"https://github.com/cloudevents/spec/pull\",\n" +
" \"subject\" : \"123\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"time\" : \"2018-04-05T17:31:00Z\",\n" +
" \"comexampleextension1\" : \"value\",\n" +
" \"comexampleothervalue\" : 5,\n" +
" \"datacontenttype\" : \"application/octet-stream\",\n" +
" \"data_base64\" : \"AQI=\"\n" +
"}";
byte[] expected = new byte[]{ 0x1, 0x2 };
CloudEvent cloudEvent = CloudEvent.deserialize(content.getBytes());
assertEquals("application/octet-stream", cloudEvent.getDatacontenttype());
assertNull(cloudEvent.getData());
assertArrayEquals(expected, cloudEvent.getBinaryData());
}
}

View File

@ -549,14 +549,14 @@ public class DaprClientGrpcTest {
@Test
public void invokeServiceWithHttpExtensionTest() throws IOException {
HttpExtension httpExtension = new HttpExtension(
DaprHttp.HttpMethods.GET, Collections.singletonMap("test", "1"), null);
DaprHttp.HttpMethods.GET, Collections.singletonMap("test", Arrays.asList("1", "ab/c")), null);
CommonProtos.InvokeRequest message = CommonProtos.InvokeRequest.newBuilder()
.setMethod("method")
.setData(getAny("request"))
.setContentType("application/json")
.setHttpExtension(CommonProtos.HTTPExtension.newBuilder()
.setVerb(CommonProtos.HTTPExtension.Verb.GET)
.putQuerystring("test", "1").build())
.setQuerystring("test=1&test=ab%2Fc").build())
.build();
DaprProtos.InvokeServiceRequest request = DaprProtos.InvokeServiceRequest.newBuilder()
.setId("appId")

View File

@ -336,11 +336,12 @@ public class DaprClientHttpTest {
public void invokeServiceWithRequestAndQueryString() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder?test=1")
.get("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder?param1=1&param2=a&param2=b%2Fc")
.respond(EXPECTED_RESULT);
Map<String, String> queryString = new HashMap<>();
queryString.put("test", "1");
Map<String, List<String>> queryString = new HashMap<>();
queryString.put("param1", Collections.singletonList("1"));
queryString.put("param2", Arrays.asList("a", "b/c"));
HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null);
Mono<Void> mono = daprClientHttp.invokeMethod("41", "neworder", "", httpExtension, map);
assertNull(mono.block());

View File

@ -5,9 +5,10 @@
package io.dapr.client;
import reactor.util.context.Context;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import java.util.List;
import java.util.Map;
/**
@ -32,7 +33,11 @@ public class DaprHttpStub extends DaprHttp {
* {@inheritDoc}
*/
@Override
public Mono<Response> invokeApi(String method, String[] pathSegments, Map<String, String> urlParameters, Map<String, String> headers, Context context) {
public Mono<Response> invokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
Map<String, String> headers,
Context context) {
return Mono.empty();
}
@ -40,7 +45,12 @@ public class DaprHttpStub extends DaprHttp {
* {@inheritDoc}
*/
@Override
public Mono<Response> invokeApi(String method, String[] pathSegments, Map<String, String> urlParameters, String content, Map<String, String> headers, Context context) {
public Mono<Response> invokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
String content,
Map<String, String> headers,
Context context) {
return Mono.empty();
}
@ -48,7 +58,12 @@ public class DaprHttpStub extends DaprHttp {
* {@inheritDoc}
*/
@Override
public Mono<Response> invokeApi(String method, String[] pathSegments, Map<String, String> urlParameters, byte[] content, Map<String, String> headers, Context context) {
public Mono<Response> invokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content,
Map<String, String> headers,
Context context) {
return Mono.empty();
}

View File

@ -19,7 +19,9 @@ import org.junit.contrib.java.lang.system.EnvironmentVariables;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@ -141,8 +143,8 @@ public class DaprHttpTest {
Map<String, String> headers = new HashMap<>();
headers.put("header", "value");
headers.put("header1", "value1");
Map<String, String> urlParameters = new HashMap<>();
urlParameters.put("orderId", "41");
Map<String, List<String>> urlParameters = new HashMap<>();
urlParameters.put("orderId", Collections.singletonList("41"));
mockInterceptor.addRule()
.get("http://127.0.0.1:3500/v1.0/state/order?orderId=41")
.respond(serializer.serialize(EXPECTED_RESULT));