From acfb3b9851ee207838934d42182f2f20809e5e03 Mon Sep 17 00:00:00 2001 From: zpencer Date: Fri, 27 Apr 2018 18:32:36 -0700 Subject: [PATCH] services,core: simplify CallId generation (#4365) BinaryLog.java is the class that is responsible for intercepting client and server calls. It now requires a CallId to be passed in. The BinaryLogProviderImpl is responsible for generating a CallId and passing it in. --- .../AbstractManagedChannelImplBuilder.java | 3 - .../internal/AbstractServerImplBuilder.java | 3 - .../io/grpc/internal/BinaryLogProvider.java | 76 +------ .../io/grpc/internal/CensusTracingModule.java | 7 +- .../grpc/internal/BinaryLogProviderTest.java | 98 ++++----- .../grpc/internal/ManagedChannelImplTest.java | 3 +- .../java/io/grpc/internal/ServerImplTest.java | 4 +- .../main/java/io/grpc/services/BinaryLog.java | 186 +++++++++--------- .../grpc/services/BinaryLogProviderImpl.java | 18 +- .../internal/CensusBinaryLogProvider.java | 41 ++++ .../java/io/grpc/services/BinaryLogTest.java | 43 +--- .../internal/CensusBinaryLogProviderTest.java | 63 ++++++ 12 files changed, 268 insertions(+), 277 deletions(-) create mode 100644 services/src/main/java/io/grpc/services/internal/CensusBinaryLogProvider.java create mode 100644 services/src/test/java/io/grpc/services/internal/CensusBinaryLogProviderTest.java diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 140b2488a2..0386a2de3c 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -422,9 +422,6 @@ public abstract class AbstractManagedChannelImplBuilder Tracing.getPropagationComponent().getBinaryFormat()); effectiveInterceptors.add(0, censusTracing.getClientInterceptor()); } - if (binlogProvider != null) { - effectiveInterceptors.add(0, binlogProvider.getClientCallIdSetter()); - } return effectiveInterceptors; } diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 855678d71a..967b346549 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -266,9 +266,6 @@ public abstract class AbstractServerImplBuilder SERVER_CALL_ID_CONTEXT_KEY - = Context.key("binarylog-context-key"); - // TODO(zpencer): move to services and make package private when this class is moved - @Internal public static final CallOptions.Key CLIENT_CALL_ID_CALLOPTION_KEY = CallOptions.Key.of("binarylog-calloptions-key", null); @VisibleForTesting @@ -116,7 +105,6 @@ public abstract class BinaryLogProvider implements Closeable { return ServerMethodDefinition.create(binMethod, binlogHandler); } - /** * Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor, * so the interceptor must be reusable across calls. At runtime, the request and response @@ -135,7 +123,8 @@ public abstract class BinaryLogProvider implements Closeable { */ // TODO(zpencer): ensure the interceptor properly handles retries and hedging @Nullable - protected abstract ClientInterceptor getClientInterceptor(String fullMethodName); + protected abstract ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions); @Override public void close() throws IOException { @@ -143,62 +132,6 @@ public abstract class BinaryLogProvider implements Closeable { // TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there } - private static final ServerStreamTracer SERVER_CALLID_SETTER = new ServerStreamTracer() { - @Override - public Context filterContext(Context context) { - Context toRestore = context.attach(); - try { - Span span = Tracing.getTracer().getCurrentSpan(); - if (span == null) { - return context; - } - - return context.withValue(SERVER_CALL_ID_CONTEXT_KEY, CallId.fromCensusSpan(span)); - } finally { - context.detach(toRestore); - } - } - }; - - private static final ServerStreamTracer.Factory SERVER_CALLID_SETTER_FACTORY - = new ServerStreamTracer.Factory() { - @Override - public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { - return SERVER_CALLID_SETTER; - } - }; - - /** - * Returns a {@link ServerStreamTracer.Factory} that copies the call ID to the {@link Context} - * as {@code SERVER_CALL_ID_CONTEXT_KEY}. - */ - public ServerStreamTracer.Factory getServerCallIdSetter() { - return SERVER_CALLID_SETTER_FACTORY; - } - - private static final ClientInterceptor CLIENT_CALLID_SETTER = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - Span span = Tracing.getTracer().getCurrentSpan(); - if (span == null) { - return next.newCall(method, callOptions); - } - - return next.newCall( - method, - callOptions.withOption(CLIENT_CALL_ID_CALLOPTION_KEY, CallId.fromCensusSpan(span))); - } - }; - - /** - * Returns a {@link ClientInterceptor} that copies the call ID to the {@link CallOptions} - * as {@code CALL_CLIENT_CALL_ID_CALLOPTION_KEY}. - */ - public ClientInterceptor getClientCallIdSetter() { - return CLIENT_CALLID_SETTER; - } - /** * A priority, from 0 to 10 that this provider should be used, taking the current environment into * consideration. 5 should be considered the default, and then tweaked based on environment @@ -250,7 +183,8 @@ public abstract class BinaryLogProvider implements Closeable { MethodDescriptor method, CallOptions callOptions, Channel next) { - ClientInterceptor binlogInterceptor = getClientInterceptor(method.getFullMethodName()); + ClientInterceptor binlogInterceptor = getClientInterceptor( + method.getFullMethodName(), callOptions); if (binlogInterceptor == null) { return next.newCall(method, callOptions); } else { @@ -280,7 +214,7 @@ public abstract class BinaryLogProvider implements Closeable { this.lo = lo; } - static CallId fromCensusSpan(Span span) { + public static CallId fromCensusSpan(Span span) { return new CallId(0, ByteBuffer.wrap(span.getContext().getSpanId().getBytes()).getLong()); } } diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index a40c377a05..fca03ba682 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -379,7 +379,12 @@ final class CensusTracingModule { // for the direct access and BlankSpan when Tracer API is used. final ClientCallTracer tracerFactory = newClientCallTracer(CONTEXT_SPAN_KEY.get(), method); ClientCall call = - next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); + next.newCall( + method, + callOptions.withStreamTracerFactory(tracerFactory) + .withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, + BinaryLogProvider.CallId.fromCensusSpan(tracerFactory.span))); return new SimpleForwardingClientCall(call) { @Override public void start(Listener responseListener, Metadata headers) { diff --git a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java index 9140e5806d..af1b5e3369 100644 --- a/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java +++ b/core/src/test/java/io/grpc/internal/BinaryLogProviderTest.java @@ -17,17 +17,18 @@ package io.grpc.internal; import static com.google.common.truth.Truth.assertThat; -import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.util.concurrent.SettableFuture; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; -import io.grpc.Context; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; @@ -41,12 +42,14 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerMethodDefinition; -import io.grpc.ServerStreamTracer; import io.grpc.StringMarshaller; import io.grpc.internal.BinaryLogProvider.CallId; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.testing.TestMethodDescriptors; -import io.opencensus.trace.Tracing; +import io.opencensus.trace.Span; +import io.opencensus.trace.SpanBuilder; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.propagation.BinaryFormat; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -54,7 +57,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.junit.runner.RunWith; @@ -96,7 +98,8 @@ public class BinaryLogProviderTest { } @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { return new TestBinaryLogClientInterceptor(); } @@ -308,6 +311,39 @@ public class BinaryLogProviderTest { .isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()); } + @Test + public void censusTracerSetsCallId() throws Exception { + Tracer tracer = mock(Tracer.class); + SpanBuilder builder = mock(SpanBuilder.class); + when(tracer.spanBuilderWithExplicitParent(any(String.class), any(Span.class))) + .thenReturn(builder); + when(builder.setRecordEvents(any(Boolean.class))).thenReturn(builder); + MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); + when(builder.startSpan()).thenReturn(mockableSpan); + + final SettableFuture options = SettableFuture.create(); + Channel c = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + options.set(callOptions); + return null; + } + + @Override + public String authority() { + return null; + } + }; + new CensusTracingModule(tracer, mock(BinaryFormat.class)) + .getClientInterceptor() + .interceptCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT, c); + CallId callId = options.get().getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); + assertThat(callId.hi).isEqualTo(0); + assertThat(callId.lo) + .isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()); + } + @SuppressWarnings({"rawtypes", "unchecked"}) private static void onServerMessageHelper(ServerCall.Listener listener, Object request) { listener.onMessage(request); @@ -330,56 +366,6 @@ public class BinaryLogProviderTest { return methodDef.getServerCallHandler().startCall(serverCall, new Metadata()); } - @Test - public void serverCallIdSetter() { - ServerStreamTracer tracer = binlogProvider - .getServerCallIdSetter() - .newServerStreamTracer("service/method", new Metadata()); - MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); - Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan); - Context filtered = tracer.filterContext(context); - CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(filtered); - assertThat(callId.hi).isEqualTo(0); - assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) - .isEqualTo(callId.lo); - } - - @Test - public void clientCallIdSetter() throws Exception { - final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); - Tracing.getTracer().withSpan(mockableSpan, new Callable() { - @Override - public Void call() throws Exception { - final SettableFuture future = SettableFuture.create(); - Channel channel = new Channel() { - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions) { - future.set(callOptions); - return null; - } - - @Override - public String authority() { - return null; - } - }; - binlogProvider.getClientCallIdSetter().interceptCall( - TestMethodDescriptors.voidMethod(), - CallOptions.DEFAULT, - channel); - CallOptions callOptions = future.get(); - CallId callId = callOptions - .getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); - assertThat(callId.hi).isEqualTo(0); - assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) - .isEqualTo(callId.lo); - return null; - } - }).call(); - } - private final class TestBinaryLogClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 44c3b010ef..71a27ea1d2 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -2289,7 +2289,8 @@ public class ManagedChannelImplTest { } @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { return new ClientInterceptor() { @Override public ClientCall interceptCall( diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index ac0c3dedf9..6be63e1605 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -48,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.Compressor; import io.grpc.Context; @@ -1252,7 +1253,8 @@ public class ServerImplTest { @Nullable @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { return null; } diff --git a/services/src/main/java/io/grpc/services/BinaryLog.java b/services/src/main/java/io/grpc/services/BinaryLog.java index f51da4eb90..421a5f1e13 100644 --- a/services/src/main/java/io/grpc/services/BinaryLog.java +++ b/services/src/main/java/io/grpc/services/BinaryLog.java @@ -28,7 +28,6 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; -import io.grpc.Context; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; @@ -51,7 +50,6 @@ import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; -import io.grpc.internal.BinaryLogProvider; import io.grpc.internal.BinaryLogProvider.CallId; import java.net.Inet4Address; import java.net.Inet6Address; @@ -73,7 +71,7 @@ import javax.annotation.concurrent.ThreadSafe; * A binary log class that is configured for a specific {@link MethodDescriptor}. */ @ThreadSafe -final class BinaryLog implements ServerInterceptor, ClientInterceptor { +final class BinaryLog { private static final Logger logger = Logger.getLogger(BinaryLog.class.getName()); private static final int IP_PORT_BYTES = 2; private static final int IP_PORT_UPPER_MASK = 0xff00; @@ -248,22 +246,6 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { abstract int getMaxMessageBytes(); } - static CallId getCallIdForServer(Context context) { - CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(context); - if (callId == null) { - return emptyCallId; - } - return callId; - } - - static CallId getCallIdForClient(CallOptions callOptions) { - CallId callId = callOptions.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); - if (callId == null) { - return emptyCallId; - } - return callId; - } - static SocketAddress getPeerSocket(Attributes streamAttributes) { SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); if (peer == null) { @@ -272,97 +254,105 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor { return peer; } - @Override - public ClientCall interceptCall( - final MethodDescriptor method, CallOptions callOptions, Channel next) { - final CallId callId = getCallIdForClient(callOptions); - return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + public ClientInterceptor getClientInterceptor(final CallId callId) { + return new ClientInterceptor() { @Override - public void start(Listener responseListener, Metadata headers) { - writer.logSendInitialMetadata(headers, CLIENT, callId); - ClientCall.Listener wListener = - new SimpleForwardingClientCallListener(responseListener) { - @Override - public void onMessage(RespT message) { - writer.logInboundMessage( - method.getResponseMarshaller(), - message, - DUMMY_IS_COMPRESSED, - CLIENT, - callId); - super.onMessage(message); - } + public ClientCall interceptCall( + final MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + writer.logSendInitialMetadata(headers, CLIENT, callId); + ClientCall.Listener wListener = + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + writer.logInboundMessage( + method.getResponseMarshaller(), + message, + DUMMY_IS_COMPRESSED, + CLIENT, + callId); + super.onMessage(message); + } - @Override - public void onHeaders(Metadata headers) { - SocketAddress peer = getPeerSocket(getAttributes()); - writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); - super.onHeaders(headers); - } + @Override + public void onHeaders(Metadata headers) { + SocketAddress peer = getPeerSocket(getAttributes()); + writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); + super.onHeaders(headers); + } - @Override - public void onClose(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, CLIENT, callId); - super.onClose(status, trailers); - } - }; - super.start(wListener, headers); - } + @Override + public void onClose(Status status, Metadata trailers) { + writer.logTrailingMetadata(trailers, CLIENT, callId); + super.onClose(status, trailers); + } + }; + super.start(wListener, headers); + } - @Override - public void sendMessage(ReqT message) { - writer.logOutboundMessage( - method.getRequestMarshaller(), - message, - DUMMY_IS_COMPRESSED, - CLIENT, - callId); - super.sendMessage(message); + @Override + public void sendMessage(ReqT message) { + writer.logOutboundMessage( + method.getRequestMarshaller(), + message, + DUMMY_IS_COMPRESSED, + CLIENT, + callId); + super.sendMessage(message); + } + }; } }; } - @Override - public Listener interceptCall( - final ServerCall call, Metadata headers, ServerCallHandler next) { - final CallId callId = getCallIdForServer(Context.current()); - SocketAddress peer = getPeerSocket(call.getAttributes()); - writer.logRecvInitialMetadata(headers, SERVER, callId, peer); - ServerCall wCall = new SimpleForwardingServerCall(call) { + public ServerInterceptor getServerInterceptor(final CallId callId) { + return new ServerInterceptor() { @Override - public void sendMessage(RespT message) { - writer.logOutboundMessage( - call.getMethodDescriptor().getResponseMarshaller(), - message, - DUMMY_IS_COMPRESSED, - SERVER, - callId); - super.sendMessage(message); - } + public Listener interceptCall( + final ServerCall call, + Metadata headers, + ServerCallHandler next) { + SocketAddress peer = getPeerSocket(call.getAttributes()); + writer.logRecvInitialMetadata(headers, SERVER, callId, peer); + ServerCall wCall = new SimpleForwardingServerCall(call) { + @Override + public void sendMessage(RespT message) { + writer.logOutboundMessage( + call.getMethodDescriptor().getResponseMarshaller(), + message, + DUMMY_IS_COMPRESSED, + SERVER, + callId); + super.sendMessage(message); + } - @Override - public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(headers, SERVER, callId); - super.sendHeaders(headers); - } + @Override + public void sendHeaders(Metadata headers) { + writer.logSendInitialMetadata(headers, SERVER, callId); + super.sendHeaders(headers); + } - @Override - public void close(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, SERVER, callId); - super.close(status, trailers); - } - }; + @Override + public void close(Status status, Metadata trailers) { + writer.logTrailingMetadata(trailers, SERVER, callId); + super.close(status, trailers); + } + }; - return new SimpleForwardingServerCallListener(next.startCall(wCall, headers)) { - @Override - public void onMessage(ReqT message) { - writer.logInboundMessage( - call.getMethodDescriptor().getRequestMarshaller(), - message, - DUMMY_IS_COMPRESSED, - SERVER, - callId); - super.onMessage(message); + return new SimpleForwardingServerCallListener(next.startCall(wCall, headers)) { + @Override + public void onMessage(ReqT message) { + writer.logInboundMessage( + call.getMethodDescriptor().getRequestMarshaller(), + message, + DUMMY_IS_COMPRESSED, + SERVER, + callId); + super.onMessage(message); + } + }; } }; } diff --git a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java index 8833c561ba..0a59579716 100644 --- a/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java +++ b/services/src/main/java/io/grpc/services/BinaryLogProviderImpl.java @@ -16,9 +16,11 @@ package io.grpc.services; +import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.ServerInterceptor; import io.grpc.internal.BinaryLogProvider; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -29,6 +31,7 @@ import javax.annotation.Nullable; public class BinaryLogProviderImpl extends BinaryLogProvider { private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName()); private final BinaryLog.Factory factory; + private final AtomicLong counter = new AtomicLong(); public BinaryLogProviderImpl() { this(BinaryLogSinkProvider.provider(), System.getenv("GRPC_BINARY_LOG_CONFIG")); @@ -49,13 +52,14 @@ public class BinaryLogProviderImpl extends BinaryLogProvider { @Nullable @Override public ServerInterceptor getServerInterceptor(String fullMethodName) { - return null; + return factory.getLog(fullMethodName).getServerInterceptor(getServerCallId()); } @Nullable @Override - public ClientInterceptor getClientInterceptor(String fullMethodName) { - return null; + public ClientInterceptor getClientInterceptor( + String fullMethodName, CallOptions callOptions) { + return factory.getLog(fullMethodName).getClientInterceptor(getClientCallId(callOptions)); } @Override @@ -67,4 +71,12 @@ public class BinaryLogProviderImpl extends BinaryLogProvider { protected boolean isAvailable() { return factory != null; } + + protected CallId getServerCallId() { + return new CallId(0, counter.getAndIncrement()); + } + + protected CallId getClientCallId(CallOptions options) { + return new CallId(0, counter.getAndIncrement()); + } } diff --git a/services/src/main/java/io/grpc/services/internal/CensusBinaryLogProvider.java b/services/src/main/java/io/grpc/services/internal/CensusBinaryLogProvider.java new file mode 100644 index 0000000000..bac009bceb --- /dev/null +++ b/services/src/main/java/io/grpc/services/internal/CensusBinaryLogProvider.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services.internal; + +import io.grpc.CallOptions; +import io.grpc.internal.BinaryLogProvider; +import io.grpc.services.BinaryLogProviderImpl; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracing; + +public final class CensusBinaryLogProvider extends BinaryLogProviderImpl { + @Override + protected int priority() { + return 6; + } + + @Override + protected CallId getServerCallId() { + Span currentSpan = Tracing.getTracer().getCurrentSpan(); + return CallId.fromCensusSpan(currentSpan); + } + + @Override + protected CallId getClientCallId(CallOptions options) { + return options.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY); + } +} diff --git a/services/src/test/java/io/grpc/services/BinaryLogTest.java b/services/src/test/java/io/grpc/services/BinaryLogTest.java index 01c3b27ffa..235f520735 100644 --- a/services/src/test/java/io/grpc/services/BinaryLogTest.java +++ b/services/src/test/java/io/grpc/services/BinaryLogTest.java @@ -18,9 +18,6 @@ package io.grpc.services; import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER; import static io.grpc.services.BinaryLog.DUMMY_SOCKET; -import static io.grpc.services.BinaryLog.emptyCallId; -import static io.grpc.services.BinaryLog.getCallIdForClient; -import static io.grpc.services.BinaryLog.getCallIdForServer; import static io.grpc.services.BinaryLog.getPeerSocket; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -37,7 +34,6 @@ import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; -import io.grpc.Context; import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -65,7 +61,6 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; @@ -682,28 +677,6 @@ public final class BinaryLogTest { verifyNoMoreInteractions(sink); } - @Test - public void getCallIdServer() { - assertSame(emptyCallId, getCallIdForServer(Context.ROOT)); - assertSame( - CALL_ID, - getCallIdForServer( - Context.ROOT.withValue( - BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, - CALL_ID))); - } - - @Test - public void getCallIdClient() { - assertSame(emptyCallId, getCallIdForClient(CallOptions.DEFAULT)); - assertSame( - CALL_ID, - getCallIdForClient( - CallOptions.DEFAULT.withOption( - BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, - CALL_ID))); - } - @Test public void getPeerSocketTest() { assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY)); @@ -761,6 +734,7 @@ public final class BinaryLogTest { .build(); ClientCall interceptedCall = new BinaryLog(mockSinkWriter) + .getClientInterceptor(CALL_ID) .interceptCall( method, CallOptions.DEFAULT.withOption( @@ -835,20 +809,8 @@ public final class BinaryLogTest { } @Test - public void serverInterceptor() throws Exception { - Context.current() - .withValue(BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, CALL_ID) - .call(new Callable() { - @Override - public Void call() throws Exception { - serverInterceptor0(); - return null; - } - }); - } - @SuppressWarnings({"rawtypes", "unchecked"}) - private void serverInterceptor0() throws Exception { + public void serverInterceptor() throws Exception { final AtomicReference interceptedCall = new AtomicReference(); ServerCall.Listener capturedListener; @@ -871,6 +833,7 @@ public final class BinaryLogTest { .build(); capturedListener = new BinaryLog(mockSinkWriter) + .getServerInterceptor(CALL_ID) .interceptCall( new NoopServerCall() { @Override diff --git a/services/src/test/java/io/grpc/services/internal/CensusBinaryLogProviderTest.java b/services/src/test/java/io/grpc/services/internal/CensusBinaryLogProviderTest.java new file mode 100644 index 0000000000..58bfcba630 --- /dev/null +++ b/services/src/test/java/io/grpc/services/internal/CensusBinaryLogProviderTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services.internal; + +import static com.google.common.truth.Truth.assertThat; +import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; + +import io.grpc.CallOptions; +import io.grpc.Context; +import io.grpc.internal.BinaryLogProvider; +import io.grpc.internal.BinaryLogProvider.CallId; +import io.grpc.internal.testing.StatsTestUtils.MockableSpan; +import java.nio.ByteBuffer; +import java.util.Random; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CensusBinaryLogProvider}. + */ +@RunWith(JUnit4.class) +public class CensusBinaryLogProviderTest { + @Test + public void serverCallIdFromCensus() { + final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0)); + Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan); + context.run(new Runnable() { + @Override + public void run() { + CallId callId = new CensusBinaryLogProvider().getServerCallId(); + assertThat(callId.hi).isEqualTo(0); + assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong()) + .isEqualTo(callId.lo); + } + }); + } + + @Test + public void clientCallId() throws Exception { + CallId expected = new CallId(1234, 5677); + CallId actual = new CensusBinaryLogProvider() + .getClientCallId( + CallOptions.DEFAULT.withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, + expected)); + assertThat(actual).isEqualTo(expected); + } +}