diff --git a/README.md b/README.md index f714357..cd145f5 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ If you are new to Dapr, you may want to review following resources first: | [commercetools GraphQL sample output binding](./commercetools-graphql-sample/) | Connects to commercetools, allowing you to query or manipulate a commercetools projects using a provided GraphlQL query. | | [WebAssembly Middleware](./hello-wasm) | Demonstrates how to serve HTTP responses directly from the dapr sidecar using WebAssembly. | | [Workflow + external endpoint invocation](./workflow-external-invocation) | Demonstrates how to use the Dapr Workflow API to coordinate an order process that includes an activity which uses service invocation for non-Dapr endpoints. | +| [Workflow + multi-app microservice in Python](./workflow-orderprocessing-python) | Demonstrates how to use the Dapr Workflow Python SDK to coordinate an order process across multiple dapr-enabled microservices. | ## External samples diff --git a/workflow-orderprocessing-python/.vscode/settings.json b/workflow-orderprocessing-python/.vscode/settings.json new file mode 100644 index 0000000..ecb7ec2 --- /dev/null +++ b/workflow-orderprocessing-python/.vscode/settings.json @@ -0,0 +1,15 @@ +{ + "[python]": { + "editor.defaultFormatter": "ms-python.autopep8", + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { + "source.organizeImports": true, + }, + "editor.rulers": [ + 119 + ], + }, + "autopep8.args": [ + "--max-line-length=119" + ] +} \ No newline at end of file diff --git a/workflow-orderprocessing-python/README.md b/workflow-orderprocessing-python/README.md new file mode 100644 index 0000000..d1bd876 --- /dev/null +++ b/workflow-orderprocessing-python/README.md @@ -0,0 +1,254 @@ +# Dapr Workflow Order Processing Microservice Sample + +## Sample info + +| Attribute | Details | +|--------|--------| +| Dapr runtime version | 1.12.0 | +| Dapr Workflow Python SDK | v0.2.0 | +| Language | Python | +| Environment | Local | + +An example of using [Dapr Workflows](https://docs.dapr.io/developing-applications/building-blocks/workflow/) to build a microservice architecture for order processing using Python. This example uses pub/sub and service invocation building blocks. + +## Overview + +This sample is comprised of five microservices: + +- **[orderprocessing](./services/orderprocessing/)**: The main entrypoint for the sample. This service receives an order request and starts a new Dapr Workflow instance to process the order. +- **[notifications](./services/notifications/)**: A service receives pub/sub messages from the orderprocessing service and displays notifications to the user. +- **[inventory](./services/inventory/)**: A service for managing inventory that is invoked by the orderprocessing workflows. +- **[payments](./services/payments/)**: A service for managing payments that is invoked by the orderprocessing workflows. +- **[shipping](./services/shipping/)**: A service for managing shipping that is invoked by the orderprocessing workflows. + +The order processing workflow and all services are written in Python. Orders are submitted through a Flask REST API in the orderprocessing service, which triggers a workflow instance to process the order. As the workflow progresses, it uses pub/sub to send notifications to the notifications service, and service invocation to invoke the inventory, payments, and shipping services. + +Orders that are $1,000 or more must be approved by a designated approver within 24 hours, otherwise the order will be cancelled. The workflow will automatically pause execution to wait for the approval using the [external workflow event](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#external-events) APIs. Approvals can be submitted through the Flask REST API in the orderprocessing service. + +## Prerequisites + +1. Python 3.8+ +1. [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) + - Ensure that you're using **v1.12** of the Dapr runtime and the CLI. +1. A REST client, such as [cURL](https://curl.se/), or the VSCode [REST client extension](https://marketplace.visualstudio.com/items?itemName=humao.rest-client) (recommended). + +## Setting up the Python environment + +It's recommended to create a virtual environment for the sample. This can be done using the `venv` module in Python 3.8+. You can then install the dependencies for each of the microservices. + +1. Create a virtual environment for the sample: + + ```sh + python3 -m venv .venv + ``` + +1. Activate the virtual environment: + + ```sh + source .venv/bin/activate + ``` + +1. Install the dependencies for each of the microservices under the `services` directory: + + ```sh + pip install -r services/orderprocessing/requirements.txt + pip install -r services/notifications/requirements.txt + pip install -r services/inventory/requirements.txt + pip install -r services/payments/requirements.txt + pip install -r services/shipping/requirements.txt + ``` + +## Running the sample + +The full microservice architecture can be run locally using the [Multi-App Run](https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/) feature of the Dapr CLI. The multi-app configuration is defined in the [dapr.yaml](./dapr.yaml) file. + +1. To start all the services, navigate to the root directory of the sample and run the following command: + + ```sh + dapr run -f . + ``` + +1. Next, navigate to the notification service web UI at http://localhost:3001 to view the notifications from workflows. Keep this page open while running the sample. + +1. Submit an order using the following cURL command, or via the [demo.http](./demo.http) file if you are using VSCode with the REST client, to start a new order processing workflow that requires approval (if using PowerShell, replace `\` with `` ` ``): + + ```sh + curl -i -X POST http://localhost:3000/orders \ + -H "Content-Type: application/json" \ + -d '{"customer": "Riley", "items": ["milk", "bread", "iPhone"], "total": 1299.00}' + ``` + + Expected result (abbreviated): + + ```http + HTTP/1.1 202 ACCEPTED + Location: http://localhost:3000/orders/order_riley_o066i + Content-Type: text/html; charset=utf-8 + Content-Length: 40 + + Order received. ID = 'order_riley_o066i' + ``` + + > **NOTE**: `order_riley_o066i` in the response is the order ID. The exact value is random and will be different for each order. In each subsequent example, replace `order_riley_o066i` with the order ID returned from the previous request. + + > **NOTE**: If you want to submit an order the is auto-approved, simply change the `total` value to be less than `1000`. + +1. Check the status of the order by sending a GET request to the URL in the `Location` header of the previous response: + + ```http + curl -i http://localhost:3000/orders/order_riley_o066i + ``` + + Expected result (formatted for readability): + + ```json + { + "created_time": "2023-10-28T17:02:05.137170", + "details": { + "customer": "Riley", + "id": "order_riley_o066i", + "items": ["milk", "bread", "iPhone"], + "total": 1299.0 + }, + "id": "order_riley_o066i", + "last_updated_time": "2023-10-28T17:02:05.222644", + "status": "RUNNING" + } + ``` + +1. Check the notifications web UI at http://localhost:3001 to see the notifications from the workflow. They should look something like this: + + ```text + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Received order for Riley: ['milk', 'bread', 'iPhone']. Total = 1299.0 + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Reserved inventory for order: ['milk', 'bread', 'iPhone'] + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Waiting for approval since order >= 1000.0. Deadline = 2023-10-29 17:02:05. + ``` + +1. Approve the order by sending a POST request to the approval endpoint: + + ```sh + curl -i -X POST http://localhost:3000/orders/order_riley_o066i/approve \ + -H "Content-Type: application/json" \ + -d '{"approver": "Chris", "approved": true}' + ``` + + > **NOTE**: You can alternatively reject the order by specifying `"approved": false` in the request body. + + Expected result (abbreviated): + + ```http + HTTP/1.1 200 OK + Content-Type: text/html; charset=utf-8 + Content-Length: 42 + + Approval sent for order: order_riley_o066i + ``` + +1. Check the status of the workflow again to confirm that it completed successfully: + + ```sh + curl -i http://localhost:3000/orders/order_riley_o066i + ``` + + Expected result (formatted for readability): + + ```json + { + "created_time": "2023-10-28T17:02:05.137170", + "details": { + "customer": "Riley", + "id": "order_riley_o066i", + "items": ["milk", "bread", "iPhone"], + "total": 1299.0 + }, + "id": "order_riley_o066i", + "last_updated_time": "2023-10-28T17:10:02.716316", + "order_result": { + "id": "order_riley_o066i", + "message": "Order processed successfully", + "success": true + }, + "status": "COMPLETED" + } + ``` + +1. Check the notifications web UI again to see the updated notifications from the workflow. They should look something like this: + + ```text + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Received order for Riley: ['milk', 'bread', 'iPhone']. Total = 1299.0 + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Reserved inventory for order: ['milk', 'bread', 'iPhone'] + 2023-10-28T10:02:05-07:00 | order_riley_o066i | Waiting for approval since order >= 1000.0. Deadline = 2023-10-29 17:02:05. + 2023-10-28T10:10:00-07:00 | order_riley_o066i | Order was approved by Chris. + 2023-10-28T10:10:01-07:00 | order_riley_o066i | Payment was processed successfully + 2023-10-28T10:10:02-07:00 | order_riley_o066i | Order submitted for shipping + ``` + +1. Check the Zipkin UI at http://localhost:9411. Click "Run Query" to list the recently captured traces and then select the most recent, which should have a span of several seconds. The resulting UI should look something like this: + + ![Zipkin UI for success](./zipkin-workflow-success.png) + + > **NOTE**: This screenshot shows a different order processing workflow instance ID than the one used in this example. + + > **NOTE**: When the `orchestration||process_order_workflow` span is selected, you can click the "SHOW ALL ANNOTATIONS" button to see more information about the workflow, including the specific time that the order was approved. + +1. To stop all the services, press `Ctrl+C` in the terminal where the services are running. + +If you want to exercise the error handling path of the workflow, you can put the shipping service in "maintainance mode" and then try to submit another order. + +1. Put the shipping service in "maintainance mode" by sending a POST request to the `/shipping/deactivate` endpoint: + + ```sh + curl -i -X POST http://localhost:3004/shipping/deactivate + ``` + +1. Start a new order processing workflow. For simplicity, we'll start an order with a smaller cost that can be auto-approved. + + ```sh + curl -i -X POST http://localhost:3000/orders \ + -H "Content-Type: application/json" \ + -d '{"customer": "Casey", "items": ["milk", "bread"], "total": 10.00}' + ``` + +1. Check the notifications web UI to see the notifications for this new workflow. It should look something like this: + + ```text + 2023-10-28T10:46:19-07:00 | order_casey_1irq1 | Received order for Casey: ['milk', 'bread']. Total = 10.0 + 2023-10-28T10:46:20-07:00 | order_casey_1irq1 | Reserved inventory for order: ['milk', 'bread'] + 2023-10-28T10:46:21-07:00 | order_casey_1irq1 | Payment was processed successfully + 2023-10-28T10:46:21-07:00 | order_casey_1irq1 | Error submitting order for shipping: order_casey_1irq1: Activity task #6 failed: Unknown Dapr Error. HTTP status code: 503 + ``` + + > **NOTE**: There is currently a bug in the v0.2.0 release of the Dapr Workflow Python SDK that causes the workflow to fail earlier than it's supposed to, resulting in the refund activity not running. This bug will be fixed in a future release of the SDK. + +1. You can also check the status of the workflow using the REST API to see that the order is in the failed state. + + ``` + curl -i http://localhost:3000/orders/order_casey_1irq1 + ``` + + Expected result (formatted for readability): + + ```json + { + "created_time": "2023-10-28T17:46:19.031582", + "details": { + "customer": "Casey", + "id": "order_casey_1irq1", + "items": ["milk", "bread"], + "total": 10.0 + }, + "failure_details": { + "error_type": "TaskFailedError", + "message": "order_casey_1irq1: Activity task #6 failed: Unknown Dapr Error. HTTP status code: 503", + "stack_trace": "..." + }, + "id": "order_casey_1irq1", + "last_updated_time": "2023-10-28T17:46:21.205560", + "status": "FAILED" + } + ``` + +1. You can also see the failed workflow in Zipkin, which should look something like this: + + ![Zipkin UI for failure](./zipkin-workflow-failure.png) + diff --git a/workflow-orderprocessing-python/components/pubsub.yaml b/workflow-orderprocessing-python/components/pubsub.yaml new file mode 100644 index 0000000..5d2dde8 --- /dev/null +++ b/workflow-orderprocessing-python/components/pubsub.yaml @@ -0,0 +1,12 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: orders +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" diff --git a/workflow-orderprocessing-python/components/statestore.yaml b/workflow-orderprocessing-python/components/statestore.yaml new file mode 100644 index 0000000..2f676bf --- /dev/null +++ b/workflow-orderprocessing-python/components/statestore.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" diff --git a/workflow-orderprocessing-python/dapr.yaml b/workflow-orderprocessing-python/dapr.yaml new file mode 100644 index 0000000..5319fa9 --- /dev/null +++ b/workflow-orderprocessing-python/dapr.yaml @@ -0,0 +1,38 @@ +version: 1 +common: + resourcesPath: ./components +apps: + # The order processing service receives orders and executes workflows +- appDirPath: ./services/orderprocessing/ + appPort: 3000 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console + + # The notifications service receives pubsub notifications from workflow +- appDirPath: ./services/notifications + appPort: 3001 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console + + # The inventory service is invoked directly by order processing workflows +- appDirPath: ./services/inventory + appPort: 3002 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console + + # The payments service is invoked directly by order processing workflows +- appDirPath: ./services/payments + appPort: 3003 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console + + # The shipping service is invoked directly by order processing workflows +- appDirPath: ./services/shipping + appPort: 3004 + command: ["python3", "app.py"] + appLogDestination: console + daprdLogDestination: console \ No newline at end of file diff --git a/workflow-orderprocessing-python/demo.http b/workflow-orderprocessing-python/demo.http new file mode 100644 index 0000000..ad84aff --- /dev/null +++ b/workflow-orderprocessing-python/demo.http @@ -0,0 +1,30 @@ + +### Get the current inventory +GET http://localhost:3002/inventory + + +### Submit a simple order +POST http://localhost:3000/orders +Content-Type: application/json + +{"customer": "Casey", "items": ["milk", "bread"], "total": 10.00} + + +### Submit an expensive order +POST http://localhost:3000/orders +Content-Type: application/json + +{"customer": "Riley", "items": ["milk", "bread", "iPhone"], "total": 1299.00} + + +### Get the status of an order +GET http://localhost:3000/orders/order_casey_6glku + + +### Approve an order +POST http://localhost:3000/orders/order_riley_xcgyz/approve +Content-Type: application/json + +{"approver": "Chris", "approved": true} + + diff --git a/workflow-orderprocessing-python/services/inventory/app.py b/workflow-orderprocessing-python/services/inventory/app.py new file mode 100644 index 0000000..7d4e908 --- /dev/null +++ b/workflow-orderprocessing-python/services/inventory/app.py @@ -0,0 +1,90 @@ +import logging +import os +import time +from typing import Dict + +from flask import Flask, g, request + +APP_PORT = os.getenv("APP_PORT", "3002") + +app = Flask(__name__) + + +@app.route("/inventory", methods=["GET"]) +def get_inventory(): + logging.info("Getting inventory") + if not hasattr(g, 'inventory'): + restock_inventory() + inventory: Dict[str, int] = g.inventory + return inventory, 200 + + +@app.route("/inventory/reserve", methods=["POST"]) +def reserve_inventory(): + logging.info(f"Reserving inventory: {request.json}") + + order = request.json + items = order['items'] + id = order['id'] + + if not hasattr(g, 'inventory'): + restock_inventory() + + inventory: Dict[str, int] = g.inventory + + # Check if we have enough inventory to fulfill the order + for item in items: + if item not in inventory or inventory[item] <= 0: + return { + "id": id, + "success": False, + "message": "Out of stock", + }, 200 + + # Update the inventory + for item in items: + inventory[item] = inventory[item] - 1 + + # Simulate work + time.sleep(1) + + return { + "id": id, + "success": True, + "message": "" + }, 200 + + +@app.route("/inventory/restock", methods=["POST"]) +def restock_inventory(): + logging.info("Restocking inventory") + inventory = dict[str, int]( + { + 'milk': 10, + 'bread': 10, + 'apples': 10, + 'oranges': 10, + 'iPhone': 10, + } + ) + g.inventory = inventory + + +@app.route("/", methods=["GET"]) +@app.route("/healthz", methods=["GET"]) +def hello(): + return f"Hello from {__name__}", 200 + + +def main(): + # Start the Flask app server + app.run(port=APP_PORT, debug=True, use_reloader=False) + app.post('/inventory/restock') + + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + main() diff --git a/workflow-orderprocessing-python/services/inventory/requirements.txt b/workflow-orderprocessing-python/services/inventory/requirements.txt new file mode 100644 index 0000000..609c2fa --- /dev/null +++ b/workflow-orderprocessing-python/services/inventory/requirements.txt @@ -0,0 +1,2 @@ +dapr >= 1.11.0 +flask >= 3.0.0 diff --git a/workflow-orderprocessing-python/services/notifications/app.py b/workflow-orderprocessing-python/services/notifications/app.py new file mode 100644 index 0000000..04fae8d --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/app.py @@ -0,0 +1,64 @@ +import logging +import os + +from flask import Flask, jsonify, render_template, request +from flask_socketio import SocketIO + +APP_PORT = os.getenv("APP_PORT", "3001") +PUBSUB_NAME = os.getenv("PUBSUB_NAME", "orders") +TOPIC_NAME = os.getenv("TOPIC_NAME", "notifications") + +app = Flask(__name__) +socketio = SocketIO(app) + + +@socketio.on('connect') +def socket_connect(): + print('connected', flush=True) + + +@app.route('/') +def index(): + return render_template('index.html') + + +@app.route('/dapr/subscribe', methods=['GET']) +def subscribe(): + """Returns the list of topics the app wants to subscribe to. + Ref: https://docs.dapr.io/reference/api/pubsub_api/#provide-a-route-for-dapr-to-discover-topic-subscriptions""" + subs = [ + { + 'pubsubname': PUBSUB_NAME, + 'topic': TOPIC_NAME, + 'route': TOPIC_NAME, + }, + ] + return jsonify(subs) + + +@app.route('/' + TOPIC_NAME, methods=['POST', 'PUT']) +def topic_notifications(): + """Handles notification events from the Dapr pubsub component. + Ref: https://docs.dapr.io/reference/api/pubsub_api/#provide-routes-for-dapr-to-deliver-topic-events""" + logging.info(f"Received notification: {request.json}") + event = request.json + socketio.emit('message', event) + return '', 200 + + +@app.route("/healthz", methods=["GET"]) +def hello(): + return f"Hello from {__name__}", 200 + + +def main(): + logging.info("Starting Flask app...") + socketio.run(app, port=APP_PORT, allow_unsafe_werkzeug=True) + + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + main() diff --git a/workflow-orderprocessing-python/services/notifications/requirements.txt b/workflow-orderprocessing-python/services/notifications/requirements.txt new file mode 100644 index 0000000..49aa46d --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/requirements.txt @@ -0,0 +1,2 @@ +flask-socketio == 5.3.6 +flask >= 3.0.0 \ No newline at end of file diff --git a/workflow-orderprocessing-python/services/notifications/static/css/app.css b/workflow-orderprocessing-python/services/notifications/static/css/app.css new file mode 100644 index 0000000..0e5880c --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/static/css/app.css @@ -0,0 +1,109 @@ +/* + Knative colors + Dark Blue: #0865ad + Light Blue: #6695ca +*/ + + +html, +body { + height: 100%; + margin: 0; + padding: 0; + background-color: #fff; +} + +#wrapper { + padding: 0; + margin: 0; + height: 100%; + text-align: center; + widows: 100%; +} + +#page-header { + padding: 0; + margin: 10px 0 0 0; + clear: both; + break-after: always; + height: 100px; +} + +#page-header-image { + border: 0; + margin-left: 25px; +} + +#page-header-image img { + width: 100px; + height: 100px; + float: left; +} + +#connection { + float: right; + margin: 10px; + font-family: Geneva, Verdana, sans-serif; + font-size: 1em; +} + +#middle-section { + margin: 0; + padding: 10px; + height: 700px; + overflow: auto; +} + +#middle-section div { + margin: 5px; +} + +#page-footer { + widows: 80%; + margin: 30px 0 0 0; + font-family: Geneva, Verdana, sans-serif; + font-size: 1em; +} + +#notifications { + background-color: #20329b; + padding: 0; + margin: 5px; + height: 690px; + overflow: auto; +} + +div.item-text { + margin-left: 10px; + padding-left: 10px; +} + +#notifications b { + font-family: Geneva, Verdana, sans-serif; + font-size: 1em; + color: #20329b; + margin: 0; +} + +#notifications i { + font-family: Geneva, Verdana, sans-serif; + font-size: 0.8em; + font-style: normal; +} + +#notifications i.small { + font-size: 0.7em; + color: #666666; +} + +.error { + color: red; +} + +.item { + clear:both; + padding: 5px; + text-align: left; + background-color: #fff; + overflow: auto; +} \ No newline at end of file diff --git a/workflow-orderprocessing-python/services/notifications/static/img/dapr.svg b/workflow-orderprocessing-python/services/notifications/static/img/dapr.svg new file mode 100644 index 0000000..716c3e4 --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/static/img/dapr.svg @@ -0,0 +1,15 @@ + + + + Artboard + Created with Sketch. + + + + + + + + + + \ No newline at end of file diff --git a/workflow-orderprocessing-python/services/notifications/static/img/favicon.ico b/workflow-orderprocessing-python/services/notifications/static/img/favicon.ico new file mode 100644 index 0000000..d3617fa Binary files /dev/null and b/workflow-orderprocessing-python/services/notifications/static/img/favicon.ico differ diff --git a/workflow-orderprocessing-python/services/notifications/static/js/app.js b/workflow-orderprocessing-python/services/notifications/static/js/app.js new file mode 100644 index 0000000..6f02eba --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/static/js/app.js @@ -0,0 +1,44 @@ +window.onload = function () { + + console.log("Protocol: " + location.protocol); + var wsURL = location.protocol + "//" + document.location.host + + var log = document.getElementById("notifications"); + + function appendLog(item) { + var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1; + log.appendChild(item); + if (doScroll) { + log.scrollTop = log.scrollHeight - log.clientHeight; + } + } + + if (log) { + var sock = io.connect(wsURL); + var connDiv = document.getElementById("connection-status"); + connDiv.innerText = "closed"; + + sock.on('connect', function () { + console.log("connected to " + wsURL); + connDiv.innerText = "open"; + }); + + sock.on('disconnect', function (e) { + console.log("connection closed (" + e.code + ")"); + connDiv.innerText = "closed"; + }); + + sock.on('message', function (evt) { + ////console.log("message: " + JSON.stringify(evt)); + + var item = document.createElement("div"); + item.className = "item"; + var data = JSON.parse(evt.data); + var message = "" + evt.time + " | " + data.order_id + " | " + data.message + ""; + var content = "
" + message + "
"; + item.innerHTML = content; + appendLog(item); + }); + + } // if log +}; diff --git a/workflow-orderprocessing-python/services/notifications/templates/index.html b/workflow-orderprocessing-python/services/notifications/templates/index.html new file mode 100644 index 0000000..6503b18 --- /dev/null +++ b/workflow-orderprocessing-python/services/notifications/templates/index.html @@ -0,0 +1,43 @@ + + + + + dapr event viewer + + + + + + + + + + + + +
+ + + + + +
+
+
+ + + +
+ + + + diff --git a/workflow-orderprocessing-python/services/orderprocessing/app.py b/workflow-orderprocessing-python/services/orderprocessing/app.py new file mode 100644 index 0000000..798ae61 --- /dev/null +++ b/workflow-orderprocessing-python/services/orderprocessing/app.py @@ -0,0 +1,282 @@ +import json +import logging +import os +import random +import string +from dataclasses import dataclass +from datetime import timedelta + +import dapr.ext.workflow as wf +from dapr.clients import DaprClient +from flask import Flask, request, url_for +from markupsafe import escape + +APP_PORT = os.getenv("APP_PORT", "3000") +PUBSUB_NAME = os.getenv("PUBSUB_NAME", "orders") +TOPIC_NAME = os.getenv("TOPIC_NAME", "notifications") + +APPROVAL_THRESHOLD = 1000.0 +APPROVAL_TIMEOUT = timedelta(hours=24) + +app = Flask(__name__) + + +@dataclass +class Order: + id: str + customer: str + items: list + total: float + + +@dataclass +class OrderResult: + id: str + success: bool + message: str + + +@dataclass +class InventoryResult: + id: str + success: bool + message: str + + +@dataclass +class Approval: + approver: str + approved: bool + + @staticmethod + def from_dict(dict): + return Approval(**dict) + + +def process_order_workflow(ctx: wf.DaprWorkflowContext, order: Order): + yield ctx.call_activity(notify, input=f"Received order for {order.customer}: {order.items}. Total = {order.total}") + + # Call into the inventory service to reserve the items in this order + result = yield ctx.call_activity(reserve_inventory, input=order) + if not result.success: + yield ctx.call_activity(notify, input=f"Inventory failed for order: {result.message}") + return OrderResult(order.id, False, result.message) + + yield ctx.call_activity(notify, input=f"Reserved inventory for order: {order.items}") + + # Orders over $1,000 require human approval + if order.total >= APPROVAL_THRESHOLD: + approval_deadline = ctx.current_utc_datetime + APPROVAL_TIMEOUT + yield ctx.call_activity( + notify, + input=f"Waiting for approval since order >= {APPROVAL_THRESHOLD}. Deadline = {approval_deadline}.") + + # Block the workflow on either an approval event or a timeout + approval_task = ctx.wait_for_external_event("approval") + timeout_expired_task = ctx.create_timer(approval_deadline) + winner = yield wf.when_any([approval_task, timeout_expired_task]) + if winner == timeout_expired_task: + message = "Approval deadline expired." + yield ctx.call_activity(notify, input=message) + return OrderResult(order.id, False, message) + + # Check the approval result + approval: Approval = yield approval_task + if not approval.approved: + message = f"Order was rejected by {approval.approver}." + yield ctx.call_activity(notify, input=message) + return OrderResult(order.id, False, message) + + yield ctx.call_activity(notify, input=f"Order was approved by {approval.approver}.") + + # Submit the order to the payment service + yield ctx.call_activity(submit_payment, input=order) + yield ctx.call_activity(notify, input="Payment was processed successfully") + + # Submit the order for shipping + try: + yield ctx.call_activity(submit_order_to_shipping, input=order) + except Exception as e: + # Shipping failed, so we need to refund the payment + yield ctx.call_activity(notify, input=f"Error submitting order for shipping: {str(e)}") + yield ctx.call_activity(refund_payment, input=order) + yield ctx.call_activity(notify, input="Payment refunded") + + # Allow the workflow to fail with the original failure details + raise + + yield ctx.call_activity(notify, input="Order submitted for shipping") + return OrderResult(order.id, True, "Order processed successfully") + + +def notify(ctx: wf.WorkflowActivityContext, message: str): + logging.info(f"Sending notification: {message}") + with DaprClient() as d: + d.publish_event(PUBSUB_NAME, TOPIC_NAME, json.dumps({ + "order_id": ctx.workflow_id, + "message": message + })) + + +def reserve_inventory(_, order: Order) -> InventoryResult: + logging.info(f"Reserving inventory for order: {order}") + with DaprClient() as d: + resp = d.invoke_method("inventory", "inventory/reserve", http_verb="POST", data=json.dumps(order.__dict__)) + if resp.status_code != 200: + raise Exception(f"Error calling inventory service: {resp.status_code}") + inventory_result = InventoryResult(**json.loads(resp.data.decode("utf-8"))) + logging.info(f"Inventory result: {inventory_result}") + return inventory_result + + +def submit_payment(_, order: Order): + logging.info(f"Submitting payment for order: {order}") + with DaprClient() as d: + resp = d.invoke_method("payments", "payments/charge", http_verb="POST", data=json.dumps(order.__dict__)) + if resp.status_code != 200: + raise Exception(f"Error calling payment service: {resp.status_code}: {resp.text()}") + + +def submit_order_to_shipping(_, order: Order): + logging.info(f"Submitting order to shipping: {order}") + with DaprClient() as d: + resp = d.invoke_method("shipping", "shipping/ship", http_verb="POST", data=json.dumps(order.__dict__)) + if resp.status_code != 200: + raise Exception(f"Error calling shipping service: {resp.status_code}: {resp.text()}") + + +def refund_payment(_, order: Order): + logging.info(f"Refunding payment for order: {order}") + with DaprClient() as d: + resp = d.invoke_method("payments", "payments/refund", http_verb="POST", data=json.dumps(order.__dict__)) + if resp.status_code != 200: + raise Exception(f"Error calling payment service: {resp.status_code}: {resp.text()}") + + +# API to submit a new order +@app.route("/orders", methods=["POST"]) +def submit_order(): + + request_data = request.get_json() + if not request_data: + return """Invalid request. Should be in the form of { + \"customer\": \"joe\", \"items\": [\"apples\", \"oranges\"], \"total\": 100.0}""", 400 + if not request_data.get("customer"): + return "Missing customer name", 400 + if not request_data.get("items"): + return "Missing items", 400 + if not request_data.get("total"): + return "Missing total", 400 + + order = Order( + None, + request_data.get("customer"), + request_data.get("items"), + request_data.get("total")) + + # Generate a unique ID for this order + random_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=5)) + order.id = f"order_{order.customer.lower()}_{random_suffix}" + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow( + process_order_workflow, + input=order, + instance_id=order.id) + + logging.info(f"Started workflow instance: {instance_id}") + return f"Order received. ID = '{escape(instance_id)}'", 202, { + 'Location': url_for('check_order_status', order_id=instance_id, _external=True) + } + + +@app.route("/orders/", methods=["GET"]) +def check_order_status(order_id): + wf_client = wf.DaprWorkflowClient() + state = wf_client.get_workflow_state(order_id) + if not state: + return f"Order not found: {escape(order_id)}", 404 + + order_info = json.loads(state.serialized_input) + order = Order( + order_info.get('id'), + order_info.get('customer'), + order_info.get('items'), + order_info.get('total')) + resp = { + "id": state.instance_id, + "details": order.__dict__, + "status": state.runtime_status.name, + "created_time": state.created_at.isoformat(), + "last_updated_time": state.last_updated_at.isoformat(), + } + + if state.serialized_output: + order_result_details = json.loads(state.serialized_output) + order_result = OrderResult( + order_result_details.get('id'), + order_result_details.get('success'), + order_result_details.get('message')) + resp["order_result"] = order_result.__dict__ + + if state.failure_details: + resp["failure_details"] = { + "message": state.failure_details.message, + "error_type": state.failure_details.error_type, + "stack_trace": state.failure_details.stack_trace + } + + return resp, 200 + + +@app.route("/orders//approve", methods=["POST"]) +def approve_order(order_id): + request_data = request.get_json() + if not request_data: + return """Invalid request. Should be in the form of { \"approver\": \"joe\", \"approved\": true }""", 400 + if not request_data.get("approver"): + return "Missing approver name", 400 + if "approved" not in request_data: + return "Missing approved flag", 400 + + approval = Approval( + request_data.get("approver"), + request_data.get("approved")) + + wf_client = wf.DaprWorkflowClient() + wf_client.raise_workflow_event(order_id, "approval", data=approval) + + return f"Approval sent for order: {escape(order_id)}", 200 + + +@app.route("/", methods=["GET"]) +@app.route("/healthz", methods=["GET"]) +def hello(): + return f"Hello from {__name__}", 200 + + +def main(): + # Start the workflow runtime + logging.info("Starting workflow runtime...") + wf_runtime = wf.WorkflowRuntime() # host/port comes from env vars + wf_runtime.register_workflow(process_order_workflow) + wf_runtime.register_activity(notify) + wf_runtime.register_activity(reserve_inventory) + wf_runtime.register_activity(submit_payment) + wf_runtime.register_activity(submit_order_to_shipping) + wf_runtime.register_activity(refund_payment) + wf_runtime.start() # non-blocking + + # Start the Flask app server + app.run(port=APP_PORT, debug=False, use_reloader=False) + + # Stop the workflow runtime to allow the process to terminate + wf_runtime.shutdown() + + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + main() diff --git a/workflow-orderprocessing-python/services/orderprocessing/requirements.txt b/workflow-orderprocessing-python/services/orderprocessing/requirements.txt new file mode 100644 index 0000000..20fa504 --- /dev/null +++ b/workflow-orderprocessing-python/services/orderprocessing/requirements.txt @@ -0,0 +1,2 @@ +dapr-ext-workflow >= 0.2.0 +flask >= 3.0.0 diff --git a/workflow-orderprocessing-python/services/payments/app.py b/workflow-orderprocessing-python/services/payments/app.py new file mode 100644 index 0000000..1298ebd --- /dev/null +++ b/workflow-orderprocessing-python/services/payments/app.py @@ -0,0 +1,48 @@ +import logging +import os +import time + +from flask import Flask, request + +APP_PORT = os.getenv("APP_PORT", "3003") + +app = Flask(__name__) + + +@app.route("/payments/charge", methods=["POST"]) +def charge(): + logging.info(f"Charging payment for order: {request.json}") + + # Simulate work + time.sleep(1) + + return '', 200 + + +@app.route("/payments/refund", methods=["POST"]) +def refund(): + logging.info(f"Refunding payment for order: {request.json}") + + # Simulate work + time.sleep(1) + + return '', 200 + + +@app.route("/", methods=["GET"]) +@app.route("/healthz", methods=["GET"]) +def hello(): + return f"Hello from {__name__}", 200 + + +def main(): + # Start the Flask app server + app.run(port=APP_PORT, debug=True, use_reloader=False) + + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + main() diff --git a/workflow-orderprocessing-python/services/payments/requirements.txt b/workflow-orderprocessing-python/services/payments/requirements.txt new file mode 100644 index 0000000..609c2fa --- /dev/null +++ b/workflow-orderprocessing-python/services/payments/requirements.txt @@ -0,0 +1,2 @@ +dapr >= 1.11.0 +flask >= 3.0.0 diff --git a/workflow-orderprocessing-python/services/shipping/app.py b/workflow-orderprocessing-python/services/shipping/app.py new file mode 100644 index 0000000..296fc72 --- /dev/null +++ b/workflow-orderprocessing-python/services/shipping/app.py @@ -0,0 +1,58 @@ +import logging +import os +import time + +from flask import Flask, request + +APP_PORT = os.getenv("APP_PORT", "3004") + +app = Flask(__name__) + +is_deactivated = False + + +@app.route("/shipping/ship", methods=["POST"]) +def ship(): + if is_deactivated: + return "The shipping service is currently deactivated for routine maintenance.", 503 + logging.info(f"Shipping order: {request.json}") + + # Simulate work + time.sleep(1) + + return '', 200 + + +@app.route("/shipping/deactivate", methods=["POST"]) +def deactivate(): + global is_deactivated + is_deactivated = True + logging.warning("The shipping service has been deactivated for routine maintenance.") + return '', 200 + + +@app.route("/shipping/activate", methods=["POST"]) +def activate(): + global is_deactivated + is_deactivated = False + logging.info("The shipping service has been (re)activated.") + return '', 200 + + +@app.route("/", methods=["GET"]) +@app.route("/healthz", methods=["GET"]) +def hello(): + return f"Hello from {__name__}", 200 + + +def main(): + # Start the Flask app server + app.run(port=APP_PORT, debug=True, use_reloader=False) + + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO) + main() diff --git a/workflow-orderprocessing-python/services/shipping/requirements.txt b/workflow-orderprocessing-python/services/shipping/requirements.txt new file mode 100644 index 0000000..609c2fa --- /dev/null +++ b/workflow-orderprocessing-python/services/shipping/requirements.txt @@ -0,0 +1,2 @@ +dapr >= 1.11.0 +flask >= 3.0.0 diff --git a/workflow-orderprocessing-python/zipkin-workflow-failure.png b/workflow-orderprocessing-python/zipkin-workflow-failure.png new file mode 100644 index 0000000..4b544f6 Binary files /dev/null and b/workflow-orderprocessing-python/zipkin-workflow-failure.png differ diff --git a/workflow-orderprocessing-python/zipkin-workflow-success.png b/workflow-orderprocessing-python/zipkin-workflow-success.png new file mode 100644 index 0000000..20abf5d Binary files /dev/null and b/workflow-orderprocessing-python/zipkin-workflow-success.png differ