mirror of https://github.com/dapr/docs.git
Merge 1b6ab05c6e
into 4867d9dc04
This commit is contained in:
commit
4493542de4
|
@ -197,10 +197,12 @@ public class DemoWorkflowActivity implements WorkflowActivity {
|
|||
|
||||
<!--go-->
|
||||
|
||||
### Define workflow activities
|
||||
|
||||
Define each workflow activity you'd like your workflow to perform. The Activity input can be unmarshalled from the context with `ctx.GetInput`. Activities should be defined as taking a `ctx workflow.ActivityContext` parameter and returning an interface and error.
|
||||
|
||||
```go
|
||||
func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
func BusinessActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
|
@ -211,6 +213,87 @@ func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
|||
}
|
||||
```
|
||||
|
||||
### Define the workflow
|
||||
|
||||
Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.
|
||||
|
||||
```go
|
||||
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var output string
|
||||
if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
return output, nil
|
||||
}
|
||||
```
|
||||
|
||||
### Register workflows and activities
|
||||
|
||||
Before your application can execute workflows, you must register both the workflow orchestrator and its activities with a workflow registry. This ensures Dapr knows which functions to call when executing your workflow.
|
||||
|
||||
```go
|
||||
func main() {
|
||||
// Create a workflow registry
|
||||
r := workflow.NewRegistry()
|
||||
|
||||
// Register the workflow orchestrator
|
||||
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("BusinessWorkflow registered")
|
||||
|
||||
// Register the workflow activities
|
||||
if err := r.AddActivity(BusinessActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("BusinessActivity registered")
|
||||
|
||||
// Create workflow client and start worker
|
||||
wclient, err := client.NewWorkflowClient()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("Worker initialized")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
if err = wclient.StartWorker(ctx, r); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("runner started")
|
||||
|
||||
// Your application logic continues here...
|
||||
// Example: Start a workflow
|
||||
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInput(1))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow started with id: %v\n", instanceID)
|
||||
|
||||
// Stop workflow worker when done
|
||||
cancel()
|
||||
fmt.Println("workflow worker successfully shutdown")
|
||||
}
|
||||
```
|
||||
|
||||
**Key points about registration:**
|
||||
- Use `workflow.NewRegistry()` to create a workflow registry
|
||||
- Use `r.AddWorkflow()` to register workflow functions
|
||||
- Use `r.AddActivity()` to register activity functions
|
||||
- Use `client.NewWorkflowClient()` to create a workflow client
|
||||
- Call `wclient.StartWorker()` to begin processing workflows
|
||||
- Use `wclient.ScheduleWorkflow` to schedule a named instance of a workflow
|
||||
|
||||
[See the Go SDK workflow activity example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
|
||||
|
||||
{{% /tab %}}
|
||||
|
@ -383,16 +466,16 @@ public class DemoWorkflowWorker {
|
|||
Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.
|
||||
|
||||
```go
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var output string
|
||||
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
||||
if err := ctx.CallActivity(BusinessActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
|
||||
if err := ctx.WaitForExternalEvent("businessEvent", time.Second*60).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -864,7 +947,7 @@ public class DemoWorkflow extends Workflow {
|
|||
[As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include:
|
||||
|
||||
- A Go package called `client` to receive the Go SDK client capabilities.
|
||||
- The `TestWorkflow` method
|
||||
- The `BusinessWorkflow` method
|
||||
- Creating the workflow with input and output.
|
||||
- API calls. In the example below, these calls start and call the workflow activities.
|
||||
|
||||
|
@ -889,15 +972,15 @@ var failActivityTries = 0
|
|||
func main() {
|
||||
r := workflow.NewRegistry()
|
||||
|
||||
if err := r.AddWorkflow(TestWorkflow); err != nil {
|
||||
if err := r.AddWorkflow(BusinessWorkflow); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("TestWorkflow registered")
|
||||
fmt.Println("BusinessWorkflow registered")
|
||||
|
||||
if err := r.AddActivity(TestActivity); err != nil {
|
||||
if err := r.AddActivity(BusinessActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("TestActivity registered")
|
||||
fmt.Println("BusinessActivity registered")
|
||||
|
||||
if err := r.AddActivity(FailActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
|
@ -921,7 +1004,7 @@ func main() {
|
|||
// "start". This is useful for increasing the throughput of creating
|
||||
// workflows.
|
||||
// workflow.WithStartTime(time.Now())
|
||||
instanceID, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
instanceID, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
|
@ -963,9 +1046,8 @@ func main() {
|
|||
|
||||
fmt.Printf("stage: %d\n", stage)
|
||||
|
||||
// Raise Event Test
|
||||
|
||||
err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
|
||||
// Raise Event
|
||||
err = wclient.RaiseEvent(ctx, instanceID, "businessEvent", workflow.WithEventPayload("testData"))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to raise event: %v", err)
|
||||
}
|
||||
|
@ -1008,7 +1090,7 @@ func main() {
|
|||
fmt.Printf("stage: %d\n", stage)
|
||||
|
||||
// Terminate workflow test
|
||||
id, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
id, err := wclient.ScheduleWorkflow(ctx, "BusinessWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
|
@ -1037,22 +1119,22 @@ func main() {
|
|||
fmt.Println("workflow worker successfully shutdown")
|
||||
}
|
||||
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var output string
|
||||
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
|
||||
if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
|
||||
err := ctx.WaitForSingleEvent("businessEvent", time.Second*60).Await(&output)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
|
||||
if err := ctx.CallActivity(BusinessActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -1068,7 +1150,7 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
|||
return output, nil
|
||||
}
|
||||
|
||||
func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
func BusinessActivity(ctx task.ActivityContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
|
|
|
@ -19,28 +19,65 @@ Some scenarios where this is useful include:
|
|||
- Implementation of a workflow spans different programming languages based on team expertise or existing codebases.
|
||||
- Different team boundaries or microservice ownership.
|
||||
|
||||
<img src="/images/workflow-overview/workflow-multi-app-complex.png" width=800 alt="Diagram showing multi-application complex workflow">
|
||||
|
||||
The diagram below shows an example scenario of a complex workflow that orchestrates across multiple applications that are written in different languages. Each applications' main steps and activities are:
|
||||
|
||||
• **App1: Main Workflow Service** - Top-level orchestrator that coordinates the entire ML pipeline
|
||||
- Starts the process
|
||||
- Calls data processing activities on App2
|
||||
- Calls ML training child workflow on App3
|
||||
- Calls model deployment on App4
|
||||
- Ends the complete workflow
|
||||
- **Language: Java**
|
||||
|
||||
• **App2: Data Processing Pipeline** - **GPU activities** only
|
||||
- Data Ingesting Activity (GPU-accelerated)
|
||||
- Feature Engineering Activity (GPU-accelerated)
|
||||
- Returns completion signal to Main Workflow
|
||||
- **Language: Go**
|
||||
|
||||
• **App3: ML Training Child Workflow** - Contains a child workflow and activities
|
||||
- Child workflow orchestrates:
|
||||
- Data Processing Activity
|
||||
- Model Training Activity (GPU-intensive)
|
||||
- Model Validation Activity
|
||||
- Triggered by App2's activities completing
|
||||
- Returns completion signal to Main Workflow
|
||||
- **Language: Java**
|
||||
|
||||
• **App4: Model Serving Service** - **Beefy GPU app** with activities only
|
||||
- Model Loading Activity (GPU memory intensive)
|
||||
- Inference Setup Activity (GPU-accelerated inference)
|
||||
- Triggered by App3's workflow completing
|
||||
- Returns completion signal to Main Workflow
|
||||
- **Language: Go**
|
||||
|
||||
## Multi-application workflows
|
||||
|
||||
Like all building blocks in Dapr, workflow execution routing is based on the [App ID of the hosting Dapr application]({{% ref "security-concept.md#application-identity" %}}).
|
||||
Workflow execution routing is based on the [App ID of the hosting Dapr application]({{% ref "security-concept.md#application-identity" %}}).
|
||||
By default, the full workflow execution is hosted on the app ID that started the workflow. This workflow can be executed across any replicas of that app ID, not just the single replica which scheduled the workflow.
|
||||
|
||||
|
||||
It is possible to execute activities or child workflows on different app IDs by specifying the target app ID parameter, inside the workflow execution code.
|
||||
Upon execution, the target app ID will execute the activity or child workflow, and return the result to the parent workflow of the originating app ID.
|
||||
It is possible to execute activities and child workflows on different app IDs by specifying the target app ID parameter, inside the workflow execution code.
|
||||
Upon execution, the target app ID executes the activity or child workflow, and returns the result to the parent workflow of the originating app ID.
|
||||
|
||||
The entire Workflow execution may be distributed across multiple app IDs with no limit, with each activity or child workflow specifying the target app ID.
|
||||
The final history of the workflow will be saved by the app ID that hosts the very parent (or can consider it the root) workflow.
|
||||
|
||||
{{% alert title="Restrictions" color="primary" %}}
|
||||
Like other building blocks and resources in Dapr, workflows are scoped to a single namespace.
|
||||
Like other API building blocks and resources in Dapr, workflows are scoped to a single namespace.
|
||||
This means that all app IDs involved in a multi-application workflow must be in the same namespace.
|
||||
Similarly, all app IDs must use the same actor state store.
|
||||
Finally, the target app ID must have the activity or child workflow defined, otherwise the parent workflow will retry indefinitely.
|
||||
Similarly, all app IDs must use the same workflow (or actor) state store.
|
||||
Finally, the target app ID must have the activity or child workflow defined and registered, otherwise the parent workflow retries indefinitely.
|
||||
{{% /alert %}}
|
||||
|
||||
{{% alert title="Important Limitations" color="warning" %}}
|
||||
- **SDKs supporting multi-application workflows** - Multi-application workflows are used via the SDKs. Currently Java (activities calling) and Go (both activities and child workflows calling) SDKs are supported. The SDKs (Python, .NET, JavaScript) are planned for future releases.
|
||||
**SDKs supporting multi-application workflows** - Multi-application workflows are used via the SDKs.
|
||||
Currently the following are supported:
|
||||
- **Java** (**only** activity calls)
|
||||
- **Go** (**both** activities and child workflows calls)
|
||||
- The Python, .NET, JavaScript SDKs support are planned for future releases
|
||||
{{% /alert %}}
|
||||
|
||||
## Error handling
|
||||
|
@ -63,7 +100,7 @@ The following example shows how to execute the activity `ActivityA` on the targe
|
|||
{{% tab "Go" %}}
|
||||
|
||||
```go
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var output string
|
||||
err := ctx.CallActivity("ActivityA",
|
||||
workflow.WithActivityInput("my-input"),
|
||||
|
@ -83,12 +120,12 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
|||
{{% tab "Java" %}}
|
||||
|
||||
```java
|
||||
public class CrossAppWorkflow implements Workflow {
|
||||
public class BusinessWorkflow implements Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
String output = ctx.callActivity(
|
||||
"ActivityA",
|
||||
ActivityA.class.getName(),
|
||||
"my-input",
|
||||
new WorkflowTaskOptions("App2"), // Here we set the target app ID which will execute this activity.
|
||||
String.class
|
||||
|
@ -115,7 +152,7 @@ The following example shows how to execute the child workflow `Workflow2` on the
|
|||
{{% tab "Go" %}}
|
||||
|
||||
```go
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
func BusinessWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var output string
|
||||
err := ctx.CallChildWorkflow("Workflow2",
|
||||
workflow.WithChildWorkflowInput("my-input"),
|
||||
|
|
Binary file not shown.
Before Width: | Height: | Size: 81 KiB After Width: | Height: | Size: 41 KiB |
Binary file not shown.
After Width: | Height: | Size: 111 KiB |
Loading…
Reference in New Issue