Transition kubernetes client 7.0 module to instrumenter API (#4036)

* Convert kubernetes-client-7.0 to Instrumenter API

* Respond to PR feedback
This commit is contained in:
jack-berg 2021-09-07 17:17:42 -05:00 committed by GitHub
parent 4e221d6e4c
commit 1a994b9845
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 312 additions and 192 deletions

View File

@ -5,19 +5,21 @@
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.KubernetesClientTracer.tracer;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.KubernetesClientSingletons.inject;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.KubernetesClientSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.kubernetes.client.openapi.ApiCallback;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@ -55,15 +57,17 @@ public class ApiClientInstrumentation implements TypeInstrumentation {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) Request request) {
Context parentContext = Java8BytecodeBridge.currentContext();
if (!tracer().shouldStartSpan(parentContext)) {
Context parentContext = currentContext();
if (!instrumenter().shouldStart(parentContext, request)) {
return;
}
Context context = instrumenter().start(parentContext, request);
Scope scope = context.makeCurrent();
Request.Builder requestWithPropagation = request.newBuilder();
Context context = tracer().startSpan(parentContext, request, requestWithPropagation);
CurrentContextAndScope.set(parentContext, context);
inject(context, requestWithPropagation);
request = requestWithPropagation.build();
CurrentState.set(parentContext, context, scope, request);
}
}
@ -73,15 +77,19 @@ public class ApiClientInstrumentation implements TypeInstrumentation {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Return ApiResponse<?> response, @Advice.Thrown Throwable throwable) {
Context context = CurrentContextAndScope.removeAndClose();
if (context == null) {
CurrentState currentState = CurrentState.remove();
if (currentState == null) {
return;
}
if (throwable == null) {
tracer().end(context, response);
} else {
tracer().endExceptionally(context, response, throwable);
currentState.getScope().close();
Context context = currentState.getContext();
ApiResponse<?> endResponse = response;
if (response == null && throwable instanceof ApiException) {
ApiException apiException = (ApiException) throwable;
endResponse = new ApiResponse<>(apiException.getCode(), apiException.getResponseHeaders());
}
instrumenter().end(context, currentState.getRequest(), endResponse, throwable);
}
}
@ -93,28 +101,32 @@ public class ApiClientInstrumentation implements TypeInstrumentation {
@Advice.Argument(0) Call httpCall,
@Advice.Argument(value = 2, readOnly = false) ApiCallback<?> callback,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
CurrentContextAndScope current = CurrentContextAndScope.remove();
if (current != null) {
context = current.getContext();
scope = current.getScope();
callback = new TracingApiCallback<>(callback, current.getParentContext(), context);
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") Request request) {
CurrentState current = CurrentState.remove();
if (current == null) {
return;
}
context = current.getContext();
scope = current.getScope();
request = current.getRequest();
callback = new TracingApiCallback<>(callback, current.getParentContext(), context, request);
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
@Advice.Local("otelScope") Scope scope,
@Advice.Local("otelRequest") Request request) {
if (scope == null) {
return;
}
scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
instrumenter().end(context, request, null, throwable);
}
// else span will be ended in the TracingApiCallback
}

View File

@ -1,61 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Kubernetes instrumentation starts and ends spans in two different methods - there only way to
* pass {@link Scope} between them is to use a thread local.
*/
public final class CurrentContextAndScope {
private static final ThreadLocal<CurrentContextAndScope> CURRENT = new ThreadLocal<>();
private final Context parentContext;
private final Context context;
private final Scope scope;
private CurrentContextAndScope(Context parentContext, Context context, Scope scope) {
this.parentContext = parentContext;
this.context = context;
this.scope = scope;
}
public static void set(Context parentContext, Context context) {
CURRENT.set(new CurrentContextAndScope(parentContext, context, context.makeCurrent()));
}
@Nullable
public static CurrentContextAndScope remove() {
CurrentContextAndScope contextAndScope = CURRENT.get();
CURRENT.remove();
return contextAndScope;
}
@Nullable
public static Context removeAndClose() {
CurrentContextAndScope contextAndScope = remove();
if (contextAndScope == null) {
return null;
}
contextAndScope.scope.close();
return contextAndScope.context;
}
public Context getParentContext() {
return parentContext;
}
public Context getContext() {
return context;
}
public Scope getScope() {
return scope;
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import okhttp3.Request;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* Kubernetes instrumentation starts and ends spans in two different methods - the only way to pass
* state between them is to use a thread local.
*/
public final class CurrentState {
private static final ThreadLocal<CurrentState> CURRENT = new ThreadLocal<>();
private final Context parentContext;
private final Context context;
private final Scope scope;
private final Request request;
private CurrentState(Context parentContext, Context context, Scope scope, Request request) {
this.parentContext = parentContext;
this.context = context;
this.scope = scope;
this.request = request;
}
public static void set(Context parentContext, Context context, Scope scope, Request request) {
CURRENT.set(new CurrentState(parentContext, context, scope, request));
}
@Nullable
public static CurrentState remove() {
CurrentState currentState = CURRENT.get();
CURRENT.remove();
return currentState;
}
public Context getParentContext() {
return parentContext;
}
public Context getContext() {
return context;
}
public Scope getScope() {
return scope;
}
public Request getRequest() {
return request;
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import static io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor.alwaysClient;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.RequestBuilderInjectAdapter.SETTER;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
import okhttp3.Request;
public class KubernetesClientSingletons {
private static final Instrumenter<Request, ApiResponse<?>> INSTRUMENTER;
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get()
.getBoolean("otel.instrumentation.kubernetes-client.experimental-span-attributes", false);
private static final ContextPropagators CONTEXT_PROPAGATORS;
static {
KubernetesHttpAttributesExtractor httpAttributesExtractor =
new KubernetesHttpAttributesExtractor();
SpanNameExtractor<Request> spanNameExtractor =
request -> KubernetesRequestDigest.parse(request).toString();
InstrumenterBuilder<Request, ApiResponse<?>> instrumenterBuilder =
Instrumenter.<Request, ApiResponse<?>>newBuilder(
GlobalOpenTelemetry.get(),
"io.opentelemetry.kubernetes-client-7.0",
spanNameExtractor)
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor))
.addAttributesExtractor(httpAttributesExtractor)
.addAttributesExtractor(new KubernetesNetAttributesExtractor());
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
instrumenterBuilder.addAttributesExtractor(new KubernetesExperimentalAttributesExtractor());
}
// Initialize with .newInstrumenter(alwaysClient()) instead of .newClientInstrumenter(..)
// because Request is immutable so context must be injected manually
INSTRUMENTER = instrumenterBuilder.newInstrumenter(alwaysClient());
CONTEXT_PROPAGATORS = GlobalOpenTelemetry.getPropagators();
}
public static Instrumenter<Request, ApiResponse<?>> instrumenter() {
return INSTRUMENTER;
}
public static void inject(Context context, Request.Builder requestBuilder) {
CONTEXT_PROPAGATORS.getTextMapPropagator().inject(context, requestBuilder, SETTER);
}
private KubernetesClientSingletons() {}
}

View File

@ -1,106 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import okhttp3.Request;
public class KubernetesClientTracer
extends HttpClientTracer<Request, Request.Builder, ApiResponse<?>> {
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get()
.getBoolean("otel.instrumentation.kubernetes-client.experimental-span-attributes", false);
private static final KubernetesClientTracer TRACER = new KubernetesClientTracer();
private KubernetesClientTracer() {
super(NetPeerAttributes.INSTANCE);
}
public static KubernetesClientTracer tracer() {
return TRACER;
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.kubernetes-client-7.0";
}
@Override
protected String method(Request request) {
return request.method();
}
@Override
protected URI url(Request request) {
return request.url().uri();
}
@Override
protected Integer status(ApiResponse<?> response) {
return response.getStatusCode();
}
@Override
protected String requestHeader(Request request, String name) {
return request.header(name);
}
@Override
protected String responseHeader(ApiResponse<?> response, String name) {
Map<String, List<String>> responseHeaders =
response.getHeaders() == null ? Collections.emptyMap() : response.getHeaders();
return responseHeaders.getOrDefault(name, Collections.emptyList()).stream()
.findFirst()
.orElse(null);
}
@Override
protected TextMapSetter<Request.Builder> getSetter() {
return RequestBuilderInjectAdapter.SETTER;
}
@Override
public void onException(Context context, Throwable throwable) {
super.onException(context, throwable);
if (throwable instanceof ApiException) {
int status = ((ApiException) throwable).getCode();
if (status != 0) {
Span.fromContext(context).setAttribute(SemanticAttributes.HTTP_STATUS_CODE, status);
}
}
}
@Override
protected void onRequest(SpanBuilder spanBuilder, Request request) {
super.onRequest(spanBuilder, request);
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
KubernetesRequestDigest digest = KubernetesRequestDigest.parse(request);
spanBuilder
.setAttribute("kubernetes-client.namespace", digest.getResourceMeta().getNamespace())
.setAttribute("kubernetes-client.name", digest.getResourceMeta().getName());
}
}
@Override
protected String spanNameForRequest(Request request) {
return KubernetesRequestDigest.parse(request).toString();
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import okhttp3.Request;
import org.checkerframework.checker.nullness.qual.Nullable;
class KubernetesExperimentalAttributesExtractor
extends AttributesExtractor<Request, ApiResponse<?>> {
@Override
protected void onStart(AttributesBuilder attributes, Request request) {
KubernetesRequestDigest digest = KubernetesRequestDigest.parse(request);
attributes
.put("kubernetes-client.namespace", digest.getResourceMeta().getNamespace())
.put("kubernetes-client.name", digest.getResourceMeta().getName());
}
@Override
protected void onEnd(
AttributesBuilder attributes, Request request, @Nullable ApiResponse<?> apiResponse) {}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import okhttp3.Request;
import org.checkerframework.checker.nullness.qual.Nullable;
class KubernetesHttpAttributesExtractor extends HttpAttributesExtractor<Request, ApiResponse<?>> {
@Override
protected String method(Request request) {
return request.method();
}
@Override
protected String url(Request request) {
return request.url().toString();
}
@Override
protected @Nullable String target(Request request) {
return null;
}
@Override
protected @Nullable String host(Request request) {
return null;
}
@Override
protected @Nullable String route(Request request) {
return null;
}
@Override
protected @Nullable String scheme(Request request) {
return null;
}
@Override
protected @Nullable String userAgent(Request request) {
return request.header("user-agent");
}
@Override
protected @Nullable Long requestContentLength(
Request request, @Nullable ApiResponse<?> apiResponse) {
return null;
}
@Override
protected @Nullable Long requestContentLengthUncompressed(
Request request, @Nullable ApiResponse<?> apiResponse) {
return null;
}
@Override
protected String flavor(Request request, @Nullable ApiResponse<?> apiResponse) {
return SemanticAttributes.HttpFlavorValues.HTTP_1_1;
}
@Override
protected @Nullable String serverName(Request request, @Nullable ApiResponse<?> apiResponse) {
return null;
}
@Override
protected Integer statusCode(Request request, ApiResponse<?> apiResponse) {
return apiResponse.getStatusCode();
}
@Override
protected @Nullable Long responseContentLength(Request request, ApiResponse<?> apiResponse) {
return null;
}
@Override
protected @Nullable Long responseContentLengthUncompressed(
Request request, ApiResponse<?> apiResponse) {
return null;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import io.kubernetes.client.openapi.ApiResponse;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import okhttp3.Request;
import org.checkerframework.checker.nullness.qual.Nullable;
class KubernetesNetAttributesExtractor extends NetAttributesExtractor<Request, ApiResponse<?>> {
@Override
public String transport(Request request) {
return SemanticAttributes.NetTransportValues.IP_TCP;
}
@Override
public String peerName(Request request, @Nullable ApiResponse<?> apiResponse) {
return request.url().host();
}
@Override
public Integer peerPort(Request request, @Nullable ApiResponse<?> apiResponse) {
return request.url().port();
}
@Override
public @Nullable String peerIp(Request request, @Nullable ApiResponse<?> apiResponse) {
return null;
}
}

View File

@ -5,7 +5,7 @@
package io.opentelemetry.javaagent.instrumentation.kubernetesclient;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.KubernetesClientTracer.tracer;
import static io.opentelemetry.javaagent.instrumentation.kubernetesclient.KubernetesClientSingletons.instrumenter;
import io.kubernetes.client.openapi.ApiCallback;
import io.kubernetes.client.openapi.ApiException;
@ -14,21 +14,25 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.List;
import java.util.Map;
import okhttp3.Request;
public class TracingApiCallback<T> implements ApiCallback<T> {
private final ApiCallback<T> delegate;
private final Context parentContext;
private final Context context;
private final Request request;
public TracingApiCallback(ApiCallback<T> delegate, Context parentContext, Context context) {
public TracingApiCallback(
ApiCallback<T> delegate, Context parentContext, Context context, Request request) {
this.delegate = delegate;
this.parentContext = parentContext;
this.context = context;
this.request = request;
}
@Override
public void onFailure(ApiException e, int status, Map<String, List<String>> headers) {
tracer().endExceptionally(context, new ApiResponse<>(status, headers), e);
instrumenter().end(context, request, new ApiResponse<>(status, headers), e);
if (delegate != null) {
try (Scope ignored = parentContext.makeCurrent()) {
delegate.onFailure(e, status, headers);
@ -38,7 +42,7 @@ public class TracingApiCallback<T> implements ApiCallback<T> {
@Override
public void onSuccess(T t, int status, Map<String, List<String>> headers) {
tracer().end(context, new ApiResponse<>(status, headers));
instrumenter().end(context, request, new ApiResponse<>(status, headers), null);
if (delegate != null) {
try (Scope ignored = parentContext.makeCurrent()) {
delegate.onSuccess(t, status, headers);