From d3852a5fcd4db994adafb5f1849cdb0e995c901b Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Mon, 26 Jul 2021 16:59:21 -0700 Subject: [PATCH] Implement remote publisher Signed-off-by: Maksym Pavlenko --- crates/shim-protos/src/lib.rs | 5 +- crates/shim/examples/empty-shim.rs | 7 ++- crates/shim/src/lib.rs | 10 +++- crates/shim/src/publisher.rs | 86 ++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 crates/shim/src/publisher.rs diff --git a/crates/shim-protos/src/lib.rs b/crates/shim-protos/src/lib.rs index 23ad413..bd235cb 100644 --- a/crates/shim-protos/src/lib.rs +++ b/crates/shim-protos/src/lib.rs @@ -9,6 +9,9 @@ pub mod events; #[rustfmt::skip] pub mod shim; +pub use ttrpc::Client; + pub use shim::shim as api; pub use shim::shim_ttrpc::{Task, TaskClient}; -pub use ttrpc::Client; + +pub use shim::events_ttrpc::{Events, EventsClient}; diff --git a/crates/shim/examples/empty-shim.rs b/crates/shim/examples/empty-shim.rs index c09b920..163d3c5 100644 --- a/crates/shim/examples/empty-shim.rs +++ b/crates/shim/examples/empty-shim.rs @@ -6,7 +6,12 @@ use log::info; struct Service; impl shim::Shim for Service { - fn new(_id: &str, _namespace: &str, _config: &mut shim::Config) -> Self { + fn new( + _id: &str, + _namespace: &str, + _publisher: shim::RemotePublisher, + _config: &mut shim::Config, + ) -> Self { Service {} } } diff --git a/crates/shim/src/lib.rs b/crates/shim/src/lib.rs index 8d57469..4a7381f 100644 --- a/crates/shim/src/lib.rs +++ b/crates/shim/src/lib.rs @@ -18,8 +18,11 @@ use thiserror::Error; mod args; mod logger; +mod publisher; mod reap; +pub use publisher::RemotePublisher; + pub use protos::shim::shim as api; pub use protos::shim::shim_ttrpc::Task; @@ -52,7 +55,7 @@ pub struct StartOpts { /// Shim interface that must be implemented by clients. pub trait Shim: Task { - fn new(id: &str, namespace: &str, config: &mut Config) -> Self; + fn new(id: &str, namespace: &str, publisher: RemotePublisher, config: &mut Config) -> Self; /// Launch new shim. /// See https://github.com/containerd/containerd/tree/master/runtime/v2#start @@ -85,10 +88,11 @@ where let flags = args::parse(&os_args[1..])?; let ttrpc_address = env::var("TTRPC_ADDRESS")?; + let publisher = publisher::RemotePublisher::new(&ttrpc_address)?; // Create shim instance let mut config = Config::default(); - let mut shim = T::new(id, &flags.namespace, &mut config); + let mut shim = T::new(id, &flags.namespace, publisher, &mut config); if !config.no_sub_reaper { reap::set_subreaper()?; @@ -161,6 +165,8 @@ pub enum Error { Start(Box), #[error("Shim cleanup failed")] Cleanup(Box), + #[error("Publisher error: {0}")] + Publisher(#[from] publisher::Error), } const SOCKET_ROOT: &str = "/run/containerd"; diff --git a/crates/shim/src/publisher.rs b/crates/shim/src/publisher.rs new file mode 100644 index 0000000..9d67e3a --- /dev/null +++ b/crates/shim/src/publisher.rs @@ -0,0 +1,86 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use containerd_shim_protos as client; + +use client::protobuf; +use client::shim::{empty, events}; +use client::ttrpc::{self, context::Context}; +use client::{Client, Events, EventsClient}; + +use protobuf::well_known_types::{Any, Timestamp}; +use protobuf::Message; + +use thiserror::Error; + +pub struct RemotePublisher { + client: EventsClient, +} + +impl RemotePublisher { + pub fn new(address: impl AsRef) -> Result { + let client = Client::connect(address.as_ref())?; + + Ok(RemotePublisher { + client: EventsClient::new(client), + }) + } + + pub fn publish( + &self, + ctx: Context, + topic: &str, + namespace: &str, + event: impl Message, + ) -> Result<(), Error> { + let mut envelope = events::Envelope::new(); + envelope.set_topic(topic.to_owned()); + envelope.set_namespace(namespace.to_owned()); + envelope.set_timestamp(Self::timestamp()?); + envelope.set_event(Self::any(event)?); + + let mut req = events::ForwardRequest::new(); + req.set_envelope(envelope); + + self.client.forward(ctx, &req)?; + + Ok(()) + } + + fn timestamp() -> Result { + let now = SystemTime::now().duration_since(UNIX_EPOCH)?; + + let mut ts = Timestamp::default(); + ts.set_seconds(now.as_secs() as _); + ts.set_nanos(now.as_nanos() as _); + + Ok(ts) + } + + fn any(event: impl Message) -> Result { + let data = event.write_to_bytes()?; + let mut any = Any::new(); + any.merge_from_bytes(&data)?; + + Ok(any) + } +} + +impl Events for RemotePublisher { + fn forward( + &self, + _ctx: &ttrpc::TtrpcContext, + req: events::ForwardRequest, + ) -> ttrpc::Result { + self.client.forward(Context::default(), &req) + } +} + +#[derive(Debug, Error)] +pub enum Error { + #[error("Publisher TTRPC error")] + Ttrpc(#[from] containerd_shim_protos::ttrpc::Error), + #[error("Failed to get envelope timestamp")] + Timestamp(#[from] std::time::SystemTimeError), + #[error("Failed to serialize event")] + Any(#[from] protobuf::ProtobufError), +}