core,services: binary log should use real peer socket and call id (#4266)

The peer socket is read from TRANSPORT_ATTR_REMOTE_ADDR from the
stream attributes. We only log the peer on receive initial metadata.

The call id assumes census is available. The call ID read from the
context via SERVER_CALL_ID_CONTEXT_KEY on server side, and read from
CallOptions via CLIENT_CALL_ID_CALLOPTION_KEY on client side. The
value is copied from CONTEXT_SPAN_KEY which is set by census.

Pass around CallId with two longs, not a byte[].
This commit is contained in:
zpencer 2018-04-03 13:39:11 -07:00 committed by GitHub
parent 724e32fe57
commit b1d91b9f60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 321 additions and 97 deletions

View File

@ -416,6 +416,9 @@ public abstract class AbstractManagedChannelImplBuilder
Tracing.getPropagationComponent().getBinaryFormat()); Tracing.getPropagationComponent().getBinaryFormat());
effectiveInterceptors.add(0, censusTracing.getClientInterceptor()); effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
} }
if (binlogProvider != null) {
effectiveInterceptors.add(0, binlogProvider.getClientCallIdSetter());
}
return effectiveInterceptors; return effectiveInterceptors;
} }

View File

@ -266,6 +266,9 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
Tracing.getPropagationComponent().getBinaryFormat()); Tracing.getPropagationComponent().getBinaryFormat());
tracerFactories.add(censusTracing.getServerTracerFactory()); tracerFactories.add(censusTracing.getServerTracerFactory());
} }
if (binlogProvider != null) {
tracerFactories.add(binlogProvider.getServerCallIdSetter());
}
tracerFactories.addAll(streamTracerFactories); tracerFactories.addAll(streamTracerFactories);
return tracerFactories; return tracerFactories;
} }

View File

@ -22,24 +22,39 @@ import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors; import io.grpc.ClientInterceptors;
import io.grpc.Context;
import io.grpc.Internal;
import io.grpc.InternalClientInterceptors; import io.grpc.InternalClientInterceptors;
import io.grpc.InternalServerInterceptors; import io.grpc.InternalServerInterceptors;
import io.grpc.InternalServiceProviders; import io.grpc.InternalServiceProviders;
import io.grpc.InternalServiceProviders.PriorityAccessor; import io.grpc.InternalServiceProviders.PriorityAccessor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.ServerCallHandler; import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition; import io.grpc.ServerMethodDefinition;
import io.grpc.ServerStreamTracer;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
public abstract class BinaryLogProvider implements Closeable { public abstract class BinaryLogProvider implements Closeable {
// TODO(zpencer): move to services and make package private
@Internal
public static final Context.Key<CallId> SERVER_CALL_ID_CONTEXT_KEY
= Context.key("binarylog-context-key");
// TODO(zpencer): move to services and make package private when this class is moved
@Internal
public static final CallOptions.Key<CallId> CLIENT_CALL_ID_CALLOPTION_KEY
= CallOptions.Key.of("binarylog-calloptions-key", null);
@VisibleForTesting @VisibleForTesting
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
@ -128,6 +143,62 @@ public abstract class BinaryLogProvider implements Closeable {
// TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
} }
private static final ServerStreamTracer SERVER_CALLID_SETTER = new ServerStreamTracer() {
@Override
public Context filterContext(Context context) {
Context toRestore = context.attach();
try {
Span span = Tracing.getTracer().getCurrentSpan();
if (span == null) {
return context;
}
return context.withValue(SERVER_CALL_ID_CONTEXT_KEY, CallId.fromCensusSpan(span));
} finally {
context.detach(toRestore);
}
}
};
private static final ServerStreamTracer.Factory SERVER_CALLID_SETTER_FACTORY
= new ServerStreamTracer.Factory() {
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
return SERVER_CALLID_SETTER;
}
};
/**
* Returns a {@link ServerStreamTracer.Factory} that copies the call ID to the {@link Context}
* as {@code SERVER_CALL_ID_CONTEXT_KEY}.
*/
public ServerStreamTracer.Factory getServerCallIdSetter() {
return SERVER_CALLID_SETTER_FACTORY;
}
private static final ClientInterceptor CLIENT_CALLID_SETTER = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
Span span = Tracing.getTracer().getCurrentSpan();
if (span == null) {
return next.newCall(method, callOptions);
}
return next.newCall(
method,
callOptions.withOption(CLIENT_CALL_ID_CALLOPTION_KEY, CallId.fromCensusSpan(span)));
}
};
/**
* Returns a {@link ClientInterceptor} that copies the call ID to the {@link CallOptions}
* as {@code CALL_CLIENT_CALL_ID_CALLOPTION_KEY}.
*/
public ClientInterceptor getClientCallIdSetter() {
return CLIENT_CALLID_SETTER;
}
/** /**
* A priority, from 0 to 10 that this provider should be used, taking the current environment into * A priority, from 0 to 10 that this provider should be used, taking the current environment into
* consideration. 5 should be considered the default, and then tweaked based on environment * consideration. 5 should be considered the default, and then tweaked based on environment
@ -192,4 +263,25 @@ public abstract class BinaryLogProvider implements Closeable {
} }
} }
} }
/**
* A CallId is two byte[] arrays both of size 8 that uniquely identifies the RPC. Users are
* free to use the byte arrays however they see fit.
*/
public static final class CallId {
public final long hi;
public final long lo;
/**
* Creates an instance.
*/
public CallId(long hi, long lo) {
this.hi = hi;
this.lo = lo;
}
static CallId fromCensusSpan(Span span) {
return new CallId(0, ByteBuffer.wrap(span.getContext().getSpanId().getBytes()).getLong());
}
}
} }

View File

@ -17,14 +17,17 @@
package io.grpc.internal; package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
@ -38,12 +41,20 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler; import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition; import io.grpc.ServerMethodDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
import io.grpc.internal.BinaryLogProvider.CallId;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import io.grpc.testing.TestMethodDescriptors;
import io.opencensus.trace.Tracing;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -288,6 +299,15 @@ public class BinaryLogProviderTest {
(int) method.parseResponse(new ByteArrayInputStream((byte[]) serializedResp.get(0)))); (int) method.parseResponse(new ByteArrayInputStream((byte[]) serializedResp.get(0))));
} }
@Test
public void callIdFromSpan() {
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
CallId callId = CallId.fromCensusSpan(mockableSpan);
assertThat(callId.hi).isEqualTo(0);
assertThat(callId.lo)
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
}
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
private static void onServerMessageHelper(ServerCall.Listener listener, Object request) { private static void onServerMessageHelper(ServerCall.Listener listener, Object request) {
listener.onMessage(request); listener.onMessage(request);
@ -310,6 +330,56 @@ public class BinaryLogProviderTest {
return methodDef.getServerCallHandler().startCall(serverCall, new Metadata()); return methodDef.getServerCallHandler().startCall(serverCall, new Metadata());
} }
@Test
public void serverCallIdSetter() {
ServerStreamTracer tracer = binlogProvider
.getServerCallIdSetter()
.newServerStreamTracer("service/method", new Metadata());
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan);
Context filtered = tracer.filterContext(context);
CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(filtered);
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
}
@Test
public void clientCallIdSetter() throws Exception {
final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Tracing.getTracer().withSpan(mockableSpan, new Callable<Void>() {
@Override
public Void call() throws Exception {
final SettableFuture<CallOptions> future = SettableFuture.create();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
future.set(callOptions);
return null;
}
@Override
public String authority() {
return null;
}
};
binlogProvider.getClientCallIdSetter().interceptCall(
TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT,
channel);
CallOptions callOptions = future.get();
CallId callId = callOptions
.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
return null;
}
}).call();
}
private final class TestBinaryLogClientInterceptor implements ClientInterceptor { private final class TestBinaryLogClientInterceptor implements ClientInterceptor {
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

View File

@ -370,7 +370,6 @@ class NettyServer implements InternalServer, WithLogId {
} }
} }
}); });
return ret; return ret;
} }

View File

@ -23,14 +23,17 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import io.grpc.Grpc;
import io.grpc.InternalMetadata; import io.grpc.InternalMetadata;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
@ -48,12 +51,13 @@ import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128; import io.grpc.binarylog.Uint128;
import io.grpc.internal.BinaryLogProvider;
import io.grpc.internal.BinaryLogProvider.CallId;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -77,9 +81,8 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
private static final boolean SERVER = true; private static final boolean SERVER = true;
private static final boolean CLIENT = false; private static final boolean CLIENT = false;
// TODO(zpencer): extract these fields from call and stop using dummy values
@VisibleForTesting @VisibleForTesting
static final byte[] dumyCallId = new byte[16]; static final CallId emptyCallId = new CallId(0, 0);
@VisibleForTesting @VisibleForTesting
static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); static final SocketAddress DUMMY_SOCKET = new DummySocketAddress();
@VisibleForTesting @VisibleForTesting
@ -106,14 +109,12 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
} }
@Override @Override
void logSendInitialMetadata( void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) {
Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) {
GrpcLogEntry entry = GrpcLogEntry GrpcLogEntry entry = GrpcLogEntry
.newBuilder() .newBuilder()
.setType(Type.SEND_INITIAL_METADATA) .setType(Type.SEND_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId)) .setCallId(callIdToProto(callId))
.setPeer(socketToProto(peerSocket))
.setMetadata(metadataToProto(metadata, maxHeaderBytes)) .setMetadata(metadataToProto(metadata, maxHeaderBytes))
.build(); .build();
sink.write(entry); sink.write(entry);
@ -121,7 +122,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
@Override @Override
void logRecvInitialMetadata( void logRecvInitialMetadata(
Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) { Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
GrpcLogEntry entry = GrpcLogEntry GrpcLogEntry entry = GrpcLogEntry
.newBuilder() .newBuilder()
.setType(Type.RECV_INITIAL_METADATA) .setType(Type.RECV_INITIAL_METADATA)
@ -134,7 +135,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
} }
@Override @Override
void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId) { void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) {
GrpcLogEntry entry = GrpcLogEntry GrpcLogEntry entry = GrpcLogEntry
.newBuilder() .newBuilder()
.setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA)
@ -151,7 +152,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
T message, T message,
boolean compressed, boolean compressed,
boolean isServer, boolean isServer,
byte[] callId) { CallId callId) {
if (marshaller != BYTEARRAY_MARSHALLER) { if (marshaller != BYTEARRAY_MARSHALLER) {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
} }
@ -172,7 +173,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
T message, T message,
boolean compressed, boolean compressed,
boolean isServer, boolean isServer,
byte[] callId) { CallId callId) {
if (marshaller != BYTEARRAY_MARSHALLER) { if (marshaller != BYTEARRAY_MARSHALLER) {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
} }
@ -203,21 +204,20 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
* Logs the sending of initial metadata. This method logs the appropriate number of bytes * Logs the sending of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration. * as determined by the binary logging configuration.
*/ */
abstract void logSendInitialMetadata( abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId);
Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket);
/** /**
* Logs the receiving of initial metadata. This method logs the appropriate number of bytes * Logs the receiving of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration. * as determined by the binary logging configuration.
*/ */
abstract void logRecvInitialMetadata( abstract void logRecvInitialMetadata(
Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket); Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
/** /**
* Logs the trailing metadata. This method logs the appropriate number of bytes * Logs the trailing metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration. * as determined by the binary logging configuration.
*/ */
abstract void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId); abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId);
/** /**
* Logs the outbound message. This method logs the appropriate number of bytes from * Logs the outbound message. This method logs the appropriate number of bytes from
@ -226,7 +226,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
* This method takes ownership of {@code message}. * This method takes ownership of {@code message}.
*/ */
abstract <T> void logOutboundMessage( abstract <T> void logOutboundMessage(
Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, byte[] callId); Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
/** /**
* Logs the inbound message. This method logs the appropriate number of bytes from * Logs the inbound message. This method logs the appropriate number of bytes from
@ -235,7 +235,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
* This method takes ownership of {@code message}. * This method takes ownership of {@code message}.
*/ */
abstract <T> void logInboundMessage( abstract <T> void logInboundMessage(
Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, byte[] callId); Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
/** /**
* Returns the number bytes of the header this writer will log, according to configuration. * Returns the number bytes of the header this writer will log, according to configuration.
@ -275,13 +275,38 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
return DEFAULT_FACTORY.getLog(fullMethodName); return DEFAULT_FACTORY.getLog(fullMethodName);
} }
static CallId getCallIdForServer(Context context) {
CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(context);
if (callId == null) {
return emptyCallId;
}
return callId;
}
static CallId getCallIdForClient(CallOptions callOptions) {
CallId callId = callOptions.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
if (callId == null) {
return emptyCallId;
}
return callId;
}
static SocketAddress getPeerSocket(Attributes streamAttributes) {
SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (peer == null) {
return DUMMY_SOCKET;
}
return peer;
}
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final CallId callId = getCallIdForClient(callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override @Override
public void start(Listener<RespT> responseListener, Metadata headers) { public void start(Listener<RespT> responseListener, Metadata headers) {
writer.logSendInitialMetadata(headers, CLIENT, dumyCallId, DUMMY_SOCKET); writer.logSendInitialMetadata(headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener = ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) { new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override @Override
@ -291,19 +316,20 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
message, message,
DUMMY_IS_COMPRESSED, DUMMY_IS_COMPRESSED,
CLIENT, CLIENT,
dumyCallId); callId);
super.onMessage(message); super.onMessage(message);
} }
@Override @Override
public void onHeaders(Metadata headers) { public void onHeaders(Metadata headers) {
writer.logRecvInitialMetadata(headers, CLIENT, dumyCallId, DUMMY_SOCKET); SocketAddress peer = getPeerSocket(getAttributes());
writer.logRecvInitialMetadata(headers, CLIENT, callId, peer);
super.onHeaders(headers); super.onHeaders(headers);
} }
@Override @Override
public void onClose(Status status, Metadata trailers) { public void onClose(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, CLIENT, dumyCallId); writer.logTrailingMetadata(trailers, CLIENT, callId);
super.onClose(status, trailers); super.onClose(status, trailers);
} }
}; };
@ -317,7 +343,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
message, message,
DUMMY_IS_COMPRESSED, DUMMY_IS_COMPRESSED,
CLIENT, CLIENT,
dumyCallId); callId);
super.sendMessage(message); super.sendMessage(message);
} }
}; };
@ -326,7 +352,9 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
@Override @Override
public <ReqT, RespT> Listener<ReqT> interceptCall( public <ReqT, RespT> Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
writer.logRecvInitialMetadata(headers, SERVER, dumyCallId, DUMMY_SOCKET); final CallId callId = getCallIdForServer(Context.current());
SocketAddress peer = getPeerSocket(call.getAttributes());
writer.logRecvInitialMetadata(headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) { ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override @Override
public void sendMessage(RespT message) { public void sendMessage(RespT message) {
@ -335,19 +363,19 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
message, message,
DUMMY_IS_COMPRESSED, DUMMY_IS_COMPRESSED,
SERVER, SERVER,
dumyCallId); callId);
super.sendMessage(message); super.sendMessage(message);
} }
@Override @Override
public void sendHeaders(Metadata headers) { public void sendHeaders(Metadata headers) {
writer.logSendInitialMetadata(headers, SERVER, dumyCallId, DUMMY_SOCKET); writer.logSendInitialMetadata(headers, SERVER, callId);
super.sendHeaders(headers); super.sendHeaders(headers);
} }
@Override @Override
public void close(Status status, Metadata trailers) { public void close(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, SERVER, dumyCallId); writer.logTrailingMetadata(trailers, SERVER, callId);
super.close(status, trailers); super.close(status, trailers);
} }
}; };
@ -360,7 +388,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
message, message,
DUMMY_IS_COMPRESSED, DUMMY_IS_COMPRESSED,
SERVER, SERVER,
dumyCallId); callId);
super.onMessage(message); super.onMessage(message);
} }
}; };
@ -522,19 +550,15 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
} }
/** /**
* Returns a {@link Uint128} by interpreting the first 8 bytes as the high int64 and the second * Returns a {@link Uint128} from a CallId.
* 8 bytes as the low int64.
*/ */
// TODO(zpencer): verify int64 representation with other gRPC languages static Uint128 callIdToProto(CallId callId) {
static Uint128 callIdToProto(byte[] bytes) { Preconditions.checkNotNull(callId);
Preconditions.checkNotNull(bytes); return Uint128
Preconditions.checkArgument( .newBuilder()
bytes.length == 16, .setHigh(callId.hi)
String.format("can only convert from 16 byte input, actual length = %d", bytes.length)); .setLow(callId.lo)
ByteBuffer bb = ByteBuffer.wrap(bytes); .build();
long high = bb.getLong();
long low = bb.getLong();
return Uint128.newBuilder().setHigh(high).setLow(low).build();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -18,12 +18,13 @@ package io.grpc.services;
import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER;
import static io.grpc.services.BinaryLog.DUMMY_SOCKET; import static io.grpc.services.BinaryLog.DUMMY_SOCKET;
import static io.grpc.services.BinaryLog.dumyCallId; import static io.grpc.services.BinaryLog.emptyCallId;
import static io.grpc.services.BinaryLog.getCallIdForClient;
import static io.grpc.services.BinaryLog.getCallIdForServer;
import static io.grpc.services.BinaryLog.getPeerSocket;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -32,9 +33,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
@ -47,6 +51,8 @@ import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128; import io.grpc.binarylog.Uint128;
import io.grpc.internal.BinaryLogProvider;
import io.grpc.internal.BinaryLogProvider.CallId;
import io.grpc.internal.NoopClientCall; import io.grpc.internal.NoopClientCall;
import io.grpc.internal.NoopServerCall; import io.grpc.internal.NoopServerCall;
import io.grpc.services.BinaryLog.FactoryImpl; import io.grpc.services.BinaryLog.FactoryImpl;
@ -59,6 +65,7 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -108,9 +115,9 @@ public final class BinaryLogTest {
private static final boolean IS_CLIENT = false; private static final boolean IS_CLIENT = false;
private static final boolean IS_COMPRESSED = true; private static final boolean IS_COMPRESSED = true;
private static final boolean IS_UNCOMPRESSED = false; private static final boolean IS_UNCOMPRESSED = false;
private static final byte[] CALL_ID = new byte[] { // TODO(zpencer): rename this to callId, since byte[] is mutable
0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, private static final CallId CALL_ID =
0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f }; new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL);
private static final int HEADER_LIMIT = 10; private static final int HEADER_LIMIT = 10;
private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
@ -120,12 +127,14 @@ public final class BinaryLogTest {
new SinkWriterImpl(sink, HEADER_LIMIT, MESSAGE_LIMIT); new SinkWriterImpl(sink, HEADER_LIMIT, MESSAGE_LIMIT);
private final SinkWriter mockSinkWriter = mock(SinkWriter.class); private final SinkWriter mockSinkWriter = mock(SinkWriter.class);
private final byte[] message = new byte[100]; private final byte[] message = new byte[100];
private SocketAddress peer;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
nonEmptyMetadata.put(KEY_A, DATA_A); nonEmptyMetadata.put(KEY_A, DATA_A);
nonEmptyMetadata.put(KEY_B, DATA_B); nonEmptyMetadata.put(KEY_B, DATA_B);
nonEmptyMetadata.put(KEY_C, DATA_C); nonEmptyMetadata.put(KEY_C, DATA_C);
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
} }
@Test @Test
@ -297,9 +306,7 @@ public final class BinaryLogTest {
@Test @Test
public void callIdToProto() { public void callIdToProto() {
byte[] callId = new byte[] { CallId callId = new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL);
0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18,
0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f };
assertEquals( assertEquals(
Uint128 Uint128
.newBuilder() .newBuilder()
@ -307,29 +314,6 @@ public final class BinaryLogTest {
.setLow(0x19101a1b1c1d1e1fL) .setLow(0x19101a1b1c1d1e1fL)
.build(), .build(),
BinaryLog.callIdToProto(callId)); BinaryLog.callIdToProto(callId));
}
@Test
public void callIdToProto_invalid_shorter_len() {
try {
BinaryLog.callIdToProto(new byte[14]);
fail();
} catch (IllegalArgumentException expected) {
assertTrue(
expected.getMessage().startsWith("can only convert from 16 byte input, actual length"));
}
}
@Test
public void callIdToProto_invalid_longer_len() {
try {
BinaryLog.callIdToProto(new byte[18]);
fail();
} catch (IllegalArgumentException expected) {
assertTrue(
expected.getMessage().startsWith("can only convert from 16 byte input, actual length"));
}
} }
@Test @Test
@ -506,34 +490,26 @@ public final class BinaryLogTest {
@Test @Test
public void logSendInitialMetadata_server() throws Exception { public void logSendInitialMetadata_server() throws Exception {
InetAddress address = InetAddress.getByName("127.0.0.1"); sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID);
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
verify(sink).write( verify(sink).write(
GrpcLogEntry GrpcLogEntry
.newBuilder() .newBuilder()
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER) .setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinaryLog.callIdToProto(CALL_ID)) .setCallId(BinaryLog.callIdToProto(CALL_ID))
.setPeer(BinaryLog.socketToProto(socketAddress))
.setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10))
.build()); .build());
} }
@Test @Test
public void logSendInitialMetadata_client() throws Exception { public void logSendInitialMetadata_client() throws Exception {
InetAddress address = InetAddress.getByName("127.0.0.1"); sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID);
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
verify(sink).write( verify(sink).write(
GrpcLogEntry GrpcLogEntry
.newBuilder() .newBuilder()
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT) .setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinaryLog.callIdToProto(CALL_ID)) .setCallId(BinaryLog.callIdToProto(CALL_ID))
.setPeer(BinaryLog.socketToProto(socketAddress))
.setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10))
.build()); .build());
} }
@ -706,6 +682,36 @@ public final class BinaryLogTest {
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@Test
public void getCallIdServer() {
assertSame(emptyCallId, getCallIdForServer(Context.ROOT));
assertSame(
CALL_ID,
getCallIdForServer(
Context.ROOT.withValue(
BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY,
CALL_ID)));
}
@Test
public void getCallIdClient() {
assertSame(emptyCallId, getCallIdForClient(CallOptions.DEFAULT));
assertSame(
CALL_ID,
getCallIdForClient(
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,
CALL_ID)));
}
@Test
public void getPeerSocketTest() {
assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY));
assertSame(
peer,
getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build()));
}
@Test @Test
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public void clientInterceptor() throws Exception { public void clientInterceptor() throws Exception {
@ -730,6 +736,11 @@ public final class BinaryLogTest {
public void sendMessage(RequestT message) { public void sendMessage(RequestT message) {
actualRequest.set(message); actualRequest.set(message);
} }
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
}; };
} }
@ -750,7 +761,11 @@ public final class BinaryLogTest {
.build(); .build();
ClientCall<byte[], byte[]> interceptedCall = ClientCall<byte[], byte[]> interceptedCall =
new BinaryLog(mockSinkWriter) new BinaryLog(mockSinkWriter)
.interceptCall(method, CallOptions.DEFAULT, channel); .interceptCall(
method,
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID),
channel);
// send initial metadata // send initial metadata
{ {
@ -759,8 +774,7 @@ public final class BinaryLogTest {
verify(mockSinkWriter).logSendInitialMetadata( verify(mockSinkWriter).logSendInitialMetadata(
same(clientInitial), same(clientInitial),
eq(IS_CLIENT), eq(IS_CLIENT),
same(dumyCallId), same(CALL_ID));
same(DUMMY_SOCKET));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
assertSame(clientInitial, actualClientInitial.get()); assertSame(clientInitial, actualClientInitial.get());
} }
@ -771,8 +785,8 @@ public final class BinaryLogTest {
interceptedListener.get().onHeaders(serverInitial); interceptedListener.get().onHeaders(serverInitial);
verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial),
eq(IS_CLIENT), eq(IS_CLIENT),
same(dumyCallId), same(CALL_ID),
same(DUMMY_SOCKET)); same(peer));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onHeaders(same(serverInitial)); verify(mockListener).onHeaders(same(serverInitial));
} }
@ -786,7 +800,7 @@ public final class BinaryLogTest {
same(request), same(request),
eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(BinaryLog.DUMMY_IS_COMPRESSED),
eq(IS_CLIENT), eq(IS_CLIENT),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
assertSame(request, actualRequest.get()); assertSame(request, actualRequest.get());
} }
@ -800,7 +814,7 @@ public final class BinaryLogTest {
eq(response), eq(response),
eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(BinaryLog.DUMMY_IS_COMPRESSED),
eq(IS_CLIENT), eq(IS_CLIENT),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onMessage(same(response)); verify(mockListener).onMessage(same(response));
} }
@ -814,15 +828,27 @@ public final class BinaryLogTest {
verify(mockSinkWriter).logTrailingMetadata( verify(mockSinkWriter).logTrailingMetadata(
same(trailers), same(trailers),
eq(IS_CLIENT), eq(IS_CLIENT),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onClose(same(status), same(trailers)); verify(mockListener).onClose(same(status), same(trailers));
} }
} }
@Test @Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void serverInterceptor() throws Exception { public void serverInterceptor() throws Exception {
Context.current()
.withValue(BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, CALL_ID)
.call(new Callable<Void>() {
@Override
public Void call() throws Exception {
serverInterceptor0();
return null;
}
});
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void serverInterceptor0() throws Exception {
final AtomicReference<ServerCall> interceptedCall = final AtomicReference<ServerCall> interceptedCall =
new AtomicReference<ServerCall>(); new AtomicReference<ServerCall>();
ServerCall.Listener<byte[]> capturedListener; ServerCall.Listener<byte[]> capturedListener;
@ -867,6 +893,14 @@ public final class BinaryLogTest {
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() { public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
return method; return method;
} }
@Override
public Attributes getAttributes() {
return Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer)
.build();
}
}, },
clientInitial, clientInitial,
new ServerCallHandler<byte[], byte[]>() { new ServerCallHandler<byte[], byte[]>() {
@ -881,8 +915,8 @@ public final class BinaryLogTest {
verify(mockSinkWriter).logRecvInitialMetadata( verify(mockSinkWriter).logRecvInitialMetadata(
same(clientInitial), same(clientInitial),
eq(IS_SERVER), eq(IS_SERVER),
same(dumyCallId), same(CALL_ID),
same(DUMMY_SOCKET)); same(peer));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
} }
@ -893,8 +927,7 @@ public final class BinaryLogTest {
verify(mockSinkWriter).logSendInitialMetadata( verify(mockSinkWriter).logSendInitialMetadata(
same(serverInital), same(serverInital),
eq(IS_SERVER), eq(IS_SERVER),
same(dumyCallId), same(CALL_ID));
same(DUMMY_SOCKET));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
assertSame(serverInital, actualServerInitial.get()); assertSame(serverInital, actualServerInitial.get());
} }
@ -908,7 +941,7 @@ public final class BinaryLogTest {
same(request), same(request),
eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(BinaryLog.DUMMY_IS_COMPRESSED),
eq(IS_SERVER), eq(IS_SERVER),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
verify(mockListener).onMessage(same(request)); verify(mockListener).onMessage(same(request));
} }
@ -922,7 +955,7 @@ public final class BinaryLogTest {
same(response), same(response),
eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(BinaryLog.DUMMY_IS_COMPRESSED),
eq(IS_SERVER), eq(IS_SERVER),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
assertSame(response, actualResponse.get()); assertSame(response, actualResponse.get());
} }
@ -935,7 +968,7 @@ public final class BinaryLogTest {
verify(mockSinkWriter).logTrailingMetadata( verify(mockSinkWriter).logTrailingMetadata(
same(trailers), same(trailers),
eq(IS_SERVER), eq(IS_SERVER),
same(dumyCallId)); same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter); verifyNoMoreInteractions(mockSinkWriter);
assertSame(status, actualStatus.get()); assertSame(status, actualStatus.get());
assertSame(trailers, actualTrailers.get()); assertSame(trailers, actualTrailers.get());