diff --git a/proxy/controller-grpc/src/arbitrary.rs b/proxy/controller-grpc/src/arbitrary.rs index cd62fa6ef..420795cdf 100644 --- a/proxy/controller-grpc/src/arbitrary.rs +++ b/proxy/controller-grpc/src/arbitrary.rs @@ -54,6 +54,15 @@ impl Arbitrary for observe_request::match_::Seq { } } +impl Arbitrary for observe_request::match_::Label { + fn arbitrary(g: &mut G) -> Self { + observe_request::match_::Label { + key: Arbitrary::arbitrary(g), + value: Arbitrary::arbitrary(g), + } + } +} + impl Arbitrary for observe_request::match_::Tcp { fn arbitrary(g: &mut G) -> Self { observe_request::match_::Tcp { diff --git a/proxy/src/telemetry/tap/match_.rs b/proxy/src/telemetry/tap/match_.rs index 29851ac0d..27c397f9d 100644 --- a/proxy/src/telemetry/tap/match_.rs +++ b/proxy/src/telemetry/tap/match_.rs @@ -1,4 +1,5 @@ use std::boxed::Box; +use std::collections::HashMap; use std::net; use std::sync::Arc; @@ -18,6 +19,7 @@ pub(super) enum Match { Not(Box), Source(TcpMatch), Destination(TcpMatch), + DestinationLabel(LabelMatch), Http(HttpMatch), } @@ -31,6 +33,12 @@ pub enum InvalidMatch { Unimplemented, } +#[derive(Clone, Debug)] +pub(super) struct LabelMatch { + key: String, + value: String, +} + #[derive(Clone, Debug)] pub(super) enum TcpMatch { // Inclusive @@ -97,6 +105,36 @@ impl Match { _ => false, }, + Match::DestinationLabel(ref label) => match *ev { + Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { + match req.dst_labels() { + None => false, + Some(ref b) => { + match b.borrow().as_ref() { + None => false, + Some(ref labels) => label.matches(labels.as_map()), + } + } + } + } + + Event::StreamResponseOpen(ref rsp, _) | + Event::StreamResponseFail(ref rsp, _) | + Event::StreamResponseEnd(ref rsp, _) => { + match rsp.request.dst_labels() { + None => false, + Some(ref b) => { + match b.borrow().as_ref() { + None => false, + Some(ref labels) => label.matches(labels.as_map()), + } + } + } + }, + + _ => false, + } + Match::Http(ref http) => match *ev { Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { http.matches(req) @@ -153,7 +191,9 @@ impl<'a> TryFrom<&'a observe_request::match_::Match> for Match { match_::Match::Destination(ref dst) => Match::Destination(TcpMatch::try_from(dst)?), - match_::Match::DestinationLabel(..) => return Err(InvalidMatch::Unimplemented), + match_::Match::DestinationLabel(ref label) => { + Match::DestinationLabel(LabelMatch::try_from(label)?) + } match_::Match::Http(ref http) => Match::Http(HttpMatch::try_from(http)?), }; @@ -162,6 +202,29 @@ impl<'a> TryFrom<&'a observe_request::match_::Match> for Match { } } +// ===== impl LabelMatch ====== + +impl LabelMatch { + fn matches(&self, labels: &HashMap) -> bool { + labels.get(&self.key) == Some(&self.value) + } +} + +impl<'a> TryFrom<&'a observe_request::match_::Label> for LabelMatch { + type Err = InvalidMatch; + + fn try_from(m: &observe_request::match_::Label) -> Result { + if m.key.is_empty() || m.value.is_empty() { + return Err(InvalidMatch::Empty); + } + + Ok(LabelMatch { + key: m.key.clone(), + value: m.value.clone(), + }) + } +} + // ===== impl TcpMatch ====== impl TcpMatch { @@ -340,6 +403,15 @@ mod tests { use super::*; use conduit_proxy_controller_grpc::*; + impl Arbitrary for LabelMatch { + fn arbitrary(g: &mut G) -> Self { + Self { + key: Arbitrary::arbitrary(g), + value: Arbitrary::arbitrary(g), + } + } + } + impl Arbitrary for TcpMatch { fn arbitrary(g: &mut G) -> Self { if g.gen::() { @@ -405,6 +477,22 @@ mod tests { m.matches(&addr) == matches } + fn labels_from_proto(label: observe_request::match_::Label) -> bool { + let err: Option = + if label.key.is_empty() || label.value.is_empty() { + Some(InvalidMatch::Empty) + } else { + None + }; + + err == LabelMatch::try_from(&label).err() + } + + fn label_matches(l: LabelMatch, labels: HashMap) -> bool { + let matches = labels.get(&l.key) == Some(&l.value); + l.matches(&labels) == matches + } + fn http_from_proto(http: observe_request::match_::Http) -> bool { use self::observe_request::match_::http;