Bi-directional subscription (#1124)

* Bi-di subscription.

Add bidi subscription to validate workflow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Make bi-di subscriber use Mono

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Artur Souza 2024-10-07 17:00:41 -07:00 committed by GitHub
parent 7490434dde
commit cb552ba668
19 changed files with 838 additions and 613 deletions


@@ -160,3 +160,7 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
- name: Validate streaming subscription example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md


@@ -102,8 +102,10 @@ Try the following examples to learn more about Dapr's Java SDK:
* [Invoking a Grpc service](./examples/src/main/java/io/dapr/examples/invoke/grpc)
* [State management](./examples/src/main/java/io/dapr/examples/state)
* [PubSub with subscriber](./examples/src/main/java/io/dapr/examples/pubsub/)
* [PubSub with streaming subscription](./examples/src/main/java/io/dapr/examples/pubsub/stream/)
* [Binding with input over Http](./examples/src/main/java/io/dapr/examples/bindings/http)
* [Actors](./examples/src/main/java/io/dapr/examples/actors/)
* [Workflows](./examples/src/main/java/io/dapr/examples/workflows/)
* [Secrets management](./examples/src/main/java/io/dapr/examples/secrets)
* [Configuration](./examples/src/main/java/io/dapr/examples/configuration)
* [Distributed tracing with OpenTelemetry SDK](./examples/src/main/java/io/dapr/examples/tracing)


@@ -1,541 +0,0 @@
# Dapr Workflow Sample
In this example, we'll use Dapr to test workflow features.
Visit [the Workflow documentation landing page](https://docs.dapr.io/developing-applications/building-blocks/workflow) for more information.
This example contains the following classes:
* DemoWorkflow: An example of a Dapr Workflow.
* DemoWorkflowClient: This application will start workflows using Dapr.
* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance.
## Pre-requisites
* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
### Checking out the code
Clone this repository:
```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```
Then build the Maven project:
```sh
# make sure you are in the `java-sdk` directory.
mvn install
```
Get into the `examples` directory.
```sh
cd examples
```
### Initialize Dapr
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
## Patterns
These examples cover the following workflow patterns:
1. [Chaining Pattern](#chaining-pattern)
2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern)
3. [Continue As New Pattern](#continue-as-new-pattern)
4. [External Event Pattern](#external-event-pattern)
5. [Sub-workflow Pattern](#sub-workflow-pattern)
### Chaining Pattern
In the chaining pattern, a sequence of activities executes in a specific order.
In this pattern, the output of one activity is applied to the input of another activity.
The chaining pattern is useful when you need to execute a sequence of activities in a specific order.
The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below:
```java
public class DemoChainWorker {
/**
* The main method of this app.
*
* @param args The port the app will listen on.
* @throws Exception An Exception.
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class);
builder.registerActivity(ToUpperCaseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start();
}
System.exit(0);
}
}
```
The second Java class to look at is `DemoChainWorkflow`, which defines the workflow. In this example, it chains the activities in order. See the code snippet below:
```java
public class DemoChainWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
String result = "";
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", ";
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", ";
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await();
ctx.getLogger().info("Workflow finished with result: " + result);
ctx.complete(result);
};
}
}
```
The next Java class to look at is `ToUpperCaseActivity`, which defines the logic for a single activity; in this case, it converts a string to upper case. See the code snippet below:
```java
public class ToUpperCaseActivity implements WorkflowActivity {
@Override
public String run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
logger.info("Starting Activity: " + ctx.getName());
var message = ctx.getInput(String.class);
var newMessage = message.toUpperCase();
logger.info("Message Received from input: " + message);
logger.info("Sending message to output: " + newMessage);
logger.info("Activity returned: " + newMessage);
logger.info("Activity finished");
return newMessage;
}
}
```
<!-- STEP
name: Run Chaining Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow'
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
- 'Message Received from input: Tokyo'
- 'Sending message to output: TOKYO'
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
- 'Message Received from input: London'
- 'Sending message to output: LONDON'
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
- 'Message Received from input: Seattle'
- 'Sending message to output: SEATTLE'
- 'Workflow finished with result: TOKYO, LONDON, SEATTLE'
background: true
sleep: 60
timeout_seconds: 60
-->
Execute the following script in order to run DemoChainWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
```
Once running, the logs will start displaying the different steps. First, you can see the workflow runtime starting:
```text
== APP == Start workflow runtime
== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
```
Then, execute the following script in order to run DemoChainClient:
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient
```
<!-- END_STEP -->
Now you can see the worker logs showing that the activities are invoked in sequence, along with the status of each activity:
```text
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO
== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle
== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE
== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE
```
and the client logs showing that the workflow started and finished with the expected result:
```text
Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328
workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE
```
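The `DemoChainClient` source is not shown in this diff. A minimal sketch of such a client, assuming the SDK's `DaprWorkflowClient` API (`scheduleNewWorkflow`, `waitForInstanceCompletion`; exact signatures may differ across SDK versions), could look like this:

```java
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

public class DemoChainClientSketch {
  public static void main(String[] args) throws Exception {
    try (DaprWorkflowClient client = new DaprWorkflowClient()) {
      // Schedule a new instance of the chaining workflow (class from this example).
      String instanceId = client.scheduleNewWorkflow(DemoChainWorkflow.class);
      System.out.println("Started a new chaining model workflow with instance ID: " + instanceId);

      // Block until the workflow completes, then read its output.
      WorkflowInstanceStatus status = client.waitForInstanceCompletion(instanceId, null, true);
      System.out.println("workflow instance with ID: " + instanceId
          + " completed with result: " + status.readOutputAs(String.class));
    }
  }
}
```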
### Fan-out/Fan-in Pattern
In the fan-out/fan-in pattern, you execute multiple activities in parallel and then wait for all of them to finish. Often, some aggregation work is done on the results returned from the activities.
The `DemoFanInOutWorkflow` class defines the workflow. In this example, it executes the activities in parallel and then sums the results. See the code snippet below:
```java
public class DemoFanInOutWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
// The input is a list of objects that need to be operated on.
// In this example, inputs are expected to be strings.
List<?> inputs = ctx.getInput(List.class);
// Fan-out to multiple concurrent activity invocations, each of which does a word count.
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class))
.collect(Collectors.toList());
// Fan-in to get the total word count from all the individual activity results.
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
ctx.getLogger().info("Workflow finished with result: " + totalWordCount);
// Save the final result as the orchestration output.
ctx.complete(totalWordCount);
};
}
}
```
The `CountWordsActivity` class defines the logic for a single activity; in this case, it counts the words in a string. See the code snippet below:
```java
public class CountWordsActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(CountWordsActivity.class);
logger.info("Starting Activity: " + ctx.getName());
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
int result = tokenizer.countTokens();
logger.info("Activity returned: " + result);
logger.info("Activity finished");
return result;
}
}
```
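The counting step itself is plain Java and can be exercised without the Dapr runtime. The inputs below are illustrative, not the ones used by the example's client:

```java
import java.util.List;
import java.util.StringTokenizer;

public class WordCountDemo {
  // Same token-counting logic as CountWordsActivity above.
  static int countWords(String input) {
    return new StringTokenizer(input).countTokens();
  }

  public static void main(String[] args) {
    // Fan-out: count each line independently; fan-in: sum the counts.
    List<String> lines = List.of(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog");
    int total = lines.stream().mapToInt(WordCountDemo::countWords).sum();
    System.out.println("Total word count: " + total); // prints 11
  }
}
```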
<!-- STEP
name: Run Fan-out/Fan-in Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Activity returned: 2'
- 'Activity returned: 9'
- 'Activity returned: 21'
- 'Activity returned: 17'
- 'Workflow finished with result: 60'
background: true
sleep: 60
timeout_seconds: 60
-->
Execute the following script in order to run DemoFanInOutWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
```
Execute the following script in order to run DemoFanInOutClient:
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
```
<!-- END_STEP -->
Now you can see the logs from worker:
```text
== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow
== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2
== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11
== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60
```
and the client:
```text
Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986
workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60
```
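The `DemoFanInOutClient` source is not shown here; the key detail is that the workflow input is a list of strings. A hedged sketch, assuming the `scheduleNewWorkflow` overload that accepts an input object (and illustrative inputs):

```java
import io.dapr.workflows.client.DaprWorkflowClient;

import java.util.List;

public class DemoFanInOutClientSketch {
  public static void main(String[] args) throws Exception {
    // Each line is word-counted by a separate, parallel activity invocation.
    List<String> lines = List.of(
        "Hello, world!",
        "The quick brown fox jumps over the lazy dog");
    try (DaprWorkflowClient client = new DaprWorkflowClient()) {
      String instanceId = client.scheduleNewWorkflow(DemoFanInOutWorkflow.class, lines);
      var status = client.waitForInstanceCompletion(instanceId, null, true);
      System.out.println("workflow instance with ID: " + instanceId
          + " completed with result: " + status.readOutputAs(Integer.class));
    }
  }
}
```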
### Continue As New Pattern
The `ContinueAsNew` API allows you to restart a workflow with a new input.
The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happens every 10 seconds, after the previous cleanup has finished. See the code snippet below:
```java
public class DemoContinueAsNewWorkflow extends Workflow {
/*
Compared with a CRON schedule, this periodic workflow example will never overlap.
For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc.
and could potentially run into overlap issues if the cleanup takes longer than an hour.
In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups,
then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap.
*/
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
ctx.getLogger().info("call CleanUpActivity to do the clean up");
ctx.callActivity(CleanUpActivity.class.getName()).await();
ctx.getLogger().info("CleanUpActivity finished");
ctx.getLogger().info("wait 10 seconds for next clean up");
ctx.createTimer(Duration.ofSeconds(10)).await();
// continue the workflow.
ctx.continueAsNew(null);
};
}
}
```
The `CleanUpActivity` class defines the logic for a single activity; in this case, it simulates clean-up work. See the code snippet below:
```java
public class CleanUpActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(CleanUpActivity.class);
logger.info("Starting Activity: " + ctx.getName());
logger.info("start clean up work, it may take few seconds to finish...");
//Sleeping for 5 seconds to simulate long running operation
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "clean up finish.";
}
}
```
Once you start the worker and client using the following commands:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
```
You will see the logs from the worker showing that `CleanUpActivity` is invoked every 10 seconds after the previous one finishes:
```text
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
...
```
and the client:
```text
Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94
```
It will continue to run until you stop the worker.
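A client can also stop a running instance explicitly instead of stopping the worker. A hedged sketch, assuming the SDK's `DaprWorkflowClient.terminateWorkflow` method:

```java
import io.dapr.workflows.client.DaprWorkflowClient;

public class TerminateWorkflowSketch {
  public static void main(String[] args) throws Exception {
    // The instance ID printed when the workflow was started.
    String instanceId = args[0];
    try (DaprWorkflowClient client = new DaprWorkflowClient()) {
      // Stop the periodic workflow; the second argument is an optional output value.
      client.terminateWorkflow(instanceId, null);
      System.out.println("Terminated workflow instance: " + instanceId);
    }
  }
}
```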
### External Event Pattern
In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing.
The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event named `Approval` and runs the corresponding activity. See the code snippet below:
```java
public class DemoExternalEventWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
ctx.getLogger().info("approval granted - do the approved action");
ctx.callActivity(ApproveActivity.class.getName()).await();
ctx.getLogger().info("approval-activity finished");
} else {
ctx.getLogger().info("approval denied - send a notification");
ctx.callActivity(DenyActivity.class.getName()).await();
ctx.getLogger().info("denied-activity finished");
}
};
}
}
```
In the `DemoExternalEventClient` class, we send an `Approval` event to tell our workflow to run the approved activity.
```java
client.raiseEvent(instanceId, "Approval", true);
```
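A fuller sketch of such a client, with API names assumed from the SDK's `DaprWorkflowClient`: schedule the workflow, raise the event it is awaiting, then wait for completion:

```java
import io.dapr.workflows.client.DaprWorkflowClient;

public class DemoExternalEventClientSketch {
  public static void main(String[] args) throws Exception {
    try (DaprWorkflowClient client = new DaprWorkflowClient()) {
      String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class);
      // Deliver the event the workflow is waiting on; 'true' approves the request.
      client.raiseEvent(instanceId, "Approval", true);
      client.waitForInstanceCompletion(instanceId, null, true);
      System.out.println("workflow instance with ID: " + instanceId + " completed.");
    }
  }
}
```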
Start the worker and client using the following commands:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
```
The worker logs:
```text
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval...
== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity...
== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished
```
The client log:
```text
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
```
### Sub-workflow Pattern
The sub-workflow pattern allows you to call a workflow from another workflow.
The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below:
```java
public class DemoWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
var subWorkflowInput = "Hello Dapr Workflow!";
ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput);
var subWorkflowOutput =
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await();
ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput);
};
}
}
```
The `DemoSubWorkflow` class defines the sub-workflow. It calls the activity to do the work and returns the result. See the code snippet below:
```java
public class DemoSubWorkflow extends Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName());
var subWorkflowInput = ctx.getInput(String.class);
ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput);
ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName());
String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await();
ctx.getLogger().info("SubWorkflow finished with: " + result);
ctx.complete(result);
};
}
}
```
The `ReverseActivity` class defines the logic for a single activity; in this case, it reverses a string. See the code snippet below:
```java
public class ReverseActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(ReverseActivity.class);
logger.info("Starting Activity: " + ctx.getName());
var message = ctx.getInput(String.class);
var newMessage = new StringBuilder(message).reverse().toString();
logger.info("Message Received from input: " + message);
logger.info("Sending message to output: " + newMessage);
logger.info("Activity returned: " + newMessage);
logger.info("Activity finished");
return newMessage;
}
}
```
Start the worker and client using the following commands:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowClient
```
The log from worker:
```text
== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow
== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow!
== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow!
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow!
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH
== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH
== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH
```
The log from client:
```text
Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb
workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH
```


@@ -0,0 +1,122 @@
# Dapr Streaming Subscription Sample
In this sample, we'll create publisher and subscriber Java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages for a specific topic, while the subscriber will listen for messages on that topic via a bi-directional stream. All of this is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture.
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub.
## Pub-Sub Sample using the Java-SDK
This sample shows how subscribing to events no longer requires the application to listen on an HTTP or gRPC port. This example uses Redis Streams (available in Redis 5 and above).
## Pre-requisites
* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
### Checking out the code
Clone this repository:
```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```
Then build the Maven project:
```sh
# make sure you are in the `java-sdk` directory.
mvn install
```
Then get into the examples directory:
```sh
cd examples
```
### Initialize Dapr
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
### Running the subscriber
The subscriber uses the `DaprPreviewClient` interface to access a new feature where events are subscribed via a stream and processed via a callback interface.
The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic.
In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the subscriber provides an implementation of the `SubscriptionListener` interface, receiving a `Subscription` object. The `Subscription` object implements the `Closeable` interface and the `close()` method must be used to stop the subscription.
```java
public class Subscriber {
// ...
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}
@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);
subscription.awaitTermination();
}
}
// ...
}
```
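The `getTopicName` helper is elided in the snippet above. A hypothetical implementation (the argument handling shown here is an assumption, not the SDK's code) might simply fall back to the default topic:

```java
public class TopicNameDemo {
  private static final String DEFAULT_TOPIC_NAME = "testingtopic";

  // Hypothetical sketch: use the first CLI argument as the topic name,
  // falling back to the default when no argument is given.
  static String getTopicName(String[] args) {
    return args.length > 0 ? args[0] : DEFAULT_TOPIC_NAME;
  }

  public static void main(String[] args) {
    System.out.println("Topic: " + getTopicName(args));
  }
}
```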
Execute the following command to run the Subscriber example:
<!-- STEP
name: Run Subscriber
expected_stdout_lines:
- '== APP == Subscriber got: This is message #0'
- '== APP == Subscriber got: This is message #1'
background: true
sleep: 30
-->
```bash
dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
```
<!-- END_STEP -->
Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side:
<!-- STEP
name: Run Publisher
expected_stdout_lines:
- '== APP == Published message: This is message #0'
- '== APP == Published message: This is message #1'
background: true
sleep: 15
-->
```bash
dapr run --resources-path ./components/pubsub --app-id publisher -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.Publisher
```
<!-- END_STEP -->
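The publisher's core is a plain `DaprClient.publishEvent` call. A minimal sketch, assuming the standard client API (the message count and text mirror the example's output but are otherwise illustrative):

```java
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;

public class PublisherSketch {
  private static final String PUBSUB_NAME = "messagebus";
  private static final String TOPIC_NAME = "testingtopic";

  public static void main(String[] args) throws Exception {
    try (DaprClient client = new DaprClientBuilder().build()) {
      for (int i = 0; i < 10; i++) {
        String message = "This is message #" + i;
        // publishEvent returns a Mono; block() sends the message synchronously.
        client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
        System.out.println("Published message: " + message);
      }
    }
  }
}
```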


@@ -0,0 +1,81 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.examples.pubsub.stream;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
/**
* Subscriber using bi-directional gRPC streaming, which does not require an app port.
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 3. Run the subscriber:
* dapr run --resources-path ./components/pubsub --app-id subscriber -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.Subscriber
*/
public class Subscriber {
//The title of the topic to be used for publishing
private static final String DEFAULT_TOPIC_NAME = "testingtopic";
//The name of the pubsub
private static final String PUBSUB_NAME = "messagebus";
/**
* This is the entry point for this example app, which subscribes to a topic.
* @param args Used to optionally pass a topic name.
* @throws Exception An Exception on startup.
*/
public static void main(String[] args) throws Exception {
String topicName = getTopicName(args);
try (var client = new DaprClientBuilder().buildPreviewClient()) {
var subscription = client.subscribeToEvents(
PUBSUB_NAME,
topicName,
new SubscriptionListener<>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
System.out.println("Subscriber got: " + event.getData());
return Mono.just(Status.SUCCESS);
}
@Override
public void onError(RuntimeException exception) {
System.out.println("Subscriber got exception: " + exception.getMessage());
}
},
TypeRef.STRING);
subscription.awaitTermination();
}
}
/**
* If a topic is specified in args, use that.
* Else, fallback to the default topic.
* @param args program arguments
* @return name of the topic to publish messages to.
*/
private static String getTopicName(String[] args) {
if (args.length >= 1) {
return args[0];
}
return DEFAULT_TOPIC_NAME;
}
}

View File

@ -149,6 +149,7 @@ Execute the following script in order to run DemoChainWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
```
Once running, the logs will start displaying the different steps. First, you can see the workflow starting:
```text
== APP == Start workflow runtime
@ -162,6 +163,8 @@ java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chai
```
<!-- END_STEP -->
Now you can see the worker logs showing the activities invoked in sequence and the status of each activity:
```text
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
@ -237,7 +240,7 @@ public class CountWordsActivity implements WorkflowActivity {
}
```
<!-- STEP
name: Run Fan-in/Fan-out Pattern workflow
name: Run Chaining Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
@ -255,7 +258,9 @@ Execute the following script in order to run DemoFanInOutWorker:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
```
Execute the following script in order to run DemoFanInOutClient:
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
```
@ -343,7 +348,7 @@ public class CleanUpActivity implements WorkflowActivity {
Once you start the workflow and client using the following commands:
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
@ -413,29 +418,14 @@ client.raiseEvent(instanceId, "Approval", true);
Start the workflow and client using the following commands:
<!-- STEP
name: Run Wait External Event Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow'
- 'Waiting for approval...'
- 'approval granted - do the approved action'
- 'Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity'
- 'Running approval activity...'
- 'approval-activity finished'
background: true
sleep: 60
timeout_seconds: 60
-->
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
```
<!-- END_STEP -->
The worker logs:
```text
@ -521,25 +511,14 @@ public class ReverseActivity implements WorkflowActivity {
Start the workflow and client using the following commands:
<!-- STEP
name: Run Sub-workflow Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'calling subworkflow with input: Hello Dapr Workflow!'
- 'SubWorkflow finished with: !wolfkroW rpaD olleH'
background: true
sleep: 60
timeout_seconds: 60
-->
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient
```
<!-- END_STEP -->
The log from worker:
```text

View File

@ -17,7 +17,7 @@
<grpc.version>1.64.0</grpc.version>
<protobuf.version>3.25.0</protobuf.version>
<protocCommand>protoc</protocCommand>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.0-rc.2/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.14.4/dapr/proto</dapr.proto.baseurl>
<dapr.sdk.version>1.13.0-SNAPSHOT</dapr.sdk.version>
<dapr.sdk.alpha.version>0.13.0-SNAPSHOT</dapr.sdk.alpha.version>
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>

View File

@ -17,6 +17,7 @@
<description>Auto-generated SDK for Dapr</description>
<properties>
<java.version>17</java.version>
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
<protobuf.input.directory>${project.build.directory}/proto</protobuf.input.directory>
<maven.deploy.skip>false</maven.deploy.skip>

View File

@ -17,6 +17,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.deploy.skip>true</maven.deploy.skip>
@ -145,6 +147,12 @@
<version>${dapr.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dapr</groupId>
<artifactId>dapr-sdk-springboot</artifactId>
<version>${dapr.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dapr</groupId>
<artifactId>dapr-sdk-workflows</artifactId>

View File

@ -15,7 +15,9 @@ package io.dapr.it;
import com.google.protobuf.Empty;
import io.dapr.actors.client.ActorClient;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.config.Property;
@ -235,6 +237,22 @@ public class DaprRun implements Stoppable {
return appName;
}
public DaprClient newDaprClient() {
return new DaprClientBuilder()
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
.build();
}
public DaprPreviewClient newDaprPreviewClient() {
return new DaprClientBuilder()
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString())
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
.buildPreviewClient();
}
public void checkRunState(long timeout, boolean shouldBeRunning) throws InterruptedException {
callWithRetry(() -> {
try {

View File

@ -0,0 +1,127 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.it.pubsub.stream;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.utils.TypeRef;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PubSubStreamIT extends BaseIT {
// Must be a large enough number, so we validate that we get more than the initial batch
// sent by the runtime. When this was first added, the batch size in runtime was set to 10.
private static final int NUM_MESSAGES = 100;
private static final String TOPIC_NAME = "stream-topic";
private static final String PUBSUB_NAME = "messagebus";
private final List<DaprRun> runs = new ArrayList<>();
private DaprRun closeLater(DaprRun run) {
this.runs.add(run);
return run;
}
@AfterEach
public void tearDown() throws Exception {
for (DaprRun run : runs) {
run.stop();
}
}
@Test
public void testPubSub() throws Exception {
final DaprRun daprRun = closeLater(startDaprApp(
this.getClass().getSimpleName(),
60000));
var runId = UUID.randomUUID().toString();
try (DaprClient client = daprRun.newDaprClient();
DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) {
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s for run %s", i, TOPIC_NAME, runId);
//Publishing messages
client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block();
System.out.println(
String.format("Published message: '%s' to topic '%s' pubsub_name '%s'", message, TOPIC_NAME, PUBSUB_NAME));
}
System.out.println("Starting subscription for " + TOPIC_NAME);
Set<String> messages = Collections.synchronizedSet(new HashSet<>());
Set<String> errors = Collections.synchronizedSet(new HashSet<>());
var random = new Random(37); // predictable random.
var listener = new SubscriptionListener<String>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
return Mono.fromCallable(() -> {
// Useful to avoid false negatives running locally multiple times.
if (event.getData().contains(runId)) {
// 5% failure rate.
var decision = random.nextInt(100);
if (decision < 5) {
if (decision % 2 == 0) {
throw new RuntimeException("artificial exception on message " + event.getId());
}
return Status.RETRY;
}
messages.add(event.getId());
return Status.SUCCESS;
}
return Status.DROP;
});
}
@Override
public void onError(RuntimeException exception) {
errors.add(exception.getMessage());
}
};
try(var subscription = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, listener, TypeRef.STRING)) {
callWithRetry(() -> {
var messageCount = messages.size();
System.out.println(
String.format("Got %d messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME));
assertEquals(NUM_MESSAGES, messages.size());
assertEquals(4, errors.size());
}, 120000); // Time for runtime to retry messages.
subscription.close();
subscription.awaitTermination();
}
}
}
}

View File

@ -137,6 +137,12 @@
<artifactId>grpc-inprocess</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.dapr</groupId>
<artifactId>dapr-sdk-autogen</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@ -23,6 +23,7 @@ import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.ComponentMetadata;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DaprMetadata;
@ -75,11 +76,11 @@ import io.dapr.v1.DaprProtos.MetadataHTTPEndpoint;
import io.dapr.v1.DaprProtos.PubsubSubscription;
import io.dapr.v1.DaprProtos.PubsubSubscriptionRule;
import io.dapr.v1.DaprProtos.RegisteredComponents;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
@ -401,6 +402,59 @@ public class DaprClientImpl extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 request =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setInitialRequest(initialRequest)
.build();
return buildSubscription(listener, type, request);
}
@NotNull
private <T> Subscription<T> buildSubscription(
SubscriptionListener<T> listener,
TypeRef<T> type,
DaprProtos.SubscribeTopicEventsRequestAlpha1 request) {
Subscription<T> subscription = new Subscription<>(this.asyncStub, request, listener, response -> {
if (response.getEventMessage() == null) {
return null;
}
var message = response.getEventMessage();
if ((message.getPubsubName() == null) || message.getPubsubName().isEmpty()) {
return null;
}
try {
CloudEvent<T> cloudEvent = new CloudEvent<>();
var object =
DaprClientImpl.this.objectSerializer.deserialize(message.getData().toByteArray(), type);
cloudEvent.setData(object);
cloudEvent.setDatacontenttype(message.getDataContentType());
cloudEvent.setId(message.getId());
cloudEvent.setTopic(message.getTopic());
cloudEvent.setSpecversion(message.getSpecVersion());
cloudEvent.setType(message.getType());
cloudEvent.setPubsubName(message.getPubsubName());
return cloudEvent;
} catch (IOException e) {
throw new RuntimeException(e);
}
});
subscription.start();
return subscription;
}
@Override
public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
try {

View File

@ -256,4 +256,16 @@ public interface DaprPreviewClient extends AutoCloseable {
* @return Unlock result
*/
Mono<UnlockResponseStatus> unlock(UnlockRequest request);
/**
* Subscribe to pubsub via streaming.
* @param pubsubName Name of the pubsub component.
* @param topic Name of the topic to subscribe to.
* @param listener Callback methods to process events.
* @param type Type for object deserialization.
* @return An active subscription.
* @param <T> Type of object deserialization.
*/
<T> Subscription subscribeToEvents(
String pubsubName, String topic, SubscriptionListener<T> listener, TypeRef<T> type);
}

View File

@ -0,0 +1,198 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.CloudEvent;
import io.dapr.exceptions.DaprException;
import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
/**
* Streaming subscription of events for Dapr's pubsub.
* @param <T> Application's object type.
*/
public class Subscription<T> implements Closeable {
private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(50);
private final AtomicBoolean running = new AtomicBoolean(true);
private final Semaphore receiverStateChange = new Semaphore(0);
private Thread acker;
private Thread receiver;
Subscription(DaprGrpc.DaprStub asyncStub,
DaprProtos.SubscribeTopicEventsRequestAlpha1 request,
SubscriptionListener<T> listener,
Function<DaprProtos.SubscribeTopicEventsResponseAlpha1, CloudEvent<T>> cloudEventConverter) {
final AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> streamRef =
new AtomicReference<>();
this.acker = new Thread(() -> {
while (running.get()) {
try {
var ackResponse = ackQueue.take();
if (ackResponse == null) {
continue;
}
var stream = streamRef.get();
if (stream == null) {
Thread.sleep(1000);
// stream not ready yet
continue;
}
stream.onNext(ackResponse);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
// No-op and continue after waiting for some time.
// This is useful when there is a reconnection, for example.
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
}
}
});
this.receiver = new Thread(() -> {
while (running.get()) {
var stream = asyncStub.subscribeTopicEventsAlpha1(new StreamObserver<>() {
@Override
public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 topicEventRequest) {
try {
var stream = streamRef.get();
if (stream == null) {
throw new RuntimeException("Cannot receive event: streaming subscription is not initialized.");
}
CloudEvent<T> cloudEvent = cloudEventConverter.apply(topicEventRequest);
if (cloudEvent == null) {
return;
}
var id = cloudEvent.getId();
if ((id == null) || id.isEmpty()) {
return;
}
onEvent(listener, cloudEvent).subscribe(status -> {
var ack = buildAckRequest(id, status);
try {
ackQueue.put(ack);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
this.onError(DaprException.propagate(e));
}
}
@Override
public void onError(Throwable throwable) {
listener.onError(DaprException.propagate(throwable));
}
@Override
public void onCompleted() {
receiverStateChange.release();
}
});
streamRef.set(stream);
stream.onNext(request);
// Keep the client running
try {
receiverStateChange.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
running.set(false);
}
}
});
}
private static <T> Mono<SubscriptionListener.Status> onEvent(
SubscriptionListener<T> listener, CloudEvent<T> cloudEvent) {
return listener.onEvent(cloudEvent).onErrorMap(t -> {
var exception = DaprException.propagate(t);
listener.onError(exception);
return exception;
}).onErrorReturn(SubscriptionListener.Status.RETRY);
}
@NotNull
private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(
String id, SubscriptionListener.Status status) {
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed =
DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
.setId(id)
.setStatus(
DaprAppCallbackProtos.TopicEventResponse.newBuilder()
.setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf(
status.name()))
.build())
.build();
DaprProtos.SubscribeTopicEventsRequestAlpha1 ack =
DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder()
.setEventProcessed(eventProcessed)
.build();
return ack;
}
void start() {
this.receiver.start();
this.acker.start();
}
/**
* Stops the subscription.
*/
@Override
public void close() {
running.set(false);
receiverStateChange.release();
this.acker.interrupt();
}
/**
* Awaits (blocks) for subscription to end.
* @throws InterruptedException Exception if interrupted while awaiting.
*/
public void awaitTermination() throws InterruptedException {
this.receiver.join();
this.acker.join();
}
}
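The class above decouples event receipt from acknowledgement: the gRPC callback only enqueues acks into a bounded `BlockingQueue`, and a dedicated acker thread drains that queue onto the stream. Below is a minimal, self-contained sketch of that hand-off in plain Java; `drainAcks` and the in-memory queue stand in for the gRPC stream and are illustrative only, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AckQueueSketch {
  // Models the Subscription's acker loop: the "receiver" enqueues one ack per event
  // without blocking, while a separate acker thread takes acks off the queue and
  // "sends" them (here, appends them to a list standing in for the gRPC stream).
  public static List<String> drainAcks(List<String> eventIds) {
    BlockingQueue<String> ackQueue = new LinkedBlockingQueue<>(50);
    List<String> sent = Collections.synchronizedList(new ArrayList<>());
    Thread acker = new Thread(() -> {
      try {
        for (int i = 0; i < eventIds.size(); i++) {
          sent.add(ackQueue.take()); // blocks until an ack is available
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    acker.start();
    try {
      for (String id : eventIds) {
        ackQueue.put(id); // receiver side: enqueue the ack and move on
      }
      acker.join();
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return sent;
  }

  public static void main(String[] args) {
    System.out.println(drainAcks(List.of("e1", "e2", "e3")));
    // prints [e1, e2, e3]
  }
}
```

Because a single consumer drains a FIFO queue, acks reach the stream in the order events were processed, which mirrors why the real `Subscription` uses one acker thread rather than writing to the stream from the gRPC callback.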

View File

@ -0,0 +1,46 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.dapr.client;
import io.dapr.client.domain.CloudEvent;
import reactor.core.publisher.Mono;
/**
* Callback interface to receive events from a streaming subscription of events.
* @param <T> Object type for deserialization.
*/
public interface SubscriptionListener<T> {
/**
* Callback status response for acknowledging a message.
*/
enum Status {
SUCCESS,
RETRY,
DROP
}
/**
* Processes an event from streaming subscription.
* @param event Event received.
* @return Acknowledgement status.
*/
Mono<Status> onEvent(CloudEvent<T> event);
/**
* Processes an exception during streaming subscription.
* @param exception Exception to be processed.
*/
void onError(RuntimeException exception);
}

View File

@ -42,11 +42,11 @@ public class DaprClientBuilderTest {
@Test
public void buildWithOverrideSidecarIP() {
DaprClientBuilder daprClientBuilder = new DaprClientBuilder();
daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknown-host");
daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknownhost");
DaprClient daprClient = daprClientBuilder.build();
assertNotNull(daprClient);
DaprException thrown = assertThrows(DaprException.class, () -> { daprClient.getMetadata().block(); });
assertTrue(thrown.toString().contains("UNAVAILABLE"));
assertTrue(thrown.toString().contains("UNAVAILABLE"), thrown.toString());
}

View File

@ -20,6 +20,7 @@ import com.google.protobuf.ByteString;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
@ -27,6 +28,8 @@ import io.dapr.client.domain.UnlockResponseStatus;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.Status;
@ -44,9 +47,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -417,6 +425,120 @@ public class DaprPreviewClientGrpcTest {
assertEquals(UnlockResponseStatus.SUCCESS, result);
}
@Test
public void subscribeEventTest() throws Exception {
var numEvents = 100;
var numErrors = 3;
var numDrops = 2;
var pubsubName = "pubsubName";
var topicName = "topicName";
var data = "my message";
var started = new Semaphore(0);
doAnswer((Answer<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>>) invocation -> {
StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1> observer =
(StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1>) invocation.getArguments()[0];
var emitterThread = new Thread(() -> {
try {
started.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance());
for (int i = 0; i < numEvents; i++) {
observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId(Integer.toString(i))
.setPubsubName(pubsubName)
.setTopic(topicName)
.setData(ByteString.copyFromUtf8("\"" + data + "\""))
.setDataContentType("application/json")
.build())
.build());
}
for (int i = 0; i < numDrops; i++) {
// Bad messages
observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder()
.setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder()
.setId(UUID.randomUUID().toString())
.setPubsubName("bad pubsub")
.setTopic("bad topic")
.setData(ByteString.copyFromUtf8("\"\""))
.setDataContentType("application/json")
.build())
.build());
}
observer.onCompleted();
});
emitterThread.start();
return new StreamObserver<>() {
@Override
public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) {
started.release();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
};
}).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class));
final Set<String> success = Collections.synchronizedSet(new HashSet<>());
final Set<String> errors = Collections.synchronizedSet(new HashSet<>());
final AtomicInteger dropCounter = new AtomicInteger();
final Semaphore gotAll = new Semaphore(0);
final AtomicInteger errorsToBeEmitted = new AtomicInteger(numErrors);
var subscription = previewClient.subscribeToEvents(
"pubsubname",
"topic",
new SubscriptionListener<>() {
@Override
public Mono<Status> onEvent(CloudEvent<String> event) {
if (event.getPubsubName().equals(pubsubName) &&
event.getTopic().equals(topicName) &&
event.getData().equals(data)) {
// Simulate an error
if ((success.size() == 4 /* some random entry */) && errorsToBeEmitted.decrementAndGet() >= 0) {
throw new RuntimeException("simulated exception on event " + event.getId());
}
success.add(event.getId());
if (success.size() >= numEvents) {
gotAll.release();
}
return Mono.just(Status.SUCCESS);
}
dropCounter.incrementAndGet();
return Mono.just(Status.DROP);
}
@Override
public void onError(RuntimeException exception) {
errors.add(exception.getMessage());
}
},
TypeRef.STRING);
gotAll.acquire();
subscription.close();
assertEquals(numEvents, success.size());
assertEquals(numDrops, dropCounter.get());
assertEquals(numErrors, errors.size());
}
private DaprProtos.QueryStateResponse buildQueryStateResponse(List<QueryStateItem<?>> resp,String token)
throws JsonProcessingException {
List<DaprProtos.QueryStateItem> items = new ArrayList<>();

View File

@ -5,29 +5,15 @@ import io.grpc.ManagedChannel;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class NetworkUtilsTest {
private final int defaultGrpcPort = 4000;
private final int defaultGrpcPort = 50001;
private final String defaultSidecarIP = "127.0.0.1";
private ManagedChannel channel;
private Map<String, String> propertiesOverride;
@BeforeEach
public void setUp() {
// Must be mutable for some test scenarios here.
propertiesOverride = new HashMap<>(Map.of(
Properties.GRPC_PORT.getName(), Integer.toString(defaultGrpcPort),
Properties.SIDECAR_IP.getName(), defaultSidecarIP,
Properties.GRPC_ENDPOINT.getName(), ""
));
}
@AfterEach
public void tearDown() {
@ -38,7 +24,7 @@ public class NetworkUtilsTest {
@Test
public void testBuildGrpcManagedChannel() {
channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride));
channel = NetworkUtils.buildGrpcManagedChannel(new Properties());
String expectedAuthority = String.format("%s:%s", defaultSidecarIP, defaultGrpcPort);
Assertions.assertEquals(expectedAuthority, channel.authority());
@ -46,8 +32,8 @@ public class NetworkUtilsTest {
@Test
public void testBuildGrpcManagedChannel_httpEndpointNoPort() {
propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "http://example.com");
channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "http://example.com"));
channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:80";
Assertions.assertEquals(expectedAuthority, channel.authority());
@ -55,8 +41,8 @@ public class NetworkUtilsTest {
@Test
public void testBuildGrpcManagedChannel_httpEndpointWithPort() {
propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000");
channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000"));
channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority());
@ -64,8 +50,8 @@ public class NetworkUtilsTest {
@Test
public void testBuildGrpcManagedChannel_httpsEndpointNoPort() {
propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "https://example.com");
channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "https://example.com"));
channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:443";
Assertions.assertEquals(expectedAuthority, channel.authority());
@ -73,8 +59,8 @@ public class NetworkUtilsTest {
@Test
public void testBuildGrpcManagedChannel_httpsEndpointWithPort() {
propertiesOverride.put(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000");
channel = NetworkUtils.buildGrpcManagedChannel(new Properties(propertiesOverride));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000"));
channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority());
@ -144,8 +130,8 @@ public class NetworkUtilsTest {
String expectedEndpoint,
boolean expectSecure
) {
var override = Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
var settings = NetworkUtils.GrpcEndpointSettings.parse(new Properties(override));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue));
var settings = NetworkUtils.GrpcEndpointSettings.parse(properties);
Assertions.assertEquals(expectedEndpoint, settings.endpoint);
Assertions.assertEquals(expectSecure, settings.secure);
@ -153,8 +139,8 @@ public class NetworkUtilsTest {
private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) {
try {
var override = Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
NetworkUtils.GrpcEndpointSettings.parse(new Properties(override));
var properties = new Properties(Map.of(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue));
NetworkUtils.GrpcEndpointSettings.parse(properties);
Assert.fail();
} catch (IllegalArgumentException e) {
// Expected