diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 25b5773d..49c955b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,6 +19,9 @@ jobs: - name: Checkout code uses: actions/checkout@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v2 + - name: Install stable toolchain uses: actions-rs/toolchain@v1 with: @@ -39,6 +42,9 @@ jobs: - name: Checkout code uses: actions/checkout@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v2 + - name: Install cargo-llvm-cov uses: taiki-e/install-action@v2 with: diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c37b4439..fbbcbae7 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -15,6 +15,9 @@ jobs: - name: Checkout code uses: actions/checkout@v3 + - name: Install Protoc + uses: arduino/setup-protoc@v2 + - name: Install stable toolchain uses: actions-rs/toolchain@v1 with: diff --git a/Cargo.toml b/Cargo.toml index 2d470293..4f3a9c0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] } lazy_static = "1.4" prometheus = "0.13.3" warp = "0.3.5" +tonic = "0.9.2" +tonic-health = "0.9.2" reqwest = { version = "0.11.18", features = ["stream"] } futures = "0.3.28" tokio = { version = "1.28.1", features = ["full"] } @@ -51,3 +53,4 @@ local-ip-address = "0.5.3" rocksdb = "0.21.0" num_cpus = "1.0" chrono = { version = "0.4.26", features = ["serde"] } +dragonfly-api = "1.9.8" diff --git a/src/config/mod.rs b/src/config/mod.rs index b33ac282..bca4e016 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -25,7 +25,7 @@ pub mod dfstore; pub const SERVICE_NAME: &str = "dragonfly"; // NAME is the name of the package. -pub const NAME: &str = env!("CARGO_PKG_NAME"); +pub const NAME: &str = "client"; // default_root_dir is the default root directory for client. pub fn default_root_dir() -> PathBuf { diff --git a/src/grpc/health.rs b/src/grpc/health.rs new file mode 100644 index 00000000..ad084213 --- /dev/null +++ b/src/grpc/health.rs @@ -0,0 +1,51 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::net::SocketAddr; +use tonic::transport::Channel; +use tonic_health::pb::{ + health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse, +}; + +// HealthClient is a wrapper of HealthGRPCClient. +pub struct HealthClient { + // client is the grpc client of the certificate. + pub client: HealthGRPCClient, +} + +// HealthClient implements the grpc client of the health. +impl HealthClient { + // new creates a new HealthClient. + pub async fn new(addr: &SocketAddr) -> super::Result { + let conn = tonic::transport::Endpoint::new(addr.to_string())? + .connect() + .await?; + let client = HealthGRPCClient::new(conn); + Ok(Self { client }) + } + + // check checks the health of the server. + pub async fn check( + &mut self, + request: HealthCheckRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.check(request).await?; + Ok(response.into_inner()) + } +} diff --git a/src/grpc/manager.rs b/src/grpc/manager.rs new file mode 100644 index 00000000..e3c12a36 --- /dev/null +++ b/src/grpc/manager.rs @@ -0,0 +1,76 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dragonfly_api::manager::{ + manager_client::ManagerClient as ManagerGRPCClient, GetObjectStorageRequest, + ListSchedulersRequest, ListSchedulersResponse, ObjectStorage, SeedPeer, UpdateSeedPeerRequest, +}; +use std::net::SocketAddr; +use tonic::transport::Channel; + +// ManagerClient is a wrapper of ManagerGRPCClient. +pub struct ManagerClient { + // client is the grpc client of the manager. + pub client: ManagerGRPCClient, +} + +// ManagerClient implements the grpc client of the manager. +impl ManagerClient { + // new creates a new ManagerClient. + pub async fn new(addr: &SocketAddr) -> super::Result { + let conn = tonic::transport::Endpoint::new(addr.to_string())? + .connect() + .await?; + let client = ManagerGRPCClient::new(conn); + Ok(Self { client }) + } + + // list_schedulers lists all schedulers that best match the client. + pub async fn list_schedulers( + &mut self, + request: ListSchedulersRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.list_schedulers(request).await?; + Ok(response.into_inner()) + } + + // get_object_storage provides the object storage information. + pub async fn get_object_storage( + &mut self, + request: GetObjectStorageRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.get_object_storage(request).await?; + Ok(response.into_inner()) + } + + // update_seed_peer updates the seed peer information. + pub async fn update_seed_peer( + &mut self, + request: UpdateSeedPeerRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.update_seed_peer(request).await?; + Ok(response.into_inner()) + } +} diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs new file mode 100644 index 00000000..3af7f88d --- /dev/null +++ b/src/grpc/mod.rs @@ -0,0 +1,40 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::time::Duration; + +pub mod health; +pub mod manager; +pub mod scheduler; +pub mod security; + +// REQUEST_TIMEOUT is the timeout for GRPC requests. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + +// Error is the error for GRPC. +#[derive(Debug, thiserror::Error)] +pub enum Error { + // TonicTransport is the error for tonic transport. + #[error(transparent)] + TonicTransport(#[from] tonic::transport::Error), + + // TonicStatus is the error for tonic status. + #[error(transparent)] + TonicStatus(#[from] tonic::Status), +} + +// Result is the result for GRPC. +pub type Result = std::result::Result; diff --git a/src/grpc/scheduler.rs b/src/grpc/scheduler.rs new file mode 100644 index 00000000..629263e4 --- /dev/null +++ b/src/grpc/scheduler.rs @@ -0,0 +1,111 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dragonfly_api::common::{Peer, Task}; +use dragonfly_api::scheduler::{ + scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest, + ExchangePeerRequest, ExchangePeerResponse, LeaveHostRequest, LeavePeerRequest, StatPeerRequest, + StatTaskRequest, +}; +use std::net::SocketAddr; +use tonic::transport::Channel; + +// SchedulerClient is a wrapper of SchedulerGRPCClient. +pub struct SchedulerClient { + // client is the grpc client of the scehduler. + pub client: SchedulerGRPCClient, +} + +// SchedulerClient implements the grpc client of the scheduler. +impl SchedulerClient { + // new creates a new SchedulerClient. + pub async fn new(addr: &SocketAddr) -> super::Result { + let conn = tonic::transport::Endpoint::new(addr.to_string())? + .connect() + .await?; + let client = SchedulerGRPCClient::new(conn); + Ok(Self { client }) + } + + // TODO Implement the announce_peer method. + // list_schedulers lists all schedulers that best match the client. + pub async fn announce_peer(&mut self) -> super::Result<()> { + Ok(()) + } + + // stat_peer gets the status of the peer. + pub async fn stat_peer(&mut self, request: StatPeerRequest) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.stat_peer(request).await?; + Ok(response.into_inner()) + } + + // leave_peer tells the scheduler that the peer is leaving. + pub async fn leave_peer(&mut self, request: LeavePeerRequest) -> super::Result<()> { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + self.client.leave_peer(request).await?; + Ok(()) + } + + // exchange_peer exchanges the peer with the scheduler. + pub async fn exchange_peer( + &mut self, + request: ExchangePeerRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.exchange_peer(request).await?; + Ok(response.into_inner()) + } + + // stat_task gets the status of the task. + pub async fn stat_task(&mut self, request: StatTaskRequest) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.stat_task(request).await?; + Ok(response.into_inner()) + } + + // announce_host announces the host to the scheduler. + pub async fn announce_host(&mut self, request: AnnounceHostRequest) -> super::Result<()> { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + self.client.announce_host(request).await?; + Ok(()) + } + + // leave_host tells the scheduler that the host is leaving. + pub async fn leave_host(&mut self, request: LeaveHostRequest) -> super::Result<()> { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + self.client.leave_host(request).await?; + Ok(()) + } + + // TODO Implement the sync_probes method. + // sync_probes syncs the probes to the scheduler. + pub async fn sync_probes(self) -> super::Result<()> { + Ok(()) + } +} diff --git a/src/grpc/security.rs b/src/grpc/security.rs new file mode 100644 index 00000000..7af4a3ec --- /dev/null +++ b/src/grpc/security.rs @@ -0,0 +1,52 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use dragonfly_api::security::{ + certificate_client::CertificateClient as CertificateGRPCClient, CertificateRequest, + CertificateResponse, +}; +use std::net::SocketAddr; +use tonic::transport::Channel; + +// CertificateClient is a wrapper of CertificateGRPCClient. +pub struct CertificateClient { + // client is the grpc client of the certificate. + pub client: CertificateGRPCClient, +} + +// CertificateClient implements the grpc client of the certificate. +impl CertificateClient { + // new creates a new CertificateClient. + pub async fn new(addr: &SocketAddr) -> super::Result { + let conn = tonic::transport::Endpoint::new(addr.to_string())? + .connect() + .await?; + let client = CertificateGRPCClient::new(conn); + Ok(Self { client }) + } + + // issue_certificate issues a certificate for the peer. + pub async fn issue_certificate( + &mut self, + request: CertificateRequest, + ) -> super::Result { + let mut request = tonic::Request::new(request); + request.set_timeout(super::REQUEST_TIMEOUT); + + let response = self.client.issue_certificate(request).await?; + Ok(response.into_inner()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 99380790..b180a2a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod announcer; pub mod backend; pub mod config; pub mod downloader; +pub mod grpc; pub mod health; pub mod metrics; pub mod proxy;