fix(ext/prometheus): support minmaxsumcount aggregator (#945)
This commit is contained in:
parent
b090a6bd49
commit
6e3965be16
|
|
@ -67,22 +67,22 @@ API
|
||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
from typing import Sequence
|
from typing import Iterable, Optional, Sequence, Union
|
||||||
|
|
||||||
from prometheus_client import start_http_server
|
|
||||||
from prometheus_client.core import (
|
from prometheus_client.core import (
|
||||||
REGISTRY,
|
REGISTRY,
|
||||||
CollectorRegistry,
|
|
||||||
CounterMetricFamily,
|
CounterMetricFamily,
|
||||||
|
SummaryMetricFamily,
|
||||||
UnknownMetricFamily,
|
UnknownMetricFamily,
|
||||||
)
|
)
|
||||||
|
|
||||||
from opentelemetry.metrics import Counter, Metric, ValueRecorder
|
from opentelemetry.metrics import Counter, ValueRecorder
|
||||||
from opentelemetry.sdk.metrics.export import (
|
from opentelemetry.sdk.metrics.export import (
|
||||||
MetricRecord,
|
MetricRecord,
|
||||||
MetricsExporter,
|
MetricsExporter,
|
||||||
MetricsExportResult,
|
MetricsExportResult,
|
||||||
)
|
)
|
||||||
|
from opentelemetry.sdk.metrics.export.aggregate import MinMaxSumCountAggregator
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -110,7 +110,7 @@ class PrometheusMetricsExporter(MetricsExporter):
|
||||||
|
|
||||||
|
|
||||||
class CustomCollector:
|
class CustomCollector:
|
||||||
""" CustomCollector represents the Prometheus Collector object
|
"""CustomCollector represents the Prometheus Collector object
|
||||||
https://github.com/prometheus/client_python#custom-collectors
|
https://github.com/prometheus/client_python#custom-collectors
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
@ -121,7 +121,7 @@ class CustomCollector:
|
||||||
r"[^\w]", re.UNICODE | re.IGNORECASE
|
r"[^\w]", re.UNICODE | re.IGNORECASE
|
||||||
)
|
)
|
||||||
|
|
||||||
def add_metrics_data(self, metric_records: Sequence[MetricRecord]):
|
def add_metrics_data(self, metric_records: Sequence[MetricRecord]) -> None:
|
||||||
self._metrics_to_export.append(metric_records)
|
self._metrics_to_export.append(metric_records)
|
||||||
|
|
||||||
def collect(self):
|
def collect(self):
|
||||||
|
|
@ -152,25 +152,35 @@ class CustomCollector:
|
||||||
metric_name = self._prefix + "_"
|
metric_name = self._prefix + "_"
|
||||||
metric_name += self._sanitize(metric_record.instrument.name)
|
metric_name += self._sanitize(metric_record.instrument.name)
|
||||||
|
|
||||||
|
description = getattr(metric_record.instrument, "description", "")
|
||||||
if isinstance(metric_record.instrument, Counter):
|
if isinstance(metric_record.instrument, Counter):
|
||||||
prometheus_metric = CounterMetricFamily(
|
prometheus_metric = CounterMetricFamily(
|
||||||
name=metric_name,
|
name=metric_name, documentation=description, labels=label_keys
|
||||||
documentation=metric_record.instrument.description,
|
|
||||||
labels=label_keys,
|
|
||||||
)
|
)
|
||||||
prometheus_metric.add_metric(
|
prometheus_metric.add_metric(
|
||||||
labels=label_values, value=metric_record.aggregator.checkpoint
|
labels=label_values, value=metric_record.aggregator.checkpoint
|
||||||
)
|
)
|
||||||
# TODO: Add support for histograms when supported in OT
|
# TODO: Add support for histograms when supported in OT
|
||||||
elif isinstance(metric_record.instrument, ValueRecorder):
|
elif isinstance(metric_record.instrument, ValueRecorder):
|
||||||
prometheus_metric = UnknownMetricFamily(
|
value = metric_record.aggregator.checkpoint
|
||||||
|
if isinstance(metric_record.aggregator, MinMaxSumCountAggregator):
|
||||||
|
prometheus_metric = SummaryMetricFamily(
|
||||||
name=metric_name,
|
name=metric_name,
|
||||||
documentation=metric_record.instrument.description,
|
documentation=description,
|
||||||
labels=label_keys,
|
labels=label_keys,
|
||||||
)
|
)
|
||||||
prometheus_metric.add_metric(
|
prometheus_metric.add_metric(
|
||||||
labels=label_values, value=metric_record.aggregator.checkpoint
|
labels=label_values,
|
||||||
|
count_value=value.count,
|
||||||
|
sum_value=value.sum,
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
prometheus_metric = UnknownMetricFamily(
|
||||||
|
name=metric_name,
|
||||||
|
documentation=description,
|
||||||
|
labels=label_keys,
|
||||||
|
)
|
||||||
|
prometheus_metric.add_metric(labels=label_values, value=value)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
|
|
@ -178,8 +188,8 @@ class CustomCollector:
|
||||||
)
|
)
|
||||||
return prometheus_metric
|
return prometheus_metric
|
||||||
|
|
||||||
def _sanitize(self, key):
|
def _sanitize(self, key: str) -> str:
|
||||||
""" sanitize the given metric name or label according to Prometheus rule.
|
"""sanitize the given metric name or label according to Prometheus rule.
|
||||||
Replace all characters other than [A-Za-z0-9_] with '_'.
|
Replace all characters other than [A-Za-z0-9_] with '_'.
|
||||||
"""
|
"""
|
||||||
return self._non_letters_nor_digits_re.sub("_", key)
|
return self._non_letters_nor_digits_re.sub("_", key)
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
import unittest
|
import unittest
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
|
from prometheus_client import generate_latest
|
||||||
from prometheus_client.core import CounterMetricFamily
|
from prometheus_client.core import CounterMetricFamily
|
||||||
|
|
||||||
from opentelemetry.exporter.prometheus import (
|
from opentelemetry.exporter.prometheus import (
|
||||||
|
|
@ -24,7 +25,11 @@ from opentelemetry.exporter.prometheus import (
|
||||||
from opentelemetry.metrics import get_meter_provider, set_meter_provider
|
from opentelemetry.metrics import get_meter_provider, set_meter_provider
|
||||||
from opentelemetry.sdk import metrics
|
from opentelemetry.sdk import metrics
|
||||||
from opentelemetry.sdk.metrics.export import MetricRecord, MetricsExportResult
|
from opentelemetry.sdk.metrics.export import MetricRecord, MetricsExportResult
|
||||||
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
|
from opentelemetry.sdk.metrics.export.aggregate import (
|
||||||
|
MinMaxSumCountAggregator,
|
||||||
|
SumAggregator,
|
||||||
|
)
|
||||||
|
from opentelemetry.sdk.util import get_dict_as_key
|
||||||
|
|
||||||
|
|
||||||
class TestPrometheusMetricExporter(unittest.TestCase):
|
class TestPrometheusMetricExporter(unittest.TestCase):
|
||||||
|
|
@ -35,7 +40,7 @@ class TestPrometheusMetricExporter(unittest.TestCase):
|
||||||
"testname", "testdesc", "unit", int, metrics.Counter,
|
"testname", "testdesc", "unit", int, metrics.Counter,
|
||||||
)
|
)
|
||||||
labels = {"environment": "staging"}
|
labels = {"environment": "staging"}
|
||||||
self._labels_key = metrics.get_dict_as_key(labels)
|
self._labels_key = get_dict_as_key(labels)
|
||||||
|
|
||||||
self._mock_registry_register = mock.Mock()
|
self._mock_registry_register = mock.Mock()
|
||||||
self._registry_register_patch = mock.patch(
|
self._registry_register_patch = mock.patch(
|
||||||
|
|
@ -70,13 +75,32 @@ class TestPrometheusMetricExporter(unittest.TestCase):
|
||||||
self.assertEqual(len(exporter._collector._metrics_to_export), 1)
|
self.assertEqual(len(exporter._collector._metrics_to_export), 1)
|
||||||
self.assertIs(result, MetricsExportResult.SUCCESS)
|
self.assertIs(result, MetricsExportResult.SUCCESS)
|
||||||
|
|
||||||
|
def test_min_max_sum_aggregator_to_prometheus(self):
|
||||||
|
meter = get_meter_provider().get_meter(__name__)
|
||||||
|
metric = meter.create_metric(
|
||||||
|
"test@name", "testdesc", "unit", int, metrics.ValueRecorder, []
|
||||||
|
)
|
||||||
|
labels = {}
|
||||||
|
key_labels = get_dict_as_key(labels)
|
||||||
|
aggregator = MinMaxSumCountAggregator()
|
||||||
|
aggregator.update(123)
|
||||||
|
aggregator.update(456)
|
||||||
|
aggregator.take_checkpoint()
|
||||||
|
record = MetricRecord(metric, key_labels, aggregator)
|
||||||
|
collector = CustomCollector("testprefix")
|
||||||
|
collector.add_metrics_data([record])
|
||||||
|
result_bytes = generate_latest(collector)
|
||||||
|
result = result_bytes.decode("utf-8")
|
||||||
|
self.assertIn("testprefix_test_name_count 2.0", result)
|
||||||
|
self.assertIn("testprefix_test_name_sum 579.0", result)
|
||||||
|
|
||||||
def test_counter_to_prometheus(self):
|
def test_counter_to_prometheus(self):
|
||||||
meter = get_meter_provider().get_meter(__name__)
|
meter = get_meter_provider().get_meter(__name__)
|
||||||
metric = meter.create_metric(
|
metric = meter.create_metric(
|
||||||
"test@name", "testdesc", "unit", int, metrics.Counter,
|
"test@name", "testdesc", "unit", int, metrics.Counter,
|
||||||
)
|
)
|
||||||
labels = {"environment@": "staging", "os": "Windows"}
|
labels = {"environment@": "staging", "os": "Windows"}
|
||||||
key_labels = metrics.get_dict_as_key(labels)
|
key_labels = get_dict_as_key(labels)
|
||||||
aggregator = SumAggregator()
|
aggregator = SumAggregator()
|
||||||
aggregator.update(123)
|
aggregator.update(123)
|
||||||
aggregator.take_checkpoint()
|
aggregator.take_checkpoint()
|
||||||
|
|
@ -107,7 +131,7 @@ class TestPrometheusMetricExporter(unittest.TestCase):
|
||||||
"tesname", "testdesc", "unit", int, StubMetric
|
"tesname", "testdesc", "unit", int, StubMetric
|
||||||
)
|
)
|
||||||
labels = {"environment": "staging"}
|
labels = {"environment": "staging"}
|
||||||
key_labels = metrics.get_dict_as_key(labels)
|
key_labels = get_dict_as_key(labels)
|
||||||
record = MetricRecord(metric, key_labels, None)
|
record = MetricRecord(metric, key_labels, None)
|
||||||
collector = CustomCollector("testprefix")
|
collector = CustomCollector("testprefix")
|
||||||
collector.add_metrics_data([record])
|
collector.add_metrics_data([record])
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue