---
type: docs
title: Workflow patterns
linkTitle: Workflow patterns
weight: 3000
description: "Write different types of workflow patterns"
---
Dapr Workflows simplify complex, stateful coordination requirements in microservice architectures. The following sections describe several application patterns that can benefit from Dapr Workflows.
## Task chaining
In the task chaining pattern, multiple steps in a workflow are run in succession, and the output of one step may be passed as the input to the next step. Task chaining workflows typically involve creating a sequence of operations that need to be performed on some data, such as filtering, transforming, and reducing.
In some cases, the steps of the workflow may need to be orchestrated across multiple microservices. For increased reliability and scalability, you're also likely to use queues to trigger the various steps.
While the pattern is simple, there are many complexities hidden in the implementation. For example:
- What happens if one of the microservices is unavailable for an extended period of time?
- Can failed steps be automatically retried?
- If not, how do you facilitate the rollback of previously completed steps, if applicable?
- Implementation details aside, is there a way to visualize the workflow so that other engineers can understand what it does and how it works?
Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
```python
import dapr.ext.workflow as wf


def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    try:
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
        result3 = yield ctx.call_activity(step3, input=result2)
    except Exception as e:
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2, result3]


def step1(ctx, activity_input):
    print(f'Step 1: Received input: {activity_input}.')
    # Do some work
    return activity_input + 1


def step2(ctx, activity_input):
    print(f'Step 2: Received input: {activity_input}.')
    # Do some work
    return activity_input * 2


def step3(ctx, activity_input):
    print(f'Step 3: Received input: {activity_input}.')
    # Do some work
    return activity_input ** 2


def error_handler(ctx, error):
    print(f'Executing error handler: {error}.')
    # Do some compensating work
```
> **Note** Workflow retry policies will be available in a future version of the Python SDK.
{{% /codetab %}}
{{% codetab %}}
```javascript
import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";

async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";

  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  const hello = async (_: WorkflowActivityContext, name: string) => {
    return `Hello ${name}!`;
  };

  const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
    const cities: string[] = [];

    const result1 = yield ctx.callActivity(hello, "Tokyo");
    cities.push(result1);
    const result2 = yield ctx.callActivity(hello, "Seattle");
    cities.push(result2);
    const result3 = yield ctx.callActivity(hello, "London");
    cities.push(result3);

    return cities;
  };

  workflowRuntime.registerWorkflow(sequence).registerActivity(hello);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Workflow runtime started successfully");
  } catch (error) {
    console.error("Error starting workflow runtime:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(sequence);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  await workflowRuntime.stop();
  await workflowClient.stop();

  // Stop the Dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
```csharp
// Exponential backoff retry policy that survives long outages
var retryOptions = new WorkflowTaskOptions
{
    RetryPolicy = new WorkflowRetryPolicy(
        firstRetryInterval: TimeSpan.FromMinutes(1),
        backoffCoefficient: 2.0,
        maxRetryInterval: TimeSpan.FromHours(1),
        maxNumberOfAttempts: 10),
};

try
{
    var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
    var result2 = await context.CallActivityAsync<string>("Step2", result1, retryOptions);
    var result3 = await context.CallActivityAsync<string>("Step3", result2, retryOptions);
    return string.Join(", ", result1, result2, result3);
}
catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
{
    // Retries expired - apply custom compensation logic
    await context.CallActivityAsync("MyCompensation", options: retryOptions);
    throw;
}
```
> **Note** In the example above, `"Step1"`, `"Step2"`, `"Step3"`, and `"MyCompensation"` represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.
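As a rough sketch, an activity such as `Step1` could be implemented by deriving from the .NET SDK's `WorkflowActivity<TInput, TOutput>` base class. The class body below is illustrative only; the string input/output types and the work performed are assumptions:
```csharp
// Hypothetical sketch of the "Step1" activity referenced above.
// The string input/output types and the logic are assumptions for illustration.
public class Step1 : WorkflowActivity<string, string>
{
    public override Task<string> RunAsync(WorkflowActivityContext context, string input)
    {
        // Do some work, then return the result to the calling workflow
        return Task.FromResult($"{input}, Step1 done");
    }
}
```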
{{% /codetab %}}
{{% codetab %}}
```java
public class ChainWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            StringBuilder sb = new StringBuilder();
            String wfInput = ctx.getInput(String.class);

            String result1 = ctx.callActivity("Step1", wfInput, String.class).await();
            String result2 = ctx.callActivity("Step2", result1, String.class).await();
            String result3 = ctx.callActivity("Step3", result2, String.class).await();

            String result = sb.append(result1).append(',')
                    .append(result2).append(',')
                    .append(result3).toString();

            ctx.complete(result);
        };
    }
}

class Step1 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step1.class);
        logger.info("Starting Activity: " + ctx.getName());

        // Do some work
        return null;
    }
}

class Step2 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step2.class);
        logger.info("Starting Activity: " + ctx.getName());

        // Do some work
        return null;
    }
}

class Step3 implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        Logger logger = LoggerFactory.getLogger(Step3.class);
        logger.info("Starting Activity: " + ctx.getName());

        // Do some work
        return null;
    }
}
```
{{% /codetab %}}
{{% codetab %}}
```go
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	var result1 int
	if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
		return nil, err
	}
	var result2 int
	if err := ctx.CallActivity(Step2, workflow.ActivityInput(result1)).Await(&result2); err != nil {
		return nil, err
	}
	var result3 int
	if err := ctx.CallActivity(Step3, workflow.ActivityInput(result2)).Await(&result3); err != nil {
		return nil, err
	}
	return []int{result1, result2, result3}, nil
}

func Step1(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	fmt.Printf("Step 1: Received input: %d", input)
	return input + 1, nil
}

func Step2(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	fmt.Printf("Step 2: Received input: %d", input)
	return input * 2, nil
}

func Step3(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	fmt.Printf("Step 3: Received input: %d", input)
	return int(math.Pow(float64(input), 2)), nil
}
```
{{% /codetab %}}
{{< /tabs >}}
As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.
Behind the scenes, the Dapr Workflow runtime:
- Takes care of executing the workflow and ensuring that it runs to completion.
- Saves progress automatically.
- Automatically resumes the workflow from the last completed step if the workflow process itself fails for any reason.
- Enables error handling to be expressed naturally in your target programming language, allowing you to implement compensation logic easily.
- Provides built-in retry configuration primitives to simplify the process of configuring complex retry policies for individual steps in the workflow.
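For example, starting the chained workflow and retrieving its result takes only a couple of client calls. The following is a minimal sketch using the .NET SDK's `DaprWorkflowClient`; the workflow name and input value are assumptions for illustration:
```csharp
// Minimal sketch: schedule the task chaining workflow and wait for it to finish.
// "TaskChainWorkflow" and the input value 42 are illustrative assumptions.
string instanceId = await daprWorkflowClient.ScheduleNewWorkflowAsync(
    name: "TaskChainWorkflow",
    input: 42);

// Blocks until the workflow completes or fails, then exposes its final status.
WorkflowState state = await daprWorkflowClient.WaitForWorkflowCompletionAsync(instanceId);
Console.WriteLine($"Workflow status: {state.RuntimeStatus}");
```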
## Fan-out/fan-in
In the fan-out/fan-in design pattern, you execute multiple tasks simultaneously across potentially multiple workers, wait for them to finish, and perform some aggregation on the result.
In addition to the challenges mentioned in [the previous pattern]({{< ref "workflow-patterns.md#task-chaining" >}}), there are several important questions to consider when implementing the fan-out/fan-in pattern manually:
- How do you control the degree of parallelism?
- How do you know when to trigger subsequent aggregation steps?
- What if the number of parallel steps is dynamic?
Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
```python
import time
from typing import List

import dapr.ext.workflow as wf


def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    # Get a batch of N work items to process in parallel
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    # Schedule N parallel tasks to process the work items and wait for all to complete
    parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
    outputs = yield wf.when_all(parallel_tasks)

    # Aggregate the results and send them to another activity
    total = sum(outputs)
    yield ctx.call_activity(process_results, input=total)


def get_work_batch(ctx, batch_size: int) -> List[int]:
    return [i + 1 for i in range(batch_size)]


def process_work_item(ctx, work_item: int) -> int:
    print(f'Processing work item: {work_item}.')
    time.sleep(5)
    result = work_item * 2
    print(f'Work item {work_item} processed. Result: {result}.')
    return result


def process_results(ctx, final_result: int):
    print(f'Final result: {final_result}.')
```
{{% /codetab %}}
{{% codetab %}}
```javascript
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";

// Wrap the entire code in an immediately-invoked async function
async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";

  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  function getRandomInt(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1)) + min;
  }

  async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
    const count: number = getRandomInt(2, 10);
    console.log(`generating ${count} work items...`);
    const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`);
    return workItems;
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
    console.log(`processing work item: ${item}`);
    // Simulate some work that takes a variable amount of time
    const sleepTime = Math.random() * 5000;
    await sleep(sleepTime);
    // Return a result for the given work item, which is also a random number in this case
    // For more information about random numbers in workflow please check
    // https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabs=csharp#random-numbers
    return Math.floor(Math.random() * 11);
  }

  const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    const tasks: Task<any>[] = [];
    const workItems = yield ctx.callActivity(getWorkItemsActivity);
    for (const workItem of workItems) {
      tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
    }
    const results: number[] = yield ctx.whenAll(tasks);
    const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    return sum;
  };

  workflowRuntime.registerWorkflow(workflow);
  workflowRuntime.registerActivity(getWorkItemsActivity);
  workflowRuntime.registerActivity(processWorkItemActivity);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Worker started successfully");
  } catch (error) {
    console.error("Error starting worker:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(workflow);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);
    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  // Stop the worker and client
  await workflowRuntime.stop();
  await workflowClient.stop();

  // Stop the Dapr sidecar
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
```
{{% /codetab %}}
{{% codetab %}}
```csharp
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>