Use dapr/durabletask-java (#1336)

* microsoft durabletask-java -> dapr durabletask-java

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* update another ref

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* 1.5.2 release

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* fix import order

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* Sdk new changes

Signed-off-by: siri-varma <siri.varma@outlook.com>

* Refine workflows

Signed-off-by: siri-varma <siri.varma@outlook.com>

* add ;

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

* rm try

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>

---------

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
Signed-off-by: siri-varma <siri.varma@outlook.com>
Co-authored-by: siri-varma <siri.varma@outlook.com>
This commit is contained in:
Cassie Coyle 2025-05-08 12:48:53 -05:00 committed by GitHub
parent ecc94f5b94
commit 7ed4d9184c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 165 additions and 107 deletions

View File

@ -46,11 +46,10 @@ public class DaprWorkflowsConfiguration implements ApplicationContextAware {
workflowRuntimeBuilder.registerActivity(activity);
}
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
LOGGER.info("Starting workflow runtime ... ");
runtime.start(false);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.examples.unittesting;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;

View File

@ -29,9 +29,8 @@ public class DemoChainWorker {
builder.registerActivity(ToUpperCaseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
}

View File

@ -31,9 +31,7 @@ public class DemoChildWorkflowWorker {
builder.registerActivity(ReverseActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
}

View File

@ -16,6 +16,9 @@ package io.dapr.examples.workflows.continueasnew;
import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DemoContinueAsNewWorker {
/**
* The main method of this app.
@ -25,13 +28,14 @@ public class DemoContinueAsNewWorker {
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class);
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().
registerWorkflow(DemoContinueAsNewWorkflow.class)
.withExecutorService(Executors.newFixedThreadPool(3));
builder.registerActivity(CleanUpActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
}

View File

@ -30,9 +30,8 @@ public class DemoExternalEventWorker {
builder.registerActivity(DenyActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}
}

View File

@ -29,9 +29,8 @@ public class DemoFanInOutWorker {
builder.registerActivity(CountWordsActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
runtime.start(false);
}
}

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.examples.workflows.faninout;
import com.microsoft.durabletask.Task;
import io.dapr.durabletask.Task;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;

View File

@ -36,7 +36,7 @@
<!--
manually declare durabletask-client's jackson dependencies for workflows sdk
which conflict with dapr-sdk's jackson dependencies
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
-->
<jackson.version>2.16.1</jackson.version>
<gpg.skip>true</gpg.skip>

View File

@ -91,11 +91,10 @@ public class DaprWorkflowsIT {
*/
@BeforeEach
public void init() {
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
System.out.println("Start workflow runtime");
runtime.start(false);
}
}
@Test
public void testWorkflows() throws Exception {

View File

@ -45,14 +45,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.microsoft</groupId>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.0</version>
<version>1.5.2</version>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies
which conflict with dapr-sdk's jackson dependencies
https://github.com/microsoft/durabletask-java/blob/main/client/build.gradle#L16
https://github.com/dapr/durabletask-java/blob/main/client/build.gradle#L16
-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>

View File

@ -13,10 +13,10 @@ limitations under the License.
package io.dapr.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskFailedException;
import org.slf4j.Logger;
import javax.annotation.Nullable;

View File

@ -13,12 +13,12 @@ limitations under the License.
package io.dapr.workflows.client;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.PurgeResult;
import io.dapr.config.Properties;
import io.dapr.durabletask.DurableTaskClient;
import io.dapr.durabletask.DurableTaskGrpcClientBuilder;
import io.dapr.durabletask.NewOrchestrationInstanceOptions;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivityContext;
/**

View File

@ -13,12 +13,12 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.RetryPolicy;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.RetryPolicy;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.FailureDetails;
import io.dapr.durabletask.FailureDetails;
import io.dapr.workflows.client.WorkflowFailureDetails;
/**

View File

@ -13,9 +13,9 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowFailureDetails;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import io.dapr.durabletask.TaskActivity;
import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;
import java.lang.reflect.Constructor;

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivity;
import com.microsoft.durabletask.TaskActivityFactory;
import io.dapr.durabletask.TaskActivity;
import io.dapr.durabletask.TaskActivityFactory;
import io.dapr.workflows.WorkflowActivity;
/**

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.durabletask.TaskOrchestration;
import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import java.lang.reflect.Constructor;

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.durabletask.TaskOrchestration;
import io.dapr.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
/**

View File

@ -13,17 +13,34 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.DurableTaskGrpcWorker;
import io.dapr.durabletask.DurableTaskGrpcWorker;
import io.grpc.ManagedChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Contains methods to register workflows and activities.
*/
public class WorkflowRuntime implements AutoCloseable {
private DurableTaskGrpcWorker worker;
private final DurableTaskGrpcWorker worker;
private final ManagedChannel managedChannel;
private final ExecutorService executorService;
public WorkflowRuntime(DurableTaskGrpcWorker worker) {
/**
* Constructor.
*
* @param worker grpcWorker processing activities.
* @param managedChannel grpc channel.
* @param executorService executor service responsible for running the threads.
*/
public WorkflowRuntime(DurableTaskGrpcWorker worker,
ManagedChannel managedChannel,
ExecutorService executorService) {
this.worker = worker;
this.managedChannel = managedChannel;
this.executorService = executorService;
}
/**
@ -50,11 +67,31 @@ public class WorkflowRuntime implements AutoCloseable {
/**
* {@inheritDoc}
*/
@Override
public void close() {
if (this.worker != null) {
this.worker.close();
this.worker = null;
this.shutDownWorkerPool();
this.closeSideCarChannel();
}
private void closeSideCarChannel() {
this.managedChannel.shutdown();
try {
if (!this.managedChannel.awaitTermination(60, TimeUnit.SECONDS)) {
this.managedChannel.shutdownNow();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private void shutDownWorkerPool() {
this.executorService.shutdown();
try {
if (!this.executorService.awaitTermination(60, TimeUnit.SECONDS)) {
this.executorService.shutdownNow();
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.config.Properties;
import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowActivity;
@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class WorkflowRuntimeBuilder {
private static final ClientInterceptor WORKFLOW_INTERCEPTOR = new ApiTokenClientInterceptor();
@ -36,6 +38,8 @@ public class WorkflowRuntimeBuilder {
private final Set<String> activitySet = Collections.synchronizedSet(new HashSet<>());
private final Set<String> workflowSet = Collections.synchronizedSet(new HashSet<>());
private final DurableTaskGrpcWorkerBuilder builder;
private final ManagedChannel managedChannel;
private ExecutorService executorService;
/**
* Constructs the WorkflowRuntimeBuilder.
@ -58,8 +62,8 @@ public class WorkflowRuntimeBuilder {
}
private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
ManagedChannel managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(managedChannel);
this.managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel);
this.logger = logger;
}
@ -71,8 +75,11 @@ public class WorkflowRuntimeBuilder {
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService;
if (instance == null) {
instance = new WorkflowRuntime(this.builder.build());
instance = new WorkflowRuntime(
this.builder.withExecutorService(this.executorService).build(),
this.managedChannel, this.executorService);
}
}
}
@ -84,6 +91,18 @@ public class WorkflowRuntimeBuilder {
return instance;
}
/**
* Register Executor Service to use with workflow.
*
* @param executorService to be used.
* @return {@link WorkflowRuntimeBuilder}.
*/
public WorkflowRuntimeBuilder withExecutorService(ExecutorService executorService) {
this.executorService = executorService;
this.builder.withExecutorService(executorService);
return this;
}
/**
* Registers a Workflow object.
*

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
public class WorkflowRuntimeStatusConverter {

View File

@ -13,11 +13,11 @@ limitations under the License.
package io.dapr.workflows;
import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.durabletask.CompositeTaskFailedException;
import io.dapr.durabletask.Task;
import io.dapr.durabletask.TaskCanceledException;
import io.dapr.durabletask.TaskOptions;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.runtime.DefaultWorkflowContext;

View File

@ -13,10 +13,10 @@ limitations under the License.
package io.dapr.workflows.client;
import com.microsoft.durabletask.DurableTaskClient;
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.DurableTaskClient;
import io.dapr.durabletask.NewOrchestrationInstanceOptions;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;

View File

@ -13,9 +13,9 @@ limitations under the License.
package io.dapr.workflows.client;
import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.FailureDetails;
import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

View File

@ -1,6 +1,6 @@
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Test;

View File

@ -1,6 +1,6 @@
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskActivityContext;
import io.dapr.durabletask.TaskActivityContext;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.junit.Test;

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowStub;

View File

@ -67,7 +67,8 @@ public class WorkflowRuntimeBuilderTest {
@Test
public void buildTest() {
assertDoesNotThrow(() -> {
try (WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build()) {
try {
WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build();
System.out.println("WorkflowRuntime created");
} catch (Exception e) {
throw new RuntimeException(e);
@ -88,13 +89,11 @@ public class WorkflowRuntimeBuilderTest {
WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder();
try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) {
WorkflowRuntime runtime = workflowRuntimeBuilder.build();
verify(testLogger, times(1))
.info(eq("Registered Workflow: {}"), eq("TestWorkflow"));
verify(testLogger, times(1))
.info(eq("Registered Activity: {}"), eq("TestActivity"));
}
}
}

View File

@ -13,7 +13,7 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.OrchestrationRuntimeStatus;
import io.dapr.durabletask.OrchestrationRuntimeStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
import org.junit.jupiter.api.Test;

View File

@ -14,10 +14,15 @@ limitations under the License.
package io.dapr.workflows.runtime;
import com.microsoft.durabletask.DurableTaskGrpcWorker;
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.durabletask.DurableTaskGrpcWorker;
import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class WorkflowRuntimeTest {
@ -25,15 +30,16 @@ public class WorkflowRuntimeTest {
@Test
public void startTest() {
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build();
try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) {
WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()),
Executors.newCachedThreadPool());
assertDoesNotThrow(() -> runtime.start(false));
}
}
@Test
public void closeWithoutStarting() {
DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder().build();
try (WorkflowRuntime runtime = new WorkflowRuntime(worker)) {
try (WorkflowRuntime runtime = new WorkflowRuntime(worker, NetworkUtils.buildGrpcManagedChannel(new Properties()),
Executors.newCachedThreadPool())) {
assertDoesNotThrow(runtime::close);
}
}