From 77017eedea71499f9f441926f6da00a3c7535403 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 1 May 2018 11:59:45 -0700 Subject: [PATCH] proxy: Fix Tap ID generation (#885) The proxy's tap server assigns a sequential numeric ID to each inbound Tap request to assist tap lifecycle management. The server implementation keeps a local counter to keep track of tap IDs. However, this implementation is cloned for each individual tap requests, so `0` the only tap ID ever used. This change moves the Tap ID to be stored in a shared atomic integer. Debug logging has been improved as well. --- proxy/src/control/observe.rs | 10 +++++----- proxy/src/telemetry/tap/mod.rs | 25 ++++++++++++++++--------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/proxy/src/control/observe.rs b/proxy/src/control/observe.rs index ec6e03f4c..bd509036a 100644 --- a/proxy/src/control/observe.rs +++ b/proxy/src/control/observe.rs @@ -1,4 +1,5 @@ use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize, Ordering}; use futures::{future, Poll, Stream}; use futures_mpsc_lossy; @@ -14,7 +15,7 @@ use telemetry::tap::{Tap, Taps}; #[derive(Clone, Debug)] pub struct Observe { - next_id: usize, + next_id: Arc, taps: Arc>, tap_capacity: usize, } @@ -32,7 +33,7 @@ impl Observe { let taps = Arc::new(Mutex::new(Taps::default())); let observe = Observe { - next_id: 0, + next_id: Arc::new(AtomicUsize::new(0)), tap_capacity, taps: taps.clone(), }; @@ -46,7 +47,7 @@ impl server::Tap for Observe { type ObserveFuture = future::FutureResult, grpc::Error>; fn observe(&mut self, req: grpc::Request) -> Self::ObserveFuture { - if self.next_id == ::std::usize::MAX { + if self.next_id.load(Ordering::Acquire) == ::std::usize::MAX { return future::err(grpc::Error::Grpc(grpc::Status::INTERNAL)); } @@ -64,8 +65,7 @@ impl server::Tap for Observe { let tap_id = match self.taps.lock() { Ok(mut taps) => { - let tap_id = self.next_id; - self.next_id += 1; + let tap_id = self.next_id.fetch_add(1, Ordering::AcqRel); let _ = (*taps).insert(tap_id, tap); tap_id } diff --git a/proxy/src/telemetry/tap/mod.rs b/proxy/src/telemetry/tap/mod.rs index 81f5ecb1f..911ce3ea9 100644 --- a/proxy/src/telemetry/tap/mod.rs +++ b/proxy/src/telemetry/tap/mod.rs @@ -26,34 +26,43 @@ struct Ended; impl Taps { pub fn insert(&mut self, id: usize, tap: Tap) -> Option { + debug!("insert id={} tap={:?}", id, tap); self.by_id.insert(id, tap) } pub fn remove(&mut self, id: usize) -> Option { + debug!("remove id={}", id); self.by_id.swap_remove(&id) } /// pub(super) fn inspect(&mut self, ev: &Event) { - if !ev.is_http() { + if !ev.is_http() || self.by_id.is_empty() { return; } + debug!("inspect taps={:?} event={:?}", self.by_id.keys().collect::>(), ev); // Iterate through taps by index so that items may be removed. let mut idx = 0; while idx < self.by_id.len() { - let ended = { - let (_, tap) = self.by_id.get_index(idx).unwrap(); - tap.inspect(ev).is_err() + let (tap_id, inspect) = { + let (id, tap) = self.by_id.get_index(idx).unwrap(); + (*id, tap.inspect(ev)) }; // If the tap is no longer receiving events, remove it. The index is only // incremented on successs so that, when an item is removed, the swapped item // is inspected on the next iteration OR, if the last item has been removed, // `len()` will return `idx` and a subsequent iteration will not occur. - if ended { - self.by_id.swap_remove_index(idx); - continue; + match inspect { + Ok(matched) => { + debug!("inspect tap={} match={}", tap_id, matched); + } + Err(Ended) => { + debug!("ended tap={}", tap_id); + self.by_id.swap_remove_index(idx); + continue; + } } idx += 1; @@ -76,8 +85,6 @@ impl Tap { } fn inspect(&self, ev: &Event) -> Result { - debug!("inspect event={:?} with tap={:?}", ev, self); - if self.match_.matches(ev) { return self.tx .lossy_send(ev.clone())