From f31c24f5d2857e3d418731d5179265f8bcf84c87 Mon Sep 17 00:00:00 2001 From: Cassie Coyle Date: Thu, 8 May 2025 12:48:53 -0500 Subject: [PATCH] Use dapr/durabletask-java (#1336) * microsoft durabletask-java -> dapr durabletask-java Signed-off-by: Cassandra Coyle * update another ref Signed-off-by: Cassandra Coyle * 1.5.2 release Signed-off-by: Cassandra Coyle * fix import order Signed-off-by: Cassandra Coyle * Sdk new changes Signed-off-by: siri-varma * Refine workflows Signed-off-by: siri-varma * add ; Signed-off-by: Cassandra Coyle * rm try Signed-off-by: Cassandra Coyle --------- Signed-off-by: Cassandra Coyle Signed-off-by: siri-varma Co-authored-by: siri-varma Signed-off-by: sirivarma --- .../config/DaprWorkflowsConfiguration.java | 7 ++- .../unittesting/DaprWorkflowExampleTest.java | 4 +- .../workflows/chain/DemoChainWorker.java | 7 ++- .../DemoChildWorkflowWorker.java | 6 +-- .../DemoContinueAsNewWorker.java | 14 +++-- .../DemoExternalEventWorker.java | 7 ++- .../faninout/DemoFanInOutWorker.java | 7 ++- .../faninout/DemoFanInOutWorkflow.java | 2 +- pom.xml | 2 +- .../it/testcontainers/DaprWorkflowsIT.java | 7 ++- sdk-workflows/pom.xml | 6 +-- .../io/dapr/workflows/WorkflowContext.java | 8 +-- .../workflows/client/DaprWorkflowClient.java | 10 ++-- .../DefaultWorkflowActivityContext.java | 2 +- .../runtime/DefaultWorkflowContext.java | 12 ++--- .../DefaultWorkflowFailureDetails.java | 2 +- .../DefaultWorkflowInstanceStatus.java | 6 +-- .../runtime/WorkflowActivityClassWrapper.java | 4 +- .../WorkflowActivityInstanceWrapper.java | 4 +- .../runtime/WorkflowClassWrapper.java | 4 +- .../runtime/WorkflowInstanceWrapper.java | 4 +- .../workflows/runtime/WorkflowRuntime.java | 51 ++++++++++++++++--- .../runtime/WorkflowRuntimeBuilder.java | 27 ++++++++-- .../WorkflowRuntimeStatusConverter.java | 2 +- .../workflows/DefaultWorkflowContextTest.java | 10 ++-- .../client/DaprWorkflowClientTest.java | 8 +-- .../client/WorkflowInstanceStatusTest.java | 6 +-- .../WorkflowActivityClassWrapperTest.java | 2 +- .../WorkflowActivityInstanceWrapperTest.java | 2 +- .../runtime/WorkflowClassWrapperTest.java | 2 +- .../runtime/WorkflowInstanceWrapperTest.java | 2 +- .../runtime/WorkflowRuntimeBuilderTest.java | 15 +++--- .../WorkflowRuntimeStatusConverterTest.java | 2 +- .../runtime/WorkflowRuntimeTest.java | 18 ++++--- 34 files changed, 165 insertions(+), 107 deletions(-) diff --git a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java index 8629982a9..18e402414 100644 --- a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java +++ b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java @@ -46,10 +46,9 @@ public class DaprWorkflowsConfiguration implements ApplicationContextAware { workflowRuntimeBuilder.registerActivity(activity); } - try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) { - LOGGER.info("Starting workflow runtime ... "); - runtime.start(false); - } + WorkflowRuntime runtime = workflowRuntimeBuilder.build(); + LOGGER.info("Starting workflow runtime ... "); + runtime.start(false); } @Override diff --git a/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java index ef0e6702a..b8ce0ef67 100644 --- a/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java +++ b/examples/src/main/java/io/dapr/examples/unittesting/DaprWorkflowExampleTest.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.examples.unittesting; -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskCanceledException; +import io.dapr.durabletask.Task; +import io.dapr.durabletask.TaskCanceledException; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; diff --git a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java index 61b1572bd..51fb9ae6a 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/chain/DemoChainWorker.java @@ -29,9 +29,8 @@ public class DemoChainWorker { builder.registerActivity(ToUpperCaseActivity.class); // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java index 81a9b4973..0e692551e 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/childworkflow/DemoChildWorkflowWorker.java @@ -31,9 +31,7 @@ public class DemoChildWorkflowWorker { builder.registerActivity(ReverseActivity.class); // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java index 0ca050e87..43ef176a2 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/continueasnew/DemoContinueAsNewWorker.java @@ -16,6 +16,9 @@ package io.dapr.examples.workflows.continueasnew; import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + public class DemoContinueAsNewWorker { /** * The main method of this app. @@ -25,13 +28,14 @@ public class DemoContinueAsNewWorker { */ public static void main(String[] args) throws Exception { // Register the Workflow with the builder. - WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class); + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder(). + registerWorkflow(DemoContinueAsNewWorkflow.class) + .withExecutorService(Executors.newFixedThreadPool(3)); builder.registerActivity(CleanUpActivity.class); // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java index aaa1e7c81..f7d0c8ebf 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/externalevent/DemoExternalEventWorker.java @@ -30,9 +30,8 @@ public class DemoExternalEventWorker { builder.registerActivity(DenyActivity.class); // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java index d5c6d14e7..4c691dbc3 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorker.java @@ -29,9 +29,8 @@ public class DemoFanInOutWorker { builder.registerActivity(CountWordsActivity.class); // Build and then start the workflow runtime pulling and executing tasks - try (WorkflowRuntime runtime = builder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(); - } + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(false); } } diff --git a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java index 1760a53b3..611b1cac6 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java +++ b/examples/src/main/java/io/dapr/examples/workflows/faninout/DemoFanInOutWorkflow.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.examples.workflows.faninout; -import com.microsoft.durabletask.Task; +import io.dapr.durabletask.Task; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowStub; diff --git a/pom.xml b/pom.xml index 51f507002..19842bf1c 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ 2.16.1 true diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index ca8678e2d..5c6a360c8 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -91,10 +91,9 @@ public class DaprWorkflowsIT { */ @BeforeEach public void init() { - try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) { - System.out.println("Start workflow runtime"); - runtime.start(false); - } + WorkflowRuntime runtime = workflowRuntimeBuilder.build(); + System.out.println("Start workflow runtime"); + runtime.start(false); } @Test diff --git a/sdk-workflows/pom.xml b/sdk-workflows/pom.xml index a5c4ae949..d8c7ada90 100644 --- a/sdk-workflows/pom.xml +++ b/sdk-workflows/pom.xml @@ -45,14 +45,14 @@ test - com.microsoft + io.dapr durabletask-client - 1.5.0 + 1.5.2 com.fasterxml.jackson.core diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java index 9ed34fdc1..f649f0086 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/WorkflowContext.java @@ -13,10 +13,10 @@ limitations under the License. package io.dapr.workflows; -import com.microsoft.durabletask.CompositeTaskFailedException; -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskCanceledException; -import com.microsoft.durabletask.TaskFailedException; +import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.Task; +import io.dapr.durabletask.TaskCanceledException; +import io.dapr.durabletask.TaskFailedException; import org.slf4j.Logger; import javax.annotation.Nullable; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index e40d0640a..ab46dff79 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -13,12 +13,12 @@ limitations under the License. package io.dapr.workflows.client; -import com.microsoft.durabletask.DurableTaskClient; -import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; -import com.microsoft.durabletask.NewOrchestrationInstanceOptions; -import com.microsoft.durabletask.OrchestrationMetadata; -import com.microsoft.durabletask.PurgeResult; import io.dapr.config.Properties; +import io.dapr.durabletask.DurableTaskClient; +import io.dapr.durabletask.DurableTaskGrpcClientBuilder; +import io.dapr.durabletask.NewOrchestrationInstanceOptions; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.PurgeResult; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; import io.dapr.workflows.internal.ApiTokenClientInterceptor; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java index 4a16d45f4..551c21a37 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowActivityContext.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskActivityContext; +import io.dapr.durabletask.TaskActivityContext; import io.dapr.workflows.WorkflowActivityContext; /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java index 20572995c..a6c09fe76 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowContext.java @@ -13,12 +13,12 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.CompositeTaskFailedException; -import com.microsoft.durabletask.RetryPolicy; -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskCanceledException; -import com.microsoft.durabletask.TaskOptions; -import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.RetryPolicy; +import io.dapr.durabletask.Task; +import io.dapr.durabletask.TaskCanceledException; +import io.dapr.durabletask.TaskOptions; +import io.dapr.durabletask.TaskOrchestrationContext; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowTaskOptions; import io.dapr.workflows.WorkflowTaskRetryPolicy; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java index 6919a510e..c51fe7c3a 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowFailureDetails.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.FailureDetails; +import io.dapr.durabletask.FailureDetails; import io.dapr.workflows.client.WorkflowFailureDetails; /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java index d1082adb8..392357bc3 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/DefaultWorkflowInstanceStatus.java @@ -13,9 +13,9 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.FailureDetails; -import com.microsoft.durabletask.OrchestrationMetadata; -import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.client.WorkflowFailureDetails; import io.dapr.workflows.client.WorkflowInstanceStatus; import io.dapr.workflows.client.WorkflowRuntimeStatus; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java index 3dcb8ef6b..0bbfd4beb 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskActivity; -import com.microsoft.durabletask.TaskActivityFactory; +import io.dapr.durabletask.TaskActivity; +import io.dapr.durabletask.TaskActivityFactory; import io.dapr.workflows.WorkflowActivity; import java.lang.reflect.Constructor; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java index 17d509924..537427318 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskActivity; -import com.microsoft.durabletask.TaskActivityFactory; +import io.dapr.durabletask.TaskActivity; +import io.dapr.durabletask.TaskActivityFactory; import io.dapr.workflows.WorkflowActivity; /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 4fab3f9cd..10b524874 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskOrchestration; -import com.microsoft.durabletask.TaskOrchestrationFactory; +import io.dapr.durabletask.TaskOrchestration; +import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; import java.lang.reflect.Constructor; diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java index ad3159406..f803c49de 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskOrchestration; -import com.microsoft.durabletask.TaskOrchestrationFactory; +import io.dapr.durabletask.TaskOrchestration; +import io.dapr.durabletask.TaskOrchestrationFactory; import io.dapr.workflows.Workflow; /** diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java index 6754f675b..7ac44e000 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntime.java @@ -13,17 +13,34 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.DurableTaskGrpcWorker; +import io.dapr.durabletask.DurableTaskGrpcWorker; +import io.grpc.ManagedChannel; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; /** * Contains methods to register workflows and activities. */ public class WorkflowRuntime implements AutoCloseable { - private DurableTaskGrpcWorker worker; + private final DurableTaskGrpcWorker worker; + private final ManagedChannel managedChannel; + private final ExecutorService executorService; - public WorkflowRuntime(DurableTaskGrpcWorker worker) { + /** + * Constructor. + * + * @param worker grpcWorker processing activities. + * @param managedChannel grpc channel. + * @param executorService executor service responsible for running the threads. + */ + public WorkflowRuntime(DurableTaskGrpcWorker worker, + ManagedChannel managedChannel, + ExecutorService executorService) { this.worker = worker; + this.managedChannel = managedChannel; + this.executorService = executorService; } /** @@ -50,11 +67,31 @@ public class WorkflowRuntime implements AutoCloseable { /** * {@inheritDoc} */ - @Override public void close() { - if (this.worker != null) { - this.worker.close(); - this.worker = null; + this.shutDownWorkerPool(); + this.closeSideCarChannel(); + } + + private void closeSideCarChannel() { + this.managedChannel.shutdown(); + + try { + if (!this.managedChannel.awaitTermination(60, TimeUnit.SECONDS)) { + this.managedChannel.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + private void shutDownWorkerPool() { + this.executorService.shutdown(); + try { + if (!this.executorService.awaitTermination(60, TimeUnit.SECONDS)) { + this.executorService.shutdownNow(); + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); } } } \ No newline at end of file diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index 397e58b30..7f1147a0d 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -13,8 +13,8 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; import io.dapr.config.Properties; +import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowActivity; @@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; public class WorkflowRuntimeBuilder { private static final ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor(); @@ -36,6 +38,8 @@ public class WorkflowRuntimeBuilder { private final Set activitySet = Collections.synchronizedSet(new HashSet<>()); private final Set workflowSet = Collections.synchronizedSet(new HashSet<>()); private final DurableTaskGrpcWorkerBuilder builder; + private final ManagedChannel managedChannel; + private ExecutorService executorService; /** * Constructs the WorkflowRuntimeBuilder. @@ -58,8 +62,8 @@ public class WorkflowRuntimeBuilder { } private WorkflowRuntimeBuilder(Properties properties, Logger logger) { - ManagedChannel managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR); - this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(managedChannel); + this.managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR); + this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel); this.logger = logger; } @@ -71,8 +75,11 @@ public class WorkflowRuntimeBuilder { public WorkflowRuntime build() { if (instance == null) { synchronized (WorkflowRuntime.class) { + this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService; if (instance == null) { - instance = new WorkflowRuntime(this.builder.build()); + instance = new WorkflowRuntime( + this.builder.withExecutorService(this.executorService).build(), + this.managedChannel, this.executorService); } } } @@ -84,6 +91,18 @@ public class WorkflowRuntimeBuilder { return instance; } + /** + * Register Executor Service to use with workflow. + * + * @param executorService to be used. + * @return {@link WorkflowRuntimeBuilder}. + */ + public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + this.builder.withExecutorService(executorService); + return this; + } + /** * Registers a Workflow object. * diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java index 198a1215c..2900916aa 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverter.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.client.WorkflowRuntimeStatus; public class WorkflowRuntimeStatusConverter { diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 61d153484..0de864c88 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -13,11 +13,11 @@ limitations under the License. package io.dapr.workflows; -import com.microsoft.durabletask.CompositeTaskFailedException; -import com.microsoft.durabletask.Task; -import com.microsoft.durabletask.TaskCanceledException; -import com.microsoft.durabletask.TaskOptions; -import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.durabletask.CompositeTaskFailedException; +import io.dapr.durabletask.Task; +import io.dapr.durabletask.TaskCanceledException; +import io.dapr.durabletask.TaskOptions; +import io.dapr.durabletask.TaskOrchestrationContext; import io.dapr.workflows.runtime.DefaultWorkflowContext; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java index 4b2b7ec52..3ad66877c 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -13,10 +13,10 @@ limitations under the License. package io.dapr.workflows.client; -import com.microsoft.durabletask.DurableTaskClient; -import com.microsoft.durabletask.NewOrchestrationInstanceOptions; -import com.microsoft.durabletask.OrchestrationMetadata; -import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.durabletask.DurableTaskClient; +import io.dapr.durabletask.NewOrchestrationInstanceOptions; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java index f4f40d056..776e07081 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/WorkflowInstanceStatusTest.java @@ -13,9 +13,9 @@ limitations under the License. package io.dapr.workflows.client; -import com.microsoft.durabletask.FailureDetails; -import com.microsoft.durabletask.OrchestrationMetadata; -import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.durabletask.FailureDetails; +import io.dapr.durabletask.OrchestrationMetadata; +import io.dapr.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 078317605..76a7e07af 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -1,6 +1,6 @@ package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskActivityContext; +import io.dapr.durabletask.TaskActivityContext; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; import org.junit.Test; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java index bd8788bbd..0c680ea51 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java @@ -1,6 +1,6 @@ package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskActivityContext; +import io.dapr.durabletask.TaskActivityContext; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; import org.junit.Test; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java index a73b616bc..fd76cadaf 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.durabletask.TaskOrchestrationContext; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java index 22f315aa5..85849af21 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.durabletask.TaskOrchestrationContext; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index c159930b9..2b3341fbf 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -67,7 +67,8 @@ public class WorkflowRuntimeBuilderTest { @Test public void buildTest() { assertDoesNotThrow(() -> { - try (WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build()) { + try { + WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build(); System.out.println("WorkflowRuntime created"); } catch (Exception e) { throw new RuntimeException(e); @@ -88,13 +89,11 @@ public class WorkflowRuntimeBuilderTest { WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder(); - try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) { - verify(testLogger, times(1)) - .info(eq("Registered Workflow: {}"), eq("TestWorkflow")); + WorkflowRuntime runtime = workflowRuntimeBuilder.build(); + verify(testLogger, times(1)) + .info(eq("Registered Workflow: {}"), eq("TestWorkflow")); - verify(testLogger, times(1)) - .info(eq("Registered Activity: {}"), eq("TestActivity")); - } + verify(testLogger, times(1)) + .info(eq("Registered Activity: {}"), eq("TestActivity")); } - } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java index 04783d961..9e2d4f983 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeStatusConverterTest.java @@ -13,7 +13,7 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.OrchestrationRuntimeStatus; +import io.dapr.durabletask.OrchestrationRuntimeStatus; import io.dapr.workflows.client.WorkflowRuntimeStatus; import org.junit.jupiter.api.Test; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java index d3bd40a43..9ee2710e7 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeTest.java @@ -14,10 +14,15 @@ limitations under the License. package io.dapr.workflows.runtime; -import com.microsoft.durabletask.DurableTaskGrpcWorker; -import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; +import io.dapr.durabletask.DurableTaskGrpcWorker; +import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder; +import io.dapr.config.Properties; +import io.dapr.utils.NetworkUtils; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class WorkflowRuntimeTest { @@ -25,15 +30,16 @@ public class WorkflowRuntimeTest { @Test public void startTest() { DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build(); - try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) { - assertDoesNotThrow(() -> runtime.start(false)); - } + WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()), + Executors.newCachedThreadPool()); + assertDoesNotThrow(() -> runtime.start(false)); } @Test public void closeWithoutStarting() { DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build(); - try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) { + try (WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()), + Executors.newCachedThreadPool())) { assertDoesNotThrow(runtime::close); } }