Wrappers for AWS lambda tracing (#1471)

* wrappers for AWS lambda instrumentation

* code review changes

* code review changes
This commit is contained in:
Jakub Wach 2020-10-27 08:55:31 +01:00 committed by GitHub
parent 5bed579286
commit 3a81c52d2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 543 additions and 3 deletions

View File

@ -2,8 +2,14 @@
This package contains libraries to help instrument AWS lambda functions in your code.
To use the instrumentation, replace your function classes that implement `RequestHandler` with those
that extend `TracingRequestHandler`. You will need to change the method name to `doHandleRequest`.
## Using wrappers
To use the instrumentation, configure `OTEL_LAMBDA_HANDLER` env property to your lambda handler method in following format `package.ClassName::methodName`
and use `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestWrapper` (or `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestStreamWrapper`) as
your `Handler`.
## Using handlers
To use the instrumentation, replace your function classes that implement `RequestHandler` (or `RequestStreamHandler`) with those
that extend `TracingRequestHandler` (or `TracingRequestStreamHandler`). You will need to change the method name to `doHandleRequest`.
```java
public class MyRequestHandler extends TracingRequestHandler<String, String> {
@ -20,7 +26,7 @@ public class MyRequestHandler extends TracingRequestHandler<String, String> {
A `SERVER` span will be created with the name you specify for the function when deploying it.
In addition to the code change, it is recommended to setup X-Ray trace propagation to be able to
In addition, it is recommended to setup X-Ray trace propagation to be able to
link to tracing information provided by Lambda itself. To do so, add a dependency on
`opentelemetry-extension-tracepropagators`. Make sure the version matches the version of the SDK
you use.

View File

@ -0,0 +1,105 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Span.Kind;
import io.opentelemetry.trace.Tracer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
/**
* A base class similar to {@link RequestStreamHandler} but will automatically trace invocations of
* {@link #doHandleRequest(InputStream input, OutputStream output, Context)}.
*/
public abstract class TracingRequestStreamHandler implements RequestStreamHandler {
private final AwsLambdaTracer tracer;
/**
* Creates a new {@link TracingRequestStreamHandler} which traces using the default {@link
* Tracer}.
*/
protected TracingRequestStreamHandler() {
this.tracer = new AwsLambdaTracer();
}
/**
* Creates a new {@link TracingRequestStreamHandler} which traces using the specified {@link
* Tracer}.
*/
protected TracingRequestStreamHandler(Tracer tracer) {
this.tracer = new AwsLambdaTracer(tracer);
}
/**
* Creates a new {@link TracingRequestStreamHandler} which traces using the specified {@link
* AwsLambdaTracer}.
*/
protected TracingRequestStreamHandler(AwsLambdaTracer tracer) {
this.tracer = tracer;
}
@Override
public final void handleRequest(InputStream input, OutputStream output, Context context)
throws IOException {
Span span = tracer.startSpan(context, Kind.SERVER);
try (Scope ignored = tracer.startScope(span)) {
doHandleRequest(input, new OutputStreamWrapper(output, span), context);
} catch (Throwable t) {
tracer.endExceptionally(span, t);
OpenTelemetrySdk.getTracerManagement().forceFlush().join(1, TimeUnit.SECONDS);
throw t;
}
}
protected abstract void doHandleRequest(InputStream input, OutputStream output, Context context)
throws IOException;
private class OutputStreamWrapper extends OutputStream {
private final OutputStream delegate;
private final Span span;
OutputStreamWrapper(OutputStream delegate, Span span) {
this.delegate = delegate;
this.span = span;
}
@Override
public void write(byte[] b) throws IOException {
delegate.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
delegate.write(b, off, len);
}
@Override
public void flush() throws IOException {
delegate.flush();
}
@Override
public void close() throws IOException {
delegate.close();
tracer.end(span);
OpenTelemetrySdk.getTracerManagement().forceFlush().join(1, TimeUnit.SECONDS);
}
@Override
public void write(int b) throws IOException {
delegate.write(b);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Wrapper for {@link TracingRequestStreamHandler}. Allows for wrapping a regular lambda, enabling
* single span tracing. Main lambda class should be configured as env property OTEL_LAMBDA_HANDLER
* in package.ClassName::methodName format. Lambda class must implement {@link
* RequestStreamHandler}.
*/
public class TracingRequestStreamWrapper extends TracingRequestStreamHandler {
private static final WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration();
@Override
protected void doHandleRequest(InputStream input, OutputStream output, Context context)
throws IOException {
if (!(WRAPPED_LAMBDA.getTargetObject() instanceof RequestStreamHandler)) {
throw new RuntimeException(
WRAPPED_LAMBDA.getTargetClass().getName()
+ " is not an instance of RequestStreamHandler");
}
((RequestStreamHandler) WRAPPED_LAMBDA.getTargetObject()).handleRequest(input, output, context);
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* Wrapper for {@link TracingRequestHandler}. Allows for wrapping a regular lambda, enabling single
* span tracing. Main lambda class should be configured as env property OTEL_LAMBDA_HANDLER in
* package.ClassName::methodName format.
*/
public final class TracingRequestWrapper extends TracingRequestHandler {
private static final WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration();
private Object[] createParametersArray(Method targetMethod, Object input, Context context) {
Class<?>[] parameterTypes = targetMethod.getParameterTypes();
Object[] parameters = new Object[parameterTypes.length];
for (int i = 0; i < parameterTypes.length; i++) {
// loop through to populate each index of parameter
Object parameter = null;
Class clazz = parameterTypes[i];
boolean isContext = clazz.equals(Context.class);
if (i == 0 && !isContext) {
// first position if it's not context
parameter = input;
} else if (isContext) {
// populate context
parameter = context;
}
parameters[i] = parameter;
}
return parameters;
}
@Override
protected Object doHandleRequest(Object input, Context context) {
Method targetMethod = WRAPPED_LAMBDA.getRequestTargetMethod();
Object[] parameters = createParametersArray(targetMethod, input, context);
Object returnObj;
try {
returnObj = targetMethod.invoke(WRAPPED_LAMBDA.getTargetObject(), parameters);
} catch (IllegalAccessException e) {
throw new RuntimeException("Method is inaccessible", e);
} catch (InvocationTargetException e) {
throw (e.getCause() instanceof RuntimeException
? (RuntimeException) e.getCause()
: new RuntimeException(e.getTargetException()));
}
return returnObj;
}
}

View File

@ -0,0 +1,130 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
/** Model for wrapped lambda function (object, class, method). */
class WrappedLambda {
public static final String OTEL_LAMBDA_HANDLER_ENV_KEY = "OTEL_LAMBDA_HANDLER";
private final Object targetObject;
private final Class<?> targetClass;
private final String targetMethodName;
/**
* Creates new lambda wrapper out of configuration. Supported env properties: - {@value
* OTEL_LAMBDA_HANDLER_ENV_KEY} - lambda handler in format: package.ClassName::methodName
*
* @return
*/
static WrappedLambda fromConfiguration() {
String lambdaHandler = System.getenv(OTEL_LAMBDA_HANDLER_ENV_KEY);
if (lambdaHandler == null || lambdaHandler.isEmpty()) {
throw new RuntimeException(OTEL_LAMBDA_HANDLER_ENV_KEY + " was not specified.");
}
// expect format to be package.ClassName::methodName
String[] split = lambdaHandler.split("::");
if (split.length != 2) {
throw new RuntimeException(
lambdaHandler
+ " is not a valid handler name. Expected format: package.ClassName::methodName");
}
String handlerClassName = split[0];
String targetMethodName = split[1];
Class<?> targetClass;
try {
targetClass = Class.forName(handlerClassName);
} catch (ClassNotFoundException e) {
// no class found
throw new RuntimeException(handlerClassName + " not found in classpath");
}
return new WrappedLambda(targetClass, targetMethodName);
}
WrappedLambda(Class<?> targetClass, String targetMethodName) {
this.targetClass = targetClass;
this.targetMethodName = targetMethodName;
this.targetObject = instantiateTargetClass();
}
private Object instantiateTargetClass() {
Object targetObject;
try {
Constructor<?> ctor = targetClass.getConstructor();
targetObject = ctor.newInstance();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
targetClass.getName() + " does not have an appropriate constructor");
} catch (InstantiationException e) {
throw new RuntimeException(targetClass.getName() + " cannot be an abstract class");
} catch (IllegalAccessException e) {
throw new RuntimeException(targetClass.getName() + "'s constructor is not accessible");
} catch (InvocationTargetException e) {
throw new RuntimeException(
targetClass.getName() + " threw an exception from the constructor");
}
return targetObject;
}
private boolean isLastParameterContext(Parameter[] parameters) {
if (parameters.length == 0) {
return false;
}
return parameters[parameters.length - 1].getType().equals(Context.class);
}
Method getRequestTargetMethod() {
/*
Per method selection specifications
http://docs.aws.amazon.com/lambda/latest/dg/java-programming-model-handler-types.html
- Context can be omitted
- Select the method with the largest number of parameters.
- If two or more methods have the same number of parameters, AWS Lambda selects the method that has the Context as the last parameter.
- If none or all of these methods have the Context parameter, then the behavior is undefined.
*/
List<Method> methods = Arrays.asList(targetClass.getMethods());
Optional<Method> firstOptional =
methods.stream()
.filter((Method m) -> m.getName().equals(targetMethodName))
.sorted(
(Method a, Method b) -> {
// sort descending (reverse of default ascending)
if (a.getParameterCount() != b.getParameterCount()) {
return b.getParameterCount() - a.getParameterCount();
}
if (isLastParameterContext(a.getParameters())) {
return -1;
} else if (isLastParameterContext(b.getParameters())) {
return 1;
}
return -1;
})
.findFirst();
if (!firstOptional.isPresent()) {
throw new RuntimeException("Method " + targetMethodName + " not found");
}
return firstOptional.get();
}
Object getTargetObject() {
return targetObject;
}
Class<?> getTargetClass() {
return targetClass;
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0
import static io.opentelemetry.trace.Span.Kind.SERVER
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestStreamHandler
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.InstrumentationTestTrait
import io.opentelemetry.trace.attributes.SemanticAttributes
import java.nio.charset.Charset
import org.junit.Rule
import org.junit.contrib.java.lang.system.EnvironmentVariables
import spock.lang.Shared
class TracingRequestStreamWrapperTest extends InstrumentationSpecification implements InstrumentationTestTrait {
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
static class TestRequestHandler implements RequestStreamHandler {
@Override
void handleRequest(InputStream input, OutputStream output, Context context) {
BufferedReader reader = new BufferedReader(new InputStreamReader(input))
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(output))
String line = reader.readLine()
if (line == "hello") {
writer.write("world")
writer.flush()
writer.close()
} else {
throw new IllegalArgumentException("bad argument")
}
}
}
@Shared
TracingRequestStreamWrapper wrapper
def childSetup() {
environmentVariables.set(WrappedLambda.OTEL_LAMBDA_HANDLER_ENV_KEY, "io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestStreamWrapperTest\$TestRequestHandler::handleRequest")
wrapper = new TracingRequestStreamWrapper()
}
def "handler traced"() {
when:
def context = Mock(Context)
context.getFunctionName() >> "my_function"
context.getAwsRequestId() >> "1-22-333"
def input = new ByteArrayInputStream("hello\n".getBytes(Charset.defaultCharset()))
def output = new ByteArrayOutputStream()
wrapper.handleRequest(input, output, context)
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
name("my_function")
kind SERVER
attributes {
"${SemanticAttributes.FAAS_EXECUTION.key}" "1-22-333"
}
}
}
}
}
def "handler traced with exception"() {
when:
def context = Mock(Context)
context.getFunctionName() >> "my_function"
context.getAwsRequestId() >> "1-22-333"
def input = new ByteArrayInputStream("bye".getBytes(Charset.defaultCharset()))
def output = new ByteArrayOutputStream()
def thrown
try {
wrapper.handleRequest(input, output, context)
} catch (Throwable t) {
thrown = t
}
then:
thrown != null
assertTraces(1) {
trace(0, 1) {
span(0) {
name("my_function")
kind SERVER
errored true
errorEvent(IllegalArgumentException, "bad argument")
attributes {
"${SemanticAttributes.FAAS_EXECUTION.key}" "1-22-333"
}
}
}
}
}
}

View File

@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0
import static io.opentelemetry.trace.Span.Kind.SERVER
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.RequestHandler
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.InstrumentationTestTrait
import io.opentelemetry.trace.attributes.SemanticAttributes
import org.junit.Rule
import org.junit.contrib.java.lang.system.EnvironmentVariables
import spock.lang.Shared
class TracingRequestWrapperTest extends InstrumentationSpecification implements InstrumentationTestTrait {
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
static class TestRequestHandler implements RequestHandler<String, String> {
@Override
String handleRequest(String input, Context context) {
if (input == "hello") {
return "world"
}
throw new IllegalArgumentException("bad argument")
}
}
@Shared
TracingRequestWrapper wrapper
def childSetup() {
environmentVariables.set(WrappedLambda.OTEL_LAMBDA_HANDLER_ENV_KEY, "io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestWrapperTest\$TestRequestHandler::handleRequest")
wrapper = new TracingRequestWrapper()
}
def "handler traced"() {
when:
def context = Mock(Context)
context.getFunctionName() >> "my_function"
context.getAwsRequestId() >> "1-22-333"
def result = wrapper.handleRequest("hello", context)
then:
result == "world"
assertTraces(1) {
trace(0, 1) {
span(0) {
name("my_function")
kind SERVER
attributes {
"${SemanticAttributes.FAAS_EXECUTION.key}" "1-22-333"
}
}
}
}
}
def "handler traced with exception"() {
when:
def context = Mock(Context)
context.getFunctionName() >> "my_function"
context.getAwsRequestId() >> "1-22-333"
def thrown
try {
wrapper.handleRequest("goodbye", context)
} catch (Throwable t) {
thrown = t
}
then:
thrown != null
assertTraces(1) {
trace(0, 1) {
span(0) {
name("my_function")
kind SERVER
errored true
errorEvent(IllegalArgumentException, "bad argument")
attributes {
"${SemanticAttributes.FAAS_EXECUTION.key}" "1-22-333"
}
}
}
}
}
}