From 3c79d189385ae481de3a7b56caae205023267cf7 Mon Sep 17 00:00:00 2001
From: Mike Nguyen
Date: Tue, 13 Aug 2024 20:28:57 +0100
Subject: [PATCH] feat: implement scheduler/jobs api (#196)

* feat: implement scheduler example

Signed-off-by: mikeee

* feat: add macro for adding a handler

Signed-off-by: mikeee

* chore: fmt

Signed-off-by: mikeee

---------

Signed-off-by: mikeee
---
 .github/workflows/ci.yml                |   2 +-
 .github/workflows/validate-examples.yml |   2 +-
 Cargo.toml                              |   7 +-
 examples/jobs/README.md                 |  36 ++++++
 examples/jobs/dapr.yaml                 |  10 ++
 examples/jobs/jobs.rs                   | 160 ++++++++++++++++++++++++
 src/client.rs                           | 156 ++++++++++++++++++++++-
 src/server/appcallbackalpha.rs          |  89 +++++++++++++
 src/server/mod.rs                       |   5 +-
 9 files changed, 461 insertions(+), 6 deletions(-)
 create mode 100644 examples/jobs/README.md
 create mode 100644 examples/jobs/dapr.yaml
 create mode 100644 examples/jobs/jobs.rs
 create mode 100644 src/server/appcallbackalpha.rs

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index d33d742..6e56cf4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,7 @@ env:
   CARGO_TERM_COLOR: always
   CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }}
   PROTOC_VERSION: 3.x
-  RUST_TOOLCHAIN: 1.76.0
+  RUST_TOOLCHAIN: 1.79.0
 
 jobs:
   lint:
diff --git a/.github/workflows/validate-examples.yml b/.github/workflows/validate-examples.yml
index 9643d4e..e4344e2 100644
--- a/.github/workflows/validate-examples.yml
+++ b/.github/workflows/validate-examples.yml
@@ -211,7 +211,7 @@ jobs:
       fail-fast: false
       matrix:
         examples:
-          [ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
+          [ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
     steps:
       - name: Check out code
         uses: actions/checkout@v4
diff --git a/Cargo.toml b/Cargo.toml
index 4d06b99..5ce85fa 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,8 @@ serde_json = "1.0"
 axum = "0.7.4"
 tokio = { version = "1.29", features = ["sync"] }
 tokio-util = { version = "0.7.10", features = ["io"] }
-chrono = "0.4.24"
+chrono = "0.4.38"
+base64 = "0.22.1"
 
 [build-dependencies]
 tonic-build = "0.11.0"
@@ -74,6 +75,10 @@ path = "examples/invoke/grpc-proxying/client.rs"
 name = "invoke-grpc-proxying-server"
 path = "examples/invoke/grpc-proxying/server.rs"
 
+[[example]]
+name = "jobs"
+path = "examples/jobs/jobs.rs"
+
 [[example]]
 name = "publisher"
 path = "examples/pubsub/publisher.rs"
diff --git a/examples/jobs/README.md b/examples/jobs/README.md
new file mode 100644
index 0000000..6a4809b
--- /dev/null
+++ b/examples/jobs/README.md
@@ -0,0 +1,36 @@
+# Jobs Example
+
+This is a simple example that demonstrates Dapr's job scheduling capabilities.
+
+## Running
+
+To run this example:
+
+1. Run the multi-app run template:
+
+```bash
+dapr run -f .
+```
+
+2. Stop with `ctrl + c`
diff --git a/examples/jobs/dapr.yaml b/examples/jobs/dapr.yaml
new file mode 100644
index 0000000..bb808ff
--- /dev/null
+++ b/examples/jobs/dapr.yaml
@@ -0,0 +1,10 @@
+version: 1
+common:
+  daprdLogDestination: console
+apps:
+  - appID: jobs-example
+    appDirPath: ./
+    appProtocol: grpc
+    appPort: 50051
+    logLevel: debug
+    command: [ "cargo", "run", "--example", "jobs" ]
\ No newline at end of file
diff --git a/examples/jobs/jobs.rs b/examples/jobs/jobs.rs
new file mode 100644
index 0000000..27613b5
--- /dev/null
+++ b/examples/jobs/jobs.rs
@@ -0,0 +1,160 @@
+use std::time::Duration;
+
+use base64::prelude::*;
+use dapr::add_job_handler_alpha;
+use dapr::client::JobBuilder;
+use dapr::dapr::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlphaServer;
+use dapr::dapr::dapr::proto::runtime::v1::{JobEventRequest, JobEventResponse};
+use dapr::server::appcallbackalpha::{AppCallbackServiceAlpha, JobHandlerMethod};
+use prost_types::Any;
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
+use tonic::transport::Server;
+use tonic::Status;
+
+type DaprClient = dapr::Client<dapr::client::TonicClient>;
+
+#[derive(Serialize, Deserialize, Debug)]
+struct Backup {
+    task: String,
+    metadata: Option<Metadata>,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct Metadata {
+    db_name: String,
+    backup_location: String,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct JsonAny {
+    type_url: String,
+    value: String,
+}
+
+async fn ping_pong_handler(_request: JobEventRequest) -> Result<JobEventResponse, Status> {
+    // Implement the logic for handling the ping-pong job request
+    // ...
+    println!("received job on ping_pong_handler");
+
+    Ok(JobEventResponse::default())
+}
+async fn backup_job_handler(request: JobEventRequest) -> Result<JobEventResponse, Status> {
+    // The logic for handling the backup job request
+
+    if request.data.is_some() {
+        // The payload arrives as a JSON-encoded `Any` carried in the value bytes
+        let any = request.data.unwrap().value;
+
+        // Parse the JSON representation of the Any value
+        let any_parsed: JsonAny = serde_json::from_slice(&any).unwrap();
+
+        // Decode the base64-encoded value field
+        let decoded_value = BASE64_STANDARD.decode(any_parsed.value).unwrap();
+
+        // Deserialize the decoded value into a Backup struct
+        let backup_val: Backup = serde_json::from_slice(&decoded_value).unwrap();
+
+        println!("job received: {:?}", backup_val);
+    }
+
+    Ok(JobEventResponse::default())
+}
+
+#[tokio::main]
+#[allow(non_camel_case_types)]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tokio::spawn(async move {
+        let server_addr = "127.0.0.1:50051".parse().unwrap();
+
+        println!("AppCallbackAlpha server listening on {server_addr}");
+
+        let mut alpha_callback_service = AppCallbackServiceAlpha::new();
+
+        let backup_job_handler_name = "prod-db-backup";
+        add_job_handler_alpha!(
+            alpha_callback_service,
+            backup_job_handler_name,
+            backup_job_handler
+        );
+
+        let ping_pong_handler_name = "ping-pong";
+        add_job_handler_alpha!(
+            alpha_callback_service,
+            ping_pong_handler_name,
+            ping_pong_handler
+        );
+
+        Server::builder()
+            .add_service(AppCallbackAlphaServer::new(alpha_callback_service))
+            .serve(server_addr)
+            .await
+            .unwrap();
+    });
+
+    sleep(Duration::from_secs(5)).await;
+
+    // Client
+
+    let client_addr = "https://127.0.0.1".to_string();
+
+    let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
+    let address = format!("{}:{}", client_addr, port);
+
+    println!("attempting to create a dapr client: {}", address);
+
+    // Create the client
+    let mut client = DaprClient::connect(client_addr).await?;
+
+    println!("client created");
+
+    // define job data in json
+    let job = Backup {
+        task: "db-backup".to_string(),
+        metadata: Some(Metadata {
+            db_name: "prod-db".to_string(),
+            backup_location: "/path/to/backup".to_string(),
+        }),
+    };
+
+    let any = Any {
+        type_url: "type.googleapis.com/io.dapr.RustTest".to_string(),
+        value: serde_json::to_vec(&job).unwrap(),
+    };
+
+    let job = JobBuilder::new("prod-db-backup")
+        .with_schedule("@every 1s")
+        .with_data(any)
+        .build();
+
+    let _schedule_resp = client.schedule_job_alpha1(job).await?;
+
+    println!("job scheduled successfully");
+
+    sleep(Duration::from_secs(3)).await;
+
+    let get_resp = client.get_job_alpha1("prod-db-backup").await?;
+
+    let get_resp_backup: Backup =
+        serde_json::from_slice(&get_resp.clone().job.unwrap().data.unwrap().value).unwrap();
+
+    println!("job retrieved: {:?}", get_resp_backup);
+
+    let _delete_resp = client.delete_job_alpha1("prod-db-backup").await?;
+
+    println!("job deleted");
+
+    sleep(Duration::from_secs(5)).await;
+
+    // Second handler
+
+    let ping_pong_job = JobBuilder::new("ping-pong")
+        .with_schedule("@every 1s")
+        .with_repeats(5)
+        .build();
+    let _schedule_resp = client.schedule_job_alpha1(ping_pong_job).await?;
+
+    sleep(Duration::from_secs(10)).await;
+
+    Ok(())
+}
diff --git a/src/client.rs b/src/client.rs
index 7ba101b..dbd24e4 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,10 +1,11 @@
-use serde_json::Value;
 use std::collections::HashMap;
+use std::fmt::Debug;
 
 use async_trait::async_trait;
 use futures::StreamExt;
 use prost_types::Any;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 use tokio::io::AsyncRead;
 use tonic::codegen::tokio_stream;
 use tonic::{transport::Channel as TonicChannel, Request};
@@ -497,6 +498,42 @@ impl<T: DaprInterface> Client<T> {
             .collect();
         self.0.decrypt(requested_items).await
     }
+
+    /// Schedules a job with the Dapr Distributed Scheduler
+    ///
+    /// # Arguments
+    ///
+    /// * `job` - The job to schedule
+    pub async fn schedule_job_alpha1(&mut self, job: Job) -> Result<ScheduleJobResponse, Status> {
+        let request = ScheduleJobRequest {
+            job: Some(job.clone()),
+        };
+        self.0.schedule_job_alpha1(request).await
+    }
+
+    /// Retrieves a scheduled job from the Dapr Distributed Scheduler
+    ///
+    /// # Arguments
+    ///
+    /// * `name` - The name of the job to retrieve
+    pub async fn get_job_alpha1(&mut self, name: &str) -> Result<GetJobResponse, Status> {
+        let request = GetJobRequest {
+            name: name.to_string(),
+        };
+        self.0.get_job_alpha1(request).await
+    }
+
+    /// Deletes a scheduled job from the Dapr Distributed Scheduler
+    ///
+    /// # Arguments
+    ///
+    /// * `name` - The name of the job to delete
+    pub async fn delete_job_alpha1(&mut self, name: &str) -> Result<DeleteJobResponse, Status> {
+        let request = DeleteJobRequest {
+            name: name.to_string(),
+        };
+        self.0.delete_job_alpha1(request).await
+    }
 }
 
 #[async_trait]
@@ -547,6 +584,18 @@ pub trait DaprInterface: Sized {
         -> Result<Vec<StreamPayload>, Status>;
 
     async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;
+
+    async fn schedule_job_alpha1(
+        &mut self,
+        request: ScheduleJobRequest,
+    ) -> Result<ScheduleJobResponse, Status>;
+
+    async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Status>;
+
+    async fn delete_job_alpha1(
+        &mut self,
+        request: DeleteJobRequest,
+    ) -> Result<DeleteJobResponse, Status>;
 }
 
 #[async_trait]
@@ -717,6 +766,30 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
         }
         Ok(data)
     }
+
+    async fn schedule_job_alpha1(
+        &mut self,
+        request: ScheduleJobRequest,
+    ) -> Result<ScheduleJobResponse, Status> {
+        Ok(self.schedule_job_alpha1(request).await?.into_inner())
+    }
+
+    async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Status> {
+        Ok(self
+            .get_job_alpha1(Request::new(request))
+            .await?
+            .into_inner())
+    }
+
+    async fn delete_job_alpha1(
+        &mut self,
+        request: DeleteJobRequest,
+    ) -> Result<DeleteJobResponse, Status> {
+        Ok(self
+            .delete_job_alpha1(Request::new(request))
+            .await?
+            .into_inner())
+    }
 }
 
 /// A request from invoking a service
@@ -814,6 +887,27 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR
 /// Decryption request options
 pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions;
 
+/// The basic job structure
+pub type Job = crate::dapr::dapr::proto::runtime::v1::Job;
+
+/// A request to schedule a job
+pub type ScheduleJobRequest = crate::dapr::dapr::proto::runtime::v1::ScheduleJobRequest;
+
+/// A response from a schedule job request
+pub type ScheduleJobResponse = crate::dapr::dapr::proto::runtime::v1::ScheduleJobResponse;
+
+/// A request to get a job
+pub type GetJobRequest = crate::dapr::dapr::proto::runtime::v1::GetJobRequest;
+
+/// A response from a get job request
+pub type GetJobResponse = crate::dapr::dapr::proto::runtime::v1::GetJobResponse;
+
+/// A request to delete a job
+pub type DeleteJobRequest = crate::dapr::dapr::proto::runtime::v1::DeleteJobRequest;
+
+/// A response from a delete job request
+pub type DeleteJobResponse = crate::dapr::dapr::proto::runtime::v1::DeleteJobResponse;
+
 type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload;
 impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
 where
@@ -835,3 +929,63 @@ impl<T: AsyncRead> ReaderStream<T> {
         ReaderStream(tokio_util::io::ReaderStream::new(data))
     }
 }
+
+#[derive(Debug)]
+pub struct JobBuilder {
+    schedule: Option<String>,
+    data: Option<Any>,
+    name: String,
+    ttl: Option<String>,
+    repeats: Option<u32>,
+    due_time: Option<String>,
+}
+
+impl JobBuilder {
+    /// Create a new Job to be scheduled
+    pub fn new(name: &str) -> Self {
+        JobBuilder {
+            schedule: None,
+            data: None,
+            name: name.to_string(),
+            ttl: None,
+            repeats: None,
+            due_time: None,
+        }
+    }
+
+    pub fn with_schedule(mut self, schedule: &str) -> Self {
+        self.schedule = Some(schedule.into());
+        self
+    }
+
+    pub fn with_data(mut self, data: Any) -> Self {
+        self.data = Some(data);
+        self
+    }
+
+    pub fn with_ttl(mut self, ttl: &str) -> Self {
+        self.ttl = Some(ttl.into());
+        self
+    }
+
+    pub fn with_repeats(mut self, repeats: u32) -> Self {
+        self.repeats = Some(repeats);
+        self
+    }
+
+    pub fn with_due_time(mut self, due_time: &str) -> Self {
+        self.due_time = Some(due_time.into());
+        self
+    }
+
+    pub fn build(self) -> Job {
+        Job {
+            schedule: self.schedule,
+            data: self.data,
+            name: self.name,
+            ttl: self.ttl,
+            repeats: self.repeats,
+            due_time: self.due_time,
+        }
+    }
+}
diff --git a/src/server/appcallbackalpha.rs b/src/server/appcallbackalpha.rs
new file mode 100644
index 0000000..49b2992
--- /dev/null
+++ b/src/server/appcallbackalpha.rs
@@ -0,0 +1,89 @@
+use std::collections::HashMap;
+
+use tonic::{Code, Request, Response, Status};
+
+use crate::dapr::dapr::proto::runtime;
+use crate::dapr::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha;
+
+pub struct AppCallbackServiceAlpha {
+    pub job_handlers: HashMap<String, Box<dyn JobHandlerMethod>>,
+}
+
+impl AppCallbackServiceAlpha {
+    pub fn new() -> Self {
+        AppCallbackServiceAlpha {
+            job_handlers: HashMap::new(),
+        }
+    }
+
+    pub fn add_job_handler(&mut self, job_name: String, handler: Box<dyn JobHandlerMethod>) {
+        self.job_handlers.insert(job_name, handler);
+    }
+}
+
+impl Default for AppCallbackServiceAlpha {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[tonic::async_trait]
+impl AppCallbackAlpha for AppCallbackServiceAlpha {
+    async fn on_bulk_topic_event_alpha1(
+        &self,
+        _request: Request<runtime::v1::TopicEventBulkRequest>,
+    ) -> Result<Response<runtime::v1::TopicEventBulkResponse>, Status> {
+        Err(Status::unavailable("unimplemented"))
+    }
+
+    async fn on_job_event_alpha1(
+        &self,
+        request: Request<runtime::v1::JobEventRequest>,
+    ) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
+        let request_inner = request.into_inner();
+        let job_name = request_inner
+            .method
+            .strip_prefix("job/")
+            .unwrap()
+            .to_string();
+
+        if let Some(handler) = self.job_handlers.get(&job_name) {
+            let handle_response = handler.handler(request_inner).await;
+            handle_response.map(Response::new)
+        } else {
+            Err(Status::new(Code::Internal, "Job Handler Not Found"))
+        }
+    }
+}
+
+#[macro_export]
+macro_rules! add_job_handler_alpha {
+    ($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => {
+        pub struct $handler_name {}
+
+        #[async_trait::async_trait]
+        impl JobHandlerMethod for $handler_name {
+            async fn handler(&self, request: JobEventRequest) -> Result<JobEventResponse, Status> {
+                $handler_fn(request).await
+            }
+        }
+
+        impl $handler_name {
+            pub fn new() -> Self {
+                $handler_name {}
+            }
+        }
+
+        let handler_name = $handler_name.to_string();
+
+        $app_callback_service.add_job_handler(handler_name, Box::new($handler_name::new()));
+    };
+}
+
+#[tonic::async_trait]
+pub trait JobHandlerMethod: Send + Sync + 'static {
+    async fn handler(
+        &self,
+        request: runtime::v1::JobEventRequest,
+    ) -> Result<runtime::v1::JobEventResponse, Status>;
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 1e6ab6a..f39f212 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -1,7 +1,8 @@
+pub use http::DaprHttpServer;
+
 #[macro_use]
 pub mod actor;
+pub mod appcallbackalpha;
 mod http;
 mod models;
 pub mod utils;
-
-pub use http::DaprHttpServer;
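For anyone trying the new surface outside the bundled example, below is a minimal client-side sketch distilled from the `src/client.rs` changes above. The job name `nightly-report`, its schedule, and the payload bytes are illustrative placeholders; `JobBuilder`, `schedule_job_alpha1`, `get_job_alpha1`, and `delete_job_alpha1` come from this patch, and the sketch assumes a Dapr sidecar with the scheduler enabled is reachable the same way as in `examples/jobs/jobs.rs`.

```rust
use dapr::client::JobBuilder;
use prost_types::Any;

type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect the same way the jobs example above does (sidecar address plus DAPR_GRPC_PORT).
    let mut client = DaprClient::connect("https://127.0.0.1".to_string()).await?;

    // Assemble a job: a name plus optional schedule, payload, ttl, repeats and due_time.
    let job = JobBuilder::new("nightly-report") // hypothetical job name
        .with_schedule("@every 24h")
        .with_data(Any {
            type_url: "type.googleapis.com/io.dapr.RustTest".to_string(),
            value: br#"{"task":"report"}"#.to_vec(),
        })
        .build();

    // Schedule, read back, then remove the job via the new alpha1 client methods.
    client.schedule_job_alpha1(job).await?;
    let fetched = client.get_job_alpha1("nightly-report").await?;
    println!("stored schedule: {:?}", fetched.job.and_then(|j| j.schedule));
    client.delete_job_alpha1("nightly-report").await?;

    Ok(())
}
```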
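On the callback side, note the routing convention the server half of this patch relies on: `on_job_event_alpha1` expects the incoming `method` to have the form `job/<job name>`, strips the `job/` prefix, and looks up the handler that was registered under that job name. Handlers registered through `add_job_handler_alpha!` therefore need to use exactly the name passed to `JobBuilder::new(...)`, as the example's `prod-db-backup` and `ping-pong` pairs show.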