Update publisher err deal.

ref:https://github.com/containerd/ttrpc-rust/pull/259
Due to the inclusion of the latest version of ttrpc, this part of the code has been updated

Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
This commit is contained in:
jokemanfire 2025-02-13 11:15:27 +08:00 committed by Maksym Pavlenko
parent 559cc576b9
commit 906bd0f466
1 changed files with 15 additions and 10 deletions

View File

@ -24,7 +24,7 @@ use containerd_shim_protos::{
ttrpc, ttrpc,
ttrpc::context::Context, ttrpc::context::Context,
}; };
use log::debug; use log::{debug, error, warn};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::{ use crate::{
@ -93,16 +93,21 @@ impl RemotePublisher {
count: item.count + 1, count: item.count + 1,
}; };
if let Err(e) = client.forward(new_item.ctx.clone(), &req).await { if let Err(e) = client.forward(new_item.ctx.clone(), &req).await {
debug!("publish error {:?}", e); match e {
// This is a bug from ttrpc, ttrpc should return RemoteClosed|ClientClosed error. change it in future ttrpc::error::Error::RemoteClosed | ttrpc::error::Error::LocalClosed => {
// if e == (ttrpc::error::Error::RemoteClosed || ttrpc::error::Error::ClientClosed) warn!("publish fail because the server or client close {:?}", e);
// reconnect client // reconnect client
let new_client = Self::connect(address.as_str()).await.map_err(|e| { if let Ok(c) = Self::connect(address.as_str()).await.map_err(|e| {
debug!("reconnect the ttrpc client {:?} fail", e); debug!("reconnect the ttrpc client {:?} fail", e);
}); }) {
if let Ok(c) = new_client {
client = EventsClient::new(c); client = EventsClient::new(c);
} }
}
_ => {
// TODO! if it is other error , May we should deal with socket file
error!("the client forward err is {:?}", e);
}
}
let sender_ref = sender.clone(); let sender_ref = sender.clone();
// Take a another task requeue , for no blocking the recv task // Take a another task requeue , for no blocking the recv task
tokio::spawn(async move { tokio::spawn(async move {