---
type: docs
title: "How-To: Schedule and handle triggered jobs"
linkTitle: "How-To: Schedule and handle triggered jobs"
weight: 2000
description: "Learn how to use the jobs API to schedule and handle triggered jobs"
---

Now that you've learned what the [jobs building block]({{< ref jobs-overview.md >}}) provides, let's look at an example of how to use the API. The code example below describes an application that schedules jobs for a database backup application and handles them at trigger time, that is, the moment the job is sent back to the application because its `dueTime` was reached.

## Start the Scheduler service

When you [run dapr init in either self-hosted mode or on Kubernetes]({{< ref install-dapr-selfhost.md >}}), the Dapr Scheduler service is started.

## Set up the Jobs API

In your code, set up and schedule jobs within your application.

{{< tabs ".NET" "Go" >}}

{{% codetab %}}

The following .NET SDK code sample schedules the job named `prod-db-backup`. The job data contains information about the database you'll be backing up regularly. Over the course of this example, you'll:

  • Define types used in the rest of the example
  • Register an endpoint during application startup that handles all job trigger invocations on the service
  • Register the job with Dapr

In the following example, we'll create some records that we'll serialize and register alongside the job so the information is available when the job is triggered in the future:

  • The name of the backup task (db-backup)
  • The backup task's Metadata, including:
    • The database name (DBName)
    • The database location (BackupLocation)

Create an ASP.NET Core project and add the latest version of `Dapr.Jobs` from NuGet. While your project doesn't strictly need to use the `Microsoft.NET.Sdk.Web` SDK to create jobs, at the time of writing only the service that schedules a job receives trigger invocations for it. Because those invocations expect a registered endpoint that can handle the job trigger, and that endpoint requires the `Microsoft.NET.Sdk.Web` SDK, it's recommended that you use an ASP.NET Core project for this purpose.

We'll start by defining some types to persist our backup job data and apply our own JSON property name attributes to the properties so they're consistent with other language examples.

```csharp
//Define the types that we'll represent the job data with
internal sealed record BackupJobData([property: JsonPropertyName("task")] string Task, [property: JsonPropertyName("metadata")] BackupMetadata Metadata);
internal sealed record BackupMetadata([property: JsonPropertyName("DBName")] string DatabaseName, [property: JsonPropertyName("BackupLocation")] string BackupLocation);
```

Next, we'll set up a handler as part of our application setup that will be called anytime a job is triggered on our application. It's the responsibility of this handler to identify how jobs should be processed based on the job name provided.

This works by registering a handler with ASP.NET Core at `/job/<job-name>`, where `<job-name>` is parameterized and passed into the handler delegate, meeting Dapr's expectation that an endpoint is available to handle triggered named jobs.

Populate your `Program.cs` file with the following:

```csharp
using System.Text;
using System.Text.Json;
using Dapr.Jobs;
using Dapr.Jobs.Extensions;
using Dapr.Jobs.Models;
using Dapr.Jobs.Models.Responses;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprJobsClient();
var app = builder.Build();

//Registers an endpoint to receive and process triggered jobs
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
app.MapDaprScheduledJobHandler(async (string jobName, DaprJobDetails jobDetails, ILogger logger, CancellationToken cancellationToken) => {
  logger?.LogInformation("Received trigger invocation for job '{jobName}'", jobName);
  switch (jobName)
  {
    case "prod-db-backup":
      // Deserialize the job payload metadata
      var jobData = JsonSerializer.Deserialize<BackupJobData>(jobDetails.Payload);

      // Process the backup operation - we assume this is implemented elsewhere in your code
      await BackupDatabaseAsync(jobData, cancellationToken);
      break;
  }
}, cancellationTokenSource.Token);

await app.RunAsync();
```

Finally, the job itself needs to be registered with Dapr so it can be triggered at a later point in time. You could do this by injecting a `DaprJobsClient` into a class and executing it as part of an inbound operation to your application, but for our purposes, we'll just put it at the bottom of the `Program.cs` file we started above. Because we'll be using the `DaprJobsClient` we registered with dependency injection, we need to start by creating a scope so we can access it.

```csharp
//Create a scope so we can access the registered DaprJobsClient
await using var scope = app.Services.CreateAsyncScope();
var daprJobsClient = scope.ServiceProvider.GetRequiredService<DaprJobsClient>();

//Create the payload we wish to present alongside our future job triggers
var jobData = new BackupJobData("db-backup", new BackupMetadata("my-prod-db", "/backup-dir"));

//Serialize our payload to UTF-8 bytes
var serializedJobData = JsonSerializer.SerializeToUtf8Bytes(jobData);

//Schedule our backup job to run every minute, but only repeat 10 times
await daprJobsClient.ScheduleJobAsync("prod-db-backup", DaprJobSchedule.FromDuration(TimeSpan.FromMinutes(1)),
    serializedJobData, repeats: 10);
```

{{% /codetab %}}

{{% codetab %}}

The following Go SDK code sample schedules the job named `prod-db-backup`. Job data is housed in a backup database (`"my-prod-db"`), and the job is scheduled with `ScheduleJobAlpha1`. This provides the `jobData`, which includes:

  • The backup Task name
  • The backup task's Metadata, including:
    • The database name (DBName)
    • The database location (BackupLocation)
```go
package main

import (
	//...

	daprc "github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/examples/dist-scheduler/api"
	"github.com/dapr/go-sdk/service/common"
	daprs "github.com/dapr/go-sdk/service/grpc"
)

func main() {
	// Initialize the server
	server, err := daprs.NewService(":50070")
	// ...

	if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
		log.Fatalf("failed to register job event handler: %v", err)
	}

	log.Println("starting server")
	go func() {
		if err = server.Start(); err != nil {
			log.Fatalf("failed to start server: %v", err)
		}
	}()
	// ...

	// Set up backup location
	jobData, err := json.Marshal(&api.DBBackup{
		Task: "db-backup",
		Metadata: api.Metadata{
			DBName:         "my-prod-db",
			BackupLocation: "/backup-dir",
		},
	})
	// ...
}
```

The job is scheduled with a `Schedule` set and the desired number of `Repeats`. These settings determine the maximum number of times the job should be triggered and sent back to the app.

In this example, the `Schedule` is `@every 1s`, so the job is triggered and sent back to the application once per second, up to the maximum of 10 `Repeats`.

```go
// ...
// Set up the job
job := daprc.Job{
	Name:     "prod-db-backup",
	Schedule: "@every 1s",
	Repeats:  10,
	Data: &anypb.Any{
		Value: jobData,
	},
}
```

At trigger time, the `prodDBBackupHandler` function is called, executing the desired business logic for this job. For example:

### HTTP

When you create a job using Dapr's Jobs API, Dapr will automatically assume there is an endpoint available at `/job/<job-name>`. For instance, if you schedule a job named `test`, Dapr expects your application to listen for job events at `/job/test`. Ensure your application has a handler set up for this endpoint to process the job when it is triggered. For example:

Note: The following example is in Go but applies to any programming language.


```go
func main() {
	// ...
	http.HandleFunc("/job/", handleJob)
	http.HandleFunc("/job/<job-name>", specificJob)
	// ...
}

func specificJob(w http.ResponseWriter, r *http.Request) {
	// Handle the specific triggered job
}

func handleJob(w http.ResponseWriter, r *http.Request) {
	// Handle the triggered jobs
}
```

### gRPC

When a job reaches its scheduled trigger time, the triggered job is sent back to the application via the following callback function:

Note: The following example is in Go but applies to any programming language with gRPC support.

```go
import rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1"

// ...

func (s *JobService) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventRequest) (*rtv1.JobEventResponse, error) {
	// Handle the triggered job
}
```

This function processes the triggered jobs within the context of your gRPC server. When you set up the server, ensure that you register the callback server, which will invoke this function when a job is triggered:

```go
// ...
js := &JobService{}
rtv1.RegisterAppCallbackAlphaServer(server, js)
```

In this setup, you have full control over how triggered jobs are received and processed, as they are routed directly through this gRPC method.

### SDKs

For SDK users, handling triggered jobs is simpler. When a job is triggered, Dapr will automatically route the job to the event handler you set up during the server initialization. For example, in Go, you'd register the event handler like this:

```go
// ...
if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
	log.Fatalf("failed to register job event handler: %v", err)
}
```

Dapr takes care of the underlying routing. When the job is triggered, your `prodDBBackupHandler` function is called with the triggered job data. Here's an example of handling the triggered job:

```go
// ...

// jobCount is assumed to be a package-level counter declared elsewhere

// At job trigger time this function is called
func prodDBBackupHandler(ctx context.Context, job *common.JobEvent) error {
	var jobData common.Job
	if err := json.Unmarshal(job.Data, &jobData); err != nil {
		// ...
	}

	var jobPayload api.DBBackup
	if err := json.Unmarshal(job.Data, &jobPayload); err != nil {
		// ...
	}

	fmt.Printf("job %d received:\n type: %v \n typeurl: %v\n value: %v\n extracted payload: %v\n", jobCount, job.JobType, jobData.TypeURL, jobData.Value, jobPayload)
	jobCount++
	return nil
}
```

{{% /codetab %}}

{{< /tabs >}}

## Run the Dapr sidecar

Once you've set up the Jobs API in your application, run the Dapr sidecar in a terminal window with the following command.

{{< tabs "Go" >}}

{{% codetab %}}

```bash
dapr run --app-id=distributed-scheduler \
    --metrics-port=9091 \
    --dapr-grpc-port 50001 \
    --app-port 50070 \
    --app-protocol grpc \
    --log-level debug \
    go run ./main.go
```

{{% /codetab %}}

{{< /tabs >}}

## Next steps

  • [Learn more about the Scheduler control plane service]({{< ref "concepts/dapr-services/scheduler.md" >}})
  • [Jobs API reference]({{< ref jobs_api.md >}})