feat: add grpc client implements (#56)

This commit is contained in:
Gaius 2023-07-18 22:36:08 +08:00 committed by GitHub
parent 000de0ef2b
commit b22f153532
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 344 additions and 1 deletions

View File

@ -19,6 +19,9 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Install stable toolchain - name: Install stable toolchain
uses: actions-rs/toolchain@v1 uses: actions-rs/toolchain@v1
with: with:
@ -39,6 +42,9 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Install cargo-llvm-cov - name: Install cargo-llvm-cov
uses: taiki-e/install-action@v2 uses: taiki-e/install-action@v2
with: with:

View File

@ -15,6 +15,9 @@ jobs:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v3 uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
- name: Install stable toolchain - name: Install stable toolchain
uses: actions-rs/toolchain@v1 uses: actions-rs/toolchain@v1
with: with:

View File

@ -41,6 +41,8 @@ opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"] }
lazy_static = "1.4" lazy_static = "1.4"
prometheus = "0.13.3" prometheus = "0.13.3"
warp = "0.3.5" warp = "0.3.5"
tonic = "0.9.2"
tonic-health = "0.9.2"
reqwest = { version = "0.11.18", features = ["stream"] } reqwest = { version = "0.11.18", features = ["stream"] }
futures = "0.3.28" futures = "0.3.28"
tokio = { version = "1.28.1", features = ["full"] } tokio = { version = "1.28.1", features = ["full"] }
@ -51,3 +53,4 @@ local-ip-address = "0.5.3"
rocksdb = "0.21.0" rocksdb = "0.21.0"
num_cpus = "1.0" num_cpus = "1.0"
chrono = { version = "0.4.26", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] }
dragonfly-api = "1.9.8"

View File

@ -25,7 +25,7 @@ pub mod dfstore;
pub const SERVICE_NAME: &str = "dragonfly"; pub const SERVICE_NAME: &str = "dragonfly";
// NAME is the name of the package. // NAME is the name of the package.
pub const NAME: &str = env!("CARGO_PKG_NAME"); pub const NAME: &str = "client";
// default_root_dir is the default root directory for client. // default_root_dir is the default root directory for client.
pub fn default_root_dir() -> PathBuf { pub fn default_root_dir() -> PathBuf {

51
src/grpc/health.rs Normal file
View File

@ -0,0 +1,51 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::net::SocketAddr;
use tonic::transport::Channel;
use tonic_health::pb::{
health_client::HealthClient as HealthGRPCClient, HealthCheckRequest, HealthCheckResponse,
};
// HealthClient is a wrapper of HealthGRPCClient.
pub struct HealthClient {
// client is the grpc client of the certificate.
pub client: HealthGRPCClient<Channel>,
}
// HealthClient implements the grpc client of the health.
impl HealthClient {
// new creates a new HealthClient.
pub async fn new(addr: &SocketAddr) -> super::Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.connect()
.await?;
let client = HealthGRPCClient::new(conn);
Ok(Self { client })
}
// check checks the health of the server.
pub async fn check(
&mut self,
request: HealthCheckRequest,
) -> super::Result<HealthCheckResponse> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.check(request).await?;
Ok(response.into_inner())
}
}

76
src/grpc/manager.rs Normal file
View File

@ -0,0 +1,76 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use dragonfly_api::manager::{
manager_client::ManagerClient as ManagerGRPCClient, GetObjectStorageRequest,
ListSchedulersRequest, ListSchedulersResponse, ObjectStorage, SeedPeer, UpdateSeedPeerRequest,
};
use std::net::SocketAddr;
use tonic::transport::Channel;
// ManagerClient is a wrapper of ManagerGRPCClient.
pub struct ManagerClient {
// client is the grpc client of the manager.
pub client: ManagerGRPCClient<Channel>,
}
// ManagerClient implements the grpc client of the manager.
impl ManagerClient {
// new creates a new ManagerClient.
pub async fn new(addr: &SocketAddr) -> super::Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.connect()
.await?;
let client = ManagerGRPCClient::new(conn);
Ok(Self { client })
}
// list_schedulers lists all schedulers that best match the client.
pub async fn list_schedulers(
&mut self,
request: ListSchedulersRequest,
) -> super::Result<ListSchedulersResponse> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.list_schedulers(request).await?;
Ok(response.into_inner())
}
// get_object_storage provides the object storage information.
pub async fn get_object_storage(
&mut self,
request: GetObjectStorageRequest,
) -> super::Result<ObjectStorage> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.get_object_storage(request).await?;
Ok(response.into_inner())
}
// update_seed_peer updates the seed peer information.
pub async fn update_seed_peer(
&mut self,
request: UpdateSeedPeerRequest,
) -> super::Result<SeedPeer> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.update_seed_peer(request).await?;
Ok(response.into_inner())
}
}

40
src/grpc/mod.rs Normal file
View File

@ -0,0 +1,40 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::time::Duration;
pub mod health;
pub mod manager;
pub mod scheduler;
pub mod security;
// REQUEST_TIMEOUT is the timeout for GRPC requests.
const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
// Error is the error for GRPC.
#[derive(Debug, thiserror::Error)]
pub enum Error {
// TonicTransport is the error for tonic transport.
#[error(transparent)]
TonicTransport(#[from] tonic::transport::Error),
// TonicStatus is the error for tonic status.
#[error(transparent)]
TonicStatus(#[from] tonic::Status),
}
// Result is the result for GRPC.
pub type Result<T> = std::result::Result<T, Error>;

111
src/grpc/scheduler.rs Normal file
View File

@ -0,0 +1,111 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use dragonfly_api::common::{Peer, Task};
use dragonfly_api::scheduler::{
scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
ExchangePeerRequest, ExchangePeerResponse, LeaveHostRequest, LeavePeerRequest, StatPeerRequest,
StatTaskRequest,
};
use std::net::SocketAddr;
use tonic::transport::Channel;
// SchedulerClient is a wrapper of SchedulerGRPCClient.
pub struct SchedulerClient {
// client is the grpc client of the scehduler.
pub client: SchedulerGRPCClient<Channel>,
}
// SchedulerClient implements the grpc client of the scheduler.
impl SchedulerClient {
// new creates a new SchedulerClient.
pub async fn new(addr: &SocketAddr) -> super::Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.connect()
.await?;
let client = SchedulerGRPCClient::new(conn);
Ok(Self { client })
}
// TODO Implement the announce_peer method.
// list_schedulers lists all schedulers that best match the client.
pub async fn announce_peer(&mut self) -> super::Result<()> {
Ok(())
}
// stat_peer gets the status of the peer.
pub async fn stat_peer(&mut self, request: StatPeerRequest) -> super::Result<Peer> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.stat_peer(request).await?;
Ok(response.into_inner())
}
// leave_peer tells the scheduler that the peer is leaving.
pub async fn leave_peer(&mut self, request: LeavePeerRequest) -> super::Result<()> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
self.client.leave_peer(request).await?;
Ok(())
}
// exchange_peer exchanges the peer with the scheduler.
pub async fn exchange_peer(
&mut self,
request: ExchangePeerRequest,
) -> super::Result<ExchangePeerResponse> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.exchange_peer(request).await?;
Ok(response.into_inner())
}
// stat_task gets the status of the task.
pub async fn stat_task(&mut self, request: StatTaskRequest) -> super::Result<Task> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.stat_task(request).await?;
Ok(response.into_inner())
}
// announce_host announces the host to the scheduler.
pub async fn announce_host(&mut self, request: AnnounceHostRequest) -> super::Result<()> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
self.client.announce_host(request).await?;
Ok(())
}
// leave_host tells the scheduler that the host is leaving.
pub async fn leave_host(&mut self, request: LeaveHostRequest) -> super::Result<()> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
self.client.leave_host(request).await?;
Ok(())
}
// TODO Implement the sync_probes method.
// sync_probes syncs the probes to the scheduler.
pub async fn sync_probes(self) -> super::Result<()> {
Ok(())
}
}

52
src/grpc/security.rs Normal file
View File

@ -0,0 +1,52 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use dragonfly_api::security::{
certificate_client::CertificateClient as CertificateGRPCClient, CertificateRequest,
CertificateResponse,
};
use std::net::SocketAddr;
use tonic::transport::Channel;
// CertificateClient is a wrapper of CertificateGRPCClient.
pub struct CertificateClient {
// client is the grpc client of the certificate.
pub client: CertificateGRPCClient<Channel>,
}
// CertificateClient implements the grpc client of the certificate.
impl CertificateClient {
// new creates a new CertificateClient.
pub async fn new(addr: &SocketAddr) -> super::Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.connect()
.await?;
let client = CertificateGRPCClient::new(conn);
Ok(Self { client })
}
// issue_certificate issues a certificate for the peer.
pub async fn issue_certificate(
&mut self,
request: CertificateRequest,
) -> super::Result<CertificateResponse> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
let response = self.client.issue_certificate(request).await?;
Ok(response.into_inner())
}
}

View File

@ -18,6 +18,7 @@ pub mod announcer;
pub mod backend; pub mod backend;
pub mod config; pub mod config;
pub mod downloader; pub mod downloader;
pub mod grpc;
pub mod health; pub mod health;
pub mod metrics; pub mod metrics;
pub mod proxy; pub mod proxy;