mirror of https://github.com/dapr/docs.git
Merge branch 'v1.16' into docs/max-body-size
This commit is contained in:
commit
166fb5d1c5
|
@ -141,6 +141,14 @@ Dapr can be used from any developer framework. Here are some that have been inte
|
|||
| [JavaScript](https://github.com/dapr/js-sdk) | [Express](https://expressjs.com/) | Build Express applications with Dapr APIs
|
||||
| [PHP]({{% ref php %}}) | | You can serve with Apache, Nginx, or Caddyserver.
|
||||
|
||||
#### Dapr Agents
|
||||
|
||||

|
||||
|
||||
|
||||
[Dapr Agents]({{% ref "../developing-applications/dapr-agents" %}}) is a Python framework for building intelligent, durable agents powered by LLMs. It provides agent-centric capabilities such as tool calling, memory management, [MCP support](https://modelcontextprotocol.io/) and agent orchestration, while leveraging Dapr for durability, observability, and security, at scale.
|
||||
|
||||
|
||||
#### Integrations and extensions
|
||||
|
||||
Visit the [integrations]({{% ref integrations %}}) page to learn about some of the first-class support Dapr has for various frameworks and external products, including:
|
||||
|
|
|
@ -10,15 +10,17 @@ This page details all of the common terms you may come across in the Dapr docs.
|
|||
|
||||
| Term | Definition | More information |
|
||||
|:-----|------------|------------------|
|
||||
| App/Application | A running service/binary, usually one that you as the user create and run.
|
||||
| App/Application | A running service/binary, usually one that you as the user create and run.
|
||||
| Building block | An API that Dapr provides to users to help in the creation of microservices and applications. | [Dapr building blocks]({{% ref building-blocks-concept %}})
|
||||
| Component | Modular types of functionality that are used either individually or with a collection of other components, by a Dapr building block. | [Dapr components]({{% ref components-concept %}})
|
||||
| Configuration | A YAML file declaring all of the settings for Dapr sidecars or the Dapr control plane. This is where you can configure control plane mTLS settings, or the tracing and middleware settings for an application instance. | [Dapr configuration]({{% ref configuration-concept %}})
|
||||
| Dapr | Distributed Application Runtime. | [Dapr overview]({{% ref overview %}})
|
||||
| Dapr Actors | A Dapr building block that implements the virtual actor pattern for building stateful, single-threaded objects with identity, lifecycle, and concurrency management. | [Actors overview]({{% ref actors-overview %}})
|
||||
| Dapr Agents | A developer framework built on top of Dapr Python SDK for creating durable agentic applications powered by LLMs. | [Dapr Agents]({{% ref "../developing-applications/dapr-agents" %}})
|
||||
| Dapr control plane | A collection of services that are part of a Dapr installation on a hosting platform such as a Kubernetes cluster. This allows Dapr-enabled applications to run on the platform and handles Dapr capabilities such as actor placement, Dapr sidecar injection, or certificate issuance/rollover. | [Self-hosted overview]({{% ref self-hosted-overview %}})<br />[Kubernetes overview]({{% ref kubernetes-overview %}})
|
||||
| Dapr Workflows | A Dapr building block for authoring code-first workflows with durable execution that survive crashes, support long-running processes, and enable human-in-the-loop interactions. | [Workflow overview]({{% ref workflow-overview %}})
|
||||
| HTTPEndpoint | HTTPEndpoint is a Dapr resource use to identify non-Dapr endpoints to invoke via the service invocation API. | [Service invocation API]({{% ref service_invocation_api %}})
|
||||
| Namespacing | Namespacing in Dapr provides isolation, and thus provides multi-tenancy. | Learn more about namespacing [components]({{% ref component-scopes %}}), [service invocation]({{% ref service-invocation-namespaces %}}), [pub/sub]({{% ref pubsub-namespaces %}}), and [actors]({{% ref namespaced-actors %}})
|
||||
| Self-hosted | Windows/macOS/Linux machine(s) where you can run your applications with Dapr. Dapr provides the capability to run on machines in "self-hosted" mode. | [Self-hosted mode]({{% ref self-hosted-overview %}})
|
||||
| Service | A running application or binary. This can refer to your application or to a Dapr application.
|
||||
| Sidecar | A program that runs alongside your application as a separate process or container. | [Sidecar pattern](https://docs.microsoft.com/azure/architecture/patterns/sidecar)
|
||||
|
||||
| Service | A running application or binary. This can refer to your application or to a Dapr application.
|
||||
| Sidecar | A program that runs alongside your application as a separate process or container. | [Sidecar pattern](https://docs.microsoft.com/azure/architecture/patterns/sidecar)
|
||||
|
|
|
@ -874,86 +874,98 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/durabletask-go/api"
|
||||
"github.com/dapr/durabletask-go/backend"
|
||||
"github.com/dapr/durabletask-go/client"
|
||||
"github.com/dapr/durabletask-go/task"
|
||||
dapr "github.com/dapr/go-sdk/client"
|
||||
"github.com/dapr/durabletask-go/workflow"
|
||||
"github.com/dapr/go-sdk/client"
|
||||
)
|
||||
|
||||
var stage = 0
|
||||
|
||||
const (
|
||||
workflowComponent = "dapr"
|
||||
)
|
||||
var failActivityTries = 0
|
||||
|
||||
func main() {
|
||||
registry := task.NewTaskRegistry()
|
||||
r := workflow.NewRegistry()
|
||||
|
||||
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
|
||||
if err := r.AddWorkflow(TestWorkflow); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("TestWorkflow registered")
|
||||
|
||||
if err := registry.AddActivity(TestActivity); err != nil {
|
||||
if err := r.AddActivity(TestActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("TestActivity registered")
|
||||
|
||||
daprClient, err := dapr.NewClient()
|
||||
if err := r.AddActivity(FailActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("FailActivity registered")
|
||||
|
||||
wclient, err := client.NewWorkflowClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create Dapr client: %v", err)
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Println("Worker initialized")
|
||||
|
||||
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
|
||||
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
|
||||
log.Fatalf("failed to start work item listener: %v", err)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
if err = wclient.StartWorker(ctx, r); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Println("runner started")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Start workflow test
|
||||
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
|
||||
// Set the start time to the current time to not wait for the workflow to
|
||||
// "start". This is useful for increasing the throughput of creating
|
||||
// workflows.
|
||||
// workflow.WithStartTime(time.Now())
|
||||
instanceID, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow started with id: %v\n", id)
|
||||
fmt.Printf("workflow started with id: %v\n", instanceID)
|
||||
|
||||
// Pause workflow test
|
||||
err = client.PurgeOrchestrationState(ctx, id)
|
||||
err = wclient.SuspendWorkflow(ctx, instanceID, "")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to pause workflow: %v", err)
|
||||
}
|
||||
|
||||
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
|
||||
respFetch, err := wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get workflow: %v", err)
|
||||
log.Fatalf("failed to fetch workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
|
||||
|
||||
if respFetch.RuntimeStatus != workflow.StatusSuspended {
|
||||
log.Fatalf("workflow not paused: %s: %v", respFetch.RuntimeStatus, respFetch)
|
||||
}
|
||||
|
||||
fmt.Printf("workflow paused\n")
|
||||
|
||||
// Resume workflow test
|
||||
err = client.ResumeOrchestration(ctx, id, "")
|
||||
err = wclient.ResumeWorkflow(ctx, instanceID, "")
|
||||
if err != nil {
|
||||
log.Fatalf("failed to resume workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
|
||||
|
||||
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
|
||||
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
|
||||
|
||||
if respFetch.RuntimeStatus != workflow.StatusRunning {
|
||||
log.Fatalf("workflow not running")
|
||||
}
|
||||
|
||||
fmt.Println("workflow resumed")
|
||||
|
||||
fmt.Printf("stage: %d\n", stage)
|
||||
|
||||
// Raise Event Test
|
||||
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
|
||||
|
||||
err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
|
||||
if err != nil {
|
||||
fmt.Printf("failed to raise event: %v", err)
|
||||
}
|
||||
|
@ -964,44 +976,99 @@ func main() {
|
|||
|
||||
fmt.Printf("stage: %d\n", stage)
|
||||
|
||||
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
|
||||
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
_, err = wclient.WaitForWorkflowCompletion(waitCtx, instanceID)
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to wait for workflow: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("fail activity executions: %d\n", failActivityTries)
|
||||
|
||||
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get workflow: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
|
||||
fmt.Printf("workflow status: %v\n", respFetch.String())
|
||||
|
||||
// Purge workflow test
|
||||
err = client.PurgeOrchestrationState(ctx, id)
|
||||
err = wclient.PurgeWorkflowState(ctx, instanceID)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to purge workflow: %v", err)
|
||||
}
|
||||
|
||||
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
||||
if err == nil || respFetch != nil {
|
||||
log.Fatalf("failed to purge workflow: %v", err)
|
||||
}
|
||||
|
||||
fmt.Println("workflow purged")
|
||||
|
||||
fmt.Printf("stage: %d\n", stage)
|
||||
|
||||
// Terminate workflow test
|
||||
id, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow started with id: %v\n", instanceID)
|
||||
|
||||
metadata, err := wclient.WaitForWorkflowStart(ctx, id)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get workflow: %v", err)
|
||||
}
|
||||
fmt.Printf("workflow status: %s\n", metadata.String())
|
||||
|
||||
err = wclient.TerminateWorkflow(ctx, id)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to terminate workflow: %v", err)
|
||||
}
|
||||
fmt.Println("workflow terminated")
|
||||
|
||||
err = wclient.PurgeWorkflowState(ctx, id)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to purge workflow: %v", err)
|
||||
}
|
||||
fmt.Println("workflow purged")
|
||||
|
||||
cancel()
|
||||
|
||||
fmt.Println("workflow worker successfully shutdown")
|
||||
}
|
||||
|
||||
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var output string
|
||||
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
|
||||
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ctx.CallActivity(FailActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
|
||||
MaxAttempts: 3,
|
||||
InitialRetryInterval: 100 * time.Millisecond,
|
||||
BackoffCoefficient: 2,
|
||||
MaxRetryInterval: 1 * time.Second,
|
||||
})).Await(nil); err == nil {
|
||||
return nil, fmt.Errorf("unexpected no error executing fail activity")
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func TestActivity(ctx task.ActivityContext) (any, error) {
|
||||
func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input int
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
|
@ -1011,6 +1078,11 @@ func TestActivity(ctx task.ActivityContext) (any, error) {
|
|||
|
||||
return fmt.Sprintf("Stage: %d", stage), nil
|
||||
}
|
||||
|
||||
func FailActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
failActivityTries += 1
|
||||
return nil, errors.New("dummy activity error")
|
||||
}
|
||||
```
|
||||
|
||||
[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
|
||||
|
|
|
@ -63,11 +63,11 @@ The following example shows how to execute the activity `ActivityA` on the targe
|
|||
{{% tab "Go" %}}
|
||||
|
||||
```go
|
||||
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var output string
|
||||
err := ctx.CallActivity("ActivityA",
|
||||
task.WithActivityInput("my-input"),
|
||||
task.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
|
||||
workflow.WithActivityInput("my-input"),
|
||||
workflow.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
|
||||
).Await(&output)
|
||||
|
||||
if err != nil {
|
||||
|
@ -115,11 +115,11 @@ The following example shows how to execute the child workflow `Workflow2` on the
|
|||
{{% tab "Go" %}}
|
||||
|
||||
```go
|
||||
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
var output string
|
||||
err := ctx.CallSubOrchestrator("Workflow2",
|
||||
task.WithSubOrchestratorInput("my-input"),
|
||||
task.WithSubOrchestratorAppID("App2"), // Here we set the target app ID which will execute this child workflow.
|
||||
err := ctx.CallChildWorkflow("Workflow2",
|
||||
workflow.WithChildWorkflowInput("my-input"),
|
||||
workflow.WithChildWorkflowAppID("App2"), // Here we set the target app ID which will execute this child workflow.
|
||||
).Await(&output)
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -31,6 +31,12 @@ Select your [preferred language below]({{% ref "#sdk-languages" %}}) to learn mo
|
|||
| [C++](https://github.com/dapr/cpp-sdk) | In development | ✔ | | |
|
||||
| [Rust]({{% ref rust %}}) | In development | ✔ | | ✔ | |
|
||||
|
||||
|
||||
## Frameworks
|
||||
|
||||
| Framework | Language | Status | Description |
|
||||
|----------------------------------------|:----------------------|:---------------|:-----------------:|
|
||||
| [Dapr Agents]({{% ref "../dapr-agents" %}}) | Python | In development | A framework for building LLM-powered autonomous agents that leverages Dapr's distributed systems capabilities for durable execution, with built-in security, observability, and state management. |
|
||||
## Further reading
|
||||
|
||||
- [Serialization in the Dapr SDKs]({{% ref sdk-serialization.md %}})
|
||||
|
|
|
@ -25,6 +25,7 @@ Hit the ground running with our Dapr quickstarts, complete with code samples aim
|
|||
| [Service Invocation]({{% ref serviceinvocation-quickstart %}}) | Synchronous communication between two services using HTTP or gRPC. |
|
||||
| [Publish and Subscribe]({{% ref pubsub-quickstart %}}) | Asynchronous communication between two services using messaging. |
|
||||
| [Workflow]({{% ref workflow-quickstart %}}) | Orchestrate business workflow activities in long running, fault-tolerant, stateful applications. |
|
||||
| [Agents]({{% ref dapr-agents-quickstarts.md %}}) | Build LLM-powered autonomous agentic applications. |
|
||||
| [State Management]({{% ref statemanagement-quickstart %}}) | Store a service's data as key/value pairs in supported state stores. |
|
||||
| [Bindings]({{% ref bindings-quickstart %}}) | Work with external systems using input bindings to respond to events and output bindings to call operations. |
|
||||
| [Actors]({{% ref actors-quickstart %}}) | Run a microservice and a simple console client to demonstrate stateful object patterns in Dapr Actors. |
|
||||
|
@ -33,4 +34,4 @@ Hit the ground running with our Dapr quickstarts, complete with code samples aim
|
|||
| [Resiliency]({{% ref resiliency %}}) | Define and apply fault-tolerance policies to your Dapr API requests. |
|
||||
| [Cryptography]({{% ref cryptography-quickstart %}}) | Encrypt and decrypt data using Dapr's cryptographic APIs. |
|
||||
| [Jobs]({{% ref jobs-quickstart %}}) | Schedule, retrieve, and delete jobs using Dapr's jobs APIs. |
|
||||
| [Conversation]({{% ref conversation-quickstart %}}) | Securely and reliably interact with Large Language Models (LLMs). |
|
||||
| [Conversation]({{% ref conversation-quickstart %}}) | Securely and reliably interact with Large Language Models (LLMs). |
|
||||
|
|
|
@ -1756,11 +1756,8 @@ import (
|
|||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/durabletask-go/api"
|
||||
"github.com/dapr/durabletask-go/backend"
|
||||
"github.com/dapr/durabletask-go/client"
|
||||
"github.com/dapr/durabletask-go/task"
|
||||
dapr "github.com/dapr/go-sdk/client"
|
||||
"github.com/dapr/durabletask-go/workflow"
|
||||
"github.com/dapr/go-sdk/client"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -1774,43 +1771,46 @@ func main() {
|
|||
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
|
||||
fmt.Println("*** Using this app, you can place orders that start workflows.")
|
||||
|
||||
registry := task.NewTaskRegistry()
|
||||
r := workflow.NewRegistry()
|
||||
|
||||
if err := registry.AddOrchestrator(OrderProcessingWorkflow); err != nil {
|
||||
if err := r.AddWorkflow(OrderProcessingWorkflow); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := registry.AddActivity(NotifyActivity); err != nil {
|
||||
if err := r.AddActivity(NotifyActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := registry.AddActivity(RequestApprovalActivity); err != nil {
|
||||
if err := r.AddActivity(RequestApprovalActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := registry.AddActivity(VerifyInventoryActivity); err != nil {
|
||||
if err := r.AddActivity(VerifyInventoryActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := registry.AddActivity(ProcessPaymentActivity); err != nil {
|
||||
if err := r.AddActivity(ProcessPaymentActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := registry.AddActivity(UpdateInventoryActivity); err != nil {
|
||||
if err := r.AddActivity(UpdateInventoryActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
daprClient, err := dapr.NewClient()
|
||||
wfClient, err := client.NewWorkflowClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to create Dapr client: %v", err)
|
||||
log.Fatalf("failed to initialise workflow client: %v", err)
|
||||
}
|
||||
|
||||
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
|
||||
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
|
||||
log.Fatalf("failed to start work item listener: %v", err)
|
||||
if err := wfClient.StartWorker(context.Background(), r); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
dclient, err := client.NewClient()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
inventory := []InventoryItem{
|
||||
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
|
||||
{ItemName: "cars", PerItemCost: 5000, Quantity: 10},
|
||||
{ItemName: "computers", PerItemCost: 500, Quantity: 100},
|
||||
}
|
||||
if err := restockInventory(daprClient, inventory); err != nil {
|
||||
if err := restockInventory(dclient, inventory); err != nil {
|
||||
log.Fatalf("failed to restock: %v", err)
|
||||
}
|
||||
|
||||
|
@ -1827,31 +1827,29 @@ func main() {
|
|||
TotalCost: totalCost,
|
||||
}
|
||||
|
||||
id, err := client.ScheduleNewOrchestration(context.TODO(), workflowName,
|
||||
api.WithInput(orderPayload),
|
||||
)
|
||||
id, err := wfClient.ScheduleWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
|
||||
waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
_, err = client.WaitForOrchestrationCompletion(waitCtx, id)
|
||||
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, id)
|
||||
cancel()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to wait for workflow: %v", err)
|
||||
}
|
||||
|
||||
respFetch, err := client.FetchOrchestrationMetadata(context.Background(), id, api.WithFetchPayloads(true))
|
||||
respFetch, err := wfClient.FetchWorkflowMetadata(context.Background(), id, workflow.WithFetchPayloads(true))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to get workflow: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)
|
||||
fmt.Printf("workflow status: %v\n", respFetch.String())
|
||||
|
||||
fmt.Println("Purchase of item is complete")
|
||||
}
|
||||
|
||||
func restockInventory(daprClient dapr.Client, inventory []InventoryItem) error {
|
||||
func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
|
||||
for _, item := range inventory {
|
||||
itemSerialized, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
|
@ -1879,18 +1877,18 @@ import (
|
|||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/durabletask-go/task"
|
||||
"github.com/dapr/durabletask-go/workflow"
|
||||
"github.com/dapr/go-sdk/client"
|
||||
)
|
||||
|
||||
// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process.
|
||||
func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||
orderID := ctx.ID
|
||||
func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
orderID := ctx.ID()
|
||||
var orderPayload OrderPayload
|
||||
if err := ctx.GetInput(&orderPayload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{
|
||||
err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{
|
||||
Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost),
|
||||
})).Await(nil)
|
||||
if err != nil {
|
||||
|
@ -1898,8 +1896,8 @@ func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
|||
}
|
||||
|
||||
var verifyInventoryResult InventoryResult
|
||||
if err := ctx.CallActivity(VerifyInventoryActivity, task.WithActivityInput(InventoryRequest{
|
||||
RequestID: string(orderID),
|
||||
if err := ctx.CallActivity(VerifyInventoryActivity, workflow.WithActivityInput(InventoryRequest{
|
||||
RequestID: orderID,
|
||||
ItemName: orderPayload.ItemName,
|
||||
Quantity: orderPayload.Quantity,
|
||||
})).Await(&verifyInventoryResult); err != nil {
|
||||
|
@ -1908,64 +1906,64 @@ func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
|||
|
||||
if !verifyInventoryResult.Success {
|
||||
notification := Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)}
|
||||
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(notification)).Await(nil)
|
||||
err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(notification)).Await(nil)
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
if orderPayload.TotalCost > 5000 {
|
||||
var approvalRequired ApprovalRequired
|
||||
if err := ctx.CallActivity(RequestApprovalActivity, task.WithActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
|
||||
if err := ctx.CallActivity(RequestApprovalActivity, workflow.WithActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
if err := ctx.WaitForSingleEvent("manager_approval", time.Second*200).Await(nil); err != nil {
|
||||
if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
// TODO: Confirm timeout flow - this will be in the form of an error.
|
||||
if approvalRequired.Approval {
|
||||
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a successful order: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of an unsuccessful order :%v\n", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
}
|
||||
err = ctx.CallActivity(ProcessPaymentActivity, task.WithActivityInput(PaymentRequest{
|
||||
RequestID: string(orderID),
|
||||
err = ctx.CallActivity(ProcessPaymentActivity, workflow.WithActivityInput(PaymentRequest{
|
||||
RequestID: orderID,
|
||||
ItemBeingPurchased: orderPayload.ItemName,
|
||||
Amount: orderPayload.TotalCost,
|
||||
Quantity: orderPayload.Quantity,
|
||||
})).Await(nil)
|
||||
if err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a failed order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
err = ctx.CallActivity(UpdateInventoryActivity, task.WithActivityInput(PaymentRequest{
|
||||
RequestID: string(orderID),
|
||||
err = ctx.CallActivity(UpdateInventoryActivity, workflow.WithActivityInput(PaymentRequest{
|
||||
RequestID: orderID,
|
||||
ItemBeingPurchased: orderPayload.ItemName,
|
||||
Amount: orderPayload.TotalCost,
|
||||
Quantity: orderPayload.Quantity,
|
||||
})).Await(nil)
|
||||
if err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a failed order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a successful order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: true}, err
|
||||
}
|
||||
|
||||
// NotifyActivity outputs a notification message
|
||||
func NotifyActivity(ctx task.ActivityContext) (any, error) {
|
||||
func NotifyActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input Notification
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
|
@ -1975,7 +1973,7 @@ func NotifyActivity(ctx task.ActivityContext) (any, error) {
|
|||
}
|
||||
|
||||
// ProcessPaymentActivity is used to process a payment
|
||||
func ProcessPaymentActivity(ctx task.ActivityContext) (any, error) {
|
||||
func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input PaymentRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
|
@ -1985,7 +1983,7 @@ func ProcessPaymentActivity(ctx task.ActivityContext) (any, error) {
|
|||
}
|
||||
|
||||
// VerifyInventoryActivity is used to verify if an item is available in the inventory
|
||||
func VerifyInventoryActivity(ctx task.ActivityContext) (any, error) {
|
||||
func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input InventoryRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
|
@ -2017,7 +2015,7 @@ func VerifyInventoryActivity(ctx task.ActivityContext) (any, error) {
|
|||
}
|
||||
|
||||
// UpdateInventoryActivity modifies the inventory.
|
||||
func UpdateInventoryActivity(ctx task.ActivityContext) (any, error) {
|
||||
func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input PaymentRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
|
@ -2051,7 +2049,7 @@ func UpdateInventoryActivity(ctx task.ActivityContext) (any, error) {
|
|||
}
|
||||
|
||||
// RequestApprovalActivity requests approval for the order
|
||||
func RequestApprovalActivity(ctx task.ActivityContext) (any, error) {
|
||||
func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input OrderPayload
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
|
|
Loading…
Reference in New Issue