Make Logs Type Non-internal (#1456)

This allows the Logs type to be used in the contrib repo.

Most of the references were updated mechanically with sed followed by 'make fmt'.
Ben Keith 2020-07-29 16:58:32 -04:00 committed by GitHub
parent 3d91e3f72b
commit 4896d31e1e
30 changed files with 152 additions and 192 deletions
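For a component that previously imported the internal package, the migration is just the import swap plus the type rename that sed performed. A minimal sketch of the resulting shape (the logSink type is hypothetical; only the import path and the ConsumeLogs signature come from this change):

package example

import (
	"context"

	// was: "go.opentelemetry.io/collector/internal/data"
	"go.opentelemetry.io/collector/consumer/pdata"
)

// logSink is a hypothetical terminal consumer used only to show the new signature.
type logSink struct{}

// ConsumeLogs now receives pdata.Logs instead of the internal data.Logs.
func (s *logSink) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
	_ = ld.LogRecordCount() // same API as before, now importable outside the collector repo
	return nil
}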

View File

@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
)
// ExampleReceiver is for testing purposes. We are defining an example config and factory
@ -299,7 +299,7 @@ func (f *ExampleExporterFactory) CreateLogsExporter(
type ExampleExporterConsumer struct {
Traces []consumerdata.TraceData
Metrics []consumerdata.MetricsData
Logs []data.Logs
Logs []pdata.Logs
ExporterStarted bool
ExporterShutdown bool
}
@ -324,7 +324,7 @@ func (exp *ExampleExporterConsumer) ConsumeMetricsData(_ context.Context, md con
return nil
}
func (exp *ExampleExporterConsumer) ConsumeLogs(_ context.Context, ld data.Logs) error {
func (exp *ExampleExporterConsumer) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
exp.Logs = append(exp.Logs, ld)
return nil
}
@ -414,7 +414,7 @@ func (ep *ExampleProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: false}
}
func (ep *ExampleProcessor) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (ep *ExampleProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
return ep.nextConsumer.ConsumeLogs(ctx, ld)
}

View File

@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
)
// MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer.
@ -62,9 +61,9 @@ type TraceConsumer interface {
ConsumeTraces(ctx context.Context, td pdata.Traces) error
}
// LogsConsumer is an interface that receives data.Logs, processes it
// LogsConsumer is an interface that receives pdata.Logs, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type LogsConsumer interface {
// ConsumeLogs receives data.Logs for processing.
ConsumeLogs(ctx context.Context, ld data.Logs) error
// ConsumeLogs receives pdata.Logs for processing.
ConsumeLogs(ctx context.Context, ld pdata.Logs) error
}
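Since the interface is now expressed entirely in terms of pdata, a contrib component can implement it directly. A minimal pass-through sketch (countingConsumer and its next field are illustrative, not part of this change):

package example

import (
	"context"

	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/pdata"
)

// countingConsumer is a hypothetical LogsConsumer that tallies log records
// and forwards the batch to the next consumer in the pipeline.
type countingConsumer struct {
	next  consumer.LogsConsumer
	total int
}

// Compile-time check, mirroring the `var _ consumer.LogsConsumer = ...` pattern used in this commit.
var _ consumer.LogsConsumer = (*countingConsumer)(nil)

func (c *countingConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
	c.total += ld.LogRecordCount()
	return c.next.ConsumeLogs(ctx, ld)
}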

View File

@ -14,14 +14,73 @@
package pdata
import otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
import (
"github.com/gogo/protobuf/proto"
// NewResourceLogsSliceFromOrig creates ResourceLogsSlice from otlplogs.ResourceLogs.
// This function simply makes generated newResourceLogsSlice() function publicly
// available for internal.data.Log to call. We intentionally placed data.Log in the
// internal package so that it is not available publicly while it is experimental.
// Once the experiment is over data.Log should move to this package (pdata) and
// NewResourceLogsSliceFromOrig function will no longer be needed.
func NewResourceLogsSliceFromOrig(orig *[]*otlplogs.ResourceLogs) ResourceLogsSlice {
return ResourceLogsSlice{orig}
otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
)
// This file defines in-memory data structures to represent logs.
// Logs is the top-level struct that is propagated through the logs pipeline.
//
// This is a reference type (like builtin map).
//
// Must use NewLogs functions to create new instances.
// Important: zero-initialized instance is not valid for use.
type Logs struct {
orig *[]*otlplogs.ResourceLogs
}
// LogsFromOtlp creates the internal Logs representation from the ProtoBuf.
func LogsFromOtlp(orig []*otlplogs.ResourceLogs) Logs {
return Logs{&orig}
}
// LogsToOtlp converts the internal Logs to the ProtoBuf.
func LogsToOtlp(ld Logs) []*otlplogs.ResourceLogs {
return *ld.orig
}
// NewLogs creates a new Logs.
func NewLogs() Logs {
orig := []*otlplogs.ResourceLogs(nil)
return Logs{&orig}
}
// Clone returns a copy of Logs.
func (ld Logs) Clone() Logs {
otlp := LogsToOtlp(ld)
resourceSpansClones := make([]*otlplogs.ResourceLogs, 0, len(otlp))
for _, resourceSpans := range otlp {
resourceSpansClones = append(resourceSpansClones,
proto.Clone(resourceSpans).(*otlplogs.ResourceLogs))
}
return LogsFromOtlp(resourceSpansClones)
}
// LogRecordCount calculates the total number of log records.
func (ld Logs) LogRecordCount() int {
logCount := 0
rss := ld.ResourceLogs()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
if rs.IsNil() {
continue
}
ill := rs.InstrumentationLibraryLogs()
for i := 0; i < ill.Len(); i++ {
logs := ill.At(i)
if logs.IsNil() {
continue
}
logCount += logs.Logs().Len()
}
}
return logCount
}
func (ld Logs) ResourceLogs() ResourceLogsSlice {
return ResourceLogsSlice(ld)
}
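A short usage sketch of the now-public type, limited to the functions and methods defined above plus the Resize call used elsewhere in this commit:

package example

import "go.opentelemetry.io/collector/consumer/pdata"

func exampleLogsUsage() {
	ld := pdata.NewLogs()       // a zero-initialized pdata.Logs{} would not be valid
	ld.ResourceLogs().Resize(1) // grow the top-level ResourceLogs slice in place

	clone := ld.Clone()        // deep copy via proto.Clone of each ResourceLogs
	_ = clone.LogRecordCount() // still 0: no InstrumentationLibraryLogs or records yet

	otlp := pdata.LogsToOtlp(ld) // hand the backing OTLP slice to a sender...
	_ = pdata.LogsFromOtlp(otlp) // ...and wrap it back without copying
}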

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package data
package pdata
import (
"testing"
@ -46,13 +46,13 @@ func TestLogRecordCount(t *testing.T) {
}
func TestLogRecordCountWithNils(t *testing.T) {
assert.EqualValues(t, 0, LogsFromProto([]*otlplogs.ResourceLogs{nil, {}}).LogRecordCount())
assert.EqualValues(t, 0, LogsFromProto([]*otlplogs.ResourceLogs{
assert.EqualValues(t, 0, LogsFromOtlp([]*otlplogs.ResourceLogs{nil, {}}).LogRecordCount())
assert.EqualValues(t, 0, LogsFromOtlp([]*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{nil, {}},
},
}).LogRecordCount())
assert.EqualValues(t, 2, LogsFromProto([]*otlplogs.ResourceLogs{
assert.EqualValues(t, 2, LogsFromOtlp([]*otlplogs.ResourceLogs{
{
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{
{
@ -65,7 +65,7 @@ func TestLogRecordCountWithNils(t *testing.T) {
func TestToFromLogProto(t *testing.T) {
otlp := []*otlplogs.ResourceLogs(nil)
td := LogsFromProto(otlp)
td := LogsFromOtlp(otlp)
assert.EqualValues(t, NewLogs(), td)
assert.EqualValues(t, otlp, LogsToProto(td))
assert.EqualValues(t, otlp, LogsToOtlp(td))
}

View File

@ -20,21 +20,21 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)
// PushLogsData is a helper function that is similar to ConsumeLogsData but also returns
// the number of dropped logs.
type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error)
type PushLogsData func(ctx context.Context, md pdata.Logs) (droppedTimeSeries int, err error)
type logsRequest struct {
baseRequest
ld data.Logs
ld pdata.Logs
pusher PushLogsData
}
func newLogsRequest(ctx context.Context, ld data.Logs, pusher PushLogsData) request {
func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher PushLogsData) request {
return &logsRequest{
baseRequest: baseRequest{ctx: ctx},
ld: ld,
@ -60,7 +60,7 @@ type logsExporter struct {
pushLogsData PushLogsData
}
func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
exporterCtx := obsreport.ExporterContext(ctx, lexp.cfg.Name())
_, err := lexp.sender.send(newLogsRequest(exporterCtx, ld, lexp.pushLogsData))
return err
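A push function is all a logs exporter has to supply to this helper. A minimal sketch, where sendBatch is a hypothetical stand-in for the exporter's real delivery path:

package example

import (
	"context"

	"go.opentelemetry.io/collector/consumer/pdata"
)

// sendBatch stands in for the exporter's real network call.
func sendBatch(_ context.Context, _ pdata.Logs) error { return nil }

// pushLogs matches the PushLogsData signature: it reports how many log
// records were dropped along with the error, if any.
func pushLogs(ctx context.Context, ld pdata.Logs) (int, error) {
	if err := sendBatch(ctx, ld); err != nil {
		// the whole batch failed, so count every record as dropped
		return ld.LogRecordCount(), err
	}
	return 0, nil
}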

View File

@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data/testdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
@ -153,7 +153,7 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) {
}
func newPushLogsData(droppedTimeSeries int, retError error) PushLogsData {
return func(ctx context.Context, td data.Logs) (int, error) {
return func(ctx context.Context, td pdata.Logs) (int, error) {
return droppedTimeSeries, retError
}
}

View File

@ -20,7 +20,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
)
type nopExporterOld struct {
@ -84,7 +83,7 @@ func (ne *nopExporter) ConsumeMetrics(context.Context, pdata.Metrics) error {
return ne.retError
}
func (ne *nopExporter) ConsumeLogs(context.Context, data.Logs) error {
func (ne *nopExporter) ConsumeLogs(context.Context, pdata.Logs) error {
return ne.retError
}

View File

@ -64,6 +64,6 @@ func TestNopMetricsExporter(t *testing.T) {
func TestNopLogsExporter(t *testing.T) {
nme := NewNopLogsExporter()
require.NoError(t, nme.Start(context.Background(), nil))
require.NoError(t, nme.ConsumeLogs(context.Background(), data.NewLogs()))
require.NoError(t, nme.ConsumeLogs(context.Background(), pdata.NewLogs()))
require.NoError(t, nme.Shutdown(context.Background()))
}

View File

@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
)
// SinkTraceExporterOld acts as a trace receiver for use in tests.
@ -239,7 +238,7 @@ func (sme *SinkMetricsExporter) Shutdown(context.Context) error {
type SinkLogsExporter struct {
consumeLogError error // to be returned by ConsumeLog, if set
mu sync.Mutex
logs []data.Logs
logs []pdata.Logs
logRecordsCount int
}
@ -260,7 +259,7 @@ func (sle *SinkLogsExporter) Start(context.Context, component.Host) error {
}
// ConsumeLogData stores traces for tests.
func (sle *SinkLogsExporter) ConsumeLogs(_ context.Context, ld data.Logs) error {
func (sle *SinkLogsExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
sle.mu.Lock()
defer sle.mu.Unlock()
if sle.consumeLogError != nil {
@ -274,11 +273,11 @@ func (sle *SinkLogsExporter) ConsumeLogs(_ context.Context, ld data.Logs) error
}
// AllLog returns the metrics sent to the test sink.
func (sle *SinkLogsExporter) AllLogs() []data.Logs {
func (sle *SinkLogsExporter) AllLogs() []pdata.Logs {
sle.mu.Lock()
defer sle.mu.Unlock()
copyLogs := make([]data.Logs, len(sle.logs))
copyLogs := make([]pdata.Logs, len(sle.logs))
copy(copyLogs, sle.logs)
return copyLogs
}
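In a test, the sink can stand in for a real exporter. A sketch, assuming the type lives in the exportertest package as in the collector's own tests:

package example

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/exporter/exportertest"
)

func TestSinkCapturesLogs(t *testing.T) {
	sink := new(exportertest.SinkLogsExporter)
	require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost()))

	// Feed one (empty) batch through the component under test.
	require.NoError(t, sink.ConsumeLogs(context.Background(), pdata.NewLogs()))

	// AllLogs returns a copy of every batch the sink received.
	require.Len(t, sink.AllLogs(), 1)
}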

View File

@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/internal/data/testdata"
)
@ -139,7 +138,7 @@ func TestSinkLogsExporter(t *testing.T) {
sink := new(SinkLogsExporter)
require.NoError(t, sink.Start(context.Background(), componenttest.NewNopHost()))
md := testdata.GenerateLogDataOneLogNoResource()
want := make([]data.Logs, 0, 7)
want := make([]pdata.Logs, 0, 7)
for i := 0; i < 7; i++ {
require.NoError(t, sink.ConsumeLogs(context.Background(), md))
want = append(want, md)

View File

@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
)
// Marshaler configuration used for marhsaling Protobuf to JSON. Use default config.
@ -202,7 +202,7 @@ func (e *Exporter) ConsumeMetricsData(ctx context.Context, md consumerdata.Metri
return nil
}
func (e *Exporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (e *Exporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
// Ensure only one write operation happens at a time.
e.mutex.Lock()
defer e.mutex.Unlock()
@ -210,7 +210,7 @@ func (e *Exporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
// Prepare to write JSON object.
jw := &jsonWriter{writer: e.file}
logsProto := data.LogsToProto(ld)
logsProto := pdata.LogsToOtlp(ld)
for _, rl := range logsProto {
if err := jw.Begin(); err != nil {

View File

@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1"
logspb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
otresourcepb "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1"
@ -196,7 +196,7 @@ func TestFileLogsExporterNoErrors(t *testing.T) {
},
},
}
assert.NoError(t, exporter.ConsumeLogs(context.Background(), data.LogsFromProto(ld)))
assert.NoError(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromOtlp(ld)))
assert.NoError(t, exporter.Shutdown(context.Background()))
decoder := json.NewDecoder(mf)
@ -331,7 +331,7 @@ func TestFileLogsExporterErrors(t *testing.T) {
exporter := &Exporter{file: mf}
require.NotNil(t, exporter)
assert.Error(t, exporter.ConsumeLogs(context.Background(), data.LogsFromProto(ld)))
assert.Error(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromOtlp(ld)))
assert.NoError(t, exporter.Shutdown(context.Background()))
})
}

View File

@ -29,7 +29,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/data"
)
type logDataBuffer struct {
@ -420,7 +419,7 @@ func NewLogsExporter(config configmodels.Exporter, level string, logger *zap.Log
func (s *loggingExporter) pushLogData(
_ context.Context,
ld data.Logs,
ld pdata.Logs,
) (int, error) {
s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount()))

View File

@ -102,9 +102,9 @@ func (e *exporterImp) pushMetricsData(ctx context.Context, md pdata.Metrics) (in
return 0, nil
}
func (e *exporterImp) pushLogData(ctx context.Context, logs data.Logs) (int, error) {
func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) (int, error) {
request := &otlplogs.ExportLogsServiceRequest{
ResourceLogs: data.LogsToProto(logs),
ResourceLogs: pdata.LogsToOtlp(logs),
}
err := e.w.exportLogs(ctx, request)

View File

@ -1,86 +0,0 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package data
import (
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/collector/consumer/pdata"
otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
)
// This file defines in-memory data structures to represent logs.
// Logs is the top-level struct that is propagated through the logs pipeline.
//
// This is a reference type (like builtin map).
//
// Must use NewLogs functions to create new instances.
// Important: zero-initialized instance is not valid for use.
type Logs struct {
orig *[]*otlplogs.ResourceLogs
}
// LogsFromProto creates the internal Logs representation from the ProtoBuf.
func LogsFromProto(orig []*otlplogs.ResourceLogs) Logs {
return Logs{&orig}
}
// LogsToProto converts the internal Logs to the ProtoBuf.
func LogsToProto(ld Logs) []*otlplogs.ResourceLogs {
return *ld.orig
}
// NewLogs creates a new Logs.
func NewLogs() Logs {
orig := []*otlplogs.ResourceLogs(nil)
return Logs{&orig}
}
// Clone returns a copy of Logs.
func (ld Logs) Clone() Logs {
otlp := LogsToProto(ld)
resourceSpansClones := make([]*otlplogs.ResourceLogs, 0, len(otlp))
for _, resourceSpans := range otlp {
resourceSpansClones = append(resourceSpansClones,
proto.Clone(resourceSpans).(*otlplogs.ResourceLogs))
}
return LogsFromProto(resourceSpansClones)
}
// LogRecordCount calculates the total number of log records.
func (ld Logs) LogRecordCount() int {
logCount := 0
rss := ld.ResourceLogs()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)
if rs.IsNil() {
continue
}
ill := rs.InstrumentationLibraryLogs()
for i := 0; i < ill.Len(); i++ {
logs := ill.At(i)
if logs.IsNil() {
continue
}
logCount += logs.Logs().Len()
}
}
return logCount
}
func (ld Logs) ResourceLogs() pdata.ResourceLogsSlice {
return pdata.NewResourceLogsSliceFromOrig(ld.orig)
}

View File

@ -17,7 +17,6 @@ package testdata
import (
"time"
"go.opentelemetry.io/collector/internal/data"
otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
"go.opentelemetry.io/collector/consumer/pdata"
@ -33,8 +32,8 @@ const (
NumLogTests = 11
)
func GenerateLogDataEmpty() data.Logs {
ld := data.NewLogs()
func GenerateLogDataEmpty() pdata.Logs {
ld := pdata.NewLogs()
return ld
}
@ -42,7 +41,7 @@ func generateLogOtlpEmpty() []*otlplogs.ResourceLogs {
return []*otlplogs.ResourceLogs(nil)
}
func GenerateLogDataOneEmptyResourceLogs() data.Logs {
func GenerateLogDataOneEmptyResourceLogs() pdata.Logs {
ld := GenerateLogDataEmpty()
ld.ResourceLogs().Resize(1)
return ld
@ -54,8 +53,8 @@ func generateLogOtlpOneEmptyResourceLogs() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneEmptyOneNilResourceLogs() data.Logs {
return data.LogsFromProto(generateLogOtlpOneEmptyOneNilResourceLogs())
func GenerateLogDataOneEmptyOneNilResourceLogs() pdata.Logs {
return pdata.LogsFromOtlp(generateLogOtlpOneEmptyOneNilResourceLogs())
}
@ -66,7 +65,7 @@ func generateLogOtlpOneEmptyOneNilResourceLogs() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataNoLogRecords() data.Logs {
func GenerateLogDataNoLogRecords() pdata.Logs {
ld := GenerateLogDataOneEmptyResourceLogs()
rs0 := ld.ResourceLogs().At(0)
initResource1(rs0.Resource())
@ -81,7 +80,7 @@ func generateLogOtlpNoLogRecords() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneEmptyLogs() data.Logs {
func GenerateLogDataOneEmptyLogs() pdata.Logs {
ld := GenerateLogDataNoLogRecords()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
@ -104,8 +103,8 @@ func generateLogOtlpOneEmptyLogs() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneEmptyOneNilLogRecord() data.Logs {
return data.LogsFromProto(generateLogOtlpOneEmptyOneNilLogRecord())
func GenerateLogDataOneEmptyOneNilLogRecord() pdata.Logs {
return pdata.LogsFromOtlp(generateLogOtlpOneEmptyOneNilLogRecord())
}
func generateLogOtlpOneEmptyOneNilLogRecord() []*otlplogs.ResourceLogs {
@ -124,7 +123,7 @@ func generateLogOtlpOneEmptyOneNilLogRecord() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneLogNoResource() data.Logs {
func GenerateLogDataOneLogNoResource() pdata.Logs {
ld := GenerateLogDataOneEmptyResourceLogs()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
@ -148,7 +147,7 @@ func generateLogOtlpOneLogNoResource() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneLog() data.Logs {
func GenerateLogDataOneLog() pdata.Logs {
ld := GenerateLogDataOneEmptyLogs()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
@ -173,8 +172,8 @@ func generateLogOtlpOneLog() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataOneLogOneNil() data.Logs {
return data.LogsFromProto(generateLogOtlpOneLogOneNil())
func GenerateLogDataOneLogOneNil() pdata.Logs {
return pdata.LogsFromOtlp(generateLogOtlpOneLogOneNil())
}
func generateLogOtlpOneLogOneNil() []*otlplogs.ResourceLogs {
@ -193,7 +192,7 @@ func generateLogOtlpOneLogOneNil() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataTwoLogsSameResource() data.Logs {
func GenerateLogDataTwoLogsSameResource() pdata.Logs {
ld := GenerateLogDataOneEmptyLogs()
rs0 := ld.ResourceLogs().At(0)
rs0.InstrumentationLibraryLogs().Resize(1)
@ -220,8 +219,8 @@ func GenerateLogOtlpSameResourceTwoLogs() []*otlplogs.ResourceLogs {
}
}
func GenerateLogDataTwoLogsSameResourceOneDifferent() data.Logs {
ld := data.NewLogs()
func GenerateLogDataTwoLogsSameResourceOneDifferent() pdata.Logs {
ld := pdata.NewLogs()
ld.ResourceLogs().Resize(2)
rl0 := ld.ResourceLogs().At(0)
initResource1(rl0.Resource())

View File

@ -19,13 +19,13 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
otlplogs "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
)
type logTestCase struct {
name string
ld data.Logs
ld pdata.Logs
otlp []*otlplogs.ResourceLogs
}
@ -96,9 +96,9 @@ func TestToFromOtlpLog(t *testing.T) {
for i := range allTestCases {
test := allTestCases[i]
t.Run(test.name, func(t *testing.T) {
ld := data.LogsFromProto(test.otlp)
ld := pdata.LogsFromOtlp(test.otlp)
assert.EqualValues(t, test.ld, ld)
otlp := data.LogsToProto(ld)
otlp := pdata.LogsToOtlp(ld)
assert.EqualValues(t, test.otlp, otlp)
})
}

View File

@ -28,7 +28,6 @@ import (
"go.opentelemetry.io/collector/consumer/converter"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
)
// This file contains implementations of cloning Trace/Metrics connectors
@ -226,7 +225,7 @@ type LogCloningFanOutConnector []consumer.LogsConsumer
var _ consumer.LogsConsumer = (*LogCloningFanOutConnector)(nil)
// ConsumeLogs exports the span data to all consumers wrapped by the current one.
func (lfc LogCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (lfc LogCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
var errs []error
// Fan out to first len-1 consumers.

View File

@ -22,7 +22,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/converter"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
)
// This file contains implementations of Trace/Metrics connectors
@ -166,7 +165,7 @@ type LogFanOutConnector []consumer.LogsConsumer
var _ consumer.LogsConsumer = (*LogFanOutConnector)(nil)
// Consume exports the span data to all consumers wrapped by the current one.
func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
var errs []error
for _, tc := range fc {
if err := tc.ConsumeLogs(ctx, ld); err != nil {

View File

@ -26,7 +26,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor"
)
@ -153,7 +152,7 @@ func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (
}
// ProcessLogs implements the LProcessor interface
func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld data.Logs) (data.Logs, error) {
func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
numRecords := ld.LogRecordCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"

View File

@ -256,7 +256,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
require.NoError(t, err)
ctx := context.Background()
ld := data.NewLogs()
ld := pdata.NewLogs()
// Below memAllocLimit.
currentMemAlloc = 800

View File

@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/obsreport"
)
@ -51,7 +50,7 @@ type MProcessor interface {
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, data.Logs) (data.Logs, error)
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}
// Option apply changes to internalOptions.
@ -207,7 +206,7 @@ type logProcessor struct {
nextConsumer consumer.LogsConsumer
}
func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld data.Logs) error {
func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
processorCtx := obsreport.ProcessorContext(ctx, lp.fullName)
var err error
ld, err = lp.processor.ProcessLogs(processorCtx, ld)
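Implementing the LProcessor interface above is enough to build a custom logs processor with this helper. A minimal sketch (recordCountLimiter is hypothetical):

package example

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/consumer/pdata"
)

// recordCountLimiter is a hypothetical LProcessor that refuses oversized
// batches and otherwise passes the data through unchanged.
type recordCountLimiter struct {
	maxRecords int
}

func (p *recordCountLimiter) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
	if ld.LogRecordCount() > p.maxRecords {
		// the returned Logs are ignored by the caller when an error is returned
		return ld, errors.New("too many log records in batch")
	}
	return ld, nil
}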

View File

@ -29,7 +29,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/internal/data/testdata"
)
@ -186,6 +185,6 @@ func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld data.Logs) (data.Logs, error) {
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
}

View File

@ -21,12 +21,12 @@ import (
"go.uber.org/zap"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/receiver/fluentforwardreceiver/observ"
)
// Collector acts as an aggregator of LogRecords so that we don't have to
// generate as many data.Logs instances...we can pre-batch the LogRecord
// generate as many pdata.Logs instances...we can pre-batch the LogRecord
// instances from several Forward events into one to hopefully reduce
// allocations and GC overhead.
type Collector struct {
@ -75,8 +75,8 @@ func fillBufferUntilChanEmpty(eventCh <-chan Event, buf []Event) []Event {
}
}
func collectLogRecords(events []Event, logger *zap.Logger) data.Logs {
out := data.NewLogs()
func collectLogRecords(events []Event, logger *zap.Logger) pdata.Logs {
out := pdata.NewLogs()
logs := out.ResourceLogs()
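The pre-batching idea is simply to grow one pdata.Logs with a LogRecord per buffered event instead of allocating a Logs per event. A sketch using the generated Resize/At slice API seen elsewhere in this commit (I am assuming the log-record slice exposes the same Resize call as the other generated slices; n stands in for the number of buffered events):

package example

import "go.opentelemetry.io/collector/consumer/pdata"

// batchOfSize builds a single pdata.Logs holding n (empty) log records,
// so n events share one top-level allocation.
func batchOfSize(n int) pdata.Logs {
	out := pdata.NewLogs()
	out.ResourceLogs().Resize(1)
	rl := out.ResourceLogs().At(0)
	rl.InstrumentationLibraryLogs().Resize(1)
	rl.InstrumentationLibraryLogs().At(0).Logs().Resize(n)
	return out // out.LogRecordCount() == n
}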

View File

@ -33,7 +33,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/receiver/fluentforwardreceiver/testdata"
"go.opentelemetry.io/collector/testutil/logstest"
)
@ -112,7 +111,7 @@ func TestMessageEvent(t *testing.T) {
require.Equal(t, len(eventBytes), n)
require.NoError(t, conn.Close())
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()
return len(converted) == 1
@ -144,7 +143,7 @@ func TestForwardEvent(t *testing.T) {
require.Equal(t, len(eventBytes), n)
require.NoError(t, conn.Close())
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()
return len(converted) == 1
@ -226,7 +225,7 @@ func TestForwardPackedEvent(t *testing.T) {
require.Equal(t, len(eventBytes), n)
require.NoError(t, conn.Close())
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()
return len(converted) == 1
@ -297,7 +296,7 @@ func TestForwardPackedCompressedEvent(t *testing.T) {
require.Equal(t, len(eventBytes), n)
require.NoError(t, conn.Close())
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()
return len(converted) == 1
@ -382,7 +381,7 @@ func TestUnixEndpoint(t *testing.T) {
require.NoError(t, err)
require.Greater(t, n, 0)
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()
return len(converted) == 1
@ -426,7 +425,7 @@ func TestHighVolume(t *testing.T) {
wg.Wait()
var converted []data.Logs
var converted []pdata.Logs
require.Eventually(t, func() bool {
converted = next.AllLogs()

View File

@ -19,7 +19,7 @@ import (
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
"go.opentelemetry.io/collector/obsreport"
)
@ -53,7 +53,7 @@ func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServi
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport, receiverTagValue)
ld := data.LogsFromProto(req.ResourceLogs)
ld := pdata.LogsFromOtlp(req.ResourceLogs)
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return nil, err
@ -62,7 +62,7 @@ func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServi
return &collectorlog.ExportLogsServiceResponse{}, nil
}
func (r *Receiver) sendToNextConsumer(ctx context.Context, ld data.Logs) error {
func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
numSpans := ld.LogRecordCount()
if numSpans == 0 {
return nil

View File

@ -26,8 +26,8 @@ import (
"google.golang.org/grpc"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/data"
collectorlog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/logs/v1"
otlplog "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
"go.opentelemetry.io/collector/obsreport"
@ -77,7 +77,7 @@ func TestExport(t *testing.T) {
// Keep log data to compare the test result against it
// Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream
traceData := data.LogsFromProto(resourceLogs).Clone()
traceData := pdata.LogsFromOtlp(resourceLogs).Clone()
req := &collectorlog.ExportLogsServiceRequest{
ResourceLogs: resourceLogs,

View File

@ -33,7 +33,7 @@ import (
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
idata "go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/processor/attributesprocessor"
"go.opentelemetry.io/collector/translator/internaldata"
)
@ -185,7 +185,7 @@ func TestPipelinesBuilder_BuildVarious(t *testing.T) {
}
// Send one custom data.
log := idata.Logs{}
log := pdata.Logs{}
processor.firstLC.(consumer.LogsConsumer).ConsumeLogs(context.Background(), log)
// Now verify received data.

View File

@ -29,7 +29,7 @@ import (
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
idata "go.opentelemetry.io/collector/internal/data"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/processor/attributesprocessor"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
@ -246,7 +246,7 @@ func TestReceiversBuilder_BuildCustom(t *testing.T) {
}
// Send one data.
log := idata.Logs{}
log := pdata.Logs{}
producer := receiver.receiver.(*componenttest.ExampleReceiverProducer)
producer.LogConsumer.ConsumeLogs(context.Background(), log)

View File

@ -16,7 +16,6 @@ package logstest
import (
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
)
type Log struct {
@ -29,8 +28,8 @@ type Log struct {
// relatively easy to read and write declaratively compared to the highly
// imperative and verbose method of using pdata directly.
// Attributes are sorted by key name.
func Logs(recs ...Log) data.Logs {
out := data.NewLogs()
func Logs(recs ...Log) pdata.Logs {
out := pdata.NewLogs()
logs := out.ResourceLogs()