update python examples for workflow; update conversation quickstart to python sdk

Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
This commit is contained in:
Hannah Hunter 2025-02-28 16:32:37 -05:00
parent 3688a1d684
commit b74c247d01
3 changed files with 108 additions and 113 deletions

View File

@ -36,16 +36,31 @@ The Dapr sidecar doesnt load any workflow definitions. Rather, the sidecar si
<!--python--> <!--python-->
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`. Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input
```python ```python
def hello_act(ctx: WorkflowActivityContext, input): @wfr.activity(name='step10')
global counter def step1(ctx, activity_input):
counter += input print(f'Step 1: Received input: {activity_input}.')
print(f'New counter value is: {counter}!', flush=True) # Do some work
return activity_input + 1
@wfr.activity
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2
@wfr.activity
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2
``` ```
[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59) [See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
{{% /codetab %}} {{% /codetab %}}
@ -226,16 +241,19 @@ Next, register and call the activites in a workflow.
<!--python--> <!--python-->
The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
```python ```python
def hello_world_wf(ctx: DaprWorkflowContext, input): @wfr.workflow(name='random_workflow')
print(f'{input}') def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
yield ctx.call_activity(hello_act, input=1) try:
yield ctx.call_activity(hello_act, input=10) result1 = yield ctx.call_activity(step1, input=wf_input)
yield ctx.wait_for_external_event("event1") result2 = yield ctx.call_activity(step2, input=result1)
yield ctx.call_activity(hello_act, input=100) result3 = yield ctx.call_activity(step3, input=result2)
yield ctx.call_activity(hello_act, input=1000) except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2, result3]
``` ```
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) [See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
@ -409,82 +427,77 @@ Finally, compose the application using the workflow.
- A Python package called `DaprClient` to receive the Python SDK capabilities. - A Python package called `DaprClient` to receive the Python SDK capabilities.
- A builder with extensions called: - A builder with extensions called:
- `WorkflowRuntime`: Allows you to register workflows and workflow activities - `WorkflowRuntime`: Allows you to register the workflow runtime.
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}})
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. - API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
```python ```python
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext from durabletask import worker, task
from dapr.clients import DaprClient
# ... from dapr.ext.workflow.workflow_context import Workflow
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext
from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext
from dapr.ext.workflow.util import getAddress
def main(): from dapr.clients import DaprInternalError
with DaprClient() as d: from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
host = settings.DAPR_RUNTIME_HOST from dapr.conf import settings
port = settings.DAPR_GRPC_PORT from dapr.conf.helpers import GrpcEndpoint
workflowRuntime = WorkflowRuntime(host, port) from dapr.ext.workflow.logger import LoggerOptions, Logger
workflowRuntime = WorkflowRuntime()
workflowRuntime.register_workflow(hello_world_wf) wfr = wf.WorkflowRuntime()
workflowRuntime.register_activity(hello_act)
workflowRuntime.start() @wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
# Workflow definition...
@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
# Activity definition...
# Start workflow # Start workflow
print("==========Start Counter Increase as per Input:==========") wfr = WorkflowRuntime()
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, wfr.start()
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) wf_client = DaprWorkflowClient()
print(f"start_resp {start_resp.instance_id}")
# ... # ...
# Pause workflow # Pause workflow
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) wf_client.pause_workflow(instance_id=instance_id)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) metadata = wf_client.get_workflow_state(instance_id=instance_id)
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") # ... check status ...
wf_client.resume_workflow(instance_id=instance_id)
# Resume workflow
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
sleep(1) sleep(1)
# Raise workflow # Raise workflow
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, wf_client.raise_workflow_event(
event_name=eventName, event_data=eventData) instance_id=instance_id,
event_name=event_name,
sleep(5) data=event_data
# Purge workflow )
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
try:
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
except DaprInternalError as err:
if nonExistentIDError in err._message:
print("Instance Successfully Purged")
# Kick off another workflow for termination purposes
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
print(f"start_resp {start_resp.instance_id}")
# Terminate workflow
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
sleep(1)
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
# Purge workflow # Purge workflow
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) state = wf_client.wait_for_workflow_completion(
try: instance_id,
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) timeout_in_seconds=30
except DaprInternalError as err: )
if nonExistentIDError in err._message: wf_client.purge_workflow(instance_id=instance_id)
print("Instance Successfully Purged")
workflowRuntime.shutdown() workflowRuntime.shutdown()
if __name__ == '__main__': if __name__ == '__main__':
main() wfr.start()
sleep(10) # wait for workflow runtime to start
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42)
print(f'Workflow started. Instance ID: {instance_id}')
state = wf_client.wait_for_workflow_completion(instance_id)
print(f'Workflow completed! Status: {state.runtime_status}')
wfr.shutdown()
``` ```

View File

@ -10,7 +10,7 @@ description: Get started with the Dapr conversation building block
The conversation building block is currently in **alpha**. The conversation building block is currently in **alpha**.
{{% /alert %}} {{% /alert %}}
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr. Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr.
You can try out this conversation quickstart by either: You can try out this conversation quickstart by either:
@ -18,7 +18,7 @@ You can try out this conversation quickstart by either:
- [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}}) - [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}})
{{% alert title="Note" color="primary" %}} {{% alert title="Note" color="primary" %}}
Currently, only the HTTP quickstart sample is available in Python and JavaScript. Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK.
{{% /alert %}} {{% /alert %}}
## Run the app with the template file ## Run the app with the template file
@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git
From the root of the Quickstarts directory, navigate into the conversation directory: From the root of the Quickstarts directory, navigate into the conversation directory:
```bash ```bash
cd conversation/python/http/conversation cd conversation/python/sdk/conversation
``` ```
Install the dependencies: Install the dependencies:
@ -61,7 +61,7 @@ pip3 install -r requirements.txt
### Step 3: Launch the conversation service ### Step 3: Launch the conversation service
Navigate back to the `http` directory and start the conversation service with the following command: Navigate back to the `sdk` directory and start the conversation service with the following command:
```bash ```bash
dapr run -f . dapr run -f .
@ -117,37 +117,28 @@ In the application code:
- The mock LLM echoes "What is dapr?". - The mock LLM echoes "What is dapr?".
```python ```python
import logging from dapr.clients import DaprClient
import requests from dapr.clients.grpc._request import ConversationInput
import os
logging.basicConfig(level=logging.INFO) with DaprClient() as d:
inputs = [
ConversationInput(content="What is dapr?", role='user', scrub_pii=True),
]
base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv( metadata = {
'DAPR_HTTP_PORT', '3500') 'model': 'modelname',
'key': 'authKey',
CONVERSATION_COMPONENT_NAME = 'echo' 'cacheTTL': '10m',
input = {
'name': 'echo',
'inputs': [{'message':'What is dapr?'}],
'parameters': {},
'metadata': {}
} }
# Send input to conversation endpoint print('Input sent: What is dapr?')
result = requests.post(
url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME),
json=input
)
logging.info('Input sent: What is dapr?') response = d.converse_alpha1(
name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata
)
# Parse conversation output for output in response.outputs:
data = result.json() print(f'Output response: {output.result}')
output = data["outputs"][0]["result"]
logging.info('Output response: ' + output)
``` ```
{{% /codetab %}} {{% /codetab %}}
@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git
From the root of the Quickstarts directory, navigate into the conversation directory: From the root of the Quickstarts directory, navigate into the conversation directory:
```bash ```bash
cd conversation/python/http/conversation cd conversation/python/sdk/conversation
``` ```
Install the dependencies: Install the dependencies:
@ -586,7 +577,7 @@ pip3 install -r requirements.txt
### Step 3: Launch the conversation service ### Step 3: Launch the conversation service
Navigate back to the `http` directory and start the conversation service with the following command: Navigate back to the `sdk` directory and start the conversation service with the following command:
```bash ```bash
dapr run --app-id conversation --resources-path ../../../components -- python3 app.py dapr run --app-id conversation --resources-path ../../../components -- python3 app.py

View File

@ -251,7 +251,6 @@ class WorkflowConsoleApp:
if __name__ == '__main__': if __name__ == '__main__':
app = WorkflowConsoleApp() app = WorkflowConsoleApp()
app.main() app.main()
``` ```
#### `order-processor/workflow.py` #### `order-processor/workflow.py`
@ -276,7 +275,6 @@ wfr = WorkflowRuntime()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@wfr.workflow(name="order_processing_workflow") @wfr.workflow(name="order_processing_workflow")
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str): def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
"""Defines the order processing workflow. """Defines the order processing workflow.
@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification):
logger = logging.getLogger('NotifyActivity') logger = logging.getLogger('NotifyActivity')
logger.info(input.message) logger.info(input.message)
@wfr.activity(name="process_payment_activity") @wfr.activity(name="process_payment_activity")
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest): def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
"""Defines Process Payment Activity.This is used by the workflow to process a payment""" """Defines Process Payment Activity.This is used by the workflow to process a payment"""
@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
+' USD') +' USD')
logger.info(f'Payment for request ID {input.request_id} processed successfully') logger.info(f'Payment for request ID {input.request_id} processed successfully')
@wfr.activity(name="verify_inventory_activity") @wfr.activity(name="verify_inventory_activity")
def verify_inventory_activity(ctx: WorkflowActivityContext, def verify_inventory_activity(ctx: WorkflowActivityContext,
input: InventoryRequest) -> InventoryResult: input: InventoryRequest) -> InventoryResult:
@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
return InventoryResult(True, inventory_item) return InventoryResult(True, inventory_item)
return InventoryResult(False, None) return InventoryResult(False, None)
@wfr.activity(name="update_inventory_activity") @wfr.activity(name="update_inventory_activity")
def update_inventory_activity(ctx: WorkflowActivityContext, def update_inventory_activity(ctx: WorkflowActivityContext,
input: PaymentRequest) -> InventoryResult: input: PaymentRequest) -> InventoryResult:
@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
client.save_state(store_name, input.item_being_purchased, new_val) client.save_state(store_name, input.item_being_purchased, new_val)
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock') logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')
@wfr.activity(name="request_approval_activity") @wfr.activity(name="request_approval_activity")
def request_approval_activity(ctx: WorkflowActivityContext, def request_approval_activity(ctx: WorkflowActivityContext,
input: OrderPayload): input: OrderPayload):
@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext,
logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for ' logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for '
+f'{input["quantity"]}' +' ' +f'{input["item_name"]}') +f'{input["quantity"]}' +' ' +f'{input["item_name"]}')
``` ```
{{% /codetab %}} {{% /codetab %}}