services,core: simplify CallId generation (#4365)

BinaryLog.java is the class that is responsible for intercepting
client and server calls. It now requires a CallId to be passed
in. The BinaryLogProviderImpl is responsible for generating a
CallId and passing it in.
This commit is contained in:
zpencer 2018-04-27 18:32:36 -07:00 committed by GitHub
parent 9ada30b25d
commit acfb3b9851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 268 additions and 277 deletions

View File

@ -422,9 +422,6 @@ public abstract class AbstractManagedChannelImplBuilder
Tracing.getPropagationComponent().getBinaryFormat());
effectiveInterceptors.add(0, censusTracing.getClientInterceptor());
}
if (binlogProvider != null) {
effectiveInterceptors.add(0, binlogProvider.getClientCallIdSetter());
}
return effectiveInterceptors;
}

View File

@ -266,9 +266,6 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
Tracing.getPropagationComponent().getBinaryFormat());
tracerFactories.add(censusTracing.getServerTracerFactory());
}
if (binlogProvider != null) {
tracerFactories.add(binlogProvider.getServerCallIdSetter());
}
tracerFactories.addAll(streamTracerFactories);
return tracerFactories;
}

View File

@ -22,21 +22,16 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Context;
import io.grpc.Internal;
import io.grpc.InternalClientInterceptors;
import io.grpc.InternalServerInterceptors;
import io.grpc.InternalServiceProviders;
import io.grpc.InternalServiceProviders.PriorityAccessor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerStreamTracer;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
@ -47,12 +42,6 @@ import java.util.logging.Logger;
import javax.annotation.Nullable;
public abstract class BinaryLogProvider implements Closeable {
// TODO(zpencer): move to services and make package private
@Internal
public static final Context.Key<CallId> SERVER_CALL_ID_CONTEXT_KEY
= Context.key("binarylog-context-key");
// TODO(zpencer): move to services and make package private when this class is moved
@Internal
public static final CallOptions.Key<CallId> CLIENT_CALL_ID_CALLOPTION_KEY
= CallOptions.Key.of("binarylog-calloptions-key", null);
@VisibleForTesting
@ -116,7 +105,6 @@ public abstract class BinaryLogProvider implements Closeable {
return ServerMethodDefinition.create(binMethod, binlogHandler);
}
/**
* Returns a {@link ServerInterceptor} for binary logging. gRPC is free to cache the interceptor,
* so the interceptor must be reusable across calls. At runtime, the request and response
@ -135,7 +123,8 @@ public abstract class BinaryLogProvider implements Closeable {
*/
// TODO(zpencer): ensure the interceptor properly handles retries and hedging
@Nullable
protected abstract ClientInterceptor getClientInterceptor(String fullMethodName);
protected abstract ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions);
@Override
public void close() throws IOException {
@ -143,62 +132,6 @@ public abstract class BinaryLogProvider implements Closeable {
// TODO(zpencer): make BinaryLogProvider provide a BinaryLog, and this method belongs there
}
private static final ServerStreamTracer SERVER_CALLID_SETTER = new ServerStreamTracer() {
@Override
public Context filterContext(Context context) {
Context toRestore = context.attach();
try {
Span span = Tracing.getTracer().getCurrentSpan();
if (span == null) {
return context;
}
return context.withValue(SERVER_CALL_ID_CONTEXT_KEY, CallId.fromCensusSpan(span));
} finally {
context.detach(toRestore);
}
}
};
private static final ServerStreamTracer.Factory SERVER_CALLID_SETTER_FACTORY
= new ServerStreamTracer.Factory() {
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
return SERVER_CALLID_SETTER;
}
};
/**
* Returns a {@link ServerStreamTracer.Factory} that copies the call ID to the {@link Context}
* as {@code SERVER_CALL_ID_CONTEXT_KEY}.
*/
public ServerStreamTracer.Factory getServerCallIdSetter() {
return SERVER_CALLID_SETTER_FACTORY;
}
private static final ClientInterceptor CLIENT_CALLID_SETTER = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
Span span = Tracing.getTracer().getCurrentSpan();
if (span == null) {
return next.newCall(method, callOptions);
}
return next.newCall(
method,
callOptions.withOption(CLIENT_CALL_ID_CALLOPTION_KEY, CallId.fromCensusSpan(span)));
}
};
/**
* Returns a {@link ClientInterceptor} that copies the call ID to the {@link CallOptions}
* as {@code CALL_CLIENT_CALL_ID_CALLOPTION_KEY}.
*/
public ClientInterceptor getClientCallIdSetter() {
return CLIENT_CALLID_SETTER;
}
/**
* A priority, from 0 to 10 that this provider should be used, taking the current environment into
* consideration. 5 should be considered the default, and then tweaked based on environment
@ -250,7 +183,8 @@ public abstract class BinaryLogProvider implements Closeable {
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientInterceptor binlogInterceptor = getClientInterceptor(method.getFullMethodName());
ClientInterceptor binlogInterceptor = getClientInterceptor(
method.getFullMethodName(), callOptions);
if (binlogInterceptor == null) {
return next.newCall(method, callOptions);
} else {
@ -280,7 +214,7 @@ public abstract class BinaryLogProvider implements Closeable {
this.lo = lo;
}
static CallId fromCensusSpan(Span span) {
public static CallId fromCensusSpan(Span span) {
return new CallId(0, ByteBuffer.wrap(span.getContext().getSpanId().getBytes()).getLong());
}
}

View File

@ -379,7 +379,12 @@ final class CensusTracingModule {
// for the direct access and BlankSpan when Tracer API is used.
final ClientCallTracer tracerFactory = newClientCallTracer(CONTEXT_SPAN_KEY.get(), method);
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory)
.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,
BinaryLogProvider.CallId.fromCensusSpan(tracerFactory.span)));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {

View File

@ -17,17 +17,18 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
@ -41,12 +42,14 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.StringMarshaller;
import io.grpc.internal.BinaryLogProvider.CallId;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import io.grpc.testing.TestMethodDescriptors;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanBuilder;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.propagation.BinaryFormat;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -54,7 +57,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -96,7 +98,8 @@ public class BinaryLogProviderTest {
}
@Override
public ClientInterceptor getClientInterceptor(String fullMethodName) {
public ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions) {
return new TestBinaryLogClientInterceptor();
}
@ -308,6 +311,39 @@ public class BinaryLogProviderTest {
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
}
@Test
public void censusTracerSetsCallId() throws Exception {
Tracer tracer = mock(Tracer.class);
SpanBuilder builder = mock(SpanBuilder.class);
when(tracer.spanBuilderWithExplicitParent(any(String.class), any(Span.class)))
.thenReturn(builder);
when(builder.setRecordEvents(any(Boolean.class))).thenReturn(builder);
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
when(builder.startSpan()).thenReturn(mockableSpan);
final SettableFuture<CallOptions> options = SettableFuture.create();
Channel c = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
options.set(callOptions);
return null;
}
@Override
public String authority() {
return null;
}
};
new CensusTracingModule(tracer, mock(BinaryFormat.class))
.getClientInterceptor()
.interceptCall(TestMethodDescriptors.voidMethod(), CallOptions.DEFAULT, c);
CallId callId = options.get().getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
assertThat(callId.hi).isEqualTo(0);
assertThat(callId.lo)
.isEqualTo(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong());
}
@SuppressWarnings({"rawtypes", "unchecked"})
private static void onServerMessageHelper(ServerCall.Listener listener, Object request) {
listener.onMessage(request);
@ -330,56 +366,6 @@ public class BinaryLogProviderTest {
return methodDef.getServerCallHandler().startCall(serverCall, new Metadata());
}
@Test
public void serverCallIdSetter() {
ServerStreamTracer tracer = binlogProvider
.getServerCallIdSetter()
.newServerStreamTracer("service/method", new Metadata());
MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan);
Context filtered = tracer.filterContext(context);
CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(filtered);
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
}
@Test
public void clientCallIdSetter() throws Exception {
final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Tracing.getTracer().withSpan(mockableSpan, new Callable<Void>() {
@Override
public Void call() throws Exception {
final SettableFuture<CallOptions> future = SettableFuture.create();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
future.set(callOptions);
return null;
}
@Override
public String authority() {
return null;
}
};
binlogProvider.getClientCallIdSetter().interceptCall(
TestMethodDescriptors.voidMethod(),
CallOptions.DEFAULT,
channel);
CallOptions callOptions = future.get();
CallId callId = callOptions
.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
return null;
}
}).call();
}
private final class TestBinaryLogClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

View File

@ -2289,7 +2289,8 @@ public class ManagedChannelImplTest {
}
@Override
public ClientInterceptor getClientInterceptor(String fullMethodName) {
public ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

View File

@ -48,6 +48,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientInterceptor;
import io.grpc.Compressor;
import io.grpc.Context;
@ -1252,7 +1253,8 @@ public class ServerImplTest {
@Nullable
@Override
public ClientInterceptor getClientInterceptor(String fullMethodName) {
public ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions) {
return null;
}

View File

@ -28,7 +28,6 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
@ -51,7 +50,6 @@ import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128;
import io.grpc.internal.BinaryLogProvider;
import io.grpc.internal.BinaryLogProvider.CallId;
import java.net.Inet4Address;
import java.net.Inet6Address;
@ -73,7 +71,7 @@ import javax.annotation.concurrent.ThreadSafe;
* A binary log class that is configured for a specific {@link MethodDescriptor}.
*/
@ThreadSafe
final class BinaryLog implements ServerInterceptor, ClientInterceptor {
final class BinaryLog {
private static final Logger logger = Logger.getLogger(BinaryLog.class.getName());
private static final int IP_PORT_BYTES = 2;
private static final int IP_PORT_UPPER_MASK = 0xff00;
@ -248,22 +246,6 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
abstract int getMaxMessageBytes();
}
static CallId getCallIdForServer(Context context) {
CallId callId = BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY.get(context);
if (callId == null) {
return emptyCallId;
}
return callId;
}
static CallId getCallIdForClient(CallOptions callOptions) {
CallId callId = callOptions.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
if (callId == null) {
return emptyCallId;
}
return callId;
}
static SocketAddress getPeerSocket(Attributes streamAttributes) {
SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
if (peer == null) {
@ -272,97 +254,105 @@ final class BinaryLog implements ServerInterceptor, ClientInterceptor {
return peer;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final CallId callId = getCallIdForClient(callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
public ClientInterceptor getClientInterceptor(final CallId callId) {
return new ClientInterceptor() {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
writer.logSendInitialMetadata(headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
writer.logInboundMessage(
method.getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
CLIENT,
callId);
super.onMessage(message);
}
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
writer.logSendInitialMetadata(headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
writer.logInboundMessage(
method.getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
CLIENT,
callId);
super.onMessage(message);
}
@Override
public void onHeaders(Metadata headers) {
SocketAddress peer = getPeerSocket(getAttributes());
writer.logRecvInitialMetadata(headers, CLIENT, callId, peer);
super.onHeaders(headers);
}
@Override
public void onHeaders(Metadata headers) {
SocketAddress peer = getPeerSocket(getAttributes());
writer.logRecvInitialMetadata(headers, CLIENT, callId, peer);
super.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, CLIENT, callId);
super.onClose(status, trailers);
}
};
super.start(wListener, headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, CLIENT, callId);
super.onClose(status, trailers);
}
};
super.start(wListener, headers);
}
@Override
public void sendMessage(ReqT message) {
writer.logOutboundMessage(
method.getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
CLIENT,
callId);
super.sendMessage(message);
@Override
public void sendMessage(ReqT message) {
writer.logOutboundMessage(
method.getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
CLIENT,
callId);
super.sendMessage(message);
}
};
}
};
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
final CallId callId = getCallIdForServer(Context.current());
SocketAddress peer = getPeerSocket(call.getAttributes());
writer.logRecvInitialMetadata(headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
public ServerInterceptor getServerInterceptor(final CallId callId) {
return new ServerInterceptor() {
@Override
public void sendMessage(RespT message) {
writer.logOutboundMessage(
call.getMethodDescriptor().getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
SERVER,
callId);
super.sendMessage(message);
}
public <ReqT, RespT> Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
SocketAddress peer = getPeerSocket(call.getAttributes());
writer.logRecvInitialMetadata(headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
writer.logOutboundMessage(
call.getMethodDescriptor().getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
SERVER,
callId);
super.sendMessage(message);
}
@Override
public void sendHeaders(Metadata headers) {
writer.logSendInitialMetadata(headers, SERVER, callId);
super.sendHeaders(headers);
}
@Override
public void sendHeaders(Metadata headers) {
writer.logSendInitialMetadata(headers, SERVER, callId);
super.sendHeaders(headers);
}
@Override
public void close(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, SERVER, callId);
super.close(status, trailers);
}
};
@Override
public void close(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, SERVER, callId);
super.close(status, trailers);
}
};
return new SimpleForwardingServerCallListener<ReqT>(next.startCall(wCall, headers)) {
@Override
public void onMessage(ReqT message) {
writer.logInboundMessage(
call.getMethodDescriptor().getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
SERVER,
callId);
super.onMessage(message);
return new SimpleForwardingServerCallListener<ReqT>(next.startCall(wCall, headers)) {
@Override
public void onMessage(ReqT message) {
writer.logInboundMessage(
call.getMethodDescriptor().getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
SERVER,
callId);
super.onMessage(message);
}
};
}
};
}

View File

@ -16,9 +16,11 @@
package io.grpc.services;
import io.grpc.CallOptions;
import io.grpc.ClientInterceptor;
import io.grpc.ServerInterceptor;
import io.grpc.internal.BinaryLogProvider;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -29,6 +31,7 @@ import javax.annotation.Nullable;
public class BinaryLogProviderImpl extends BinaryLogProvider {
private static final Logger logger = Logger.getLogger(BinaryLogProviderImpl.class.getName());
private final BinaryLog.Factory factory;
private final AtomicLong counter = new AtomicLong();
public BinaryLogProviderImpl() {
this(BinaryLogSinkProvider.provider(), System.getenv("GRPC_BINARY_LOG_CONFIG"));
@ -49,13 +52,14 @@ public class BinaryLogProviderImpl extends BinaryLogProvider {
@Nullable
@Override
public ServerInterceptor getServerInterceptor(String fullMethodName) {
return null;
return factory.getLog(fullMethodName).getServerInterceptor(getServerCallId());
}
@Nullable
@Override
public ClientInterceptor getClientInterceptor(String fullMethodName) {
return null;
public ClientInterceptor getClientInterceptor(
String fullMethodName, CallOptions callOptions) {
return factory.getLog(fullMethodName).getClientInterceptor(getClientCallId(callOptions));
}
@Override
@ -67,4 +71,12 @@ public class BinaryLogProviderImpl extends BinaryLogProvider {
protected boolean isAvailable() {
return factory != null;
}
protected CallId getServerCallId() {
return new CallId(0, counter.getAndIncrement());
}
protected CallId getClientCallId(CallOptions options) {
return new CallId(0, counter.getAndIncrement());
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services.internal;
import io.grpc.CallOptions;
import io.grpc.internal.BinaryLogProvider;
import io.grpc.services.BinaryLogProviderImpl;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracing;
public final class CensusBinaryLogProvider extends BinaryLogProviderImpl {
@Override
protected int priority() {
return 6;
}
@Override
protected CallId getServerCallId() {
Span currentSpan = Tracing.getTracer().getCurrentSpan();
return CallId.fromCensusSpan(currentSpan);
}
@Override
protected CallId getClientCallId(CallOptions options) {
return options.getOption(BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY);
}
}

View File

@ -18,9 +18,6 @@ package io.grpc.services;
import static io.grpc.internal.BinaryLogProvider.BYTEARRAY_MARSHALLER;
import static io.grpc.services.BinaryLog.DUMMY_SOCKET;
import static io.grpc.services.BinaryLog.emptyCallId;
import static io.grpc.services.BinaryLog.getCallIdForClient;
import static io.grpc.services.BinaryLog.getCallIdForServer;
import static io.grpc.services.BinaryLog.getPeerSocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -37,7 +34,6 @@ import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -65,7 +61,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
@ -682,28 +677,6 @@ public final class BinaryLogTest {
verifyNoMoreInteractions(sink);
}
@Test
public void getCallIdServer() {
assertSame(emptyCallId, getCallIdForServer(Context.ROOT));
assertSame(
CALL_ID,
getCallIdForServer(
Context.ROOT.withValue(
BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY,
CALL_ID)));
}
@Test
public void getCallIdClient() {
assertSame(emptyCallId, getCallIdForClient(CallOptions.DEFAULT));
assertSame(
CALL_ID,
getCallIdForClient(
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,
CALL_ID)));
}
@Test
public void getPeerSocketTest() {
assertSame(DUMMY_SOCKET, getPeerSocket(Attributes.EMPTY));
@ -761,6 +734,7 @@ public final class BinaryLogTest {
.build();
ClientCall<byte[], byte[]> interceptedCall =
new BinaryLog(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withOption(
@ -835,20 +809,8 @@ public final class BinaryLogTest {
}
@Test
public void serverInterceptor() throws Exception {
Context.current()
.withValue(BinaryLogProvider.SERVER_CALL_ID_CONTEXT_KEY, CALL_ID)
.call(new Callable<Void>() {
@Override
public Void call() throws Exception {
serverInterceptor0();
return null;
}
});
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void serverInterceptor0() throws Exception {
public void serverInterceptor() throws Exception {
final AtomicReference<ServerCall> interceptedCall =
new AtomicReference<ServerCall>();
ServerCall.Listener<byte[]> capturedListener;
@ -871,6 +833,7 @@ public final class BinaryLogTest {
.build();
capturedListener =
new BinaryLog(mockSinkWriter)
.getServerInterceptor(CALL_ID)
.interceptCall(
new NoopServerCall<byte[], byte[]>() {
@Override

View File

@ -0,0 +1,63 @@
/*
* Copyright 2018, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.services.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
import io.grpc.CallOptions;
import io.grpc.Context;
import io.grpc.internal.BinaryLogProvider;
import io.grpc.internal.BinaryLogProvider.CallId;
import io.grpc.internal.testing.StatsTestUtils.MockableSpan;
import java.nio.ByteBuffer;
import java.util.Random;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link CensusBinaryLogProvider}.
*/
@RunWith(JUnit4.class)
public class CensusBinaryLogProviderTest {
@Test
public void serverCallIdFromCensus() {
final MockableSpan mockableSpan = MockableSpan.generateRandomSpan(new Random(0));
Context context = Context.current().withValue(CONTEXT_SPAN_KEY, mockableSpan);
context.run(new Runnable() {
@Override
public void run() {
CallId callId = new CensusBinaryLogProvider().getServerCallId();
assertThat(callId.hi).isEqualTo(0);
assertThat(ByteBuffer.wrap(mockableSpan.getContext().getSpanId().getBytes()).getLong())
.isEqualTo(callId.lo);
}
});
}
@Test
public void clientCallId() throws Exception {
CallId expected = new CallId(1234, 5677);
CallId actual = new CensusBinaryLogProvider()
.getClientCallId(
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY,
expected));
assertThat(actual).isEqualTo(expected);
}
}