Move time from DataPoint to Sum/Gauge (#2377)

This commit is contained in:
Mindaugas Vinkelis 2024-12-10 23:14:58 +02:00 committed by GitHub
parent f513768305
commit d67d1fc558
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 155 additions and 185 deletions

View File

@ -295,8 +295,8 @@ pub mod tonic {
.iter() .iter()
.map(|dp| TonicNumberDataPoint { .map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(), attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time), start_time_unix_nano: to_nanos(sum.start_time),
time_unix_nano: to_nanos(dp.time), time_unix_nano: to_nanos(sum.time),
exemplars: dp.exemplars.iter().map(Into::into).collect(), exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32, flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()), value: Some(dp.value.into()),
@ -319,8 +319,8 @@ pub mod tonic {
.iter() .iter()
.map(|dp| TonicNumberDataPoint { .map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(), attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(), start_time_unix_nano: gauge.start_time.map(to_nanos).unwrap_or_default(),
time_unix_nano: to_nanos(dp.time), time_unix_nano: to_nanos(gauge.time),
exemplars: dp.exemplars.iter().map(Into::into).collect(), exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32, flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()), value: Some(dp.value.into()),

View File

@ -59,10 +59,6 @@ pub struct GaugeDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the /// Attributes is the set of key value pairs that uniquely identify the
/// time series. /// time series.
pub attributes: Vec<KeyValue>, pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: Option<SystemTime>,
/// The time when the time series was recorded.
pub time: SystemTime,
/// The value of this data point. /// The value of this data point.
pub value: T, pub value: T,
/// The sampled [Exemplar]s collected during the time series. /// The sampled [Exemplar]s collected during the time series.
@ -73,8 +69,6 @@ impl<T: Copy> Clone for GaugeDataPoint<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
attributes: self.attributes.clone(), attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
value: self.value, value: self.value,
exemplars: self.exemplars.clone(), exemplars: self.exemplars.clone(),
} }
@ -86,6 +80,10 @@ impl<T: Copy> Clone for GaugeDataPoint<T> {
pub struct Gauge<T> { pub struct Gauge<T> {
/// Represents individual aggregated measurements with unique attributes. /// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<GaugeDataPoint<T>>, pub data_points: Vec<GaugeDataPoint<T>>,
/// The time when the time series was started.
pub start_time: Option<SystemTime>,
/// The time when the time series was recorded.
pub time: SystemTime,
} }
impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> { impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
@ -103,10 +101,6 @@ pub struct SumDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the /// Attributes is the set of key value pairs that uniquely identify the
/// time series. /// time series.
pub attributes: Vec<KeyValue>, pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// The value of this data point. /// The value of this data point.
pub value: T, pub value: T,
/// The sampled [Exemplar]s collected during the time series. /// The sampled [Exemplar]s collected during the time series.
@ -117,8 +111,6 @@ impl<T: Copy> Clone for SumDataPoint<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
attributes: self.attributes.clone(), attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
value: self.value, value: self.value,
exemplars: self.exemplars.clone(), exemplars: self.exemplars.clone(),
} }
@ -130,6 +122,10 @@ impl<T: Copy> Clone for SumDataPoint<T> {
pub struct Sum<T> { pub struct Sum<T> {
/// Represents individual aggregated measurements with unique attributes. /// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<SumDataPoint<T>>, pub data_points: Vec<SumDataPoint<T>>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report /// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time. /// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality, pub temporality: Temporality,
@ -366,8 +362,6 @@ mod tests {
fn validate_cloning_data_points() { fn validate_cloning_data_points() {
let data_type = SumDataPoint { let data_type = SumDataPoint {
attributes: vec![KeyValue::new("key", "value")], attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
value: 0u32, value: 0u32,
exemplars: vec![Exemplar { exemplars: vec![Exemplar {
filtered_attributes: vec![], filtered_attributes: vec![],

View File

@ -2,10 +2,7 @@ use std::{marker, sync::Arc};
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
use crate::metrics::{ use crate::metrics::{data::Aggregation, Temporality};
data::{Aggregation, Gauge},
Temporality,
};
use super::{ use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue, exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
@ -99,31 +96,15 @@ impl<T: Number> AggregateBuilder<T> {
/// Builds a last-value aggregate function input and output. /// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) { pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv_filter = Arc::new(LastValue::new()); let lv = Arc::new(LastValue::new());
let lv_agg = Arc::clone(&lv_filter); let agg_lv = Arc::clone(&lv);
let t = self.temporality; let t = self.temporality;
( (
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)), self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| { move |dest: Option<&mut dyn Aggregation>| match t {
let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>()); Some(Temporality::Delta) => agg_lv.delta(dest),
let mut new_agg = if g.is_none() { _ => agg_lv.cumulative(dest),
Some(Gauge {
data_points: vec![],
})
} else {
None
};
let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));
match t {
Some(Temporality::Delta) => {
lv_agg.compute_aggregation_delta(&mut g.data_points)
}
_ => lv_agg.compute_aggregation_cumulative(&mut g.data_points),
}
(g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}, },
) )
} }
@ -211,8 +192,8 @@ impl<T: Number> AggregateBuilder<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::metrics::data::{ use crate::metrics::data::{
ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, GaugeDataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
Histogram, HistogramDataPoint, Sum, SumDataPoint, GaugeDataPoint, Histogram, HistogramDataPoint, Sum, SumDataPoint,
}; };
use std::{time::SystemTime, vec}; use std::{time::SystemTime, vec};
@ -224,11 +205,11 @@ mod tests {
let mut a = Gauge { let mut a = Gauge {
data_points: vec![GaugeDataPoint { data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)], attributes: vec![KeyValue::new("a", 1)],
start_time: Some(SystemTime::now()),
time: SystemTime::now(),
value: 1u64, value: 1u64,
exemplars: vec![], exemplars: vec![],
}], }],
start_time: Some(SystemTime::now()),
time: SystemTime::now(),
}; };
let new_attributes = [KeyValue::new("b", 2)]; let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]); measure.call(2, &new_attributes[..]);
@ -251,19 +232,17 @@ mod tests {
data_points: vec![ data_points: vec![
SumDataPoint { SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)], attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64, value: 1u64,
exemplars: vec![], exemplars: vec![],
}, },
SumDataPoint { SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)], attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 2u64, value: 2u64,
exemplars: vec![], exemplars: vec![],
}, },
], ],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta { temporality: if temporality == Temporality::Delta {
Temporality::Cumulative Temporality::Cumulative
} else { } else {
@ -294,19 +273,17 @@ mod tests {
data_points: vec![ data_points: vec![
SumDataPoint { SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)], attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64, value: 1u64,
exemplars: vec![], exemplars: vec![],
}, },
SumDataPoint { SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)], attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 2u64, value: 2u64,
exemplars: vec![], exemplars: vec![],
}, },
], ],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta { temporality: if temporality == Temporality::Delta {
Temporality::Cumulative Temporality::Cumulative
} else { } else {

View File

@ -1440,7 +1440,7 @@ mod tests {
count = out_fn.call(Some(got.as_mut())).0 count = out_fn.call(Some(got.as_mut())).0
} }
assert_aggregation_eq::<T>(Box::new(test.want), got, true, test.name); assert_aggregation_eq::<T>(Box::new(test.want), got, test.name);
assert_eq!(test.want_count, count, "{}", test.name); assert_eq!(test.want_count, count, "{}", test.name);
} }
} }
@ -1448,7 +1448,6 @@ mod tests {
fn assert_aggregation_eq<T: Number + PartialEq>( fn assert_aggregation_eq<T: Number + PartialEq>(
a: Box<dyn Aggregation>, a: Box<dyn Aggregation>,
b: Box<dyn Aggregation>, b: Box<dyn Aggregation>,
ignore_timestamp: bool,
test_name: &'static str, test_name: &'static str,
) { ) {
assert_eq!( assert_eq!(
@ -1467,13 +1466,7 @@ mod tests {
test_name test_name
); );
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) { for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_gauge_data_points_eq( assert_gauge_data_points_eq(a, b, "mismatching gauge data points", test_name);
a,
b,
ignore_timestamp,
"mismatching gauge data points",
test_name,
);
} }
} else if let Some(a) = a.as_any().downcast_ref::<data::Sum<T>>() { } else if let Some(a) = a.as_any().downcast_ref::<data::Sum<T>>() {
let b = b.as_any().downcast_ref::<data::Sum<T>>().unwrap(); let b = b.as_any().downcast_ref::<data::Sum<T>>().unwrap();
@ -1494,13 +1487,7 @@ mod tests {
test_name test_name
); );
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) { for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_sum_data_points_eq( assert_sum_data_points_eq(a, b, "mismatching sum data points", test_name);
a,
b,
ignore_timestamp,
"mismatching sum data points",
test_name,
);
} }
} else if let Some(a) = a.as_any().downcast_ref::<data::Histogram<T>>() { } else if let Some(a) = a.as_any().downcast_ref::<data::Histogram<T>>() {
let b = b.as_any().downcast_ref::<data::Histogram<T>>().unwrap(); let b = b.as_any().downcast_ref::<data::Histogram<T>>().unwrap();
@ -1516,13 +1503,7 @@ mod tests {
test_name test_name
); );
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) { for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_hist_data_points_eq( assert_hist_data_points_eq(a, b, "mismatching hist data points", test_name);
a,
b,
ignore_timestamp,
"mismatching hist data points",
test_name,
);
} }
} else if let Some(a) = a.as_any().downcast_ref::<data::ExponentialHistogram<T>>() { } else if let Some(a) = a.as_any().downcast_ref::<data::ExponentialHistogram<T>>() {
let b = b let b = b
@ -1544,7 +1525,6 @@ mod tests {
assert_exponential_hist_data_points_eq( assert_exponential_hist_data_points_eq(
a, a,
b, b,
ignore_timestamp,
"mismatching hist data points", "mismatching hist data points",
test_name, test_name,
); );
@ -1557,7 +1537,6 @@ mod tests {
fn assert_sum_data_points_eq<T: Number>( fn assert_sum_data_points_eq<T: Number>(
a: &data::SumDataPoint<T>, a: &data::SumDataPoint<T>,
b: &data::SumDataPoint<T>, b: &data::SumDataPoint<T>,
ignore_timestamp: bool,
message: &'static str, message: &'static str,
test_name: &'static str, test_name: &'static str,
) { ) {
@ -1567,21 +1546,11 @@ mod tests {
test_name, message test_name, message
); );
assert_eq!(a.value, b.value, "{}: {} value", test_name, message); assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
} }
fn assert_gauge_data_points_eq<T: Number>( fn assert_gauge_data_points_eq<T: Number>(
a: &data::GaugeDataPoint<T>, a: &data::GaugeDataPoint<T>,
b: &data::GaugeDataPoint<T>, b: &data::GaugeDataPoint<T>,
ignore_timestamp: bool,
message: &'static str, message: &'static str,
test_name: &'static str, test_name: &'static str,
) { ) {
@ -1591,21 +1560,11 @@ mod tests {
test_name, message test_name, message
); );
assert_eq!(a.value, b.value, "{}: {} value", test_name, message); assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
} }
fn assert_hist_data_points_eq<T: Number>( fn assert_hist_data_points_eq<T: Number>(
a: &data::HistogramDataPoint<T>, a: &data::HistogramDataPoint<T>,
b: &data::HistogramDataPoint<T>, b: &data::HistogramDataPoint<T>,
ignore_timestamp: bool,
message: &'static str, message: &'static str,
test_name: &'static str, test_name: &'static str,
) { ) {
@ -1624,21 +1583,11 @@ mod tests {
assert_eq!(a.min, b.min, "{}: {} min", test_name, message); assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
assert_eq!(a.max, b.max, "{}: {} max", test_name, message); assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message); assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
} }
fn assert_exponential_hist_data_points_eq<T: Number>( fn assert_exponential_hist_data_points_eq<T: Number>(
a: &data::ExponentialHistogramDataPoint<T>, a: &data::ExponentialHistogramDataPoint<T>,
b: &data::ExponentialHistogramDataPoint<T>, b: &data::ExponentialHistogramDataPoint<T>,
ignore_timestamp: bool,
message: &'static str, message: &'static str,
test_name: &'static str, test_name: &'static str,
) { ) {
@ -1669,14 +1618,5 @@ mod tests {
"{}: {} neg", "{}: {} neg",
test_name, message test_name, message
); );
if !ignore_timestamp {
assert_eq!(
a.start_time, b.start_time,
"{}: {} start time",
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
} }
} }

View File

@ -1,6 +1,6 @@
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};
use crate::metrics::data::GaugeDataPoint; use crate::metrics::data::{self, Aggregation, GaugeDataPoint};
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
@ -56,33 +56,75 @@ impl<T: Number> LastValue<T> {
self.value_map.measure(measurement, attrs); self.value_map.measure(measurement, attrs);
} }
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<GaugeDataPoint<T>>) { pub(crate) fn delta(
let t = SystemTime::now(); &self,
let prev_start = self dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self
.start .start
.lock() .lock()
.map(|mut start| replace(start.deref_mut(), t)) .map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(t); .unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(start_time),
time,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = Some(start_time);
s_data.time = time;
self.value_map self.value_map
.collect_and_reset(dest, |attributes, aggr| GaugeDataPoint { .collect_and_reset(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
attributes, attributes,
start_time: Some(prev_start),
time: t,
value: aggr.value.get_value(), value: aggr.value.get_value(),
exemplars: vec![], exemplars: vec![],
}); });
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
} }
pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<GaugeDataPoint<T>>) { pub(crate) fn cumulative(
let t = SystemTime::now(); &self,
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let time = SystemTime::now();
let start_time = self.start.lock().map(|start| *start).unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Gauge<T>>());
let mut new_agg = if s_data.is_none() {
Some(data::Gauge {
data_points: vec![],
start_time: Some(start_time),
time,
})
} else {
None
};
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = Some(start_time);
s_data.time = time;
self.value_map self.value_map
.collect_readonly(dest, |attributes, aggr| GaugeDataPoint { .collect_readonly(&mut s_data.data_points, |attributes, aggr| GaugeDataPoint {
attributes, attributes,
start_time: Some(prev_start),
time: t,
value: aggr.value.get_value(), value: aggr.value.get_value(),
exemplars: vec![], exemplars: vec![],
}); });
(
s_data.data_points.len(),
new_agg.map(|a| Box::new(a) as Box<_>),
)
} }
} }

View File

@ -33,12 +33,19 @@ impl<T: Number> PrecomputedSum<T> {
&self, &self,
dest: Option<&mut dyn Aggregation>, dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) { ) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now(); let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() { let mut new_agg = if s_data.is_none() {
Some(data::Sum { Some(data::Sum {
data_points: vec![], data_points: vec![],
start_time,
time,
temporality: Temporality::Delta, temporality: Temporality::Delta,
is_monotonic: self.monotonic, is_monotonic: self.monotonic,
}) })
@ -46,15 +53,11 @@ impl<T: Number> PrecomputedSum<T> {
None None
}; };
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = start_time;
s_data.time = time;
s_data.temporality = Temporality::Delta; s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic; s_data.is_monotonic = self.monotonic;
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
let mut reported = match self.reported.lock() { let mut reported = match self.reported.lock() {
Ok(r) => r, Ok(r) => r,
Err(_) => return (0, None), Err(_) => return (0, None),
@ -68,8 +71,6 @@ impl<T: Number> PrecomputedSum<T> {
let delta = value - *reported.get(&attributes).unwrap_or(&T::default()); let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
SumDataPoint { SumDataPoint {
attributes, attributes,
start_time: prev_start,
time: t,
value: delta, value: delta,
exemplars: vec![], exemplars: vec![],
} }
@ -88,12 +89,15 @@ impl<T: Number> PrecomputedSum<T> {
&self, &self,
dest: Option<&mut dyn Aggregation>, dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) { ) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now(); let time = SystemTime::now();
let start_time = self.start.lock().map(|start| *start).unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() { let mut new_agg = if s_data.is_none() {
Some(data::Sum { Some(data::Sum {
data_points: vec![], data_points: vec![],
start_time,
time,
temporality: Temporality::Cumulative, temporality: Temporality::Cumulative,
is_monotonic: self.monotonic, is_monotonic: self.monotonic,
}) })
@ -101,16 +105,14 @@ impl<T: Number> PrecomputedSum<T> {
None None
}; };
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = start_time;
s_data.time = time;
s_data.temporality = Temporality::Cumulative; s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic; s_data.is_monotonic = self.monotonic;
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
self.value_map self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint { .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes, attributes,
start_time: prev_start,
time: t,
value: aggr.value.get_value(), value: aggr.value.get_value(),
exemplars: vec![], exemplars: vec![],
}); });

View File

@ -71,12 +71,19 @@ impl<T: Number> Sum<T> {
&self, &self,
dest: Option<&mut dyn Aggregation>, dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) { ) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now(); let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() { let mut new_agg = if s_data.is_none() {
Some(data::Sum { Some(data::Sum {
data_points: vec![], data_points: vec![],
start_time,
time,
temporality: Temporality::Delta, temporality: Temporality::Delta,
is_monotonic: self.monotonic, is_monotonic: self.monotonic,
}) })
@ -84,19 +91,14 @@ impl<T: Number> Sum<T> {
None None
}; };
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = start_time;
s_data.time = time;
s_data.temporality = Temporality::Delta; s_data.temporality = Temporality::Delta;
s_data.is_monotonic = self.monotonic; s_data.is_monotonic = self.monotonic;
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
self.value_map self.value_map
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint { .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes, attributes,
start_time: prev_start,
time: t,
value: aggr.value.get_value(), value: aggr.value.get_value(),
exemplars: vec![], exemplars: vec![],
}); });
@ -111,12 +113,14 @@ impl<T: Number> Sum<T> {
&self, &self,
dest: Option<&mut dyn Aggregation>, dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) { ) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now(); let time = SystemTime::now();
let start_time = self.start.lock().map(|start| *start).unwrap_or(time);
let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>()); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::<data::Sum<T>>());
let mut new_agg = if s_data.is_none() { let mut new_agg = if s_data.is_none() {
Some(data::Sum { Some(data::Sum {
data_points: vec![], data_points: vec![],
start_time,
time,
temporality: Temporality::Cumulative, temporality: Temporality::Cumulative,
is_monotonic: self.monotonic, is_monotonic: self.monotonic,
}) })
@ -124,16 +128,15 @@ impl<T: Number> Sum<T> {
None None
}; };
let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none"));
s_data.start_time = start_time;
s_data.time = time;
s_data.temporality = Temporality::Cumulative; s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic; s_data.is_monotonic = self.monotonic;
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
self.value_map self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint { .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes, attributes,
start_time: prev_start,
time: t,
value: aggr.value.get_value(), value: aggr.value.get_value(),
exemplars: vec![], exemplars: vec![],
}); });

View File

@ -210,32 +210,44 @@ impl InMemoryMetricExporter {
} else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<i64>>() { } else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<i64>>() {
Some(Box::new(data::Sum { Some(Box::new(data::Sum {
data_points: sum.data_points.clone(), data_points: sum.data_points.clone(),
start_time: sum.start_time,
time: sum.time,
temporality: sum.temporality, temporality: sum.temporality,
is_monotonic: sum.is_monotonic, is_monotonic: sum.is_monotonic,
})) }))
} else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<f64>>() { } else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<f64>>() {
Some(Box::new(data::Sum { Some(Box::new(data::Sum {
data_points: sum.data_points.clone(), data_points: sum.data_points.clone(),
start_time: sum.start_time,
time: sum.time,
temporality: sum.temporality, temporality: sum.temporality,
is_monotonic: sum.is_monotonic, is_monotonic: sum.is_monotonic,
})) }))
} else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<u64>>() { } else if let Some(sum) = data.as_any().downcast_ref::<data::Sum<u64>>() {
Some(Box::new(data::Sum { Some(Box::new(data::Sum {
data_points: sum.data_points.clone(), data_points: sum.data_points.clone(),
start_time: sum.start_time,
time: sum.time,
temporality: sum.temporality, temporality: sum.temporality,
is_monotonic: sum.is_monotonic, is_monotonic: sum.is_monotonic,
})) }))
} else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<i64>>() { } else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<i64>>() {
Some(Box::new(data::Gauge { Some(Box::new(data::Gauge {
data_points: gauge.data_points.clone(), data_points: gauge.data_points.clone(),
start_time: gauge.start_time,
time: gauge.time,
})) }))
} else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<f64>>() { } else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<f64>>() {
Some(Box::new(data::Gauge { Some(Box::new(data::Gauge {
data_points: gauge.data_points.clone(), data_points: gauge.data_points.clone(),
start_time: gauge.start_time,
time: gauge.time,
})) }))
} else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<u64>>() { } else if let Some(gauge) = data.as_any().downcast_ref::<data::Gauge<u64>>() {
Some(Box::new(data::Gauge { Some(Box::new(data::Gauge {
data_points: gauge.data_points.clone(), data_points: gauge.data_points.clone(),
start_time: gauge.start_time,
time: gauge.time,
})) }))
} else { } else {
// unknown data type // unknown data type

View File

@ -142,11 +142,33 @@ fn print_sum<T: Debug>(sum: &data::Sum<T>) {
} else { } else {
println!("\t\tTemporality : Delta"); println!("\t\tTemporality : Delta");
} }
let datetime: DateTime<Utc> = sum.start_time.into();
println!(
"\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
let datetime: DateTime<Utc> = sum.time.into();
println!(
"\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
print_sum_data_points(&sum.data_points); print_sum_data_points(&sum.data_points);
} }
fn print_gauge<T: Debug>(gauge: &data::Gauge<T>) { fn print_gauge<T: Debug>(gauge: &data::Gauge<T>) {
println!("\t\tGauge DataPoints"); println!("\t\tGauge DataPoints");
if let Some(start_time) = gauge.start_time {
let datetime: DateTime<Utc> = start_time.into();
println!(
"\t\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
}
let datetime: DateTime<Utc> = gauge.time.into();
println!(
"\t\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
print_gauge_data_points(&gauge.data_points); print_gauge_data_points(&gauge.data_points);
} }
@ -163,16 +185,6 @@ fn print_histogram<T: Debug>(histogram: &data::Histogram<T>) {
fn print_sum_data_points<T: Debug>(data_points: &[data::SumDataPoint<T>]) { fn print_sum_data_points<T: Debug>(data_points: &[data::SumDataPoint<T>]) {
for (i, data_point) in data_points.iter().enumerate() { for (i, data_point) in data_points.iter().enumerate() {
println!("\t\tDataPoint #{}", i); println!("\t\tDataPoint #{}", i);
let datetime: DateTime<Utc> = data_point.start_time.into();
println!(
"\t\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
let datetime: DateTime<Utc> = data_point.time.into();
println!(
"\t\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
println!("\t\t\tValue : {:#?}", data_point.value); println!("\t\t\tValue : {:#?}", data_point.value);
println!("\t\t\tAttributes :"); println!("\t\t\tAttributes :");
for kv in data_point.attributes.iter() { for kv in data_point.attributes.iter() {
@ -184,18 +196,6 @@ fn print_sum_data_points<T: Debug>(data_points: &[data::SumDataPoint<T>]) {
fn print_gauge_data_points<T: Debug>(data_points: &[data::GaugeDataPoint<T>]) { fn print_gauge_data_points<T: Debug>(data_points: &[data::GaugeDataPoint<T>]) {
for (i, data_point) in data_points.iter().enumerate() { for (i, data_point) in data_points.iter().enumerate() {
println!("\t\tDataPoint #{}", i); println!("\t\tDataPoint #{}", i);
if let Some(start_time) = data_point.start_time {
let datetime: DateTime<Utc> = start_time.into();
println!(
"\t\t\tStartTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
}
let datetime: DateTime<Utc> = data_point.time.into();
println!(
"\t\t\tEndTime : {}",
datetime.format("%Y-%m-%d %H:%M:%S%.6f")
);
println!("\t\t\tValue : {:#?}", data_point.value); println!("\t\t\tValue : {:#?}", data_point.value);
println!("\t\t\tAttributes :"); println!("\t\t\tAttributes :");
for kv in data_point.attributes.iter() { for kv in data_point.attributes.iter() {