From 949584f69f4bc78ca59abe6ad000f1f2ace21768 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Wed, 28 May 2025 16:43:48 +0200 Subject: [PATCH 1/7] chore: New task execution task id test (#1352) * chore: New task execution task id test test how taskExecutionTaskId can be used for idempotency Signed-off-by: Javier Aliaga * chore: Clean up not used files Signed-off-by: Javier Aliaga * docs: Task execution keys Signed-off-by: Javier Aliaga * test: Modify unit tests Signed-off-by: Javier Aliaga * Remove new lines Signed-off-by: artur-ciocanu --------- Signed-off-by: Javier Aliaga Signed-off-by: artur-ciocanu Co-authored-by: Cassie Coyle Co-authored-by: artur-ciocanu --- .../java-workflow/java-workflow-howto.md | 43 ++++++++++++-- .../it/testcontainers/DaprWorkflowsIT.java | 25 ++++++++ .../io/dapr/it/testcontainers/KeyStore.java | 51 ++++++++++++++++ .../TaskExecutionKeyActivity.java | 35 +++++++++++ .../TestDaprWorkflowsConfiguration.java | 5 +- .../TestExecutionKeysWorkflow.java | 58 +++++++++++++++++++ .../workflows/WorkflowActivityContext.java | 2 + .../DefaultWorkflowActivityContext.java | 5 ++ .../WorkflowActivityClassWrapperTest.java | 5 +- 9 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index 2acf11252..f9afd4813 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -6,7 +6,7 @@ weight: 20000 description: How to get up and running with workflows using the Dapr Java SDK --- -Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: +Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: - Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) - Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java) @@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. ``` -## Run the `DemoWorkflowClient +## Run the `DemoWorkflowClient` The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. - ```java public class DemoWorkflowClient { @@ -246,4 +245,40 @@ Exiting DemoWorkflowClient. ## Next steps - [Learn more about Dapr workflow]({{< ref workflow-overview.md >}}) -- [Workflow API reference]({{< ref workflow_api.md >}}) \ No newline at end of file +- [Workflow API reference]({{< ref workflow_api.md >}}) + +## Advanced features + +### Task Execution Keys + +Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for: + +1. **Idempotency**: Ensuring activities are not executed multiple times for the same task +2. **State Management**: Tracking the state of activity execution +3. **Error Handling**: Managing retries and failures in a controlled manner + +Here's an example of how to use task execution keys in your workflow activities: + +```java +public class TaskExecutionKeyActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + // Get the task execution key for this activity + String taskExecutionKey = ctx.getTaskExecutionKey(); + + // Use the key to implement idempotency or state management + // For example, check if this task has already been executed + if (isTaskAlreadyExecuted(taskExecutionKey)) { + return getPreviousResult(taskExecutionKey); + } + + // Execute the activity logic + Object result = executeActivityLogic(); + + // Store the result with the task execution key + storeResult(taskExecutionKey, result); + + return result; + } +} +``` diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index 5c6a360c8..bb1f1c768 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -15,6 +15,7 @@ package io.dapr.it.testcontainers; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; + import io.dapr.testcontainers.Component; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; @@ -40,6 +41,7 @@ import java.util.Collections; import java.util.Map; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -117,6 +119,29 @@ public class DaprWorkflowsIT { assertEquals(instanceId, workflowOutput.getWorkflowId()); } + @Test + public void testExecutionKeyWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); + + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false); + + Duration timeout = Duration.ofSeconds(1000); + WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + + assertNotNull(workflowStatus); + + TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); + + assertEquals(1, workflowOutput.getPayloads().size()); + assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); + + String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity"; + assertTrue(KeyStore.getInstance().getKey(executionKey)); + + assertEquals(instanceId, workflowOutput.getWorkflowId()); + } + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java new file mode 100644 index 000000000..017e1c50b --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers; + +import java.util.HashMap; +import java.util.Map; + +public class KeyStore { + + private final Map keyStore = new HashMap<>(); + + private static KeyStore instance; + + private KeyStore() { + } + + public static KeyStore getInstance() { + if (instance == null) { + synchronized (KeyStore.class) { + if (instance == null) { + instance = new KeyStore(); + } + } + } + return instance; + } + + + public void addKey(String key, Boolean value) { + keyStore.put(key, value); + } + + public Boolean getKey(String key) { + return keyStore.get(key); + } + + public void removeKey(String key) { + keyStore.remove(key); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java new file mode 100644 index 000000000..c1a5b5038 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java @@ -0,0 +1,35 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; + +public class TaskExecutionKeyActivity implements WorkflowActivity { + + @Override + public Object run(WorkflowActivityContext ctx) { + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + KeyStore keyStore = KeyStore.getInstance(); + Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey()); + if (!Boolean.TRUE.equals(exists)) { + keyStore.addKey(ctx.getTaskExecutionKey(), true); + workflowPayload.getPayloads().add("Execution key not found"); + throw new IllegalStateException("Task execution key not found"); + } + workflowPayload.getPayloads().add("Execution key found"); + return workflowPayload; + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java index 0a2487b70..e868b1887 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java @@ -56,9 +56,12 @@ public class TestDaprWorkflowsConfiguration { WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); + builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - + builder.registerActivity(TaskExecutionKeyActivity.class); + + return builder; } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java new file mode 100644 index 000000000..30a9ea33f --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java @@ -0,0 +1,58 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers; + +import io.dapr.durabletask.Task; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.time.Duration; + +import org.slf4j.Logger; + +public class TestExecutionKeysWorkflow implements Workflow { + + @Override + public WorkflowStub create() { + return ctx -> { + + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting Workflow: " + ctx.getName()); + logger.info("Instance ID: " + instanceId); + logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); + + TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); + workflowPayload.setWorkflowId(instanceId); + + WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() + .setMaxNumberOfAttempts(3) + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxRetryInterval(Duration.ofSeconds(10)) + .setBackoffCoefficient(2.0) + .setRetryTimeout(Duration.ofSeconds(50)) + .build()); + + + Task t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); + + TestWorkflowPayload payloadAfterExecution = t.await(); + + ctx.complete(payloadAfterExecution); + }; + } + +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 3fe5d88a2..90a2c41a5 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -17,6 +17,8 @@ public interface WorkflowActivityContext { String getName(); + String getTaskExecutionKey(); + T getInput(Class targetType); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index 551c21a37..217c3cd18 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -56,4 +56,9 @@ class DefaultWorkflowActivityContext implements WorkflowActivityContext { public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } + + @Override + public String getTaskExecutionKey() { + return this.innerContext.getTaskExecutionKey(); + } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 76a7e07af..81ac492e0 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -16,7 +16,7 @@ public class WorkflowActivityClassWrapperTest { @Override public Object run(WorkflowActivityContext ctx) { String activityContextName = ctx.getName(); - return ctx.getInput(String.class) + " world! from " + activityContextName; + return ctx.getInput(String.class) + " world! from " + activityContextName + " with task execution key " + ctx.getTaskExecutionKey(); } } @@ -37,10 +37,11 @@ public class WorkflowActivityClassWrapperTest { when(mockContext.getInput(String.class)).thenReturn("Hello"); when(mockContext.getName()).thenReturn("TestActivityContext"); + when(mockContext.getTaskExecutionKey()).thenReturn("123"); Object result = wrapper.create().run(mockContext); verify(mockContext, times(1)).getInput(String.class); - assertEquals("Hello world! from TestActivityContext", result); + assertEquals("Hello world! from TestActivityContext with task execution key 123", result); } } From ba3b529830267499156bf68f680284499363a939 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Wed, 28 May 2025 20:14:59 +0200 Subject: [PATCH 2/7] Revert "chore: New task execution task id test (#1352)" (#1389) This reverts commit 949584f69f4bc78ca59abe6ad000f1f2ace21768. Signed-off-by: Javier Aliaga --- .../java-workflow/java-workflow-howto.md | 43 ++------------ .../it/testcontainers/DaprWorkflowsIT.java | 25 -------- .../io/dapr/it/testcontainers/KeyStore.java | 51 ---------------- .../TaskExecutionKeyActivity.java | 35 ----------- .../TestDaprWorkflowsConfiguration.java | 5 +- .../TestExecutionKeysWorkflow.java | 58 ------------------- .../workflows/WorkflowActivityContext.java | 2 - .../DefaultWorkflowActivityContext.java | 5 -- .../WorkflowActivityClassWrapperTest.java | 5 +- 9 files changed, 7 insertions(+), 222 deletions(-) delete mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java delete mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java delete mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java diff --git a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md index f9afd4813..2acf11252 100644 --- a/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md +++ b/daprdocs/content/en/java-sdk-docs/java-workflow/java-workflow-howto.md @@ -6,7 +6,7 @@ weight: 20000 description: How to get up and running with workflows using the Dapr Java SDK --- -Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: +Let’s create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will: - Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java) - Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java) @@ -85,10 +85,11 @@ You're up and running! Both Dapr and your app logs will appear here. == APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001. ``` -## Run the `DemoWorkflowClient` +## Run the `DemoWorkflowClient The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr. + ```java public class DemoWorkflowClient { @@ -245,40 +246,4 @@ Exiting DemoWorkflowClient. ## Next steps - [Learn more about Dapr workflow]({{< ref workflow-overview.md >}}) -- [Workflow API reference]({{< ref workflow_api.md >}}) - -## Advanced features - -### Task Execution Keys - -Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for: - -1. **Idempotency**: Ensuring activities are not executed multiple times for the same task -2. **State Management**: Tracking the state of activity execution -3. **Error Handling**: Managing retries and failures in a controlled manner - -Here's an example of how to use task execution keys in your workflow activities: - -```java -public class TaskExecutionKeyActivity implements WorkflowActivity { - @Override - public Object run(WorkflowActivityContext ctx) { - // Get the task execution key for this activity - String taskExecutionKey = ctx.getTaskExecutionKey(); - - // Use the key to implement idempotency or state management - // For example, check if this task has already been executed - if (isTaskAlreadyExecuted(taskExecutionKey)) { - return getPreviousResult(taskExecutionKey); - } - - // Execute the activity logic - Object result = executeActivityLogic(); - - // Store the result with the task execution key - storeResult(taskExecutionKey, result); - - return result; - } -} -``` +- [Workflow API reference]({{< ref workflow_api.md >}}) \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index bb1f1c768..5c6a360c8 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -15,7 +15,6 @@ package io.dapr.it.testcontainers; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; - import io.dapr.testcontainers.Component; import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; @@ -41,7 +40,6 @@ import java.util.Collections; import java.util.Map; import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; -import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -119,29 +117,6 @@ public class DaprWorkflowsIT { assertEquals(instanceId, workflowOutput.getWorkflowId()); } - @Test - public void testExecutionKeyWorkflows() throws Exception { - TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); - String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload); - - workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false); - - Duration timeout = Duration.ofSeconds(1000); - WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); - - assertNotNull(workflowStatus); - - TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput()); - - assertEquals(1, workflowOutput.getPayloads().size()); - assertEquals("Execution key found", workflowOutput.getPayloads().get(0)); - - String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity"; - assertTrue(KeyStore.getInstance().getKey(executionKey)); - - assertEquals(instanceId, workflowOutput.getWorkflowId()); - } - private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java deleted file mode 100644 index 017e1c50b..000000000 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/KeyStore.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2025 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ -package io.dapr.it.testcontainers; - -import java.util.HashMap; -import java.util.Map; - -public class KeyStore { - - private final Map keyStore = new HashMap<>(); - - private static KeyStore instance; - - private KeyStore() { - } - - public static KeyStore getInstance() { - if (instance == null) { - synchronized (KeyStore.class) { - if (instance == null) { - instance = new KeyStore(); - } - } - } - return instance; - } - - - public void addKey(String key, Boolean value) { - keyStore.put(key, value); - } - - public Boolean getKey(String key) { - return keyStore.get(key); - } - - public void removeKey(String key) { - keyStore.remove(key); - } - -} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java deleted file mode 100644 index c1a5b5038..000000000 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TaskExecutionKeyActivity.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2025 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.testcontainers; - -import io.dapr.workflows.WorkflowActivity; -import io.dapr.workflows.WorkflowActivityContext; - -public class TaskExecutionKeyActivity implements WorkflowActivity { - - @Override - public Object run(WorkflowActivityContext ctx) { - TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); - KeyStore keyStore = KeyStore.getInstance(); - Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey()); - if (!Boolean.TRUE.equals(exists)) { - keyStore.addKey(ctx.getTaskExecutionKey(), true); - workflowPayload.getPayloads().add("Execution key not found"); - throw new IllegalStateException("Task execution key not found"); - } - workflowPayload.getPayloads().add("Execution key found"); - return workflowPayload; - } - -} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java index e868b1887..0a2487b70 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestDaprWorkflowsConfiguration.java @@ -56,12 +56,9 @@ public class TestDaprWorkflowsConfiguration { WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides)); builder.registerWorkflow(TestWorkflow.class); - builder.registerWorkflow(TestExecutionKeysWorkflow.class); builder.registerActivity(FirstActivity.class); builder.registerActivity(SecondActivity.class); - builder.registerActivity(TaskExecutionKeyActivity.class); - - + return builder; } } diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java deleted file mode 100644 index 30a9ea33f..000000000 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/TestExecutionKeysWorkflow.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2025 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.it.testcontainers; - -import io.dapr.durabletask.Task; -import io.dapr.workflows.Workflow; -import io.dapr.workflows.WorkflowStub; -import io.dapr.workflows.WorkflowTaskOptions; -import io.dapr.workflows.WorkflowTaskRetryPolicy; - -import java.time.Duration; - -import org.slf4j.Logger; - -public class TestExecutionKeysWorkflow implements Workflow { - - @Override - public WorkflowStub create() { - return ctx -> { - - Logger logger = ctx.getLogger(); - String instanceId = ctx.getInstanceId(); - logger.info("Starting Workflow: " + ctx.getName()); - logger.info("Instance ID: " + instanceId); - logger.info("Current Orchestration Time: " + ctx.getCurrentInstant()); - - TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class); - workflowPayload.setWorkflowId(instanceId); - - WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder() - .setMaxNumberOfAttempts(3) - .setFirstRetryInterval(Duration.ofSeconds(1)) - .setMaxRetryInterval(Duration.ofSeconds(10)) - .setBackoffCoefficient(2.0) - .setRetryTimeout(Duration.ofSeconds(50)) - .build()); - - - Task t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class); - - TestWorkflowPayload payloadAfterExecution = t.await(); - - ctx.complete(payloadAfterExecution); - }; - } - -} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java index 90a2c41a5..3fe5d88a2 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowActivityContext.java @@ -17,8 +17,6 @@ public interface WorkflowActivityContext { String getName(); - String getTaskExecutionKey(); - T getInput(Class targetType); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index 217c3cd18..551c21a37 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -56,9 +56,4 @@ class DefaultWorkflowActivityContext implements WorkflowActivityContext { public T getInput(Class targetType) { return this.innerContext.getInput(targetType); } - - @Override - public String getTaskExecutionKey() { - return this.innerContext.getTaskExecutionKey(); - } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 81ac492e0..76a7e07af 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -16,7 +16,7 @@ public class WorkflowActivityClassWrapperTest { @Override public Object run(WorkflowActivityContext ctx) { String activityContextName = ctx.getName(); - return ctx.getInput(String.class) + " world! from " + activityContextName + " with task execution key " + ctx.getTaskExecutionKey(); + return ctx.getInput(String.class) + " world! from " + activityContextName; } } @@ -37,11 +37,10 @@ public class WorkflowActivityClassWrapperTest { when(mockContext.getInput(String.class)).thenReturn("Hello"); when(mockContext.getName()).thenReturn("TestActivityContext"); - when(mockContext.getTaskExecutionKey()).thenReturn("123"); Object result = wrapper.create().run(mockContext); verify(mockContext, times(1)).getInput(String.class); - assertEquals("Hello world! from TestActivityContext with task execution key 123", result); + assertEquals("Hello world! from TestActivityContext", result); } } From 6cecd7674593ba663e3e9762e6b294b35d659417 Mon Sep 17 00:00:00 2001 From: Cassie Coyle Date: Wed, 28 May 2025 15:01:04 -0500 Subject: [PATCH 3/7] 1.5.5 (#1390) Signed-off-by: Cassandra Coyle --- sdk-workflows/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index 1b4c704d9..582715467 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -47,7 +47,7 @@ io.dapr durabletask-client - 1.5.4 + 1.5.5 + + org.springframework + spring-web + ${springframework.version} + + + org.springframework.boot + spring-boot-configuration-processor + ${springboot.version} + + + org.springframework.boot + spring-boot-starter + ${springboot.version} + + + org.springframework.boot + spring-boot-autoconfigure-processor + ${springboot.version} + + + io.dapr.spring + dapr-spring-boot-tests + ${dapr.spring.version} + + + org.junit.jupiter junit-jupiter-api ${junit.version} + test + + + org.springframework.boot + spring-boot-starter-test + ${springboot.version} + test org.junit.jupiter junit-jupiter-params ${junit.version} + test org.junit.jupiter junit-jupiter-engine ${junit.version} + test org.junit.jupiter junit-jupiter ${junit.version} + test org.testcontainers junit-jupiter ${testcontainers.version} + test ch.qos.logback @@ -79,46 +170,6 @@ - - - - io.dapr - dapr-sdk - ${dapr.sdk.version} - - - io.dapr - dapr-sdk-actors - ${dapr.sdk.version} - - - - - org.springframework - spring-web - true - - - org.springframework - spring-context - true - - - org.springframework.boot - spring-boot-configuration-processor - ${springboot.version} - true - - - - - org.springframework.boot - spring-boot-starter-test - ${springboot.version} - test - - - diff --git a/pom.xml b/pom.xml index c0c67a89d..a5d320569 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 io.dapr @@ -129,11 +129,11 @@ ${grpc.version} test - - org.jetbrains.kotlin - kotlin-stdlib - 2.1.0 - + + org.jetbrains.kotlin + kotlin-stdlib + 2.1.0 + org.yaml snakeyaml @@ -241,6 +241,11 @@ spring-boot-testcontainers ${springboot.version} + + io.dapr + testcontainers-dapr + ${dapr.sdk.alpha.version} + org.testcontainers toxiproxy diff --git a/spring-boot-examples/consumer-app/pom.xml b/spring-boot-examples/consumer-app/pom.xml index ad64eb49a..8bb5e715d 100644 --- a/spring-boot-examples/consumer-app/pom.xml +++ b/spring-boot-examples/consumer-app/pom.xml @@ -55,6 +55,15 @@ rest-assured test + + org.junit.jupiter + junit-jupiter-api + test + + + org.springframework.boot + spring-boot-starter-test + From 1852cc55903a983264aa11b7ad4edf2be5cd12cc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 29 May 2025 10:28:55 -0500 Subject: [PATCH 6/7] Bump org.springframework:spring-context in /dapr-spring (#1394) Bumps [org.springframework:spring-context](https://github.com/spring-projects/spring-framework) from 6.1.8 to 6.1.14. - [Release notes](https://github.com/spring-projects/spring-framework/releases) - [Commits](https://github.com/spring-projects/spring-framework/compare/v6.1.8...v6.1.14) --- updated-dependencies: - dependency-name: org.springframework:spring-context dependency-version: 6.1.14 dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- dapr-spring/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dapr-spring/pom.xml b/dapr-spring/pom.xml index f332fb217..0f4841cf8 100644 --- a/dapr-spring/pom.xml +++ b/dapr-spring/pom.xml @@ -35,7 +35,7 @@ 5.11.2 0.16.0-SNAPSHOT 3.2.6 - 6.1.8 + 6.1.14 1.4.14 From 114e354363d62fd7748c6e58ee6b8ff4b57b2a30 Mon Sep 17 00:00:00 2001 From: Cassie Coyle Date: Thu, 29 May 2025 14:54:50 -0500 Subject: [PATCH 7/7] Compensation example for Workflows (#1333) * add basic compensation example for wf Signed-off-by: Cassandra Coyle * update commands to run + wf id Signed-off-by: Cassandra Coyle * update readme + add mechanical markdown Signed-off-by: Cassandra Coyle * fix import Signed-off-by: Cassandra Coyle * fix mechanical markdown + add how to test it locally Signed-off-by: Cassandra Coyle * move compensation example readme to workflows readme Signed-off-by: Cassandra Coyle * Update BookCarActivity.java Signed-off-by: artur-ciocanu * Update BookFlightActivity.java Signed-off-by: artur-ciocanu * Update BookHotelActivity.java Signed-off-by: artur-ciocanu * Update BookTripClient.java Signed-off-by: artur-ciocanu * Update BookTripWorker.java Signed-off-by: artur-ciocanu * Update BookTripWorkflow.java Signed-off-by: artur-ciocanu * Update CancelCarActivity.java Signed-off-by: artur-ciocanu * Update CancelFlightActivity.java Signed-off-by: artur-ciocanu * Update CancelHotelActivity.java Signed-off-by: artur-ciocanu * add retry IT tests and catch TaskFailedException Signed-off-by: Cassandra Coyle * add test for no compensation if successful and assert attempts Signed-off-by: Cassandra Coyle * update mechanical markdown Signed-off-by: Cassandra Coyle * add back pubsub... but this should be removed long term Signed-off-by: Cassandra Coyle * try adding waitforsidecar Signed-off-by: Cassandra Coyle * rm tests from examples pr Signed-off-by: Cassandra Coyle * reset unintended changes Signed-off-by: Cassandra Coyle --------- Signed-off-by: Cassandra Coyle Signed-off-by: artur-ciocanu Co-authored-by: artur-ciocanu --- CONTRIBUTING.md | 43 ++++++- .../java/io/dapr/examples/workflows/README.md | 119 +++++++++++++++++- .../compensation/BookCarActivity.java | 42 +++++++ .../compensation/BookFlightActivity.java | 41 ++++++ .../compensation/BookHotelActivity.java | 40 ++++++ .../compensation/BookTripClient.java | 35 ++++++ .../compensation/BookTripWorker.java | 38 ++++++ .../compensation/BookTripWorkflow.java | 107 ++++++++++++++++ .../compensation/CancelCarActivity.java | 41 ++++++ .../compensation/CancelFlightActivity.java | 41 ++++++ .../compensation/CancelHotelActivity.java | 41 ++++++ 11 files changed, 584 insertions(+), 4 deletions(-) create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java create mode 100644 examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c094f92fa..06c911702 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -54,7 +54,48 @@ This section describes the guidelines for contributing code / docs to Dapr. ### Things to consider when adding new API to SDK 1. All the new API's go under [dapr-sdk maven package](https://github.com/dapr/java-sdk/tree/master/sdk) -2. Make sure there is an example talking about how to use the API along with a README. [Example](https://github.com/dapr/java-sdk/pull/1235/files#diff-69ed756c4c01fd5fa884aac030dccb8f3f4d4fefa0dc330862d55a6f87b34a14) +2. Make sure there is an example talking about how to use the API along with a README with mechanical markdown. [Example](https://github.com/dapr/java-sdk/pull/1235/files#diff-69ed756c4c01fd5fa884aac030dccb8f3f4d4fefa0dc330862d55a6f87b34a14) + +#### Mechanical Markdown + +Mechanical markdown is used to validate example outputs in our CI pipeline. It ensures that the expected output in README files matches the actual output when running the examples. This helps maintain example output, catches any unintended changes in example behavior, and regressions. + +To test mechanical markdown locally: + +1. Install the package: +```bash +pip3 install mechanical-markdown +``` + +2. Run the test from the respective examples README directory, for example: +```bash +cd examples +mm.py ./src/main/java/io/dapr/examples/workflows/README.md +``` + +The test will: +- Parse the STEP markers in the README +- Execute the commands specified in the markers +- Compare the actual output with the expected output +- Report any mismatches + +When writing STEP markers: +- Use `output_match_mode: substring` for flexible matching +- Quote strings containing special YAML characters (like `:`, `*`, `'`) +- Set appropriate timeouts for long-running examples + +Example STEP marker: +```yaml + +``` ### Pull Requests diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index 047f1d744..4d9c057f2 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -51,7 +51,8 @@ Those examples contain the following workflow patterns: 2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern) 3. [Continue As New Pattern](#continue-as-new-pattern) 4. [External Event Pattern](#external-event-pattern) -5. [child-workflow Pattern](#child-workflow-pattern) +5. [Child-workflow Pattern](#child-workflow-pattern) +6. [Compensation Pattern](#compensation-pattern) ### Chaining Pattern In the chaining pattern, a sequence of activities executes in a specific order. @@ -353,7 +354,7 @@ dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- ``` ```sh java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient -```` +``` You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished: ```text @@ -444,7 +445,7 @@ Started a new external-event model workflow with instance ID: 23410d96-1afe-4698 workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. ``` -### child-workflow Pattern +### Child-workflow Pattern The child-workflow pattern allows you to call a workflow from another workflow. The `DemoWorkflow` class defines the workflow. It calls a child-workflow `DemoChildWorkflow` to do the work. See the code snippet below: @@ -540,3 +541,115 @@ The log from client: Started a new child-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH ``` + +### Compensation Pattern +The compensation pattern is used to "undo" or "roll back" previously completed steps if a later step fails. This pattern is particularly useful in scenarios where you need to ensure that all resources are properly cleaned up even if the process fails. + +The example simulates a trip booking workflow that books a flight, hotel, and car. If any step fails, the workflow will automatically compensate (cancel) the previously completed bookings in reverse order. + +The `BookTripWorkflow` class defines the workflow. It orchestrates the booking process and handles compensation if any step fails. See the code snippet below: +```java +public class BookTripWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + List compensations = new ArrayList<>(); + + try { + // Book flight + String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Flight booking completed: " + flightResult); + compensations.add(CancelFlightActivity.class.getName()); + + // Book hotel + String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Hotel booking completed: " + hotelResult); + compensations.add(CancelHotelActivity.class.getName()); + + // Book car + String carResult = ctx.callActivity(BookCarActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Car booking completed: " + carResult); + compensations.add(CancelCarActivity.class.getName()); + + } catch (Exception e) { + ctx.getLogger().info("******** executing compensation logic ********"); + // Execute compensations in reverse order + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + ctx.callActivity(compensation, String.class).await(); + } catch (Exception ex) { + ctx.getLogger().error("Error during compensation: " + ex.getMessage()); + } + } + ctx.complete("Workflow failed, compensation applied"); + return; + } + ctx.complete("All bookings completed successfully"); + }; + } +} +``` + +Each activity class (`BookFlightActivity`, `BookHotelActivity`, `BookCarActivity`) implements the booking logic, while their corresponding compensation activities (`CancelFlightActivity`, `CancelHotelActivity`, `CancelCarActivity`) implement the cancellation logic. + + + +Execute the following script in order to run the BookTripWorker: +```sh +dapr run --app-id book-trip-worker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripWorker +``` + +Once running, execute the following script to run the BookTripClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripClient +``` + + +The output demonstrates: +1. The workflow starts and successfully books a flight +2. Then successfully books a hotel +3. When attempting to book a car, it fails (intentionally) +4. The compensation logic triggers, canceling the hotel and flight in reverse order +5. The workflow completes with a status indicating the compensation was applied + +Key Points: +1. Each successful booking step adds its compensation action to an ArrayList +2. If an error occurs, the list of compensations is reversed and executed in reverse order +3. The workflow ensures that all resources are properly cleaned up even if the process fails +4. Each activity simulates work with a short delay for demonstration purposes \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java new file mode 100644 index 000000000..9ad0285d9 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class BookCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookCarActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + logger.info("Forcing Failure to trigger compensation for activity: " + ctx.getName()); + + // force the compensation + throw new RuntimeException("Failed to book car"); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java new file mode 100644 index 000000000..075c4d275 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class BookFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookFlightActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Flight booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java new file mode 100644 index 000000000..a2eca04c4 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BookHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookHotelActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + logger.info("Simulating hotel booking process..."); + + // Simulate some work + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + String result = "Hotel booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java new file mode 100644 index 000000000..212c1f0a1 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java @@ -0,0 +1,35 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class BookTripClient { + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(BookTripWorkflow.class); + System.out.printf("Started a new trip booking workflow with instance ID: %s%n", instanceId); + + WorkflowInstanceStatus status = client.waitForInstanceCompletion(instanceId, Duration.ofMinutes(30), true); + System.out.printf("Workflow instance with ID: %s completed with status: %s%n", instanceId, status); + System.out.printf("Workflow output: %s%n", status.getSerializedOutput()); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java new file mode 100644 index 000000000..d32ade26a --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java @@ -0,0 +1,38 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class BookTripWorker { + + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() + .registerWorkflow(BookTripWorkflow.class) + .registerActivity(BookFlightActivity.class) + .registerActivity(CancelFlightActivity.class) + .registerActivity(BookHotelActivity.class) + .registerActivity(CancelHotelActivity.class) + .registerActivity(BookCarActivity.class) + .registerActivity(CancelCarActivity.class); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java new file mode 100644 index 000000000..f375363ed --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java @@ -0,0 +1,107 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.durabletask.TaskFailedException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.time.Duration; + +public class BookTripWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + List compensations = new ArrayList<>(); + + // Define retry policy for compensation activities + WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxNumberOfAttempts(3) + .build(); + + WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy); + + try { + // Book flight + String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Flight booking completed: {}", flightResult); + compensations.add("CancelFlight"); + + // Book hotel + String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Hotel booking completed: {}", hotelResult); + compensations.add("CancelHotel"); + + // Book car + String carResult = ctx.callActivity(BookCarActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Car booking completed: {}", carResult); + compensations.add("CancelCar"); + + String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult); + ctx.getLogger().info("Trip booked successfully: {}", result); + ctx.complete(result); + + } catch (TaskFailedException e) { + ctx.getLogger().info("******** executing compensation logic ********"); + ctx.getLogger().error("Activity failed: {}", e.getMessage()); + + // Execute compensations in reverse order + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + switch (compensation) { + case "CancelCar": + String carCancelResult = ctx.callActivity( + CancelCarActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Car cancellation completed: {}", carCancelResult); + break; + + case "CancelHotel": + String hotelCancelResult = ctx.callActivity( + CancelHotelActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult); + break; + + case "CancelFlight": + String flightCancelResult = ctx.callActivity( + CancelFlightActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult); + break; + } + } catch (TaskFailedException ex) { + // Only catch TaskFailedException for actual activity failures + ctx.getLogger().error("Activity failed during compensation: {}", ex.getMessage()); + } + } + ctx.complete("Workflow failed, compensation applied"); + } + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java new file mode 100644 index 000000000..bca6af0da --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelCarActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Car canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java new file mode 100644 index 000000000..0c2034dee --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelFlightActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Flight canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java new file mode 100644 index 000000000..03f5f9b64 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelHotelActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Hotel canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +}