Merge branch 'v1.11' into 1-11_ENDGAME

This commit is contained in:
Mark Fussell 2023-06-09 15:07:16 -07:00 committed by GitHub
commit 93e2616ef2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 226 additions and 11 deletions

View File

@ -31,6 +31,7 @@ metadata:
name: lockstore name: lockstore
spec: spec:
type: lock.redis type: lock.redis
version: v1
metadata: metadata:
- name: redisHost - name: redisHost
value: localhost:6379 value: localhost:6379

View File

@ -25,9 +25,10 @@ While the pattern is simple, there are many complexities hidden in the implement
Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example. Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.
{{< tabs ".NET" >}} {{< tabs ".NET" Python >}}
{{% codetab %}} {{% codetab %}}
<!--dotnet-->
```csharp ```csharp
// Expotential backoff retry policy that survives long outages // Expotential backoff retry policy that survives long outages
@ -45,7 +46,6 @@ try
var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions); var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions); var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions);
var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions); var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions);
var result4 = await context.CallActivityAsync<Guid[]>("Step4", result3, retryOptions);
return string.Join(", ", result4); return string.Join(", ", result4);
} }
catch (TaskFailedException) // Task failures are surfaced as TaskFailedException catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
@ -56,14 +56,61 @@ catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
} }
``` ```
{{% alert title="Note" color="primary" %}}
In the example above, `"Step1"`, `"Step2"`, `"Step3"`, and `"MyCompensation"` represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.
{{% /alert %}}
{{% /codetab %}}
{{% codetab %}}
<!--python-->
```python
import dapr.ext.workflow as wf
def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
try:
result1 = yield ctx.call_activity(step1, input=wf_input)
result2 = yield ctx.call_activity(step2, input=result1)
result3 = yield ctx.call_activity(step3, input=result2)
except Exception as e:
yield ctx.call_activity(error_handler, input=str(e))
raise
return [result1, result2, result3]
def step1(ctx, activity_input):
print(f'Step 1: Received input: {activity_input}.')
# Do some work
return activity_input + 1
def step2(ctx, activity_input):
print(f'Step 2: Received input: {activity_input}.')
# Do some work
return activity_input * 2
def step3(ctx, activity_input):
print(f'Step 3: Received input: {activity_input}.')
# Do some work
return activity_input ^ 2
def error_handler(ctx, error):
print(f'Executing error handler: {error}.')
# Do some compensating work
```
{{% alert title="Note" color="primary" %}}
Workflow retry policies will be available in a future version of the Python SDK.
{{% /alert %}}
{{% /codetab %}} {{% /codetab %}}
{{< /tabs >}} {{< /tabs >}}
{{% alert title="Note" color="primary" %}}
In the example above, `"Step1"`, `"Step2"`, `"MyCompensation"`, etc. represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.
{{% /alert %}}
As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture. As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.
Behind the scenes, the Dapr Workflow runtime: Behind the scenes, the Dapr Workflow runtime:
@ -88,9 +135,10 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf
Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example: Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:
{{< tabs ".NET" >}} {{< tabs ".NET" Python >}}
{{% codetab %}} {{% codetab %}}
<!--dotnet-->
```csharp ```csharp
// Get a list of N work items to process in parallel. // Get a list of N work items to process in parallel.
@ -114,6 +162,46 @@ await context.CallActivityAsync("PostResults", sum);
{{% /codetab %}} {{% /codetab %}}
{{% codetab %}}
<!--python-->
```python
import time
from typing import List
import dapr.ext.workflow as wf
def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
# get a batch of N work items to process in parallel
work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)
# schedule N parallel tasks to process the work items and wait for all to complete
parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
outputs = yield wf.when_all(parallel_tasks)
# aggregate the results and send them to another activity
total = sum(outputs)
yield ctx.call_activity(process_results, input=total)
def get_work_batch(ctx, batch_size: int) -> List[int]:
return [i + 1 for i in range(batch_size)]
def process_work_item(ctx, work_item: int) -> int:
print(f'Processing work item: {work_item}.')
time.sleep(5)
result = work_item * 2
print(f'Work item {work_item} processed. Result: {result}.')
return result
def process_results(ctx, final_result: int):
print(f'Final result: {final_result}.')
```
{{% /codetab %}}
{{< /tabs >}} {{< /tabs >}}
The key takeaways from this example are: The key takeaways from this example are:
@ -214,9 +302,10 @@ Depending on the business needs, there may be a single monitor or there may be m
Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input. Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input.
{{< tabs ".NET" >}} {{< tabs ".NET" Python >}}
{{% codetab %}} {{% codetab %}}
<!--dotnet-->
```csharp ```csharp
public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState) public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
@ -256,6 +345,53 @@ public override async Task<object> RunAsync(WorkflowContext context, MyEntitySta
{{% /codetab %}} {{% /codetab %}}
{{% codetab %}}
<!--python-->
```python
from dataclasses import dataclass
from datetime import timedelta
import random
import dapr.ext.workflow as wf
@dataclass
class JobStatus:
job_id: str
is_healthy: bool
def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
# poll a status endpoint associated with this job
status = yield ctx.call_activity(check_status, input=job)
if not ctx.is_replaying:
print(f"Job '{job.job_id}' is {status}.")
if status == "healthy":
job.is_healthy = True
next_sleep_interval = 60 # check less frequently when healthy
else:
if job.is_healthy:
job.is_healthy = False
ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!")
next_sleep_interval = 5 # check more frequently when unhealthy
yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(seconds=next_sleep_interval))
# restart from the beginning with a new JobStatus input
ctx.continue_as_new(job)
def check_status(ctx, _) -> str:
return random.choice(["healthy", "unhealthy"])
def send_alert(ctx, message: str):
print(f'*** Alert: {message}')
```
{{% /codetab %}}
{{< /tabs >}} {{< /tabs >}}
A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_. A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_.
@ -284,9 +420,10 @@ The following diagram illustrates this flow.
The following example code shows how this pattern can be implemented using Dapr Workflow. The following example code shows how this pattern can be implemented using Dapr Workflow.
{{< tabs ".NET" >}} {{< tabs ".NET" Python >}}
{{% codetab %}} {{% codetab %}}
<!--dotnet-->
```csharp ```csharp
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order) public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
@ -331,13 +468,73 @@ In the example above, `RequestApprovalActivity` is the name of a workflow activi
{{% /codetab %}} {{% /codetab %}}
{{% codetab %}}
<!--python-->
```python
from dataclasses import dataclass
from datetime import timedelta
import dapr.ext.workflow as wf
@dataclass
class Order:
cost: float
product: str
quantity: int
def __str__(self):
return f'{self.product} ({self.quantity})'
@dataclass
class Approval:
approver: str
@staticmethod
def from_dict(dict):
return Approval(**dict)
def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
# Orders under $1000 are auto-approved
if order.cost < 1000:
return "Auto-approved"
# Orders of $1000 or more require manager approval
yield ctx.call_activity(send_approval_request, input=order)
# Approvals must be received within 24 hours or they will be canceled.
approval_event = ctx.wait_for_external_event("approval_received")
timeout_event = ctx.create_timer(timedelta(hours=24))
winner = yield wf.when_any([approval_event, timeout_event])
if winner == timeout_event:
return "Cancelled"
# The order was approved
yield ctx.call_activity(place_order, input=order)
approval_details = Approval.from_dict(approval_event.get_result())
return f"Approved by '{approval_details.approver}'"
def send_approval_request(_, order: Order) -> None:
print(f'*** Sending approval request for order: {order}')
def place_order(_, order: Order) -> None:
print(f'*** Placing order: {order}')
```
{{% /codetab %}}
{{< /tabs >}} {{< /tabs >}}
The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example: The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example:
{{< tabs ".NET" >}} {{< tabs ".NET" Python >}}
{{% codetab %}} {{% codetab %}}
<!--dotnet-->
```csharp ```csharp
// Raise the workflow event to the waiting workflow // Raise the workflow event to the waiting workflow
@ -350,6 +547,23 @@ await daprClient.RaiseWorkflowEventAsync(
{{% /codetab %}} {{% /codetab %}}
{{% codetab %}}
<!--python-->
```python
from dapr.clients import DaprClient
from dataclasses import asdict
with DaprClient() as d:
d.raise_workflow_event(
instance_id=instance_id,
workflow_component="dapr",
event_name="approval_received",
event_data=asdict(Approval("Jane Doe")))
```
{{% /codetab %}}
{{< /tabs >}} {{< /tabs >}}
External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API. External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.

@ -1 +1 @@
Subproject commit 11145914a5fddd1ada06b6a3b938c27b401f7025 Subproject commit 1e3b6eb859be175e12808c0ff345f40398f209d6