Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
This commit is contained in:
Marc Duiker 2025-05-14 11:35:51 +00:00
parent a3f9951373
commit 0799eb6b4f
No known key found for this signature in database
GPG Key ID: 5E708BC9F3163E81
4 changed files with 13 additions and 28 deletions

View File

@ -15,12 +15,11 @@ async def check_destination(customer_info: CustomerInfo):
print(f"checkDestination: Received input: {customer_info}.", flush=True)
return ShippingDestinationResult(is_success=True)
@app.post("/registerShipment", status_code=status.HTTP_201_CREATED, response_model=None)
@app.post("/registerShipment", status_code=status.HTTP_201_CREATED)
async def register_shipment(cloud_event: CloudEvent) -> None:
order = Order.model_validate(cloud_event.data)
print(f"registerShipment: Received input: {cloud_event}.", flush=True)
print(f"registerShipment: Received input: {order}.", flush=True)
print(f"registerShipment: Converted to order: {order}.", flush=True)
status = ShipmentRegistrationStatus(order_id=order.id, is_success=True)
with DaprClient() as dapr_client:

View File

@ -18,10 +18,10 @@ app = FastAPI(lifespan=lifespan)
@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(order: Order) -> None:
"""
This is to ensure to have enough inventory for the order.
So the manual restock endpoint is not needed in this sample.
create_default_inventory is used to ensure to have enough inventory for the order.
"""
im.create_default_inventory();
print(f"start: Received input: {order}.", flush=True)
wf_client = wf.DaprWorkflowClient()
@ -39,13 +39,8 @@ shipment has been registered by the ShippingApp.
"""
@app.post("/shipmentRegistered", status_code=status.HTTP_202_ACCEPTED)
async def shipment_registered(cloud_event: CloudEvent) -> None:
print(f"shipmentRegistered: Received input: {cloud_event}.", flush=True)
print(f"shipmentRegistered: cloud_event data {cloud_event.data}.", flush=True)
status = ShipmentRegistrationStatus.model_validate(cloud_event.data)
print(f"shipmentRegistered: converted status {status}.", flush=True)
print(f"shipmentRegistered: order {status.order_id}.", flush=True)
print(f"shipmentRegistered: Received input: {status}.", flush=True)
wf_client = wf.DaprWorkflowClient()
wf_client.raise_workflow_event(
@ -53,6 +48,7 @@ async def shipment_registered(cloud_event: CloudEvent) -> None:
event_name=SHIPMENT_REGISTERED_EVENT,
data=status.model_dump()
)
return
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5260, log_level="debug")

View File

@ -19,7 +19,6 @@ def create_default_inventory() -> None:
)
def get_inventory_item(product_id: str) -> ProductInventoryItem | None:
print(f'im get_inventory_item: {product_id}.', flush=True)
with DaprClient() as dapr_client:
state_response = dapr_client.get_state(
store_name=DAPR_INVENTORY_COMPONENT,
@ -34,7 +33,6 @@ def get_inventory_item(product_id: str) -> ProductInventoryItem | None:
return product_inventory_item
def check_inventory(order_item: OrderItem) -> bool:
print(f'im check_inventory: {order_item.product_id}.', flush=True)
product_inventory_item = get_inventory_item(order_item.product_id)
if product_inventory_item is None:
return False
@ -43,9 +41,8 @@ def check_inventory(order_item: OrderItem) -> bool:
return is_available
def update_inventory(order_item: OrderItem) -> UpdateInventoryResult:
print(f'update_inventory: {order_item.product_id}.', flush=True)
product_inventory_item = get_inventory_item(order_item.product_id)
if product_inventory_item is None:
return UpdateInventoryResult(is_success=False, message=f"Product not in inventory: {order_item.ProductName}")

View File

@ -1,11 +1,8 @@
from dapr.clients import DaprClient
from datetime import timedelta
from models import Order, OrderItem, CustomerInfo, OrderStatus, ActivityResult, ShippingDestinationResult, ShipmentRegistrationStatus, PaymentResult, RegisterShipmentResult, ReimburseCustomerResult, UpdateInventoryResult
from dataclasses import asdict
from models import Order, OrderItem, CustomerInfo, OrderStatus, ActivityResult, ShippingDestinationResult, ShipmentRegistrationStatus, PaymentResult, RegisterShipmentResult, ReimburseCustomerResult
import dapr.ext.workflow as wf
import inventory_management as im
import jsonpickle
from pydantic import BaseModel
SHIPMENT_REGISTERED_EVENT = "shipment-registered-events"
DAPR_PUBSUB_COMPONENT = "shippingpubsub"
@ -40,10 +37,10 @@ def order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
if payment_result.is_success:
yield ctx.call_activity(update_inventory, input=order.order_item.model_dump())
# The register_shipment activity is using pub/sub messaging to communicate with the ShippingApp.
# The register_shipment activity is using pub/sub messaging to communicate with the shipping_app.
yield ctx.call_activity(register_shipment, input=order.model_dump())
# The shipping_app will also use pub/sub messaging back to the WorkflowApp and raise an event.
# The shipping_app will also use pub/sub messaging back to the workflow_app and raise an event.
# The workflow will wait for the event to be received or until the timeout occurs.
shipment_registered_task = ctx.wait_for_external_event(name=SHIPMENT_REGISTERED_EVENT)
timeout_task = ctx.create_timer(fire_at=timedelta(seconds=300))
@ -52,16 +49,13 @@ def order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Timeout occurred, the shipment-registered-event was not received.
message = f"Shipment registration status for {order.id} timed out."
return OrderStatus(is_success=False, message=message)
if not ctx.is_replaying:
print(f'order_workflow: Shipment registration status: {shipment_registered_task.get_result()}.', flush=True)
shipment_registration_status = ShipmentRegistrationStatus.model_validate(shipment_registered_task.get_result())
if not shipment_registration_status.is_success:
# This is the compensation step in case the shipment registration event was not successful.
yield ctx.call_activity(reimburse_customer, input=order.model_dump())
message = f"Shipment registration status for {order.id} failed. Customer is reimbursed."
return OrderStatus(is_success = False, message = message);
return OrderStatus(is_success=True, message=f"Order {order.id} processed successfully.")
@wf_runtime.activity(name='check_inventory')
@ -85,7 +79,7 @@ def check_shipping_destination(ctx: wf.WorkflowActivityContext, customer_info: C
print(f'Failed to register shipment. Reason: {response.text}.', flush=True)
raise Exception(f"Failed to register shipment. Reason: {response.text}.")
result = ShippingDestinationResult.model_validate(response.json())
return ActivityResult(is_success=result.is_success).model_dump()
return ActivityResult(is_success=result.is_success).model_dump()
@wf_runtime.activity(name='process_payment')
def process_payment(ctx: wf.WorkflowActivityContext, order: Order) -> PaymentResult:
@ -117,5 +111,4 @@ def register_shipment(ctx: wf.WorkflowActivityContext, order: Order) -> Register
data=order.model_dump_json(),
data_content_type='application/json'
)
print(f'register_shipment: response: {response}.', flush=True)
return RegisterShipmentResult(is_success=True).model_dump()
return RegisterShipmentResult(is_success=True).model_dump()