Add retry handler support (#1412)

* Add retry handler support

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Wrap DurableTask objects

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Rename method

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Add isNonRetriable field to WorkflowTaskFailureDetails

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Add unit test

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Removed duplicate WorkflowFailureDetails class

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Removed unneeded when statements in retry policy unit test

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Add unit test to test both RetryPolicy and RetryHandler

Signed-off-by: Mason <theforbiddenai@gmail.com>

* Create toRetryPolicy method

Signed-off-by: Mason <theforbiddenai@gmail.com>

---------

Signed-off-by: Mason <theforbiddenai@gmail.com>
Co-authored-by: Siri Varma Vegiraju <siri.varma@outlook.com>
Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
This commit is contained in:
Mason 2025-06-11 14:31:28 -04:00 committed by GitHub
parent dcaca773b3
commit 981b3b457b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 288 additions and 9 deletions

View File

@ -47,7 +47,7 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.5</version>
<version>1.5.6</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies

View File

@ -16,13 +16,27 @@ package io.dapr.workflows;
public class WorkflowTaskOptions {
private final WorkflowTaskRetryPolicy retryPolicy;
private final WorkflowTaskRetryHandler retryHandler;
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) {
this.retryPolicy = retryPolicy;
this.retryHandler = retryHandler;
}
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
this(retryPolicy, null);
}
public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) {
this(null, retryHandler);
}
public WorkflowTaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}
public WorkflowTaskRetryHandler getRetryHandler() {
return retryHandler;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.runtime.DefaultWorkflowContext;
import java.time.Duration;
public class WorkflowTaskRetryContext {
private final DefaultWorkflowContext workflowContext;
private final int lastAttemptNumber;
private final WorkflowFailureDetails lastFailure;
private final Duration totalRetryTime;
/**
* Constructor for WorkflowTaskRetryContext.
*
* @param workflowContext The workflow context
* @param lastAttemptNumber The number of the previous attempt
* @param lastFailure The failure details from the most recent failure
* @param totalRetryTime The amount of time spent retrying
*/
public WorkflowTaskRetryContext(
DefaultWorkflowContext workflowContext,
int lastAttemptNumber,
WorkflowFailureDetails lastFailure,
Duration totalRetryTime) {
this.workflowContext = workflowContext;
this.lastAttemptNumber = lastAttemptNumber;
this.lastFailure = lastFailure;
this.totalRetryTime = totalRetryTime;
}
/**
* Gets the context of the current workflow.
*
* <p>The workflow context can be used in retry handlers to schedule timers (via the
* {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be
* used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method.
*
* @return the context of the parent workflow
*/
public DefaultWorkflowContext getWorkflowContext() {
return this.workflowContext;
}
/**
* Gets the details of the previous task failure, including the exception type, message, and callstack.
*
* @return the details of the previous task failure
*/
public WorkflowFailureDetails getLastFailure() {
return this.lastFailure;
}
/**
* Gets the previous retry attempt number. This number starts at 1 and increments each time the retry handler
* is invoked for a particular task failure.
*
* @return the previous retry attempt number
*/
public int getLastAttemptNumber() {
return this.lastAttemptNumber;
}
/**
* Gets the total amount of time spent in a retry loop for the current task.
*
* @return the total amount of time spent in a retry loop for the current task
*/
public Duration getTotalRetryTime() {
return this.totalRetryTime;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
public interface WorkflowTaskRetryHandler {
/**
* Invokes retry handler logic. Return value indicates whether to continue retrying.
*
* @param retryContext The context of the retry
* @return {@code true} to continue retrying or {@code false} to stop retrying.
*/
boolean handle(WorkflowTaskRetryContext retryContext);
}

View File

@ -39,4 +39,14 @@ public interface WorkflowFailureDetails {
*/
String getStackTrace();
/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
default boolean isCausedBy(Class<? extends Exception> exceptionClass) {
throw new UnsupportedOperationException("This method is not implemented");
}
}

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
@ -21,6 +22,8 @@ import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryContext;
import io.dapr.workflows.WorkflowTaskRetryHandler;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -228,15 +231,31 @@ public class DefaultWorkflowContext implements WorkflowContext {
return this.innerContext.newUUID();
}
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
private TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
}
WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
RetryPolicy retryPolicy = toRetryPolicy(options.getRetryPolicy());
RetryHandler retryHandler = toRetryHandler(options.getRetryHandler());
return new TaskOptions(retryPolicy, retryHandler);
}
/**
* Converts a {@link WorkflowTaskRetryPolicy} to a {@link RetryPolicy}.
*
* @param workflowTaskRetryPolicy The {@link WorkflowTaskRetryPolicy} being converted
* @return A {@link RetryPolicy}
*/
private RetryPolicy toRetryPolicy(WorkflowTaskRetryPolicy workflowTaskRetryPolicy) {
if (workflowTaskRetryPolicy == null) {
return null;
}
RetryPolicy retryPolicy = new RetryPolicy(
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
);
retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
@ -244,6 +263,29 @@ public class DefaultWorkflowContext implements WorkflowContext {
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
}
return new TaskOptions(retryPolicy);
return retryPolicy;
}
/**
* Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}.
*
* @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted
* @return A {@link RetryHandler}
*/
private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) {
if (workflowTaskRetryHandler == null) {
return null;
}
return retryContext -> {
WorkflowTaskRetryContext workflowRetryContext = new WorkflowTaskRetryContext(
this,
retryContext.getLastAttemptNumber(),
new DefaultWorkflowFailureDetails(retryContext.getLastFailure()),
retryContext.getTotalRetryTime()
);
return workflowTaskRetryHandler.handle(workflowRetryContext);
};
}
}

View File

@ -62,6 +62,17 @@ public class DefaultWorkflowFailureDetails implements WorkflowFailureDetails {
return workflowFailureDetails.getStackTrace();
}
/**
* Checks whether the failure was caused by the provided exception class.
*
* @param exceptionClass the exception class to check
* @return {@code true} if the failure was caused by the provided exception class
*/
@Override
public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
return workflowFailureDetails.isCausedBy(exceptionClass);
}
@Override
public String toString() {
return workflowFailureDetails.toString();

View File

@ -14,6 +14,9 @@ limitations under the License.
package io.dapr.workflows;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.RetryContext;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
@ -35,10 +38,11 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -278,7 +282,7 @@ public class DefaultWorkflowContextTest {
}
@Test
public void callChildWorkflowWithOptions() {
public void callChildWorkflowWithRetryPolicy() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
@ -305,6 +309,90 @@ public class DefaultWorkflowContextTest {
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
assertNull(taskOptions.getRetryHandler());
}
@Test
public void callChildWorkflowWithRetryHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);
verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);
TaskOptions taskOptions = captor.getValue();
RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);
durableRetryHandler.handle(retryContext);
verify(retryHandler, times(1)).handle(any());
assertNull(taskOptions.getRetryPolicy());
}
@Test
public void callChildWorkflowWithRetryPolicyAndHandler() {
String expectedName = "TestActivity";
String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId";
WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.build();
WorkflowTaskRetryHandler retryHandler = spy(new WorkflowTaskRetryHandler() {
@Override
public boolean handle(WorkflowTaskRetryContext retryContext) {
return true;
}
});
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy, retryHandler);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);
verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);
TaskOptions taskOptions = captor.getValue();
RetryHandler durableRetryHandler = taskOptions.getRetryHandler();
RetryContext retryContext = mock(RetryContext.class, invocationOnMock -> null);
durableRetryHandler.handle(retryContext);
verify(retryHandler, times(1)).handle(any());
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
assertEquals(Duration.ZERO, taskOptions.getRetryPolicy().getRetryTimeout());
}
@Test