fix: type_url=ConnectionSetupRequest

This commit is contained in:
caihongwen 2022-09-06 01:33:51 +08:00
parent f1722e4ff8
commit 480d4c22dd
5 changed files with 34 additions and 16 deletions

View File

@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tonic::{
async_trait,
transport::{Channel, Endpoint},
@ -24,7 +25,7 @@ pub(crate) struct GrpcRemoteClient {
labels: HashMap<String, String>,
client: RequestClient<Channel>,
bi_client: BiRequestStreamClient<Channel>,
resp_bi_stream: Arc<tonic::codec::Streaming<Payload>>,
resp_bi_stream: Arc<Mutex<tonic::codec::Streaming<Payload>>>,
}
impl GrpcRemoteClient {
@ -39,12 +40,12 @@ impl GrpcRemoteClient {
}
/// Sets the resp_bi_stream against.
pub fn resp_bi_stream(self, resp_bi_stream: Arc<tonic::codec::Streaming<Payload>>) -> Self {
/*pub fn resp_bi_stream(self, resp_bi_stream: Arc<tonic::codec::Streaming<Payload>>) -> Self {
GrpcRemoteClient {
resp_bi_stream,
..self
}
}
}*/
pub async fn new(client_config: ClientConfig) -> crate::api::error::Result<Self> {
let client_name = client_config.client_name;
@ -59,7 +60,7 @@ impl GrpcRemoteClient {
.await
.unwrap();
let resp_bi_stream = Arc::new(resp_bi_stream);
let resp_bi_stream = Arc::new(Mutex::new(resp_bi_stream));
let mut resp_bi_stream_clone = Arc::clone(&resp_bi_stream);
/*let thread = tokio::spawn(async move {
@ -133,10 +134,15 @@ impl RemoteClient for GrpcRemoteClient {}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::thread::sleep;
use std::time::Duration;
use tokio_stream::StreamExt;
use crate::api::client_config::ClientConfig;
use crate::common::remote::remote_client::GrpcRemoteClient;
use crate::common::remote::RemoteClient;
use crate::common::util::payload_helper;
// #[tokio::test]
async fn test_grpc_remote_client() {
@ -145,5 +151,14 @@ mod tests {
)
.await
.unwrap();
// wait resp_bi_stream
sleep(Duration::from_secs(2));
let mut resp_bi_stream_clone = remote_client.resp_bi_stream.clone();
while let Some(received) = resp_bi_stream_clone.lock().unwrap().next().await {
let payload = received.unwrap();
let server_req = payload_helper::build_server_request(payload);
}
}
}

View File

@ -1,7 +1,7 @@
#![allow(non_snake_case)]
use crate::common::remote::request::{
generate_request_id, Request, TYPE_CONNECT_RESET_SERVER_REQUEST,
TYPE_SERVER_CHECK_CLIENT_REQUEST,
TYPE_CONNECT_SETUP_SERVER_REQUEST, TYPE_SERVER_CHECK_CLIENT_REQUEST,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -53,7 +53,7 @@ impl Request for ConnectionSetupClientRequest {
&self.headers
}
fn get_type_url(&self) -> &String {
&TYPE_SERVER_CHECK_CLIENT_REQUEST
&TYPE_CONNECT_SETUP_SERVER_REQUEST
}
}

View File

@ -17,6 +17,9 @@ lazy_static! {
/// com.alibaba.nacos.api.remote.request.ServerCheckRequest
pub static ref TYPE_SERVER_CHECK_CLIENT_REQUEST: String = String::from("ServerCheckRequest");
/// com.alibaba.nacos.api.remote.request.ConnectionSetupRequest
pub static ref TYPE_CONNECT_SETUP_SERVER_REQUEST: String = String::from("ConnectionSetupRequest");
/// com.alibaba.nacos.api.remote.request.ConnectResetRequest
pub static ref TYPE_CONNECT_RESET_SERVER_REQUEST: String = String::from("ConnectResetRequest");

View File

@ -1,5 +1,7 @@
#![allow(non_snake_case)]
use crate::common::remote::response::{Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE};
use crate::common::remote::response::{
Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE,
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
@ -50,7 +52,6 @@ impl ServerCheckServerResponse {
}
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ErrorResponse {
requestId: Option<String>,

View File

@ -1,7 +1,9 @@
use crate::common::remote::request::client_request::ConnectResetRequest;
use crate::common::remote::request::{Request, LOCAL_IP, TYPE_CONNECT_RESET_SERVER_REQUEST};
use crate::common::remote::response::server_response::{ErrorResponse, ServerCheckServerResponse};
use crate::common::remote::response::{Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE};
use crate::common::remote::response::{
Response, TYPE_ERROR_SERVER_RESPONSE, TYPE_SERVER_CHECK_SERVER_RESPONSE,
};
use crate::nacos_proto::v2::{Metadata, Payload};
use serde::Serialize;
@ -46,13 +48,11 @@ pub(crate) fn build_server_response(
let body_str = String::from_utf8(body_data).unwrap();
tracing::debug!("build_server_response {} with {}", type_url, body_str);
if TYPE_SERVER_CHECK_SERVER_RESPONSE.eq(&type_url) {
let de: ServerCheckServerResponse =
serde_json::from_str(body_str.as_str())?;
let de: ServerCheckServerResponse = serde_json::from_str(body_str.as_str())?;
return Ok(Box::new(de));
}
if TYPE_ERROR_SERVER_RESPONSE.eq(&type_url) {
let de: ErrorResponse =
serde_json::from_str(body_str.as_str())?;
let de: ErrorResponse = serde_json::from_str(body_str.as_str())?;
return Ok(Box::new(de));
}
Err(crate::api::error::Error::Deserialization(type_url))
@ -67,8 +67,7 @@ pub(crate) fn build_server_request(
let body_str = String::from_utf8(body_data).unwrap();
tracing::debug!("build_server_request {} with {}", type_url, body_str);
if TYPE_CONNECT_RESET_SERVER_REQUEST.eq(&type_url) {
let de: ConnectResetRequest =
serde_json::from_str(body_str.as_str())?;
let de: ConnectResetRequest = serde_json::from_str(body_str.as_str())?;
return Ok(Box::new(de));
}
Err(crate::api::error::Error::Deserialization(type_url))