mirror of https://github.com/dapr/quickstarts.git
69 lines
2.6 KiB
Python
69 lines
2.6 KiB
Python
from datetime import datetime
|
|
from time import sleep
|
|
|
|
from dapr.clients import DaprClient
|
|
from dapr.conf import settings
|
|
from dapr.ext.workflow import DaprWorkflowClient, WorkflowStatus
|
|
|
|
from workflow import wfr, order_processing_workflow
|
|
from model import InventoryItem, OrderPayload
|
|
|
|
store_name = "statestore"
|
|
workflow_name = "order_processing_workflow"
|
|
default_item_name = "cars"
|
|
|
|
class WorkflowConsoleApp:
|
|
def main(self):
|
|
print("*** Welcome to the Dapr Workflow console app sample!", flush=True)
|
|
print("*** Using this app, you can place orders that start workflows.", flush=True)
|
|
|
|
wfr.start()
|
|
# Wait for the sidecar to become available
|
|
sleep(5)
|
|
|
|
wfClient = DaprWorkflowClient()
|
|
|
|
baseInventory = {
|
|
"paperclip": InventoryItem("Paperclip", 5, 100),
|
|
"cars": InventoryItem("Cars", 5000, 10),
|
|
"computers": InventoryItem("Computers", 500, 100),
|
|
}
|
|
|
|
|
|
daprClient = DaprClient(address=f'{settings.DAPR_RUNTIME_HOST}:{settings.DAPR_GRPC_PORT}')
|
|
self.restock_inventory(daprClient, baseInventory)
|
|
|
|
print("==========Begin the purchase of item:==========", flush=True)
|
|
item_name = default_item_name
|
|
order_quantity = 1
|
|
total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
|
|
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)
|
|
|
|
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
|
|
instance_id = wfClient.schedule_new_workflow(
|
|
workflow=order_processing_workflow, input=order.to_json())
|
|
|
|
try:
|
|
state = wfClient.wait_for_workflow_completion(instance_id=instance_id, timeout_in_seconds=30)
|
|
if not state:
|
|
print("Workflow not found!")
|
|
elif state.runtime_status.name == 'COMPLETED':
|
|
print(f'Workflow completed! Result: {state.serialized_output}')
|
|
else:
|
|
print(f'Workflow failed! Status: {state.runtime_status.name}') # not expected
|
|
except TimeoutError:
|
|
print('*** Workflow timed out!')
|
|
|
|
wfr.shutdown()
|
|
|
|
def restock_inventory(self, daprClient: DaprClient, baseInventory):
|
|
for key, item in baseInventory.items():
|
|
print(f'item: {item}')
|
|
item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
|
|
"per_item_cost": {item.per_item_cost}}}'
|
|
daprClient.save_state(store_name, key, item_str)
|
|
|
|
if __name__ == '__main__':
|
|
app = WorkflowConsoleApp()
|
|
app.main()
|