diff --git a/sdk_extensions/logging/README.md b/sdk_extensions/logging/README.md new file mode 100644 index 0000000000..70affb87df --- /dev/null +++ b/sdk_extensions/logging/README.md @@ -0,0 +1,6 @@ +# OpenTelemetry Experimental Logging Support + +This project contains experimental support for transport of logs via OpenTelemetry. The API +presented is intended for the use of logging library adapters to enable resource and request +correlation with other observability signals and transport of logs through the OpenTelemetry +collector. diff --git a/sdk_extensions/logging/build.gradle b/sdk_extensions/logging/build.gradle new file mode 100644 index 0000000000..e00c0029e9 --- /dev/null +++ b/sdk_extensions/logging/build.gradle @@ -0,0 +1,20 @@ +plugins { + id "java" + id "maven-publish" + + id "ru.vyarus.animalsniffer" +} + +description = 'OpenTelemetry Contrib Logging Support' +ext.moduleName = "io.opentelemetry.sdk.extensions.logging" + +dependencies { + api project(':opentelemetry-sdk') + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.4' + testImplementation libraries.awaitility + + annotationProcessor libraries.auto_value + + signature "org.codehaus.mojo.signature:java17:1.0@signature" + signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature" +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogProcessor.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogProcessor.java new file mode 100644 index 0000000000..7f9ac7bc7d --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogProcessor.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import io.opentelemetry.sdk.trace.TracerSdkProvider; + +public interface LogProcessor { + + void addLogRecord(LogRecord record); + + /** + * Called when {@link TracerSdkProvider#shutdown()} is called. + * + * @return result + */ + CompletableResultCode shutdown(); + + /** + * Processes all span events that have not yet been processed. + * + * @return result + */ + CompletableResultCode forceFlush(); +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSink.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSink.java new file mode 100644 index 0000000000..3b5ce9b1df --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSink.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging; + +import io.opentelemetry.sdk.logging.data.LogRecord; + +/** A LogSink accepts logging records for transmission to an aggregator or log processing system. */ +public interface LogSink { + /** + * Pass a record to the SDK for transmission to a logging exporter. + * + * @param record record to transmit + */ + void offer(LogRecord record); +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSinkSdkProvider.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSinkSdkProvider.java new file mode 100644 index 0000000000..0d21be4de0 --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/LogSinkSdkProvider.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging; + +import io.opentelemetry.internal.Utils; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class LogSinkSdkProvider { + private final LogSink logSink = new SdkLogSink(); + private final List processors = new ArrayList<>(); + + private LogSinkSdkProvider() {} + + public LogSink get(String instrumentationName, String instrumentationVersion) { + // Currently there is no differentiation by instrumentation library + return logSink; + } + + public void addLogProcessor(LogProcessor processor) { + processors.add(Utils.checkNotNull(processor, "Processor can not be null")); + } + + /** + * Flushes all attached processors. + * + * @return result + */ + public CompletableResultCode forceFlush() { + final List processorResults = new ArrayList<>(processors.size()); + for (LogProcessor processor : processors) { + processorResults.add(processor.forceFlush()); + } + return CompletableResultCode.ofAll(processorResults); + } + + /** + * Shut down of provider and associated processors. + * + * @return result + */ + public CompletableResultCode shutdown() { + Collection processorResults = new ArrayList<>(processors.size()); + for (LogProcessor processor : processors) { + processorResults.add(processor.shutdown()); + } + return CompletableResultCode.ofAll(processorResults); + } + + private class SdkLogSink implements LogSink { + @Override + public void offer(LogRecord record) { + for (LogProcessor processor : processors) { + processor.addLogRecord(record); + } + } + } + + public static class Builder { + public LogSinkSdkProvider build() { + return new LogSinkSdkProvider(); + } + } +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/AnyValue.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/AnyValue.java new file mode 100644 index 0000000000..31a672b47c --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/AnyValue.java @@ -0,0 +1,276 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.data; + +import com.google.auto.value.AutoValue; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +/** + * A class that represents all the possible values for a data body. An {@code AnyValue} can have 6 + * types of values: {@code String}, {@code boolean}, {@code int}, {@code double}, {@code array}, or + * {@code kvlist}. represented through {@code AnyValue.Type}. A {@code array} or a {@code kvlist} + * can in turn hold other {@code AnyValue} instances, allowing for mapping to JSON-like structures. + * + * @since 0.9.0 + */ +@Immutable +public abstract class AnyValue { + + /** An enum that represents all the possible value types for an {@code AnyValue}. */ + public enum Type { + STRING, + BOOL, + INT64, + DOUBLE, + ARRAY, + KVLIST + } + + /** + * Returns an {@code AnyValue} with a string value. + * + * @param stringValue The new value. + * @return an {@code AnyValue} with a string value. + */ + public static AnyValue stringAnyValue(String stringValue) { + return AnyValueString.create(stringValue); + } + + /** + * Returns the string value of this {@code AnyValue}. An UnsupportedOperationException will be + * thrown if getType() is not {@link AnyValue.Type#STRING}. + * + * @return the string value of this {@code AttributeValue}. + */ + public String getStringValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + /** + * Returns an {@code AnyValue} with an int value. + * + * @param longValue The new value. + * @return an {@code AnyValue} with a int value. + */ + public static AnyValue longAnyValue(long longValue) { + return AnyValueLong.create(longValue); + } + + public long getLongValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + /** + * Returns an {@code AnyValue} with a bool value. + * + * @param boolValue The new value. + * @return an {@code AnyValue} with a bool value. + */ + public static AnyValue boolAnyValue(boolean boolValue) { + return AnyValueBool.create(boolValue); + } + + /** + * Returns the boolean value of this {@code AnyValue}. An UnsupportedOperationException will be + * thrown if getType() is not {@link AnyValue.Type#BOOL}. + * + * @return the boolean value of this {@code AttributeValue}. + */ + public boolean getBoolValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + /** + * Returns an {@code AnyValue} with a double value. + * + * @param doubleValue The new value. + * @return an {@code AnyValue} with a double value. + */ + public static AnyValue doubleAnyValue(double doubleValue) { + return AnyValueDouble.create(doubleValue); + } + + /** + * Returns the double value of this {@code AnyValue}. An UnsupportedOperationException will be + * thrown if getType() is not {@link AnyValue.Type#DOUBLE}. + * + * @return the double value of this {@code AttributeValue}. + */ + public double getDoubleValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + /** + * Returns an {@code AnyValue} with a array value. + * + * @param values The new value. + * @return an {@code AnyValue} with a array value. + */ + public static AnyValue arrayAnyValue(List values) { + return AnyValueArray.create(values); + } + + /** + * Returns the array value of this {@code AnyValue}. An UnsupportedOperationException will be + * thrown if getType() is not {@link AnyValue.Type#ARRAY}. + * + * @return the array value of this {@code AttributeValue}. + */ + public List getArrayValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + /** + * Returns an {@code AnyValue} with a kvlist value. + * + * @param values The new value. + * @return an {@code AnyValue} with a kvlist value. + */ + public static AnyValue kvlistAnyValue(Map values) { + return AnyValueKvlist.create(values); + } + + /** + * Returns the string value of this {@code AnyValue}. An UnsupportedOperationException will be + * thrown if getType() is not {@link AnyValue.Type#STRING}. + * + * @return the string value of this {@code AttributeValue}. + */ + public Map getKvlistValue() { + throw new UnsupportedOperationException( + String.format("This type can only return %s data", getType().name())); + } + + public abstract Type getType(); + + @Immutable + @AutoValue + abstract static class AnyValueString extends AnyValue { + AnyValueString() {} + + static AnyValue create(String stringValue) { + return new AutoValue_AnyValue_AnyValueString(stringValue); + } + + @Override + public final Type getType() { + return Type.STRING; + } + + @Override + @Nullable + public abstract String getStringValue(); + } + + @Immutable + @AutoValue + abstract static class AnyValueLong extends AnyValue { + AnyValueLong() {} + + static AnyValue create(long longValue) { + return new AutoValue_AnyValue_AnyValueLong(longValue); + } + + @Override + public final Type getType() { + return Type.INT64; + } + + @Override + public abstract long getLongValue(); + } + + @Immutable + @AutoValue + abstract static class AnyValueBool extends AnyValue { + AnyValueBool() {} + + static AnyValue create(boolean boolValue) { + return new AutoValue_AnyValue_AnyValueBool(boolValue); + } + + @Override + public final Type getType() { + return Type.BOOL; + } + + @Override + public abstract boolean getBoolValue(); + } + + @Immutable + @AutoValue + abstract static class AnyValueDouble extends AnyValue { + AnyValueDouble() {} + + static AnyValue create(double doubleValue) { + return new AutoValue_AnyValue_AnyValueDouble(doubleValue); + } + + @Override + public final Type getType() { + return Type.DOUBLE; + } + + @Override + public abstract double getDoubleValue(); + } + + @Immutable + @AutoValue + abstract static class AnyValueArray extends AnyValue { + AnyValueArray() {} + + static AnyValue create(List arrayValue) { + return new AutoValue_AnyValue_AnyValueArray(arrayValue); + } + + @Override + public final Type getType() { + return Type.ARRAY; + } + + @Override + public abstract List getArrayValue(); + } + + @Immutable + @AutoValue + abstract static class AnyValueKvlist extends AnyValue { + AnyValueKvlist() {} + + static AnyValue create(Map kvlistValue) { + return new AutoValue_AnyValue_AnyValueKvlist(kvlistValue); + } + + @Override + public final Type getType() { + return Type.KVLIST; + } + + @Override + public abstract Map getKvlistValue(); + } +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/LogRecord.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/LogRecord.java new file mode 100644 index 0000000000..d86ea0abb9 --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/data/LogRecord.java @@ -0,0 +1,181 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.data; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.common.Attributes; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * A LogRecord is an implementation of the + * OpenTelemetry logging model. + */ +@AutoValue +public abstract class LogRecord { + + public abstract long getTimeUnixNano(); + + public abstract String getTraceId(); + + public abstract String getSpanId(); + + public abstract int getFlags(); + + public abstract Severity getSeverity(); + + @Nullable + public abstract String getSeverityText(); + + @Nullable + public abstract String getName(); + + public abstract AnyValue getBody(); + + public abstract Attributes getAttributes(); + + public enum Severity { + UNDEFINED_SEVERITY_NUMBER(0), + TRACE(1), + TRACE2(2), + TRACE3(3), + TRACE4(4), + DEBUG(5), + DEBUG2(6), + DEBUG3(7), + DEBUG4(8), + INFO(9), + INFO2(10), + INFO3(11), + INFO4(12), + WARN(13), + WARN2(14), + WARN3(15), + WARN4(16), + ERROR(17), + ERROR2(18), + ERROR3(19), + ERROR4(20), + FATAL(21), + FATAL2(22), + FATAL3(23), + FATAL4(24), + ; + + private final int severityNumber; + + Severity(int severityNumber) { + this.severityNumber = severityNumber; + } + + public int getSeverityNumber() { + return severityNumber; + } + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private long timeUnixNano; + private String traceId = ""; + private String spanId = ""; + private int flags; + private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER; + private String severityText; + private String name; + private AnyValue body = AnyValue.stringAnyValue(""); + private final Attributes.Builder attributeBuilder = Attributes.newBuilder(); + + public Builder setUnixTimeNano(long timestamp) { + this.timeUnixNano = timestamp; + return this; + } + + public Builder setUnixTimeMillis(long timestamp) { + return setUnixTimeNano(TimeUnit.MILLISECONDS.toNanos(timestamp)); + } + + public Builder setTraceId(String traceId) { + this.traceId = traceId; + return this; + } + + public Builder setSpanId(String spanId) { + this.spanId = spanId; + return this; + } + + public Builder setFlags(int flags) { + this.flags = flags; + return this; + } + + public Builder setSeverity(Severity severity) { + this.severity = severity; + return this; + } + + public Builder setSeverityText(String severityText) { + this.severityText = severityText; + return this; + } + + public Builder setName(String name) { + this.name = name; + return this; + } + + public Builder setBody(AnyValue body) { + this.body = body; + return this; + } + + public Builder setBody(String body) { + this.body = AnyValue.stringAnyValue(body); + return this; + } + + public Builder setAttributes(Attributes attributes) { + this.attributeBuilder.addAll(attributes); + return this; + } + + /** + * Build a LogRecord instance. + * + * @return value object being built + */ + public LogRecord build() { + if (timeUnixNano == 0) { + timeUnixNano = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()); + } + return new AutoValue_LogRecord( + timeUnixNano, + traceId, + spanId, + flags, + severity, + severityText, + name, + body, + attributeBuilder.build()); + } + } +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/BatchLogProcessor.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/BatchLogProcessor.java new file mode 100644 index 0000000000..ec4e102c44 --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/BatchLogProcessor.java @@ -0,0 +1,373 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.export; + +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.common.Labels; +import io.opentelemetry.internal.Utils; +import io.opentelemetry.metrics.LongCounter; +import io.opentelemetry.metrics.LongCounter.BoundLongCounter; +import io.opentelemetry.metrics.Meter; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.DaemonThreadFactory; +import io.opentelemetry.sdk.common.export.ConfigBuilder; +import io.opentelemetry.sdk.logging.LogProcessor; +import io.opentelemetry.sdk.logging.data.LogRecord; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class BatchLogProcessor implements LogProcessor { + private static final String WORKER_THREAD_NAME = + BatchLogProcessor.class.getSimpleName() + "_WorkerThread"; + + private final Worker worker; + private final Thread workerThread; + + private BatchLogProcessor( + int maxQueueSize, + long scheduleDelayMillis, + int maxExportBatchSize, + long exporterTimeoutMillis, + LogExporter logExporter) { + this.worker = + new Worker( + logExporter, + scheduleDelayMillis, + maxExportBatchSize, + exporterTimeoutMillis, + new ArrayBlockingQueue(maxQueueSize)); + this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); + this.workerThread.start(); + } + + public static Builder builder(LogExporter logExporter) { + return new Builder(logExporter); + } + + @Override + public void addLogRecord(LogRecord record) { + worker.addLogRecord(record); + } + + @Override + public CompletableResultCode shutdown() { + workerThread.interrupt(); + return worker.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return worker.forceFlush(); + } + + private static class Worker implements Runnable { + static { + Meter meter = OpenTelemetry.getMeter("io.opentelemetry.sdk.logging"); + LongCounter logRecordsProcessed = + meter + .longCounterBuilder("logRecordsProcessed") + .setUnit("1") + .setDescription("Number of records processed") + .build(); + successCounter = logRecordsProcessed.bind(Labels.of("result", "success")); + exporterFailureCounter = + logRecordsProcessed.bind( + Labels.of("result", "dropped record", "cause", "exporter failure")); + queueFullRecordCounter = + logRecordsProcessed.bind(Labels.of("result", "dropped record", "cause", "queue full")); + } + + private static final BoundLongCounter exporterFailureCounter; + private static final BoundLongCounter queueFullRecordCounter; + private static final BoundLongCounter successCounter; + + private final long scheduleDelayNanos; + private final int maxExportBatchSize; + private final LogExporter logExporter; + private final long exporterTimeoutMillis; + private final ArrayList batch; + private final BlockingQueue queue; + + private final AtomicReference flushRequested = new AtomicReference<>(); + private volatile boolean continueWork = true; + private long nextExportTime; + + private Worker( + LogExporter logExporter, + long scheduleDelayMillis, + int maxExportBatchSize, + long exporterTimeoutMillis, + BlockingQueue queue) { + this.logExporter = logExporter; + this.maxExportBatchSize = maxExportBatchSize; + this.exporterTimeoutMillis = exporterTimeoutMillis; + this.scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(scheduleDelayMillis); + this.queue = queue; + this.batch = new ArrayList<>(this.maxExportBatchSize); + } + + @Override + public void run() { + updateNextExportTime(); + + while (continueWork) { + if (flushRequested.get() != null) { + flush(); + } + + try { + LogRecord lastElement = queue.poll(100, TimeUnit.MILLISECONDS); + if (lastElement != null) { + batch.add(lastElement); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + + if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { + exportCurrentBatch(); + updateNextExportTime(); + } + } + } + + private void flush() { + int recordsToFlush = queue.size(); + while (recordsToFlush > 0) { + LogRecord record = queue.poll(); + assert record != null; + batch.add(record); + recordsToFlush--; + if (batch.size() >= maxExportBatchSize) { + exportCurrentBatch(); + } + } + exportCurrentBatch(); + CompletableResultCode result = flushRequested.get(); + assert result != null; + flushRequested.set(null); + } + + private void updateNextExportTime() { + nextExportTime = System.nanoTime() + scheduleDelayNanos; + } + + private void exportCurrentBatch() { + if (batch.isEmpty()) { + return; + } + + try { + final CompletableResultCode result = logExporter.export(batch); + result.join(exporterTimeoutMillis, TimeUnit.MILLISECONDS); + if (result.isSuccess()) { + successCounter.add(batch.size()); + } else { + exporterFailureCounter.add(1); + } + } catch (Exception t) { + exporterFailureCounter.add(batch.size()); + } finally { + batch.clear(); + } + } + + private CompletableResultCode shutdown() { + final CompletableResultCode result = new CompletableResultCode(); + final CompletableResultCode flushResult = forceFlush(); + flushResult.whenComplete( + new Runnable() { + @Override + public void run() { + continueWork = false; + final CompletableResultCode shutdownResult = logExporter.shutdown(); + shutdownResult.whenComplete( + new Runnable() { + @Override + public void run() { + if (flushResult.isSuccess() && shutdownResult.isSuccess()) { + result.succeed(); + } else { + result.fail(); + } + } + }); + } + }); + return result; + } + + private CompletableResultCode forceFlush() { + CompletableResultCode flushResult = new CompletableResultCode(); + this.flushRequested.compareAndSet(null, flushResult); + return this.flushRequested.get(); + } + + public void addLogRecord(LogRecord record) { + if (!queue.offer(record)) { + queueFullRecordCounter.add(1); + } + } + } + + public static class Builder extends ConfigBuilder { + private static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 200; + private static final int DEFAULT_MAX_QUEUE_SIZE = 2048; + private static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512; + private static final long DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000; + + private static final String KEY_SCHEDULE_DELAY_MILLIS = "otel.log.schedule.delay"; + private static final String KEY_MAX_QUEUE_SIZE = "otel.log.max.queue"; + private static final String KEY_MAX_EXPORT_BATCH_SIZE = "otel.log.max.export.batch"; + private static final String KEY_EXPORT_TIMEOUT_MILLIS = "otel.log.export.timeout"; + + private final LogExporter logExporter; + private long scheduleDelayMillis = DEFAULT_SCHEDULE_DELAY_MILLIS; + private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE; + private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE; + private long exporterTimeoutMillis = DEFAULT_EXPORT_TIMEOUT_MILLIS; + + public Builder(LogExporter logExporter) { + this.logExporter = Utils.checkNotNull(logExporter, "Exporter argument can not be null"); + } + + public Builder newBuilder(LogExporter logExporter) { + return new Builder(logExporter); + } + + /** + * Build a BatchLogProcessor. + * + * @return configured processor + */ + public BatchLogProcessor build() { + return new BatchLogProcessor( + maxQueueSize, + scheduleDelayMillis, + maxExportBatchSize, + exporterTimeoutMillis, + logExporter); + } + + /** + * Sets the delay interval between two consecutive exports. The actual interval may be shorter + * if the batch size is getting larger than {@code maxQueuedSpans / 2}. + * + *

Default value is {@code 250}ms. + * + * @param scheduleDelayMillis the delay interval between two consecutive exports. + * @return this. + * @see BatchLogProcessor.Builder#DEFAULT_SCHEDULE_DELAY_MILLIS + */ + public BatchLogProcessor.Builder setScheduleDelayMillis(long scheduleDelayMillis) { + this.scheduleDelayMillis = scheduleDelayMillis; + return this; + } + + public long getScheduleDelayMillis() { + return scheduleDelayMillis; + } + + /** + * Sets the maximum time an exporter will be allowed to run before being cancelled. + * + *

Default value is {@code 30000}ms + * + * @param exporterTimeoutMillis the timeout for exports in milliseconds. + * @return this + * @see BatchLogProcessor.Builder#DEFAULT_EXPORT_TIMEOUT_MILLIS + */ + public Builder setExporterTimeoutMillis(int exporterTimeoutMillis) { + this.exporterTimeoutMillis = exporterTimeoutMillis; + return this; + } + + public long getExporterTimeoutMillis() { + return exporterTimeoutMillis; + } + + /** + * Sets the maximum number of Spans that are kept in the queue before start dropping. + * + *

See the BatchSampledSpansProcessor class description for a high-level design description + * of this class. + * + *

Default value is {@code 2048}. + * + * @param maxQueueSize the maximum number of Spans that are kept in the queue before start + * dropping. + * @return this. + * @see BatchLogProcessor.Builder#DEFAULT_MAX_QUEUE_SIZE + */ + public Builder setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + return this; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + /** + * Sets the maximum batch size for every export. This must be smaller or equal to {@code + * maxQueuedSpans}. + * + *

Default value is {@code 512}. + * + * @param maxExportBatchSize the maximum batch size for every export. + * @return this. + * @see BatchLogProcessor.Builder#DEFAULT_MAX_EXPORT_BATCH_SIZE + */ + public Builder setMaxExportBatchSize(int maxExportBatchSize) { + Utils.checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive."); + this.maxExportBatchSize = maxExportBatchSize; + return this; + } + + public int getMaxExportBatchSize() { + return maxExportBatchSize; + } + + @Override + protected Builder fromConfigMap( + Map configMap, NamingConvention namingConvention) { + configMap = namingConvention.normalize(configMap); + Long longValue = getLongProperty(KEY_SCHEDULE_DELAY_MILLIS, configMap); + if (longValue != null) { + this.setScheduleDelayMillis(longValue); + } + Integer intValue = getIntProperty(KEY_MAX_QUEUE_SIZE, configMap); + if (intValue != null) { + this.setMaxQueueSize(intValue); + } + intValue = getIntProperty(KEY_MAX_EXPORT_BATCH_SIZE, configMap); + if (intValue != null) { + this.setMaxExportBatchSize(intValue); + } + intValue = getIntProperty(KEY_EXPORT_TIMEOUT_MILLIS, configMap); + if (intValue != null) { + this.setExporterTimeoutMillis(intValue); + } + return this; + } + } +} diff --git a/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/LogExporter.java b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/LogExporter.java new file mode 100644 index 0000000000..a62fb4cf6e --- /dev/null +++ b/sdk_extensions/logging/src/main/java/io/opentelemetry/sdk/logging/export/LogExporter.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.export; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import java.util.Collection; + +/** + * An exporter is responsible for taking a list of {@link LogRecord}s and transmitting them to their + * ultimate destination. + */ +public interface LogExporter { + CompletableResultCode export(Collection records); + + CompletableResultCode shutdown(); +} diff --git a/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/LogSinkSdkProviderTest.java b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/LogSinkSdkProviderTest.java new file mode 100644 index 0000000000..055f10ae9c --- /dev/null +++ b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/LogSinkSdkProviderTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import io.opentelemetry.sdk.logging.data.LogRecord.Severity; +import io.opentelemetry.sdk.logging.export.BatchLogProcessor; +import io.opentelemetry.sdk.logging.util.TestLogExporter; +import io.opentelemetry.sdk.logging.util.TestLogProcessor; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class LogSinkSdkProviderTest { + + private static LogRecord createLog(LogRecord.Severity severity, String message) { + return new LogRecord.Builder() + .setUnixTimeMillis(System.currentTimeMillis()) + .setSeverity(severity) + .setBody(message) + .build(); + } + + @Test + public void testLogSinkSdkProvider() { + TestLogExporter exporter = new TestLogExporter(); + LogProcessor processor = BatchLogProcessor.builder(exporter).build(); + LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build(); + provider.addLogProcessor(processor); + LogSink sink = provider.get("test", "0.1a"); + sink.offer(createLog(Severity.ERROR, "test")); + provider.forceFlush().join(500, TimeUnit.MILLISECONDS); + assertThat(exporter.getRecords().size()).isEqualTo(1); + } + + @Test + public void testBatchSize() { + TestLogExporter exporter = new TestLogExporter(); + LogProcessor processor = + BatchLogProcessor.builder(exporter) + .setScheduleDelayMillis(3000) // Long enough to not be in play + .setMaxExportBatchSize(5) + .setMaxQueueSize(10) + .build(); + LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build(); + provider.addLogProcessor(processor); + LogSink sink = provider.get("test", "0.1a"); + + for (int i = 0; i < 7; i++) { + sink.offer(createLog(Severity.WARN, "test #" + i)); + } + // Ensure that more than batch size kicks off a flush + await().atMost(500, TimeUnit.MILLISECONDS).until(() -> exporter.getRecords().size() > 0); + // Ensure that everything gets through + CompletableResultCode result = provider.forceFlush(); + result.join(1, TimeUnit.SECONDS); + assertThat(exporter.getCallCount()).isGreaterThanOrEqualTo(2); + } + + @Test + public void testNoBlocking() { + TestLogExporter exporter = new TestLogExporter(); + exporter.setOnCall( + () -> { + try { + Thread.sleep(250); + } catch (InterruptedException ex) { + fail("Exporter wait interrupted", ex); + } + }); + LogProcessor processor = + BatchLogProcessor.builder(exporter) + .setScheduleDelayMillis(3000) // Long enough to not be in play + .setMaxExportBatchSize(5) + .setMaxQueueSize(10) + .build(); + LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build(); + provider.addLogProcessor(processor); + LogSink sink = provider.get("test", "0.1a"); + + long start = System.currentTimeMillis(); + int testRecordCount = 700; + for (int i = 0; i < testRecordCount; i++) { + sink.offer(createLog(Severity.WARN, "test #" + i)); + } + long end = System.currentTimeMillis(); + assertThat(end - start).isLessThan(250L); + provider.forceFlush().join(1, TimeUnit.SECONDS); + assertThat(exporter.getRecords().size()).isLessThan(testRecordCount); // We dropped records + } + + @Test + public void testMultipleProcessors() { + TestLogProcessor processorOne = new TestLogProcessor(); + TestLogProcessor processorTwo = new TestLogProcessor(); + LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build(); + provider.addLogProcessor(processorOne); + provider.addLogProcessor(processorTwo); + LogSink sink = provider.get("test", "0.1"); + LogRecord record = LogRecord.builder().setBody("test").build(); + sink.offer(record); + assertThat(processorOne.getRecords().size()).isEqualTo(1); + assertThat(processorTwo.getRecords().size()).isEqualTo(1); + assertThat(processorOne.getRecords().get(0)).isEqualTo(record); + assertThat(processorTwo.getRecords().get(0)).isEqualTo(record); + + CompletableResultCode flushResult = provider.forceFlush(); + flushResult.join(1, TimeUnit.SECONDS); + assertThat(processorOne.getFlushes()).isEqualTo(1); + assertThat(processorTwo.getFlushes()).isEqualTo(1); + + CompletableResultCode shutdownResult = provider.shutdown(); + shutdownResult.join(1, TimeUnit.SECONDS); + assertThat(processorOne.shutdownHasBeenCalled()).isEqualTo(true); + assertThat(processorTwo.shutdownHasBeenCalled()).isEqualTo(true); + } +} diff --git a/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/sdk/BatchLogProcessorTest.java b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/sdk/BatchLogProcessorTest.java new file mode 100644 index 0000000000..4d6558ff57 --- /dev/null +++ b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/sdk/BatchLogProcessorTest.java @@ -0,0 +1,87 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.sdk; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; +import static org.awaitility.Awaitility.await; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import io.opentelemetry.sdk.logging.export.BatchLogProcessor; +import io.opentelemetry.sdk.logging.export.LogExporter; +import io.opentelemetry.sdk.logging.util.TestLogExporter; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.junit.Test; + +public class BatchLogProcessorTest { + @Test + public void testBuilder() { + Properties props = new Properties(); + long delay = 1234L; + int queue = 2345; + int batch = 521; + int timeout = 5432; + + props.put("otel.log.schedule.delay", Long.toString(delay)); + props.put("otel.log.max.queue", Integer.toString(queue)); + props.put("otel.log.max.export.batch", Integer.toString(batch)); + props.put("otel.log.export.timeout", Integer.toString(timeout)); + + BatchLogProcessor.Builder builder = + BatchLogProcessor.builder( + new LogExporter() { + @Override + public CompletableResultCode export(Collection records) { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + }); + + builder.readProperties(props); + assertThat(builder.getScheduleDelayMillis()).isEqualTo(delay); + assertThat(builder.getMaxQueueSize()).isEqualTo(queue); + assertThat(builder.getMaxExportBatchSize()).isEqualTo(batch); + assertThat(builder.getExporterTimeoutMillis()).isEqualTo(timeout); + } + + @Test + public void testForceExport() { + int batchSize = 10; + int testRecordsToSend = 17; // greater than, but not a multiple of batch + TestLogExporter exporter = new TestLogExporter(); + BatchLogProcessor processor = + BatchLogProcessor.builder(exporter) + .setMaxExportBatchSize(batchSize) + .setMaxQueueSize(20) // more than we will send + .setScheduleDelayMillis(2000) // longer than test + .build(); + for (int i = 0; i < 17; i++) { + LogRecord record = LogRecord.builder().setBody(Integer.toString(i)).build(); + processor.addLogRecord(record); + } + await().until(() -> exporter.getCallCount() > 0); + assertThat(exporter.getRecords().size()).isEqualTo(batchSize); + processor.forceFlush().join(500, TimeUnit.MILLISECONDS); + assertThat(exporter.getRecords().size()).isEqualTo(testRecordsToSend); + } +} diff --git a/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogExporter.java b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogExporter.java new file mode 100644 index 0000000000..52dd08f75a --- /dev/null +++ b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogExporter.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.util; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.data.LogRecord; +import io.opentelemetry.sdk.logging.export.LogExporter; +import java.util.ArrayList; +import java.util.Collection; +import javax.annotation.Nullable; + +public class TestLogExporter implements LogExporter { + + private final ArrayList records = new ArrayList<>(); + @Nullable private Runnable onCall = null; + private int callCount = 0; + + @Override + public CompletableResultCode export(Collection records) { + this.records.addAll(records); + callCount++; + if (onCall != null) { + onCall.run(); + } + return null; + } + + @Override + public CompletableResultCode shutdown() { + return new CompletableResultCode().succeed(); + } + + public ArrayList getRecords() { + return records; + } + + public void setOnCall(@Nullable Runnable onCall) { + this.onCall = onCall; + } + + public int getCallCount() { + return callCount; + } +} diff --git a/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogProcessor.java b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogProcessor.java new file mode 100644 index 0000000000..c2aa404b14 --- /dev/null +++ b/sdk_extensions/logging/src/test/java/io/opentelemetry/sdk/logging/util/TestLogProcessor.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.logging.util; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.logging.LogProcessor; +import io.opentelemetry.sdk.logging.data.LogRecord; +import java.util.ArrayList; +import java.util.List; + +public class TestLogProcessor implements LogProcessor { + private final List records = new ArrayList<>(); + private boolean shutdownCalled = false; + private int flushes = 0; + + @Override + public void addLogRecord(LogRecord record) { + records.add(record); + } + + @Override + public CompletableResultCode shutdown() { + shutdownCalled = true; + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode forceFlush() { + flushes++; + return CompletableResultCode.ofSuccess(); + } + + public List getRecords() { + return records; + } + + public int getFlushes() { + return flushes; + } + + public boolean shutdownHasBeenCalled() { + return shutdownCalled; + } +} diff --git a/settings.gradle b/settings.gradle index bfcdc69c41..f5d4a679d2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -42,6 +42,7 @@ include ":opentelemetry-all", ":opentelemetry-sdk", ":opentelemetry-sdk-extension-async-processor", ":opentelemetry-sdk-extension-aws-v1-support", + ":opentelemetry-sdk-extension-logging", ":opentelemetry-sdk-extension-otproto", ":opentelemetry-sdk-extension-resources", ":opentelemetry-sdk-extension-testbed",