update howtos and patterns

Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
This commit is contained in:
Hannah Hunter 2024-01-25 13:58:16 -05:00
parent 3c9c255f88
commit 11ab77a82e
3 changed files with 199 additions and 131 deletions

View File

@ -80,7 +80,7 @@ export default class WorkflowActivityContext {
}
```
[See the workflow activity in context.](todo)
[See the workflow activity in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowActivityContext.ts)
{{% /codetab %}}
@ -236,7 +236,7 @@ Next, register the workflow with the `WorkflowRuntime` class and start the workf
export default class WorkflowRuntime {
//..
// Register workflow
// Register workflow implementation for handling orchestrations
public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
const name = getFunctionName(workflow);
const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
@ -266,7 +266,7 @@ export default class WorkflowRuntime {
}
```
[See the `hello_world_wf` workflow in context.](todo)
[See the `WorkflowRuntime` in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowRuntime.ts)
{{% /codetab %}}
@ -446,48 +446,48 @@ if __name__ == '__main__':
<!--javascript-->
[In the following example](todo), for a basic JavaScript hello world application using the Go SDK, your project code would include:
[The following example](https://github.com/dapr/js-sdk/blob/main/src/workflow/client/DaprWorkflowClient.ts) is a basic JavaScript application using the JavaScript SDK. As in this example, your project code would include:
- A JavaScript package called `todo` to receive the Go SDK capabilities.
- A builder with extensions called:
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
- API calls. In the example below, these calls start, terminate, get status, pause, resume, raise event, and purge the workflow.
```javascript
import { TaskHubGrpcClient } from "kaibocai-durabletask-js";
import * as grpc from "@grpc/grpc-js";
import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
import { WorkflowState } from "./WorkflowState";
import { generateInterceptors } from "../internal/ApiTokenClientInterceptor";
import { TWorkflow } from "../types/Workflow.type";
import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
import { TWorkflow } from "../../types/workflow/Workflow.type";
import { getFunctionName } from "../internal";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";
export default class WorkflowClient {
/** DaprWorkflowClient class defines client operations for managing workflow instances. */
export default class DaprWorkflowClient {
private readonly _innerClient: TaskHubGrpcClient;
/**
* Initializes a new instance of the DaprWorkflowClient.
* @param {string | undefined} hostAddress - The address of the Dapr runtime hosting the workflow services.
* @param {grpc.ChannelOptions | undefined} options - Additional options for configuring the gRPC channel.
/** Initialize a new instance of the DaprWorkflowClient.
*/
constructor(hostAddress?: string, options?: grpc.ChannelOptions) {
this._innerClient = this._buildInnerClient(hostAddress, options);
constructor(options: Partial<WorkflowClientOptions> = {}) {
const grpcEndpoint = generateEndpoint(options);
options.daprApiToken = getDaprApiToken(options);
this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
}
_buildInnerClient(hostAddress = "127.0.0.1:50001", options: grpc.ChannelOptions = {}): TaskHubGrpcClient {
const innerOptions = {
...options,
interceptors: [generateInterceptors(), ...(options?.interceptors ?? [])],
};
private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
let innerOptions = options?.grpcOptions;
if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
innerOptions = {
...innerOptions,
interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
};
}
return new TaskHubGrpcClient(hostAddress, innerOptions);
}
/**
* Schedules a new workflow using the DurableTask client.
*
* @param {TWorkflow | string} workflow - The Workflow or the name of the workflow to be scheduled.
* @return {Promise<string>} A Promise resolving to the unique ID of the scheduled workflow instance.
* Schedule a new workflow using the DurableTask client.
*/
public async scheduleNewWorkflow(
workflow: TWorkflow | string,
@ -502,7 +502,7 @@ export default class WorkflowClient {
}
/**
* Terminates the workflow associated with the provided instance id.
* Terminate the workflow associated with the provided instance id.
*
* @param {string} workflowInstanceId - Workflow instance id to terminate.
* @param {any} output - The optional output to set for the terminated workflow instance.
@ -512,14 +512,7 @@ export default class WorkflowClient {
}
/**
* Fetches workflow instance metadata from the configured durable store.
*
* @param {string} workflowInstanceId - The unique identifier of the workflow instance to fetch.
* @param {boolean} getInputsAndOutputs - Indicates whether to fetch the workflow instance's
* inputs, outputs, and custom status (true) or omit them (false).
* @returns {Promise<WorkflowState | undefined>} A Promise that resolves to a metadata record describing
* the workflow instance and its execution status, or undefined
* if the instance is not found.
* Fetch workflow instance metadata from the configured durable store.
*/
public async getWorkflowState(
workflowInstanceId: string,
@ -532,69 +525,43 @@ export default class WorkflowClient {
}
/**
* Waits for a workflow to start running and returns a {@link WorkflowState} object
* containing metadata about the started instance, and optionally, its input, output,
* and custom status payloads.
*
* A "started" workflow instance refers to any instance not in the Pending state.
*
* If a workflow instance is already running when this method is called, it returns immediately.
*
* @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for.
* @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's
* inputs, outputs (true) or omit them (false).
* @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start.
* @returns {Promise<WorkflowState | undefined>} A Promise that resolves to the workflow instance metadata
* or undefined if no such instance is found.
* Waits for a workflow to start running
*/
public async waitForWorkflowStart(
workflowInstanceId: string,
fetchPayloads?: boolean,
timeout?: number,
fetchPayloads = true,
timeoutInSeconds = 60,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.waitForOrchestrationStart(workflowInstanceId, fetchPayloads, timeout);
const state = await this._innerClient.waitForOrchestrationStart(
workflowInstanceId,
fetchPayloads,
timeoutInSeconds,
);
if (state !== undefined) {
return new WorkflowState(state);
}
}
/**
* Waits for a workflow to complete running and returns a {@link WorkflowState} object
* containing metadata about the completed instance, and optionally, its input, output,
* and custom status payloads.
*
* A "completed" workflow instance refers to any instance in one of the terminal states.
* For example, the Completed, Failed, or Terminated states.
*
* If a workflow instance is already running when this method is called, it returns immediately.
*
* @param {string} workflowInstanceId - The unique identifier of the workflow instance to wait for.
* @param {boolean} fetchPayloads - Indicates whether to fetch the workflow instance's
* inputs, outputs (true) or omit them (false).
* @param {number} timeout - The amount of time, in seconds, to wait for the workflow instance to start.
* @returns {Promise<WorkflowState | undefined>} A Promise that resolves to the workflow instance metadata
* or undefined if no such instance is found.
* Waits for a workflow to complete running
*/
public async waitForWorkflowCompletion(
workflowInstanceId: string,
fetchPayloads = true,
timeout: number,
timeoutInSeconds = 60,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.waitForOrchestrationCompletion(workflowInstanceId, fetchPayloads, timeout);
const state = await this._innerClient.waitForOrchestrationCompletion(
workflowInstanceId,
fetchPayloads,
timeoutInSeconds,
);
if (state != undefined) {
return new WorkflowState(state);
}
}
/**
* Sends an event notification message to an awaiting workflow instance.
*
* This method triggers the specified event in a running workflow instance,
* allowing the workflow to respond to the event if it has defined event handlers.
*
* @param {string} workflowInstanceId - The unique identifier of the workflow instance that will handle the event.
* @param {string} eventName - The name of the event. Event names are case-insensitive.
* @param {any} [eventPayload] - An optional serializable data payload to include with the event.
* Sends an event notification message to an awaiting workflow instance
*/
public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
@ -602,11 +569,6 @@ export default class WorkflowClient {
/**
* Purges the workflow instance state from the workflow state store.
*
* This method removes the persisted state associated with a workflow instance from the state store.
*
* @param {string} workflowInstanceId - The unique identifier of the workflow instance to purge.
* @return {Promise<boolean>} A Promise that resolves to true if the workflow state was found and purged successfully, otherwise false.
*/
public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
@ -765,6 +727,6 @@ Now that you've authored a workflow, learn how to manage it.
- [Workflow API reference]({{< ref workflow_api.md >}})
- Try out the full SDK examples:
- [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](todo)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/src/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -67,16 +67,69 @@ d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponen
{{% codetab %}}
Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
- **start_workflow**: Start an instance of a workflow
- **get_workflow**: Get information on the status of the workflow
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
- **resume_workflow**: Resumes a paused workflow instance
- **raise_workflow_event**: Raise an event on a workflow
- **purge_workflow**: Removes all metadata related to a specific workflow instance
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
- **client.workflow.start**: Start an instance of a workflow
- **client.workflow.get**: Get information on the status of the workflow
- **client.workflow.pause**: Pauses or suspends a workflow instance that can later be resumed
- **client.workflow.resume**: Resumes a paused workflow instance
- **client.workflow.purge**: Removes all metadata related to a specific workflow instance
- **client.workflow.terminate**: Terminate or stop a particular instance of a workflow
```javascript
import { DaprClient } from "@dapr/dapr";
async function printWorkflowStatus(client: DaprClient, instanceId: string) {
const workflow = await client.workflow.get(instanceId);
console.log(
`Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
workflow.runtimeStatus
}`,
);
console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
console.log("--------------------------------------------------\n\n");
}
async function start() {
const client = new DaprClient();
// Start a new workflow instance
const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
Name: "Paperclips",
TotalCost: 99.95,
Quantity: 4,
});
console.log(`Started workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Pause a workflow instance
await client.workflow.pause(instanceId);
console.log(`Paused workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Resume a workflow instance
await client.workflow.resume(instanceId);
console.log(`Resumed workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Terminate a workflow instance
await client.workflow.terminate(instanceId);
console.log(`Terminated workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Wait for the workflow to complete, 30 seconds!
await new Promise((resolve) => setTimeout(resolve, 30000));
await printWorkflowStatus(client, instanceId);
// Purge a workflow instance
await client.workflow.purge(instanceId);
console.log(`Purged workflow instance ${instanceId}`);
// This will throw an error because the workflow instance no longer exists.
await printWorkflowStatus(client, instanceId);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
@ -260,7 +313,7 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref
- [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}})
- Try out the full SDK examples:
- [Python example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py)
- [JavaScript example](todo)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/src/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -76,16 +76,20 @@ def error_handler(ctx, error):
<!--javascript-->
```javascript
import WorkflowClient from "../client/WorkflowClient";
import WorkflowActivityContext from "../runtime/WorkflowActivityContext";
import WorkflowContext from "../runtime/WorkflowContext";
import WorkflowRuntime from "../runtime/WorkflowRuntime";
import { TWorkflow } from "../types/Workflow.type";
import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";
(async () => {
const grpcEndpoint = "localhost:4001";
const workflowClient = new WorkflowClient(grpcEndpoint);
const workflowRuntime = new WorkflowRuntime(grpcEndpoint);
async function start() {
// Update the gRPC client and worker to use a local address and port
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
const hello = async (_: WorkflowActivityContext, name: string) => {
return `Hello ${name}!`;
@ -96,7 +100,7 @@ import { TWorkflow } from "../types/Workflow.type";
const result1 = yield ctx.callActivity(hello, "Tokyo");
cities.push(result1);
const result2 = yield ctx.callActivity(hello, "Seattle"); // Correct the spelling of "Seattle"
const result2 = yield ctx.callActivity(hello, "Seattle");
cities.push(result2);
const result3 = yield ctx.callActivity(hello, "London");
cities.push(result3);
@ -129,7 +133,15 @@ import { TWorkflow } from "../types/Workflow.type";
await workflowRuntime.stop();
await workflowClient.stop();
})();
// stop the dapr side car
process.exit(0);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
@ -294,19 +306,28 @@ def process_results(ctx, final_result: int):
<!--javascript-->
```javascript
import { Task } from "kaibocai-durabletask-js/task/task";
import WorkflowClient from "../client/WorkflowClient";
import WorkflowActivityContext from "../runtime/WorkflowActivityContext";
import WorkflowContext from "../runtime/WorkflowContext";
import WorkflowRuntime from "../runtime/WorkflowRuntime";
import { TWorkflow } from "../types/Workflow.type";
import {
Task,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowContext,
WorkflowRuntime,
TWorkflow,
} from "@dapr/dapr";
// Wrap the entire code in an immediately-invoked async function
(async () => {
async function start() {
// Update the gRPC client and worker to use a local address and port
const grpcServerAddress = "localhost:4001";
const workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress);
const workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress);
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
function getRandomInt(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1)) + min;
@ -332,6 +353,8 @@ import { TWorkflow } from "../types/Workflow.type";
await sleep(sleepTime);
// Return a result for the given work item, which is also a random number in this case
// For more information about random numbers in workflow please check
// https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
return Math.floor(Math.random() * 11);
}
@ -374,7 +397,15 @@ import { TWorkflow } from "../types/Workflow.type";
// stop worker and client
await workflowRuntime.stop();
await workflowClient.stop();
})();
// stop the dapr side car
process.exit(0);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
@ -767,16 +798,18 @@ def place_order(_, order: Order) -> None:
<!--javascript-->
```javascript
import { Task } from "kaibocai-durabletask-js/task/task";
import WorkflowClient from "../client/WorkflowClient";
import WorkflowActivityContext from "../runtime/WorkflowActivityContext";
import WorkflowContext from "../runtime/WorkflowContext";
import WorkflowRuntime from "../runtime/WorkflowRuntime";
import { TWorkflow } from "../types/Workflow.type";
import {
Task,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowContext,
WorkflowRuntime,
TWorkflow,
} from "@dapr/dapr";
import * as readlineSync from "readline-sync";
// Wrap the entire code in an immediately-invoked async function
(async () => {
async function start() {
class Order {
cost: number;
product: string;
@ -793,11 +826,18 @@ import * as readlineSync from "readline-sync";
}
// Update the gRPC client and worker to use a local address and port
const grpcServerAddress = "localhost:4001";
let workflowClient: WorkflowClient = new WorkflowClient(grpcServerAddress);
let workflowRuntime: WorkflowRuntime = new WorkflowRuntime(grpcServerAddress);
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
//Activity function that sends an approval request to the manager
// Activity function that sends an approval request to the manager
const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
// Simulate some work that takes an amount of time
await sleep(3000);
@ -858,12 +898,8 @@ import * as readlineSync from "readline-sync";
const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
console.log(`Orchestration scheduled with ID: ${id}`);
if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
const approvalEvent = { approver: approver };
await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
} else {
return "Order rejected";
}
// prompt for approval asynchronously
promptForApproval(approver, workflowClient, id);
// Wait for orchestration completion
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);
@ -876,7 +912,24 @@ import * as readlineSync from "readline-sync";
// stop worker and client
await workflowRuntime.stop();
await workflowClient.stop();
})();
// stop the dapr side car
process.exit(0);
}
async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
const approvalEvent = { approver: approver };
await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
} else {
return "Order rejected";
}
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}