Implement a Call.Factory for okhttp 3.x+ library instrumentation (#3812)

* Add a README for the okhttp library instrumentation.

* Create new instrumentation for okhttp3 4.x+

This exposes a `Call.Factory` which will properly handle internal context propagation when used with async callbacks.

* update the "4.x" instrumentation to support 3.x

* Get rid of the 4.x instrumentation, and just update the 3.x instrumentation to work

* updates from PR review

* replace old reflection with method handle usage

* Apply suggestions from code review

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
John Watson 2021-08-17 20:16:24 -07:00 committed by GitHub
parent d8eae4997d
commit 6dbb64ec7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 293 additions and 56 deletions

View File

@ -14,6 +14,7 @@ import okhttp3.Interceptor;
/** Holder of singleton interceptors for adding to instrumented clients. */
public final class OkHttp3Singletons {
@SuppressWarnings("deprecation") // we're still using the interceptor on its own for now
public static final Interceptor TRACING_INTERCEPTOR =
OkHttpTracing.newBuilder(GlobalOpenTelemetry.get())
.addAttributesExtractor(

View File

@ -7,11 +7,35 @@ package io.opentelemetry.javaagent.instrumentation.okhttp.v3_0
import io.opentelemetry.instrumentation.okhttp.v3_0.AbstractOkHttp3Test
import io.opentelemetry.instrumentation.test.AgentTestTrait
import okhttp3.Call
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit
class OkHttp3Test extends AbstractOkHttp3Test implements AgentTestTrait {
@Override
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
return clientBuilder
Call.Factory createCallFactory(OkHttpClient.Builder clientBuilder) {
return clientBuilder.build()
}
def "reused builder has one interceptor"() {
def builder = new OkHttpClient.Builder()
.connectTimeout(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.retryOnConnectionFailure(false)
when:
def newClient = builder.build().newBuilder().build()
then:
newClient.interceptors().size() == 1
}
def "builder created from client has one interceptor"() {
when:
def newClient = ((OkHttpClient) client).newBuilder().build()
then:
newClient.interceptors().size() == 1
}
}

View File

@ -0,0 +1,57 @@
# Manual Instrumentation for OkHttp3 version 3.0.0+
Provides OpenTelemetry instrumentation for [okhttp3](https://square.github.io/okhttp/).
## Quickstart
### Add these dependencies to your project:
Replace `OPENTELEMETRY_VERSION` with the latest stable
[release](https://mvnrepository.com/artifact/io.opentelemetry). `Minimum version: 1.5.0`
For Maven, add to your `pom.xml` dependencies:
```xml
<dependencies>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-okhttp-3.0</artifactId>
<version>OPENTELEMETRY_VERSION</version>
</dependency>
</dependencies>
```
For Gradle, add to your dependencies:
```groovy
implementation("io.opentelemetry.instrumentation:opentelemetry-okhttp-3.0:OPENTELEMETRY_VERSION")
```
### Usage
The instrumentation library provides an OkHttp `Call.Factory` implementation that wraps
an instance of the `OkHttpClient` to provide OpenTelemetry-based spans and context
propagation.
```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.okhttp.v3_0.OkHttpTracing;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import java.util.concurrent.ExecutorService;
public class OkHttpConfiguration {
//Use this Call.Factory implementation for making standard http client calls.
public Call.Factory createTracedClient(OpenTelemetry openTelemetry) {
return OkHttpTracing.newBuilder(openTelemetry).build().newCallFactory(createClient());
}
//your configuration of the OkHttpClient goes here:
private OkHttpClient createClient() {
return new OkHttpClient.Builder().build();
}
}
```

View File

@ -60,6 +60,7 @@ final class OkHttpAttributesExtractor extends HttpAttributesExtractor<Request, R
}
@Override
@SuppressWarnings("UnnecessaryDefaultInEnumSwitch")
protected @Nullable String flavor(Request request, @Nullable Response response) {
if (response == null) {
return null;
@ -73,8 +74,10 @@ final class OkHttpAttributesExtractor extends HttpAttributesExtractor<Request, R
return SemanticAttributes.HttpFlavorValues.HTTP_2_0;
case SPDY_3:
return SemanticAttributes.HttpFlavorValues.SPDY;
// No OTel mapping for other protocols like H2C.
default:
return null;
}
return null;
}
@Override

View File

@ -8,10 +8,10 @@ package io.opentelemetry.instrumentation.okhttp.v3_0;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.concurrent.ExecutorService;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Dispatcher;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@ -38,21 +38,33 @@ public final class OkHttpTracing {
/**
* Returns a new {@link Interceptor} that can be used with methods like {@link
* okhttp3.OkHttpClient.Builder#addInterceptor(Interceptor)}. Note that asynchronous calls using
* {@link okhttp3.Call.Factory#enqueue(Callback)} will not work correctly unless you also decorate
* the {@linkplain Dispatcher#executorService() dispatcher's executor service} with {@link
* io.opentelemetry.context.Context#taskWrapping(ExecutorService)}. For example, if using the
* default {@link Dispatcher}, you will need to configure {@link okhttp3.OkHttpClient.Builder}
* something like
* okhttp3.OkHttpClient.Builder#addInterceptor(Interceptor)}.
*
* <pre>{@code
* new OkHttpClient.Builder()
* .dispatcher(new Dispatcher(Context.taskWrapping(new Dispatcher().executorService())))
* .addInterceptor(OkHttpTracing.create(openTelemetry).newInterceptor())
* ...
* }</pre>
* <p>Important: asynchronous calls using {@link okhttp3.Call.Factory#enqueue(Callback)} will
* *not* work correctly using just this interceptor.
*
* <p>It is strongly recommended that you use the {@link #newCallFactory(OkHttpClient)} method to
* decorate your {@link OkHttpClient}, rather than using this method directly.
*
* @deprecated Please use the {@link #newCallFactory(OkHttpClient)} method instead.
*/
@Deprecated
public Interceptor newInterceptor() {
return new TracingInterceptor(instrumenter, propagators);
}
/**
* Construct a new OpenTelemetry tracing-enabled {@link okhttp3.Call.Factory} using the provided
* {@link OkHttpClient} instance.
*
* <p>Using this method will result in proper propagation and span parenting, for both {@linkplain
* Call#execute() synchronous} and {@linkplain Call#enqueue(Callback) asynchronous} usages.
*
* @param baseClient An instance of OkHttpClient configured as desired.
* @return a {@link Call.Factory} for creating new {@link Call} instances.
*/
public Call.Factory newCallFactory(OkHttpClient baseClient) {
OkHttpClient tracingClient = baseClient.newBuilder().addInterceptor(newInterceptor()).build();
return new TracingCallFactory(tracingClient);
}
}

View File

@ -0,0 +1,158 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.okhttp.v3_0;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.caching.Cache;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okio.Timeout;
import org.checkerframework.checker.nullness.qual.Nullable;
class TracingCallFactory implements Call.Factory {
private static final Cache<Request, Context> contextsByRequest =
Cache.newBuilder().setWeakKeys().build();
@Nullable private static MethodHandle timeoutMethodHandle;
@Nullable private static MethodHandle cloneMethodHandle;
static {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
try {
MethodType methodType = MethodType.methodType(Timeout.class);
timeoutMethodHandle = lookup.findVirtual(Call.class, "timeout", methodType);
} catch (NoSuchMethodException | IllegalAccessException e) {
timeoutMethodHandle = null;
}
try {
MethodType methodType = MethodType.methodType(Call.class);
cloneMethodHandle = lookup.findVirtual(Call.class, "clone", methodType);
} catch (NoSuchMethodException | IllegalAccessException e) {
cloneMethodHandle = null;
}
}
private final OkHttpClient okHttpClient;
TracingCallFactory(OkHttpClient okHttpClient) {
this.okHttpClient = okHttpClient;
}
@Nullable
static Context getCallingContextForRequest(Request request) {
return contextsByRequest.get(request);
}
@Override
public Call newCall(Request request) {
Context callingContext = Context.current();
Request requestCopy = request.newBuilder().build();
contextsByRequest.put(requestCopy, callingContext);
return new TracingCall(okHttpClient.newCall(requestCopy), callingContext);
}
static class TracingCall implements Call {
private final Call delegate;
private final Context callingContext;
TracingCall(Call delegate, Context callingContext) {
this.delegate = delegate;
this.callingContext = callingContext;
}
@Override
public void cancel() {
delegate.cancel();
}
@Override
public Call clone() throws CloneNotSupportedException {
if (cloneMethodHandle == null) {
return (Call) super.clone();
}
try {
// we pull the current context here, because the cloning might be happening in a different
// context than the original call creation.
return new TracingCall((Call) cloneMethodHandle.invoke(delegate), Context.current());
} catch (Throwable e) {
return (Call) super.clone();
}
}
@Override
public void enqueue(Callback callback) {
delegate.enqueue(new TracingCallback(callback, callingContext));
}
@Override
public Response execute() throws IOException {
try (Scope scope = callingContext.makeCurrent()) {
return delegate.execute();
}
}
@Override
public boolean isCanceled() {
return delegate.isCanceled();
}
@Override
public boolean isExecuted() {
return delegate.isExecuted();
}
@Override
public Request request() {
return delegate.request();
}
// @Override method was introduced in 3.12
public Timeout timeout() {
if (timeoutMethodHandle == null) {
return Timeout.NONE;
}
try {
return (Timeout) timeoutMethodHandle.invoke(delegate);
} catch (Throwable e) {
// do nothing...we're before 3.12, or something else has gone wrong that we can't do
// anything about.
return Timeout.NONE;
}
}
private static class TracingCallback implements Callback {
private final Callback delegate;
private final Context callingContext;
public TracingCallback(Callback delegate, Context callingContext) {
this.delegate = delegate;
this.callingContext = callingContext;
}
@Override
public void onFailure(Call call, IOException e) {
try (Scope scope = callingContext.makeCurrent()) {
delegate.onFailure(call, e);
}
}
@Override
public void onResponse(Call call, Response response) throws IOException {
try (Scope scope = callingContext.makeCurrent()) {
delegate.onResponse(call, response);
}
}
}
}
}

View File

@ -26,8 +26,11 @@ final class TracingInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Context parentContext = Context.current();
Request request = chain.request();
Context parentContext = TracingCallFactory.getCallingContextForRequest(request);
if (parentContext == null) {
parentContext = Context.current();
}
if (!instrumenter.shouldStart(parentContext, request)) {
return chain.proceed(chain.request());

View File

@ -5,18 +5,16 @@
package io.opentelemetry.instrumentation.okhttp.v3_0
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import okhttp3.Dispatcher
import okhttp3.Call
import okhttp3.OkHttpClient
class OkHttp3Test extends AbstractOkHttp3Test implements LibraryTestTrait {
@Override
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
return clientBuilder
// The double "new Dispatcher" style is the simplest way to decorate the default executor.
.dispatcher(new Dispatcher(Context.taskWrapping(new Dispatcher().executorService())))
.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
Call.Factory createCallFactory(OkHttpClient.Builder clientBuilder) {
return OkHttpTracing.create(getOpenTelemetry()).newCallFactory(clientBuilder.build())
}
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
@ -25,8 +23,4 @@ class OkHttp3Test extends AbstractOkHttp3Test implements LibraryTestTrait {
false
}
@Override
boolean testCausalityWithCallback() {
false
}
}

View File

@ -9,7 +9,6 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.TimeUnit
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers
@ -22,17 +21,18 @@ import okhttp3.Response
import okhttp3.internal.http.HttpMethod
import spock.lang.Shared
import java.util.concurrent.TimeUnit
abstract class AbstractOkHttp3Test extends HttpClientTest<Request> {
abstract OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder)
abstract Call.Factory createCallFactory(OkHttpClient.Builder clientBuilder)
@Shared
def client = configureClient(
Call.Factory client = createCallFactory(
new OkHttpClient.Builder()
.connectTimeout(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.protocols(Arrays.asList(Protocol.HTTP_1_1))
.retryOnConnectionFailure(false))
.build()
@Override
Request buildRequest(String method, URI uri, Map<String, String> headers) {
@ -45,7 +45,10 @@ abstract class AbstractOkHttp3Test extends HttpClientTest<Request> {
@Override
int sendRequest(Request request, String method, URI uri, Map<String, String> headers) {
return client.newCall(request).execute().code()
def response = client.newCall(request).execute()
response.body().withCloseable {
return response.code()
}
}
@Override
@ -58,7 +61,9 @@ abstract class AbstractOkHttp3Test extends HttpClientTest<Request> {
@Override
void onResponse(Call call, Response response) throws IOException {
requestResult.complete(response.code())
response.body().withCloseable {
requestResult.complete(response.code())
}
}
})
}
@ -85,24 +90,4 @@ abstract class AbstractOkHttp3Test extends HttpClientTest<Request> {
attributes
}
def "reused builder has one interceptor"() {
when:
def builder = configureClient(new OkHttpClient.Builder()
.connectTimeout(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.retryOnConnectionFailure(false))
builder.build()
def newClient = builder.build()
then:
newClient.interceptors().size() == 1
}
def "builder created from client has one interceptor"() {
when:
def newClient = client.newBuilder().build()
then:
newClient.interceptors().size() == 1
}
}

View File

@ -220,7 +220,7 @@ public abstract class AbstractHttpClientTest<REQUEST> {
testing.waitAndAssertTraces(
trace -> {
// Workaroud until release of
// Workaround until release of
// https://github.com/open-telemetry/opentelemetry-java/pull/3386
// in 1.5
List<List<SpanData>> traces = testing.traces();