Support logs scraper (#12116)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
This PR adds support for a logs scraper.

<!-- Issue number if applicable -->
#### Link to tracking issue
Relevant to #11238 

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Added

<!--Describe the documentation added.-->
#### Documentation
Added

<!--Please delete paragraphs that you did not use before submitting.-->
This commit is contained in:
Chao Weng 2025-01-22 12:16:41 +08:00 committed by GitHub
parent 9757ead255
commit 6740a28a80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 560 additions and 53 deletions

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: scraper
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support logs scraper
# One or more tracking issues or pull requests related to the change
issues: [12116]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]

View File

@ -29,12 +29,19 @@ type Settings struct {
type Factory interface {
component.Factory
// CreateLogs creates a Logs scraper based on this config.
// If the scraper type does not support logs,
// this function returns the error [pipeline.ErrSignalNotSupported].
CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error)
// CreateMetrics creates a Metrics scraper based on this config.
// If the scraper type does not support metrics,
// this function returns the error [pipeline.ErrSignalNotSupported].
// Implementers can assume `next` is never nil.
CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error)
// LogsStability gets the stability level of the Logs scraper.
LogsStability() component.StabilityLevel
// MetricsStability gets the stability level of the Metrics scraper.
MetricsStability() component.StabilityLevel
@ -59,7 +66,9 @@ func (f factoryOptionFunc) applyOption(o *factory) {
type factory struct {
cfgType component.Type
component.CreateDefaultConfigFunc
CreateLogsFunc
CreateMetricsFunc
logsStabilityLevel component.StabilityLevel
metricsStabilityLevel component.StabilityLevel
}
@ -69,13 +78,28 @@ func (f *factory) Type() component.Type {
func (f *factory) unexportedFactoryFunc() {}
func (f *factory) LogsStability() component.StabilityLevel {
return f.logsStabilityLevel
}
func (f *factory) MetricsStability() component.StabilityLevel {
return f.metricsStabilityLevel
}
// CreateLogsFunc is the equivalent of Factory.CreateLogs().
type CreateLogsFunc func(context.Context, Settings, component.Config) (Logs, error)
// CreateMetricsFunc is the equivalent of Factory.CreateMetrics().
type CreateMetricsFunc func(context.Context, Settings, component.Config) (Metrics, error)
// CreateLogs implements Factory.CreateLogs.
// A nil function value means no logs constructor was registered via WithLogs,
// in which case the scraper reports the signal as unsupported.
func (f CreateLogsFunc) CreateLogs(ctx context.Context, set Settings, cfg component.Config) (Logs, error) {
	if f != nil {
		return f(ctx, set, cfg)
	}
	return nil, pipeline.ErrSignalNotSupported
}
// CreateMetrics implements Factory.CreateMetrics.
func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg component.Config) (Metrics, error) {
if f == nil {
@ -84,6 +108,14 @@ func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg
return f(ctx, set, cfg)
}
// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level.
func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption {
	return factoryOptionFunc(func(o *factory) {
		// Install the constructor and record its declared stability.
		o.CreateLogsFunc = createLogs
		o.logsStabilityLevel = sl
	})
}
// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level.
func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption {
return factoryOptionFunc(func(o *factory) {

View File

@ -31,7 +31,9 @@ func TestNewFactory(t *testing.T) {
func() component.Config { return &defaultCfg })
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())
_, err := f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg)
_, err := f.CreateLogs(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
_, err = f.CreateMetrics(context.Background(), nopSettings(), &defaultCfg)
require.ErrorIs(t, err, pipeline.ErrSignalNotSupported)
}
@ -41,12 +43,17 @@ func TestNewFactoryWithOptions(t *testing.T) {
f := NewFactory(
testType,
func() component.Config { return &defaultCfg },
WithLogs(createLogs, component.StabilityLevelAlpha),
WithMetrics(createMetrics, component.StabilityLevelAlpha))
assert.EqualValues(t, testType, f.Type())
assert.EqualValues(t, &defaultCfg, f.CreateDefaultConfig())
assert.Equal(t, component.StabilityLevelAlpha, f.LogsStability())
_, err := f.CreateLogs(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)
assert.Equal(t, component.StabilityLevelAlpha, f.MetricsStability())
_, err := f.CreateMetrics(context.Background(), Settings{}, &defaultCfg)
_, err = f.CreateMetrics(context.Background(), Settings{}, &defaultCfg)
require.NoError(t, err)
}
@ -87,6 +94,10 @@ func TestMakeFactoryMap(t *testing.T) {
}
}
// createLogs is a test CreateLogsFunc that returns a no-op logs scraper.
func createLogs(context.Context, Settings, component.Config) (Logs, error) {
	return NewLogs(newTestScrapeLogsFunc(nil))
}
// createMetrics is a test CreateMetricsFunc that returns a no-op metrics scraper.
func createMetrics(context.Context, Settings, component.Config) (Metrics, error) {
	return NewMetrics(newTestScrapeMetricsFunc(nil))
}

View File

@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
@ -179,6 +180,30 @@ func (sc *controller[T]) startScraping() {
}()
}
// NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs.
// Each configured factory is asked to build its logs scraper; a failure from
// any factory aborts construction and returns that error.
func NewLogsController(cfg *ControllerConfig,
	rSet receiver.Settings,
	nextConsumer consumer.Logs,
	options ...ControllerOption,
) (receiver.Logs, error) {
	co := getOptions(options)
	scrapers := make([]scraper.Logs, 0, len(co.factoriesWithConfig))
	for _, fwc := range co.factoriesWithConfig {
		set := getSettings(fwc.f.Type(), rSet)
		s, err := fwc.f.CreateLogs(context.Background(), set, fwc.cfg)
		if err != nil {
			return nil, err
		}
		// Wrap each scraper with observability (tracing span plus
		// scraped/errored log-record telemetry) before handing it to the controller.
		s, err = wrapObsLogs(s, rSet.ID, set.ID, set.TelemetrySettings)
		if err != nil {
			return nil, err
		}
		scrapers = append(scrapers, s)
	}
	return newController[scraper.Logs](
		cfg, rSet, scrapers, func(c *controller[scraper.Logs]) { scrapeLogs(c, nextConsumer) }, co.tickerCh)
}
// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
func NewMetricsController(cfg *ControllerConfig,
rSet receiver.Settings,
@ -203,6 +228,25 @@ func NewMetricsController(cfg *ControllerConfig,
cfg, rSet, scrapers, func(c *controller[scraper.Metrics]) { scrapeMetrics(c, nextConsumer) }, co.tickerCh)
}
// scrapeLogs runs one collection cycle: every scraper is invoked, the
// results are merged into a single plog.Logs, and the batch is forwarded
// to the next consumer under an obsreport operation.
func scrapeLogs(c *controller[scraper.Logs], nextConsumer consumer.Logs) {
	ctx, done := withScrapeContext(c.timeout)
	defer done()

	merged := plog.NewLogs()
	for _, s := range c.scrapers {
		ld, err := s.ScrapeLogs(ctx)
		// A fully failed scrape yields no data; a partial scrape error still
		// carries usable records, so those are kept.
		if err != nil && !scrapererror.IsPartialScrapeError(err) {
			continue
		}
		ld.ResourceLogs().MoveAndAppendTo(merged.ResourceLogs())
	}

	count := merged.LogRecordCount()
	// NOTE(review): the metrics-op obsreport API is used for a logs pipeline
	// here (accepted/refused are recorded as metric points) — confirm whether
	// StartLogsOp/EndLogsOp should be used instead.
	ctx = c.obsrecv.StartMetricsOp(ctx)
	err := nextConsumer.ConsumeLogs(ctx, merged)
	c.obsrecv.EndMetricsOp(ctx, "", count, err)
}
func scrapeMetrics(c *controller[scraper.Metrics], nextConsumer consumer.Metrics) {
ctx, done := withScrapeContext(c.timeout)
defer done()

View File

@ -16,12 +16,12 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/multierr"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
@ -52,13 +52,26 @@ func (ts *testClose) shutdown(context.Context) error {
return ts.err
}
type testScrapeMetrics struct {
// testScrape is a scrape stub shared by the logs and metrics controller tests;
// it counts invocations and publishes each running count on ch.
type testScrape struct {
	ch                chan int // receives the running call count on every scrape
	timesScrapeCalled int      // total number of scrape invocations so far
	err               error    // when set, every scrape returns this error
}
func (ts *testScrapeMetrics) scrape(context.Context) (pmetric.Metrics, error) {
// scrapeLogs records the invocation, signals the observer channel, and either
// fails with the configured error or emits a single empty-bodied log record.
func (ts *testScrape) scrapeLogs(context.Context) (plog.Logs, error) {
	ts.timesScrapeCalled++
	ts.ch <- ts.timesScrapeCalled

	if ts.err != nil {
		return plog.Logs{}, ts.err
	}

	ld := plog.NewLogs()
	rl := ld.ResourceLogs().AppendEmpty()
	sl := rl.ScopeLogs().AppendEmpty()
	sl.LogRecords().AppendEmpty().Body().SetStr("")
	return ld, nil
}
func (ts *testScrape) scrapeMetrics(context.Context) (pmetric.Metrics, error) {
ts.timesScrapeCalled++
ts.ch <- ts.timesScrapeCalled
@ -78,7 +91,7 @@ func newTestNoDelaySettings() *ControllerConfig {
}
}
type metricsTestCase struct {
type scraperTestCase struct {
name string
scrapers int
@ -92,8 +105,119 @@ type metricsTestCase struct {
closeErr error
}
func TestScrapeController(t *testing.T) {
testCases := []metricsTestCase{
// TestLogsScrapeController exercises the logs controller end to end: scraper
// construction, start/initialize hooks, ticker-driven scrape cycles, emitted
// spans and self-observability metrics, and shutdown/close hooks.
func TestLogsScrapeController(t *testing.T) {
	testCases := []scraperTestCase{
		{
			name: "NoScrapers",
		},
		{
			name:          "AddLogsScrapersWithCollectionInterval",
			scrapers:      2,
			expectScraped: true,
		},
		{
			name:      "AddLogsScrapers_ScrapeError",
			scrapers:  2,
			scrapeErr: errors.New("err1"),
		},
		{
			name:          "AddLogsScrapersWithInitializeAndClose",
			scrapers:      2,
			initialize:    true,
			expectScraped: true,
			close:         true,
		},
		{
			name:          "AddLogsScrapersWithInitializeAndCloseErrors",
			scrapers:      2,
			initialize:    true,
			close:         true,
			initializeErr: errors.New("err1"),
			closeErr:      errors.New("err2"),
		},
	}

	for _, test := range testCases {
		t.Run(test.name, func(t *testing.T) {
			receiverID := component.MustNewID("receiver")
			tt := metadatatest.SetupTelemetry()
			tel := tt.NewTelemetrySettings()
			_, parentSpan := tel.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
			defer parentSpan.End()
			t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

			initializeChs := make([]chan bool, test.scrapers)
			scrapeLogsChs := make([]chan int, test.scrapers)
			closeChs := make([]chan bool, test.scrapers)
			options := configureLogOptions(t, test, initializeChs, scrapeLogsChs, closeChs)

			// Drive scrape cycles manually through the ticker channel.
			tickerCh := make(chan time.Time)
			options = append(options, WithTickerChannel(tickerCh))

			sink := new(consumertest.LogsSink)
			cfg := newTestNoDelaySettings()
			if test.scraperControllerSettings != nil {
				cfg = test.scraperControllerSettings
			}

			mr, err := NewLogsController(cfg, receiver.Settings{ID: receiverID, TelemetrySettings: tel, BuildInfo: component.NewDefaultBuildInfo()}, sink, options...)
			require.NoError(t, err)

			err = mr.Start(context.Background(), componenttest.NewNopHost())
			expectedStartErr := getExpectedStartErr(test)
			if expectedStartErr != nil {
				assert.Equal(t, expectedStartErr, err)
			} else if test.initialize {
				assertChannelsCalled(t, initializeChs, "start was not called")
			}

			const iterations = 5

			if test.expectScraped || test.scrapeErr != nil {
				// validate that scrape is called at least N times for each configured scraper
				for _, ch := range scrapeLogsChs {
					<-ch
				}
				// Consume the initial scrapes on start
				for i := 0; i < iterations; i++ {
					tickerCh <- time.Now()
					for _, ch := range scrapeLogsChs {
						<-ch
					}
				}

				// wait until all calls to scrape have completed
				if test.scrapeErr == nil {
					require.Eventually(t, func() bool {
						return sink.LogRecordCount() == (1+iterations)*(test.scrapers)
					}, time.Second, time.Millisecond)
				}

				if test.expectScraped {
					assert.GreaterOrEqual(t, sink.LogRecordCount(), iterations)
				}

				spans := tt.SpanRecorder.Ended()
				assertReceiverSpan(t, spans)
				assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeLogs")
				assertLogsScraperObsMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink)
			}

			err = mr.Shutdown(context.Background())
			expectedShutdownErr := getExpectedShutdownErr(test)
			if expectedShutdownErr != nil {
				assert.EqualError(t, err, expectedShutdownErr.Error())
			} else if test.close {
				assertChannelsCalled(t, closeChs, "shutdown was not called")
			}
		})
	}
}
func TestMetricsScrapeController(t *testing.T) {
testCases := []scraperTestCase{
{
name: "NoScrapers",
},
@ -128,11 +252,7 @@ func TestScrapeController(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
receiverID := component.MustNewID("receiver")
tt := metadatatest.SetupTelemetry()
tel := tt.NewTelemetrySettings()
// TODO: Add capability for tracing testing in metadatatest.
spanRecorder := new(tracetest.SpanRecorder)
tel.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder))
_, parentSpan := tel.TracerProvider.Tracer("test").Start(context.Background(), t.Name())
defer parentSpan.End()
@ -190,10 +310,10 @@ func TestScrapeController(t *testing.T) {
assert.GreaterOrEqual(t, sink.DataPointCount(), iterations)
}
spans := spanRecorder.Ended()
spans := tt.SpanRecorder.Ended()
assertReceiverSpan(t, spans)
assertScraperSpan(t, test.scrapeErr, spans)
assertMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink)
assertScraperSpan(t, test.scrapeErr, spans, "scraper/scraper/ScrapeMetrics")
assertMetricsScraperObsMetrics(t, tt, receiverID, component.MustNewID("scraper"), test.scrapeErr, sink)
}
err = mr.Shutdown(context.Background())
@ -207,7 +327,34 @@ func TestScrapeController(t *testing.T) {
}
}
func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption {
// configureLogOptions builds one logs-scraper controller option per configured
// scraper, wiring the provided channels into the start/scrape/shutdown hooks.
func configureLogOptions(t *testing.T, test scraperTestCase, initializeChs []chan bool, scrapeLogsChs []chan int, closeChs []chan bool) []ControllerOption {
	opts := make([]ControllerOption, 0, test.scrapers)
	for i := 0; i < test.scrapers; i++ {
		var scraperOpts []scraper.Option
		if test.initialize {
			initializeChs[i] = make(chan bool, 1)
			ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr}
			scraperOpts = append(scraperOpts, scraper.WithStart(ti.start))
		}
		if test.close {
			closeChs[i] = make(chan bool, 1)
			tc := &testClose{ch: closeChs[i], err: test.closeErr}
			scraperOpts = append(scraperOpts, scraper.WithShutdown(tc.shutdown))
		}

		scrapeLogsChs[i] = make(chan int)
		stub := &testScrape{ch: scrapeLogsChs[i], err: test.scrapeErr}
		scp, err := scraper.NewLogs(stub.scrapeLogs, scraperOpts...)
		require.NoError(t, err)

		opts = append(opts, addLogsScraper(component.MustNewType("scraper"), scp))
	}
	return opts
}
func configureMetricOptions(t *testing.T, test scraperTestCase, initializeChs []chan bool, scrapeMetricsChs []chan int, closeChs []chan bool) []ControllerOption {
var metricOptions []ControllerOption
for i := 0; i < test.scrapers; i++ {
@ -224,8 +371,8 @@ func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []
}
scrapeMetricsChs[i] = make(chan int)
tsm := &testScrapeMetrics{ch: scrapeMetricsChs[i], err: test.scrapeErr}
scp, err := scraper.NewMetrics(tsm.scrape, scraperOptions...)
ts := &testScrape{ch: scrapeMetricsChs[i], err: test.scrapeErr}
scp, err := scraper.NewMetrics(ts.scrapeMetrics, scraperOptions...)
require.NoError(t, err)
metricOptions = append(metricOptions, AddScraper(component.MustNewType("scraper"), scp))
@ -234,11 +381,11 @@ func configureMetricOptions(t *testing.T, test metricsTestCase, initializeChs []
return metricOptions
}
func getExpectedStartErr(test metricsTestCase) error {
// getExpectedStartErr returns the error Start is expected to surface: a
// scraper initialize error propagates directly through controller start.
func getExpectedStartErr(test scraperTestCase) error {
	return test.initializeErr
}
func getExpectedShutdownErr(test metricsTestCase) error {
func getExpectedShutdownErr(test scraperTestCase) error {
var errs error
if test.closeErr != nil {
@ -275,7 +422,7 @@ func assertReceiverSpan(t *testing.T, spans []sdktrace.ReadOnlySpan) {
assert.True(t, receiverSpan)
}
func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan) {
func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnlySpan, expectedSpanName string) {
expectedStatusCode := codes.Unset
expectedStatusMessage := ""
if expectedErr != nil {
@ -285,7 +432,7 @@ func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnl
scraperSpan := false
for _, span := range spans {
if span.Name() == "scraper/scraper/ScrapeMetrics" {
if span.Name() == expectedSpanName {
scraperSpan = true
assert.Equal(t, expectedStatusCode, span.Status().Code)
assert.Equal(t, expectedStatusMessage, span.Status().Description)
@ -295,7 +442,97 @@ func assertScraperSpan(t *testing.T, expectedErr error, spans []sdktrace.ReadOnl
assert.True(t, scraperSpan)
}
func assertMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.MetricsSink) {
// assertLogsScraperObsMetrics verifies the self-observability metrics emitted
// for a logs scrape: receiver accepted/refused counts and per-scraper
// scraped/errored log-record counts.
// NOTE(review): the receiver-level metrics are named *_metric_points with unit
// {datapoints} even though this is a logs pipeline — this matches the
// controller's use of the metrics-op obsreport API; confirm intended.
func assertLogsScraperObsMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.LogsSink) {
	logRecordCounts := 0
	for _, md := range sink.AllLogs() {
		logRecordCounts += md.LogRecordCount()
	}

	// A partial scrape error marks only the failed records as errored; any
	// other error means nothing counts as scraped.
	expectedScraped := int64(sink.LogRecordCount())
	expectedErrored := int64(0)
	if expectedErr != nil {
		var partialError scrapererror.PartialScrapeError
		if errors.As(expectedErr, &partialError) {
			expectedErrored = int64(partialError.Failed)
		} else {
			expectedScraped = int64(0)
			expectedErrored = int64(sink.LogRecordCount())
		}
	}

	tt.AssertMetrics(t, []metricdata.Metrics{
		{
			Name:        "otelcol_receiver_accepted_metric_points",
			Description: "Number of metric points successfully pushed into the pipeline. [alpha]",
			Unit:        "{datapoints}",
			Data: metricdata.Sum[int64]{
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(
							attribute.String(receiverKey, receiver.String()),
							attribute.String(transportTag, "")),
						Value: int64(logRecordCounts),
					},
				},
			},
		},
		{
			Name:        "otelcol_receiver_refused_metric_points",
			Description: "Number of metric points that could not be pushed into the pipeline. [alpha]",
			Unit:        "{datapoints}",
			Data: metricdata.Sum[int64]{
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(
							attribute.String(receiverKey, receiver.String()),
							attribute.String(transportTag, "")),
						Value: 0,
					},
				},
			},
		},
		{
			Name:        "otelcol_scraper_scraped_log_records",
			Description: "Number of log records successfully scraped. [alpha]",
			Unit:        "{datapoints}",
			Data: metricdata.Sum[int64]{
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(
							attribute.String(receiverKey, receiver.String()),
							attribute.String(scraperKey, scraper.String())),
						Value: expectedScraped,
					},
				},
			},
		},
		{
			Name:        "otelcol_scraper_errored_log_records",
			Description: "Number of log records that were unable to be scraped. [alpha]",
			Unit:        "{datapoints}",
			Data: metricdata.Sum[int64]{
				Temporality: metricdata.CumulativeTemporality,
				IsMonotonic: true,
				DataPoints: []metricdata.DataPoint[int64]{
					{
						Attributes: attribute.NewSet(
							attribute.String(receiverKey, receiver.String()),
							attribute.String(scraperKey, scraper.String())),
						Value: expectedErrored,
					},
				},
			},
		},
	}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}
func assertMetricsScraperObsMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.ID, scraper component.ID, expectedErr error, sink *consumertest.MetricsSink) {
dataPointCounts := 0
for _, md := range sink.AllMetrics() {
dataPointCounts += md.DataPointCount()
@ -385,22 +622,22 @@ func assertMetrics(t *testing.T, tt metadatatest.Telemetry, receiver component.I
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}
func TestSingleScrapePerInterval(t *testing.T) {
scrapeMetricsCh := make(chan int, 10)
tsm := &testScrapeMetrics{ch: scrapeMetricsCh}
func TestSingleLogsScraperPerInterval(t *testing.T) {
scrapeCh := make(chan int, 10)
ts := &testScrape{ch: scrapeCh}
cfg := newTestNoDelaySettings()
tickerCh := make(chan time.Time)
scp, err := scraper.NewMetrics(tsm.scrape)
scp, err := scraper.NewLogs(ts.scrapeLogs)
require.NoError(t, err)
recv, err := NewMetricsController(
recv, err := NewLogsController(
cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
AddScraper(component.MustNewType("scaper"), scp),
new(consumertest.LogsSink),
addLogsScraper(component.MustNewType("scraper"), scp),
WithTickerChannel(tickerCh),
)
require.NoError(t, err)
@ -413,7 +650,7 @@ func TestSingleScrapePerInterval(t *testing.T) {
assert.Eventually(
t,
func() bool {
return <-scrapeMetricsCh == 2
return <-scrapeCh == 2
},
300*time.Millisecond,
100*time.Millisecond,
@ -421,21 +658,91 @@ func TestSingleScrapePerInterval(t *testing.T) {
)
select {
case <-scrapeMetricsCh:
case <-scrapeCh:
assert.Fail(t, "Scrape was called more than twice")
case <-time.After(100 * time.Millisecond):
return
}
}
func TestScrapeControllerStartsOnInit(t *testing.T) {
// TestSingleMetricsScraperPerInterval verifies that a single ticker tick
// triggers exactly one scrape beyond the initial scrape performed at start.
func TestSingleMetricsScraperPerInterval(t *testing.T) {
	scrapeCh := make(chan int, 10)
	ts := &testScrape{ch: scrapeCh}

	cfg := newTestNoDelaySettings()

	tickerCh := make(chan time.Time)

	scp, err := scraper.NewMetrics(ts.scrapeMetrics)
	require.NoError(t, err)

	recv, err := NewMetricsController(
		cfg,
		receivertest.NewNopSettings(),
		new(consumertest.MetricsSink),
		AddScraper(component.MustNewType("scraper"), scp),
		WithTickerChannel(tickerCh),
	)
	require.NoError(t, err)

	require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost()))
	defer func() { require.NoError(t, recv.Shutdown(context.Background())) }()

	// One tick after the initial scrape-on-start => call count reaches 2.
	tickerCh <- time.Now()

	assert.Eventually(
		t,
		func() bool {
			return <-scrapeCh == 2
		},
		300*time.Millisecond,
		100*time.Millisecond,
		"Make sure the scraper channel is called twice",
	)

	select {
	case <-scrapeCh:
		assert.Fail(t, "Scrape was called more than twice")
	case <-time.After(100 * time.Millisecond):
		return
	}
}
// TestLogsScraperControllerStartsOnInit verifies that with no initial delay
// the logs controller performs a scrape immediately on Start, before the
// first collection interval elapses.
func TestLogsScraperControllerStartsOnInit(t *testing.T) {
	t.Parallel()

	ts := &testScrape{
		ch: make(chan int, 1),
	}

	scp, err := scraper.NewLogs(ts.scrapeLogs)
	require.NoError(t, err, "Must not error when creating scraper")

	r, err := NewLogsController(
		&ControllerConfig{
			// Long interval guarantees any observed scrape came from start, not a tick.
			CollectionInterval: time.Hour,
			InitialDelay:       0,
		},
		receivertest.NewNopSettings(),
		new(consumertest.LogsSink),
		addLogsScraper(component.MustNewType("scraper"), scp),
	)
	require.NoError(t, err, "Must not error when creating scrape controller")

	assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start")
	<-time.After(500 * time.Nanosecond)
	require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown")
	assert.Equal(t, 1, ts.timesScrapeCalled, "Must have been called as soon as the controller started")
}
func TestMetricsScraperControllerStartsOnInit(t *testing.T) {
t.Parallel()
ts := &testScrape{
ch: make(chan int, 1),
}
scp, err := scraper.NewMetrics(ts.scrapeMetrics)
require.NoError(t, err, "Must not error when creating scraper")
r, err := NewMetricsController(
@ -445,17 +752,56 @@ func TestScrapeControllerStartsOnInit(t *testing.T) {
},
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
AddScraper(component.MustNewType("scaper"), scp),
AddScraper(component.MustNewType("scraper"), scp),
)
require.NoError(t, err, "Must not error when creating scrape controller")
assert.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error on start")
<-time.After(500 * time.Nanosecond)
require.NoError(t, r.Shutdown(context.Background()), "Must not have errored on shutdown")
assert.Equal(t, 1, tsm.timesScrapeCalled, "Must have been called as soon as the controller started")
assert.Equal(t, 1, ts.timesScrapeCalled, "Must have been called as soon as the controller started")
}
func TestScrapeControllerInitialDelay(t *testing.T) {
// TestLogsScraperControllerInitialDelay verifies that the first logs scrape is
// deferred by at least the configured InitialDelay after Start.
func TestLogsScraperControllerInitialDelay(t *testing.T) {
	if testing.Short() {
		t.Skip("This requires real time to pass, skipping")
		return
	}

	t.Parallel()

	var (
		elapsed = make(chan time.Time, 1)
		cfg     = ControllerConfig{
			CollectionInterval: time.Second,
			InitialDelay:       300 * time.Millisecond,
		}
	)

	// The scrape func records the wall-clock time of its first invocation.
	scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) {
		elapsed <- time.Now()
		return plog.NewLogs(), nil
	})
	require.NoError(t, err, "Must not error when creating scraper")

	r, err := NewLogsController(
		&cfg,
		receivertest.NewNopSettings(),
		new(consumertest.LogsSink),
		addLogsScraper(component.MustNewType("scraper"), scp),
	)
	require.NoError(t, err, "Must not error when creating receiver")

	t0 := time.Now()
	require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()), "Must not error when starting")
	t1 := <-elapsed

	assert.GreaterOrEqual(t, t1.Sub(t0), 300*time.Millisecond, "Must have had 300ms pass as defined by initial delay")

	assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down")
}
func TestMetricsScraperControllerInitialDelay(t *testing.T) {
if testing.Short() {
t.Skip("This requires real time to pass, skipping")
return
@ -481,7 +827,7 @@ func TestScrapeControllerInitialDelay(t *testing.T) {
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
AddScraper(component.MustNewType("scaper"), scp),
AddScraper(component.MustNewType("scraper"), scp),
)
require.NoError(t, err, "Must not error when creating receiver")
@ -494,7 +840,41 @@ func TestScrapeControllerInitialDelay(t *testing.T) {
assert.NoError(t, r.Shutdown(context.Background()), "Must not error closing down")
}
func TestShutdownBeforeScrapeCanStart(t *testing.T) {
// TestLogsScraperShutdownBeforeScrapeCanStart verifies that Shutdown does not
// block waiting for a pending (initial-delayed) scrape to run or finish.
func TestLogsScraperShutdownBeforeScrapeCanStart(t *testing.T) {
	cfg := ControllerConfig{
		CollectionInterval: time.Second,
		InitialDelay:       5 * time.Second,
	}

	scp, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) {
		// make the scraper wait for long enough it would disrupt a shutdown.
		time.Sleep(30 * time.Second)
		return plog.NewLogs(), nil
	})
	require.NoError(t, err, "Must not error when creating scraper")

	r, err := NewLogsController(
		&cfg,
		receivertest.NewNopSettings(),
		new(consumertest.LogsSink),
		addLogsScraper(component.MustNewType("scraper"), scp),
	)
	require.NoError(t, err, "Must not error when creating receiver")
	require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))

	shutdown := make(chan struct{}, 1)
	go func() {
		assert.NoError(t, r.Shutdown(context.Background()))
		close(shutdown)
	}()

	// Shutdown must complete well before the 30s sleeping scrape would.
	timer := time.NewTicker(10 * time.Second)
	select {
	case <-timer.C:
		require.Fail(t, "shutdown should not wait for scraping")
	case <-shutdown:
	}
}
func TestMetricsScraperShutdownBeforeScrapeCanStart(t *testing.T) {
cfg := ControllerConfig{
CollectionInterval: time.Second,
InitialDelay: 5 * time.Second,
@ -511,7 +891,7 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) {
&cfg,
receivertest.NewNopSettings(),
new(consumertest.MetricsSink),
AddScraper(component.MustNewType("scaper"), scp),
AddScraper(component.MustNewType("scraper"), scp),
)
require.NoError(t, err, "Must not error when creating receiver")
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
@ -527,3 +907,11 @@ func TestShutdownBeforeScrapeCanStart(t *testing.T) {
case <-shutdown:
}
}
// addLogsScraper wraps a prebuilt scraper.Logs in a single-use factory and
// registers it with the controller; test-only logs counterpart of AddScraper.
func addLogsScraper(t component.Type, sc scraper.Logs) ControllerOption {
	f := scraper.NewFactory(t, nil,
		scraper.WithLogs(func(context.Context, scraper.Settings, component.Config) (scraper.Logs, error) {
			return sc, nil
		}, component.StabilityLevelAlpha))
	return AddFactoryWithConfig(f, nil)
}

View File

@ -29,28 +29,28 @@ const (
erroredLogRecordsKey = "errored_log_records"
)
func newObsLogs(delegate scraper.ScrapeLogsFunc, receiverID component.ID, scraperID component.ID, telSettings component.TelemetrySettings) (scraper.ScrapeLogsFunc, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(telSettings)
func wrapObsLogs(sc scraper.Logs, receiverID component.ID, scraperID component.ID, set component.TelemetrySettings) (scraper.Logs, error) {
telemetryBuilder, errBuilder := metadata.NewTelemetryBuilder(set)
if errBuilder != nil {
return nil, errBuilder
}
tracer := metadata.Tracer(telSettings)
tracer := metadata.Tracer(set)
spanName := scraperKey + spanNameSep + scraperID.String() + spanNameSep + "ScrapeLogs"
otelAttrs := metric.WithAttributeSet(attribute.NewSet(
attribute.String(receiverKey, receiverID.String()),
attribute.String(scraperKey, scraperID.String()),
))
return func(ctx context.Context) (plog.Logs, error) {
scraperFuncs := func(ctx context.Context) (plog.Logs, error) {
ctx, span := tracer.Start(ctx, spanName)
defer span.End()
md, err := delegate(ctx)
md, err := sc.ScrapeLogs(ctx)
numScrapedLogs := 0
numErroredLogs := 0
if err != nil {
telSettings.Logger.Error("Error scraping logs", zap.Error(err))
set.Logger.Error("Error scraping logs", zap.Error(err))
var partialErr scrapererror.PartialScrapeError
if errors.As(err, &partialErr) {
numErroredLogs = partialErr.Failed
@ -77,5 +77,7 @@ func newObsLogs(delegate scraper.ScrapeLogsFunc, receiverID component.ID, scrape
}
return md, err
}, nil
}
return scraper.NewLogs(scraperFuncs, scraper.WithStart(sc.Start), scraper.WithShutdown(sc.Shutdown))
}

View File

@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/testdata"
"go.opentelemetry.io/collector/scraper"
"go.opentelemetry.io/collector/scraper/scraperhelper/internal/metadatatest"
)
@ -36,9 +37,11 @@ func TestScrapeLogsDataOp(t *testing.T) {
{items: 15, err: nil},
}
for i := range params {
sf, err := newObsLogs(func(context.Context) (plog.Logs, error) {
sm, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) {
return testdata.GenerateLogs(params[i].items), params[i].err
}, receiverID, scraperID, tel)
})
require.NoError(t, err)
sf, err := wrapObsLogs(sm, receiverID, scraperID, tel)
require.NoError(t, err)
_, err = sf.ScrapeLogs(parentCtx)
require.ErrorIs(t, err, params[i].err)
@ -81,9 +84,11 @@ func TestCheckScraperLogs(t *testing.T) {
tt := metadatatest.SetupTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
sf, err := newObsLogs(func(context.Context) (plog.Logs, error) {
sm, err := scraper.NewLogs(func(context.Context) (plog.Logs, error) {
return testdata.GenerateLogs(7), nil
}, receiverID, scraperID, tt.NewTelemetrySettings())
})
require.NoError(t, err)
sf, err := wrapObsLogs(sm, receiverID, scraperID, tt.NewTelemetrySettings())
require.NoError(t, err)
_, err = sf.ScrapeLogs(context.Background())
require.NoError(t, err)