add fan in/fan out

Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
This commit is contained in:
Hannah Hunter 2024-02-16 12:33:22 -05:00
parent 233c4f3e56
commit 8204495bf0
1 changed files with 58 additions and 1 deletions

View File

@ -516,7 +516,64 @@ public class FaninoutWorkflow extends Workflow {
<!--go-->
```go
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return 0, err
}
var workBatch []int
if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
return 0, err
}
parallelTasks := workflow.NewTaskSlice(len(workBatch))
for i, workItem := range workBatch {
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
}
var outputs int
for _, task := range parallelTasks {
var output int
err := task.Await(&output)
if err == nil {
outputs += output
} else {
return 0, err
}
}
if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
return 0, err
}
return 0, nil
}
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
var batchSize int
if err := ctx.GetInput(&batchSize); err != nil {
return 0, err
}
batch := make([]int, batchSize)
for i := 0; i < batchSize; i++ {
batch[i] = i
}
return batch, nil
}
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
var workItem int
if err := ctx.GetInput(&workItem); err != nil {
return 0, err
}
fmt.Printf("Processing work item: %d\n", workItem)
time.Sleep(time.Second * 5)
result := workItem * 2
fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
return result, nil
}
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
var finalResult int
if err := ctx.GetInput(&finalResult); err != nil {
return 0, err
}
fmt.Printf("Final result: %d\n", finalResult)
return finalResult, nil
}
```
{{% /codetab %}}