initial bulk publish impl for java (#789)

* initial bulk publish impl for java

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add UTs and clean up java doc for new interface methods.

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add more interface methods for bulk publish

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* adding examples and ITs for bulk publish

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* addressing review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* use latest ref from dapr branch

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add example validation

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix bindings example validation

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* make changes for latest bulk publish dapr changes

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix typo

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* test against java 11 only

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change to latest dapr commit

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* run only pubsub IT, upload failsafe reports as run artifact

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix checkstyle

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix IT report upload condition

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix compile issues

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix spotbugs issue

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* run pubsubIT only

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change upload artifact name for IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix tests

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* introduce sleep

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* test bulk publish with redis

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change longvalues test to kafka

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change bulk pub to kafka and revert long values changes

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* remove kafka pubsub from pubsub IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* change match order in examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* set fail fast as false

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix Internal Invoke exception in ITs

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix IT

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix app scopes in examples

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* add content to daprdocs

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* address review comments

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix mm.py step comment

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* reset bindings examples readme

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

* fix example, IT and make classes immutable

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>

Signed-off-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Artur Souza <asouza.pro@gmail.com>
Authored by Mukundan Sundararajan on 2023-01-20 02:27:06 +05:30, committed by GitHub
parent eb8565cca0
commit 81591b9f5b
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
32 changed files with 1942 additions and 50 deletions

View File

@ -100,9 +100,10 @@ jobs:
- name: Codecov
uses: codecov/codecov-action@v3.1.0
- name: Install jars
run: mvn install -q
run: mvn install -q
- name: Integration tests
run: mvn -f sdk-tests/pom.xml verify -q
id: integration_tests
run: mvn -f sdk-tests/pom.xml verify
- name: Upload test report for sdk
uses: actions/upload-artifact@master
with:
@ -113,6 +114,19 @@ jobs:
with:
name: report-dapr-java-sdk-actors
path: sdk-actors/target/jacoco-report/
- name: Upload failsafe test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: failsafe-report-sdk-tests
path: sdk-tests/target/failsafe-reports
- name: Upload surefire test report for sdk-tests on failure
if: ${{ failure() && steps.integration_tests.conclusion == 'failure' }}
uses: actions/upload-artifact@master
with:
name: surefire-report-sdk-tests
path: sdk-tests/target/surefire-reports
publish:
runs-on: ubuntu-latest
needs: build

View File

@ -28,6 +28,7 @@ jobs:
validate:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Keep running if one leg fails.
matrix:
java: [ 11, 13, 15, 16 ]
env:
@ -129,10 +130,10 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/state/README.md
- name: Validate pubsub HTTP example
- name: Validate pubsub example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/http/README.md
mm.py ./src/main/java/io/dapr/examples/pubsub/README.md
- name: Validate bindings HTTP example
working-directory: ./examples
run: |

View File

@ -222,6 +222,35 @@ public class SubscriberController {
}
```
##### Bulk Publish Messages
> Note: This API is in Alpha stage.
```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
if (res != null) {
for (BulkPublishResponseFailedEntry<String> entry : res.getFailedEntries()) {
System.out.println("Failed entry ID : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
}
}
}
```
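If the application already constructs `CloudEvent` payloads, the request-based overload can be used instead. The sketch below is modeled on the `CloudEventBulkPublisher` example added in this PR; `PUBSUB_NAME`, `TOPIC_NAME`, and `NUM_MESSAGES` are assumed constants, as in the snippet above.
```java
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.CloudEvent;
import java.util.ArrayList;
import java.util.List;
class BulkCloudEventSolution {
  public void publishCloudEvents() throws Exception {
    try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
      List<BulkPublishEntry<CloudEvent<String>>> entries = new ArrayList<>();
      for (int i = 0; i < NUM_MESSAGES; i++) {
        CloudEvent<String> event = new CloudEvent<>();
        event.setId(String.valueOf(i));
        event.setType("example");
        event.setSpecversion("1");
        event.setDatacontenttype("text/plain");
        event.setData(String.format("This is message #%d", i));
        // Entry IDs are request-scoped; here they are set explicitly per entry.
        entries.add(new BulkPublishEntry<>(String.valueOf(i), event, CloudEvent.CONTENT_TYPE, null));
      }
      BulkPublishRequest<CloudEvent<String>> request =
          new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME, entries);
      BulkPublishResponse<CloudEvent<String>> response = client.publishEvents(request).block();
    }
  }
}
```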
- For a full guide on publishing messages and subscribing to a topic, see [How-To: Publish & subscribe]({{< ref howto-publish-subscribe.md >}}).
- Visit the [Java SDK examples](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/pubsub/http) for code samples and instructions to try out pub/sub.

View File

@ -10,3 +10,8 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
scopes:
- publisher
- bulk-publisher
- subscriber
- publisher-tracing

View File

@ -50,7 +50,7 @@ cd examples
Before getting into the application code, follow these steps in order to set up a local instance of Kafka. This is needed for the local instances. Steps are:
1. To run container locally run:
1. To run container locally run:
<!-- STEP
name: Setup kafka container

View File

@ -0,0 +1,101 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.pubsub;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.util.ArrayList;
import java.util.List;
import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;
/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id bulk-publisher -- \
* java -Ddapr.grpc.port="50010" -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.BulkPublisher
*/
public class BulkPublisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "bulkpublishtesting";
//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;
c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.subscriberContext(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response from dapr");
}
}
// Close the span.
span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
System.out.println("Done");
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.pubsub;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
* Message publisher.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -Ddapr.grpc.port="50010" \
* -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.CloudEventBulkPublisher
*/
public class CloudEventBulkPublisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "bulkpublishtesting";
//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
/**
* main method.
*
* @param args incoming args
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
List<BulkPublishEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("application/json");
String val = String.format("This is message #%d", i);
cloudEvent.setData(new HashMap<>() {
{
put("dataKey", val);
}
});
BulkPublishEntry<CloudEvent<Map<String, String>>> entry = new BulkPublishEntry<>(
"" + (i + 1), cloudEvent, CloudEvent.CONTENT_TYPE, null);
entries.add(entry);
}
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
entries);
BulkPublishResponse<?> res = client.publishEvents(request).block();
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}

View File

@ -11,7 +11,7 @@
limitations under the License.
*/
package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
@ -30,7 +30,7 @@ import static java.util.Collections.singletonMap;
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.CloudEventPublisher
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.CloudEventPublisher
*/
public class CloudEventPublisher {

View File

@ -11,7 +11,7 @@
limitations under the License.
*/
package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
@ -26,7 +26,7 @@ import static java.util.Collections.singletonMap;
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
*/
public class Publisher {

View File

@ -11,7 +11,7 @@
limitations under the License.
*/
package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
@ -31,7 +31,7 @@ import static io.dapr.examples.OpenTelemetryConfig.getReactorContext;
* 2. cd [repo root]/examples
* 3. Run the program:
* dapr run --components-path ./components/pubsub --app-id publisher-tracing -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.PublisherWithTracing
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.PublisherWithTracing
*/
public class PublisherWithTracing {

View File

@ -6,7 +6,7 @@ Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub
## Pub-Sub Sample using the Java-SDK
This sample uses the HTTP Client provided in Dapr Java SDK for subscribing, and Dapr Spring Boot integration for publishing. This example uses Redis Streams (enabled in Redis versions => 5).
This sample uses the HTTP Springboot integration provided in Dapr Java SDK for subscribing, and gRPC client for publishing. This example uses Redis Streams (enabled in Redis versions => 5).
## Pre-requisites
* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
@ -124,6 +124,8 @@ match_order: none
expected_stdout_lines:
- '== APP == Subscriber got: This is message #1'
- '== APP == Subscriber got: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #2'
- '== APP == Subscriber got from bulk published topic: This is message #3'
- '== APP == Bulk Subscriber got: This is message #1'
- '== APP == Bulk Subscriber got: This is message #2'
background: true
@ -131,14 +133,14 @@ sleep: 5
-->
```bash
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000
```
<!-- END_STEP -->
### Running the publisher
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
Another component is the publisher. It is a simple java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: one is for Dapr's sent and received objects, and the second is for objects to be persisted. The client publishes messages using the `publishEvent` method. The Dapr client is also within a try-with-resources block to properly close the client at the end. See the code snippet below:
The Dapr sidecar will automatically wrap the received payload into a CloudEvent object, which will later be parsed by the subscriber.
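A rough sketch of that publish loop is shown below; the complete code is in `Publisher.java`, and the constant names and argument handling here are illustrative only.
```java
public class Publisher {
  private static final int NUM_MESSAGES = 10;
  private static final String DEFAULT_TOPIC_NAME = "testingtopic";
  private static final String PUBSUB_NAME = "messagebus";
  ///...
  public static void main(String[] args) throws Exception {
    String topicName = args.length >= 1 ? args[0] : DEFAULT_TOPIC_NAME;
    try (DaprClient client = new DaprClientBuilder().build()) {
      for (int i = 0; i < NUM_MESSAGES; i++) {
        String message = String.format("This is message #%d", i);
        // Publish the raw message; the Dapr sidecar wraps it into a CloudEvent.
        client.publishEvent(PUBSUB_NAME, topicName, message).block();
        System.out.println("Published message: " + message);
      }
    }
  }
}
```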
@ -175,7 +177,7 @@ The `CloudEventPublisher.java` file shows how the same can be accomplished if th
In this case, the app MUST override the content-type parameter via `withContentType()`, so Dapr sidecar knows that the payload is already a CloudEvent object.
```java
public class Publisher {
public class CloudEventPublisher {
///...
public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
@ -215,7 +217,7 @@ sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
```
<!-- END_STEP -->
@ -288,6 +290,196 @@ Once running, the Subscriber should print the output as follows:
Messages have been retrieved from the topic.
### Bulk Publish Messages
> Note: This API is currently in Alpha stage in the Dapr runtime, hence the API methods in the SDK are part of the `DaprPreviewClient` class.
Another feature provided by the SDK is the ability to publish multiple messages in a single call to the Dapr sidecar.
For this example, we have a simple Java application with a main method that uses the Dapr gRPC Preview Client to publish 10 messages to a specific topic in a single call.
In the `BulkPublisher.java` file, you will find the `BulkPublisher` class, containing the main method. The main method declares a Dapr Preview Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: one is for Dapr's sent and received objects, and the second is for objects to be persisted.
The client publishes messages using the `publishEvents` method. The Dapr client is also within a try-with-resources block to properly close the client at the end. See the code snippet below:
The Dapr sidecar will automatically wrap the received payload into a CloudEvent object, which will later be parsed by the subscriber.
```java
public class BulkPublisher {
private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "kafkatestingtopic";
private static final String PUBSUB_NAME = "kafka-pubsub";
///...
public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient)client;
c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
System.out.println("Going to publish message : " + message);
}
BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, messages, "text/plain")
.subscriberContext(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntryId() + " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response from dapr");
}
}
// Close the span.
span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
}
}
```
The `DaprPreviewClient` created by the `DaprClientBuilder` is used for the `publishEvents` (bulk publish) preview API.
When the `publishEvents` call is made, one of the arguments to the method is the content type of the data, which is `text/plain` in this example.
When parsing and printing the response, there is the concept of an EntryID, which is either automatically generated or set manually when using the `BulkPublishRequest` object.
The EntryID is a request-scoped ID, in this case automatically generated as the index of the message in the list passed to the `publishEvents` call.
The response will be empty if all events are published successfully, or it will contain the list of events that failed.
The code also shows the scenario where it is possible to start tracing in code and pass on that tracing context to Dapr.
The `CloudEventBulkPublisher.java` file shows how the same can be accomplished if the application must send a CloudEvent object instead of relying on Dapr's automatic CloudEvent "wrapping".
In this case, the application **MUST** override the content-type parameter via `withContentType()`, so Dapr sidecar knows that the payload is already a CloudEvent object.
```java
public class CloudEventBulkPublisher {
///...
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Construct request
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME);
List<BulkPublishRequestEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
cloudEvent.setId(UUID.randomUUID().toString());
cloudEvent.setType("example");
cloudEvent.setSpecversion("1");
cloudEvent.setDatacontenttype("application/json");
String val = String.format("This is message #%d", i);
cloudEvent.setData(new HashMap<>() {
{
put("dataKey", val);
}
});
BulkPublishRequestEntry<CloudEvent<Map<String, String>>> entry = new BulkPublishRequestEntry<>();
entry.setEntryID("" + (i + 1))
.setEvent(cloudEvent)
.setContentType(CloudEvent.CONTENT_TYPE);
entries.add(entry);
}
request.setEntries(entries);
// Publish events
BulkPublishResponse res = client.publishEvents(request).block();
if (res != null) {
if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples
System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntryId() + " Error message : " + entry.getErrorMessage());
}
}
} else {
throw new Exception("null response");
}
System.out.println("Done");
}
}
}
```
Use the following command to execute the BulkPublisher example:
<!-- STEP
name: Run Bulk Publisher
match_order: sequential
expected_stdout_lines:
- '== APP == Published the set of messages in a single call to Dapr'
background: true
sleep: 20
-->
```bash
dapr run --components-path ./components/pubsub --app-id bulk-publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.BulkPublisher
```
<!-- END_STEP -->
Once running, the BulkPublisher should print the output as follows:
```txt
✅ You're up and running! Both Dapr and your app logs will appear here.
== APP == Using preview client...
== APP == Constructing the list of messages to publish
== APP == Going to publish message : This is message #0
== APP == Going to publish message : This is message #1
== APP == Going to publish message : This is message #2
== APP == Going to publish message : This is message #3
== APP == Going to publish message : This is message #4
== APP == Going to publish message : This is message #5
== APP == Going to publish message : This is message #6
== APP == Going to publish message : This is message #7
== APP == Going to publish message : This is message #8
== APP == Going to publish message : This is message #9
== APP == Published the set of messages in a single call to Dapr
== APP == Done
```
Messages have been published to the topic.
The Subscriber started previously [here](#running-the-subscriber) should print the output as follows:
```txt
== APP == Subscriber got from bulk published topic: This is message #1
== APP == Subscriber got: {"id":"323935ed-d8db-4ea2-ba28-52352b1d1b34","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #1","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #0
== APP == Subscriber got: {"id":"bb2f4833-0473-446b-a6cc-04a36de5ac0a","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #0","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #5
== APP == Subscriber got: {"id":"07bad175-4be4-4beb-a983-4def2eba5768","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #5","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #6
== APP == Subscriber got: {"id":"b99fba4d-732a-4d18-bf10-b37916dedfb1","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #6","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #2
== APP == Subscriber got: {"id":"2976f254-7859-449e-b66c-57fab4a72aef","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #2","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #3
== APP == Subscriber got: {"id":"f21ff2b5-4842-481d-9a96-e4c299d1c463","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #3","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #4
== APP == Subscriber got: {"id":"4bf50438-e576-4f5f-bb40-bd31c716ad02","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #4","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #7
== APP == Subscriber got: {"id":"f0c8b53b-7935-478e-856b-164d329d25ab","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #7","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #9
== APP == Subscriber got: {"id":"b280569f-cc29-471f-9cb7-682d8d6bd553","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #9","data_base64":null}
== APP == Subscriber got from bulk published topic: This is message #8
== APP == Subscriber got: {"id":"df20d841-296e-4c6b-9dcb-dd17920538e7","source":"bulk-publisher","type":"com.dapr.event.sent","specversion":"1.0","datacontenttype":"text/plain","data":"This is message #8","data_base64":null}
```
> Note: The Redis pubsub component does not have a native bulk publish implementation and uses the Dapr runtime's default bulk publish implementation, which is concurrent; thus the order of the published events is not guaranteed.
Messages have been retrieved from the topic.
### Bulk Subscription
You can also run the publisher to publish messages to `testingtopicbulk` topic, and receive messages using the bulk subscription.
<!-- STEP
@ -300,12 +492,12 @@ sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher testingtopicbulk
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher testingtopicbulk
```
<!-- END_STEP -->
Once running, the Publisher should print the same output as above. The Subscriber should print the output as follows:
Once running, the Publisher should print the same output as seen [above](#running-the-publisher). The Subscriber should print the output as follows:
```txt
== APP == Bulk Subscriber got 10 messages.
@ -346,10 +538,15 @@ Once you click on the tracing event, you will see the details of the call stack
![zipking-details](https://raw.githubusercontent.com/dapr/java-sdk/master/examples/src/main/resources/img/zipkin-pubsub-details.png)
Once you click on the bulk publisher tracing event, you will see the details of the call stack starting in the client and then showing the service API calls right below.
![zipkin-details](../../../../../../resources/img/zipkin-bulk-publish-details.png)
If you would like to add a tracing span as a parent of the span created by Dapr, change the publisher to handle that; see `PublisherWithTracing.java` for the difference, as sketched below.
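The pattern roughly mirrors what `BulkPublisher` above does. A minimal sketch follows; the constant names and the `message` variable are illustrative, and `getReactorContext()` comes from the example's `OpenTelemetryConfig`.
```java
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(PublisherWithTracing.class.getCanonicalName());
Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
  try (Scope scope = span.makeCurrent()) {
    // Publish with the Reactor context carrying the current span so Dapr's span becomes its child.
    client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message)
        .subscriberContext(getReactorContext())
        .block();
  }
  span.end();
}
```
Run it with: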
<!-- STEP
name: Run Publisher
name: Run Publisher with tracing
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
@ -358,7 +555,7 @@ sleep: 15
-->
```bash
dapr run --components-path ./components/pubsub --app-id publisher-tracing -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.PublisherWithTracing
dapr run --components-path ./components/pubsub --app-id publisher-tracing -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.PublisherWithTracing
```
<!-- END_STEP -->
@ -387,12 +584,12 @@ mvn install
Run the publisher app:
```sh
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Publisher
dapr run --components-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
```
Wait until all 10 messages are published like before, then wait for a few more seconds and run the subscriber app:
```sh
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000
```
No message is consumed by the subscriber app and warnings messages are emitted from Dapr sidecar:
@ -419,7 +616,7 @@ No message is consumed by the subscriber app and warnings messages are emitted f
```
For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../../DaprApplication.java) Application implementation.
For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../DaprApplication.java) Application implementation.
### Cleanup
@ -429,6 +626,7 @@ name: Cleanup
```bash
dapr stop --app-id publisher
dapr stop --app-id bulk-publisher
dapr stop --app-id subscriber
```

View File

@ -11,7 +11,7 @@
limitations under the License.
*/
package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;
import io.dapr.examples.DaprApplication;
import org.apache.commons.cli.CommandLine;
@ -26,7 +26,7 @@ import org.apache.commons.cli.Options;
* 2. cd [repo root]/examples
* 3. Run the server:
* dapr run --components-path ./components/pubsub --app-id subscriber --app-port 3000 -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.http.Subscriber -p 3000
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Subscriber -p 3000
*/
public class Subscriber {

View File

@ -11,7 +11,7 @@
limitations under the License.
*/
package io.dapr.examples.pubsub.http;
package io.dapr.examples.pubsub;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Rule;
@ -79,6 +79,24 @@ public class SubscriberController {
});
}
/**
* Handles a registered publish endpoint on this app (bulk published events).
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/bulkpublishtesting")
public Mono<Void> handleBulkPublishMessage(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got from bulk published topic: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
/**
* Handles a registered subscribe endpoint on this app using bulk subscribe.
*

Binary file not shown (new image, 2.1 MiB).

View File

@ -17,6 +17,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
@ -51,9 +55,9 @@ import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
@RunWith(Parameterized.class)
@ -71,6 +75,8 @@ public class PubSubIT extends BaseIT {
private static final String PUBSUB_NAME = "messagebus";
//The title of the topic to be used for publishing
private static final String TOPIC_NAME = "testingtopic";
private static final String TOPIC_BULK = "testingbulktopic";
private static final String TYPED_TOPIC_NAME = "typedtestingtopic";
private static final String ANOTHER_TOPIC_NAME = "anothertopic";
// Topic used for TTL test
@ -138,6 +144,162 @@ public class PubSubIT extends BaseIT {
}
}
@Test
public void testBulkPublishPubSubNotFound() throws Exception {
DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
60000));
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
// No HTTP implementation for bulk publish
System.out.println("no HTTP impl for bulkPublish");
return;
}
try (DaprPreviewClient client = new DaprClientBuilder().buildPreviewClient()) {
assertThrowsDaprException(
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: pubsub unknown pubsub not found",
() -> client.publishEvents("unknown pubsub", "mytopic","text/plain", "message").block());
}
}
@Test
public void testBulkPublish() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
SubscriberService.SUCCESS_MESSAGE,
SubscriberService.class,
true,
60000));
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
System.out.println("HTTP BulkPublish is not implemented. So skipping tests");
return;
}
DaprObjectSerializer serializer = new DaprObjectSerializer() {
@Override
public byte[] serialize(Object o) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return (T) OBJECT_MAPPER.readValue(data, OBJECT_MAPPER.constructType(type.getType()));
}
@Override
public String getContentType() {
return "application/json";
}
};
try (DaprClient client = new DaprClientBuilder().withObjectSerializer(serializer).build();
DaprPreviewClient previewClient = new DaprClientBuilder().withObjectSerializer(serializer).buildPreviewClient()) {
// Only for the gRPC test
// Send a multiple messages on one topic in messagebus pubsub via publishEvents API.
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK));
}
//Publishing 10 messages
BulkPublishResponse response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block();
System.out.println(String.format("Published %d messages to topic '%s' pubsub_name '%s'",
NUM_MESSAGES, TOPIC_BULK, PUBSUB_NAME));
assertNotNull("expected not null bulk publish response", response);
Assert.assertEquals("expected no failures in the response", 0, response.getFailedEntries().size());
//Publishing an object.
MyObject object = new MyObject();
object.setId("123");
response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK,
"application/json", Collections.singletonList(object)).block();
System.out.println("Published one object.");
assertNotNull("expected not null bulk publish response", response);
Assert.assertEquals("expected no failures in the response", 0, response.getFailedEntries().size());
//Publishing a single byte: Example of non-string based content published
previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "",
Collections.singletonList(new byte[]{1})).block();
System.out.println("Published one byte.");
assertNotNull("expected not null bulk publish response", response);
Assert.assertEquals("expected no failures in the response", 0, response.getFailedEntries().size());
CloudEvent cloudEvent = new CloudEvent();
cloudEvent.setId("1234");
cloudEvent.setData("message from cloudevent");
cloudEvent.setSource("test");
cloudEvent.setSpecversion("1");
cloudEvent.setType("myevent");
cloudEvent.setDatacontenttype("text/plain");
BulkPublishRequest<CloudEvent> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_BULK,
Collections.singletonList(
new BulkPublishEntry<>("1", cloudEvent, "application/cloudevents+json", null)
));
//Publishing a cloud event.
previewClient.publishEvents(req).block();
assertNotNull("expected not null bulk publish response", response);
Assert.assertEquals("expected no failures in the response", 0, response.getFailedEntries().size());
System.out.println("Published one cloud event.");
// Introduce sleep
Thread.sleep(10000);
// Check messagebus subscription for topic testingbulktopic since it is populated only by publishEvents API call
callWithRetry(() -> {
System.out.println("Checking results for topic " + TOPIC_BULK + " in pubsub " + PUBSUB_NAME);
// Validate text payload.
final List<CloudEvent> cloudEventMessages = client.invokeMethod(
daprRun.getAppName(),
"messages/redis/testingbulktopic",
null,
HttpExtension.GET,
CLOUD_EVENT_LIST_TYPE_REF).block();
assertEquals("expected 13 messages to be received on subscribe", 13, cloudEventMessages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
final int messageId = i;
assertTrue("expected data content to match", cloudEventMessages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> m.equals(String.format("This is message #%d on topic %s", messageId, TOPIC_BULK)))
.count() == 1);
}
// Validate object payload.
assertTrue("expected data content 123 to match", cloudEventMessages
.stream()
.filter(m -> m.getData() != null)
.filter(m -> m.getData() instanceof LinkedHashMap)
.map(m -> (LinkedHashMap) m.getData())
.filter(m -> "123".equals(m.get("id")))
.count() == 1);
// Validate byte payload.
assertTrue("expected bin data to match", cloudEventMessages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> "AQ==".equals(m))
.count() == 1);
// Validate cloudevent payload.
assertTrue("expected data to match",cloudEventMessages
.stream()
.filter(m -> m.getData() != null)
.map(m -> m.getData())
.filter(m -> "message from cloudevent".equals(m))
.count() == 1);
}, 2000);
}
}
@Test
public void testPubSub() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
@ -589,7 +751,7 @@ public class PubSubIT extends BaseIT {
"messages/testinglongvalues",
null,
HttpExtension.GET, CLOUD_EVENT_LONG_LIST_TYPE_REF).block();
Assert.assertNotNull(messages);
assertNotNull(messages);
for (CloudEvent<ConvertToLong> message : messages) {
actual.add(message.getData());
}

View File

@ -44,11 +44,18 @@ public class SubscriberController {
return messagesByTopic.getOrDefault(topic, Collections.emptyList());
}
private static final List<CloudEvent> messagesReceivedBulkPublishTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopic = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV2 = new ArrayList();
private static final List<CloudEvent> messagesReceivedTestingTopicV3 = new ArrayList();
private static final List<BulkSubscribeAppResponse> responsesReceivedTestingTopicBulkSub = new ArrayList<>();
@GetMapping(path = "/messages/redis/testingbulktopic")
public List<CloudEvent> getMessagesReceivedBulkTopic() {
return messagesReceivedBulkPublishTopic;
}
private static final List<BulkSubscribeAppResponse> responsesReceivedTestingTopicBulk = new ArrayList<>();
@GetMapping(path = "/messages/testingtopic")
public List<CloudEvent> getMessagesReceivedTestingTopic() {
@ -67,7 +74,7 @@ public class SubscriberController {
@GetMapping(path = "/messages/topicBulkSub")
public List<BulkSubscribeAppResponse> getMessagesReceivedTestingTopicBulkSub() {
return responsesReceivedTestingTopicBulk;
return responsesReceivedTestingTopicBulkSub;
}
@Topic(name = "testingtopic", pubsubName = "messagebus")
@ -85,6 +92,21 @@ public class SubscriberController {
});
}
@Topic(name = "testingbulktopic", pubsubName = "messagebus")
@PostMapping("/route1_redis")
public Mono<Void> handleBulkTopicMessage(@RequestBody(required = false) CloudEvent envelope) {
return Mono.fromRunnable(() -> {
try {
String message = envelope.getData() == null ? "" : envelope.getData().toString();
String contentType = envelope.getDatacontenttype() == null ? "" : envelope.getDatacontenttype();
System.out.println("Testing bulk publish topic Subscriber got message: " + message + "; Content-type: " + contentType);
messagesReceivedBulkPublishTopic.add(envelope);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@Topic(name = "testingtopic", pubsubName = "messagebus",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 2))
@PostMapping(path = "/route1_v2")
@ -205,7 +227,7 @@ public class SubscriberController {
return Mono.fromCallable(() -> {
if (bulkMessage.getEntries().size() == 0) {
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(new ArrayList<>());
responsesReceivedTestingTopicBulk.add(response);
responsesReceivedTestingTopicBulkSub.add(response);
return response;
}
@ -219,7 +241,7 @@ public class SubscriberController {
}
}
BulkSubscribeAppResponse response = new BulkSubscribeAppResponse(entries);
responsesReceivedTestingTopicBulk.add(response);
responsesReceivedTestingTopicBulkSub.add(response);
return response;
});
}

View File

@ -14,6 +14,9 @@ limitations under the License.
package io.dapr.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
@ -42,6 +45,7 @@ import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -392,6 +396,50 @@ abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
return this.queryState(request, TypeRef.get(clazz));
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
List<T> events) {
return publishEvents(pubsubName, topicName, contentType, null, events);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
T... events) {
return publishEvents(pubsubName, topicName, contentType, null, events);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String, String> requestMetadata, T... events) {
return publishEvents(pubsubName, topicName, contentType, requestMetadata, Arrays.asList(events));
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String, String> requestMetadata, List<T> events) {
if (events == null || events.size() == 0) {
throw new IllegalArgumentException("list of events cannot be null or empty");
}
List<BulkPublishEntry<T>> entries = new ArrayList<>();
for (int i = 0; i < events.size(); i++) {
// entryID field is generated based on order of events in the request
entries.add(new BulkPublishEntry<>("" + i, events.get(i), contentType, null));
}
return publishEvents(new BulkPublishRequest<>(pubsubName, topicName, entries, requestMetadata));
}
/**
* {@inheritDoc}
*/

View File

@ -17,6 +17,10 @@ import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
@ -44,6 +48,8 @@ import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
@ -65,6 +71,7 @@ import reactor.util.context.Context;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@ -185,6 +192,93 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
/**
*
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
DaprProtos.BulkPublishRequest.Builder envelopeBuilder = DaprProtos.BulkPublishRequest.newBuilder();
envelopeBuilder.setTopic(topic);
envelopeBuilder.setPubsubName(pubsubName);
if (Strings.isNullOrEmpty(pubsubName) || Strings.isNullOrEmpty(topic)) {
throw new IllegalArgumentException("pubsubName and topic name cannot be null or empty");
}
for (BulkPublishEntry<?> entry: request.getEntries()) {
Object event = entry.getEvent();
byte[] data;
String contentType = entry.getContentType();
try {
// Serialize event into bytes
if (!Strings.isNullOrEmpty(contentType) && objectSerializer instanceof DefaultObjectSerializer) {
// If content type is given by user and default object serializer is used
data = DefaultContentTypeConverter.convertEventToBytesForGrpc(event, contentType);
} else {
// perform the serialization as per user given input of serializer
// this is also the case when content type is empty
data = objectSerializer.serialize(event);
if (Strings.isNullOrEmpty(contentType)) {
// Only override content type if not given in input by user
contentType = objectSerializer.getContentType();
}
}
} catch (IOException ex) {
throw DaprException.propagate(ex);
}
DaprProtos.BulkPublishRequestEntry.Builder reqEntryBuilder = DaprProtos.BulkPublishRequestEntry.newBuilder()
.setEntryId(entry.getEntryId())
.setEvent(ByteString.copyFrom(data))
.setContentType(contentType);
Map<String, String> metadata = entry.getMetadata();
if (metadata != null) {
reqEntryBuilder.putAllMetadata(metadata);
}
envelopeBuilder.addEntries(reqEntryBuilder.build());
}
// Set metadata if available
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
}
Map<String, BulkPublishEntry<T>> entryMap = new HashMap<>();
for (BulkPublishEntry<T> entry: request.getEntries()) {
entryMap.put(entry.getEntryId(), entry);
}
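// Execute the Alpha1 bulk publish call; the Reactor subscriber context is captured so that
// tracing information (if present) can be propagated through the gRPC interceptor.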
return Mono.subscriberContext().flatMap(
context ->
this.<DaprProtos.BulkPublishResponse>createMono(
it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it)
)
).map(
it -> {
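// Map failed entries from the gRPC response back to the original request entries by entry ID.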
List<BulkPublishResponseFailedEntry<T>> entries = new ArrayList<>();
for (DaprProtos.BulkPublishResponseFailedEntry entry : it.getFailedEntriesList()) {
BulkPublishResponseFailedEntry<T> domainEntry = new BulkPublishResponseFailedEntry<T>(
entryMap.get(entry.getEntryId()),
entry.getError());
entries.add(domainEntry);
}
if (entries.size() > 0) {
return new BulkPublishResponse<>(entries);
}
return new BulkPublishResponse<>();
}
);
} catch (RuntimeException ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/

View File

@ -15,6 +15,8 @@ package io.dapr.client;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
@ -150,13 +152,18 @@ public class DaprClientHttp extends AbstractDaprClient {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
Map<String, String> metadata = request.getMetadata();
if (pubsubName == null || pubsubName.trim().isEmpty()) {
throw new IllegalArgumentException("Pubsub name cannot be null or empty.");
}
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty.");
}
Object data = request.getData();
Map<String, String> metadata = request.getMetadata();
byte[] serializedEvent = objectSerializer.serialize(data);
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
@ -179,6 +186,14 @@ public class DaprClientHttp extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> request) {
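// Bulk publish is currently only implemented in the gRPC client; the HTTP client does not support it.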
return DaprException.wrapMono(new UnsupportedOperationException());
}
/**
* {@inheritDoc}
*/

View File

@ -13,6 +13,10 @@ limitations under the License.
package io.dapr.client;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.QueryStateRequest;
@ -244,4 +248,77 @@ public interface DaprPreviewClient extends AutoCloseable {
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, TypeRef<T> type);
/**
* Publish multiple events to Dapr in a single request.
*
* @param request {@link BulkPublishRequest} object.
* @return A Mono of {@link BulkPublishResponse} object.
* @param <T> The type of events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> request);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the {@link List} of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the pubsub name we will publish the event to.
* @param topicName the topicName where the event will be published.
* @param events the varargs of events to be published.
* @param contentType the content type of the event. Use Mime based types.
* @return the {@link BulkPublishResponse} containing publish status of each event.
* The "entryID" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} will be
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
T... events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the name of the pubsub component to publish the events to.
* @param topicName the name of the topic to publish the events to.
* @param contentType the content type of the events. Use MIME-based content types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @param events the {@link List} of events to be published.
* @return the {@link BulkPublishResponse} containing the publish status of each event.
* The "entryId" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} is
* generated based on the order of events in the {@link List}.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, List<T> events);
/**
* Publish multiple events to Dapr in a single request.
*
* @param pubsubName the name of the pubsub component to publish the events to.
* @param topicName the name of the topic to publish the events to.
* @param contentType the content type of the events. Use MIME-based content types.
* @param requestMetadata the metadata to be set at the request level for the {@link BulkPublishRequest}.
* @param events the varargs of events to be published.
* @return the {@link BulkPublishResponse} containing the publish status of each event.
* The "entryId" field in {@link BulkPublishEntry} in {@link BulkPublishResponseFailedEntry} is
* generated based on the order of the given events.
* @param <T> The type of the events to publish in the call.
*/
<T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicName, String contentType,
Map<String,String> requestMetadata, T... events);
}
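
A minimal usage sketch of the preview API defined above (not part of this diff): it assumes the usual DaprClientBuilder entry point, and the pubsub component name "messagebus" and topic "orders" are purely illustrative.

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;

import java.util.Arrays;
import java.util.List;

public class BulkPublishQuickstart {
  public static void main(String[] args) throws Exception {
    // Bulk publish is exposed only on the preview client surface.
    try (DaprPreviewClient client = new DaprClientBuilder().buildPreviewClient()) {
      List<String> events = Arrays.asList("order-1", "order-2", "order-3");
      // Entry IDs are generated from the order of the events in the list.
      BulkPublishResponse<String> response = client
          .publishEvents("messagebus", "orders", "text/plain", events)
          .block();
      // An empty failedEntries list means every event was accepted.
      System.out.println("failed entries: " + response.getFailedEntries().size());
    }
  }
}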


@ -0,0 +1,94 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client.domain;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Class representing an entry in the BulkPublishRequest or BulkPublishResponse.
*
* @param <T> Type of the event that is part of the request.
*/
public final class BulkPublishEntry<T> {
/**
* The ID uniquely identifying this entry within the request; it is scoped to this request only.
*/
private final String entryId;
/**
* The event to be published.
*/
private final T event;
/**
* The content type of the event to be published. Uses MIME style content-type values.
*/
private final String contentType;
/**
* The metadata set for this particular event.
* Any value set in this metadata overrides the corresponding request-level metadata in BulkPublishRequest.
*/
private final Map<String, String> metadata;
/**
* Constructor for the BulkPublishEntry object.
*
* @param entryId A request-scoped ID uniquely identifying this entry in the BulkPublishRequest.
* @param event Event to be published.
* @param contentType Content Type of the event to be published in MIME format.
*/
public BulkPublishEntry(String entryId, T event, String contentType) {
this.entryId = entryId;
this.event = event;
this.contentType = contentType;
this.metadata = Collections.unmodifiableMap(new HashMap<>());
}
/**
* Constructor for the BulkPublishEntry object.
*
* @param entryId A request-scoped ID uniquely identifying this entry in the BulkPublishRequest.
* @param event Event to be published.
* @param contentType Content Type of the event to be published in MIME format.
* @param metadata Metadata for the event.
*/
public BulkPublishEntry(String entryId, T event, String contentType, Map<String, String> metadata) {
this.entryId = entryId;
this.event = event;
this.contentType = contentType;
this.metadata = metadata == null ? Collections.unmodifiableMap(new HashMap<>()) :
Collections.unmodifiableMap(metadata);
}
public String getEntryId() {
return entryId;
}
public T getEvent() {
return event;
}
public String getContentType() {
return contentType;
}
public Map<String, String> getMetadata() {
return metadata;
}
}
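
As an illustration of how the two constructors above are meant to be used (a sketch, not part of this diff; the "partitionKey" metadata key is only an example, supported keys depend on the pubsub component):

import io.dapr.client.domain.BulkPublishEntry;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BulkPublishEntryExample {
  static List<BulkPublishEntry<String>> buildEntries() {
    // Entry with a request-scoped ID, the event payload and a MIME content type.
    BulkPublishEntry<String> first = new BulkPublishEntry<>("1", "hello", "text/plain");
    // Entry with per-entry metadata; the key shown is illustrative, and any key set here
    // overrides the same key set at the BulkPublishRequest level.
    BulkPublishEntry<String> second = new BulkPublishEntry<>("2", "world", "text/plain",
        Collections.singletonMap("partitionKey", "customer-42"));
    return Arrays.asList(first, second);
  }
}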


@ -0,0 +1,102 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client.domain;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A request to bulk publish multiple events in a single call to Dapr.
*
* @param <T> Type parameter of the event.
*/
public final class BulkPublishRequest<T> {
/**
* The name of the pubsub to publish to.
*/
private final String pubsubName;
/**
* The name of the topic to publish to.
*/
private final String topic;
/**
* The metadata for the request sent to the pubsub broker.
* This is also used to set common metadata, such as ttlInSeconds, for all entries in the request.
*/
private Map<String, String> metadata;
/**
* The list of entry objects that make up this request.
*/
private final List<BulkPublishEntry<T>> entries;
/**
* Constructor for BulkPublishRequest.
* @param pubsubName Name of the pubsub to publish to.
* @param topic Name of the topic to publish to.
* @param entries List of {@link BulkPublishEntry} objects.
*/
public BulkPublishRequest(String pubsubName, String topic, List<BulkPublishEntry<T>> entries) {
this.pubsubName = pubsubName;
this.topic = topic;
this.entries = entries == null ? Collections.unmodifiableList(new ArrayList<>()) :
Collections.unmodifiableList(entries);
}
/**
* Constructor for the BulkPublishRequest.
*
* @param pubsubName Name of the pubsub to publish to.
* @param topic Name of the topic to publish to.
* @param entries List of {@link BulkPublishEntry} objects.
* @param metadata Metadata for the request.
*/
public BulkPublishRequest(String pubsubName, String topic, List<BulkPublishEntry<T>> entries,
Map<String, String> metadata) {
this.pubsubName = pubsubName;
this.topic = topic;
this.entries = entries == null ? Collections.unmodifiableList(new ArrayList<>()) :
Collections.unmodifiableList(entries);
this.metadata = metadata == null ? Collections.unmodifiableMap(new HashMap<>()) :
Collections.unmodifiableMap(metadata);
}
public String getPubsubName() {
return pubsubName;
}
public String getTopic() {
return topic;
}
public Map<String, String> getMetadata() {
return metadata;
}
public BulkPublishRequest<T> setMetadata(Map<String, String> metadata) {
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
return this;
}
public List<BulkPublishEntry<T>> getEntries() {
return entries;
}
}
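
A small sketch (not part of this diff) of assembling a request with request-level metadata; the pubsub name, topic and TTL value are illustrative, and ttlInSeconds is the kind of common metadata the field comment above refers to.

import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;

import java.util.Arrays;
import java.util.Collections;

public class BulkPublishRequestExample {
  static BulkPublishRequest<String> buildRequest() {
    BulkPublishEntry<String> first = new BulkPublishEntry<>("1", "hello", "text/plain");
    BulkPublishEntry<String> second = new BulkPublishEntry<>("2", "world", "text/plain");
    // Request-level metadata applies to every entry unless an entry overrides the same key.
    return new BulkPublishRequest<>("messagebus", "orders", Arrays.asList(first, second),
        Collections.singletonMap("ttlInSeconds", "120"));
  }
}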


@ -0,0 +1,51 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client.domain;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Class representing the response returned on bulk publishing events.
*
* @param <T> The type of the events that were published in the request.
*/
public class BulkPublishResponse<T> {
/**
* List of {@link BulkPublishResponseFailedEntry} objects for the entries that failed to be published.
*/
private final List<BulkPublishResponseFailedEntry<T>> failedEntries;
/**
* Default constructor for class.
*/
public BulkPublishResponse() {
this.failedEntries = Collections.unmodifiableList(new ArrayList<>());
}
/**
* Constructor for the BulkPublishResponse object.
*
* @param failedEntries The list of {@link BulkPublishResponseFailedEntry} objects representing the
* events that failed to be published.
*/
public BulkPublishResponse(List<BulkPublishResponseFailedEntry<T>> failedEntries) {
this.failedEntries = failedEntries == null ? Collections.unmodifiableList(new ArrayList<>()) :
Collections.unmodifiableList(failedEntries);
}
public List<BulkPublishResponseFailedEntry<T>> getFailedEntries() {
return failedEntries;
}
}


@ -0,0 +1,47 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client.domain;
/**
* Class representing an entry that failed to be published in a BulkPublishRequest.
*
* @param <T> The type of the event in the entry.
*/
public final class BulkPublishResponseFailedEntry<T> {
/**
* The entry that failed to be published.
*/
private final BulkPublishEntry<T> entry;
/**
* Error message as to why the entry failed to publish.
*/
private final String errorMessage;
/**
* Constructor for BulkPublishResponseFailedEntry.
* @param entry The entry that has failed.
* @param errorMessage The error message for why the entry failed.
*/
public BulkPublishResponseFailedEntry(BulkPublishEntry<T> entry, String errorMessage) {
this.entry = entry;
this.errorMessage = errorMessage;
}
public BulkPublishEntry<T> getEntry() {
return entry;
}
public String getErrorMessage() {
return errorMessage;
}
}
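
To show how the two response classes above are expected to be consumed, a minimal sketch (not part of this diff) that inspects the failed entries; any retry policy is left to the caller.

import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;

public class BulkPublishResponseHandling {
  static <T> void logFailures(BulkPublishResponse<T> response) {
    // An empty failedEntries list means every entry was accepted by the broker.
    for (BulkPublishResponseFailedEntry<T> failed : response.getFailedEntries()) {
      // Each failed entry carries the original entry and the error message,
      // so a caller can retry only the events that failed.
      System.err.println("entryId=" + failed.getEntry().getEntryId()
          + " failed: " + failed.getErrorMessage());
    }
  }
}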


@ -0,0 +1,176 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.utils;
import io.dapr.serializer.DefaultObjectSerializer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
/**
* A utility class for converting events to bytes and back, based on the content type.
* When application/json or application/cloudevents+json is given as the content type, the default object serializer
* is used to serialize the data into bytes.
*/
public class DefaultContentTypeConverter {
private static final DefaultObjectSerializer OBJECT_SERIALIZER = new DefaultObjectSerializer();
/**
* Function to convert a given event to bytes for HTTP calls.
*
* @param <T> The type of the event
* @param event The input event
* @param contentType The content type of the event
* @return the event as bytes
* @throws IllegalArgumentException on mismatch between contentType and event types
* @throws IOException on serialization
*/
public static <T> byte[] convertEventToBytesForHttp(T event, String contentType)
throws IllegalArgumentException, IOException {
if (isBinaryContentType(contentType)) {
if (event instanceof byte[]) {
return Base64.getEncoder().encode((byte[]) event);
} else {
throw new IllegalArgumentException("mismatch between 'application/octect-stream' contentType and event. "
+ "expected binary data as bytes array");
}
} else if (isStringContentType(contentType)) {
if (event instanceof String) {
return ((String) event).getBytes(StandardCharsets.UTF_8);
} else if (event instanceof Boolean || event instanceof Number) {
return String.valueOf(event).getBytes(StandardCharsets.UTF_8);
} else {
throw new IllegalArgumentException("mismatch between string contentType and event. "
+ "expected event to be convertible into a string");
}
} else if (isJsonContentType(contentType) || isCloudEventContentType(contentType)) {
return OBJECT_SERIALIZER.serialize(event);
}
throw new IllegalArgumentException("mismatch between contentType and event");
}
/**
* Function to convert a given event to bytes for gRPC calls.
*
* @param <T> The type of the event
* @param event The input event
* @param contentType The content type of the event
* @return the event as bytes
* @throws IllegalArgumentException on mismatch between contentType and event types
* @throws IOException on serialization
*/
public static <T> byte[] convertEventToBytesForGrpc(T event, String contentType)
throws IllegalArgumentException, IOException {
if (isBinaryContentType(contentType)) {
if (event instanceof byte[]) {
// Return the bytes of the event directly for gRPC
return (byte[]) event;
} else {
throw new IllegalArgumentException("mismatch between 'application/octect-stream' contentType and event. "
+ "expected binary data as bytes array");
}
}
// The rest of the conversion is the same as for HTTP
return convertEventToBytesForHttp(event, contentType);
}
/**
* Function to convert a byte array from HTTP input into an event, based on the content type.
*
* @param <T> The type of the event
* @param event The input event as bytes
* @param contentType The content type of the event
* @param typeRef The type to convert the event to
* @return the deserialized event
* @throws IllegalArgumentException on mismatch between contentType and event types
* @throws IOException on deserialization
*/
public static <T> T convertBytesToEventFromHttp(byte[] event, String contentType, TypeRef<T> typeRef)
throws IllegalArgumentException, IOException {
if (isBinaryContentType(contentType)) {
byte[] decoded = Base64.getDecoder().decode(new String(event, StandardCharsets.UTF_8));
return OBJECT_SERIALIZER.deserialize(decoded, typeRef);
} else if (isStringContentType(contentType)) {
if (TypeRef.STRING.equals(typeRef)) {
// This is a string data, required as string
return (T) new String(event, StandardCharsets.UTF_8);
} else if (TypeRef.isPrimitive(typeRef)) {
// This is primitive data
return OBJECT_SERIALIZER.deserialize(event, typeRef);
}
// There is a mismatch between the content type and the required type cast
} else if (isJsonContentType(contentType) || isCloudEventContentType(contentType)) {
// This is normal JSON deserialization of the event
return OBJECT_SERIALIZER.deserialize(event, typeRef);
}
throw new IllegalArgumentException("mismatch between contentType and requested type cast in return");
}
/**
* Function to convert a byte array from gRPC input into an event, based on the content type.
*
* @param <T> The type of the event
* @param event The input event as bytes
* @param contentType The content type of the event
* @param typeRef The type to convert the event to
* @return the deserialized event
* @throws IllegalArgumentException on mismatch between contentType and event types
* @throws IOException on deserialization
*/
public static <T> T convertBytesToEventFromGrpc(byte[] event, String contentType, TypeRef<T> typeRef)
throws IllegalArgumentException, IOException {
if (isBinaryContentType(contentType)) {
// The byte array is directly deserialized
return OBJECT_SERIALIZER.deserialize(event, typeRef);
}
// rest of the conversion is similar to the HTTP method
return convertBytesToEventFromHttp(event, contentType, typeRef);
}
public static boolean isCloudEventContentType(String contentType) {
return isContentType(contentType, "application/cloudevents+json");
}
public static boolean isJsonContentType(String contentType) {
return isContentType(contentType, "application/json");
}
public static boolean isStringContentType(String contentType) {
return contentType != null && (contentType.toLowerCase().startsWith("text/")
|| isContentType(contentType, "application/xml"));
}
public static boolean isBinaryContentType(String contentType) {
return isContentType(contentType, "application/octet-stream");
}
private static boolean isContentType(String contentType, String expected) {
if (contentType == null) {
return false;
}
if (contentType.equalsIgnoreCase(expected)) {
return true;
}
int semiColonPos = contentType.indexOf(";");
if (semiColonPos > 0) {
return contentType.substring(0, semiColonPos).equalsIgnoreCase(expected);
}
return false;
}
}
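
A short, self-contained sketch (not part of this diff) of the behaviour implemented above: text is encoded as UTF-8 bytes for both transports, while binary payloads are Base64-encoded for HTTP but passed through unchanged for gRPC.

import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ContentTypeConverterDemo {
  public static void main(String[] args) throws IOException {
    // Plain text is converted to UTF-8 bytes for HTTP (and identically for gRPC).
    byte[] text = DefaultContentTypeConverter.convertEventToBytesForHttp("hello", "text/plain");

    // Binary payloads: Base64 for HTTP, raw pass-through for gRPC.
    byte[] raw = "payload".getBytes(StandardCharsets.UTF_8);
    byte[] overHttp = DefaultContentTypeConverter.convertEventToBytesForHttp(raw, "application/octet-stream");
    byte[] overGrpc = DefaultContentTypeConverter.convertEventToBytesForGrpc(raw, "application/octet-stream");

    // Round-trip the text payload back to a String using the matching content type.
    String back = DefaultContentTypeConverter.convertBytesToEventFromHttp(text, "text/plain", TypeRef.STRING);
    System.out.println(back + ", http=" + overHttp.length + " bytes, grpc=" + overGrpc.length + " bytes");
  }
}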


@ -13,6 +13,8 @@ limitations under the License.
package io.dapr.utils;
import com.fasterxml.jackson.databind.JavaType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@ -32,6 +34,8 @@ public abstract class TypeRef<T> {
public static final TypeRef<Long> LONG = new TypeRef<Long>(long.class) {};
public static final TypeRef<Short> SHORT = new TypeRef<Short>(short.class) {};
public static final TypeRef<Character> CHAR = new TypeRef<Character>(char.class) {};
public static final TypeRef<Byte> BYTE = new TypeRef<Byte>(byte.class) {};
@ -141,4 +145,25 @@ public abstract class TypeRef<T> {
return new TypeRef<T>(type) {};
}
/**
* Checks if the given TypeRef is of a primitive type.
* Similar to the deserializePrimitives implementation in {@link io.dapr.client.ObjectSerializer};
* only those types are considered primitives.
* @param typeRef Type to be referenced.
* @param <T> Type of the reference.
* @return true if the given TypeRef references a primitive type, false otherwise.
*/
public static <T> boolean isPrimitive(TypeRef<T> typeRef) {
if (typeRef == null) {
return false;
}
if (BOOLEAN.equals(typeRef) || CHAR.equals(typeRef)
|| INT.equals(typeRef) || FLOAT.equals(typeRef)
|| LONG.equals(typeRef) || DOUBLE.equals(typeRef)
|| SHORT.equals(typeRef) || BYTE.equals(typeRef)) {
return true;
}
return false;
}
}


@ -120,7 +120,7 @@ public class DaprClientHttpTest {
}
@Test
public void publishEventInvokation() {
public void publishEventInvocation() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/publish/mypubsubname/A")
.respond(EXPECTED_RESULT);
@ -167,6 +167,16 @@ public class DaprClientHttpTest {
daprClientHttp.publishEvent("mypubsubname", "", event).block());
}
@Test
public void publishEventIfPubsubIsNullOrEmpty() {
String event = "{ \"message\": \"This is a test\" }";
assertThrows(IllegalArgumentException.class, () ->
daprClientHttp.publishEvent(null, "A", event).block());
assertThrows(IllegalArgumentException.class, () ->
daprClientHttp.publishEvent("", "A", event).block());
}
@Test
public void publishEventNoHotMono() {
mockInterceptor.addRule()


@ -17,30 +17,38 @@ package io.dapr.client;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -67,6 +75,10 @@ public class DaprPreviewClientGrpcTest {
private static final String CONFIG_STORE_NAME = "MyConfigStore";
private static final String QUERY_STORE_NAME = "testQueryStore";
private static final String PUBSUB_NAME = "testPubsub";
private static final String TOPIC_NAME = "testTopic";
private Closeable closeable;
private DaprGrpc.DaprStub daprStub;
private DaprPreviewClient previewClient;
@ -88,6 +100,201 @@ public class DaprPreviewClientGrpcTest {
verifyNoMoreInteractions(closeable);
}
@Test
public void publishEventsExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
throw newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument");
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
StatusRuntimeException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test
public void publishEventsCallbackExceptionThrownTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onError(newStatusRuntimeException("INVALID_ARGUMENT", "bad bad argument"));
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
assertThrowsDaprException(
ExecutionException.class,
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: bad bad argument",
() -> previewClient.publishEvents(new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.EMPTY_LIST)).block());
}
@Test(expected = IllegalArgumentException.class)
public void publishEventsContentTypeMismatchException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "testEntry"
, "application/octet-stream", null);
BulkPublishRequest<String> wrongReq = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
previewClient.publishEvents(wrongReq).block();
}
@Test
public void publishEventsSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
previewClient = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<Map<String, String>> entry = new BulkPublishEntry<>("1", new HashMap<>(),
"application/json", null);
BulkPublishRequest<Map<String, String>> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
when(mockSerializer.serialize(any())).thenThrow(IOException.class);
Mono<BulkPublishResponse<Map<String, String>>> result = previewClient.publishEvents(req);
assertThrowsDaprException(
IOException.class,
"UNKNOWN",
"UNKNOWN: ",
() -> result.block());
}
@Test
public void publishEventsTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "test",
"text/plain", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(req);
BulkPublishResponse res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithoutMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entries in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsWithRequestMetaTest() {
doAnswer((Answer<BulkPublishResponse>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
DaprProtos.BulkPublishResponse.Builder builder = DaprProtos.BulkPublishResponse.newBuilder();
observer.onNext(builder.build());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(any(DaprProtos.BulkPublishRequest.class), any());
Mono<BulkPublishResponse<String>> result = previewClient.publishEvents(PUBSUB_NAME, TOPIC_NAME,
"text/plain", new HashMap<String, String>(){{
put("ttlInSeconds", "123");
}}, Collections.singletonList("test"));
BulkPublishResponse<String> res = result.block();
Assert.assertNotNull(res);
assertEquals("expected no entry in failed entries list", 0, res.getFailedEntries().size());
}
@Test
public void publishEventsObjectTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(bulkPublishRequest.getEntries(0).getContentType())) {
return false;
}
if (!"{\"id\":1,\"value\":\"Event\"}".equals(new String(entry.getEvent().toByteArray())) &&
!"{\"value\":\"Event\",\"id\":1}".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
DaprClientGrpcTest.MyObject event = new DaprClientGrpcTest.MyObject(1, "Event");
BulkPublishEntry<DaprClientGrpcTest.MyObject> entry = new BulkPublishEntry<>("1", event,
"application/json", null);
BulkPublishRequest<DaprClientGrpcTest.MyObject> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<DaprClientGrpcTest.MyObject> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void publishEventsContentTypeOverrideTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];
observer.onNext(DaprProtos.BulkPublishResponse.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).bulkPublishEventAlpha1(ArgumentMatchers.argThat(bulkPublishRequest -> {
DaprProtos.BulkPublishRequestEntry entry = bulkPublishRequest.getEntries(0);
if (!"application/json".equals(entry.getContentType())) {
return false;
}
if (!"\"hello\"".equals(new String(entry.getEvent().toByteArray()))) {
return false;
}
return true;
}), any());
BulkPublishEntry<String> entry = new BulkPublishEntry<>("1", "hello",
"", null);
BulkPublishRequest<String> req = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME,
Collections.singletonList(entry));
BulkPublishResponse<String> result = previewClient.publishEvents(req).block();
Assert.assertNotNull(result);
Assert.assertEquals("expected no entries to be failed", 0, result.getFailedEntries().size());
}
@Test
public void getConfigurationTestErrorScenario() {
assertThrows(IllegalArgumentException.class, () -> {
@ -430,4 +637,8 @@ public class DaprPreviewClientGrpcTest {
}
return it.build();
}
private static StatusRuntimeException newStatusRuntimeException(String status, String message) {
return new StatusRuntimeException(Status.fromCode(Status.Code.valueOf(status)).withDescription(message));
}
}


@ -13,33 +13,29 @@ limitations under the License.
package io.dapr.client;
import io.dapr.client.domain.*;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import okhttp3.OkHttpClient;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
public class DaprPreviewClientHttpTest {
private static final String CONFIG_STORE_NAME = "MyConfigStore";


@ -0,0 +1,43 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client.domain;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertNull;
public class BulkPublishRequestTest {
@Test
public void testSetMetadata() {
BulkPublishRequest<String> request = new BulkPublishRequest<>("testPubsub", "testTopic", Collections.emptyList());
// Null check
request.setMetadata(null);
assertNull(request.getMetadata());
// Modifiability check
Map<String, String> metadata = new HashMap<>();
metadata.put("test", "testval");
request.setMetadata(metadata);
Map<String, String> initial = request.getMetadata();
request.setMetadata(metadata);
Assert.assertNotSame("Should not be same map", request.getMetadata(), initial);
}
}


@ -0,0 +1,238 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.CloudEvent;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
public class DefaultContentTypeConverterTest {
// same as default serializer config
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
@Test
public void testToBytesHttpStringEventCorrectContentType() throws IOException {
String event = "string event";
byte[] res = DefaultContentTypeConverter.convertEventToBytesForHttp(event, "text/plain");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = event.getBytes(StandardCharsets.UTF_8);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test
public void testToBytesHttpNumberEventCorrectContentType() throws IOException {
Number event = 123;
byte[] res = DefaultContentTypeConverter.convertEventToBytesForHttp(event, "text/plain");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = "123".getBytes(StandardCharsets.UTF_8);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test
public void testToBytesHttpBinEventCorrectContentType() throws IOException {
String event = "string event";
byte[] data = event.getBytes(StandardCharsets.UTF_8);
byte[] res = DefaultContentTypeConverter.convertEventToBytesForHttp(data, "application/octet-stream");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = Base64.getEncoder().encode(data);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToBytesHttpBinEventInCorrectContentType() throws IOException {
String event = "string event";
DefaultContentTypeConverter.convertEventToBytesForHttp(event, "application/octet-stream");
}
@Test
public void testToBytesHttpJsonEventCorrectContentType() throws IOException {
Map<String, String> event = new HashMap<String, String>() {{
put("test1", "val1");
put("test2", "val2");
}};
byte[] res = DefaultContentTypeConverter.convertEventToBytesForHttp(event, "application/json");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = MAPPER.writeValueAsBytes(event);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToBytesHttpJsonEventInCorrectContentType() throws IOException {
Map<String, String> event = new HashMap<String, String>() {{
put("test1", "val1");
put("test2", "val2");
}};
DefaultContentTypeConverter.convertEventToBytesForHttp(event, "application/xml");
}
@Test
public void testToBytesHttpCloudEventCorrectContentType() throws IOException {
// Make sure that the MAPPER is configured same as the DefaultObjectSerializer config
CloudEvent<String> event = new CloudEvent<>();
event.setType("test");
event.setId("id 1");
event.setSpecversion("v1");
event.setData("test data");
event.setDatacontenttype("text/plain");
event.setSource("dapr test");
byte[] res = DefaultContentTypeConverter.convertEventToBytesForHttp(event, "application/cloudevents+json");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = MAPPER.writeValueAsBytes(event);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToBytesHttpCloudEventInCorrectContentType() throws IOException {
// Make sure that the MAPPER is configured same as the DefaultObjectSerializer config
CloudEvent<String> event = new CloudEvent<>();
event.setType("test");
event.setId("id 1");
event.setSpecversion("v1");
event.setData("test data");
event.setDatacontenttype("text/plain");
event.setSource("dapr test");
DefaultContentTypeConverter.convertEventToBytesForHttp(event, "image/png");
}
@Test
public void testToBytesGrpcBinEventCorrectContentType() throws IOException {
byte[] event = "test event".getBytes(StandardCharsets.UTF_8);
byte[] res = DefaultContentTypeConverter.convertEventToBytesForGrpc(event, "application/octet-stream");
Assert.assertNotNull("expected correct byte array response", res);
Assert.assertArrayEquals("expected response to be matched with expectation", event, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToBytesGrpcBinEventInCorrectContentType() throws IOException {
byte[] event = "test event".getBytes(StandardCharsets.UTF_8);
DefaultContentTypeConverter.convertEventToBytesForGrpc(event, "application/xml");
}
@Test
public void testToBytesGrpcStringEventCorrectContentType() throws IOException {
String event = "string event";
byte[] res = DefaultContentTypeConverter.convertEventToBytesForGrpc(event, "text/plain");
Assert.assertNotNull("expected correct byte array response", res);
byte[] expected = event.getBytes(StandardCharsets.UTF_8);
Assert.assertArrayEquals("expected response to be matched with expectation", expected, res);
}
@Test
public void testToEventHttpStringDataCorrectContentType() throws IOException {
byte[] event = "string event".getBytes(StandardCharsets.UTF_8);
String res = DefaultContentTypeConverter.convertBytesToEventFromHttp(event,
"text/plain", TypeRef.STRING);
Assert.assertNotNull("expected not null response", res);
Assert.assertEquals("expected res to match expectation", "string event", res);
}
@Test
public void testToEventHttpBinDataCorrectContentType() throws IOException {
byte[] expected = "string event".getBytes(StandardCharsets.UTF_8);
byte[] event = Base64.getEncoder().encode(expected);
byte[] res = DefaultContentTypeConverter.convertBytesToEventFromHttp(event,
"application/octet-stream", TypeRef.BYTE_ARRAY);
Assert.assertNotNull("expected not null response", res);
Assert.assertArrayEquals("expected res to match expectation", expected, res);
}
@Test
public void testToEventGrpcBinDataCorrectContentType() throws IOException {
byte[] expected = "string event".getBytes(StandardCharsets.UTF_8);
byte[] res = DefaultContentTypeConverter.convertBytesToEventFromGrpc(expected,
"application/octet-stream", TypeRef.BYTE_ARRAY);
Assert.assertNotNull("expected not null response", res);
Assert.assertArrayEquals("expected res to match expectation", expected, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToBytesGrpcBinDataInCorrectContentType() throws IOException {
String event = "string event";
DefaultContentTypeConverter.convertEventToBytesForGrpc(event,
"application/octet-stream");
}
@Test
public void testToEventGrpcStringDataCorrectContentType() throws IOException {
byte[] expected = "string event".getBytes(StandardCharsets.UTF_8);
String res = DefaultContentTypeConverter.convertBytesToEventFromGrpc(expected,
"text/plain", TypeRef.STRING);
Assert.assertNotNull("expected not null response", res);
Assert.assertEquals("expected res to match expectation", "string event", res);
}
@Test
public void testToEventHttpPrimitiveDataCorrectContentType() throws IOException {
Number expected = 123;
byte[] data = DefaultContentTypeConverter.convertEventToBytesForHttp(expected, "text/plain");
Integer res = DefaultContentTypeConverter.convertBytesToEventFromHttp(data,
"text/plain", TypeRef.INT);
Assert.assertNotNull("expected not null response", res);
Assert.assertEquals("expected res to match expectation", expected, res);
}
@Test
public void testToEventHttpCEDataCorrectContentType() throws IOException {
CloudEvent<String> event = new CloudEvent<>();
event.setType("test");
event.setId("id 1");
event.setSpecversion("v1");
event.setData("test data");
event.setDatacontenttype("text/plain");
event.setSource("dapr test");
byte[] data = DefaultContentTypeConverter.convertEventToBytesForHttp(event, "application/cloudevents+json");
CloudEvent<String> res = DefaultContentTypeConverter.convertBytesToEventFromHttp(data,
"application/cloudevents+json", new TypeRef<CloudEvent<String>>() {
});
Assert.assertNotNull("expected not null response", res);
Assert.assertEquals("expected res to match expectation", event, res);
}
@Test(expected = IllegalArgumentException.class)
public void testToEventHttpBinDataInCorrectContentType() throws IOException {
byte[] data = "string event".getBytes(StandardCharsets.UTF_8);
byte[] event = Base64.getEncoder().encode(data);
DefaultContentTypeConverter.convertBytesToEventFromHttp(event,
"text/plain", TypeRef.BYTE_ARRAY);
}
@Test(expected = IllegalArgumentException.class)
public void testToEventHttpBinDataNullCorrectContentType() throws IOException {
byte[] data = "string event".getBytes(StandardCharsets.UTF_8);
byte[] event = Base64.getEncoder().encode(data);
DefaultContentTypeConverter.convertBytesToEventFromHttp(event,
null, TypeRef.BYTE_ARRAY);
}
@Test(expected = IllegalArgumentException.class)
public void testToEventHttpBinDataCharsetInCorrectContentType() throws IOException {
byte[] data = "string event".getBytes(StandardCharsets.UTF_8);
byte[] event = Base64.getEncoder().encode(data);
DefaultContentTypeConverter.convertBytesToEventFromHttp(event,
"text/plain;charset=utf-8", TypeRef.BYTE_ARRAY);
}
}


@ -0,0 +1,21 @@
package io.dapr.utils;
import org.junit.Assert;
import org.junit.Test;
public class TypeRefTest {
@Test
public void testTypeRefIsPrimitive() {
Assert.assertTrue("expected this to be true as boolean is primitive", TypeRef.isPrimitive(TypeRef.BOOLEAN));
Assert.assertTrue("expected this to be true as short is primitive", TypeRef.isPrimitive(TypeRef.SHORT));
Assert.assertTrue("expected this to be true as float is primitive", TypeRef.isPrimitive(TypeRef.FLOAT));
Assert.assertTrue("expected this to be true as double is primitive", TypeRef.isPrimitive(TypeRef.DOUBLE));
Assert.assertTrue("expected this to be true as integer is primitive", TypeRef.isPrimitive(TypeRef.INT));
Assert.assertFalse("expected this to be false as string is not primitive",
TypeRef.isPrimitive(TypeRef.STRING));
Assert.assertFalse("expected this to be false as string array is not primitive",
TypeRef.isPrimitive(TypeRef.STRING_ARRAY));
}
}