diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 186692557a..74e06062ad 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -416,6 +416,9 @@ public abstract class AbstractManagedChannelImplBuilder Tracing.getPropagationComponent().getBinaryFormat()); effectiveInterceptors.add(0, censusTracing.getClientInterceptor()); } + if (binlogProvider != null) { + effectiveInterceptors.add(0, binlogProvider.getClientCallIdSetter()); + } return effectiveInterceptors; } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 967b346549..855678d71a 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -266,6 +266,9 @@ public abstract class AbstractServerImplBuilder SERVER_CALL_ID_CONTEXT_KEY + = Context.key("binarylog-context-key"); + // TODO(zpencer): move to services and make package private when this class is moved + @Internal + public static final CallOptions.Key CLIENT_CALL_ID_CALLOPTION_KEY + = CallOptions.Key.of("binarylog-calloptions-key", null); @VisibleForTesting public static final Marshaller BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); @@ -128,6 +143,62 @@ public abstract class BinaryLogProvider implements Closeable { // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there } + private static final ServerStreamTracer SERVER_CALLID_SETTER = new ServerStreamTracer() { + @Override + public Context filterContext(Context context) { + Context toRestore = context.attach(); + try { + Span span = Tracing.getTracer().getCurrentSpan(); + if (span == null) { + return context; + } + + return context.withValue(SERVER_CALL_ID_CONTEXT_KEY, CallId.fromCensusSpan(span)); + } finally { + context.detach(toRestore); + } + } + }; + + private static final ServerStreamTracer.Factory SERVER_CALLID_SETTER_FACTORY + = new ServerStreamTracer.Factory() { + @Override + public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { + return SERVER_CALLID_SETTER; + } + }; + + /** + * Returns a {@link ServerStreamTracer.Factory} that copies the call ID to the {@link Context} + * as {@code SERVER_CALL_ID_CONTEXT_KEY}. + */ + public ServerStreamTracer.Factory getServerCallIdSetter() { + return SERVER_CALLID_SETTER_FACTORY; + } + + private static final ClientInterceptor CLIENT_CALLID_SETTER = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + Span span = Tracing.getTracer().getCurrentSpan(); + if (span == null) { + return next.newCall(method, callOptions); + } + + return next.newCall( + method, + callOptions.withOption(CLIENT_CALL_ID_CALLOPTION_KEY, CallId.fromCensusSpan(span))); + } + }; + + /** + * Returns a {@link ClientInterceptor} that copies the call ID to the {@link CallOptions} + * as {@code CALL_CLIENT_CALL_ID_CALLOPTION_KEY}. + */ + public ClientInterceptor getClientCallIdSetter() { + return CLIENT_CALLID_SETTER; + } + /** * A priority, from 0 to 10 that this provider should be used, taking the current environment into * consideration. 5 should be considered the default, and then tweaked based on environment @@ -192,4 +263,25 @@ public abstract class BinaryLogProvider implements Closeable { } } } + + /** + * A CallId is two byte[] arrays both of size 8 that uniquely identifies the RPC. Users are + * free to use the byte arrays however they see fit. + */ + public static final class CallId { + public final long hi; + public final long lo; + + /** + * Creates an instance. + */ + public CallId(long hi, long lo) { + this.hi = hi; + this.lo = lo; + } + + static CallId fromCensusSpan(Span span) { + return new CallId(0, ByteBuffer.wrap(span.getContext().getSpanId().getBytes()).getLong()); + } + } } diff --git a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java index 1a18f2c96d..9140e5806d 100644 --- a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java +++ b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java @@ -17,14 +17,17 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; +import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.Context; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; @@ -38,12 +41,20 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerMethodDefinition; +import io.grpc.ServerStreamTracer; import io.grpc.StringMarshaller; +import io.grpc.internal.BinaryLogProvider.CallId; +import io.grpc.internal.testing.StatsTestUtils.MockableSpan; +import io.grpc.testing.TestMethodDescriptors; +import io.opencensus.trace.Tracing; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.junit.runner.RunWith; @@ -288,6 +299,15 @@ public class BinaryLogProviderTest { (int) method.parseResponse(new ByteArrayInputStream((byte[]) serializedResp.get(0)))); } + @Test + public void callIdFromSpan() { + MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); + CallId callId = CallId.fromCensusSpan(mockableSpan); + assertThat(callId.hi).isEqualTo(0); + assertThat(callId.lo) + .isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()); + } + @SuppressWarnings({"rawtypes", "unchecked"}) private static void onServerMessageHelper(ServerCall.Listener listener, Object request) { listener.onMessage(request); @@ -310,6 +330,56 @@ public class BinaryLogProviderTest { return methodDef.getServerCallHandler().startCall(serverCall, new Metadata()); } + @Test + public void serverCallIdSetter() { + ServerStreamTracer tracer = binlogProvider + .getServerCallIdSetter() + .newServerStreamTracer("service/method", new Metadata()); + MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); + Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan); + Context filtered = tracer.filterContext(context); + CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(filtered); + assertThat(callId.hi).isEqualTo(0); + assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) + .isEqualTo(callId.lo); + } + + @Test + public void clientCallIdSetter() throws Exception { + final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); + Tracing.getTracer().withSpan(mockableSpan, new Callable() { + @Override + public Void call() throws Exception { + final SettableFuture future = SettableFuture.create(); + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + future.set(callOptions); + return null; + } + + @Override + public String authority() { + return null; + } + }; + binlogProvider.getClientCallIdSetter().interceptCall( + TestMethodDescriptors.voidMethod(), + CallOptions.DEFAULT, + channel); + CallOptions callOptions = future.get(); + CallId callId = callOptions + .getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); + assertThat(callId.hi).isEqualTo(0); + assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) + .isEqualTo(callId.lo); + return null; + } + }).call(); + } + private final class TestBinaryLogClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 2e3dcaf5c6..8da3c28204 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -370,7 +370,6 @@ class NettyServer implements InternalServer, WithLogId { } } }); - return ret; } diff --git a/services/src/main/java/io/grpc/services/BinaryLog.java b/services/src/main/java/io/grpc/services/BinaryLog.java index c57c534da5..a414dda22f 100644 --- a/services/src/main/java/io/grpc/services/BinaryLog.java +++ b/services/src/main/java/io/grpc/services/BinaryLog.java @@ -23,14 +23,17 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.primitives.Bytes; import com.google.protobuf.ByteString; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.Context; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; +import io.grpc.Grpc; import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -48,12 +51,13 @@ import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; +import io.grpc.internal.BinaryLogProvider; +import io.grpc.internal.BinaryLogProvider.CallId; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Collections; import java.util.HashMap; @@ -77,9 +81,8 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { private static final boolean SERVER = true; private static final boolean CLIENT = false; - // TODO(zpencer): extract these fields from call and stop using dummy values @VisibleForTesting - static final byte[] dumyCallId = new byte[16]; + static final CallId emptyCallId = new CallId(0, 0); @VisibleForTesting static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); @VisibleForTesting @@ -106,14 +109,12 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { } @Override - void logSendInitialMetadata( - Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) { + void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry entry = GrpcLogEntry .newBuilder() .setType(Type.SEND_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)) - .setPeer(socketToProto(peerSocket)) .setMetadata(metadataToProto(metadata, maxHeaderBytes)) .build(); sink.write(entry); @@ -121,7 +122,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { @Override void logRecvInitialMetadata( - Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket) { + Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { GrpcLogEntry entry = GrpcLogEntry .newBuilder() .setType(Type.RECV_INITIAL_METADATA) @@ -134,7 +135,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { } @Override - void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId) { + void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry entry = GrpcLogEntry .newBuilder() .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) @@ -151,7 +152,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { T message, boolean compressed, boolean isServer, - byte[] callId) { + CallId callId) { if (marshaller != BYTEARRAY_MARSHALLER) { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } @@ -172,7 +173,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { T message, boolean compressed, boolean isServer, - byte[] callId) { + CallId callId) { if (marshaller != BYTEARRAY_MARSHALLER) { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } @@ -203,21 +204,20 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { * Logs the sending of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logSendInitialMetadata( - Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket); + abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId); /** * Logs the receiving of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ abstract void logRecvInitialMetadata( - Metadata metadata, boolean isServer, byte[] callId, SocketAddress peerSocket); + Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); /** * Logs the trailing metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logTrailingMetadata(Metadata metadata, boolean isServer, byte[] callId); + abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId); /** * Logs the outbound message. This method logs the appropriate number of bytes from @@ -226,7 +226,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { * This method takes ownership of {@code message}. */ abstract void logOutboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, byte[] callId); + Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); /** * Logs the inbound message. This method logs the appropriate number of bytes from @@ -235,7 +235,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { * This method takes ownership of {@code message}. */ abstract void logInboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, byte[] callId); + Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); /** * Returns the number bytes of the header this writer will log, according to configuration. @@ -275,13 +275,38 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { return DEFAULT_FACTORY.getLog(fullMethodName); } + static CallId getCallIdForServer(Context context) { + CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(context); + if (callId == null) { + return emptyCallId; + } + return callId; + } + + static CallId getCallIdForClient(CallOptions callOptions) { + CallId callId = callOptions.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); + if (callId == null) { + return emptyCallId; + } + return callId; + } + + static SocketAddress getPeerSocket(Attributes streamAttributes) { + SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + if (peer == null) { + return DUMMY_SOCKET; + } + return peer; + } + @Override public ClientCall interceptCall( final MethodDescriptor method, CallOptions callOptions, Channel next) { + final CallId callId = getCallIdForClient(callOptions); return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - writer.logSendInitialMetadata(headers, CLIENT, dumyCallId, DUMMY_SOCKET); + writer.logSendInitialMetadata(headers, CLIENT, callId); ClientCall.Listener wListener = new SimpleForwardingClientCallListener(responseListener) { @Override @@ -291,19 +316,20 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { message, DUMMY_IS_COMPRESSED, CLIENT, - dumyCallId); + callId); super.onMessage(message); } @Override public void onHeaders(Metadata headers) { - writer.logRecvInitialMetadata(headers, CLIENT, dumyCallId, DUMMY_SOCKET); + SocketAddress peer = getPeerSocket(getAttributes()); + writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); super.onHeaders(headers); } @Override public void onClose(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, CLIENT, dumyCallId); + writer.logTrailingMetadata(trailers, CLIENT, callId); super.onClose(status, trailers); } }; @@ -317,7 +343,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { message, DUMMY_IS_COMPRESSED, CLIENT, - dumyCallId); + callId); super.sendMessage(message); } }; @@ -326,7 +352,9 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { @Override public Listener interceptCall( final ServerCall call, Metadata headers, ServerCallHandler next) { - writer.logRecvInitialMetadata(headers, SERVER, dumyCallId, DUMMY_SOCKET); + final CallId callId = getCallIdForServer(Context.current()); + SocketAddress peer = getPeerSocket(call.getAttributes()); + writer.logRecvInitialMetadata(headers, SERVER, callId, peer); ServerCall wCall = new SimpleForwardingServerCall(call) { @Override public void sendMessage(RespT message) { @@ -335,19 +363,19 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { message, DUMMY_IS_COMPRESSED, SERVER, - dumyCallId); + callId); super.sendMessage(message); } @Override public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(headers, SERVER, dumyCallId, DUMMY_SOCKET); + writer.logSendInitialMetadata(headers, SERVER, callId); super.sendHeaders(headers); } @Override public void close(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, SERVER, dumyCallId); + writer.logTrailingMetadata(trailers, SERVER, callId); super.close(status, trailers); } }; @@ -360,7 +388,7 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { message, DUMMY_IS_COMPRESSED, SERVER, - dumyCallId); + callId); super.onMessage(message); } }; @@ -522,19 +550,15 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { } /** - * Returns a {@link Uint128} by interpreting the first 8 bytes as the high int64 and the second - * 8 bytes as the low int64. + * Returns a {@link Uint128} from a CallId. */ - // TODO(zpencer): verify int64 representation with other gRPC languages - static Uint128 callIdToProto(byte[] bytes) { - Preconditions.checkNotNull(bytes); - Preconditions.checkArgument( - bytes.length == 16, - String.format("can only convert from 16 byte input, actual length = %d", bytes.length)); - ByteBuffer bb = ByteBuffer.wrap(bytes); - long high = bb.getLong(); - long low = bb.getLong(); - return Uint128.newBuilder().setHigh(high).setLow(low).build(); + static Uint128 callIdToProto(CallId callId) { + Preconditions.checkNotNull(callId); + return Uint128 + .newBuilder() + .setHigh(callId.hi) + .setLow(callId.lo) + .build(); } @VisibleForTesting diff --git a/services/src/test/java/io/grpc/services/BinaryLogTest.java b/services/src/test/java/io/grpc/services/BinaryLogTest.java index 7438347ebc..01c3b27ffa 100644 --- a/services/src/test/java/io/grpc/services/BinaryLogTest.java +++ b/services/src/test/java/io/grpc/services/BinaryLogTest.java @@ -18,12 +18,13 @@ package io.grpc.services; import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; import static io.grpc.services.BinaryLog.DUMMY_SOCKET; -import static io.grpc.services.BinaryLog.dumyCallId; +import static io.grpc.services.BinaryLog.emptyCallId; +import static io.grpc.services.BinaryLog.getCallIdForClient; +import static io.grpc.services.BinaryLog.getCallIdForServer; +import static io.grpc.services.BinaryLog.getPeerSocket; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; @@ -32,9 +33,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.primitives.Bytes; import com.google.protobuf.ByteString; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; +import io.grpc.Context; +import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -47,6 +51,8 @@ import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; +import io.grpc.internal.BinaryLogProvider; +import io.grpc.internal.BinaryLogProvider.CallId; import io.grpc.internal.NoopClientCall; import io.grpc.internal.NoopServerCall; import io.grpc.services.BinaryLog.FactoryImpl; @@ -59,6 +65,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -108,9 +115,9 @@ public final class BinaryLogTest { private static final boolean IS_CLIENT = false; private static final boolean IS_COMPRESSED = true; private static final boolean IS_UNCOMPRESSED = false; - private static final byte[] CALL_ID = new byte[] { - 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, - 0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f }; + // TODO(zpencer): rename this to callId, since byte[] is mutable + private static final CallId CALL_ID = + new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); private static final int HEADER_LIMIT = 10; private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; @@ -120,12 +127,14 @@ public final class BinaryLogTest { new SinkWriterImpl(sink, HEADER_LIMIT, MESSAGE_LIMIT); private final SinkWriter mockSinkWriter = mock(SinkWriter.class); private final byte[] message = new byte[100]; + private SocketAddress peer; @Before public void setUp() throws Exception { nonEmptyMetadata.put(KEY_A, DATA_A); nonEmptyMetadata.put(KEY_B, DATA_B); nonEmptyMetadata.put(KEY_C, DATA_C); + peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); } @Test @@ -297,9 +306,7 @@ public final class BinaryLogTest { @Test public void callIdToProto() { - byte[] callId = new byte[] { - 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, - 0x19, 0x10, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f }; + CallId callId = new CallId(0x1112131415161718L, 0x19101a1b1c1d1e1fL); assertEquals( Uint128 .newBuilder() @@ -307,29 +314,6 @@ public final class BinaryLogTest { .setLow(0x19101a1b1c1d1e1fL) .build(), BinaryLog.callIdToProto(callId)); - - } - - @Test - public void callIdToProto_invalid_shorter_len() { - try { - BinaryLog.callIdToProto(new byte[14]); - fail(); - } catch (IllegalArgumentException expected) { - assertTrue( - expected.getMessage().startsWith("can only convert from 16 byte input, actual length")); - } - } - - @Test - public void callIdToProto_invalid_longer_len() { - try { - BinaryLog.callIdToProto(new byte[18]); - fail(); - } catch (IllegalArgumentException expected) { - assertTrue( - expected.getMessage().startsWith("can only convert from 16 byte input, actual length")); - } } @Test @@ -506,34 +490,26 @@ public final class BinaryLogTest { @Test public void logSendInitialMetadata_server() throws Exception { - InetAddress address = InetAddress.getByName("127.0.0.1"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); + sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); verify(sink).write( GrpcLogEntry .newBuilder() .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setPeer(BinaryLog.socketToProto(socketAddress)) .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) .build()); } @Test public void logSendInitialMetadata_client() throws Exception { - InetAddress address = InetAddress.getByName("127.0.0.1"); - int port = 12345; - InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); + sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); verify(sink).write( GrpcLogEntry .newBuilder() .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinaryLog.callIdToProto(CALL_ID)) - .setPeer(BinaryLog.socketToProto(socketAddress)) .setMetadata(BinaryLog.metadataToProto(nonEmptyMetadata, 10)) .build()); } @@ -706,6 +682,36 @@ public final class BinaryLogTest { verifyNoMoreInteractions(sink); } + @Test + public void getCallIdServer() { + assertSame(emptyCallId, getCallIdForServer(Context.ROOT)); + assertSame( + CALL_ID, + getCallIdForServer( + Context.ROOT.withValue( + BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, + CALL_ID))); + } + + @Test + public void getCallIdClient() { + assertSame(emptyCallId, getCallIdForClient(CallOptions.DEFAULT)); + assertSame( + CALL_ID, + getCallIdForClient( + CallOptions.DEFAULT.withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, + CALL_ID))); + } + + @Test + public void getPeerSocketTest() { + assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY)); + assertSame( + peer, + getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); + } + @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void clientInterceptor() throws Exception { @@ -730,6 +736,11 @@ public final class BinaryLogTest { public void sendMessage(RequestT message) { actualRequest.set(message); } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } }; } @@ -750,7 +761,11 @@ public final class BinaryLogTest { .build(); ClientCall interceptedCall = new BinaryLog(mockSinkWriter) - .interceptCall(method, CallOptions.DEFAULT, channel); + .interceptCall( + method, + CallOptions.DEFAULT.withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID), + channel); // send initial metadata { @@ -759,8 +774,7 @@ public final class BinaryLogTest { verify(mockSinkWriter).logSendInitialMetadata( same(clientInitial), eq(IS_CLIENT), - same(dumyCallId), - same(DUMMY_SOCKET)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); assertSame(clientInitial, actualClientInitial.get()); } @@ -771,8 +785,8 @@ public final class BinaryLogTest { interceptedListener.get().onHeaders(serverInitial); verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), eq(IS_CLIENT), - same(dumyCallId), - same(DUMMY_SOCKET)); + same(CALL_ID), + same(peer)); verifyNoMoreInteractions(mockSinkWriter); verify(mockListener).onHeaders(same(serverInitial)); } @@ -786,7 +800,7 @@ public final class BinaryLogTest { same(request), eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(IS_CLIENT), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); assertSame(request, actualRequest.get()); } @@ -800,7 +814,7 @@ public final class BinaryLogTest { eq(response), eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(IS_CLIENT), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); verify(mockListener).onMessage(same(response)); } @@ -814,15 +828,27 @@ public final class BinaryLogTest { verify(mockSinkWriter).logTrailingMetadata( same(trailers), eq(IS_CLIENT), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); verify(mockListener).onClose(same(status), same(trailers)); } } @Test - @SuppressWarnings({"rawtypes", "unchecked"}) public void serverInterceptor() throws Exception { + Context.current() + .withValue(BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, CALL_ID) + .call(new Callable() { + @Override + public Void call() throws Exception { + serverInterceptor0(); + return null; + } + }); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private void serverInterceptor0() throws Exception { final AtomicReference interceptedCall = new AtomicReference(); ServerCall.Listener capturedListener; @@ -867,6 +893,14 @@ public final class BinaryLogTest { public MethodDescriptor getMethodDescriptor() { return method; } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } }, clientInitial, new ServerCallHandler() { @@ -881,8 +915,8 @@ public final class BinaryLogTest { verify(mockSinkWriter).logRecvInitialMetadata( same(clientInitial), eq(IS_SERVER), - same(dumyCallId), - same(DUMMY_SOCKET)); + same(CALL_ID), + same(peer)); verifyNoMoreInteractions(mockSinkWriter); } @@ -893,8 +927,7 @@ public final class BinaryLogTest { verify(mockSinkWriter).logSendInitialMetadata( same(serverInital), eq(IS_SERVER), - same(dumyCallId), - same(DUMMY_SOCKET)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); assertSame(serverInital, actualServerInitial.get()); } @@ -908,7 +941,7 @@ public final class BinaryLogTest { same(request), eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(IS_SERVER), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); verify(mockListener).onMessage(same(request)); } @@ -922,7 +955,7 @@ public final class BinaryLogTest { same(response), eq(BinaryLog.DUMMY_IS_COMPRESSED), eq(IS_SERVER), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); assertSame(response, actualResponse.get()); } @@ -935,7 +968,7 @@ public final class BinaryLogTest { verify(mockSinkWriter).logTrailingMetadata( same(trailers), eq(IS_SERVER), - same(dumyCallId)); + same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); assertSame(status, actualStatus.get()); assertSame(trailers, actualTrailers.get());