quickstarts/workflows/python/sdk/order-processor/app.py

69 lines
2.6 KiB
Python

from datetime import datetime
from time import sleep
from dapr.clients import DaprClient
from dapr.conf import settings
from dapr.ext.workflow import DaprWorkflowClient, WorkflowStatus
from workflow import wfr, order_processing_workflow
from model import InventoryItem, OrderPayload
store_name = "statestore"
workflow_name = "order_processing_workflow"
default_item_name = "cars"
class WorkflowConsoleApp:
def main(self):
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
print("*** Using this app, you can place orders that start workflows.", flush=True)
wfr.start()
# Wait for the sidecar to become available
sleep(5)
wfClient = DaprWorkflowClient()
baseInventory = {
"paperclip": InventoryItem("Paperclip", 5, 100),
"cars": InventoryItem("Cars", 5000, 10),
"computers": InventoryItem("Computers", 500, 100),
}
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
self.restock_inventory(daprClient, baseInventory)
print("==========Begin the purchase of item:==========", flush=True)
item_name = default_item_name
order_quantity = 1
total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
instance_id = wfClient.schedule_new_workflow(
workflow=order_processing_workflow, input=order.to_json())
try:
state = wfClient.wait_for_workflow_completion(instance_id=instance_id, timeout_in_seconds=30)
if not state:
print("Workflow not found!")
elif state.runtime_status.name == 'COMPLETED':
print(f'Workflow completed! Result: {state.serialized_output}')
else:
print(f'Workflow failed! Status: {state.runtime_status.name}') # not expected
except TimeoutError:
print('*** Workflow timed out!')
wfr.shutdown()
def restock_inventory(self, daprClient: DaprClient, baseInventory):
for key, item in baseInventory.items():
print(f'item: {item}')
item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
"per_item_cost": {item.per_item_cost}}}'
daprClient.save_state(store_name, key, item_str)
if __name__ == '__main__':
app = WorkflowConsoleApp()
app.main()