Merge pull request #1160 from rochabr/jobs-python

Jobs API  Quickstart for Python - HTTP
This commit is contained in:
Alice Gibbons 2025-02-13 12:19:44 +00:00 committed by GitHub
commit b060c50210
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 327 additions and 0 deletions

173
jobs/python/http/README.md Normal file
View File

@ -0,0 +1,173 @@
# Dapr Jobs API (HTTP Client)
In this quickstart, you'll schedule, get, and delete a job using Dapr's Job API. This API is responsible for scheduling and running jobs at a specific time or interval.
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/jobs/) link for more information about Dapr and the Jobs API.
This quickstart includes two apps:
- `job-scheduler/app.py`, responsible for scheduling, retrieving and deleting jobs.
- `job-service/app.py`, responsible for handling the triggered jobs.
## Prerequisites
- [Python 3.8+](https://www.python.org/downloads/)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/)
- [Initialized Dapr environment](https://docs.dapr.io/getting-started/install-dapr-selfhost/)
## Install dependencies
```bash
pip install -r requirements.txt
```
## Run all apps with multi-app run template file
This section shows how to run both applications at once using [multi-app run template files](https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-overview/) with `dapr run -f .`. This enables you to test the interactions between multiple applications and will `schedule`, `run`, `get`, and `delete` jobs within a single process.
Open a new terminal window and run the multi app run template:
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- '== APP - job-service == Received job request...'
- '== APP - job-service == Executing maintenance job: Oil Change'
- '== APP - job-scheduler == Job Scheduled: C-3PO'
- '== APP - job-service == Received job request...'
- '== APP - job-service == Executing maintenance job: Limb Calibration'
expected_stderr_lines:
output_match_mode: substring
match_order: none
background: true
sleep: 60
timeout_seconds: 120
-->
```bash
dapr run -f .
```
The terminal console output should look similar to this, where:
- The `R2-D2` job is being scheduled.
- The `R2-D2` job is being executed after 2 seconds.
- The `C-3PO` job is being scheduled.
- The `C-3PO` job is being retrieved.
```text
== APP - job-scheduler == Job Scheduled: R2-D2
== APP - job-service == Received job request...
== APP - job-service == Starting droid: R2-D2
== APP - job-service == Executing maintenance job: Oil Change
== APP - job-scheduler == Job Scheduled: C-3PO
== APP - job-scheduler == Job details: {"name":"C-3PO", "dueTime":"30s", "data":{"@type":"ttype.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}}
```
After 30 seconds, the terminal output should present the `C-3PO` job being processed:
```text
== APP - job-service == Received job request...
== APP - job-service == Starting droid: C-3PO
== APP - job-service == Executing maintenance job: Limb Calibration
```
<!-- END_STEP -->
2. Stop and clean up application processes
<!-- STEP
name: Stop multi-app run
sleep: 5
-->
```bash
dapr stop -f .
```
<!-- END_STEP -->
## Run apps individually
### Start the job service
1. Open a terminal and run the `job-service` app:
```bash
cd job-service
dapr run --app-id job-service --app-port 6200 --dapr-http-port 6280 -- python app.py
```
### Schedule jobs
1. On a new terminal window, schedule the `R2-D2` Job using the Jobs API:
```bash
curl -X POST \
http://localhost:6280/v1.0-alpha1/jobs/R2D2 \
-H "Content-Type: application/json" \
-d '{
"data": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "R2-D2:Oil Change"
},
"dueTime": "2s"
}'
```
Back at the `job-service` app terminal window, the output should be:
```text
== APP - job-service == Received job request...
== APP - job-service == Starting droid: R2-D2
== APP - job-service == Executing maintenance job: Oil Change
```
2. On the same terminal window, schedule the `C-3PO` Job using the Jobs API:
```bash
curl -X POST \
http://localhost:6280/v1.0-alpha1/jobs/c-3po \
-H "Content-Type: application/json" \
-d '{
"data": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "C-3PO:Limb Calibration"
},
"dueTime": "30s"
}'
```
### Get a scheduled job
1. On the same terminal window, run the command below to get the recently scheduled `C-3PO` job:
```bash
curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json"
```
You should see the following:
```text
{"name":"C-3PO", "dueTime":"30s", "data":{"@type":"type.googleapis.com/google.protobuf.StringValue", "expression":"C-3PO:Limb Calibration"}}
```
### Delete a scheduled job
1. On the same terminal window, run the command below to delete the recently scheduled `C-3PO` job:
```bash
curl -X DELETE http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json"
```
2. Run the command below to attempt to retrieve the deleted job:
```bash
curl -X GET http://localhost:6280/v1.0-alpha1/jobs/c-3po -H "Content-Type: application/json"
```
You should see an error message indicating that the job was not found:
```text
{"errorCode":"ERR_JOBS_NOT_FOUND","message":"job not found: app||default||job-service||c-3po"}
```

View File

@ -0,0 +1,12 @@
version: 1
apps:
- appDirPath: ./job-service/
appID: job-service
appPort: 6200
daprHTTPPort: 6280
command: ["python3", "app.py"]
- appDirPath: ./job-scheduler/
appID: job-scheduler
appPort: 6300
daprHTTPPort: 6380
command: ["python3", "app.py"]

View File

@ -0,0 +1,73 @@
import os
import time
import requests
import json
C3PO_JOB_BODY = {
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "C-3PO:Limb Calibration"},
"dueTime": "10s",
}
R2D2_JOB_BODY = {
"data": {"@type": "type.googleapis.com/google.protobuf.StringValue", "value": "R2-D2:Oil Change"},
"dueTime": "2s"
}
def schedule_job(host: str, port: str, job_name: str, job_body: dict) -> None:
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}"
try:
response = requests.post(
req_url,
json=job_body,
headers={"Content-Type": "application/json"},
timeout=15
)
# Accept both 200 and 204 as success codes
if response.status_code not in [200, 204]:
raise Exception(f"Failed to schedule job. Status code: {response.status_code}, Response: {response.text}")
print(f"Job Scheduled: {job_name}")
if response.text:
print(f"Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"Error scheduling job {job_name}: {str(e)}")
raise
def get_job_details(host: str, port: str, job_name: str) -> None:
req_url = f"{host}:{port}/v1.0-alpha1/jobs/{job_name}"
try:
response = requests.get(req_url, timeout=15)
if response.status_code in [200, 204]:
print(f"Job details for {job_name}: {response.text}")
else:
print(f"Failed to get job details. Status code: {response.status_code}, Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"Error getting job details for {job_name}: {str(e)}")
raise
def main():
# Wait for services to be ready
time.sleep(5)
dapr_host = os.getenv('DAPR_HOST', 'http://localhost')
scheduler_dapr_http_port = os.getenv('SCHEDULER_DAPR_HTTP_PORT', '6280')
# Schedule R2-D2 job
schedule_job(dapr_host, scheduler_dapr_http_port, "R2-D2", R2D2_JOB_BODY)
time.sleep(5)
# Schedule C-3PO job
schedule_job(dapr_host, scheduler_dapr_http_port, "C-3PO", C3PO_JOB_BODY)
time.sleep(5)
# Get C-3PO job details
get_job_details(dapr_host, scheduler_dapr_http_port, "C-3PO")
time.sleep(5)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,66 @@
import os
import json
import traceback
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
class DroidJob:
def __init__(self, droid: str, task: str):
self.droid = droid
self.task = task
def set_droid_job(decoded_value: str) -> DroidJob:
# Remove newlines from decoded value and split into droid and task
droid_str = decoded_value.replace('\n', '')
droid_array = droid_str.split(':')
return DroidJob(droid_array[0], droid_array[1])
class JobHandler(BaseHTTPRequestHandler):
def _send_response(self, status_code: int, message: str = ""):
self.send_response(status_code)
self.send_header('Content-type', 'application/json')
self.end_headers()
if message:
self.wfile.write(json.dumps({"message": message}).encode('utf-8'))
def do_POST(self):
print('Received job request...', flush=True)
try:
# Check if path starts with /job/
if not self.path.startswith('/job/'):
self._send_response(404, "Not Found")
return
# Read request body
content_length = int(self.headers.get('Content-Length', 0))
raw_data = self.rfile.read(content_length).decode('utf-8')
# Parse outer JSON data
job_data = json.loads(raw_data)
# Extract value directly from the job data
value = job_data.get('value', '')
# Create DroidJob from value
droid_job = set_droid_job(value)
print("Starting droid: " + droid_job.droid, flush=True)
print("Executing maintenance job: " + droid_job.task, flush=True)
self._send_response(200)
except Exception as e:
print("Error processing job request:", flush= True)
print(traceback.format_exc())
self._send_response(400, f"Error processing job: {str(e)}")
def run_server(port: int):
server_address = ('', port)
httpd = HTTPServer(server_address, JobHandler)
print("Server started on port " + str(port), flush=True)
httpd.serve_forever()
if __name__ == '__main__':
app_port = int(os.getenv('APP_PORT', '6200'))
run_server(app_port)

View File

@ -0,0 +1,2 @@
include ../../../docker.mk
include ../../../validate.mk

View File

@ -0,0 +1 @@
requests==2.31.0