mirror of https://github.com/dapr/docs.git
Start of Python QS doc
Signed-off-by: Nick Greenfield <nigreenf@microsoft.com>
This commit is contained in:
parent
f0de8174eb
commit
8302ace93a
|
@ -1,258 +0,0 @@
|
||||||
---
|
|
||||||
type: docs
|
|
||||||
title: "How to: Use the gRPC interface in your Dapr application"
|
|
||||||
linkTitle: "How to: gRPC interface"
|
|
||||||
weight: 6000
|
|
||||||
description: "Use the Dapr gRPC API in your application"
|
|
||||||
type: docs
|
|
||||||
---
|
|
||||||
|
|
||||||
Dapr implements both an HTTP and a gRPC API for local calls. [gRPC](https://grpc.io/) is useful for low-latency, high performance scenarios and has language integration using the proto clients.
|
|
||||||
|
|
||||||
[Find a list of auto-generated clients in the Dapr SDK documentation]({{< ref sdks >}}).
|
|
||||||
|
|
||||||
The Dapr runtime implements a [proto service](https://github.com/dapr/dapr/blob/master/dapr/proto/runtime/v1/dapr.proto) that apps can communicate with via gRPC.
|
|
||||||
|
|
||||||
In addition to calling Dapr via gRPC, Dapr supports service-to-service calls with gRPC by acting as a proxy. [Learn more in the gRPC service invocation how-to guide]({{< ref howto-invoke-services-grpc.md >}}).
|
|
||||||
|
|
||||||
This guide demonstrates configuring and invoking Dapr with gRPC using a Go SDK application.
|
|
||||||
|
|
||||||
## Configure Dapr to communicate with an app via gRPC
|
|
||||||
|
|
||||||
{{< tabs "Self-hosted" "Kubernetes">}}
|
|
||||||
<!--selfhosted-->
|
|
||||||
{{% codetab %}}
|
|
||||||
|
|
||||||
When running in self-hosted mode, use the `--app-protocol` flag to tell Dapr to use gRPC to talk to the app.
|
|
||||||
|
|
||||||
```bash
|
|
||||||
dapr run --app-protocol grpc --app-port 5005 node app.js
|
|
||||||
```
|
|
||||||
|
|
||||||
This tells Dapr to communicate with your app via gRPC over port `5005`.
|
|
||||||
|
|
||||||
{{% /codetab %}}
|
|
||||||
|
|
||||||
<!--k8s-->
|
|
||||||
{{% codetab %}}
|
|
||||||
|
|
||||||
On Kubernetes, set the following annotations in your deployment YAML:
|
|
||||||
|
|
||||||
```yaml
|
|
||||||
apiVersion: apps/v1
|
|
||||||
kind: Deployment
|
|
||||||
metadata:
|
|
||||||
name: myapp
|
|
||||||
namespace: default
|
|
||||||
labels:
|
|
||||||
app: myapp
|
|
||||||
spec:
|
|
||||||
replicas: 1
|
|
||||||
selector:
|
|
||||||
matchLabels:
|
|
||||||
app: myapp
|
|
||||||
template:
|
|
||||||
metadata:
|
|
||||||
labels:
|
|
||||||
app: myapp
|
|
||||||
annotations:
|
|
||||||
dapr.io/enabled: "true"
|
|
||||||
dapr.io/app-id: "myapp"
|
|
||||||
dapr.io/app-protocol: "grpc"
|
|
||||||
dapr.io/app-port: "5005"
|
|
||||||
...
|
|
||||||
```
|
|
||||||
|
|
||||||
{{% /codetab %}}
|
|
||||||
|
|
||||||
{{< /tabs >}}
|
|
||||||
|
|
||||||
## Invoke Dapr with gRPC
|
|
||||||
|
|
||||||
The following steps show how to create a Dapr client and call the `SaveStateData` operation on it.
|
|
||||||
|
|
||||||
1. Import the package:
|
|
||||||
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
dapr "github.com/dapr/go-sdk/client"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Create the client:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// just for this demo
|
|
||||||
ctx := context.Background()
|
|
||||||
data := []byte("ping")
|
|
||||||
|
|
||||||
// create the client
|
|
||||||
client, err := dapr.NewClient()
|
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
```
|
|
||||||
|
|
||||||
3. Invoke the `SaveState` method:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// save state with the key key1
|
|
||||||
err = client.SaveState(ctx, "statestore", "key1", data)
|
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
log.Println("data saved")
|
|
||||||
```
|
|
||||||
|
|
||||||
Now you can explore all the different methods on the Dapr client.
|
|
||||||
|
|
||||||
## Create a gRPC app with Dapr
|
|
||||||
|
|
||||||
The following steps will show how to create an app that exposes a server for with which Dapr can communicate.
|
|
||||||
|
|
||||||
1. Import the package:
|
|
||||||
|
|
||||||
```go
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
|
|
||||||
"github.com/golang/protobuf/ptypes/any"
|
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
|
||||||
|
|
||||||
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
|
|
||||||
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Implement the interface:
|
|
||||||
|
|
||||||
```go
|
|
||||||
// server is our user app
|
|
||||||
type server struct {
|
|
||||||
pb.UnimplementedAppCallbackServer
|
|
||||||
}
|
|
||||||
|
|
||||||
// EchoMethod is a simple demo method to invoke
|
|
||||||
func (s *server) EchoMethod() string {
|
|
||||||
return "pong"
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method gets invoked when a remote service has called the app through Dapr
|
|
||||||
// The payload carries a Method to identify the method, a set of metadata properties and an optional payload
|
|
||||||
func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
|
|
||||||
var response string
|
|
||||||
|
|
||||||
switch in.Method {
|
|
||||||
case "EchoMethod":
|
|
||||||
response = s.EchoMethod()
|
|
||||||
}
|
|
||||||
|
|
||||||
return &commonv1pb.InvokeResponse{
|
|
||||||
ContentType: "text/plain; charset=UTF-8",
|
|
||||||
Data: &any.Any{Value: []byte(response)},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dapr will call this method to get the list of topics the app wants to subscribe to. In this example, we are telling Dapr
|
|
||||||
// To subscribe to a topic named TopicA
|
|
||||||
func (s *server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
|
|
||||||
return &pb.ListTopicSubscriptionsResponse{
|
|
||||||
Subscriptions: []*pb.TopicSubscription{
|
|
||||||
{Topic: "TopicA"},
|
|
||||||
},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dapr will call this method to get the list of bindings the app will get invoked by. In this example, we are telling Dapr
|
|
||||||
// To invoke our app with a binding named storage
|
|
||||||
func (s *server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
|
|
||||||
return &pb.ListInputBindingsResponse{
|
|
||||||
Bindings: []string{"storage"},
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method gets invoked every time a new event is fired from a registered binding. The message carries the binding name, a payload and optional metadata
|
|
||||||
func (s *server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
|
|
||||||
fmt.Println("Invoked from binding")
|
|
||||||
return &pb.BindingEventResponse{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This method is fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 0.3 envelope.
|
|
||||||
func (s *server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
|
|
||||||
fmt.Println("Topic message arrived")
|
|
||||||
return &pb.TopicEventResponse{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Create the server:
|
|
||||||
|
|
||||||
```go
|
|
||||||
func main() {
|
|
||||||
// create listener
|
|
||||||
lis, err := net.Listen("tcp", ":50001")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to listen: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create grpc server
|
|
||||||
s := grpc.NewServer()
|
|
||||||
pb.RegisterAppCallbackServer(s, &server{})
|
|
||||||
|
|
||||||
fmt.Println("Client starting...")
|
|
||||||
|
|
||||||
// and start...
|
|
||||||
if err := s.Serve(lis); err != nil {
|
|
||||||
log.Fatalf("failed to serve: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
This creates a gRPC server for your app on port 50001.
|
|
||||||
|
|
||||||
## Run the application
|
|
||||||
|
|
||||||
{{< tabs "Self-hosted" "Kubernetes">}}
|
|
||||||
<!--selfhosted-->
|
|
||||||
{{% codetab %}}
|
|
||||||
|
|
||||||
To run locally, use the Dapr CLI:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
dapr run --app-id goapp --app-port 50001 --app-protocol grpc go run main.go
|
|
||||||
```
|
|
||||||
|
|
||||||
{{% /codetab %}}
|
|
||||||
|
|
||||||
<!--k8s-->
|
|
||||||
{{% codetab %}}
|
|
||||||
|
|
||||||
On Kubernetes, set the required `dapr.io/app-protocol: "grpc"` and `dapr.io/app-port: "50001` annotations in your pod spec template, as mentioned above.
|
|
||||||
|
|
||||||
{{% /codetab %}}
|
|
||||||
|
|
||||||
{{< /tabs >}}
|
|
||||||
|
|
||||||
|
|
||||||
## Other languages
|
|
||||||
|
|
||||||
You can use Dapr with any language supported by Protobuf, and not just with the currently available generated SDKs.
|
|
||||||
|
|
||||||
Using the [protoc](https://developers.google.com/protocol-buffers/docs/downloads) tool, you can generate the Dapr clients for other languages like Ruby, C++, Rust, and others.
|
|
||||||
|
|
||||||
## Related Topics
|
|
||||||
- [Service invocation building block]({{< ref service-invocation >}})
|
|
||||||
- [Service invocation API specification]({{< ref service_invocation_api.md >}})
|
|
|
@ -28,7 +28,7 @@ In this guide, you'll:
|
||||||
|
|
||||||
Currently, you can experience the Dapr Workflow using the .NET SDK.
|
Currently, you can experience the Dapr Workflow using the .NET SDK.
|
||||||
|
|
||||||
{{< tabs ".NET" >}}
|
{{< tabs ".NET" "Python" >}}
|
||||||
|
|
||||||
<!-- .NET -->
|
<!-- .NET -->
|
||||||
{{% codetab %}}
|
{{% codetab %}}
|
||||||
|
@ -254,8 +254,256 @@ The `Activities` directory holds the four workflow activities used by the workfl
|
||||||
- `ProcessPaymentActivity.cs`
|
- `ProcessPaymentActivity.cs`
|
||||||
- `UpdateInventoryActivity.cs`
|
- `UpdateInventoryActivity.cs`
|
||||||
|
|
||||||
|
{{% /codetab %}}
|
||||||
|
|
||||||
|
<!-- Python -->
|
||||||
|
{{% codetab %}}
|
||||||
|
|
||||||
|
### Step 1: Pre-requisites
|
||||||
|
|
||||||
|
For this example, you will need:
|
||||||
|
|
||||||
|
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started).
|
||||||
|
- [Python 3.7+ installed](https://www.python.org/downloads/).
|
||||||
|
<!-- IGNORE_LINKS -->
|
||||||
|
- [Docker Desktop](https://www.docker.com/products/docker-desktop)
|
||||||
|
<!-- END_IGNORE -->
|
||||||
|
|
||||||
|
### Step 2: Set up the environment
|
||||||
|
|
||||||
|
Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows).
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git clone https://github.com/dapr/quickstarts.git
|
||||||
|
```
|
||||||
|
|
||||||
|
In a new terminal window, navigate to the `order-processor` directory:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd workflows/python/sdk/order-processor
|
||||||
|
```
|
||||||
|
|
||||||
|
### Step 3: Run the order processor app
|
||||||
|
|
||||||
|
In the terminal, start the order processor app alongside a Dapr sidecar:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip3 install -r requirements.txt
|
||||||
|
dapr run --app-id order-processor --components-path ../../../components/ -- python3 app.py
|
||||||
|
```
|
||||||
|
|
||||||
|
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
|
||||||
|
|
||||||
|
Expected output:
|
||||||
|
|
||||||
|
```
|
||||||
|
== APP == *** Welcome to the Dapr Workflow console app sample!
|
||||||
|
== APP == *** Using this app, you can place orders that start workflows.
|
||||||
|
== APP == 2023-06-06 09:35:52.892 durabletask-worker INFO: Starting gRPC worker that connects to 127.0.0.1:65406
|
||||||
|
== APP == item: InventoryItem(item_name=Paperclip, per_item_cost=5, quantity=100)
|
||||||
|
== APP == item: InventoryItem(item_name=Cars, per_item_cost=15000, quantity=100)
|
||||||
|
== APP == item: InventoryItem(item_name=Computers, per_item_cost=500, quantity=100)
|
||||||
|
== APP == ==========Begin the purchase of item:==========
|
||||||
|
== APP == Starting order workflow, purchasing 11 of cars
|
||||||
|
== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items...
|
||||||
|
== APP == 2023-06-06 09:35:55.960 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 1 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 11 cars at $165000 !
|
||||||
|
== APP == 2023-06-06 09:35:56.001 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 1 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 11 cars
|
||||||
|
== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase
|
||||||
|
== APP == 2023-06-06 09:35:56.035 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 1 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 11 cars
|
||||||
|
== APP == 2023-06-06 09:35:56.071 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 1 task(s) and 1 event(s).
|
||||||
|
== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval
|
||||||
|
== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 2 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved!
|
||||||
|
== APP == 2023-06-06 09:36:06.000 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 2 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 11 cars at 165000 USD
|
||||||
|
== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully
|
||||||
|
== APP == 2023-06-06 09:36:06.035 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 2 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 11 cars
|
||||||
|
== APP == INFO:UpdateInventoryActivity:There are now 89 cars left in stock
|
||||||
|
== APP == 2023-06-06 09:36:06.071 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Waiting for 2 task(s) and 0 event(s).
|
||||||
|
== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed!
|
||||||
|
== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED
|
||||||
|
== APP == Workflow completed! Result: Completed
|
||||||
|
== APP == Purchase of item is Completed
|
||||||
|
```
|
||||||
|
|
||||||
|
### (Optional) Step 4: View in Zipkin
|
||||||
|
|
||||||
|
If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`).
|
||||||
|
|
||||||
|
<img src="/images/workflow-trace-spans-zipkin-python.png" width=900 style="padding-bottom:15px;">
|
||||||
|
|
||||||
|
### What happened?
|
||||||
|
|
||||||
|
When you ran `dapr run --app-id order-processor --components-path ../../../components/ -- python3 app.py`:
|
||||||
|
|
||||||
|
1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled.
|
||||||
|
1. The `NotifyActivity` workflow activity sends a notification saying an order for 11 cars has been received.
|
||||||
|
1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
|
||||||
|
1. Your workflow starts and notifies you of its status.
|
||||||
|
1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful.
|
||||||
|
1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed.
|
||||||
|
1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed.
|
||||||
|
1. The workflow terminates as completed.
|
||||||
|
|
||||||
|
#### `order-processor/app.py`
|
||||||
|
|
||||||
|
In the application's program file:
|
||||||
|
- The unique workflow order ID is generated
|
||||||
|
- The workflow is scheduled
|
||||||
|
- The workflow status is retrieved
|
||||||
|
- The workflow and the workflow activities it invokes are registered
|
||||||
|
|
||||||
|
```python
|
||||||
|
class WorkflowConsoleApp:
|
||||||
|
def main(self):
|
||||||
|
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
|
||||||
|
print("*** Using this app, you can place orders that start workflows.", flush=True)
|
||||||
|
# Wait for the sidecar to become available
|
||||||
|
sleep(5)
|
||||||
|
|
||||||
|
# Register workflow and activities
|
||||||
|
workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
|
||||||
|
workflowRuntime.register_workflow(order_processing_workflow)
|
||||||
|
workflowRuntime.register_activity(notify_activity)
|
||||||
|
workflowRuntime.register_activity(requst_approval_activity)
|
||||||
|
workflowRuntime.register_activity(verify_inventory_activity)
|
||||||
|
workflowRuntime.register_activity(process_payment_activity)
|
||||||
|
workflowRuntime.register_activity(update_inventory_activity)
|
||||||
|
workflowRuntime.start()
|
||||||
|
|
||||||
|
# Instantiate Dapr client
|
||||||
|
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
|
||||||
|
baseInventory = {}
|
||||||
|
baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100)
|
||||||
|
baseInventory["cars"] = InventoryItem("Cars", 15000, 100)
|
||||||
|
baseInventory["computers"] = InventoryItem("Computers", 500, 100)
|
||||||
|
|
||||||
|
self.restock_inventory(daprClient, baseInventory)
|
||||||
|
|
||||||
|
print("==========Begin the purchase of item:==========", flush=True)
|
||||||
|
item_name = default_item_name
|
||||||
|
order_quantity = 11
|
||||||
|
|
||||||
|
total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
|
||||||
|
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
|
||||||
|
|
||||||
|
# Start Workflow
|
||||||
|
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
|
||||||
|
start_resp = daprClient.start_workflow(workflow_component=workflow_component,
|
||||||
|
workflow_name=workflow_name,
|
||||||
|
input=order)
|
||||||
|
_id = start_resp.instance_id
|
||||||
|
|
||||||
|
def prompt_for_approval(daprClient: DaprClient):
|
||||||
|
daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
|
||||||
|
event_name="manager_approval", event_data={'approval': True})
|
||||||
|
|
||||||
|
approval_seeked = False
|
||||||
|
start_time = datetime.now()
|
||||||
|
while True:
|
||||||
|
time_delta = datetime.now() - start_time
|
||||||
|
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
|
||||||
|
if not state:
|
||||||
|
print("Workflow not found!") # not expected
|
||||||
|
elif state.runtime_status == "Completed" or\
|
||||||
|
state.runtime_status == "Failed" or\
|
||||||
|
state.runtime_status == "Terminated":
|
||||||
|
print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
|
||||||
|
break
|
||||||
|
if time_delta.total_seconds() >= 10:
|
||||||
|
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
|
||||||
|
if total_cost > 50000 and (
|
||||||
|
state.runtime_status != "Completed" or
|
||||||
|
state.runtime_status != "Failed" or
|
||||||
|
state.runtime_status != "Terminated"
|
||||||
|
) and not approval_seeked:
|
||||||
|
approval_seeked = True
|
||||||
|
threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()
|
||||||
|
|
||||||
|
print("Purchase of item is ", state.runtime_status, flush=True)
|
||||||
|
|
||||||
|
def restock_inventory(self, daprClient: DaprClient, baseInventory):
|
||||||
|
for key, item in baseInventory.items():
|
||||||
|
print(f'item: {item}')
|
||||||
|
item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
|
||||||
|
"per_item_cost": {item.per_item_cost}}}'
|
||||||
|
daprClient.save_state("statestore-actors", key, item_str)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app = WorkflowConsoleApp()
|
||||||
|
app.main()
|
||||||
|
```
|
||||||
|
|
||||||
|
#### `order-processor/workflow.py`
|
||||||
|
|
||||||
|
In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).
|
||||||
|
|
||||||
|
```python
|
||||||
|
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
|
||||||
|
"""Defines the order processing workflow.
|
||||||
|
When the order is received, the inventory is checked to see if there is enough inventory to
|
||||||
|
fulfill the order. If there is enough inventory, the payment is processed and the inventory is
|
||||||
|
updated. If there is not enough inventory, the order is rejected.
|
||||||
|
If the total order is greater than $50,000, the order is sent to a manager for approval.
|
||||||
|
"""
|
||||||
|
order_id = ctx.instance_id
|
||||||
|
order_payload=json.loads(order_payload_str)
|
||||||
|
yield ctx.call_activity(notify_activity,
|
||||||
|
input=Notification(message=('Received order ' +order_id+ ' for '
|
||||||
|
+f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}'
|
||||||
|
+' at $'+f'{order_payload["total_cost"]}' +' !')))
|
||||||
|
result = yield ctx.call_activity(verify_inventory_activity,
|
||||||
|
input=InventoryRequest(request_id=order_id,
|
||||||
|
item_name=order_payload["item_name"],
|
||||||
|
quantity=order_payload["quantity"]))
|
||||||
|
if not result.success:
|
||||||
|
yield ctx.call_activity(notify_activity,
|
||||||
|
input=Notification(message='Insufficient inventory for '
|
||||||
|
+f'{order_payload["item_name"]}'+'!'))
|
||||||
|
return OrderResult(processed=False)
|
||||||
|
|
||||||
|
if order_payload["total_cost"] > 50000:
|
||||||
|
yield ctx.call_activity(requst_approval_activity, input=order_payload)
|
||||||
|
approval_task = ctx.wait_for_external_event("manager_approval")
|
||||||
|
timeout_event = ctx.create_timer(timedelta(seconds=200))
|
||||||
|
winner = yield when_any([approval_task, timeout_event])
|
||||||
|
if winner == timeout_event:
|
||||||
|
yield ctx.call_activity(notify_activity,
|
||||||
|
input=Notification(message='Payment for order '+order_id
|
||||||
|
+' has been cancelled due to timeout!'))
|
||||||
|
return OrderResult(processed=False)
|
||||||
|
approval_result = yield approval_task
|
||||||
|
if approval_result["approval"]:
|
||||||
|
yield ctx.call_activity(notify_activity, input=Notification(
|
||||||
|
message=f'Payment for order {order_id} has been approved!'))
|
||||||
|
else:
|
||||||
|
yield ctx.call_activity(notify_activity, input=Notification(
|
||||||
|
message=f'Payment for order {order_id} has been rejected!'))
|
||||||
|
return OrderResult(processed=False)
|
||||||
|
|
||||||
|
yield ctx.call_activity(process_payment_activity, input=PaymentRequest(
|
||||||
|
request_id=order_id, item_being_purchased=order_payload["item_name"],
|
||||||
|
amount=order_payload["total_cost"], quantity=order_payload["quantity"]))
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield ctx.call_activity(update_inventory_activity,
|
||||||
|
input=PaymentRequest(request_id=order_id,
|
||||||
|
item_being_purchased=order_payload["item_name"],
|
||||||
|
amount=order_payload["total_cost"],
|
||||||
|
quantity=order_payload["quantity"]))
|
||||||
|
except Exception:
|
||||||
|
yield ctx.call_activity(notify_activity,
|
||||||
|
input=Notification(message=f'Order {order_id} Failed!'))
|
||||||
|
return OrderResult(processed=False)
|
||||||
|
|
||||||
|
yield ctx.call_activity(notify_activity, input=Notification(
|
||||||
|
message=f'Order {order_id} has completed!'))
|
||||||
|
return OrderResult(processed=True)
|
||||||
|
```
|
||||||
{{% /codetab %}}
|
{{% /codetab %}}
|
||||||
|
|
||||||
|
|
||||||
|
|
Binary file not shown.
After Width: | Height: | Size: 181 KiB |
Loading…
Reference in New Issue