mirror of https://github.com/dapr/java-sdk.git
Removing Saga from Dapr Workflows (#1216)
This commit is contained in:
parent
bd3a54d6c4
commit
510679e295
|
@ -29,13 +29,13 @@ import org.testcontainers.containers.PostgreSQLContainer;
|
||||||
import org.testcontainers.junit.jupiter.Container;
|
import org.testcontainers.junit.jupiter.Container;
|
||||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.junit.jupiter.api.Assertions.*;
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,7 +65,7 @@ public class DaprKeyValueRepositoryIT {
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
@ServiceConnection
|
@ServiceConnection
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("postgresql-repository-dapr-app")
|
.withAppName("postgresql-repository-dapr-app")
|
||||||
.withNetwork(DAPR_NETWORK)
|
.withNetwork(DAPR_NETWORK)
|
||||||
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
|
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
||||||
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
|
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
|
||||||
import org.springframework.test.context.ContextConfiguration;
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
|
||||||
import org.springframework.test.context.DynamicPropertySource;
|
|
||||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||||
import org.testcontainers.containers.MySQLContainer;
|
import org.testcontainers.containers.MySQLContainer;
|
||||||
import org.testcontainers.containers.Network;
|
import org.testcontainers.containers.Network;
|
||||||
|
@ -39,7 +37,6 @@ import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -47,6 +44,7 @@ import java.util.Optional;
|
||||||
|
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
@ -82,7 +80,7 @@ public class MySQLDaprKeyValueTemplateIT {
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
@ServiceConnection
|
@ServiceConnection
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("mysql-dapr-app")
|
.withAppName("mysql-dapr-app")
|
||||||
.withNetwork(DAPR_NETWORK)
|
.withNetwork(DAPR_NETWORK)
|
||||||
.withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES))
|
.withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES))
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
||||||
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
|
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
|
||||||
import org.springframework.test.context.ContextConfiguration;
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
import org.springframework.test.context.DynamicPropertyRegistry;
|
|
||||||
import org.springframework.test.context.DynamicPropertySource;
|
|
||||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||||
import org.testcontainers.containers.Network;
|
import org.testcontainers.containers.Network;
|
||||||
import org.testcontainers.containers.PostgreSQLContainer;
|
import org.testcontainers.containers.PostgreSQLContainer;
|
||||||
|
@ -38,6 +36,7 @@ import java.util.*;
|
||||||
|
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
|
||||||
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
@ -68,7 +67,7 @@ public class PostgreSQLDaprKeyValueTemplateIT {
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
@ServiceConnection
|
@ServiceConnection
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("postgresql-dapr-app")
|
.withAppName("postgresql-dapr-app")
|
||||||
.withNetwork(DAPR_NETWORK)
|
.withNetwork(DAPR_NETWORK)
|
||||||
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
|
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.testcontainers.junit.jupiter.Testcontainers;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@SpringBootTest(
|
@SpringBootTest(
|
||||||
|
@ -60,7 +61,7 @@ public class DaprSpringMessagingIT {
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
@ServiceConnection
|
@ServiceConnection
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("messaging-dapr-app")
|
.withAppName("messaging-dapr-app")
|
||||||
.withNetwork(DAPR_NETWORK)
|
.withNetwork(DAPR_NETWORK)
|
||||||
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
|
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
package io.dapr.it.testcontainers;
|
||||||
|
|
||||||
|
public interface DaprContainerConstants {
|
||||||
|
String IMAGE_TAG = "daprio/daprd:1.14.1";
|
||||||
|
}
|
|
@ -44,6 +44,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
|
||||||
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
|
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
|
||||||
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
|
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
|
||||||
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
|
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
@ -61,7 +62,7 @@ public class DaprContainerIT {
|
||||||
private static final String PUBSUB_TOPIC_NAME = "topic";
|
private static final String PUBSUB_TOPIC_NAME = "topic";
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("dapr-app")
|
.withAppName("dapr-app")
|
||||||
.withAppPort(8081)
|
.withAppPort(8081)
|
||||||
.withAppChannelAddress("host.testcontainers.internal");
|
.withAppChannelAddress("host.testcontainers.internal");
|
||||||
|
|
|
@ -39,6 +39,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
|
|
||||||
|
@ -56,7 +57,7 @@ public class DaprWorkflowsIT {
|
||||||
private static final Network DAPR_NETWORK = Network.newNetwork();
|
private static final Network DAPR_NETWORK = Network.newNetwork();
|
||||||
|
|
||||||
@Container
|
@Container
|
||||||
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
|
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
|
||||||
.withAppName("workflow-dapr-app")
|
.withAppName("workflow-dapr-app")
|
||||||
.withNetwork(DAPR_NETWORK)
|
.withNetwork(DAPR_NETWORK)
|
||||||
.withComponent(new Component("kvstore", "state.in-memory", "v1",
|
.withComponent(new Component("kvstore", "state.in-memory", "v1",
|
||||||
|
|
|
@ -13,11 +13,6 @@ limitations under the License.
|
||||||
|
|
||||||
package io.dapr.workflows;
|
package io.dapr.workflows;
|
||||||
|
|
||||||
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
|
|
||||||
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
|
|
||||||
import io.dapr.workflows.saga.SagaCompensationException;
|
|
||||||
import io.dapr.workflows.saga.SagaOptions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common interface for workflow implementations.
|
* Common interface for workflow implementations.
|
||||||
*/
|
*/
|
||||||
|
@ -39,43 +34,6 @@ public interface Workflow {
|
||||||
default void run(WorkflowContext ctx) {
|
default void run(WorkflowContext ctx) {
|
||||||
WorkflowStub stub = this.create();
|
WorkflowStub stub = this.create();
|
||||||
|
|
||||||
if (!this.isSagaEnabled()) {
|
|
||||||
// saga disabled
|
|
||||||
stub.run(ctx);
|
stub.run(ctx);
|
||||||
} else {
|
|
||||||
// saga enabled
|
|
||||||
try {
|
|
||||||
stub.run(ctx);
|
|
||||||
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
|
|
||||||
throw e;
|
|
||||||
} catch (SagaCompensationException e) {
|
|
||||||
// Saga compensation is triggered gracefully but failed in exception
|
|
||||||
// don't need to trigger compensation again
|
|
||||||
throw e;
|
|
||||||
} catch (Exception e) {
|
|
||||||
try {
|
|
||||||
ctx.getSagaContext().compensate();
|
|
||||||
} catch (Exception se) {
|
|
||||||
se.addSuppressed(e);
|
|
||||||
throw se;
|
|
||||||
}
|
|
||||||
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
default boolean isSagaEnabled() {
|
|
||||||
return this.getSagaOption() != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get saga configuration.
|
|
||||||
*
|
|
||||||
* @return saga configuration
|
|
||||||
*/
|
|
||||||
default SagaOptions getSagaOption() {
|
|
||||||
// by default, saga is disabled
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ import com.microsoft.durabletask.CompositeTaskFailedException;
|
||||||
import com.microsoft.durabletask.Task;
|
import com.microsoft.durabletask.Task;
|
||||||
import com.microsoft.durabletask.TaskCanceledException;
|
import com.microsoft.durabletask.TaskCanceledException;
|
||||||
import com.microsoft.durabletask.TaskFailedException;
|
import com.microsoft.durabletask.TaskFailedException;
|
||||||
import io.dapr.workflows.saga.SagaContext;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
@ -530,12 +529,4 @@ public interface WorkflowContext {
|
||||||
default UUID newUuid() {
|
default UUID newUuid() {
|
||||||
throw new RuntimeException("No implementation found.");
|
throw new RuntimeException("No implementation found.");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* get saga context.
|
|
||||||
*
|
|
||||||
* @return saga context
|
|
||||||
* @throws UnsupportedOperationException if saga is not enabled.
|
|
||||||
*/
|
|
||||||
SagaContext getSagaContext();
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,6 @@ import com.microsoft.durabletask.TaskOrchestrationContext;
|
||||||
import io.dapr.workflows.WorkflowContext;
|
import io.dapr.workflows.WorkflowContext;
|
||||||
import io.dapr.workflows.WorkflowTaskOptions;
|
import io.dapr.workflows.WorkflowTaskOptions;
|
||||||
import io.dapr.workflows.WorkflowTaskRetryPolicy;
|
import io.dapr.workflows.WorkflowTaskRetryPolicy;
|
||||||
import io.dapr.workflows.runtime.saga.DefaultSagaContext;
|
|
||||||
import io.dapr.workflows.saga.Saga;
|
|
||||||
import io.dapr.workflows.saga.SagaContext;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.slf4j.helpers.NOPLogger;
|
import org.slf4j.helpers.NOPLogger;
|
||||||
|
@ -39,7 +36,6 @@ import java.util.UUID;
|
||||||
public class DefaultWorkflowContext implements WorkflowContext {
|
public class DefaultWorkflowContext implements WorkflowContext {
|
||||||
private final TaskOrchestrationContext innerContext;
|
private final TaskOrchestrationContext innerContext;
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
private final Saga saga;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for DaprWorkflowContextImpl.
|
* Constructor for DaprWorkflowContextImpl.
|
||||||
|
@ -58,23 +54,7 @@ public class DefaultWorkflowContext implements WorkflowContext {
|
||||||
* @param logger Logger
|
* @param logger Logger
|
||||||
* @throws IllegalArgumentException if context or logger is null
|
* @throws IllegalArgumentException if context or logger is null
|
||||||
*/
|
*/
|
||||||
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
|
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger)
|
||||||
this(context, logger, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public DefaultWorkflowContext(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException {
|
|
||||||
this(context, LoggerFactory.getLogger(WorkflowContext.class), saga);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for DaprWorkflowContextImpl.
|
|
||||||
*
|
|
||||||
* @param context TaskOrchestrationContext
|
|
||||||
* @param logger Logger
|
|
||||||
* @param saga saga object, if null, saga is disabled
|
|
||||||
* @throws IllegalArgumentException if context or logger is null
|
|
||||||
*/
|
|
||||||
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, Saga saga)
|
|
||||||
throws IllegalArgumentException {
|
throws IllegalArgumentException {
|
||||||
if (context == null) {
|
if (context == null) {
|
||||||
throw new IllegalArgumentException("Context cannot be null");
|
throw new IllegalArgumentException("Context cannot be null");
|
||||||
|
@ -85,7 +65,6 @@ public class DefaultWorkflowContext implements WorkflowContext {
|
||||||
|
|
||||||
this.innerContext = context;
|
this.innerContext = context;
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
this.saga = saga;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -249,15 +228,6 @@ public class DefaultWorkflowContext implements WorkflowContext {
|
||||||
return this.innerContext.newUUID();
|
return this.innerContext.newUUID();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public SagaContext getSagaContext() {
|
|
||||||
if (this.saga == null) {
|
|
||||||
throw new UnsupportedOperationException("Saga is not enabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
return new DefaultSagaContext(this.saga, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
|
private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
|
||||||
if (options == null) {
|
if (options == null) {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -16,7 +16,6 @@ package io.dapr.workflows.runtime;
|
||||||
import com.microsoft.durabletask.TaskOrchestration;
|
import com.microsoft.durabletask.TaskOrchestration;
|
||||||
import com.microsoft.durabletask.TaskOrchestrationFactory;
|
import com.microsoft.durabletask.TaskOrchestrationFactory;
|
||||||
import io.dapr.workflows.Workflow;
|
import io.dapr.workflows.Workflow;
|
||||||
import io.dapr.workflows.saga.Saga;
|
|
||||||
|
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
@ -30,6 +29,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto
|
||||||
|
|
||||||
public WorkflowClassWrapper(Class<T> clazz) {
|
public WorkflowClassWrapper(Class<T> clazz) {
|
||||||
this.name = clazz.getCanonicalName();
|
this.name = clazz.getCanonicalName();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.workflowConstructor = clazz.getDeclaredConstructor();
|
this.workflowConstructor = clazz.getDeclaredConstructor();
|
||||||
} catch (NoSuchMethodException e) {
|
} catch (NoSuchMethodException e) {
|
||||||
|
@ -48,6 +48,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto
|
||||||
public TaskOrchestration create() {
|
public TaskOrchestration create() {
|
||||||
return ctx -> {
|
return ctx -> {
|
||||||
T workflow;
|
T workflow;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
workflow = this.workflowConstructor.newInstance();
|
workflow = this.workflowConstructor.newInstance();
|
||||||
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
|
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
|
||||||
|
@ -56,13 +57,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (workflow.getSagaOption() != null) {
|
|
||||||
Saga saga = new Saga(workflow.getSagaOption());
|
|
||||||
workflow.run(new DefaultWorkflowContext(ctx, saga));
|
|
||||||
} else {
|
|
||||||
workflow.run(new DefaultWorkflowContext(ctx));
|
workflow.run(new DefaultWorkflowContext(ctx));
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ package io.dapr.workflows.runtime;
|
||||||
import com.microsoft.durabletask.TaskOrchestration;
|
import com.microsoft.durabletask.TaskOrchestration;
|
||||||
import com.microsoft.durabletask.TaskOrchestrationFactory;
|
import com.microsoft.durabletask.TaskOrchestrationFactory;
|
||||||
import io.dapr.workflows.Workflow;
|
import io.dapr.workflows.Workflow;
|
||||||
import io.dapr.workflows.saga.Saga;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wrapper for Durable Task Framework orchestration factory.
|
* Wrapper for Durable Task Framework orchestration factory.
|
||||||
|
@ -37,13 +36,6 @@ class WorkflowInstanceWrapper<T extends Workflow> implements TaskOrchestrationFa
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TaskOrchestration create() {
|
public TaskOrchestration create() {
|
||||||
return ctx -> {
|
return ctx -> workflow.run(new DefaultWorkflowContext(ctx));
|
||||||
if (workflow.getSagaOption() != null) {
|
|
||||||
Saga saga = new Saga(workflow.getSagaOption());
|
|
||||||
workflow.run(new DefaultWorkflowContext(ctx, saga));
|
|
||||||
} else {
|
|
||||||
workflow.run(new DefaultWorkflowContext(ctx));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.runtime.saga;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowContext;
|
|
||||||
import io.dapr.workflows.saga.Saga;
|
|
||||||
import io.dapr.workflows.saga.SagaContext;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Dapr Saga Context implementation.
|
|
||||||
*/
|
|
||||||
public class DefaultSagaContext implements SagaContext {
|
|
||||||
|
|
||||||
private final Saga saga;
|
|
||||||
private final WorkflowContext workflowContext;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor to build up instance.
|
|
||||||
*
|
|
||||||
* @param saga Saga instance.
|
|
||||||
* @param workflowContext Workflow context.
|
|
||||||
* @throws IllegalArgumentException if saga or workflowContext is null.
|
|
||||||
*/
|
|
||||||
public DefaultSagaContext(Saga saga, WorkflowContext workflowContext) {
|
|
||||||
if (saga == null) {
|
|
||||||
throw new IllegalArgumentException("Saga should not be null");
|
|
||||||
}
|
|
||||||
if (workflowContext == null) {
|
|
||||||
throw new IllegalArgumentException("workflowContext should not be null");
|
|
||||||
}
|
|
||||||
|
|
||||||
this.saga = saga;
|
|
||||||
this.workflowContext = workflowContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void registerCompensation(String activityClassName, Object activityInput) {
|
|
||||||
this.saga.registerCompensation(activityClassName, activityInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void compensate() {
|
|
||||||
this.saga.compensate(workflowContext);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,68 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowTaskOptions;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Information for a compensation activity.
|
|
||||||
*/
|
|
||||||
class CompensationInformation {
|
|
||||||
private final String compensationActivityClassName;
|
|
||||||
private final Object compensationActivityInput;
|
|
||||||
private final WorkflowTaskOptions options;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for a compensation information.
|
|
||||||
*
|
|
||||||
* @param compensationActivityClassName Class name of the activity to do
|
|
||||||
* compensation.
|
|
||||||
* @param compensationActivityInput Input of the activity to do
|
|
||||||
* compensation.
|
|
||||||
* @param options Task options to set retry strategy
|
|
||||||
*/
|
|
||||||
public CompensationInformation(String compensationActivityClassName,
|
|
||||||
Object compensationActivityInput, WorkflowTaskOptions options) {
|
|
||||||
this.compensationActivityClassName = compensationActivityClassName;
|
|
||||||
this.compensationActivityInput = compensationActivityInput;
|
|
||||||
this.options = options;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the class name of the activity.
|
|
||||||
*
|
|
||||||
* @return the class name of the activity.
|
|
||||||
*/
|
|
||||||
public String getCompensationActivityClassName() {
|
|
||||||
return compensationActivityClassName;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the input of the activity.
|
|
||||||
*
|
|
||||||
* @return the input of the activity.
|
|
||||||
*/
|
|
||||||
public Object getCompensationActivityInput() {
|
|
||||||
return compensationActivityInput;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* get task options.
|
|
||||||
*
|
|
||||||
* @return task options, null if not set
|
|
||||||
*/
|
|
||||||
public WorkflowTaskOptions getExecutionOptions() {
|
|
||||||
return options;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import com.microsoft.durabletask.Task;
|
|
||||||
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
|
|
||||||
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
|
|
||||||
import io.dapr.workflows.WorkflowContext;
|
|
||||||
import io.dapr.workflows.WorkflowTaskOptions;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
public final class Saga {
|
|
||||||
private final SagaOptions options;
|
|
||||||
private final List<CompensationInformation> compensationActivities = new ArrayList<>();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Build up a Saga with its options.
|
|
||||||
*
|
|
||||||
* @param options Saga option.
|
|
||||||
*/
|
|
||||||
public Saga(SagaOptions options) {
|
|
||||||
if (options == null) {
|
|
||||||
throw new IllegalArgumentException("option is required and should not be null.");
|
|
||||||
}
|
|
||||||
this.options = options;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register a compensation activity.
|
|
||||||
*
|
|
||||||
* @param activityClassName name of the activity class
|
|
||||||
* @param activityInput input of the activity to be compensated
|
|
||||||
*/
|
|
||||||
public void registerCompensation(String activityClassName, Object activityInput) {
|
|
||||||
this.registerCompensation(activityClassName, activityInput, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Register a compensation activity.
|
|
||||||
*
|
|
||||||
* @param activityClassName name of the activity class
|
|
||||||
* @param activityInput input of the activity to be compensated
|
|
||||||
* @param options task options to set retry strategy
|
|
||||||
*/
|
|
||||||
public void registerCompensation(String activityClassName, Object activityInput, WorkflowTaskOptions options) {
|
|
||||||
if (activityClassName == null || activityClassName.isEmpty()) {
|
|
||||||
throw new IllegalArgumentException("activityClassName is required and should not be null or empty.");
|
|
||||||
}
|
|
||||||
this.compensationActivities.add(new CompensationInformation(activityClassName, activityInput, options));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compensate all registered activities.
|
|
||||||
*
|
|
||||||
* @param ctx Workflow context.
|
|
||||||
*/
|
|
||||||
public void compensate(WorkflowContext ctx) {
|
|
||||||
// Check if parallel compensation is enabled
|
|
||||||
// Special case: when parallel compensation is enabled and there is only one
|
|
||||||
// compensation, we still
|
|
||||||
// compensate sequentially.
|
|
||||||
if (options.isParallelCompensation() && compensationActivities.size() > 1) {
|
|
||||||
compensateInParallel(ctx);
|
|
||||||
} else {
|
|
||||||
compensateSequentially(ctx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void compensateInParallel(WorkflowContext ctx) {
|
|
||||||
List<Task<Void>> tasks = new ArrayList<>(compensationActivities.size());
|
|
||||||
for (CompensationInformation compensationActivity : compensationActivities) {
|
|
||||||
Task<Void> task = executeCompensateActivity(ctx, compensationActivity);
|
|
||||||
tasks.add(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
ctx.allOf(tasks).await();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new SagaCompensationException("Failed to compensate in parallel.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void compensateSequentially(WorkflowContext ctx) {
|
|
||||||
SagaCompensationException sagaException = null;
|
|
||||||
for (int i = compensationActivities.size() - 1; i >= 0; i--) {
|
|
||||||
String activityClassName = compensationActivities.get(i).getCompensationActivityClassName();
|
|
||||||
try {
|
|
||||||
executeCompensateActivity(ctx, compensationActivities.get(i)).await();
|
|
||||||
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
|
|
||||||
throw e;
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (sagaException == null) {
|
|
||||||
sagaException = new SagaCompensationException(
|
|
||||||
"Exception in saga compensation: activity=" + activityClassName, e);
|
|
||||||
} else {
|
|
||||||
sagaException.addSuppressed(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!options.isContinueWithError()) {
|
|
||||||
throw sagaException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sagaException != null) {
|
|
||||||
throw sagaException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Task<Void> executeCompensateActivity(WorkflowContext ctx, CompensationInformation info)
|
|
||||||
throws SagaCompensationException {
|
|
||||||
String activityClassName = info.getCompensationActivityClassName();
|
|
||||||
return ctx.callActivity(activityClassName, info.getCompensationActivityInput(),
|
|
||||||
info.getExecutionOptions());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,28 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* saga compensation exception.
|
|
||||||
*/
|
|
||||||
public class SagaCompensationException extends RuntimeException {
|
|
||||||
/**
|
|
||||||
* build up a SagaCompensationException.
|
|
||||||
* @param message exception message
|
|
||||||
* @param cause exception cause
|
|
||||||
*/
|
|
||||||
public SagaCompensationException(String message, Exception cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,34 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Saga context.
|
|
||||||
*/
|
|
||||||
public interface SagaContext {
|
|
||||||
/**
|
|
||||||
* Register a compensation activity.
|
|
||||||
*
|
|
||||||
* @param activityClassName name of the activity class
|
|
||||||
* @param activityInput input of the activity to be compensated
|
|
||||||
*/
|
|
||||||
void registerCompensation(String activityClassName, Object activityInput);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compensate all registered activities.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
void compensate();
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,102 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Saga option.
|
|
||||||
*/
|
|
||||||
public final class SagaOptions {
|
|
||||||
private final boolean parallelCompensation;
|
|
||||||
private final int maxParallelThread;
|
|
||||||
private final boolean continueWithError;
|
|
||||||
|
|
||||||
private SagaOptions(boolean parallelCompensation, int maxParallelThread, boolean continueWithError) {
|
|
||||||
this.parallelCompensation = parallelCompensation;
|
|
||||||
this.maxParallelThread = maxParallelThread;
|
|
||||||
this.continueWithError = continueWithError;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isParallelCompensation() {
|
|
||||||
return parallelCompensation;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isContinueWithError() {
|
|
||||||
return continueWithError;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMaxParallelThread() {
|
|
||||||
return maxParallelThread;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Builder newBuilder() {
|
|
||||||
return new Builder();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static final class Builder {
|
|
||||||
// by default compensation is sequential
|
|
||||||
private boolean parallelCompensation = false;
|
|
||||||
|
|
||||||
// by default max parallel thread is 16, it's enough for most cases
|
|
||||||
private int maxParallelThread = 16;
|
|
||||||
|
|
||||||
// by default set continueWithError to be true
|
|
||||||
// So if a compensation fails, we should continue with the next compensations
|
|
||||||
private boolean continueWithError = true;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set parallel compensation.
|
|
||||||
* @param parallelCompensation parallel compensation or not
|
|
||||||
* @return this builder itself
|
|
||||||
*/
|
|
||||||
public Builder setParallelCompensation(boolean parallelCompensation) {
|
|
||||||
this.parallelCompensation = parallelCompensation;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* set max parallel thread.
|
|
||||||
*
|
|
||||||
* <p>Only valid when parallelCompensation is true.
|
|
||||||
* @param maxParallelThread max parallel thread
|
|
||||||
* @return this builder itself
|
|
||||||
*/
|
|
||||||
public Builder setMaxParallelThread(int maxParallelThread) {
|
|
||||||
if (maxParallelThread <= 2) {
|
|
||||||
throw new IllegalArgumentException("maxParallelThread should be greater than 1.");
|
|
||||||
}
|
|
||||||
this.maxParallelThread = maxParallelThread;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set continue with error.
|
|
||||||
*
|
|
||||||
* <p>Only valid when parallelCompensation is false.
|
|
||||||
* @param continueWithError continue with error or not
|
|
||||||
* @return this builder itself
|
|
||||||
*/
|
|
||||||
public Builder setContinueWithError(boolean continueWithError) {
|
|
||||||
this.continueWithError = continueWithError;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Build Saga option.
|
|
||||||
* @return Saga option
|
|
||||||
*/
|
|
||||||
public SagaOptions build() {
|
|
||||||
return new SagaOptions(this.parallelCompensation, this.maxParallelThread, this.continueWithError);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -20,8 +20,6 @@ import com.microsoft.durabletask.TaskOptions;
|
||||||
import com.microsoft.durabletask.TaskOrchestrationContext;
|
import com.microsoft.durabletask.TaskOrchestrationContext;
|
||||||
|
|
||||||
import io.dapr.workflows.runtime.DefaultWorkflowContext;
|
import io.dapr.workflows.runtime.DefaultWorkflowContext;
|
||||||
import io.dapr.workflows.saga.Saga;
|
|
||||||
import io.dapr.workflows.saga.SagaContext;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -135,12 +133,6 @@ public class DefaultWorkflowContextTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
|
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SagaContext getSagaContext() {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -335,19 +327,4 @@ public class DefaultWorkflowContextTest {
|
||||||
String expectedMessage = "No implementation found.";
|
String expectedMessage = "No implementation found.";
|
||||||
assertEquals(expectedMessage, runtimeException.getMessage());
|
assertEquals(expectedMessage, runtimeException.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void getSagaContextTest_sagaEnabled() {
|
|
||||||
Saga saga = mock(Saga.class);
|
|
||||||
WorkflowContext context = new DefaultWorkflowContext(mockInnerContext, saga);
|
|
||||||
|
|
||||||
SagaContext sagaContext = context.getSagaContext();
|
|
||||||
assertNotNull(sagaContext, "SagaContext should not be null");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void getSagaContextTest_sagaDisabled() {
|
|
||||||
WorkflowContext context = new DefaultWorkflowContext(mockInnerContext);
|
|
||||||
assertThrows(UnsupportedOperationException.class, context::getSagaContext);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,15 +1,8 @@
|
||||||
package io.dapr.workflows;
|
package io.dapr.workflows;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertThrows;
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.doReturn;
|
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
|
@ -17,21 +10,12 @@ import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
|
|
||||||
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
|
|
||||||
|
|
||||||
import io.dapr.workflows.saga.SagaCompensationException;
|
|
||||||
import io.dapr.workflows.saga.SagaContext;
|
|
||||||
import io.dapr.workflows.saga.SagaOptions;
|
|
||||||
|
|
||||||
public class WorkflowTest {
|
public class WorkflowTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWorkflow_WithoutSaga() {
|
public void testWorkflow() {
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
WorkflowStub stub = mock(WorkflowStub.class);
|
||||||
Workflow workflow = new WorkflowWithoutSaga(stub);
|
Workflow workflow = new TestWorkflow(stub);
|
||||||
assertNull(workflow.getSagaOption());
|
|
||||||
assertFalse(workflow.isSagaEnabled());
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
WorkflowContext ctx = mock(WorkflowContext.class);
|
||||||
doNothing().when(stub).run(ctx);
|
doNothing().when(stub).run(ctx);
|
||||||
|
@ -41,9 +25,9 @@ public class WorkflowTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWorkflow_WithoutSaga_throwException() {
|
public void testWorkflow_throwException() {
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
WorkflowStub stub = mock(WorkflowStub.class);
|
||||||
Workflow workflow = new WorkflowWithoutSaga(stub);
|
Workflow workflow = new TestWorkflow(stub);
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
WorkflowContext ctx = mock(WorkflowContext.class);
|
||||||
Exception e = new RuntimeException();
|
Exception e = new RuntimeException();
|
||||||
doThrow(e).when(stub).run(ctx);
|
doThrow(e).when(stub).run(ctx);
|
||||||
|
@ -55,117 +39,10 @@ public class WorkflowTest {
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
verify(stub, times(1)).run(eq(ctx));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
public static class TestWorkflow implements Workflow {
|
||||||
public void testWorkflow_WithSaga() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
assertNotNull(workflow.getSagaOption());
|
|
||||||
assertTrue(workflow.isSagaEnabled());
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
doNothing().when(stub).run(ctx);
|
|
||||||
workflow.run(ctx);
|
|
||||||
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWorkflow_WithSaga_shouldNotCatch_OrchestratorBlockedException() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
Exception e = new OrchestratorBlockedException("test");
|
|
||||||
doThrow(e).when(stub).run(ctx);
|
|
||||||
|
|
||||||
// should not catch OrchestratorBlockedException
|
|
||||||
assertThrows(OrchestratorBlockedException.class, () -> {
|
|
||||||
workflow.run(ctx);
|
|
||||||
});
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWorkflow_WithSaga_shouldNotCatch_ContinueAsNewInterruption() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
Exception e = new ContinueAsNewInterruption("test");
|
|
||||||
doThrow(e).when(stub).run(ctx);
|
|
||||||
|
|
||||||
// should not catch ContinueAsNewInterruption
|
|
||||||
assertThrows(ContinueAsNewInterruption.class, () -> {
|
|
||||||
workflow.run(ctx);
|
|
||||||
});
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWorkflow_WithSaga_shouldNotCatch_SagaCompensationException() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
Exception e = new SagaCompensationException("test", null);
|
|
||||||
doThrow(e).when(stub).run(ctx);
|
|
||||||
|
|
||||||
// should not catch SagaCompensationException
|
|
||||||
assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
workflow.run(ctx);
|
|
||||||
});
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWorkflow_WithSaga_triggerCompensate() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
Exception e = new RuntimeException("test", null);
|
|
||||||
doThrow(e).when(stub).run(ctx);
|
|
||||||
SagaContext sagaContext = mock(SagaContext.class);
|
|
||||||
doReturn(sagaContext).when(ctx).getSagaContext();
|
|
||||||
doNothing().when(sagaContext).compensate();
|
|
||||||
|
|
||||||
assertThrows(RuntimeException.class, () -> {
|
|
||||||
workflow.run(ctx);
|
|
||||||
});
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
verify(sagaContext, times(1)).compensate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWorkflow_WithSaga_compensateFaile() {
|
|
||||||
WorkflowStub stub = mock(WorkflowStub.class);
|
|
||||||
Workflow workflow = new WorkflowWithSaga(stub);
|
|
||||||
|
|
||||||
WorkflowContext ctx = mock(WorkflowContext.class);
|
|
||||||
Exception e = new RuntimeException("workflow fail", null);
|
|
||||||
doThrow(e).when(stub).run(ctx);
|
|
||||||
SagaContext sagaContext = mock(SagaContext.class);
|
|
||||||
doReturn(sagaContext).when(ctx).getSagaContext();
|
|
||||||
Exception e2 = new RuntimeException("compensate fail", null);
|
|
||||||
doThrow(e2).when(sagaContext).compensate();
|
|
||||||
|
|
||||||
try {
|
|
||||||
workflow.run(ctx);
|
|
||||||
fail("sholdd throw exception");
|
|
||||||
} catch (Exception ex) {
|
|
||||||
assertEquals(e2.getMessage(), ex.getMessage());
|
|
||||||
assertEquals(1, ex.getSuppressed().length);
|
|
||||||
assertEquals(e.getMessage(), ex.getSuppressed()[0].getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
verify(stub, times(1)).run(eq(ctx));
|
|
||||||
verify(sagaContext, times(1)).compensate();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class WorkflowWithoutSaga implements Workflow {
|
|
||||||
private final WorkflowStub stub;
|
private final WorkflowStub stub;
|
||||||
|
|
||||||
public WorkflowWithoutSaga(WorkflowStub stub) {
|
public TestWorkflow(WorkflowStub stub) {
|
||||||
this.stub = stub;
|
this.stub = stub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -174,24 +51,4 @@ public class WorkflowTest {
|
||||||
return stub;
|
return stub;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class WorkflowWithSaga implements Workflow {
|
|
||||||
private final WorkflowStub stub;
|
|
||||||
|
|
||||||
public WorkflowWithSaga(WorkflowStub stub) {
|
|
||||||
this.stub = stub;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public WorkflowStub create() {
|
|
||||||
return stub;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SagaOptions getSagaOption() {
|
|
||||||
return SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.build();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,55 +0,0 @@
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
import static org.mockito.Mockito.doNothing;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
|
|
||||||
import io.dapr.workflows.runtime.saga.DefaultSagaContext;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowContext;
|
|
||||||
|
|
||||||
public class DefaultSagaContextTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDaprSagaContextImpl_IllegalArgumentException() {
|
|
||||||
Saga saga = mock(Saga.class);
|
|
||||||
WorkflowContext workflowContext = mock(WorkflowContext.class);
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
new DefaultSagaContext(saga, null);
|
|
||||||
});
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
new DefaultSagaContext(null, workflowContext);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test_registerCompensation() {
|
|
||||||
Saga saga = mock(Saga.class);
|
|
||||||
WorkflowContext workflowContext = mock(WorkflowContext.class);
|
|
||||||
DefaultSagaContext ctx = new DefaultSagaContext(saga, workflowContext);
|
|
||||||
|
|
||||||
String activityClassName = "name1";
|
|
||||||
Object activityInput = new Object();
|
|
||||||
doNothing().when(saga).registerCompensation(activityClassName, activityInput);
|
|
||||||
|
|
||||||
ctx.registerCompensation(activityClassName, activityInput);
|
|
||||||
verify(saga, times(1)).registerCompensation(activityClassName, activityInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void test_compensate() {
|
|
||||||
Saga saga = mock(Saga.class);
|
|
||||||
WorkflowContext workflowContext = mock(WorkflowContext.class);
|
|
||||||
DefaultSagaContext ctx = new DefaultSagaContext(saga, workflowContext);
|
|
||||||
|
|
||||||
doNothing().when(saga).compensate(workflowContext);
|
|
||||||
|
|
||||||
ctx.compensate();
|
|
||||||
verify(saga, times(1)).compensate(workflowContext);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,322 +0,0 @@
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowActivity;
|
|
||||||
import io.dapr.workflows.WorkflowActivityContext;
|
|
||||||
|
|
||||||
public class SagaIntegrationTest {
|
|
||||||
|
|
||||||
private static int count = 0;
|
|
||||||
private static Object countLock = new Object();
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSaga_CompensateSequentially() {
|
|
||||||
int runCount = 10;
|
|
||||||
int succeedCount = 0;
|
|
||||||
int compensateCount = 0;
|
|
||||||
|
|
||||||
for (int i = 0; i < runCount; i++) {
|
|
||||||
boolean isSuccueed = doExecuteWorkflowWithSaga(false);
|
|
||||||
if (isSuccueed) {
|
|
||||||
succeedCount++;
|
|
||||||
} else {
|
|
||||||
compensateCount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
System.out.println("Run workflow with saga " + runCount + " times: succeed " + succeedCount
|
|
||||||
+ " times, failed and compensated " + compensateCount + " times");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSaga_compensateInParallel() {
|
|
||||||
int runCount = 100;
|
|
||||||
int succeedCount = 0;
|
|
||||||
int compensateCount = 0;
|
|
||||||
|
|
||||||
for (int i = 0; i < runCount; i++) {
|
|
||||||
boolean isSuccueed = doExecuteWorkflowWithSaga(true);
|
|
||||||
if (isSuccueed) {
|
|
||||||
succeedCount++;
|
|
||||||
} else {
|
|
||||||
compensateCount++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
System.out.println("Run workflow with saga " + runCount + " times: succeed " + succeedCount
|
|
||||||
+ " times, failed and compensated " + compensateCount + " times");
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean doExecuteWorkflowWithSaga(boolean parallelCompensation) {
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(parallelCompensation)
|
|
||||||
.setContinueWithError(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
boolean workflowSuccess = false;
|
|
||||||
|
|
||||||
// reset count to zero
|
|
||||||
synchronized (countLock) {
|
|
||||||
count = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
Integer addInput = 100;
|
|
||||||
Integer subtractInput = 20;
|
|
||||||
Integer multiplyInput = 10;
|
|
||||||
Integer divideInput = 5;
|
|
||||||
|
|
||||||
try {
|
|
||||||
// step1: add activity
|
|
||||||
String result = callActivity(AddActivity.class.getName(), addInput, String.class);
|
|
||||||
saga.registerCompensation(AddCompentationActivity.class.getName(), addInput);
|
|
||||||
// step2: subtract activity
|
|
||||||
result = callActivity(SubtractActivity.class.getName(), subtractInput, String.class);
|
|
||||||
saga.registerCompensation(SubtractCompentationActivity.class.getName(), subtractInput);
|
|
||||||
|
|
||||||
if (parallelCompensation) {
|
|
||||||
// only add/subtract activities support parallel compensation
|
|
||||||
// so in step3 and step4 we repeat add/subtract activities
|
|
||||||
|
|
||||||
// step3: add activity again
|
|
||||||
result = callActivity(AddActivity.class.getName(), addInput, String.class);
|
|
||||||
saga.registerCompensation(AddCompentationActivity.class.getName(), addInput);
|
|
||||||
|
|
||||||
// step4: substract activity again
|
|
||||||
result = callActivity(SubtractActivity.class.getName(), subtractInput, String.class);
|
|
||||||
saga.registerCompensation(SubtractCompentationActivity.class.getName(), subtractInput);
|
|
||||||
} else {
|
|
||||||
// step3: multiply activity
|
|
||||||
result = callActivity(MultiplyActivity.class.getName(), multiplyInput, String.class);
|
|
||||||
saga.registerCompensation(MultiplyCompentationActivity.class.getName(), multiplyInput);
|
|
||||||
|
|
||||||
// step4: divide activity
|
|
||||||
result = callActivity(DivideActivity.class.getName(), divideInput, String.class);
|
|
||||||
saga.registerCompensation(DivideCompentationActivity.class.getName(), divideInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
randomFail();
|
|
||||||
|
|
||||||
workflowSuccess = true;
|
|
||||||
} catch (Exception e) {
|
|
||||||
saga.compensate(SagaTest.createMockContext());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (workflowSuccess) {
|
|
||||||
int expectResult = 0;
|
|
||||||
if (parallelCompensation) {
|
|
||||||
expectResult = 0 + addInput - subtractInput + addInput - subtractInput;
|
|
||||||
} else {
|
|
||||||
expectResult = (0 + addInput - subtractInput) * multiplyInput / divideInput;
|
|
||||||
}
|
|
||||||
assertEquals(expectResult, count);
|
|
||||||
} else {
|
|
||||||
assertEquals(0, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
return workflowSuccess;
|
|
||||||
}
|
|
||||||
|
|
||||||
// mock to call activity in dapr workflow
|
|
||||||
private <V> V callActivity(String activityClassName, Object input, Class<V> returnType) {
|
|
||||||
try {
|
|
||||||
Class<?> activityClass = Class.forName(activityClassName);
|
|
||||||
WorkflowActivity activity = (WorkflowActivity) activityClass.getDeclaredConstructor().newInstance();
|
|
||||||
WorkflowActivityContext ctx = new WorkflowActivityContext() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public java.lang.String getName() {
|
|
||||||
return activityClassName;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <T> T getInput(Class<T> targetType) {
|
|
||||||
return (T) input;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
randomFail();
|
|
||||||
|
|
||||||
return (V) activity.run(ctx);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void randomFail() {
|
|
||||||
int randomInt = (int) (Math.random() * 100);
|
|
||||||
// if randomInt mod 10 is 0, then throw exception
|
|
||||||
if (randomInt % 10 == 0) {
|
|
||||||
throw new RuntimeException("random fail");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class AddActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount + input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is updated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after adding " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class AddCompentationActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount - input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is compensated from " + originalCount + " to "
|
|
||||||
+ updatedCount + " after compensate adding " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class SubtractActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount - input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is updated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after substracting " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class SubtractCompentationActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount + input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is compensated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after compensate substracting " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MultiplyActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount * input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is updated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after multiplying " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MultiplyCompentationActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount / input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is compensated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after compensate multiplying " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class DivideActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount / input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is updated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after dividing " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class DivideCompentationActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String run(WorkflowActivityContext ctx) {
|
|
||||||
Integer input = ctx.getInput(Integer.class);
|
|
||||||
|
|
||||||
int originalCount = 0;
|
|
||||||
int updatedCount = 0;
|
|
||||||
synchronized (countLock) {
|
|
||||||
originalCount = count;
|
|
||||||
updatedCount = originalCount * input;
|
|
||||||
count = updatedCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
String resultString = "current count is compensated from " + originalCount + " to " + updatedCount
|
|
||||||
+ " after compensate dividing " + input;
|
|
||||||
// System.out.println(resultString);
|
|
||||||
return resultString;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,50 +0,0 @@
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertThrows;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class SagaOptionsTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuild() {
|
|
||||||
SagaOptions.Builder builder = SagaOptions.newBuilder();
|
|
||||||
builder.setParallelCompensation(true);
|
|
||||||
builder.setMaxParallelThread(32);
|
|
||||||
builder.setContinueWithError(false);
|
|
||||||
SagaOptions option = builder.build();
|
|
||||||
|
|
||||||
assertEquals(true, option.isParallelCompensation());
|
|
||||||
assertEquals(32, option.getMaxParallelThread());
|
|
||||||
assertEquals(false, option.isContinueWithError());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBuild_default() {
|
|
||||||
SagaOptions.Builder builder = SagaOptions.newBuilder();
|
|
||||||
SagaOptions option = builder.build();
|
|
||||||
|
|
||||||
assertEquals(false, option.isParallelCompensation());
|
|
||||||
assertEquals(16, option.getMaxParallelThread());
|
|
||||||
assertEquals(true, option.isContinueWithError());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testsetMaxParallelThread() {
|
|
||||||
SagaOptions.Builder builder = SagaOptions.newBuilder();
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
builder.setMaxParallelThread(0);
|
|
||||||
});
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
builder.setMaxParallelThread(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
builder.setMaxParallelThread(-1);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,454 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2023 The Dapr Authors
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
package io.dapr.workflows.saga;
|
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
import static org.junit.jupiter.api.Assertions.fail;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyList;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyString;
|
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowActivityContext;
|
|
||||||
import io.dapr.workflows.WorkflowTaskOptions;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
import com.microsoft.durabletask.Task;
|
|
||||||
|
|
||||||
import io.dapr.workflows.WorkflowContext;
|
|
||||||
import io.dapr.workflows.WorkflowActivity;
|
|
||||||
|
|
||||||
public class SagaTest {
|
|
||||||
|
|
||||||
public static WorkflowContext createMockContext() {
|
|
||||||
WorkflowContext workflowContext = mock(WorkflowContext.class);
|
|
||||||
when(workflowContext.callActivity(anyString(), any(), eq((WorkflowTaskOptions) null))).thenAnswer(new ActivityAnswer());
|
|
||||||
when(workflowContext.allOf(anyList())).thenAnswer(new AllActivityAnswer());
|
|
||||||
|
|
||||||
return workflowContext;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSaga_IllegalArgument() {
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
new Saga(null);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testregisterCompensation() {
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.setContinueWithError(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
|
|
||||||
saga.registerCompensation(MockActivity.class.getName(), new MockActivityInput());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testregisterCompensation_IllegalArgument() {
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.setContinueWithError(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
saga.registerCompensation(null, "input");
|
|
||||||
});
|
|
||||||
assertThrows(IllegalArgumentException.class, () -> {
|
|
||||||
saga.registerCompensation("", "input");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateInParallel() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
|
|
||||||
assertEquals(3, MockCompensationActivity.compensateOrder.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateInParallel_exception_1failed() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
// 3 compentation activities, 2 succeed, 1 failed
|
|
||||||
assertEquals(0, exception.getSuppressed().length);
|
|
||||||
assertEquals(2, MockCompensationActivity.compensateOrder.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateInParallel_exception_2failed() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
input3.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
// 3 compentation activities, 1 succeed, 2 failed
|
|
||||||
assertEquals(1, MockCompensationActivity.compensateOrder.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateInParallel_exception_3failed() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(true).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
input1.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
input3.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
// 3 compentation activities, 0 succeed, 3 failed
|
|
||||||
assertEquals(0, MockCompensationActivity.compensateOrder.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateSequentially() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false).build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
|
|
||||||
assertEquals(3, MockCompensationActivity.compensateOrder.size());
|
|
||||||
|
|
||||||
// the order should be 3 / 2 / 1
|
|
||||||
assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
|
|
||||||
assertEquals(Integer.valueOf(2), MockCompensationActivity.compensateOrder.get(1));
|
|
||||||
assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(2));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateSequentially_continueWithError() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.setContinueWithError(true)
|
|
||||||
.build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
assertEquals(0, exception.getSuppressed().length);
|
|
||||||
|
|
||||||
// 3 compentation activities, 2 succeed, 1 failed
|
|
||||||
assertEquals(2, MockCompensationActivity.compensateOrder.size());
|
|
||||||
// the order should be 3 / 1
|
|
||||||
assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
|
|
||||||
assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateSequentially_continueWithError_suppressed() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.setContinueWithError(true)
|
|
||||||
.build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
input3.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
assertEquals(1, exception.getSuppressed().length);
|
|
||||||
|
|
||||||
// 3 compentation activities, 1 succeed, 2 failed
|
|
||||||
assertEquals(1, MockCompensationActivity.compensateOrder.size());
|
|
||||||
// the order should be 3 / 1
|
|
||||||
assertEquals(Integer.valueOf(1), MockCompensationActivity.compensateOrder.get(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCompensateSequentially_notContinueWithError() {
|
|
||||||
MockCompensationActivity.compensateOrder.clear();
|
|
||||||
|
|
||||||
SagaOptions config = SagaOptions.newBuilder()
|
|
||||||
.setParallelCompensation(false)
|
|
||||||
.setContinueWithError(false)
|
|
||||||
.build();
|
|
||||||
Saga saga = new Saga(config);
|
|
||||||
MockActivityInput input1 = new MockActivityInput();
|
|
||||||
input1.setOrder(1);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input1);
|
|
||||||
MockActivityInput input2 = new MockActivityInput();
|
|
||||||
input2.setOrder(2);
|
|
||||||
input2.setThrowException(true);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input2);
|
|
||||||
MockActivityInput input3 = new MockActivityInput();
|
|
||||||
input3.setOrder(3);
|
|
||||||
saga.registerCompensation(MockCompensationActivity.class.getName(), input3);
|
|
||||||
|
|
||||||
SagaCompensationException exception = assertThrows(SagaCompensationException.class, () -> {
|
|
||||||
saga.compensate(createMockContext());
|
|
||||||
});
|
|
||||||
assertNotNull(exception.getCause());
|
|
||||||
assertEquals(0, exception.getSuppressed().length);
|
|
||||||
|
|
||||||
// 3 compentation activities, 1 succeed, 1 failed and not continue
|
|
||||||
assertEquals(1, MockCompensationActivity.compensateOrder.size());
|
|
||||||
// the order should be 3 / 1
|
|
||||||
assertEquals(Integer.valueOf(3), MockCompensationActivity.compensateOrder.get(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MockActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object run(WorkflowActivityContext ctx) {
|
|
||||||
MockActivityOutput output = new MockActivityOutput();
|
|
||||||
output.setSucceed(true);
|
|
||||||
return output;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MockCompensationActivity implements WorkflowActivity {
|
|
||||||
|
|
||||||
private static final List<Integer> compensateOrder = Collections.synchronizedList(new ArrayList<>());
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Object run(WorkflowActivityContext ctx) {
|
|
||||||
MockActivityInput input = ctx.getInput(MockActivityInput.class);
|
|
||||||
|
|
||||||
if (input.isThrowException()) {
|
|
||||||
throw new RuntimeException("compensate failed: order=" + input.getOrder());
|
|
||||||
}
|
|
||||||
|
|
||||||
compensateOrder.add(input.getOrder());
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MockActivityInput {
|
|
||||||
private int order = 0;
|
|
||||||
private boolean throwException;
|
|
||||||
|
|
||||||
public int getOrder() {
|
|
||||||
return order;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setOrder(int order) {
|
|
||||||
this.order = order;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isThrowException() {
|
|
||||||
return throwException;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setThrowException(boolean throwException) {
|
|
||||||
this.throwException = throwException;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class MockActivityOutput {
|
|
||||||
private boolean succeed;
|
|
||||||
|
|
||||||
public boolean isSucceed() {
|
|
||||||
return succeed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setSucceed(boolean succeed) {
|
|
||||||
this.succeed = succeed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ActivityAnswer implements Answer<Task<Void>> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Task<Void> answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
Object[] args = invocation.getArguments();
|
|
||||||
String name = (String) args[0];
|
|
||||||
Object input = args[1];
|
|
||||||
|
|
||||||
WorkflowActivity activity;
|
|
||||||
WorkflowActivityContext activityContext = Mockito.mock(WorkflowActivityContext.class);
|
|
||||||
try {
|
|
||||||
activity = (WorkflowActivity) Class.forName(name).getDeclaredConstructor().newInstance();
|
|
||||||
} catch (Exception e) {
|
|
||||||
fail(e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Task<Void> task = mock(Task.class);
|
|
||||||
when(task.await()).thenAnswer(invocation1 -> {
|
|
||||||
Mockito.doReturn(input).when(activityContext).getInput(Mockito.any());
|
|
||||||
activity.run(activityContext);
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class AllActivityAnswer implements Answer<Task<Void>> {
|
|
||||||
@Override
|
|
||||||
public Task<Void> answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
Object[] args = invocation.getArguments();
|
|
||||||
List<Task<Void>> tasks = (List<Task<Void>>) args[0];
|
|
||||||
|
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(5);
|
|
||||||
List<Callable<Void>> compensationTasks = new ArrayList<>();
|
|
||||||
for (Task<Void> task : tasks) {
|
|
||||||
Callable<Void> compensationTask = new Callable<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void call() {
|
|
||||||
return task.await();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
compensationTasks.add(compensationTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Future<Void>> resultFutures;
|
|
||||||
try {
|
|
||||||
resultFutures = executor.invokeAll(compensationTasks, 2, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
fail(e);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Task<Void> task = mock(Task.class);
|
|
||||||
when(task.await()).thenAnswer(new Answer<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
||||||
Exception exception = null;
|
|
||||||
for (Future<Void> resultFuture : resultFutures) {
|
|
||||||
try {
|
|
||||||
resultFuture.get();
|
|
||||||
} catch (Exception e) {
|
|
||||||
exception = e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (exception != null) {
|
|
||||||
throw exception;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return task;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue