Use 1.16 rc.3 (#1488)

* chore: Use dapr release 1.16-rc.3

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* feat: Move field to top level request

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* chore: Update mechanical markdown workflow timeouts

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* chore: Run MM workflows tests in parallel

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* fix: Fix tracing examples

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Do not run tests during example validations

We have specific jobs that run tests. Do not run them as part of validations

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Improve validate job

No need to run separate jobs for clean, compile and install

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Run workflows examples sequentially

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Add timeout to stream pubsub

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Fix order of examples

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* ci: Fix validation tests for sb workflows

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* nit: Add copyright header

Signed-off-by: Javier Aliaga <javier@diagrid.io>

---------

Signed-off-by: Javier Aliaga <javier@diagrid.io>
Signed-off-by: siri-varma <siri.varma@outlook.com>
This commit is contained in:
Javier Aliaga 2025-08-11 18:39:15 +02:00 committed by siri-varma
parent e3c7dcd045
commit 1b63c7631a
29 changed files with 399 additions and 119 deletions

View File

@ -39,7 +39,7 @@ jobs:
GOPROXY: https://proxy.golang.org GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }} JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.15.0 DAPR_CLI_VER: 1.15.0
DAPR_RUNTIME_VER: 1.15.7 DAPR_RUNTIME_VER: 1.16.0-rc.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh
DAPR_CLI_REF: DAPR_CLI_REF:
DAPR_REF: DAPR_REF:

View File

@ -38,7 +38,7 @@ jobs:
GOPROXY: https://proxy.golang.org GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }} JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.15.0 DAPR_CLI_VER: 1.15.0
DAPR_RUNTIME_VER: 1.15.7 DAPR_RUNTIME_VER: 1.16.0-rc.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.15.0/install/install.sh
DAPR_CLI_REF: DAPR_CLI_REF:
DAPR_REF: DAPR_REF:
@ -107,12 +107,26 @@ jobs:
pip3 install mechanical-markdown pip3 install mechanical-markdown
- name: Verify scheduler is listening on port. - name: Verify scheduler is listening on port.
run: sleep 30 && docker logs dapr_scheduler && nc -vz localhost 50006 run: sleep 30 && docker logs dapr_scheduler && nc -vz localhost 50006
- name: Clean up files
run: ./mvnw clean
- name: Build sdk
run: ./mvnw compile -q
- name: Install jars - name: Install jars
run: ./mvnw install -q run: ./mvnw clean install -DskipTests -q
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate workflows example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Spring Boot examples
working-directory: ./spring-boot-examples
run: |
mm.py README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Spring Boot Workflow examples
working-directory: ./spring-boot-examples/workflows
run: |
mm.py README.md
env: env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Jobs example - name: Validate Jobs example
@ -199,21 +213,9 @@ jobs:
mm.py ./src/main/java/io/dapr/examples/querystate/README.md mm.py ./src/main/java/io/dapr/examples/querystate/README.md
env: env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate workflows example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate streaming subscription example - name: Validate streaming subscription example
working-directory: ./examples working-directory: ./examples
run: | run: |
mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md mm.py ./src/main/java/io/dapr/examples/pubsub/stream/README.md
env: env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}} DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}
- name: Validate Spring Boot examples
working-directory: ./spring-boot-examples
run: |
mm.py README.md
env:
DOCKER_HOST: ${{steps.setup_docker.outputs.sock}}

View File

@ -79,8 +79,8 @@ public class DaprTestContainersConfig {
@Bean @Bean
@ServiceConnection @ServiceConnection
public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer){ public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer){
return new DaprContainer("daprio/daprd:1.15.7") return new DaprContainer("daprio/daprd:1.16.0-rc.3")
.withAppName("producer-app") .withAppName("producer-app")
.withNetwork(daprNetwork) .withNetwork(daprNetwork)
.withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES)) .withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))
@ -235,7 +235,7 @@ Finally, because Dapr PubSub requires a bidirectional connection between your ap
@ServiceConnection @ServiceConnection
public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer, RabbitMQContainer rabbitMQContainer){ public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer, RabbitMQContainer rabbitMQContainer){
return new DaprContainer("daprio/daprd:1.15.7") return new DaprContainer("daprio/daprd:1.16.0-rc.3")
.withAppName("producer-app") .withAppName("producer-app")
.withNetwork(daprNetwork) .withNetwork(daprNetwork)
.withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES)) .withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))

View File

@ -93,7 +93,8 @@ expected_stdout_lines:
- '== APP == Subscriber got: This is message #0' - '== APP == Subscriber got: This is message #0'
- '== APP == Subscriber got: This is message #1' - '== APP == Subscriber got: This is message #1'
background: true background: true
sleep: 30 sleep: 15
timeout_seconds: 30
--> -->
```bash ```bash
@ -111,6 +112,7 @@ expected_stdout_lines:
- '== APP == Published message: This is message #1' - '== APP == Published message: This is message #1'
background: true background: true
sleep: 15 sleep: 15
timeout_seconds: 30
--> -->
```bash ```bash

View File

@ -70,6 +70,12 @@ public class InvokeClient {
}).contextWrite(getReactorContext()).block(); }).contextWrite(getReactorContext()).block();
} }
} }
span.end();
openTelemetrySdk.getSdkTracerProvider().shutdown();
Validation.validate();
System.out.println("Done");
System.exit(0);
} }
span.end(); span.end();
shutdown(); shutdown();

View File

@ -138,6 +138,7 @@ name: Run demo service
expected_stdout_lines: expected_stdout_lines:
background: true background: true
sleep: 20 sleep: 20
timeout_seconds: 40
--> -->
```bash ```bash
@ -225,6 +226,7 @@ name: Run proxy service
expected_stdout_lines: expected_stdout_lines:
background: true background: true
sleep: 20 sleep: 20
timeout_seconds: 40
--> -->
```bash ```bash
@ -284,6 +286,7 @@ expected_stdout_lines:
- '== APP == Done' - '== APP == Done'
background: true background: true
sleep: 20 sleep: 20
timeout_seconds: 40
--> -->
```bash ```bash

View File

@ -142,13 +142,12 @@ expected_stdout_lines:
- 'Message Received from input: Seattle' - 'Message Received from input: Seattle'
- 'Sending message to output: SEATTLE' - 'Sending message to output: SEATTLE'
- 'Workflow finished with result: TOKYO, LONDON, SEATTLE' - 'Workflow finished with result: TOKYO, LONDON, SEATTLE'
timeout_seconds: 20
background: true background: true
sleep: 60
timeout_seconds: 60
--> -->
Execute the following script in order to run DemoChainWorker: Execute the following script in order to run DemoChainWorker:
```sh ```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker 50001
``` ```
Once running, the logs will start displaying the different steps: First, you can see workflow is starting: Once running, the logs will start displaying the different steps: First, you can see workflow is starting:
@ -158,9 +157,19 @@ Once running, the logs will start displaying the different steps: First, you can
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
``` ```
<!-- END_STEP -->
<!-- STEP
name: Execute Chaining Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'completed with result: TOKYO, LONDON, SEATTLE'
timeout_seconds: 20
-->
Then, execute the following script in order to run DemoChainClient: Then, execute the following script in order to run DemoChainClient:
```sh ```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient sleep 10 && java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient 50001
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -241,7 +250,7 @@ public class CountWordsActivity implements WorkflowActivity {
} }
``` ```
<!-- STEP <!-- STEP
name: Run Chaining Pattern workflow name: Run FanInOut Pattern workflow
match_order: none match_order: none
output_match_mode: substring output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
@ -251,20 +260,29 @@ expected_stdout_lines:
- 'Activity returned: 17.' - 'Activity returned: 17.'
- 'Activity returned: 11.' - 'Activity returned: 11.'
- 'Workflow finished with result: 60' - 'Workflow finished with result: 60'
timeout_seconds: 20
background: true background: true
sleep: 60
timeout_seconds: 60
--> -->
Execute the following script in order to run DemoFanInOutWorker: Execute the following script in order to run DemoFanInOutWorker:
```sh ```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50002 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker 50002
``` ```
<!-- END_STEP -->
<!-- STEP
name: Execute FanInOut Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'completed with result: 60'
timeout_seconds: 20
-->
Execute the following script in order to run DemoFanInOutClient: Execute the following script in order to run DemoFanInOutClient:
```sh ```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient sleep 10 && java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient 50002
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -294,7 +312,6 @@ and the client:
Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986 Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986
workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60 workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60
``` ```
### Continue As New Pattern ### Continue As New Pattern
`ContinueAsNew` API allows you to restart the workflow with a new input. `ContinueAsNew` API allows you to restart the workflow with a new input.
@ -606,8 +623,6 @@ expected_stdout_lines:
- "Registered Activity: CancelCarActivity" - "Registered Activity: CancelCarActivity"
- "Successfully built dapr workflow runtime" - "Successfully built dapr workflow runtime"
- "Start workflow runtime" - "Start workflow runtime"
- "Durable Task worker is connecting to sidecar at 127.0.0.1:50001."
- "Starting Workflow: io.dapr.examples.workflows.compensation.BookTripWorkflow" - "Starting Workflow: io.dapr.examples.workflows.compensation.BookTripWorkflow"
- "Starting Activity: io.dapr.examples.workflows.compensation.BookFlightActivity" - "Starting Activity: io.dapr.examples.workflows.compensation.BookFlightActivity"
- "Activity completed with result: Flight booked successfully" - "Activity completed with result: Flight booked successfully"
@ -625,18 +640,27 @@ expected_stdout_lines:
- "Starting Activity: io.dapr.examples.workflows.compensation.CancelFlightActivity" - "Starting Activity: io.dapr.examples.workflows.compensation.CancelFlightActivity"
- "Activity completed with result: Flight canceled successfully" - "Activity completed with result: Flight canceled successfully"
background: true background: true
sleep: 60 timeout_seconds: 30
timeout_seconds: 60
--> -->
Execute the following script in order to run the BookTripWorker: Execute the following script in order to run the BookTripWorker:
```sh ```sh
dapr run --app-id book-trip-worker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripWorker dapr run --app-id book-trip-worker --resources-path ./components/workflows --dapr-grpc-port 50003 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripWorker 50003
``` ```
<!-- END_STEP -->
<!-- STEP
name: Execute Compensation Pattern workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'Workflow failed, compensation applied'
timeout_seconds: 30
-->
Once running, execute the following script to run the BookTripClient: Once running, execute the following script to run the BookTripClient:
```sh ```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripClient sleep 15 && java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripClient 50003
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -656,7 +680,7 @@ Key Points:
### Suspend/Resume Pattern ### Suspend/Resume Pattern
Workflow instances can be suspended and resumed. This example shows how to use the suspend and resume commands. Workflow instances can be suspended and resumed. This example shows how to use the suspend and resume commands.
For testing the suspend and resume operations we will use the same workflow definition used by the DemoExternalEventWorkflow. For testing the suspend and resume operations we will use the same workflow definition used by the DemoExternalEventWorkflow.
@ -669,26 +693,34 @@ match_order: none
output_match_mode: substring output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- "Waiting for approval..." - "Waiting for approval..."
- "Suspending Workflow Instance"
- "Workflow Instance Status: SUSPENDED"
- "Let's resume the Workflow Instance before sending the external event"
- "Workflow Instance Status: RUNNING"
- "Now that the instance is RUNNING again, lets send the external event."
- "approval granted - do the approved action" - "approval granted - do the approved action"
- "Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity" - "Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity"
- "Running approval activity..." - "Running approval activity..."
- "approval-activity finished" - "approval-activity finished"
background: true background: true
sleep: 60 timeout_seconds: 30
timeout_seconds: 60
--> -->
```sh ```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeWorker dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50004 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeWorker 50004
``` ```
<!-- END_STEP -->
<!-- STEP
name: Execute Suspend/Resume workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- "Suspending Workflow Instance"
- "Workflow Instance Status: SUSPENDED"
- "Let's resume the Workflow Instance before sending the external event"
- "Workflow Instance Status: RUNNING"
- "Now that the instance is RUNNING again, lets send the external event."
timeout_seconds: 30
-->
```sh ```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeClient sleep 15 && java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeClient 50004
``` ```
<!-- END_STEP --> <!-- END_STEP -->

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.chain; package io.dapr.examples.workflows.chain;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowInstanceStatus;
@ -26,7 +27,7 @@ public class DemoChainClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(DemoChainWorkflow.class); String instanceId = client.scheduleNewWorkflow(DemoChainWorkflow.class);
System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId);
WorkflowInstanceStatus workflowInstanceStatus = WorkflowInstanceStatus workflowInstanceStatus =

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.chain; package io.dapr.examples.workflows.chain;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -25,7 +26,7 @@ public class DemoChainWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class); WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(DemoChainWorkflow.class);
builder.registerActivity(ToUpperCaseActivity.class); builder.registerActivity(ToUpperCaseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks // Build and then start the workflow runtime pulling and executing tasks

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.childworkflow; package io.dapr.examples.workflows.childworkflow;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowInstanceStatus;
@ -26,7 +27,7 @@ public class DemoChildWorkerflowClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class); String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
System.out.printf("Started a new child-workflow model workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new child-workflow model workflow with instance ID: %s%n", instanceId);
WorkflowInstanceStatus workflowInstanceStatus = WorkflowInstanceStatus workflowInstanceStatus =

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.childworkflow; package io.dapr.examples.workflows.childworkflow;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -25,7 +26,7 @@ public class DemoChildWorkflowWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args))
.registerWorkflow(DemoWorkflow.class) .registerWorkflow(DemoWorkflow.class)
.registerWorkflow(DemoChildWorkflow.class); .registerWorkflow(DemoChildWorkflow.class);
builder.registerActivity(ReverseActivity.class); builder.registerActivity(ReverseActivity.class);

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.compensation; package io.dapr.examples.workflows.compensation;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowInstanceStatus;
@ -21,7 +22,7 @@ import java.util.concurrent.TimeoutException;
public class BookTripClient { public class BookTripClient {
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(BookTripWorkflow.class); String instanceId = client.scheduleNewWorkflow(BookTripWorkflow.class);
System.out.printf("Started a new trip booking workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new trip booking workflow with instance ID: %s%n", instanceId);

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.compensation; package io.dapr.examples.workflows.compensation;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -20,7 +21,7 @@ public class BookTripWorker {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder // Register the Workflow with the builder
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args))
.registerWorkflow(BookTripWorkflow.class) .registerWorkflow(BookTripWorkflow.class)
.registerActivity(BookFlightActivity.class) .registerActivity(BookFlightActivity.class)
.registerActivity(CancelFlightActivity.class) .registerActivity(CancelFlightActivity.class)

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.continueasnew; package io.dapr.examples.workflows.continueasnew;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -25,7 +26,7 @@ public class DemoContinueAsNewClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(DemoContinueAsNewWorkflow.class); String instanceId = client.scheduleNewWorkflow(DemoContinueAsNewWorkflow.class);
System.out.printf("Started a new continue-as-new model workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new continue-as-new model workflow with instance ID: %s%n", instanceId);

View File

@ -13,10 +13,10 @@ limitations under the License.
package io.dapr.examples.workflows.continueasnew; package io.dapr.examples.workflows.continueasnew;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
public class DemoContinueAsNewWorker { public class DemoContinueAsNewWorker {
@ -28,7 +28,7 @@ public class DemoContinueAsNewWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(). WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).
registerWorkflow(DemoContinueAsNewWorkflow.class) registerWorkflow(DemoContinueAsNewWorkflow.class)
.withExecutorService(Executors.newFixedThreadPool(3)); .withExecutorService(Executors.newFixedThreadPool(3));
builder.registerActivity(CleanUpActivity.class); builder.registerActivity(CleanUpActivity.class);

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.externalevent; package io.dapr.examples.workflows.externalevent;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -25,7 +26,7 @@ public class DemoExternalEventClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class); String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class);
System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId);

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.externalevent; package io.dapr.examples.workflows.externalevent;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -25,7 +26,7 @@ public class DemoExternalEventWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class); WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(DemoExternalEventWorkflow.class);
builder.registerActivity(ApproveActivity.class); builder.registerActivity(ApproveActivity.class);
builder.registerActivity(DenyActivity.class); builder.registerActivity(DenyActivity.class);

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.faninout; package io.dapr.examples.workflows.faninout;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowInstanceStatus;
@ -29,7 +30,7 @@ public class DemoFanInOutClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
// The input is an arbitrary list of strings. // The input is an arbitrary list of strings.
List<String> listOfStrings = Arrays.asList( List<String> listOfStrings = Arrays.asList(
"Hello, world!", "Hello, world!",

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.faninout; package io.dapr.examples.workflows.faninout;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -25,7 +26,7 @@ public class DemoFanInOutWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoFanInOutWorkflow.class); WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(DemoFanInOutWorkflow.class);
builder.registerActivity(CountWordsActivity.class); builder.registerActivity(CountWordsActivity.class);
// Build and then start the workflow runtime pulling and executing tasks // Build and then start the workflow runtime pulling and executing tasks

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.examples.workflows.suspendresume; package io.dapr.examples.workflows.suspendresume;
import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow; import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowInstanceStatus;
@ -27,7 +28,7 @@ public class DemoSuspendResumeClient {
* @throws InterruptedException If program has been interrupted. * @throws InterruptedException If program has been interrupted.
*/ */
public static void main(String[] args) { public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) { try (DaprWorkflowClient client = new DaprWorkflowClient(PropertyUtils.getProperties(args))) {
String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class); String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class);
System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId); System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId);

View File

@ -16,6 +16,7 @@ package io.dapr.examples.workflows.suspendresume;
import io.dapr.examples.workflows.externalevent.ApproveActivity; import io.dapr.examples.workflows.externalevent.ApproveActivity;
import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow; import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow;
import io.dapr.examples.workflows.externalevent.DenyActivity; import io.dapr.examples.workflows.externalevent.DenyActivity;
import io.dapr.examples.workflows.utils.PropertyUtils;
import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
@ -28,7 +29,7 @@ public class DemoSuspendResumeWorker {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Register the Workflow with the builder. // Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class); WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(PropertyUtils.getProperties(args)).registerWorkflow(DemoExternalEventWorkflow.class);
builder.registerActivity(ApproveActivity.class); builder.registerActivity(ApproveActivity.class);
builder.registerActivity(DenyActivity.class); builder.registerActivity(DenyActivity.class);

View File

@ -0,0 +1,32 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows.utils;
import io.dapr.config.Properties;
import java.util.HashMap;
/**
 * Helper for building Dapr client {@link Properties} for the workflow examples
 * from command-line arguments.
 */
public class PropertyUtils {

  /**
   * Builds the Dapr configuration properties for an example.
   *
   * <p>When at least one argument is supplied, the first argument is applied as an
   * override for {@link Properties#GRPC_PORT}; otherwise the default properties are
   * returned unchanged.
   *
   * @param args command-line arguments; {@code args[0]}, if present, is the gRPC port
   * @return the resulting Dapr configuration properties
   */
  public static Properties getProperties(String[] args) {
    if (args == null || args.length == 0) {
      return new Properties();
    }
    // NOTE(review): double-brace initialization creates an anonymous HashMap subclass;
    // kept as-is to preserve the original behavior and the inferred map type exactly.
    return new Properties(new HashMap<>() {{
        put(Properties.GRPC_PORT, args[0]);
      }});
  }
}

View File

@ -17,9 +17,9 @@
<grpc.version>1.69.0</grpc.version> <grpc.version>1.69.0</grpc.version>
<protobuf.version>3.25.5</protobuf.version> <protobuf.version>3.25.5</protobuf.version>
<protocCommand>protoc</protocCommand> <protocCommand>protoc</protocCommand>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.15.7/dapr/proto</dapr.proto.baseurl> <dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.16.0-rc.3/dapr/proto</dapr.proto.baseurl>
<dapr.sdk.version>1.15.0-rc-8</dapr.sdk.version> <dapr.sdk.version>1.16.0-SNAPSHOT</dapr.sdk.version>
<dapr.sdk.alpha.version>0.15.0-rc-8</dapr.sdk.alpha.version> <dapr.sdk.alpha.version>0.16.0-SNAPSHOT</dapr.sdk.alpha.version>
<os-maven-plugin.version>1.7.1</os-maven-plugin.version> <os-maven-plugin.version>1.7.1</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version> <maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version> <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -1309,38 +1309,44 @@ public class DaprClientImpl extends AbstractDaprClient {
try { try {
validateScheduleJobRequest(scheduleJobRequest); validateScheduleJobRequest(scheduleJobRequest);
DaprProtos.Job.Builder scheduleJobRequestBuilder = DaprProtos.Job.newBuilder(); DaprProtos.Job.Builder jobBuilder = DaprProtos.Job.newBuilder();
scheduleJobRequestBuilder.setName(scheduleJobRequest.getName()); jobBuilder.setName(scheduleJobRequest.getName());
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC); .withZone(ZoneOffset.UTC);
if (scheduleJobRequest.getData() != null) { if (scheduleJobRequest.getData() != null) {
scheduleJobRequestBuilder.setData(Any.newBuilder() jobBuilder.setData(Any.newBuilder()
.setValue(ByteString.copyFrom(scheduleJobRequest.getData())).build()); .setValue(ByteString.copyFrom(scheduleJobRequest.getData())).build());
} }
if (scheduleJobRequest.getSchedule() != null) { if (scheduleJobRequest.getSchedule() != null) {
scheduleJobRequestBuilder.setSchedule(scheduleJobRequest.getSchedule().getExpression()); jobBuilder.setSchedule(scheduleJobRequest.getSchedule().getExpression());
} }
if (scheduleJobRequest.getTtl() != null) { if (scheduleJobRequest.getTtl() != null) {
scheduleJobRequestBuilder.setTtl(iso8601Formatter.format(scheduleJobRequest.getTtl())); jobBuilder.setTtl(iso8601Formatter.format(scheduleJobRequest.getTtl()));
} }
if (scheduleJobRequest.getRepeats() != null) { if (scheduleJobRequest.getRepeats() != null) {
scheduleJobRequestBuilder.setRepeats(scheduleJobRequest.getRepeats()); jobBuilder.setRepeats(scheduleJobRequest.getRepeats());
} }
if (scheduleJobRequest.getDueTime() != null) { if (scheduleJobRequest.getDueTime() != null) {
scheduleJobRequestBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime())); jobBuilder.setDueTime(iso8601Formatter.format(scheduleJobRequest.getDueTime()));
} }
if (scheduleJobRequest.getFailurePolicy() != null) {
jobBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
}
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono = Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
Mono.deferContextual(context -> this.createMono( Mono.deferContextual(context -> this.createMono(
it -> intercept(context, asyncStub) it -> intercept(context, asyncStub)
.scheduleJobAlpha1(DaprProtos.ScheduleJobRequest.newBuilder() .scheduleJobAlpha1(DaprProtos.ScheduleJobRequest.newBuilder()
.setJob(scheduleJobRequestBuilder.build()).build(), it) .setOverwrite(scheduleJobRequest.getOverwrite())
.setJob(jobBuilder.build()).build(), it)
) )
); );

View File

@ -22,13 +22,15 @@ import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest; import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.CloudEvent; import io.dapr.client.domain.CloudEvent;
import io.dapr.client.domain.DeleteJobRequest; import io.dapr.client.domain.ConstantFailurePolicy;
import io.dapr.client.domain.GetJobRequest;
import io.dapr.client.domain.GetJobResponse;
import io.dapr.client.domain.JobSchedule;
import io.dapr.client.domain.ConversationInput; import io.dapr.client.domain.ConversationInput;
import io.dapr.client.domain.ConversationRequest; import io.dapr.client.domain.ConversationRequest;
import io.dapr.client.domain.ConversationResponse; import io.dapr.client.domain.ConversationResponse;
import io.dapr.client.domain.DeleteJobRequest;
import io.dapr.client.domain.DropFailurePolicy;
import io.dapr.client.domain.GetJobRequest;
import io.dapr.client.domain.GetJobResponse;
import io.dapr.client.domain.JobSchedule;
import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.QueryStateResponse;
@ -817,6 +819,199 @@ public class DaprPreviewClientGrpcTest {
assertEquals("Name in the request cannot be null or empty", exception.getMessage()); assertEquals("Name in the request cannot be null or empty", exception.getMessage());
} }
@Test
public void scheduleJobShouldHavePolicyWhenPolicyIsSet() {
  // Stub the gRPC call so it completes immediately with an empty response.
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> responseObserver = invocation.getArgument(1);
    responseObserver.onCompleted();
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Schedule a cron job carrying a drop failure policy.
  ScheduleJobRequest request = new ScheduleJobRequest("testJob",
      JobSchedule.fromString("* * * * * *"))
      .setFailurePolicy(new DropFailurePolicy());
  previewClient.scheduleJob(request).block();

  // Capture the proto request that was actually sent to the stub.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> requestCaptor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(1)).scheduleJobAlpha1(requestCaptor.capture(), Mockito.any());

  DaprProtos.Job sentJob = requestCaptor.getValue().getJob();
  assertEquals("testJob", sentJob.getName());
  assertFalse(sentJob.hasData());
  assertEquals("* * * * * *", sentJob.getSchedule());
  assertEquals(0, sentJob.getRepeats());
  assertFalse(sentJob.hasTtl());
  // The failure policy set on the request must be propagated to the proto job.
  Assertions.assertTrue(sentJob.hasFailurePolicy());
}
@Test
public void scheduleJobShouldHaveConstantPolicyWithMaxRetriesWhenConstantPolicyIsSetWithMaxRetries() {
  // Stub the gRPC call so it completes immediately with an empty response.
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> responseObserver = invocation.getArgument(1);
    responseObserver.onCompleted();
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Schedule a cron job with a constant failure policy limited to 2 retries.
  ScheduleJobRequest request = new ScheduleJobRequest("testJob",
      JobSchedule.fromString("* * * * * *"))
      .setFailurePolicy(new ConstantFailurePolicy(2));
  previewClient.scheduleJob(request).block();

  // Capture the proto request that was actually sent to the stub.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> requestCaptor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(1)).scheduleJobAlpha1(requestCaptor.capture(), Mockito.any());

  DaprProtos.Job sentJob = requestCaptor.getValue().getJob();
  assertEquals("testJob", sentJob.getName());
  assertFalse(sentJob.hasData());
  assertEquals("* * * * * *", sentJob.getSchedule());
  assertEquals(0, sentJob.getRepeats());
  assertFalse(sentJob.hasTtl());
  // The constant policy and its retry limit must be propagated to the proto job.
  Assertions.assertTrue(sentJob.hasFailurePolicy());
  assertEquals(2, sentJob.getFailurePolicy().getConstant().getMaxRetries());
}
@Test
public void scheduleJobShouldHaveConstantPolicyWithIntervalWhenConstantPolicyIsSetWithInterval() {
  // Stub the gRPC call so it completes immediately with an empty response.
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
    observer.onCompleted(); // Simulate successful response
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Schedule a cron job with a constant failure policy using a 2-second retry interval.
  Duration interval = Duration.of(2, ChronoUnit.SECONDS);
  ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
      JobSchedule.fromString("* * * * * *"))
      .setFailurePolicy(new ConstantFailurePolicy(interval));

  previewClient.scheduleJob(expectedScheduleJobRequest).block();

  // Capture the proto request that was actually sent to the stub.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());

  DaprProtos.Job job = captor.getValue().getJob();
  assertEquals("testJob", job.getName());
  assertFalse(job.hasData());
  assertEquals("* * * * * *", job.getSchedule());
  assertEquals(0, job.getRepeats());
  assertFalse(job.hasTtl());
  Assertions.assertTrue(job.hasFailurePolicy());
  // Bug fix: the previous assertion compared only the nano-of-second components,
  // which are always 0 for a whole-second interval, so it could never fail.
  // Assert the seconds component (the actual value) and keep the nanos check.
  assertEquals(interval.getSeconds(),
      job.getFailurePolicy().getConstant().getInterval().getSeconds());
  assertEquals(interval.getNano(),
      job.getFailurePolicy().getConstant().getInterval().getNanos());
}
@Test
public void scheduleJobShouldHaveBothRetiresAndIntervalWhenConstantPolicyIsSetWithRetriesAndInterval() {
  // Stub the gRPC call so it completes immediately with an empty response.
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> observer = invocation.getArgument(1);
    observer.onCompleted(); // Simulate successful response
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Schedule a cron job whose constant failure policy carries both an interval
  // and a retry limit.
  Duration interval = Duration.of(2, ChronoUnit.SECONDS);
  ScheduleJobRequest expectedScheduleJobRequest = new ScheduleJobRequest("testJob",
      JobSchedule.fromString("* * * * * *"))
      .setFailurePolicy(new ConstantFailurePolicy(interval)
          .setMaxRetries(10));

  previewClient.scheduleJob(expectedScheduleJobRequest).block();

  // Capture the proto request that was actually sent to the stub.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> captor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(1)).scheduleJobAlpha1(captor.capture(), Mockito.any());

  DaprProtos.Job job = captor.getValue().getJob();
  assertEquals("testJob", job.getName());
  assertFalse(job.hasData());
  assertEquals("* * * * * *", job.getSchedule());
  assertEquals(0, job.getRepeats());
  assertFalse(job.hasTtl());
  Assertions.assertTrue(job.hasFailurePolicy());
  // Bug fix: the previous assertion compared only the nano-of-second components,
  // which are always 0 for a whole-second interval, so it could never fail.
  // Assert the seconds component (the actual value) and keep the nanos check.
  assertEquals(interval.getSeconds(),
      job.getFailurePolicy().getConstant().getInterval().getSeconds());
  assertEquals(interval.getNano(),
      job.getFailurePolicy().getConstant().getInterval().getNanos());
  assertEquals(10, job.getFailurePolicy().getConstant().getMaxRetries());
}
@Test
public void scheduleJobShouldThrowWhenNameAlreadyExists() {
  // First invocation completes normally; every later one fails with ALREADY_EXISTS.
  AtomicInteger invocationCount = new AtomicInteger();
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> responseObserver = invocation.getArgument(1);
    if (invocationCount.incrementAndGet() == 1) {
      responseObserver.onCompleted();
    } else {
      responseObserver.onError(
          newStatusRuntimeException("ALREADY_EXISTS", "Job with name 'testJob' already exists"));
    }
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Scheduling the job for the first time succeeds.
  ScheduleJobRequest firstRequest = new ScheduleJobRequest("testJob", Instant.now());
  assertDoesNotThrow(() -> previewClient.scheduleJob(firstRequest).block());

  // Capture the proto request that was actually sent to the stub.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> requestCaptor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(1)).scheduleJobAlpha1(requestCaptor.capture(), Mockito.any());

  DaprProtos.Job sentJob = requestCaptor.getValue().getJob();
  assertEquals("testJob", sentJob.getName());
  assertFalse(sentJob.hasData());
  assertEquals(0, sentJob.getRepeats());
  assertFalse(sentJob.hasTtl());

  // Re-scheduling under the same name surfaces the server-side ALREADY_EXISTS error.
  ScheduleJobRequest secondRequest = new ScheduleJobRequest("testJob", Instant.now());
  assertThrowsDaprException(
      ExecutionException.class,
      "ALREADY_EXISTS",
      "ALREADY_EXISTS: Job with name 'testJob' already exists",
      () -> previewClient.scheduleJob(secondRequest).block());
}
@Test
public void scheduleJobShouldSucceedWhenNameAlreadyExistsWithOverwrite() {
  // Stub the gRPC call so every invocation completes successfully.
  doAnswer(invocation -> {
    StreamObserver<DaprProtos.ScheduleJobResponse> responseObserver = invocation.getArgument(1);
    responseObserver.onCompleted();
    return null;
  }).when(daprStub).scheduleJobAlpha1(any(DaprProtos.ScheduleJobRequest.class), any());

  // Scheduling the job for the first time succeeds.
  ScheduleJobRequest firstRequest = new ScheduleJobRequest("testJob", Instant.now());
  assertDoesNotThrow(() -> previewClient.scheduleJob(firstRequest).block());

  // Re-scheduling the same name with overwrite enabled must also succeed.
  ScheduleJobRequest secondRequest = new ScheduleJobRequest("testJob", Instant.now())
      .setOverwrite(true);
  assertDoesNotThrow(() -> previewClient.scheduleJob(secondRequest).block());

  // Capture both proto requests and verify the overwrite flag on each.
  ArgumentCaptor<DaprProtos.ScheduleJobRequest> requestCaptor =
      ArgumentCaptor.forClass(DaprProtos.ScheduleJobRequest.class);
  verify(daprStub, times(2)).scheduleJobAlpha1(requestCaptor.capture(), any());

  // The first call must not carry the overwrite flag.
  DaprProtos.ScheduleJobRequest firstSent = requestCaptor.getAllValues().get(0);
  assertFalse(firstSent.getOverwrite());
  assertEquals("testJob", firstSent.getJob().getName());

  // The second call must carry overwrite=true.
  DaprProtos.ScheduleJobRequest secondSent = requestCaptor.getAllValues().get(1);
  assertTrue(secondSent.getOverwrite());
  assertEquals("testJob", secondSent.getJob().getName());
}
@Test @Test
public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() { public void getJobShouldReturnResponseWhenAllFieldsArePresentInRequest() {
DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") DateTimeFormatter iso8601Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")

View File

@ -30,7 +30,7 @@ Once you have the cluster up and running you can install Dapr:
helm repo add dapr https://dapr.github.io/helm-charts/ helm repo add dapr https://dapr.github.io/helm-charts/
helm repo update helm repo update
helm upgrade --install dapr dapr/dapr \ helm upgrade --install dapr dapr/dapr \
--version=1.15.7 \ --version=1.16.0-rc.3 \
--namespace dapr-system \ --namespace dapr-system \
--create-namespace \ --create-namespace \
--wait --wait

View File

@ -24,8 +24,7 @@ expected_stdout_lines:
- 'Started WorkflowPatternsApplication' - 'Started WorkflowPatternsApplication'
background: true background: true
expected_return_code: 143 expected_return_code: 143
sleep: 30 timeout_seconds: 180
timeout_seconds: 45
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
@ -67,15 +66,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- 'TOKYO, LONDON, SEATTLE' - 'TOKYO, LONDON, SEATTLE'
background: true background: true
sleep: 1 timeout_seconds: 90
timeout_seconds: 2
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
To start the workflow with the three chained activities you can run: To start the workflow with the three chained activities you can run:
```sh ```sh
curl -X POST localhost:8080/wfp/chain -H 'Content-Type: application/json' sleep 35 && curl -X POST localhost:8080/wfp/chain -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -145,15 +143,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- '!wolfkroW rpaD olleH' - '!wolfkroW rpaD olleH'
background: true background: true
sleep: 1 timeout_seconds: 90
timeout_seconds: 2
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
To start the workflow with the three chained activities you can run: To start the workflow with the three chained activities you can run:
```sh ```sh
curl -X POST localhost:8080/wfp/child -H 'Content-Type: application/json' sleep 35 && curl -X POST localhost:8080/wfp/child -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -195,13 +192,12 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- '{"cleanUpTimes":5}' - '{"cleanUpTimes":5}'
background: true background: true
sleep: 10 timeout_seconds: 90
timeout_seconds: 15
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
```sh ```sh
curl -X POST localhost:8080/wfp/continueasnew -H 'Content-Type: application/json' sleep 30 && curl -X POST localhost:8080/wfp/continueasnew -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -265,13 +261,12 @@ name: Start External Event Workflow
match_order: none match_order: none
output_match_mode: substring output_match_mode: substring
background: true background: true
sleep: 1 timeout_seconds: 90
timeout_seconds: 2
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
```sh ```sh
curl -X POST "localhost:8080/wfp/externalevent?orderId=123" -H 'Content-Type: application/json' sleep 30 && curl -X POST "localhost:8080/wfp/externalevent?orderId=123" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -296,15 +291,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- '{"approved":true}' - '{"approved":true}'
background: true background: true
sleep: 5 timeout_seconds: 90
timeout_seconds: 10
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
To send the event you can run: To send the event you can run:
```sh ```sh
curl -X POST "localhost:8080/wfp/externalevent-continue?orderId=123&decision=true" -H 'Content-Type: application/json' sleep 42 && curl -X POST "localhost:8080/wfp/externalevent-continue?orderId=123&decision=true" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -353,13 +347,12 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- '{"wordCount":60}' - '{"wordCount":60}'
background: true background: true
sleep: 1 timeout_seconds: 90
timeout_seconds: 2
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
```sh ```sh
curl -X POST localhost:8080/wfp/fanoutin -H 'Content-Type: application/json' -d @body.json sleep 45 && curl -X POST localhost:8080/wfp/fanoutin -H 'Content-Type: application/json' -d @body.json
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -406,13 +399,12 @@ name: Start Suspend/Resume Workflow
match_order: none match_order: none
output_match_mode: substring output_match_mode: substring
background: true background: true
sleep: 1 timeout_seconds: 90
timeout_seconds: 2
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
```sh ```sh
curl -X POST "localhost:8080/wfp/suspendresume?orderId=123" -H 'Content-Type: application/json' sleep 50 && curl -X POST "localhost:8080/wfp/suspendresume?orderId=456" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -440,15 +432,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- 'SUSPENDED' - 'SUSPENDED'
background: true background: true
sleep: 5 timeout_seconds: 90
timeout_seconds: 10
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
Let's suspend the workflow instance by sending the following request: Let's suspend the workflow instance by sending the following request:
```sh ```sh
curl -X POST "localhost:8080/wfp/suspendresume/suspend?orderId=123" -H 'Content-Type: application/json' sleep 55 && curl -X POST "localhost:8080/wfp/suspendresume/suspend?orderId=456" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -469,15 +460,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- 'RUNNING' - 'RUNNING'
background: true background: true
sleep: 5 timeout_seconds: 90
timeout_seconds: 10
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
To send the event you can run: To send the event you can run:
```sh ```sh
curl -X POST "localhost:8080/wfp/suspendresume/resume?orderId=123" -H 'Content-Type: application/json' sleep 60 && curl -X POST "localhost:8080/wfp/suspendresume/resume?orderId=456" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->
@ -498,15 +488,14 @@ output_match_mode: substring
expected_stdout_lines: expected_stdout_lines:
- '{"approved":true}' - '{"approved":true}'
background: true background: true
sleep: 5 timeout_seconds: 90
timeout_seconds: 10
--> -->
<!-- Timeout for above service must be more than sleep + timeout for the client--> <!-- Timeout for above service must be more than sleep + timeout for the client-->
To send the event you can run: To send the event you can run:
```sh ```sh
curl -X POST "localhost:8080/wfp/suspendresume/continue?orderId=123&decision=true" -H 'Content-Type: application/json' sleep 65 && curl -X POST "localhost:8080/wfp/suspendresume/continue?orderId=456&decision=true" -H 'Content-Type: application/json'
``` ```
<!-- END_STEP --> <!-- END_STEP -->

View File

@ -66,7 +66,7 @@ public class WorkflowPatternsRestController {
String instanceId = daprWorkflowClient.scheduleNewWorkflow(ChainWorkflow.class); String instanceId = daprWorkflowClient.scheduleNewWorkflow(ChainWorkflow.class);
logger.info("Workflow instance " + instanceId + " started"); logger.info("Workflow instance " + instanceId + " started");
return daprWorkflowClient return daprWorkflowClient
.waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), true) .waitForInstanceCompletion(instanceId, Duration.ofSeconds(10), true)
.readOutputAs(String.class); .readOutputAs(String.class);
} }
@ -80,7 +80,7 @@ public class WorkflowPatternsRestController {
String instanceId = daprWorkflowClient.scheduleNewWorkflow(ParentWorkflow.class); String instanceId = daprWorkflowClient.scheduleNewWorkflow(ParentWorkflow.class);
logger.info("Workflow instance " + instanceId + " started"); logger.info("Workflow instance " + instanceId + " started");
return daprWorkflowClient return daprWorkflowClient
.waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), true) .waitForInstanceCompletion(instanceId, Duration.ofSeconds(10), true)
.readOutputAs(String.class); .readOutputAs(String.class);
} }

View File

@ -1,7 +1,7 @@
package io.dapr.testcontainers; package io.dapr.testcontainers;
public interface DaprContainerConstants { public interface DaprContainerConstants {
String DAPR_VERSION = "1.15.7"; String DAPR_VERSION = "1.16.0-rc.3";
String DAPR_RUNTIME_IMAGE_TAG = "daprio/daprd:" + DAPR_VERSION; String DAPR_RUNTIME_IMAGE_TAG = "daprio/daprd:" + DAPR_VERSION;
String DAPR_PLACEMENT_IMAGE_TAG = "daprio/placement:" + DAPR_VERSION; String DAPR_PLACEMENT_IMAGE_TAG = "daprio/placement:" + DAPR_VERSION;
String DAPR_SCHEDULER_IMAGE_TAG = "daprio/scheduler:" + DAPR_VERSION; String DAPR_SCHEDULER_IMAGE_TAG = "daprio/scheduler:" + DAPR_VERSION;