mirror of https://github.com/dapr/go-sdk.git
131 lines
3.2 KiB
Go
131 lines
3.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/dapr/go-sdk/workflow"
|
|
)
|
|
|
|
func main() {
|
|
w, err := workflow.NewWorker()
|
|
if err != nil {
|
|
log.Fatalf("failed to initialise worker: %v", err)
|
|
}
|
|
|
|
if err := w.RegisterWorkflow(BatchProcessingWorkflow); err != nil {
|
|
log.Fatalf("failed to register workflow: %v", err)
|
|
}
|
|
if err := w.RegisterActivity(GetWorkBatch); err != nil {
|
|
log.Fatalf("failed to register activity: %v", err)
|
|
}
|
|
if err := w.RegisterActivity(ProcessWorkItem); err != nil {
|
|
log.Fatalf("failed to register activity: %v", err)
|
|
}
|
|
if err := w.RegisterActivity(ProcessResults); err != nil {
|
|
log.Fatalf("failed to register activity: %v", err)
|
|
}
|
|
fmt.Println("Workflow(s) and activities registered.")
|
|
|
|
if err := w.Start(); err != nil {
|
|
log.Fatalf("failed to start worker")
|
|
}
|
|
|
|
wfClient, err := workflow.NewClient()
|
|
if err != nil {
|
|
log.Fatalf("failed to initialise client: %v", err)
|
|
}
|
|
ctx := context.Background()
|
|
id, err := wfClient.ScheduleNewWorkflow(ctx, "BatchProcessingWorkflow", workflow.WithInput(10))
|
|
if err != nil {
|
|
log.Fatalf("failed to schedule a new workflow: %v", err)
|
|
}
|
|
|
|
metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to get workflow: %v", err)
|
|
}
|
|
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
|
|
|
|
err = wfClient.TerminateWorkflow(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to terminate workflow: %v", err)
|
|
}
|
|
fmt.Println("workflow terminated")
|
|
|
|
err = wfClient.PurgeWorkflow(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to purge workflow: %v", err)
|
|
}
|
|
fmt.Println("workflow purged")
|
|
}
|
|
|
|
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
|
var input int
|
|
if err := ctx.GetInput(&input); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
var workBatch []int
|
|
if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
parallelTasks := workflow.NewTaskSlice(len(workBatch))
|
|
for i, workItem := range workBatch {
|
|
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
|
|
}
|
|
|
|
var outputs int
|
|
for _, task := range parallelTasks {
|
|
var output int
|
|
err := task.Await(&output)
|
|
if err == nil {
|
|
outputs += output
|
|
} else {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
|
|
var batchSize int
|
|
if err := ctx.GetInput(&batchSize); err != nil {
|
|
return 0, err
|
|
}
|
|
batch := make([]int, batchSize)
|
|
for i := 0; i < batchSize; i++ {
|
|
batch[i] = i
|
|
}
|
|
return batch, nil
|
|
}
|
|
|
|
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
|
|
var workItem int
|
|
if err := ctx.GetInput(&workItem); err != nil {
|
|
return 0, err
|
|
}
|
|
fmt.Printf("Processing work item: %d\n", workItem)
|
|
time.Sleep(time.Second * 5)
|
|
result := workItem * 2
|
|
fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
|
|
return result, nil
|
|
}
|
|
|
|
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
|
|
var finalResult int
|
|
if err := ctx.GetInput(&finalResult); err != nil {
|
|
return 0, err
|
|
}
|
|
fmt.Printf("Final result: %d\n", finalResult)
|
|
return finalResult, nil
|
|
}
|