Update shutdown

Signed-off-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This commit is contained in:
Deepanshu Agarwal 2023-05-26 09:44:18 +05:30
parent f8fc4e46d7
commit 7b7c350bdc
2 changed files with 18 additions and 13 deletions

View File

@ -14,10 +14,20 @@ class WorkflowConsoleApp:
print("*** Welcome to the Dapr Workflow console app sample!")
print("*** Using this app, you can place orders that start workflows.")
print("*** Ensure that Dapr is running in a separate terminal window using the following command:")
print("dapr run --dapr-grpc-port 50001 --app-id order-processor")
# Wait for the sidecar to become available
# sleep(5)
sleep(5)
address = get_address()
workflowRuntime = WorkflowRuntime(address["host"], address["port"])
workflowRuntime.register_workflow(order_processing_workflow)
workflowRuntime.register_activity(notify_activity)
workflowRuntime.register_activity(requst_approval_activity)
workflowRuntime.register_activity(verify_inventory_activity)
workflowRuntime.register_activity(process_payment_activity)
workflowRuntime.register_activity(update_inventory_activity)
workflowRuntime.start()
daprClient = DaprClient(address=f'{address["host"]}:{address["port"]}')
baseInventory = {}
baseInventory["paperclip"] = InventoryItem("Paperclip", 5, 100)
@ -27,21 +37,13 @@ class WorkflowConsoleApp:
self.restock_inventory(daprClient, baseInventory)
while True:
workflowRuntime = WorkflowRuntime(address["host"], address["port"])
workflowRuntime.register_workflow(order_processing_workflow)
workflowRuntime.register_activity(notify_activity)
workflowRuntime.register_activity(requst_approval_activity)
workflowRuntime.register_activity(verify_inventory_activity)
workflowRuntime.register_activity(process_payment_activity)
workflowRuntime.register_activity(update_inventory_activity)
workflowRuntime.start()
client = DaprWorkflowClient(address["host"], address["port"])
sleep(1)
print("==========Begin the purchase of item:==========")
items = ', '.join(str(inventory_item) for inventory_item in baseInventory.keys())
print("To restock items, type 'restock'.")
print("To exit workflow console app, type 'exit'.")
item_name = input(f'Enter the name of one of the following items to order: {items}: ')
if item_name is None:
@ -49,6 +51,10 @@ class WorkflowConsoleApp:
elif item_name == "restock":
self.restock_inventory(daprClient, baseInventory)
continue
elif item_name == "exit":
print("Exiting workflow console app.")
workflowRuntime.shutdown()
break
else:
item_name = item_name.lower()
if item_name not in baseInventory.keys():
@ -95,7 +101,6 @@ class WorkflowConsoleApp:
threading.Thread(target=prompt_for_approval(client), daemon=True).start()
print(" Purchase of item is ", state.runtime_status.name)
workflowRuntime.shutdown()
def restock_inventory(self, daprClient: DaprClient, baseInventory):
for key, item in baseInventory.items():

View File

@ -24,7 +24,7 @@ def order_processing_workflow(ctx: DaprWorkflowContext, orderPayload: OrderPaylo
if orderPayload.total_cost > 50000:
yield ctx.call_activity(requst_approval_activity, input=orderPayload)
approval_flag = ctx.wait_for_external_event("manager_approval")
timeout_event = ctx.create_timer(timedelta(seconds=15))
timeout_event = ctx.create_timer(timedelta(seconds=200))
winner = yield when_any([approval_flag, timeout_event])
if winner == timeout_event:
yield ctx.call_activity(notify_activity, input=Notification(message=f'Payment for order {order_id} has been cancelled due to timeout!'))