Add snapshots example

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-12-17 15:24:29 -08:00
parent 70f7cacdc1
commit e5d003c8bb
4 changed files with 256 additions and 8 deletions

View File

@ -15,5 +15,11 @@ prost-types = "0.9"
tokio = { version = "1.15", features = ["sync"] }
tokio-stream = "0.1.8"
[dev-dependencies]
log = "0.4"
async-stream = "0.3.2"
futures = "0.3.17"
simple_logger = { version = "1.13", default-features = false }
[build-dependencies]
tonic-build = "0.6"

View File

@ -0,0 +1,200 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
use std::collections::HashMap;
use std::env;
use futures::TryFutureExt;
use log::info;
use tokio::net::UnixListener;
use containerd_snapshots as snapshots;
use containerd_snapshots::{api, Info, Usage};
use snapshots::tonic::transport::Server;
#[derive(Default)]
struct Example;
#[snapshots::tonic::async_trait]
impl snapshots::Snapshotter for Example {
type Error = String;
async fn stat(&self, key: String) -> Result<Info, Self::Error> {
info!("Stat: {}", key);
Ok(Info::default())
}
async fn update(
&self,
info: Info,
fieldpaths: Option<Vec<String>>,
) -> Result<Info, Self::Error> {
info!("Update: info={:?}, fieldpaths={:?}", info, fieldpaths);
Ok(Info::default())
}
async fn usage(&self, key: String) -> Result<Usage, Self::Error> {
info!("Usage: {}", key);
Ok(Usage::default())
}
async fn mounts(&self, key: String) -> Result<Vec<api::types::Mount>, Self::Error> {
info!("Mounts: {}", key);
Ok(Vec::new())
}
async fn prepare(
&self,
key: String,
parent: String,
labels: HashMap<String, String>,
) -> Result<Vec<api::types::Mount>, Self::Error> {
info!(
"Prepare: key={}, parent={}, labels={:?}",
key, parent, labels
);
Ok(Vec::new())
}
async fn view(
&self,
key: String,
parent: String,
labels: HashMap<String, String>,
) -> Result<Vec<api::types::Mount>, Self::Error> {
info!("View: key={}, parent={}, labels={:?}", key, parent, labels);
Ok(Vec::new())
}
async fn commit(
&self,
name: String,
key: String,
labels: HashMap<String, String>,
) -> Result<(), Self::Error> {
info!("Commit: name={}, key={}, labels={:?}", name, key, labels);
Ok(())
}
async fn remove(&self, key: String) -> Result<(), Self::Error> {
info!("Remove: {}", key);
Ok(())
}
}
#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logger::SimpleLogger::new()
.init()
.expect("Failed to initialize logger");
let args = env::args().collect::<Vec<_>>();
let socket_path = args
.get(1)
.ok_or("First argument must be socket path")
.unwrap();
let example = Example::default();
let incoming = {
let uds = UnixListener::bind(socket_path).expect("Failed to bind listener");
async_stream::stream! {
loop {
let item = uds.accept().map_ok(|(st, _)| unix::UnixStream(st)).await;
yield item;
}
}
};
Server::builder()
.add_service(snapshots::server(example))
.serve_with_incoming(incoming)
.await
.expect("Serve failed");
}
// Copy-pasted from https://github.com/hyperium/tonic/blob/master/examples/src/uds/server.rs#L69
#[cfg(unix)]
mod unix {
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tonic::transport::server::Connected;
#[derive(Debug)]
pub struct UnixStream(pub tokio::net::UnixStream);
impl Connected for UnixStream {
type ConnectInfo = UdsConnectInfo;
fn connect_info(&self) -> Self::ConnectInfo {
UdsConnectInfo {
peer_addr: self.0.peer_addr().ok().map(Arc::new),
peer_cred: self.0.peer_cred().ok(),
}
}
}
#[derive(Clone, Debug)]
pub struct UdsConnectInfo {
pub peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
pub peer_cred: Option<tokio::net::unix::UCred>,
}
impl AsyncRead for UnixStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl AsyncWrite for UnixStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
}
}
}
#[cfg(not(unix))]
fn main() {
panic!("The snapshotter example only works on unix systems!");
}

View File

@ -16,11 +16,34 @@
//! Remote snapshotter library for containerd.
//!
//! This crate aims to hide the underlying complexity of GRPC interfaces, streaming and request/response
//! conversions and provide one clean [Snapshots] trait to implement for remote snapshotting.
//! This crate implements containerd's proxy plugin for snapshotting. It aims hide the underlying
//! complexity of GRPC interfaces, streaming and request/response conversions and provide one
//! [Snapshots] trait to implement.
//!
//! # How to use from containerd.
//! Add the following to containerd's configuration file:
//! ```toml
//! [proxy_plugins]
//! [proxy_plugins.custom]
//! type = "snapshot"
//! address = "/tmp/snap2.sock"
//! ```
//! Start containerd daemon:
//! ```bash
//! containerd --config /path/config.toml
//! ```
//!
//! Run remote snapshotter instance:
//! ```bash
//! $ cargo run --example snapshotter /tmp/snap2.sock
//! ```
//! Specify the snapshotter when pulling an image:
//! ```bash
//! $ ctr i pull --snapshotter custom docker.io/library/hello-world:latest
//! ```
//!
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Debug;
use std::ops::AddAssign;
use std::time::SystemTime;
@ -56,6 +79,12 @@ pub enum Kind {
Committed,
}
impl Default for Kind {
fn default() -> Self {
Kind::Unknown
}
}
/// Information about a particular snapshot.
#[derive(Debug)]
pub struct Info {
@ -73,11 +102,24 @@ pub struct Info {
pub updated_at: SystemTime,
}
impl Default for Info {
fn default() -> Self {
Info {
kind: Default::default(),
name: Default::default(),
parent: Default::default(),
labels: Default::default(),
created_at: SystemTime::now(),
updated_at: SystemTime::now(),
}
}
}
/// Defines statistics for disk resources consumed by the snapshot.
///
// These resources only include the resources consumed by the snapshot itself and does not include
// resources usage by the parent.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Usage {
/// Number of inodes in use.
pub inodes: i64,
@ -105,7 +147,7 @@ pub trait Snapshotter: Send + Sync + 'static {
/// Error type returned from the underlying snapshotter implementation.
///
/// This type must be convertable to GRPC status.
type Error: Error;
type Error: Debug;
/// Returns the info for an active or committed snapshot by name or key.
///

View File

@ -17,7 +17,7 @@
//! Trait wrapper to server GRPC requests.
use std::convert::TryInto;
use std::error::Error;
use std::fmt::Debug;
use tokio_stream::wrappers::ReceiverStream;
@ -209,7 +209,7 @@ impl<S: Snapshotter> Snapshots for Wrapper<S> {
}
}
fn status<E: Error>(err: E) -> tonic::Status {
let message = format!("{}", err);
fn status<E: Debug>(err: E) -> tonic::Status {
let message = format!("{:?}", err);
tonic::Status::internal(message)
}