From ca9f623d053d825e53f5f871b4c0d9108f08a467 Mon Sep 17 00:00:00 2001
From: Nikolay Martynov
Date: Wed, 12 Feb 2020 14:47:09 -0500
Subject: [PATCH] Add profiling lz4 compression support

Gzip is still the default
---
 .../profiling-uploader.gradle                 |  1 +
 .../profiling/uploader/CompressionType.java   |  8 ++
 .../profiling/uploader/RecordingUploader.java |  6 ++
 .../profiling/uploader/util/StreamUtils.java  | 50 ++++++++++-
 .../uploader/RecordingUploaderTest.java       | 14 ++-
 .../uploader/util/StreamUtilsTest.java        | 85 +++++++++++++++++--
 .../profiling-integration-tests.gradle        |  1 +
 ...ngIntegrationContinuousProfilesTest.groovy |  3 +-
 8 files changed, 155 insertions(+), 13 deletions(-)

diff --git a/dd-java-agent/agent-profiling/profiling-uploader/profiling-uploader.gradle b/dd-java-agent/agent-profiling/profiling-uploader/profiling-uploader.gradle
index e84f1d1f61..2371a4ac45 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/profiling-uploader.gradle
+++ b/dd-java-agent/agent-profiling/profiling-uploader/profiling-uploader.gradle
@@ -23,6 +23,7 @@ dependencies {
   compile deps.guava
   compile deps.okhttp
   compile group: 'com.github.jnr', name: 'jnr-posix', version: '3.0.52'
+  compile group: 'org.lz4', name: 'lz4-java', version: '1.7.1'
 
   testCompile deps.junit5
   testCompile project(':dd-java-agent:agent-profiling:profiling-testing')
diff --git a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/CompressionType.java b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/CompressionType.java
index 75c507cd3c..dee669fd74 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/CompressionType.java
+++ b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/CompressionType.java
@@ -5,6 +5,10 @@ enum CompressionType {
   OFF,
   /** Default compression */
   ON,
+  /** Lower compression ratio with less CPU overhead */
+  LOW,
+  /** Better compression ratio at the cost of higher CPU usage */
+  MEDIUM,
   /** Unknown compression config value */
   UNKNOWN;
 
@@ -18,6 +22,10 @@ enum CompressionType {
         return OFF;
       case "on":
         return ON;
+      case "low":
+        return LOW;
+      case "medium":
+        return MEDIUM;
       default:
         return UNKNOWN;
     }
diff --git a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/RecordingUploader.java b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/RecordingUploader.java
index c15a165cc6..39243c8113 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/RecordingUploader.java
+++ b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/RecordingUploader.java
@@ -244,7 +244,13 @@ public final class RecordingUploader {
     // currently only gzip and off are supported
     // this needs to be updated once more compression types are added
     switch (type) {
+      case LOW:
+        {
+          compression = (is, expectedSize) -> StreamUtils.lz4Stream(is, expectedSize, consumer);
+          break;
+        }
       case ON:
+      case MEDIUM:
         {
           compression = (is, expectedSize) -> StreamUtils.gzipStream(is, expectedSize, consumer);
           break;
diff --git a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/util/StreamUtils.java b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/util/StreamUtils.java
index 04c94890de..f97f82a176 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/util/StreamUtils.java
+++ b/dd-java-agent/agent-profiling/profiling-uploader/src/main/java/com/datadog/profiling/uploader/util/StreamUtils.java
@@ -8,11 +8,15 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.zip.GZIPOutputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
 import org.openjdk.jmc.common.io.IOToolkit;
 
 /** A collection of I/O stream related helper methods */
 public final class StreamUtils {
 
+  // https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md#general-structure-of-lz4-frame-format
+  static final int[] LZ4_MAGIC = new int[] {0x04, 0x22, 0x4D, 0x18};
+
   // JMC's IOToolkit hides this from us...
   static final int ZIP_MAGIC[] = new int[] {80, 75, 3, 4};
   static final int GZ_MAGIC[] = new int[] {31, 139};
@@ -32,10 +36,10 @@
 
   /**
    * Read a stream into a consumer gzip-compressing content. If the stream is already compressed
-   * (gzip, zip) the original data will be returned.
+   * (gzip, zip, lz4) the original data will be returned.
    *
    * @param is the input stream
-   * @return zipped contents of the input stream or the the original content if the stream is
+   * @return gzipped contents of the input stream or the original content if the stream is
    *     already compressed
   * @throws IOException
    */
@@ -53,6 +57,29 @@
     }
   }
 
+  /**
+   * Read a stream into a consumer lz4-compressing content. If the stream is already compressed
+   * (gzip, zip, lz4) the original data will be returned.
+   *
+   * @param is the input stream
+   * @return lz4-compressed contents of the input stream or the original content if the stream is
+   *     already compressed
+   * @throws IOException
+   */
+  public static <T> T lz4Stream(
+      InputStream is, final int expectedSize, final BytesConsumer<T> consumer) throws IOException {
+    is = ensureMarkSupported(is);
+    if (isCompressed(is)) {
+      return readStream(is, expectedSize, consumer);
+    } else {
+      final FastByteArrayOutputStream baos = new FastByteArrayOutputStream(expectedSize);
+      try (final OutputStream zipped = new LZ4FrameOutputStream(baos)) {
+        copy(is, zipped);
+      }
+      return baos.consume(consumer);
+    }
+  }
+
   /**
    * Read a stream into a consumer.
    *
@@ -168,7 +195,7 @@
    */
   private static boolean isCompressed(final InputStream is) throws IOException {
     checkMarkSupported(is);
-    return isGzip(is) || isZip(is);
+    return isGzip(is) || isLz4(is) || isZip(is);
   }
 
   /**
@@ -205,6 +232,23 @@
     }
   }
 
+  /**
+   * Check whether the stream represents LZ4 data
+   *
+   * @param is input stream; must support {@linkplain InputStream#mark(int)}
+   * @return {@literal true} if the stream represents LZ4 data
+   * @throws IOException
+   */
+  private static boolean isLz4(final InputStream is) throws IOException {
+    checkMarkSupported(is);
+    is.mark(LZ4_MAGIC.length);
+    try {
+      return IOToolkit.hasMagic(is, LZ4_MAGIC);
+    } finally {
+      is.reset();
+    }
+  }
+
   private static InputStream ensureMarkSupported(InputStream is) {
     if (!is.markSupported()) {
       is = new BufferedInputStream(is);
diff --git a/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/RecordingUploaderTest.java b/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/RecordingUploaderTest.java
index 19cfd03c4c..ef7ce5f25d 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/RecordingUploaderTest.java
+++ b/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/RecordingUploaderTest.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
+import net.jpountz.lz4.LZ4FrameInputStream;
 import okhttp3.Credentials;
 import okhttp3.HttpUrl;
 import okhttp3.mockwebserver.MockResponse;
@@ -138,7 +139,7 @@ public class RecordingUploaderTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {"on", "off", "invalid"})
+  @ValueSource(strings = {"on", "low", "medium", "off", "invalid"})
   public void testRequestParameters(final String compression)
       throws IOException, InterruptedException {
     when(config.getProfilingUploadCompression()).thenReturn(compression);
@@ -184,8 +185,10 @@
 
     byte[] uploadedBytes =
         (byte[]) Iterables.getFirst(parameters.get(RecordingUploader.DATA_PARAM), new byte[] {});
-    if (compression.equals("on") || compression.equals("invalid")) {
+    if (compression.equals("on") || compression.equals("medium") || compression.equals("invalid")) {
       uploadedBytes = unGzip(uploadedBytes);
+    } else if (compression.equals("low")) {
+      uploadedBytes = unLz4(uploadedBytes);
     }
     assertArrayEquals(expectedBytes, uploadedBytes);
   }
@@ -433,4 +436,11 @@
     ByteStreams.copy(stream, result);
     return result.toByteArray();
   }
+
+  private byte[] unLz4(final byte[] compressed) throws IOException {
+    final InputStream stream = new LZ4FrameInputStream(new ByteArrayInputStream(compressed));
+    final ByteArrayOutputStream result = new ByteArrayOutputStream();
+    ByteStreams.copy(stream, result);
+    return result.toByteArray();
+  }
 }
diff --git a/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/util/StreamUtilsTest.java b/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/util/StreamUtilsTest.java
index 4e598d6dce..650488d614 100644
--- a/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/util/StreamUtilsTest.java
+++ b/dd-java-agent/agent-profiling/profiling-uploader/src/test/java/com/datadog/profiling/uploader/util/StreamUtilsTest.java
@@ -13,6 +13,8 @@ import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
@@ -25,6 +27,7 @@ class StreamUtilsTest {
   private static byte[] testRecordingBytes;
   private static byte[] testRecordingGzippedBytes;
   private static byte[] testRecordingZippedBytes;
+  private static byte[] testRecordingLz4edBytes;
 
   @BeforeAll
   public static void setupClass() throws IOException {
@@ -37,6 +40,10 @@
     final ByteArrayOutputStream zippedStream = new ByteArrayOutputStream();
     ByteStreams.copy(testRecordingStream(), createZipOutputStream(zippedStream));
     testRecordingZippedBytes = zippedStream.toByteArray();
+
+    final ByteArrayOutputStream lz4edStream = new ByteArrayOutputStream();
+    ByteStreams.copy(testRecordingStream(), new LZ4FrameOutputStream(lz4edStream));
+    testRecordingLz4edBytes = lz4edStream.toByteArray();
   }
 
   @Test
@@ -61,9 +68,7 @@
     final byte[] gzippedBytes =
         StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
 
-    final byte[] uncompressedBytes =
-        ByteStreams.toByteArray(new GZIPInputStream(new ByteArrayInputStream(gzippedBytes)));
-    assertArrayEquals(testRecordingBytes, uncompress(gzippedBytes));
+    assertArrayEquals(testRecordingBytes, uncompressGzip(gzippedBytes));
   }
 
   @Test
@@ -72,11 +77,11 @@
     final byte[] gzippedBytes =
         StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
 
-    assertArrayEquals(testRecordingBytes, uncompress(gzippedBytes));
+    assertArrayEquals(testRecordingBytes, uncompressGzip(gzippedBytes));
   }
 
   @Test
-  public void alreadyGzipStream() throws IOException {
+  public void gzipAlreadyGzippedStream() throws IOException {
     final byte[] bytes =
         StreamUtils.gzipStream(
             new ByteArrayInputStream(testRecordingGzippedBytes),
@@ -87,7 +92,7 @@
   }
 
   @Test
-  public void alreadyZipStream() throws IOException {
+  public void gzipAlreadyZippedStream() throws IOException {
     final byte[] bytes =
         StreamUtils.gzipStream(
             new ByteArrayInputStream(testRecordingZippedBytes),
@@ -97,6 +102,68 @@
     assertArrayEquals(testRecordingZippedBytes, bytes);
   }
 
+  @Test
+  public void gzipAlreadyLz4edStream() throws IOException {
+    final byte[] bytes =
+        StreamUtils.gzipStream(
+            new ByteArrayInputStream(testRecordingLz4edBytes),
+            DEFAULT_EXPECTED_SIZE,
+            CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingLz4edBytes, bytes);
+  }
+
+  @Test
+  public void lz4Stream() throws IOException {
+    final int expectedSize = 1; // Try very small value to test 'undershoot' logic
+    final byte[] lz4edBytes =
+        StreamUtils.lz4Stream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingBytes, uncompressLz4(lz4edBytes));
+  }
+
+  @Test
+  public void lz4StreamLargeExpectedSize() throws IOException {
+    final int expectedSize = testRecordingBytes.length * 2; // overshoot the size
+    final byte[] lz4edBytes =
+        StreamUtils.lz4Stream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingBytes, uncompressLz4(lz4edBytes));
+  }
+
+  @Test
+  public void lz4AlreadyGzippedStream() throws IOException {
+    final byte[] bytes =
+        StreamUtils.lz4Stream(
+            new ByteArrayInputStream(testRecordingGzippedBytes),
+            DEFAULT_EXPECTED_SIZE,
+            CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingGzippedBytes, bytes);
+  }
+
+  @Test
+  public void lz4AlreadyZippedStream() throws IOException {
+    final byte[] bytes =
+        StreamUtils.lz4Stream(
+            new ByteArrayInputStream(testRecordingZippedBytes),
+            DEFAULT_EXPECTED_SIZE,
+            CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingZippedBytes, bytes);
+  }
+
+  @Test
+  public void lz4AlreadyLz4edStream() throws IOException {
+    final byte[] bytes =
+        StreamUtils.lz4Stream(
+            new ByteArrayInputStream(testRecordingLz4edBytes),
+            DEFAULT_EXPECTED_SIZE,
+            CONSUME_TO_BYTES);
+
+    assertArrayEquals(testRecordingLz4edBytes, bytes);
+  }
+
   private static InputStream testRecordingStream() {
     return StreamUtilsTest.class.getResourceAsStream("/test-recording.jfr");
   }
@@ -108,7 +175,11 @@
     return result;
   }
 
-  private static byte[] uncompress(final byte[] bytes) throws IOException {
+  private static byte[] uncompressGzip(final byte[] bytes) throws IOException {
     return ByteStreams.toByteArray(new GZIPInputStream(new ByteArrayInputStream(bytes)));
   }
+
+  private static byte[] uncompressLz4(final byte[] bytes) throws IOException {
+    return ByteStreams.toByteArray(new LZ4FrameInputStream(new ByteArrayInputStream(bytes)));
+  }
 }
diff --git a/dd-smoke-tests/profiling-integration-tests/profiling-integration-tests.gradle b/dd-smoke-tests/profiling-integration-tests/profiling-integration-tests.gradle
index b3344cb5d8..4f7db66147 100644
--- a/dd-smoke-tests/profiling-integration-tests/profiling-integration-tests.gradle
+++ b/dd-smoke-tests/profiling-integration-tests/profiling-integration-tests.gradle
@@ -30,6 +30,7 @@ dependencies {
   testCompile "org.openjdk.jmc:flightrecorder:$jmcVersion"
   testCompile "org.openjdk.jmc:flightrecorder.rules:$jmcVersion"
   testCompile "org.openjdk.jmc:flightrecorder.rules.jdk:$jmcVersion"
+  compile group: 'org.lz4', name: 'lz4-java', version: '1.7.1'
 }
 
 tasks.withType(Test).configureEach {
diff --git a/dd-smoke-tests/profiling-integration-tests/src/test/groovy/datadog/smoketest/ProfilingIntegrationContinuousProfilesTest.groovy b/dd-smoke-tests/profiling-integration-tests/src/test/groovy/datadog/smoketest/ProfilingIntegrationContinuousProfilesTest.groovy
index b59415fe01..5da4380d14 100644
--- a/dd-smoke-tests/profiling-integration-tests/src/test/groovy/datadog/smoketest/ProfilingIntegrationContinuousProfilesTest.groovy
+++ b/dd-smoke-tests/profiling-integration-tests/src/test/groovy/datadog/smoketest/ProfilingIntegrationContinuousProfilesTest.groovy
@@ -11,6 +11,7 @@ import org.openjdk.jmc.flightrecorder.JfrLoaderToolkit
 
 import java.time.Instant
 import java.util.concurrent.TimeUnit
+import java.util.zip.GZIPInputStream
 
 class ProfilingIntegrationContinuousProfilesTest extends AbstractSmokeTest {
 
@@ -96,7 +97,7 @@
     firstRequestParameters.get("chunk-data").get(0) != null
 
-    IItemCollection events = JfrLoaderToolkit.loadEvents(new ByteArrayInputStream(secondRequestParameters.get("chunk-data").get(0)))
+    IItemCollection events = JfrLoaderToolkit.loadEvents(new GZIPInputStream(new ByteArrayInputStream(secondRequestParameters.get("chunk-data").get(0))))
     IItemCollection scopeEvents = events.apply(ItemFilters.type("datadog.Scope"))
 
     scopeEvents.size() > 0