diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 803133562..25ec9c6f3 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -166,7 +166,38 @@ await context.CallActivityAsync("PostResults", sum); ```python -# TODO +import time +from typing import List +import dapr.ext.workflow as wf + + +def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + # get a batch of N work items to process in parallel + work_batch = yield ctx.call_activity(get_work_batch, input=wf_input) + + # schedule N parallel tasks to process the work items and wait for all to complete + parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch] + outputs = yield wf.when_all(parallel_tasks) + + # aggregate the results and send them to another activity + total = sum(outputs) + yield ctx.call_activity(process_results, input=total) + + +def get_work_batch(ctx, batch_size: int) -> List[int]: + return [i + 1 for i in range(batch_size)] + + +def process_work_item(ctx, work_item: int) -> int: + print(f'Processing work item: {work_item}.') + time.sleep(5) + result = work_item * 2 + print(f'Work item {work_item} processed. Result: {result}.') + return result + + +def process_results(ctx, final_result: int): + print(f'Final result: {final_result}.') ``` {{% /codetab %}}