chore: New task execution task id test (#1352)

* chore: New task execution task id test

test how taskExecutionTaskId can be used for idempotency

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* chore: Clean up not used files

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* docs: Task execution keys

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* test: Modify unit tests

Signed-off-by: Javier Aliaga <javier@diagrid.io>

* Remove new lines

Signed-off-by: artur-ciocanu <artur.ciocanu@gmail.com>

---------

Signed-off-by: Javier Aliaga <javier@diagrid.io>
Signed-off-by: artur-ciocanu <artur.ciocanu@gmail.com>
Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
Co-authored-by: artur-ciocanu <artur.ciocanu@gmail.com>
This commit is contained in:
Javier Aliaga 2025-05-28 16:43:48 +02:00 committed by GitHub
parent a99d286a88
commit 949584f69f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 222 additions and 7 deletions

View File

@ -6,7 +6,7 @@ weight: 20000
description: How to get up and running with workflows using the Dapr Java SDK
---
Lets create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:
Let's create a Dapr workflow and invoke it using the console. With the [provided workflow example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows), you will:
- Execute the workflow instance using the [Java workflow worker](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java)
- Utilize the Java workflow client and API calls to [start and terminate workflow instances](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java)
@ -85,11 +85,10 @@ You're up and running! Both Dapr and your app logs will appear here.
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
```
## Run the `DemoWorkflowClient
## Run the `DemoWorkflowClient`
The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr.
```java
public class DemoWorkflowClient {
@ -246,4 +245,40 @@ Exiting DemoWorkflowClient.
## Next steps
- [Learn more about Dapr workflow]({{< ref workflow-overview.md >}})
- [Workflow API reference]({{< ref workflow_api.md >}})
- [Workflow API reference]({{< ref workflow_api.md >}})
## Advanced features
### Task Execution Keys
Task execution keys are unique identifiers generated by the durabletask-java library. They are stored in the `WorkflowActivityContext` and can be used to track and manage the execution of workflow activities. They are particularly useful for:
1. **Idempotency**: Ensuring activities are not executed multiple times for the same task
2. **State Management**: Tracking the state of activity execution
3. **Error Handling**: Managing retries and failures in a controlled manner
Here's an example of how to use task execution keys in your workflow activities:
```java
public class TaskExecutionKeyActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
// Get the task execution key for this activity
String taskExecutionKey = ctx.getTaskExecutionKey();
// Use the key to implement idempotency or state management
// For example, check if this task has already been executed
if (isTaskAlreadyExecuted(taskExecutionKey)) {
return getPreviousResult(taskExecutionKey);
}
// Execute the activity logic
Object result = executeActivityLogic();
// Store the result with the task execution key
storeResult(taskExecutionKey, result);
return result;
}
}
```

View File

@ -15,6 +15,7 @@ package io.dapr.it.testcontainers;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.Map;
import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -117,6 +119,29 @@ public class DaprWorkflowsIT {
assertEquals(instanceId, workflowOutput.getWorkflowId());
}
@Test
public void testExecutionKeyWorkflows() throws Exception {
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
String instanceId = workflowClient.scheduleNewWorkflow(TestExecutionKeysWorkflow.class, payload);
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(100), false);
Duration timeout = Duration.ofSeconds(1000);
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
assertNotNull(workflowStatus);
TestWorkflowPayload workflowOutput = deserialize(workflowStatus.getSerializedOutput());
assertEquals(1, workflowOutput.getPayloads().size());
assertEquals("Execution key found", workflowOutput.getPayloads().get(0));
String executionKey = workflowOutput.getWorkflowId() +"-"+"io.dapr.it.testcontainers.TaskExecutionKeyActivity";
assertTrue(KeyStore.getInstance().getKey(executionKey));
assertEquals(instanceId, workflowOutput.getWorkflowId());
}
private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
}

View File

@ -0,0 +1,51 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers;
import java.util.HashMap;
import java.util.Map;
public class KeyStore {
private final Map<String, Boolean> keyStore = new HashMap<>();
private static KeyStore instance;
private KeyStore() {
}
public static KeyStore getInstance() {
if (instance == null) {
synchronized (KeyStore.class) {
if (instance == null) {
instance = new KeyStore();
}
}
}
return instance;
}
public void addKey(String key, Boolean value) {
keyStore.put(key, value);
}
public Boolean getKey(String key) {
return keyStore.get(key);
}
public void removeKey(String key) {
keyStore.remove(key);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
public class TaskExecutionKeyActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
KeyStore keyStore = KeyStore.getInstance();
Boolean exists = keyStore.getKey(ctx.getTaskExecutionKey());
if (!Boolean.TRUE.equals(exists)) {
keyStore.addKey(ctx.getTaskExecutionKey(), true);
workflowPayload.getPayloads().add("Execution key not found");
throw new IllegalStateException("Task execution key not found");
}
workflowPayload.getPayloads().add("Execution key found");
return workflowPayload;
}
}

View File

@ -56,9 +56,12 @@ public class TestDaprWorkflowsConfiguration {
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(new Properties(overrides));
builder.registerWorkflow(TestWorkflow.class);
builder.registerWorkflow(TestExecutionKeysWorkflow.class);
builder.registerActivity(FirstActivity.class);
builder.registerActivity(SecondActivity.class);
builder.registerActivity(TaskExecutionKeyActivity.class);
return builder;
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers;
import io.dapr.durabletask.Task;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import java.time.Duration;
import org.slf4j.Logger;
public class TestExecutionKeysWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
Logger logger = ctx.getLogger();
String instanceId = ctx.getInstanceId();
logger.info("Starting Workflow: " + ctx.getName());
logger.info("Instance ID: " + instanceId);
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());
TestWorkflowPayload workflowPayload = ctx.getInput(TestWorkflowPayload.class);
workflowPayload.setWorkflowId(instanceId);
WorkflowTaskOptions options = new WorkflowTaskOptions(WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(3)
.setFirstRetryInterval(Duration.ofSeconds(1))
.setMaxRetryInterval(Duration.ofSeconds(10))
.setBackoffCoefficient(2.0)
.setRetryTimeout(Duration.ofSeconds(50))
.build());
Task<TestWorkflowPayload> t = ctx.callActivity(TaskExecutionKeyActivity.class.getName(), workflowPayload, options,TestWorkflowPayload.class);
TestWorkflowPayload payloadAfterExecution = t.await();
ctx.complete(payloadAfterExecution);
};
}
}

View File

@ -17,6 +17,8 @@ public interface WorkflowActivityContext {
String getName();
String getTaskExecutionKey();
<T> T getInput(Class<T> targetType);
}

View File

@ -56,4 +56,9 @@ class DefaultWorkflowActivityContext implements WorkflowActivityContext {
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}
@Override
public String getTaskExecutionKey() {
return this.innerContext.getTaskExecutionKey();
}
}

View File

@ -16,7 +16,7 @@ public class WorkflowActivityClassWrapperTest {
@Override
public Object run(WorkflowActivityContext ctx) {
String activityContextName = ctx.getName();
return ctx.getInput(String.class) + " world! from " + activityContextName;
return ctx.getInput(String.class) + " world! from " + activityContextName + " with task execution key " + ctx.getTaskExecutionKey();
}
}
@ -37,10 +37,11 @@ public class WorkflowActivityClassWrapperTest {
when(mockContext.getInput(String.class)).thenReturn("Hello");
when(mockContext.getName()).thenReturn("TestActivityContext");
when(mockContext.getTaskExecutionKey()).thenReturn("123");
Object result = wrapper.create().run(mockContext);
verify(mockContext, times(1)).getInput(String.class);
assertEquals("Hello world! from TestActivityContext", result);
assertEquals("Hello world! from TestActivityContext with task execution key 123", result);
}
}