From e2cfe371f078683c36ad32b5c5b6d8c5c08b616c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Serkan=20=C3=96ZAL?= Date: Mon, 12 Aug 2024 21:45:47 +0300 Subject: [PATCH] Use `aws-lambda-java-serialization` library, which is available by default, while deserializing input and serializing output (#11868) --- .../v1_0/TracingRequestStreamHandler.java | 45 +- .../v1_0/TracingRequestStreamWrapper.java | 5 +- .../library/build.gradle.kts | 8 + .../v2_2/LambdaParameters.java | 31 ++ .../v2_2/TracingRequestApiGatewayWrapper.java | 17 +- .../v2_2/TracingRequestWrapper.java | 86 +++- .../v2_2/TracingRequestWrapperBase.java | 6 - .../v2_2/{ => internal}/CustomJodaModule.java | 2 +- .../v2_2/internal/SerializationUtil.java | 133 ++++++ .../v2_2/AwsLambdaWrapperTest.java | 46 +- .../v2_2/LambdaParametersTest.java | 130 ++++++ ...acingRequestWrapperStandardEventsTest.java | 398 ++++++++++-------- .../v2_2/internal/SerializationUtilTest.java | 359 ++++++++++++++++ 13 files changed, 1009 insertions(+), 257 deletions(-) rename instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/{ => internal}/CustomJodaModule.java (96%) create mode 100644 instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtil.java create mode 100644 instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtilTest.java diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamHandler.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamHandler.java index fced56818f..ae6a42bd6d 100644 --- a/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamHandler.java +++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamHandler.java @@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.awslambdacore.v1_0; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.ApiGatewayProxyRequest; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.AwsLambdaFunctionInstrumenter; @@ -67,49 +68,55 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { - ApiGatewayProxyRequest proxyRequest = ApiGatewayProxyRequest.forStream(input); - AwsLambdaRequest request = - AwsLambdaRequest.create(context, proxyRequest, proxyRequest.getHeaders()); + AwsLambdaRequest request = createRequest(input, context, proxyRequest); io.opentelemetry.context.Context parentContext = instrumenter.extract(request); if (!instrumenter.shouldStart(parentContext, request)) { - doHandleRequest(proxyRequest.freshStream(), output, context); + doHandleRequest(proxyRequest.freshStream(), output, context, request); return; } io.opentelemetry.context.Context otelContext = instrumenter.start(parentContext, request); + Throwable error = null; try (Scope ignored = otelContext.makeCurrent()) { doHandleRequest( proxyRequest.freshStream(), - new OutputStreamWrapper(output, otelContext, request, openTelemetrySdk), - context); + new OutputStreamWrapper(output, otelContext), + context, + request); } catch (Throwable t) { - instrumenter.end(otelContext, request, null, t); - LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS); + error = t; throw t; + } finally { + instrumenter.end(otelContext, request, null, error); + LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS); } } + protected AwsLambdaRequest createRequest( + InputStream input, Context context, ApiGatewayProxyRequest proxyRequest) throws IOException { + return AwsLambdaRequest.create(context, proxyRequest, proxyRequest.getHeaders()); + } + + protected void doHandleRequest( + InputStream input, OutputStream output, Context context, AwsLambdaRequest request) + throws IOException { + doHandleRequest(input, output, context); + } + protected abstract void doHandleRequest(InputStream input, OutputStream output, Context context) throws IOException; - private class OutputStreamWrapper extends OutputStream { + private static class OutputStreamWrapper extends OutputStream { private final OutputStream delegate; private final io.opentelemetry.context.Context otelContext; - private final AwsLambdaRequest request; - private final OpenTelemetrySdk openTelemetrySdk; private OutputStreamWrapper( - OutputStream delegate, - io.opentelemetry.context.Context otelContext, - AwsLambdaRequest request, - OpenTelemetrySdk openTelemetrySdk) { + OutputStream delegate, io.opentelemetry.context.Context otelContext) { this.delegate = delegate; this.otelContext = otelContext; - this.request = request; - this.openTelemetrySdk = openTelemetrySdk; } @Override @@ -135,8 +142,8 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle @Override public void close() throws IOException { delegate.close(); - instrumenter.end(otelContext, request, null, null); - LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS); + Span span = Span.fromContext(otelContext); + span.addEvent("Output stream closed"); } } } diff --git a/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamWrapper.java b/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamWrapper.java index 18ceb38cbf..d20564ffc8 100644 --- a/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamWrapper.java +++ b/instrumentation/aws-lambda/aws-lambda-core-1.0/library/src/main/java/io/opentelemetry/instrumentation/awslambdacore/v1_0/TracingRequestStreamWrapper.java @@ -23,7 +23,7 @@ import java.io.OutputStream; */ public class TracingRequestStreamWrapper extends TracingRequestStreamHandler { - private final WrappedLambda wrappedLambda; + protected final WrappedLambda wrappedLambda; public TracingRequestStreamWrapper() { this( @@ -32,7 +32,8 @@ public class TracingRequestStreamWrapper extends TracingRequestStreamHandler { } // Visible for testing - TracingRequestStreamWrapper(OpenTelemetrySdk openTelemetrySdk, WrappedLambda wrappedLambda) { + protected TracingRequestStreamWrapper( + OpenTelemetrySdk openTelemetrySdk, WrappedLambda wrappedLambda) { super(openTelemetrySdk, WrapperConfiguration.flushTimeout()); this.wrappedLambda = wrappedLambda; } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts index 6918c6fbfa..452d421895 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/build.gradle.kts @@ -18,6 +18,13 @@ dependencies { // in public API. library("com.amazonaws:aws-lambda-java-events:2.2.1") + // By default, "aws-lambda-java-serialization" library is enabled in the classpath + // at the AWS Lambda environment except "java8" runtime which is deprecated. + // But it is available at "java8.al2" runtime, so it is still can be used + // by Java 8 based Lambda functions. + // So that is the reason that why we add it as compile only dependency. + compileOnly("com.amazonaws:aws-lambda-java-serialization:1.1.5") + // We need Jackson for wrappers to reproduce the serialization does when Lambda invokes a RequestHandler with event // since Lambda will only be able to invoke the wrapper itself with a generic Object. // Note that Lambda itself uses Jackson, but does not expose it to the function so we need to include it here. @@ -33,6 +40,7 @@ dependencies { testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") testImplementation("io.opentelemetry:opentelemetry-extension-trace-propagators") testImplementation("com.google.guava:guava") + testImplementation("com.amazonaws:aws-lambda-java-serialization:1.1.5") testImplementation(project(":instrumentation:aws-lambda:aws-lambda-events-2.2:testing")) testImplementation("uk.org.webcompere:system-stubs-jupiter") diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParameters.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParameters.java index ecc3a33d70..53e68280ba 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParameters.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParameters.java @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.awslambdaevents.v2_2; import com.amazonaws.services.lambda.runtime.Context; +import java.io.InputStream; import java.lang.reflect.Method; import java.util.function.BiFunction; @@ -27,5 +28,35 @@ final class LambdaParameters { return parameters; } + static Object[] toParameters(Method targetMethod, T input, Context context) { + Class[] parameterTypes = targetMethod.getParameterTypes(); + Object[] parameters = new Object[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + Class clazz = parameterTypes[i]; + boolean isContext = clazz.equals(Context.class); + if (isContext) { + parameters[i] = context; + } else if (i == 0) { + parameters[0] = input; + } + } + return parameters; + } + + static Object toInput( + Method targetMethod, + InputStream inputStream, + BiFunction, Object> mapper) { + Class[] parameterTypes = targetMethod.getParameterTypes(); + for (int i = 0; i < parameterTypes.length; i++) { + Class clazz = parameterTypes[i]; + boolean isContext = clazz.equals(Context.class); + if (i == 0 && !isContext) { + return mapper.apply(inputStream, clazz); + } + } + return null; + } + private LambdaParameters() {} } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestApiGatewayWrapper.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestApiGatewayWrapper.java index 00eb6e88e5..711b7e3079 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestApiGatewayWrapper.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestApiGatewayWrapper.java @@ -8,8 +8,8 @@ package io.opentelemetry.instrumentation.awslambdaevents.v2_2; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent; -import com.fasterxml.jackson.core.JsonProcessingException; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda; +import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil; import io.opentelemetry.sdk.OpenTelemetrySdk; import java.util.function.BiFunction; @@ -35,12 +35,7 @@ public class TracingRequestApiGatewayWrapper // Visible for testing static T map(APIGatewayProxyRequestEvent event, Class clazz) { - try { - return OBJECT_MAPPER.readValue(event.getBody(), clazz); - } catch (JsonProcessingException e) { - throw new IllegalStateException( - "Could not map API Gateway event body to requested parameter type: " + clazz, e); - } + return SerializationUtil.fromJson(event.getBody(), clazz); } @Override @@ -52,12 +47,8 @@ public class TracingRequestApiGatewayWrapper if (result instanceof APIGatewayProxyResponseEvent) { event = (APIGatewayProxyResponseEvent) result; } else { - try { - event = new APIGatewayProxyResponseEvent(); - event.setBody(OBJECT_MAPPER.writeValueAsString(result)); - } catch (JsonProcessingException e) { - throw new IllegalStateException("Could not serialize return value.", e); - } + event = new APIGatewayProxyResponseEvent(); + event.setBody(SerializationUtil.toJson(result)); } return event; } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapper.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapper.java index 5cbeec41cf..484ee169d6 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapper.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapper.java @@ -5,32 +5,90 @@ package io.opentelemetry.instrumentation.awslambdaevents.v2_2; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.AwsLambdaRequest; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.TracingRequestStreamWrapper; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.ApiGatewayProxyRequest; +import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.MapUtils; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda; +import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil; import io.opentelemetry.sdk.OpenTelemetrySdk; -import java.util.function.BiFunction; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; /** - * Wrapper for {@link io.opentelemetry.instrumentation.awslambdacore.v1_0.TracingRequestHandler}. - * Allows for wrapping a regular lambda, not proxied through API Gateway. Therefore, HTTP headers - * propagation is not supported. + * Wrapper for {@link com.amazonaws.services.lambda.runtime.RequestHandler} based Lambda handlers. */ -public class TracingRequestWrapper extends TracingRequestWrapperBase { +public class TracingRequestWrapper extends TracingRequestStreamWrapper { public TracingRequestWrapper() { - super(TracingRequestWrapper::map); + super(); } // Visible for testing - TracingRequestWrapper( - OpenTelemetrySdk openTelemetrySdk, - WrappedLambda wrappedLambda, - BiFunction, Object> mapper) { - super(openTelemetrySdk, wrappedLambda, mapper); + TracingRequestWrapper(OpenTelemetrySdk openTelemetrySdk, WrappedLambda wrappedLambda) { + super(openTelemetrySdk, wrappedLambda); } - // Visible for testing - static T map(Object jsonMap, Class clazz) { + @Override + protected final AwsLambdaRequest createRequest( + InputStream inputStream, Context context, ApiGatewayProxyRequest proxyRequest) { + Method targetMethod = wrappedLambda.getRequestTargetMethod(); + Object input = LambdaParameters.toInput(targetMethod, inputStream, TracingRequestWrapper::map); + return AwsLambdaRequest.create(context, input, extractHeaders(input)); + } + + protected Map extractHeaders(Object input) { + if (input instanceof APIGatewayProxyRequestEvent) { + return MapUtils.emptyIfNull(((APIGatewayProxyRequestEvent) input).getHeaders()); + } + return Collections.emptyMap(); + } + + @Override + protected final void doHandleRequest( + InputStream input, OutputStream output, Context context, AwsLambdaRequest request) { + Method targetMethod = wrappedLambda.getRequestTargetMethod(); + Object[] parameters = LambdaParameters.toParameters(targetMethod, request.getInput(), context); try { - return OBJECT_MAPPER.convertValue(jsonMap, clazz); + Object result = targetMethod.invoke(wrappedLambda.getTargetObject(), parameters); + SerializationUtil.toJson(output, result); + } catch (IllegalAccessException e) { + throw new IllegalStateException("Method is inaccessible", e); + } catch (InvocationTargetException e) { + throw (e.getCause() instanceof RuntimeException + ? (RuntimeException) e.getCause() + : new IllegalStateException(e.getTargetException())); + } + } + + @SuppressWarnings({"unchecked", "TypeParameterUnusedInFormals"}) + // Used for testing + OUTPUT handleRequest(INPUT input, Context context) throws IOException { + byte[] inputJsonData = SerializationUtil.toJsonData(input); + ByteArrayInputStream inputStream = new ByteArrayInputStream(inputJsonData); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + super.handleRequest(inputStream, outputStream, context); + + byte[] outputJsonData = outputStream.toByteArray(); + return (OUTPUT) + SerializationUtil.fromJson( + new ByteArrayInputStream(outputJsonData), + wrappedLambda.getRequestTargetMethod().getReturnType()); + } + + // Visible for testing + static T map(InputStream inputStream, Class clazz) { + try { + return SerializationUtil.fromJson(inputStream, clazz); } catch (IllegalArgumentException e) { throw new IllegalStateException( "Could not map input to requested parameter type: " + clazz, e); diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperBase.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperBase.java index c435e03d2e..5169c8f121 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperBase.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperBase.java @@ -7,8 +7,6 @@ package io.opentelemetry.instrumentation.awslambdaevents.v2_2; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import io.opentelemetry.instrumentation.api.internal.HttpConstants; import io.opentelemetry.instrumentation.awslambdacore.v1_0.TracingRequestHandler; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.MapUtils; @@ -29,10 +27,6 @@ import java.util.function.BiFunction; */ abstract class TracingRequestWrapperBase extends TracingRequestHandler { - protected static final ObjectMapper OBJECT_MAPPER = - new ObjectMapper() - .registerModule(new CustomJodaModule()) - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); private final WrappedLambda wrappedLambda; private final Method targetMethod; private final BiFunction, Object> parameterMapper; diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/CustomJodaModule.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/CustomJodaModule.java similarity index 96% rename from instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/CustomJodaModule.java rename to instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/CustomJodaModule.java index a1991b4681..1be598ae8f 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/CustomJodaModule.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/CustomJodaModule.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package io.opentelemetry.instrumentation.awslambdaevents.v2_2; +package io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtil.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtil.java new file mode 100644 index 0000000000..81649340e6 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/main/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtil.java @@ -0,0 +1,133 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal; + +import com.amazonaws.services.lambda.runtime.serialization.PojoSerializer; +import com.amazonaws.services.lambda.runtime.serialization.events.LambdaEventSerializers; +import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +/** + * This class is internal and is hence not for public use. Its APIs are unstable and can change at + * any time. + */ +public final class SerializationUtil { + + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private static final ClassValue> serializerCache = + new ClassValue>() { + @Override + protected PojoSerializer computeValue(Class type) { + return createSerializer(type); + } + }; + + private static PojoSerializer createSerializer(Class clazz) { + try { + if (LambdaEventSerializers.isLambdaSupportedEvent(clazz.getName())) { + return LambdaEventSerializers.serializerFor(clazz, clazz.getClassLoader()); + } + return JacksonFactory.getInstance().getSerializer(clazz); + } catch (NoClassDefFoundError e) { + // For "java8" runtime, "aws-lambda-java-serialization" library + // is not available in the classpath by default. + // So fall back to object mapper based legacy serialization. + return new ObjectMapperPojoSerializer(clazz); + } + } + + @SuppressWarnings("unchecked") + public static PojoSerializer getSerializer(Class clazz) { + return (PojoSerializer) serializerCache.get(clazz); + } + + public static T fromJson(String json, Class clazz) { + PojoSerializer serializer = getSerializer(clazz); + return serializer.fromJson(json); + } + + public static T fromJson(InputStream inputStream, Class clazz) { + PojoSerializer serializer = getSerializer(clazz); + return serializer.fromJson(inputStream); + } + + @SuppressWarnings("unchecked") + public static void toJson(OutputStream outputStream, T obj) { + if (obj != null) { + PojoSerializer serializer = getSerializer((Class) obj.getClass()); + serializer.toJson(obj, outputStream); + } + } + + @SuppressWarnings("unchecked") + public static String toJson(T obj) { + if (obj == null) { + return null; + } + PojoSerializer serializer = getSerializer((Class) obj.getClass()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE); + serializer.toJson(obj, outputStream); + return new String(outputStream.toByteArray(), StandardCharsets.UTF_8); + } + + public static byte[] toJsonData(T obj) { + if (obj == null) { + return new byte[] {}; + } + ByteArrayOutputStream os = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE); + SerializationUtil.toJson(os, obj); + return os.toByteArray(); + } + + private static class ObjectMapperPojoSerializer implements PojoSerializer { + + private final ObjectMapper objectMapper = + new ObjectMapper() + .registerModule(new CustomJodaModule()) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + private final Class clazz; + + ObjectMapperPojoSerializer(Class clazz) { + this.clazz = clazz; + } + + @Override + public T fromJson(InputStream input) { + try { + return objectMapper.readValue(input, clazz); + } catch (IOException e) { + throw new IllegalStateException("Could not deserialize from JSON input stream.", e); + } + } + + @Override + public T fromJson(String input) { + try { + return objectMapper.readValue(input, clazz); + } catch (IOException e) { + throw new IllegalStateException("Could not deserialize from JSON string.", e); + } + } + + @Override + public void toJson(T value, OutputStream output) { + try { + objectMapper.writeValue(output, value); + } catch (IOException e) { + throw new IllegalStateException("Could not serialize to JSON.", e); + } + } + } + + private SerializationUtil() {} +} diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/AwsLambdaWrapperTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/AwsLambdaWrapperTest.java index 87cb651713..12bef1d950 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/AwsLambdaWrapperTest.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/AwsLambdaWrapperTest.java @@ -19,6 +19,7 @@ import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExte import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.incubating.CloudIncubatingAttributes; import io.opentelemetry.semconv.incubating.FaasIncubatingAttributes; +import java.io.IOException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -54,12 +55,9 @@ public class AwsLambdaWrapperTest { key = WrappedLambda.OTEL_LAMBDA_HANDLER_ENV_KEY, value = "io.opentelemetry.instrumentation.awslambdaevents.v2_2.AwsLambdaWrapperTest$TestRequestHandlerString::handleRequest") - void handlerTraced() { + void handlerTraced() throws IOException { TracingRequestWrapper wrapper = - new TracingRequestWrapper( - testing.getOpenTelemetrySdk(), - WrappedLambda.fromConfiguration(), - TracingRequestWrapper::map); + new TracingRequestWrapper(testing.getOpenTelemetrySdk(), WrappedLambda.fromConfiguration()); Object result = wrapper.handleRequest("hello", context); assertThat(result).isEqualTo("world"); @@ -84,10 +82,7 @@ public class AwsLambdaWrapperTest { "io.opentelemetry.instrumentation.awslambdaevents.v2_2.AwsLambdaWrapperTest$TestRequestHandlerString::handleRequest") void handlerTracedWithException() { TracingRequestWrapper wrapper = - new TracingRequestWrapper( - testing.getOpenTelemetrySdk(), - WrappedLambda.fromConfiguration(), - TracingRequestWrapper::map); + new TracingRequestWrapper(testing.getOpenTelemetrySdk(), WrappedLambda.fromConfiguration()); Throwable thrown = catchThrowable(() -> wrapper.handleRequest("goodbye", context)); assertThat(thrown).isInstanceOf(IllegalArgumentException.class); @@ -112,12 +107,9 @@ public class AwsLambdaWrapperTest { key = WrappedLambda.OTEL_LAMBDA_HANDLER_ENV_KEY, value = "io.opentelemetry.instrumentation.awslambdaevents.v2_2.AwsLambdaWrapperTest$TestRequestHandlerInteger::handleRequest") - void handlerTraced_integer() { + void handlerTraced_integer() throws IOException { TracingRequestWrapper wrapper = - new TracingRequestWrapper( - testing.getOpenTelemetrySdk(), - WrappedLambda.fromConfiguration(), - TracingRequestWrapper::map); + new TracingRequestWrapper(testing.getOpenTelemetrySdk(), WrappedLambda.fromConfiguration()); Object result = wrapper.handleRequest(1, context); assertThat(result).isEqualTo("world"); @@ -140,12 +132,9 @@ public class AwsLambdaWrapperTest { key = WrappedLambda.OTEL_LAMBDA_HANDLER_ENV_KEY, value = "io.opentelemetry.instrumentation.awslambdaevents.v2_2.AwsLambdaWrapperTest$TestRequestHandlerCustomType::handleRequest") - void handlerTraced_custom() { + void handlerTraced_custom() throws IOException { TracingRequestWrapper wrapper = - new TracingRequestWrapper( - testing.getOpenTelemetrySdk(), - WrappedLambda.fromConfiguration(), - TracingRequestWrapper::map); + new TracingRequestWrapper(testing.getOpenTelemetrySdk(), WrappedLambda.fromConfiguration()); CustomType ct = new CustomType(); ct.key = "hello there"; ct.value = "General Kenobi"; @@ -188,9 +177,28 @@ public class AwsLambdaWrapperTest { } } + @SuppressWarnings("UnusedMethod") private static class CustomType { String key; String value; + + // Need getter/setter of all the attributes for serialization/deserialization + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } } public static final class TestRequestHandlerCustomType diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParametersTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParametersTest.java index 2bf8357f43..f738b5a349 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParametersTest.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/LambdaParametersTest.java @@ -9,6 +9,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import com.amazonaws.services.lambda.runtime.Context; +import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil; +import java.io.ByteArrayInputStream; import java.lang.reflect.Method; import org.junit.jupiter.api.Test; @@ -16,6 +18,10 @@ class LambdaParametersTest { public void onlyContext(Context context) {} + public void noContext(String one) {} + + public void contextOnSecond(String one, Context context) {} + public void contextOnThird(String one, String two, Context context) {} @Test @@ -30,6 +36,31 @@ class LambdaParametersTest { assertThat(params[0]).isEqualTo(context); } + @Test + void shouldOnlySetInputWhenNoContext() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = getClass().getMethod("noContext", String.class); + // when + Object[] params = LambdaParameters.toArray(method, "", context, (o, c) -> o); + // then + assertThat(params).hasSize(1); + assertThat(params[0]).isEqualTo(""); + } + + @Test + void shouldSetContextOnTheSecondPosition() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = getClass().getMethod("contextOnSecond", String.class, Context.class); + // when + Object[] params = LambdaParameters.toArray(method, "", context, (o, c) -> o); + // then + assertThat(params).hasSize(2); + assertThat(params[0]).isEqualTo(""); + assertThat(params[1]).isEqualTo(context); + } + @Test void shouldSetContextOnTheLastPosition() throws NoSuchMethodException { // given @@ -44,4 +75,103 @@ class LambdaParametersTest { assertThat(params[1]).isNull(); assertThat(params[2]).isEqualTo(context); } + + @Test + void shouldNotResolveInputWhenNoInput() throws NoSuchMethodException { + // given + Method method = getClass().getMethod("onlyContext", Context.class); + String giveInput = "testInput"; + // when + Object resolvedInput = + LambdaParameters.toInput( + method, + new ByteArrayInputStream(SerializationUtil.toJsonData(giveInput)), + (i, c) -> SerializationUtil.fromJson(i, c)); + // then + assertThat(resolvedInput).isNull(); + } + + @Test + void shouldResolveInputWithContext() throws NoSuchMethodException { + // given + Method method = getClass().getMethod("contextOnSecond", String.class, Context.class); + String givenInput = "testInput"; + // when + Object resolvedInput = + LambdaParameters.toInput( + method, + new ByteArrayInputStream(SerializationUtil.toJsonData(givenInput)), + (i, c) -> SerializationUtil.fromJson(i, c)); + // then + assertThat(resolvedInput).isNotNull(); + assertThat(resolvedInput).isEqualTo(givenInput); + } + + @Test + void shouldResolveInputWithoutContext() throws NoSuchMethodException { + // given + Method method = getClass().getMethod("noContext", String.class); + String givenInput = "testInput"; + // when + Object resolvedInput = + LambdaParameters.toInput( + method, + new ByteArrayInputStream(SerializationUtil.toJsonData(givenInput)), + (i, c) -> SerializationUtil.fromJson(i, c)); + // then + assertThat(resolvedInput).isNotNull(); + assertThat(resolvedInput).isEqualTo(givenInput); + } + + @Test + void shouldResolveParametersWhenOnlyContext() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = getClass().getMethod("onlyContext", Context.class); + // when + Object[] params = LambdaParameters.toParameters(method, "", context); + // then + assertThat(params).hasSize(1); + assertThat(params[0]).isEqualTo(context); + } + + @Test + void shouldResolveParametersWhenNoContext() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = getClass().getMethod("noContext", String.class); + // when + Object[] params = LambdaParameters.toParameters(method, "", context); + // then + assertThat(params).hasSize(1); + assertThat(params[0]).isEqualTo(""); + } + + @Test + void shouldResolveParametersWhenContextOnTheSecondPosition() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = getClass().getMethod("contextOnSecond", String.class, Context.class); + // when + Object[] params = LambdaParameters.toParameters(method, "", context); + // then + assertThat(params).hasSize(2); + assertThat(params[0]).isEqualTo(""); + assertThat(params[1]).isEqualTo(context); + } + + @Test + void shouldResolveParametersWhenContextOnTheLastPosition() throws NoSuchMethodException { + // given + Context context = mock(Context.class); + Method method = + getClass().getMethod("contextOnThird", String.class, String.class, Context.class); + // when + Object[] params = LambdaParameters.toParameters(method, "", context); + // then + assertThat(params).hasSize(3); + assertThat(params[0]).isEqualTo(""); + assertThat(params[1]).isNull(); + assertThat(params[2]).isEqualTo(context); + } } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperStandardEventsTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperStandardEventsTest.java index 620d084a1b..c25d911ed1 100644 --- a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperStandardEventsTest.java +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/TracingRequestWrapperStandardEventsTest.java @@ -5,7 +5,7 @@ package io.opentelemetry.instrumentation.awslambdaevents.v2_2; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import com.amazonaws.services.lambda.runtime.Context; @@ -15,198 +15,215 @@ import com.amazonaws.services.lambda.runtime.events.S3Event; import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import io.opentelemetry.instrumentation.awslambdacore.v1_0.internal.WrappedLambda; +import io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal.SerializationUtil; import io.opentelemetry.sdk.OpenTelemetrySdk; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; class TracingRequestWrapperStandardEventsTest { - private static final String SUCCESS = "success"; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Map, String> EVENTS_JSON = buildEventExamples(); + private static final Map, EventInfo> EVENTS_JSON = buildEventExamples(); private final OpenTelemetrySdk sdk = OpenTelemetrySdk.builder().build(); private final Context context = mock(Context.class); private TracingRequestWrapper wrapper; - private static Map, String> buildEventExamples() { - Map, String> events = new HashMap<>(); + static final class EventInfo { + final Class eventType; + final String eventBody; + + EventInfo(Class eventType, String eventBody) { + this.eventType = eventType; + this.eventBody = eventBody; + } + } + + private static Map, EventInfo> buildEventExamples() { + Map, EventInfo> events = new HashMap<>(); events.put( ScheduledEventRequestHandler.class, - "{\n" - + " \"version\": \"0\",\n" - + " \"id\": \"53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa\",\n" - + " \"detail-type\": \"Scheduled Event\",\n" - + " \"source\": \"aws.events\",\n" - + " \"account\": \"123456789012\",\n" - + " \"time\": \"2015-10-08T16:53:06Z\",\n" - + " \"region\": \"us-east-1\",\n" - + " \"resources\": [\n" - + " \"arn:aws:events:us-east-1:123456789012:rule/my-scheduled-rule\"\n" - + " ],\n" - + " \"detail\": {}\n" - + "}"); + new EventInfo( + ScheduledEvent.class, + "{\n" + + " \"version\": \"0\",\n" + + " \"id\": \"53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa\",\n" + + " \"detail-type\": \"Scheduled Event\",\n" + + " \"source\": \"aws.events\",\n" + + " \"account\": \"123456789012\",\n" + + " \"time\": \"2015-10-08T16:53:06Z\",\n" + + " \"region\": \"us-east-1\",\n" + + " \"resources\": [\n" + + " \"arn:aws:events:us-east-1:123456789012:rule/my-scheduled-rule\"\n" + + " ],\n" + + " \"detail\": {}\n" + + "}")); events.put( KinesisEventRequestHandler.class, - "{\n" - + " \"Records\": [\n" - + " {\n" - + " \"kinesis\": {\n" - + " \"kinesisSchemaVersion\": \"1.0\",\n" - + " \"partitionKey\": \"1\",\n" - + " \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\",\n" - + " \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\",\n" - + " \"approximateArrivalTimestamp\": 1545084650.987\n" - + " },\n" - + " \"eventSource\": \"aws:kinesis\",\n" - + " \"eventVersion\": \"1.0\",\n" - + " \"eventID\": \"shardId-000000000006:49590338271490256608559692538361571095921575989136588898\",\n" - + " \"eventName\": \"aws:kinesis:record\",\n" - + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" - + " \"awsRegion\": \"us-east-2\",\n" - + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" - + " },\n" - + " {\n" - + " \"kinesis\": {\n" - + " \"kinesisSchemaVersion\": \"1.0\",\n" - + " \"partitionKey\": \"1\",\n" - + " \"sequenceNumber\": \"49590338271490256608559692540925702759324208523137515618\",\n" - + " \"data\": \"VGhpcyBpcyBvbmx5IGEgdGVzdC4=\",\n" - + " \"approximateArrivalTimestamp\": 1545084711.166\n" - + " },\n" - + " \"eventSource\": \"aws:kinesis\",\n" - + " \"eventVersion\": \"1.0\",\n" - + " \"eventID\": \"shardId-000000000006:49590338271490256608559692540925702759324208523137515618\",\n" - + " \"eventName\": \"aws:kinesis:record\",\n" - + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" - + " \"awsRegion\": \"us-east-2\",\n" - + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" - + " }\n" - + " ]\n" - + "}"); + new EventInfo( + KinesisEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"kinesis\": {\n" + + " \"kinesisSchemaVersion\": \"1.0\",\n" + + " \"partitionKey\": \"1\",\n" + + " \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\",\n" + + " \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\",\n" + + " \"approximateArrivalTimestamp\": 1545084650.987\n" + + " },\n" + + " \"eventSource\": \"aws:kinesis\",\n" + + " \"eventVersion\": \"1.0\",\n" + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692538361571095921575989136588898\",\n" + + " \"eventName\": \"aws:kinesis:record\",\n" + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" + + " },\n" + + " {\n" + + " \"kinesis\": {\n" + + " \"kinesisSchemaVersion\": \"1.0\",\n" + + " \"partitionKey\": \"1\",\n" + + " \"sequenceNumber\": \"49590338271490256608559692540925702759324208523137515618\",\n" + + " \"data\": \"VGhpcyBpcyBvbmx5IGEgdGVzdC4=\",\n" + + " \"approximateArrivalTimestamp\": 1545084711.166\n" + + " },\n" + + " \"eventSource\": \"aws:kinesis\",\n" + + " \"eventVersion\": \"1.0\",\n" + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692540925702759324208523137515618\",\n" + + " \"eventName\": \"aws:kinesis:record\",\n" + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" + + " }\n" + + " ]\n" + + "}")); events.put( SqsEventRequestHandler.class, - "{\n" - + " \"Records\": [\n" - + " {\n" - + " \"messageId\": \"059f36b4-87a3-44ab-83d2-661975830a7d\",\n" - + " \"receiptHandle\": \"AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...\",\n" - + " \"body\": \"Test message.\",\n" - + " \"attributes\": {\n" - + " \"ApproximateReceiveCount\": \"1\",\n" - + " \"SentTimestamp\": \"1545082649183\",\n" - + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" - + " \"ApproximateFirstReceiveTimestamp\": \"1545082649185\"\n" - + " },\n" - + " \"messageAttributes\": {},\n" - + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" - + " \"eventSource\": \"aws:sqs\",\n" - + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" - + " \"awsRegion\": \"us-east-2\"\n" - + " },\n" - + " {\n" - + " \"messageId\": \"2e1424d4-f796-459a-8184-9c92662be6da\",\n" - + " \"receiptHandle\": \"AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...\",\n" - + " \"body\": \"Test message.\",\n" - + " \"attributes\": {\n" - + " \"ApproximateReceiveCount\": \"1\",\n" - + " \"SentTimestamp\": \"1545082650636\",\n" - + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" - + " \"ApproximateFirstReceiveTimestamp\": \"1545082650649\"\n" - + " },\n" - + " \"messageAttributes\": {},\n" - + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" - + " \"eventSource\": \"aws:sqs\",\n" - + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" - + " \"awsRegion\": \"us-east-2\"\n" - + " }\n" - + " ]\n" - + "}"); + new EventInfo( + SQSEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"messageId\": \"059f36b4-87a3-44ab-83d2-661975830a7d\",\n" + + " \"receiptHandle\": \"AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...\",\n" + + " \"body\": \"Test message.\",\n" + + " \"attributes\": {\n" + + " \"ApproximateReceiveCount\": \"1\",\n" + + " \"SentTimestamp\": \"1545082649183\",\n" + + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" + + " \"ApproximateFirstReceiveTimestamp\": \"1545082649185\"\n" + + " },\n" + + " \"messageAttributes\": {},\n" + + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" + + " \"eventSource\": \"aws:sqs\",\n" + + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" + + " \"awsRegion\": \"us-east-2\"\n" + + " },\n" + + " {\n" + + " \"messageId\": \"2e1424d4-f796-459a-8184-9c92662be6da\",\n" + + " \"receiptHandle\": \"AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...\",\n" + + " \"body\": \"Test message.\",\n" + + " \"attributes\": {\n" + + " \"ApproximateReceiveCount\": \"1\",\n" + + " \"SentTimestamp\": \"1545082650636\",\n" + + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" + + " \"ApproximateFirstReceiveTimestamp\": \"1545082650649\"\n" + + " },\n" + + " \"messageAttributes\": {},\n" + + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" + + " \"eventSource\": \"aws:sqs\",\n" + + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" + + " \"awsRegion\": \"us-east-2\"\n" + + " }\n" + + " ]\n" + + "}")); events.put( S3EventRequestHandler.class, - "{\n" - + " \"Records\": [\n" - + " {\n" - + " \"eventVersion\": \"2.1\",\n" - + " \"eventSource\": \"aws:s3\",\n" - + " \"awsRegion\": \"us-east-2\",\n" - + " \"eventTime\": \"2019-09-03T19:37:27.192Z\",\n" - + " \"eventName\": \"ObjectCreated:Put\",\n" - + " \"userIdentity\": {\n" - + " \"principalId\": \"AWS:AIDAINPONIXQXHT3IKHL2\"\n" - + " },\n" - + " \"requestParameters\": {\n" - + " \"sourceIPAddress\": \"205.255.255.255\"\n" - + " },\n" - + " \"responseElements\": {\n" - + " \"x-amz-request-id\": \"D82B88E5F771F645\",\n" - + " \"x-amz-id-2\": \"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"\n" - + " },\n" - + " \"s3\": {\n" - + " \"s3SchemaVersion\": \"1.0\",\n" - + " \"configurationId\": \"828aa6fc-f7b5-4305-8584-487c791949c1\",\n" - + " \"bucket\": {\n" - + " \"name\": \"DOC-EXAMPLE-BUCKET\",\n" - + " \"ownerIdentity\": {\n" - + " \"principalId\": \"A3I5XTEXAMAI3E\"\n" - + " },\n" - + " \"arn\": \"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"\n" - + " },\n" - + " \"object\": {\n" - + " \"key\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" - + " \"size\": 1305107,\n" - + " \"eTag\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" - + " \"sequencer\": \"0C0F6F405D6ED209E1\"\n" - + " }\n" - + " }\n" - + " }\n" - + " ]\n" - + "}"); + new EventInfo( + S3Event.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"eventVersion\": \"2.1\",\n" + + " \"eventSource\": \"aws:s3\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventTime\": \"2019-09-03T19:37:27.192Z\",\n" + + " \"eventName\": \"ObjectCreated:Put\",\n" + + " \"userIdentity\": {\n" + + " \"principalId\": \"AWS:AIDAINPONIXQXHT3IKHL2\"\n" + + " },\n" + + " \"requestParameters\": {\n" + + " \"sourceIPAddress\": \"205.255.255.255\"\n" + + " },\n" + + " \"responseElements\": {\n" + + " \"x-amz-request-id\": \"D82B88E5F771F645\",\n" + + " \"x-amz-id-2\": \"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"\n" + + " },\n" + + " \"s3\": {\n" + + " \"s3SchemaVersion\": \"1.0\",\n" + + " \"configurationId\": \"828aa6fc-f7b5-4305-8584-487c791949c1\",\n" + + " \"bucket\": {\n" + + " \"name\": \"DOC-EXAMPLE-BUCKET\",\n" + + " \"ownerIdentity\": {\n" + + " \"principalId\": \"A3I5XTEXAMAI3E\"\n" + + " },\n" + + " \"arn\": \"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"\n" + + " },\n" + + " \"object\": {\n" + + " \"key\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" + + " \"size\": 1305107,\n" + + " \"eTag\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" + + " \"sequencer\": \"0C0F6F405D6ED209E1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}")); events.put( SnsEventRequestHandler.class, - "{\n" - + " \"Records\": [\n" - + " {\n" - + " \"EventVersion\": \"1.0\",\n" - + " \"EventSubscriptionArn\": \"arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" - + " \"EventSource\": \"aws:sns\",\n" - + " \"Sns\": {\n" - + " \"SignatureVersion\": \"1\",\n" - + " \"Timestamp\": \"2019-01-02T12:45:07.000Z\",\n" - + " \"Signature\": \"tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==\",\n" - + " \"SigningCertUrl\": \"https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem\",\n" - + " \"MessageId\": \"95df01b4-ee98-5cb9-9903-4c221d41eb5e\",\n" - + " \"Message\": \"Hello from SNS!\",\n" - + " \"MessageAttributes\": {\n" - + " \"Test\": {\n" - + " \"Type\": \"String\",\n" - + " \"Value\": \"TestString\"\n" - + " },\n" - + " \"TestBinary\": {\n" - + " \"Type\": \"Binary\",\n" - + " \"Value\": \"TestBinary\"\n" - + " }\n" - + " },\n" - + " \"Type\": \"Notification\",\n" - + " \"UnsubscribeUrl\": \"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" - + " \"TopicArn\":\"arn:aws:sns:us-east-2:123456789012:sns-lambda\",\n" - + " \"Subject\": \"TestInvoke\"\n" - + " }\n" - + " }\n" - + " ]\n" - + "}"); - + new EventInfo( + SNSEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"EventVersion\": \"1.0\",\n" + + " \"EventSubscriptionArn\": \"arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"EventSource\": \"aws:sns\",\n" + + " \"Sns\": {\n" + + " \"SignatureVersion\": \"1\",\n" + + " \"Timestamp\": \"2019-01-02T12:45:07.000Z\",\n" + + " \"Signature\": \"tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==\",\n" + + " \"SigningCertUrl\": \"https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem\",\n" + + " \"MessageId\": \"95df01b4-ee98-5cb9-9903-4c221d41eb5e\",\n" + + " \"Message\": \"Hello from SNS!\",\n" + + " \"MessageAttributes\": {\n" + + " \"Test\": {\n" + + " \"Type\": \"String\",\n" + + " \"Value\": \"TestString\"\n" + + " },\n" + + " \"TestBinary\": {\n" + + " \"Type\": \"Binary\",\n" + + " \"Value\": \"TestBinary\"\n" + + " }\n" + + " },\n" + + " \"Type\": \"Notification\",\n" + + " \"UnsubscribeUrl\": \"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"TopicArn\":\"arn:aws:sns:us-east-2:123456789012:sns-lambda\",\n" + + " \"Subject\": \"TestInvoke\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}")); return events; } private TracingRequestWrapper buildWrapper(Class targetClass) { WrappedLambda wrappedLambda = new WrappedLambda(targetClass, "handleRequest"); - return new TracingRequestWrapper(sdk, wrappedLambda, TracingRequestWrapper::map); + return new TracingRequestWrapper(sdk, wrappedLambda); } @ParameterizedTest @@ -218,46 +235,61 @@ class TracingRequestWrapperStandardEventsTest { S3EventRequestHandler.class, SnsEventRequestHandler.class }) - void handleScheduledEvent(Class targetClass) throws JsonProcessingException { + void handleLambdaEvent(Class targetClass) throws IOException { wrapper = buildWrapper(targetClass); - Object parsedScheduledEvent = - OBJECT_MAPPER.readValue(EVENTS_JSON.get(targetClass), Object.class); - assertEquals(SUCCESS, wrapper.doHandleRequest(parsedScheduledEvent, context)); + EventInfo eventInfo = EVENTS_JSON.get(targetClass); + + Object input = SerializationUtil.fromJson(eventInfo.eventBody, eventInfo.eventType); + + // Call to object based "O handleRequest(I input, Context context)" method + // delegates to the stream based + // "handleRequest(InputStream input, OutputStream output, Context context)" method. + // So serialization/deserialization of both input and outputs are triggered to be verified here. + Object output = wrapper.handleRequest(input, context); + + assertThat(input.getClass()).isEqualTo(output.getClass()); + + // "equals" methods are not properly implemented of Lambda events, + // so we are comparing them over their serialized json data + String inputJson = SerializationUtil.toJson(input); + String outputJson = SerializationUtil.toJson(output); + assertThat(inputJson).isEqualTo(outputJson); } public static class ScheduledEventRequestHandler - implements RequestHandler { + implements RequestHandler { @Override - public String handleRequest(ScheduledEvent i, Context cntxt) { - return SUCCESS; + public ScheduledEvent handleRequest(ScheduledEvent i, Context cntxt) { + return i; } } - public static class KinesisEventRequestHandler implements RequestHandler { + public static class KinesisEventRequestHandler + implements RequestHandler { @Override - public String handleRequest(KinesisEvent i, Context cntxt) { - return SUCCESS; + public KinesisEvent handleRequest(KinesisEvent i, Context cntxt) { + return i; } } - public static class SqsEventRequestHandler implements RequestHandler { + public static class SqsEventRequestHandler implements RequestHandler { @Override - public String handleRequest(SQSEvent i, Context cntxt) { - return SUCCESS; + public SQSEvent handleRequest(SQSEvent i, Context cntxt) { + return i; } } - public static class S3EventRequestHandler implements RequestHandler { + public static class S3EventRequestHandler implements RequestHandler { @Override - public String handleRequest(S3Event i, Context cntxt) { - return SUCCESS; + public S3Event handleRequest(S3Event i, Context cntxt) { + return i; } } - public static class SnsEventRequestHandler implements RequestHandler { + public static class SnsEventRequestHandler implements RequestHandler { @Override - public String handleRequest(SNSEvent i, Context cntxt) { - return SUCCESS; + public SNSEvent handleRequest(SNSEvent i, Context cntxt) { + return i; } } } diff --git a/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtilTest.java b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtilTest.java new file mode 100644 index 0000000000..7711d15779 --- /dev/null +++ b/instrumentation/aws-lambda/aws-lambda-events-2.2/library/src/test/java/io/opentelemetry/instrumentation/awslambdaevents/v2_2/internal/SerializationUtilTest.java @@ -0,0 +1,359 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awslambdaevents.v2_2.internal; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; +import com.amazonaws.services.lambda.runtime.events.models.s3.S3EventNotification; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import org.joda.time.DateTime; +import org.junit.jupiter.api.Test; + +public class SerializationUtilTest { + + private static final Map, String> events = buildEventExamples(); + + private static Map, String> buildEventExamples() { + Map, String> events = new HashMap<>(); + events.put( + ScheduledEvent.class, + "{\n" + + " \"version\": \"0\",\n" + + " \"id\": \"53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa\",\n" + + " \"detail-type\": \"Scheduled Event\",\n" + + " \"source\": \"aws.events\",\n" + + " \"account\": \"123456789012\",\n" + + " \"time\": \"2015-10-08T16:53:06Z\",\n" + + " \"region\": \"us-east-1\",\n" + + " \"resources\": [\n" + + " \"arn:aws:events:us-east-1:123456789012:rule/my-scheduled-rule\"\n" + + " ],\n" + + " \"detail\": {}\n" + + "}"); + events.put( + KinesisEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"kinesis\": {\n" + + " \"kinesisSchemaVersion\": \"1.0\",\n" + + " \"partitionKey\": \"1\",\n" + + " \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\",\n" + + " \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\",\n" + + " \"approximateArrivalTimestamp\": 1545084650.987\n" + + " },\n" + + " \"eventSource\": \"aws:kinesis\",\n" + + " \"eventVersion\": \"1.0\",\n" + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692538361571095921575989136588898\",\n" + + " \"eventName\": \"aws:kinesis:record\",\n" + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" + + " },\n" + + " {\n" + + " \"kinesis\": {\n" + + " \"kinesisSchemaVersion\": \"1.0\",\n" + + " \"partitionKey\": \"1\",\n" + + " \"sequenceNumber\": \"49590338271490256608559692540925702759324208523137515618\",\n" + + " \"data\": \"VGhpcyBpcyBvbmx5IGEgdGVzdC4=\",\n" + + " \"approximateArrivalTimestamp\": 1545084711.166\n" + + " },\n" + + " \"eventSource\": \"aws:kinesis\",\n" + + " \"eventVersion\": \"1.0\",\n" + + " \"eventID\": \"shardId-000000000006:49590338271490256608559692540925702759324208523137515618\",\n" + + " \"eventName\": \"aws:kinesis:record\",\n" + + " \"invokeIdentityArn\": \"arn:aws:iam::123456789012:role/lambda-role\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventSourceARN\": \"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream\"\n" + + " }\n" + + " ]\n" + + "}"); + events.put( + SQSEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"messageId\": \"059f36b4-87a3-44ab-83d2-661975830a7d\",\n" + + " \"receiptHandle\": \"AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...\",\n" + + " \"body\": \"Test message.\",\n" + + " \"attributes\": {\n" + + " \"ApproximateReceiveCount\": \"1\",\n" + + " \"SentTimestamp\": \"1545082649183\",\n" + + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" + + " \"ApproximateFirstReceiveTimestamp\": \"1545082649185\"\n" + + " },\n" + + " \"messageAttributes\": {},\n" + + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" + + " \"eventSource\": \"aws:sqs\",\n" + + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" + + " \"awsRegion\": \"us-east-2\"\n" + + " },\n" + + " {\n" + + " \"messageId\": \"2e1424d4-f796-459a-8184-9c92662be6da\",\n" + + " \"receiptHandle\": \"AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...\",\n" + + " \"body\": \"Test message.\",\n" + + " \"attributes\": {\n" + + " \"ApproximateReceiveCount\": \"1\",\n" + + " \"SentTimestamp\": \"1545082650636\",\n" + + " \"SenderId\": \"AIDAIENQZJOLO23YVJ4VO\",\n" + + " \"ApproximateFirstReceiveTimestamp\": \"1545082650649\"\n" + + " },\n" + + " \"messageAttributes\": {},\n" + + " \"md5OfBody\": \"e4e68fb7bd0e697a0ae8f1bb342846b3\",\n" + + " \"eventSource\": \"aws:sqs\",\n" + + " \"eventSourceARN\": \"arn:aws:sqs:us-east-2:123456789012:my-queue\",\n" + + " \"awsRegion\": \"us-east-2\"\n" + + " }\n" + + " ]\n" + + "}"); + events.put( + S3Event.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"eventVersion\": \"2.1\",\n" + + " \"eventSource\": \"aws:s3\",\n" + + " \"awsRegion\": \"us-east-2\",\n" + + " \"eventTime\": \"2019-09-03T19:37:27.192Z\",\n" + + " \"eventName\": \"ObjectCreated:Put\",\n" + + " \"userIdentity\": {\n" + + " \"principalId\": \"AWS:AIDAINPONIXQXHT3IKHL2\"\n" + + " },\n" + + " \"requestParameters\": {\n" + + " \"sourceIPAddress\": \"205.255.255.255\"\n" + + " },\n" + + " \"responseElements\": {\n" + + " \"x-amz-request-id\": \"D82B88E5F771F645\",\n" + + " \"x-amz-id-2\": \"vlR7PnpV2Ce81l0PRw6jlUpck7Jo5ZsQjryTjKlc5aLWGVHPZLj5NeC6qMa0emYBDXOo6QBU0Wo=\"\n" + + " },\n" + + " \"s3\": {\n" + + " \"s3SchemaVersion\": \"1.0\",\n" + + " \"configurationId\": \"828aa6fc-f7b5-4305-8584-487c791949c1\",\n" + + " \"bucket\": {\n" + + " \"name\": \"DOC-EXAMPLE-BUCKET\",\n" + + " \"ownerIdentity\": {\n" + + " \"principalId\": \"A3I5XTEXAMAI3E\"\n" + + " },\n" + + " \"arn\": \"arn:aws:s3:::lambda-artifacts-deafc19498e3f2df\"\n" + + " },\n" + + " \"object\": {\n" + + " \"key\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" + + " \"size\": 1305107,\n" + + " \"eTag\": \"b21b84d653bb07b05b1e6b33684dc11b\",\n" + + " \"sequencer\": \"0C0F6F405D6ED209E1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + events.put( + SNSEvent.class, + "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"EventVersion\": \"1.0\",\n" + + " \"EventSubscriptionArn\": \"arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"EventSource\": \"aws:sns\",\n" + + " \"Sns\": {\n" + + " \"SignatureVersion\": \"1\",\n" + + " \"Timestamp\": \"2019-01-02T12:45:07.000Z\",\n" + + " \"Signature\": \"tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==\",\n" + + " \"SigningCertUrl\": \"https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem\",\n" + + " \"MessageId\": \"95df01b4-ee98-5cb9-9903-4c221d41eb5e\",\n" + + " \"Message\": \"Hello from SNS!\",\n" + + " \"MessageAttributes\": {\n" + + " \"Test\": {\n" + + " \"Type\": \"String\",\n" + + " \"Value\": \"TestString\"\n" + + " },\n" + + " \"TestBinary\": {\n" + + " \"Type\": \"Binary\",\n" + + " \"Value\": \"TestBinary\"\n" + + " }\n" + + " },\n" + + " \"Type\": \"Notification\",\n" + + " \"UnsubscribeUrl\": \"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"TopicArn\":\"arn:aws:sns:us-east-2:123456789012:sns-lambda\",\n" + + " \"Subject\": \"TestInvoke\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"); + return events; + } + + private static void assertScheduledEvent(ScheduledEvent event) { + assertThat(event).isNotNull(); + assertThat(event.getSource()).isEqualTo("aws.events"); + assertThat(event.getDetailType()).isEqualTo("Scheduled Event"); + assertThat(event.getAccount()).isEqualTo("123456789012"); + assertThat(event.getId()).isEqualTo("53dc4d37-cffa-4f76-80c9-8b7d4a4d2eaa"); + assertThat(event.getRegion()).isEqualTo("us-east-1"); + assertThat(event.getTime().getMillis()) + .isEqualTo(new DateTime("2015-10-08T16:53:06Z").getMillis()); + } + + private static void assertKinesisEvent(KinesisEvent event) { + assertThat(event).isNotNull(); + assertThat(event.getRecords()).isNotNull(); + assertThat(event.getRecords().size()).isEqualTo(2); + KinesisEvent.KinesisEventRecord record = event.getRecords().get(0); + assertThat(record.getEventSource()).isEqualTo("aws:kinesis"); + assertThat(record.getEventSourceARN()) + .isEqualTo("arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"); + assertThat(record.getEventName()).isEqualTo("aws:kinesis:record"); + assertThat(record.getEventID()) + .isEqualTo("shardId-000000000006:49590338271490256608559692538361571095921575989136588898"); + assertThat(record.getKinesis().getPartitionKey()).isEqualTo("1"); + assertThat(record.getKinesis().getSequenceNumber()) + .isEqualTo("49590338271490256608559692538361571095921575989136588898"); + assertThat(new String(record.getKinesis().getData().array(), StandardCharsets.UTF_8)) + .isEqualTo("Hello, this is a test."); + } + + private static void assertSqsEvent(SQSEvent event) { + assertThat(event).isNotNull(); + assertThat(event.getRecords()).isNotNull(); + assertThat(event.getRecords().size()).isEqualTo(2); + SQSEvent.SQSMessage record = event.getRecords().get(0); + assertThat(record.getEventSource()).isEqualTo("aws:sqs"); + assertThat(record.getEventSourceArn()).isEqualTo("arn:aws:sqs:us-east-2:123456789012:my-queue"); + assertThat(record.getMessageId()).isEqualTo("059f36b4-87a3-44ab-83d2-661975830a7d"); + assertThat(record.getBody()).isEqualTo("Test message."); + assertThat(record.getMd5OfBody()).isEqualTo("e4e68fb7bd0e697a0ae8f1bb342846b3"); + assertThat(record.getReceiptHandle()).isEqualTo("AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a..."); + } + + private static void assertS3Event(S3Event event) { + assertThat(event).isNotNull(); + assertThat(event.getRecords()).isNotNull(); + assertThat(event.getRecords().size()).isEqualTo(1); + S3EventNotification.S3EventNotificationRecord record = event.getRecords().get(0); + assertThat(record.getEventSource()).isEqualTo("aws:s3"); + assertThat(record.getEventName()).isEqualTo("ObjectCreated:Put"); + assertThat(record.getS3().getBucket().getName()).isEqualTo("DOC-EXAMPLE-BUCKET"); + assertThat(record.getS3().getBucket().getArn()) + .isEqualTo("arn:aws:s3:::lambda-artifacts-deafc19498e3f2df"); + assertThat(record.getS3().getObject().getKey()).isEqualTo("b21b84d653bb07b05b1e6b33684dc11b"); + } + + private static void assertSnsEvent(SNSEvent event) { + assertThat(event).isNotNull(); + assertThat(event.getRecords()).isNotNull(); + assertThat(event.getRecords().size()).isEqualTo(1); + SNSEvent.SNSRecord record = event.getRecords().get(0); + assertThat(record.getEventSource()).isEqualTo("aws:sns"); + assertThat(record.getEventSubscriptionArn()) + .isEqualTo( + "arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486"); + assertThat(record.getSNS().getMessageId()).isEqualTo("95df01b4-ee98-5cb9-9903-4c221d41eb5e"); + assertThat(record.getSNS().getMessage()).isEqualTo("Hello from SNS!"); + assertThat(record.getSNS().getType()).isEqualTo("Notification"); + assertThat(record.getSNS().getTopicArn()) + .isEqualTo("arn:aws:sns:us-east-2:123456789012:sns-lambda"); + assertThat(record.getSNS().getSubject()).isEqualTo("TestInvoke"); + } + + @Test + void scheduledEventDeserializedFromStringJson() { + String eventBody = events.get(ScheduledEvent.class); + ScheduledEvent event = SerializationUtil.fromJson(eventBody, ScheduledEvent.class); + + assertScheduledEvent(event); + } + + @Test + void scheduledEventDeserializedFromInputStreamJson() { + String eventBody = events.get(ScheduledEvent.class); + ScheduledEvent event = + SerializationUtil.fromJson( + new ByteArrayInputStream(eventBody.getBytes(StandardCharsets.UTF_8)), + ScheduledEvent.class); + + assertScheduledEvent(event); + } + + @Test + void kinesisEventDeserializedFromStringJson() { + String eventBody = events.get(KinesisEvent.class); + KinesisEvent event = SerializationUtil.fromJson(eventBody, KinesisEvent.class); + + assertKinesisEvent(event); + } + + @Test + void kinesisEventDeserializedFromInputStreamJson() { + String eventBody = events.get(KinesisEvent.class); + KinesisEvent event = + SerializationUtil.fromJson( + new ByteArrayInputStream(eventBody.getBytes(StandardCharsets.UTF_8)), + KinesisEvent.class); + + assertKinesisEvent(event); + } + + @Test + void sqsEventDeserializedFromStringJson() { + String eventBody = events.get(SQSEvent.class); + SQSEvent event = SerializationUtil.fromJson(eventBody, SQSEvent.class); + + assertSqsEvent(event); + } + + @Test + void sqsEventDeserializedFromInputStreamJson() { + String eventBody = events.get(SQSEvent.class); + SQSEvent event = + SerializationUtil.fromJson( + new ByteArrayInputStream(eventBody.getBytes(StandardCharsets.UTF_8)), SQSEvent.class); + + assertSqsEvent(event); + } + + @Test + void s3EventDeserializedFromStringJson() { + String eventBody = events.get(S3Event.class); + S3Event event = SerializationUtil.fromJson(eventBody, S3Event.class); + + assertS3Event(event); + } + + @Test + void s3EventDeserializedFromInputStreamJson() { + String eventBody = events.get(S3Event.class); + S3Event event = + SerializationUtil.fromJson( + new ByteArrayInputStream(eventBody.getBytes(StandardCharsets.UTF_8)), S3Event.class); + + assertS3Event(event); + } + + @Test + void snsEventDeserializedFromStringJson() { + String eventBody = events.get(SNSEvent.class); + SNSEvent event = SerializationUtil.fromJson(eventBody, SNSEvent.class); + + assertSnsEvent(event); + } + + @Test + void snsEventDeserializedFromInputStreamJson() { + String eventBody = events.get(SNSEvent.class); + SNSEvent event = + SerializationUtil.fromJson( + new ByteArrayInputStream(eventBody.getBytes(StandardCharsets.UTF_8)), SNSEvent.class); + + assertSnsEvent(event); + } +}