proxy: Fix Tap ID generation (#885)
The proxy's tap server assigns a sequential numeric ID to each inbound Tap request to assist tap lifecycle management. The server implementation keeps a local counter to keep track of tap IDs. However, this implementation is cloned for each individual tap requests, so `0` the only tap ID ever used. This change moves the Tap ID to be stored in a shared atomic integer. Debug logging has been improved as well.
This commit is contained in:
parent
18e6eafb85
commit
77017eedea
|
@ -1,4 +1,5 @@
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
use futures::{future, Poll, Stream};
|
use futures::{future, Poll, Stream};
|
||||||
use futures_mpsc_lossy;
|
use futures_mpsc_lossy;
|
||||||
|
@ -14,7 +15,7 @@ use telemetry::tap::{Tap, Taps};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Observe {
|
pub struct Observe {
|
||||||
next_id: usize,
|
next_id: Arc<AtomicUsize>,
|
||||||
taps: Arc<Mutex<Taps>>,
|
taps: Arc<Mutex<Taps>>,
|
||||||
tap_capacity: usize,
|
tap_capacity: usize,
|
||||||
}
|
}
|
||||||
|
@ -32,7 +33,7 @@ impl Observe {
|
||||||
let taps = Arc::new(Mutex::new(Taps::default()));
|
let taps = Arc::new(Mutex::new(Taps::default()));
|
||||||
|
|
||||||
let observe = Observe {
|
let observe = Observe {
|
||||||
next_id: 0,
|
next_id: Arc::new(AtomicUsize::new(0)),
|
||||||
tap_capacity,
|
tap_capacity,
|
||||||
taps: taps.clone(),
|
taps: taps.clone(),
|
||||||
};
|
};
|
||||||
|
@ -46,7 +47,7 @@ impl server::Tap for Observe {
|
||||||
type ObserveFuture = future::FutureResult<Response<Self::ObserveStream>, grpc::Error>;
|
type ObserveFuture = future::FutureResult<Response<Self::ObserveStream>, grpc::Error>;
|
||||||
|
|
||||||
fn observe(&mut self, req: grpc::Request<ObserveRequest>) -> Self::ObserveFuture {
|
fn observe(&mut self, req: grpc::Request<ObserveRequest>) -> Self::ObserveFuture {
|
||||||
if self.next_id == ::std::usize::MAX {
|
if self.next_id.load(Ordering::Acquire) == ::std::usize::MAX {
|
||||||
return future::err(grpc::Error::Grpc(grpc::Status::INTERNAL));
|
return future::err(grpc::Error::Grpc(grpc::Status::INTERNAL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,8 +65,7 @@ impl server::Tap for Observe {
|
||||||
|
|
||||||
let tap_id = match self.taps.lock() {
|
let tap_id = match self.taps.lock() {
|
||||||
Ok(mut taps) => {
|
Ok(mut taps) => {
|
||||||
let tap_id = self.next_id;
|
let tap_id = self.next_id.fetch_add(1, Ordering::AcqRel);
|
||||||
self.next_id += 1;
|
|
||||||
let _ = (*taps).insert(tap_id, tap);
|
let _ = (*taps).insert(tap_id, tap);
|
||||||
tap_id
|
tap_id
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,34 +26,43 @@ struct Ended;
|
||||||
|
|
||||||
impl Taps {
|
impl Taps {
|
||||||
pub fn insert(&mut self, id: usize, tap: Tap) -> Option<Tap> {
|
pub fn insert(&mut self, id: usize, tap: Tap) -> Option<Tap> {
|
||||||
|
debug!("insert id={} tap={:?}", id, tap);
|
||||||
self.by_id.insert(id, tap)
|
self.by_id.insert(id, tap)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn remove(&mut self, id: usize) -> Option<Tap> {
|
pub fn remove(&mut self, id: usize) -> Option<Tap> {
|
||||||
|
debug!("remove id={}", id);
|
||||||
self.by_id.swap_remove(&id)
|
self.by_id.swap_remove(&id)
|
||||||
}
|
}
|
||||||
|
|
||||||
///
|
///
|
||||||
pub(super) fn inspect(&mut self, ev: &Event) {
|
pub(super) fn inspect(&mut self, ev: &Event) {
|
||||||
if !ev.is_http() {
|
if !ev.is_http() || self.by_id.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
debug!("inspect taps={:?} event={:?}", self.by_id.keys().collect::<Vec<_>>(), ev);
|
||||||
|
|
||||||
// Iterate through taps by index so that items may be removed.
|
// Iterate through taps by index so that items may be removed.
|
||||||
let mut idx = 0;
|
let mut idx = 0;
|
||||||
while idx < self.by_id.len() {
|
while idx < self.by_id.len() {
|
||||||
let ended = {
|
let (tap_id, inspect) = {
|
||||||
let (_, tap) = self.by_id.get_index(idx).unwrap();
|
let (id, tap) = self.by_id.get_index(idx).unwrap();
|
||||||
tap.inspect(ev).is_err()
|
(*id, tap.inspect(ev))
|
||||||
};
|
};
|
||||||
|
|
||||||
// If the tap is no longer receiving events, remove it. The index is only
|
// If the tap is no longer receiving events, remove it. The index is only
|
||||||
// incremented on successs so that, when an item is removed, the swapped item
|
// incremented on successs so that, when an item is removed, the swapped item
|
||||||
// is inspected on the next iteration OR, if the last item has been removed,
|
// is inspected on the next iteration OR, if the last item has been removed,
|
||||||
// `len()` will return `idx` and a subsequent iteration will not occur.
|
// `len()` will return `idx` and a subsequent iteration will not occur.
|
||||||
if ended {
|
match inspect {
|
||||||
self.by_id.swap_remove_index(idx);
|
Ok(matched) => {
|
||||||
continue;
|
debug!("inspect tap={} match={}", tap_id, matched);
|
||||||
|
}
|
||||||
|
Err(Ended) => {
|
||||||
|
debug!("ended tap={}", tap_id);
|
||||||
|
self.by_id.swap_remove_index(idx);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
idx += 1;
|
idx += 1;
|
||||||
|
@ -76,8 +85,6 @@ impl Tap {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inspect(&self, ev: &Event) -> Result<bool, Ended> {
|
fn inspect(&self, ev: &Event) -> Result<bool, Ended> {
|
||||||
debug!("inspect event={:?} with tap={:?}", ev, self);
|
|
||||||
|
|
||||||
if self.match_.matches(ev) {
|
if self.match_.matches(ev) {
|
||||||
return self.tx
|
return self.tx
|
||||||
.lossy_send(ev.clone())
|
.lossy_send(ev.clone())
|
||||||
|
|
Loading…
Reference in New Issue