Wrap DurableTask objects

Signed-off-by: Mason <theforbiddenai@gmail.com>
This commit is contained in:
Mason 2025-06-09 14:25:45 -04:00
parent ef5d57486e
commit dbe95c8bc4
5 changed files with 245 additions and 8 deletions

View File

@ -0,0 +1,93 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
public class WorkflowTaskFailureDetails {
private final String errorType;
private final String errorMessage;
private final String stackTrace;
/**
* Constructor for WorkflowTaskFailureDetails.
*
* @param errorType The type of error
* @param errorMessage The error message
* @param stackTrace The stacktrace of the error
*/
public WorkflowTaskFailureDetails(
String errorType,
String errorMessage,
String stackTrace) {
this.errorType = errorType;
this.errorMessage = errorMessage;
this.stackTrace = stackTrace;
}
/**
* Gets the exception class name if the failure was caused by an unhandled exception. Otherwise, gets a symbolic
* name that describes the general type of error that was encountered.
*
* @return the error type as a {@code String} value
*/
public String getErrorType() {
return this.errorType;
}
/**
* Gets a summary description of the error that caused this failure. If the failure was caused by an exception, the
* exception message is returned.
*
* @return a summary description of the error
*/
public String getErrorMessage() {
return this.errorMessage;
}
/**
* Gets the stack trace of the exception that caused this failure, or {@code null} if the failure was caused by
* a non-exception error.
*
* @return the stack trace of the failure exception or {@code null} if the failure was not caused by an exception
*/
public String getStackTrace() {
return this.stackTrace;
}
/**
* Returns {@code true} if the task failure was provided by the specified exception type, otherwise {@code false}.
*
* <p>This method allows checking if a task failed due to a specific exception type by attempting to load the class
* specified in {@link #getErrorType()}. If the exception class cannot be loaded for any reason, this method will
* return {@code false}. Base types are supported by this method, as shown in the following example:
* <pre>{@code
* boolean isRuntimeException = failureDetails.isCausedBy(RuntimeException.class);
* }</pre>
*
* @param exceptionClass the class representing the exception type to test
* @return {@code true} if the task failure was provided by the specified exception type, otherwise {@code false}
*/
public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
String actualClassName = this.getErrorType();
try {
// Try using reflection to load the failure's class type and see if it's a subtype of the specified
// exception. For example, this should always succeed if exceptionClass is System.Exception.
Class<?> actualExceptionClass = Class.forName(actualClassName);
return exceptionClass.isAssignableFrom(actualExceptionClass);
} catch (ClassNotFoundException ex) {
// Can't load the class and thus can't tell if it's related
return false;
}
}
}

View File

@ -13,14 +13,12 @@ limitations under the License.
package io.dapr.workflows;
import io.dapr.durabletask.RetryHandler;
public class WorkflowTaskOptions {
private final WorkflowTaskRetryPolicy retryPolicy;
private final RetryHandler retryHandler;
private final WorkflowTaskRetryHandler retryHandler;
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, RetryHandler retryHandler) {
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy, WorkflowTaskRetryHandler retryHandler) {
this.retryPolicy = retryPolicy;
this.retryHandler = retryHandler;
}
@ -29,7 +27,7 @@ public class WorkflowTaskOptions {
this(retryPolicy, null);
}
public WorkflowTaskOptions(RetryHandler retryHandler) {
public WorkflowTaskOptions(WorkflowTaskRetryHandler retryHandler) {
this(null, retryHandler);
}
@ -37,7 +35,7 @@ public class WorkflowTaskOptions {
return retryPolicy;
}
public RetryHandler getRetryHandler() {
public WorkflowTaskRetryHandler getRetryHandler() {
return retryHandler;
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
import io.dapr.workflows.runtime.DefaultWorkflowContext;
import java.time.Duration;
public class WorkflowTaskRetryContext {
private final DefaultWorkflowContext workflowContext;
private final int lastAttemptNumber;
private final WorkflowTaskFailureDetails lastFailure;
private final Duration totalRetryTime;
/**
* Constructor for WorkflowTaskRetryContext.
*
* @param workflowContext The workflow context
* @param lastAttemptNumber The number of the previous attempt
* @param lastFailure The failure details from the most recent failure
* @param totalRetryTime The amount of time spent retrying
*/
public WorkflowTaskRetryContext(
DefaultWorkflowContext workflowContext,
int lastAttemptNumber,
WorkflowTaskFailureDetails lastFailure,
Duration totalRetryTime) {
this.workflowContext = workflowContext;
this.lastAttemptNumber = lastAttemptNumber;
this.lastFailure = lastFailure;
this.totalRetryTime = totalRetryTime;
}
/**
* Gets the context of the current orchestration.
*
* <p>The orchestration context can be used in retry handlers to schedule timers (via the
* {@link DefaultWorkflowContext#createTimer} methods) for implementing delays between retries. It can also be
* used to implement time-based retry logic by using the {@link DefaultWorkflowContext#getCurrentInstant} method.
*
* @return the context of the parent orchestration
*/
public DefaultWorkflowContext getOrchestrationContext() {
return this.workflowContext;
}
/**
* Gets the details of the previous task failure, including the exception type, message, and callstack.
*
* @return the details of the previous task failure
*/
public WorkflowTaskFailureDetails getLastFailure() {
return this.lastFailure;
}
/**
* Gets the previous retry attempt number. This number starts at 1 and increments each time the retry handler
* is invoked for a particular task failure.
*
* @return the previous retry attempt number
*/
public int getLastAttemptNumber() {
return this.lastAttemptNumber;
}
/**
* Gets the total amount of time spent in a retry loop for the current task.
*
* @return the total amount of time spent in a retry loop for the current task
*/
public Duration getTotalRetryTime() {
return this.totalRetryTime;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
public interface WorkflowTaskRetryHandler {
/**
* Invokes retry handler logic. Return value indicates whether to continue retrying.
*
* @param retryContext The context of the retry
* @return {@code true} to continue retrying or {@code false} to stop retrying.
*/
boolean handle(WorkflowTaskRetryContext retryContext);
}

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.RetryHandler;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
@ -21,7 +22,10 @@ import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskFailureDetails;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryContext;
import io.dapr.workflows.WorkflowTaskRetryHandler;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -229,13 +233,13 @@ public class DefaultWorkflowContext implements WorkflowContext {
return this.innerContext.newUUID();
}
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
private TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
}
RetryPolicy retryPolicy = null;
RetryHandler retryHandler = options.getRetryHandler();
RetryHandler retryHandler = toRetryHandler(options.getRetryHandler());
if (options.getRetryPolicy() != null) {
WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
@ -252,4 +256,33 @@ public class DefaultWorkflowContext implements WorkflowContext {
return new TaskOptions(retryPolicy, retryHandler);
}
/**
* Converts a {@link WorkflowTaskRetryHandler} to a {@link RetryHandler}.
*
* @param workflowTaskRetryHandler The {@link WorkflowTaskRetryHandler} being converted
* @return A {@link RetryHandler}
*/
private RetryHandler toRetryHandler(WorkflowTaskRetryHandler workflowTaskRetryHandler) {
if (workflowTaskRetryHandler == null) {
return null;
}
return retryContext -> {
FailureDetails failureDetails = retryContext.getLastFailure();
WorkflowTaskFailureDetails workflowFailureDetails = new WorkflowTaskFailureDetails(
failureDetails.getErrorType(),
failureDetails.getErrorMessage(),
failureDetails.getStackTrace()
);
WorkflowTaskRetryContext workflowRetryContext = new WorkflowTaskRetryContext(
this,
retryContext.getLastAttemptNumber(),
workflowFailureDetails,
retryContext.getTotalRetryTime()
);
return workflowTaskRetryHandler.handle(workflowRetryContext);
};
}
}