From b74c247d01eeec64ce887eb6418039fa623208f1 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 28 Feb 2025 16:32:37 -0500 Subject: [PATCH] update python examples for workflow; update conversation quickstart to python sdk Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 159 ++++++++++-------- .../quickstarts/conversation-quickstart.md | 53 +++--- .../quickstarts/workflow-quickstart.md | 9 - 3 files changed, 108 insertions(+), 113 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index 3345b97b2..ae2285d37 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -36,16 +36,31 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. +Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input ```python -def hello_act(ctx: WorkflowActivityContext, input): - global counter - counter += input - print(f'New counter value is: {counter}!', flush=True) +@wfr.activity(name='step10') +def step1(ctx, activity_input): + print(f'Step 1: Received input: {activity_input}.') + # Do some work + return activity_input + 1 + + +@wfr.activity +def step2(ctx, activity_input): + print(f'Step 2: Received input: {activity_input}.') + # Do some work + return activity_input * 2 + + +@wfr.activity +def step3(ctx, activity_input): + print(f'Step 3: Received input: {activity_input}.') + # Do some work + return activity_input ^ 2 ``` -[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) +[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py) {{% /codetab %}} @@ -226,16 +241,19 @@ Next, register and call the activites in a workflow. -The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. +The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. ```python -def hello_world_wf(ctx: DaprWorkflowContext, input): - print(f'{input}') - yield ctx.call_activity(hello_act, input=1) - yield ctx.call_activity(hello_act, input=10) - yield ctx.wait_for_external_event("event1") - yield ctx.call_activity(hello_act, input=100) - yield ctx.call_activity(hello_act, input=1000) +@wfr.workflow(name='random_workflow') +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + result3 = yield ctx.call_activity(step3, input=result2) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2, result3] ``` [See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) @@ -409,82 +427,77 @@ Finally, compose the application using the workflow. - A Python package called `DaprClient` to receive the Python SDK capabilities. - A builder with extensions called: - - `WorkflowRuntime`: Allows you to register workflows and workflow activities - - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) + - `WorkflowRuntime`: Allows you to register the workflow runtime. + - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) - API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. ```python -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext -from dapr.clients import DaprClient +from durabletask import worker, task -# ... +from dapr.ext.workflow.workflow_context import Workflow +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext +from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext +from dapr.ext.workflow.util import getAddress -def main(): - with DaprClient() as d: - host = settings.DAPR_RUNTIME_HOST - port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime = WorkflowRuntime() - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() +from dapr.clients import DaprInternalError +from dapr.clients.http.client import DAPR_API_TOKEN_HEADER +from dapr.conf import settings +from dapr.conf.helpers import GrpcEndpoint +from dapr.ext.workflow.logger import LoggerOptions, Logger - # Start workflow - print("==========Start Counter Increase as per Input:==========") - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") +wfr = wf.WorkflowRuntime() - # ... + @wfr.workflow(name='hello_world_wf') + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + # Workflow definition... - # Pause workflow - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") + @wfr.activity(name='hello_act') + def hello_act(ctx: WorkflowActivityContext, wf_input): + # Activity definition... - # Resume workflow - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") - - sleep(1) - # Raise workflow - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) + # Start workflow + wfr = WorkflowRuntime() + wfr.start() + wf_client = DaprWorkflowClient() - sleep(5) - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") + # ... - # Kick off another workflow for termination purposes - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") + # Pause workflow + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + # ... check status ... + wf_client.resume_workflow(instance_id=instance_id) + + sleep(1) - # Terminate workflow - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) - sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") + # Raise workflow + wf_client.raise_workflow_event( + instance_id=instance_id, + event_name=event_name, + data=event_data + ) - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") + # Purge workflow + state = wf_client.wait_for_workflow_completion( + instance_id, + timeout_in_seconds=30 + ) + wf_client.purge_workflow(instance_id=instance_id) - workflowRuntime.shutdown() + workflowRuntime.shutdown() if __name__ == '__main__': - main() + wfr.start() + sleep(10) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + print(f'Workflow completed! Status: {state.runtime_status}') + + wfr.shutdown() ``` diff --git a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md index 8abed6f58..e38701bfa 100644 --- a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md @@ -10,7 +10,7 @@ description: Get started with the Dapr conversation building block The conversation building block is currently in **alpha**. {{% /alert %}} -Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr. +Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr. You can try out this conversation quickstart by either: @@ -18,7 +18,7 @@ You can try out this conversation quickstart by either: - [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}}) {{% alert title="Note" color="primary" %}} -Currently, only the HTTP quickstart sample is available in Python and JavaScript. +Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK. {{% /alert %}} ## Run the app with the template file @@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -61,7 +61,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run -f . @@ -117,37 +117,28 @@ In the application code: - The mock LLM echoes "What is dapr?". ```python -import logging -import requests -import os +from dapr.clients import DaprClient +from dapr.clients.grpc._request import ConversationInput -logging.basicConfig(level=logging.INFO) +with DaprClient() as d: + inputs = [ + ConversationInput(content="What is dapr?", role='user', scrub_pii=True), + ] -base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv( - 'DAPR_HTTP_PORT', '3500') - -CONVERSATION_COMPONENT_NAME = 'echo' - -input = { - 'name': 'echo', - 'inputs': [{'message':'What is dapr?'}], - 'parameters': {}, - 'metadata': {} + metadata = { + 'model': 'modelname', + 'key': 'authKey', + 'cacheTTL': '10m', } -# Send input to conversation endpoint -result = requests.post( - url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME), - json=input -) + print('Input sent: What is dapr?') -logging.info('Input sent: What is dapr?') + response = d.converse_alpha1( + name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata + ) -# Parse conversation output -data = result.json() -output = data["outputs"][0]["result"] - -logging.info('Output response: ' + output) + for output in response.outputs: + print(f'Output response: {output.result}') ``` {{% /codetab %}} @@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -586,7 +577,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run --app-id conversation --resources-path ../../../components -- python3 app.py diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 5f50c6a99..02929b31d 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -251,7 +251,6 @@ class WorkflowConsoleApp: if __name__ == '__main__': app = WorkflowConsoleApp() app.main() - ``` #### `order-processor/workflow.py` @@ -276,7 +275,6 @@ wfr = WorkflowRuntime() logging.basicConfig(level=logging.INFO) - @wfr.workflow(name="order_processing_workflow") def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str): """Defines the order processing workflow. @@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification): logger = logging.getLogger('NotifyActivity') logger.info(input.message) - @wfr.activity(name="process_payment_activity") def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest): """Defines Process Payment Activity.This is used by the workflow to process a payment""" @@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest +' USD') logger.info(f'Payment for request ID {input.request_id} processed successfully') - @wfr.activity(name="verify_inventory_activity") def verify_inventory_activity(ctx: WorkflowActivityContext, input: InventoryRequest) -> InventoryResult: @@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext, return InventoryResult(True, inventory_item) return InventoryResult(False, None) - - @wfr.activity(name="update_inventory_activity") def update_inventory_activity(ctx: WorkflowActivityContext, input: PaymentRequest) -> InventoryResult: @@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext, client.save_state(store_name, input.item_being_purchased, new_val) logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock') - - @wfr.activity(name="request_approval_activity") def request_approval_activity(ctx: WorkflowActivityContext, input: OrderPayload): @@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext, logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for ' +f'{input["quantity"]}' +' ' +f'{input["item_name"]}') - ``` {{% /codetab %}}