reuse temporal metric storage for sync storage (#1369)

This commit is contained in:
Lalit Kumar Bhasin 2022-05-09 15:47:08 -07:00 committed by GitHub
parent 02630e0b6d
commit 54abc2741b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 113 deletions

View File

@ -5,16 +5,14 @@
#ifndef ENABLE_METRICS_PREVIEW #ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/common/key_value_iterable_view.h" # include "opentelemetry/common/key_value_iterable_view.h"
# include "opentelemetry/sdk/common/attributemap_hash.h" # include "opentelemetry/sdk/common/attributemap_hash.h"
# include "opentelemetry/sdk/instrumentationlibrary/instrumentation_library.h"
# include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h" # include "opentelemetry/sdk/metrics/aggregation/default_aggregation.h"
# include "opentelemetry/sdk/metrics/exemplar/reservoir.h" # include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
# include "opentelemetry/sdk/metrics/state/attributes_hashmap.h" # include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
# include "opentelemetry/sdk/metrics/state/metric_collector.h" # include "opentelemetry/sdk/metrics/state/metric_collector.h"
# include "opentelemetry/sdk/metrics/state/metric_storage.h" # include "opentelemetry/sdk/metrics/state/metric_storage.h"
# include "opentelemetry/sdk/metrics/state/temporal_metric_storage.h"
# include "opentelemetry/sdk/metrics/view/attributes_processor.h" # include "opentelemetry/sdk/metrics/view/attributes_processor.h"
# include "opentelemetry/sdk/metrics/view/view.h"
# include "opentelemetry/sdk/resource/resource.h"
# include <list> # include <list>
# include <memory> # include <memory>
@ -24,13 +22,6 @@ namespace sdk
{ {
namespace metrics namespace metrics
{ {
struct LastReportedMetrics
{
std::unique_ptr<AttributesHashMap> attributes_map;
opentelemetry::common::SystemTimestamp collection_ts;
};
class SyncMetricStorage : public MetricStorage, public WritableMetricStorage class SyncMetricStorage : public MetricStorage, public WritableMetricStorage
{ {
@ -43,7 +34,9 @@ public:
aggregation_type_{aggregation_type}, aggregation_type_{aggregation_type},
attributes_hashmap_(new AttributesHashMap()), attributes_hashmap_(new AttributesHashMap()),
attributes_processor_{attributes_processor}, attributes_processor_{attributes_processor},
exemplar_reservoir_(exemplar_reservoir) exemplar_reservoir_(exemplar_reservoir),
temporal_metric_storage_(instrument_descriptor)
{ {
create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> { create_default_aggregation_ = [&]() -> std::unique_ptr<Aggregation> {
return std::move( return std::move(
@ -114,14 +107,10 @@ private:
// hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call) // hashmap to maintain the metrics for delta collection (i.e, collection since last Collect call)
std::unique_ptr<AttributesHashMap> attributes_hashmap_; std::unique_ptr<AttributesHashMap> attributes_hashmap_;
// unreported metrics stash for all the collectors
std::unordered_map<CollectorHandle *, std::list<std::shared_ptr<AttributesHashMap>>>
unreported_metrics_;
// last reported metrics stash for all the collectors.
std::unordered_map<CollectorHandle *, LastReportedMetrics> last_reported_metrics_;
const AttributesProcessor *attributes_processor_; const AttributesProcessor *attributes_processor_;
std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_; std::function<std::unique_ptr<Aggregation>()> create_default_aggregation_;
nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_; nostd::shared_ptr<ExemplarReservoir> exemplar_reservoir_;
TemporalMetricStorage temporal_metric_storage_;
}; };
} // namespace metrics } // namespace metrics

View File

@ -25,104 +25,9 @@ bool SyncMetricStorage::Collect(CollectorHandle *collector,
// recordings // recordings
std::shared_ptr<AttributesHashMap> delta_metrics = std::move(attributes_hashmap_); std::shared_ptr<AttributesHashMap> delta_metrics = std::move(attributes_hashmap_);
attributes_hashmap_.reset(new AttributesHashMap); attributes_hashmap_.reset(new AttributesHashMap);
for (auto &col : collectors)
{
unreported_metrics_[col.get()].push_back(delta_metrics);
}
// Get the unreported metrics for the `collector` from `unreported metrics stash` return temporal_metric_storage_.buildMetrics(collector, collectors, sdk_start_ts, collection_ts,
// since last collection, this will also cleanup the unreported metrics for `collector` std::move(delta_metrics), callback);
// from the stash.
auto present = unreported_metrics_.find(collector);
if (present == unreported_metrics_.end())
{
// no unreported metrics for the collector, return.
return true;
}
auto unreported_list = std::move(present->second);
// Iterate over the unreporter metrics for `collector` and store result in `merged_metrics`
std::unique_ptr<AttributesHashMap> merged_metrics(new AttributesHashMap);
for (auto &agg_hashmap : unreported_list)
{
agg_hashmap->GetAllEnteries([&merged_metrics, this](const MetricAttributes &attributes,
Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, std::move(agg->Merge(aggregation)));
}
else
{
merged_metrics->Set(
attributes,
std::move(
DefaultAggregation::CreateAggregation(instrument_descriptor_)->Merge(aggregation)));
merged_metrics->GetAllEnteries(
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
}
return true;
});
}
// Get the last reported metrics for the `collector` from `last reported metrics` stash
// - If the aggregation_temporarily for the collector is cumulative
// - Merge the last reported metrics with unreported metrics (which is in merged_metrics),
// Final result of merge would be in merged_metrics.
// - Move the final merge to the `last reported metrics` stash.
// - If the aggregation_temporarily is delta
// - Store the unreported metrics for `collector` (which is in merged_mtrics) to
// `last reported metrics` stash.
auto reported = last_reported_metrics_.find(collector);
if (reported != last_reported_metrics_.end())
{
last_collection_ts = last_reported_metrics_[collector].collection_ts;
auto last_aggr_hashmap = std::move(last_reported_metrics_[collector].attributes_map);
if (aggregation_temporarily == AggregationTemporality::kCumulative)
{
// merge current delta to previous cumulative
last_aggr_hashmap->GetAllEnteries(
[&merged_metrics, this](const MetricAttributes &attributes, Aggregation &aggregation) {
auto agg = merged_metrics->Get(attributes);
if (agg)
{
merged_metrics->Set(attributes, agg->Merge(aggregation));
}
else
{
merged_metrics->Set(attributes,
DefaultAggregation::CreateAggregation(instrument_descriptor_));
}
return true;
});
}
last_reported_metrics_[collector] =
LastReportedMetrics{std::move(merged_metrics), collection_ts};
}
else
{
merged_metrics->GetAllEnteries(
[](const MetricAttributes &attr, Aggregation &aggr) { return true; });
last_reported_metrics_.insert(
std::make_pair(collector, LastReportedMetrics{std::move(merged_metrics), collection_ts}));
}
// Generate the MetricData from the final merged_metrics, and invoke callback over it.
AttributesHashMap *result_to_export = (last_reported_metrics_[collector]).attributes_map.get();
MetricData metric_data;
metric_data.instrument_descriptor = instrument_descriptor_;
metric_data.start_ts = last_collection_ts;
metric_data.end_ts = collection_ts;
result_to_export->GetAllEnteries(
[&metric_data](const MetricAttributes &attributes, Aggregation &aggregation) {
PointDataAttributes point_data_attr;
point_data_attr.point_data = aggregation.ToPoint();
point_data_attr.attributes = attributes;
metric_data.point_data_attr_.push_back(point_data_attr);
return true;
});
return callback(metric_data);
} }
} // namespace metrics } // namespace metrics