Implement remote publisher
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
parent
5453065a59
commit
d3852a5fcd
|
|
@ -9,6 +9,9 @@ pub mod events;
|
|||
#[rustfmt::skip]
|
||||
pub mod shim;
|
||||
|
||||
pub use ttrpc::Client;
|
||||
|
||||
pub use shim::shim as api;
|
||||
pub use shim::shim_ttrpc::{Task, TaskClient};
|
||||
pub use ttrpc::Client;
|
||||
|
||||
pub use shim::events_ttrpc::{Events, EventsClient};
|
||||
|
|
|
|||
|
|
@ -6,7 +6,12 @@ use log::info;
|
|||
struct Service;
|
||||
|
||||
impl shim::Shim for Service {
|
||||
fn new(_id: &str, _namespace: &str, _config: &mut shim::Config) -> Self {
|
||||
fn new(
|
||||
_id: &str,
|
||||
_namespace: &str,
|
||||
_publisher: shim::RemotePublisher,
|
||||
_config: &mut shim::Config,
|
||||
) -> Self {
|
||||
Service {}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,8 +18,11 @@ use thiserror::Error;
|
|||
|
||||
mod args;
|
||||
mod logger;
|
||||
mod publisher;
|
||||
mod reap;
|
||||
|
||||
pub use publisher::RemotePublisher;
|
||||
|
||||
pub use protos::shim::shim as api;
|
||||
pub use protos::shim::shim_ttrpc::Task;
|
||||
|
||||
|
|
@ -52,7 +55,7 @@ pub struct StartOpts {
|
|||
|
||||
/// Shim interface that must be implemented by clients.
|
||||
pub trait Shim: Task {
|
||||
fn new(id: &str, namespace: &str, config: &mut Config) -> Self;
|
||||
fn new(id: &str, namespace: &str, publisher: RemotePublisher, config: &mut Config) -> Self;
|
||||
|
||||
/// Launch new shim.
|
||||
/// See https://github.com/containerd/containerd/tree/master/runtime/v2#start
|
||||
|
|
@ -85,10 +88,11 @@ where
|
|||
let flags = args::parse(&os_args[1..])?;
|
||||
|
||||
let ttrpc_address = env::var("TTRPC_ADDRESS")?;
|
||||
let publisher = publisher::RemotePublisher::new(&ttrpc_address)?;
|
||||
|
||||
// Create shim instance
|
||||
let mut config = Config::default();
|
||||
let mut shim = T::new(id, &flags.namespace, &mut config);
|
||||
let mut shim = T::new(id, &flags.namespace, publisher, &mut config);
|
||||
|
||||
if !config.no_sub_reaper {
|
||||
reap::set_subreaper()?;
|
||||
|
|
@ -161,6 +165,8 @@ pub enum Error {
|
|||
Start(Box<dyn error::Error>),
|
||||
#[error("Shim cleanup failed")]
|
||||
Cleanup(Box<dyn error::Error>),
|
||||
#[error("Publisher error: {0}")]
|
||||
Publisher(#[from] publisher::Error),
|
||||
}
|
||||
|
||||
const SOCKET_ROOT: &str = "/run/containerd";
|
||||
|
|
|
|||
|
|
@ -0,0 +1,86 @@
|
|||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use containerd_shim_protos as client;
|
||||
|
||||
use client::protobuf;
|
||||
use client::shim::{empty, events};
|
||||
use client::ttrpc::{self, context::Context};
|
||||
use client::{Client, Events, EventsClient};
|
||||
|
||||
use protobuf::well_known_types::{Any, Timestamp};
|
||||
use protobuf::Message;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
pub struct RemotePublisher {
|
||||
client: EventsClient,
|
||||
}
|
||||
|
||||
impl RemotePublisher {
|
||||
pub fn new(address: impl AsRef<str>) -> Result<RemotePublisher, Error> {
|
||||
let client = Client::connect(address.as_ref())?;
|
||||
|
||||
Ok(RemotePublisher {
|
||||
client: EventsClient::new(client),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn publish(
|
||||
&self,
|
||||
ctx: Context,
|
||||
topic: &str,
|
||||
namespace: &str,
|
||||
event: impl Message,
|
||||
) -> Result<(), Error> {
|
||||
let mut envelope = events::Envelope::new();
|
||||
envelope.set_topic(topic.to_owned());
|
||||
envelope.set_namespace(namespace.to_owned());
|
||||
envelope.set_timestamp(Self::timestamp()?);
|
||||
envelope.set_event(Self::any(event)?);
|
||||
|
||||
let mut req = events::ForwardRequest::new();
|
||||
req.set_envelope(envelope);
|
||||
|
||||
self.client.forward(ctx, &req)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn timestamp() -> Result<Timestamp, Error> {
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
|
||||
|
||||
let mut ts = Timestamp::default();
|
||||
ts.set_seconds(now.as_secs() as _);
|
||||
ts.set_nanos(now.as_nanos() as _);
|
||||
|
||||
Ok(ts)
|
||||
}
|
||||
|
||||
fn any(event: impl Message) -> Result<Any, Error> {
|
||||
let data = event.write_to_bytes()?;
|
||||
let mut any = Any::new();
|
||||
any.merge_from_bytes(&data)?;
|
||||
|
||||
Ok(any)
|
||||
}
|
||||
}
|
||||
|
||||
impl Events for RemotePublisher {
|
||||
fn forward(
|
||||
&self,
|
||||
_ctx: &ttrpc::TtrpcContext,
|
||||
req: events::ForwardRequest,
|
||||
) -> ttrpc::Result<empty::Empty> {
|
||||
self.client.forward(Context::default(), &req)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
#[error("Publisher TTRPC error")]
|
||||
Ttrpc(#[from] containerd_shim_protos::ttrpc::Error),
|
||||
#[error("Failed to get envelope timestamp")]
|
||||
Timestamp(#[from] std::time::SystemTimeError),
|
||||
#[error("Failed to serialize event")]
|
||||
Any(#[from] protobuf::ProtobufError),
|
||||
}
|
||||
Loading…
Reference in New Issue