Adding WorkflowTaskOptions and use it instead of TaskOptions (#1200)

This commit is contained in:
artur-ciocanu 2025-01-30 02:22:07 +02:00 committed by GitHub
parent be0e56bf50
commit be5530fdd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 409 additions and 167 deletions

View File

@ -16,7 +16,7 @@ package io.dapr.workflows;
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException; import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException; import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOption; import io.dapr.workflows.saga.SagaOptions;
/** /**
* Common interface for workflow implementations. * Common interface for workflow implementations.
@ -74,7 +74,7 @@ public interface Workflow {
* *
* @return saga configuration * @return saga configuration
*/ */
default SagaOption getSagaOption() { default SagaOptions getSagaOption() {
// by default, saga is disabled // by default, saga is disabled
return null; return null;
} }

View File

@ -14,7 +14,9 @@ limitations under the License.
package io.dapr.workflows; package io.dapr.workflows;
public interface WorkflowActivityContext { public interface WorkflowActivityContext {
String getName(); String getName();
<T> T getInput(Class<T> targetType); <T> T getInput(Class<T> targetType);
} }

View File

@ -17,7 +17,6 @@ import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task; import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException; import com.microsoft.durabletask.TaskFailedException;
import com.microsoft.durabletask.TaskOptions;
import io.dapr.workflows.saga.SagaContext; import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -153,15 +152,15 @@ public interface WorkflowContext {
* @param <V> the expected type of the activity output * @param <V> the expected type of the activity output
* @return a new {@link Task} that completes when the activity completes or fails * @return a new {@link Task} that completes when the activity completes or fails
*/ */
<V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType); <V> Task<V> callActivity(String name, Object input, WorkflowTaskOptions options, Class<V> returnType);
/** /**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. * completes. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
* *
* @param name the name of the activity to call * @param name the name of the activity to call
* @return a new {@link Task} that completes when the activity completes or fails * @return a new {@link Task} that completes when the activity completes or fails
* @see #callActivity(String, Object, TaskOptions, Class) * @see #callActivity(String, Object, WorkflowTaskOptions, Class)
*/ */
default Task<Void> callActivity(String name) { default Task<Void> callActivity(String name) {
return this.callActivity(name, null, null, Void.class); return this.callActivity(name, null, null, Void.class);
@ -169,8 +168,8 @@ public interface WorkflowContext {
/** /**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a * that completes when the activity completes.
* complete description. * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
* *
* @param name the name of the activity to call * @param name the name of the activity to call
* @param input the serializable input to pass to the activity * @param input the serializable input to pass to the activity
@ -183,7 +182,7 @@ public interface WorkflowContext {
/** /**
* Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity * Asynchronously invokes an activity by name and returns a new {@link Task} that completes when the activity
* completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's * completes. If the activity completes successfully, the returned {@code Task}'s value will be the activity's
* output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a complete description. * output. See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
* *
* @param name the name of the activity to call * @param name the name of the activity to call
* @param returnType the expected class type of the activity output * @param returnType the expected class type of the activity output
@ -197,8 +196,8 @@ public interface WorkflowContext {
/** /**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s * that completes when the activity completes.If the activity completes successfully, the returned {@code Task}'s
* value will be the activity's output. See {@link #callActivity(String, Object, TaskOptions, Class)} for a * value will be the activity's output.
* complete description. * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
* *
* @param name the name of the activity to call * @param name the name of the activity to call
* @param input the serializable input to pass to the activity * @param input the serializable input to pass to the activity
@ -212,15 +211,15 @@ public interface WorkflowContext {
/** /**
* Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task} * Asynchronously invokes an activity by name and with the specified input value and returns a new {@link Task}
* that completes when the activity completes. See {@link #callActivity(String, Object, TaskOptions, Class)} for a * that completes when the activity completes.
* complete description. * See {@link #callActivity(String, Object, WorkflowTaskOptions, Class)} for a complete description.
* *
* @param name the name of the activity to call * @param name the name of the activity to call
* @param input the serializable input to pass to the activity * @param input the serializable input to pass to the activity
* @param options additional options that control the execution and processing of the activity * @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the activity completes or fails * @return a new {@link Task} that completes when the activity completes or fails
*/ */
default Task<Void> callActivity(String name, Object input, TaskOptions options) { default Task<Void> callActivity(String name, Object input, WorkflowTaskOptions options) {
return this.callActivity(name, input, options, Void.class); return this.callActivity(name, input, options, Void.class);
} }
@ -367,11 +366,11 @@ public interface WorkflowContext {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes. * when the child-workflow completes.
* *
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
* *
* @param name the name of the workflow to invoke * @param name the name of the workflow to invoke
* @return a new {@link Task} that completes when the child-workflow completes or fails * @return a new {@link Task} that completes when the child-workflow completes or fails
* @see #callChildWorkflow(String, Object, String, TaskOptions, Class) * @see #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)
*/ */
default Task<Void> callChildWorkflow(String name) { default Task<Void> callChildWorkflow(String name) {
return this.callChildWorkflow(name, null); return this.callChildWorkflow(name, null);
@ -381,7 +380,7 @@ public interface WorkflowContext {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes. * when the child-workflow completes.
* *
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
* *
* @param name the name of the workflow to invoke * @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow * @param input the serializable input to send to the child-workflow
@ -395,7 +394,7 @@ public interface WorkflowContext {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes. * when the child-workflow completes.
* *
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
* *
* @param name the name of the workflow to invoke * @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow * @param input the serializable input to send to the child-workflow
@ -411,7 +410,7 @@ public interface WorkflowContext {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes. * when the child-workflow completes.
* *
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
* *
* @param name the name of the workflow to invoke * @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow * @param input the serializable input to send to the child-workflow
@ -428,7 +427,7 @@ public interface WorkflowContext {
* Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes * Asynchronously invokes another workflow as a child-workflow and returns a {@link Task} that completes
* when the child-workflow completes. * when the child-workflow completes.
* *
* <p>See {@link #callChildWorkflow(String, Object, String, TaskOptions, Class)} for a full description. * <p>See {@link #callChildWorkflow(String, Object, String, WorkflowTaskOptions, Class)} for a full description.
* *
* @param name the name of the workflow to invoke * @param name the name of the workflow to invoke
* @param input the serializable input to send to the child-workflow * @param input the serializable input to send to the child-workflow
@ -436,7 +435,7 @@ public interface WorkflowContext {
* @param options additional options that control the execution and processing of the activity * @param options additional options that control the execution and processing of the activity
* @return a new {@link Task} that completes when the child-workflow completes or fails * @return a new {@link Task} that completes when the child-workflow completes or fails
*/ */
default Task<Void> callChildWorkflow(String name, Object input, String instanceID, TaskOptions options) { default Task<Void> callChildWorkflow(String name, Object input, String instanceID, WorkflowTaskOptions options) {
return this.callChildWorkflow(name, input, instanceID, options, Void.class); return this.callChildWorkflow(name, input, instanceID, options, Void.class);
} }
@ -478,7 +477,7 @@ public interface WorkflowContext {
<V> Task<V> callChildWorkflow(String name, <V> Task<V> callChildWorkflow(String name,
@Nullable Object input, @Nullable Object input,
@Nullable String instanceID, @Nullable String instanceID,
@Nullable TaskOptions options, @Nullable WorkflowTaskOptions options,
Class<V> returnType); Class<V> returnType);
/** /**

View File

@ -15,5 +15,7 @@ package io.dapr.workflows;
@FunctionalInterface @FunctionalInterface
public interface WorkflowStub { public interface WorkflowStub {
void run(WorkflowContext ctx); void run(WorkflowContext ctx);
} }

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
public class WorkflowTaskOptions {
private final WorkflowTaskRetryPolicy retryPolicy;
public WorkflowTaskOptions(WorkflowTaskRetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
public WorkflowTaskRetryPolicy getRetryPolicy() {
return retryPolicy;
}
}

View File

@ -0,0 +1,180 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.workflows;
import javax.annotation.Nullable;
import java.time.Duration;
public final class WorkflowTaskRetryPolicy {
private final Integer maxNumberOfAttempts;
private final Duration firstRetryInterval;
private final Double backoffCoefficient;
private final Duration maxRetryInterval;
private final Duration retryTimeout;
/**
* Constructor for WorkflowTaskRetryPolicy.
* @param maxNumberOfAttempts Maximum number of attempts to retry the workflow.
* @param firstRetryInterval Interval to wait before the first retry.
* @param backoffCoefficient Coefficient to increase the retry interval.
* @param maxRetryInterval Maximum interval to wait between retries.
* @param retryTimeout Timeout for the whole retry process.
*/
public WorkflowTaskRetryPolicy(
Integer maxNumberOfAttempts,
Duration firstRetryInterval,
Double backoffCoefficient,
Duration maxRetryInterval,
Duration retryTimeout
) {
this.maxNumberOfAttempts = maxNumberOfAttempts;
this.firstRetryInterval = firstRetryInterval;
this.backoffCoefficient = backoffCoefficient;
this.maxRetryInterval = maxRetryInterval;
this.retryTimeout = retryTimeout;
}
public int getMaxNumberOfAttempts() {
return maxNumberOfAttempts;
}
public Duration getFirstRetryInterval() {
return firstRetryInterval;
}
public double getBackoffCoefficient() {
return backoffCoefficient;
}
public Duration getMaxRetryInterval() {
return maxRetryInterval;
}
public Duration getRetryTimeout() {
return retryTimeout;
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private Integer maxNumberOfAttempts;
private Duration firstRetryInterval;
private Double backoffCoefficient = 1.0;
private Duration maxRetryInterval;
private Duration retryTimeout;
private Builder() {
}
/**
* Build the WorkflowTaskRetryPolicy.
* @return WorkflowTaskRetryPolicy
*/
public WorkflowTaskRetryPolicy build() {
return new WorkflowTaskRetryPolicy(
this.maxNumberOfAttempts,
this.firstRetryInterval,
this.backoffCoefficient,
this.maxRetryInterval,
this.retryTimeout
);
}
/**
* Set the maximum number of attempts to retry the workflow.
* @param maxNumberOfAttempts Maximum number
* @return This builder
*/
public Builder setMaxNumberOfAttempts(int maxNumberOfAttempts) {
if (maxNumberOfAttempts <= 0) {
throw new IllegalArgumentException("The value for maxNumberOfAttempts must be greater than zero.");
}
this.maxNumberOfAttempts = maxNumberOfAttempts;
return this;
}
/**
* Set the interval to wait before the first retry.
* @param firstRetryInterval Interval
* @return This builder
*/
public Builder setFirstRetryInterval(Duration firstRetryInterval) {
if (firstRetryInterval == null) {
throw new IllegalArgumentException("firstRetryInterval cannot be null.");
}
if (firstRetryInterval.isZero() || firstRetryInterval.isNegative()) {
throw new IllegalArgumentException("The value for firstRetryInterval must be greater than zero.");
}
this.firstRetryInterval = firstRetryInterval;
return this;
}
/**
* Set the backoff coefficient.
* @param backoffCoefficient Double value
* @return This builder
*/
public Builder setBackoffCoefficient(double backoffCoefficient) {
if (backoffCoefficient < 1.0) {
throw new IllegalArgumentException("The value for backoffCoefficient must be greater or equal to 1.0.");
}
this.backoffCoefficient = backoffCoefficient;
return this;
}
/**
* Set the maximum interval to wait between retries.
* @param maxRetryInterval Maximum interval
* @return This builder
*/
public Builder setMaxRetryInterval(@Nullable Duration maxRetryInterval) {
if (maxRetryInterval != null && maxRetryInterval.compareTo(this.firstRetryInterval) < 0) {
throw new IllegalArgumentException(
"The value for maxRetryInterval must be greater than or equal to the value for firstRetryInterval.");
}
this.maxRetryInterval = maxRetryInterval;
return this;
}
/**
* Set the maximum retry timeout.
* @param retryTimeout Maximum retry timeout
* @return This builder
*/
public Builder setRetryTimeout(Duration retryTimeout) {
if (retryTimeout != null && retryTimeout.compareTo(this.firstRetryInterval) < 0) {
throw new IllegalArgumentException(
"The value for retryTimeout must be greater than or equal to the value for firstRetryInterval.");
}
this.retryTimeout = retryTimeout;
return this;
}
}
}

View File

@ -17,19 +17,12 @@ import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.PurgeResult; import com.microsoft.durabletask.PurgeResult;
import io.dapr.client.Headers;
import io.dapr.config.Properties; import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils; import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow; import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor; import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -42,6 +35,8 @@ import java.util.concurrent.TimeoutException;
*/ */
public class DaprWorkflowClient implements AutoCloseable { public class DaprWorkflowClient implements AutoCloseable {
private static final ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor();
private DurableTaskClient innerClient; private DurableTaskClient innerClient;
private ManagedChannel grpcChannel; private ManagedChannel grpcChannel;
@ -137,7 +132,7 @@ public class DaprWorkflowClient implements AutoCloseable {
* @param options the options for the new workflow, including input, instance ID, etc. * @param options the options for the new workflow, including input, instance ID, etc.
* @return the <code>instanceId</code> parameter value. * @return the <code>instanceId</code> parameter value.
*/ */
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkflowOption options) { public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkflowOptions options) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(),
options.getNewOrchestrationInstanceOptions()); options.getNewOrchestrationInstanceOptions());
} }
@ -272,6 +267,6 @@ public class DaprWorkflowClient implements AutoCloseable {
} }
} }
private static ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor();
} }

View File

@ -20,16 +20,16 @@ import java.time.Instant;
/** /**
* Options for starting a new instance of a workflow. * Options for starting a new instance of a workflow.
*/ */
public class NewWorkflowOption { public class NewWorkflowOptions {
private final NewOrchestrationInstanceOptions newOrchestrationInstanceOptions = new NewOrchestrationInstanceOptions(); private final NewOrchestrationInstanceOptions newOrchestrationInstanceOptions = new NewOrchestrationInstanceOptions();
/** /**
* Sets the version of the workflow to start. * Sets the version of the workflow to start.
* *
* @param version the user-defined version of workflow * @param version the user-defined version of workflow
* @return this {@link NewWorkflowOption} object * @return this {@link NewWorkflowOptions} object
*/ */
public NewWorkflowOption setVersion(String version) { public NewWorkflowOptions setVersion(String version) {
this.newOrchestrationInstanceOptions.setVersion(version); this.newOrchestrationInstanceOptions.setVersion(version);
return this; return this;
} }
@ -40,9 +40,9 @@ public class NewWorkflowOption {
* <p>If no instance ID is configured, the workflow will be created with a randomly generated instance ID. * <p>If no instance ID is configured, the workflow will be created with a randomly generated instance ID.
* *
* @param instanceId the ID of the new workflow * @param instanceId the ID of the new workflow
* @return this {@link NewWorkflowOption} object * @return this {@link NewWorkflowOptions} object
*/ */
public NewWorkflowOption setInstanceId(String instanceId) { public NewWorkflowOptions setInstanceId(String instanceId) {
this.newOrchestrationInstanceOptions.setInstanceId(instanceId); this.newOrchestrationInstanceOptions.setInstanceId(instanceId);
return this; return this;
} }
@ -51,9 +51,9 @@ public class NewWorkflowOption {
* Sets the input of the workflow to start. * Sets the input of the workflow to start.
* *
* @param input the input of the new workflow * @param input the input of the new workflow
* @return this {@link NewWorkflowOption} object * @return this {@link NewWorkflowOptions} object
*/ */
public NewWorkflowOption setInput(Object input) { public NewWorkflowOptions setInput(Object input) {
this.newOrchestrationInstanceOptions.setInput(input); this.newOrchestrationInstanceOptions.setInput(input);
return this; return this;
} }
@ -65,9 +65,9 @@ public class NewWorkflowOption {
* to start them at a specific time in the future. * to start them at a specific time in the future.
* *
* @param startTime the start time of the new workflow * @param startTime the start time of the new workflow
* @return this {@link NewWorkflowOption} object * @return this {@link NewWorkflowOptions} object
*/ */
public NewWorkflowOption setStartTime(Instant startTime) { public NewWorkflowOptions setStartTime(Instant startTime) {
this.newOrchestrationInstanceOptions.setStartTime(startTime); this.newOrchestrationInstanceOptions.setStartTime(startTime);
return this; return this;
} }

View File

@ -14,11 +14,14 @@ limitations under the License.
package io.dapr.workflows.runtime; package io.dapr.workflows.runtime;
import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.RetryPolicy;
import com.microsoft.durabletask.Task; import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext; import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import io.dapr.workflows.runtime.saga.DefaultSagaContext; import io.dapr.workflows.runtime.saga.DefaultSagaContext;
import io.dapr.workflows.saga.Saga; import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext; import io.dapr.workflows.saga.SagaContext;
@ -149,7 +152,7 @@ public class DefaultWorkflowContext implements WorkflowContext {
* before the event is received * before the event is received
*/ */
@Override @Override
public <V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { public Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, timeout, Void.class); return this.innerContext.waitForExternalEvent(name, timeout, Void.class);
} }
@ -165,7 +168,7 @@ public class DefaultWorkflowContext implements WorkflowContext {
* @return a new {@link Task} that completes when the external event is received * @return a new {@link Task} that completes when the external event is received
*/ */
@Override @Override
public <V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException { public Task<Void> waitForExternalEvent(String name) throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, null, Void.class); return this.innerContext.waitForExternalEvent(name, null, Void.class);
} }
@ -177,8 +180,10 @@ public class DefaultWorkflowContext implements WorkflowContext {
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
public <V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType) { public <V> Task<V> callActivity(String name, Object input, WorkflowTaskOptions options, Class<V> returnType) {
return this.innerContext.callActivity(name, input, options, returnType); TaskOptions taskOptions = toTaskOptions(options);
return this.innerContext.callActivity(name, input, taskOptions, returnType);
} }
/** /**
@ -214,9 +219,10 @@ public class DefaultWorkflowContext implements WorkflowContext {
*/ */
@Override @Override
public <V> Task<V> callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, public <V> Task<V> callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) { @Nullable WorkflowTaskOptions options, Class<V> returnType) {
TaskOptions taskOptions = toTaskOptions(options);
return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType); return this.innerContext.callSubOrchestrator(name, input, instanceID, taskOptions, returnType);
} }
/** /**
@ -251,4 +257,21 @@ public class DefaultWorkflowContext implements WorkflowContext {
return new DefaultSagaContext(this.saga, this); return new DefaultSagaContext(this.saga, this);
} }
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
}
WorkflowTaskRetryPolicy workflowTaskRetryPolicy = options.getRetryPolicy();
RetryPolicy retryPolicy = new RetryPolicy(
workflowTaskRetryPolicy.getMaxNumberOfAttempts(),
workflowTaskRetryPolicy.getFirstRetryInterval()
);
retryPolicy.setBackoffCoefficient(workflowTaskRetryPolicy.getBackoffCoefficient());
retryPolicy.setRetryTimeout(workflowTaskRetryPolicy.getRetryTimeout());
return new TaskOptions(retryPolicy);
}
} }

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.saga; package io.dapr.workflows.saga;
import com.microsoft.durabletask.TaskOptions; import io.dapr.workflows.WorkflowTaskOptions;
/** /**
* Information for a compensation activity. * Information for a compensation activity.
@ -21,7 +21,7 @@ import com.microsoft.durabletask.TaskOptions;
class CompensationInformation { class CompensationInformation {
private final String compensationActivityClassName; private final String compensationActivityClassName;
private final Object compensationActivityInput; private final Object compensationActivityInput;
private final TaskOptions taskOptions; private final WorkflowTaskOptions options;
/** /**
* Constructor for a compensation information. * Constructor for a compensation information.
@ -30,13 +30,13 @@ class CompensationInformation {
* compensation. * compensation.
* @param compensationActivityInput Input of the activity to do * @param compensationActivityInput Input of the activity to do
* compensation. * compensation.
* @param taskOptions task options to set retry strategy * @param options Task options to set retry strategy
*/ */
public CompensationInformation(String compensationActivityClassName, public CompensationInformation(String compensationActivityClassName,
Object compensationActivityInput, TaskOptions taskOptions) { Object compensationActivityInput, WorkflowTaskOptions options) {
this.compensationActivityClassName = compensationActivityClassName; this.compensationActivityClassName = compensationActivityClassName;
this.compensationActivityInput = compensationActivityInput; this.compensationActivityInput = compensationActivityInput;
this.taskOptions = taskOptions; this.options = options;
} }
/** /**
@ -62,7 +62,7 @@ class CompensationInformation {
* *
* @return task options, null if not set * @return task options, null if not set
*/ */
public TaskOptions getTaskOptions() { public WorkflowTaskOptions getExecutionOptions() {
return taskOptions; return options;
} }
} }

View File

@ -14,28 +14,28 @@ limitations under the License.
package io.dapr.workflows.saga; package io.dapr.workflows.saga;
import com.microsoft.durabletask.Task; import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException; import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public final class Saga { public final class Saga {
private final SagaOption option; private final SagaOptions options;
private final List<CompensationInformation> compensationActivities = new ArrayList<>(); private final List<CompensationInformation> compensationActivities = new ArrayList<>();
/** /**
* Build up a Saga with its options. * Build up a Saga with its options.
* *
* @param option Saga option. * @param options Saga option.
*/ */
public Saga(SagaOption option) { public Saga(SagaOptions options) {
if (option == null) { if (options == null) {
throw new IllegalArgumentException("option is required and should not be null."); throw new IllegalArgumentException("option is required and should not be null.");
} }
this.option = option; this.options = options;
} }
/** /**
@ -50,16 +50,16 @@ public final class Saga {
/** /**
* Register a compensation activity. * Register a compensation activity.
* *
* @param activityClassName name of the activity class * @param activityClassName name of the activity class
* @param activityInput input of the activity to be compensated * @param activityInput input of the activity to be compensated
* @param taskOptions task options to set retry strategy * @param options task options to set retry strategy
*/ */
public void registerCompensation(String activityClassName, Object activityInput, TaskOptions taskOptions) { public void registerCompensation(String activityClassName, Object activityInput, WorkflowTaskOptions options) {
if (activityClassName == null || activityClassName.isEmpty()) { if (activityClassName == null || activityClassName.isEmpty()) {
throw new IllegalArgumentException("activityClassName is required and should not be null or empty."); throw new IllegalArgumentException("activityClassName is required and should not be null or empty.");
} }
this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, taskOptions)); this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, options));
} }
/** /**
@ -72,7 +72,7 @@ public final class Saga {
// Special case: when parallel compensation is enabled and there is only one // Special case: when parallel compensation is enabled and there is only one
// compensation, we still // compensation, we still
// compensate sequentially. // compensate sequentially.
if (option.isParallelCompensation() && compensationActivities.size() > 1) { if (options.isParallelCompensation() && compensationActivities.size() > 1) {
compensateInParallel(ctx); compensateInParallel(ctx);
} else { } else {
compensateSequentially(ctx); compensateSequentially(ctx);
@ -109,7 +109,7 @@ public final class Saga {
sagaException.addSuppressed(e); sagaException.addSuppressed(e);
} }
if (!option.isContinueWithError()) { if (!options.isContinueWithError()) {
throw sagaException; throw sagaException;
} }
} }
@ -124,6 +124,6 @@ public final class Saga {
throws SagaCompensationException { throws SagaCompensationException {
String activityClassName = info.getCompensationActivityClassName(); String activityClassName = info.getCompensationActivityClassName();
return ctx.callActivity(activityClassName, info.getCompensationActivityInput(), return ctx.callActivity(activityClassName, info.getCompensationActivityInput(),
info.getTaskOptions()); info.getExecutionOptions());
} }
} }

View File

@ -16,12 +16,12 @@ package io.dapr.workflows.saga;
/** /**
* Saga option. * Saga option.
*/ */
public final class SagaOption { public final class SagaOptions {
private final boolean parallelCompensation; private final boolean parallelCompensation;
private final int maxParallelThread; private final int maxParallelThread;
private final boolean continueWithError; private final boolean continueWithError;
private SagaOption(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) { private SagaOptions(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) {
this.parallelCompensation = parallelCompensation; this.parallelCompensation = parallelCompensation;
this.maxParallelThread = maxParallelThread; this.maxParallelThread = maxParallelThread;
this.continueWithError = continueWithError; this.continueWithError = continueWithError;
@ -95,8 +95,8 @@ public final class SagaOption {
* Build Saga option. * Build Saga option.
* @return Saga option * @return Saga option
*/ */
public SagaOption build() { public SagaOptions build() {
return new SagaOption(this.parallelCompensation, this.maxParallelThread, this.continueWithError); return new SagaOptions(this.parallelCompensation, this.maxParallelThread, this.continueWithError);
} }
} }
} }

View File

@ -14,7 +14,6 @@ limitations under the License.
package io.dapr.workflows; package io.dapr.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.RetryPolicy;
import com.microsoft.durabletask.Task; import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOptions;
@ -27,6 +26,7 @@ import io.dapr.workflows.saga.SagaContext;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.time.Duration; import java.time.Duration;
@ -35,9 +35,10 @@ import java.time.ZonedDateTime;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -86,17 +87,17 @@ public class DefaultWorkflowContextTest {
} }
@Override @Override
public <V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException { public Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException {
return null; return null;
} }
@Override @Override
public <V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException { public Task<Void> waitForExternalEvent(String name) throws TaskCanceledException {
return null; return null;
} }
@Override @Override
public <V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType) { public <V> Task<V> callActivity(String name, Object input, WorkflowTaskOptions options, Class<V> returnType) {
return null; return null;
} }
@ -127,7 +128,7 @@ public class DefaultWorkflowContextTest {
@Override @Override
public <V> Task<V> callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID, public <V> Task<V> callChildWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) { @Nullable WorkflowTaskOptions options, Class<V> returnType) {
return null; return null;
} }
@ -190,15 +191,12 @@ public class DefaultWorkflowContextTest {
@Test @Test
public void DaprWorkflowContextWithEmptyInnerContext() { public void DaprWorkflowContextWithEmptyInnerContext() {
assertThrows(IllegalArgumentException.class, () -> { assertThrows(IllegalArgumentException.class, () ->
context = new DefaultWorkflowContext(mockInnerContext, (Logger)null); context = new DefaultWorkflowContext(mockInnerContext, (Logger)null)); }
}); }
@Test @Test
public void DaprWorkflowContextWithEmptyLogger() { public void DaprWorkflowContextWithEmptyLogger() {
assertThrows(IllegalArgumentException.class, () -> { assertThrows(IllegalArgumentException.class, () -> context = new DefaultWorkflowContext(null, (Logger)null));
context = new DefaultWorkflowContext(null, (Logger)null);
});
} }
@Test @Test
@ -291,11 +289,28 @@ public class DefaultWorkflowContextTest {
String expectedName = "TestActivity"; String expectedName = "TestActivity";
String expectedInput = "TestInput"; String expectedInput = "TestInput";
String expectedInstanceId = "TestInstanceId"; String expectedInstanceId = "TestInstanceId";
TaskOptions expectedOptions = new TaskOptions(new RetryPolicy(1, Duration.ofSeconds(10))); WorkflowTaskRetryPolicy retryPolicy = WorkflowTaskRetryPolicy.newBuilder()
.setMaxNumberOfAttempts(1)
.setFirstRetryInterval(Duration.ofSeconds(10))
.build();
WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(retryPolicy);
ArgumentCaptor<TaskOptions> captor = ArgumentCaptor.forClass(TaskOptions.class);
context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, expectedOptions, String.class); context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class);
verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, expectedInstanceId,
expectedOptions, String.class); verify(mockInnerContext, times(1))
.callSubOrchestrator(
eq(expectedName),
eq(expectedInput),
eq(expectedInstanceId),
captor.capture(),
eq(String.class)
);
TaskOptions taskOptions = captor.getValue();
assertEquals(retryPolicy.getMaxNumberOfAttempts(), taskOptions.getRetryPolicy().getMaxNumberOfAttempts());
assertEquals(retryPolicy.getFirstRetryInterval(), taskOptions.getRetryPolicy().getFirstRetryInterval());
} }
@Test @Test
@ -326,14 +341,12 @@ public class DefaultWorkflowContextTest {
WorkflowContext context = new DefaultWorkflowContext(mockInnerContext, saga); WorkflowContext context = new DefaultWorkflowContext(mockInnerContext, saga);
SagaContext sagaContext = context.getSagaContext(); SagaContext sagaContext = context.getSagaContext();
assertNotNull("SagaContext should not be null", sagaContext); assertNotNull(sagaContext, "SagaContext should not be null");
} }
@Test @Test
public void getSagaContextTest_sagaDisabled() { public void getSagaContextTest_sagaDisabled() {
WorkflowContext context = new DefaultWorkflowContext(mockInnerContext); WorkflowContext context = new DefaultWorkflowContext(mockInnerContext);
assertThrows(UnsupportedOperationException.class, () -> { assertThrows(UnsupportedOperationException.class, context::getSagaContext);
context.getSagaContext();
});
} }
} }

View File

@ -22,7 +22,7 @@ import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException; import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaContext; import io.dapr.workflows.saga.SagaContext;
import io.dapr.workflows.saga.SagaOption; import io.dapr.workflows.saga.SagaOptions;
public class WorkflowTest { public class WorkflowTest {
@ -188,8 +188,8 @@ public class WorkflowTest {
} }
@Override @Override
public SagaOption getSagaOption() { public SagaOptions getSagaOption() {
return SagaOption.newBuilder() return SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.build(); .build();
} }

View File

@ -111,13 +111,13 @@ public class DaprWorkflowClientTest {
public void scheduleNewWorkflowWithNewWorkflowOption() { public void scheduleNewWorkflowWithNewWorkflowOption() {
String expectedName = TestWorkflow.class.getCanonicalName(); String expectedName = TestWorkflow.class.getCanonicalName();
Object expectedInput = new Object(); Object expectedInput = new Object();
NewWorkflowOption newWorkflowOption = new NewWorkflowOption(); NewWorkflowOptions newWorkflowOptions = new NewWorkflowOptions();
newWorkflowOption.setInput(expectedInput).setStartTime(Instant.now()); newWorkflowOptions.setInput(expectedInput).setStartTime(Instant.now());
client.scheduleNewWorkflow(TestWorkflow.class, newWorkflowOption); client.scheduleNewWorkflow(TestWorkflow.class, newWorkflowOptions);
verify(mockInnerClient, times(1)) verify(mockInnerClient, times(1))
.scheduleNewOrchestrationInstance(expectedName, newWorkflowOption.getNewOrchestrationInstanceOptions()); .scheduleNewOrchestrationInstance(expectedName, newWorkflowOptions.getNewOrchestrationInstanceOptions());
} }
@Test @Test

View File

@ -5,11 +5,11 @@ import org.junit.jupiter.api.Test;
import java.time.Instant; import java.time.Instant;
public class NewWorkflowOptionTest { public class NewWorkflowOptionsTest {
@Test @Test
void testNewWorkflowOption() { void testNewWorkflowOption() {
NewWorkflowOption workflowOption = new NewWorkflowOption(); NewWorkflowOptions workflowOption = new NewWorkflowOptions();
String version = "v1"; String version = "v1";
String instanceId = "123"; String instanceId = "123";
Object input = new Object(); Object input = new Object();

View File

@ -51,7 +51,7 @@ public class SagaIntegrationTest {
} }
private boolean doExecuteWorkflowWithSaga(boolean parallelCompensation) { private boolean doExecuteWorkflowWithSaga(boolean parallelCompensation) {
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(parallelCompensation) .setParallelCompensation(parallelCompensation)
.setContinueWithError(true).build(); .setContinueWithError(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);

View File

@ -5,15 +5,15 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.Test; import org.junit.Test;
public class SagaOptionTest { public class SagaOptionsTest {
@Test @Test
public void testBuild() { public void testBuild() {
SagaOption.Builder builder = SagaOption.newBuilder(); SagaOptions.Builder builder = SagaOptions.newBuilder();
builder.setParallelCompensation(true); builder.setParallelCompensation(true);
builder.setMaxParallelThread(32); builder.setMaxParallelThread(32);
builder.setContinueWithError(false); builder.setContinueWithError(false);
SagaOption option = builder.build(); SagaOptions option = builder.build();
assertEquals(true, option.isParallelCompensation()); assertEquals(true, option.isParallelCompensation());
assertEquals(32, option.getMaxParallelThread()); assertEquals(32, option.getMaxParallelThread());
@ -22,8 +22,8 @@ public class SagaOptionTest {
@Test @Test
public void testBuild_default() { public void testBuild_default() {
SagaOption.Builder builder = SagaOption.newBuilder(); SagaOptions.Builder builder = SagaOptions.newBuilder();
SagaOption option = builder.build(); SagaOptions option = builder.build();
assertEquals(false, option.isParallelCompensation()); assertEquals(false, option.isParallelCompensation());
assertEquals(16, option.getMaxParallelThread()); assertEquals(16, option.getMaxParallelThread());
@ -32,7 +32,7 @@ public class SagaOptionTest {
@Test @Test
public void testsetMaxParallelThread() { public void testsetMaxParallelThread() {
SagaOption.Builder builder = SagaOption.newBuilder(); SagaOptions.Builder builder = SagaOptions.newBuilder();
assertThrows(IllegalArgumentException.class, () -> { assertThrows(IllegalArgumentException.class, () -> {
builder.setMaxParallelThread(0); builder.setMaxParallelThread(0);

View File

@ -33,13 +33,13 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.dapr.workflows.WorkflowActivityContext; import io.dapr.workflows.WorkflowActivityContext;
import io.dapr.workflows.WorkflowTaskOptions;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.microsoft.durabletask.Task; import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskOptions;
import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivity;
@ -48,7 +48,7 @@ public class SagaTest {
public static WorkflowContext createMockContext() { public static WorkflowContext createMockContext() {
WorkflowContext workflowContext = mock(WorkflowContext.class); WorkflowContext workflowContext = mock(WorkflowContext.class);
when(workflowContext.callActivity(anyString(), any(), eq((TaskOptions) null))).thenAnswer(new ActivityAnswer()); when(workflowContext.callActivity(anyString(), any(), eq((WorkflowTaskOptions) null))).thenAnswer(new ActivityAnswer());
when(workflowContext.allOf(anyList())).thenAnswer(new AllActivityAnswer()); when(workflowContext.allOf(anyList())).thenAnswer(new AllActivityAnswer());
return workflowContext; return workflowContext;
@ -63,7 +63,7 @@ public class SagaTest {
@Test @Test
public void testregisterCompensation() { public void testregisterCompensation() {
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.setContinueWithError(true).build(); .setContinueWithError(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
@ -73,7 +73,7 @@ public class SagaTest {
@Test @Test
public void testregisterCompensation_IllegalArgument() { public void testregisterCompensation_IllegalArgument() {
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.setContinueWithError(true).build(); .setContinueWithError(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
@ -88,43 +88,43 @@ public class SagaTest {
@Test @Test
public void testCompensateInParallel() { public void testCompensateInParallel() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(true).build(); .setParallelCompensation(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
saga.compensate(createMockContext()); saga.compensate(createMockContext());
assertEquals(3, MockCompentationActivity.compensateOrder.size()); assertEquals(3, MockCompensationActivity.compensateOrder.size());
} }
@Test @Test
public void testCompensateInParallel_exception_1failed() { public void testCompensateInParallel_exception_1failed() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(true).build(); .setParallelCompensation(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
@ -132,110 +132,110 @@ public class SagaTest {
assertNotNull(exception.getCause()); assertNotNull(exception.getCause());
// 3 compentation activities, 2 succeed, 1 failed // 3 compentation activities, 2 succeed, 1 failed
assertEquals(0, exception.getSuppressed().length); assertEquals(0, exception.getSuppressed().length);
assertEquals(2, MockCompentationActivity.compensateOrder.size()); assertEquals(2, MockCompensationActivity.compensateOrder.size());
} }
@Test @Test
public void testCompensateInParallel_exception_2failed() { public void testCompensateInParallel_exception_2failed() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(true).build(); .setParallelCompensation(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
input3.setThrowException(true); input3.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
}); });
assertNotNull(exception.getCause()); assertNotNull(exception.getCause());
// 3 compentation activities, 1 succeed, 2 failed // 3 compentation activities, 1 succeed, 2 failed
assertEquals(1, MockCompentationActivity.compensateOrder.size()); assertEquals(1, MockCompensationActivity.compensateOrder.size());
} }
@Test @Test
public void testCompensateInParallel_exception_3failed() { public void testCompensateInParallel_exception_3failed() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(true).build(); .setParallelCompensation(true).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
input1.setThrowException(true); input1.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
input3.setThrowException(true); input3.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
}); });
assertNotNull(exception.getCause()); assertNotNull(exception.getCause());
// 3 compentation activities, 0 succeed, 3 failed // 3 compentation activities, 0 succeed, 3 failed
assertEquals(0, MockCompentationActivity.compensateOrder.size()); assertEquals(0, MockCompensationActivity.compensateOrder.size());
} }
@Test @Test
public void testCompensateSequentially() { public void testCompensateSequentially() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false).build(); .setParallelCompensation(false).build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
saga.compensate(createMockContext()); saga.compensate(createMockContext());
assertEquals(3, MockCompentationActivity.compensateOrder.size()); assertEquals(3, MockCompensationActivity.compensateOrder.size());
// the order should be 3 / 2 / 1 // the order should be 3 / 2 / 1
assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
assertEquals(Integer.valueOf(2), MockCompentationActivity.compensateOrder.get(1)); assertEquals(Integer.valueOf(2), MockCompensationActivity.compensateOrder.get(1));
assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(2)); assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(2));
} }
@Test @Test
public void testCompensateSequentially_continueWithError() { public void testCompensateSequentially_continueWithError() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.setContinueWithError(true) .setContinueWithError(true)
.build(); .build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
@ -244,32 +244,32 @@ public class SagaTest {
assertEquals(0, exception.getSuppressed().length); assertEquals(0, exception.getSuppressed().length);
// 3 compentation activities, 2 succeed, 1 failed // 3 compentation activities, 2 succeed, 1 failed
assertEquals(2, MockCompentationActivity.compensateOrder.size()); assertEquals(2, MockCompensationActivity.compensateOrder.size());
// the order should be 3 / 1 // the order should be 3 / 1
assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(1)); assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(1));
} }
@Test @Test
public void testCompensateSequentially_continueWithError_suppressed() { public void testCompensateSequentially_continueWithError_suppressed() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.setContinueWithError(true) .setContinueWithError(true)
.build(); .build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
input3.setThrowException(true); input3.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
@ -278,30 +278,30 @@ public class SagaTest {
assertEquals(1, exception.getSuppressed().length); assertEquals(1, exception.getSuppressed().length);
// 3 compentation activities, 1 succeed, 2 failed // 3 compentation activities, 1 succeed, 2 failed
assertEquals(1, MockCompentationActivity.compensateOrder.size()); assertEquals(1, MockCompensationActivity.compensateOrder.size());
// the order should be 3 / 1 // the order should be 3 / 1
assertEquals(Integer.valueOf(1), MockCompentationActivity.compensateOrder.get(0)); assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(0));
} }
@Test @Test
public void testCompensateSequentially_notContinueWithError() { public void testCompensateSequentially_notContinueWithError() {
MockCompentationActivity.compensateOrder.clear(); MockCompensationActivity.compensateOrder.clear();
SagaOption config = SagaOption.newBuilder() SagaOptions config = SagaOptions.newBuilder()
.setParallelCompensation(false) .setParallelCompensation(false)
.setContinueWithError(false) .setContinueWithError(false)
.build(); .build();
Saga saga = new Saga(config); Saga saga = new Saga(config);
MockActivityInput input1 = new MockActivityInput(); MockActivityInput input1 = new MockActivityInput();
input1.setOrder(1); input1.setOrder(1);
saga.registerCompensation(MockCompentationActivity.class.getName(), input1); saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
MockActivityInput input2 = new MockActivityInput(); MockActivityInput input2 = new MockActivityInput();
input2.setOrder(2); input2.setOrder(2);
input2.setThrowException(true); input2.setThrowException(true);
saga.registerCompensation(MockCompentationActivity.class.getName(), input2); saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
MockActivityInput input3 = new MockActivityInput(); MockActivityInput input3 = new MockActivityInput();
input3.setOrder(3); input3.setOrder(3);
saga.registerCompensation(MockCompentationActivity.class.getName(), input3); saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
saga.compensate(createMockContext()); saga.compensate(createMockContext());
@ -310,9 +310,9 @@ public class SagaTest {
assertEquals(0, exception.getSuppressed().length); assertEquals(0, exception.getSuppressed().length);
// 3 compentation activities, 1 succeed, 1 failed and not continue // 3 compentation activities, 1 succeed, 1 failed and not continue
assertEquals(1, MockCompentationActivity.compensateOrder.size()); assertEquals(1, MockCompensationActivity.compensateOrder.size());
// the order should be 3 / 1 // the order should be 3 / 1
assertEquals(Integer.valueOf(3), MockCompentationActivity.compensateOrder.get(0)); assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
} }
public static class MockActivity implements WorkflowActivity { public static class MockActivity implements WorkflowActivity {
@ -325,9 +325,9 @@ public class SagaTest {
} }
} }
public static class MockCompentationActivity implements WorkflowActivity { public static class MockCompensationActivity implements WorkflowActivity {
private static List<Integer> compensateOrder = Collections.synchronizedList(new ArrayList<>()); private static final List<Integer> compensateOrder = Collections.synchronizedList(new ArrayList<>());
@Override @Override
public Object run(WorkflowActivityContext ctx) { public Object run(WorkflowActivityContext ctx) {