Add profiling lz4 compression support

Gzip is still the default
This commit is contained in:
Nikolay Martynov 2020-02-12 14:47:09 -05:00
parent 23d3b75555
commit ca9f623d05
8 changed files with 155 additions and 13 deletions

View File

@ -23,6 +23,7 @@ dependencies {
compile deps.guava compile deps.guava
compile deps.okhttp compile deps.okhttp
compile group: 'com.github.jnr', name: 'jnr-posix', version: '3.0.52' compile group: 'com.github.jnr', name: 'jnr-posix', version: '3.0.52'
compile group: 'org.lz4', name: 'lz4-java', version: '1.7.1'
testCompile deps.junit5 testCompile deps.junit5
testCompile project(':dd-java-agent:agent-profiling:profiling-testing') testCompile project(':dd-java-agent:agent-profiling:profiling-testing')

View File

@ -5,6 +5,10 @@ enum CompressionType {
OFF, OFF,
/** Default compression */ /** Default compression */
ON, ON,
/** Lower compression ratio with less CPU overhead * */
LOW,
/** Better compression ratio for the price of higher CPU usage * */
MEDIUM,
/** Unknown compression config value */ /** Unknown compression config value */
UNKNOWN; UNKNOWN;
@ -18,6 +22,10 @@ enum CompressionType {
return OFF; return OFF;
case "on": case "on":
return ON; return ON;
case "low":
return LOW;
case "medium":
return MEDIUM;
default: default:
return UNKNOWN; return UNKNOWN;
} }

View File

@ -244,7 +244,13 @@ public final class RecordingUploader {
// currently only gzip and off are supported // currently only gzip and off are supported
// this needs to be updated once more compression types are added // this needs to be updated once more compression types are added
switch (type) { switch (type) {
case LOW:
{
compression = (is, expectedSize) -> StreamUtils.lz4Stream(is, expectedSize, consumer);
break;
}
case ON: case ON:
case MEDIUM:
{ {
compression = (is, expectedSize) -> StreamUtils.gzipStream(is, expectedSize, consumer); compression = (is, expectedSize) -> StreamUtils.gzipStream(is, expectedSize, consumer);
break; break;

View File

@ -8,11 +8,15 @@ import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import org.openjdk.jmc.common.io.IOToolkit; import org.openjdk.jmc.common.io.IOToolkit;
/** A collection of I/O stream related helper methods */ /** A collection of I/O stream related helper methods */
public final class StreamUtils { public final class StreamUtils {
// https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md#general-structure-of-lz4-frame-format
static final int[] LZ4_MAGIC = new int[] {0x04, 0x22, 0x4D, 0x18};
// JMC's IOToolkit hides this from us... // JMC's IOToolkit hides this from us...
static final int ZIP_MAGIC[] = new int[] {80, 75, 3, 4}; static final int ZIP_MAGIC[] = new int[] {80, 75, 3, 4};
static final int GZ_MAGIC[] = new int[] {31, 139}; static final int GZ_MAGIC[] = new int[] {31, 139};
@ -32,10 +36,10 @@ public final class StreamUtils {
/** /**
* Read a stream into a consumer gzip-compressing content. If the stream is already compressed * Read a stream into a consumer gzip-compressing content. If the stream is already compressed
* (gzip, zip) the original data will be returned. * (gzip, zip, lz4) the original data will be returned.
* *
* @param is the input stream * @param is the input stream
* @return zipped contents of the input stream or the the original content if the stream is * @return gzipped contents of the input stream or the the original content if the stream is
* already compressed * already compressed
* @throws IOException * @throws IOException
*/ */
@ -53,6 +57,29 @@ public final class StreamUtils {
} }
} }
/**
* Read a stream into a consumer lz4-compressing content. If the stream is already compressed
* (gzip, zip, lz4) the original data will be returned.
*
* @param is the input stream
* @return lz4ed contents of the input stream or the the original content if the stream is already
* compressed
* @throws IOException
*/
public static <T> T lz4Stream(
InputStream is, final int expectedSize, final BytesConsumer<T> consumer) throws IOException {
is = ensureMarkSupported(is);
if (isCompressed(is)) {
return readStream(is, expectedSize, consumer);
} else {
final FastByteArrayOutputStream baos = new FastByteArrayOutputStream(expectedSize);
try (final OutputStream zipped = new LZ4FrameOutputStream(baos)) {
copy(is, zipped);
}
return baos.consume(consumer);
}
}
/** /**
* Read a stream into a consumer. * Read a stream into a consumer.
* *
@ -168,7 +195,7 @@ public final class StreamUtils {
*/ */
private static boolean isCompressed(final InputStream is) throws IOException { private static boolean isCompressed(final InputStream is) throws IOException {
checkMarkSupported(is); checkMarkSupported(is);
return isGzip(is) || isZip(is); return isGzip(is) || isLz4(is) || isZip(is);
} }
/** /**
@ -205,6 +232,23 @@ public final class StreamUtils {
} }
} }
/**
* Check whether the stream represents LZ4 data
*
* @param is input stream; must support {@linkplain InputStream#mark(int)}
* @return {@literal true} if the stream represents LZ4 data
* @throws IOException
*/
private static boolean isLz4(final InputStream is) throws IOException {
checkMarkSupported(is);
is.mark(LZ4_MAGIC.length);
try {
return IOToolkit.hasMagic(is, LZ4_MAGIC);
} finally {
is.reset();
}
}
private static InputStream ensureMarkSupported(InputStream is) { private static InputStream ensureMarkSupported(InputStream is) {
if (!is.markSupported()) { if (!is.markSupported()) {
is = new BufferedInputStream(is); is = new BufferedInputStream(is);

View File

@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import net.jpountz.lz4.LZ4FrameInputStream;
import okhttp3.Credentials; import okhttp3.Credentials;
import okhttp3.HttpUrl; import okhttp3.HttpUrl;
import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockResponse;
@ -138,7 +139,7 @@ public class RecordingUploaderTest {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(strings = {"on", "off", "invalid"}) @ValueSource(strings = {"on", "low", "medium", "off", "invalid"})
public void testRequestParameters(final String compression) public void testRequestParameters(final String compression)
throws IOException, InterruptedException { throws IOException, InterruptedException {
when(config.getProfilingUploadCompression()).thenReturn(compression); when(config.getProfilingUploadCompression()).thenReturn(compression);
@ -184,8 +185,10 @@ public class RecordingUploaderTest {
byte[] uploadedBytes = byte[] uploadedBytes =
(byte[]) Iterables.getFirst(parameters.get(RecordingUploader.DATA_PARAM), new byte[] {}); (byte[]) Iterables.getFirst(parameters.get(RecordingUploader.DATA_PARAM), new byte[] {});
if (compression.equals("on") || compression.equals("invalid")) { if (compression.equals("on") || compression.equals("medium") || compression.equals("invalid")) {
uploadedBytes = unGzip(uploadedBytes); uploadedBytes = unGzip(uploadedBytes);
} else if (compression.equals("low")) {
uploadedBytes = unLz4(uploadedBytes);
} }
assertArrayEquals(expectedBytes, uploadedBytes); assertArrayEquals(expectedBytes, uploadedBytes);
} }
@ -433,4 +436,11 @@ public class RecordingUploaderTest {
ByteStreams.copy(stream, result); ByteStreams.copy(stream, result);
return result.toByteArray(); return result.toByteArray();
} }
private byte[] unLz4(final byte[] compressed) throws IOException {
final InputStream stream = new LZ4FrameInputStream(new ByteArrayInputStream(compressed));
final ByteArrayOutputStream result = new ByteArrayOutputStream();
ByteStreams.copy(stream, result);
return result.toByteArray();
}
} }

View File

@ -13,6 +13,8 @@ import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -25,6 +27,7 @@ class StreamUtilsTest {
private static byte[] testRecordingBytes; private static byte[] testRecordingBytes;
private static byte[] testRecordingGzippedBytes; private static byte[] testRecordingGzippedBytes;
private static byte[] testRecordingZippedBytes; private static byte[] testRecordingZippedBytes;
private static byte[] testRecordingLz4edBytes;
@BeforeAll @BeforeAll
public static void setupClass() throws IOException { public static void setupClass() throws IOException {
@ -37,6 +40,10 @@ class StreamUtilsTest {
final ByteArrayOutputStream zippedStream = new ByteArrayOutputStream(); final ByteArrayOutputStream zippedStream = new ByteArrayOutputStream();
ByteStreams.copy(testRecordingStream(), createZipOutputStream(zippedStream)); ByteStreams.copy(testRecordingStream(), createZipOutputStream(zippedStream));
testRecordingZippedBytes = zippedStream.toByteArray(); testRecordingZippedBytes = zippedStream.toByteArray();
final ByteArrayOutputStream zl4edStream = new ByteArrayOutputStream();
ByteStreams.copy(testRecordingStream(), new LZ4FrameOutputStream(zl4edStream));
testRecordingLz4edBytes = zl4edStream.toByteArray();
} }
@Test @Test
@ -61,9 +68,7 @@ class StreamUtilsTest {
final byte[] gzippedBytes = final byte[] gzippedBytes =
StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES); StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
final byte[] uncompressedBytes = assertArrayEquals(testRecordingBytes, uncompressGzip(gzippedBytes));
ByteStreams.toByteArray(new GZIPInputStream(new ByteArrayInputStream(gzippedBytes)));
assertArrayEquals(testRecordingBytes, uncompress(gzippedBytes));
} }
@Test @Test
@ -72,11 +77,11 @@ class StreamUtilsTest {
final byte[] gzippedBytes = final byte[] gzippedBytes =
StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES); StreamUtils.gzipStream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
assertArrayEquals(testRecordingBytes, uncompress(gzippedBytes)); assertArrayEquals(testRecordingBytes, uncompressGzip(gzippedBytes));
} }
@Test @Test
public void alreadyGzipStream() throws IOException { public void gzipAlreadyGzippedStream() throws IOException {
final byte[] bytes = final byte[] bytes =
StreamUtils.gzipStream( StreamUtils.gzipStream(
new ByteArrayInputStream(testRecordingGzippedBytes), new ByteArrayInputStream(testRecordingGzippedBytes),
@ -87,7 +92,7 @@ class StreamUtilsTest {
} }
@Test @Test
public void alreadyZipStream() throws IOException { public void gzipAlreadyZippedStream() throws IOException {
final byte[] bytes = final byte[] bytes =
StreamUtils.gzipStream( StreamUtils.gzipStream(
new ByteArrayInputStream(testRecordingZippedBytes), new ByteArrayInputStream(testRecordingZippedBytes),
@ -97,6 +102,68 @@ class StreamUtilsTest {
assertArrayEquals(testRecordingZippedBytes, bytes); assertArrayEquals(testRecordingZippedBytes, bytes);
} }
@Test
public void gzipAlreadyLz4edStream() throws IOException {
final byte[] bytes =
StreamUtils.gzipStream(
new ByteArrayInputStream(testRecordingLz4edBytes),
DEFAULT_EXPECTED_SIZE,
CONSUME_TO_BYTES);
assertArrayEquals(testRecordingLz4edBytes, bytes);
}
@Test
public void lz4Stream() throws IOException {
final int expectedSize = 1; // Try very small value to test 'undershoot' logic
final byte[] gzippedBytes =
StreamUtils.lz4Stream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
assertArrayEquals(testRecordingBytes, uncompressLz4(gzippedBytes));
}
@Test
public void lz4StreamLargeExpectedSize() throws IOException {
final int expectedSize = testRecordingBytes.length * 2; // overshoot the size
final byte[] gzippedBytes =
StreamUtils.lz4Stream(testRecordingStream(), expectedSize, CONSUME_TO_BYTES);
assertArrayEquals(testRecordingBytes, uncompressLz4(gzippedBytes));
}
@Test
public void lz4AlreadyGzippedStream() throws IOException {
final byte[] bytes =
StreamUtils.lz4Stream(
new ByteArrayInputStream(testRecordingGzippedBytes),
DEFAULT_EXPECTED_SIZE,
CONSUME_TO_BYTES);
assertArrayEquals(testRecordingGzippedBytes, bytes);
}
@Test
public void lz4AlreadyZippedStream() throws IOException {
final byte[] bytes =
StreamUtils.lz4Stream(
new ByteArrayInputStream(testRecordingZippedBytes),
DEFAULT_EXPECTED_SIZE,
CONSUME_TO_BYTES);
assertArrayEquals(testRecordingZippedBytes, bytes);
}
@Test
public void lz4AlreadyLz4edStream() throws IOException {
final byte[] bytes =
StreamUtils.lz4Stream(
new ByteArrayInputStream(testRecordingLz4edBytes),
DEFAULT_EXPECTED_SIZE,
CONSUME_TO_BYTES);
assertArrayEquals(testRecordingLz4edBytes, bytes);
}
private static InputStream testRecordingStream() { private static InputStream testRecordingStream() {
return StreamUtilsTest.class.getResourceAsStream("/test-recording.jfr"); return StreamUtilsTest.class.getResourceAsStream("/test-recording.jfr");
} }
@ -108,7 +175,11 @@ class StreamUtilsTest {
return result; return result;
} }
private static byte[] uncompress(final byte[] bytes) throws IOException { private static byte[] uncompressGzip(final byte[] bytes) throws IOException {
return ByteStreams.toByteArray(new GZIPInputStream(new ByteArrayInputStream(bytes))); return ByteStreams.toByteArray(new GZIPInputStream(new ByteArrayInputStream(bytes)));
} }
private static byte[] uncompressLz4(final byte[] bytes) throws IOException {
return ByteStreams.toByteArray(new LZ4FrameInputStream(new ByteArrayInputStream(bytes)));
}
} }

View File

@ -30,6 +30,7 @@ dependencies {
testCompile "org.openjdk.jmc:flightrecorder:$jmcVersion" testCompile "org.openjdk.jmc:flightrecorder:$jmcVersion"
testCompile "org.openjdk.jmc:flightrecorder.rules:$jmcVersion" testCompile "org.openjdk.jmc:flightrecorder.rules:$jmcVersion"
testCompile "org.openjdk.jmc:flightrecorder.rules.jdk:$jmcVersion" testCompile "org.openjdk.jmc:flightrecorder.rules.jdk:$jmcVersion"
compile group: 'org.lz4', name: 'lz4-java', version: '1.7.1'
} }
tasks.withType(Test).configureEach { tasks.withType(Test).configureEach {

View File

@ -11,6 +11,7 @@ import org.openjdk.jmc.flightrecorder.JfrLoaderToolkit
import java.time.Instant import java.time.Instant
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.zip.GZIPInputStream
class ProfilingIntegrationContinuousProfilesTest extends AbstractSmokeTest { class ProfilingIntegrationContinuousProfilesTest extends AbstractSmokeTest {
@ -96,7 +97,7 @@ class ProfilingIntegrationContinuousProfilesTest extends AbstractSmokeTest {
firstRequestParameters.get("chunk-data").get(0) != null firstRequestParameters.get("chunk-data").get(0) != null
IItemCollection events = JfrLoaderToolkit.loadEvents(new ByteArrayInputStream(secondRequestParameters.get("chunk-data").get(0))) IItemCollection events = JfrLoaderToolkit.loadEvents(new GZIPInputStream(new ByteArrayInputStream(secondRequestParameters.get("chunk-data").get(0))))
IItemCollection scopeEvents = events.apply(ItemFilters.type("datadog.Scope")) IItemCollection scopeEvents = events.apply(ItemFilters.type("datadog.Scope"))
scopeEvents.size() > 0 scopeEvents.size() > 0