From d72539e8ae5a1d82ba0705c9025e8882fefaa9ec Mon Sep 17 00:00:00 2001 From: tanopanta <25007602+tanopanta@users.noreply.github.com> Date: Fri, 9 Jun 2023 00:15:22 +0900 Subject: [PATCH 1/6] add spec.version for componet.yaml sample of lock.redis #3516 Signed-off-by: tanopanta <25007602+tanopanta@users.noreply.github.com> --- .../distributed-lock/howto-use-distributed-lock.md | 1 + 1 file changed, 1 insertion(+) diff --git a/daprdocs/content/en/developing-applications/building-blocks/distributed-lock/howto-use-distributed-lock.md b/daprdocs/content/en/developing-applications/building-blocks/distributed-lock/howto-use-distributed-lock.md index 21dac83e2..4a2745c34 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/distributed-lock/howto-use-distributed-lock.md +++ b/daprdocs/content/en/developing-applications/building-blocks/distributed-lock/howto-use-distributed-lock.md @@ -31,6 +31,7 @@ metadata: name: lockstore spec: type: lock.redis + version: v1 metadata: - name: redisHost value: localhost:6379 From b4f1e85d4b8bfbf043662e68a96c13565bc92904 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 8 Jun 2023 10:44:05 -0700 Subject: [PATCH 2/6] [Workflow] Add Python samples to patterns doc Signed-off-by: Chris Gillum --- .../workflow/workflow-patterns.md | 107 ++++++++++++++++-- 1 file changed, 97 insertions(+), 10 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 024ab82e1..803133562 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -25,9 +25,10 @@ While the pattern is simple, there are many complexities hidden in the implement Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example. -{{< tabs ".NET" >}} +{{< tabs ".NET" Python >}} {{% codetab %}} + ```csharp // Expotential backoff retry policy that survives long outages @@ -45,7 +46,6 @@ try var result1 = await context.CallActivityAsync("Step1", wfInput, retryOptions); var result2 = await context.CallActivityAsync("Step2", result1, retryOptions); var result3 = await context.CallActivityAsync("Step3", result2, retryOptions); - var result4 = await context.CallActivityAsync("Step4", result3, retryOptions); return string.Join(", ", result4); } catch (TaskFailedException) // Task failures are surfaced as TaskFailedException @@ -56,14 +56,61 @@ catch (TaskFailedException) // Task failures are surfaced as TaskFailedException } ``` +{{% alert title="Note" color="primary" %}} +In the example above, `"Step1"`, `"Step2"`, `"Step3"`, and `"MyCompensation"` represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example. +{{% /alert %}} + +{{% /codetab %}} + +{{% codetab %}} + + +```python +import dapr.ext.workflow as wf + + +def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + try: + result1 = yield ctx.call_activity(step1, input=wf_input) + result2 = yield ctx.call_activity(step2, input=result1) + result3 = yield ctx.call_activity(step3, input=result2) + except Exception as e: + yield ctx.call_activity(error_handler, input=str(e)) + raise + return [result1, result2, result3] + + +def step1(ctx, activity_input): + print(f'Step 1: Received input: {activity_input}.') + # Do some work + return activity_input + 1 + + +def step2(ctx, activity_input): + print(f'Step 2: Received input: {activity_input}.') + # Do some work + return activity_input * 2 + + +def step3(ctx, activity_input): + print(f'Step 3: Received input: {activity_input}.') + # Do some work + return activity_input ^ 2 + + +def error_handler(ctx, error): + print(f'Executing error handler: {error}.') + # Do some compensating work +``` + +{{% alert title="Note" color="primary" %}} +Workflow retry policies will be available in a future version of the Python SDK. +{{% /alert %}} + {{% /codetab %}} {{< /tabs >}} -{{% alert title="Note" color="primary" %}} -In the example above, `"Step1"`, `"Step2"`, `"MyCompensation"`, etc. represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example. -{{% /alert %}} - As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture. Behind the scenes, the Dapr Workflow runtime: @@ -88,9 +135,10 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example: -{{< tabs ".NET" >}} +{{< tabs ".NET" Python >}} {{% codetab %}} + ```csharp // Get a list of N work items to process in parallel. @@ -114,6 +162,15 @@ await context.CallActivityAsync("PostResults", sum); {{% /codetab %}} +{{% codetab %}} + + +```python +# TODO +``` + +{{% /codetab %}} + {{< /tabs >}} The key takeaways from this example are: @@ -214,9 +271,10 @@ Depending on the business needs, there may be a single monitor or there may be m Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input. -{{< tabs ".NET" >}} +{{< tabs ".NET" Python >}} {{% codetab %}} + ```csharp public override async Task RunAsync(WorkflowContext context, MyEntityState myEntityState) @@ -256,6 +314,15 @@ public override async Task RunAsync(WorkflowContext context, MyEntitySta {{% /codetab %}} +{{% codetab %}} + + +```python +# TODO +``` + +{{% /codetab %}} + {{< /tabs >}} A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_. @@ -284,9 +351,10 @@ The following diagram illustrates this flow. The following example code shows how this pattern can be implemented using Dapr Workflow. -{{< tabs ".NET" >}} +{{< tabs ".NET" Python >}} {{% codetab %}} + ```csharp public override async Task RunAsync(WorkflowContext context, OrderPayload order) @@ -331,13 +399,23 @@ In the example above, `RequestApprovalActivity` is the name of a workflow activi {{% /codetab %}} +{{% codetab %}} + + +```python +# TODO +``` + +{{% /codetab %}} + {{< /tabs >}} The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example: -{{< tabs ".NET" >}} +{{< tabs ".NET" Python >}} {{% codetab %}} + ```csharp // Raise the workflow event to the waiting workflow @@ -350,6 +428,15 @@ await daprClient.RaiseWorkflowEventAsync( {{% /codetab %}} +{{% codetab %}} + + +```python +# TODO +``` + +{{% /codetab %}} + {{< /tabs >}} External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API. From 759ff9495e989465a61bfbee138d112833cd26e4 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 8 Jun 2023 11:08:37 -0700 Subject: [PATCH 3/6] Add fan-out / fan-in Python snippet Signed-off-by: Chris Gillum --- .../workflow/workflow-patterns.md | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 803133562..25ec9c6f3 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -166,7 +166,38 @@ await context.CallActivityAsync("PostResults", sum); ```python -# TODO +import time +from typing import List +import dapr.ext.workflow as wf + + +def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): + # get a batch of N work items to process in parallel + work_batch = yield ctx.call_activity(get_work_batch, input=wf_input) + + # schedule N parallel tasks to process the work items and wait for all to complete + parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch] + outputs = yield wf.when_all(parallel_tasks) + + # aggregate the results and send them to another activity + total = sum(outputs) + yield ctx.call_activity(process_results, input=total) + + +def get_work_batch(ctx, batch_size: int) -> List[int]: + return [i + 1 for i in range(batch_size)] + + +def process_work_item(ctx, work_item: int) -> int: + print(f'Processing work item: {work_item}.') + time.sleep(5) + result = work_item * 2 + print(f'Work item {work_item} processed. Result: {result}.') + return result + + +def process_results(ctx, final_result: int): + print(f'Final result: {final_result}.') ``` {{% /codetab %}} From 1b06ba8ad28de6d2f502900afc084f203ab8ea63 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 8 Jun 2023 15:19:25 -0700 Subject: [PATCH 4/6] Add Monitor pattern Python code Signed-off-by: Chris Gillum --- .../workflow/workflow-patterns.md | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index 25ec9c6f3..c45767caa 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -349,7 +349,45 @@ public override async Task RunAsync(WorkflowContext context, MyEntitySta ```python -# TODO +from dataclasses import dataclass +from datetime import timedelta +import random +import dapr.ext.workflow as wf + + +@dataclass +class JobStatus: + job_id: str + is_healthy: bool + + +def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus): + # poll a status endpoint associated with this job + status = yield ctx.call_activity(check_status, input=job) + if not ctx.is_replaying: + print(f"Job '{job.job_id}' is {status}.") + + if status == "healthy": + job.is_healthy = True + next_sleep_interval = 60 # check less frequently when healthy + else: + if job.is_healthy: + job.is_healthy = False + ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!") + next_sleep_interval = 5 # check more frequently when unhealthy + + yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(seconds=next_sleep_interval)) + + # restart from the beginning with a new JobStatus input + ctx.continue_as_new(job) + + +def check_status(ctx, _) -> str: + return random.choice(["healthy", "unhealthy"]) + + +def send_alert(ctx, message: str): + print(f'*** Alert: {message}') ``` {{% /codetab %}} From e3885f4d7ef682fd4c63d3cf4644ba680b3de896 Mon Sep 17 00:00:00 2001 From: Chris Gillum Date: Thu, 8 Jun 2023 16:33:37 -0700 Subject: [PATCH 5/6] Add external system interaction pattern Python code Signed-off-by: Chris Gillum --- .../workflow/workflow-patterns.md | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md index c45767caa..4ff10782b 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md +++ b/daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-patterns.md @@ -472,7 +472,57 @@ In the example above, `RequestApprovalActivity` is the name of a workflow activi ```python -# TODO +from dataclasses import dataclass +from datetime import timedelta +import dapr.ext.workflow as wf + + +@dataclass +class Order: + cost: float + product: str + quantity: int + + def __str__(self): + return f'{self.product} ({self.quantity})' + + +@dataclass +class Approval: + approver: str + + @staticmethod + def from_dict(dict): + return Approval(**dict) + + +def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order): + # Orders under $1000 are auto-approved + if order.cost < 1000: + return "Auto-approved" + + # Orders of $1000 or more require manager approval + yield ctx.call_activity(send_approval_request, input=order) + + # Approvals must be received within 24 hours or they will be canceled. + approval_event = ctx.wait_for_external_event("approval_received") + timeout_event = ctx.create_timer(timedelta(hours=24)) + winner = yield wf.when_any([approval_event, timeout_event]) + if winner == timeout_event: + return "Cancelled" + + # The order was approved + yield ctx.call_activity(place_order, input=order) + approval_details = Approval.from_dict(approval_event.get_result()) + return f"Approved by '{approval_details.approver}'" + + +def send_approval_request(_, order: Order) -> None: + print(f'*** Sending approval request for order: {order}') + + +def place_order(_, order: Order) -> None: + print(f'*** Placing order: {order}') ``` {{% /codetab %}} @@ -501,7 +551,15 @@ await daprClient.RaiseWorkflowEventAsync( ```python -# TODO +from dapr.clients import DaprClient +from dataclasses import asdict + +with DaprClient() as d: + d.raise_workflow_event( + instance_id=instance_id, + workflow_component="dapr", + event_name="approval_received", + event_data=asdict(Approval("Jane Doe"))) ``` {{% /codetab %}} From f52c9a90ff6ca8a22f582f32802fd55f1c2db055 Mon Sep 17 00:00:00 2001 From: Shubham Sharma Date: Fri, 9 Jun 2023 12:13:20 +0530 Subject: [PATCH 6/6] Update JS-SDK submodule Signed-off-by: Shubham Sharma --- sdkdocs/js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdkdocs/js b/sdkdocs/js index 11145914a..1e3b6eb85 160000 --- a/sdkdocs/js +++ b/sdkdocs/js @@ -1 +1 @@ -Subproject commit 11145914a5fddd1ada06b6a3b938c27b401f7025 +Subproject commit 1e3b6eb859be175e12808c0ff345f40398f209d6