Merge pull request #3896 from hhunter-ms/issue_3869

[Workflow] Update for JavaScript SDK
This commit is contained in:
Hannah Hunter 2024-02-06 19:38:55 -05:00 committed by GitHub
commit f439ffc723
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 794 additions and 20 deletions

View File

@ -34,7 +34,7 @@ The Dapr sidecar doesnt load any workflow definitions. Rather, the sidecar si
[Workflow activities]({{< ref "workflow-features-concepts.md#workflow-activites" >}}) are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
@ -52,6 +52,37 @@ def hello_act(ctx: WorkflowActivityContext, input):
[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the `WorkflowActivityContext` class, which implements the workflow activities.
```javascript
export default class WorkflowActivityContext {
private readonly _innerContext: ActivityContext;
constructor(innerContext: ActivityContext) {
if (!innerContext) {
throw new Error("ActivityContext cannot be undefined");
}
this._innerContext = innerContext;
}
public getWorkflowInstanceId(): string {
return this._innerContext.orchestrationId;
}
public getWorkflowActivityId(): number {
return this._innerContext.taskId;
}
}
```
[See the workflow activity in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowActivityContext.ts)
{{% /codetab %}}
{{% codetab %}}
@ -172,7 +203,7 @@ public class DemoWorkflowActivity implements WorkflowActivity {
Next, register and call the activites in a workflow.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
@ -193,6 +224,51 @@ def hello_world_wf(ctx: DaprWorkflowContext, input):
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
Next, register the workflow with the `WorkflowRuntime` class and start the workflow runtime.
```javascript
export default class WorkflowRuntime {
//..
// Register workflow implementation for handling orchestrations
public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
const name = getFunctionName(workflow);
const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
const workflowContext = new WorkflowContext(ctx);
return workflow(workflowContext, input);
};
this.worker.addNamedOrchestrator(name, workflowWrapper);
return this;
}
// Register workflow activities
public registerActivity(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
const name = getFunctionName(fn);
const activityWrapper = (ctx: ActivityContext, intput: TInput): TOutput => {
const wfActivityContext = new WorkflowActivityContext(ctx);
return fn(wfActivityContext, intput);
};
this.worker.addNamedActivity(name, activityWrapper);
return this;
}
// Start the workflow runtime processing items and block.
public async start() {
await this.worker.start();
}
}
```
[See the `WorkflowRuntime` in context.](https://github.com/dapr/js-sdk/blob/main/src/workflow/runtime/WorkflowRuntime.ts)
{{% /codetab %}}
{{% codetab %}}
@ -275,7 +351,7 @@ public class DemoWorkflowWorker {
Finally, compose the application using the workflow.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
@ -364,6 +440,153 @@ if __name__ == '__main__':
```
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
[The following example](https://github.com/dapr/js-sdk/blob/main/src/workflow/client/DaprWorkflowClient.ts) is a basic JavaScript application using the JavaScript SDK. As in this example, your project code would include:
- A builder with extensions called:
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
- API calls. In the example below, these calls start, terminate, get status, pause, resume, raise event, and purge the workflow.
```javascript
import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
import { WorkflowState } from "./WorkflowState";
import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
import { TWorkflow } from "../../types/workflow/Workflow.type";
import { getFunctionName } from "../internal";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";
/** DaprWorkflowClient class defines client operations for managing workflow instances. */
export default class DaprWorkflowClient {
private readonly _innerClient: TaskHubGrpcClient;
/** Initialize a new instance of the DaprWorkflowClient.
*/
constructor(options: Partial<WorkflowClientOptions> = {}) {
const grpcEndpoint = generateEndpoint(options);
options.daprApiToken = getDaprApiToken(options);
this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
}
private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
let innerOptions = options?.grpcOptions;
if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
innerOptions = {
...innerOptions,
interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
};
}
return new TaskHubGrpcClient(hostAddress, innerOptions);
}
/**
* Schedule a new workflow using the DurableTask client.
*/
public async scheduleNewWorkflow(
workflow: TWorkflow | string,
input?: any,
instanceId?: string,
startAt?: Date,
): Promise<string> {
if (typeof workflow === "string") {
return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);
}
return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
}
/**
* Terminate the workflow associated with the provided instance id.
*
* @param {string} workflowInstanceId - Workflow instance id to terminate.
* @param {any} output - The optional output to set for the terminated workflow instance.
*/
public async terminateWorkflow(workflowInstanceId: string, output: any) {
await this._innerClient.terminateOrchestration(workflowInstanceId, output);
}
/**
* Fetch workflow instance metadata from the configured durable store.
*/
public async getWorkflowState(
workflowInstanceId: string,
getInputsAndOutputs: boolean,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);
if (state !== undefined) {
return new WorkflowState(state);
}
}
/**
* Waits for a workflow to start running
*/
public async waitForWorkflowStart(
workflowInstanceId: string,
fetchPayloads = true,
timeoutInSeconds = 60,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.waitForOrchestrationStart(
workflowInstanceId,
fetchPayloads,
timeoutInSeconds,
);
if (state !== undefined) {
return new WorkflowState(state);
}
}
/**
* Waits for a workflow to complete running
*/
public async waitForWorkflowCompletion(
workflowInstanceId: string,
fetchPayloads = true,
timeoutInSeconds = 60,
): Promise<WorkflowState | undefined> {
const state = await this._innerClient.waitForOrchestrationCompletion(
workflowInstanceId,
fetchPayloads,
timeoutInSeconds,
);
if (state != undefined) {
return new WorkflowState(state);
}
}
/**
* Sends an event notification message to an awaiting workflow instance
*/
public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
}
/**
* Purges the workflow instance state from the workflow state store.
*/
public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
if (purgeResult !== undefined) {
return purgeResult.deletedInstanceCount > 0;
}
return false;
}
/**
* Closes the inner DurableTask client and shutdown the GRPC channel.
*/
public async stop() {
await this._innerClient.stop();
}
}
```
{{% /codetab %}}
{{% codetab %}}
@ -504,5 +727,6 @@ Now that you've authored a workflow, learn how to manage it.
- [Workflow API reference]({{< ref workflow_api.md >}})
- Try out the full SDK examples:
- [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -12,7 +12,7 @@ Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-v
Now that you've [authored the workflow and its activities in your application]({{< ref howto-author-workflow.md >}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{< ref workflow_api.md >}}).
{{< tabs Python ".NET" Java HTTP >}}
{{< tabs Python JavaScript ".NET" Java HTTP >}}
<!--Python-->
{{% codetab %}}
@ -63,6 +63,77 @@ d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponen
{{% /codetab %}}
<!--JavaScript-->
{{% codetab %}}
Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
- **client.workflow.start**: Start an instance of a workflow
- **client.workflow.get**: Get information on the status of the workflow
- **client.workflow.pause**: Pauses or suspends a workflow instance that can later be resumed
- **client.workflow.resume**: Resumes a paused workflow instance
- **client.workflow.purge**: Removes all metadata related to a specific workflow instance
- **client.workflow.terminate**: Terminate or stop a particular instance of a workflow
```javascript
import { DaprClient } from "@dapr/dapr";
async function printWorkflowStatus(client: DaprClient, instanceId: string) {
const workflow = await client.workflow.get(instanceId);
console.log(
`Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
workflow.runtimeStatus
}`,
);
console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
console.log("--------------------------------------------------\n\n");
}
async function start() {
const client = new DaprClient();
// Start a new workflow instance
const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
Name: "Paperclips",
TotalCost: 99.95,
Quantity: 4,
});
console.log(`Started workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Pause a workflow instance
await client.workflow.pause(instanceId);
console.log(`Paused workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Resume a workflow instance
await client.workflow.resume(instanceId);
console.log(`Resumed workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Terminate a workflow instance
await client.workflow.terminate(instanceId);
console.log(`Terminated workflow instance ${instanceId}`);
await printWorkflowStatus(client, instanceId);
// Wait for the workflow to complete, 30 seconds!
await new Promise((resolve) => setTimeout(resolve, 30000));
await printWorkflowStatus(client, instanceId);
// Purge a workflow instance
await client.workflow.purge(instanceId);
console.log(`Purged workflow instance ${instanceId}`);
// This will throw an error because the workflow instance no longer exists.
await printWorkflowStatus(client, instanceId);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
<!--NET-->
{{% codetab %}}
@ -242,6 +313,7 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref
- [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}})
- Try out the full SDK examples:
- [Python example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -195,5 +195,6 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch
- [Try out the Workflow quickstart]({{< ref workflow-quickstart.md >}})
- Try out the following examples:
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -334,7 +334,7 @@ Failure to follow this rule could result in undefined behavior. Any background p
For example, instead of this:
{{< tabs ".NET" Java >}}
{{< tabs ".NET" Java JavaScript >}}
{{% codetab %}}
@ -357,11 +357,17 @@ ctx.createTimer(Duration.ofSeconds(5)).await();
{{% /codetab %}}
{{% codetab %}}
Don't declare JavaScript workflow as `async`. The Node.js runtime doesn't guarantee that asynchronous functions are deterministic.
{{% /codetab %}}
{{< /tabs >}}
Do this:
{{< tabs ".NET" Java >}}
{{< tabs ".NET" Java JavaScript >}}
{{% codetab %}}
@ -383,6 +389,12 @@ ctx.createTimer(Duration.ofSeconds(5)).await();
{{% /codetab %}}
{{% codetab %}}
Since the Node.js runtime doesn't guarantee that asynchronous functions are deterministic, always declare JavaScript workflow as synchronous generator functions.
{{% /codetab %}}
{{< /tabs >}}
@ -412,4 +424,8 @@ To work around these constraints:
- [Try out Dapr Workflow using the quickstart]({{< ref workflow-quickstart.md >}})
- [Workflow overview]({{< ref workflow-overview.md >}})
- [Workflow API reference]({{< ref workflow_api.md >}})
- [Try out the .NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- Try out the following examples:
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -7,7 +7,7 @@ description: "Overview of Dapr Workflow"
---
{{% alert title="Note" color="primary" %}}
Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-version cli="true" %}}]({{< ref "#limitations" >}}).
Dapr Workflow is currently in beta. [See known limitations]({{< ref "#limitations" >}}).
{{% /alert %}}
Dapr workflow makes it easy for developers to write business logic and integrations in a reliable way. Since Dapr workflows are stateful, they support long-running and fault-tolerant applications, ideal for orchestrating microservices. Dapr workflow works seamlessly with other Dapr building blocks, such as service invocation, pub/sub, state management, and bindings.
@ -80,6 +80,7 @@ You can use the following SDKs to author a workflow.
| Language stack | Package |
| - | - |
| Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) |
| JavaScript | [DaprWorkflowClient](https://github.com/dapr/js-sdk/blob/main/src/workflow/client/DaprWorkflowClient.ts) |
| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) |
| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) |
@ -93,21 +94,19 @@ Want to put workflows to the test? Walk through the following quickstart and tut
| ------------------- | ----------- |
| [Workflow quickstart]({{< ref workflow-quickstart.md >}}) | Run a workflow application with four workflow activities to see Dapr Workflow in action |
| [Workflow Python SDK example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) | Learn how to create a Dapr Workflow and invoke it using the Python `DaprClient` package. |
| [Workflow JavaScript SDK example](https://github.com/dapr/js-sdk/tree/main/examples/workflow) | Learn how to create a Dapr Workflow and invoke it using the JavaScript SDK. |
| [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. |
| [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. |
### Start using workflows directly in your app
Want to skip the quickstarts? Not a problem. You can try out the workflow building block directly in your application. After [Dapr is installed]({{< ref install-dapr-cli.md >}}), you can begin using workflows, starting with [how to author a workflow]({{< ref howto-author-workflow.md >}}).
## Limitations
With Dapr Workflow in beta stage comes the following limitation(s):
- **State stores:** As of the 1.12.0 beta release of Dapr Workflow, using the NoSQL databases as a state store results in limitations around storing internal states. For example, CosmosDB has a maximum single operation item limit of only 100 states in a single request.
- **State stores:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, using the NoSQL databases as a state store results in limitations around storing internal states. For example, CosmosDB has a maximum single operation item limit of only 100 states in a single request.
- **Horizontal scaling:** For the {{% dapr-latest-version cli="true" %}} beta release of Dapr Workflow, if you scale out Dapr sidecars or your application pods to more than 2, then the concurrency of the workflow execution drops. It is recommended to test with 1 or 2 instances, and no more than 2.
- **Horizontal scaling:** As of the 1.12.0 beta release of Dapr Workflow, if you scale out Dapr sidecars or your application pods to more than 2, then the concurrency of the workflow execution drops. It is recommended to test with 1 or 2 instances, and no more than 2.
## Watch the demo
@ -123,6 +122,7 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo
- [Workflow API reference]({{< ref workflow_api.md >}})
- Try out the full SDK examples:
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -25,7 +25,7 @@ While the pattern is simple, there are many complexities hidden in the implement
Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
<!--python-->
@ -72,6 +72,80 @@ def error_handler(ctx, error):
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
```javascript
import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";
async function start() {
// Update the gRPC client and worker to use a local address and port
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
const hello = async (_: WorkflowActivityContext, name: string) => {
return `Hello ${name}!`;
};
const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
const cities: string[] = [];
const result1 = yield ctx.callActivity(hello, "Tokyo");
cities.push(result1);
const result2 = yield ctx.callActivity(hello, "Seattle");
cities.push(result2);
const result3 = yield ctx.callActivity(hello, "London");
cities.push(result3);
return cities;
};
workflowRuntime.registerWorkflow(sequence).registerActivity(hello);
// Wrap the worker startup in a try-catch block to handle any errors during startup
try {
await workflowRuntime.start();
console.log("Workflow runtime started successfully");
} catch (error) {
console.error("Error starting workflow runtime:", error);
}
// Schedule a new orchestration
try {
const id = await workflowClient.scheduleNewWorkflow(sequence);
console.log(`Orchestration scheduled with ID: ${id}`);
// Wait for orchestration completion
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
} catch (error) {
console.error("Error scheduling or waiting for orchestration:", error);
}
await workflowRuntime.stop();
await workflowClient.stop();
// stop the dapr side car
process.exit(0);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
<!--dotnet-->
@ -186,7 +260,7 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf
Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
<!--python-->
@ -228,6 +302,114 @@ def process_results(ctx, final_result: int):
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
```javascript
import {
Task,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowContext,
WorkflowRuntime,
TWorkflow,
} from "@dapr/dapr";
// Wrap the entire code in an immediately-invoked async function
async function start() {
// Update the gRPC client and worker to use a local address and port
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
function getRandomInt(min: number, max: number): number {
return Math.floor(Math.random() * (max - min + 1)) + min;
}
async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
const count: number = getRandomInt(2, 10);
console.log(`generating ${count} work items...`);
const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`);
return workItems;
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
console.log(`processing work item: ${item}`);
// Simulate some work that takes a variable amount of time
const sleepTime = Math.random() * 5000;
await sleep(sleepTime);
// Return a result for the given work item, which is also a random number in this case
// For more information about random numbers in workflow please check
// https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
return Math.floor(Math.random() * 11);
}
const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
const tasks: Task<any>[] = [];
const workItems = yield ctx.callActivity(getWorkItemsActivity);
for (const workItem of workItems) {
tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
}
const results: number[] = yield ctx.whenAll(tasks);
const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
return sum;
};
workflowRuntime.registerWorkflow(workflow);
workflowRuntime.registerActivity(getWorkItemsActivity);
workflowRuntime.registerActivity(processWorkItemActivity);
// Wrap the worker startup in a try-catch block to handle any errors during startup
try {
await workflowRuntime.start();
console.log("Worker started successfully");
} catch (error) {
console.error("Error starting worker:", error);
}
// Schedule a new orchestration
try {
const id = await workflowClient.scheduleNewWorkflow(workflow);
console.log(`Orchestration scheduled with ID: ${id}`);
// Wait for orchestration completion
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
} catch (error) {
console.error("Error scheduling or waiting for orchestration:", error);
}
// stop worker and client
await workflowRuntime.stop();
await workflowClient.stop();
// stop the dapr side car
process.exit(0);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
<!--dotnet-->
@ -379,7 +561,7 @@ Depending on the business needs, there may be a single monitor or there may be m
Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
<!--python-->
@ -428,6 +610,34 @@ def send_alert(ctx, message: str):
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
```javascript
const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any {
let duration;
const status = yield ctx.callActivity(checkStatusActivity);
if (status === "healthy") {
// Check less frequently when in a healthy state
// set duration to 1 hour
duration = 60 * 60;
} else {
yield ctx.callActivity(alertActivity, "job unhealthy");
// Check more frequently when in an unhealthy state
// set duration to 5 minutes
duration = 5 * 60;
}
// Put the workflow to sleep until the determined time
ctx.createTimer(duration);
// Restart from the beginning with the updated state
ctx.continueAsNew();
};
```
{{% /codetab %}}
{{% codetab %}}
<!--dotnet-->
@ -540,7 +750,7 @@ The following diagram illustrates this flow.
The following example code shows how this pattern can be implemented using Dapr Workflow.
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
<!--python-->
@ -601,6 +811,146 @@ def place_order(_, order: Order) -> None:
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
```javascript
import {
Task,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowContext,
WorkflowRuntime,
TWorkflow,
} from "@dapr/dapr";
import * as readlineSync from "readline-sync";
// Wrap the entire code in an immediately-invoked async function
async function start() {
class Order {
cost: number;
product: string;
quantity: number;
constructor(cost: number, product: string, quantity: number) {
this.cost = cost;
this.product = product;
this.quantity = quantity;
}
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// Update the gRPC client and worker to use a local address and port
const daprHost = "localhost";
const daprPort = "50001";
const workflowClient = new DaprWorkflowClient({
daprHost,
daprPort,
});
const workflowRuntime = new WorkflowRuntime({
daprHost,
daprPort,
});
// Activity function that sends an approval request to the manager
const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
// Simulate some work that takes an amount of time
await sleep(3000);
console.log(`Sending approval request for order: ${order.product}`);
};
// Activity function that places an order
const placeOrder = async (_: WorkflowActivityContext, order: Order) => {
console.log(`Placing order: ${order.product}`);
};
// Orchestrator function that represents a purchase order workflow
const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any {
// Orders under $1000 are auto-approved
if (order.cost < 1000) {
return "Auto-approved";
}
// Orders of $1000 or more require manager approval
yield ctx.callActivity(sendApprovalRequest, order);
// Approvals must be received within 24 hours or they will be cancled.
const tasks: Task<any>[] = [];
const approvalEvent = ctx.waitForExternalEvent("approval_received");
const timeoutEvent = ctx.createTimer(24 * 60 * 60);
tasks.push(approvalEvent);
tasks.push(timeoutEvent);
const winner = ctx.whenAny(tasks);
if (winner == timeoutEvent) {
return "Cancelled";
}
yield ctx.callActivity(placeOrder, order);
const approvalDetails = approvalEvent.getResult();
return `Approved by ${approvalDetails.approver}`;
};
workflowRuntime
.registerWorkflow(purchaseOrderWorkflow)
.registerActivity(sendApprovalRequest)
.registerActivity(placeOrder);
// Wrap the worker startup in a try-catch block to handle any errors during startup
try {
await workflowRuntime.start();
console.log("Worker started successfully");
} catch (error) {
console.error("Error starting worker:", error);
}
// Schedule a new orchestration
try {
const cost = readlineSync.questionInt("Cost of your order:");
const approver = readlineSync.question("Approver of your order:");
const timeout = readlineSync.questionInt("Timeout for your order in seconds:");
const order = new Order(cost, "MyProduct", 1);
const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
console.log(`Orchestration scheduled with ID: ${id}`);
// prompt for approval asynchronously
promptForApproval(approver, workflowClient, id);
// Wait for orchestration completion
const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);
console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
} catch (error) {
console.error("Error scheduling or waiting for orchestration:", error);
}
// stop worker and client
await workflowRuntime.stop();
await workflowClient.stop();
// stop the dapr side car
process.exit(0);
}
async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
const approvalEvent = { approver: approver };
await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
} else {
return "Order rejected";
}
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
<!--dotnet-->
@ -686,7 +1036,7 @@ public class ExternalSystemInteractionWorkflow extends Workflow {
The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example:
{{< tabs Python ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java >}}
{{% codetab %}}
<!--python-->
@ -705,6 +1055,15 @@ with DaprClient() as d:
{{% /codetab %}}
{{% codetab %}}
<!--javascript-->
```javascript
// Raise the workflow event to the waiting workflow
```
{{% /codetab %}}
{{% codetab %}}
<!--dotnet-->
@ -744,5 +1103,6 @@ External events don't have to be directly triggered by humans. They can also be
- [Workflow API reference]({{< ref workflow_api.md >}})
- Try out the following examples:
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)

View File

@ -21,7 +21,7 @@ In this guide, you'll:
<img src="/images/workflow-quickstart-overview.png" width=800 style="padding-bottom:15px;">
{{< tabs "Python" ".NET" "Java" >}}
{{< tabs "Python" "JavaScript" ".NET" "Java" >}}
<!-- Python -->
{{% codetab %}}
@ -265,6 +265,107 @@ In `workflow.py`, the workflow is defined as a class with all of its associated
message=f'Order {order_id} has completed!'))
return OrderResult(processed=True)
```
{{% /codetab %}}
<!-- JavaScript -->
{{% codetab %}}
The `order-processor` console app starts and manages the lifecycle of an order processing workflow that stores and retrieves data in a state store. The workflow consists of four workflow activities, or tasks:
- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow
- `ReserveInventoryActivity`: Checks the state store to ensure that there is enough inventory for the purchase
- `ProcessPaymentActivity`: Processes and authorizes the payment
- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value
### Step 1: Pre-requisites
For this example, you will need:
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started).
- [Latest Node.js installed](https://nodejs.org/download/).
<!-- IGNORE_LINKS -->
- [Docker Desktop](https://www.docker.com/products/docker-desktop)
<!-- END_IGNORE -->
### Step 2: Set up the environment
Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows).
```bash
git clone https://github.com/dapr/quickstarts.git
```
In a new terminal window, navigate to the `order-processor` directory:
```bash
cd workflows/javascript/sdk/order-processor
```
### Step 3: Run the order processor app
In the terminal, start the order processor app alongside a Dapr sidecar:
```bash
dapr run
```
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
Expected output:
```
```
### (Optional) Step 4: View in Zipkin
Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:
```
docker run -d -p 9411:9411 openzipkin/zipkin
```
View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`).
<img src="/images/workflow-trace-spans-zipkin.png" width=800 style="padding-bottom:15px;">
### What happened?
When you ran `dapr run `:
1. A unique order ID for the workflow is generated (in the above example, `6d2abcc9`) and the workflow is scheduled.
1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received.
1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
1. Your workflow starts and notifies you of its status.
1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `6d2abcc9` and confirms if successful.
1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed.
1. The `NotifyActivity` workflow activity sends a notification saying that order `6d2abcc9` has completed.
1. The workflow terminates as completed.
#### `order-processor/index.js`
In the application's program file:
- The unique workflow order ID is generated
- The workflow is scheduled
- The workflow status is retrieved
- The workflow and the workflow activities it invokes are registered
```javascript
```
#### `order-processor/Workflows/OrderProcessingWorkflow.js`
In `OrderProcessingWorkflow.js`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).
```javascript
```
#### `order-processor/Activities` directory
The `Activities` directory holds the four workflow activities used by the workflow, defined in the following files:
{{% /codetab %}}
<!-- .NET -->