From 3f7db8b7489fba21370b457bc2114078ead66ef7 Mon Sep 17 00:00:00 2001 From: jason plumb <75337021+breedx-splk@users.noreply.github.com> Date: Tue, 23 Jan 2024 13:44:05 -0800 Subject: [PATCH] [disk-buffering] - Single responsibility for disk exporters (#1161) --- disk-buffering/{CONTRIBUTING.md => DESIGN.md} | 30 +- disk-buffering/README.md | 12 +- disk-buffering/build.gradle.kts | 2 +- .../disk/buffering/LogRecordDiskExporter.java | 95 ----- .../buffering/LogRecordFromDiskExporter.java | 46 +++ .../buffering/LogRecordToDiskExporter.java | 68 ++++ .../disk/buffering/MetricDiskExporter.java | 100 ----- .../buffering/MetricFromDiskExporter.java | 46 +++ .../disk/buffering/MetricToDiskExporter.java | 86 ++++ .../disk/buffering/SpanDiskExporter.java | 93 ----- .../disk/buffering/SpanFromDiskExporter.java | 46 +++ .../disk/buffering/SpanToDiskExporter.java | 69 ++++ .../disk/buffering/StoredBatchExporter.java | 23 -- .../internal/StorageConfiguration.java | 14 +- .../internal/exporter/FromDiskExporter.java | 15 + .../exporter/FromDiskExporterBuilder.java | 81 ++++ .../FromDiskExporterImpl.java} | 55 ++- .../internal/exporter/ToDiskExporter.java | 57 +++ .../ToDiskExporterBuilder.java} | 59 +-- .../mapping/logs/LogRecordDataMapper.java | 5 +- .../mapping/metrics/MetricDataMapper.java | 10 +- .../mapping/spans/SpanDataMapper.java | 10 +- .../buffering/internal/storage/Storage.java | 8 +- .../internal/storage/StorageBuilder.java | 53 +++ ...est.java => FromDiskExporterImplTest.java} | 67 +--- .../disk/buffering/IntegrationTest.java | 98 ++++- .../buffering/LogRecordDiskExporterTest.java | 46 --- .../LogRecordToDiskExporterTest.java | 64 +++ .../buffering/MetricDiskExporterTest.java | 62 --- .../buffering/MetricToDiskExporterTest.java | 77 ++++ .../disk/buffering/SpanDiskExporterTest.java | 46 --- .../buffering/SpanFromDiskExporterTest.java | 149 +++++++ .../buffering/SpanToDiskExporterTest.java | 63 +++ .../exporter/ToDiskExporterBuilderTest.java | 32 ++ .../internal/exporter/ToDiskExporterTest.java | 93 +++++ .../mapping/metrics/MetricDataMapperTest.java | 378 +++++++++--------- .../metrics/ProtoMetricsDataMapperTest.java | 107 +++-- .../serializers/MetricDataSerializerTest.java | 365 ++++++++++------- .../serializers/SpanDataSerializerTest.java | 81 ++-- .../internal/storage/FolderManagerTest.java | 2 +- .../buffering/internal/storage/TestData.java | 10 +- .../storage/files/ReadableFileTest.java | 4 +- .../storage/files/WritableFileTest.java | 2 +- .../testutils/BaseSignalSerializerTest.java | 5 + .../disk/buffering/testutils/TestData.java | 66 ++- 45 files changed, 1823 insertions(+), 1077 deletions(-) rename disk-buffering/{CONTRIBUTING.md => DESIGN.md} (69%) delete mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java delete mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java delete mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java create mode 100644 
disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java delete mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StoredBatchExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporter.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java rename disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/{exporters/DiskExporter.java => exporter/FromDiskExporterImpl.java} (53%) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java rename disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/{exporters/DiskExporterBuilder.java => exporter/ToDiskExporterBuilder.java} (56%) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java rename disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/{internal/exporters/DiskExporterTest.java => FromDiskExporterImplTest.java} (59%) delete mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporterTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporterTest.java delete mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporterTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporterTest.java delete mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporterTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporterTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java create mode 100644 disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java diff --git a/disk-buffering/CONTRIBUTING.md b/disk-buffering/DESIGN.md similarity index 69% rename from disk-buffering/CONTRIBUTING.md rename to disk-buffering/DESIGN.md index a53f488b..01f6048d 100644 --- a/disk-buffering/CONTRIBUTING.md +++ b/disk-buffering/DESIGN.md @@ -1,11 +1,25 @@ -# Contributor Guide +# Design Overview -Each one of the three exporters provided by this -tool ([LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java), [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java) -and [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java)) -is responsible of performing 2 actions, `write` and `read/delegate`, the `write` one happens -automatically as a set of signals are provided from the processor, while the `read/delegate` one has -to be triggered manually by the consumer of this library as explained in the [README](README.md). 
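The new design, described below, splits each of the old dual-purpose exporters into a write-to-disk wrapper and a read-from-disk counterpart. A minimal sketch of how the span pair is meant to fit together, assuming the caller supplies the delegate `SpanExporter` and a writable `rootDir` (the log-record and metric pairs follow the same pattern):

```java
import io.opentelemetry.contrib.disk.buffering.SpanFromDiskExporter;
import io.opentelemetry.contrib.disk.buffering.SpanToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

class DiskBufferingSketch {

  // Write side: wrap the real exporter so spans are buffered on disk.
  static SpanToDiskExporter wrapForWriting(SpanExporter delegate, File rootDir)
      throws IOException {
    StorageConfiguration config = StorageConfiguration.getDefault(rootDir);
    return SpanToDiskExporter.create(delegate, config);
  }

  // Read side: later, drain previously buffered spans into the real exporter.
  static void drain(SpanExporter delegate, File rootDir) throws IOException {
    StorageConfiguration config = StorageConfiguration.getDefault(rootDir);
    SpanFromDiskExporter fromDisk = SpanFromDiskExporter.create(delegate, config);
    // exportStoredBatch returns true only when a batch was found and exported in time.
    while (fromDisk.exportStoredBatch(5, TimeUnit.SECONDS)) {
      // keep draining until no stored batch remains
    }
    fromDisk.shutdown();
  }
}
```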
+There are three main disk-writing exporters provided by this module: + +* [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java) +* [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java) +* [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java) + +Each is responsible for writing a specific type of telemetry to disk storage for later +harvest/ingest. + +For later reading, there are: + +* [LogRecordFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java) +* [MetricFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java) +* [SpanFromDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java) + +Each one of those has a `create()` method that takes a delegate exporter (to send data +to ingest) and the `StorageConfiguration` that tells it where to find buffered data. + +As explained in the [README](README.md), this reading process has to be triggered manually by the consumer of +this library and does not happen automatically. ## Writing overview @@ -14,7 +28,7 @@ to be triggered manually by the consumer of this library as explained in the [RE * The writing process happens automatically within its `export(Collection signals)` method, which is called by the configured signal processor. * When a set of signals is received, these are delegated over to - the [DiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java) + a type-specific wrapper of [ToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java) class which then serializes them using an implementation of [SignalSerializer](src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SignalSerializer.java) and then the serialized data is appended into a File using an instance of diff --git a/disk-buffering/README.md b/disk-buffering/README.md index 9684faa1..d4a5443b 100644 --- a/disk-buffering/README.md +++ b/disk-buffering/README.md @@ -1,6 +1,6 @@ # Disk buffering -This module provides signal exporter wrappers that intercept and store signals in files which can be +This module provides exporters that store telemetry data in files which can be sent later on demand. A high level description of how it works is that there are two separate processes in place, one for writing data in disk, and one for reading/exporting the previously stored data. @@ -8,11 +8,11 @@ stored data. * Each exporter stores the received data automatically in disk right after it's received from its processor. * The reading of the data back from disk and exporting process has to be done manually. At - the moment there's no automatic mechanism to do so. There's more information on it can be + the moment there's no automatic mechanism to do so. There's more information on how it can be achieved, under [Reading data](#reading-data). > For a more detailed information on how the whole process works, take a look at -> the [CONTRIBUTING](CONTRIBUTING.md) file. +> the [DESIGN.md](DESIGN.md) file. ## Configuration @@ -43,11 +43,11 @@ In order to use it, you need to wrap your own exporter with a new instance of the ones provided in here: * For a LogRecordExporter, it must be wrapped within - a [LogRecordDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java).
+ a [LogRecordToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java). * For a MetricExporter, it must be wrapped within - a [MetricDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java). + a [MetricToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java). * For a SpanExporter, it must be wrapped within - a [SpanDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java). + a [SpanToDiskExporter](src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java). Each wrapper will need the following when instantiating them: diff --git a/disk-buffering/build.gradle.kts b/disk-buffering/build.gradle.kts index bd871667..4ddf7846 100644 --- a/disk-buffering/build.gradle.kts +++ b/disk-buffering/build.gradle.kts @@ -53,7 +53,7 @@ wire { java {} sourcePath { - srcJar("io.opentelemetry.proto:opentelemetry-proto:0.20.0-alpha") + srcJar("io.opentelemetry.proto:opentelemetry-proto:1.1.0-alpha") } root( diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java deleted file mode 100644 index 3a9088bc..00000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporter.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.logs.LogRecordProcessor; -import io.opentelemetry.sdk.logs.data.LogRecordData; -import io.opentelemetry.sdk.logs.export.LogRecordExporter; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * This is a {@link LogRecordExporter} wrapper that takes care of intercepting all the signals sent - * out to be exported, tries to store them in the disk in order to export them later. - * - *

In order to use it, you need to wrap your own {@link LogRecordExporter} with a new instance of - * this one, which will be the one you need to register in your {@link LogRecordProcessor}. - */ -public final class LogRecordDiskExporter implements LogRecordExporter, StoredBatchExporter { - private final LogRecordExporter wrapped; - private final DiskExporter diskExporter; - - /** - * Creates a new instance of {@link LogRecordDiskExporter}. - * - * @param wrapped - The exporter where the data retrieved from the disk will be delegated to. - * @param rootDir - The directory to create this signal's cache dir where all the data will be - * written into. - * @param configuration - How you want to manage the storage process. - * @throws IOException If no dir can be created in rootDir. - */ - public static LogRecordDiskExporter create( - LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration) - throws IOException { - return create(wrapped, rootDir, configuration, Clock.getDefault()); - } - - // This is exposed for testing purposes. - static LogRecordDiskExporter create( - LogRecordExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock) - throws IOException { - DiskExporter diskExporter = - DiskExporter.builder() - .setSerializer(SignalSerializer.ofLogs()) - .setRootDir(rootDir) - .setFolderName("logs") - .setStorageConfiguration(configuration) - .setStorageClock(clock) - .setExportFunction(wrapped::export) - .build(); - return new LogRecordDiskExporter(wrapped, diskExporter); - } - - private LogRecordDiskExporter( - LogRecordExporter wrapped, DiskExporter diskExporter) { - this.wrapped = wrapped; - this.diskExporter = diskExporter; - } - - @Override - public CompletableResultCode export(Collection logs) { - return diskExporter.onExport(logs); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - try { - diskExporter.onShutDown(); - } catch (IOException e) { - return CompletableResultCode.ofFailure(); - } finally { - wrapped.shutdown(); - } - return CompletableResultCode.ofSuccess(); - } - - @Override - public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - return diskExporter.exportStoredBatch(timeout, unit); - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java new file mode 100644 index 00000000..fed891e6 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordFromDiskExporter.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class LogRecordFromDiskExporter implements FromDiskExporter { + + private final FromDiskExporterImpl delegate; + + public static 
LogRecordFromDiskExporter create( + LogRecordExporter exporter, StorageConfiguration config) throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder() + .setFolderName("logs") + .setStorageConfiguration(config) + .setDeserializer(SignalSerializer.ofLogs()) + .setExportFunction(exporter::export) + .build(); + return new LogRecordFromDiskExporter(delegate); + } + + private LogRecordFromDiskExporter(FromDiskExporterImpl delegate) { + this.delegate = delegate; + } + + @Override + public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { + return delegate.exportStoredBatch(timeout, unit); + } + + @Override + public void shutdown() throws IOException { + delegate.shutdown(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java new file mode 100644 index 00000000..48b4c4a5 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporter.java @@ -0,0 +1,68 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.LogRecordExporter; +import java.io.IOException; +import java.util.Collection; + +/** + * This class implements a {@link LogRecordExporter} that delegates to an instance of {@code + * ToDiskExporter}. + */ +public class LogRecordToDiskExporter implements LogRecordExporter { + private final ToDiskExporter delegate; + + /** + * Creates a new LogRecordToDiskExporter that will buffer LogRecordData telemetry on disk storage. + * + * @param delegate - The LogRecordExporter to delegate to if disk writing fails. + * @param config - The StorageConfiguration that specifies how storage is managed. + * @return A new LogRecordToDiskExporter instance. + * @throws IOException if the delegate ToDiskExporter could not be created. 
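To show where the exporter created here is meant to be registered, a small sketch (not part of this patch) that wraps a caller-supplied `delegate` exporter and plugs the result into an `SdkLoggerProvider`; `rootDir` is an assumed input and `BatchLogRecordProcessor` is just one possible processor choice:

```java
import io.opentelemetry.contrib.disk.buffering.LogRecordToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.export.BatchLogRecordProcessor;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.File;
import java.io.IOException;

class LogWritingSketch {
  static SdkLoggerProvider loggerProvider(LogRecordExporter delegate, File rootDir)
      throws IOException {
    // Buffer log records on disk; the delegate is only used directly if disk writing fails.
    LogRecordToDiskExporter toDisk =
        LogRecordToDiskExporter.create(delegate, StorageConfiguration.getDefault(rootDir));
    return SdkLoggerProvider.builder()
        .addLogRecordProcessor(BatchLogRecordProcessor.builder(toDisk).build())
        .build();
  }
}
```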
+ */ + public static LogRecordToDiskExporter create( + LogRecordExporter delegate, StorageConfiguration config) throws IOException { + ToDiskExporter toDisk = + ToDiskExporter.builder() + .setFolderName("logs") + .setStorageConfiguration(config) + .setSerializer(SignalSerializer.ofLogs()) + .setExportFunction(delegate::export) + .build(); + return new LogRecordToDiskExporter(toDisk); + } + + // Visible for testing + LogRecordToDiskExporter(ToDiskExporter delegate) { + this.delegate = delegate; + } + + @Override + public CompletableResultCode export(Collection logs) { + return delegate.export(logs); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + try { + delegate.shutdown(); + return CompletableResultCode.ofSuccess(); + } catch (IOException e) { + return CompletableResultCode.ofFailure(); + } + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java deleted file mode 100644 index d61a5f39..00000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporter.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.metrics.InstrumentType; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricExporter; -import io.opentelemetry.sdk.metrics.export.MetricReader; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * This is a {@link MetricExporter} wrapper that takes care of intercepting all the signals sent out - * to be exported, tries to store them in the disk in order to export them later. - * - *

In order to use it, you need to wrap your own {@link MetricExporter} with a new instance of - * this one, which will be the one you need to register in your {@link MetricReader}. - */ -public final class MetricDiskExporter implements MetricExporter, StoredBatchExporter { - private final MetricExporter wrapped; - private final DiskExporter diskExporter; - - /** - * Creates a new instance of {@link MetricDiskExporter}. - * - * @param wrapped - The exporter where the data retrieved from the disk will be delegated to. - * @param rootDir - The directory to create this signal's cache dir where all the data will be - * written into. - * @param configuration - How you want to manage the storage process. - * @throws IOException If no dir can be created in rootDir. - */ - public static MetricDiskExporter create( - MetricExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException { - return create(wrapped, rootDir, configuration, Clock.getDefault()); - } - - // This is exposed for testing purposes. - public static MetricDiskExporter create( - MetricExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock) - throws IOException { - DiskExporter diskExporter = - DiskExporter.builder() - .setRootDir(rootDir) - .setFolderName("metrics") - .setStorageConfiguration(configuration) - .setSerializer(SignalSerializer.ofMetrics()) - .setExportFunction(wrapped::export) - .setStorageClock(clock) - .build(); - return new MetricDiskExporter(wrapped, diskExporter); - } - - private MetricDiskExporter(MetricExporter wrapped, DiskExporter diskExporter) { - this.wrapped = wrapped; - this.diskExporter = diskExporter; - } - - @Override - public CompletableResultCode export(Collection metrics) { - return diskExporter.onExport(metrics); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - try { - diskExporter.onShutDown(); - } catch (IOException e) { - return CompletableResultCode.ofFailure(); - } finally { - wrapped.shutdown(); - } - return CompletableResultCode.ofSuccess(); - } - - @Override - public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { - return wrapped.getAggregationTemporality(instrumentType); - } - - @Override - public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - return diskExporter.exportStoredBatch(timeout, unit); - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java new file mode 100644 index 00000000..669d7805 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricFromDiskExporter.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public 
class MetricFromDiskExporter implements FromDiskExporter { + + private final FromDiskExporterImpl delegate; + + public static MetricFromDiskExporter create(MetricExporter exporter, StorageConfiguration config) + throws IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder() + .setFolderName("metrics") + .setStorageConfiguration(config) + .setDeserializer(SignalSerializer.ofMetrics()) + .setExportFunction(exporter::export) + .build(); + return new MetricFromDiskExporter(delegate); + } + + private MetricFromDiskExporter(FromDiskExporterImpl delegate) { + this.delegate = delegate; + } + + @Override + public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { + return delegate.exportStoredBatch(timeout, unit); + } + + @Override + public void shutdown() throws IOException { + delegate.shutdown(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java new file mode 100644 index 00000000..6620a334 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporter.java @@ -0,0 +1,86 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import java.io.IOException; +import java.util.Collection; +import java.util.function.Function; + +/** + * This class implements a {@link MetricExporter} that delegates to an instance of {@code + * ToDiskExporter}. + */ +public class MetricToDiskExporter implements MetricExporter { + + private final ToDiskExporter delegate; + private final Function typeToTemporality; + + /** + * Creates a new MetricToDiskExporter that will buffer Metric telemetry on disk storage. + * + * @param delegate - The MetricExporter to delegate to if disk writing fails. + * @param config - The StorageConfiguration that specifies how storage is managed. + * @param typeToTemporality - The function that maps an InstrumentType into an + * AggregationTemporality. + * @return A new MetricToDiskExporter instance. + * @throws IOException if the delegate ToDiskExporter could not be created. 
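The metric variant additionally takes a `Function<InstrumentType, AggregationTemporality>`. A sketch (not part of this patch) that mirrors what the deleted `MetricDiskExporter` did, namely delegating temporality to the wrapped exporter, and registers the result with a `PeriodicMetricReader`; `delegate` and `rootDir` are assumed inputs:

```java
import io.opentelemetry.contrib.disk.buffering.MetricToDiskExporter;
import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import java.io.File;
import java.io.IOException;

class MetricWritingSketch {
  static SdkMeterProvider meterProvider(MetricExporter delegate, File rootDir)
      throws IOException {
    MetricToDiskExporter toDisk =
        MetricToDiskExporter.create(
            delegate,
            StorageConfiguration.getDefault(rootDir),
            // Report the same temporality the wrapped exporter would use.
            delegate::getAggregationTemporality);
    return SdkMeterProvider.builder()
        .registerMetricReader(PeriodicMetricReader.builder(toDisk).build())
        .build();
  }
}
```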
+ */ + public static MetricToDiskExporter create( + MetricExporter delegate, + StorageConfiguration config, + Function typeToTemporality) + throws IOException { + ToDiskExporter toDisk = + ToDiskExporter.builder() + .setFolderName("metrics") + .setStorageConfiguration(config) + .setSerializer(SignalSerializer.ofMetrics()) + .setExportFunction(delegate::export) + .build(); + return new MetricToDiskExporter(toDisk, typeToTemporality); + } + + // VisibleForTesting + MetricToDiskExporter( + ToDiskExporter delegate, + Function typeToTemporality) { + this.delegate = delegate; + this.typeToTemporality = typeToTemporality; + } + + @Override + public CompletableResultCode export(Collection metrics) { + return delegate.export(metrics); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + try { + delegate.shutdown(); + } catch (IOException e) { + return CompletableResultCode.ofFailure(); + } + return CompletableResultCode.ofSuccess(); + } + + @Override + public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) { + return typeToTemporality.apply(instrumentType); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java deleted file mode 100644 index 31e931e0..00000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporter.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.exporters.DiskExporter; -import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.sdk.common.Clock; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.trace.SpanProcessor; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -/** - * This is a {@link SpanExporter} wrapper that takes care of intercepting all the signals sent out - * to be exported, tries to store them in the disk in order to export them later. - * - *

In order to use it, you need to wrap your own {@link SpanExporter} with a new instance of this - * one, which will be the one you need to register in your {@link SpanProcessor}. - */ -public final class SpanDiskExporter implements SpanExporter, StoredBatchExporter { - private final SpanExporter wrapped; - private final DiskExporter diskExporter; - - /** - * Creates a new instance of {@link SpanDiskExporter}. - * - * @param wrapped - The exporter where the data retrieved from the disk will be delegated to. - * @param rootDir - The directory to create this signal's cache dir where all the data will be - * written into. - * @param configuration - How you want to manage the storage process. - * @throws IOException If no dir can be created in rootDir. - */ - public static SpanDiskExporter create( - SpanExporter wrapped, File rootDir, StorageConfiguration configuration) throws IOException { - return create(wrapped, rootDir, configuration, Clock.getDefault()); - } - - // This is exposed for testing purposes. - public static SpanDiskExporter create( - SpanExporter wrapped, File rootDir, StorageConfiguration configuration, Clock clock) - throws IOException { - DiskExporter diskExporter = - DiskExporter.builder() - .setRootDir(rootDir) - .setFolderName("spans") - .setStorageConfiguration(configuration) - .setSerializer(SignalSerializer.ofSpans()) - .setExportFunction(wrapped::export) - .setStorageClock(clock) - .build(); - return new SpanDiskExporter(wrapped, diskExporter); - } - - private SpanDiskExporter(SpanExporter wrapped, DiskExporter diskExporter) { - this.wrapped = wrapped; - this.diskExporter = diskExporter; - } - - @Override - public CompletableResultCode export(Collection spans) { - return diskExporter.onExport(spans); - } - - @Override - public CompletableResultCode shutdown() { - try { - diskExporter.onShutDown(); - } catch (IOException e) { - return CompletableResultCode.ofFailure(); - } finally { - wrapped.shutdown(); - } - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { - return diskExporter.exportStoredBatch(timeout, unit); - } -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java new file mode 100644 index 00000000..7f587aea --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporter.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public class SpanFromDiskExporter implements FromDiskExporter { + + private final FromDiskExporterImpl delegate; + + public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfiguration config) + throws 
IOException { + FromDiskExporterImpl delegate = + FromDiskExporterImpl.builder() + .setFolderName("spans") + .setStorageConfiguration(config) + .setDeserializer(SignalSerializer.ofSpans()) + .setExportFunction(exporter::export) + .build(); + return new SpanFromDiskExporter(delegate); + } + + private SpanFromDiskExporter(FromDiskExporterImpl delegate) { + this.delegate = delegate; + } + + @Override + public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { + return delegate.exportStoredBatch(timeout, unit); + } + + @Override + public void shutdown() throws IOException { + delegate.shutdown(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java new file mode 100644 index 00000000..bb1a2bb0 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporter.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.IOException; +import java.util.Collection; + +/** + * This class implements a SpanExporter that delegates to an instance of {@code + * ToDiskExporter}. + */ +public class SpanToDiskExporter implements SpanExporter { + + private final ToDiskExporter delegate; + + /** + * Creates a new SpanToDiskExporter that will buffer Span telemetry on disk storage. + * + * @param delegate - The SpanExporter to delegate to if disk writing fails. + * @param config - The StorageConfiguration that specifies how storage is managed. + * @return A new SpanToDiskExporter instance. + * @throws IOException if the delegate ToDiskExporter could not be created. 
+ */ + public static SpanToDiskExporter create(SpanExporter delegate, StorageConfiguration config) + throws IOException { + ToDiskExporter toDisk = + ToDiskExporter.builder() + .setFolderName("spans") + .setStorageConfiguration(config) + .setSerializer(SignalSerializer.ofSpans()) + .setExportFunction(delegate::export) + .build(); + return new SpanToDiskExporter(toDisk); + } + + // Visible for testing + SpanToDiskExporter(ToDiskExporter delegate) { + this.delegate = delegate; + } + + @Override + public CompletableResultCode export(Collection spans) { + return delegate.export(spans); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + try { + delegate.shutdown(); + } catch (IOException e) { + return CompletableResultCode.ofFailure(); + } + return CompletableResultCode.ofSuccess(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StoredBatchExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StoredBatchExporter.java deleted file mode 100644 index ccaf2ad5..00000000 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/StoredBatchExporter.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public interface StoredBatchExporter { - - /** - * Reads data from the disk and attempts to export it. - * - * @param timeout The amount of time to wait for the wrapped exporter to finish. - * @param unit The unit of the time provided. - * @return TRUE if there was data available and it was successfully exported within the timeout - * provided. FALSE if either of those conditions didn't meet. - * @throws IOException If an unexpected error happens. - */ - boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException; -} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java index 3cd4a48b..1458f054 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/StorageConfiguration.java @@ -8,11 +8,16 @@ package io.opentelemetry.contrib.disk.buffering.internal; import com.google.auto.value.AutoValue; import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider; +import java.io.File; import java.util.concurrent.TimeUnit; /** Defines how the storage should be managed. */ @AutoValue public abstract class StorageConfiguration { + + /** The root storage location for buffered telemetry. */ + public abstract File getRootDir(); + /** The max amount of time a file can receive new data. */ public abstract long getMaxFileAgeForWriteMillis(); @@ -45,18 +50,19 @@ public abstract class StorageConfiguration { /** A creator of temporary files needed to do the disk reading process. 
*/ public abstract TemporaryFileProvider getTemporaryFileProvider(); - public static StorageConfiguration getDefault() { - return builder().build(); + public static StorageConfiguration getDefault(File rootDir) { + return builder().setRootDir(rootDir).build(); } public static Builder builder() { + TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); return new AutoValue_StorageConfiguration.Builder() .setMaxFileSize(1024 * 1024) // 1MB .setMaxFolderSize(10 * 1024 * 1024) // 10MB .setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30)) .setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33)) .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18)) - .setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance()); + .setTemporaryFileProvider(fileProvider); } @AutoValue.Builder @@ -73,6 +79,8 @@ public abstract class StorageConfiguration { public abstract Builder setTemporaryFileProvider(TemporaryFileProvider value); + public abstract Builder setRootDir(File rootDir); + public abstract StorageConfiguration build(); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporter.java new file mode 100644 index 00000000..fdc3bb79 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporter.java @@ -0,0 +1,15 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +public interface FromDiskExporter { + boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException; + + void shutdown() throws IOException; +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java new file mode 100644 index 00000000..781f2a11 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterBuilder.java @@ -0,0 +1,81 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import org.jetbrains.annotations.NotNull; + +public class FromDiskExporterBuilder { + + private SignalSerializer serializer = noopSerializer(); + private Function, CompletableResultCode> exportFunction = + x -> CompletableResultCode.ofFailure(); + + @NotNull + private static SignalSerializer noopSerializer() { + return new SignalSerializer() { + + @Override + public byte[] serialize(Collection ts) { + 
return new byte[0]; + } + + @Override + public List deserialize(byte[] source) { + return Collections.emptyList(); + } + }; + } + + private final StorageBuilder storageBuilder = Storage.builder(); + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setFolderName(String folderName) { + storageBuilder.setFolderName(folderName); + return this; + } + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { + storageBuilder.setStorageConfiguration(configuration); + return this; + } + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setStorageClock(Clock clock) { + storageBuilder.setStorageClock(clock); + return this; + } + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setDeserializer(SignalSerializer serializer) { + this.serializer = serializer; + return this; + } + + @CanIgnoreReturnValue + public FromDiskExporterBuilder setExportFunction( + Function, CompletableResultCode> exportFunction) { + this.exportFunction = exportFunction; + return this; + } + + public FromDiskExporterImpl build() throws IOException { + Storage storage = storageBuilder.build(); + return new FromDiskExporterImpl<>(serializer, exportFunction, storage); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java similarity index 53% rename from disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java rename to disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 6994318c..117a5e05 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporter.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -3,9 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.contrib.disk.buffering.internal.exporters; +package io.opentelemetry.contrib.disk.buffering.internal.exporter; -import io.opentelemetry.contrib.disk.buffering.StoredBatchExporter; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; @@ -17,25 +16,38 @@ import java.util.function.Function; import java.util.logging.Level; import java.util.logging.Logger; -public final class DiskExporter implements StoredBatchExporter { +/** + * Signal-type generic class that can read telemetry previously buffered on disk and send it to + * another delegated exporter. 
+ */ +public final class FromDiskExporterImpl implements FromDiskExporter { private final Storage storage; - private final SignalSerializer serializer; + private final SignalSerializer deserializer; private final Function, CompletableResultCode> exportFunction; - private static final Logger logger = Logger.getLogger(DiskExporter.class.getName()); + private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName()); - DiskExporter( - SignalSerializer serializer, + FromDiskExporterImpl( + SignalSerializer deserializer, Function, CompletableResultCode> exportFunction, Storage storage) { - this.serializer = serializer; + this.deserializer = deserializer; this.exportFunction = exportFunction; this.storage = storage; } - public static DiskExporterBuilder builder() { - return new DiskExporterBuilder(); + public static FromDiskExporterBuilder builder() { + return new FromDiskExporterBuilder<>(); } + /** + * Reads data from the disk and attempts to export it. + * + * @param timeout The amount of time to wait for the wrapped exporter to finish. + * @param unit The unit of the time provided. + * @return true if there was data available and it was successfully exported within the timeout + * provided. false otherwise. + * @throws IOException If an unexpected error happens. + */ @Override public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException { logger.log(Level.INFO, "Attempting to export batch from disk."); @@ -44,31 +56,14 @@ public final class DiskExporter implements StoredBatchExporter { bytes -> { logger.log(Level.INFO, "About to export stored batch."); CompletableResultCode join = - exportFunction.apply(serializer.deserialize(bytes)).join(timeout, unit); + exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit); return join.isSuccess(); }); return result == ReadableResult.SUCCEEDED; } - public void onShutDown() throws IOException { + @Override + public void shutdown() throws IOException { storage.close(); } - - public CompletableResultCode onExport(Collection data) { - logger.log(Level.FINER, "Intercepting exporter batch."); - try { - if (storage.write(serializer.serialize(data))) { - return CompletableResultCode.ofSuccess(); - } else { - logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away."); - return exportFunction.apply(data); - } - } catch (IOException e) { - logger.log( - Level.WARNING, - "An unexpected error happened while attempting to write the data in disk. 
Exporting it right away.", - e); - return exportFunction.apply(data); - } - } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java new file mode 100644 index 00000000..ca2c60f6 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporter.java @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.util.Collection; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class ToDiskExporter { + + private static final Logger logger = Logger.getLogger(ToDiskExporter.class.getName()); + private final Storage storage; + private final SignalSerializer serializer; + private final Function, CompletableResultCode> exportFunction; + + ToDiskExporter( + SignalSerializer serializer, + Function, CompletableResultCode> exportFunction, + Storage storage) { + this.serializer = serializer; + this.exportFunction = exportFunction; + this.storage = storage; + } + + public static ToDiskExporterBuilder builder() { + return new ToDiskExporterBuilder<>(); + } + + public CompletableResultCode export(Collection data) { + logger.log(Level.FINER, "Intercepting exporter batch."); + try { + if (storage.write(serializer.serialize(data))) { + return CompletableResultCode.ofSuccess(); + } + logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away."); + return exportFunction.apply(data); + } catch (IOException e) { + logger.log( + Level.WARNING, + "An unexpected error happened while attempting to write the data in disk. 
Exporting it right away.", + e); + return exportFunction.apply(data); + } + } + + public void shutdown() throws IOException { + storage.close(); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java similarity index 56% rename from disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterBuilder.java rename to disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java index 785b0ef0..75d920a6 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterBuilder.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilder.java @@ -3,23 +3,22 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.contrib.disk.buffering.internal.exporters; +package io.opentelemetry.contrib.disk.buffering.internal.exporter; import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; -import io.opentelemetry.contrib.disk.buffering.internal.storage.FolderManager; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.storage.StorageBuilder; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; -import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.function.Function; -public final class DiskExporterBuilder { +public final class ToDiskExporterBuilder { private SignalSerializer serializer = new SignalSerializer() { @@ -34,71 +33,49 @@ public final class DiskExporterBuilder { return Collections.emptyList(); } }; - private File rootDir = new File("."); - private String folderName = "data"; - private StorageConfiguration configuration = StorageConfiguration.getDefault(); - private Clock clock = Clock.getDefault(); + + private final StorageBuilder storageBuilder = Storage.builder(); private Function, CompletableResultCode> exportFunction = x -> CompletableResultCode.ofFailure(); - DiskExporterBuilder() {} + ToDiskExporterBuilder() {} @CanIgnoreReturnValue - public DiskExporterBuilder setRootDir(File rootDir) { - this.rootDir = rootDir; + public ToDiskExporterBuilder setFolderName(String folderName) { + storageBuilder.setFolderName(folderName); return this; } @CanIgnoreReturnValue - public DiskExporterBuilder setFolderName(String folderName) { - this.folderName = folderName; + public ToDiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { + validateConfiguration(configuration); + storageBuilder.setStorageConfiguration(configuration); return this; } @CanIgnoreReturnValue - public DiskExporterBuilder setStorageConfiguration(StorageConfiguration configuration) { - this.configuration = configuration; + public ToDiskExporterBuilder setStorageClock(Clock clock) { + storageBuilder.setStorageClock(clock); return this; } @CanIgnoreReturnValue - public DiskExporterBuilder setStorageClock(Clock clock) { - this.clock = clock; - return this; - } - - @CanIgnoreReturnValue - public DiskExporterBuilder 
setSerializer(SignalSerializer serializer) { + public ToDiskExporterBuilder setSerializer(SignalSerializer serializer) { this.serializer = serializer; return this; } @CanIgnoreReturnValue - public DiskExporterBuilder setExportFunction( + public ToDiskExporterBuilder setExportFunction( Function, CompletableResultCode> exportFunction) { this.exportFunction = exportFunction; return this; } - private static File getSignalFolder(File rootDir, String folderName) throws IOException { - File folder = new File(rootDir, folderName); - if (!folder.exists()) { - if (!folder.mkdirs()) { - throw new IOException( - "Could not create the signal folder: '" + folderName + "' inside: " + rootDir); - } - } - return folder; - } - - public DiskExporter build() throws IOException { - validateConfiguration(configuration); - - File folder = getSignalFolder(rootDir, folderName); - Storage storage = new Storage(new FolderManager(folder, configuration, clock)); - - return new DiskExporter<>(serializer, exportFunction, storage); + public ToDiskExporter build() throws IOException { + Storage storage = storageBuilder.build(); + return new ToDiskExporter<>(serializer, exportFunction, storage); } private static void validateConfiguration(StorageConfiguration configuration) { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/LogRecordDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/LogRecordDataMapper.java index 92af6a38..16ce9e1a 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/LogRecordDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/logs/LogRecordDataMapper.java @@ -5,10 +5,11 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs; +import static io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.SpanDataMapper.flagsFromInt; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.logs.Severity; import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.AttributesMapper; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.common.ByteStringMapper; @@ -89,7 +90,7 @@ public final class LogRecordDataMapper { SpanContext.create( ByteStringMapper.getInstance().protoToString(source.trace_id), ByteStringMapper.getInstance().protoToString(source.span_id), - TraceFlags.getSampled(), + flagsFromInt(source.flags), TraceState.getDefault())); target.setTotalAttributeCount(source.dropped_attributes_count + attributes.size()); target.setResource(resource); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapper.java index 12df0d3e..1b9f2bf9 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapper.java @@ -62,6 +62,7 @@ import 
io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.List; +import org.jetbrains.annotations.NotNull; public final class MetricDataMapper { @@ -555,23 +556,26 @@ public final class MetricDataMapper { exemplarListToDoubleExemplarDataList(source.exemplars)); } + @NotNull private static DoubleExemplarData exemplarToDoubleExemplarData(Exemplar source) { return ImmutableDoubleExemplarData.create( protoToAttributes(source.filtered_attributes), source.time_unix_nano, - createForExemplar(source), + createSpanContext(source), source.as_double); } + @NotNull private static LongExemplarData exemplarToLongExemplarData(Exemplar source) { return ImmutableLongExemplarData.create( protoToAttributes(source.filtered_attributes), source.time_unix_nano, - createForExemplar(source), + createSpanContext(source), source.as_int); } - private static SpanContext createForExemplar(Exemplar value) { + @NotNull + private static SpanContext createSpanContext(Exemplar value) { return SpanContext.create( ByteStringMapper.getInstance().protoToString(value.trace_id), ByteStringMapper.getInstance().protoToString(value.span_id), diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/SpanDataMapper.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/SpanDataMapper.java index 34f63757..633ab4ee 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/SpanDataMapper.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/spans/SpanDataMapper.java @@ -55,6 +55,7 @@ public final class SpanDataMapper { } span.trace_id(byteStringMapper.stringToProto(source.getTraceId())); span.span_id(byteStringMapper.stringToProto(source.getSpanId())); + span.flags(source.getSpanContext().getTraceFlags().asByte()); span.parent_span_id(byteStringMapper.stringToProto(source.getParentSpanId())); span.name(source.getName()); span.kind(mapSpanKindToProto(source.getKind())); @@ -107,7 +108,7 @@ public final class SpanDataMapper { SpanContext.create( traceId, ByteStringMapper.getInstance().protoToString(source.span_id), - TraceFlags.getSampled(), + flagsFromInt(source.flags), decodeTraceState(source.trace_state))); target.setParentSpanContext( SpanContext.create( @@ -257,7 +258,7 @@ public final class SpanDataMapper { SpanContext.create( ByteStringMapper.getInstance().protoToString(source.trace_id), ByteStringMapper.getInstance().protoToString(source.span_id), - TraceFlags.getSampled(), + flagsFromInt(source.flags), decodeTraceState(source.trace_state)); return LinkData.create(spanContext, attributes, totalAttrCount); } @@ -308,6 +309,7 @@ public final class SpanDataMapper { SpanContext spanContext = source.getSpanContext(); builder.trace_id(ByteStringMapper.getInstance().stringToProto(spanContext.getTraceId())); builder.span_id(ByteStringMapper.getInstance().stringToProto(spanContext.getSpanId())); + builder.flags = spanContext.getTraceFlags().asByte(); builder.attributes.addAll(attributesToProto(source.getAttributes())); builder.dropped_attributes_count( source.getTotalAttributeCount() - source.getAttributes().size()); @@ -315,4 +317,8 @@ public final class SpanDataMapper { return builder.build(); } + + public static TraceFlags flagsFromInt(int b) { + return TraceFlags.fromByte((byte) (b & 0x000000FF)); + } 
} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index f56f1f15..384dc720 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -16,16 +16,20 @@ import java.util.function.Function; import javax.annotation.Nullable; public final class Storage implements Closeable { + private static final int MAX_ATTEMPTS = 3; private final FolderManager folderManager; + private final AtomicBoolean isClosed = new AtomicBoolean(false); @Nullable private WritableFile writableFile; @Nullable private ReadableFile readableFile; - private static final int MAX_ATTEMPTS = 3; - private final AtomicBoolean isClosed = new AtomicBoolean(false); public Storage(FolderManager folderManager) { this.folderManager = folderManager; } + public static StorageBuilder builder() { + return new StorageBuilder(); + } + /** * Attempts to write an item into a writable file. * diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java new file mode 100644 index 00000000..ee5540ea --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageBuilder.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage; + +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.sdk.common.Clock; +import java.io.File; +import java.io.IOException; + +public class StorageBuilder { + + private String folderName = "data"; + private StorageConfiguration configuration = StorageConfiguration.getDefault(new File(".")); + private Clock clock = Clock.getDefault(); + + StorageBuilder() {} + + @CanIgnoreReturnValue + public StorageBuilder setFolderName(String folderName) { + this.folderName = folderName; + return this; + } + + @CanIgnoreReturnValue + public StorageBuilder setStorageConfiguration(StorageConfiguration configuration) { + this.configuration = configuration; + return this; + } + + @CanIgnoreReturnValue + public StorageBuilder setStorageClock(Clock clock) { + this.clock = clock; + return this; + } + + public Storage build() throws IOException { + File folder = ensureSubdir(configuration.getRootDir(), folderName); + FolderManager folderManager = new FolderManager(folder, configuration, clock); + return new Storage(folderManager); + } + + private static File ensureSubdir(File rootDir, String child) throws IOException { + File subdir = new File(rootDir, child); + if (subdir.exists() || subdir.mkdirs()) { + return subdir; + } + throw new IOException("Could not create the subdir: '" + child + "' inside: " + rootDir); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java similarity index 59% rename from disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterTest.java 
rename to disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index 23df6ffc..01815449 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporters/DiskExporterTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -3,20 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.contrib.disk.buffering.internal.exporters; +package io.opentelemetry.contrib.disk.buffering; import static io.opentelemetry.contrib.disk.buffering.internal.storage.TestData.MIN_FILE_AGE_FOR_READ_MILLIS; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; import io.opentelemetry.sdk.common.Clock; @@ -34,11 +31,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @SuppressWarnings("unchecked") -class DiskExporterTest { +class FromDiskExporterImplTest { private SpanExporter wrapped; private SignalSerializer serializer; private Clock clock; - private DiskExporter exporter; + private FromDiskExporterImpl exporter; private final List deserializedData = Collections.emptyList(); @TempDir File rootDir; private static final String STORAGE_FOLDER_NAME = "testName"; @@ -46,14 +43,14 @@ class DiskExporterTest { @BeforeEach void setUp() throws IOException { clock = createClockMock(); + setUpSerializer(); wrapped = mock(); exporter = - DiskExporter.builder() - .setRootDir(rootDir) + FromDiskExporterImpl.builder() .setFolderName(STORAGE_FOLDER_NAME) - .setStorageConfiguration(TestData.getDefaultConfiguration()) - .setSerializer(serializer) + .setStorageConfiguration(TestData.getDefaultConfiguration(rootDir)) + .setDeserializer(serializer) .setExportFunction(wrapped::export) .setStorageClock(clock) .build(); @@ -70,30 +67,6 @@ class DiskExporterTest { assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isTrue(); } - @Test - void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { - assertThatThrownBy( - () -> { - StorageConfiguration invalidConfig = - StorageConfiguration.builder() - .setMaxFileAgeForWriteMillis(2) - .setMinFileAgeForReadMillis(1) - .build(); - - DiskExporter.builder() - .setRootDir(rootDir) - .setFolderName(STORAGE_FOLDER_NAME) - .setStorageConfiguration(invalidConfig) - .setSerializer(serializer) - .setExportFunction(wrapped::export) - .setStorageClock(clock) - .build(); - }) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage( - "The configured max file age for writing must be lower than the configured min file age for reading"); - } - @Test void whenExportingStoredBatch_withAvailableData_andUnsuccessfullyProcessed_returnFalse() throws IOException { @@ -115,31 +88,9 @@ class DiskExporterTest { assertThat(new File(rootDir, 
STORAGE_FOLDER_NAME).exists()).isTrue(); } - @Test - void whenWritingSucceedsOnExport_returnSuccessfulResultCode() { - doReturn(new byte[2]).when(serializer).serialize(deserializedData); - - CompletableResultCode completableResultCode = exporter.onExport(deserializedData); - - assertThat(completableResultCode.isSuccess()).isTrue(); - verifyNoInteractions(wrapped); - } - - @Test - void whenWritingFailsOnExport_doExportRightAway() throws IOException { - doReturn(CompletableResultCode.ofSuccess()).when(wrapped).export(deserializedData); - exporter.onShutDown(); - - CompletableResultCode completableResultCode = exporter.onExport(deserializedData); - - assertThat(completableResultCode.isSuccess()).isTrue(); - verify(wrapped).export(deserializedData); - } - - private File createDummyFile() throws IOException { + private void createDummyFile() throws IOException { File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L); Files.write(file.toPath(), singletonList("First line")); - return file; } private void setUpSerializer() { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java index 8fd4b746..1dce40d8 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/IntegrationTest.java @@ -17,76 +17,122 @@ import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterBuilder; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.logs.export.LogRecordExporter; import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; public class IntegrationTest { private InMemorySpanExporter memorySpanExporter; - private SpanDiskExporter diskSpanExporter; + private SpanToDiskExporter 
spanToDiskExporter; private Tracer tracer; private InMemoryMetricExporter memoryMetricExporter; - private MetricDiskExporter diskMetricExporter; + private MetricToDiskExporter metricToDiskExporter; private SdkMeterProvider meterProvider; private Meter meter; private InMemoryLogRecordExporter memoryLogRecordExporter; - private LogRecordDiskExporter diskLogRecordExporter; + private LogRecordToDiskExporter logToDiskExporter; private Logger logger; private Clock clock; @TempDir File rootDir; private static final long INITIAL_TIME_IN_MILLIS = 1000; - private static final StorageConfiguration STORAGE_CONFIGURATION = - StorageConfiguration.getDefault(); + private StorageConfiguration storageConfig; @BeforeEach void setUp() throws IOException { + storageConfig = StorageConfiguration.getDefault(rootDir); clock = mock(); + doReturn(MILLISECONDS.toNanos(INITIAL_TIME_IN_MILLIS)).when(clock).now(); // Setting up spans memorySpanExporter = InMemorySpanExporter.create(); - diskSpanExporter = - SpanDiskExporter.create(memorySpanExporter, rootDir, STORAGE_CONFIGURATION, clock); - tracer = createTracerProvider(diskSpanExporter).get("SpanInstrumentationScope"); + ToDiskExporter toDiskSpanExporter = + buildToDiskExporter(SignalSerializer.ofSpans(), memorySpanExporter::export); + spanToDiskExporter = new SpanToDiskExporter(toDiskSpanExporter); + tracer = createTracerProvider(spanToDiskExporter).get("SpanInstrumentationScope"); // Setting up metrics memoryMetricExporter = InMemoryMetricExporter.create(); - diskMetricExporter = - MetricDiskExporter.create(memoryMetricExporter, rootDir, STORAGE_CONFIGURATION, clock); - meterProvider = createMeterProvider(diskMetricExporter); + ToDiskExporter toDiskMetricExporter = + buildToDiskExporter(SignalSerializer.ofMetrics(), memoryMetricExporter::export); + metricToDiskExporter = + new MetricToDiskExporter( + toDiskMetricExporter, memoryMetricExporter::getAggregationTemporality); + meterProvider = createMeterProvider(metricToDiskExporter); meter = meterProvider.get("MetricInstrumentationScope"); // Setting up logs memoryLogRecordExporter = InMemoryLogRecordExporter.create(); - diskLogRecordExporter = - LogRecordDiskExporter.create( - memoryLogRecordExporter, rootDir, STORAGE_CONFIGURATION, clock); - logger = createLoggerProvider(diskLogRecordExporter).get("LogInstrumentationScope"); + ToDiskExporter toDiskLogExporter = + buildToDiskExporter(SignalSerializer.ofLogs(), memoryLogRecordExporter::export); + logToDiskExporter = new LogRecordToDiskExporter(toDiskLogExporter); + logger = createLoggerProvider(logToDiskExporter).get("LogInstrumentationScope"); + } + + @NotNull + private ToDiskExporter buildToDiskExporter( + SignalSerializer serializer, Function, CompletableResultCode> exporter) + throws IOException { + return ToDiskExporter.builder() + .setFolderName("spans") + .setStorageConfiguration(storageConfig) + .setSerializer(serializer) + .setExportFunction(exporter) + .setStorageClock(clock) + .build(); + } + + @NotNull + private FromDiskExporterImpl buildFromDiskExporter( + FromDiskExporterBuilder builder, + Function, CompletableResultCode> exportFunction, + SignalSerializer serializer) + throws IOException { + return builder + .setExportFunction(exportFunction) + .setFolderName("spans") + .setStorageConfiguration(storageConfig) + .setDeserializer(serializer) + .setStorageClock(clock) + .build(); } @Test void verifySpansIntegration() throws IOException { Span span = tracer.spanBuilder("Span name").startSpan(); span.end(); - - assertExporter(diskSpanExporter, () -> 
memorySpanExporter.getFinishedSpanItems().size()); + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(), memorySpanExporter::export, SignalSerializer.ofSpans()); + assertExporter(fromDiskExporter, () -> memorySpanExporter.getFinishedSpanItems().size()); } @Test @@ -94,24 +140,34 @@ public class IntegrationTest { meter.counterBuilder("Counter").build().add(2); meterProvider.forceFlush(); - assertExporter(diskMetricExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(), + memoryMetricExporter::export, + SignalSerializer.ofMetrics()); + assertExporter(fromDiskExporter, () -> memoryMetricExporter.getFinishedMetricItems().size()); } @Test void verifyLogRecordsIntegration() throws IOException { logger.logRecordBuilder().setBody("I'm a log!").emit(); + FromDiskExporterImpl fromDiskExporter = + buildFromDiskExporter( + FromDiskExporterImpl.builder(), + memoryLogRecordExporter::export, + SignalSerializer.ofLogs()); assertExporter( - diskLogRecordExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size()); + fromDiskExporter, () -> memoryLogRecordExporter.getFinishedLogRecordItems().size()); } - private void assertExporter(StoredBatchExporter exporter, Supplier finishedItems) + private void assertExporter(FromDiskExporterImpl exporter, Supplier finishedItems) throws IOException { // Verify no data has been received in the original exporter until this point. assertEquals(0, finishedItems.get()); // Go to the future when we can read the stored items. - fastForwardTimeByMillis(STORAGE_CONFIGURATION.getMinFileAgeForReadMillis()); + fastForwardTimeByMillis(storageConfig.getMinFileAgeForReadMillis()); // Read and send stored data. 
assertTrue(exporter.exportStoredBatch(1, TimeUnit.SECONDS)); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporterTest.java deleted file mode 100644 index b7a4003a..00000000 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordDiskExporterTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.logs.export.LogRecordExporter; -import java.io.File; -import java.io.IOException; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -class LogRecordDiskExporterTest { - private LogRecordExporter wrapped; - private LogRecordDiskExporter exporter; - private static final StorageConfiguration STORAGE_CONFIGURATION = - TestData.getDefaultConfiguration(); - private static final String STORAGE_FOLDER_NAME = "logs"; - @TempDir File rootDir; - - @BeforeEach - void setUp() throws IOException { - wrapped = mock(); - exporter = LogRecordDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION); - } - - @Test - void verifyCacheFolderName() { - File[] files = rootDir.listFiles(); - assertEquals(1, files.length); - assertEquals(STORAGE_FOLDER_NAME, files[0].getName()); - } - - @Test - void onFlush_returnSuccess() { - assertEquals(CompletableResultCode.ofSuccess(), exporter.flush()); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporterTest.java new file mode 100644 index 00000000..6409cf06 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/LogRecordToDiskExporterTest.java @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class LogRecordToDiskExporterTest { + + @Mock private ToDiskExporter delegate; + + @Test + void delegateShutdown_success() throws IOException { + LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isTrue(); + verify(delegate).shutdown(); + } + + @Test + void delegateShutdown_fail() throws IOException { + 
doThrow(new IOException("boom")).when(delegate).shutdown(); + LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isFalse(); + verify(delegate).shutdown(); + } + + @Test + void delegateExport() { + LogRecordData log1 = mock(); + LogRecordData log2 = mock(); + List logRecords = Arrays.asList(log1, log2); + + LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate); + testClass.export(logRecords); + + verify(delegate).export(logRecords); + } + + @Test + void flushReturnsSuccess() { + LogRecordToDiskExporter testClass = new LogRecordToDiskExporter(delegate); + CompletableResultCode result = testClass.flush(); + assertThat(result.isSuccess()).isTrue(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporterTest.java deleted file mode 100644 index f7804343..00000000 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricDiskExporterTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.metrics.InstrumentType; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; -import io.opentelemetry.sdk.metrics.export.MetricExporter; -import java.io.File; -import java.io.IOException; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -class MetricDiskExporterTest { - - private MetricExporter wrapped; - private MetricDiskExporter exporter; - private static final StorageConfiguration STORAGE_CONFIGURATION = - TestData.getDefaultConfiguration(); - private static final String STORAGE_FOLDER_NAME = "metrics"; - @TempDir File rootDir; - - @BeforeEach - void setUp() throws IOException { - wrapped = mock(); - exporter = MetricDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION); - } - - @Test - void verifyCacheFolderName() { - File[] files = rootDir.listFiles(); - assertEquals(1, files.length); - assertEquals(STORAGE_FOLDER_NAME, files[0].getName()); - } - - @Test - void onFlush_returnSuccess() { - assertEquals(CompletableResultCode.ofSuccess(), exporter.flush()); - } - - @Test - void provideWrappedAggregationTemporality() { - InstrumentType instrumentType = mock(); - AggregationTemporality aggregationTemporality = AggregationTemporality.DELTA; - doReturn(aggregationTemporality).when(wrapped).getAggregationTemporality(instrumentType); - - assertEquals(aggregationTemporality, exporter.getAggregationTemporality(instrumentType)); - - verify(wrapped).getAggregationTemporality(instrumentType); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporterTest.java new file mode 100644 index 00000000..9ba84f67 --- /dev/null +++ 
b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/MetricToDiskExporterTest.java @@ -0,0 +1,77 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class MetricToDiskExporterTest { + + @Mock private ToDiskExporter delegate; + + @Test + void delegateShutdown_success() throws IOException { + MetricToDiskExporter testClass = + new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isTrue(); + verify(delegate).shutdown(); + } + + private static AggregationTemporality temporalityFn(InstrumentType instrumentType) { + return CUMULATIVE; + } + + @Test + void delegateShutdown_fail() throws IOException { + doThrow(new IOException("boom")).when(delegate).shutdown(); + MetricToDiskExporter testClass = + new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isFalse(); + verify(delegate).shutdown(); + } + + @Test + void delegateExport() { + MetricData metric1 = mock(); + MetricData metric2 = mock(); + List metrics = Arrays.asList(metric1, metric2); + + MetricToDiskExporter testClass = + new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn); + + testClass.export(metrics); + + verify(delegate).export(metrics); + } + + @Test + void flushReturnsSuccess() { + MetricToDiskExporter testClass = + new MetricToDiskExporter(delegate, MetricToDiskExporterTest::temporalityFn); + + CompletableResultCode result = testClass.flush(); + assertThat(result.isSuccess()).isTrue(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporterTest.java deleted file mode 100644 index 34356d94..00000000 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanDiskExporterTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.contrib.disk.buffering; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; - -import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; -import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.trace.export.SpanExporter; -import java.io.File; -import 
java.io.IOException; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -class SpanDiskExporterTest { - private SpanExporter wrapped; - private SpanDiskExporter exporter; - private static final StorageConfiguration STORAGE_CONFIGURATION = - TestData.getDefaultConfiguration(); - private static final String STORAGE_FOLDER_NAME = "spans"; - @TempDir File rootDir; - - @BeforeEach - void setUp() throws IOException { - wrapped = mock(); - exporter = SpanDiskExporter.create(wrapped, rootDir, STORAGE_CONFIGURATION); - } - - @Test - void verifyCacheFolderName() { - File[] files = rootDir.listFiles(); - assertEquals(1, files.length); - assertEquals(STORAGE_FOLDER_NAME, files[0].getName()); - } - - @Test - void onFlush_returnSuccess() { - assertEquals(CompletableResultCode.ofSuccess(), exporter.flush()); - } -} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java new file mode 100644 index 00000000..51189492 --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanFromDiskExporterTest.java @@ -0,0 +1,149 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.spans.models.SpanDataImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.testutils.TestData; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.ArgumentCaptor; + +class SpanFromDiskExporterTest { + + @TempDir File tempDir; + + @SuppressWarnings("unchecked") + @Test + void fromDisk() throws Exception { + StorageConfiguration config = + StorageConfiguration.builder() + .setRootDir(tempDir) + .setMaxFileAgeForWriteMillis(TimeUnit.HOURS.toMillis(24)) + .setMinFileAgeForReadMillis(0) + .setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(24)) + 
.setTemporaryFileProvider(DefaultTemporaryFileProvider.getInstance()) + .build(); + + List spans = writeSomeSpans(config); + + SpanExporter exporter = mock(); + ArgumentCaptor> capture = ArgumentCaptor.forClass(Collection.class); + when(exporter.export(capture.capture())).thenReturn(CompletableResultCode.ofSuccess()); + + SpanFromDiskExporter testClass = SpanFromDiskExporter.create(exporter, config); + boolean result = testClass.exportStoredBatch(30, TimeUnit.SECONDS); + assertThat(result).isTrue(); + List exportedSpans = (List) capture.getValue(); + + long now = spans.get(0).getStartEpochNanos(); + SpanData expected1 = makeSpan1(TraceFlags.getSampled(), now); + SpanData expected2 = makeSpan2(TraceFlags.getSampled(), now); + + assertThat(exportedSpans.get(0)).isEqualTo(expected1); + assertThat(exportedSpans.get(1)).isEqualTo(expected2); + assertThat(exportedSpans).containsExactly(expected1, expected2); + + verify(exporter).export(eq(Arrays.asList(expected1, expected2))); + } + + private static List writeSomeSpans(StorageConfiguration config) throws Exception { + long now = System.currentTimeMillis() * 1_000_000; + SpanData span1 = makeSpan1(TraceFlags.getDefault(), now); + SpanData span2 = makeSpan2(TraceFlags.getSampled(), now); + List spans = Arrays.asList(span1, span2); + + SignalSerializer serializer = SignalSerializer.ofSpans(); + File subdir = new File(config.getRootDir(), "spans"); + assertTrue(subdir.mkdir()); + + Storage storage = + Storage.builder().setStorageConfiguration(config).setFolderName("spans").build(); + storage.write(serializer.serialize(spans)); + storage.close(); + return spans; + } + + private static SpanData makeSpan1(TraceFlags parentSpanContextFlags, long now) { + Attributes attributes = Attributes.of(AttributeKey.stringKey("foo"), "bar"); + SpanContext parentContext = TestData.makeContext(parentSpanContextFlags, TestData.SPAN_ID); + return SpanDataImpl.builder() + .setName("span1") + .setSpanContext( + SpanContext.create( + TestData.TRACE_ID, + TestData.SPAN_ID, + TraceFlags.getDefault(), + TraceState.getDefault())) + .setParentSpanContext(parentContext) + .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) + .setStatus(StatusData.create(StatusCode.OK, "whatever")) + .setAttributes(attributes) + .setKind(SpanKind.SERVER) + .setStartEpochNanos(now) + .setEndEpochNanos(now + 50_000_000) + .setTotalRecordedEvents(0) + .setTotalRecordedLinks(0) + .setTotalAttributeCount(attributes.size()) + .setLinks(Collections.emptyList()) + .setEvents(Collections.emptyList()) + .setResource(Resource.getDefault()) + .build(); + } + + private static SpanData makeSpan2(TraceFlags parentSpanContextFlags, long now) { + Attributes attributes = Attributes.of(AttributeKey.stringKey("bar"), "baz"); + String spanId = "aaaaaaaaa12312312"; + SpanContext parentContext = TestData.makeContext(parentSpanContextFlags, spanId); + return SpanDataImpl.builder() + .setName("span2") + .setSpanContext( + SpanContext.create( + TestData.TRACE_ID, spanId, TraceFlags.getSampled(), TraceState.getDefault())) + .setParentSpanContext(parentContext) + .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) + .setStatus(StatusData.create(StatusCode.OK, "excellent")) + .setAttributes(attributes) + .setKind(SpanKind.CLIENT) + .setStartEpochNanos(now + 12) + .setEndEpochNanos(now + 12 + 40_000_000) + .setTotalRecordedEvents(0) + .setTotalRecordedLinks(0) + .setTotalAttributeCount(attributes.size()) + .setLinks(Collections.emptyList()) + .setEvents(Collections.emptyList()) + 
.setResource(Resource.getDefault()) + .build(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporterTest.java new file mode 100644 index 00000000..96dcfcaa --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/SpanToDiskExporterTest.java @@ -0,0 +1,63 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.opentelemetry.contrib.disk.buffering.internal.exporter.ToDiskExporter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class SpanToDiskExporterTest { + @Mock private ToDiskExporter delegate; + + @Test + void delegateShutdown_success() throws IOException { + SpanToDiskExporter testClass = new SpanToDiskExporter(delegate); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isTrue(); + verify(delegate).shutdown(); + } + + @Test + void delegateShutdown_fail() throws IOException { + doThrow(new IOException("boom")).when(delegate).shutdown(); + SpanToDiskExporter testClass = new SpanToDiskExporter(delegate); + CompletableResultCode result = testClass.shutdown(); + assertThat(result.isSuccess()).isFalse(); + verify(delegate).shutdown(); + } + + @Test + void delegateExport() { + SpanData span1 = mock(); + SpanData span2 = mock(); + List spans = Arrays.asList(span1, span2); + + SpanToDiskExporter testClass = new SpanToDiskExporter(delegate); + testClass.export(spans); + + verify(delegate).export(spans); + } + + @Test + void flushReturnsSuccess() { + SpanToDiskExporter testClass = new SpanToDiskExporter(delegate); + CompletableResultCode result = testClass.flush(); + assertThat(result.isSuccess()).isTrue(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java new file mode 100644 index 00000000..050fa65a --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterBuilderTest.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.io.File; +import org.junit.jupiter.api.Test; + +class ToDiskExporterBuilderTest { + + @Test + void whenMinFileReadIsNotGraterThanMaxFileWrite_throwException() { + StorageConfiguration invalidConfig = + StorageConfiguration.builder() + .setMaxFileAgeForWriteMillis(2) + .setMinFileAgeForReadMillis(1) + .setRootDir(new File(".")) + 
.build(); + + assertThatThrownBy( + () -> ToDiskExporter.builder().setStorageConfiguration(invalidConfig)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The configured max file age for writing must be lower than the configured min file age for reading"); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java new file mode 100644 index 00000000..f7b6e3ff --- /dev/null +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/ToDiskExporterTest.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.exporter; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.sdk.common.CompletableResultCode; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class ToDiskExporterTest { + + private final List records = Arrays.asList("one", "two", "three"); + + private final byte[] serialized = "one,two,three".getBytes(UTF_8); + + @Mock private SignalSerializer serializer; + + @Mock private Storage storage; + private ToDiskExporter toDiskExporter; + private Function, CompletableResultCode> exportFn; + private Collection exportedFnSeen; + private AtomicReference exportFnResultToReturn; + + @BeforeEach + void setup() { + exportedFnSeen = null; + exportFnResultToReturn = new AtomicReference<>(null); + exportFn = + (Collection x) -> { + exportedFnSeen = x; + return exportFnResultToReturn.get(); + }; + toDiskExporter = new ToDiskExporter<>(serializer, exportFn, storage); + when(serializer.serialize(records)).thenReturn(serialized); + } + + @Test + void whenWritingSucceedsOnExport_returnSuccessfulResultCode() throws Exception { + when(storage.write(serialized)).thenReturn(true); + CompletableResultCode completableResultCode = toDiskExporter.export(records); + assertThat(completableResultCode.isSuccess()).isTrue(); + verify(storage).write(serialized); + assertThat(exportedFnSeen).isNull(); + } + + @Test + void whenWritingFailsOnExport_doExportRightAway() throws Exception { + when(storage.write(serialized)).thenReturn(false); + exportFnResultToReturn.set(CompletableResultCode.ofSuccess()); + + CompletableResultCode completableResultCode = toDiskExporter.export(records); + + assertThat(completableResultCode.isSuccess()).isTrue(); + assertThat(exportedFnSeen).isEqualTo(records); + } + + @Test + void whenExceptionInWrite_doExportRightAway() throws Exception { + when(storage.write(serialized)).thenThrow(new IOException("boom")); + exportFnResultToReturn.set(CompletableResultCode.ofFailure()); + + CompletableResultCode 
completableResultCode = toDiskExporter.export(records); + + assertThat(completableResultCode.isSuccess()).isFalse(); + assertThat(exportedFnSeen).isEqualTo(records); + } + + @Test + void shutdownClosesStorage() throws Exception { + toDiskExporter.export(records); + toDiskExporter.shutdown(); + verify(storage).close(); + } +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapperTest.java index 1a03f737..b4f7f64d 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/MetricDataMapperTest.java @@ -5,8 +5,14 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.metrics; +import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeContext; +import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeLongGauge; +import static io.opentelemetry.contrib.disk.buffering.testutils.TestData.makeLongPointData; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; @@ -16,10 +22,8 @@ import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.ExponentialHistogramBuckets; import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; -import io.opentelemetry.sdk.metrics.data.GaugeData; import io.opentelemetry.sdk.metrics.data.HistogramData; import io.opentelemetry.sdk.metrics.data.HistogramPointData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.SumData; @@ -34,8 +38,6 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramP import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; @@ -44,224 +46,234 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; import io.opentelemetry.sdk.resources.Resource; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; class MetricDataMapperTest { - private static final LongExemplarData LONG_EXEMPLAR_DATA = - ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L); - - private static 
final DoubleExemplarData DOUBLE_EXEMPLAR_DATA = - ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1.0); - private static final LongPointData LONG_POINT_DATA = - ImmutableLongPointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA)); - - private static final DoublePointData DOUBLE_POINT_DATA = - ImmutableDoublePointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1.0, Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - - private static final GaugeData LONG_GAUGE_DATA = - ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA)); - - private static final GaugeData DOUBLE_GAUGE_DATA = - ImmutableGaugeData.create(Collections.singletonList(DOUBLE_POINT_DATA)); - - private static final SumData LONG_SUM_DATA = - ImmutableSumData.create( - true, AggregationTemporality.DELTA, Collections.singletonList(LONG_POINT_DATA)); - - private static final SumData DOUBLE_SUM_DATA = - ImmutableSumData.create( - true, AggregationTemporality.DELTA, Collections.singletonList(DOUBLE_POINT_DATA)); - - private static final ValueAtQuantile VALUE_AT_QUANTILE = - ImmutableValueAtQuantile.create(2.0, 1.0); - private static final SummaryPointData SUMMARY_POINT_DATA = - ImmutableSummaryPointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(VALUE_AT_QUANTILE)); - - private static final SummaryData SUMMARY_DATA = - ImmutableSummaryData.create(Collections.singletonList(SUMMARY_POINT_DATA)); - - private static final HistogramPointData HISTOGRAM_POINT_DATA = - ImmutableHistogramPointData.create( - 1L, - 2L, - TestData.ATTRIBUTES, - 15.0, - true, - 4.0, - true, - 7.0, - Collections.singletonList(10.0), - Arrays.asList(1L, 2L), - Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - private static final ExponentialHistogramBuckets POSITIVE_BUCKET = - ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L)); - - private static final ExponentialHistogramBuckets NEGATIVE_BUCKET = - ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList()); - - private static final ExponentialHistogramPointData EXPONENTIAL_HISTOGRAM_POINT_DATA = - ImmutableExponentialHistogramPointData.create( - 1, - 10.0, - 1L, - true, - 2.0, - true, - 4.0, - POSITIVE_BUCKET, - NEGATIVE_BUCKET, - 1L, - 2L, - TestData.ATTRIBUTES, - Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - private static final HistogramData HISTOGRAM_DATA = - ImmutableHistogramData.create( - AggregationTemporality.CUMULATIVE, Collections.singletonList(HISTOGRAM_POINT_DATA)); - - private static final ExponentialHistogramData EXPONENTIAL_HISTOGRAM_DATA = - ImmutableExponentialHistogramData.create( - AggregationTemporality.CUMULATIVE, - Collections.singletonList(EXPONENTIAL_HISTOGRAM_POINT_DATA)); - - private static final MetricData LONG_GAUGE_METRIC = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - - private static final MetricData DOUBLE_GAUGE_METRIC = - ImmutableMetricData.createDoubleGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Double gauge name", - "Double gauge description", - "ms", - DOUBLE_GAUGE_DATA); - private static final MetricData LONG_SUM_METRIC = - ImmutableMetricData.createLongSum( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long sum name", - "Long sum description", - "ms", - LONG_SUM_DATA); - private static final MetricData 
DOUBLE_SUM_METRIC = - ImmutableMetricData.createDoubleSum( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Double sum name", - "Double sum description", - "ms", - DOUBLE_SUM_DATA); - private static final MetricData SUMMARY_METRIC = - ImmutableMetricData.createDoubleSummary( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Summary name", - "Summary description", - "ms", - SUMMARY_DATA); - - private static final MetricData HISTOGRAM_METRIC = - ImmutableMetricData.createDoubleHistogram( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Histogram name", - "Histogram description", - "ms", - HISTOGRAM_DATA); - private static final MetricData EXPONENTIAL_HISTOGRAM_METRIC = - ImmutableMetricData.createExponentialHistogram( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Exponential histogram name", - "Exponential histogram description", - "ms", - EXPONENTIAL_HISTOGRAM_DATA); - @Test void verifyLongGaugeMapping() { - Metric proto = mapToProto(LONG_GAUGE_METRIC); + MetricData longGauge = makeLongGauge(TraceFlags.getDefault()); + MetricData expected = makeLongGauge(TraceFlags.getSampled()); - assertEquals( - LONG_GAUGE_METRIC, - mapToSdk( - proto, - LONG_GAUGE_METRIC.getResource(), - LONG_GAUGE_METRIC.getInstrumentationScopeInfo())); + Metric proto = mapToProto(longGauge); + + MetricData result = + mapToSdk(proto, longGauge.getResource(), longGauge.getInstrumentationScopeInfo()); + + assertThat(result).isEqualTo(expected); } @Test void verifyDoubleGaugeMapping() { - Metric proto = mapToProto(DOUBLE_GAUGE_METRIC); + MetricData doubleGauge = makeDoubleGauge(TraceFlags.getDefault()); + MetricData expected = makeDoubleGauge(TraceFlags.getSampled()); - assertEquals( - DOUBLE_GAUGE_METRIC, - mapToSdk( - proto, - DOUBLE_GAUGE_METRIC.getResource(), - DOUBLE_GAUGE_METRIC.getInstrumentationScopeInfo())); + Metric proto = mapToProto(doubleGauge); + MetricData result = + mapToSdk(proto, doubleGauge.getResource(), doubleGauge.getInstrumentationScopeInfo()); + + assertThat(result).isEqualTo(expected); } @Test void verifyLongSumMapping() { - Metric proto = mapToProto(LONG_SUM_METRIC); + MetricData longSum = makeLongSum(TraceFlags.getDefault()); + MetricData expected = makeLongSum(TraceFlags.getSampled()); - assertEquals( - LONG_SUM_METRIC, - mapToSdk( - proto, LONG_SUM_METRIC.getResource(), LONG_SUM_METRIC.getInstrumentationScopeInfo())); + Metric proto = mapToProto(longSum); + MetricData result = + mapToSdk(proto, TestData.RESOURCE_FULL, longSum.getInstrumentationScopeInfo()); + assertThat(expected).isEqualTo(result); } @Test void verifyDoubleSumMapping() { - Metric proto = mapToProto(DOUBLE_SUM_METRIC); + MetricData doubleSum = makeDoubleSum(TraceFlags.getDefault()); + MetricData expected = makeDoubleSum(TraceFlags.getSampled()); - assertEquals( - DOUBLE_SUM_METRIC, - mapToSdk( - proto, - DOUBLE_SUM_METRIC.getResource(), - DOUBLE_SUM_METRIC.getInstrumentationScopeInfo())); + Metric proto = mapToProto(doubleSum); + + MetricData result = + mapToSdk(proto, doubleSum.getResource(), doubleSum.getInstrumentationScopeInfo()); + + assertThat(result).isEqualTo(expected); } @Test void verifySummaryMapping() { - Metric proto = mapToProto(SUMMARY_METRIC); + ValueAtQuantile value = ImmutableValueAtQuantile.create(2.0, 1.0); + SummaryPointData pointData = + ImmutableSummaryPointData.create( + 1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(value)); + + SummaryData data = 
ImmutableSummaryData.create(Collections.singletonList(pointData)); + + MetricData summaryMetric = + ImmutableMetricData.createDoubleSummary( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Summary name", + "Summary description", + "ms", + data); + + Metric proto = mapToProto(summaryMetric); assertEquals( - SUMMARY_METRIC, - mapToSdk( - proto, SUMMARY_METRIC.getResource(), SUMMARY_METRIC.getInstrumentationScopeInfo())); + summaryMetric, + mapToSdk(proto, summaryMetric.getResource(), summaryMetric.getInstrumentationScopeInfo())); } @Test void verifyHistogramMapping() { - Metric proto = mapToProto(HISTOGRAM_METRIC); - assertEquals( - HISTOGRAM_METRIC, + MetricData histogramMetric = makeHistogram(TraceFlags.getDefault()); + MetricData expected = makeHistogram(TraceFlags.getSampled()); + + Metric proto = mapToProto(histogramMetric); + + MetricData result = mapToSdk( - proto, HISTOGRAM_METRIC.getResource(), HISTOGRAM_METRIC.getInstrumentationScopeInfo())); + proto, histogramMetric.getResource(), histogramMetric.getInstrumentationScopeInfo()); + + assertThat(result).isEqualTo(expected); } @Test void verifyExponentialHistogramMapping() { - Metric proto = mapToProto(EXPONENTIAL_HISTOGRAM_METRIC); + MetricData histogram = makeExponentialHistogram(TraceFlags.getDefault()); + MetricData expected = makeExponentialHistogram(TraceFlags.getSampled()); - assertEquals( - EXPONENTIAL_HISTOGRAM_METRIC, - mapToSdk( - proto, - EXPONENTIAL_HISTOGRAM_METRIC.getResource(), - EXPONENTIAL_HISTOGRAM_METRIC.getInstrumentationScopeInfo())); + Metric proto = mapToProto(histogram); + + MetricData result = + mapToSdk(proto, histogram.getResource(), histogram.getInstrumentationScopeInfo()); + + assertThat(result).isEqualTo(expected); + } + + @NotNull + private static MetricData makeDoubleGauge(TraceFlags flags) { + DoublePointData point = makeDoublePointData(flags); + return ImmutableMetricData.createDoubleGauge( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Double gauge name", + "Double gauge description", + "ms", + ImmutableGaugeData.create(Collections.singletonList(point))); + } + + @NotNull + private static List makeDoubleExemplars(TraceFlags flags) { + SpanContext context = makeContext(flags); + DoubleExemplarData doubleExemplarData = + ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, context, 1.0); + return Collections.singletonList(doubleExemplarData); + } + + @NotNull + private static MetricData makeLongSum(TraceFlags flags) { + LongPointData pointData = makeLongPointData(flags); + SumData sumData = + ImmutableSumData.create( + true, AggregationTemporality.DELTA, Collections.singletonList(pointData)); + return ImmutableMetricData.createLongSum( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Long sum name", + "Long sum description", + "ms", + sumData); + } + + @NotNull + private static MetricData makeDoubleSum(TraceFlags flags) { + DoublePointData doublePointData = makeDoublePointData(flags); + SumData sumData = + ImmutableSumData.create( + true, AggregationTemporality.DELTA, Collections.singletonList(doublePointData)); + + return ImmutableMetricData.createDoubleSum( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Double sum name", + "Double sum description", + "ms", + sumData); + } + + @NotNull + private static DoublePointData makeDoublePointData(TraceFlags flags) { + return ImmutableDoublePointData.create( + 1L, 2L, TestData.ATTRIBUTES, 1.0, makeDoubleExemplars(flags)); + } + + @NotNull + private 
static MetricData makeExponentialHistogram(TraceFlags flags) { + ExponentialHistogramBuckets positiveBucket = + ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L)); + ExponentialHistogramBuckets negativeBucket = + ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList()); + + ExponentialHistogramPointData pointData = + ImmutableExponentialHistogramPointData.create( + 1, + 10.0, + 1L, + true, + 2.0, + true, + 4.0, + positiveBucket, + negativeBucket, + 1L, + 2L, + TestData.ATTRIBUTES, + makeDoubleExemplars(flags)); + + ExponentialHistogramData histogramData = + ImmutableExponentialHistogramData.create( + AggregationTemporality.CUMULATIVE, Collections.singletonList(pointData)); + + return ImmutableMetricData.createExponentialHistogram( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Exponential histogram name", + "Exponential histogram description", + "ms", + histogramData); + } + + @NotNull + private static MetricData makeHistogram(TraceFlags flags) { + HistogramPointData dataPoint = + ImmutableHistogramPointData.create( + 1L, + 2L, + TestData.ATTRIBUTES, + 15.0, + true, + 4.0, + true, + 7.0, + Collections.singletonList(10.0), + Arrays.asList(1L, 2L), + makeDoubleExemplars(flags)); + + HistogramData data = + ImmutableHistogramData.create( + AggregationTemporality.CUMULATIVE, Collections.singletonList(dataPoint)); + + return ImmutableMetricData.createDoubleHistogram( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Histogram name", + "Histogram description", + "ms", + data); } private static Metric mapToProto(MetricData source) { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java index 16812f3b..b45e9c9e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/mapping/metrics/ProtoMetricsDataMapperTest.java @@ -8,19 +8,13 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.m import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.proto.metrics.v1.Metric; import io.opentelemetry.proto.metrics.v1.MetricsData; import io.opentelemetry.proto.metrics.v1.ResourceMetrics; import io.opentelemetry.proto.metrics.v1.ScopeMetrics; -import io.opentelemetry.sdk.metrics.data.GaugeData; -import io.opentelemetry.sdk.metrics.data.LongExemplarData; -import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; -import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -29,54 +23,12 @@ import org.junit.jupiter.api.Test; class ProtoMetricsDataMapperTest { - private static final LongExemplarData 
LONG_EXEMPLAR_DATA = - ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L); - - private static final LongPointData LONG_POINT_DATA = - ImmutableLongPointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA)); - private static final GaugeData LONG_GAUGE_DATA = - ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA)); - - private static final MetricData LONG_GAUGE_METRIC = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - - private static final MetricData OTHER_LONG_GAUGE_METRIC = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - - private static final MetricData LONG_GAUGE_METRIC_WITH_DIFFERENT_SCOPE_SAME_RESOURCE = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - - private static final MetricData LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_WITHOUT_SCHEMA_URL, - TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - @Test void verifyConversionDataStructure() { - List signals = Collections.singletonList(LONG_GAUGE_METRIC); + MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault()); + List signals = Collections.singletonList(gauge1); + MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); + List expectedSignals = Collections.singletonList(expectedGauge1); MetricsData proto = mapToProto(signals); @@ -85,12 +37,17 @@ class ProtoMetricsDataMapperTest { assertEquals(1, resourceMetrics.get(0).scope_metrics.size()); assertEquals(1, resourceMetrics.get(0).scope_metrics.get(0).metrics.size()); - assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); + assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals); } @Test void verifyMultipleMetricsWithSameResourceAndScope() { - List signals = Arrays.asList(LONG_GAUGE_METRIC, OTHER_LONG_GAUGE_METRIC); + MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault()); + MetricData gauge2 = TestData.makeLongGauge(TraceFlags.getDefault()); + List signals = Arrays.asList(gauge1, gauge2); + MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); + MetricData expectedGauge2 = TestData.makeLongGauge(TraceFlags.getSampled()); + List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); MetricsData proto = mapToProto(signals); @@ -101,13 +58,25 @@ class ProtoMetricsDataMapperTest { List metrics = scopeMetrics.get(0).metrics; assertEquals(2, metrics.size()); - assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); + List result = mapFromProto(proto); + + assertThat(result).containsExactlyInAnyOrderElementsOf(expectedSignals); } @Test void verifyMultipleMetricsWithSameResourceDifferentScope() { - List signals = - Arrays.asList(LONG_GAUGE_METRIC, LONG_GAUGE_METRIC_WITH_DIFFERENT_SCOPE_SAME_RESOURCE); + MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault()); + MetricData gauge2 = + TestData.makeLongGauge( + TraceFlags.getDefault(), 
TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION); + + MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); + MetricData expectedGauge2 = + TestData.makeLongGauge( + TraceFlags.getSampled(), TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION); + + List signals = Arrays.asList(gauge1, gauge2); + List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); MetricsData proto = mapToProto(signals); @@ -122,13 +91,25 @@ class ProtoMetricsDataMapperTest { assertEquals(1, firstScopeMetrics.size()); assertEquals(1, secondScopeMetrics.size()); - assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); + assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals); } @Test void verifyMultipleMetricsWithDifferentResource() { - List signals = - Arrays.asList(LONG_GAUGE_METRIC, LONG_GAUGE_METRIC_WITH_DIFFERENT_RESOURCE); + MetricData gauge1 = TestData.makeLongGauge(TraceFlags.getDefault()); + MetricData gauge2 = + TestData.makeLongGauge( + TraceFlags.getDefault(), + TestData.RESOURCE_WITHOUT_SCHEMA_URL, + TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION); + List signals = Arrays.asList(gauge1, gauge2); + MetricData expectedGauge1 = TestData.makeLongGauge(TraceFlags.getSampled()); + MetricData expectedGauge2 = + TestData.makeLongGauge( + TraceFlags.getSampled(), + TestData.RESOURCE_WITHOUT_SCHEMA_URL, + TestData.INSTRUMENTATION_SCOPE_INFO_WITHOUT_VERSION); + List expectedSignals = Arrays.asList(expectedGauge1, expectedGauge2); MetricsData proto = mapToProto(signals); @@ -147,7 +128,7 @@ class ProtoMetricsDataMapperTest { assertEquals(1, firstMetrics.size()); assertEquals(1, secondMetrics.size()); - assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(signals); + assertThat(mapFromProto(proto)).containsExactlyInAnyOrderElementsOf(expectedSignals); } private static MetricsData mapToProto(Collection signals) { diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializerTest.java index c659609e..e622ba8f 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/MetricDataSerializerTest.java @@ -5,6 +5,11 @@ package io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers; +import static java.util.Collections.singletonList; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.contrib.disk.buffering.testutils.BaseSignalSerializerTest; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; @@ -40,155 +45,229 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile; import java.util.Arrays; import java.util.Collections; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; class MetricDataSerializerTest extends BaseSignalSerializerTest { - private static final LongExemplarData
LONG_EXEMPLAR_DATA = - ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1L); - - private static final DoubleExemplarData DOUBLE_EXEMPLAR_DATA = - ImmutableDoubleExemplarData.create(TestData.ATTRIBUTES, 100L, TestData.SPAN_CONTEXT, 1.0); - private static final LongPointData LONG_POINT_DATA = - ImmutableLongPointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1L, Collections.singletonList(LONG_EXEMPLAR_DATA)); - - private static final DoublePointData DOUBLE_POINT_DATA = - ImmutableDoublePointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1.0, Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - - private static final GaugeData LONG_GAUGE_DATA = - ImmutableGaugeData.create(Collections.singletonList(LONG_POINT_DATA)); - - private static final GaugeData DOUBLE_GAUGE_DATA = - ImmutableGaugeData.create(Collections.singletonList(DOUBLE_POINT_DATA)); - - private static final SumData LONG_SUM_DATA = - ImmutableSumData.create( - true, AggregationTemporality.DELTA, Collections.singletonList(LONG_POINT_DATA)); - - private static final SumData DOUBLE_SUM_DATA = - ImmutableSumData.create( - true, AggregationTemporality.DELTA, Collections.singletonList(DOUBLE_POINT_DATA)); - - private static final ValueAtQuantile VALUE_AT_QUANTILE = - ImmutableValueAtQuantile.create(2.0, 1.0); - private static final SummaryPointData SUMMARY_POINT_DATA = - ImmutableSummaryPointData.create( - 1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, Collections.singletonList(VALUE_AT_QUANTILE)); - - private static final SummaryData SUMMARY_DATA = - ImmutableSummaryData.create(Collections.singletonList(SUMMARY_POINT_DATA)); - - private static final HistogramPointData HISTOGRAM_POINT_DATA = - ImmutableHistogramPointData.create( - 1L, - 2L, - TestData.ATTRIBUTES, - 15.0, - true, - 4.0, - true, - 7.0, - Collections.singletonList(10.0), - Arrays.asList(1L, 2L), - Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - private static final ExponentialHistogramBuckets POSITIVE_BUCKET = - ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L)); - - private static final ExponentialHistogramBuckets NEGATIVE_BUCKET = - ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList()); - private static final ExponentialHistogramPointData EXPONENTIAL_HISTOGRAM_POINT_DATA = - ImmutableExponentialHistogramPointData.create( - 1, - 10.0, - 1L, - true, - 2.0, - true, - 4.0, - POSITIVE_BUCKET, - NEGATIVE_BUCKET, - 1L, - 2L, - TestData.ATTRIBUTES, - Collections.singletonList(DOUBLE_EXEMPLAR_DATA)); - private static final HistogramData HISTOGRAM_DATA = - ImmutableHistogramData.create( - AggregationTemporality.CUMULATIVE, Collections.singletonList(HISTOGRAM_POINT_DATA)); - private static final ExponentialHistogramData EXPONENTIAL_HISTOGRAM_DATA = - ImmutableExponentialHistogramData.create( - AggregationTemporality.CUMULATIVE, - Collections.singletonList(EXPONENTIAL_HISTOGRAM_POINT_DATA)); - private static final MetricData LONG_GAUGE_METRIC = - ImmutableMetricData.createLongGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long gauge name", - "Long gauge description", - "ms", - LONG_GAUGE_DATA); - - private static final MetricData DOUBLE_GAUGE_METRIC = - ImmutableMetricData.createDoubleGauge( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Double gauge name", - "Double gauge description", - "ms", - DOUBLE_GAUGE_DATA); - private static final MetricData LONG_SUM_METRIC = - ImmutableMetricData.createLongSum( - TestData.RESOURCE_FULL, - 
TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Long sum name", - "Long sum description", - "ms", - LONG_SUM_DATA); - private static final MetricData DOUBLE_SUM_METRIC = - ImmutableMetricData.createDoubleSum( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Double sum name", - "Double sum description", - "ms", - DOUBLE_SUM_DATA); - private static final MetricData SUMMARY_METRIC = - ImmutableMetricData.createDoubleSummary( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Summary name", - "Summary description", - "ms", - SUMMARY_DATA); - - private static final MetricData HISTOGRAM_METRIC = - ImmutableMetricData.createDoubleHistogram( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Histogram name", - "Histogram description", - "ms", - HISTOGRAM_DATA); - private static final MetricData EXPONENTIAL_HISTOGRAM_METRIC = - ImmutableMetricData.createExponentialHistogram( - TestData.RESOURCE_FULL, - TestData.INSTRUMENTATION_SCOPE_INFO_FULL, - "Exponential histogram name", - "Exponential histogram description", - "ms", - EXPONENTIAL_HISTOGRAM_DATA); + @Test + void verifyLongGauge() { + MetricData metric = createLongGauge(TraceFlags.getDefault()); + MetricData expected = createLongGauge(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } @Test - void verifySerialization() { - assertSerialization( - LONG_GAUGE_METRIC, - DOUBLE_GAUGE_METRIC, - LONG_SUM_METRIC, - DOUBLE_SUM_METRIC, - SUMMARY_METRIC, - HISTOGRAM_METRIC, - EXPONENTIAL_HISTOGRAM_METRIC); + void verifyDoubleGauge() { + MetricData metric = createDoubleGauge(TraceFlags.getDefault()); + // The expected metric is built with different trace flags so that we can verify that the + // exemplar we get back has a span context whose flags differ from the ones we serialized. + // Trace flags are not part of the exemplar protos, so what we get back is not what we put + // in; comparing against this variant confirms that the rest of the data survives the + // round trip.
+ MetricData expected = createDoubleGauge(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } + + @Test + void verifyLongSum() { + MetricData metric = makeLongSum(TraceFlags.getDefault()); + MetricData expected = makeLongSum(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } + + @Test + void verifySummary() { + ValueAtQuantile value = ImmutableValueAtQuantile.create(2.0, 1.0); + + SummaryPointData pointData = + ImmutableSummaryPointData.create( + 1L, 2L, TestData.ATTRIBUTES, 1L, 2.0, singletonList(value)); + + SummaryData data = ImmutableSummaryData.create(singletonList(pointData)); + MetricData metric = + ImmutableMetricData.createDoubleSummary( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Summary name", + "Summary description", + "ms", + data); + assertSerialization(metric); + } + + @Test + void verifyDoubleSum() { + MetricData metric = makeDoubleSum(TraceFlags.getDefault()); + MetricData expected = makeDoubleSum(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } + + @Test + void verifyHistogram() { + MetricData metric = makeHistogram(TraceFlags.getDefault()); + MetricData expected = makeHistogram(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } + + @Test + void verifyExponentialHistogram() { + MetricData metric = makeExponentialHistogram(TraceFlags.getDefault()); + MetricData expected = makeExponentialHistogram(TraceFlags.getSampled()); + assertSerializeDeserialize(metric, expected); + } + + @NotNull + private static MetricData createLongGauge(TraceFlags flags) { + LongPointData pointData = makeLongPointData(flags); + GaugeData data = ImmutableGaugeData.create(singletonList(pointData)); + return ImmutableMetricData.createLongGauge( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Long gauge name", + "Long gauge description", + "ms", + data); + } + + @NotNull + private static DoublePointData makeDoublePointData(TraceFlags flags) { + DoubleExemplarData doubleExemplarData = makeDoubleExemplar(flags); + return ImmutableDoublePointData.create( + 1L, 2L, TestData.ATTRIBUTES, 1.0, singletonList(doubleExemplarData)); + } + + @NotNull + private static MetricData createDoubleGauge(TraceFlags flags) { + DoublePointData point = makeDoublePointData(flags); + GaugeData gaugeData = ImmutableGaugeData.create(singletonList(point)); + MetricData metric = + ImmutableMetricData.createDoubleGauge( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Double gauge name", + "Double gauge description", + "ms", + gaugeData); + return metric; + } + + @NotNull + private static DoubleExemplarData makeDoubleExemplar(TraceFlags flags) { + SpanContext expectedExemplarContext = createSpanContext(flags); + return ImmutableDoubleExemplarData.create( + TestData.ATTRIBUTES, 100L, expectedExemplarContext, 1.0); + } + + @NotNull + private static MetricData makeLongSum(TraceFlags flags) { + LongPointData point = makeLongPointData(flags); + SumData sumData = + ImmutableSumData.create(true, AggregationTemporality.DELTA, singletonList(point)); + + return ImmutableMetricData.createLongSum( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Long sum name", + "Long sum description", + "ms", + sumData); + } + + @NotNull + private static LongPointData makeLongPointData(TraceFlags flags) { + LongExemplarData ex = makeLongExemplar(flags); + return ImmutableLongPointData.create(1L, 2L, TestData.ATTRIBUTES, 1L, singletonList(ex)); + } + + 
@NotNull + private static LongExemplarData makeLongExemplar(TraceFlags flags) { + SpanContext context = createSpanContext(flags); + return ImmutableLongExemplarData.create(TestData.ATTRIBUTES, 100L, context, 1L); + } + + @NotNull + private static SpanContext createSpanContext(TraceFlags flags) { + return SpanContext.create(TestData.TRACE_ID, TestData.SPAN_ID, flags, TraceState.getDefault()); + } + + @NotNull + private static MetricData makeDoubleSum(TraceFlags flags) { + DoublePointData point = makeDoublePointData(flags); + SumData DOUBLE_SUM_DATA = + ImmutableSumData.create(true, AggregationTemporality.DELTA, singletonList(point)); + return ImmutableMetricData.createDoubleSum( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Double sum name", + "Double sum description", + "ms", + DOUBLE_SUM_DATA); + } + + @NotNull + private static MetricData makeExponentialHistogram(TraceFlags flags) { + ExponentialHistogramBuckets positiveBucket = + ImmutableExponentialHistogramBuckets.create(1, 10, Arrays.asList(1L, 10L)); + + ExponentialHistogramBuckets negativeBucket = + ImmutableExponentialHistogramBuckets.create(1, 0, Collections.emptyList()); + + DoubleExemplarData exemplar = makeDoubleExemplar(flags); + ExponentialHistogramPointData pointData = + ImmutableExponentialHistogramPointData.create( + 1, + 10.0, + 1L, + true, + 2.0, + true, + 4.0, + positiveBucket, + negativeBucket, + 1L, + 2L, + TestData.ATTRIBUTES, + singletonList(exemplar)); + ExponentialHistogramData data = + ImmutableExponentialHistogramData.create( + AggregationTemporality.CUMULATIVE, singletonList(pointData)); + return ImmutableMetricData.createExponentialHistogram( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Exponential histogram name", + "Exponential histogram description", + "ms", + data); + } + + @NotNull + private static MetricData makeHistogram(TraceFlags flags) { + DoubleExemplarData exemplar = makeDoubleExemplar(flags); + HistogramPointData pointData = + ImmutableHistogramPointData.create( + 1L, + 2L, + TestData.ATTRIBUTES, + 15.0, + true, + 4.0, + true, + 7.0, + singletonList(10.0), + Arrays.asList(1L, 2L), + singletonList(exemplar)); + + HistogramData data = + ImmutableHistogramData.create(AggregationTemporality.CUMULATIVE, singletonList(pointData)); + + return ImmutableMetricData.createDoubleHistogram( + TestData.RESOURCE_FULL, + TestData.INSTRUMENTATION_SCOPE_INFO_FULL, + "Histogram name", + "Histogram description", + "ms", + data); } @Override diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializerTest.java index 0958d632..3c358aec 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/serializers/SpanDataSerializerTest.java @@ -28,47 +28,50 @@ class SpanDataSerializerTest extends BaseSignalSerializerTest { private static final LinkData LINK_DATA_WITH_TRACE_STATE = LinkData.create(TestData.SPAN_CONTEXT_WITH_TRACE_STATE, TestData.ATTRIBUTES, 20); - private static final SpanData SPAN_DATA = - SpanDataImpl.builder() - .setResource(TestData.RESOURCE_FULL) - .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) - .setName("Span name") - 
.setSpanContext(TestData.SPAN_CONTEXT) - .setParentSpanContext(TestData.PARENT_SPAN_CONTEXT) - .setAttributes(TestData.ATTRIBUTES) - .setStartEpochNanos(1L) - .setEndEpochNanos(2L) - .setKind(SpanKind.CLIENT) - .setStatus(StatusData.error()) - .setEvents(Collections.singletonList(EVENT_DATA)) - .setLinks(Arrays.asList(LINK_DATA, LINK_DATA_WITH_TRACE_STATE)) - .setTotalAttributeCount(10) - .setTotalRecordedEvents(2) - .setTotalRecordedLinks(2) - .build(); - - private static final SpanData SPAN_DATA_WITH_TRACE_STATE = - SpanDataImpl.builder() - .setResource(TestData.RESOURCE_FULL) - .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) - .setName("Span name2") - .setSpanContext(TestData.SPAN_CONTEXT_WITH_TRACE_STATE) - .setParentSpanContext(TestData.PARENT_SPAN_CONTEXT) - .setAttributes(TestData.ATTRIBUTES) - .setStartEpochNanos(1L) - .setEndEpochNanos(2L) - .setKind(SpanKind.CLIENT) - .setStatus(StatusData.error()) - .setEvents(Collections.singletonList(EVENT_DATA)) - .setLinks(Collections.singletonList(LINK_DATA)) - .setTotalAttributeCount(10) - .setTotalRecordedEvents(2) - .setTotalRecordedLinks(2) - .build(); + @Test + void verifySerialization_noFlagsNoState() { + SpanData span = + SpanDataImpl.builder() + .setResource(TestData.RESOURCE_FULL) + .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) + .setName("Span name") + .setSpanContext(TestData.SPAN_CONTEXT) + .setParentSpanContext(TestData.PARENT_SPAN_CONTEXT) + .setAttributes(TestData.ATTRIBUTES) + .setStartEpochNanos(1L) + .setEndEpochNanos(2L) + .setKind(SpanKind.CLIENT) + .setStatus(StatusData.error()) + .setEvents(Collections.singletonList(EVENT_DATA)) + .setLinks(Arrays.asList(LINK_DATA, LINK_DATA_WITH_TRACE_STATE)) + .setTotalAttributeCount(10) + .setTotalRecordedEvents(2) + .setTotalRecordedLinks(2) + .build(); + assertSerialization(span); + } @Test - void verifySerialization() { - assertSerialization(SPAN_DATA, SPAN_DATA_WITH_TRACE_STATE); + void verifySerialization_withTraceState() { + SpanData span = + SpanDataImpl.builder() + .setResource(TestData.RESOURCE_FULL) + .setInstrumentationScopeInfo(TestData.INSTRUMENTATION_SCOPE_INFO_FULL) + .setName("Span name2") + .setSpanContext(TestData.SPAN_CONTEXT_WITH_TRACE_STATE) + .setParentSpanContext(TestData.PARENT_SPAN_CONTEXT) + .setAttributes(TestData.ATTRIBUTES) + .setStartEpochNanos(1L) + .setEndEpochNanos(2L) + .setKind(SpanKind.CLIENT) + .setStatus(StatusData.error()) + .setEvents(Collections.singletonList(EVENT_DATA)) + .setLinks(Collections.singletonList(LINK_DATA)) + .setTotalAttributeCount(10) + .setTotalRecordedEvents(2) + .setTotalRecordedLinks(2) + .build(); + assertSerialization(span); } @Override diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java index 9b41d6f7..aaf0544c 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/FolderManagerTest.java @@ -37,7 +37,7 @@ class FolderManagerTest { @BeforeEach void setUp() { clock = mock(); - folderManager = new FolderManager(rootDir, TestData.getDefaultConfiguration(), clock); + folderManager = new FolderManager(rootDir, TestData.getDefaultConfiguration(rootDir), clock); } @Test diff --git 
a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java index b267327c..883c5dc1 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/TestData.java @@ -8,6 +8,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; import io.opentelemetry.contrib.disk.buffering.internal.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.files.DefaultTemporaryFileProvider; import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider; +import java.io.File; public final class TestData { @@ -17,12 +18,15 @@ public final class TestData { public static final int MAX_FILE_SIZE = 100; public static final int MAX_FOLDER_SIZE = 300; - public static StorageConfiguration getDefaultConfiguration() { - return getConfiguration(DefaultTemporaryFileProvider.getInstance()); + public static StorageConfiguration getDefaultConfiguration(File rootDir) { + TemporaryFileProvider fileProvider = DefaultTemporaryFileProvider.getInstance(); + return getConfiguration(fileProvider, rootDir); } - public static StorageConfiguration getConfiguration(TemporaryFileProvider fileProvider) { + public static StorageConfiguration getConfiguration( + TemporaryFileProvider fileProvider, File rootDir) { return StorageConfiguration.builder() + .setRootDir(rootDir) .setMaxFileAgeForWriteMillis(MAX_FILE_AGE_FOR_WRITE_MILLIS) .setMinFileAgeForReadMillis(MIN_FILE_AGE_FOR_READ_MILLIS) .setMaxFileAgeForReadMillis(MAX_FILE_AGE_FOR_READ_MILLIS) diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index 442d9fa2..a7fbd3da 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -97,7 +97,7 @@ class ReadableFileTest { clock = mock(); readableFile = new ReadableFile( - source, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider)); + source, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); } private static void addFileContents(File source) throws IOException { @@ -184,7 +184,7 @@ class ReadableFileTest { ReadableFile emptyReadableFile = new ReadableFile( - emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider)); + emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); assertEquals(ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> true)); diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java index b10631f9..9ad9d7fc 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/WritableFileTest.java @@ -43,7 +43,7 @@ class WritableFileTest { new WritableFile( new 
File(rootDir, String.valueOf(CREATED_TIME_MILLIS)), CREATED_TIME_MILLIS, - TestData.getDefaultConfiguration(), + TestData.getDefaultConfiguration(rootDir), clock); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java index 5ad19aff..53d34c8a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/BaseSignalSerializerTest.java @@ -36,5 +36,10 @@ public abstract class BaseSignalSerializerTest { assertThat(deserialize(serialized)).containsExactly(targets); } + protected void assertSerializeDeserialize(SIGNAL_SDK_ITEM input, SIGNAL_SDK_ITEM expected) { + byte[] serialized = serialize(input); + assertThat(deserialize(serialized)).containsExactly(expected); + } + protected abstract SignalSerializer getSerializer(); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/TestData.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/TestData.java index dc049229..15c32c42 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/TestData.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/testutils/TestData.java @@ -10,7 +10,17 @@ import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.data.GaugeData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongExemplarData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; import io.opentelemetry.sdk.resources.Resource; +import java.util.Collections; +import org.jetbrains.annotations.NotNull; @SuppressWarnings("unchecked") public final class TestData { @@ -38,7 +48,7 @@ public final class TestData { Resource.create(Attributes.builder().put("resourceAttr", "resourceAttrValue").build()); public static final SpanContext SPAN_CONTEXT = - SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()); + SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()); public static final SpanContext SPAN_CONTEXT_WITH_TRACE_STATE = SpanContext.create( TRACE_ID, @@ -67,5 +77,59 @@ public final class TestData { .build()) .build(); + @NotNull + public static MetricData makeLongGauge(TraceFlags flags) { + return makeLongGauge(flags, RESOURCE_FULL, INSTRUMENTATION_SCOPE_INFO_FULL); + } + + @NotNull + public static MetricData makeLongGauge( + TraceFlags flags, InstrumentationScopeInfo instrumentationScopeInfo) { + return makeLongGauge(flags, RESOURCE_FULL, instrumentationScopeInfo); + } + + @NotNull + public static MetricData makeLongGauge(TraceFlags flags, Resource resource) { + return makeLongGauge(flags, resource, INSTRUMENTATION_SCOPE_INFO_FULL); + } + + @NotNull + public static MetricData makeLongGauge( + TraceFlags flags, Resource 
resource, InstrumentationScopeInfo instrumentationScopeInfo) { + LongPointData point = makeLongPointData(flags); + GaugeData gaugeData = + ImmutableGaugeData.create(Collections.singletonList(point)); + return ImmutableMetricData.createLongGauge( + resource, + instrumentationScopeInfo, + "Long gauge name", + "Long gauge description", + "ms", + gaugeData); + } + + @NotNull + public static LongPointData makeLongPointData(TraceFlags flags) { + LongExemplarData longExemplarData = makeLongExemplarData(flags); + return ImmutableLongPointData.create( + 1L, 2L, ATTRIBUTES, 1L, Collections.singletonList(longExemplarData)); + } + + @NotNull + public static SpanContext makeContext(TraceFlags flags) { + return makeContext(flags, SPAN_ID); + } + + @NotNull + public static SpanContext makeContext(TraceFlags flags, String spanId) { + return SpanContext.create(TRACE_ID, spanId, flags, TraceState.getDefault()); + } + + @NotNull + private static LongExemplarData makeLongExemplarData(TraceFlags flags) { + SpanContext context = makeContext(flags); + return ImmutableLongExemplarData.create(ATTRIBUTES, 100L, context, 1L); + } + private TestData() {} }