From 0799eb6b4fbcbe731beefbadcbc7f21e49c6de56 Mon Sep 17 00:00:00 2001 From: Marc Duiker Date: Wed, 14 May 2025 11:35:51 +0000 Subject: [PATCH] Clean up Signed-off-by: Marc Duiker --- .../combined-patterns/shipping_app/app.py | 5 ++--- .../combined-patterns/workflow_app/app.py | 12 ++++-------- .../workflow_app/inventory_management.py | 5 +---- .../workflow_app/order_workflow.py | 19 ++++++------------- 4 files changed, 13 insertions(+), 28 deletions(-) diff --git a/tutorials/workflow/python/combined-patterns/shipping_app/app.py b/tutorials/workflow/python/combined-patterns/shipping_app/app.py index f18ed52f..d6a1f86b 100644 --- a/tutorials/workflow/python/combined-patterns/shipping_app/app.py +++ b/tutorials/workflow/python/combined-patterns/shipping_app/app.py @@ -15,12 +15,11 @@ async def check_destination(customer_info: CustomerInfo): print(f"checkDestination: Received input: {customer_info}.", flush=True) return ShippingDestinationResult(is_success=True) -@app.post("/registerShipment", status_code=status.HTTP_201_CREATED, response_model=None) +@app.post("/registerShipment", status_code=status.HTTP_201_CREATED) async def register_shipment(cloud_event: CloudEvent) -> None: order = Order.model_validate(cloud_event.data) - print(f"registerShipment: Received input: {cloud_event}.", flush=True) + print(f"registerShipment: Received input: {order}.", flush=True) - print(f"registerShipment: Converted to order: {order}.", flush=True) status = ShipmentRegistrationStatus(order_id=order.id, is_success=True) with DaprClient() as dapr_client: diff --git a/tutorials/workflow/python/combined-patterns/workflow_app/app.py b/tutorials/workflow/python/combined-patterns/workflow_app/app.py index f24c08e1..08271bdd 100644 --- a/tutorials/workflow/python/combined-patterns/workflow_app/app.py +++ b/tutorials/workflow/python/combined-patterns/workflow_app/app.py @@ -18,10 +18,10 @@ app = FastAPI(lifespan=lifespan) @app.post("/start", status_code=status.HTTP_202_ACCEPTED) async def start_workflow(order: Order) -> None: """ - This is to ensure to have enough inventory for the order. - So the manual restock endpoint is not needed in this sample. + create_default_inventory is used to ensure to have enough inventory for the order. """ im.create_default_inventory(); + print(f"start: Received input: {order}.", flush=True) wf_client = wf.DaprWorkflowClient() @@ -39,13 +39,8 @@ shipment has been registered by the ShippingApp. """ @app.post("/shipmentRegistered", status_code=status.HTTP_202_ACCEPTED) async def shipment_registered(cloud_event: CloudEvent) -> None: - print(f"shipmentRegistered: Received input: {cloud_event}.", flush=True) - - print(f"shipmentRegistered: cloud_event data {cloud_event.data}.", flush=True) - status = ShipmentRegistrationStatus.model_validate(cloud_event.data) - print(f"shipmentRegistered: converted status {status}.", flush=True) - print(f"shipmentRegistered: order {status.order_id}.", flush=True) + print(f"shipmentRegistered: Received input: {status}.", flush=True) wf_client = wf.DaprWorkflowClient() wf_client.raise_workflow_event( @@ -53,6 +48,7 @@ async def shipment_registered(cloud_event: CloudEvent) -> None: event_name=SHIPMENT_REGISTERED_EVENT, data=status.model_dump() ) + return if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=5260, log_level="debug") \ No newline at end of file diff --git a/tutorials/workflow/python/combined-patterns/workflow_app/inventory_management.py b/tutorials/workflow/python/combined-patterns/workflow_app/inventory_management.py index 790f8067..25f7c88f 100644 --- a/tutorials/workflow/python/combined-patterns/workflow_app/inventory_management.py +++ b/tutorials/workflow/python/combined-patterns/workflow_app/inventory_management.py @@ -19,7 +19,6 @@ def create_default_inventory() -> None: ) def get_inventory_item(product_id: str) -> ProductInventoryItem | None: - print(f'im get_inventory_item: {product_id}.', flush=True) with DaprClient() as dapr_client: state_response = dapr_client.get_state( store_name=DAPR_INVENTORY_COMPONENT, @@ -34,7 +33,6 @@ def get_inventory_item(product_id: str) -> ProductInventoryItem | None: return product_inventory_item def check_inventory(order_item: OrderItem) -> bool: - print(f'im check_inventory: {order_item.product_id}.', flush=True) product_inventory_item = get_inventory_item(order_item.product_id) if product_inventory_item is None: return False @@ -43,9 +41,8 @@ def check_inventory(order_item: OrderItem) -> bool: return is_available def update_inventory(order_item: OrderItem) -> UpdateInventoryResult: - print(f'update_inventory: {order_item.product_id}.', flush=True) product_inventory_item = get_inventory_item(order_item.product_id) - + if product_inventory_item is None: return UpdateInventoryResult(is_success=False, message=f"Product not in inventory: {order_item.ProductName}") diff --git a/tutorials/workflow/python/combined-patterns/workflow_app/order_workflow.py b/tutorials/workflow/python/combined-patterns/workflow_app/order_workflow.py index 9b144149..c7907cb0 100644 --- a/tutorials/workflow/python/combined-patterns/workflow_app/order_workflow.py +++ b/tutorials/workflow/python/combined-patterns/workflow_app/order_workflow.py @@ -1,11 +1,8 @@ from dapr.clients import DaprClient from datetime import timedelta -from models import Order, OrderItem, CustomerInfo, OrderStatus, ActivityResult, ShippingDestinationResult, ShipmentRegistrationStatus, PaymentResult, RegisterShipmentResult, ReimburseCustomerResult, UpdateInventoryResult -from dataclasses import asdict +from models import Order, OrderItem, CustomerInfo, OrderStatus, ActivityResult, ShippingDestinationResult, ShipmentRegistrationStatus, PaymentResult, RegisterShipmentResult, ReimburseCustomerResult import dapr.ext.workflow as wf import inventory_management as im -import jsonpickle -from pydantic import BaseModel SHIPMENT_REGISTERED_EVENT = "shipment-registered-events" DAPR_PUBSUB_COMPONENT = "shippingpubsub" @@ -40,10 +37,10 @@ def order_workflow(ctx: wf.DaprWorkflowContext, order: Order): if payment_result.is_success: yield ctx.call_activity(update_inventory, input=order.order_item.model_dump()) - # The register_shipment activity is using pub/sub messaging to communicate with the ShippingApp. + # The register_shipment activity is using pub/sub messaging to communicate with the shipping_app. yield ctx.call_activity(register_shipment, input=order.model_dump()) - # The shipping_app will also use pub/sub messaging back to the WorkflowApp and raise an event. + # The shipping_app will also use pub/sub messaging back to the workflow_app and raise an event. # The workflow will wait for the event to be received or until the timeout occurs. shipment_registered_task = ctx.wait_for_external_event(name=SHIPMENT_REGISTERED_EVENT) timeout_task = ctx.create_timer(fire_at=timedelta(seconds=300)) @@ -52,16 +49,13 @@ def order_workflow(ctx: wf.DaprWorkflowContext, order: Order): # Timeout occurred, the shipment-registered-event was not received. message = f"Shipment registration status for {order.id} timed out." return OrderStatus(is_success=False, message=message) - - if not ctx.is_replaying: - print(f'order_workflow: Shipment registration status: {shipment_registered_task.get_result()}.', flush=True) + shipment_registration_status = ShipmentRegistrationStatus.model_validate(shipment_registered_task.get_result()) if not shipment_registration_status.is_success: # This is the compensation step in case the shipment registration event was not successful. yield ctx.call_activity(reimburse_customer, input=order.model_dump()) message = f"Shipment registration status for {order.id} failed. Customer is reimbursed." return OrderStatus(is_success = False, message = message); - return OrderStatus(is_success=True, message=f"Order {order.id} processed successfully.") @wf_runtime.activity(name='check_inventory') @@ -85,7 +79,7 @@ def check_shipping_destination(ctx: wf.WorkflowActivityContext, customer_info: C print(f'Failed to register shipment. Reason: {response.text}.', flush=True) raise Exception(f"Failed to register shipment. Reason: {response.text}.") result = ShippingDestinationResult.model_validate(response.json()) - return ActivityResult(is_success=result.is_success).model_dump() + return ActivityResult(is_success=result.is_success).model_dump() @wf_runtime.activity(name='process_payment') def process_payment(ctx: wf.WorkflowActivityContext, order: Order) -> PaymentResult: @@ -117,5 +111,4 @@ def register_shipment(ctx: wf.WorkflowActivityContext, order: Order) -> Register data=order.model_dump_json(), data_content_type='application/json' ) - print(f'register_shipment: response: {response}.', flush=True) - return RegisterShipmentResult(is_success=True).model_dump() \ No newline at end of file + return RegisterShipmentResult(is_success=True).model_dump() \ No newline at end of file