mirror of https://github.com/dapr/docs.git
Merge pull request #4563 from hhunter-ms/issue_4410
Python SDK workflow updates
This commit is contained in:
commit
f612ab7bcc
|
@ -39,13 +39,14 @@ The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar si
|
|||
Define the workflow activities you'd like your workflow to perform. Activities are a function definition and can take inputs and outputs. The following example creates a counter (activity) called `hello_act` that notifies users of the current counter value. `hello_act` is a function derived from a class called `WorkflowActivityContext`.
|
||||
|
||||
```python
|
||||
def hello_act(ctx: WorkflowActivityContext, input):
|
||||
@wfr.activity(name='hello_act')
|
||||
def hello_act(ctx: WorkflowActivityContext, wf_input):
|
||||
global counter
|
||||
counter += input
|
||||
counter += wf_input
|
||||
print(f'New counter value is: {counter}!', flush=True)
|
||||
```
|
||||
|
||||
[See the `hello_act` workflow activity in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL40C1-L43C59)
|
||||
[See the task chaining workflow activity in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
|
||||
|
||||
|
||||
{{% /codetab %}}
|
||||
|
@ -226,19 +227,32 @@ Next, register and call the activites in a workflow.
|
|||
|
||||
<!--python-->
|
||||
|
||||
The `hello_world_wf` function is derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
|
||||
The `hello_world_wf` function is a function derived from a class called `DaprWorkflowContext` with input and output parameter types. It also includes a `yield` statement that does the heavy lifting of the workflow and calls the workflow activities.
|
||||
|
||||
```python
|
||||
def hello_world_wf(ctx: DaprWorkflowContext, input):
|
||||
print(f'{input}')
|
||||
@wfr.workflow(name='hello_world_wf')
|
||||
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
|
||||
print(f'{wf_input}')
|
||||
yield ctx.call_activity(hello_act, input=1)
|
||||
yield ctx.call_activity(hello_act, input=10)
|
||||
yield ctx.wait_for_external_event("event1")
|
||||
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
|
||||
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
|
||||
|
||||
# Change in event handling: Use when_any to handle both event and timeout
|
||||
event = ctx.wait_for_external_event(event_name)
|
||||
timeout = ctx.create_timer(timedelta(seconds=30))
|
||||
winner = yield when_any([event, timeout])
|
||||
|
||||
if winner == timeout:
|
||||
print('Workflow timed out waiting for event')
|
||||
return 'Timeout'
|
||||
|
||||
yield ctx.call_activity(hello_act, input=100)
|
||||
yield ctx.call_activity(hello_act, input=1000)
|
||||
return 'Completed'
|
||||
```
|
||||
|
||||
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py#LL32C1-L38C51)
|
||||
[See the `hello_world_wf` workflow in context.](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py)
|
||||
|
||||
|
||||
{{% /codetab %}}
|
||||
|
@ -405,89 +419,177 @@ Finally, compose the application using the workflow.
|
|||
|
||||
<!--python-->
|
||||
|
||||
[In the following example](https://github.com/dapr/python-sdk/blob/master/examples/demo_workflow/app.py), for a basic Python hello world application using the Python SDK, your project code would include:
|
||||
[In the following example](https://github.com/dapr/python-sdk/blob/main/examples/workflow/simple.py), for a basic Python hello world application using the Python SDK, your project code would include:
|
||||
|
||||
- A Python package called `DaprClient` to receive the Python SDK capabilities.
|
||||
- A builder with extensions called:
|
||||
- `WorkflowRuntime`: Allows you to register workflows and workflow activities
|
||||
- `WorkflowRuntime`: Allows you to register the workflow runtime.
|
||||
- `DaprWorkflowContext`: Allows you to [create workflows]({{< ref "#write-the-workflow" >}})
|
||||
- `WorkflowActivityContext`: Allows you to [create workflow activities]({{< ref "#write-the-workflow-activities" >}})
|
||||
- API calls. In the example below, these calls start, pause, resume, purge, and terminate the workflow.
|
||||
- API calls. In the example below, these calls start, pause, resume, purge, and completing the workflow.
|
||||
|
||||
```python
|
||||
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
|
||||
from dapr.clients import DaprClient
|
||||
from datetime import timedelta
|
||||
from time import sleep
|
||||
from dapr.ext.workflow import (
|
||||
WorkflowRuntime,
|
||||
DaprWorkflowContext,
|
||||
WorkflowActivityContext,
|
||||
RetryPolicy,
|
||||
DaprWorkflowClient,
|
||||
when_any,
|
||||
)
|
||||
from dapr.conf import Settings
|
||||
from dapr.clients.exceptions import DaprInternalError
|
||||
|
||||
settings = Settings()
|
||||
|
||||
counter = 0
|
||||
retry_count = 0
|
||||
child_orchestrator_count = 0
|
||||
child_orchestrator_string = ''
|
||||
child_act_retry_count = 0
|
||||
instance_id = 'exampleInstanceID'
|
||||
child_instance_id = 'childInstanceID'
|
||||
workflow_name = 'hello_world_wf'
|
||||
child_workflow_name = 'child_wf'
|
||||
input_data = 'Hi Counter!'
|
||||
event_name = 'event1'
|
||||
event_data = 'eventData'
|
||||
non_existent_id_error = 'no such instance exists'
|
||||
|
||||
retry_policy = RetryPolicy(
|
||||
first_retry_interval=timedelta(seconds=1),
|
||||
max_number_of_attempts=3,
|
||||
backoff_coefficient=2,
|
||||
max_retry_interval=timedelta(seconds=10),
|
||||
retry_timeout=timedelta(seconds=100),
|
||||
)
|
||||
|
||||
wfr = WorkflowRuntime()
|
||||
|
||||
|
||||
@wfr.workflow(name='hello_world_wf')
|
||||
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
|
||||
print(f'{wf_input}')
|
||||
yield ctx.call_activity(hello_act, input=1)
|
||||
yield ctx.call_activity(hello_act, input=10)
|
||||
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
|
||||
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
|
||||
|
||||
# Change in event handling: Use when_any to handle both event and timeout
|
||||
event = ctx.wait_for_external_event(event_name)
|
||||
timeout = ctx.create_timer(timedelta(seconds=30))
|
||||
winner = yield when_any([event, timeout])
|
||||
|
||||
if winner == timeout:
|
||||
print('Workflow timed out waiting for event')
|
||||
return 'Timeout'
|
||||
|
||||
yield ctx.call_activity(hello_act, input=100)
|
||||
yield ctx.call_activity(hello_act, input=1000)
|
||||
return 'Completed'
|
||||
|
||||
|
||||
@wfr.activity(name='hello_act')
|
||||
def hello_act(ctx: WorkflowActivityContext, wf_input):
|
||||
global counter
|
||||
counter += wf_input
|
||||
print(f'New counter value is: {counter}!', flush=True)
|
||||
|
||||
|
||||
@wfr.activity(name='hello_retryable_act')
|
||||
def hello_retryable_act(ctx: WorkflowActivityContext):
|
||||
global retry_count
|
||||
if (retry_count % 2) == 0:
|
||||
print(f'Retry count value is: {retry_count}!', flush=True)
|
||||
retry_count += 1
|
||||
raise ValueError('Retryable Error')
|
||||
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
|
||||
retry_count += 1
|
||||
|
||||
|
||||
@wfr.workflow(name='child_retryable_wf')
|
||||
def child_retryable_wf(ctx: DaprWorkflowContext):
|
||||
global child_orchestrator_string, child_orchestrator_count
|
||||
if not ctx.is_replaying:
|
||||
child_orchestrator_count += 1
|
||||
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
|
||||
child_orchestrator_string += str(child_orchestrator_count)
|
||||
yield ctx.call_activity(
|
||||
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
|
||||
)
|
||||
if child_orchestrator_count < 3:
|
||||
raise ValueError('Retryable Error')
|
||||
|
||||
|
||||
@wfr.activity(name='act_for_child_wf')
|
||||
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
|
||||
global child_orchestrator_string, child_act_retry_count
|
||||
inp_char = chr(96 + inp)
|
||||
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
|
||||
child_orchestrator_string += inp_char
|
||||
if child_act_retry_count % 2 == 0:
|
||||
child_act_retry_count += 1
|
||||
raise ValueError('Retryable Error')
|
||||
child_act_retry_count += 1
|
||||
|
||||
# ...
|
||||
|
||||
def main():
|
||||
with DaprClient() as d:
|
||||
host = settings.DAPR_RUNTIME_HOST
|
||||
port = settings.DAPR_GRPC_PORT
|
||||
workflowRuntime = WorkflowRuntime(host, port)
|
||||
workflowRuntime = WorkflowRuntime()
|
||||
workflowRuntime.register_workflow(hello_world_wf)
|
||||
workflowRuntime.register_activity(hello_act)
|
||||
workflowRuntime.start()
|
||||
wfr.start()
|
||||
wf_client = DaprWorkflowClient()
|
||||
|
||||
# Start workflow
|
||||
print("==========Start Counter Increase as per Input:==========")
|
||||
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
|
||||
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
|
||||
print(f"start_resp {start_resp.instance_id}")
|
||||
print('==========Start Counter Increase as per Input:==========')
|
||||
wf_client.schedule_new_workflow(
|
||||
workflow=hello_world_wf, input=input_data, instance_id=instance_id
|
||||
)
|
||||
|
||||
# ...
|
||||
wf_client.wait_for_workflow_start(instance_id)
|
||||
|
||||
# Pause workflow
|
||||
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
print(f"Get response from {workflowName} after pause call: {getResponse.runtime_status}")
|
||||
# Sleep to let the workflow run initial activities
|
||||
sleep(12)
|
||||
|
||||
# Resume workflow
|
||||
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
print(f"Get response from {workflowName} after resume call: {getResponse.runtime_status}")
|
||||
|
||||
sleep(1)
|
||||
# Raise workflow
|
||||
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
|
||||
event_name=eventName, event_data=eventData)
|
||||
assert counter == 11
|
||||
assert retry_count == 2
|
||||
assert child_orchestrator_string == '1aa2bb3cc'
|
||||
|
||||
sleep(5)
|
||||
# Purge workflow
|
||||
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
try:
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
except DaprInternalError as err:
|
||||
if nonExistentIDError in err._message:
|
||||
print("Instance Successfully Purged")
|
||||
# Pause Test
|
||||
wf_client.pause_workflow(instance_id=instance_id)
|
||||
metadata = wf_client.get_workflow_state(instance_id=instance_id)
|
||||
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')
|
||||
|
||||
# Kick off another workflow for termination purposes
|
||||
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
|
||||
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
|
||||
print(f"start_resp {start_resp.instance_id}")
|
||||
# Resume Test
|
||||
wf_client.resume_workflow(instance_id=instance_id)
|
||||
metadata = wf_client.get_workflow_state(instance_id=instance_id)
|
||||
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')
|
||||
|
||||
# Terminate workflow
|
||||
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
sleep(1)
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
print(f"Get response from {workflowName} after terminate call: {getResponse.runtime_status}")
|
||||
sleep(2) # Give the workflow time to reach the event wait state
|
||||
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
|
||||
|
||||
# Purge workflow
|
||||
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
try:
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
except DaprInternalError as err:
|
||||
if nonExistentIDError in err._message:
|
||||
print("Instance Successfully Purged")
|
||||
print('========= Waiting for Workflow completion', flush=True)
|
||||
try:
|
||||
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
|
||||
if state.runtime_status.name == 'COMPLETED':
|
||||
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
|
||||
else:
|
||||
print(f'Workflow failed! Status: {state.runtime_status.name}')
|
||||
except TimeoutError:
|
||||
print('*** Workflow timed out!')
|
||||
|
||||
wf_client.purge_workflow(instance_id=instance_id)
|
||||
try:
|
||||
wf_client.get_workflow_state(instance_id=instance_id)
|
||||
except DaprInternalError as err:
|
||||
if non_existent_id_error in err._message:
|
||||
print('Instance Successfully Purged')
|
||||
|
||||
wfr.shutdown()
|
||||
|
||||
workflowRuntime.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
```
|
||||
|
||||
|
||||
{{% /codetab %}}
|
||||
|
||||
{{% codetab %}}
|
||||
|
|
|
@ -14,13 +14,13 @@ Now that you've [authored the workflow and its activities in your application]({
|
|||
{{% codetab %}}
|
||||
|
||||
Manage your workflow within your code. In the workflow example from the [Author a workflow]({{< ref "howto-author-workflow.md#write-the-application" >}}) guide, the workflow is registered in the code using the following APIs:
|
||||
- **start_workflow**: Start an instance of a workflow
|
||||
- **get_workflow**: Get information on the status of the workflow
|
||||
- **schedule_new_workflow**: Start an instance of a workflow
|
||||
- **get_workflow_state**: Get information on the status of the workflow
|
||||
- **pause_workflow**: Pauses or suspends a workflow instance that can later be resumed
|
||||
- **resume_workflow**: Resumes a paused workflow instance
|
||||
- **raise_workflow_event**: Raise an event on a workflow
|
||||
- **purge_workflow**: Removes all metadata related to a specific workflow instance
|
||||
- **terminate_workflow**: Terminate or stop a particular instance of a workflow
|
||||
- **wait_for_workflow_completion**: Complete a particular instance of a workflow
|
||||
|
||||
```python
|
||||
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowContext, WorkflowActivityContext
|
||||
|
@ -34,27 +34,28 @@ eventName = "event1"
|
|||
eventData = "eventData"
|
||||
|
||||
# Start the workflow
|
||||
start_resp = d.start_workflow(instance_id=instanceId, workflow_component=workflowComponent,
|
||||
workflow_name=workflowName, input=inputData, workflow_options=workflowOptions)
|
||||
wf_client.schedule_new_workflow(
|
||||
workflow=hello_world_wf, input=input_data, instance_id=instance_id
|
||||
)
|
||||
|
||||
# Get info on the workflow
|
||||
getResponse = d.get_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
wf_client.get_workflow_state(instance_id=instance_id)
|
||||
|
||||
# Pause the workflow
|
||||
d.pause_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
wf_client.pause_workflow(instance_id=instance_id)
|
||||
metadata = wf_client.get_workflow_state(instance_id=instance_id)
|
||||
|
||||
# Resume the workflow
|
||||
d.resume_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
wf_client.resume_workflow(instance_id=instance_id)
|
||||
|
||||
# Raise an event on the workflow.
|
||||
d.raise_workflow_event(instance_id=instanceId, workflow_component=workflowComponent,
|
||||
event_name=eventName, event_data=eventData)
|
||||
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)
|
||||
|
||||
# Purge the workflow
|
||||
d.purge_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
wf_client.purge_workflow(instance_id=instance_id)
|
||||
|
||||
# Terminate the workflow
|
||||
d.terminate_workflow(instance_id=instanceId, workflow_component=workflowComponent)
|
||||
# Wait for workflow completion
|
||||
wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
|
|
@ -10,7 +10,7 @@ description: Get started with the Dapr conversation building block
|
|||
The conversation building block is currently in **alpha**.
|
||||
{{% /alert %}}
|
||||
|
||||
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it for a poem about Dapr.
|
||||
Let's take a look at how the [Dapr conversation building block]({{< ref conversation-overview.md >}}) makes interacting with Large Language Models (LLMs) easier. In this quickstart, you use the echo component to communicate with the mock LLM and ask it to define Dapr.
|
||||
|
||||
You can try out this conversation quickstart by either:
|
||||
|
||||
|
@ -18,7 +18,7 @@ You can try out this conversation quickstart by either:
|
|||
- [Running the application without the template]({{< ref "#run-the-app-without-the-template" >}})
|
||||
|
||||
{{% alert title="Note" color="primary" %}}
|
||||
Currently, only the HTTP quickstart sample is available in Python and JavaScript.
|
||||
Currently, you can only use JavaScript for the quickstart sample using HTTP, not the JavaScript SDK.
|
||||
{{% /alert %}}
|
||||
|
||||
## Run the app with the template file
|
||||
|
@ -50,7 +50,7 @@ git clone https://github.com/dapr/quickstarts.git
|
|||
From the root of the Quickstarts directory, navigate into the conversation directory:
|
||||
|
||||
```bash
|
||||
cd conversation/python/http/conversation
|
||||
cd conversation/python/sdk/conversation
|
||||
```
|
||||
|
||||
Install the dependencies:
|
||||
|
@ -61,7 +61,7 @@ pip3 install -r requirements.txt
|
|||
|
||||
### Step 3: Launch the conversation service
|
||||
|
||||
Navigate back to the `http` directory and start the conversation service with the following command:
|
||||
Navigate back to the `sdk` directory and start the conversation service with the following command:
|
||||
|
||||
```bash
|
||||
dapr run -f .
|
||||
|
@ -117,37 +117,28 @@ In the application code:
|
|||
- The mock LLM echoes "What is dapr?".
|
||||
|
||||
```python
|
||||
import logging
|
||||
import requests
|
||||
import os
|
||||
from dapr.clients import DaprClient
|
||||
from dapr.clients.grpc._request import ConversationInput
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
with DaprClient() as d:
|
||||
inputs = [
|
||||
ConversationInput(content="What is dapr?", role='user', scrub_pii=True),
|
||||
]
|
||||
|
||||
base_url = os.getenv('BASE_URL', 'http://localhost') + ':' + os.getenv(
|
||||
'DAPR_HTTP_PORT', '3500')
|
||||
|
||||
CONVERSATION_COMPONENT_NAME = 'echo'
|
||||
|
||||
input = {
|
||||
'name': 'echo',
|
||||
'inputs': [{'message':'What is dapr?'}],
|
||||
'parameters': {},
|
||||
'metadata': {}
|
||||
metadata = {
|
||||
'model': 'modelname',
|
||||
'key': 'authKey',
|
||||
'cacheTTL': '10m',
|
||||
}
|
||||
|
||||
# Send input to conversation endpoint
|
||||
result = requests.post(
|
||||
url='%s/v1.0-alpha1/conversation/%s/converse' % (base_url, CONVERSATION_COMPONENT_NAME),
|
||||
json=input
|
||||
)
|
||||
print('Input sent: What is dapr?')
|
||||
|
||||
logging.info('Input sent: What is dapr?')
|
||||
response = d.converse_alpha1(
|
||||
name='echo', inputs=inputs, temperature=0.7, context_id='chat-123', metadata=metadata
|
||||
)
|
||||
|
||||
# Parse conversation output
|
||||
data = result.json()
|
||||
output = data["outputs"][0]["result"]
|
||||
|
||||
logging.info('Output response: ' + output)
|
||||
for output in response.outputs:
|
||||
print(f'Output response: {output.result}')
|
||||
```
|
||||
|
||||
{{% /codetab %}}
|
||||
|
@ -575,7 +566,7 @@ git clone https://github.com/dapr/quickstarts.git
|
|||
From the root of the Quickstarts directory, navigate into the conversation directory:
|
||||
|
||||
```bash
|
||||
cd conversation/python/http/conversation
|
||||
cd conversation/python/sdk/conversation
|
||||
```
|
||||
|
||||
Install the dependencies:
|
||||
|
@ -586,7 +577,7 @@ pip3 install -r requirements.txt
|
|||
|
||||
### Step 3: Launch the conversation service
|
||||
|
||||
Navigate back to the `http` directory and start the conversation service with the following command:
|
||||
Navigate back to the `sdk` directory and start the conversation service with the following command:
|
||||
|
||||
```bash
|
||||
dapr run --app-id conversation --resources-path ../../../components -- python3 app.py
|
||||
|
|
|
@ -251,7 +251,6 @@ class WorkflowConsoleApp:
|
|||
if __name__ == '__main__':
|
||||
app = WorkflowConsoleApp()
|
||||
app.main()
|
||||
|
||||
```
|
||||
|
||||
#### `order-processor/workflow.py`
|
||||
|
@ -276,7 +275,6 @@ wfr = WorkflowRuntime()
|
|||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
@wfr.workflow(name="order_processing_workflow")
|
||||
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: str):
|
||||
"""Defines the order processing workflow.
|
||||
|
@ -343,7 +341,6 @@ def notify_activity(ctx: WorkflowActivityContext, input: Notification):
|
|||
logger = logging.getLogger('NotifyActivity')
|
||||
logger.info(input.message)
|
||||
|
||||
|
||||
@wfr.activity(name="process_payment_activity")
|
||||
def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest):
|
||||
"""Defines Process Payment Activity.This is used by the workflow to process a payment"""
|
||||
|
@ -353,7 +350,6 @@ def process_payment_activity(ctx: WorkflowActivityContext, input: PaymentRequest
|
|||
+' USD')
|
||||
logger.info(f'Payment for request ID {input.request_id} processed successfully')
|
||||
|
||||
|
||||
@wfr.activity(name="verify_inventory_activity")
|
||||
def verify_inventory_activity(ctx: WorkflowActivityContext,
|
||||
input: InventoryRequest) -> InventoryResult:
|
||||
|
@ -377,8 +373,6 @@ def verify_inventory_activity(ctx: WorkflowActivityContext,
|
|||
return InventoryResult(True, inventory_item)
|
||||
return InventoryResult(False, None)
|
||||
|
||||
|
||||
|
||||
@wfr.activity(name="update_inventory_activity")
|
||||
def update_inventory_activity(ctx: WorkflowActivityContext,
|
||||
input: PaymentRequest) -> InventoryResult:
|
||||
|
@ -401,8 +395,6 @@ def update_inventory_activity(ctx: WorkflowActivityContext,
|
|||
client.save_state(store_name, input.item_being_purchased, new_val)
|
||||
logger.info(f'There are now {new_quantity} {input.item_being_purchased} left in stock')
|
||||
|
||||
|
||||
|
||||
@wfr.activity(name="request_approval_activity")
|
||||
def request_approval_activity(ctx: WorkflowActivityContext,
|
||||
input: OrderPayload):
|
||||
|
@ -413,7 +405,6 @@ def request_approval_activity(ctx: WorkflowActivityContext,
|
|||
|
||||
logger.info('Requesting approval for payment of '+f'{input["total_cost"]}'+' USD for '
|
||||
+f'{input["quantity"]}' +' ' +f'{input["item_name"]}')
|
||||
|
||||
```
|
||||
{{% /codetab %}}
|
||||
|
||||
|
|
Loading…
Reference in New Issue