Propagate otel context through custom aws client context for lambda direct calls (#11675)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
John Bley 2024-07-09 23:29:04 -04:00 committed by GitHub
parent ec91735598
commit 6b65447300
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 502 additions and 1 deletions

View File

@ -11,6 +11,7 @@ import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.ContextPropagationDebug;
import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import javax.annotation.Nullable;
@ -48,11 +49,20 @@ public class AwsLambdaFunctionInstrumenter {
public Context extract(AwsLambdaRequest input) {
ContextPropagationDebug.debugContextLeakIfEnabled();
// Look in both the http headers and the custom client context
Map<String, String> headers = input.getHeaders();
if (input.getAwsContext() != null && input.getAwsContext().getClientContext() != null) {
Map<String, String> customContext = input.getAwsContext().getClientContext().getCustom();
if (customContext != null) {
headers = new HashMap<>(headers);
headers.putAll(customContext);
}
}
return openTelemetry
.getPropagators()
.getTextMapPropagator()
.extract(Context.root(), input.getHeaders(), MapGetter.INSTANCE);
.extract(Context.root(), headers, MapGetter.INSTANCE);
}
private enum MapGetter implements TextMapGetter<Map<String, String>> {

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambdacore.v1_0.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.amazonaws.services.lambda.runtime.ClientContext;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest;
import java.util.HashMap;
import org.junit.jupiter.api.Test;
class InstrumenterExtractionTest {
@Test
public void useCustomContext() {
AwsLambdaFunctionInstrumenter instr =
AwsLambdaFunctionInstrumenterFactory.createInstrumenter(
OpenTelemetry.propagating(
ContextPropagators.create(W3CTraceContextPropagator.getInstance())));
com.amazonaws.services.lambda.runtime.Context awsContext =
mock(com.amazonaws.services.lambda.runtime.Context.class);
ClientContext clientContext = mock(ClientContext.class);
when(awsContext.getClientContext()).thenReturn(clientContext);
HashMap<String, String> customMap = new HashMap<>();
customMap.put("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
when(clientContext.getCustom()).thenReturn(customMap);
AwsLambdaRequest input = AwsLambdaRequest.create(awsContext, new HashMap<>(), new HashMap<>());
Context extracted = instr.extract(input);
SpanContext spanContext = Span.fromContext(extracted).getSpanContext();
assertThat(spanContext.getTraceId()).isEqualTo("4bf92f3577b34da6a3ce929d0e0e4736");
assertThat(spanContext.getSpanId()).isEqualTo("00f067aa0ba902b7");
}
}

View File

@ -13,6 +13,7 @@ muzzle {
excludeInstrumentationName("aws-sdk-2.2-sqs")
excludeInstrumentationName("aws-sdk-2.2-sns")
excludeInstrumentationName("aws-sdk-2.2-lambda")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
@ -43,6 +44,7 @@ muzzle {
extraDependency("software.amazon.awssdk:protocol-core")
excludeInstrumentationName("aws-sdk-2.2-sns")
excludeInstrumentationName("aws-sdk-2.2-lambda")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
@ -57,6 +59,21 @@ muzzle {
extraDependency("software.amazon.awssdk:protocol-core")
excludeInstrumentationName("aws-sdk-2.2-sqs")
excludeInstrumentationName("aws-sdk-2.2-lambda")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
}
pass {
group.set("software.amazon.awssdk")
module.set("lambda")
versions.set("[2.17.0,)")
// Used by all SDK services, the only case it isn't is an SDK extension such as a custom HTTP
// client, which is not target of instrumentation anyways.
extraDependency("software.amazon.awssdk:protocol-core")
excludeInstrumentationName("aws-sdk-2.2-sqs")
excludeInstrumentationName("aws-sdk-2.2-sns")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
@ -81,6 +98,7 @@ dependencies {
testLibrary("software.amazon.awssdk:dynamodb:2.2.0")
testLibrary("software.amazon.awssdk:ec2:2.2.0")
testLibrary("software.amazon.awssdk:kinesis:2.2.0")
testLibrary("software.amazon.awssdk:lambda:2.2.0")
testLibrary("software.amazon.awssdk:rds:2.2.0")
testLibrary("software.amazon.awssdk:s3:2.2.0")
testLibrary("software.amazon.awssdk:sqs:2.2.0")

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
public final class LambdaAdviceBridge {
private LambdaAdviceBridge() {}
public static void referenceForMuzzleOnly() {
throw new UnsupportedOperationException(
LambdaImpl.class.getName() + " referencing for muzzle, should never be actually called");
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.none;
import com.google.auto.service.AutoService;
import io.opentelemetry.instrumentation.awssdk.v2_2.LambdaAdviceBridge;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class)
public class LambdaInstrumentationModule extends AbstractAwsSdkInstrumentationModule {
public LambdaInstrumentationModule() {
super("aws-sdk-2.2-lambda");
}
@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
return hasClassesNamed(
"software.amazon.awssdk.services.lambda.model.InvokeRequest",
"software.amazon.awssdk.protocols.jsoncore.JsonNode");
}
@Override
public void doTransform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
none(), LambdaInstrumentationModule.class.getName() + "$RegisterAdvice");
}
@SuppressWarnings("unused")
public static class RegisterAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit() {
// (indirectly) using LambdaImpl class here to make sure it is available from LambdaAccess
// (injected into app classloader) and checked by Muzzle
LambdaAdviceBridge.referenceForMuzzleOnly();
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2LambdaTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
class Aws2LambdaTest extends AbstractAws2LambdaTest {
@RegisterExtension
private static final AgentInstrumentationExtension testing =
AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
@Override
protected boolean canTestLambdaInvoke() {
// only supported since 2.17.0
return Boolean.getBoolean("testLatestDeps");
}
@Override
protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder();
}
}

View File

@ -17,6 +17,7 @@ dependencies {
testLibrary("software.amazon.awssdk:dynamodb:2.2.0")
testLibrary("software.amazon.awssdk:ec2:2.2.0")
testLibrary("software.amazon.awssdk:kinesis:2.2.0")
testLibrary("software.amazon.awssdk:lambda:2.2.0")
testLibrary("software.amazon.awssdk:rds:2.2.0")
testLibrary("software.amazon.awssdk:s3:2.2.0")
testLibrary("software.amazon.awssdk:sqs:2.2.0")

View File

@ -7,8 +7,11 @@ dependencies {
library("software.amazon.awssdk:aws-core:2.2.0")
library("software.amazon.awssdk:sqs:2.2.0")
library("software.amazon.awssdk:lambda:2.2.0")
library("software.amazon.awssdk:sns:2.2.0")
library("software.amazon.awssdk:aws-json-protocol:2.2.0")
// json-utils was added in 2.17.0
compileOnly("software.amazon.awssdk:json-utils:2.17.0")
compileOnly(project(":muzzle")) // For @NoMuzzle
testImplementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
@ -38,10 +41,24 @@ testing {
implementation("software.amazon.awssdk:aws-core:+")
implementation("software.amazon.awssdk:aws-json-protocol:+")
implementation("software.amazon.awssdk:dynamodb:+")
implementation("software.amazon.awssdk:lambda:+")
} else {
implementation("software.amazon.awssdk:aws-core:2.2.0")
implementation("software.amazon.awssdk:aws-json-protocol:2.2.0")
implementation("software.amazon.awssdk:dynamodb:2.2.0")
implementation("software.amazon.awssdk:lambda:2.2.0")
}
}
}
val testLambda by registering(JvmTestSuite::class) {
dependencies {
implementation(project())
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
if (findProperty("testLatestDeps") as Boolean) {
implementation("software.amazon.awssdk:lambda:+")
} else {
implementation("software.amazon.awssdk:lambda:2.17.0")
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import software.amazon.awssdk.core.SdkRequest;
final class LambdaAccess {
private LambdaAccess() {}
private static final boolean enabled = PluginImplUtil.isImplPresent("LambdaImpl");
@NoMuzzle
public static SdkRequest modifyRequest(SdkRequest request, Context otelContext) {
return enabled ? LambdaImpl.modifyRequest(request, otelContext) : null;
}
}

View File

@ -0,0 +1,102 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.api.GlobalOpenTelemetry;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
import software.amazon.awssdk.protocols.jsoncore.internal.ObjectJsonNode;
import software.amazon.awssdk.protocols.jsoncore.internal.StringJsonNode;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
// this class is only used from LambdaAccess from method with @NoMuzzle annotation
// Direct lambda invocations (e.g., not through an api gateway) currently strip
// away the otel propagation headers (but leave x-ray ones intact). Use the
// custom client context header as an additional propagation mechanism for this
// very specific scenario. For reference, the header is named "X-Amz-Client-Context" but the api to
// manipulate it abstracts that away. The client context field is documented in
// https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html#API_Invoke_RequestParameters
final class LambdaImpl {
static {
// Force loading of InvokeRequest; this ensures that an exception is thrown at this point when
// the Lambda library is not present, which will cause DirectLambdaAccess to have
// enabled=false in library mode.
@SuppressWarnings("unused")
String invokeRequestName = InvokeRequest.class.getName();
// was added in 2.17.0
@SuppressWarnings("unused")
String jsonNodeName = JsonNode.class.getName();
}
private static final String CLIENT_CONTEXT_CUSTOM_FIELDS_KEY = "custom";
static final int MAX_CLIENT_CONTEXT_LENGTH = 3583; // visible for testing
private LambdaImpl() {}
@Nullable
static SdkRequest modifyRequest(
SdkRequest request, io.opentelemetry.context.Context otelContext) {
if (isDirectLambdaInvocation(request)) {
return modifyOrAddCustomContextHeader((InvokeRequest) request, otelContext);
}
return null;
}
static boolean isDirectLambdaInvocation(SdkRequest request) {
return request instanceof InvokeRequest;
}
static SdkRequest modifyOrAddCustomContextHeader(
InvokeRequest request, io.opentelemetry.context.Context otelContext) {
InvokeRequest.Builder builder = request.toBuilder();
// Unfortunately the value of this thing is a base64-encoded json with a character limit; also
// therefore not comma-composable like many http headers
String clientContextString = request.clientContext();
String clientContextJsonString = "{}";
if (clientContextString != null && !clientContextString.isEmpty()) {
clientContextJsonString =
new String(Base64.getDecoder().decode(clientContextString), StandardCharsets.UTF_8);
}
JsonNode jsonNode = JsonNode.parser().parse(clientContextJsonString);
if (!jsonNode.isObject()) {
return null;
}
JsonNode customNode =
jsonNode
.asObject()
.computeIfAbsent(
CLIENT_CONTEXT_CUSTOM_FIELDS_KEY, (k) -> new ObjectJsonNode(new LinkedHashMap<>()));
if (!customNode.isObject()) {
return null;
}
Map<String, JsonNode> map = customNode.asObject();
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(otelContext, map, (nodes, key, value) -> nodes.put(key, new StringJsonNode(value)));
if (map.isEmpty()) {
return null;
}
// turn it back into a string (json encode)
String newJson = jsonNode.toString();
// turn it back into a base64 string
String newJson64 = Base64.getEncoder().encodeToString(newJson.getBytes(StandardCharsets.UTF_8));
// check it for length (err on the safe side with >=)
if (newJson64.length() >= MAX_CLIENT_CONTEXT_LENGTH) {
return null;
}
builder.clientContext(newJson64);
return builder.build();
}
}

View File

@ -212,6 +212,10 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
if (modifiedRequest != null) {
return modifiedRequest;
}
modifiedRequest = LambdaAccess.modifyRequest(request, otelContext);
if (modifiedRequest != null) {
return modifiedRequest;
}
// Insert other special handling here, following the same pattern as SQS and SNS.

View File

@ -0,0 +1,107 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
class Aws2LambdaTest extends AbstractAws2LambdaTest {
@RegisterExtension
private static final LibraryInstrumentationExtension testing =
LibraryInstrumentationExtension.create();
private static Context context;
private static AwsSdkTelemetry telemetry;
@BeforeAll
static void setup() {
testing.runWithHttpServerSpan(
() -> {
context = Context.current();
});
telemetry = AwsSdkTelemetry.create(testing.getOpenTelemetry());
}
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
@Override
protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder()
.addExecutionInterceptor(telemetry.newExecutionInterceptor());
}
private static String base64ify(String json) {
return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
}
@Test
void noExistingClientContext() {
InvokeRequest request = InvokeRequest.builder().build();
InvokeRequest newRequest =
(InvokeRequest) LambdaImpl.modifyOrAddCustomContextHeader(request, context);
String newClientContext = newRequest.clientContext();
newClientContext =
new String(Base64.getDecoder().decode(newClientContext), StandardCharsets.UTF_8);
assertThat(newClientContext.contains("traceparent")).isTrue();
}
@Test
void withExistingClientContext() {
String clientContext =
base64ify(
"{\"otherStuff\": \"otherValue\", \"custom\": {\"preExisting\": \"somevalue\"} }");
InvokeRequest request = InvokeRequest.builder().clientContext(clientContext).build();
InvokeRequest newRequest =
(InvokeRequest) LambdaImpl.modifyOrAddCustomContextHeader(request, context);
String newClientContext = newRequest.clientContext();
newClientContext =
new String(Base64.getDecoder().decode(newClientContext), StandardCharsets.UTF_8);
assertThat(newClientContext.contains("traceparent")).isTrue();
assertThat(newClientContext.contains("preExisting")).isTrue();
assertThat(newClientContext.contains("otherStuff")).isTrue();
}
@Test
void exceedingMaximumLengthDoesNotModify() {
// awkward way to build a valid json that is almost but not quite too long
StringBuilder buffer = new StringBuilder("x");
String long64edClientContext = "";
while (true) {
buffer.append("x");
String newClientContext = base64ify("{\"" + buffer + "\": \"" + buffer + "\"}");
if (newClientContext.length() >= LambdaImpl.MAX_CLIENT_CONTEXT_LENGTH) {
break;
}
long64edClientContext = newClientContext;
}
InvokeRequest request = InvokeRequest.builder().clientContext(long64edClientContext).build();
assertThat(request.clientContext().equals(long64edClientContext)).isTrue();
InvokeRequest newRequest =
(InvokeRequest) LambdaImpl.modifyOrAddCustomContextHeader(request, context);
assertThat(newRequest).isNull(); // null return means no modification performed
}
}

View File

@ -14,6 +14,7 @@ dependencies {
compileOnly("software.amazon.awssdk:dynamodb:2.2.0")
compileOnly("software.amazon.awssdk:ec2:2.2.0")
compileOnly("software.amazon.awssdk:kinesis:2.2.0")
compileOnly("software.amazon.awssdk:lambda:2.2.0")
compileOnly("software.amazon.awssdk:rds:2.2.0")
compileOnly("software.amazon.awssdk:s3:2.2.0")
compileOnly("software.amazon.awssdk:sqs:2.2.0")

View File

@ -0,0 +1,78 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.testing.internal.armeria.common.HttpResponse;
import io.opentelemetry.testing.internal.armeria.common.HttpStatus;
import io.opentelemetry.testing.internal.armeria.common.MediaType;
import io.opentelemetry.testing.internal.armeria.testing.junit5.server.mock.MockWebServerExtension;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaClientBuilder;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
public abstract class AbstractAws2LambdaTest {
@RegisterExtension
private static final MockWebServerExtension server = new MockWebServerExtension();
private static final StaticCredentialsProvider CREDENTIALS_PROVIDER =
StaticCredentialsProvider.create(
AwsBasicCredentials.create("my-access-key", "my-secret-key"));
protected abstract InstrumentationExtension getTesting();
protected abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder();
protected boolean canTestLambdaInvoke() {
return true;
}
@Test
void testInvokeLambda() {
Assumptions.assumeTrue(canTestLambdaInvoke());
LambdaClientBuilder builder = LambdaClient.builder();
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(server.httpUri());
builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER);
LambdaClient client = builder.build();
server.enqueue(HttpResponse.of(HttpStatus.OK, MediaType.PLAIN_TEXT_UTF_8, "ok"));
InvokeRequest request = InvokeRequest.builder().functionName("test").build();
InvokeResponse response = client.invoke(request);
assertThat(response.statusCode()).isEqualTo(200);
assertThat(response.payload().asUtf8String()).isEqualTo("ok");
String clientContextHeader =
server.takeRequest().request().headers().get("x-amz-client-context");
assertThat(clientContextHeader).isNotEmpty();
String clientContextJson =
new String(Base64.getDecoder().decode(clientContextHeader), StandardCharsets.UTF_8);
assertThat(clientContextJson).contains("traceparent");
getTesting()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("Lambda.Invoke").hasKind(SpanKind.CLIENT).hasNoParent()));
}
}