mirror of https://github.com/dapr/quickstarts.git
feat: initial go order-processor workflow quickstart
Signed-off-by: mikeee <hey@mike.ee>
This commit is contained in:
parent
e30c06e1f4
commit
67cff3ab89
|
|
@ -0,0 +1,140 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/go-sdk/client"
|
||||
"github.com/dapr/go-sdk/workflow"
|
||||
)
|
||||
|
||||
var (
|
||||
stateStoreName = "statestore"
|
||||
workflowComponent = "dapr"
|
||||
workflowName = "OrderProcessingWorkflow"
|
||||
defaultItemName = "cars"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
|
||||
fmt.Println("*** Using this app, you can place orders that start workflows.")
|
||||
|
||||
w, err := workflow.NewWorker()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start worker: %v", err)
|
||||
}
|
||||
|
||||
if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := w.RegisterActivity(NotifyActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := w.RegisterActivity(RequestApprovalActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := w.RegisterActivity(VerifyInventoryActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := w.RegisterActivity(ProcessPaymentActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if err := w.RegisterActivity(UpdateInventoryActivity); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
if err := w.Start(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
daprClient, err := client.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to initialise dapr client: %v", err)
|
||||
}
|
||||
wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to initialise workflow client: %v", err)
|
||||
}
|
||||
|
||||
inventory := []InventoryItem{
|
||||
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
|
||||
{ItemName: "cars", PerItemCost: 15000, Quantity: 100},
|
||||
{ItemName: "computers", PerItemCost: 500, Quantity: 100},
|
||||
}
|
||||
if err := restockInventory(daprClient, inventory); err != nil {
|
||||
log.Fatalf("failed to restock: %v", err)
|
||||
}
|
||||
|
||||
fmt.Println("==========Begin the purchase of item:==========")
|
||||
|
||||
itemName := defaultItemName
|
||||
orderQuantity := 10
|
||||
|
||||
totalCost := inventory[1].PerItemCost * orderQuantity
|
||||
|
||||
orderPayload := OrderPayload{
|
||||
ItemName: itemName,
|
||||
Quantity: orderQuantity,
|
||||
TotalCost: totalCost,
|
||||
}
|
||||
|
||||
id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start workflow: %v", err)
|
||||
}
|
||||
|
||||
approvalSought := false
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
for {
|
||||
timeDelta := time.Since(startTime)
|
||||
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to fetch workflow: %v", err)
|
||||
}
|
||||
if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) {
|
||||
fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String())
|
||||
break
|
||||
}
|
||||
if timeDelta.Seconds() >= 1 {
|
||||
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to fetch workflow: %v", err)
|
||||
}
|
||||
if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) {
|
||||
approvalSought = true
|
||||
promptForApproval(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Purchase of item is complete")
|
||||
}
|
||||
|
||||
func promptForApproval(id string) {
|
||||
wfClient, err := workflow.NewClient()
|
||||
if err != nil {
|
||||
log.Fatalf("failed to initialise wfClient: %v", err)
|
||||
}
|
||||
if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
|
||||
for _, item := range inventory {
|
||||
itemSerialized, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("adding base stock item: %s\n", item.ItemName)
|
||||
if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package main
|
||||
|
||||
type OrderPayload struct {
|
||||
ItemName string `json:"item_name"`
|
||||
TotalCost int `json:"total_cost"`
|
||||
Quantity int `json:"quanity"`
|
||||
}
|
||||
|
||||
type OrderResult struct {
|
||||
Processed bool `json:"processed"`
|
||||
}
|
||||
|
||||
type InventoryItem struct {
|
||||
ItemName string `json:"item_name"`
|
||||
PerItemCost int `json:"per_item_cost"`
|
||||
Quantity int `json:"quanity"`
|
||||
}
|
||||
|
||||
type InventoryRequest struct {
|
||||
RequestID string `json:"request_id"`
|
||||
ItemName string `json:"item_name"`
|
||||
Quantity int `json:"quanity"`
|
||||
}
|
||||
|
||||
type InventoryResult struct {
|
||||
Success bool `json:"success"`
|
||||
InventoryItem InventoryItem `json:"inventory_item"`
|
||||
}
|
||||
|
||||
type PaymentRequest struct {
|
||||
RequestID string `json:"request_id"`
|
||||
ItemBeingPurchased string `json:"item_being_purchased"`
|
||||
Amount int `json:"amount"`
|
||||
Quantity int `json:"quantity"`
|
||||
}
|
||||
|
||||
type ApprovalRequired struct {
|
||||
Approval bool `json:"approval"`
|
||||
}
|
||||
|
||||
type Notification struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
|
@ -0,0 +1,176 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/go-sdk/client"
|
||||
"github.com/dapr/go-sdk/workflow"
|
||||
)
|
||||
|
||||
func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||
orderID := ctx.InstanceID()
|
||||
var orderPayload OrderPayload
|
||||
if err := ctx.GetInput(&orderPayload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost)})).Await(nil); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
var verifyInventoryResult InventoryResult
|
||||
if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{
|
||||
RequestID: orderID,
|
||||
ItemName: orderPayload.ItemName,
|
||||
Quantity: orderPayload.Quantity,
|
||||
})).Await(&verifyInventoryResult); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
if !verifyInventoryResult.Success {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)})).Await(nil); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
}
|
||||
|
||||
if orderPayload.TotalCost > 50000 {
|
||||
var approvalRequired ApprovalRequired
|
||||
if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil {
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
// TODO: Confirm timeout flow - this will be in the form of an error.
|
||||
if approvalRequired.Approval {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a successful order: %v\n", err)
|
||||
}
|
||||
} else {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of an unsuccessful order :%v\n", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, nil
|
||||
}
|
||||
}
|
||||
if err := ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{
|
||||
RequestID: orderID,
|
||||
ItemBeingPurchased: orderPayload.ItemName,
|
||||
Amount: orderPayload.TotalCost,
|
||||
Quantity: orderPayload.Quantity,
|
||||
})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a failed order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
if err := ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{RequestID: orderID, ItemBeingPurchased: orderPayload.ItemName, Amount: orderPayload.TotalCost, Quantity: orderPayload.Quantity})).Await(nil); err != nil {
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a failed order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: false}, err
|
||||
}
|
||||
|
||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
|
||||
log.Printf("failed to notify of a successful order: %v", err)
|
||||
}
|
||||
return OrderResult{Processed: true}, nil
|
||||
}
|
||||
|
||||
// NotifyActivity outputs a notification message
|
||||
func NotifyActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input Notification
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
}
|
||||
fmt.Printf("NotifyActivity: %s\n", input.Message)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// ProcessPaymentActivity is used to process a payment
|
||||
func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input PaymentRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return "", err
|
||||
}
|
||||
fmt.Printf("ProcessPaymentActivity: %s for %d - %s (%dUSD)\n", input.RequestID, input.Quantity, input.ItemBeingPurchased, input.Amount)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// VerifyInventoryActivity is used to verify if an item is availabe in the inventory
|
||||
// g
|
||||
func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input InventoryRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Printf("VerifyInventoryActivity: Verifying inventory for order %s of %d %s\n", input.RequestID, input.Quantity, input.ItemName)
|
||||
dClient, err := client.NewClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemName, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if item == nil {
|
||||
return InventoryResult{
|
||||
Success: false,
|
||||
InventoryItem: InventoryItem{},
|
||||
}, nil
|
||||
}
|
||||
var result InventoryItem
|
||||
if err := json.Unmarshal(item.Value, &result); err != nil {
|
||||
log.Fatalf("failed to parse inventory result %v", err)
|
||||
}
|
||||
fmt.Printf("VerifyInventoryActivity: There are %d of %s available for purchase\n", result.Quantity, result.ItemName)
|
||||
if result.Quantity >= input.Quantity {
|
||||
return InventoryResult{Success: true, InventoryItem: result}, nil
|
||||
}
|
||||
return InventoryResult{Success: false, InventoryItem: InventoryItem{}}, nil
|
||||
}
|
||||
|
||||
func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input PaymentRequest
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Printf("UpdateInventoryActivity: Checking Inventory for order %s for %d * %s\n", input.RequestID, input.Quantity, input.ItemBeingPurchased)
|
||||
dClient, err := client.NewClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
item, err := dClient.GetState(context.Background(), stateStoreName, input.ItemBeingPurchased, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var result InventoryItem
|
||||
err = json.Unmarshal(item.Value, &result)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newQuantity := result.Quantity - input.Quantity
|
||||
if newQuantity < 0 {
|
||||
return nil, fmt.Errorf("insufficient inventory for: %s", input.ItemBeingPurchased)
|
||||
}
|
||||
result.Quantity = newQuantity
|
||||
newState, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to marshal new state: %v", err)
|
||||
}
|
||||
dClient.SaveState(context.Background(), stateStoreName, input.ItemBeingPurchased, newState, nil)
|
||||
fmt.Printf("UpdateInventoryActivity: There are now %d %s left in stock\n", result.Quantity, result.ItemName)
|
||||
return InventoryResult{Success: true, InventoryItem: result}, nil
|
||||
}
|
||||
|
||||
func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) {
|
||||
var input OrderPayload
|
||||
if err := ctx.GetInput(&input); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName)
|
||||
return ApprovalRequired{Approval: true}, nil
|
||||
}
|
||||
Loading…
Reference in New Issue