[disk-buffering] - Single responsibility for disk exporters (#1161)

This commit is contained in:
jason plumb 2024-01-23 13:44:05 -08:00 committed by GitHub
parent fa5d16193b
commit 3f7db8b748
GPG Key ID: B5690EEEBB952194
45 changed files with 1823 additions and 1077 deletions


@ -1,11 +1,25 @@
# Contributor Guide
# Design Overview
Each one of the three exporters provided by this
tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java)
and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java))
is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens
automatically as a set of signals are provided from the processor, while the `read/delegate` one has
to be triggered manually by the consumer of this library as explained in the [README](README.md).
There are three main disk-writing exporters provided by this module:
* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java)
* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java)
* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java)
Each is responsible for writing a specific type of telemetry to disk storage for later
harvest/ingest.
For later reading, there are:
* [LogRecordFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java)
* [MetricFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java)
* [SpanFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java)
Each of these has a `create()` method that takes a delegate exporter (used to send the data
onward to ingest) and a `StorageConfiguration` that tells it where to find the buffered data.
As explained in the [README](README.md), this reading process has to be triggered manually by
the consumer of this library and does not happen automatically.
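The write/read split described above can be sketched in plain Java. All names below are hypothetical stand-ins, not this module's classes: the real module serializes batches to files via `SignalSerializer`, while an in-memory deque stands in for the disk here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the write/read split; a Deque stands in for the disk.
public class DiskBufferSketch {
  static final Deque<byte[]> buffer = new ArrayDeque<>();

  // Write side: invoked automatically for each exported batch.
  static void exportToDisk(List<String> signals) {
    buffer.addLast(String.join("\n", signals).getBytes(StandardCharsets.UTF_8));
  }

  // Read side: must be triggered manually; hands one stored batch to the
  // delegate exporter and reports whether anything was exported.
  static boolean exportStoredBatch(Function<List<String>, Boolean> delegate) {
    byte[] batch = buffer.pollFirst();
    if (batch == null) {
      return false; // nothing buffered
    }
    List<String> signals =
        Arrays.asList(new String(batch, StandardCharsets.UTF_8).split("\n"));
    return delegate.apply(signals);
  }
}
```

The key point the sketch illustrates is the asymmetry: the write side is driven by the SDK pipeline, while the read side only runs when the consumer calls it.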
## Writing overview
@ -14,7 +28,7 @@ to be triggered manually by the consumer of this library as explained in the [RE
* The writing process happens automatically within its `export(Collection<SignalData> signals)`
method, which is called by the configured signal processor.
* When a set of signals is received, these are delegated over to
the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java)
a type-specific wrapper of [ToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java)
class, which serializes them using an implementation
of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java)
and then appends the serialized data to a File using an instance of


@ -1,6 +1,6 @@
# Disk buffering
This module provides signal exporter wrappers that intercept and store signals in files which can be
This module provides exporters that store telemetry data in files which can be
sent later on demand. At a high level, there are two separate processes: one writes data to
disk, and the other reads and exports the previously stored data.
@ -8,11 +8,11 @@ stored data.
* Each exporter automatically stores the received data on disk right after it is received from
its processor.
* Reading the data back from disk and exporting it has to be done manually. At
the moment there's no automatic mechanism to do so. There's more information on it can be
the moment there's no automatic mechanism to do so. There's more information on how it can be
achieved, under [Reading data](#reading-data).
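Since there is no automatic mechanism, a consumer typically loops until no stored batch remains. A minimal sketch in plain Java; the interface below is a hypothetical stand-in with the same shape as this module's `FromDiskExporter` (the real interface also declares `IOException`, omitted here to keep the sketch self-contained):

```java
import java.util.concurrent.TimeUnit;

// Stand-in for the module's reader-exporter shape.
interface BufferedReaderExporter {
  boolean exportStoredBatch(long timeout, TimeUnit unit);
}

public class DrainLoop {
  // Repeatedly exports stored batches until none remain; returns the count.
  static int drain(BufferedReaderExporter exporter, long timeout, TimeUnit unit) {
    int batches = 0;
    while (exporter.exportStoredBatch(timeout, unit)) {
      batches++;
    }
    return batches;
  }

  // Fake exporter with a fixed number of buffered batches, for illustration.
  static BufferedReaderExporter fakeWithBatches(int n) {
    int[] remaining = {n};
    return (timeout, unit) -> remaining[0]-- > 0;
  }
}
```

In a real application the drain call would be scheduled by the consumer (for example, when connectivity is restored), since the library itself never triggers it.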
> For more detailed information on how the whole process works, take a look at
> the [CONTRIBUTING](CONTRIBUTING.md) file.
> the [DESIGN.md](DESIGN.md) file.
## Configuration
@ -43,11 +43,11 @@ In order to use it, you need to wrap your own exporter with a new instance of
one of the wrappers provided here:
* For a LogRecordExporter, it must be wrapped within
a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
a [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java).
* For a MetricExporter, it must be wrapped within
a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java).
a [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java).
* For a SpanExporter, it must be wrapped within
a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java).
a [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java).
Each wrapper needs the following when instantiated:


@ -53,7 +53,7 @@ wire {
java {}
sourcePath {
srcJar("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha")
srcJar("io.opentelemetry.proto:opentelemetry-proto:1.1.0-alpha")
}
root(


@ -1,95 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* This is a {@link LogRecordExporter} wrapper that takes care of intercepting all the signals sent
* out to be exported, tries to store them in the disk in order to export them later.
*
* <p>In order to use it, you need to wrap your own {@link LogRecordExporter} with a new instance of
* this one, which will be the one you need to register in your {@link LogRecordProcessor}.
*/
public final class LogRecordDiskExporter implements LogRecordExporter, StoredBatchExporter {
private final LogRecordExporter wrapped;
private final DiskExporter<LogRecordData> diskExporter;
/**
* Creates a new instance of {@link LogRecordDiskExporter}.
*
* @param wrapped - The exporter where the data retrieved from the disk will be delegated to.
* @param rootDir - The directory to create this signal's cache dir where all the data will be
* written into.
* @param configuration - How you want to manage the storage process.
* @throws IOException If no dir can be created in rootDir.
*/
public static LogRecordDiskExporter create(
LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration)
throws IOException {
return create(wrapped, rootDir, configuration, Clock.getDefault());
}
// This is exposed for testing purposes.
static LogRecordDiskExporter create(
LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
DiskExporter<LogRecordData> diskExporter =
DiskExporter.<LogRecordData>builder()
.setSerializer(SignalSerializer.ofLogs())
.setRootDir(rootDir)
.setFolderName("logs")
.setStorageConfiguration(configuration)
.setStorageClock(clock)
.setExportFunction(wrapped::export)
.build();
return new LogRecordDiskExporter(wrapped, diskExporter);
}
private LogRecordDiskExporter(
LogRecordExporter wrapped, DiskExporter<LogRecordData> diskExporter) {
this.wrapped = wrapped;
this.diskExporter = diskExporter;
}
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return diskExporter.onExport(logs);
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
try {
diskExporter.onShutDown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
} finally {
wrapped.shutdown();
}
return CompletableResultCode.ofSuccess();
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return diskExporter.exportStoredBatch(timeout, unit);
}
}


@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class LogRecordFromDiskExporter implements FromDiskExporter {
private final FromDiskExporterImpl<LogRecordData> delegate;
public static LogRecordFromDiskExporter create(
LogRecordExporter exporter, StorageConfiguration config) throws IOException {
FromDiskExporterImpl<LogRecordData> delegate =
FromDiskExporterImpl.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofLogs())
.setExportFunction(exporter::export)
.build();
return new LogRecordFromDiskExporter(delegate);
}
private LogRecordFromDiskExporter(FromDiskExporterImpl<LogRecordData> delegate) {
this.delegate = delegate;
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return delegate.exportStoredBatch(timeout, unit);
}
@Override
public void shutdown() throws IOException {
delegate.shutdown();
}
}


@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.Collection;
/**
* This class implements a {@link LogRecordExporter} that delegates to an instance of {@code
* ToDiskExporter<LogRecordData>}.
*/
public class LogRecordToDiskExporter implements LogRecordExporter {
private final ToDiskExporter<LogRecordData> delegate;
/**
* Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage.
*
* @param delegate - The LogRecordExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @return A new LogRecordToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static LogRecordToDiskExporter create(
LogRecordExporter delegate, StorageConfiguration config) throws IOException {
ToDiskExporter<LogRecordData> toDisk =
ToDiskExporter.<LogRecordData>builder()
.setFolderName("logs")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofLogs())
.setExportFunction(delegate::export)
.build();
return new LogRecordToDiskExporter(toDisk);
}
// Visible for testing
LogRecordToDiskExporter(ToDiskExporter<LogRecordData> delegate) {
this.delegate = delegate;
}
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(logs);
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
return CompletableResultCode.ofSuccess();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
}
}


@ -1,100 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* This is a {@link MetricExporter} wrapper that takes care of intercepting all the signals sent out
* to be exported, tries to store them in the disk in order to export them later.
*
* <p>In order to use it, you need to wrap your own {@link MetricExporter} with a new instance of
* this one, which will be the one you need to register in your {@link MetricReader}.
*/
public final class MetricDiskExporter implements MetricExporter, StoredBatchExporter {
private final MetricExporter wrapped;
private final DiskExporter<MetricData> diskExporter;
/**
* Creates a new instance of {@link MetricDiskExporter}.
*
* @param wrapped - The exporter where the data retrieved from the disk will be delegated to.
* @param rootDir - The directory to create this signal's cache dir where all the data will be
* written into.
* @param configuration - How you want to manage the storage process.
* @throws IOException If no dir can be created in rootDir.
*/
public static MetricDiskExporter create(
MetricExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException {
return create(wrapped, rootDir, configuration, Clock.getDefault());
}
// This is exposed for testing purposes.
public static MetricDiskExporter create(
MetricExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
DiskExporter<MetricData> diskExporter =
DiskExporter.<MetricData>builder()
.setRootDir(rootDir)
.setFolderName("metrics")
.setStorageConfiguration(configuration)
.setSerializer(SignalSerializer.ofMetrics())
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
return new MetricDiskExporter(wrapped, diskExporter);
}
private MetricDiskExporter(MetricExporter wrapped, DiskExporter<MetricData> diskExporter) {
this.wrapped = wrapped;
this.diskExporter = diskExporter;
}
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return diskExporter.onExport(metrics);
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
try {
diskExporter.onShutDown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
} finally {
wrapped.shutdown();
}
return CompletableResultCode.ofSuccess();
}
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return wrapped.getAggregationTemporality(instrumentType);
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return diskExporter.exportStoredBatch(timeout, unit);
}
}


@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MetricFromDiskExporter implements FromDiskExporter {
private final FromDiskExporterImpl<MetricData> delegate;
public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config)
throws IOException {
FromDiskExporterImpl<MetricData> delegate =
FromDiskExporterImpl.<MetricData>builder()
.setFolderName("metrics")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofMetrics())
.setExportFunction(exporter::export)
.build();
return new MetricFromDiskExporter(delegate);
}
private MetricFromDiskExporter(FromDiskExporterImpl<MetricData> delegate) {
this.delegate = delegate;
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return delegate.exportStoredBatch(timeout, unit);
}
@Override
public void shutdown() throws IOException {
delegate.shutdown();
}
}


@ -0,0 +1,86 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;
/**
* This class implements a {@link MetricExporter} that delegates to an instance of {@code
* ToDiskExporter<MetricData>}.
*/
public class MetricToDiskExporter implements MetricExporter {
private final ToDiskExporter<MetricData> delegate;
private final Function<InstrumentType, AggregationTemporality> typeToTemporality;
/**
* Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage.
*
* @param delegate - The MetricExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @param typeToTemporality - The function that maps an InstrumentType into an
* AggregationTemporality.
* @return A new MetricToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static MetricToDiskExporter create(
MetricExporter delegate,
StorageConfiguration config,
Function<InstrumentType, AggregationTemporality> typeToTemporality)
throws IOException {
ToDiskExporter<MetricData> toDisk =
ToDiskExporter.<MetricData>builder()
.setFolderName("metrics")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofMetrics())
.setExportFunction(delegate::export)
.build();
return new MetricToDiskExporter(toDisk, typeToTemporality);
}
// VisibleForTesting
MetricToDiskExporter(
ToDiskExporter<MetricData> delegate,
Function<InstrumentType, AggregationTemporality> typeToTemporality) {
this.delegate = delegate;
this.typeToTemporality = typeToTemporality;
}
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return delegate.export(metrics);
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
return CompletableResultCode.ofSuccess();
}
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return typeToTemporality.apply(instrumentType);
}
}


@ -1,93 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* This is a {@link SpanExporter} wrapper that takes care of intercepting all the signals sent out
* to be exported, tries to store them in the disk in order to export them later.
*
* <p>In order to use it, you need to wrap your own {@link SpanExporter} with a new instance of this
* one, which will be the one you need to register in your {@link SpanProcessor}.
*/
public final class SpanDiskExporter implements SpanExporter, StoredBatchExporter {
private final SpanExporter wrapped;
private final DiskExporter<SpanData> diskExporter;
/**
* Creates a new instance of {@link SpanDiskExporter}.
*
* @param wrapped - The exporter where the data retrieved from the disk will be delegated to.
* @param rootDir - The directory to create this signal's cache dir where all the data will be
* written into.
* @param configuration - How you want to manage the storage process.
* @throws IOException If no dir can be created in rootDir.
*/
public static SpanDiskExporter create(
SpanExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException {
return create(wrapped, rootDir, configuration, Clock.getDefault());
}
// This is exposed for testing purposes.
public static SpanDiskExporter create(
SpanExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock)
throws IOException {
DiskExporter<SpanData> diskExporter =
DiskExporter.<SpanData>builder()
.setRootDir(rootDir)
.setFolderName("spans")
.setStorageConfiguration(configuration)
.setSerializer(SignalSerializer.ofSpans())
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
return new SpanDiskExporter(wrapped, diskExporter);
}
private SpanDiskExporter(SpanExporter wrapped, DiskExporter<SpanData> diskExporter) {
this.wrapped = wrapped;
this.diskExporter = diskExporter;
}
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return diskExporter.onExport(spans);
}
@Override
public CompletableResultCode shutdown() {
try {
diskExporter.onShutDown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
} finally {
wrapped.shutdown();
}
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return diskExporter.exportStoredBatch(timeout, unit);
}
}


@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class SpanFromDiskExporter implements FromDiskExporter {
private final FromDiskExporterImpl<SpanData> delegate;
public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config)
throws IOException {
FromDiskExporterImpl<SpanData> delegate =
FromDiskExporterImpl.<SpanData>builder()
.setFolderName("spans")
.setStorageConfiguration(config)
.setDeserializer(SignalSerializer.ofSpans())
.setExportFunction(exporter::export)
.build();
return new SpanFromDiskExporter(delegate);
}
private SpanFromDiskExporter(FromDiskExporterImpl<SpanData> delegate) {
this.delegate = delegate;
}
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
return delegate.exportStoredBatch(timeout, unit);
}
@Override
public void shutdown() throws IOException {
delegate.shutdown();
}
}


@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.Collection;
/**
* This class implements a SpanExporter that delegates to an instance of {@code
* ToDiskExporter<SpanData>}.
*/
public class SpanToDiskExporter implements SpanExporter {
private final ToDiskExporter<SpanData> delegate;
/**
* Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage.
*
* @param delegate - The SpanExporter to delegate to if disk writing fails.
* @param config - The StorageConfiguration that specifies how storage is managed.
* @return A new SpanToDiskExporter instance.
* @throws IOException if the delegate ToDiskExporter could not be created.
*/
public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config)
throws IOException {
ToDiskExporter<SpanData> toDisk =
ToDiskExporter.<SpanData>builder()
.setFolderName("spans")
.setStorageConfiguration(config)
.setSerializer(SignalSerializer.ofSpans())
.setExportFunction(delegate::export)
.build();
return new SpanToDiskExporter(toDisk);
}
// Visible for testing
SpanToDiskExporter(ToDiskExporter<SpanData> delegate) {
this.delegate = delegate;
}
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return delegate.export(spans);
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
try {
delegate.shutdown();
} catch (IOException e) {
return CompletableResultCode.ofFailure();
}
return CompletableResultCode.ofSuccess();
}
}


@ -1,23 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public interface StoredBatchExporter {
/**
* Reads data from the disk and attempts to export it.
*
* @param timeout The amount of time to wait for the wrapped exporter to finish.
* @param unit The unit of the time provided.
* @return TRUE if there was data available and it was successfully exported within the timeout
* provided. FALSE if either of those conditions didn't meet.
* @throws IOException If an unexpected error happens.
*/
boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException;
}


@ -8,11 +8,16 @@ package io.opentelemetry.contrib.disk.buffering.internal;
import com.google.auto.value.AutoValue;
import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider;
import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider;
import java.io.File;
import java.util.concurrent.TimeUnit;
/** Defines how the storage should be managed. */
@AutoValue
public abstract class StorageConfiguration {
/** The root storage location for buffered telemetry. */
public abstract File getRootDir();
/** The max amount of time a file can receive new data. */
public abstract long getMaxFileAgeForWriteMillis();
@ -45,18 +50,19 @@ public abstract class StorageConfiguration {
/** A creator of temporary files needed for the disk reading process. */
public abstract TemporaryFileProvider getTemporaryFileProvider();
public static StorageConfiguration getDefault() {
return builder().build();
public static StorageConfiguration getDefault(File rootDir) {
return builder().setRootDir(rootDir).build();
}
public static Builder builder() {
TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance();
return new AutoValue_StorageConfiguration.Builder()
.setMaxFileSize(1024 * 1024) // 1MB
.setMaxFolderSize(10 * 1024 * 1024) // 10MB
.setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30))
.setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33))
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18))
.setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance());
.setTemporaryFileProvider(fileProvider);
}
@AutoValue.Builder
@ -73,6 +79,8 @@ public abstract class StorageConfiguration {
public abstract Builder setTemporaryFileProvider(TemporaryFileProvider value);
public abstract Builder setRootDir(File rootDir);
public abstract StorageConfiguration build();
}
}


@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public interface FromDiskExporter {
boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException;
void shutdown() throws IOException;
}


@ -0,0 +1,81 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
public class FromDiskExporterBuilder<T> {
private SignalSerializer<T> serializer = noopSerializer();
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
@NotNull
private static <T> SignalSerializer<T> noopSerializer() {
return new SignalSerializer<T>() {
@Override
public byte[] serialize(Collection<T> ts) {
return new byte[0];
}
@Override
public List<T> deserialize(byte[] source) {
return Collections.emptyList();
}
};
}
private final StorageBuilder storageBuilder = Storage.builder();
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
return this;
}
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
storageBuilder.setStorageConfiguration(configuration);
return this;
}
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setStorageClock(Clock clock) {
storageBuilder.setStorageClock(clock);
return this;
}
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDeserializer(SignalSerializer<T> serializer) {
this.serializer = serializer;
return this;
}
@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setExportFunction(
Function<Collection<T>, CompletableResultCode> exportFunction) {
this.exportFunction = exportFunction;
return this;
}
public FromDiskExporterImpl<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
}
}


@@ -3,9 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import io.opentelemetry.contrib.disk.buffering.StoredBatchExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
@@ -17,25 +16,38 @@ import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
public final class DiskExporter<EXPORT_DATA> implements StoredBatchExporter {
/**
* Signal-type generic class that can read telemetry previously buffered on disk and send it to
* another delegated exporter.
*/
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> serializer;
private final SignalSerializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(DiskExporter.class.getName());
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());
DiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
FromDiskExporterImpl(
SignalSerializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
this.serializer = serializer;
this.deserializer = deserializer;
this.exportFunction = exportFunction;
this.storage = storage;
}
public static <T> DiskExporterBuilder<T> builder() {
return new DiskExporterBuilder<T>();
public static <T> FromDiskExporterBuilder<T> builder() {
return new FromDiskExporterBuilder<>();
}
/**
* Reads data from the disk and attempts to export it.
*
* @param timeout The amount of time to wait for the wrapped exporter to finish.
* @param unit The unit of the time provided.
* @return true if there was data available and it was successfully exported within the timeout
* provided. false otherwise.
* @throws IOException If an unexpected error happens.
*/
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
logger.log(Level.INFO, "Attempting to export batch from disk.");
@@ -44,31 +56,14 @@ public final class DiskExporter<EXPORT_DATA> implements StoredBatchExporter {
bytes -> {
logger.log(Level.INFO, "About to export stored batch.");
CompletableResultCode join =
exportFunction.apply(serializer.deserialize(bytes)).join(timeout, unit);
exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit);
return join.isSuccess();
});
return result == ReadableResult.SUCCEEDED;
}
public void onShutDown() throws IOException {
@Override
public void shutdown() throws IOException {
storage.close();
}
public CompletableResultCode onExport(Collection<EXPORT_DATA> data) {
logger.log(Level.FINER, "Intercepting exporter batch.");
try {
if (storage.write(serializer.serialize(data))) {
return CompletableResultCode.ofSuccess();
} else {
logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away.");
return exportFunction.apply(data);
}
} catch (IOException e) {
logger.log(
Level.WARNING,
"An unexpected error happened while attempting to write the data in disk. Exporting it right away.",
e);
return exportFunction.apply(data);
}
}
}


@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ToDiskExporter<EXPORT_DATA> {
private static final Logger logger = Logger.getLogger(ToDiskExporter.class.getName());
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> serializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
ToDiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
this.serializer = serializer;
this.exportFunction = exportFunction;
this.storage = storage;
}
public static <T> ToDiskExporterBuilder<T> builder() {
return new ToDiskExporterBuilder<>();
}
public CompletableResultCode export(Collection<EXPORT_DATA> data) {
logger.log(Level.FINER, "Intercepting exporter batch.");
try {
if (storage.write(serializer.serialize(data))) {
return CompletableResultCode.ofSuccess();
}
logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away.");
return exportFunction.apply(data);
} catch (IOException e) {
logger.log(
Level.WARNING,
"An unexpected error happened while attempting to write the data in disk. Exporting it right away.",
e);
return exportFunction.apply(data);
}
}
public void shutdown() throws IOException {
storage.close();
}
}
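The `export()` method above implements a write-then-fallback policy: serialize and buffer the batch on disk, and only hand it to the delegate when the write fails. A self-contained sketch of that policy, with hypothetical names (`WriteThroughSketch`, `DiskBuffer`) standing in for `ToDiskExporter`, `SignalSerializer`, and `Storage`:

```java
import java.io.IOException;
import java.util.Collection;
import java.util.function.Function;

// Minimal sketch of ToDiskExporter's write-then-fallback policy.
// Names are illustrative; the real class delegates to Storage and SignalSerializer.
class WriteThroughSketch<T> {
  interface DiskBuffer {
    boolean write(byte[] bytes) throws IOException; // false = not writable right now
  }

  private final DiskBuffer buffer;
  private final Function<Collection<T>, byte[]> serializer;
  private final Function<Collection<T>, Boolean> delegate;

  WriteThroughSketch(
      DiskBuffer buffer,
      Function<Collection<T>, byte[]> serializer,
      Function<Collection<T>, Boolean> delegate) {
    this.buffer = buffer;
    this.serializer = serializer;
    this.delegate = delegate;
  }

  boolean export(Collection<T> data) {
    try {
      if (buffer.write(serializer.apply(data))) {
        return true; // buffered on disk; will be read back later by a FromDisk exporter
      }
    } catch (IOException e) {
      // storage failed; fall through and export right away
    }
    return delegate.apply(data); // fallback: send to the wrapped exporter now
  }
}
```

As in the real exporter, a failed write is not an error from the caller's perspective as long as the delegate accepts the batch.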


@@ -3,23 +3,22 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
public final class DiskExporterBuilder<T> {
public final class ToDiskExporterBuilder<T> {
private SignalSerializer<T> serializer =
new SignalSerializer<T>() {
@@ -34,71 +33,49 @@ public final class DiskExporterBuilder<T> {
return Collections.emptyList();
}
};
private File rootDir = new File(".");
private String folderName = "data";
private StorageConfiguration configuration = StorageConfiguration.getDefault();
private Clock clock = Clock.getDefault();
private final StorageBuilder storageBuilder = Storage.builder();
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
DiskExporterBuilder() {}
ToDiskExporterBuilder() {}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setRootDir(File rootDir) {
this.rootDir = rootDir;
public ToDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
return this;
}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setFolderName(String folderName) {
this.folderName = folderName;
public ToDiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
validateConfiguration(configuration);
storageBuilder.setStorageConfiguration(configuration);
return this;
}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setStorageConfiguration(StorageConfiguration configuration) {
this.configuration = configuration;
public ToDiskExporterBuilder<T> setStorageClock(Clock clock) {
storageBuilder.setStorageClock(clock);
return this;
}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setStorageClock(Clock clock) {
this.clock = clock;
return this;
}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setSerializer(SignalSerializer<T> serializer) {
public ToDiskExporterBuilder<T> setSerializer(SignalSerializer<T> serializer) {
this.serializer = serializer;
return this;
}
@CanIgnoreReturnValue
public DiskExporterBuilder<T> setExportFunction(
public ToDiskExporterBuilder<T> setExportFunction(
Function<Collection<T>, CompletableResultCode> exportFunction) {
this.exportFunction = exportFunction;
return this;
}
private static File getSignalFolder(File rootDir, String folderName) throws IOException {
File folder = new File(rootDir, folderName);
if (!folder.exists()) {
if (!folder.mkdirs()) {
throw new IOException(
"Could not create the signal folder: '" + folderName + "' inside: " + rootDir);
}
}
return folder;
}
public DiskExporter<T> build() throws IOException {
validateConfiguration(configuration);
File folder = getSignalFolder(rootDir, folderName);
Storage storage = new Storage(new FolderManager(folder, configuration, clock));
return new DiskExporter<>(serializer, exportFunction, storage);
public ToDiskExporter<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new ToDiskExporter<>(serializer, exportFunction, storage);
}
private static void validateConfiguration(StorageConfiguration configuration) {


@@ -5,10 +5,11 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs;
import static io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.SpanDataMapper.flagsFromInt;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.AttributesMapper;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.ByteStringMapper;
@@ -89,7 +90,7 @@ public final class LogRecordDataMapper {
SpanContext.create(
ByteStringMapper.getInstance().protoToString(source.trace_id),
ByteStringMapper.getInstance().protoToString(source.span_id),
TraceFlags.getSampled(),
flagsFromInt(source.flags),
TraceState.getDefault()));
target.setTotalAttributeCount(source.dropped_attributes_count + attributes.size());
target.setResource(resource);


@@ -62,6 +62,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.List;
import org.jetbrains.annotations.NotNull;
public final class MetricDataMapper {
@@ -555,23 +556,26 @@
exemplarListToDoubleExemplarDataList(source.exemplars));
}
@NotNull
private static DoubleExemplarData exemplarToDoubleExemplarData(Exemplar source) {
return ImmutableDoubleExemplarData.create(
protoToAttributes(source.filtered_attributes),
source.time_unix_nano,
createForExemplar(source),
createSpanContext(source),
source.as_double);
}
@NotNull
private static LongExemplarData exemplarToLongExemplarData(Exemplar source) {
return ImmutableLongExemplarData.create(
protoToAttributes(source.filtered_attributes),
source.time_unix_nano,
createForExemplar(source),
createSpanContext(source),
source.as_int);
}
private static SpanContext createForExemplar(Exemplar value) {
@NotNull
private static SpanContext createSpanContext(Exemplar value) {
return SpanContext.create(
ByteStringMapper.getInstance().protoToString(value.trace_id),
ByteStringMapper.getInstance().protoToString(value.span_id),


@@ -55,6 +55,7 @@ public final class SpanDataMapper {
}
span.trace_id(byteStringMapper.stringToProto(source.getTraceId()));
span.span_id(byteStringMapper.stringToProto(source.getSpanId()));
span.flags(source.getSpanContext().getTraceFlags().asByte());
span.parent_span_id(byteStringMapper.stringToProto(source.getParentSpanId()));
span.name(source.getName());
span.kind(mapSpanKindToProto(source.getKind()));
@@ -107,7 +108,7 @@
SpanContext.create(
traceId,
ByteStringMapper.getInstance().protoToString(source.span_id),
TraceFlags.getSampled(),
flagsFromInt(source.flags),
decodeTraceState(source.trace_state)));
target.setParentSpanContext(
SpanContext.create(
@@ -257,7 +258,7 @@ public final class SpanDataMapper {
SpanContext.create(
ByteStringMapper.getInstance().protoToString(source.trace_id),
ByteStringMapper.getInstance().protoToString(source.span_id),
TraceFlags.getSampled(),
flagsFromInt(source.flags),
decodeTraceState(source.trace_state));
return LinkData.create(spanContext, attributes, totalAttrCount);
}
@@ -308,6 +309,7 @@
SpanContext spanContext = source.getSpanContext();
builder.trace_id(ByteStringMapper.getInstance().stringToProto(spanContext.getTraceId()));
builder.span_id(ByteStringMapper.getInstance().stringToProto(spanContext.getSpanId()));
builder.flags = spanContext.getTraceFlags().asByte();
builder.attributes.addAll(attributesToProto(source.getAttributes()));
builder.dropped_attributes_count(
source.getTotalAttributeCount() - source.getAttributes().size());
@@ -315,4 +317,8 @@
return builder.build();
}
public static TraceFlags flagsFromInt(int b) {
return TraceFlags.fromByte((byte) (b & 0x000000FF));
}
}
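The hunks above replace the hardcoded `TraceFlags.getSampled()` with `flagsFromInt(source.flags)`, so the original trace flags survive the round trip through the proto's `int` field instead of always coming back as "sampled". A standalone sketch of the int/byte conversion (the `FlagsSketch` wrapper class is hypothetical; the real helper lives on `SpanDataMapper` and returns `TraceFlags`):

```java
// The proto stores trace flags in an int, but only the low 8 bits are meaningful.
// Masking down to a byte preserves the recorded flags across serialization.
final class FlagsSketch {
  static byte flagsFromInt(int flags) {
    return (byte) (flags & 0xFF); // drop anything above the low byte
  }

  static int flagsToInt(byte flags) {
    return flags & 0xFF; // widen back without sign extension
  }
}
```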


@@ -16,16 +16,20 @@ import java.util.function.Function;
import javax.annotation.Nullable;
public final class Storage implements Closeable {
private static final int MAX_ATTEMPTS = 3;
private final FolderManager folderManager;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@Nullable private WritableFile writableFile;
@Nullable private ReadableFile readableFile;
private static final int MAX_ATTEMPTS = 3;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
public Storage(FolderManager folderManager) {
this.folderManager = folderManager;
}
public static StorageBuilder builder() {
return new StorageBuilder();
}
/**
* Attempts to write an item into a writable file.
*


@@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.storage;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.sdk.common.Clock;
import java.io.File;
import java.io.IOException;
public class StorageBuilder {
private String folderName = "data";
private StorageConfiguration configuration = StorageConfiguration.getDefault(new File("."));
private Clock clock = Clock.getDefault();
StorageBuilder() {}
@CanIgnoreReturnValue
public StorageBuilder setFolderName(String folderName) {
this.folderName = folderName;
return this;
}
@CanIgnoreReturnValue
public StorageBuilder setStorageConfiguration(StorageConfiguration configuration) {
this.configuration = configuration;
return this;
}
@CanIgnoreReturnValue
public StorageBuilder setStorageClock(Clock clock) {
this.clock = clock;
return this;
}
public Storage build() throws IOException {
File folder = ensureSubdir(configuration.getRootDir(), folderName);
FolderManager folderManager = new FolderManager(folder, configuration, clock);
return new Storage(folderManager);
}
private static File ensureSubdir(File rootDir, String child) throws IOException {
File subdir = new File(rootDir, child);
if (subdir.exists() || subdir.mkdirs()) {
return subdir;
}
throw new IOException("Could not create the subdir: '" + child + "' inside: " + rootDir);
}
}
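`ensureSubdir` takes over the old `getSignalFolder` logic from `DiskExporterBuilder`: return the signal folder if it already exists or can be created, otherwise fail loudly. The same logic, extracted as a standalone, testable sketch (the `SubdirSketch` class name is made up):

```java
import java.io.File;
import java.io.IOException;

// Standalone version of StorageBuilder.ensureSubdir: idempotently create the
// per-signal subdirectory under the configured root, failing with IOException
// when the directory can neither be found nor created.
final class SubdirSketch {
  static File ensureSubdir(File rootDir, String child) throws IOException {
    File subdir = new File(rootDir, child);
    if (subdir.exists() || subdir.mkdirs()) {
      return subdir;
    }
    throw new IOException("Could not create the subdir: '" + child + "' inside: " + rootDir);
  }
}
```

Note that `mkdirs()` returning false is treated as an error only when the directory also does not exist, which makes repeated calls for the same signal folder safe.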


@@ -3,20 +3,17 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporters;
package io.opentelemetry.contrib.disk.buffering;
import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.MIN_FILE_AGE_FOR_READ_MILLIS;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.Clock;
@@ -34,11 +31,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@SuppressWarnings("unchecked")
class DiskExporterTest {
class FromDiskExporterImplTest {
private SpanExporter wrapped;
private SignalSerializer<SpanData> serializer;
private Clock clock;
private DiskExporter<SpanData> exporter;
private FromDiskExporterImpl<SpanData> exporter;
private final List<SpanData> deserializedData = Collections.emptyList();
@TempDir File rootDir;
private static final String STORAGE_FOLDER_NAME = "testName";
@@ -46,14 +43,14 @@ class DiskExporterTest {
@BeforeEach
void setUp() throws IOException {
clock = createClockMock();
setUpSerializer();
wrapped = mock();
exporter =
DiskExporter.<SpanData>builder()
.setRootDir(rootDir)
FromDiskExporterImpl.<SpanData>builder()
.setFolderName(STORAGE_FOLDER_NAME)
.setStorageConfiguration(TestData.getDefaultConfiguration())
.setSerializer(serializer)
.setStorageConfiguration(TestData.getDefaultConfiguration(rootDir))
.setDeserializer(serializer)
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
@@ -70,30 +67,6 @@
assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isTrue();
}
@Test
void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() {
assertThatThrownBy(
() -> {
StorageConfiguration invalidConfig =
StorageConfiguration.builder()
.setMaxFileAgeForWriteMillis(2)
.setMinFileAgeForReadMillis(1)
.build();
DiskExporter.<SpanData>builder()
.setRootDir(rootDir)
.setFolderName(STORAGE_FOLDER_NAME)
.setStorageConfiguration(invalidConfig)
.setSerializer(serializer)
.setExportFunction(wrapped::export)
.setStorageClock(clock)
.build();
})
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The configured max file age for writing must be lower than the configured min file age for reading");
}
@Test
void whenExportingStoredBatch_withAvailableData_andUnsuccessfullyProcessed_returnFalse()
throws IOException {
@@ -115,31 +88,9 @@ class DiskExporterTest {
assertThat(new File(rootDir, STORAGE_FOLDER_NAME).exists()).isTrue();
}
@Test
void whenWritingSucceedsOnExport_returnSuccessfulResultCode() {
doReturn(new byte[2]).when(serializer).serialize(deserializedData);
CompletableResultCode completableResultCode = exporter.onExport(deserializedData);
assertThat(completableResultCode.isSuccess()).isTrue();
verifyNoInteractions(wrapped);
}
@Test
void whenWritingFailsOnExport_doExportRightAway() throws IOException {
doReturn(CompletableResultCode.ofSuccess()).when(wrapped).export(deserializedData);
exporter.onShutDown();
CompletableResultCode completableResultCode = exporter.onExport(deserializedData);
assertThat(completableResultCode.isSuccess()).isTrue();
verify(wrapped).export(deserializedData);
}
private File createDummyFile() throws IOException {
private void createDummyFile() throws IOException {
File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L);
Files.write(file.toPath(), singletonList("First line"));
return file;
}
private void setUpSerializer() {


@@ -17,76 +17,122 @@ import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterBuilder;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
public class IntegrationTest {
private InMemorySpanExporter memorySpanExporter;
private SpanDiskExporter diskSpanExporter;
private SpanToDiskExporter spanToDiskExporter;
private Tracer tracer;
private InMemoryMetricExporter memoryMetricExporter;
private MetricDiskExporter diskMetricExporter;
private MetricToDiskExporter metricToDiskExporter;
private SdkMeterProvider meterProvider;
private Meter meter;
private InMemoryLogRecordExporter memoryLogRecordExporter;
private LogRecordDiskExporter diskLogRecordExporter;
private LogRecordToDiskExporter logToDiskExporter;
private Logger logger;
private Clock clock;
@TempDir File rootDir;
private static final long INITIAL_TIME_IN_MILLIS = 1000;
private static final StorageConfiguration STORAGE_CONFIGURATION =
StorageConfiguration.getDefault();
private StorageConfiguration storageConfig;
@BeforeEach
void setUp() throws IOException {
storageConfig = StorageConfiguration.getDefault(rootDir);
clock = mock();
doReturn(MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS)).when(clock).now();
// Setting up spans
memorySpanExporter = InMemorySpanExporter.create();
diskSpanExporter =
SpanDiskExporter.create(memorySpanExporter, rootDir, STORAGE_CONFIGURATION, clock);
tracer = createTracerProvider(diskSpanExporter).get("SpanInstrumentationScope");
ToDiskExporter<SpanData> toDiskSpanExporter =
buildToDiskExporter(SignalSerializer.ofSpans(), memorySpanExporter::export);
spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter);
tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope");
// Setting up metrics
memoryMetricExporter = InMemoryMetricExporter.create();
diskMetricExporter =
MetricDiskExporter.create(memoryMetricExporter, rootDir, STORAGE_CONFIGURATION, clock);
meterProvider = createMeterProvider(diskMetricExporter);
ToDiskExporter<MetricData> toDiskMetricExporter =
buildToDiskExporter(SignalSerializer.ofMetrics(), memoryMetricExporter::export);
metricToDiskExporter =
new MetricToDiskExporter(
toDiskMetricExporter, memoryMetricExporter::getAggregationTemporality);
meterProvider = createMeterProvider(metricToDiskExporter);
meter = meterProvider.get("MetricInstrumentationScope");
// Setting up logs
memoryLogRecordExporter = InMemoryLogRecordExporter.create();
diskLogRecordExporter =
LogRecordDiskExporter.create(
memoryLogRecordExporter, rootDir, STORAGE_CONFIGURATION, clock);
logger = createLoggerProvider(diskLogRecordExporter).get("LogInstrumentationScope");
ToDiskExporter<LogRecordData> toDiskLogExporter =
buildToDiskExporter(SignalSerializer.ofLogs(), memoryLogRecordExporter::export);
logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter);
logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope");
}
@NotNull
private <T> ToDiskExporter<T> buildToDiskExporter(
SignalSerializer<T> serializer, Function<Collection<T>, CompletableResultCode> exporter)
throws IOException {
return ToDiskExporter.<T>builder()
.setFolderName("spans")
.setStorageConfiguration(storageConfig)
.setSerializer(serializer)
.setExportFunction(exporter)
.setStorageClock(clock)
.build();
}
@NotNull
private <T> FromDiskExporterImpl<T> buildFromDiskExporter(
FromDiskExporterBuilder<T> builder,
Function<Collection<T>, CompletableResultCode> exportFunction,
SignalSerializer<T> serializer)
throws IOException {
return builder
.setExportFunction(exportFunction)
.setFolderName("spans")
.setStorageConfiguration(storageConfig)
.setDeserializer(serializer)
.setStorageClock(clock)
.build();
}
@Test
void verifySpansIntegration() throws IOException {
Span span = tracer.spanBuilder("Span name").startSpan();
span.end();
assertExporter(diskSpanExporter, () -> memorySpanExporter.getFinishedSpanItems().size());
FromDiskExporterImpl<SpanData> fromDiskExporter =
buildFromDiskExporter(
FromDiskExporterImpl.builder(), memorySpanExporter::export, SignalSerializer.ofSpans());
assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size());
}
@Test
@@ -94,24 +140,34 @@ public class IntegrationTest {
meter.counterBuilder("Counter").build().add(2);
meterProvider.forceFlush();
assertExporter(diskMetricExporter, () -> memoryMetricExporter.getFinishedMetricItems().size());
FromDiskExporterImpl<MetricData> fromDiskExporter =
buildFromDiskExporter(
FromDiskExporterImpl.builder(),
memoryMetricExporter::export,
SignalSerializer.ofMetrics());
assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size());
}
@Test
void verifyLogRecordsIntegration() throws IOException {
logger.logRecordBuilder().setBody("I'm a log!").emit();
FromDiskExporterImpl<LogRecordData> fromDiskExporter =
buildFromDiskExporter(
FromDiskExporterImpl.builder(),
memoryLogRecordExporter::export,
SignalSerializer.ofLogs());
assertExporter(
diskLogRecordExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size());
fromDiskExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size());
}
private void assertExporter(StoredBatchExporter exporter, Supplier<Integer> finishedItems)
private <T> void assertExporter(FromDiskExporterImpl<T> exporter, Supplier<Integer> finishedItems)
throws IOException {
// Verify no data has been received in the original exporter until this point.
assertEquals(0, finishedItems.get());
// Go to the future when we can read the stored items.
fastForwardTimeByMillis(STORAGE_CONFIGURATION.getMinFileAgeForReadMillis());
fastForwardTimeByMillis(storageConfig.getMinFileAgeForReadMillis());
// Read and send stored data.
assertTrue(exporter.exportStoredBatch(1, TimeUnit.SECONDS));


@@ -1,46 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.File;
import java.io.IOException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
class LogRecordDiskExporterTest {
private LogRecordExporter wrapped;
private LogRecordDiskExporter exporter;
private static final StorageConfiguration STORAGE_CONFIGURATION =
TestData.getDefaultConfiguration();
private static final String STORAGE_FOLDER_NAME = "logs";
@TempDir File rootDir;
@BeforeEach
void setUp() throws IOException {
wrapped = mock();
exporter = LogRecordDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION);
}
@Test
void verifyCacheFolderName() {
File[] files = rootDir.listFiles();
assertEquals(1, files.length);
assertEquals(STORAGE_FOLDER_NAME, files[0].getName());
}
@Test
void onFlush_returnSuccess() {
assertEquals(CompletableResultCode.ofSuccess(), exporter.flush());
}
}


@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class LogRecordToDiskExporterTest {
@Mock private ToDiskExporter<LogRecordData> delegate;
@Test
void delegateShutdown_success() throws IOException {
LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isTrue();
verify(delegate).shutdown();
}
@Test
void delegateShutdown_fail() throws IOException {
doThrow(new IOException("boom")).when(delegate).shutdown();
LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isFalse();
verify(delegate).shutdown();
}
@Test
void delegateExport() {
LogRecordData log1 = mock();
LogRecordData log2 = mock();
List<LogRecordData> logRecords = Arrays.asList(log1, log2);
LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate);
testClass.export(logRecords);
verify(delegate).export(logRecords);
}
@Test
void flushReturnsSuccess() {
LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate);
CompletableResultCode result = testClass.flush();
assertThat(result.isSuccess()).isTrue();
}
}


@@ -1,62 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.File;
import java.io.IOException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
class MetricDiskExporterTest {
private MetricExporter wrapped;
private MetricDiskExporter exporter;
private static final StorageConfiguration STORAGE_CONFIGURATION =
TestData.getDefaultConfiguration();
private static final String STORAGE_FOLDER_NAME = "metrics";
@TempDir File rootDir;
@BeforeEach
void setUp() throws IOException {
wrapped = mock();
exporter = MetricDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION);
}
@Test
void verifyCacheFolderName() {
File[] files = rootDir.listFiles();
assertEquals(1, files.length);
assertEquals(STORAGE_FOLDER_NAME, files[0].getName());
}
@Test
void onFlush_returnSuccess() {
assertEquals(CompletableResultCode.ofSuccess(), exporter.flush());
}
@Test
void provideWrappedAggregationTemporality() {
InstrumentType instrumentType = mock();
AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA;
doReturn(aggregationTemporality).when(wrapped).getAggregationTemporality(instrumentType);
assertEquals(aggregationTemporality, exporter.getAggregationTemporality(instrumentType));
verify(wrapped).getAggregationTemporality(instrumentType);
}
}


@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class MetricToDiskExporterTest {
@Mock private ToDiskExporter<MetricData> delegate;
@Test
void delegateShutdown_success() throws IOException {
MetricToDiskExporter testClass =
new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isTrue();
verify(delegate).shutdown();
}
private static AggregationTemporality temporalityFn(InstrumentType instrumentType) {
return CUMULATIVE;
}
@Test
void delegateShutdown_fail() throws IOException {
doThrow(new IOException("boom")).when(delegate).shutdown();
MetricToDiskExporter testClass =
new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isFalse();
verify(delegate).shutdown();
}
@Test
void delegateExport() {
MetricData metric1 = mock();
MetricData metric2 = mock();
List<MetricData> metrics = Arrays.asList(metric1, metric2);
MetricToDiskExporter testClass =
new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn);
testClass.export(metrics);
verify(delegate).export(metrics);
}
@Test
void flushReturnsSuccess() {
MetricToDiskExporter testClass =
new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn);
CompletableResultCode result = testClass.flush();
assertThat(result.isSuccess()).isTrue();
}
}


@ -1,46 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
class SpanDiskExporterTest {
private SpanExporter wrapped;
private SpanDiskExporter exporter;
private static final StorageConfiguration STORAGE_CONFIGURATION =
TestData.getDefaultConfiguration();
private static final String STORAGE_FOLDER_NAME = "spans";
@TempDir File rootDir;
@BeforeEach
void setUp() throws IOException {
wrapped = mock();
exporter = SpanDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION);
}
@Test
void verifyCacheFolderName() {
File[] files = rootDir.listFiles();
assertEquals(1, files.length);
assertEquals(STORAGE_FOLDER_NAME, files[0].getName());
}
@Test
void onFlush_returnSuccess() {
assertEquals(CompletableResultCode.ofSuccess(), exporter.flush());
}
}


@ -0,0 +1,149 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.testutils.TestData;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentCaptor;
class SpanFromDiskExporterTest {
@TempDir File tempDir;
@SuppressWarnings("unchecked")
@Test
void fromDisk() throws Exception {
StorageConfiguration config =
StorageConfiguration.builder()
.setRootDir(tempDir)
.setMaxFileAgeForWriteMillis(TimeUnit.HOURS.toMillis(24))
.setMinFileAgeForReadMillis(0)
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(24))
.setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance())
.build();
List<SpanData> spans = writeSomeSpans(config);
SpanExporter exporter = mock();
ArgumentCaptor<Collection<SpanData>> capture = ArgumentCaptor.forClass(Collection.class);
when(exporter.export(capture.capture())).thenReturn(CompletableResultCode.ofSuccess());
SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, config);
boolean result = testClass.exportStoredBatch(30, TimeUnit.SECONDS);
assertThat(result).isTrue();
List<SpanData> exportedSpans = (List<SpanData>) capture.getValue();
long now = spans.get(0).getStartEpochNanos();
SpanData expected1 = makeSpan1(TraceFlags.getSampled(), now);
SpanData expected2 = makeSpan2(TraceFlags.getSampled(), now);
assertThat(exportedSpans.get(0)).isEqualTo(expected1);
assertThat(exportedSpans.get(1)).isEqualTo(expected2);
assertThat(exportedSpans).containsExactly(expected1, expected2);
verify(exporter).export(eq(Arrays.asList(expected1, expected2)));
}
private static List<SpanData> writeSomeSpans(StorageConfiguration config) throws Exception {
long now = System.currentTimeMillis() * 1_000_000;
SpanData span1 = makeSpan1(TraceFlags.getDefault(), now);
SpanData span2 = makeSpan2(TraceFlags.getSampled(), now);
List<SpanData> spans = Arrays.asList(span1, span2);
SignalSerializer<SpanData> serializer = SignalSerializer.ofSpans();
File subdir = new File(config.getRootDir(), "spans");
assertTrue(subdir.mkdir());
Storage storage =
Storage.builder().setStorageConfiguration(config).setFolderName("spans").build();
storage.write(serializer.serialize(spans));
storage.close();
return spans;
}
private static SpanData makeSpan1(TraceFlags parentSpanContextFlags, long now) {
Attributes attributes = Attributes.of(AttributeKey.stringKey("foo"), "bar");
SpanContext parentContext = TestData.makeContext(parentSpanContextFlags, TestData.SPAN_ID);
return SpanDataImpl.builder()
.setName("span1")
.setSpanContext(
SpanContext.create(
TestData.TRACE_ID,
TestData.SPAN_ID,
TraceFlags.getDefault(),
TraceState.getDefault()))
.setParentSpanContext(parentContext)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setStatus(StatusData.create(StatusCode.OK, "whatever"))
.setAttributes(attributes)
.setKind(SpanKind.SERVER)
.setStartEpochNanos(now)
.setEndEpochNanos(now + 50_000_000)
.setTotalRecordedEvents(0)
.setTotalRecordedLinks(0)
.setTotalAttributeCount(attributes.size())
.setLinks(Collections.emptyList())
.setEvents(Collections.emptyList())
.setResource(Resource.getDefault())
.build();
}
private static SpanData makeSpan2(TraceFlags parentSpanContextFlags, long now) {
Attributes attributes = Attributes.of(AttributeKey.stringKey("bar"), "baz");
String spanId = "aaaaaaaa12312312";
SpanContext parentContext = TestData.makeContext(parentSpanContextFlags, spanId);
return SpanDataImpl.builder()
.setName("span2")
.setSpanContext(
SpanContext.create(
TestData.TRACE_ID, spanId, TraceFlags.getSampled(), TraceState.getDefault()))
.setParentSpanContext(parentContext)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setStatus(StatusData.create(StatusCode.OK, "excellent"))
.setAttributes(attributes)
.setKind(SpanKind.CLIENT)
.setStartEpochNanos(now + 12)
.setEndEpochNanos(now + 12 + 40_000_000)
.setTotalRecordedEvents(0)
.setTotalRecordedLinks(0)
.setTotalAttributeCount(attributes.size())
.setLinks(Collections.emptyList())
.setEvents(Collections.emptyList())
.setResource(Resource.getDefault())
.build();
}
}


@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class SpanToDiskExporterTest {
@Mock private ToDiskExporter<SpanData> delegate;
@Test
void delegateShutdown_success() throws IOException {
SpanToDiskExporter testClass = new SpanToDiskExporter(delegate);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isTrue();
verify(delegate).shutdown();
}
@Test
void delegateShutdown_fail() throws IOException {
doThrow(new IOException("boom")).when(delegate).shutdown();
SpanToDiskExporter testClass = new SpanToDiskExporter(delegate);
CompletableResultCode result = testClass.shutdown();
assertThat(result.isSuccess()).isFalse();
verify(delegate).shutdown();
}
@Test
void delegateExport() {
SpanData span1 = mock();
SpanData span2 = mock();
List<SpanData> spans = Arrays.asList(span1, span2);
SpanToDiskExporter testClass = new SpanToDiskExporter(delegate);
testClass.export(spans);
verify(delegate).export(spans);
}
@Test
void flushReturnsSuccess() {
SpanToDiskExporter testClass = new SpanToDiskExporter(delegate);
CompletableResultCode result = testClass.flush();
assertThat(result.isSuccess()).isTrue();
}
}


@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.File;
import org.junit.jupiter.api.Test;
class ToDiskExporterBuilderTest {
@Test
void whenMinFileReadIsNotGreaterThanMaxFileWrite_throwException() {
StorageConfiguration invalidConfig =
StorageConfiguration.builder()
.setMaxFileAgeForWriteMillis(2)
.setMinFileAgeForReadMillis(1)
.setRootDir(new File("."))
.build();
assertThatThrownBy(
() -> ToDiskExporter.<SpanData>builder().setStorageConfiguration(invalidConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"The configured max file age for writing must be lower than the configured min file age for reading");
}
}


@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.disk.buffering.internal.exporter;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ToDiskExporterTest {
private final List<String> records = Arrays.asList("one", "two", "three");
private final byte[] serialized = "one,two,three".getBytes(UTF_8);
@Mock private SignalSerializer<String> serializer;
@Mock private Storage storage;
private ToDiskExporter<String> toDiskExporter;
private Function<Collection<String>, CompletableResultCode> exportFn;
private Collection<String> exportedFnSeen;
private AtomicReference<CompletableResultCode> exportFnResultToReturn;
@BeforeEach
void setup() {
exportedFnSeen = null;
exportFnResultToReturn = new AtomicReference<>(null);
exportFn =
(Collection<String> x) -> {
exportedFnSeen = x;
return exportFnResultToReturn.get();
};
toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage);
when(serializer.serialize(records)).thenReturn(serialized);
}
@Test
void whenWritingSucceedsOnExport_returnSuccessfulResultCode() throws Exception {
when(storage.write(serialized)).thenReturn(true);
CompletableResultCode completableResultCode = toDiskExporter.export(records);
assertThat(completableResultCode.isSuccess()).isTrue();
verify(storage).write(serialized);
assertThat(exportedFnSeen).isNull();
}
@Test
void whenWritingFailsOnExport_doExportRightAway() throws Exception {
when(storage.write(serialized)).thenReturn(false);
exportFnResultToReturn.set(CompletableResultCode.ofSuccess());
CompletableResultCode completableResultCode = toDiskExporter.export(records);
assertThat(completableResultCode.isSuccess()).isTrue();
assertThat(exportedFnSeen).isEqualTo(records);
}
@Test
void whenExceptionInWrite_doExportRightAway() throws Exception {
when(storage.write(serialized)).thenThrow(new IOException("boom"));
exportFnResultToReturn.set(CompletableResultCode.ofFailure());
CompletableResultCode completableResultCode = toDiskExporter.export(records);
assertThat(completableResultCode.isSuccess()).isFalse();
assertThat(exportedFnSeen).isEqualTo(records);
}
@Test
void shutdownClosesStorage() throws Exception {
toDiskExporter.export(records);
toDiskExporter.shutdown();
verify(storage).close();
}
}


@ -5,8 +5,14 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics;
import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeContext;
import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeLongGauge;
import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeLongPointData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.contrib.disk.buffering.testutils.TestData;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -16,10 +22,8 @@ import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
import io.opentelemetry.sdk.metrics.data.GaugeData;
import io.opentelemetry.sdk.metrics.data.HistogramData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.SumData;
@ -34,8 +38,6 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramP
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
@ -44,224 +46,234 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
class MetricDataMapperTest {
private static final LongExemplarData LONG_EXEMPLAR_DATA =
ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L);
private static final DoubleExemplarData DOUBLE_EXEMPLAR_DATA =
ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1.0);
private static final LongPointData LONG_POINT_DATA =
ImmutableLongPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA));
private static final DoublePointData DOUBLE_POINT_DATA =
ImmutableDoublePointData.create(
1L, 2L, TestData.ATTRIBUTES, 1.0, Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final GaugeData<LongPointData> LONG_GAUGE_DATA =
ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA));
private static final GaugeData<DoublePointData> DOUBLE_GAUGE_DATA =
ImmutableGaugeData.create(Collections.singletonList(DOUBLE_POINT_DATA));
private static final SumData<LongPointData> LONG_SUM_DATA =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(LONG_POINT_DATA));
private static final SumData<DoublePointData> DOUBLE_SUM_DATA =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(DOUBLE_POINT_DATA));
private static final ValueAtQuantile VALUE_AT_QUANTILE =
ImmutableValueAtQuantile.create(2.0, 1.0);
private static final SummaryPointData SUMMARY_POINT_DATA =
ImmutableSummaryPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(VALUE_AT_QUANTILE));
private static final SummaryData SUMMARY_DATA =
ImmutableSummaryData.create(Collections.singletonList(SUMMARY_POINT_DATA));
private static final HistogramPointData HISTOGRAM_POINT_DATA =
ImmutableHistogramPointData.create(
1L,
2L,
TestData.ATTRIBUTES,
15.0,
true,
4.0,
true,
7.0,
Collections.singletonList(10.0),
Arrays.asList(1L, 2L),
Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final ExponentialHistogramBuckets POSITIVE_BUCKET =
ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L));
private static final ExponentialHistogramBuckets NEGATIVE_BUCKET =
ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList());
private static final ExponentialHistogramPointData EXPONENTIAL_HISTOGRAM_POINT_DATA =
ImmutableExponentialHistogramPointData.create(
1,
10.0,
1L,
true,
2.0,
true,
4.0,
POSITIVE_BUCKET,
NEGATIVE_BUCKET,
1L,
2L,
TestData.ATTRIBUTES,
Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final HistogramData HISTOGRAM_DATA =
ImmutableHistogramData.create(
AggregationTemporality.CUMULATIVE, Collections.singletonList(HISTOGRAM_POINT_DATA));
private static final ExponentialHistogramData EXPONENTIAL_HISTOGRAM_DATA =
ImmutableExponentialHistogramData.create(
AggregationTemporality.CUMULATIVE,
Collections.singletonList(EXPONENTIAL_HISTOGRAM_POINT_DATA));
private static final MetricData LONG_GAUGE_METRIC =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
private static final MetricData DOUBLE_GAUGE_METRIC =
ImmutableMetricData.createDoubleGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double gauge name",
"Double gauge description",
"ms",
DOUBLE_GAUGE_DATA);
private static final MetricData LONG_SUM_METRIC =
ImmutableMetricData.createLongSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long sum name",
"Long sum description",
"ms",
LONG_SUM_DATA);
private static final MetricData DOUBLE_SUM_METRIC =
ImmutableMetricData.createDoubleSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double sum name",
"Double sum description",
"ms",
DOUBLE_SUM_DATA);
private static final MetricData SUMMARY_METRIC =
ImmutableMetricData.createDoubleSummary(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Summary name",
"Summary description",
"ms",
SUMMARY_DATA);
private static final MetricData HISTOGRAM_METRIC =
ImmutableMetricData.createDoubleHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Histogram name",
"Histogram description",
"ms",
HISTOGRAM_DATA);
private static final MetricData EXPONENTIAL_HISTOGRAM_METRIC =
ImmutableMetricData.createExponentialHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Exponential histogram name",
"Exponential histogram description",
"ms",
EXPONENTIAL_HISTOGRAM_DATA);
@Test
void verifyLongGaugeMapping() {
MetricData longGauge = makeLongGauge(TraceFlags.getDefault());
MetricData expected = makeLongGauge(TraceFlags.getSampled());
Metric proto = mapToProto(longGauge);
MetricData result =
mapToSdk(proto, longGauge.getResource(), longGauge.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@Test
void verifyDoubleGaugeMapping() {
MetricData doubleGauge = makeDoubleGauge(TraceFlags.getDefault());
MetricData expected = makeDoubleGauge(TraceFlags.getSampled());
Metric proto = mapToProto(doubleGauge);
MetricData result =
mapToSdk(proto, doubleGauge.getResource(), doubleGauge.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@Test
void verifyLongSumMapping() {
MetricData longSum = makeLongSum(TraceFlags.getDefault());
MetricData expected = makeLongSum(TraceFlags.getSampled());
Metric proto = mapToProto(longSum);
MetricData result =
mapToSdk(proto, TestData.RESOURCE_FULL, longSum.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@Test
void verifyDoubleSumMapping() {
MetricData doubleSum = makeDoubleSum(TraceFlags.getDefault());
MetricData expected = makeDoubleSum(TraceFlags.getSampled());
Metric proto = mapToProto(doubleSum);
MetricData result =
mapToSdk(proto, doubleSum.getResource(), doubleSum.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@Test
void verifySummaryMapping() {
ValueAtQuantile value = ImmutableValueAtQuantile.create(2.0, 1.0);
SummaryPointData pointData =
ImmutableSummaryPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(value));
SummaryData data = ImmutableSummaryData.create(Collections.singletonList(pointData));
MetricData summaryMetric =
ImmutableMetricData.createDoubleSummary(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Summary name",
"Summary description",
"ms",
data);
Metric proto = mapToProto(summaryMetric);
assertEquals(
summaryMetric,
mapToSdk(proto, summaryMetric.getResource(), summaryMetric.getInstrumentationScopeInfo()));
}
@Test
void verifyHistogramMapping() {
MetricData histogramMetric = makeHistogram(TraceFlags.getDefault());
MetricData expected = makeHistogram(TraceFlags.getSampled());
Metric proto = mapToProto(histogramMetric);
MetricData result =
mapToSdk(proto, histogramMetric.getResource(), histogramMetric.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@Test
void verifyExponentialHistogramMapping() {
MetricData histogram = makeExponentialHistogram(TraceFlags.getDefault());
MetricData expected = makeExponentialHistogram(TraceFlags.getSampled());
Metric proto = mapToProto(histogram);
MetricData result =
mapToSdk(proto, histogram.getResource(), histogram.getInstrumentationScopeInfo());
assertThat(result).isEqualTo(expected);
}
@NotNull
private static MetricData makeDoubleGauge(TraceFlags flags) {
DoublePointData point = makeDoublePointData(flags);
return ImmutableMetricData.createDoubleGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double gauge name",
"Double gauge description",
"ms",
ImmutableGaugeData.create(Collections.singletonList(point)));
}
@NotNull
private static List<DoubleExemplarData> makeDoubleExemplars(TraceFlags flags) {
SpanContext context = makeContext(flags);
DoubleExemplarData doubleExemplarData =
ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, context, 1.0);
return Collections.singletonList(doubleExemplarData);
}
@NotNull
private static MetricData makeLongSum(TraceFlags flags) {
LongPointData pointData = makeLongPointData(flags);
SumData<LongPointData> sumData =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(pointData));
return ImmutableMetricData.createLongSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long sum name",
"Long sum description",
"ms",
sumData);
}
@NotNull
private static MetricData makeDoubleSum(TraceFlags flags) {
DoublePointData doublePointData = makeDoublePointData(flags);
SumData<DoublePointData> sumData =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(doublePointData));
return ImmutableMetricData.createDoubleSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double sum name",
"Double sum description",
"ms",
sumData);
}
@NotNull
private static DoublePointData makeDoublePointData(TraceFlags flags) {
return ImmutableDoublePointData.create(
1L, 2L, TestData.ATTRIBUTES, 1.0, makeDoubleExemplars(flags));
}
@NotNull
private static MetricData makeExponentialHistogram(TraceFlags flags) {
ExponentialHistogramBuckets positiveBucket =
ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L));
ExponentialHistogramBuckets negativeBucket =
ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList());
ExponentialHistogramPointData pointData =
ImmutableExponentialHistogramPointData.create(
1,
10.0,
1L,
true,
2.0,
true,
4.0,
positiveBucket,
negativeBucket,
1L,
2L,
TestData.ATTRIBUTES,
makeDoubleExemplars(flags));
ExponentialHistogramData histogramData =
ImmutableExponentialHistogramData.create(
AggregationTemporality.CUMULATIVE, Collections.singletonList(pointData));
return ImmutableMetricData.createExponentialHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Exponential histogram name",
"Exponential histogram description",
"ms",
histogramData);
}
@NotNull
private static MetricData makeHistogram(TraceFlags flags) {
HistogramPointData dataPoint =
ImmutableHistogramPointData.create(
1L,
2L,
TestData.ATTRIBUTES,
15.0,
true,
4.0,
true,
7.0,
Collections.singletonList(10.0),
Arrays.asList(1L, 2L),
makeDoubleExemplars(flags));
HistogramData data =
ImmutableHistogramData.create(
AggregationTemporality.CUMULATIVE, Collections.singletonList(dataPoint));
return ImmutableMetricData.createDoubleHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Histogram name",
"Histogram description",
"ms",
data);
}
private static Metric mapToProto(MetricData source) {


@ -8,19 +8,13 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.m
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.contrib.disk.buffering.testutils.TestData;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.MetricsData;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.sdk.metrics.data.GaugeData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -29,54 +23,12 @@ import org.junit.jupiter.api.Test;
class ProtoMetricsDataMapperTest {
private static final LongExemplarData LONG_EXEMPLAR_DATA =
ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L);
private static final LongPointData LONG_POINT_DATA =
ImmutableLongPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA));
private static final GaugeData<LongPointData> LONG_GAUGE_DATA =
ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA));
private static final MetricData LONG_GAUGE_METRIC =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
private static final MetricData OTHER_LONG_GAUGE_METRIC =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
private static final MetricData LONG_GAUGE_METRIC_WITH_DIFFERENT_SCOPE_SAME_RESOURCE =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
private static final MetricData LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_WITHOUT_SCHEMA_URL,
TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
@Test
void verifyConversionDataStructure() {
List<MetricData> signals = Collections.singletonList(LONG_GAUGE_METRIC);
MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault());
List<MetricData> signals = Collections.singletonList(gauge1);
MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled());
List<MetricData> expectedSignals = Collections.singletonList(expectedGauge1);
MetricsData proto = mapToProto(signals);
@@ -85,12 +37,17 @@ class ProtoMetricsDataMapperTest {
assertEquals(1, resourceMetrics.get(0).scope_metrics.size());
assertEquals(1, resourceMetrics.get(0).scope_metrics.get(0).metrics.size());
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals);
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals);
}
@Test
void verifyMultipleMetricsWithSameResourceAndScope() {
List<MetricData> signals = Arrays.asList(LONG_GAUGE_METRIC, OTHER_LONG_GAUGE_METRIC);
MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault());
MetricData gauge2 = TestData.makeLongGauge(TraceFlags.getDefault());
List<MetricData> signals = Arrays.asList(gauge1, gauge2);
MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled());
MetricData expectedGauge2 = TestData.makeLongGauge(TraceFlags.getSampled());
List<MetricData> expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2);
MetricsData proto = mapToProto(signals);
@@ -101,13 +58,25 @@ class ProtoMetricsDataMapperTest {
List<Metric> metrics = scopeMetrics.get(0).metrics;
assertEquals(2, metrics.size());
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals);
List<MetricData> result = mapFromProto(proto);
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedSignals);
}
@Test
void verifyMultipleMetricsWithSameResourceDifferentScope() {
List<MetricData> signals =
Arrays.asList(LONG_GAUGE_METRIC, LONG_GAUGE_METRIC_WITH_DIFFERENT_SCOPE_SAME_RESOURCE);
MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault());
MetricData gauge2 =
TestData.makeLongGauge(
TraceFlags.getDefault(), TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION);
MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled());
MetricData expectedGauge2 =
TestData.makeLongGauge(
TraceFlags.getSampled(), TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION);
List<MetricData> signals = Arrays.asList(gauge1, gauge2);
List<MetricData> expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2);
MetricsData proto = mapToProto(signals);
@@ -122,13 +91,27 @@ class ProtoMetricsDataMapperTest {
assertEquals(1, firstScopeMetrics.size());
assertEquals(1, secondScopeMetrics.size());
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals);
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals);
}
@Test
void verifyMultipleMetricsWithDifferentResource() {
List<MetricData> signals =
Arrays.asList(LONG_GAUGE_METRIC, LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE);
MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault());
MetricData gauge2 =
TestData.makeLongGauge(
TraceFlags.getDefault(),
TestData.RESOURCE_WITHOUT_SCHEMA_URL,
TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION);
List<MetricData> signals = Arrays.asList(gauge1, gauge2);
MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled());
MetricData expectedGauge2 =
TestData.makeLongGauge(
TraceFlags.getSampled(),
TestData.RESOURCE_WITHOUT_SCHEMA_URL,
TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION);
List<MetricData> expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2);
MetricsData proto = mapToProto(signals);
@@ -147,7 +130,7 @@ class ProtoMetricsDataMapperTest {
assertEquals(1, firstMetrics.size());
assertEquals(1, secondMetrics.size());
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals);
assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals);
}
private static MetricsData mapToProto(Collection<MetricData> signals) {

View File

@@ -5,6 +5,11 @@
package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers;
import static java.util.Collections.singletonList;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.contrib.disk.buffering.testutils.BaseSignalSerializerTest;
import io.opentelemetry.contrib.disk.buffering.testutils.TestData;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
@@ -40,155 +45,229 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
import java.util.Arrays;
import java.util.Collections;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
class MetricDataSerializerTest extends BaseSignalSerializerTest<MetricData> {
private static final LongExemplarData LONG_EXEMPLAR_DATA =
ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L);
private static final DoubleExemplarData DOUBLE_EXEMPLAR_DATA =
ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1.0);
private static final LongPointData LONG_POINT_DATA =
ImmutableLongPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA));
private static final DoublePointData DOUBLE_POINT_DATA =
ImmutableDoublePointData.create(
1L, 2L, TestData.ATTRIBUTES, 1.0, Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final GaugeData<LongPointData> LONG_GAUGE_DATA =
ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA));
private static final GaugeData<DoublePointData> DOUBLE_GAUGE_DATA =
ImmutableGaugeData.create(Collections.singletonList(DOUBLE_POINT_DATA));
private static final SumData<LongPointData> LONG_SUM_DATA =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(LONG_POINT_DATA));
private static final SumData<DoublePointData> DOUBLE_SUM_DATA =
ImmutableSumData.create(
true, AggregationTemporality.DELTA, Collections.singletonList(DOUBLE_POINT_DATA));
private static final ValueAtQuantile VALUE_AT_QUANTILE =
ImmutableValueAtQuantile.create(2.0, 1.0);
private static final SummaryPointData SUMMARY_POINT_DATA =
ImmutableSummaryPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(VALUE_AT_QUANTILE));
private static final SummaryData SUMMARY_DATA =
ImmutableSummaryData.create(Collections.singletonList(SUMMARY_POINT_DATA));
private static final HistogramPointData HISTOGRAM_POINT_DATA =
ImmutableHistogramPointData.create(
1L,
2L,
TestData.ATTRIBUTES,
15.0,
true,
4.0,
true,
7.0,
Collections.singletonList(10.0),
Arrays.asList(1L, 2L),
Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final ExponentialHistogramBuckets POSITIVE_BUCKET =
ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L));
private static final ExponentialHistogramBuckets NEGATIVE_BUCKET =
ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList());
private static final ExponentialHistogramPointData EXPONENTIAL_HISTOGRAM_POINT_DATA =
ImmutableExponentialHistogramPointData.create(
1,
10.0,
1L,
true,
2.0,
true,
4.0,
POSITIVE_BUCKET,
NEGATIVE_BUCKET,
1L,
2L,
TestData.ATTRIBUTES,
Collections.singletonList(DOUBLE_EXEMPLAR_DATA));
private static final HistogramData HISTOGRAM_DATA =
ImmutableHistogramData.create(
AggregationTemporality.CUMULATIVE, Collections.singletonList(HISTOGRAM_POINT_DATA));
private static final ExponentialHistogramData EXPONENTIAL_HISTOGRAM_DATA =
ImmutableExponentialHistogramData.create(
AggregationTemporality.CUMULATIVE,
Collections.singletonList(EXPONENTIAL_HISTOGRAM_POINT_DATA));
private static final MetricData LONG_GAUGE_METRIC =
ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long gauge name",
"Long gauge description",
"ms",
LONG_GAUGE_DATA);
private static final MetricData DOUBLE_GAUGE_METRIC =
ImmutableMetricData.createDoubleGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double gauge name",
"Double gauge description",
"ms",
DOUBLE_GAUGE_DATA);
private static final MetricData LONG_SUM_METRIC =
ImmutableMetricData.createLongSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long sum name",
"Long sum description",
"ms",
LONG_SUM_DATA);
private static final MetricData DOUBLE_SUM_METRIC =
ImmutableMetricData.createDoubleSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Double sum name",
"Double sum description",
"ms",
DOUBLE_SUM_DATA);
private static final MetricData SUMMARY_METRIC =
ImmutableMetricData.createDoubleSummary(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Summary name",
"Summary description",
"ms",
SUMMARY_DATA);
private static final MetricData HISTOGRAM_METRIC =
ImmutableMetricData.createDoubleHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Histogram name",
"Histogram description",
"ms",
HISTOGRAM_DATA);
private static final MetricData EXPONENTIAL_HISTOGRAM_METRIC =
ImmutableMetricData.createExponentialHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Exponential histogram name",
"Exponential histogram description",
"ms",
EXPONENTIAL_HISTOGRAM_DATA);
@Test
void verifyLongGauge() {
MetricData metric = createLongGauge(TraceFlags.getDefault());
MetricData expected = createLongGauge(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@Test
void verifySerialization() {
assertSerialization(
LONG_GAUGE_METRIC,
DOUBLE_GAUGE_METRIC,
LONG_SUM_METRIC,
DOUBLE_SUM_METRIC,
SUMMARY_METRIC,
HISTOGRAM_METRIC,
EXPONENTIAL_HISTOGRAM_METRIC);
void verifyDoubleGauge() {
MetricData metric = createDoubleGauge(TraceFlags.getDefault());
    // This extra step exists so that we can verify that the exemplar we get back has a
    // span context whose flags are set to something different. The trace flags are NOT
    // part of the exemplar protos, so what we get back is not what we put in.
MetricData expected = createDoubleGauge(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@Test
void verifyLongSum() {
MetricData metric = makeLongSum(TraceFlags.getDefault());
MetricData expected = makeLongSum(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@Test
void verifySummary() {
ValueAtQuantile value = ImmutableValueAtQuantile.create(2.0, 1.0);
SummaryPointData pointData =
ImmutableSummaryPointData.create(
1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, singletonList(value));
SummaryData data = ImmutableSummaryData.create(singletonList(pointData));
MetricData metric =
ImmutableMetricData.createDoubleSummary(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Summary name",
"Summary description",
"ms",
data);
assertSerialization(metric);
}
@Test
void verifyDoubleSum() {
MetricData metric = makeDoubleSum(TraceFlags.getDefault());
MetricData expected = makeDoubleSum(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@Test
void verifyHistogram() {
MetricData metric = makeHistogram(TraceFlags.getDefault());
MetricData expected = makeHistogram(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@Test
void verifyExponentialHistogram() {
MetricData metric = makeExponentialHistogram(TraceFlags.getDefault());
MetricData expected = makeExponentialHistogram(TraceFlags.getSampled());
assertSerializeDeserialize(metric, expected);
}
@NotNull
private static MetricData createLongGauge(TraceFlags flags) {
LongPointData pointData = makeLongPointData(flags);
GaugeData<LongPointData> data = ImmutableGaugeData.create(singletonList(pointData));
return ImmutableMetricData.createLongGauge(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long gauge name",
"Long gauge description",
"ms",
data);
}
@NotNull
private static DoublePointData makeDoublePointData(TraceFlags flags) {
DoubleExemplarData doubleExemplarData = makeDoubleExemplar(flags);
return ImmutableDoublePointData.create(
1L, 2L, TestData.ATTRIBUTES, 1.0, singletonList(doubleExemplarData));
}
@NotNull
private static MetricData createDoubleGauge(TraceFlags flags) {
DoublePointData point = makeDoublePointData(flags);
GaugeData<DoublePointData> gaugeData = ImmutableGaugeData.create(singletonList(point));
    return ImmutableMetricData.createDoubleGauge(
        TestData.RESOURCE_FULL,
        TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
        "Double gauge name",
        "Double gauge description",
        "ms",
        gaugeData);
}
@NotNull
private static DoubleExemplarData makeDoubleExemplar(TraceFlags flags) {
    SpanContext exemplarContext = createSpanContext(flags);
    return ImmutableDoubleExemplarData.create(
        TestData.ATTRIBUTES, 100L, exemplarContext, 1.0);
}
@NotNull
private static MetricData makeLongSum(TraceFlags flags) {
LongPointData point = makeLongPointData(flags);
SumData<LongPointData> sumData =
ImmutableSumData.create(true, AggregationTemporality.DELTA, singletonList(point));
return ImmutableMetricData.createLongSum(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Long sum name",
"Long sum description",
"ms",
sumData);
}
@NotNull
private static LongPointData makeLongPointData(TraceFlags flags) {
LongExemplarData ex = makeLongExemplar(flags);
return ImmutableLongPointData.create(1L, 2L, TestData.ATTRIBUTES, 1L, singletonList(ex));
}
@NotNull
private static LongExemplarData makeLongExemplar(TraceFlags flags) {
SpanContext context = createSpanContext(flags);
return ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, context, 1L);
}
@NotNull
private static SpanContext createSpanContext(TraceFlags flags) {
return SpanContext.create(TestData.TRACE_ID, TestData.SPAN_ID, flags, TraceState.getDefault());
}
@NotNull
private static MetricData makeDoubleSum(TraceFlags flags) {
DoublePointData point = makeDoublePointData(flags);
    SumData<DoublePointData> sumData =
        ImmutableSumData.create(true, AggregationTemporality.DELTA, singletonList(point));
    return ImmutableMetricData.createDoubleSum(
        TestData.RESOURCE_FULL,
        TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
        "Double sum name",
        "Double sum description",
        "ms",
        sumData);
}
@NotNull
private static MetricData makeExponentialHistogram(TraceFlags flags) {
ExponentialHistogramBuckets positiveBucket =
ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L));
ExponentialHistogramBuckets negativeBucket =
ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList());
DoubleExemplarData exemplar = makeDoubleExemplar(flags);
ExponentialHistogramPointData pointData =
ImmutableExponentialHistogramPointData.create(
1,
10.0,
1L,
true,
2.0,
true,
4.0,
positiveBucket,
negativeBucket,
1L,
2L,
TestData.ATTRIBUTES,
singletonList(exemplar));
ExponentialHistogramData data =
ImmutableExponentialHistogramData.create(
AggregationTemporality.CUMULATIVE, singletonList(pointData));
return ImmutableMetricData.createExponentialHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Exponential histogram name",
"Exponential histogram description",
"ms",
data);
}
@NotNull
private static MetricData makeHistogram(TraceFlags flags) {
DoubleExemplarData exemplar = makeDoubleExemplar(flags);
HistogramPointData pointData =
ImmutableHistogramPointData.create(
1L,
2L,
TestData.ATTRIBUTES,
15.0,
true,
4.0,
true,
7.0,
singletonList(10.0),
Arrays.asList(1L, 2L),
singletonList(exemplar));
HistogramData data =
ImmutableHistogramData.create(AggregationTemporality.CUMULATIVE, singletonList(pointData));
return ImmutableMetricData.createDoubleHistogram(
TestData.RESOURCE_FULL,
TestData.INSTRUMENTATION_SCOPE_INFO_FULL,
"Histogram name",
"Histogram description",
"ms",
data);
}
@Override

View File

@@ -28,47 +28,50 @@ class SpanDataSerializerTest extends BaseSignalSerializerTest<SpanData> {
private static final LinkData LINK_DATA_WITH_TRACE_STATE =
LinkData.create(TestData.SPAN_CONTEXT_WITH_TRACE_STATE, TestData.ATTRIBUTES, 20);
private static final SpanData SPAN_DATA =
SpanDataImpl.builder()
.setResource(TestData.RESOURCE_FULL)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setName("Span name")
.setSpanContext(TestData.SPAN_CONTEXT)
.setParentSpanContext(TestData.PARENT_SPAN_CONTEXT)
.setAttributes(TestData.ATTRIBUTES)
.setStartEpochNanos(1L)
.setEndEpochNanos(2L)
.setKind(SpanKind.CLIENT)
.setStatus(StatusData.error())
.setEvents(Collections.singletonList(EVENT_DATA))
.setLinks(Arrays.asList(LINK_DATA, LINK_DATA_WITH_TRACE_STATE))
.setTotalAttributeCount(10)
.setTotalRecordedEvents(2)
.setTotalRecordedLinks(2)
.build();
private static final SpanData SPAN_DATA_WITH_TRACE_STATE =
SpanDataImpl.builder()
.setResource(TestData.RESOURCE_FULL)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setName("Span name2")
.setSpanContext(TestData.SPAN_CONTEXT_WITH_TRACE_STATE)
.setParentSpanContext(TestData.PARENT_SPAN_CONTEXT)
.setAttributes(TestData.ATTRIBUTES)
.setStartEpochNanos(1L)
.setEndEpochNanos(2L)
.setKind(SpanKind.CLIENT)
.setStatus(StatusData.error())
.setEvents(Collections.singletonList(EVENT_DATA))
.setLinks(Collections.singletonList(LINK_DATA))
.setTotalAttributeCount(10)
.setTotalRecordedEvents(2)
.setTotalRecordedLinks(2)
.build();
@Test
void verifySerialization_noFlagsNoState() {
SpanData span =
SpanDataImpl.builder()
.setResource(TestData.RESOURCE_FULL)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setName("Span name")
.setSpanContext(TestData.SPAN_CONTEXT)
.setParentSpanContext(TestData.PARENT_SPAN_CONTEXT)
.setAttributes(TestData.ATTRIBUTES)
.setStartEpochNanos(1L)
.setEndEpochNanos(2L)
.setKind(SpanKind.CLIENT)
.setStatus(StatusData.error())
.setEvents(Collections.singletonList(EVENT_DATA))
.setLinks(Arrays.asList(LINK_DATA, LINK_DATA_WITH_TRACE_STATE))
.setTotalAttributeCount(10)
.setTotalRecordedEvents(2)
.setTotalRecordedLinks(2)
.build();
assertSerialization(span);
}
@Test
void verifySerialization() {
assertSerialization(SPAN_DATA, SPAN_DATA_WITH_TRACE_STATE);
void verifySerialization_withTraceState() {
SpanData span =
SpanDataImpl.builder()
.setResource(TestData.RESOURCE_FULL)
.setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL)
.setName("Span name2")
.setSpanContext(TestData.SPAN_CONTEXT_WITH_TRACE_STATE)
.setParentSpanContext(TestData.PARENT_SPAN_CONTEXT)
.setAttributes(TestData.ATTRIBUTES)
.setStartEpochNanos(1L)
.setEndEpochNanos(2L)
.setKind(SpanKind.CLIENT)
.setStatus(StatusData.error())
.setEvents(Collections.singletonList(EVENT_DATA))
.setLinks(Collections.singletonList(LINK_DATA))
.setTotalAttributeCount(10)
.setTotalRecordedEvents(2)
.setTotalRecordedLinks(2)
.build();
assertSerialization(span);
}
@Override

View File

@@ -37,7 +37,7 @@ class FolderManagerTest {
@BeforeEach
void setUp() {
clock = mock();
folderManager = new FolderManager(rootDir, TestData.getDefaultConfiguration(), clock);
folderManager = new FolderManager(rootDir, TestData.getDefaultConfiguration(rootDir), clock);
}
@Test

View File

@@ -8,6 +8,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider;
import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider;
import java.io.File;
public final class TestData {
@@ -17,12 +18,15 @@ public final class TestData {
public static final int MAX_FILE_SIZE = 100;
public static final int MAX_FOLDER_SIZE = 300;
public static StorageConfiguration getDefaultConfiguration() {
return getConfiguration(DefaultTemporaryFileProvider.getInstance());
public static StorageConfiguration getDefaultConfiguration(File rootDir) {
TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance();
return getConfiguration(fileProvider, rootDir);
}
public static StorageConfiguration getConfiguration(TemporaryFileProvider fileProvider) {
public static StorageConfiguration getConfiguration(
TemporaryFileProvider fileProvider, File rootDir) {
return StorageConfiguration.builder()
.setRootDir(rootDir)
.setMaxFileAgeForWriteMillis(MAX_FILE_AGE_FOR_WRITE_MILLIS)
.setMinFileAgeForReadMillis(MIN_FILE_AGE_FOR_READ_MILLIS)
.setMaxFileAgeForReadMillis(MAX_FILE_AGE_FOR_READ_MILLIS)

View File

@@ -97,7 +97,7 @@ class ReadableFileTest {
clock = mock();
readableFile =
new ReadableFile(
source, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider));
source, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir));
}
private static void addFileContents(File source) throws IOException {
@@ -184,7 +184,7 @@ class ReadableFileTest {
ReadableFile emptyReadableFile =
new ReadableFile(
emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider));
emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir));
assertEquals(ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> true));

View File

@@ -43,7 +43,7 @@ class WritableFileTest {
new WritableFile(
new File(rootDir, String.valueOf(CREATED_TIME_MILLIS)),
CREATED_TIME_MILLIS,
TestData.getDefaultConfiguration(),
TestData.getDefaultConfiguration(rootDir),
clock);
}

View File

@@ -36,5 +36,10 @@ public abstract class BaseSignalSerializerTest<SIGNAL_SDK_ITEM> {
assertThat(deserialize(serialized)).containsExactly(targets);
}
protected void assertSerializeDeserialize(SIGNAL_SDK_ITEM input, SIGNAL_SDK_ITEM expected) {
byte[] serialized = serialize(input);
assertThat(deserialize(serialized)).containsExactly(expected);
}
protected abstract SignalSerializer<SIGNAL_SDK_ITEM> getSerializer();
}

View File

@@ -10,7 +10,17 @@ import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.GaugeData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings("unchecked")
public final class TestData {
@@ -38,7 +48,7 @@ public final class TestData {
Resource.create(Attributes.builder().put("resourceAttr", "resourceAttrValue").build());
public static final SpanContext SPAN_CONTEXT =
SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault());
SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault());
public static final SpanContext SPAN_CONTEXT_WITH_TRACE_STATE =
SpanContext.create(
TRACE_ID,
@@ -67,5 +77,59 @@ public final class TestData {
.build())
.build();
@NotNull
public static MetricData makeLongGauge(TraceFlags flags) {
return makeLongGauge(flags, RESOURCE_FULL, INSTRUMENTATION_SCOPE_INFO_FULL);
}
@NotNull
public static MetricData makeLongGauge(
TraceFlags flags, InstrumentationScopeInfo instrumentationScopeInfo) {
return makeLongGauge(flags, RESOURCE_FULL, instrumentationScopeInfo);
}
@NotNull
public static MetricData makeLongGauge(TraceFlags flags, Resource resource) {
return makeLongGauge(flags, resource, INSTRUMENTATION_SCOPE_INFO_FULL);
}
@NotNull
public static MetricData makeLongGauge(
TraceFlags flags, Resource resource, InstrumentationScopeInfo instrumentationScopeInfo) {
LongPointData point = makeLongPointData(flags);
GaugeData<LongPointData> gaugeData =
ImmutableGaugeData.create(Collections.singletonList(point));
return ImmutableMetricData.createLongGauge(
resource,
instrumentationScopeInfo,
"Long gauge name",
"Long gauge description",
"ms",
gaugeData);
}
@NotNull
public static LongPointData makeLongPointData(TraceFlags flags) {
LongExemplarData longExemplarData = makeLongExemplarData(flags);
return ImmutableLongPointData.create(
1L, 2L, ATTRIBUTES, 1L, Collections.singletonList(longExemplarData));
}
@NotNull
public static SpanContext makeContext(TraceFlags flags) {
return makeContext(flags, SPAN_ID);
}
@NotNull
public static SpanContext makeContext(TraceFlags flags, String spanId) {
return SpanContext.create(TRACE_ID, spanId, flags, TraceState.getDefault());
}
@NotNull
private static LongExemplarData makeLongExemplarData(TraceFlags flags) {
SpanContext context = makeContext(flags);
return ImmutableLongExemplarData.create(ATTRIBUTES, 100L, context, 1L);
}
private TestData() {}
}