This commit is contained in:
salaboy 2025-09-01 11:00:41 -07:00 committed by GitHub
commit 2f6b64d7bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 37 additions and 7 deletions

View File

@ -47,7 +47,11 @@
<dependency>
<groupId>io.dapr</groupId>
<artifactId>durabletask-client</artifactId>
<version>1.5.7</version>
<version>1.5.10</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<!--
manually declare durabletask-client's jackson dependencies

View File

@ -21,10 +21,9 @@ import io.dapr.durabletask.OrchestrationMetadata;
import io.dapr.durabletask.PurgeResult;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.dapr.workflows.runtime.DefaultWorkflowInstanceStatus;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.opentelemetry.context.Context;
import javax.annotation.Nullable;
@ -37,7 +36,6 @@ import java.util.concurrent.TimeoutException;
*/
public class DaprWorkflowClient implements AutoCloseable {
private ClientInterceptor workflowApiTokenInterceptor;
private DurableTaskClient innerClient;
private ManagedChannel grpcChannel;
@ -54,7 +52,7 @@ public class DaprWorkflowClient implements AutoCloseable {
* @param properties Properties for the GRPC Channel.
*/
public DaprWorkflowClient(Properties properties) {
this(NetworkUtils.buildGrpcManagedChannel(properties, new ApiTokenClientInterceptor(properties)));
this(NetworkUtils.buildGrpcManagedChannel(properties));
}
/**
@ -100,6 +98,11 @@ public class DaprWorkflowClient implements AutoCloseable {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input);
}
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId,
Context context) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId, context);
}
/**
* Schedules a new workflow using DurableTask client.
*
@ -128,6 +131,23 @@ public class DaprWorkflowClient implements AutoCloseable {
orchestrationInstanceOptions);
}
/**
* Schedules a new workflow with a specified set of options for execution.
*
* @param <T> any Workflow type
* @param clazz Class extending Workflow to start an instance of.
* @param options the options for the new workflow, including input, instance ID, etc.
* @param context otel Context for trace propagation.
* @return the <code>instanceId</code> parameter value.
*/
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, NewWorkflowOptions options,
Context context) {
NewOrchestrationInstanceOptions orchestrationInstanceOptions = fromNewWorkflowOptions(options);
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(),
orchestrationInstanceOptions, context);
}
/**
* Suspend the workflow associated with the provided instance id.
*
@ -278,8 +298,10 @@ public class DaprWorkflowClient implements AutoCloseable {
* @return a new instance of a DurableTaskClient with a GRPC channel.
*/
private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) {
return new DurableTaskGrpcClientBuilder()
.grpcChannel(grpcChannel)
.interceptor(new io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors())
.build();
}

View File

@ -15,12 +15,14 @@ package io.dapr.workflows.runtime;
import io.dapr.config.Properties;
import io.dapr.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.durabletask.interceptors.DaprWorkflowClientGrpcInterceptors;
import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.internal.ApiTokenClientInterceptor;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.opentelemetry.context.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -65,7 +67,8 @@ public class WorkflowRuntimeBuilder {
private WorkflowRuntimeBuilder(Properties properties, Logger logger) {
this.workflowApiTokenInterceptor = new ApiTokenClientInterceptor(properties);
this.managedChannel = NetworkUtils.buildGrpcManagedChannel(properties, workflowApiTokenInterceptor);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel);
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(this.managedChannel)
.interceptors(new DaprWorkflowClientGrpcInterceptors());
this.logger = logger;
}
@ -77,7 +80,8 @@ public class WorkflowRuntimeBuilder {
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
this.executorService = this.executorService == null ? Executors.newCachedThreadPool() : this.executorService;
this.executorService = Context.taskWrapping(this.executorService == null ? Executors.newCachedThreadPool()
: this.executorService);
if (instance == null) {
instance = new WorkflowRuntime(
this.builder.withExecutorService(this.executorService).build(),