quickstarts/workflows/python/sdk/order-processor/app.py

116 lines
5.5 KiB
Python

from datetime import datetime
import threading
from time import sleep
from dapr.clients import DaprClient
from dapr.conf import settings
from dapr.ext.workflow import WorkflowRuntime
from workflow import order_processing_workflow, notify_activity, process_payment_activity, \
verify_inventory_activity, update_inventory_activity, requst_approval_activity
from model import InventoryItem, OrderPayload
store_name = "statestore-actors"
workflow_component = "dapr"
workflow_name = "order_processing_workflow"
default_item_name = "cars"
class WorkflowConsoleApp:
def main(self):
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
print("*** Using this app, you can place orders that start workflows.", flush=True)
# Wait for the sidecar to become available
sleep(5)
workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
workflowRuntime.register_workflow(order_processing_workflow)
workflowRuntime.register_activity(notify_activity)
workflowRuntime.register_activity(requst_approval_activity)
workflowRuntime.register_activity(verify_inventory_activity)
workflowRuntime.register_activity(process_payment_activity)
workflowRuntime.register_activity(update_inventory_activity)
workflowRuntime.start()
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
baseInventory = {}
baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100)
baseInventory["cars"] = InventoryItem("Cars", 15000, 100)
baseInventory["computers"] = InventoryItem("Computers", 500, 100)
self.restock_inventory(daprClient, baseInventory)
print("==========Begin the purchase of item:==========", flush=True)
item_name = default_item_name
order_quantity = 10
total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
start_resp = daprClient.start_workflow(workflow_component=workflow_component,
workflow_name=workflow_name,
input=order)
_id = start_resp.instance_id
def prompt_for_approval(daprClient: DaprClient):
"""This is a helper function to prompt for approval.
Not using the prompt here ACTUALLY, as quickstart validation is required to be automated.
But, in case you may want to run this sample manually, you can uncomment the following lines:
try:
signal.alarm(15)
approved = input(f'(ID = {_id}) requires approval. Approve? [Y/N] ')
signal.alarm(0) # cancel the alarm
except TimeoutError:
approved = "y"
if state.runtime_status.name == "COMPLETED":
return
if approved.lower() == "y":
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': True})
else:
client.raise_workflow_event(instance_id=_id, event_name="manager_approval", data={'approval': False})
## Additionally, you would need to import signal and define timeout_error:
# import signal
# def timeout_error(*_):
# raise TimeoutError
# signal.signal(signal.SIGALRM, timeout_error)
"""
daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
event_name="manager_approval", event_data={'approval': True})
approval_seeked = False
start_time = datetime.now()
while True:
time_delta = datetime.now() - start_time
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
if not state:
print("Workflow not found!") # not expected
elif state.runtime_status == "Completed" or\
state.runtime_status == "Failed" or\
state.runtime_status == "Terminated":
print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
break
if time_delta.total_seconds() >= 10:
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
if total_cost > 50000 and (
state.runtime_status != "Completed" or
state.runtime_status != "Failed" or
state.runtime_status != "Terminated"
) and not approval_seeked:
approval_seeked = True
threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()
print("Purchase of item is ", state.runtime_status, flush=True)
def restock_inventory(self, daprClient: DaprClient, baseInventory):
for key, item in baseInventory.items():
print(f'item: {item}')
item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
"per_item_cost": {item.per_item_cost}}}'
daprClient.save_state("statestore-actors", key, item_str)
if __name__ == '__main__':
app = WorkflowConsoleApp()
app.main()