feat: tikv/grpc-rs replace tonic
This commit is contained in:
parent
52c1e6f33a
commit
01a877808e
|
|
@ -11,3 +11,7 @@ Cargo.lock
|
|||
|
||||
# IDE
|
||||
.idea
|
||||
|
||||
# Customize
|
||||
### google.protobuf.rs build by prost-build, exclude it because no content.
|
||||
**/google.protobuf.rs
|
||||
|
|
|
|||
10
Cargo.toml
10
Cargo.toml
|
|
@ -28,10 +28,9 @@ thiserror = "1.0"
|
|||
tokio = { version = "1.21", features = ["full"] }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
futures = "0.3"
|
||||
h2 = "0.3"
|
||||
tonic = "0.8"
|
||||
prost = "0.11"
|
||||
prost-types = "0.11"
|
||||
grpcio = { version = "0.10", default-features = false, features = ["prost-codec"] }
|
||||
prost = "0.9"
|
||||
prost-types = "0.9"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
lazy_static = "1.4"
|
||||
|
|
@ -39,7 +38,8 @@ lazy_static = "1.4"
|
|||
tracing = "0.1"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.8"
|
||||
grpcio-compiler = { version = "0.10", default-features = false, features = ["prost-codec"] }
|
||||
prost-build = "0.9"
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
|
|
|
|||
|
|
@ -11,8 +11,7 @@ Nacos client in Rust
|
|||
在 nacos-sdk-rust 工程里,为主要功能的实现,将会引入以下依赖包。
|
||||
|
||||
- serde-rs/serde 一个超高性能的通用序列化/反序列化框架,可以跟多种协议的库联合使用,实现统一编解码格式;serde-rs/json 快到上天的 JSON 库,也是 Rust 事实上的标准 JSON
|
||||
- hyperium/tonic 纯 Rust 实现的 gRPC 客户端和服务器端,支持 async/await 异步调用,文档和示例较为清晰
|
||||
- tokio-rs/prost tokio 出品的 Protocol Buffers 工具,简单易用,文档详细
|
||||
- tikv/grpc-rs 一个 Rust 版的 gRPC 客户端和服务器端
|
||||
- tokio-rs/tokio 最火的异步网络库,除了复杂上手难度高一些外,没有其它大的问题。同时 tokio 团队提供了多个非常优秀的 Rust 库,整个生态欣欣向荣,用户认可度很高
|
||||
- tokio-rs/tracing 强大的日志框架,同时还支持 OpenTelemetry 格式,无缝打通未来的监控
|
||||
|
||||
|
|
@ -20,9 +19,9 @@ Nacos client in Rust
|
|||
|
||||
### 简要描述 client & server 的交互
|
||||
|
||||
请关注 `proto/nacos_grpc_service.proto` 并知晓经过 `tonic-build` 构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`。
|
||||
请关注 `proto/nacos_grpc_service.proto` 并知晓构建出客户端侧的 stub,实现同步调用 `service Request.request()`,流式交互 `service BiRequestStream.requestBiStream()`。
|
||||
|
||||
`tonic` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化;
|
||||
`tikv/grpc-rs` 创建与 Nacos-server 的 gRPC 双工长链接,`serde/json` 适配与 server 的交互序列化;
|
||||
|
||||
gRPC 交互的 Payload 和 Metadata 由 `Protocol Buffers` 序列化,具体的 Request/Response 实体 json 格式二进制数据维护于 Payload.body,类型名字符串维护于 Metadata.type (为 java 全类名)。
|
||||
|
||||
|
|
|
|||
13
build.rs
13
build.rs
|
|
@ -14,13 +14,12 @@
|
|||
// limitations under the License.
|
||||
//
|
||||
|
||||
// use std::env;
|
||||
|
||||
fn main() -> Result<(), std::io::Error> {
|
||||
// env::set_var("OUT_DIR", "src");
|
||||
tonic_build::configure()
|
||||
.build_server(false)
|
||||
.disable_package_emission()
|
||||
.compile(&["./proto/nacos_grpc_service.proto"], &["./proto"])?;
|
||||
grpcio_compiler::prost_codegen::compile_protos(
|
||||
&["./proto/nacos_grpc_service.proto"],
|
||||
&["./proto"],
|
||||
"src",
|
||||
)
|
||||
.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,9 +17,6 @@
|
|||
|
||||
syntax = "proto3";
|
||||
|
||||
// package for rust
|
||||
package nacos.v2;
|
||||
|
||||
import "google/protobuf/any.proto";
|
||||
import "google/protobuf/timestamp.proto";
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Metadata {
|
||||
#[prost(string, tag="3")]
|
||||
pub r#type: ::prost::alloc::string::String,
|
||||
#[prost(string, tag="8")]
|
||||
pub client_ip: ::prost::alloc::string::String,
|
||||
#[prost(map="string, string", tag="7")]
|
||||
pub headers: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
|
||||
}
|
||||
#[derive(Clone, PartialEq, ::prost::Message)]
|
||||
pub struct Payload {
|
||||
#[prost(message, optional, tag="2")]
|
||||
pub metadata: ::core::option::Option<Metadata>,
|
||||
#[prost(message, optional, tag="3")]
|
||||
pub body: ::core::option::Option<::prost_types::Any>,
|
||||
}
|
||||
const METHOD_REQUEST_REQUEST: ::grpcio::Method<Payload, Payload> = ::grpcio::Method{ty: ::grpcio::MethodType::Unary, name: "/Request/request", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, };
|
||||
#[derive(Clone)]
|
||||
pub struct RequestClient { client: ::grpcio::Client }
|
||||
impl RequestClient {
|
||||
pub fn new(channel: ::grpcio::Channel) -> Self { RequestClient { client: ::grpcio::Client::new(channel) }}
|
||||
pub fn request_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result<Payload,> { self.client.unary_call(&METHOD_REQUEST_REQUEST, req, opt) }
|
||||
pub fn request(&self, req: &Payload) -> ::grpcio::Result<Payload,> { self.request_opt(req, ::grpcio::CallOption::default()) }
|
||||
pub fn request_async_opt(&self, req: &Payload, opt: ::grpcio::CallOption) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<Payload>,> { self.client.unary_call_async(&METHOD_REQUEST_REQUEST, req, opt) }
|
||||
pub fn request_async(&self, req: &Payload) -> ::grpcio::Result<::grpcio::ClientUnaryReceiver<Payload>,> { self.request_async_opt(req, ::grpcio::CallOption::default()) }
|
||||
pub fn spawn<F>(&self, f: F) where F: ::std::future::Future<Output = ()> + Send + 'static {self.client.spawn(f)}
|
||||
}
|
||||
pub trait Request {
|
||||
fn request(&mut self, ctx: ::grpcio::RpcContext, _req: Payload, sink: ::grpcio::UnarySink<Payload>) { grpcio::unimplemented_call!(ctx, sink) }
|
||||
}
|
||||
pub fn create_request<S: Request + Send + Clone + 'static>(s: S) -> ::grpcio::Service {
|
||||
let mut builder = ::grpcio::ServiceBuilder::new();
|
||||
let mut instance = s;
|
||||
builder = builder.add_unary_handler(&METHOD_REQUEST_REQUEST, move |ctx, req, resp| instance.request(ctx, req, resp));
|
||||
builder.build()
|
||||
}
|
||||
const METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM: ::grpcio::Method<Payload, Payload> = ::grpcio::Method{ty: ::grpcio::MethodType::Duplex, name: "/BiRequestStream/requestBiStream", req_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, resp_mar: ::grpcio::Marshaller { ser: ::grpcio::pr_ser, de: ::grpcio::pr_de }, };
|
||||
#[derive(Clone)]
|
||||
pub struct BiRequestStreamClient { client: ::grpcio::Client }
|
||||
impl BiRequestStreamClient {
|
||||
pub fn new(channel: ::grpcio::Channel) -> Self { BiRequestStreamClient { client: ::grpcio::Client::new(channel) }}
|
||||
pub fn request_bi_stream_opt(&self, opt: ::grpcio::CallOption) -> ::grpcio::Result<(::grpcio::ClientDuplexSender<Payload>,::grpcio::ClientDuplexReceiver<Payload>,)> { self.client.duplex_streaming(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, opt) }
|
||||
pub fn request_bi_stream(&self) -> ::grpcio::Result<(::grpcio::ClientDuplexSender<Payload>,::grpcio::ClientDuplexReceiver<Payload>,)> { self.request_bi_stream_opt(::grpcio::CallOption::default()) }
|
||||
pub fn spawn<F>(&self, f: F) where F: ::std::future::Future<Output = ()> + Send + 'static {self.client.spawn(f)}
|
||||
}
|
||||
pub trait BiRequestStream {
|
||||
fn request_bi_stream(&mut self, ctx: ::grpcio::RpcContext, _stream: ::grpcio::RequestStream<Payload>, sink: ::grpcio::DuplexSink<Payload>) { grpcio::unimplemented_call!(ctx, sink) }
|
||||
}
|
||||
pub fn create_bi_request_stream<S: BiRequestStream + Send + Clone + 'static>(s: S) -> ::grpcio::Service {
|
||||
let mut builder = ::grpcio::ServiceBuilder::new();
|
||||
let mut instance = s;
|
||||
builder = builder.add_duplex_streaming_handler(&METHOD_BI_REQUEST_STREAM_REQUEST_BI_STREAM, move |ctx, req, resp| instance.request_bi_stream(ctx, req, resp));
|
||||
builder.build()
|
||||
}
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
pub const KEY_SERVER_ADDR: &'static str = "server_addr";
|
||||
|
||||
pub const DEFAULT_SERVER_ADDR: &'static str = "http://0.0.0.0:9848";
|
||||
pub const DEFAULT_SERVER_ADDR: &'static str = "0.0.0.0:9848";
|
||||
|
||||
// label AppName
|
||||
pub const KEY_LABEL_APP_NAME: &'static str = "AppName";
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@
|
|||
// limitations under the License.
|
||||
//
|
||||
/// error learn from Skywalking Rust.
|
||||
use tokio::{sync::oneshot, task::JoinError};
|
||||
|
||||
/// Nacos Sdk Rust Result.
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
|
@ -31,15 +30,16 @@ pub enum Error {
|
|||
#[error("remote client shutdown failed: {0}")]
|
||||
ClientShutdown(String),
|
||||
|
||||
/*
|
||||
#[error("tonic transport failed: {0}")]
|
||||
TonicTransport(#[from] tonic::transport::Error),
|
||||
|
||||
#[error("tonic status: {0}")]
|
||||
TonicStatus(#[from] tonic::Status),
|
||||
|
||||
*/
|
||||
#[error("tokio task join failed: {0}")]
|
||||
TokioJoin(#[from] JoinError),
|
||||
TokioJoin(#[from] tokio::task::JoinError),
|
||||
|
||||
#[error("tokio oneshot receive failed: {0}")]
|
||||
TokioOneshotRecv(#[from] oneshot::error::RecvError),
|
||||
TokioOneshotRecv(#[from] tokio::sync::oneshot::error::RecvError),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,13 +1,11 @@
|
|||
/**
|
||||
* Learn from https://github.com/tokio-rs/console/blob/main/tokio-console/src/conn.rs
|
||||
*/
|
||||
use std::{error::Error, pin::Pin, time::Duration};
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tonic::{transport::Channel, Streaming};
|
||||
|
||||
use futures::stream::StreamExt;
|
||||
use futures::SinkExt;
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{error::Error, pin::Pin, time::Duration};
|
||||
|
||||
use crate::api::client_config::ClientConfig;
|
||||
use crate::common::remote::request::client_request::{
|
||||
|
|
@ -16,11 +14,9 @@ use crate::common::remote::request::client_request::{
|
|||
use crate::common::remote::request::Request;
|
||||
use crate::common::remote::response::Response;
|
||||
use crate::common::util::*;
|
||||
use crate::nacos_proto::v2::bi_request_stream_client::BiRequestStreamClient;
|
||||
use crate::nacos_proto::v2::request_client::RequestClient;
|
||||
use crate::nacos_proto::v2::Payload;
|
||||
use crate::nacos_proto::v2::{BiRequestStreamClient, Payload, RequestClient};
|
||||
|
||||
#[derive(Debug)]
|
||||
// #[derive(Debug)]
|
||||
pub struct Connection {
|
||||
client_config: ClientConfig,
|
||||
state: State,
|
||||
|
|
@ -32,14 +28,14 @@ pub struct Connection {
|
|||
// stream just adds a heap pointer dereference, slightly penalizing polling
|
||||
// the stream in most cases. so, don't listen to clippy on this.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug)]
|
||||
// #[derive(Debug)]
|
||||
enum State {
|
||||
Connected {
|
||||
conn_id: String,
|
||||
client: RequestClient<Channel>,
|
||||
bi_client: BiRequestStreamClient<Channel>,
|
||||
bi_sender: mpsc::UnboundedSender<Payload>,
|
||||
resp_bi_stream: Box<Streaming<Payload>>,
|
||||
client: RequestClient,
|
||||
bi_client: BiRequestStreamClient,
|
||||
bi_sender: Arc<Mutex<grpcio::ClientDuplexSender<Payload>>>,
|
||||
bi_receiver: Arc<Mutex<grpcio::ClientDuplexReceiver<Payload>>>,
|
||||
},
|
||||
Disconnected(Duration),
|
||||
}
|
||||
|
|
@ -102,36 +98,36 @@ impl Connection {
|
|||
let tenant = self.client_config.namespace.clone();
|
||||
let labels = self.client_config.labels.clone();
|
||||
|
||||
let endpoint = tonic::transport::Endpoint::new(target)?;
|
||||
let channel = endpoint.connect().await?;
|
||||
let env = Arc::new(grpcio::Environment::new(2));
|
||||
let channel = grpcio::ChannelBuilder::new(env).connect(target.as_str());
|
||||
|
||||
let client = RequestClient::new(channel.clone());
|
||||
|
||||
let mut client = RequestClient::new(channel.clone());
|
||||
let req_payload =
|
||||
payload_helper::build_req_grpc_payload(ServerCheckClientRequest::new());
|
||||
let resp_payload = client.request(tonic::Request::new(req_payload)).await?;
|
||||
let resp_payload = client.request(&req_payload);
|
||||
let server_check_response =
|
||||
payload_helper::build_server_response(resp_payload.into_inner()).unwrap();
|
||||
payload_helper::build_server_response(resp_payload.unwrap()).unwrap();
|
||||
let conn_id = server_check_response.get_connection_id();
|
||||
|
||||
let mut bi_client = BiRequestStreamClient::new(channel.clone());
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let bi_client = BiRequestStreamClient::new(channel.clone());
|
||||
let (mut client_sender, client_receiver) = bi_client.request_bi_stream().unwrap();
|
||||
// send a ConnectionSetupClientRequest
|
||||
tx.send(payload_helper::build_req_grpc_payload(
|
||||
ConnectionSetupClientRequest::new(tenant, labels),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let resp_bi_stream = bi_client
|
||||
.request_bi_stream(UnboundedReceiverStream::from(rx))
|
||||
.await?
|
||||
.into_inner();
|
||||
client_sender
|
||||
.send((
|
||||
payload_helper::build_req_grpc_payload(ConnectionSetupClientRequest::new(
|
||||
tenant, labels,
|
||||
)),
|
||||
grpcio::WriteFlags::default(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected {
|
||||
conn_id: String::from(conn_id.unwrap()),
|
||||
client,
|
||||
bi_client,
|
||||
bi_sender: tx,
|
||||
resp_bi_stream: Box::new(resp_bi_stream),
|
||||
bi_sender: Arc::new(Mutex::new(client_sender)),
|
||||
bi_receiver: Arc::new(Mutex::new(client_receiver)),
|
||||
})
|
||||
};
|
||||
self.state = match try_connect.await {
|
||||
|
|
@ -154,9 +150,12 @@ impl Connection {
|
|||
loop {
|
||||
match self.state {
|
||||
State::Connected {
|
||||
ref mut resp_bi_stream,
|
||||
ref mut bi_receiver,
|
||||
..
|
||||
} => match Pin::new(resp_bi_stream).next().await {
|
||||
} => match Pin::new(bi_receiver.to_owned().lock().unwrap())
|
||||
.next()
|
||||
.await
|
||||
{
|
||||
Some(Ok(payload)) => return payload,
|
||||
Some(Err(status)) => {
|
||||
println!("error from stream {}", status);
|
||||
|
|
@ -184,8 +183,8 @@ impl Connection {
|
|||
match self.state {
|
||||
State::Connected { ref mut client, .. } => {
|
||||
let req_payload = payload_helper::build_req_grpc_payload(req);
|
||||
let resp_payload = client.request(tonic::Request::new(req_payload)).await?;
|
||||
payload_helper::build_server_response(resp_payload.into_inner())
|
||||
let resp_payload = client.request(&req_payload);
|
||||
payload_helper::build_server_response(resp_payload.unwrap())
|
||||
}
|
||||
State::Disconnected(_) => {
|
||||
self.connect().await;
|
||||
|
|
@ -196,12 +195,19 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) -> () {
|
||||
pub(crate) async fn send_resp(&mut self, resp: impl Response + serde::Serialize) {
|
||||
match self.state {
|
||||
State::Connected {
|
||||
ref mut bi_sender, ..
|
||||
} => bi_sender
|
||||
.send(payload_helper::build_resp_grpc_payload(resp))
|
||||
.to_owned()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send((
|
||||
payload_helper::build_resp_grpc_payload(resp),
|
||||
grpcio::WriteFlags::default(),
|
||||
))
|
||||
.await
|
||||
.unwrap(),
|
||||
State::Disconnected(_) => self.connect().await,
|
||||
}
|
||||
|
|
@ -219,20 +225,17 @@ mod tests {
|
|||
tracing_subscriber::fmt::init();
|
||||
println!("test_remote_connect");
|
||||
let mut remote_connect =
|
||||
Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()));
|
||||
Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string()));
|
||||
remote_connect.connect().await;
|
||||
println!("{:?}", remote_connect.state)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_next_payload() {
|
||||
println!("test_next_payload");
|
||||
let mut remote_connect =
|
||||
Connection::new(ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()));
|
||||
Connection::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string()));
|
||||
let payload = remote_connect.next_payload().await;
|
||||
let payload = remote_connect.next_payload().await;
|
||||
// let server_req = payload_helper::build_server_request(payload).unwrap();
|
||||
println!("{:?}", remote_connect.state);
|
||||
let server_req = payload_helper::build_server_request(payload).unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
|
|
@ -3,9 +3,4 @@ pub(crate) mod remote_client;
|
|||
pub(crate) mod request;
|
||||
pub(crate) mod response;
|
||||
|
||||
#[tonic::async_trait]
|
||||
pub(crate) trait RemoteClient {}
|
||||
|
||||
pub(crate) trait PayloadConverter {
|
||||
fn convert_to_grpc_payload() -> crate::nacos_proto::v2::Payload;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,164 +1,28 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use tonic::{
|
||||
async_trait,
|
||||
transport::{Channel, Endpoint},
|
||||
};
|
||||
|
||||
use crate::api::client_config::ClientConfig;
|
||||
use crate::api::constants::DEFAULT_SERVER_ADDR;
|
||||
use crate::api::error::Error;
|
||||
use crate::common::remote::request::client_request::{
|
||||
ConnectionSetupClientRequest, ServerCheckClientRequest,
|
||||
};
|
||||
use crate::common::util::*;
|
||||
use crate::nacos_proto::v2::bi_request_stream_client::BiRequestStreamClient;
|
||||
use crate::nacos_proto::v2::request_client::RequestClient;
|
||||
use crate::nacos_proto::v2::Payload;
|
||||
|
||||
use super::RemoteClient;
|
||||
|
||||
pub(crate) struct GrpcRemoteClient {
|
||||
client_name: Option<String>,
|
||||
tenant: String,
|
||||
labels: HashMap<String, String>,
|
||||
client: RequestClient<Channel>,
|
||||
bi_client: BiRequestStreamClient<Channel>,
|
||||
resp_bi_stream: Arc<Mutex<tonic::codec::Streaming<Payload>>>,
|
||||
client_config: ClientConfig,
|
||||
}
|
||||
|
||||
impl GrpcRemoteClient {
|
||||
/// Sets the request_client against.
|
||||
pub fn request_client(self, client: RequestClient<Channel>) -> Self {
|
||||
GrpcRemoteClient { client, ..self }
|
||||
}
|
||||
|
||||
/// Sets the request_client against.
|
||||
pub fn bi_client(self, bi_client: BiRequestStreamClient<Channel>) -> Self {
|
||||
GrpcRemoteClient { bi_client, ..self }
|
||||
}
|
||||
|
||||
/// Sets the resp_bi_stream against.
|
||||
/*pub fn resp_bi_stream(self, resp_bi_stream: Arc<tonic::codec::Streaming<Payload>>) -> Self {
|
||||
GrpcRemoteClient {
|
||||
resp_bi_stream,
|
||||
..self
|
||||
}
|
||||
}*/
|
||||
|
||||
pub async fn new(client_config: ClientConfig) -> crate::api::error::Result<Self> {
|
||||
let client_name = client_config.client_name;
|
||||
let tenant = client_config.namespace;
|
||||
let labels = client_config.labels;
|
||||
let address = client_config
|
||||
.server_addr
|
||||
.unwrap_or(String::from(DEFAULT_SERVER_ADDR));
|
||||
|
||||
let (client, bi_client, resp_bi_stream) =
|
||||
Self::connect_to_server(address.clone(), tenant.clone(), labels.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let resp_bi_stream = Arc::new(Mutex::new(resp_bi_stream));
|
||||
|
||||
let mut resp_bi_stream_clone = Arc::clone(&resp_bi_stream);
|
||||
/*let thread = tokio::spawn(async move {
|
||||
loop {
|
||||
while let Some(received) = resp_bi_stream_clone.next().await {
|
||||
let payload = received.unwrap();
|
||||
}
|
||||
}
|
||||
});*/
|
||||
// let threads = vec![thread];
|
||||
|
||||
let remote_client = Self {
|
||||
client_name,
|
||||
tenant,
|
||||
labels,
|
||||
client,
|
||||
bi_client,
|
||||
resp_bi_stream,
|
||||
};
|
||||
|
||||
Ok(remote_client)
|
||||
}
|
||||
|
||||
async fn connect_to_server(
|
||||
address: String,
|
||||
tenant: String,
|
||||
labels: HashMap<String, String>,
|
||||
) -> crate::api::error::Result<(
|
||||
RequestClient<Channel>,
|
||||
BiRequestStreamClient<Channel>,
|
||||
tonic::codec::Streaming<Payload>,
|
||||
)> {
|
||||
let endpoint = Endpoint::new(address)?;
|
||||
let channel = endpoint.connect().await?;
|
||||
let mut client = RequestClient::new(channel.clone());
|
||||
|
||||
let req_payload = payload_helper::build_req_grpc_payload(ServerCheckClientRequest::new());
|
||||
let resp_future = client.request(tonic::Request::new(req_payload));
|
||||
match resp_future.await {
|
||||
Ok(response) => {
|
||||
let resp_payload = response.into_inner();
|
||||
let server_check_response =
|
||||
payload_helper::build_server_response(resp_payload).unwrap();
|
||||
let conn_id = server_check_response.get_connection_id();
|
||||
let mut bi_client = BiRequestStreamClient::new(channel.clone());
|
||||
let mut resp_bi_stream = bi_client
|
||||
.request_bi_stream(Self::stream_once_connection_setup_request(
|
||||
ConnectionSetupClientRequest::new(tenant, labels),
|
||||
))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_inner();
|
||||
Ok((client, bi_client, resp_bi_stream))
|
||||
}
|
||||
Err(e) => Err(Error::TonicStatus(e)),
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_once_connection_setup_request(
|
||||
connection_setup_request: ConnectionSetupClientRequest,
|
||||
) -> impl tonic::codegen::futures_core::Stream<Item = Payload> {
|
||||
tokio_stream::once(payload_helper::build_req_grpc_payload(
|
||||
connection_setup_request,
|
||||
))
|
||||
pub fn new(client_config: ClientConfig) -> Self {
|
||||
Self { client_config }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RemoteClient for GrpcRemoteClient {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::api::client_config::ClientConfig;
|
||||
use crate::common::remote::remote_client::GrpcRemoteClient;
|
||||
use crate::common::remote::RemoteClient;
|
||||
use crate::common::util::payload_helper;
|
||||
|
||||
// #[tokio::test]
|
||||
async fn test_grpc_remote_client() {
|
||||
let mut remote_client = GrpcRemoteClient::new(
|
||||
ClientConfig::new().server_addr("http://0.0.0.0:9848".to_string()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// wait resp_bi_stream
|
||||
sleep(Duration::from_secs(2));
|
||||
|
||||
let mut resp_bi_stream_clone = remote_client.resp_bi_stream.clone();
|
||||
while let Some(received) = resp_bi_stream_clone.lock().unwrap().next().await {
|
||||
let payload = received.unwrap();
|
||||
let server_req = payload_helper::build_server_request(payload);
|
||||
}
|
||||
let remote_client =
|
||||
GrpcRemoteClient::new(ClientConfig::new().server_addr("0.0.0.0:9848".to_string()));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,8 +1,5 @@
|
|||
#![allow(non_snake_case)]
|
||||
use crate::common::remote::request::{
|
||||
generate_request_id, Request, TYPE_CONNECT_RESET_SERVER_REQUEST,
|
||||
TYPE_CONNECT_SETUP_SERVER_REQUEST, TYPE_SERVER_CHECK_CLIENT_REQUEST,
|
||||
};
|
||||
use crate::common::remote::request::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
|
|
@ -53,7 +50,7 @@ impl Request for ConnectionSetupClientRequest {
|
|||
&self.headers
|
||||
}
|
||||
fn get_type_url(&self) -> &String {
|
||||
&TYPE_CONNECT_SETUP_SERVER_REQUEST
|
||||
&TYPE_CONNECT_SETUP_CLIENT_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -73,35 +70,3 @@ impl ConnectionSetupClientRequest {
|
|||
ConnectionSetupClientRequest { labels, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct ConnectResetRequest {
|
||||
requestId: String,
|
||||
/// count be empty.
|
||||
headers: HashMap<String, String>,
|
||||
serverIp: String,
|
||||
serverPort: String,
|
||||
}
|
||||
|
||||
impl Request for ConnectResetRequest {
|
||||
fn get_request_id(&self) -> &String {
|
||||
&self.requestId
|
||||
}
|
||||
fn get_headers(&self) -> &HashMap<String, String> {
|
||||
&self.headers
|
||||
}
|
||||
fn get_type_url(&self) -> &String {
|
||||
&TYPE_CONNECT_RESET_SERVER_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectResetRequest {
|
||||
pub fn new(server_ip: String, server_port: String) -> Self {
|
||||
ConnectResetRequest {
|
||||
requestId: generate_request_id(),
|
||||
headers: HashMap::new(),
|
||||
serverIp: server_ip,
|
||||
serverPort: server_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ use std::collections::HashMap;
|
|||
use std::sync::atomic::{AtomicI64, Ordering};
|
||||
|
||||
pub(crate) mod client_request;
|
||||
pub(crate) mod server_request;
|
||||
|
||||
pub(crate) trait Request {
|
||||
fn get_request_id(&self) -> &String;
|
||||
|
|
@ -18,11 +19,14 @@ lazy_static! {
|
|||
pub static ref TYPE_SERVER_CHECK_CLIENT_REQUEST: String = String::from("ServerCheckRequest");
|
||||
|
||||
/// com.alibaba.nacos.api.remote.request.ConnectionSetupRequest
|
||||
pub static ref TYPE_CONNECT_SETUP_SERVER_REQUEST: String = String::from("ConnectionSetupRequest");
|
||||
pub static ref TYPE_CONNECT_SETUP_CLIENT_REQUEST: String = String::from("ConnectionSetupRequest");
|
||||
|
||||
/// com.alibaba.nacos.api.remote.request.ConnectResetRequest
|
||||
pub static ref TYPE_CONNECT_RESET_SERVER_REQUEST: String = String::from("ConnectResetRequest");
|
||||
|
||||
/// com.alibaba.nacos.api.remote.request.ClientDetectionRequest
|
||||
pub static ref TYPE_CLIENT_DETECTION_SERVER_REQUEST: String = String::from("ClientDetectionRequest");
|
||||
|
||||
}
|
||||
|
||||
// odd by client request id.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
#![allow(non_snake_case)]
|
||||
use crate::common::remote::request::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct ConnectResetServerRequest {
|
||||
requestId: String,
|
||||
/// count be empty.
|
||||
headers: HashMap<String, String>,
|
||||
serverIp: Option<String>,
|
||||
serverPort: Option<String>,
|
||||
}
|
||||
|
||||
impl Request for ConnectResetServerRequest {
|
||||
fn get_request_id(&self) -> &String {
|
||||
&self.requestId
|
||||
}
|
||||
fn get_headers(&self) -> &HashMap<String, String> {
|
||||
&self.headers
|
||||
}
|
||||
fn get_type_url(&self) -> &String {
|
||||
&TYPE_CONNECT_RESET_SERVER_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectResetServerRequest {
|
||||
pub fn new(server_ip: Option<String>, server_port: Option<String>) -> Self {
|
||||
ConnectResetServerRequest {
|
||||
requestId: generate_request_id(),
|
||||
headers: HashMap::new(),
|
||||
serverIp: server_ip,
|
||||
serverPort: server_port,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub(crate) struct ClientDetectionServerRequest {
|
||||
requestId: String,
|
||||
/// count be empty.
|
||||
headers: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl Request for ClientDetectionServerRequest {
|
||||
fn get_request_id(&self) -> &String {
|
||||
&self.requestId
|
||||
}
|
||||
fn get_headers(&self) -> &HashMap<String, String> {
|
||||
&self.headers
|
||||
}
|
||||
fn get_type_url(&self) -> &String {
|
||||
&TYPE_CLIENT_DETECTION_SERVER_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientDetectionServerRequest {
|
||||
pub fn new() -> Self {
|
||||
ClientDetectionServerRequest {
|
||||
requestId: generate_request_id(),
|
||||
headers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,4 @@
|
|||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub(crate) mod server_response;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,5 @@
|
|||
#![allow(non_snake_case)]
|
||||
use crate::common::remote::response::{
|
||||
Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE,
|
||||
};
|
||||
use crate::common::remote::response::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
|
|
|||
|
|
@ -1,9 +1,7 @@
|
|||
use crate::common::remote::request::client_request::ConnectResetRequest;
|
||||
use crate::common::remote::request::{Request, LOCAL_IP, TYPE_CONNECT_RESET_SERVER_REQUEST};
|
||||
use crate::common::remote::response::server_response::{ErrorResponse, ServerCheckServerResponse};
|
||||
use crate::common::remote::response::{
|
||||
Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE,
|
||||
};
|
||||
use crate::common::remote::request::server_request::*;
|
||||
use crate::common::remote::request::*;
|
||||
use crate::common::remote::response::server_response::*;
|
||||
use crate::common::remote::response::*;
|
||||
use crate::nacos_proto::v2::{Metadata, Payload};
|
||||
use serde::Serialize;
|
||||
|
||||
|
|
@ -69,7 +67,11 @@ pub(crate) fn build_server_request(
|
|||
println!("build_server_request {} with {}", type_url, body_str);
|
||||
tracing::debug!("build_server_request {} with {}", type_url, body_str);
|
||||
if TYPE_CONNECT_RESET_SERVER_REQUEST.eq(&type_url) {
|
||||
let de: ConnectResetRequest = serde_json::from_str(body_str.as_str())?;
|
||||
let de: ConnectResetServerRequest = serde_json::from_str(body_str.as_str())?;
|
||||
return Ok(Box::new(de));
|
||||
}
|
||||
if TYPE_CLIENT_DETECTION_SERVER_REQUEST.eq(&type_url) {
|
||||
let de: ClientDetectionServerRequest = serde_json::from_str(body_str.as_str())?;
|
||||
return Ok(Box::new(de));
|
||||
}
|
||||
Err(crate::api::error::Error::Deserialization(type_url))
|
||||
|
|
|
|||
11
src/lib.rs
11
src/lib.rs
|
|
@ -20,21 +20,18 @@ mod common;
|
|||
|
||||
mod nacos_proto {
|
||||
pub mod v2 {
|
||||
tonic::include_proto!("nacos.v2");
|
||||
include!("_.rs");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use prost_types::Any;
|
||||
use crate::nacos_proto::v2::{Metadata, Payload};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::nacos_proto::v2::Metadata;
|
||||
use crate::nacos_proto::v2::Payload;
|
||||
|
||||
#[test]
|
||||
fn it_works_nacos_proto() {
|
||||
let body = Any {
|
||||
let body = prost_types::Any {
|
||||
type_url: String::new(),
|
||||
value: Vec::from("{\"cluster\":\"DEFAULT\",\"healthyOnly\":true}"),
|
||||
};
|
||||
|
|
@ -47,7 +44,7 @@ mod tests {
|
|||
metadata: Some(metadata),
|
||||
body: Some(body),
|
||||
};
|
||||
println!("{:?}", payload);
|
||||
// println!("{:?}", payload);
|
||||
assert_eq!(
|
||||
payload.metadata.unwrap().r#type,
|
||||
"com.alibaba.nacos.api.naming.remote.request.ServiceQueryRequest"
|
||||
|
|
|
|||
Loading…
Reference in New Issue