Hide OpenCensus reference from public APIs in obsreport package (#3253)

This commit is contained in:
Min Xia 2021-05-26 11:04:00 -07:00 committed by GitHub
parent 5bf7a5469b
commit f72eab07be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 687 additions and 518 deletions

View File

@ -30,8 +30,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
@ -222,7 +222,7 @@ func checkWrapSpanForLogsExporter(t *testing.T, le component.LogsExporter, wantE
sentLogRecords = 0
failedToSendLogRecords = numLogRecords
}
require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd)
require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd)
require.Equalf(t, sentLogRecords, sd.Attributes[obsmetrics.SentLogRecordsKey], "SpanData %v", sd)
require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsmetrics.FailedToSendLogRecordsKey], "SpanData %v", sd)
}
}

View File

@ -30,8 +30,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
@ -246,7 +246,7 @@ func checkWrapSpanForMetricsExporter(t *testing.T, me component.MetricsExporter,
sentMetricPoints = 0
failedToSendMetricPoints = numMetricPoints
}
require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentMetricPointsKey], "SpanData %v", sd)
require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendMetricPointsKey], "SpanData %v", sd)
require.Equalf(t, sentMetricPoints, sd.Attributes[obsmetrics.SentMetricPointsKey], "SpanData %v", sd)
require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsmetrics.FailedToSendMetricPointsKey], "SpanData %v", sd)
}
}

View File

@ -30,16 +30,16 @@ import (
"go.uber.org/zap/zapcore"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
var (
r = metric.NewRegistry()
queueSizeGauge, _ = r.AddInt64DerivedGauge(
obsreport.ExporterKey+"/queue_size",
obsmetrics.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsreport.ExporterKey),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))
)
@ -127,7 +127,7 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger {
func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
traceAttr := trace.StringAttribute(obsmetrics.ExporterKey, fullName)
return &queuedRetrySender{
fullName: fullName,
cfg: qCfg,

View File

@ -31,8 +31,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
@ -239,7 +239,7 @@ func checkWrapSpanForTracesExporter(t *testing.T, te component.TracesExporter, w
failedToSendSpans = numSpans
}
require.Equalf(t, sentSpans, sd.Attributes[obsreport.SentSpansKey], "SpanData %v", sd)
require.Equalf(t, failedToSendSpans, sd.Attributes[obsreport.FailedToSendSpansKey], "SpanData %v", sd)
require.Equalf(t, sentSpans, sd.Attributes[obsmetrics.SentSpansKey], "SpanData %v", sd)
require.Equalf(t, failedToSendSpans, sd.Attributes[obsmetrics.FailedToSendSpansKey], "SpanData %v", sd)
}
}

View File

@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsmetrics
import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
const (
// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"
// SentSpansKey used to track spans sent by exporters.
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
FailedToSendSpansKey = "send_failed_spans"
// SentMetricPointsKey used to track metric points sent by exporters.
SentMetricPointsKey = "sent_metric_points"
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
FailedToSendMetricPointsKey = "send_failed_metric_points"
// SentLogRecordsKey used to track logs sent by exporters.
SentLogRecordsKey = "sent_log_records"
// FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters.
FailedToSendLogRecordsKey = "send_failed_log_records"
)
var (
TagKeyExporter, _ = tag.NewKey(ExporterKey)
ExporterPrefix = ExporterKey + NameSep
ExportTraceDataOperationSuffix = NameSep + "traces"
ExportMetricsOperationSuffix = NameSep + "metrics"
ExportLogsOperationSuffix = NameSep + "logs"
// Exporter metrics. Any count of data items below is in the final format
// that they were sent, reasoning: reconciliation is easier if measurements
// on backend and exporter are expected to be the same. Translation issues
// that result in a different number of elements should be reported in a
// separate way.
ExporterSentSpans = stats.Int64(
ExporterPrefix+SentSpansKey,
"Number of spans successfully sent to destination.",
stats.UnitDimensionless)
ExporterFailedToSendSpans = stats.Int64(
ExporterPrefix+FailedToSendSpansKey,
"Number of spans in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterSentMetricPoints = stats.Int64(
ExporterPrefix+SentMetricPointsKey,
"Number of metric points successfully sent to destination.",
stats.UnitDimensionless)
ExporterFailedToSendMetricPoints = stats.Int64(
ExporterPrefix+FailedToSendMetricPointsKey,
"Number of metric points in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterSentLogRecords = stats.Int64(
ExporterPrefix+SentLogRecordsKey,
"Number of log record successfully sent to destination.",
stats.UnitDimensionless)
ExporterFailedToSendLogRecords = stats.Int64(
ExporterPrefix+FailedToSendLogRecordsKey,
"Number of log records in failed attempts to send to destination.",
stats.UnitDimensionless)
)

View File

@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsmetrics
import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
const (
// ProcessorKey is the key used to identify processors in metrics and traces.
ProcessorKey = "processor"
// DroppedSpansKey is the key used to identify spans dropped by the Collector.
DroppedSpansKey = "dropped_spans"
// DroppedMetricPointsKey is the key used to identify metric points dropped by the Collector.
DroppedMetricPointsKey = "dropped_metric_points"
// DroppedLogRecordsKey is the key used to identify log records dropped by the Collector.
DroppedLogRecordsKey = "dropped_log_records"
)
var (
TagKeyProcessor, _ = tag.NewKey(ProcessorKey)
ProcessorPrefix = ProcessorKey + NameSep
// Processor metrics. Any count of data items below is in the internal format
// of the collector since processors only deal with internal format.
ProcessorAcceptedSpans = stats.Int64(
ProcessorPrefix+AcceptedSpansKey,
"Number of spans successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorRefusedSpans = stats.Int64(
ProcessorPrefix+RefusedSpansKey,
"Number of spans that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorDroppedSpans = stats.Int64(
ProcessorPrefix+DroppedSpansKey,
"Number of spans that were dropped.",
stats.UnitDimensionless)
ProcessorAcceptedMetricPoints = stats.Int64(
ProcessorPrefix+AcceptedMetricPointsKey,
"Number of metric points successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorRefusedMetricPoints = stats.Int64(
ProcessorPrefix+RefusedMetricPointsKey,
"Number of metric points that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorDroppedMetricPoints = stats.Int64(
ProcessorPrefix+DroppedMetricPointsKey,
"Number of metric points that were dropped.",
stats.UnitDimensionless)
ProcessorAcceptedLogRecords = stats.Int64(
ProcessorPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorRefusedLogRecords = stats.Int64(
ProcessorPrefix+RefusedLogRecordsKey,
"Number of log records that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
ProcessorDroppedLogRecords = stats.Int64(
ProcessorPrefix+DroppedLogRecordsKey,
"Number of log records that were dropped.",
stats.UnitDimensionless)
)

View File

@ -0,0 +1,86 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsmetrics
import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
const (
// ReceiverKey used to identify receivers in metrics and traces.
ReceiverKey = "receiver"
// TransportKey used to identify the transport used to received the data.
TransportKey = "transport"
// FormatKey used to identify the format of the data received.
FormatKey = "format"
// AcceptedSpansKey used to identify spans accepted by the Collector.
AcceptedSpansKey = "accepted_spans"
// RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector.
RefusedSpansKey = "refused_spans"
// AcceptedMetricPointsKey used to identify metric points accepted by the Collector.
AcceptedMetricPointsKey = "accepted_metric_points"
// RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the
// Collector.
RefusedMetricPointsKey = "refused_metric_points"
// AcceptedLogRecordsKey used to identify log records accepted by the Collector.
AcceptedLogRecordsKey = "accepted_log_records"
// RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the
// Collector.
RefusedLogRecordsKey = "refused_log_records"
)
var (
TagKeyReceiver, _ = tag.NewKey(ReceiverKey)
TagKeyTransport, _ = tag.NewKey(TransportKey)
ReceiverPrefix = ReceiverKey + NameSep
ReceiveTraceDataOperationSuffix = NameSep + "TraceDataReceived"
ReceiverMetricsOperationSuffix = NameSep + "MetricsReceived"
ReceiverLogsOperationSuffix = NameSep + "LogsReceived"
// Receiver metrics. Any count of data items below is in the original format
// that they were received, reasoning: reconciliation is easier if measurement
// on clients and receiver are expected to be the same. Translation issues
// that result in a different number of elements should be reported in a
// separate way.
ReceiverAcceptedSpans = stats.Int64(
ReceiverPrefix+AcceptedSpansKey,
"Number of spans successfully pushed into the pipeline.",
stats.UnitDimensionless)
ReceiverRefusedSpans = stats.Int64(
ReceiverPrefix+RefusedSpansKey,
"Number of spans that could not be pushed into the pipeline.",
stats.UnitDimensionless)
ReceiverAcceptedMetricPoints = stats.Int64(
ReceiverPrefix+AcceptedMetricPointsKey,
"Number of metric points successfully pushed into the pipeline.",
stats.UnitDimensionless)
ReceiverRefusedMetricPoints = stats.Int64(
ReceiverPrefix+RefusedMetricPointsKey,
"Number of metric points that could not be pushed into the pipeline.",
stats.UnitDimensionless)
ReceiverAcceptedLogRecords = stats.Int64(
ReceiverPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the pipeline.",
stats.UnitDimensionless)
ReceiverRefusedLogRecords = stats.Int64(
ReceiverPrefix+RefusedLogRecordsKey,
"Number of log records that could not be pushed into the pipeline.",
stats.UnitDimensionless)
)

View File

@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsmetrics
import (
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
const (
// ScraperKey used to identify scrapers in metrics and traces.
ScraperKey = "scraper"
// ScrapedMetricPointsKey used to identify metric points scraped by the
// Collector.
ScrapedMetricPointsKey = "scraped_metric_points"
// ErroredMetricPointsKey used to identify metric points errored (i.e.
// unable to be scraped) by the Collector.
ErroredMetricPointsKey = "errored_metric_points"
)
const (
ScraperPrefix = ScraperKey + NameSep
ScraperMetricsOperationSuffix = NameSep + "MetricsScraped"
)
var (
TagKeyScraper, _ = tag.NewKey(ScraperKey)
ScraperScrapedMetricPoints = stats.Int64(
ScraperPrefix+ScrapedMetricPointsKey,
"Number of metric points successfully scraped.",
stats.UnitDimensionless)
ScraperErroredMetricPoints = stats.Int64(
ScraperPrefix+ErroredMetricPointsKey,
"Number of metric points that were unable to be scraped.",
stats.UnitDimensionless)
)

View File

@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package obsmetrics defines the obsreport metrics for each components
// all the metrics is in OpenCensus format which will be replaced with OTEL Metrics
// in the future
package obsmetrics
const (
NameSep = "/"
)

View File

@ -0,0 +1,124 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsreportconfig
import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
var (
Level = configtelemetry.LevelBasic
)
// ObsMetrics wraps OpenCensus View for Collector observability metrics
type ObsMetrics struct {
Views []*view.View
}
// Configure is used to control the settings that will be used by the obsreport
// package.
func Configure(level configtelemetry.Level) *ObsMetrics {
Level = level
var views []*view.View
if Level != configtelemetry.LevelNone {
obsMetricViews := allViews()
views = append(views, obsMetricViews.Views...)
}
return &ObsMetrics{
Views: views,
}
}
// allViews return the list of all views that needs to be configured.
func allViews() *ObsMetrics {
var views []*view.View
// Receiver views.
measures := []*stats.Int64Measure{
obsmetrics.ReceiverAcceptedSpans,
obsmetrics.ReceiverRefusedSpans,
obsmetrics.ReceiverAcceptedMetricPoints,
obsmetrics.ReceiverRefusedMetricPoints,
obsmetrics.ReceiverAcceptedLogRecords,
obsmetrics.ReceiverRefusedLogRecords,
}
tagKeys := []tag.Key{
obsmetrics.TagKeyReceiver, obsmetrics.TagKeyTransport,
}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Scraper views.
measures = []*stats.Int64Measure{
obsmetrics.ScraperScrapedMetricPoints,
obsmetrics.ScraperErroredMetricPoints,
}
tagKeys = []tag.Key{obsmetrics.TagKeyReceiver, obsmetrics.TagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Exporter views.
measures = []*stats.Int64Measure{
obsmetrics.ExporterSentSpans,
obsmetrics.ExporterFailedToSendSpans,
obsmetrics.ExporterSentMetricPoints,
obsmetrics.ExporterFailedToSendMetricPoints,
obsmetrics.ExporterSentLogRecords,
obsmetrics.ExporterFailedToSendLogRecords,
}
tagKeys = []tag.Key{obsmetrics.TagKeyExporter}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Processor views.
measures = []*stats.Int64Measure{
obsmetrics.ProcessorAcceptedSpans,
obsmetrics.ProcessorRefusedSpans,
obsmetrics.ProcessorDroppedSpans,
obsmetrics.ProcessorAcceptedMetricPoints,
obsmetrics.ProcessorRefusedMetricPoints,
obsmetrics.ProcessorDroppedMetricPoints,
obsmetrics.ProcessorAcceptedLogRecords,
obsmetrics.ProcessorRefusedLogRecords,
obsmetrics.ProcessorDroppedLogRecords,
}
tagKeys = []tag.Key{obsmetrics.TagKeyProcessor}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
return &ObsMetrics{
Views: views,
}
}
func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
aggregation *view.Aggregation,
) []*view.View {
views := make([]*view.View, 0, len(measures))
for _, measure := range measures {
views = append(views, &view.View{
Name: measure.Name(),
Description: measure.Description(),
TagKeys: tagKeys,
Measure: measure,
Aggregation: aggregation,
})
}
return views
}

View File

@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package obsreportconfig
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/config/configtelemetry"
)
func TestConfigure(t *testing.T) {
tests := []struct {
name string
level configtelemetry.Level
wantViews []*view.View
}{
{
name: "none",
level: configtelemetry.LevelNone,
},
{
name: "basic",
level: configtelemetry.LevelBasic,
wantViews: allViews().Views,
},
{
name: "normal",
level: configtelemetry.LevelNormal,
wantViews: allViews().Views,
},
{
name: "detailed",
level: configtelemetry.LevelDetailed,
wantViews: allViews().Views,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotViews := Configure(tt.level)
assert.Equal(t, tt.wantViews, gotViews.Views)
})
}
}

View File

@ -18,21 +18,12 @@ import (
"context"
"strings"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/config/configtelemetry"
)
const (
nameSep = "/"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
var (
gLevel = configtelemetry.LevelBasic
okStatus = trace.Status{Code: trace.StatusCodeOK}
)
@ -60,99 +51,14 @@ func setParentLink(parentCtx context.Context, childSpan *trace.Span) bool {
return true
}
// Configure is used to control the settings that will be used by the obsreport
// package.
func Configure(level configtelemetry.Level) (views []*view.View) {
gLevel = level
if gLevel != configtelemetry.LevelNone {
gProcessor.level = level
views = append(views, AllViews()...)
}
return views
}
func buildComponentPrefix(componentPrefix, configType string) string {
if !strings.HasSuffix(componentPrefix, nameSep) {
componentPrefix += nameSep
if !strings.HasSuffix(componentPrefix, obsmetrics.NameSep) {
componentPrefix += obsmetrics.NameSep
}
if configType == "" {
return componentPrefix
}
return componentPrefix + configType + nameSep
}
// AllViews return the list of all views that needs to be configured.
func AllViews() (views []*view.View) {
// Receiver views.
measures := []*stats.Int64Measure{
mReceiverAcceptedSpans,
mReceiverRefusedSpans,
mReceiverAcceptedMetricPoints,
mReceiverRefusedMetricPoints,
mReceiverAcceptedLogRecords,
mReceiverRefusedLogRecords,
}
tagKeys := []tag.Key{
tagKeyReceiver, tagKeyTransport,
}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Scraper views.
measures = []*stats.Int64Measure{
mScraperScrapedMetricPoints,
mScraperErroredMetricPoints,
}
tagKeys = []tag.Key{tagKeyReceiver, tagKeyScraper}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Exporter views.
measures = []*stats.Int64Measure{
mExporterSentSpans,
mExporterFailedToSendSpans,
mExporterSentMetricPoints,
mExporterFailedToSendMetricPoints,
mExporterSentLogRecords,
mExporterFailedToSendLogRecords,
}
tagKeys = []tag.Key{tagKeyExporter}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
// Processor views.
measures = []*stats.Int64Measure{
mProcessorAcceptedSpans,
mProcessorRefusedSpans,
mProcessorDroppedSpans,
mProcessorAcceptedMetricPoints,
mProcessorRefusedMetricPoints,
mProcessorDroppedMetricPoints,
mProcessorAcceptedLogRecords,
mProcessorRefusedLogRecords,
mProcessorDroppedLogRecords,
}
tagKeys = []tag.Key{tagKeyProcessor}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
return views
}
func genViews(
measures []*stats.Int64Measure,
tagKeys []tag.Key,
aggregation *view.Aggregation,
) []*view.View {
views := make([]*view.View, 0, len(measures))
for _, measure := range measures {
views = append(views, &view.View{
Name: measure.Name(),
Description: measure.Description(),
TagKeys: tagKeys,
Measure: measure,
Aggregation: aggregation,
})
}
return views
return componentPrefix + configType + obsmetrics.NameSep
}
func errToStatus(err error) trace.Status {

View File

@ -23,65 +23,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
)
const (
// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"
// SentSpansKey used to track spans sent by exporters.
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
FailedToSendSpansKey = "send_failed_spans"
// SentMetricPointsKey used to track metric points sent by exporters.
SentMetricPointsKey = "sent_metric_points"
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
FailedToSendMetricPointsKey = "send_failed_metric_points"
// SentLogRecordsKey used to track logs sent by exporters.
SentLogRecordsKey = "sent_log_records"
// FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters.
FailedToSendLogRecordsKey = "send_failed_log_records"
)
var (
tagKeyExporter, _ = tag.NewKey(ExporterKey)
exporterPrefix = ExporterKey + nameSep
exportTraceDataOperationSuffix = nameSep + "traces"
exportMetricsOperationSuffix = nameSep + "metrics"
exportLogsOperationSuffix = nameSep + "logs"
// Exporter metrics. Any count of data items below is in the final format
// that they were sent, reasoning: reconciliation is easier if measurements
// on backend and exporter are expected to be the same. Translation issues
// that result in a different number of elements should be reported in a
// separate way.
mExporterSentSpans = stats.Int64(
exporterPrefix+SentSpansKey,
"Number of spans successfully sent to destination.",
stats.UnitDimensionless)
mExporterFailedToSendSpans = stats.Int64(
exporterPrefix+FailedToSendSpansKey,
"Number of spans in failed attempts to send to destination.",
stats.UnitDimensionless)
mExporterSentMetricPoints = stats.Int64(
exporterPrefix+SentMetricPointsKey,
"Number of metric points successfully sent to destination.",
stats.UnitDimensionless)
mExporterFailedToSendMetricPoints = stats.Int64(
exporterPrefix+FailedToSendMetricPointsKey,
"Number of metric points in failed attempts to send to destination.",
stats.UnitDimensionless)
mExporterSentLogRecords = stats.Int64(
exporterPrefix+SentLogRecordsKey,
"Number of log record successfully sent to destination.",
stats.UnitDimensionless)
mExporterFailedToSendLogRecords = stats.Int64(
exporterPrefix+FailedToSendLogRecordsKey,
"Number of log records in failed attempts to send to destination.",
stats.UnitDimensionless)
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
// Exporter is a helper to add observability to a component.Exporter.
@ -102,7 +45,7 @@ func NewExporter(cfg ExporterSettings) *Exporter {
return &Exporter{
level: cfg.Level,
exporterName: cfg.ExporterID.String(),
mutators: []tag.Mutator{tag.Upsert(tagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}
}
@ -110,55 +53,55 @@ func NewExporter(cfg ExporterSettings) *Exporter {
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
func (eor *Exporter) StartTracesExportOp(ctx context.Context) context.Context {
return eor.startSpan(ctx, exportTraceDataOperationSuffix)
return eor.startSpan(ctx, obsmetrics.ExportTraceDataOperationSuffix)
}
// EndTracesExportOp completes the export operation that was started with StartTracesExportOp.
func (eor *Exporter) EndTracesExportOp(ctx context.Context, numSpans int, err error) {
numSent, numFailedToSend := toNumItems(numSpans, err)
eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentSpans, mExporterFailedToSendSpans)
endSpan(ctx, err, numSent, numFailedToSend, SentSpansKey, FailedToSendSpansKey)
eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentSpans, obsmetrics.ExporterFailedToSendSpans)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
}
// StartMetricsExportOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
func (eor *Exporter) StartMetricsExportOp(ctx context.Context) context.Context {
return eor.startSpan(ctx, exportMetricsOperationSuffix)
return eor.startSpan(ctx, obsmetrics.ExportMetricsOperationSuffix)
}
// EndMetricsExportOp completes the export operation that was started with
// StartMetricsExportOp.
func (eor *Exporter) EndMetricsExportOp(ctx context.Context, numMetricPoints int, err error) {
numSent, numFailedToSend := toNumItems(numMetricPoints, err)
eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentMetricPoints, mExporterFailedToSendMetricPoints)
endSpan(ctx, err, numSent, numFailedToSend, SentMetricPointsKey, FailedToSendMetricPointsKey)
eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentMetricPoints, obsmetrics.ExporterFailedToSendMetricPoints)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
}
// StartLogsExportOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
func (eor *Exporter) StartLogsExportOp(ctx context.Context) context.Context {
return eor.startSpan(ctx, exportLogsOperationSuffix)
return eor.startSpan(ctx, obsmetrics.ExportLogsOperationSuffix)
}
// EndLogsExportOp completes the export operation that was started with StartLogsExportOp.
func (eor *Exporter) EndLogsExportOp(ctx context.Context, numLogRecords int, err error) {
numSent, numFailedToSend := toNumItems(numLogRecords, err)
eor.recordMetrics(ctx, numSent, numFailedToSend, mExporterSentLogRecords, mExporterFailedToSendLogRecords)
endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey)
eor.recordMetrics(ctx, numSent, numFailedToSend, obsmetrics.ExporterSentLogRecords, obsmetrics.ExporterFailedToSendLogRecords)
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
}
// startSpan creates the span used to trace the operation. Returning
// the updated context and the created span.
func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context {
spanName := exporterPrefix + eor.exporterName + operationSuffix
spanName := obsmetrics.ExporterPrefix + eor.exporterName + operationSuffix
ctx, _ = trace.StartSpan(ctx, spanName)
return ctx
}
func (eor *Exporter) recordMetrics(ctx context.Context, numSent, numFailedToSend int64, sentMeasure, failedToSendMeasure *stats.Int64Measure) {
if gLevel == configtelemetry.LevelNone {
if obsreportconfig.Level == configtelemetry.LevelNone {
return
}
// Ignore the error for now. This should not happen.

View File

@ -23,79 +23,22 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
)
const (
// ProcessorKey is the key used to identify processors in metrics and traces.
ProcessorKey = "processor"
// DroppedSpansKey is the key used to identify spans dropped by the Collector.
DroppedSpansKey = "dropped_spans"
// DroppedMetricPointsKey is the key used to identify metric points dropped by the Collector.
DroppedMetricPointsKey = "dropped_metric_points"
// DroppedLogRecordsKey is the key used to identify log records dropped by the Collector.
DroppedLogRecordsKey = "dropped_log_records"
)
var (
tagKeyProcessor, _ = tag.NewKey(ProcessorKey)
processorPrefix = ProcessorKey + nameSep
// Processor metrics. Any count of data items below is in the internal format
// of the collector since processors only deal with internal format.
mProcessorAcceptedSpans = stats.Int64(
processorPrefix+AcceptedSpansKey,
"Number of spans successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorRefusedSpans = stats.Int64(
processorPrefix+RefusedSpansKey,
"Number of spans that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorDroppedSpans = stats.Int64(
processorPrefix+DroppedSpansKey,
"Number of spans that were dropped.",
stats.UnitDimensionless)
mProcessorAcceptedMetricPoints = stats.Int64(
processorPrefix+AcceptedMetricPointsKey,
"Number of metric points successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorRefusedMetricPoints = stats.Int64(
processorPrefix+RefusedMetricPointsKey,
"Number of metric points that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorDroppedMetricPoints = stats.Int64(
processorPrefix+DroppedMetricPointsKey,
"Number of metric points that were dropped.",
stats.UnitDimensionless)
mProcessorAcceptedLogRecords = stats.Int64(
processorPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorRefusedLogRecords = stats.Int64(
processorPrefix+RefusedLogRecordsKey,
"Number of log records that were rejected by the next component in the pipeline.",
stats.UnitDimensionless)
mProcessorDroppedLogRecords = stats.Int64(
processorPrefix+DroppedLogRecordsKey,
"Number of log records that were dropped.",
stats.UnitDimensionless)
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
// BuildProcessorCustomMetricName is used to be build a metric name following
// the standards used in the Collector. The configType should be the same
// value used to identify the type on the config.
func BuildProcessorCustomMetricName(configType, metric string) string {
return buildComponentPrefix(processorPrefix, configType) + metric
return buildComponentPrefix(obsmetrics.ProcessorPrefix, configType) + metric
}
// ProcessorMetricViews builds the metric views for custom metrics of processors.
func ProcessorMetricViews(configType string, legacyViews []*view.View) []*view.View {
func ProcessorMetricViews(configType string, legacyViews *obsreportconfig.ObsMetrics) *obsreportconfig.ObsMetrics {
var allViews []*view.View
if gLevel != configtelemetry.LevelNone {
for _, legacyView := range legacyViews {
if obsreportconfig.Level != configtelemetry.LevelNone {
for _, legacyView := range legacyViews.Views {
// Ignore any nil entry and views without measure or aggregation.
// These can't be registered but some code registering legacy views may
// ignore the errors.
@ -112,11 +55,11 @@ func ProcessorMetricViews(configType string, legacyViews []*view.View) []*view.V
}
}
return allViews
return &obsreportconfig.ObsMetrics{
Views: allViews,
}
}
var gProcessor = &Processor{level: configtelemetry.LevelNone}
// Processor is a helper to add observability to a component.Processor.
type Processor struct {
level configtelemetry.Level
@ -133,7 +76,7 @@ type ProcessorSettings struct {
func NewProcessor(cfg ProcessorSettings) *Processor {
return &Processor{
level: cfg.Level,
mutators: []tag.Mutator{tag.Upsert(tagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
mutators: []tag.Mutator{tag.Upsert(obsmetrics.TagKeyProcessor, cfg.ProcessorID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}
}
@ -143,9 +86,9 @@ func (por *Processor) TracesAccepted(ctx context.Context, numSpans int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedSpans.M(int64(numSpans)),
mProcessorRefusedSpans.M(0),
mProcessorDroppedSpans.M(0),
obsmetrics.ProcessorAcceptedSpans.M(int64(numSpans)),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(0),
)
}
}
@ -156,9 +99,9 @@ func (por *Processor) TracesRefused(ctx context.Context, numSpans int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedSpans.M(0),
mProcessorRefusedSpans.M(int64(numSpans)),
mProcessorDroppedSpans.M(0),
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(int64(numSpans)),
obsmetrics.ProcessorDroppedSpans.M(0),
)
}
}
@ -169,9 +112,9 @@ func (por *Processor) TracesDropped(ctx context.Context, numSpans int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedSpans.M(0),
mProcessorRefusedSpans.M(0),
mProcessorDroppedSpans.M(int64(numSpans)),
obsmetrics.ProcessorAcceptedSpans.M(0),
obsmetrics.ProcessorRefusedSpans.M(0),
obsmetrics.ProcessorDroppedSpans.M(int64(numSpans)),
)
}
}
@ -182,9 +125,9 @@ func (por *Processor) MetricsAccepted(ctx context.Context, numPoints int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(int64(numPoints)),
mProcessorRefusedMetricPoints.M(0),
mProcessorDroppedMetricPoints.M(0),
obsmetrics.ProcessorAcceptedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
}
}
@ -195,9 +138,9 @@ func (por *Processor) MetricsRefused(ctx context.Context, numPoints int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(0),
mProcessorRefusedMetricPoints.M(int64(numPoints)),
mProcessorDroppedMetricPoints.M(0),
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
}
}
@ -208,9 +151,9 @@ func (por *Processor) MetricsDropped(ctx context.Context, numPoints int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(0),
mProcessorRefusedMetricPoints.M(0),
mProcessorDroppedMetricPoints.M(int64(numPoints)),
obsmetrics.ProcessorAcceptedMetricPoints.M(0),
obsmetrics.ProcessorRefusedMetricPoints.M(0),
obsmetrics.ProcessorDroppedMetricPoints.M(int64(numPoints)),
)
}
}
@ -221,9 +164,9 @@ func (por *Processor) LogsAccepted(ctx context.Context, numRecords int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(int64(numRecords)),
mProcessorRefusedLogRecords.M(0),
mProcessorDroppedLogRecords.M(0),
obsmetrics.ProcessorAcceptedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(0),
)
}
}
@ -234,9 +177,9 @@ func (por *Processor) LogsRefused(ctx context.Context, numRecords int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(0),
mProcessorRefusedLogRecords.M(int64(numRecords)),
mProcessorDroppedMetricPoints.M(0),
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorDroppedMetricPoints.M(0),
)
}
}
@ -247,9 +190,9 @@ func (por *Processor) LogsDropped(ctx context.Context, numRecords int) {
stats.RecordWithTags(
ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(0),
mProcessorRefusedLogRecords.M(0),
mProcessorDroppedLogRecords.M(int64(numRecords)),
obsmetrics.ProcessorAcceptedLogRecords.M(0),
obsmetrics.ProcessorRefusedLogRecords.M(0),
obsmetrics.ProcessorDroppedLogRecords.M(int64(numRecords)),
)
}
}

View File

@ -23,72 +23,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
)
const (
// ReceiverKey used to identify receivers in metrics and traces.
ReceiverKey = "receiver"
// TransportKey used to identify the transport used to received the data.
TransportKey = "transport"
// FormatKey used to identify the format of the data received.
FormatKey = "format"
// AcceptedSpansKey used to identify spans accepted by the Collector.
AcceptedSpansKey = "accepted_spans"
// RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector.
RefusedSpansKey = "refused_spans"
// AcceptedMetricPointsKey used to identify metric points accepted by the Collector.
AcceptedMetricPointsKey = "accepted_metric_points"
// RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the
// Collector.
RefusedMetricPointsKey = "refused_metric_points"
// AcceptedLogRecordsKey used to identify log records accepted by the Collector.
AcceptedLogRecordsKey = "accepted_log_records"
// RefusedLogRecordsKey used to identify log records refused (ie.: not ingested) by the
// Collector.
RefusedLogRecordsKey = "refused_log_records"
)
var (
tagKeyReceiver, _ = tag.NewKey(ReceiverKey)
tagKeyTransport, _ = tag.NewKey(TransportKey)
receiverPrefix = ReceiverKey + nameSep
receiveTraceDataOperationSuffix = nameSep + "TraceDataReceived"
receiverMetricsOperationSuffix = nameSep + "MetricsReceived"
receiverLogsOperationSuffix = nameSep + "LogsReceived"
// Receiver metrics. Any count of data items below is in the original format
// that they were received, reasoning: reconciliation is easier if measurements
// on clients and receiver are expected to be the same. Translation issues
// that result in a different number of elements should be reported in a
// separate way.
mReceiverAcceptedSpans = stats.Int64(
receiverPrefix+AcceptedSpansKey,
"Number of spans successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedSpans = stats.Int64(
receiverPrefix+RefusedSpansKey,
"Number of spans that could not be pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverAcceptedMetricPoints = stats.Int64(
receiverPrefix+AcceptedMetricPointsKey,
"Number of metric points successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedMetricPoints = stats.Int64(
receiverPrefix+RefusedMetricPointsKey,
"Number of metric points that could not be pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverAcceptedLogRecords = stats.Int64(
receiverPrefix+AcceptedLogRecordsKey,
"Number of log records successfully pushed into the pipeline.",
stats.UnitDimensionless)
mReceiverRefusedLogRecords = stats.Int64(
receiverPrefix+RefusedLogRecordsKey,
"Number of log records that could not be pushed into the pipeline.",
stats.UnitDimensionless)
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
// StartReceiveOptions has the options related to starting a receive operation.
@ -172,7 +108,7 @@ func (rec *Receiver) StartTraceDataReceiveOp(
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiveTraceDataOperationSuffix,
obsmetrics.ReceiveTraceDataOperationSuffix,
opt...)
}
@ -188,7 +124,7 @@ func StartTraceDataReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiveTraceDataOperationSuffix,
obsmetrics.ReceiveTraceDataOperationSuffix,
opt...)
}
@ -236,7 +172,7 @@ func (rec *Receiver) StartLogsReceiveOp(
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverLogsOperationSuffix,
obsmetrics.ReceiverLogsOperationSuffix,
opt...)
}
@ -252,7 +188,7 @@ func StartLogsReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiverLogsOperationSuffix,
obsmetrics.ReceiverLogsOperationSuffix,
opt...)
}
@ -300,7 +236,7 @@ func (rec *Receiver) StartMetricsReceiveOp(
) context.Context {
return rec.traceReceiveOp(
operationCtx,
receiverMetricsOperationSuffix,
obsmetrics.ReceiverMetricsOperationSuffix,
opt...)
}
@ -316,7 +252,7 @@ func StartMetricsReceiveOp(
rec := NewReceiver(ReceiverSettings{ReceiverID: receiverID, Transport: transport})
return rec.traceReceiveOp(
operationCtx,
receiverMetricsOperationSuffix,
obsmetrics.ReceiverMetricsOperationSuffix,
opt...)
}
@ -365,8 +301,8 @@ func ReceiverContext(
transport string,
) context.Context {
ctx, _ = tag.New(ctx,
tag.Upsert(tagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(tagKeyTransport, transport, tag.WithTTL(tag.TTLNoPropagation)))
tag.Upsert(obsmetrics.TagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyTransport, transport, tag.WithTTL(tag.TTLNoPropagation)))
return ctx
}
@ -385,7 +321,7 @@ func (rec *Receiver) traceReceiveOp(
var ctx context.Context
var span *trace.Span
spanName := receiverPrefix + rec.receiverID.String() + operationSuffix
spanName := obsmetrics.ReceiverPrefix + rec.receiverID.String() + operationSuffix
if !opts.LongLivedCtx {
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
@ -401,7 +337,7 @@ func (rec *Receiver) traceReceiveOp(
}
if rec.transport != "" {
span.AddAttributes(trace.StringAttribute(TransportKey, rec.transport))
span.AddAttributes(trace.StringAttribute(obsmetrics.TransportKey, rec.transport))
}
return ctx
}
@ -423,18 +359,18 @@ func (rec *Receiver) endReceiveOp(
span := trace.FromContext(receiverCtx)
if gLevel != configtelemetry.LevelNone {
if obsreportconfig.Level != configtelemetry.LevelNone {
var acceptedMeasure, refusedMeasure *stats.Int64Measure
switch dataType {
case config.TracesDataType:
acceptedMeasure = mReceiverAcceptedSpans
refusedMeasure = mReceiverRefusedSpans
acceptedMeasure = obsmetrics.ReceiverAcceptedSpans
refusedMeasure = obsmetrics.ReceiverRefusedSpans
case config.MetricsDataType:
acceptedMeasure = mReceiverAcceptedMetricPoints
refusedMeasure = mReceiverRefusedMetricPoints
acceptedMeasure = obsmetrics.ReceiverAcceptedMetricPoints
refusedMeasure = obsmetrics.ReceiverRefusedMetricPoints
case config.LogsDataType:
acceptedMeasure = mReceiverAcceptedLogRecords
refusedMeasure = mReceiverRefusedLogRecords
acceptedMeasure = obsmetrics.ReceiverAcceptedLogRecords
refusedMeasure = obsmetrics.ReceiverRefusedLogRecords
}
stats.Record(
@ -448,19 +384,19 @@ func (rec *Receiver) endReceiveOp(
var acceptedItemsKey, refusedItemsKey string
switch dataType {
case config.TracesDataType:
acceptedItemsKey = AcceptedSpansKey
refusedItemsKey = RefusedSpansKey
acceptedItemsKey = obsmetrics.AcceptedSpansKey
refusedItemsKey = obsmetrics.RefusedSpansKey
case config.MetricsDataType:
acceptedItemsKey = AcceptedMetricPointsKey
refusedItemsKey = RefusedMetricPointsKey
acceptedItemsKey = obsmetrics.AcceptedMetricPointsKey
refusedItemsKey = obsmetrics.RefusedMetricPointsKey
case config.LogsDataType:
acceptedItemsKey = AcceptedLogRecordsKey
refusedItemsKey = RefusedLogRecordsKey
acceptedItemsKey = obsmetrics.AcceptedLogRecordsKey
refusedItemsKey = obsmetrics.RefusedLogRecordsKey
}
span.AddAttributes(
trace.StringAttribute(
FormatKey, format),
obsmetrics.FormatKey, format),
trace.Int64Attribute(
acceptedItemsKey, int64(numAccepted)),
trace.Int64Attribute(

View File

@ -23,39 +23,11 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/receiver/scrapererror"
)
const (
// ScraperKey used to identify scrapers in metrics and traces.
ScraperKey = "scraper"
// ScrapedMetricPointsKey used to identify metric points scraped by the
// Collector.
ScrapedMetricPointsKey = "scraped_metric_points"
// ErroredMetricPointsKey used to identify metric points errored (i.e.
// unable to be scraped) by the Collector.
ErroredMetricPointsKey = "errored_metric_points"
)
const (
scraperPrefix = ScraperKey + nameSep
scraperMetricsOperationSuffix = nameSep + "MetricsScraped"
)
var (
tagKeyScraper, _ = tag.NewKey(ScraperKey)
mScraperScrapedMetricPoints = stats.Int64(
scraperPrefix+ScrapedMetricPointsKey,
"Number of metric points successfully scraped.",
stats.UnitDimensionless)
mScraperErroredMetricPoints = stats.Int64(
scraperPrefix+ErroredMetricPointsKey,
"Number of metric points that were unable to be scraped.",
stats.UnitDimensionless)
)
// ScraperContext adds the keys used when recording observability metrics to
// the given context returning the newly created context. This context should
// be used in related calls to the obsreport functions so metrics are properly
@ -67,8 +39,8 @@ func ScraperContext(
) context.Context {
ctx, _ = tag.New(
ctx,
tag.Upsert(tagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(tagKeyScraper, scraper.String(), tag.WithTTL(tag.TTLNoPropagation)))
tag.Upsert(obsmetrics.TagKeyReceiver, receiverID.String(), tag.WithTTL(tag.TTLNoPropagation)),
tag.Upsert(obsmetrics.TagKeyScraper, scraper.String(), tag.WithTTL(tag.TTLNoPropagation)))
return ctx
}
@ -81,7 +53,7 @@ func StartMetricsScrapeOp(
receiverID config.ComponentID,
scraper config.ComponentID,
) context.Context {
spanName := scraperPrefix + receiverID.String() + nameSep + scraper.String() + scraperMetricsOperationSuffix
spanName := obsmetrics.ScraperPrefix + receiverID.String() + obsmetrics.NameSep + scraper.String() + obsmetrics.ScraperMetricsOperationSuffix
ctx, _ := trace.StartSpan(scraperCtx, spanName)
return ctx
}
@ -105,19 +77,19 @@ func EndMetricsScrapeOp(
span := trace.FromContext(scraperCtx)
if gLevel != configtelemetry.LevelNone {
if obsreportconfig.Level != configtelemetry.LevelNone {
stats.Record(
scraperCtx,
mScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
mScraperErroredMetricPoints.M(int64(numErroredMetrics)))
obsmetrics.ScraperScrapedMetricPoints.M(int64(numScrapedMetrics)),
obsmetrics.ScraperErroredMetricPoints.M(int64(numErroredMetrics)))
}
// end span according to errors
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute(FormatKey, string(config.MetricsDataType)),
trace.Int64Attribute(ScrapedMetricPointsKey, int64(numScrapedMetrics)),
trace.Int64Attribute(ErroredMetricPointsKey, int64(numErroredMetrics)),
trace.StringAttribute(obsmetrics.FormatKey, string(config.MetricsDataType)),
trace.Int64Attribute(obsmetrics.ScrapedMetricPointsKey, int64(numScrapedMetrics)),
trace.Int64Attribute(obsmetrics.ErroredMetricPointsKey, int64(numErroredMetrics)),
)
span.SetStatus(errToStatus(err))

View File

@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// obsreport_test instead of just obsreport to avoid dependency cycle between
// obsreport_test and obsreporttest
package obsreport_test
package obsreport
import (
"context"
@ -30,7 +28,8 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
"go.opentelemetry.io/collector/receiver/scrapererror"
)
@ -55,40 +54,6 @@ type receiveTestParams struct {
err error
}
func TestConfigure(t *testing.T) {
tests := []struct {
name string
level configtelemetry.Level
wantViews []*view.View
}{
{
name: "none",
level: configtelemetry.LevelNone,
},
{
name: "basic",
level: configtelemetry.LevelBasic,
wantViews: obsreport.AllViews(),
},
{
name: "normal",
level: configtelemetry.LevelNormal,
wantViews: obsreport.AllViews(),
},
{
name: "detailed",
level: configtelemetry.LevelDetailed,
wantViews: obsreport.AllViews(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotViews := obsreport.Configure(tt.level)
assert.Equal(t, tt.wantViews, gotViews)
})
}
}
func TestReceiveTraceDataOp(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
@ -102,14 +67,14 @@ func TestReceiveTraceDataOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport)
receiverCtx := ReceiverContext(parentCtx, receiver, transport)
params := []receiveTestParams{
{transport, errFake},
{"", nil},
}
rcvdSpans := []int{13, 42}
for i, param := range params {
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
@ -129,22 +94,22 @@ func TestReceiveTraceDataOp(t *testing.T) {
switch params[i].err {
case nil:
acceptedSpans += rcvdSpans[i]
assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsreport.AcceptedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedSpansKey])
assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsmetrics.AcceptedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedSpansKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
refusedSpans += rcvdSpans[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedSpansKey])
assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsreport.RefusedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedSpansKey])
assert.Equal(t, int64(rcvdSpans[i]), span.Attributes[obsmetrics.RefusedSpansKey])
assert.Equal(t, params[i].err.Error(), span.Status.Message)
default:
t.Fatalf("unexpected param: %v", params[i])
}
switch params[i].transport {
case "":
assert.NotContains(t, span.Attributes, obsreport.TransportKey)
assert.NotContains(t, span.Attributes, obsmetrics.TransportKey)
default:
assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey])
assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey])
}
}
obsreporttest.CheckReceiverTraces(t, receiver, transport, int64(acceptedSpans), int64(refusedSpans))
@ -163,14 +128,14 @@ func TestReceiveLogsOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport)
receiverCtx := ReceiverContext(parentCtx, receiver, transport)
params := []receiveTestParams{
{transport, errFake},
{"", nil},
}
rcvdLogRecords := []int{13, 42}
for i, param := range params {
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
@ -190,22 +155,22 @@ func TestReceiveLogsOp(t *testing.T) {
switch params[i].err {
case nil:
acceptedLogRecords += rcvdLogRecords[i]
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.AcceptedLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedLogRecordsKey])
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsmetrics.AcceptedLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedLogRecordsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
refusedLogRecords += rcvdLogRecords[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedLogRecordsKey])
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsreport.RefusedLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedLogRecordsKey])
assert.Equal(t, int64(rcvdLogRecords[i]), span.Attributes[obsmetrics.RefusedLogRecordsKey])
assert.Equal(t, params[i].err.Error(), span.Status.Message)
default:
t.Fatalf("unexpected param: %v", params[i])
}
switch params[i].transport {
case "":
assert.NotContains(t, span.Attributes, obsreport.TransportKey)
assert.NotContains(t, span.Attributes, obsmetrics.TransportKey)
default:
assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey])
assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey])
}
}
obsreporttest.CheckReceiverLogs(t, receiver, transport, int64(acceptedLogRecords), int64(refusedLogRecords))
@ -224,14 +189,14 @@ func TestReceiveMetricsOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
receiverCtx := obsreport.ReceiverContext(parentCtx, receiver, transport)
receiverCtx := ReceiverContext(parentCtx, receiver, transport)
params := []receiveTestParams{
{transport, errFake},
{"", nil},
}
rcvdMetricPts := []int{23, 29}
for i, param := range params {
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
assert.NotNil(t, ctx)
@ -251,22 +216,22 @@ func TestReceiveMetricsOp(t *testing.T) {
switch params[i].err {
case nil:
acceptedMetricPoints += rcvdMetricPts[i]
assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsreport.AcceptedMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedMetricPointsKey])
assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsmetrics.AcceptedMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedMetricPointsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
refusedMetricPoints += rcvdMetricPts[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedMetricPointsKey])
assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsreport.RefusedMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedMetricPointsKey])
assert.Equal(t, int64(rcvdMetricPts[i]), span.Attributes[obsmetrics.RefusedMetricPointsKey])
assert.Equal(t, params[i].err.Error(), span.Status.Message)
default:
t.Fatalf("unexpected param: %v", params[i])
}
switch params[i].transport {
case "":
assert.NotContains(t, span.Attributes, obsreport.TransportKey)
assert.NotContains(t, span.Attributes, obsmetrics.TransportKey)
default:
assert.Equal(t, params[i].transport, span.Attributes[obsreport.TransportKey])
assert.Equal(t, params[i].transport, span.Attributes[obsmetrics.TransportKey])
}
}
@ -286,14 +251,14 @@ func TestScrapeMetricsDataOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
receiverCtx := obsreport.ScraperContext(parentCtx, receiver, scraper)
receiverCtx := ScraperContext(parentCtx, receiver, scraper)
errParams := []error{partialErrFake, errFake, nil}
scrapedMetricPts := []int{23, 29, 15}
for i, err := range errParams {
ctx := obsreport.StartMetricsScrapeOp(receiverCtx, receiver, scraper)
ctx := StartMetricsScrapeOp(receiverCtx, receiver, scraper)
assert.NotNil(t, ctx)
obsreport.EndMetricsScrapeOp(
EndMetricsScrapeOp(
ctx,
scrapedMetricPts[i],
err)
@ -308,19 +273,19 @@ func TestScrapeMetricsDataOp(t *testing.T) {
switch errParams[i] {
case nil:
scrapedMetricPoints += scrapedMetricPts[i]
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ScrapedMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.ErroredMetricPointsKey])
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ScrapedMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.ErroredMetricPointsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
erroredMetricPoints += scrapedMetricPts[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.ScrapedMetricPointsKey])
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ErroredMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.ScrapedMetricPointsKey])
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ErroredMetricPointsKey])
assert.Equal(t, errParams[i].Error(), span.Status.Message)
case partialErrFake:
scrapedMetricPoints += scrapedMetricPts[i]
erroredMetricPoints++
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsreport.ScrapedMetricPointsKey])
assert.Equal(t, int64(1), span.Attributes[obsreport.ErroredMetricPointsKey])
assert.Equal(t, int64(scrapedMetricPts[i]), span.Attributes[obsmetrics.ScrapedMetricPointsKey])
assert.Equal(t, int64(1), span.Attributes[obsmetrics.ErroredMetricPointsKey])
assert.Equal(t, errParams[i].Error(), span.Status.Message)
default:
t.Fatalf("unexpected err param: %v", errParams[i])
@ -343,7 +308,7 @@ func TestExportTraceDataOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
errs := []error{nil, errFake}
numExportedSpans := []int{22, 14}
for i, err := range errs {
@ -361,13 +326,13 @@ func TestExportTraceDataOp(t *testing.T) {
switch errs[i] {
case nil:
sentSpans += numExportedSpans[i]
assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsreport.SentSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendSpansKey])
assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsmetrics.SentSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendSpansKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
failedToSendSpans += numExportedSpans[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.SentSpansKey])
assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsreport.FailedToSendSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentSpansKey])
assert.Equal(t, int64(numExportedSpans[i]), span.Attributes[obsmetrics.FailedToSendSpansKey])
assert.Equal(t, errs[i].Error(), span.Status.Message)
default:
t.Fatalf("unexpected error: %v", errs[i])
@ -390,7 +355,7 @@ func TestExportMetricsOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
errs := []error{nil, errFake}
toSendMetricPoints := []int{17, 23}
@ -410,13 +375,13 @@ func TestExportMetricsOp(t *testing.T) {
switch errs[i] {
case nil:
sentMetricPoints += toSendMetricPoints[i]
assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsreport.SentMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendMetricPointsKey])
assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsmetrics.SentMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendMetricPointsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
failedToSendMetricPoints += toSendMetricPoints[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.SentMetricPointsKey])
assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsreport.FailedToSendMetricPointsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentMetricPointsKey])
assert.Equal(t, int64(toSendMetricPoints[i]), span.Attributes[obsmetrics.FailedToSendMetricPointsKey])
assert.Equal(t, errs[i].Error(), span.Status.Message)
default:
t.Fatalf("unexpected error: %v", errs[i])
@ -439,7 +404,7 @@ func TestExportLogsOp(t *testing.T) {
t.Name(), trace.WithSampler(trace.AlwaysSample()))
defer parentSpan.End()
obsrep := obsreport.NewExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})
errs := []error{nil, errFake}
toSendLogRecords := []int{17, 23}
for i, err := range errs {
@ -458,13 +423,13 @@ func TestExportLogsOp(t *testing.T) {
switch errs[i] {
case nil:
sentLogRecords += toSendLogRecords[i]
assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsreport.SentLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.FailedToSendLogRecordsKey])
assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsmetrics.SentLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.FailedToSendLogRecordsKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
failedToSendLogRecords += toSendLogRecords[i]
assert.Equal(t, int64(0), span.Attributes[obsreport.SentLogRecordsKey])
assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsreport.FailedToSendLogRecordsKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.SentLogRecordsKey])
assert.Equal(t, int64(toSendLogRecords[i]), span.Attributes[obsmetrics.FailedToSendLogRecordsKey])
assert.Equal(t, errs[i].Error(), span.Status.Message)
default:
t.Fatalf("unexpected error: %v", errs[i])
@ -491,7 +456,7 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
parentCtx, parentSpan := trace.StartSpan(context.Background(), t.Name())
defer parentSpan.End()
longLivedCtx := obsreport.ReceiverContext(parentCtx, receiver, transport)
longLivedCtx := ReceiverContext(parentCtx, receiver, transport)
ops := []struct {
numSpans int
err error
@ -502,10 +467,10 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
for _, op := range ops {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
longLivedCtx,
obsreport.WithLongLivedCtx())
WithLongLivedCtx())
assert.NotNil(t, ctx)
rec.EndTraceDataReceiveOp(
@ -526,15 +491,15 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
assert.Equal(t, parentSpan.SpanContext().TraceID, link.TraceID)
assert.Equal(t, parentSpan.SpanContext().SpanID, link.SpanID)
assert.Equal(t, "receiver/"+receiver.String()+"/TraceDataReceived", span.Name)
assert.Equal(t, transport, span.Attributes[obsreport.TransportKey])
assert.Equal(t, transport, span.Attributes[obsmetrics.TransportKey])
switch ops[i].err {
case nil:
assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsreport.AcceptedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsreport.RefusedSpansKey])
assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsmetrics.AcceptedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.RefusedSpansKey])
assert.Equal(t, trace.Status{Code: trace.StatusCodeOK}, span.Status)
case errFake:
assert.Equal(t, int64(0), span.Attributes[obsreport.AcceptedSpansKey])
assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsreport.RefusedSpansKey])
assert.Equal(t, int64(0), span.Attributes[obsmetrics.AcceptedSpansKey])
assert.Equal(t, int64(ops[i].numSpans), span.Attributes[obsmetrics.RefusedSpansKey])
assert.Equal(t, ops[i].err.Error(), span.Status.Message)
default:
t.Fatalf("unexpected error: %v", ops[i].err)
@ -551,7 +516,7 @@ func TestProcessorTraceData(t *testing.T) {
const refusedSpans = 19
const droppedSpans = 13
obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep.TracesAccepted(context.Background(), acceptedSpans)
obsrep.TracesRefused(context.Background(), refusedSpans)
obsrep.TracesDropped(context.Background(), droppedSpans)
@ -568,7 +533,7 @@ func TestProcessorMetricsData(t *testing.T) {
const refusedPoints = 11
const droppedPoints = 17
obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep.MetricsAccepted(context.Background(), acceptedPoints)
obsrep.MetricsRefused(context.Background(), refusedPoints)
obsrep.MetricsDropped(context.Background(), droppedPoints)
@ -623,9 +588,9 @@ func TestProcessorMetricViews(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
obsreport.Configure(tt.level)
got := obsreport.ProcessorMetricViews("test_type", legacyViews)
assert.Equal(t, tt.want, got)
obsreportconfig.Configure(tt.level)
got := ProcessorMetricViews("test_type", &obsreportconfig.ObsMetrics{Views: legacyViews})
assert.Equal(t, tt.want, got.Views)
})
}
}
@ -639,7 +604,7 @@ func TestProcessorLogRecords(t *testing.T) {
const refusedRecords = 11
const droppedRecords = 17
obsrep := obsreport.NewProcessor(obsreport.ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep := NewProcessor(ProcessorSettings{Level: configtelemetry.LevelNormal, ProcessorID: processor})
obsrep.LogsAccepted(context.Background(), acceptedRecords)
obsrep.LogsRefused(context.Background(), refusedRecords)
obsrep.LogsDropped(context.Background(), droppedRecords)

View File

@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/internal/obsreportconfig"
)
var (
@ -45,7 +45,8 @@ var (
// SetupRecordedMetricsTest does setup the testing environment to check the metrics recorded by receivers, producers or exporters.
// The returned function should be deferred.
func SetupRecordedMetricsTest() (func(), error) {
views := obsreport.Configure(configtelemetry.LevelNormal)
obsMetrics := obsreportconfig.Configure(configtelemetry.LevelNormal)
views := obsMetrics.Views
err := view.Register(views...)
if err != nil {
return nil, err

View File

@ -19,11 +19,13 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/obsreport"
)
var (
processorTagKey = tag.MustNewKey(obsreport.ProcessorKey)
processorTagKey = tag.MustNewKey(obsmetrics.ProcessorKey)
statBatchSizeTriggerSend = stats.Int64("batch_size_trigger_send", "Number of times the batch was sent due to a size trigger", stats.UnitDimensionless)
statTimeoutTriggerSend = stats.Int64("timeout_trigger_send", "Number of times the batch was sent due to a timeout trigger", stats.UnitDimensionless)
statBatchSendSize = stats.Int64("batch_send_size", "Number of units in the batch", stats.UnitDimensionless)
@ -68,12 +70,14 @@ func MetricViews() []*view.View {
1000_000, 2000_000, 3000_000, 4000_000, 5000_000, 6000_000, 7000_000, 8000_000, 9000_000),
}
legacyViews := []*view.View{
countBatchSizeTriggerSendView,
countTimeoutTriggerSendView,
distributionBatchSendSizeView,
distributionBatchSendSizeBytesView,
legacyViews := &obsreportconfig.ObsMetrics{
Views: []*view.View{
countBatchSizeTriggerSendView,
countTimeoutTriggerSendView,
distributionBatchSendSizeView,
distributionBatchSendSizeBytesView,
},
}
return obsreport.ProcessorMetricViews(typeStr, legacyViews)
return obsreport.ProcessorMetricViews(typeStr, legacyViews).Views
}

View File

@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
// ErrSkipProcessingData is a sentinel value to indicate when traces or metrics should intentionally be dropped
@ -79,6 +79,6 @@ func fromOptions(options []Option) *baseSettings {
func spanAttributes(id config.ComponentID) []trace.Attribute {
return []trace.Attribute{
trace.StringAttribute(obsreport.ProcessorKey, id.String()),
trace.StringAttribute(obsmetrics.ProcessorKey, id.String()),
}
}

View File

@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/jaegerexporter"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/kafkareceiver"
telemetry2 "go.opentelemetry.io/collector/service/internal/telemetry"
@ -61,10 +61,11 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
}
var views []*view.View
obsMetrics := obsreportconfig.Configure(level)
views = append(views, batchprocessor.MetricViews()...)
views = append(views, jaegerexporter.MetricViews()...)
views = append(views, kafkareceiver.MetricViews()...)
views = append(views, obsreport.Configure(level)...)
views = append(views, obsMetrics.Views...)
views = append(views, processMetricsViews.Views()...)
tel.views = views