From 11ab77a82e62a2a308149be720cfbcae3188f8ab Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Thu, 25 Jan 2024 13:58:16 -0500 Subject: [PATCH] update howtos and patterns Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 128 ++++++----------- .../workflow/howto-manage-workflow.md | 69 +++++++-- .../workflow/workflow-patterns.md | 133 ++++++++++++------ 3 files changed, 199 insertions(+), 131 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 688ffecb9..00330236b 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -80,7 +80,7 @@ export default class WorkflowActivityContext { } ``` -[See the workflow activity in context.](todo) +[See the workflow activity in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowActivityContext.ts) {{% /codetab %}} @@ -236,7 +236,7 @@ Next, register the workflow with the `WorkflowRuntime` class and start the workf export default class WorkflowRuntime { //.. - // Register workflow + // Register workflow implementation for handling orchestrations public registerWorkflow(workflow: TWorkflow): WorkflowRuntime { const name = getFunctionName(workflow); const workflowWrapper = (ctx: OrchestrationContext, input: any): any => { @@ -266,7 +266,7 @@ export default class WorkflowRuntime { } ``` -[See the `hello_world_wf` workflow in context.](todo) +[See the `WorkflowRuntime` in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowRuntime.ts) {{% /codetab %}} @@ -446,48 +446,48 @@ if __name__ == '__main__': -[In the following example](todo), for a basic JavaScript hello world application using the Go SDK, your project code would include: +[The following example](https://github.com/dapr/js-sdk/blob/main/src/workflow/client/DaprWorkflowClient.ts) is a basic JavaScript application using the JavaScript SDK. As in this example, your project code would include: -- A JavaScript package called `todo` to receive the Go SDK capabilities. - A builder with extensions called: - `WorkflowRuntime`: Allows you to register workflows and workflow activities - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) -- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. +- API calls. In the example below, these calls start, terminate, get status, pause, resume, raise event, and purge the workflow. ```javascript -import { TaskHubGrpcClient } from "kaibocai-durabletask-js"; -import * as grpc from "@grpc/grpc-js"; +import { TaskHubGrpcClient } from "@microsoft/durabletask-js"; import { WorkflowState } from "./WorkflowState"; -import { generateInterceptors } from "../internal/ApiTokenClientInterceptor"; -import { TWorkflow } from "../types/Workflow.type"; +import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index"; +import { TWorkflow } from "../../types/workflow/Workflow.type"; import { getFunctionName } from "../internal"; +import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption"; -export default class WorkflowClient { +/** DaprWorkflowClient class defines client operations for managing workflow instances. */ + +export default class DaprWorkflowClient { private readonly _innerClient: TaskHubGrpcClient; - /** - * Initializes a new instance of the DaprWorkflowClient. - * @param {string | undefined} hostAddress - The address of the Dapr runtime hosting the workflow services. - * @param {grpc.ChannelOptions | undefined} options - Additional options for configuring the gRPC channel. + /** Initialize a new instance of the DaprWorkflowClient. */ - constructor(hostAddress?: string, options?: grpc.ChannelOptions) { - this._innerClient = this._buildInnerClient(hostAddress, options); + constructor(options: Partial = {}) { + const grpcEndpoint = generateEndpoint(options); + options.daprApiToken = getDaprApiToken(options); + this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options); } - _buildInnerClient(hostAddress = "127.0.0.1:50001", options: grpc.ChannelOptions = {}): TaskHubGrpcClient { - const innerOptions = { - ...options, - interceptors: [generateInterceptors(), ...(options?.interceptors ?? [])], - }; + private buildInnerClient(hostAddress: string, options: Partial): TaskHubGrpcClient { + let innerOptions = options?.grpcOptions; + if (options.daprApiToken !== undefined && options.daprApiToken !== "") { + innerOptions = { + ...innerOptions, + interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])], + }; + } return new TaskHubGrpcClient(hostAddress, innerOptions); } /** - * Schedules a new workflow using the DurableTask client. - * - * @param {TWorkflow | string} workflow - The Workflow or the name of the workflow to be scheduled. - * @return {Promise} A Promise resolving to the unique ID of the scheduled workflow instance. + * Schedule a new workflow using the DurableTask client. */ public async scheduleNewWorkflow( workflow: TWorkflow | string, @@ -502,7 +502,7 @@ export default class WorkflowClient { } /** - * Terminates the workflow associated with the provided instance id. + * Terminate the workflow associated with the provided instance id. * * @param {string} workflowInstanceId - Workflow instance id to terminate. * @param {any} output - The optional output to set for the terminated workflow instance. @@ -512,14 +512,7 @@ export default class WorkflowClient { } /** - * Fetches workflow instance metadata from the configured durable store. - * - * @param {string} workflowInstanceId - The unique identifier of the workflow instance to fetch. - * @param {boolean} getInputsAndOutputs - Indicates whether to fetch the workflow instance's - * inputs, outputs, and custom status (true) or omit them (false). - * @returns {Promise} A Promise that resolves to a metadata record describing - * the workflow instance and its execution status, or undefined - * if the instance is not found. + * Fetch workflow instance metadata from the configured durable store. */ public async getWorkflowState( workflowInstanceId: string, @@ -532,69 +525,43 @@ export default class WorkflowClient { } /** - * Waits for a workflow to start running and returns a {@link WorkflowState} object - * containing metadata about the started instance, and optionally, its input, output, - * and custom status payloads. - * - * A "started" workflow instance refers to any instance not in the Pending state. - * - * If a workflow instance is already running when this method is called, it returns immediately. - * - * @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for. - * @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's - * inputs, outputs (true) or omit them (false). - * @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start. - * @returns {Promise} A Promise that resolves to the workflow instance metadata - * or undefined if no such instance is found. + * Waits for a workflow to start running */ public async waitForWorkflowStart( workflowInstanceId: string, - fetchPayloads?: boolean, - timeout?: number, + fetchPayloads = true, + timeoutInSeconds = 60, ): Promise { - const state = await this._innerClient.waitForOrchestrationStart(workflowInstanceId, fetchPayloads, timeout); + const state = await this._innerClient.waitForOrchestrationStart( + workflowInstanceId, + fetchPayloads, + timeoutInSeconds, + ); if (state !== undefined) { return new WorkflowState(state); } } /** - * Waits for a workflow to complete running and returns a {@link WorkflowState} object - * containing metadata about the completed instance, and optionally, its input, output, - * and custom status payloads. - * - * A "completed" workflow instance refers to any instance in one of the terminal states. - * For example, the Completed, Failed, or Terminated states. - * - * If a workflow instance is already running when this method is called, it returns immediately. - * - * @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for. - * @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's - * inputs, outputs (true) or omit them (false). - * @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start. - * @returns {Promise} A Promise that resolves to the workflow instance metadata - * or undefined if no such instance is found. + * Waits for a workflow to complete running */ public async waitForWorkflowCompletion( workflowInstanceId: string, fetchPayloads = true, - timeout: number, + timeoutInSeconds = 60, ): Promise { - const state = await this._innerClient.waitForOrchestrationCompletion(workflowInstanceId, fetchPayloads, timeout); + const state = await this._innerClient.waitForOrchestrationCompletion( + workflowInstanceId, + fetchPayloads, + timeoutInSeconds, + ); if (state != undefined) { return new WorkflowState(state); } } /** - * Sends an event notification message to an awaiting workflow instance. - * - * This method triggers the specified event in a running workflow instance, - * allowing the workflow to respond to the event if it has defined event handlers. - * - * @param {string} workflowInstanceId - The unique identifier of the workflow instance that will handle the event. - * @param {string} eventName - The name of the event. Event names are case-insensitive. - * @param {any} [eventPayload] - An optional serializable data payload to include with the event. + * Sends an event notification message to an awaiting workflow instance */ public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) { this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload); @@ -602,11 +569,6 @@ export default class WorkflowClient { /** * Purges the workflow instance state from the workflow state store. - * - * This method removes the persisted state associated with a workflow instance from the state store. - * - * @param {string} workflowInstanceId - The unique identifier of the workflow instance to purge. - * @return {Promise} A Promise that resolves to true if the workflow state was found and purged successfully, otherwise false. */ public async purgeWorkflow(workflowInstanceId: string): Promise { const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId); @@ -765,6 +727,6 @@ Now that you've authored a workflow, learn how to manage it. - [Workflow API reference]({{< ref workflow_api.md >}}) - Try out the full SDK examples: - [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) - - [JavaScript example](todo) + - [JavaScript example](https://github.com/dapr/js-sdk/tree/main/src/workflow) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index 0412b2606..148cc258d 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -67,16 +67,69 @@ d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponen {{% codetab %}} Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs: -- **start_workflow**: Start an instance of a workflow -- **get_workflow**: Get information on the status of the workflow -- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed -- **resume_workflow**: Resumes a paused workflow instance -- **raise_workflow_event**: Raise an event on a workflow -- **purge_workflow**: Removes all metadata related to a specific workflow instance -- **terminate_workflow**: Terminate or stop a particular instance of a workflow +- **client.workflow.start**: Start an instance of a workflow +- **client.workflow.get**: Get information on the status of the workflow +- **client.workflow.pause**: Pauses or suspends a workflow instance that can later be resumed +- **client.workflow.resume**: Resumes a paused workflow instance +- **client.workflow.purge**: Removes all metadata related to a specific workflow instance +- **client.workflow.terminate**: Terminate or stop a particular instance of a workflow ```javascript +import { DaprClient } from "@dapr/dapr"; +async function printWorkflowStatus(client: DaprClient, instanceId: string) { + const workflow = await client.workflow.get(instanceId); + console.log( + `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${ + workflow.runtimeStatus + }`, + ); + console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`); + console.log("--------------------------------------------------\n\n"); +} + +async function start() { + const client = new DaprClient(); + + // Start a new workflow instance + const instanceId = await client.workflow.start("OrderProcessingWorkflow", { + Name: "Paperclips", + TotalCost: 99.95, + Quantity: 4, + }); + console.log(`Started workflow instance ${instanceId}`); + await printWorkflowStatus(client, instanceId); + + // Pause a workflow instance + await client.workflow.pause(instanceId); + console.log(`Paused workflow instance ${instanceId}`); + await printWorkflowStatus(client, instanceId); + + // Resume a workflow instance + await client.workflow.resume(instanceId); + console.log(`Resumed workflow instance ${instanceId}`); + await printWorkflowStatus(client, instanceId); + + // Terminate a workflow instance + await client.workflow.terminate(instanceId); + console.log(`Terminated workflow instance ${instanceId}`); + await printWorkflowStatus(client, instanceId); + + // Wait for the workflow to complete, 30 seconds! + await new Promise((resolve) => setTimeout(resolve, 30000)); + await printWorkflowStatus(client, instanceId); + + // Purge a workflow instance + await client.workflow.purge(instanceId); + console.log(`Purged workflow instance ${instanceId}`); + // This will throw an error because the workflow instance no longer exists. + await printWorkflowStatus(client, instanceId); +} + +start().catch((e) => { + console.error(e); + process.exit(1); +}); ``` {{% /codetab %}} @@ -260,7 +313,7 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref - [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}}) - Try out the full SDK examples: - [Python example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py) - - [JavaScript example](todo) + - [JavaScript example](https://github.com/dapr/js-sdk/tree/main/src/workflow) - [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) - [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 74dd1aa48..a31d9588b 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -76,16 +76,20 @@ def error_handler(ctx, error): ```javascript -import WorkflowClient from "../client/WorkflowClient"; -import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; -import WorkflowContext from "../runtime/WorkflowContext"; -import WorkflowRuntime from "../runtime/WorkflowRuntime"; -import { TWorkflow } from "../types/Workflow.type"; +import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr"; -(async () => { - const grpcEndpoint = "localhost:4001"; - const workflowClient = new WorkflowClient(grpcEndpoint); - const workflowRuntime = new WorkflowRuntime(grpcEndpoint); +async function start() { + // Update the gRPC client and worker to use a local address and port + const daprHost = "localhost"; + const daprPort = "50001"; + const workflowClient = new DaprWorkflowClient({ + daprHost, + daprPort, + }); + const workflowRuntime = new WorkflowRuntime({ + daprHost, + daprPort, + }); const hello = async (_: WorkflowActivityContext, name: string) => { return `Hello ${name}!`; @@ -96,7 +100,7 @@ import { TWorkflow } from "../types/Workflow.type"; const result1 = yield ctx.callActivity(hello, "Tokyo"); cities.push(result1); - const result2 = yield ctx.callActivity(hello, "Seattle"); // Correct the spelling of "Seattle" + const result2 = yield ctx.callActivity(hello, "Seattle"); cities.push(result2); const result3 = yield ctx.callActivity(hello, "London"); cities.push(result3); @@ -129,7 +133,15 @@ import { TWorkflow } from "../types/Workflow.type"; await workflowRuntime.stop(); await workflowClient.stop(); -})(); + + // stop the dapr side car + process.exit(0); +} + +start().catch((e) => { + console.error(e); + process.exit(1); +}); ``` {{% /codetab %}} @@ -294,19 +306,28 @@ def process_results(ctx, final_result: int): ```javascript -import { Task } from "kaibocai-durabletask-js/task/task"; -import WorkflowClient from "../client/WorkflowClient"; -import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; -import WorkflowContext from "../runtime/WorkflowContext"; -import WorkflowRuntime from "../runtime/WorkflowRuntime"; -import { TWorkflow } from "../types/Workflow.type"; +import { + Task, + DaprWorkflowClient, + WorkflowActivityContext, + WorkflowContext, + WorkflowRuntime, + TWorkflow, +} from "@dapr/dapr"; // Wrap the entire code in an immediately-invoked async function -(async () => { +async function start() { // Update the gRPC client and worker to use a local address and port - const grpcServerAddress = "localhost:4001"; - const workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress); - const workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress); + const daprHost = "localhost"; + const daprPort = "50001"; + const workflowClient = new DaprWorkflowClient({ + daprHost, + daprPort, + }); + const workflowRuntime = new WorkflowRuntime({ + daprHost, + daprPort, + }); function getRandomInt(min: number, max: number): number { return Math.floor(Math.random() * (max - min + 1)) + min; @@ -332,6 +353,8 @@ import { TWorkflow } from "../types/Workflow.type"; await sleep(sleepTime); // Return a result for the given work item, which is also a random number in this case + // For more information about random numbers in workflow please check + // https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers return Math.floor(Math.random() * 11); } @@ -374,7 +397,15 @@ import { TWorkflow } from "../types/Workflow.type"; // stop worker and client await workflowRuntime.stop(); await workflowClient.stop(); -})(); + + // stop the dapr side car + process.exit(0); +} + +start().catch((e) => { + console.error(e); + process.exit(1); +}); ``` {{% /codetab %}} @@ -767,16 +798,18 @@ def place_order(_, order: Order) -> None: ```javascript -import { Task } from "kaibocai-durabletask-js/task/task"; -import WorkflowClient from "../client/WorkflowClient"; -import WorkflowActivityContext from "../runtime/WorkflowActivityContext"; -import WorkflowContext from "../runtime/WorkflowContext"; -import WorkflowRuntime from "../runtime/WorkflowRuntime"; -import { TWorkflow } from "../types/Workflow.type"; +import { + Task, + DaprWorkflowClient, + WorkflowActivityContext, + WorkflowContext, + WorkflowRuntime, + TWorkflow, +} from "@dapr/dapr"; import * as readlineSync from "readline-sync"; // Wrap the entire code in an immediately-invoked async function -(async () => { +async function start() { class Order { cost: number; product: string; @@ -793,11 +826,18 @@ import * as readlineSync from "readline-sync"; } // Update the gRPC client and worker to use a local address and port - const grpcServerAddress = "localhost:4001"; - let workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress); - let workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress); + const daprHost = "localhost"; + const daprPort = "50001"; + const workflowClient = new DaprWorkflowClient({ + daprHost, + daprPort, + }); + const workflowRuntime = new WorkflowRuntime({ + daprHost, + daprPort, + }); - //Activity function that sends an approval request to the manager + // Activity function that sends an approval request to the manager const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => { // Simulate some work that takes an amount of time await sleep(3000); @@ -858,12 +898,8 @@ import * as readlineSync from "readline-sync"; const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order); console.log(`Orchestration scheduled with ID: ${id}`); - if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) { - const approvalEvent = { approver: approver }; - await workflowClient.raiseEvent(id, "approval_received", approvalEvent); - } else { - return "Order rejected"; - } + // prompt for approval asynchronously + promptForApproval(approver, workflowClient, id); // Wait for orchestration completion const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2); @@ -876,7 +912,24 @@ import * as readlineSync from "readline-sync"; // stop worker and client await workflowRuntime.stop(); await workflowClient.stop(); -})(); + + // stop the dapr side car + process.exit(0); +} + +async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) { + if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) { + const approvalEvent = { approver: approver }; + await workflowClient.raiseEvent(id, "approval_received", approvalEvent); + } else { + return "Order rejected"; + } +} + +start().catch((e) => { + console.error(e); + process.exit(1); +}); ``` {{% /codetab %}}