opentelemetry : Implement metrics (#10593)

Adds a new module grpc-opentelemetry that integrates OpenTelemetry and focuses on metrics.

OpenTelemetry APIs are used for instrumenting metrics collection. Users are expected to provide an OpenTelemetry SDK implementation in order to collect and export the metrics.

If no SDK is passed, by default gRPC uses OpenTelemetry.noop().
This commit is contained in:
Vindhya Ningegowda 2023-11-13 10:58:16 -08:00 committed by GitHub
parent 84baad12fc
commit d5544bbb02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1916 additions and 1 deletions

View File

@ -220,7 +220,7 @@ public final class GrpcUtil {
public static final Splitter ACCEPT_ENCODING_SPLITTER = Splitter.on(',').trimResults();
private static final String IMPLEMENTATION_VERSION = "1.60.0-SNAPSHOT"; // CURRENT_GRPC_VERSION
public static final String IMPLEMENTATION_VERSION = "1.60.0-SNAPSHOT"; // CURRENT_GRPC_VERSION
/**
* The default timeout in nanos for a keepalive ping request.

View File

@ -18,6 +18,7 @@ netty = '4.1.100.Final'
# SECURITY.md
nettytcnative = '2.0.61.Final'
opencensus = "0.31.1"
opentelemetry = "1.31.0"
protobuf = "3.24.0"
[libraries]
@ -80,6 +81,8 @@ opencensus-exporter-stats-stackdriver = { module = "io.opencensus:opencensus-exp
opencensus-exporter-trace-stackdriver = { module = "io.opencensus:opencensus-exporter-trace-stackdriver", version.ref = "opencensus" }
opencensus-impl = { module = "io.opencensus:opencensus-impl", version.ref = "opencensus" }
opencensus-proto = "io.opencensus:opencensus-proto:0.2.0"
opentelemetry-api = { module = "io.opentelemetry:opentelemetry-api", version.ref = "opentelemetry" }
opentelemetry-sdk-testing = { module = "io.opentelemetry:opentelemetry-sdk-testing", version.ref = "opentelemetry" }
perfmark-api = "io.perfmark:perfmark-api:0.26.0"
protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "protobuf" }
protobuf-java-util = { module = "com.google.protobuf:protobuf-java-util", version.ref = "protobuf" }

View File

@ -0,0 +1,37 @@
// Gradle build script for the gRPC OpenTelemetry module.
plugins {
id "java-library"
}
description = 'gRPC: OpenTelemetry'
// grpc-api is the only dependency exposed to consumers (api); Guava, grpc-core, the
// OpenTelemetry API and the AutoValue annotations are implementation-only.
dependencies {
api project(':grpc-api')
implementation libraries.guava,
project(':grpc-core'),
libraries.opentelemetry.api,
libraries.auto.value.annotations
testImplementation testFixtures(project(':grpc-core')),
project(':grpc-testing'),
libraries.opentelemetry.sdk.testing,
"org.assertj:assertj-core:3.24.2"
annotationProcessor libraries.auto.value
}
// Publish a stable JPMS module name through the jar manifest.
tasks.named("jar").configure {
manifest {
attributes('Automatic-Module-Name': 'io.grpc.opentelemetry')
}
}
// Compiler tweaks for the AutoValue annotation processor: the "processing" lint category is
// switched off and the annotation-processor output directory is appended to Error Prone's
// excluded paths.
// NOTE(review): appendToProperty is not defined in this script; presumably a shared build
// helper that appends the value using the given separator -- confirm.
tasks.named("compileJava").configure {
it.options.compilerArgs += [
// only has AutoValue annotation processor
"-Xlint:-processing"
]
appendToProperty(
it.options.errorprone.excludedPaths,
".*/build/generated/sources/annotationProcessor/java/.*",
"|")
}

View File

@ -0,0 +1,480 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StreamTracer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
*
* <p>On the client-side, a factory is created for each call, and the factory creates a stream
* tracer for each attempt. If there is no stream created when the call is ended, we still create a
* tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
* of the overall RPC, such as the total call duration, to OpenTelemetry.
*
* <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
* starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
* it's the tracer that reports the summary to OpenTelemetry.
*/
final class OpenTelemetryMetricsModule {
private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
// Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
// fractional seconds.
private static final double SECONDS_PER_NANO = 1e-9;
private final OpenTelemetryMetricsResource resource;
private final Supplier<Stopwatch> stopwatchSupplier;
/**
 * Creates a metrics module.
 *
 * @param stopwatchSupplier source of the stopwatches used to time calls and attempts
 * @param resource the metric instruments that measurements are recorded to
 * @throws NullPointerException if either argument is null
 */
OpenTelemetryMetricsModule(
    Supplier<Stopwatch> stopwatchSupplier, OpenTelemetryMetricsResource resource) {
  // Validate up front, resource first, before touching any field.
  checkNotNull(resource, "resource");
  checkNotNull(stopwatchSupplier, "stopwatchSupplier");
  this.resource = resource;
  this.stopwatchSupplier = stopwatchSupplier;
}
/**
 * Creates a new server-side stream tracer factory bound to this module.
 *
 * @return a fresh {@link ServerStreamTracer.Factory} on every invocation
 */
ServerStreamTracer.Factory getServerTracerFactory() {
  ServerStreamTracer.Factory serverFactory = new ServerTracerFactory();
  return serverFactory;
}
/**
 * Creates a new client interceptor that reports OpenTelemetry metrics through this module.
 *
 * @return a fresh {@link ClientInterceptor} on every invocation
 */
ClientInterceptor getClientInterceptor() {
  ClientInterceptor metricsInterceptor = new MetricsClientInterceptor();
  return metricsInterceptor;
}
/**
 * Chooses the value recorded for the method attribute.
 *
 * @param fullMethodName the full method name of the RPC
 * @param isGeneratedMethod whether the method is treated as a generated one
 * @return {@code fullMethodName} when {@code isGeneratedMethod} is true, otherwise the fixed
 *     bucket name {@code "other"}
 */
static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
  if (!isGeneratedMethod) {
    return "other";
  }
  return fullMethodName;
}
/**
 * Stream tracer for a single client attempt.
 *
 * <p>It accumulates the outbound and inbound wire sizes, times the attempt with a stopwatch that
 * is started in the constructor and stopped in {@link #streamClosed}, and records the attempt
 * duration plus both size totals in {@link #recordFinishedAttempt()}.
 */
private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
/*
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
try {
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
} catch (Throwable t) {
// Any failure disables both updaters together; the size methods then use plain "+=".
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
}
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
}
// Started in the constructor, stopped in streamClosed().
final Stopwatch stopwatch;
final CallAttemptsTracerFactory attemptsState;
// Flipped exactly once, by whichever of inboundMessage() / streamClosed() gets there first.
final AtomicBoolean inboundReceivedOrClosed = new AtomicBoolean();
final OpenTelemetryMetricsModule module;
// Null for the synthetic tracer that CallAttemptsTracerFactory.recordFinishedCall() creates;
// it is only dereferenced in streamClosed().
final StreamInfo info;
final String fullMethodName;
volatile long outboundWireSize;
volatile long inboundWireSize;
// Assigned in streamClosed(), or directly by CallAttemptsTracerFactory.recordFinishedCall().
long attemptNanos;
Code statusCode;
ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
StreamInfo info, String fullMethodName) {
this.attemptsState = attemptsState;
this.module = module;
this.info = info;
this.fullMethodName = fullMethodName;
this.stopwatch = module.stopwatchSupplier.get().start();
}
/** Adds {@code bytes} to the outbound wire-size total (atomically when the updater exists). */
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}
/** Adds {@code bytes} to the inbound wire-size total (atomically when the updater exists). */
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}
/**
 * On the first inbound message, registers this tracer with the per-call state so that its
 * metrics are recorded when the call is finished rather than in {@link #streamClosed}.
 */
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundMessage(int seqNo) {
if (inboundReceivedOrClosed.compareAndSet(false, true)) {
// Because inboundUncompressedSize() might be called after streamClosed(),
// we will report stats in callEnded(). Note that this attempt is already committed.
// NOTE(review): this class does not override inboundUncompressedSize(); the comment
// presumably refers to late inbound size callbacks in general -- confirm.
attemptsState.inboundMetricTracer = this;
}
}
/**
 * Stops the attempt stopwatch, derives the status code to report, notifies the per-call state
 * that the attempt ended, and records the attempt metrics unless an inbound message was seen
 * first (in which case recording is left to the per-call state).
 */
@Override
public void streamClosed(Status status) {
stopwatch.stop();
attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
Deadline deadline = info.getCallOptions().getDeadline();
statusCode = status.getCode();
if (statusCode == Code.CANCELLED && deadline != null) {
// When the server's deadline expires, it can only reset the stream with CANCEL and no
// description. Since our timer may be delayed in firing, we double-check the deadline and
// turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
if (deadline.isExpired()) {
statusCode = Code.DEADLINE_EXCEEDED;
}
}
// attemptNanos and statusCode are assigned before the per-call state is notified.
attemptsState.attemptEnded();
if (inboundReceivedOrClosed.compareAndSet(false, true)) {
// Stream is closed early. So no need to record metrics for any inbound events after this
// point.
recordFinishedAttempt();
} // Otherwise will report metrics in callEnded() to guarantee all inbound metrics are
// recorded.
}
/**
 * Records the attempt duration (in seconds) and the sent/received wire-size totals, each
 * tagged with the method name and the status code.
 */
void recordFinishedAttempt() {
// TODO(dnvindhya) : add target as an attribute
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
STATUS_KEY, statusCode.toString());
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute);
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute);
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute);
}
}
/**
 * Per-call tracer factory on the client side.
 *
 * <p>One instance is created for every call. It hands out a {@link ClientTracer} for each
 * attempt and records the call-level metrics exactly once, as soon as the call has ended and no
 * attempt stream is active any more.
 *
 * <p>{@code callEnded}, {@code activeStreams} and {@code finishedCallToBeRecorded} are guarded by
 * {@code lock}. The final status and the call stopwatch are also only touched under that lock by
 * {@link #callEnded(Status)}, so whichever thread ends up in {@link #recordFinishedCall()} sees
 * them fully written.
 */
@VisibleForTesting
static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
  // Set by the attempt that saw the first inbound message; its metrics are recorded together
  // with the finished call instead of at stream close.
  ClientTracer inboundMetricTracer;
  private final OpenTelemetryMetricsModule module;
  // Runs while no attempt stream is active (started when activeStreams drops to 0, stopped when
  // the next stream is created).
  private final Stopwatch attemptStopwatch;
  // Started in the constructor, stopped by the first callEnded().
  private final Stopwatch callStopWatch;
  @GuardedBy("lock")
  private boolean callEnded;
  private final String fullMethodName;
  private Status status;
  private long callLatencyNanos;
  private final Object lock = new Object();
  // Number of attempts that were not transparent retries.
  private final AtomicLong attemptsPerCall = new AtomicLong();
  @GuardedBy("lock")
  private int activeStreams;
  @GuardedBy("lock")
  private boolean finishedCallToBeRecorded;

  /**
   * Starts timing the call and counts the first attempt.
   *
   * @param module the owning metrics module
   * @param fullMethodName the method name to report as the method attribute
   * @throws NullPointerException if either argument is null
   */
  CallAttemptsTracerFactory(OpenTelemetryMetricsModule module, String fullMethodName) {
    this.module = checkNotNull(module, "module");
    this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
    this.attemptStopwatch = module.stopwatchSupplier.get();
    this.callStopWatch = module.stopwatchSupplier.get().start();

    // TODO(dnvindhya) : add target as an attribute
    io.opentelemetry.api.common.Attributes attribute =
        io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName);

    // Record here in case newClientStreamTracer() would never be called.
    module.resource.clientAttemptCountCounter().add(1, attribute);
  }

  /**
   * Creates the tracer for a new attempt, or a no-op tracer when the call has already been
   * scheduled for recording.
   */
  @Override
  public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
    synchronized (lock) {
      if (finishedCallToBeRecorded) {
        // This can be the case when the call is cancelled but a retry attempt is created.
        return new ClientStreamTracer() {};
      }
      if (++activeStreams == 1 && attemptStopwatch.isRunning()) {
        attemptStopwatch.stop();
      }
    }
    // Skip recording for the first time, since it is already recorded in
    // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
    // attempt, as first attempt cannot be a transparent retry.
    if (attemptsPerCall.get() > 0) {
      // TODO(dnvindhya): Add target as an attribute
      io.opentelemetry.api.common.Attributes attribute =
          io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName);
      module.resource.clientAttemptCountCounter().add(1, attribute);
    }
    if (!info.isTransparentRetry()) {
      attemptsPerCall.incrementAndGet();
    }
    return new ClientTracer(this, module, info, fullMethodName);
  }

  /**
   * Called whenever an attempt is ended. Records the finished call if this was the last active
   * stream and the call itself has already ended.
   */
  void attemptEnded() {
    boolean shouldRecordFinishedCall = false;
    synchronized (lock) {
      if (--activeStreams == 0) {
        attemptStopwatch.start();
        if (callEnded && !finishedCallToBeRecorded) {
          shouldRecordFinishedCall = true;
          finishedCallToBeRecorded = true;
        }
      }
    }
    if (shouldRecordFinishedCall) {
      recordFinishedCall();
    }
  }

  /**
   * Marks the call as ended with the given status and records it right away when no attempt
   * stream is active. Only the first invocation has any effect; later ones are ignored.
   *
   * @param status the final status of the call
   */
  void callEnded(Status status) {
    boolean shouldRecordFinishedCall = false;
    synchronized (lock) {
      if (callEnded) {
        // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
        return;
      }
      callEnded = true;
      // Stop the stopwatch and capture the status only after the duplicate check:
      // Stopwatch.stop() throws IllegalStateException when it is already stopped, and a repeated
      // notification must not replace the status of the first one.
      callStopWatch.stop();
      this.status = status;
      if (activeStreams == 0 && !finishedCallToBeRecorded) {
        shouldRecordFinishedCall = true;
        finishedCallToBeRecorded = true;
      }
    }
    if (shouldRecordFinishedCall) {
      recordFinishedCall();
    }
  }

  /**
   * Records the call duration and, where still pending, the metrics of the attempt that was
   * deferred to the end of the call. When no attempt was counted, a synthetic attempt is
   * recorded with the call's status.
   */
  void recordFinishedCall() {
    if (attemptsPerCall.get() == 0) {
      ClientTracer tracer = new ClientTracer(this, module, null, fullMethodName);
      tracer.attemptNanos = attemptStopwatch.elapsed(TimeUnit.NANOSECONDS);
      tracer.statusCode = status.getCode();
      tracer.recordFinishedAttempt();
    } else if (inboundMetricTracer != null) {
      // activeStreams has been decremented to 0 by attemptEnded(),
      // so inboundMetricTracer.statusCode is guaranteed to be assigned already.
      inboundMetricTracer.recordFinishedAttempt();
    }
    callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
    // TODO(dnvindhya): record target as an attribute
    io.opentelemetry.api.common.Attributes attribute =
        io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
            STATUS_KEY, status.getCode().toString());
    module.resource.clientCallDurationCounter()
        .record(callLatencyNanos * SECONDS_PER_NANO, attribute);
  }
}
/**
 * Stream tracer for a single server call.
 *
 * <p>It counts the started call in {@link #serverCallStarted}, accumulates the outbound and
 * inbound wire sizes, and on the first {@link #streamClosed} records the call duration (timed
 * from construction of this tracer) together with both size totals.
 */
private static final class ServerTracer extends ServerStreamTracer {
@Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
/*
* When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
* JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
* (potentially racy) direct updates of the volatile variables.
*/
static {
AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
try {
tmpStreamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
tmpOutboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
tmpInboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
} catch (Throwable t) {
// Any failure disables all three updaters together.
logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
tmpStreamClosedUpdater = null;
tmpOutboundWireSizeUpdater = null;
tmpInboundWireSizeUpdater = null;
}
streamClosedUpdater = tmpStreamClosedUpdater;
outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
}
private final OpenTelemetryMetricsModule module;
private final String fullMethodName;
// Stays false until serverCallStarted() runs; while false the method is reported as "other".
private volatile boolean isGeneratedMethod;
// 0 while open, 1 once streamClosed() has run.
private volatile int streamClosed;
// Started in the constructor, stopped in streamClosed().
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName) {
this.module = checkNotNull(module, "module");
this.fullMethodName = fullMethodName;
this.stopwatch = module.stopwatchSupplier.get().start();
}
/**
 * Remembers whether the method name may be reported and increments the started-call counter.
 */
@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
// Only record method name as an attribute if isSampledToLocalTracing is set to true,
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
isGeneratedMethod = isSampledToLocalTracing;
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
module.resource.serverCallCountCounter().add(1, attribute);
}
/** Adds {@code bytes} to the outbound wire-size total (atomically when the updater exists). */
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void outboundWireSize(long bytes) {
if (outboundWireSizeUpdater != null) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
outboundWireSize += bytes;
}
}
/** Adds {@code bytes} to the inbound wire-size total (atomically when the updater exists). */
@Override
@SuppressWarnings("NonAtomicVolatileUpdate")
public void inboundWireSize(long bytes) {
if (inboundWireSizeUpdater != null) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
} else {
inboundWireSize += bytes;
}
}
/**
* Record a finished stream and mark the current time as the end time.
*
* <p>Can be called from any thread without synchronization. Calling it the second time or more
* is a no-op.
*/
@Override
public void streamClosed(Status status) {
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
} else {
// Fallback without the updater: a plain check-then-set, which is not atomic.
if (streamClosed != 0) {
return;
}
streamClosed = 1;
}
stopwatch.stop();
long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
io.opentelemetry.api.common.Attributes attributes =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod),
STATUS_KEY, status.getCode().toString());
// The duration is recorded in seconds; the sizes are the totals accumulated so far.
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes);
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes);
}
}
/** Hands out one {@link ServerTracer} per server stream, bound to the enclosing module. */
@VisibleForTesting
final class ServerTracerFactory extends ServerStreamTracer.Factory {
  /**
   * Creates the tracer for a new server stream.
   *
   * @param fullMethodName the full method name of the incoming call
   * @param headers the request headers (not consulted here)
   */
  @Override
  public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
    OpenTelemetryMetricsModule owner = OpenTelemetryMetricsModule.this;
    ServerStreamTracer streamTracer = new ServerTracer(owner, fullMethodName);
    return streamTracer;
  }
}
/**
 * Client interceptor that attaches a per-call {@link CallAttemptsTracerFactory} to every call
 * and notifies it when the call is closed.
 */
@VisibleForTesting
final class MetricsClientInterceptor implements ClientInterceptor {
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    // The real method name is only used as an attribute when isSampledToLocalTracing is true,
    // which holds for all generated methods. Programmatically created methods would otherwise
    // produce high cardinality metrics.
    String reportedMethodName =
        recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing());
    final CallAttemptsTracerFactory attemptsFactory =
        new CallAttemptsTracerFactory(OpenTelemetryMetricsModule.this, reportedMethodName);
    CallOptions tracedOptions = callOptions.withStreamTracerFactory(attemptsFactory);
    ClientCall<ReqT, RespT> tracedCall = next.newCall(method, tracedOptions);

    return new SimpleForwardingClientCall<ReqT, RespT>(tracedCall) {
      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        // Wrap the listener so the tracer factory learns the final status before the
        // application's listener is told about the close.
        Listener<RespT> reportingListener =
            new SimpleForwardingClientCallListener<RespT>(responseListener) {
              @Override
              public void onClose(Status status, Metadata trailers) {
                attemptsFactory.callEnded(status);
                super.onClose(status, trailers);
              }
            };
        delegate().start(reportingListener, headers);
      }
    };
  }
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongHistogram;
/**
 * AutoValue holder for the OpenTelemetry instruments that gRPC metrics are recorded to.
 *
 * <p>Every accessor is named {@code ...Counter}, but only the two {@link LongCounter} members
 * are counters; the remaining ones are histograms.
 */
@AutoValue
abstract class OpenTelemetryMetricsResource {
/* Client Metrics */
abstract DoubleHistogram clientCallDurationCounter();
abstract LongCounter clientAttemptCountCounter();
abstract DoubleHistogram clientAttemptDurationCounter();
abstract LongHistogram clientTotalSentCompressedMessageSizeCounter();
abstract LongHistogram clientTotalReceivedCompressedMessageSizeCounter();
/* Server Metrics */
abstract LongCounter serverCallCountCounter();
abstract DoubleHistogram serverCallDurationCounter();
abstract LongHistogram serverTotalSentCompressedMessageSizeCounter();
abstract LongHistogram serverTotalReceivedCompressedMessageSizeCounter();
/** Returns a new builder backed by the AutoValue-generated implementation. */
static Builder builder() {
return new AutoValue_OpenTelemetryMetricsResource.Builder();
}
/** Builder with one setter per instrument, mirroring the accessors of the enclosing class. */
@AutoValue.Builder
abstract static class Builder {
abstract Builder clientCallDurationCounter(DoubleHistogram counter);
abstract Builder clientAttemptCountCounter(LongCounter counter);
abstract Builder clientAttemptDurationCounter(DoubleHistogram counter);
abstract Builder clientTotalSentCompressedMessageSizeCounter(LongHistogram counter);
abstract Builder clientTotalReceivedCompressedMessageSizeCounter(
LongHistogram counter);
abstract Builder serverCallCountCounter(LongCounter counter);
abstract Builder serverCallDurationCounter(DoubleHistogram counter);
abstract Builder serverTotalSentCompressedMessageSizeCounter(LongHistogram counter);
abstract Builder serverTotalReceivedCompressedMessageSizeCounter(
LongHistogram counter);
abstract OpenTelemetryMetricsResource build();
}
}

View File

@ -0,0 +1,210 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.ClientInterceptor;
import io.grpc.ExperimentalApi;
import io.grpc.ServerStreamTracer;
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
/**
 * Entry point for gRPC's OpenTelemetry metrics support.
 *
 * <p>Instrumentation goes through the {@link io.opentelemetry.api.OpenTelemetry} API only. Unless
 * an SDK-backed instance is supplied through {@link Builder#sdk}, the no-op implementation is
 * used and no telemetry data is collected. See {@code io.opentelemetry.sdk.OpenTelemetrySdk} for
 * how to construct an SDK.
 */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10591")
public final class OpenTelemetryModule {

  // Every timer handed to the metrics module starts out unstarted.
  private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = new Supplier<Stopwatch>() {
    @Override
    public Stopwatch get() {
      return Stopwatch.createUnstarted();
    }
  };

  private final OpenTelemetry openTelemetryInstance;
  private final MeterProvider meterProvider;
  private final Meter meter;
  private final OpenTelemetryMetricsResource resource;

  /** Returns a fresh {@link Builder} preconfigured with the no-op OpenTelemetry instance. */
  public static Builder newBuilder() {
    return new Builder();
  }

  // Resolves the meter for the gRPC instrumentation scope and creates all instruments eagerly.
  private OpenTelemetryModule(Builder builder) {
    OpenTelemetry configured = checkNotNull(builder.openTelemetrySdk, "openTelemetrySdk");
    MeterProvider provider = checkNotNull(configured.getMeterProvider(), "meterProvider");
    Meter grpcMeter =
        provider
            .meterBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
            .setInstrumentationVersion(IMPLEMENTATION_VERSION)
            .build();

    this.openTelemetryInstance = configured;
    this.meterProvider = provider;
    this.meter = grpcMeter;
    this.resource = createMetricInstruments(grpcMeter);
  }

  @VisibleForTesting
  OpenTelemetry getOpenTelemetryInstance() {
    return openTelemetryInstance;
  }

  @VisibleForTesting
  MeterProvider getMeterProvider() {
    return meterProvider;
  }

  @VisibleForTesting
  Meter getMeter() {
    return meter;
  }

  @VisibleForTesting
  OpenTelemetryMetricsResource getResource() {
    return resource;
  }

  /**
   * Creates a {@link ClientInterceptor} that records client-side metrics.
   */
  public ClientInterceptor getClientInterceptor() {
    return new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource).getClientInterceptor();
  }

  /**
   * Creates a {@link ServerStreamTracer.Factory} that records server-side metrics.
   */
  public ServerStreamTracer.Factory getServerStreamTracerFactory() {
    return new OpenTelemetryMetricsModule(STOPWATCH_SUPPLIER, resource).getServerTracerFactory();
  }

  /**
   * Creates every gRPC metric instrument on the given meter and bundles them together.
   *
   * @param meter the meter the instruments are registered with
   * @return the bundle of client and server instruments
   */
  @VisibleForTesting
  static OpenTelemetryMetricsResource createMetricInstruments(Meter meter) {
    return OpenTelemetryMetricsResource.builder()
        // Client instruments.
        .clientCallDurationCounter(
            meter.histogramBuilder("grpc.client.call.duration")
                .setUnit("s")
                .setDescription(
                    "Time taken by gRPC to complete an RPC from application's perspective")
                .build())
        .clientAttemptCountCounter(
            meter.counterBuilder("grpc.client.attempt.started")
                .setUnit("{attempt}")
                .setDescription("Number of client call attempts started")
                .build())
        .clientAttemptDurationCounter(
            meter.histogramBuilder("grpc.client.attempt.duration")
                .setUnit("s")
                .setDescription("Time taken to complete a client call attempt")
                .build())
        .clientTotalSentCompressedMessageSizeCounter(
            meter.histogramBuilder("grpc.client.attempt.sent_total_compressed_message_size")
                .setUnit("By")
                .setDescription("Compressed message bytes sent per client call attempt")
                .ofLongs()
                .build())
        .clientTotalReceivedCompressedMessageSizeCounter(
            meter.histogramBuilder("grpc.client.attempt.rcvd_total_compressed_message_size")
                .setUnit("By")
                .setDescription("Compressed message bytes received per call attempt")
                .ofLongs()
                .build())
        // Server instruments.
        .serverCallCountCounter(
            meter.counterBuilder("grpc.server.call.started")
                .setUnit("{call}")
                .setDescription("Number of server calls started")
                .build())
        .serverCallDurationCounter(
            meter.histogramBuilder("grpc.server.call.duration")
                .setUnit("s")
                .setDescription(
                    "Time taken to complete a call from server transport's perspective")
                .build())
        .serverTotalSentCompressedMessageSizeCounter(
            meter.histogramBuilder("grpc.server.call.sent_total_compressed_message_size")
                .setUnit("By")
                .setDescription("Compressed message bytes sent per server call")
                .ofLongs()
                .build())
        .serverTotalReceivedCompressedMessageSizeCounter(
            meter.histogramBuilder("grpc.server.call.rcvd_total_compressed_message_size")
                .setUnit("By")
                .setDescription("Compressed message bytes received per server call")
                .ofLongs()
                .build())
        .build();
  }

  /**
   * Configures and creates {@link OpenTelemetryModule} instances.
   */
  public static class Builder {
    // Defaults to the no-op implementation until sdk() replaces it.
    private OpenTelemetry openTelemetrySdk = OpenTelemetry.noop();

    private Builder() {}

    /**
     * Sets the {@link io.opentelemetry.api.OpenTelemetry} entrypoint to record through, for
     * example the instance produced by an {@code io.opentelemetry.sdk.OpenTelemetrySdkBuilder}.
     *
     * @param sdk the OpenTelemetry instance to use
     * @return this builder
     */
    public Builder sdk(OpenTelemetry sdk) {
      openTelemetrySdk = sdk;
      return this;
    }

    /**
     * Creates a new {@link OpenTelemetryModule} from the current configuration of this
     * {@link Builder}.
     */
    public OpenTelemetryModule build() {
      return new OpenTelemetryModule(this);
    }
  }
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry.internal;
import io.opentelemetry.api.common.AttributeKey;
/** Shared constants: the instrumentation scope name and the gRPC metric attribute keys. */
public final class OpenTelemetryConstants {
// Name passed to the meter builder as the instrumentation scope.
public static final String INSTRUMENTATION_SCOPE = "grpc-java";
// String-valued attribute keys attached to recorded measurements.
public static final AttributeKey<String> METHOD_KEY = AttributeKey.stringKey("grpc.method");
public static final AttributeKey<String> STATUS_KEY = AttributeKey.stringKey("grpc.status");
public static final AttributeKey<String> TARGET_KEY = AttributeKey.stringKey("grpc.target");
// Not instantiable: this class only holds constants.
private OpenTelemetryConstants() {
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.opentelemetry;
import static com.google.common.truth.Truth.assertThat;
import io.grpc.internal.GrpcUtil;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for building {@link OpenTelemetryModule} with and without an explicit SDK. */
@RunWith(JUnit4.class)
public class OpenTelemetryModuleTest {

  private final InMemoryMetricReader inMemoryMetricReader = InMemoryMetricReader.create();
  private final SdkMeterProvider meterProvider =
      SdkMeterProvider.builder().registerMetricReader(inMemoryMetricReader).build();
  private final OpenTelemetry noopOpenTelemetry = OpenTelemetry.noop();

  /** An explicitly supplied SDK is used as is, and the meter comes from its provider. */
  @Test
  public void build() {
    OpenTelemetrySdk configuredSdk =
        OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();

    OpenTelemetryModule module = OpenTelemetryModule.newBuilder().sdk(configuredSdk).build();

    assertThat(module.getOpenTelemetryInstance()).isSameInstanceAs(configuredSdk);
    assertThat(module.getMeterProvider()).isNotNull();
    assertThat(module.getMeter())
        .isSameInstanceAs(
            meterProvider
                .meterBuilder("grpc-java")
                .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
                .build());
  }

  /** Without an SDK the builder falls back to the no-op OpenTelemetry instance. */
  @Test
  public void builderDefaults() {
    OpenTelemetryModule defaultModule = OpenTelemetryModule.newBuilder().build();

    assertThat(defaultModule.getOpenTelemetryInstance()).isNotNull();
    assertThat(defaultModule.getOpenTelemetryInstance()).isSameInstanceAs(noopOpenTelemetry);
    assertThat(defaultModule.getMeterProvider()).isNotNull();
    assertThat(defaultModule.getMeterProvider())
        .isSameInstanceAs(noopOpenTelemetry.getMeterProvider());
    assertThat(defaultModule.getMeter())
        .isSameInstanceAs(
            noopOpenTelemetry
                .getMeterProvider()
                .meterBuilder("grpc-java")
                .setInstrumentationVersion(GrpcUtil.IMPLEMENTATION_VERSION)
                .build());
  }
}

View File

@ -71,6 +71,7 @@ include ":grpc-gcp-observability:interop"
include ":grpc-istio-interop-testing"
include ":grpc-inprocess"
include ":grpc-util"
include ":grpc-opentelemetry"
project(':grpc-api').projectDir = "$rootDir/api" as File
project(':grpc-core').projectDir = "$rootDir/core" as File
@ -104,6 +105,7 @@ project(':grpc-gcp-observability:interop').projectDir = "$rootDir/gcp-observabil
project(':grpc-istio-interop-testing').projectDir = "$rootDir/istio-interop-testing" as File
project(':grpc-inprocess').projectDir = "$rootDir/inprocess" as File
project(':grpc-util').projectDir = "$rootDir/util" as File
project(':grpc-opentelemetry').projectDir = "$rootDir/opentelemetry" as File
if (settings.hasProperty('skipCodegen') && skipCodegen.toBoolean()) {
println '*** Skipping the build of codegen and compilation of proto files because skipCodegen=true'