Add challenges and tips code

Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
Marc Duiker 2025-05-12 14:51:36 +02:00
parent 23a84b5e16
commit d45f950642
5 changed files with 121 additions and 0 deletions


@ -0,0 +1,10 @@
# Workflow Challenges & Tips
Workflow systems are powerful tools, but they come with challenges and limitations, as described in the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-features-concepts/#limitations).
This section uses code snippets to illustrate those limitations and to show how to get the most out of the Dapr Workflow API. Read through the following examples to learn best practices for developing Dapr workflows.
- [Deterministic workflows](deterministic_workflow.py)
- [Idempotent activities](idempotent_activity.py)
- [Versioning workflows](versioning_workflow.py)
- [Workflow & activity payload size](payload_size_workflow.py)


@ -0,0 +1,36 @@
import dapr.ext.workflow as wf
from datetime import datetime
import uuid

wf_runtime = wf.WorkflowRuntime()

# Note: submit_id, submit_date and create_order_id are activities that are
# assumed to be defined and registered elsewhere; they are not part of this snippet.


@wf_runtime.workflow(name='non_deterministic_workflow')
def non_deterministic_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    """
    Do not use non-deterministic operations in a workflow!
    These operations produce a new value every time the
    workflow is replayed.
    """
    order_id = str(uuid.uuid4())
    order_date = datetime.now()
    yield ctx.call_activity(submit_id, input=order_id)
    yield ctx.call_activity(submit_date, input=order_date)

    return order_id


@wf_runtime.workflow(name='deterministic_workflow')
def deterministic_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    """
    Either wrap non-deterministic operations in an activity, or use the
    deterministic alternatives on the DaprWorkflowContext instead. These
    produce the same value when the workflow is replayed.
    """
    order_id = yield ctx.call_activity(create_order_id, input=wf_input)
    order_date = ctx.current_utc_datetime
    yield ctx.call_activity(submit_id, input=order_id)
    yield ctx.call_activity(submit_date, input=order_date)

    return order_id
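
The effect of replay on the two workflows above can be illustrated without the Dapr SDK. The following is a minimal sketch, not how the Dapr runtime is implemented: `replay`, `non_deterministic` and `deterministic` are hypothetical names, and the `history` list stands in for the persisted workflow state. It assumes only that a replay re-runs the workflow function from the top and feeds back recorded activity results.

```python
import uuid


def replay(workflow, history):
    """Drive a generator workflow, reusing recorded activity results from history."""
    gen = workflow()
    step = 0
    try:
        task = next(gen)
        while True:
            if step < len(history):
                result = history[step]   # replay: reuse the recorded result
            else:
                result = task()          # first run: execute the activity and record it
                history.append(result)
            step += 1
            task = gen.send(result)
    except StopIteration as stop:
        return stop.value


def create_order_id():
    """Stand-in activity: non-deterministic work is allowed here."""
    return str(uuid.uuid4())


def non_deterministic():
    order_id = str(uuid.uuid4())   # regenerated on every replay
    yield lambda: None             # some unrelated activity
    return order_id


def deterministic():
    order_id = yield create_order_id   # result is recorded in the history
    return order_id


history_a, history_b = [], []
first_bad = replay(non_deterministic, history_a)
second_bad = replay(non_deterministic, history_a)
first_good = replay(deterministic, history_b)
second_good = replay(deterministic, history_b)
print(first_bad == second_bad, first_good == second_good)
```

In this sketch the ID generated in the workflow body differs between the two runs, while the ID produced by the activity is read back from the history and stays the same.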


@ -0,0 +1,14 @@
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

# Note: Order is assumed to be a model defined elsewhere, with the fields
# id, description, unit_price and quantity used in the SQL statement below.


@wf_runtime.activity(name='idempotent_activity')
def idempotent_activity(ctx: wf.WorkflowActivityContext, order_item: Order) -> bool:
    """
    Beware of non-idempotent operations in an activity.
    Dapr Workflow guarantees at-least-once execution of activities, so an activity
    might be executed more than once if it does not run to completion successfully.
    For instance, can you insert a record into a database twice without side effects?

        insert_sql = f"INSERT INTO Orders (Id, Description, UnitPrice, Quantity) VALUES ('{order_item.id}', '{order_item.description}', {order_item.unit_price}, {order_item.quantity})"

    It's best to check whether a record already exists before inserting it.
    """
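
One way to make the insert described above safe to repeat is sketched below. It uses the standard-library `sqlite3` module with an in-memory database purely for illustration; the table layout mirrors the columns in the SQL statement, and `insert_order_once` is a hypothetical helper, not part of the Dapr SDK. It relies on a primary key plus SQLite's `INSERT OR IGNORE` rather than a separate existence check, and uses query parameters instead of string interpolation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Orders ("
    "Id TEXT PRIMARY KEY, Description TEXT, UnitPrice REAL, Quantity INTEGER)"
)


def insert_order_once(conn, order_id, description, unit_price, quantity):
    """Insert the order unless a row with the same Id exists.

    Returns True if a row was inserted, False if it was already present.
    """
    cursor = conn.execute(
        "INSERT OR IGNORE INTO Orders (Id, Description, UnitPrice, Quantity) "
        "VALUES (?, ?, ?, ?)",
        (order_id, description, unit_price, quantity),
    )
    conn.commit()
    return cursor.rowcount == 1


# Calling the helper twice with the same order leaves a single row.
inserted_first = insert_order_once(conn, "order-1", "Rubber duck", 2.5, 3)
inserted_again = insert_order_once(conn, "order-1", "Rubber duck", 2.5, 3)
row_count = conn.execute("SELECT COUNT(*) FROM Orders").fetchone()[0]
```

Other databases offer comparable constructs (upserts or conflict clauses); which one fits depends on the store the activity writes to.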


@ -0,0 +1,29 @@
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

# Note: get_document, update_document and get_and_update_document are activities
# that are assumed to be defined and registered elsewhere.


@wf_runtime.workflow(name='large_payload_size_workflow')
def large_payload_size_workflow(ctx: wf.DaprWorkflowContext, doc_id: str):
    """
    Do not pass large payloads between activities.
    They are stored in the Dapr state store twice: once as the output of
    get_document, and once as the input of update_document.
    """
    document = yield ctx.call_activity(get_document, input=doc_id)
    updated_document = yield ctx.call_activity(update_document, input=document)

    # More activities to process the updated document

    return updated_document


@wf_runtime.workflow(name='small_payload_size_workflow')
def small_payload_size_workflow(ctx: wf.DaprWorkflowContext, doc_id: str):
    """
    Do pass small payloads between activities, preferably IDs only, or objects
    that are quick to (de)serialize in large volumes.
    Combine multiple actions, such as document retrieval and update, into a single activity.
    """
    updated_doc_id = yield ctx.call_activity(get_and_update_document, input=doc_id)

    # More activities to process the updated document

    return updated_doc_id
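
To give a rough sense of the difference in payload size, the sketch below compares the serialized size of a whole document with that of its ID. It runs without Dapr: `DOCUMENT_STORE` is a hypothetical in-memory stand-in for an external document store, the function bodies are illustrative only, and JSON is assumed as the serialization format.

```python
import json

# Hypothetical in-memory stand-in for an external document store.
DOCUMENT_STORE = {"doc-1": {"id": "doc-1", "body": "lorem ipsum " * 10_000}}


def get_document(doc_id: str) -> dict:
    """Returns the full document: this becomes a large activity output."""
    return DOCUMENT_STORE[doc_id]


def get_and_update_document(doc_id: str) -> str:
    """Reads and updates the document in one step and returns only its ID."""
    document = DOCUMENT_STORE[doc_id]
    document["body"] = document["body"].upper()
    return doc_id


def payload_bytes(value) -> int:
    """Size of the value when serialized as UTF-8 encoded JSON."""
    return len(json.dumps(value).encode("utf-8"))


large_payload = payload_bytes(get_document("doc-1"))
small_payload = payload_bytes(get_and_update_document("doc-1"))
print(f"document: {large_payload} bytes, id only: {small_payload} bytes")
```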


@ -0,0 +1,32 @@
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

# Note: activity_a and activity_b are activities that are assumed to be
# defined and registered elsewhere.

"""
This is the initial version of the workflow.
Note that the input argument for both activities is order_item (string).
"""
@wf_runtime.workflow(name='versioning_workflow_1')
def versioning_workflow_1(ctx: wf.DaprWorkflowContext, order_item: str):
    result_a = yield ctx.call_activity(activity_a, input=order_item)
    result_b = yield ctx.call_activity(activity_b, input=order_item)

    return result_a + result_b

"""
This is the updated version of the workflow.
The input for activity_b has changed from order_item (string) to result_a (int).
If there are in-flight workflow instances that were started with the previous version
of this workflow, they will fail when the new version is deployed under the same
workflow name, because the runtime parameters no longer match the persisted state.
It is recommended to version workflows by creating a new workflow function with a new name:
{workflowname}_1 -> {workflowname}_2
Try to avoid making breaking changes in perpetual workflows (that use the `continue_as_new` method)
since these are difficult to replace with a new version.
"""
@wf_runtime.workflow(name='versioning_workflow_2')
def versioning_workflow_2(ctx: wf.DaprWorkflowContext, order_item: str):
    result_a = yield ctx.call_activity(activity_a, input=order_item)
    result_b = yield ctx.call_activity(activity_b, input=result_a)

    return result_a + result_b
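
With the `{workflowname}_1 -> {workflowname}_2` naming scheme, both versions stay registered so that in-flight instances can finish on the old one, while new instances should be started with the newest name. The helper below is a hypothetical sketch of how a client could pick that name; `REGISTERED_WORKFLOWS` and `latest_workflow_name` are illustrative and not part of the Dapr SDK.

```python
# Names of all workflow versions that are still registered with the runtime.
REGISTERED_WORKFLOWS = {"versioning_workflow_1", "versioning_workflow_2"}


def latest_workflow_name(base_name: str, registered: set) -> str:
    """Return the registered name with the highest numeric suffix for base_name."""
    versions = []
    for name in registered:
        base, _, suffix = name.rpartition("_")
        if base == base_name and suffix.isdigit():
            versions.append(int(suffix))
    if not versions:
        raise ValueError(f"no registered versions of {base_name!r}")
    return f"{base_name}_{max(versions)}"


# New instances are scheduled under the newest name; existing instances keep
# running under the name they were started with.
name_for_new_instances = latest_workflow_name("versioning_workflow", REGISTERED_WORKFLOWS)
```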