services: add temp file based binary log sink (#4404)

No need to use service provider for BinaryLogSink, it can just be an
interface that is passed into BinaryLogProviderImpl.

Add a default TempFileSink that uses the protobuf object's
writeDelimited method to write to the output stream.
Warning: TempFileSink blocks.
This commit is contained in:
zpencer 2018-05-20 17:23:48 -07:00 committed by GitHub
parent faffb09f0a
commit 30478d88c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 218 additions and 87 deletions

View File

@ -16,12 +16,13 @@
package io.grpc.services; package io.grpc.services;
import com.google.common.base.Preconditions;
import io.grpc.BinaryLogProvider; import io.grpc.BinaryLogProvider;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -31,22 +32,29 @@ import javax.annotation.Nullable;
class BinaryLogProviderImpl extends BinaryLogProvider { class BinaryLogProviderImpl extends BinaryLogProvider {
private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName()); private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName());
private final BinlogHelper.Factory factory; private final BinlogHelper.Factory factory;
private final BinaryLogSink sink;
private final AtomicLong counter = new AtomicLong(); private final AtomicLong counter = new AtomicLong();
public BinaryLogProviderImpl() { public BinaryLogProviderImpl() throws IOException {
this(BinaryLogSinkProvider.provider(), System.getenv("GRPC_BINARY_LOG_CONFIG")); this(new TempFileSink(), System.getenv("GRPC_BINARY_LOG_CONFIG"));
} }
BinaryLogProviderImpl(BinaryLogSink sink, String configStr) { /**
BinlogHelper.Factory factory = null; * Creates an instance.
* @param sink ownership is transferred to this class.
* @param configStr config string to parse to determine logged methods and msg size limits.
* @throws IOException if initialization failed.
*/
BinaryLogProviderImpl(BinaryLogSink sink, String configStr) throws IOException {
this.sink = Preconditions.checkNotNull(sink);
try { try {
factory = new BinlogHelper.FactoryImpl(sink, configStr); factory = new BinlogHelper.FactoryImpl(sink, configStr);
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", e); sink.close();
} catch (Error err) { // parsing the conf string may throw if it is blank or contains errors
logger.log(Level.SEVERE, "Caught exception, binary log will be disabled", err); throw new IOException(
"Can not initialize. The env variable GRPC_BINARY_LOG_CONFIG must be valid.", e);
} }
this.factory = factory;
} }
@Nullable @Nullable
@ -70,6 +78,11 @@ class BinaryLogProviderImpl extends BinaryLogProvider {
return helperForMethod.getClientInterceptor(getClientCallId(callOptions)); return helperForMethod.getClientInterceptor(getClientCallId(callOptions));
} }
@Override
public void close() throws IOException {
sink.close();
}
protected CallId getServerCallId() { protected CallId getServerCallId() {
return new CallId(0, counter.getAndIncrement()); return new CallId(0, counter.getAndIncrement());
} }

View File

@ -19,23 +19,9 @@ package io.grpc.services;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
import java.io.Closeable; import java.io.Closeable;
abstract class BinaryLogSink implements Closeable { interface BinaryLogSink extends Closeable {
/** /**
* Writes the {@code message} to the destination. * Writes the {@code message} to the destination.
*/ */
public abstract void write(MessageLite message); void write(MessageLite message);
/**
* Whether this provider is available for use, taking the current environment into consideration.
* If {@code false}, no other methods are safe to be called.
*/
protected abstract boolean isAvailable();
/**
* A priority, from 0 to 10 that this provider should be used, taking the current environment into
* consideration. 5 should be considered the default, and then tweaked based on environment
* detection. A priority of 0 does not imply that the provider wouldn't work; just that it should
* be last in line.
*/
protected abstract int priority();
} }

View File

@ -1,53 +0,0 @@
/*
* Copyright 2017 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services;
import io.grpc.InternalServiceProviders;
import java.util.Collections;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
* Subclasses must be thread safe, and are responsible for writing the binary log message to
* the appropriate destination.
*/
@ThreadSafe
final class BinaryLogSinkProvider {
private static final BinaryLogSink INSTANCE = InternalServiceProviders.load(
BinaryLogSink.class,
Collections.<Class<?>>emptyList(),
BinaryLogSinkProvider.class.getClassLoader(),
new InternalServiceProviders.PriorityAccessor<BinaryLogSink>() {
@Override
public boolean isAvailable(BinaryLogSink provider) {
return provider.isAvailable();
}
@Override
public int getPriority(BinaryLogSink provider) {
return provider.priority();
}
});
/**
* Returns the {@code BinaryLogSink} that should be used.
*/
@Nullable
static BinaryLogSink provider() {
return INSTANCE;
}
}

View File

@ -19,14 +19,15 @@ package io.grpc.services;
import io.grpc.BinaryLog; import io.grpc.BinaryLog;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.InternalBinaryLogs; import io.grpc.InternalBinaryLogs;
import java.io.IOException;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4017")
public final class BinaryLogs { public final class BinaryLogs {
public static BinaryLog createBinaryLog() { public static BinaryLog createBinaryLog() throws IOException {
return InternalBinaryLogs.createBinaryLog(new BinaryLogProviderImpl()); return InternalBinaryLogs.createBinaryLog(new BinaryLogProviderImpl());
} }
public static BinaryLog createCensusBinaryLog() { public static BinaryLog createCensusBinaryLog() throws IOException {
return InternalBinaryLogs.createBinaryLog(new CensusBinaryLogProvider()); return InternalBinaryLogs.createBinaryLog(new CensusBinaryLogProvider());
} }

View File

@ -20,9 +20,19 @@ import io.grpc.BinaryLogProvider;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.opencensus.trace.Span; import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing; import io.opencensus.trace.Tracing;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
final class CensusBinaryLogProvider extends BinaryLogProviderImpl { final class CensusBinaryLogProvider extends BinaryLogProviderImpl {
public CensusBinaryLogProvider() throws IOException {
super();
}
CensusBinaryLogProvider(BinaryLogSink sink, String configStr) throws IOException {
super(sink, configStr);
}
@Override @Override
protected CallId getServerCallId() { protected CallId getServerCallId() {
Span currentSpan = Tracing.getTracer().getCurrentSpan(); Span currentSpan = Tracing.getTracer().getCurrentSpan();

View File

@ -0,0 +1,84 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services;
import com.google.protobuf.MessageLite;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The output file goes to the JVM's temp dir with a prefix of BINARY_INFO. The proto messages
* are written serially using {@link MessageLite#writeDelimitedTo(OutputStream)}.
*/
class TempFileSink implements BinaryLogSink {
private static final Logger logger = Logger.getLogger(TempFileSink.class.getName());
private final String outPath;
private final OutputStream out;
private boolean closed;
TempFileSink() throws IOException {
File outFile = File.createTempFile("BINARY_INFO.", "");
outPath = outFile.getPath();
logger.log(Level.INFO, "Writing binary logs to to {0}", outFile.getAbsolutePath());
out = new BufferedOutputStream(new FileOutputStream(outFile));
}
String getPath() {
return this.outPath;
}
@Override
public synchronized void write(MessageLite message) {
if (closed) {
logger.log(Level.FINEST, "Attempt to write after TempFileSink is closed.");
return;
}
try {
message.writeDelimitedTo(out);
} catch (IOException e) {
logger.log(Level.SEVERE, "Caught exception while writing", e);
closeQuietly();
}
}
@Override
public synchronized void close() throws IOException {
if (closed) {
return;
}
closed = true;
try {
out.flush();
} finally {
out.close();
}
}
private synchronized void closeQuietly() {
try {
close();
} catch (IOException e) {
logger.log(Level.SEVERE, "Caught exception while closing", e);
}
}
}

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2018, gRPC Authors All rights reserved. * Copyright 2018 The gRPC Authors
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -18,6 +18,8 @@ package io.grpc.services;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import org.junit.Test; import org.junit.Test;
@ -42,4 +44,13 @@ public class BinaryLogProviderImplTest {
assertNull(binlog.getServerInterceptor("package.service/method")); assertNull(binlog.getServerInterceptor("package.service/method"));
assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT)); assertNull(binlog.getClientInterceptor("package.service/method", CallOptions.DEFAULT));
} }
@Test
public void closeTest() throws Exception {
BinaryLogSink sink = mock(BinaryLogSink.class);
BinaryLogProviderImpl log = new BinaryLogProviderImpl(sink, "*");
verify(sink, never()).close();
log.close();
verify(sink).close();
}
} }

View File

@ -24,29 +24,39 @@ import io.grpc.BinaryLogProvider.CallId;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import io.grpc.services.CensusBinaryLogProvider;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** /**
* Tests for {@link CensusBinaryLogProvider}. * Tests for {@link CensusBinaryLogProvider}.
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class CensusBinaryLogProviderTest { public class CensusBinaryLogProviderTest {
@Mock
private BinaryLogSink sink;
public CensusBinaryLogProviderTest() {
MockitoAnnotations.initMocks(this);
}
@Test @Test
public void serverCallIdFromCensus() { public void serverCallIdFromCensus() throws Exception {
final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan); Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan);
context.run(new Runnable() { context.call(new Callable<Void>() {
@Override @Override
public void run() { public Void call() throws Exception {
CallId callId = new CensusBinaryLogProvider().getServerCallId(); CallId callId = new CensusBinaryLogProvider(sink, "*").getServerCallId();
assertThat(callId.hi).isEqualTo(0); assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo); .isEqualTo(callId.lo);
return null;
} }
}); });
} }
@ -54,7 +64,7 @@ public class CensusBinaryLogProviderTest {
@Test @Test
public void clientCallId() throws Exception { public void clientCallId() throws Exception {
CallId expected = new CallId(1234, 5677); CallId expected = new CallId(1234, 5677);
CallId actual = new CensusBinaryLogProvider() CallId actual = new CensusBinaryLogProvider(sink, "*")
.getClientCallId( .getClientCallId(
CallOptions.DEFAULT.withOption( CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,

View File

@ -0,0 +1,69 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services;
import static org.junit.Assert.assertEquals;
import io.grpc.binarylog.GrpcLogEntry;
import io.grpc.binarylog.Uint128;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link io.grpc.services.TempFileSink}.
*/
@RunWith(JUnit4.class)
public class TempFileSinkTest {
@Test
public void readMyWrite() throws Exception {
TempFileSink sink = new TempFileSink();
GrpcLogEntry e1 = GrpcLogEntry.newBuilder()
.setCallId(Uint128.newBuilder().setLow(1234))
.build();
GrpcLogEntry e2 = GrpcLogEntry.newBuilder()
.setCallId(Uint128.newBuilder().setLow(5678))
.build();
sink.write(e1);
sink.write(e2);
sink.close();
DataInputStream input = new DataInputStream(new FileInputStream(sink.getPath()));
try {
GrpcLogEntry read1 = GrpcLogEntry.parseDelimitedFrom(input);
GrpcLogEntry read2 = GrpcLogEntry.parseDelimitedFrom(input);
assertEquals(e1, read1);
assertEquals(e2, read2);
assertEquals(-1, input.read());
} finally {
input.close();
}
}
@Test
public void writeAfterCloseIsSilent() throws IOException {
TempFileSink sink = new TempFileSink();
sink.close();
sink.write(GrpcLogEntry.newBuilder()
.setCallId(Uint128.newBuilder().setLow(1234))
.build());
}
}