mirror of https://github.com/dapr/docs.git
Add fan-out / fan-in Python snippet
Signed-off-by: Chris Gillum <cgillum@microsoft.com>
This commit is contained in:
parent
b4f1e85d4b
commit
759ff9495e
|
@ -166,7 +166,38 @@ await context.CallActivityAsync("PostResults", sum);
|
||||||
<!--python-->
|
<!--python-->
|
||||||
|
|
||||||
```python
|
```python
|
||||||
# TODO
|
import time
|
||||||
|
from typing import List
|
||||||
|
import dapr.ext.workflow as wf
|
||||||
|
|
||||||
|
|
||||||
|
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
|
||||||
|
# get a batch of N work items to process in parallel
|
||||||
|
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
|
||||||
|
|
||||||
|
# schedule N parallel tasks to process the work items and wait for all to complete
|
||||||
|
parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
|
||||||
|
outputs = yield wf.when_all(parallel_tasks)
|
||||||
|
|
||||||
|
# aggregate the results and send them to another activity
|
||||||
|
total = sum(outputs)
|
||||||
|
yield ctx.call_activity(process_results, input=total)
|
||||||
|
|
||||||
|
|
||||||
|
def get_work_batch(ctx, batch_size: int) -> List[int]:
|
||||||
|
return [i + 1 for i in range(batch_size)]
|
||||||
|
|
||||||
|
|
||||||
|
def process_work_item(ctx, work_item: int) -> int:
|
||||||
|
print(f'Processing work item: {work_item}.')
|
||||||
|
time.sleep(5)
|
||||||
|
result = work_item * 2
|
||||||
|
print(f'Work item {work_item} processed. Result: {result}.')
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def process_results(ctx, final_result: int):
|
||||||
|
print(f'Final result: {final_result}.')
|
||||||
```
|
```
|
||||||
|
|
||||||
{{% /codetab %}}
|
{{% /codetab %}}
|
||||||
|
|
Loading…
Reference in New Issue