initial commit of logging support extension (#1524)
* initial commit of logging support extension * complete move from extensions to sdk_extensions * address code review comments * move logging_support to logging/support * missed test update in last commit * code format * change list to queue in implementation, collection in api * missed making a utility method static * fix a broken test * Update sdk_extensions/logging/support/src/main/java/io/opentelemetry/logging/api/LogRecord.java Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> * reworked to align with how we batch process spans * move of CompletableResultCode and formatting * update AnyValue to use long instead of int * address code review comments rename Exporter to LogExporter, move to 'export' package rename LogExporter.accept(data) to LogExporter.export(data) decrease flush interval from 5000 to 200ms return CompletableResultCode from LogExporter.shutdown() change traceId and spanId types to String from byte[] * implement BatchLogProcessor.fromConfigMap * rearrange packages * add readme * respond to review feedback, refactored BatchLogProcessor * added a couple tests * update to use new Attributes, remove support directory, public LogRecord getters Co-authored-by: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com>
This commit is contained in:
parent
68cb1640ad
commit
64c2b8cc65
|
|
@ -0,0 +1,6 @@
|
|||
# OpenTelemetry Experimental Logging Support
|
||||
|
||||
This project contains experimental support for transport of logs via OpenTelemetry. The API
|
||||
presented is intended for the use of logging library adapters to enable resource and request
|
||||
correlation with other observability signals and transport of logs through the OpenTelemetry
|
||||
collector.
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
plugins {
|
||||
id "java"
|
||||
id "maven-publish"
|
||||
|
||||
id "ru.vyarus.animalsniffer"
|
||||
}
|
||||
|
||||
description = 'OpenTelemetry Contrib Logging Support'
|
||||
ext.moduleName = "io.opentelemetry.sdk.extensions.logging"
|
||||
|
||||
dependencies {
|
||||
api project(':opentelemetry-sdk')
|
||||
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.10.4'
|
||||
testImplementation libraries.awaitility
|
||||
|
||||
annotationProcessor libraries.auto_value
|
||||
|
||||
signature "org.codehaus.mojo.signature:java17:1.0@signature"
|
||||
signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature"
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import io.opentelemetry.sdk.trace.TracerSdkProvider;
|
||||
|
||||
public interface LogProcessor {
|
||||
|
||||
void addLogRecord(LogRecord record);
|
||||
|
||||
/**
|
||||
* Called when {@link TracerSdkProvider#shutdown()} is called.
|
||||
*
|
||||
* @return result
|
||||
*/
|
||||
CompletableResultCode shutdown();
|
||||
|
||||
/**
|
||||
* Processes all span events that have not yet been processed.
|
||||
*
|
||||
* @return result
|
||||
*/
|
||||
CompletableResultCode forceFlush();
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging;
|
||||
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
|
||||
/** A LogSink accepts logging records for transmission to an aggregator or log processing system. */
|
||||
public interface LogSink {
|
||||
/**
|
||||
* Pass a record to the SDK for transmission to a logging exporter.
|
||||
*
|
||||
* @param record record to transmit
|
||||
*/
|
||||
void offer(LogRecord record);
|
||||
}
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging;
|
||||
|
||||
import io.opentelemetry.internal.Utils;
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
public class LogSinkSdkProvider {
|
||||
private final LogSink logSink = new SdkLogSink();
|
||||
private final List<LogProcessor> processors = new ArrayList<>();
|
||||
|
||||
private LogSinkSdkProvider() {}
|
||||
|
||||
public LogSink get(String instrumentationName, String instrumentationVersion) {
|
||||
// Currently there is no differentiation by instrumentation library
|
||||
return logSink;
|
||||
}
|
||||
|
||||
public void addLogProcessor(LogProcessor processor) {
|
||||
processors.add(Utils.checkNotNull(processor, "Processor can not be null"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Flushes all attached processors.
|
||||
*
|
||||
* @return result
|
||||
*/
|
||||
public CompletableResultCode forceFlush() {
|
||||
final List<CompletableResultCode> processorResults = new ArrayList<>(processors.size());
|
||||
for (LogProcessor processor : processors) {
|
||||
processorResults.add(processor.forceFlush());
|
||||
}
|
||||
return CompletableResultCode.ofAll(processorResults);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shut down of provider and associated processors.
|
||||
*
|
||||
* @return result
|
||||
*/
|
||||
public CompletableResultCode shutdown() {
|
||||
Collection<CompletableResultCode> processorResults = new ArrayList<>(processors.size());
|
||||
for (LogProcessor processor : processors) {
|
||||
processorResults.add(processor.shutdown());
|
||||
}
|
||||
return CompletableResultCode.ofAll(processorResults);
|
||||
}
|
||||
|
||||
private class SdkLogSink implements LogSink {
|
||||
@Override
|
||||
public void offer(LogRecord record) {
|
||||
for (LogProcessor processor : processors) {
|
||||
processor.addLogRecord(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
public LogSinkSdkProvider build() {
|
||||
return new LogSinkSdkProvider();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,276 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.data;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.Immutable;
|
||||
|
||||
/**
|
||||
* A class that represents all the possible values for a data body. An {@code AnyValue} can have 6
|
||||
* types of values: {@code String}, {@code boolean}, {@code int}, {@code double}, {@code array}, or
|
||||
* {@code kvlist}. represented through {@code AnyValue.Type}. A {@code array} or a {@code kvlist}
|
||||
* can in turn hold other {@code AnyValue} instances, allowing for mapping to JSON-like structures.
|
||||
*
|
||||
* @since 0.9.0
|
||||
*/
|
||||
@Immutable
|
||||
public abstract class AnyValue {
|
||||
|
||||
/** An enum that represents all the possible value types for an {@code AnyValue}. */
|
||||
public enum Type {
|
||||
STRING,
|
||||
BOOL,
|
||||
INT64,
|
||||
DOUBLE,
|
||||
ARRAY,
|
||||
KVLIST
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with a string value.
|
||||
*
|
||||
* @param stringValue The new value.
|
||||
* @return an {@code AnyValue} with a string value.
|
||||
*/
|
||||
public static AnyValue stringAnyValue(String stringValue) {
|
||||
return AnyValueString.create(stringValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the string value of this {@code AnyValue}. An UnsupportedOperationException will be
|
||||
* thrown if getType() is not {@link AnyValue.Type#STRING}.
|
||||
*
|
||||
* @return the string value of this {@code AttributeValue}.
|
||||
*/
|
||||
public String getStringValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with an int value.
|
||||
*
|
||||
* @param longValue The new value.
|
||||
* @return an {@code AnyValue} with a int value.
|
||||
*/
|
||||
public static AnyValue longAnyValue(long longValue) {
|
||||
return AnyValueLong.create(longValue);
|
||||
}
|
||||
|
||||
public long getLongValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with a bool value.
|
||||
*
|
||||
* @param boolValue The new value.
|
||||
* @return an {@code AnyValue} with a bool value.
|
||||
*/
|
||||
public static AnyValue boolAnyValue(boolean boolValue) {
|
||||
return AnyValueBool.create(boolValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the boolean value of this {@code AnyValue}. An UnsupportedOperationException will be
|
||||
* thrown if getType() is not {@link AnyValue.Type#BOOL}.
|
||||
*
|
||||
* @return the boolean value of this {@code AttributeValue}.
|
||||
*/
|
||||
public boolean getBoolValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with a double value.
|
||||
*
|
||||
* @param doubleValue The new value.
|
||||
* @return an {@code AnyValue} with a double value.
|
||||
*/
|
||||
public static AnyValue doubleAnyValue(double doubleValue) {
|
||||
return AnyValueDouble.create(doubleValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the double value of this {@code AnyValue}. An UnsupportedOperationException will be
|
||||
* thrown if getType() is not {@link AnyValue.Type#DOUBLE}.
|
||||
*
|
||||
* @return the double value of this {@code AttributeValue}.
|
||||
*/
|
||||
public double getDoubleValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with a array value.
|
||||
*
|
||||
* @param values The new value.
|
||||
* @return an {@code AnyValue} with a array value.
|
||||
*/
|
||||
public static AnyValue arrayAnyValue(List<AnyValue> values) {
|
||||
return AnyValueArray.create(values);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the array value of this {@code AnyValue}. An UnsupportedOperationException will be
|
||||
* thrown if getType() is not {@link AnyValue.Type#ARRAY}.
|
||||
*
|
||||
* @return the array value of this {@code AttributeValue}.
|
||||
*/
|
||||
public List<AnyValue> getArrayValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@code AnyValue} with a kvlist value.
|
||||
*
|
||||
* @param values The new value.
|
||||
* @return an {@code AnyValue} with a kvlist value.
|
||||
*/
|
||||
public static AnyValue kvlistAnyValue(Map<String, AnyValue> values) {
|
||||
return AnyValueKvlist.create(values);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the string value of this {@code AnyValue}. An UnsupportedOperationException will be
|
||||
* thrown if getType() is not {@link AnyValue.Type#STRING}.
|
||||
*
|
||||
* @return the string value of this {@code AttributeValue}.
|
||||
*/
|
||||
public Map<String, AnyValue> getKvlistValue() {
|
||||
throw new UnsupportedOperationException(
|
||||
String.format("This type can only return %s data", getType().name()));
|
||||
}
|
||||
|
||||
public abstract Type getType();
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueString extends AnyValue {
|
||||
AnyValueString() {}
|
||||
|
||||
static AnyValue create(String stringValue) {
|
||||
return new AutoValue_AnyValue_AnyValueString(stringValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.STRING;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public abstract String getStringValue();
|
||||
}
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueLong extends AnyValue {
|
||||
AnyValueLong() {}
|
||||
|
||||
static AnyValue create(long longValue) {
|
||||
return new AutoValue_AnyValue_AnyValueLong(longValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.INT64;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract long getLongValue();
|
||||
}
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueBool extends AnyValue {
|
||||
AnyValueBool() {}
|
||||
|
||||
static AnyValue create(boolean boolValue) {
|
||||
return new AutoValue_AnyValue_AnyValueBool(boolValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.BOOL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract boolean getBoolValue();
|
||||
}
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueDouble extends AnyValue {
|
||||
AnyValueDouble() {}
|
||||
|
||||
static AnyValue create(double doubleValue) {
|
||||
return new AutoValue_AnyValue_AnyValueDouble(doubleValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.DOUBLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract double getDoubleValue();
|
||||
}
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueArray extends AnyValue {
|
||||
AnyValueArray() {}
|
||||
|
||||
static AnyValue create(List<AnyValue> arrayValue) {
|
||||
return new AutoValue_AnyValue_AnyValueArray(arrayValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.ARRAY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract List<AnyValue> getArrayValue();
|
||||
}
|
||||
|
||||
@Immutable
|
||||
@AutoValue
|
||||
abstract static class AnyValueKvlist extends AnyValue {
|
||||
AnyValueKvlist() {}
|
||||
|
||||
static AnyValue create(Map<String, AnyValue> kvlistValue) {
|
||||
return new AutoValue_AnyValue_AnyValueKvlist(kvlistValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Type getType() {
|
||||
return Type.KVLIST;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract Map<String, AnyValue> getKvlistValue();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.data;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import io.opentelemetry.common.Attributes;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* A LogRecord is an implementation of the <a
|
||||
* href="https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/logs/data-model.md">
|
||||
* OpenTelemetry logging model</a>.
|
||||
*/
|
||||
@AutoValue
|
||||
public abstract class LogRecord {
|
||||
|
||||
public abstract long getTimeUnixNano();
|
||||
|
||||
public abstract String getTraceId();
|
||||
|
||||
public abstract String getSpanId();
|
||||
|
||||
public abstract int getFlags();
|
||||
|
||||
public abstract Severity getSeverity();
|
||||
|
||||
@Nullable
|
||||
public abstract String getSeverityText();
|
||||
|
||||
@Nullable
|
||||
public abstract String getName();
|
||||
|
||||
public abstract AnyValue getBody();
|
||||
|
||||
public abstract Attributes getAttributes();
|
||||
|
||||
public enum Severity {
|
||||
UNDEFINED_SEVERITY_NUMBER(0),
|
||||
TRACE(1),
|
||||
TRACE2(2),
|
||||
TRACE3(3),
|
||||
TRACE4(4),
|
||||
DEBUG(5),
|
||||
DEBUG2(6),
|
||||
DEBUG3(7),
|
||||
DEBUG4(8),
|
||||
INFO(9),
|
||||
INFO2(10),
|
||||
INFO3(11),
|
||||
INFO4(12),
|
||||
WARN(13),
|
||||
WARN2(14),
|
||||
WARN3(15),
|
||||
WARN4(16),
|
||||
ERROR(17),
|
||||
ERROR2(18),
|
||||
ERROR3(19),
|
||||
ERROR4(20),
|
||||
FATAL(21),
|
||||
FATAL2(22),
|
||||
FATAL3(23),
|
||||
FATAL4(24),
|
||||
;
|
||||
|
||||
private final int severityNumber;
|
||||
|
||||
Severity(int severityNumber) {
|
||||
this.severityNumber = severityNumber;
|
||||
}
|
||||
|
||||
public int getSeverityNumber() {
|
||||
return severityNumber;
|
||||
}
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private long timeUnixNano;
|
||||
private String traceId = "";
|
||||
private String spanId = "";
|
||||
private int flags;
|
||||
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
|
||||
private String severityText;
|
||||
private String name;
|
||||
private AnyValue body = AnyValue.stringAnyValue("");
|
||||
private final Attributes.Builder attributeBuilder = Attributes.newBuilder();
|
||||
|
||||
public Builder setUnixTimeNano(long timestamp) {
|
||||
this.timeUnixNano = timestamp;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setUnixTimeMillis(long timestamp) {
|
||||
return setUnixTimeNano(TimeUnit.MILLISECONDS.toNanos(timestamp));
|
||||
}
|
||||
|
||||
public Builder setTraceId(String traceId) {
|
||||
this.traceId = traceId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSpanId(String spanId) {
|
||||
this.spanId = spanId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setFlags(int flags) {
|
||||
this.flags = flags;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSeverity(Severity severity) {
|
||||
this.severity = severity;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSeverityText(String severityText) {
|
||||
this.severityText = severityText;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setBody(AnyValue body) {
|
||||
this.body = body;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setBody(String body) {
|
||||
this.body = AnyValue.stringAnyValue(body);
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setAttributes(Attributes attributes) {
|
||||
this.attributeBuilder.addAll(attributes);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a LogRecord instance.
|
||||
*
|
||||
* @return value object being built
|
||||
*/
|
||||
public LogRecord build() {
|
||||
if (timeUnixNano == 0) {
|
||||
timeUnixNano = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
|
||||
}
|
||||
return new AutoValue_LogRecord(
|
||||
timeUnixNano,
|
||||
traceId,
|
||||
spanId,
|
||||
flags,
|
||||
severity,
|
||||
severityText,
|
||||
name,
|
||||
body,
|
||||
attributeBuilder.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,373 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.export;
|
||||
|
||||
import io.opentelemetry.OpenTelemetry;
|
||||
import io.opentelemetry.common.Labels;
|
||||
import io.opentelemetry.internal.Utils;
|
||||
import io.opentelemetry.metrics.LongCounter;
|
||||
import io.opentelemetry.metrics.LongCounter.BoundLongCounter;
|
||||
import io.opentelemetry.metrics.Meter;
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.common.DaemonThreadFactory;
|
||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||
import io.opentelemetry.sdk.logging.LogProcessor;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class BatchLogProcessor implements LogProcessor {
|
||||
private static final String WORKER_THREAD_NAME =
|
||||
BatchLogProcessor.class.getSimpleName() + "_WorkerThread";
|
||||
|
||||
private final Worker worker;
|
||||
private final Thread workerThread;
|
||||
|
||||
private BatchLogProcessor(
|
||||
int maxQueueSize,
|
||||
long scheduleDelayMillis,
|
||||
int maxExportBatchSize,
|
||||
long exporterTimeoutMillis,
|
||||
LogExporter logExporter) {
|
||||
this.worker =
|
||||
new Worker(
|
||||
logExporter,
|
||||
scheduleDelayMillis,
|
||||
maxExportBatchSize,
|
||||
exporterTimeoutMillis,
|
||||
new ArrayBlockingQueue<LogRecord>(maxQueueSize));
|
||||
this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
|
||||
this.workerThread.start();
|
||||
}
|
||||
|
||||
public static Builder builder(LogExporter logExporter) {
|
||||
return new Builder(logExporter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLogRecord(LogRecord record) {
|
||||
worker.addLogRecord(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown() {
|
||||
workerThread.interrupt();
|
||||
return worker.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode forceFlush() {
|
||||
return worker.forceFlush();
|
||||
}
|
||||
|
||||
private static class Worker implements Runnable {
|
||||
static {
|
||||
Meter meter = OpenTelemetry.getMeter("io.opentelemetry.sdk.logging");
|
||||
LongCounter logRecordsProcessed =
|
||||
meter
|
||||
.longCounterBuilder("logRecordsProcessed")
|
||||
.setUnit("1")
|
||||
.setDescription("Number of records processed")
|
||||
.build();
|
||||
successCounter = logRecordsProcessed.bind(Labels.of("result", "success"));
|
||||
exporterFailureCounter =
|
||||
logRecordsProcessed.bind(
|
||||
Labels.of("result", "dropped record", "cause", "exporter failure"));
|
||||
queueFullRecordCounter =
|
||||
logRecordsProcessed.bind(Labels.of("result", "dropped record", "cause", "queue full"));
|
||||
}
|
||||
|
||||
private static final BoundLongCounter exporterFailureCounter;
|
||||
private static final BoundLongCounter queueFullRecordCounter;
|
||||
private static final BoundLongCounter successCounter;
|
||||
|
||||
private final long scheduleDelayNanos;
|
||||
private final int maxExportBatchSize;
|
||||
private final LogExporter logExporter;
|
||||
private final long exporterTimeoutMillis;
|
||||
private final ArrayList<LogRecord> batch;
|
||||
private final BlockingQueue<LogRecord> queue;
|
||||
|
||||
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
|
||||
private volatile boolean continueWork = true;
|
||||
private long nextExportTime;
|
||||
|
||||
private Worker(
|
||||
LogExporter logExporter,
|
||||
long scheduleDelayMillis,
|
||||
int maxExportBatchSize,
|
||||
long exporterTimeoutMillis,
|
||||
BlockingQueue<LogRecord> queue) {
|
||||
this.logExporter = logExporter;
|
||||
this.maxExportBatchSize = maxExportBatchSize;
|
||||
this.exporterTimeoutMillis = exporterTimeoutMillis;
|
||||
this.scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(scheduleDelayMillis);
|
||||
this.queue = queue;
|
||||
this.batch = new ArrayList<>(this.maxExportBatchSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
updateNextExportTime();
|
||||
|
||||
while (continueWork) {
|
||||
if (flushRequested.get() != null) {
|
||||
flush();
|
||||
}
|
||||
|
||||
try {
|
||||
LogRecord lastElement = queue.poll(100, TimeUnit.MILLISECONDS);
|
||||
if (lastElement != null) {
|
||||
batch.add(lastElement);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
|
||||
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
|
||||
exportCurrentBatch();
|
||||
updateNextExportTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flush() {
|
||||
int recordsToFlush = queue.size();
|
||||
while (recordsToFlush > 0) {
|
||||
LogRecord record = queue.poll();
|
||||
assert record != null;
|
||||
batch.add(record);
|
||||
recordsToFlush--;
|
||||
if (batch.size() >= maxExportBatchSize) {
|
||||
exportCurrentBatch();
|
||||
}
|
||||
}
|
||||
exportCurrentBatch();
|
||||
CompletableResultCode result = flushRequested.get();
|
||||
assert result != null;
|
||||
flushRequested.set(null);
|
||||
}
|
||||
|
||||
private void updateNextExportTime() {
|
||||
nextExportTime = System.nanoTime() + scheduleDelayNanos;
|
||||
}
|
||||
|
||||
private void exportCurrentBatch() {
|
||||
if (batch.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
final CompletableResultCode result = logExporter.export(batch);
|
||||
result.join(exporterTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
if (result.isSuccess()) {
|
||||
successCounter.add(batch.size());
|
||||
} else {
|
||||
exporterFailureCounter.add(1);
|
||||
}
|
||||
} catch (Exception t) {
|
||||
exporterFailureCounter.add(batch.size());
|
||||
} finally {
|
||||
batch.clear();
|
||||
}
|
||||
}
|
||||
|
||||
private CompletableResultCode shutdown() {
|
||||
final CompletableResultCode result = new CompletableResultCode();
|
||||
final CompletableResultCode flushResult = forceFlush();
|
||||
flushResult.whenComplete(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
continueWork = false;
|
||||
final CompletableResultCode shutdownResult = logExporter.shutdown();
|
||||
shutdownResult.whenComplete(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (flushResult.isSuccess() && shutdownResult.isSuccess()) {
|
||||
result.succeed();
|
||||
} else {
|
||||
result.fail();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
private CompletableResultCode forceFlush() {
|
||||
CompletableResultCode flushResult = new CompletableResultCode();
|
||||
this.flushRequested.compareAndSet(null, flushResult);
|
||||
return this.flushRequested.get();
|
||||
}
|
||||
|
||||
public void addLogRecord(LogRecord record) {
|
||||
if (!queue.offer(record)) {
|
||||
queueFullRecordCounter.add(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder extends ConfigBuilder<Builder> {
|
||||
private static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 200;
|
||||
private static final int DEFAULT_MAX_QUEUE_SIZE = 2048;
|
||||
private static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
|
||||
private static final long DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
|
||||
|
||||
private static final String KEY_SCHEDULE_DELAY_MILLIS = "otel.log.schedule.delay";
|
||||
private static final String KEY_MAX_QUEUE_SIZE = "otel.log.max.queue";
|
||||
private static final String KEY_MAX_EXPORT_BATCH_SIZE = "otel.log.max.export.batch";
|
||||
private static final String KEY_EXPORT_TIMEOUT_MILLIS = "otel.log.export.timeout";
|
||||
|
||||
private final LogExporter logExporter;
|
||||
private long scheduleDelayMillis = DEFAULT_SCHEDULE_DELAY_MILLIS;
|
||||
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
|
||||
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
|
||||
private long exporterTimeoutMillis = DEFAULT_EXPORT_TIMEOUT_MILLIS;
|
||||
|
||||
public Builder(LogExporter logExporter) {
|
||||
this.logExporter = Utils.checkNotNull(logExporter, "Exporter argument can not be null");
|
||||
}
|
||||
|
||||
public Builder newBuilder(LogExporter logExporter) {
|
||||
return new Builder(logExporter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a BatchLogProcessor.
|
||||
*
|
||||
* @return configured processor
|
||||
*/
|
||||
public BatchLogProcessor build() {
|
||||
return new BatchLogProcessor(
|
||||
maxQueueSize,
|
||||
scheduleDelayMillis,
|
||||
maxExportBatchSize,
|
||||
exporterTimeoutMillis,
|
||||
logExporter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the delay interval between two consecutive exports. The actual interval may be shorter
|
||||
* if the batch size is getting larger than {@code maxQueuedSpans / 2}.
|
||||
*
|
||||
* <p>Default value is {@code 250}ms.
|
||||
*
|
||||
* @param scheduleDelayMillis the delay interval between two consecutive exports.
|
||||
* @return this.
|
||||
* @see BatchLogProcessor.Builder#DEFAULT_SCHEDULE_DELAY_MILLIS
|
||||
*/
|
||||
public BatchLogProcessor.Builder setScheduleDelayMillis(long scheduleDelayMillis) {
|
||||
this.scheduleDelayMillis = scheduleDelayMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getScheduleDelayMillis() {
|
||||
return scheduleDelayMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time an exporter will be allowed to run before being cancelled.
|
||||
*
|
||||
* <p>Default value is {@code 30000}ms
|
||||
*
|
||||
* @param exporterTimeoutMillis the timeout for exports in milliseconds.
|
||||
* @return this
|
||||
* @see BatchLogProcessor.Builder#DEFAULT_EXPORT_TIMEOUT_MILLIS
|
||||
*/
|
||||
public Builder setExporterTimeoutMillis(int exporterTimeoutMillis) {
|
||||
this.exporterTimeoutMillis = exporterTimeoutMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long getExporterTimeoutMillis() {
|
||||
return exporterTimeoutMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum number of Spans that are kept in the queue before start dropping.
|
||||
*
|
||||
* <p>See the BatchSampledSpansProcessor class description for a high-level design description
|
||||
* of this class.
|
||||
*
|
||||
* <p>Default value is {@code 2048}.
|
||||
*
|
||||
* @param maxQueueSize the maximum number of Spans that are kept in the queue before start
|
||||
* dropping.
|
||||
* @return this.
|
||||
* @see BatchLogProcessor.Builder#DEFAULT_MAX_QUEUE_SIZE
|
||||
*/
|
||||
public Builder setMaxQueueSize(int maxQueueSize) {
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxQueueSize() {
|
||||
return maxQueueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum batch size for every export. This must be smaller or equal to {@code
|
||||
* maxQueuedSpans}.
|
||||
*
|
||||
* <p>Default value is {@code 512}.
|
||||
*
|
||||
* @param maxExportBatchSize the maximum batch size for every export.
|
||||
* @return this.
|
||||
* @see BatchLogProcessor.Builder#DEFAULT_MAX_EXPORT_BATCH_SIZE
|
||||
*/
|
||||
public Builder setMaxExportBatchSize(int maxExportBatchSize) {
|
||||
Utils.checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
|
||||
this.maxExportBatchSize = maxExportBatchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxExportBatchSize() {
|
||||
return maxExportBatchSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Builder fromConfigMap(
|
||||
Map<String, String> configMap, NamingConvention namingConvention) {
|
||||
configMap = namingConvention.normalize(configMap);
|
||||
Long longValue = getLongProperty(KEY_SCHEDULE_DELAY_MILLIS, configMap);
|
||||
if (longValue != null) {
|
||||
this.setScheduleDelayMillis(longValue);
|
||||
}
|
||||
Integer intValue = getIntProperty(KEY_MAX_QUEUE_SIZE, configMap);
|
||||
if (intValue != null) {
|
||||
this.setMaxQueueSize(intValue);
|
||||
}
|
||||
intValue = getIntProperty(KEY_MAX_EXPORT_BATCH_SIZE, configMap);
|
||||
if (intValue != null) {
|
||||
this.setMaxExportBatchSize(intValue);
|
||||
}
|
||||
intValue = getIntProperty(KEY_EXPORT_TIMEOUT_MILLIS, configMap);
|
||||
if (intValue != null) {
|
||||
this.setExporterTimeoutMillis(intValue);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.export;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* An exporter is responsible for taking a list of {@link LogRecord}s and transmitting them to their
|
||||
* ultimate destination.
|
||||
*/
|
||||
public interface LogExporter {
|
||||
CompletableResultCode export(Collection<LogRecord> records);
|
||||
|
||||
CompletableResultCode shutdown();
|
||||
}
|
||||
|
|
@ -0,0 +1,138 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord.Severity;
|
||||
import io.opentelemetry.sdk.logging.export.BatchLogProcessor;
|
||||
import io.opentelemetry.sdk.logging.util.TestLogExporter;
|
||||
import io.opentelemetry.sdk.logging.util.TestLogProcessor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
public class LogSinkSdkProviderTest {
|
||||
|
||||
private static LogRecord createLog(LogRecord.Severity severity, String message) {
|
||||
return new LogRecord.Builder()
|
||||
.setUnixTimeMillis(System.currentTimeMillis())
|
||||
.setSeverity(severity)
|
||||
.setBody(message)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLogSinkSdkProvider() {
|
||||
TestLogExporter exporter = new TestLogExporter();
|
||||
LogProcessor processor = BatchLogProcessor.builder(exporter).build();
|
||||
LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build();
|
||||
provider.addLogProcessor(processor);
|
||||
LogSink sink = provider.get("test", "0.1a");
|
||||
sink.offer(createLog(Severity.ERROR, "test"));
|
||||
provider.forceFlush().join(500, TimeUnit.MILLISECONDS);
|
||||
assertThat(exporter.getRecords().size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchSize() {
|
||||
TestLogExporter exporter = new TestLogExporter();
|
||||
LogProcessor processor =
|
||||
BatchLogProcessor.builder(exporter)
|
||||
.setScheduleDelayMillis(3000) // Long enough to not be in play
|
||||
.setMaxExportBatchSize(5)
|
||||
.setMaxQueueSize(10)
|
||||
.build();
|
||||
LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build();
|
||||
provider.addLogProcessor(processor);
|
||||
LogSink sink = provider.get("test", "0.1a");
|
||||
|
||||
for (int i = 0; i < 7; i++) {
|
||||
sink.offer(createLog(Severity.WARN, "test #" + i));
|
||||
}
|
||||
// Ensure that more than batch size kicks off a flush
|
||||
await().atMost(500, TimeUnit.MILLISECONDS).until(() -> exporter.getRecords().size() > 0);
|
||||
// Ensure that everything gets through
|
||||
CompletableResultCode result = provider.forceFlush();
|
||||
result.join(1, TimeUnit.SECONDS);
|
||||
assertThat(exporter.getCallCount()).isGreaterThanOrEqualTo(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoBlocking() {
|
||||
TestLogExporter exporter = new TestLogExporter();
|
||||
exporter.setOnCall(
|
||||
() -> {
|
||||
try {
|
||||
Thread.sleep(250);
|
||||
} catch (InterruptedException ex) {
|
||||
fail("Exporter wait interrupted", ex);
|
||||
}
|
||||
});
|
||||
LogProcessor processor =
|
||||
BatchLogProcessor.builder(exporter)
|
||||
.setScheduleDelayMillis(3000) // Long enough to not be in play
|
||||
.setMaxExportBatchSize(5)
|
||||
.setMaxQueueSize(10)
|
||||
.build();
|
||||
LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build();
|
||||
provider.addLogProcessor(processor);
|
||||
LogSink sink = provider.get("test", "0.1a");
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
int testRecordCount = 700;
|
||||
for (int i = 0; i < testRecordCount; i++) {
|
||||
sink.offer(createLog(Severity.WARN, "test #" + i));
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
assertThat(end - start).isLessThan(250L);
|
||||
provider.forceFlush().join(1, TimeUnit.SECONDS);
|
||||
assertThat(exporter.getRecords().size()).isLessThan(testRecordCount); // We dropped records
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleProcessors() {
|
||||
TestLogProcessor processorOne = new TestLogProcessor();
|
||||
TestLogProcessor processorTwo = new TestLogProcessor();
|
||||
LogSinkSdkProvider provider = new LogSinkSdkProvider.Builder().build();
|
||||
provider.addLogProcessor(processorOne);
|
||||
provider.addLogProcessor(processorTwo);
|
||||
LogSink sink = provider.get("test", "0.1");
|
||||
LogRecord record = LogRecord.builder().setBody("test").build();
|
||||
sink.offer(record);
|
||||
assertThat(processorOne.getRecords().size()).isEqualTo(1);
|
||||
assertThat(processorTwo.getRecords().size()).isEqualTo(1);
|
||||
assertThat(processorOne.getRecords().get(0)).isEqualTo(record);
|
||||
assertThat(processorTwo.getRecords().get(0)).isEqualTo(record);
|
||||
|
||||
CompletableResultCode flushResult = provider.forceFlush();
|
||||
flushResult.join(1, TimeUnit.SECONDS);
|
||||
assertThat(processorOne.getFlushes()).isEqualTo(1);
|
||||
assertThat(processorTwo.getFlushes()).isEqualTo(1);
|
||||
|
||||
CompletableResultCode shutdownResult = provider.shutdown();
|
||||
shutdownResult.join(1, TimeUnit.SECONDS);
|
||||
assertThat(processorOne.shutdownHasBeenCalled()).isEqualTo(true);
|
||||
assertThat(processorTwo.shutdownHasBeenCalled()).isEqualTo(true);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.sdk;
|
||||
|
||||
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import io.opentelemetry.sdk.logging.export.BatchLogProcessor;
|
||||
import io.opentelemetry.sdk.logging.export.LogExporter;
|
||||
import io.opentelemetry.sdk.logging.util.TestLogExporter;
|
||||
import java.util.Collection;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
|
||||
public class BatchLogProcessorTest {
|
||||
@Test
|
||||
public void testBuilder() {
|
||||
Properties props = new Properties();
|
||||
long delay = 1234L;
|
||||
int queue = 2345;
|
||||
int batch = 521;
|
||||
int timeout = 5432;
|
||||
|
||||
props.put("otel.log.schedule.delay", Long.toString(delay));
|
||||
props.put("otel.log.max.queue", Integer.toString(queue));
|
||||
props.put("otel.log.max.export.batch", Integer.toString(batch));
|
||||
props.put("otel.log.export.timeout", Integer.toString(timeout));
|
||||
|
||||
BatchLogProcessor.Builder builder =
|
||||
BatchLogProcessor.builder(
|
||||
new LogExporter() {
|
||||
@Override
|
||||
public CompletableResultCode export(Collection<LogRecord> records) {
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown() {
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
});
|
||||
|
||||
builder.readProperties(props);
|
||||
assertThat(builder.getScheduleDelayMillis()).isEqualTo(delay);
|
||||
assertThat(builder.getMaxQueueSize()).isEqualTo(queue);
|
||||
assertThat(builder.getMaxExportBatchSize()).isEqualTo(batch);
|
||||
assertThat(builder.getExporterTimeoutMillis()).isEqualTo(timeout);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceExport() {
|
||||
int batchSize = 10;
|
||||
int testRecordsToSend = 17; // greater than, but not a multiple of batch
|
||||
TestLogExporter exporter = new TestLogExporter();
|
||||
BatchLogProcessor processor =
|
||||
BatchLogProcessor.builder(exporter)
|
||||
.setMaxExportBatchSize(batchSize)
|
||||
.setMaxQueueSize(20) // more than we will send
|
||||
.setScheduleDelayMillis(2000) // longer than test
|
||||
.build();
|
||||
for (int i = 0; i < 17; i++) {
|
||||
LogRecord record = LogRecord.builder().setBody(Integer.toString(i)).build();
|
||||
processor.addLogRecord(record);
|
||||
}
|
||||
await().until(() -> exporter.getCallCount() > 0);
|
||||
assertThat(exporter.getRecords().size()).isEqualTo(batchSize);
|
||||
processor.forceFlush().join(500, TimeUnit.MILLISECONDS);
|
||||
assertThat(exporter.getRecords().size()).isEqualTo(testRecordsToSend);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.util;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import io.opentelemetry.sdk.logging.export.LogExporter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class TestLogExporter implements LogExporter {
|
||||
|
||||
private final ArrayList<LogRecord> records = new ArrayList<>();
|
||||
@Nullable private Runnable onCall = null;
|
||||
private int callCount = 0;
|
||||
|
||||
@Override
|
||||
public CompletableResultCode export(Collection<LogRecord> records) {
|
||||
this.records.addAll(records);
|
||||
callCount++;
|
||||
if (onCall != null) {
|
||||
onCall.run();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown() {
|
||||
return new CompletableResultCode().succeed();
|
||||
}
|
||||
|
||||
public ArrayList<LogRecord> getRecords() {
|
||||
return records;
|
||||
}
|
||||
|
||||
public void setOnCall(@Nullable Runnable onCall) {
|
||||
this.onCall = onCall;
|
||||
}
|
||||
|
||||
public int getCallCount() {
|
||||
return callCount;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright 2020, OpenTelemetry Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.sdk.logging.util;
|
||||
|
||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logging.LogProcessor;
|
||||
import io.opentelemetry.sdk.logging.data.LogRecord;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TestLogProcessor implements LogProcessor {
|
||||
private final List<LogRecord> records = new ArrayList<>();
|
||||
private boolean shutdownCalled = false;
|
||||
private int flushes = 0;
|
||||
|
||||
@Override
|
||||
public void addLogRecord(LogRecord record) {
|
||||
records.add(record);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode shutdown() {
|
||||
shutdownCalled = true;
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableResultCode forceFlush() {
|
||||
flushes++;
|
||||
return CompletableResultCode.ofSuccess();
|
||||
}
|
||||
|
||||
public List<LogRecord> getRecords() {
|
||||
return records;
|
||||
}
|
||||
|
||||
public int getFlushes() {
|
||||
return flushes;
|
||||
}
|
||||
|
||||
public boolean shutdownHasBeenCalled() {
|
||||
return shutdownCalled;
|
||||
}
|
||||
}
|
||||
|
|
@ -42,6 +42,7 @@ include ":opentelemetry-all",
|
|||
":opentelemetry-sdk",
|
||||
":opentelemetry-sdk-extension-async-processor",
|
||||
":opentelemetry-sdk-extension-aws-v1-support",
|
||||
":opentelemetry-sdk-extension-logging",
|
||||
":opentelemetry-sdk-extension-otproto",
|
||||
":opentelemetry-sdk-extension-resources",
|
||||
":opentelemetry-sdk-extension-testbed",
|
||||
|
|
|
|||
Loading…
Reference in New Issue