diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/data/DaprKeyValueRepositoryIT.java b/sdk-tests/src/test/java/io/dapr/it/spring/data/DaprKeyValueRepositoryIT.java index 60d4b8854..131e1e615 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/data/DaprKeyValueRepositoryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/data/DaprKeyValueRepositoryIT.java @@ -29,13 +29,13 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME; import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.junit.jupiter.api.Assertions.*; /** @@ -65,7 +65,7 @@ public class DaprKeyValueRepositoryIT { @Container @ServiceConnection - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("postgresql-repository-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES)) diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/data/MySQLDaprKeyValueTemplateIT.java b/sdk-tests/src/test/java/io/dapr/it/spring/data/MySQLDaprKeyValueTemplateIT.java index d365685d9..6f372c39b 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/data/MySQLDaprKeyValueTemplateIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/data/MySQLDaprKeyValueTemplateIT.java @@ -26,8 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.data.keyvalue.core.query.KeyValueQuery; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.Network; @@ -39,7 +37,6 @@ import org.testcontainers.junit.jupiter.Testcontainers; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,6 +44,7 @@ import java.util.Optional; import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -82,7 +80,7 @@ public class MySQLDaprKeyValueTemplateIT { @Container @ServiceConnection - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("mysql-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES)) diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/data/PostgreSQLDaprKeyValueTemplateIT.java b/sdk-tests/src/test/java/io/dapr/it/spring/data/PostgreSQLDaprKeyValueTemplateIT.java index b055d3aef..c6f81daa5 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/data/PostgreSQLDaprKeyValueTemplateIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/data/PostgreSQLDaprKeyValueTemplateIT.java @@ -26,8 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; import org.springframework.data.keyvalue.core.query.KeyValueQuery; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.testcontainers.containers.Network; import org.testcontainers.containers.PostgreSQLContainer; @@ -38,6 +36,7 @@ import java.util.*; import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME; import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -68,7 +67,7 @@ public class PostgreSQLDaprKeyValueTemplateIT { @Container @ServiceConnection - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("postgresql-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES)) diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java index 1e41186df..ea1377949 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java @@ -37,6 +37,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; import java.util.Collections; import java.util.List; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.assertj.core.api.Assertions.assertThat; @SpringBootTest( @@ -60,7 +61,7 @@ public class DaprSpringMessagingIT { @Container @ServiceConnection - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("messaging-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap())) diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerConstants.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerConstants.java new file mode 100644 index 000000000..26bf340a1 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerConstants.java @@ -0,0 +1,5 @@ +package io.dapr.it.testcontainers; + +public interface DaprContainerConstants { + String IMAGE_TAG = "daprio/daprd:1.14.1"; +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerIT.java index d37680a2d..ed7c14ee5 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprContainerIT.java @@ -44,6 +44,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; import static com.github.tomakehurst.wiremock.client.WireMock.verify; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,7 +62,7 @@ public class DaprContainerIT { private static final String PUBSUB_TOPIC_NAME = "topic"; @Container - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("dapr-app") .withAppPort(8081) .withAppChannelAddress("host.testcontainers.internal"); diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index f0c39ed80..364f7d32e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Map; +import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -56,7 +57,7 @@ public class DaprWorkflowsIT { private static final Network DAPR_NETWORK = Network.newNetwork(); @Container - private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2") + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG) .withAppName("workflow-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component("kvstore", "state.in-memory", "v1", diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java index 8cb4750ae..ff08dc015 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java @@ -13,11 +13,6 @@ limitations under the License. package io.dapr.workflows; -import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; -import com.microsoft.durabletask.interruption.OrchestratorBlockedException; -import io.dapr.workflows.saga.SagaCompensationException; -import io.dapr.workflows.saga.SagaOptions; - /** * Common interface for workflow implementations. */ @@ -39,43 +34,6 @@ public interface Workflow { default void run(WorkflowContext ctx) { WorkflowStub stub = this.create(); - if (!this.isSagaEnabled()) { - // saga disabled - stub.run(ctx); - } else { - // saga enabled - try { - stub.run(ctx); - } catch (OrchestratorBlockedException | ContinueAsNewInterruption e) { - throw e; - } catch (SagaCompensationException e) { - // Saga compensation is triggered gracefully but failed in exception - // don't need to trigger compensation again - throw e; - } catch (Exception e) { - try { - ctx.getSagaContext().compensate(); - } catch (Exception se) { - se.addSuppressed(e); - throw se; - } - - throw e; - } - } - } - - default boolean isSagaEnabled() { - return this.getSagaOption() != null; - } - - /** - * get saga configuration. - * - * @return saga configuration - */ - default SagaOptions getSagaOption() { - // by default, saga is disabled - return null; + stub.run(ctx); } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 4156c5a59..9ed34fdc1 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -17,7 +17,6 @@ import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.Task; import com.microsoft.durabletask.TaskCanceledException; import com.microsoft.durabletask.TaskFailedException; -import io.dapr.workflows.saga.SagaContext; import org.slf4j.Logger; import javax.annotation.Nullable; @@ -530,12 +529,4 @@ public interface WorkflowContext { default UUID newUuid() { throw new RuntimeException("No implementation found."); } - - /** - * get saga context. - * - * @return saga context - * @throws UnsupportedOperationException if saga is not enabled. - */ - SagaContext getSagaContext(); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index b843819ad..20572995c 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -22,9 +22,6 @@ import com.microsoft.durabletask.TaskOrchestrationContext; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowTaskOptions; import io.dapr.workflows.WorkflowTaskRetryPolicy; -import io.dapr.workflows.runtime.saga.DefaultSagaContext; -import io.dapr.workflows.saga.Saga; -import io.dapr.workflows.saga.SagaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.helpers.NOPLogger; @@ -39,7 +36,6 @@ import java.util.UUID; public class DefaultWorkflowContext implements WorkflowContext { private final TaskOrchestrationContext innerContext; private final Logger logger; - private final Saga saga; /** * Constructor for DaprWorkflowContextImpl. @@ -58,23 +54,7 @@ public class DefaultWorkflowContext implements WorkflowContext { * @param logger Logger * @throws IllegalArgumentException if context or logger is null */ - public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException { - this(context, logger, null); - } - - public DefaultWorkflowContext(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException { - this(context, LoggerFactory.getLogger(WorkflowContext.class), saga); - } - - /** - * Constructor for DaprWorkflowContextImpl. - * - * @param context TaskOrchestrationContext - * @param logger Logger - * @param saga saga object, if null, saga is disabled - * @throws IllegalArgumentException if context or logger is null - */ - public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, Saga saga) + public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException { if (context == null) { throw new IllegalArgumentException("Context cannot be null"); @@ -85,7 +65,6 @@ public class DefaultWorkflowContext implements WorkflowContext { this.innerContext = context; this.logger = logger; - this.saga = saga; } /** @@ -249,15 +228,6 @@ public class DefaultWorkflowContext implements WorkflowContext { return this.innerContext.newUUID(); } - @Override - public SagaContext getSagaContext() { - if (this.saga == null) { - throw new UnsupportedOperationException("Saga is not enabled"); - } - - return new DefaultSagaContext(this.saga, this); - } - private static TaskOptions toTaskOptions(WorkflowTaskOptions options) { if (options == null) { return null; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 9c0ed95a6..4fab3f9cd 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -16,7 +16,6 @@ package io.dapr.workflows.runtime; import com.microsoft.durabletask.TaskOrchestration; import com.microsoft.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; -import io.dapr.workflows.saga.Saga; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -30,6 +29,7 @@ class WorkflowClassWrapper implements TaskOrchestrationFacto public WorkflowClassWrapper(Class clazz) { this.name = clazz.getCanonicalName(); + try { this.workflowConstructor = clazz.getDeclaredConstructor(); } catch (NoSuchMethodException e) { @@ -48,6 +48,7 @@ class WorkflowClassWrapper implements TaskOrchestrationFacto public TaskOrchestration create() { return ctx -> { T workflow; + try { workflow = this.workflowConstructor.newInstance(); } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { @@ -56,13 +57,7 @@ class WorkflowClassWrapper implements TaskOrchestrationFacto ); } - if (workflow.getSagaOption() != null) { - Saga saga = new Saga(workflow.getSagaOption()); - workflow.run(new DefaultWorkflowContext(ctx, saga)); - } else { - workflow.run(new DefaultWorkflowContext(ctx)); - } + workflow.run(new DefaultWorkflowContext(ctx)); }; - } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java index bda34d597..ad3159406 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -16,7 +16,6 @@ package io.dapr.workflows.runtime; import com.microsoft.durabletask.TaskOrchestration; import com.microsoft.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; -import io.dapr.workflows.saga.Saga; /** * Wrapper for Durable Task Framework orchestration factory. @@ -37,13 +36,6 @@ class WorkflowInstanceWrapper implements TaskOrchestrationFa @Override public TaskOrchestration create() { - return ctx -> { - if (workflow.getSagaOption() != null) { - Saga saga = new Saga(workflow.getSagaOption()); - workflow.run(new DefaultWorkflowContext(ctx, saga)); - } else { - workflow.run(new DefaultWorkflowContext(ctx)); - } - }; + return ctx -> workflow.run(new DefaultWorkflowContext(ctx)); } } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/saga/DefaultSagaContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/saga/DefaultSagaContext.java deleted file mode 100644 index 78d72b73d..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/saga/DefaultSagaContext.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.runtime.saga; - -import io.dapr.workflows.WorkflowContext; -import io.dapr.workflows.saga.Saga; -import io.dapr.workflows.saga.SagaContext; - -/** - * Dapr Saga Context implementation. - */ -public class DefaultSagaContext implements SagaContext { - - private final Saga saga; - private final WorkflowContext workflowContext; - - /** - * Constructor to build up instance. - * - * @param saga Saga instance. - * @param workflowContext Workflow context. - * @throws IllegalArgumentException if saga or workflowContext is null. - */ - public DefaultSagaContext(Saga saga, WorkflowContext workflowContext) { - if (saga == null) { - throw new IllegalArgumentException("Saga should not be null"); - } - if (workflowContext == null) { - throw new IllegalArgumentException("workflowContext should not be null"); - } - - this.saga = saga; - this.workflowContext = workflowContext; - } - - @Override - public void registerCompensation(String activityClassName, Object activityInput) { - this.saga.registerCompensation(activityClassName, activityInput); - } - - @Override - public void compensate() { - this.saga.compensate(workflowContext); - } -} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java deleted file mode 100644 index 33a2f741d..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/CompensationInformation.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.saga; - -import io.dapr.workflows.WorkflowTaskOptions; - -/** - * Information for a compensation activity. - */ -class CompensationInformation { - private final String compensationActivityClassName; - private final Object compensationActivityInput; - private final WorkflowTaskOptions options; - - /** - * Constructor for a compensation information. - * - * @param compensationActivityClassName Class name of the activity to do - * compensation. - * @param compensationActivityInput Input of the activity to do - * compensation. - * @param options Task options to set retry strategy - */ - public CompensationInformation(String compensationActivityClassName, - Object compensationActivityInput, WorkflowTaskOptions options) { - this.compensationActivityClassName = compensationActivityClassName; - this.compensationActivityInput = compensationActivityInput; - this.options = options; - } - - /** - * Gets the class name of the activity. - * - * @return the class name of the activity. - */ - public String getCompensationActivityClassName() { - return compensationActivityClassName; - } - - /** - * Gets the input of the activity. - * - * @return the input of the activity. - */ - public Object getCompensationActivityInput() { - return compensationActivityInput; - } - - /** - * get task options. - * - * @return task options, null if not set - */ - public WorkflowTaskOptions getExecutionOptions() { - return options; - } -} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java deleted file mode 100644 index f02da10b4..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/Saga.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.saga; - -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; -import com.microsoft.durabletask.interruption.OrchestratorBlockedException; -import io.dapr.workflows.WorkflowContext; -import io.dapr.workflows.WorkflowTaskOptions; - -import java.util.ArrayList; -import java.util.List; - -public final class Saga { - private final SagaOptions options; - private final List compensationActivities = new ArrayList<>(); - - /** - * Build up a Saga with its options. - * - * @param options Saga option. - */ - public Saga(SagaOptions options) { - if (options == null) { - throw new IllegalArgumentException("option is required and should not be null."); - } - this.options = options; - } - - /** - * Register a compensation activity. - * - * @param activityClassName name of the activity class - * @param activityInput input of the activity to be compensated - */ - public void registerCompensation(String activityClassName, Object activityInput) { - this.registerCompensation(activityClassName, activityInput, null); - } - - /** - * Register a compensation activity. - * - * @param activityClassName name of the activity class - * @param activityInput input of the activity to be compensated - * @param options task options to set retry strategy - */ - public void registerCompensation(String activityClassName, Object activityInput, WorkflowTaskOptions options) { - if (activityClassName == null || activityClassName.isEmpty()) { - throw new IllegalArgumentException("activityClassName is required and should not be null or empty."); - } - this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, options)); - } - - /** - * Compensate all registered activities. - * - * @param ctx Workflow context. - */ - public void compensate(WorkflowContext ctx) { - // Check if parallel compensation is enabled - // Special case: when parallel compensation is enabled and there is only one - // compensation, we still - // compensate sequentially. - if (options.isParallelCompensation() && compensationActivities.size() > 1) { - compensateInParallel(ctx); - } else { - compensateSequentially(ctx); - } - } - - private void compensateInParallel(WorkflowContext ctx) { - List> tasks = new ArrayList<>(compensationActivities.size()); - for (CompensationInformation compensationActivity : compensationActivities) { - Task task = executeCompensateActivity(ctx, compensationActivity); - tasks.add(task); - } - - try { - ctx.allOf(tasks).await(); - } catch (Exception e) { - throw new SagaCompensationException("Failed to compensate in parallel.", e); - } - } - - private void compensateSequentially(WorkflowContext ctx) { - SagaCompensationException sagaException = null; - for (int i = compensationActivities.size() - 1; i >= 0; i--) { - String activityClassName = compensationActivities.get(i).getCompensationActivityClassName(); - try { - executeCompensateActivity(ctx, compensationActivities.get(i)).await(); - } catch (OrchestratorBlockedException | ContinueAsNewInterruption e) { - throw e; - } catch (Exception e) { - if (sagaException == null) { - sagaException = new SagaCompensationException( - "Exception in saga compensation: activity=" + activityClassName, e); - } else { - sagaException.addSuppressed(e); - } - - if (!options.isContinueWithError()) { - throw sagaException; - } - } - } - - if (sagaException != null) { - throw sagaException; - } - } - - private Task executeCompensateActivity(WorkflowContext ctx, CompensationInformation info) - throws SagaCompensationException { - String activityClassName = info.getCompensationActivityClassName(); - return ctx.callActivity(activityClassName, info.getCompensationActivityInput(), - info.getExecutionOptions()); - } -} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaCompensationException.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaCompensationException.java deleted file mode 100644 index 07396d9b5..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaCompensationException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.saga; - -/** - * saga compensation exception. - */ -public class SagaCompensationException extends RuntimeException { - /** - * build up a SagaCompensationException. - * @param message exception message - * @param cause exception cause - */ - public SagaCompensationException(String message, Exception cause) { - super(message, cause); - } -} \ No newline at end of file diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaContext.java deleted file mode 100644 index 03470ff92..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaContext.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.saga; - -/** - * Saga context. - */ -public interface SagaContext { - /** - * Register a compensation activity. - * - * @param activityClassName name of the activity class - * @param activityInput input of the activity to be compensated - */ - void registerCompensation(String activityClassName, Object activityInput); - - /** - * Compensate all registered activities. - * - */ - void compensate(); - -} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java b/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java deleted file mode 100644 index 8a7184b6d..000000000 --- a/sdk-workflows/src/main/java/io/dapr/workflows/saga/SagaOptions.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.workflows.saga; - -/** - * Saga option. - */ -public final class SagaOptions { - private final boolean parallelCompensation; - private final int maxParallelThread; - private final boolean continueWithError; - - private SagaOptions(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) { - this.parallelCompensation = parallelCompensation; - this.maxParallelThread = maxParallelThread; - this.continueWithError = continueWithError; - } - - public boolean isParallelCompensation() { - return parallelCompensation; - } - - public boolean isContinueWithError() { - return continueWithError; - } - - public int getMaxParallelThread() { - return maxParallelThread; - } - - public static Builder newBuilder() { - return new Builder(); - } - - public static final class Builder { - // by default compensation is sequential - private boolean parallelCompensation = false; - - // by default max parallel thread is 16, it's enough for most cases - private int maxParallelThread = 16; - - // by default set continueWithError to be true - // So if a compensation fails, we should continue with the next compensations - private boolean continueWithError = true; - - /** - * Set parallel compensation. - * @param parallelCompensation parallel compensation or not - * @return this builder itself - */ - public Builder setParallelCompensation(boolean parallelCompensation) { - this.parallelCompensation = parallelCompensation; - return this; - } - - /** - * set max parallel thread. - * - *

Only valid when parallelCompensation is true. - * @param maxParallelThread max parallel thread - * @return this builder itself - */ - public Builder setMaxParallelThread(int maxParallelThread) { - if (maxParallelThread <= 2) { - throw new IllegalArgumentException("maxParallelThread should be greater than 1."); - } - this.maxParallelThread = maxParallelThread; - return this; - } - - /** - * Set continue with error. - * - *

Only valid when parallelCompensation is false. - * @param continueWithError continue with error or not - * @return this builder itself - */ - public Builder setContinueWithError(boolean continueWithError) { - this.continueWithError = continueWithError; - return this; - } - - /** - * Build Saga option. - * @return Saga option - */ - public SagaOptions build() { - return new SagaOptions(this.parallelCompensation, this.maxParallelThread, this.continueWithError); - } - } -} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 98b5dc23b..61d153484 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -20,8 +20,6 @@ import com.microsoft.durabletask.TaskOptions; import com.microsoft.durabletask.TaskOrchestrationContext; import io.dapr.workflows.runtime.DefaultWorkflowContext; -import io.dapr.workflows.saga.Saga; -import io.dapr.workflows.saga.SagaContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -135,12 +133,6 @@ public class DefaultWorkflowContextTest { @Override public void continueAsNew(Object input, boolean preserveUnprocessedEvents) { - - } - - @Override - public SagaContext getSagaContext() { - return null; } }; } @@ -335,19 +327,4 @@ public class DefaultWorkflowContextTest { String expectedMessage = "No implementation found."; assertEquals(expectedMessage, runtimeException.getMessage()); } - - @Test - public void getSagaContextTest_sagaEnabled() { - Saga saga = mock(Saga.class); - WorkflowContext context = new DefaultWorkflowContext(mockInnerContext, saga); - - SagaContext sagaContext = context.getSagaContext(); - assertNotNull(sagaContext, "SagaContext should not be null"); - } - - @Test - public void getSagaContextTest_sagaDisabled() { - WorkflowContext context = new DefaultWorkflowContext(mockInnerContext); - assertThrows(UnsupportedOperationException.class, context::getSagaContext); - } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java index f319709ec..13fe3a1b1 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/WorkflowTest.java @@ -1,15 +1,8 @@ package io.dapr.workflows; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -17,21 +10,12 @@ import static org.mockito.Mockito.verify; import org.junit.Test; -import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; -import com.microsoft.durabletask.interruption.OrchestratorBlockedException; - -import io.dapr.workflows.saga.SagaCompensationException; -import io.dapr.workflows.saga.SagaContext; -import io.dapr.workflows.saga.SagaOptions; - public class WorkflowTest { @Test - public void testWorkflow_WithoutSaga() { + public void testWorkflow() { WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithoutSaga(stub); - assertNull(workflow.getSagaOption()); - assertFalse(workflow.isSagaEnabled()); + Workflow workflow = new TestWorkflow(stub); WorkflowContext ctx = mock(WorkflowContext.class); doNothing().when(stub).run(ctx); @@ -41,9 +25,9 @@ public class WorkflowTest { } @Test - public void testWorkflow_WithoutSaga_throwException() { + public void testWorkflow_throwException() { WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithoutSaga(stub); + Workflow workflow = new TestWorkflow(stub); WorkflowContext ctx = mock(WorkflowContext.class); Exception e = new RuntimeException(); doThrow(e).when(stub).run(ctx); @@ -55,117 +39,10 @@ public class WorkflowTest { verify(stub, times(1)).run(eq(ctx)); } - @Test - public void testWorkflow_WithSaga() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - assertNotNull(workflow.getSagaOption()); - assertTrue(workflow.isSagaEnabled()); - - WorkflowContext ctx = mock(WorkflowContext.class); - doNothing().when(stub).run(ctx); - workflow.run(ctx); - - verify(stub, times(1)).run(eq(ctx)); - } - - @Test - public void testWorkflow_WithSaga_shouldNotCatch_OrchestratorBlockedException() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - - WorkflowContext ctx = mock(WorkflowContext.class); - Exception e = new OrchestratorBlockedException("test"); - doThrow(e).when(stub).run(ctx); - - // should not catch OrchestratorBlockedException - assertThrows(OrchestratorBlockedException.class, () -> { - workflow.run(ctx); - }); - verify(stub, times(1)).run(eq(ctx)); - } - - @Test - public void testWorkflow_WithSaga_shouldNotCatch_ContinueAsNewInterruption() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - - WorkflowContext ctx = mock(WorkflowContext.class); - Exception e = new ContinueAsNewInterruption("test"); - doThrow(e).when(stub).run(ctx); - - // should not catch ContinueAsNewInterruption - assertThrows(ContinueAsNewInterruption.class, () -> { - workflow.run(ctx); - }); - verify(stub, times(1)).run(eq(ctx)); - } - - @Test - public void testWorkflow_WithSaga_shouldNotCatch_SagaCompensationException() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - - WorkflowContext ctx = mock(WorkflowContext.class); - Exception e = new SagaCompensationException("test", null); - doThrow(e).when(stub).run(ctx); - - // should not catch SagaCompensationException - assertThrows(SagaCompensationException.class, () -> { - workflow.run(ctx); - }); - verify(stub, times(1)).run(eq(ctx)); - } - - @Test - public void testWorkflow_WithSaga_triggerCompensate() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - - WorkflowContext ctx = mock(WorkflowContext.class); - Exception e = new RuntimeException("test", null); - doThrow(e).when(stub).run(ctx); - SagaContext sagaContext = mock(SagaContext.class); - doReturn(sagaContext).when(ctx).getSagaContext(); - doNothing().when(sagaContext).compensate(); - - assertThrows(RuntimeException.class, () -> { - workflow.run(ctx); - }); - verify(stub, times(1)).run(eq(ctx)); - verify(sagaContext, times(1)).compensate(); - } - - @Test - public void testWorkflow_WithSaga_compensateFaile() { - WorkflowStub stub = mock(WorkflowStub.class); - Workflow workflow = new WorkflowWithSaga(stub); - - WorkflowContext ctx = mock(WorkflowContext.class); - Exception e = new RuntimeException("workflow fail", null); - doThrow(e).when(stub).run(ctx); - SagaContext sagaContext = mock(SagaContext.class); - doReturn(sagaContext).when(ctx).getSagaContext(); - Exception e2 = new RuntimeException("compensate fail", null); - doThrow(e2).when(sagaContext).compensate(); - - try { - workflow.run(ctx); - fail("sholdd throw exception"); - } catch (Exception ex) { - assertEquals(e2.getMessage(), ex.getMessage()); - assertEquals(1, ex.getSuppressed().length); - assertEquals(e.getMessage(), ex.getSuppressed()[0].getMessage()); - } - - verify(stub, times(1)).run(eq(ctx)); - verify(sagaContext, times(1)).compensate(); - } - - public static class WorkflowWithoutSaga implements Workflow { + public static class TestWorkflow implements Workflow { private final WorkflowStub stub; - public WorkflowWithoutSaga(WorkflowStub stub) { + public TestWorkflow(WorkflowStub stub) { this.stub = stub; } @@ -174,24 +51,4 @@ public class WorkflowTest { return stub; } } - - public static class WorkflowWithSaga implements Workflow { - private final WorkflowStub stub; - - public WorkflowWithSaga(WorkflowStub stub) { - this.stub = stub; - } - - @Override - public WorkflowStub create() { - return stub; - } - - @Override - public SagaOptions getSagaOption() { - return SagaOptions.newBuilder() - .setParallelCompensation(false) - .build(); - } - } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/DefaultSagaContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/DefaultSagaContextTest.java deleted file mode 100644 index 4f92c4f9e..000000000 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/DefaultSagaContextTest.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.dapr.workflows.saga; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import io.dapr.workflows.runtime.saga.DefaultSagaContext; -import org.junit.Test; - -import io.dapr.workflows.WorkflowContext; - -public class DefaultSagaContextTest { - - @Test - public void testDaprSagaContextImpl_IllegalArgumentException() { - Saga saga = mock(Saga.class); - WorkflowContext workflowContext = mock(WorkflowContext.class); - - assertThrows(IllegalArgumentException.class, () -> { - new DefaultSagaContext(saga, null); - }); - - assertThrows(IllegalArgumentException.class, () -> { - new DefaultSagaContext(null, workflowContext); - }); - } - - @Test - public void test_registerCompensation() { - Saga saga = mock(Saga.class); - WorkflowContext workflowContext = mock(WorkflowContext.class); - DefaultSagaContext ctx = new DefaultSagaContext(saga, workflowContext); - - String activityClassName = "name1"; - Object activityInput = new Object(); - doNothing().when(saga).registerCompensation(activityClassName, activityInput); - - ctx.registerCompensation(activityClassName, activityInput); - verify(saga, times(1)).registerCompensation(activityClassName, activityInput); - } - - @Test - public void test_compensate() { - Saga saga = mock(Saga.class); - WorkflowContext workflowContext = mock(WorkflowContext.class); - DefaultSagaContext ctx = new DefaultSagaContext(saga, workflowContext); - - doNothing().when(saga).compensate(workflowContext); - - ctx.compensate(); - verify(saga, times(1)).compensate(workflowContext); - } -} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java deleted file mode 100644 index 0838aa1a3..000000000 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaIntegrationTest.java +++ /dev/null @@ -1,322 +0,0 @@ -package io.dapr.workflows.saga; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.Test; - -import io.dapr.workflows.WorkflowActivity; -import io.dapr.workflows.WorkflowActivityContext; - -public class SagaIntegrationTest { - - private static int count = 0; - private static Object countLock = new Object(); - - @Test - public void testSaga_CompensateSequentially() { - int runCount = 10; - int succeedCount = 0; - int compensateCount = 0; - - for (int i = 0; i < runCount; i++) { - boolean isSuccueed = doExecuteWorkflowWithSaga(false); - if (isSuccueed) { - succeedCount++; - } else { - compensateCount++; - } - } - - System.out.println("Run workflow with saga " + runCount + " times: succeed " + succeedCount - + " times, failed and compensated " + compensateCount + " times"); - } - - @Test - public void testSaga_compensateInParallel() { - int runCount = 100; - int succeedCount = 0; - int compensateCount = 0; - - for (int i = 0; i < runCount; i++) { - boolean isSuccueed = doExecuteWorkflowWithSaga(true); - if (isSuccueed) { - succeedCount++; - } else { - compensateCount++; - } - } - - System.out.println("Run workflow with saga " + runCount + " times: succeed " + succeedCount - + " times, failed and compensated " + compensateCount + " times"); - } - - private boolean doExecuteWorkflowWithSaga(boolean parallelCompensation) { - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(parallelCompensation) - .setContinueWithError(true).build(); - Saga saga = new Saga(config); - boolean workflowSuccess = false; - - // reset count to zero - synchronized (countLock) { - count = 0; - } - - Integer addInput = 100; - Integer subtractInput = 20; - Integer multiplyInput = 10; - Integer divideInput = 5; - - try { - // step1: add activity - String result = callActivity(AddActivity.class.getName(), addInput, String.class); - saga.registerCompensation(AddCompentationActivity.class.getName(), addInput); - // step2: subtract activity - result = callActivity(SubtractActivity.class.getName(), subtractInput, String.class); - saga.registerCompensation(SubtractCompentationActivity.class.getName(), subtractInput); - - if (parallelCompensation) { - // only add/subtract activities support parallel compensation - // so in step3 and step4 we repeat add/subtract activities - - // step3: add activity again - result = callActivity(AddActivity.class.getName(), addInput, String.class); - saga.registerCompensation(AddCompentationActivity.class.getName(), addInput); - - // step4: substract activity again - result = callActivity(SubtractActivity.class.getName(), subtractInput, String.class); - saga.registerCompensation(SubtractCompentationActivity.class.getName(), subtractInput); - } else { - // step3: multiply activity - result = callActivity(MultiplyActivity.class.getName(), multiplyInput, String.class); - saga.registerCompensation(MultiplyCompentationActivity.class.getName(), multiplyInput); - - // step4: divide activity - result = callActivity(DivideActivity.class.getName(), divideInput, String.class); - saga.registerCompensation(DivideCompentationActivity.class.getName(), divideInput); - } - - randomFail(); - - workflowSuccess = true; - } catch (Exception e) { - saga.compensate(SagaTest.createMockContext()); - } - - if (workflowSuccess) { - int expectResult = 0; - if (parallelCompensation) { - expectResult = 0 + addInput - subtractInput + addInput - subtractInput; - } else { - expectResult = (0 + addInput - subtractInput) * multiplyInput / divideInput; - } - assertEquals(expectResult, count); - } else { - assertEquals(0, count); - } - - return workflowSuccess; - } - - // mock to call activity in dapr workflow - private V callActivity(String activityClassName, Object input, Class returnType) { - try { - Class activityClass = Class.forName(activityClassName); - WorkflowActivity activity = (WorkflowActivity) activityClass.getDeclaredConstructor().newInstance(); - WorkflowActivityContext ctx = new WorkflowActivityContext() { - - @Override - public java.lang.String getName() { - return activityClassName; - } - - @Override - public T getInput(Class targetType) { - return (T) input; - } - }; - - randomFail(); - - return (V) activity.run(ctx); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static void randomFail() { - int randomInt = (int) (Math.random() * 100); - // if randomInt mod 10 is 0, then throw exception - if (randomInt % 10 == 0) { - throw new RuntimeException("random fail"); - } - } - - public static class AddActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount + input; - count = updatedCount; - } - - String resultString = "current count is updated from " + originalCount + " to " + updatedCount - + " after adding " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class AddCompentationActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount - input; - count = updatedCount; - } - - String resultString = "current count is compensated from " + originalCount + " to " - + updatedCount + " after compensate adding " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class SubtractActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount - input; - count = updatedCount; - } - - String resultString = "current count is updated from " + originalCount + " to " + updatedCount - + " after substracting " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class SubtractCompentationActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount + input; - count = updatedCount; - } - - String resultString = "current count is compensated from " + originalCount + " to " + updatedCount - + " after compensate substracting " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class MultiplyActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount * input; - count = updatedCount; - } - - String resultString = "current count is updated from " + originalCount + " to " + updatedCount - + " after multiplying " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class MultiplyCompentationActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount / input; - count = updatedCount; - } - - String resultString = "current count is compensated from " + originalCount + " to " + updatedCount - + " after compensate multiplying " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class DivideActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount / input; - count = updatedCount; - } - - String resultString = "current count is updated from " + originalCount + " to " + updatedCount - + " after dividing " + input; - // System.out.println(resultString); - return resultString; - } - } - - public static class DivideCompentationActivity implements WorkflowActivity { - - @Override - public String run(WorkflowActivityContext ctx) { - Integer input = ctx.getInput(Integer.class); - - int originalCount = 0; - int updatedCount = 0; - synchronized (countLock) { - originalCount = count; - updatedCount = originalCount * input; - count = updatedCount; - } - - String resultString = "current count is compensated from " + originalCount + " to " + updatedCount - + " after compensate dividing " + input; - // System.out.println(resultString); - return resultString; - } - } -} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java deleted file mode 100644 index 76c538813..000000000 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaOptionsTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.dapr.workflows.saga; - -import static org.junit.Assert.assertThrows; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import org.junit.Test; - -public class SagaOptionsTest { - - @Test - public void testBuild() { - SagaOptions.Builder builder = SagaOptions.newBuilder(); - builder.setParallelCompensation(true); - builder.setMaxParallelThread(32); - builder.setContinueWithError(false); - SagaOptions option = builder.build(); - - assertEquals(true, option.isParallelCompensation()); - assertEquals(32, option.getMaxParallelThread()); - assertEquals(false, option.isContinueWithError()); - } - - @Test - public void testBuild_default() { - SagaOptions.Builder builder = SagaOptions.newBuilder(); - SagaOptions option = builder.build(); - - assertEquals(false, option.isParallelCompensation()); - assertEquals(16, option.getMaxParallelThread()); - assertEquals(true, option.isContinueWithError()); - } - - @Test - public void testsetMaxParallelThread() { - SagaOptions.Builder builder = SagaOptions.newBuilder(); - - assertThrows(IllegalArgumentException.class, () -> { - builder.setMaxParallelThread(0); - }); - - assertThrows(IllegalArgumentException.class, () -> { - builder.setMaxParallelThread(1); - }); - - assertThrows(IllegalArgumentException.class, () -> { - builder.setMaxParallelThread(-1); - }); - } - -} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java deleted file mode 100644 index 8afa2eb10..000000000 --- a/sdk-workflows/src/test/java/io/dapr/workflows/saga/SagaTest.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Copyright 2023 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ -package io.dapr.workflows.saga; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import io.dapr.workflows.WorkflowActivityContext; -import io.dapr.workflows.WorkflowTaskOptions; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import com.microsoft.durabletask.Task; - -import io.dapr.workflows.WorkflowContext; -import io.dapr.workflows.WorkflowActivity; - -public class SagaTest { - - public static WorkflowContext createMockContext() { - WorkflowContext workflowContext = mock(WorkflowContext.class); - when(workflowContext.callActivity(anyString(), any(), eq((WorkflowTaskOptions) null))).thenAnswer(new ActivityAnswer()); - when(workflowContext.allOf(anyList())).thenAnswer(new AllActivityAnswer()); - - return workflowContext; - } - - @Test - public void testSaga_IllegalArgument() { - assertThrows(IllegalArgumentException.class, () -> { - new Saga(null); - }); - } - - @Test - public void testregisterCompensation() { - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false) - .setContinueWithError(true).build(); - Saga saga = new Saga(config); - - saga.registerCompensation(MockActivity.class.getName(), new MockActivityInput()); - } - - @Test - public void testregisterCompensation_IllegalArgument() { - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false) - .setContinueWithError(true).build(); - Saga saga = new Saga(config); - - assertThrows(IllegalArgumentException.class, () -> { - saga.registerCompensation(null, "input"); - }); - assertThrows(IllegalArgumentException.class, () -> { - saga.registerCompensation("", "input"); - }); - } - - @Test - public void testCompensateInParallel() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(true).build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - saga.compensate(createMockContext()); - - assertEquals(3, MockCompensationActivity.compensateOrder.size()); - } - - @Test - public void testCompensateInParallel_exception_1failed() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(true).build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - // 3 compentation activities, 2 succeed, 1 failed - assertEquals(0, exception.getSuppressed().length); - assertEquals(2, MockCompensationActivity.compensateOrder.size()); - } - - @Test - public void testCompensateInParallel_exception_2failed() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(true).build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - input3.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - // 3 compentation activities, 1 succeed, 2 failed - assertEquals(1, MockCompensationActivity.compensateOrder.size()); - } - - @Test - public void testCompensateInParallel_exception_3failed() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(true).build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - input1.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - input3.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - // 3 compentation activities, 0 succeed, 3 failed - assertEquals(0, MockCompensationActivity.compensateOrder.size()); - } - - @Test - public void testCompensateSequentially() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false).build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - saga.compensate(createMockContext()); - - assertEquals(3, MockCompensationActivity.compensateOrder.size()); - - // the order should be 3 / 2 / 1 - assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); - assertEquals(Integer.valueOf(2), MockCompensationActivity.compensateOrder.get(1)); - assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(2)); - } - - @Test - public void testCompensateSequentially_continueWithError() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false) - .setContinueWithError(true) - .build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - assertEquals(0, exception.getSuppressed().length); - - // 3 compentation activities, 2 succeed, 1 failed - assertEquals(2, MockCompensationActivity.compensateOrder.size()); - // the order should be 3 / 1 - assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); - assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(1)); - } - - @Test - public void testCompensateSequentially_continueWithError_suppressed() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false) - .setContinueWithError(true) - .build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - input3.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - assertEquals(1, exception.getSuppressed().length); - - // 3 compentation activities, 1 succeed, 2 failed - assertEquals(1, MockCompensationActivity.compensateOrder.size()); - // the order should be 3 / 1 - assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(0)); - } - - @Test - public void testCompensateSequentially_notContinueWithError() { - MockCompensationActivity.compensateOrder.clear(); - - SagaOptions config = SagaOptions.newBuilder() - .setParallelCompensation(false) - .setContinueWithError(false) - .build(); - Saga saga = new Saga(config); - MockActivityInput input1 = new MockActivityInput(); - input1.setOrder(1); - saga.registerCompensation(MockCompensationActivity.class.getName(), input1); - MockActivityInput input2 = new MockActivityInput(); - input2.setOrder(2); - input2.setThrowException(true); - saga.registerCompensation(MockCompensationActivity.class.getName(), input2); - MockActivityInput input3 = new MockActivityInput(); - input3.setOrder(3); - saga.registerCompensation(MockCompensationActivity.class.getName(), input3); - - SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> { - saga.compensate(createMockContext()); - }); - assertNotNull(exception.getCause()); - assertEquals(0, exception.getSuppressed().length); - - // 3 compentation activities, 1 succeed, 1 failed and not continue - assertEquals(1, MockCompensationActivity.compensateOrder.size()); - // the order should be 3 / 1 - assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0)); - } - - public static class MockActivity implements WorkflowActivity { - - @Override - public Object run(WorkflowActivityContext ctx) { - MockActivityOutput output = new MockActivityOutput(); - output.setSucceed(true); - return output; - } - } - - public static class MockCompensationActivity implements WorkflowActivity { - - private static final List compensateOrder = Collections.synchronizedList(new ArrayList<>()); - - @Override - public Object run(WorkflowActivityContext ctx) { - MockActivityInput input = ctx.getInput(MockActivityInput.class); - - if (input.isThrowException()) { - throw new RuntimeException("compensate failed: order=" + input.getOrder()); - } - - compensateOrder.add(input.getOrder()); - return null; - } - } - - public static class MockActivityInput { - private int order = 0; - private boolean throwException; - - public int getOrder() { - return order; - } - - public void setOrder(int order) { - this.order = order; - } - - public boolean isThrowException() { - return throwException; - } - - public void setThrowException(boolean throwException) { - this.throwException = throwException; - } - } - - public static class MockActivityOutput { - private boolean succeed; - - public boolean isSucceed() { - return succeed; - } - - public void setSucceed(boolean succeed) { - this.succeed = succeed; - } - } - - public static class ActivityAnswer implements Answer> { - - @Override - public Task answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - String name = (String) args[0]; - Object input = args[1]; - - WorkflowActivity activity; - WorkflowActivityContext activityContext = Mockito.mock(WorkflowActivityContext.class); - try { - activity = (WorkflowActivity) Class.forName(name).getDeclaredConstructor().newInstance(); - } catch (Exception e) { - fail(e); - return null; - } - - Task task = mock(Task.class); - when(task.await()).thenAnswer(invocation1 -> { - Mockito.doReturn(input).when(activityContext).getInput(Mockito.any()); - activity.run(activityContext); - return null; - }); - return task; - } - - } - - public static class AllActivityAnswer implements Answer> { - @Override - public Task answer(InvocationOnMock invocation) throws Throwable { - Object[] args = invocation.getArguments(); - List> tasks = (List>) args[0]; - - ExecutorService executor = Executors.newFixedThreadPool(5); - List> compensationTasks = new ArrayList<>(); - for (Task task : tasks) { - Callable compensationTask = new Callable() { - @Override - public Void call() { - return task.await(); - } - }; - compensationTasks.add(compensationTask); - } - - List> resultFutures; - try { - resultFutures = executor.invokeAll(compensationTasks, 2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - fail(e); - return null; - } - - Task task = mock(Task.class); - when(task.await()).thenAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Exception exception = null; - for (Future resultFuture : resultFutures) { - try { - resultFuture.get(); - } catch (Exception e) { - exception = e; - } - } - if (exception != null) { - throw exception; - } - return null; - } - }); - return task; - } - } - -}