From b74c247d01eeec64ce887eb6418039fa623208f1 Mon Sep 17 00:00:00 2001
From: Hannah Hunter
Date: Fri, 28 Feb 2025 16:32:37 -0500
Subject: [PATCH 1/5] update python examples for workflow; update conversation quickstart to python sdk

Signed-off-by: Hannah Hunter
---
 .../workflow/howto-author-workflow.md       | 159 ++++++++++--------
 .../quickstarts/conversation-quickstart.md  |  53 +++---
 .../quickstarts/workflow-quickstart.md      |   9 -
 3 files changed, 108 insertions(+), 113 deletions(-)

diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md
index 3345b97b2..ae2285d37 100644
--- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md
+++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md
@@ -36,16 +36,31 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
 
-Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
+Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates task chaining activities that receive input and return an output.
 
 ```python
-def hello_act(ctx: WorkflowActivityContext, input):
-    global counter
-    counter += input
-    print(f'New counter value is: {counter}!', flush=True)
+@wfr.activity(name='step10')
+def step1(ctx, activity_input):
+    print(f'Step 1: Received input: {activity_input}.')
+    # Do some work
+    return activity_input + 1
+
+
+@wfr.activity
+def step2(ctx, activity_input):
+    print(f'Step 2: Received input: {activity_input}.')
+    # Do some work
+    return activity_input * 2
+
+
+@wfr.activity
+def step3(ctx, activity_input):
+    print(f'Step 3: Received input: {activity_input}.')
+    # Do some work
+    return activity_input ^ 2
 ```
 
-[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
+[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
 
 {{% /codetab %}}
 
@@ -226,16 +241,19 @@ Next, register and call the activities in a workflow.
 
-The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
+The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities. 
```python -def hello_world_wf(ctx: DaprWorkflowContext, input): - print(f'{input}') - yield ctx.call_activity(hello_act, input=1) - yield ctx.call_activity(hello_act, input=10) - yield ctx.wait_for_external_event("event1") - yield ctx.call_activity(hello_act, input=100) - yield ctx.call_activity(hello_act, input=1000) +@wfr.workflow(name='random_workflow') +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + result3 = yield ctx.call_activity(step3, input=result2) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2, result3] ``` [See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51) @@ -409,82 +427,77 @@ Finally, compose the application using the workflow. - A Python package called `DaprClient` to receive the Python SDK capabilities. - A builder with extensions called: - - `WorkflowRuntime`: Allows you to register workflows and workflow activities - - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) + - `WorkflowRuntime`: Allows you to register the workflow runtime. + - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) - API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. ```python -from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext -from dapr.clients import DaprClient +from durabletask import worker, task -# ... +from dapr.ext.workflow.workflow_context import Workflow +from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext +from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext +from dapr.ext.workflow.util import getAddress -def main(): - with DaprClient() as d: - host = settings.DAPR_RUNTIME_HOST - port = settings.DAPR_GRPC_PORT - workflowRuntime = WorkflowRuntime(host, port) - workflowRuntime = WorkflowRuntime() - workflowRuntime.register_workflow(hello_world_wf) - workflowRuntime.register_activity(hello_act) - workflowRuntime.start() +from dapr.clients import DaprInternalError +from dapr.clients.http.client import DAPR_API_TOKEN_HEADER +from dapr.conf import settings +from dapr.conf.helpers import GrpcEndpoint +from dapr.ext.workflow.logger import LoggerOptions, Logger - # Start workflow - print("==========Start Counter Increase as per Input:==========") - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") +wfr = wf.WorkflowRuntime() - # ... + @wfr.workflow(name='hello_world_wf') + def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + # Workflow definition... - # Pause workflow - d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}") + @wfr.activity(name='hello_act') + def hello_act(ctx: WorkflowActivityContext, wf_input): + # Activity definition... 
- # Resume workflow - d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}") - - sleep(1) - # Raise workflow - d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent, - event_name=eventName, event_data=eventData) + # Start workflow + wfr = WorkflowRuntime() + wfr.start() + wf_client = DaprWorkflowClient() - sleep(5) - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") + # ... - # Kick off another workflow for termination purposes - start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent, - workflow_name=workflowName, input=inputData, workflow_options=workflowOptions) - print(f"start_resp {start_resp.instance_id}") + # Pause workflow + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + # ... check status ... + wf_client.resume_workflow(instance_id=instance_id) + + sleep(1) - # Terminate workflow - d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent) - sleep(1) - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}") + # Raise workflow + wf_client.raise_workflow_event( + instance_id=instance_id, + event_name=event_name, + data=event_data + ) - # Purge workflow - d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent) - try: - getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent) - except DaprInternalError as err: - if nonExistentIDError in err._message: - print("Instance Successfully Purged") + # Purge workflow + state = wf_client.wait_for_workflow_completion( + instance_id, + timeout_in_seconds=30 + ) + wf_client.purge_workflow(instance_id=instance_id) - workflowRuntime.shutdown() + workflowRuntime.shutdown() if __name__ == '__main__': - main() + wfr.start() + sleep(10) # wait for workflow runtime to start + + wf_client = wf.DaprWorkflowClient() + instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) + print(f'Workflow started. Instance ID: {instance_id}') + state = wf_client.wait_for_workflow_completion(instance_id) + print(f'Workflow completed! Status: {state.runtime_status}') + + wfr.shutdown() ``` diff --git a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md index 8abed6f58..e38701bfa 100644 --- a/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/conversation-quickstart.md @@ -10,7 +10,7 @@ description: Get started with the Dapr conversation building block The conversation building block is currently in **alpha**. {{% /alert %}} -Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. 
In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr. +Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr. You can try out this conversation quickstart by either: @@ -18,7 +18,7 @@ You can try out this conversation quickstart by either: - [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}}) {{% alert title="Note" color="primary" %}} -Currently, only the HTTP quickstart sample is available in Python and JavaScript. +Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK. {{% /alert %}} ## Run the app with the template file @@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -61,7 +61,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run -f . @@ -117,37 +117,28 @@ In the application code: - The mock LLM echoes "What is dapr?". ```python -import logging -import requests -import os +from dapr.clients import DaprClient +from dapr.clients.grpc._request import ConversationInput -logging.basicConfig(level=logging.INFO) +with DaprClient() as d: + inputs = [ + ConversationInput(content="What is dapr?", role='user', scrub_pii=True), + ] -base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv( - 'DAPR_HTTP_PORT', '3500') - -CONVERSATION_COMPONENT_NAME = 'echo' - -input = { - 'name': 'echo', - 'inputs': [{'message':'What is dapr?'}], - 'parameters': {}, - 'metadata': {} + metadata = { + 'model': 'modelname', + 'key': 'authKey', + 'cacheTTL': '10m', } -# Send input to conversation endpoint -result = requests.post( - url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME), - json=input -) + print('Input sent: What is dapr?') -logging.info('Input sent: What is dapr?') + response = d.converse_alpha1( + name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata + ) -# Parse conversation output -data = result.json() -output = data["outputs"][0]["result"] - -logging.info('Output response: ' + output) + for output in response.outputs: + print(f'Output response: {output.result}') ``` {{% /codetab %}} @@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git From the root of the Quickstarts directory, navigate into the conversation directory: ```bash -cd conversation/python/http/conversation +cd conversation/python/sdk/conversation ``` Install the dependencies: @@ -586,7 +577,7 @@ pip3 install -r requirements.txt ### Step 3: Launch the conversation service -Navigate back to the `http` directory and start the conversation service with the following command: +Navigate back to the `sdk` directory and start the conversation service with the following command: ```bash dapr run --app-id conversation --resources-path ../../../components -- python3 app.py diff --git 
a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 5f50c6a99..02929b31d 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -251,7 +251,6 @@ class WorkflowConsoleApp: if __name__ == '__main__': app = WorkflowConsoleApp() app.main() - ``` #### `order-processor/workflow.py` @@ -276,7 +275,6 @@ wfr = WorkflowRuntime() logging.basicConfig(level=logging.INFO) - @wfr.workflow(name="order_processing_workflow") def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str): """Defines the order processing workflow. @@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification): logger = logging.getLogger('NotifyActivity') logger.info(input.message) - @wfr.activity(name="process_payment_activity") def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest): """Defines Process Payment Activity.This is used by the workflow to process a payment""" @@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest +' USD') logger.info(f'Payment for request ID {input.request_id} processed successfully') - @wfr.activity(name="verify_inventory_activity") def verify_inventory_activity(ctx: WorkflowActivityContext, input: InventoryRequest) -> InventoryResult: @@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext, return InventoryResult(True, inventory_item) return InventoryResult(False, None) - - @wfr.activity(name="update_inventory_activity") def update_inventory_activity(ctx: WorkflowActivityContext, input: PaymentRequest) -> InventoryResult: @@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext, client.save_state(store_name, input.item_being_purchased, new_val) logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock') - - @wfr.activity(name="request_approval_activity") def request_approval_activity(ctx: WorkflowActivityContext, input: OrderPayload): @@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext, logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for ' +f'{input["quantity"]}' +' ' +f'{input["item_name"]}') - ``` {{% /codetab %}} From 60d5332d1aaa1413296eead40cbb49afdd076624 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Mon, 17 Mar 2025 13:04:21 -0400 Subject: [PATCH 2/5] update author and manage workflow how-tos Signed-off-by: Hannah Hunter --- .../workflow/howto-author-workflow.md | 261 ++++++++++++------ .../workflow/howto-manage-workflow.md | 27 +- 2 files changed, 189 insertions(+), 99 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md index ae2285d37..009850fae 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md @@ -36,31 +36,17 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si -Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. 
The following example creates task chaining activities that receive input and return an output.
+Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function that takes a `WorkflowActivityContext` context parameter.
 
 ```python
-@wfr.activity(name='step10')
-def step1(ctx, activity_input):
-    print(f'Step 1: Received input: {activity_input}.')
-    # Do some work
-    return activity_input + 1
-
-
-@wfr.activity
-def step2(ctx, activity_input):
-    print(f'Step 2: Received input: {activity_input}.')
-    # Do some work
-    return activity_input * 2
-
-
-@wfr.activity
-def step3(ctx, activity_input):
-    print(f'Step 3: Received input: {activity_input}.')
-    # Do some work
-    return activity_input ^ 2
+@wfr.activity(name='hello_act')
+def hello_act(ctx: WorkflowActivityContext, wf_input):
+    global counter
+    counter += wf_input
+    print(f'New counter value is: {counter}!', flush=True)
 ```
 
-[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/task_chaining.py)
+[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
 
 {{% /codetab %}}
 
@@ -241,22 +227,32 @@ Next, register and call the activities in a workflow.
 
-The `random_workflow` function is a task chaining workflow pattern derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
+The `hello_world_wf` function is a workflow that takes a `DaprWorkflowContext` parameter with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
 
 ```python
-@wfr.workflow(name='random_workflow')
-def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
-    try:
-        result1 = yield ctx.call_activity(step1, input=wf_input)
-        result2 = yield ctx.call_activity(step2, input=result1)
-        result3 = yield ctx.call_activity(step3, input=result2)
-    except Exception as e:
-        yield ctx.call_activity(error_handler, input=str(e))
-        raise
-    return [result1, result2, result3]
+@wfr.workflow(name='hello_world_wf')
+def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
+    print(f'{wf_input}')
+    yield ctx.call_activity(hello_act, input=1)
+    yield ctx.call_activity(hello_act, input=10)
+    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
+    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
+
+    # Change in event handling: Use when_any to handle both event and timeout
+    event = ctx.wait_for_external_event(event_name)
+    timeout = ctx.create_timer(timedelta(seconds=30))
+    winner = yield when_any([event, timeout])
+
+    if winner == timeout:
+        print('Workflow timed out waiting for event')
+        return 'Timeout'
+
+    yield ctx.call_activity(hello_act, input=100)
+    yield ctx.call_activity(hello_act, input=1000)
+    return 'Completed'
 ```
 
-[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
+[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
 
 {{% /codetab %}}
 
@@ -423,84 +419,177 @@ Finally, compose the application using the workflow. 
-[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include: +[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include: - A Python package called `DaprClient` to receive the Python SDK capabilities. - A builder with extensions called: - `WorkflowRuntime`: Allows you to register the workflow runtime. - - `DaprWorkflowContext`: Allows you to [create workflows and workflow activities]({{< ref "#write-the-workflow" >}}) + - `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}}) - `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}}) -- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow. +- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow. ```python -from durabletask import worker, task +from datetime import timedelta +from time import sleep +from dapr.ext.workflow import ( + WorkflowRuntime, + DaprWorkflowContext, + WorkflowActivityContext, + RetryPolicy, + DaprWorkflowClient, + when_any, +) +from dapr.conf import Settings +from dapr.clients.exceptions import DaprInternalError -from dapr.ext.workflow.workflow_context import Workflow -from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext -from dapr.ext.workflow.workflow_activity_context import Activity, WorkflowActivityContext -from dapr.ext.workflow.util import getAddress +settings = Settings() -from dapr.clients import DaprInternalError -from dapr.clients.http.client import DAPR_API_TOKEN_HEADER -from dapr.conf import settings -from dapr.conf.helpers import GrpcEndpoint -from dapr.ext.workflow.logger import LoggerOptions, Logger +counter = 0 +retry_count = 0 +child_orchestrator_count = 0 +child_orchestrator_string = '' +child_act_retry_count = 0 +instance_id = 'exampleInstanceID' +child_instance_id = 'childInstanceID' +workflow_name = 'hello_world_wf' +child_workflow_name = 'child_wf' +input_data = 'Hi Counter!' +event_name = 'event1' +event_data = 'eventData' +non_existent_id_error = 'no such instance exists' -wfr = wf.WorkflowRuntime() +retry_policy = RetryPolicy( + first_retry_interval=timedelta(seconds=1), + max_number_of_attempts=3, + backoff_coefficient=2, + max_retry_interval=timedelta(seconds=10), + retry_timeout=timedelta(seconds=100), +) - @wfr.workflow(name='hello_world_wf') - def hello_world_wf(ctx: DaprWorkflowContext, wf_input): - # Workflow definition... +wfr = WorkflowRuntime() - @wfr.activity(name='hello_act') - def hello_act(ctx: WorkflowActivityContext, wf_input): - # Activity definition... - # Start workflow - wfr = WorkflowRuntime() - wfr.start() - wf_client = DaprWorkflowClient() +@wfr.workflow(name='hello_world_wf') +def hello_world_wf(ctx: DaprWorkflowContext, wf_input): + print(f'{wf_input}') + yield ctx.call_activity(hello_act, input=1) + yield ctx.call_activity(hello_act, input=10) + yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy) + yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy) - # ... 
+ # Change in event handling: Use when_any to handle both event and timeout + event = ctx.wait_for_external_event(event_name) + timeout = ctx.create_timer(timedelta(seconds=30)) + winner = yield when_any([event, timeout]) - # Pause workflow - wf_client.pause_workflow(instance_id=instance_id) - metadata = wf_client.get_workflow_state(instance_id=instance_id) - # ... check status ... - wf_client.resume_workflow(instance_id=instance_id) - - sleep(1) + if winner == timeout: + print('Workflow timed out waiting for event') + return 'Timeout' - # Raise workflow - wf_client.raise_workflow_event( - instance_id=instance_id, - event_name=event_name, - data=event_data - ) + yield ctx.call_activity(hello_act, input=100) + yield ctx.call_activity(hello_act, input=1000) + return 'Completed' - # Purge workflow - state = wf_client.wait_for_workflow_completion( - instance_id, - timeout_in_seconds=30 - ) - wf_client.purge_workflow(instance_id=instance_id) - workflowRuntime.shutdown() +@wfr.activity(name='hello_act') +def hello_act(ctx: WorkflowActivityContext, wf_input): + global counter + counter += wf_input + print(f'New counter value is: {counter}!', flush=True) -if __name__ == '__main__': + +@wfr.activity(name='hello_retryable_act') +def hello_retryable_act(ctx: WorkflowActivityContext): + global retry_count + if (retry_count % 2) == 0: + print(f'Retry count value is: {retry_count}!', flush=True) + retry_count += 1 + raise ValueError('Retryable Error') + print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True) + retry_count += 1 + + +@wfr.workflow(name='child_retryable_wf') +def child_retryable_wf(ctx: DaprWorkflowContext): + global child_orchestrator_string, child_orchestrator_count + if not ctx.is_replaying: + child_orchestrator_count += 1 + print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True) + child_orchestrator_string += str(child_orchestrator_count) + yield ctx.call_activity( + act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy + ) + if child_orchestrator_count < 3: + raise ValueError('Retryable Error') + + +@wfr.activity(name='act_for_child_wf') +def act_for_child_wf(ctx: WorkflowActivityContext, inp): + global child_orchestrator_string, child_act_retry_count + inp_char = chr(96 + inp) + print(f'Appending {inp_char} to child_orchestrator_string!', flush=True) + child_orchestrator_string += inp_char + if child_act_retry_count % 2 == 0: + child_act_retry_count += 1 + raise ValueError('Retryable Error') + child_act_retry_count += 1 + + +def main(): wfr.start() - sleep(10) # wait for workflow runtime to start + wf_client = DaprWorkflowClient() - wf_client = wf.DaprWorkflowClient() - instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow, input=42) - print(f'Workflow started. Instance ID: {instance_id}') - state = wf_client.wait_for_workflow_completion(instance_id) - print(f'Workflow completed! 
Status: {state.runtime_status}') + print('==========Start Counter Increase as per Input:==========') + wf_client.schedule_new_workflow( + workflow=hello_world_wf, input=input_data, instance_id=instance_id + ) + + wf_client.wait_for_workflow_start(instance_id) + + # Sleep to let the workflow run initial activities + sleep(12) + + assert counter == 11 + assert retry_count == 2 + assert child_orchestrator_string == '1aa2bb3cc' + + # Pause Test + wf_client.pause_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}') + + # Resume Test + wf_client.resume_workflow(instance_id=instance_id) + metadata = wf_client.get_workflow_state(instance_id=instance_id) + print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}') + + sleep(2) # Give the workflow time to reach the event wait state + wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data) + + print('========= Waiting for Workflow completion', flush=True) + try: + state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30) + if state.runtime_status.name == 'COMPLETED': + print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"'))) + else: + print(f'Workflow failed! Status: {state.runtime_status.name}') + except TimeoutError: + print('*** Workflow timed out!') + + wf_client.purge_workflow(instance_id=instance_id) + try: + wf_client.get_workflow_state(instance_id=instance_id) + except DaprInternalError as err: + if non_existent_id_error in err._message: + print('Instance Successfully Purged') wfr.shutdown() -``` +if __name__ == '__main__': + main() +``` + {{% /codetab %}} {{% codetab %}} diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md index f03f4a4c4..c9e847ebe 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/howto-manage-workflow.md @@ -14,13 +14,13 @@ Now that you've [authored the workflow and its activities in your application]({ {{% codetab %}} Manage your workflow within your code. 
In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
-- **start_workflow**: Start an instance of a workflow
-- **get_workflow**: Get information on the status of the workflow
+- **schedule_new_workflow**: Start an instance of a workflow
+- **get_workflow_state**: Get information on the status of the workflow
 - **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
 - **resume_workflow**: Resumes a paused workflow instance
 - **raise_workflow_event**: Raise an event on a workflow
 - **purge_workflow**: Removes all metadata related to a specific workflow instance
-- **terminate_workflow**: Terminate or stop a particular instance of a workflow
+- **wait_for_workflow_completion**: Wait for a particular instance of a workflow to complete
 
 ```python
 from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
@@ -34,27 +34,28 @@ eventName = "event1"
 eventData = "eventData"
 
 # Start the workflow
-start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
-        workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
+wf_client.schedule_new_workflow(
+    workflow=hello_world_wf, input=input_data, instance_id=instance_id
+)
 
 # Get info on the workflow
-getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
+wf_client.get_workflow_state(instance_id=instance_id)
 
 # Pause the workflow
-d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
+wf_client.pause_workflow(instance_id=instance_id)
+metadata = wf_client.get_workflow_state(instance_id=instance_id)
 
 # Resume the workflow
-d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
+wf_client.resume_workflow(instance_id=instance_id)
 
 # Raise an event on the workflow.
-    d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
-                event_name=eventName, event_data=eventData)
+wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
 
 # Purge the workflow
-d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
+wf_client.purge_workflow(instance_id=instance_id)
 
-# Terminate the workflow
-d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
+# Wait for workflow completion
+wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
 ```
 
 {{% /codetab %}}

From 8a29b3936ab5d406731d532baec264449115b65e Mon Sep 17 00:00:00 2001
From: James Pegg
Date: Mon, 24 Mar 2025 03:52:07 +0000
Subject: [PATCH 3/5] Fixed spelling mistake in secret-scope.md (#4593)

"Scop" fixed to "Scope"

Signed-off-by: James Pegg
---
 daprdocs/content/en/operations/configuration/secret-scope.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/daprdocs/content/en/operations/configuration/secret-scope.md b/daprdocs/content/en/operations/configuration/secret-scope.md
index bd718288d..96e7b12e3 100644
--- a/daprdocs/content/en/operations/configuration/secret-scope.md
+++ b/daprdocs/content/en/operations/configuration/secret-scope.md
@@ -7,8 +7,8 @@ description: "Define secret scopes by augmenting the existing configuration reso
 description: "Define secret scopes by augmenting the existing configuration resource with restrictive permissions." 
--- -In addition to [scoping which applications can access a given component]({{< ref "component-scopes.md">}}), you can also scop a named secret store component to one or more secrets for an application. By defining `allowedSecrets` and/or `deniedSecrets` lists, you restrict applications to access only specific secrets. -In addition to [scoping which applications can access a given component]({{< ref "component-scopes.md">}}), you can also scop a named secret store component to one or more secrets for an application. By defining `allowedSecrets` and/or `deniedSecrets` lists, you restrict applications to access only specific secrets. +In addition to [scoping which applications can access a given component]({{< ref "component-scopes.md">}}), you can also scope a named secret store component to one or more secrets for an application. By defining `allowedSecrets` and/or `deniedSecrets` lists, you restrict applications to access only specific secrets. +In addition to [scoping which applications can access a given component]({{< ref "component-scopes.md">}}), you can also scope a named secret store component to one or more secrets for an application. By defining `allowedSecrets` and/or `deniedSecrets` lists, you restrict applications to access only specific secrets. For more information about configuring a Configuration resource: - [Configuration overview]({{< ref configuration-overview.md >}}) From aebf393b2a72d396a4a2622d9badacb736eb4b2a Mon Sep 17 00:00:00 2001 From: Mathieu Benoit Date: Tue, 25 Mar 2025 18:28:23 -0400 Subject: [PATCH 4/5] Update self-hosted-with-docker.md - make `scheduler` running (#4599) * Update self-hosted-with-docker.md - make scheduler running Signed-off-by: Mathieu Benoit * Update self-hosted-with-docker.md - --scheduler-host-address Signed-off-by: Mathieu Benoit * Update self-hosted-with-docker.md - Use smaller container images for placement and scheduler Signed-off-by: Mathieu Benoit --------- Signed-off-by: Mathieu Benoit --- .../hosting/self-hosted/self-hosted-with-docker.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/daprdocs/content/en/operations/hosting/self-hosted/self-hosted-with-docker.md b/daprdocs/content/en/operations/hosting/self-hosted/self-hosted-with-docker.md index 700acc776..7aa42196f 100644 --- a/daprdocs/content/en/operations/hosting/self-hosted/self-hosted-with-docker.md +++ b/daprdocs/content/en/operations/hosting/self-hosted/self-hosted-with-docker.md @@ -123,6 +123,7 @@ services: "--app-id", "nodeapp", "--app-port", "3000", "--placement-host-address", "placement:50006", # Dapr's placement service can be reach via the docker DNS entry + "--scheduler-host-address", "scheduler:50007", # Dapr's scheduler service can be reach via the docker DNS entry "--resources-path", "./components" ] volumes: @@ -134,22 +135,19 @@ services: ... # Deploy other daprized services and components (i.e. 
Redis) placement: - image: "daprio/dapr" + image: "daprio/placement" command: ["./placement", "--port", "50006"] ports: - "50006:50006" scheduler: - image: "daprio/dapr" - command: ["./scheduler", "--port", "50007"] + image: "daprio/scheduler" + command: ["./scheduler", "--port", "50007", "--etcd-data-dir", "/data"] ports: - "50007:50007" - # WARNING - This is a tmpfs volume, your state will not be persisted across restarts + user: root volumes: - - type: tmpfs - target: /data - tmpfs: - size: "64m" + - "./dapr-etcd-data/:/data" networks: hello-dapr: null From b421483bf41a4edfa0a5660d78ba3172d40366d3 Mon Sep 17 00:00:00 2001 From: Alice Gibbons Date: Thu, 27 Mar 2025 00:58:24 +0000 Subject: [PATCH 5/5] Update actors-quickstart.md (#4601) Update to .NET 8 runtime Signed-off-by: Alice Gibbons --- .../content/en/getting-started/quickstarts/actors-quickstart.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daprdocs/content/en/getting-started/quickstarts/actors-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/actors-quickstart.md index fef7c9de9..47c698be3 100644 --- a/daprdocs/content/en/getting-started/quickstarts/actors-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/actors-quickstart.md @@ -33,7 +33,7 @@ For this example, you will need: - [Docker Desktop](https://www.docker.com/products/docker-desktop) -- [.NET 6](https://dotnet.microsoft.com/download/dotnet/6.0), [.NET 8](https://dotnet.microsoft.com/download/dotnet/8.0) or [.NET 9](https://dotnet.microsoft.com/download/dotnet/9.0) installed +- [.NET 8](https://dotnet.microsoft.com/download/dotnet/8.0) installed **NOTE:** .NET 6 is the minimally supported version of .NET for the Dapr .NET SDK packages in this release. Only .NET 8 and .NET 9 will be supported in Dapr v1.16 and later releases.