feat: implement scheduler/jobs api (#196)

* feat: implement scheduler example

Signed-off-by: mikeee <hey@mike.ee>

* feat: add macro for adding a handler

Signed-off-by: mikeee <hey@mike.ee>

* chore: fmt

Signed-off-by: mikeee <hey@mike.ee>

---------

Signed-off-by: mikeee <hey@mike.ee>
This commit is contained in:
Mike Nguyen 2024-08-13 20:28:57 +01:00 committed by GitHub
parent b8ce2aafd0
commit 3c79d18938
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 461 additions and 6 deletions

View File

@ -16,7 +16,7 @@ env:
CARGO_TERM_COLOR: always
CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
PROTOC_VERSION: 3.x
RUST_TOOLCHAIN: 1.76.0
RUST_TOOLCHAIN: 1.79.0
jobs:
lint:

View File

@ -211,7 +211,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4

View File

@ -24,7 +24,8 @@ serde_json = "1.0"
axum = "0.7.4"
tokio = { version = "1.29", features = ["sync"] }
tokio-util = { version = "0.7.10", features = ["io"] }
chrono = "0.4.24"
chrono = "0.4.38"
base64 = "0.22.1"
[build-dependencies]
tonic-build = "0.11.0"
@ -74,6 +75,10 @@ path = "examples/invoke/grpc-proxying/client.rs"
name = "invoke-grpc-proxying-server"
path = "examples/invoke/grpc-proxying/server.rs"
[[example]]
name = "jobs"
path = "examples/jobs/jobs.rs"
[[example]]
name = "publisher"
path = "examples/pubsub/publisher.rs"

36
examples/jobs/README.md Normal file
View File

@ -0,0 +1,36 @@
# Jobs Example
This is a simple example that demonstrates Dapr's job scheduling capabilities.
## Running
To run this example:
1. Run the multi-app run template:
<!-- STEP
name: Run multi-app
output_match_mode: substring
match_order: none
expected_stdout_lines:
- 'job scheduled successfully'
- 'job received'
- 'job received'
- 'job received'
- 'received job on ping_pong_handler'
- 'received job on ping_pong_handler'
- 'received job on ping_pong_handler'
- 'received job on ping_pong_handler'
- 'received job on ping_pong_handler'
background: true
sleep: 30
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
2. Stop with `ctrl + c`

10
examples/jobs/dapr.yaml Normal file
View File

@ -0,0 +1,10 @@
# Multi-app run template: launches the jobs example with a Dapr sidecar.
version: 1
common:
  # Write daprd logs to the console alongside the app's output.
  daprdLogDestination: console
apps:
  - appID: jobs-example
    appDirPath: ./
    # The example hosts the gRPC app-callback (alpha) service.
    appProtocol: grpc
    # Must match the address the example's server binds (127.0.0.1:50051).
    appPort: 50051
    logLevel: debug
    command: [ "cargo", "run", "--example", "jobs" ]

160
examples/jobs/jobs.rs Normal file
View File

@ -0,0 +1,160 @@
use std::time::Duration;
use base64::prelude::*;
use dapr::add_job_handler_alpha;
use dapr::client::JobBuilder;
use dapr::dapr::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlphaServer;
use dapr::dapr::dapr::proto::runtime::v1::{JobEventRequest, JobEventResponse};
use dapr::server::appcallbackalpha::{AppCallbackServiceAlpha, JobHandlerMethod};
use prost_types::Any;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tonic::transport::Server;
use tonic::Status;
type DaprClient = dapr::Client<dapr::client::TonicClient>;
/// Payload for the "prod-db-backup" job; serialized to JSON and carried
/// inside the protobuf `Any` value attached to the scheduled job.
#[derive(Serialize, Deserialize, Debug)]
struct Backup {
    // Logical task name, e.g. "db-backup".
    task: String,
    // Optional details describing what to back up and where.
    metadata: Option<Metadata>,
}
/// Backup details nested inside [`Backup`].
#[derive(Serialize, Deserialize, Debug)]
struct Metadata {
    // Database to back up, e.g. "prod-db".
    db_name: String,
    // Filesystem path for the backup artifact.
    backup_location: String,
}
/// JSON mirror of a protobuf `Any` as it arrives in
/// `JobEventRequest.data.value`: the runtime delivers the payload as JSON
/// bytes whose `value` field is base64-encoded (decoded in
/// `backup_job_handler`).
#[derive(Serialize, Deserialize, Debug)]
struct JsonAny {
    // Protobuf type URL of the wrapped message.
    type_url: String,
    // Base64-encoded serialized payload.
    value: String,
}
/// Handler for the "ping-pong" job: logs receipt and acknowledges with an
/// empty response.
async fn ping_pong_handler(_request: JobEventRequest) -> Result<JobEventResponse, Status> {
    println!("received job on ping_pong_handler");
    let ack = JobEventResponse::default();
    Ok(ack)
}
async fn backup_job_handler(request: JobEventRequest) -> Result<JobEventResponse, Status> {
// The logic for handling the backup job request
if request.data.is_some() {
// weird value - any type is actually put into the value
let any = request.data.unwrap().value;
// parse any value
let any_parsed: JsonAny = serde_json::from_slice(&any).unwrap();
// Decode the base64-encoded value field
let decoded_value = BASE64_STANDARD.decode(any_parsed.value).unwrap();
// Deserialize the decoded value into a Backup struct
let backup_val: Backup = serde_json::from_slice(&decoded_value).unwrap();
println!("job received: {:?}", backup_val);
}
Ok(JobEventResponse::default())
}
#[tokio::main]
// The job-handler macro declares structs named after snake_case value idents.
#[allow(non_camel_case_types)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Server task: host the alpha app-callback service that receives
    // triggered job events from the sidecar.
    tokio::spawn(async move {
        let server_addr = "127.0.0.1:50051".parse().unwrap();

        println!("AppCallbackAlpha server listening on {server_addr}");

        let mut alpha_callback_service = AppCallbackServiceAlpha::new();

        let backup_job_handler_name = "prod-db-backup";
        add_job_handler_alpha!(
            alpha_callback_service,
            backup_job_handler_name,
            backup_job_handler
        );

        let ping_pong_handler_name = "ping-pong";
        add_job_handler_alpha!(
            alpha_callback_service,
            ping_pong_handler_name,
            ping_pong_handler
        );

        Server::builder()
            .add_service(AppCallbackAlphaServer::new(alpha_callback_service))
            .serve(server_addr)
            .await
            .unwrap();
    });

    // Give the callback server a moment to come up before scheduling jobs.
    sleep(Duration::from_secs(5)).await;

    // Client
    let client_addr = "https://127.0.0.1".to_string();

    let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;

    let address = format!("{}:{}", client_addr, port);
    println!("attempting to create a dapr client: {}", address);

    // Create the client — connect to the sidecar's gRPC endpoint including
    // the port. (Previously `client_addr` was passed without the port, so
    // the computed `address` was printed but never used.)
    let mut client = DaprClient::connect(address).await?;
    println!("client created");

    // define job data in json
    let job = Backup {
        task: "db-backup".to_string(),
        metadata: Some(Metadata {
            db_name: "prod-db".to_string(),
            backup_location: "/path/to/backup".to_string(),
        }),
    };

    // Wrap the JSON bytes in a protobuf Any as the job payload.
    let any = Any {
        type_url: "type.googleapis.com/io.dapr.RustTest".to_string(),
        value: serde_json::to_vec(&job).unwrap(),
    };

    let job = JobBuilder::new("prod-db-backup")
        .with_schedule("@every 1s")
        .with_data(any)
        .build();

    let _schedule_resp = client.schedule_job_alpha1(job).await?;
    println!("job scheduled successfully");

    sleep(Duration::from_secs(3)).await;

    let get_resp = client.get_job_alpha1("prod-db-backup").await?;
    // `get_resp` is consumed here and not used again — no clone needed.
    let get_resp_backup: Backup =
        serde_json::from_slice(&get_resp.job.unwrap().data.unwrap().value).unwrap();
    println!("job retrieved: {:?}", get_resp_backup);

    let _delete_resp = client.delete_job_alpha1("prod-db-backup").await?;
    println!("job deleted");

    sleep(Duration::from_secs(5)).await;

    // Second handler: no payload, fires five times.
    let ping_pong_job = JobBuilder::new("ping-pong")
        .with_schedule("@every 1s")
        .with_repeats(5)
        .build();

    let _schedule_resp = client.schedule_job_alpha1(ping_pong_job).await?;

    sleep(Duration::from_secs(10)).await;

    Ok(())
}

View File

@ -1,10 +1,11 @@
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Debug;
use async_trait::async_trait;
use futures::StreamExt;
use prost_types::Any;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncRead;
use tonic::codegen::tokio_stream;
use tonic::{transport::Channel as TonicChannel, Request};
@ -497,6 +498,42 @@ impl<T: DaprInterface> Client<T> {
.collect();
self.0.decrypt(requested_items).await
}
/// Schedules a job with the Dapr Distributed Scheduler
///
/// # Arguments
///
/// * job - The job to schedule
pub async fn schedule_job_alpha1(&mut self, job: Job) -> Result<ScheduleJobResponse, Error> {
let request = ScheduleJobRequest {
job: Some(job.clone()),
};
self.0.schedule_job_alpha1(request).await
}
/// Retrieves a scheduled job from the Dapr Distributed Scheduler
///
/// # Arguments
///
/// * name - The name of the job to retrieve
pub async fn get_job_alpha1(&mut self, name: &str) -> Result<GetJobResponse, Error> {
let request = GetJobRequest {
name: name.to_string(),
};
self.0.get_job_alpha1(request).await
}
/// Deletes a scheduled job from the Dapr Distributed Scheduler
///
/// # Arguments
///
/// * name - The name of the job to delete
pub async fn delete_job_alpha1(&mut self, name: &str) -> Result<DeleteJobResponse, Error> {
let request = DeleteJobRequest {
name: name.to_string(),
};
self.0.delete_job_alpha1(request).await
}
}
#[async_trait]
@ -547,6 +584,18 @@ pub trait DaprInterface: Sized {
-> Result<Vec<StreamPayload>, Status>;
async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;
    /// Schedules a job with the distributed scheduler (alpha API).
    async fn schedule_job_alpha1(
        &mut self,
        request: ScheduleJobRequest,
    ) -> Result<ScheduleJobResponse, Error>;

    /// Retrieves a previously scheduled job by name (alpha API).
    async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error>;

    /// Deletes a previously scheduled job by name (alpha API).
    async fn delete_job_alpha1(
        &mut self,
        request: DeleteJobRequest,
    ) -> Result<DeleteJobResponse, Error>;
}
#[async_trait]
@ -717,6 +766,30 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}
Ok(data)
}
async fn schedule_job_alpha1(
&mut self,
request: ScheduleJobRequest,
) -> Result<ScheduleJobResponse, Error> {
Ok(self.schedule_job_alpha1(request).await?.into_inner())
}
async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error> {
Ok(self
.get_job_alpha1(Request::new(request))
.await?
.into_inner())
}
async fn delete_job_alpha1(
&mut self,
request: DeleteJobRequest,
) -> Result<DeleteJobResponse, Error> {
Ok(self
.delete_job_alpha1(Request::new(request))
.await?
.into_inner())
}
}
/// A request from invoking a service
@ -814,6 +887,27 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR
/// Decryption request options
pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions;
/// A job that can be scheduled with the Dapr Distributed Scheduler
pub type Job = crate::dapr::dapr::proto::runtime::v1::Job;

/// A request to schedule a job
pub type ScheduleJobRequest = crate::dapr::dapr::proto::runtime::v1::ScheduleJobRequest;

/// A response from a schedule job request
pub type ScheduleJobResponse = crate::dapr::dapr::proto::runtime::v1::ScheduleJobResponse;

/// A request to get a scheduled job by name
pub type GetJobRequest = crate::dapr::dapr::proto::runtime::v1::GetJobRequest;

/// A response from a get job request
pub type GetJobResponse = crate::dapr::dapr::proto::runtime::v1::GetJobResponse;

/// A request to delete a scheduled job by name
pub type DeleteJobRequest = crate::dapr::dapr::proto::runtime::v1::DeleteJobRequest;

/// A response from a delete job request
pub type DeleteJobResponse = crate::dapr::dapr::proto::runtime::v1::DeleteJobResponse;
type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where
@ -835,3 +929,63 @@ impl<T: AsyncRead> ReaderStream<T> {
ReaderStream(tokio_util::io::ReaderStream::new(data))
}
}
/// Builder for a [`Job`] to pass to `schedule_job_alpha1`.
#[derive(Debug)]
pub struct JobBuilder {
    // Schedule expression, e.g. "@every 1s" (format per the Dapr jobs API).
    schedule: Option<String>,
    // Optional payload delivered with each job trigger.
    data: Option<Any>,
    // Unique job name; also used to get/delete the job later.
    name: String,
    // Time-to-live after which the job expires — format per the Dapr jobs
    // API; not exercised in this file.
    ttl: Option<String>,
    // Number of times the job fires (see the jobs example: repeats(5)
    // yields five triggers); unlimited when unset.
    repeats: Option<u32>,
    // When the job should first become due — format per the Dapr jobs API.
    due_time: Option<String>,
}
impl JobBuilder {
    /// Starts building a job with the given (unique) name; all other
    /// fields are optional and default to unset.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            schedule: None,
            data: None,
            ttl: None,
            repeats: None,
            due_time: None,
        }
    }

    /// Sets the schedule expression, e.g. "@every 1s".
    pub fn with_schedule(mut self, schedule: &str) -> Self {
        self.schedule = Some(schedule.to_owned());
        self
    }

    /// Attaches a payload to deliver with each trigger.
    pub fn with_data(mut self, data: Any) -> Self {
        self.data = Some(data);
        self
    }

    /// Sets the job's time-to-live.
    pub fn with_ttl(mut self, ttl: &str) -> Self {
        self.ttl = Some(ttl.to_owned());
        self
    }

    /// Sets how many times the job should fire.
    pub fn with_repeats(mut self, repeats: u32) -> Self {
        self.repeats = Some(repeats);
        self
    }

    /// Sets when the job should first become due.
    pub fn with_due_time(mut self, due_time: &str) -> Self {
        self.due_time = Some(due_time.to_owned());
        self
    }

    /// Consumes the builder and produces the [`Job`] message.
    pub fn build(self) -> Job {
        Job {
            name: self.name,
            schedule: self.schedule,
            data: self.data,
            ttl: self.ttl,
            repeats: self.repeats,
            due_time: self.due_time,
        }
    }
}

View File

@ -0,0 +1,89 @@
use std::collections::HashMap;
use tonic::{Code, Request, Response, Status};
use crate::dapr::dapr::proto::runtime;
use crate::dapr::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha;
/// Alpha app-callback service that dispatches triggered job events to
/// per-job registered handlers.
pub struct AppCallbackServiceAlpha {
    // Maps job name -> handler invoked when that job triggers.
    pub job_handlers: HashMap<String, Box<dyn JobHandlerMethod + Send + Sync + 'static>>,
}
impl AppCallbackServiceAlpha {
    /// Creates a service with no registered job handlers.
    pub fn new() -> Self {
        Self {
            job_handlers: HashMap::new(),
        }
    }

    /// Registers `handler` to receive events for jobs named `job_name`;
    /// a later registration under the same name replaces the earlier one.
    pub fn add_job_handler(&mut self, job_name: String, handler: Box<dyn JobHandlerMethod>) {
        self.job_handlers.insert(job_name, handler);
    }
}
// `Default` mirrors `new`: a service with an empty handler map.
impl Default for AppCallbackServiceAlpha {
    fn default() -> Self {
        Self::new()
    }
}
#[tonic::async_trait]
impl AppCallbackAlpha for AppCallbackServiceAlpha {
    /// Bulk topic events are not supported by this service.
    async fn on_bulk_topic_event_alpha1(
        &self,
        _request: Request<runtime::v1::TopicEventBulkRequest>,
    ) -> Result<Response<runtime::v1::TopicEventBulkResponse>, Status> {
        Err(Status::unavailable("unimplemented"))
    }

    /// Routes a triggered job event to the handler registered under the
    /// job's name.
    ///
    /// The runtime addresses job events as `job/<name>`. An unexpected
    /// method string is rejected with `invalid_argument` instead of
    /// panicking — the previous `.unwrap()` on `strip_prefix` would have
    /// crashed the server on malformed input from the wire.
    async fn on_job_event_alpha1(
        &self,
        request: Request<runtime::v1::JobEventRequest>,
    ) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
        let request_inner = request.into_inner();

        // Strip the "job/" routing prefix to recover the job name.
        let job_name = match request_inner.method.strip_prefix("job/") {
            Some(name) => name.to_string(),
            None => {
                return Err(Status::invalid_argument(format!(
                    "unexpected job event method: {}",
                    request_inner.method
                )))
            }
        };

        if let Some(handler) = self.job_handlers.get(&job_name) {
            let handle_response = handler.handler(request_inner).await;
            handle_response.map(Response::new)
        } else {
            // NOTE(review): Internal preserves the original status code;
            // NotFound may be more accurate — confirm before changing the
            // wire contract.
            Err(Status::new(Code::Internal, "Job Handler Not Found"))
        }
    }
}
/// Registers an async job-handler function on an `AppCallbackServiceAlpha`.
///
/// `$handler_name` is an ident that must be in scope as a VALUE (a `&str`
/// binding holding the job name) and must be unused as a TYPE name: the
/// macro declares a struct with that same ident, implements
/// `JobHandlerMethod` for it by delegating to `$handler_fn`, and registers
/// an instance under the job-name string. Because the generated struct
/// borrows a snake_case ident, callers need `#[allow(non_camel_case_types)]`.
#[macro_export]
macro_rules! add_job_handler_alpha {
    ($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => {
        // Type-namespace use of the ident: a unit struct to hang the
        // JobHandlerMethod impl on.
        pub struct $handler_name {}

        #[async_trait::async_trait]
        impl JobHandlerMethod for $handler_name {
            async fn handler(&self, request: JobEventRequest) -> Result<JobEventResponse, Status> {
                $handler_fn(request).await
            }
        }

        impl $handler_name {
            pub fn new() -> Self {
                $handler_name {}
            }
        }

        // Value-namespace use of the ident: resolves to the caller's `let`
        // binding (the job-name string), not the struct declared above.
        let handler_name = $handler_name.to_string();

        $app_callback_service.add_job_handler(handler_name, Box::new($handler_name::new()));
    };
}
/// Implemented (typically via the `add_job_handler_alpha!` macro) by types
/// that handle triggered job events for a single job name.
#[tonic::async_trait]
pub trait JobHandlerMethod: Send + Sync + 'static {
    /// Handles one triggered job event and returns the acknowledgement.
    async fn handler(
        &self,
        request: runtime::v1::JobEventRequest,
    ) -> Result<runtime::v1::JobEventResponse, Status>;
}

View File

@ -1,7 +1,8 @@
pub use http::DaprHttpServer;
#[macro_use]
pub mod actor;
pub mod appcallbackalpha;
mod http;
mod models;
pub mod utils;
pub use http::DaprHttpServer;