tweak howto-schedule-jobs && add howto-handle-triggered-jobs

Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
This commit is contained in:
Cassandra Coyle 2024-07-25 13:30:11 -05:00
parent 51ebfe4c15
commit 19c7ca940b
No known key found for this signature in database
2 changed files with 282 additions and 14 deletions

View File

@ -0,0 +1,210 @@
---
type: docs
title: "How-To: Handle triggered jobs"
linkTitle: "How-To: Handle triggered jobs"
weight: 2000
description: "Learn how to use the jobs API to schedule jobs and handle triggered jobs"
---
Now that you've learned what the [jobs building block]({{< ref jobs-overview.md >}}) provides, let's look at an example of how to use the API. The code examples below describe an application that schedules jobs for droid maintenance and another application to handle the triggered jobs that are sent back to the app at their dueTime.
<!--
Include a diagram or image, if possible.
-->
## Start the Scheduler service
When you [run `dapr init` in either self-hosted mode or on Kubernetes]({{< ref install-dapr-selfhost.md >}}), the Dapr Scheduler service is started.
## Set up the Jobs API
In your code, set up and schedule jobs within your application.
{{< tabs ".NET" "Go" >}}
{{% codetab %}}
<!--.net-->
{{% /codetab %}}
{{% codetab %}}
<!--go-->
The Go SDK schedules the job named `R2-D2`. For example, the following is application code to schedule jobs.
```go
package main
import (
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"time"
)
var r2d2JobBody = `{
"data": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "R2-D2:Oil Change"
},
"dueTime": "2s"
}`
func main() {
//Sleep for 5 seconds to wait for job-service to start
time.Sleep(5 * time.Second)
daprHost := os.Getenv("DAPR_HOST")
if daprHost == "" {
daprHost = "http://localhost"
}
schedulerDaprHttpPort := "6280"
client := http.Client{
Timeout: 30 * time.Second,
}
// Schedule a job using the Dapr Jobs API with short dueTime
jobName := "R2-D2"
reqURL := daprHost + ":" + schedulerDaprHttpPort + "/v1.0-alpha1/jobs/" + jobName
req, err := http.NewRequest("POST", reqURL, strings.NewReader(r2d2JobBody))
if err != nil {
log.Fatal(err.Error())
}
req.Header.Set("Content-Type", "application/json")
// Schedule a job using the Dapr Jobs API
res, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
if res.StatusCode != http.StatusNoContent {
log.Fatalf("failed to register job event handler. status code: %v", res.StatusCode)
}
defer res.Body.Close()
fmt.Println("Job Scheduled:", jobName)
// ...
}
```
The following is application code to handle the triggered jobs that are sent back to the application at their dueTime.
```go
package main
import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
)
type Job struct {
TypeURL string `json:"type_url"`
Value string `json:"value"`
}
type DroidJob struct {
Droid string `json:"droid"`
Task string `json:"task"`
}
func main() {
appPort := os.Getenv("APP_PORT")
if appPort == "" {
appPort = "6200"
}
// Setup job handler
http.HandleFunc("/job/", handleJob)
fmt.Printf("Server started on port %v\n", appPort)
err := http.ListenAndServe(":"+appPort, nil)
if err != nil {
log.Fatal(err)
}
}
func handleJob(w http.ResponseWriter, r *http.Request) {
fmt.Println("Received job request...")
rawBody, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("error reading request body: %v", err), http.StatusBadRequest)
return
}
var jobData Job
if err := json.Unmarshal(rawBody, &jobData); err != nil {
http.Error(w, fmt.Sprintf("error decoding JSON: %v", err), http.StatusBadRequest)
return
}
// Decoding job data
decodedValue, err := base64.RawStdEncoding.DecodeString(jobData.Value)
if err != nil {
fmt.Printf("Error decoding base64: %v", err)
http.Error(w, fmt.Sprintf("error decoding base64: %v", err), http.StatusBadRequest)
return
}
// Creating Droid Job from decoded value
droidJob := setDroidJob(string(decodedValue))
fmt.Println("Starting droid:", droidJob.Droid)
fmt.Println("Executing maintenance job:", droidJob.Task)
w.WriteHeader(http.StatusOK)
}
func setDroidJob(decodedValue string) DroidJob {
// Removing new lines from decoded value - Workaround for base64 encoding issue
droidStr := strings.ReplaceAll(decodedValue, "\n", "")
droidArray := strings.Split(droidStr, ":")
droidJob := DroidJob{Droid: droidArray[0], Task: droidArray[1]}
return droidJob
}
```
{{% /codetab %}}
{{< /tabs >}}
## Run the Dapr sidecar
Once you've set up the Jobs API in your application, run the Dapr sidecar.
```bash
// service to handle the triggered jobs
// run locally to the directory where the job handler service lives
dapr run --app-id job-service --app-port 6200 --dapr-http-port 6280 -- go run .
// service to schedule a job to be sent back at some point in the future
// run locally to the directory where the job scheduler service lives
dapr run --app-id job-scheduler --app-port 6300 --dapr-http-port 6380 -- go run .
```
## Next steps
- [Learn more about the Scheduler control plane service]({{< ref "concepts/dapr-services/scheduler.md" >}})
- [Jobs API reference]({{< ref jobs_api.md >}})

View File

@ -6,7 +6,7 @@ weight: 2000
description: "Learn how to use the jobs API to schedule jobs"
---
Now that you've learned what the [jobs building block]({{< ref jobs-overview.md >}}) provides, let's look at an example of how to use the API. The code example below describes an application that schedules jobs for a **TBD** application.
Now that you've learned what the [jobs building block]({{< ref jobs-overview.md >}}) provides, let's look at an example of how to use the API. The code example below describes an application that schedules jobs for a database backup application.
<!--
Include a diagram or image, if possible.
@ -14,7 +14,7 @@ Include a diagram or image, if possible.
## Start the Scheduler service
When you [run `dapr init` in either self-hosted mode or on Kubernetes]({{< ref install-dapr-selfhost.md >}}), the Dapr scheduler service is started.
When you [run `dapr init` in either self-hosted mode or on Kubernetes]({{< ref install-dapr-selfhost.md >}}), the Dapr Scheduler service is started.
## Set up the Jobs API
@ -33,7 +33,7 @@ In your code, set up and schedule jobs within your application.
<!--go-->
The Go SDK triggers the job called `daprc.Job` to schedule jobs. Job data is housed in a backup database (`"my-prod-db"`) and are called with `ScheduleJobAlpha1`. For example:
The Go SDK schedules the job named `prod-db-backup`. Job data is housed in a backup database (`"my-prod-db"`) and are called with `ScheduleJobAlpha1`. For example:
```go
package main
@ -52,8 +52,24 @@ func main() {
// Initialize the server
server, err := daprs.NewService(":50070")
// ...
if err != nil {
log.Fatalf("failed to start the server: %v", err)
}
if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
log.Fatalf("failed to register job event handler: %v", err)
}
log.Println("starting server")
go func() {
if err = server.Start(); err != nil {
log.Fatalf("failed to start server: %v", err)
}
}()
// Brief intermission to allow for the server to initialize.
time.Sleep(10 * time.Second)
ctx := context.Background()
// Set up backup location
@ -65,10 +81,12 @@ func main() {
},
},
)
// ...
// Set up the job data
if err != nil {
panic(err)
}
// Set up the job
job := daprc.Job{
Name: "prod-db-backup",
Schedule: "@every 1s",
@ -80,23 +98,63 @@ func main() {
// Create the client
client, err := daprc.NewClient()
//...
if err != nil {
panic(err)
}
defer client.Close()
// Schedule job
err = client.ScheduleJobAlpha1(ctx, &job)
if err != nil {
panic(err)
}
// ...
// Get job
fmt.Println("schedulejob - success")
time.Sleep(3 * time.Second)
// Get job
resp, err := client.GetJobAlpha1(ctx, "prod-db-backup")
// ...
if err != nil {
panic(err)
}
fmt.Printf("getjob - resp: %v\n", resp) // parse
// Delete job
err = client.DeleteJobAlpha1(ctx, "prod-db-backup")
// ..
if err != nil {
fmt.Printf("job deletion error: %v\n", err)
} else {
fmt.Println("deletejob - success")
}
if err = server.Stop(); err != nil {
log.Fatalf("failed to stop server: %v\n", err)
}
}
var jobCount = 0
func prodDBBackupHandler(ctx context.Context, job *common.JobEvent) error {
var jobData common.Job
if err := json.Unmarshal(job.Data, &jobData); err != nil {
return fmt.Errorf("failed to unmarshal job: %v", err)
}
decodedPayload, err := base64.StdEncoding.DecodeString(jobData.Value)
if err != nil {
return fmt.Errorf("failed to decode job payload: %v", err)
}
var jobPayload api.DBBackup
if err := json.Unmarshal(decodedPayload, &jobPayload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %v", err)
}
fmt.Printf("job %d received:\n type: %v \n typeurl: %v\n value: %v\n extracted payload: %v\n", jobCount, job.JobType, jobData.TypeURL, jobData.Value, jobPayload)
jobCount++
return nil
}
```
{{% /codetab %}}
@ -109,7 +167,7 @@ var jobCount = 0
Once you've set up the Jobs API in your application, run the Dapr sidecar.
```bash
dapr run --app-id=distributed-scheduler --metrics-port=9091 --scheduler-host-address=localhost:50006 --dapr-grpc-port 50001 --app-port 50070 --app-protocol grpc --log-level debug go run ./main.go
dapr run --app-id=distributed-scheduler --metrics-port=9091 --dapr-grpc-port 50001 --app-port 50070 --app-protocol grpc --log-level debug go run ./main.go
```
## Next steps