diff --git a/crates/snapshots/Cargo.toml b/crates/snapshots/Cargo.toml index 22aa621..7899203 100644 --- a/crates/snapshots/Cargo.toml +++ b/crates/snapshots/Cargo.toml @@ -15,5 +15,11 @@ prost-types = "0.9" tokio = { version = "1.15", features = ["sync"] } tokio-stream = "0.1.8" +[dev-dependencies] +log = "0.4" +async-stream = "0.3.2" +futures = "0.3.17" +simple_logger = { version = "1.13", default-features = false } + [build-dependencies] tonic-build = "0.6" diff --git a/crates/snapshots/examples/snapshotter.rs b/crates/snapshots/examples/snapshotter.rs new file mode 100644 index 0000000..f709c73 --- /dev/null +++ b/crates/snapshots/examples/snapshotter.rs @@ -0,0 +1,200 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +use std::collections::HashMap; +use std::env; + +use futures::TryFutureExt; +use log::info; +use tokio::net::UnixListener; + +use containerd_snapshots as snapshots; +use containerd_snapshots::{api, Info, Usage}; + +use snapshots::tonic::transport::Server; + +#[derive(Default)] +struct Example; + +#[snapshots::tonic::async_trait] +impl snapshots::Snapshotter for Example { + type Error = String; + + async fn stat(&self, key: String) -> Result { + info!("Stat: {}", key); + Ok(Info::default()) + } + + async fn update( + &self, + info: Info, + fieldpaths: Option>, + ) -> Result { + info!("Update: info={:?}, fieldpaths={:?}", info, fieldpaths); + Ok(Info::default()) + } + + async fn usage(&self, key: String) -> Result { + info!("Usage: {}", key); + Ok(Usage::default()) + } + + async fn mounts(&self, key: String) -> Result, Self::Error> { + info!("Mounts: {}", key); + Ok(Vec::new()) + } + + async fn prepare( + &self, + key: String, + parent: String, + labels: HashMap, + ) -> Result, Self::Error> { + info!( + "Prepare: key={}, parent={}, labels={:?}", + key, parent, labels + ); + Ok(Vec::new()) + } + + async fn view( + &self, + key: String, + parent: String, + labels: HashMap, + ) -> Result, Self::Error> { + info!("View: key={}, parent={}, labels={:?}", key, parent, labels); + Ok(Vec::new()) + } + + async fn commit( + &self, + name: String, + key: String, + labels: HashMap, + ) -> Result<(), Self::Error> { + info!("Commit: name={}, key={}, labels={:?}", name, key, labels); + Ok(()) + } + + async fn remove(&self, key: String) -> Result<(), Self::Error> { + info!("Remove: {}", key); + Ok(()) + } +} + +#[cfg(unix)] +#[tokio::main(flavor = "current_thread")] +async fn main() { + simple_logger::SimpleLogger::new() + .init() + .expect("Failed to initialize logger"); + + let args = env::args().collect::>(); + + let socket_path = args + .get(1) + .ok_or("First argument must be socket path") + .unwrap(); + + let example = Example::default(); + + let incoming = { + let uds = UnixListener::bind(socket_path).expect("Failed to bind listener"); + + async_stream::stream! { + loop { + let item = uds.accept().map_ok(|(st, _)| unix::UnixStream(st)).await; + yield item; + } + } + }; + + Server::builder() + .add_service(snapshots::server(example)) + .serve_with_incoming(incoming) + .await + .expect("Serve failed"); +} + +// Copy-pasted from https://github.com/hyperium/tonic/blob/master/examples/src/uds/server.rs#L69 +#[cfg(unix)] +mod unix { + use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, + }; + + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + use tonic::transport::server::Connected; + + #[derive(Debug)] + pub struct UnixStream(pub tokio::net::UnixStream); + + impl Connected for UnixStream { + type ConnectInfo = UdsConnectInfo; + + fn connect_info(&self) -> Self::ConnectInfo { + UdsConnectInfo { + peer_addr: self.0.peer_addr().ok().map(Arc::new), + peer_cred: self.0.peer_cred().ok(), + } + } + } + + #[derive(Clone, Debug)] + pub struct UdsConnectInfo { + pub peer_addr: Option>, + pub peer_cred: Option, + } + + impl AsyncRead for UnixStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } + } + + impl AsyncWrite for UnixStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } + } +} + +#[cfg(not(unix))] +fn main() { + panic!("The snapshotter example only works on unix systems!"); +} diff --git a/crates/snapshots/src/lib.rs b/crates/snapshots/src/lib.rs index 7539eaf..8a0f5d1 100644 --- a/crates/snapshots/src/lib.rs +++ b/crates/snapshots/src/lib.rs @@ -16,11 +16,34 @@ //! Remote snapshotter library for containerd. //! -//! This crate aims to hide the underlying complexity of GRPC interfaces, streaming and request/response -//! conversions and provide one clean [Snapshots] trait to implement for remote snapshotting. +//! This crate implements containerd's proxy plugin for snapshotting. It aims hide the underlying +//! complexity of GRPC interfaces, streaming and request/response conversions and provide one +//! [Snapshots] trait to implement. +//! +//! # How to use from containerd. +//! Add the following to containerd's configuration file: +//! ```toml +//! [proxy_plugins] +//! [proxy_plugins.custom] +//! type = "snapshot" +//! address = "/tmp/snap2.sock" +//! ``` +//! Start containerd daemon: +//! ```bash +//! containerd --config /path/config.toml +//! ``` +//! +//! Run remote snapshotter instance: +//! ```bash +//! $ cargo run --example snapshotter /tmp/snap2.sock +//! ``` +//! Specify the snapshotter when pulling an image: +//! ```bash +//! $ ctr i pull --snapshotter custom docker.io/library/hello-world:latest +//! ``` +//! use std::collections::HashMap; -use std::error::Error; use std::fmt::Debug; use std::ops::AddAssign; use std::time::SystemTime; @@ -56,6 +79,12 @@ pub enum Kind { Committed, } +impl Default for Kind { + fn default() -> Self { + Kind::Unknown + } +} + /// Information about a particular snapshot. #[derive(Debug)] pub struct Info { @@ -73,11 +102,24 @@ pub struct Info { pub updated_at: SystemTime, } +impl Default for Info { + fn default() -> Self { + Info { + kind: Default::default(), + name: Default::default(), + parent: Default::default(), + labels: Default::default(), + created_at: SystemTime::now(), + updated_at: SystemTime::now(), + } + } +} + /// Defines statistics for disk resources consumed by the snapshot. /// // These resources only include the resources consumed by the snapshot itself and does not include // resources usage by the parent. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Usage { /// Number of inodes in use. pub inodes: i64, @@ -105,7 +147,7 @@ pub trait Snapshotter: Send + Sync + 'static { /// Error type returned from the underlying snapshotter implementation. /// /// This type must be convertable to GRPC status. - type Error: Error; + type Error: Debug; /// Returns the info for an active or committed snapshot by name or key. /// diff --git a/crates/snapshots/src/wrap.rs b/crates/snapshots/src/wrap.rs index e1c8f9a..f206ab5 100644 --- a/crates/snapshots/src/wrap.rs +++ b/crates/snapshots/src/wrap.rs @@ -17,7 +17,7 @@ //! Trait wrapper to server GRPC requests. use std::convert::TryInto; -use std::error::Error; +use std::fmt::Debug; use tokio_stream::wrappers::ReceiverStream; @@ -209,7 +209,7 @@ impl Snapshots for Wrapper { } } -fn status(err: E) -> tonic::Status { - let message = format!("{}", err); +fn status(err: E) -> tonic::Status { + let message = format!("{:?}", err); tonic::Status::internal(message) }