core: Move negotiationLogger from channel attributes to GrpcHttp2ConnectionHandler (#7933)

This commit is contained in:
yifeizhuang 2021-03-10 16:43:56 -08:00 committed by GitHub
parent 2ca28a2903
commit 528ef63c58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 183 additions and 114 deletions

View File

@ -12,6 +12,8 @@ description = "gRPC: ALTS"
sourceCompatibility = 1.7
targetCompatibility = 1.7
evaluationDependsOn(project(':grpc-core').path)
dependencies {
api project(':grpc-core')
implementation project(':grpc-auth'),
@ -29,6 +31,7 @@ dependencies {
shadow project(path: ':grpc-netty-shaded', configuration: 'shadow')
testImplementation project(':grpc-testing'),
project(':grpc-core').sourceSets.test.output,
project(':grpc-testing-proto'),
libraries.guava,
libraries.junit,

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.Any;
import io.grpc.Attributes;
import io.grpc.Channel;
import io.grpc.ChannelLogger;
import io.grpc.Grpc;
import io.grpc.InternalChannelz.OtherSecurity;
import io.grpc.InternalChannelz.Security;
@ -112,11 +113,14 @@ public final class AltsProtocolNegotiator {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
ChannelLogger negotiationLogger = grpcHandler.getNegotiationLogger();
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
ChannelHandler thh = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore,
negotiationLogger);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh,
negotiationLogger);
return wuah;
}
@ -166,12 +170,15 @@ public final class AltsProtocolNegotiator {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelLogger negotiationLogger = grpcHandler.getNegotiationLogger();
TsiHandshaker handshaker = handshakerFactory.newHandshaker(/* authority= */ null);
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
ChannelHandler thh = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh);
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore,
negotiationLogger);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(thh,
negotiationLogger);
return wuah;
}
@ -250,6 +257,7 @@ public final class AltsProtocolNegotiator {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelHandler gnh = InternalProtocolNegotiators.grpcNegotiationHandler(grpcHandler);
ChannelLogger negotiationLogger = grpcHandler.getNegotiationLogger();
ChannelHandler securityHandler;
boolean isXdsDirectPath = false;
if (clusterNameAttrKey != null) {
@ -264,12 +272,14 @@ public final class AltsProtocolNegotiator {
TsiHandshaker handshaker = handshakerFactory.newHandshaker(grpcHandler.getAuthority());
NettyTsiHandshaker nettyHandshaker = new NettyTsiHandshaker(handshaker);
securityHandler = new TsiHandshakeHandler(
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore);
gnh, nettyHandshaker, new AltsHandshakeValidator(), handshakeSemaphore,
negotiationLogger);
} else {
securityHandler = InternalProtocolNegotiators.clientTlsHandler(
gnh, sslContext, grpcHandler.getAuthority());
gnh, sslContext, grpcHandler.getAuthority(), negotiationLogger);
}
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(securityHandler);
ChannelHandler wuah = InternalProtocolNegotiators.waitUntilActiveHandler(securityHandler,
negotiationLogger);
return wuah;
}

View File

@ -22,13 +22,13 @@ import static io.grpc.alts.internal.AltsProtocolNegotiator.AUTH_CONTEXT_KEY;
import static io.grpc.alts.internal.AltsProtocolNegotiator.TSI_PEER_KEY;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.InternalChannelz.Security;
import io.grpc.SecurityLevel;
import io.grpc.alts.internal.TsiHandshakeHandler.HandshakeValidator.SecurityDetails;
import io.grpc.internal.GrpcAttributes;
import io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.ProtocolNegotiationEvent;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
@ -87,13 +87,15 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
private ProtocolNegotiationEvent pne;
private boolean semaphoreAcquired;
private final ChannelLogger negotiationLogger;
/**
* Constructs a TsiHandshakeHandler.
*/
public TsiHandshakeHandler(
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator) {
this(next, handshaker, handshakeValidator, null);
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator,
ChannelLogger negotiationLogger) {
this(next, handshaker, handshakeValidator, null, negotiationLogger);
}
/**
@ -102,11 +104,12 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
*/
public TsiHandshakeHandler(
ChannelHandler next, NettyTsiHandshaker handshaker, HandshakeValidator handshakeValidator,
AsyncSemaphore semaphore) {
AsyncSemaphore semaphore, ChannelLogger negotiationLogger) {
this.handshaker = checkNotNull(handshaker, "handshaker");
this.handshakeValidator = checkNotNull(handshakeValidator, "handshakeValidator");
this.next = checkNotNull(next, "next");
this.semaphore = semaphore;
this.negotiationLogger = negotiationLogger;
}
@Override
@ -155,8 +158,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
if (evt instanceof ProtocolNegotiationEvent) {
checkState(pne == null, "negotiation already started");
pne = (ProtocolNegotiationEvent) evt;
InternalProtocolNegotiators.negotiationLogger(ctx)
.log(ChannelLogLevel.INFO, "TsiHandshake started");
negotiationLogger.log(ChannelLogLevel.INFO, "TsiHandshake started");
ChannelFuture acquire = semaphoreAcquire(ctx);
if (acquire.isSuccess()) {
semaphoreAcquired = true;
@ -190,8 +192,7 @@ public final class TsiHandshakeHandler extends ByteToMessageDecoder {
private void fireProtocolNegotiationEvent(
ChannelHandlerContext ctx, TsiPeer peer, Object authContext, SecurityDetails details) {
checkState(pne != null, "negotiation not yet complete");
InternalProtocolNegotiators.negotiationLogger(ctx)
.log(ChannelLogLevel.INFO, "TsiHandshake finished");
negotiationLogger.log(ChannelLogLevel.INFO, "TsiHandshake finished");
ProtocolNegotiationEvent localPne = pne;
Attributes.Builder attrs = InternalProtocolNegotiationEvent.getAttributes(localPne).toBuilder()
.set(TSI_PEER_KEY, peer)

View File

@ -37,6 +37,7 @@ import io.grpc.alts.internal.TsiPeer.Property;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.TestUtils.NoopChannelLogger;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.NettyChannelBuilder;
@ -404,7 +405,7 @@ public class AltsProtocolNegotiatorTest {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(null, decoder, encoder, initialSettings);
super(null, decoder, encoder, initialSettings, new NoopChannelLogger());
}
@Override

View File

@ -17,12 +17,17 @@
package io.grpc.alts.internal;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import io.grpc.Attributes;
import io.grpc.Channel;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.ManagedChannel;
import io.grpc.grpclb.GrpclbConstants;
import io.grpc.inprocess.InProcessChannelBuilder;
@ -100,6 +105,9 @@ public final class GoogleDefaultProtocolNegotiatorTest {
void subtest_altsHandler(Attributes eagAttributes) {
GrpcHttp2ConnectionHandler mockHandler = mock(GrpcHttp2ConnectionHandler.class);
when(mockHandler.getEagAttributes()).thenReturn(eagAttributes);
ChannelLogger logger = mock(ChannelLogger.class);
doNothing().when(logger).log(any(ChannelLogLevel.class), anyString());
when(mockHandler.getNegotiationLogger()).thenReturn(logger);
final AtomicReference<Throwable> failure = new AtomicReference<>();
ChannelHandler exceptionCaught = new ChannelInboundHandlerAdapter() {
@ -125,6 +133,9 @@ public final class GoogleDefaultProtocolNegotiatorTest {
GrpcHttp2ConnectionHandler mockHandler = mock(GrpcHttp2ConnectionHandler.class);
when(mockHandler.getEagAttributes()).thenReturn(eagAttributes);
when(mockHandler.getAuthority()).thenReturn("authority");
ChannelLogger logger = mock(ChannelLogger.class);
doNothing().when(logger).log(any(ChannelLogLevel.class), anyString());
when(mockHandler.getNegotiationLogger()).thenReturn(logger);
ChannelHandler h = googleProtocolNegotiator.newHandler(mockHandler);
EmbeddedChannel chan = new EmbeddedChannel(h);

View File

@ -142,4 +142,13 @@ public final class TestUtils {
private TestUtils() {
}
public static class NoopChannelLogger extends ChannelLogger {
@Override
public void log(ChannelLogLevel level, String message) {}
@Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
}
}

View File

@ -20,6 +20,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Except
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.ChannelLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
@ -51,9 +52,10 @@ abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings,
ChannelLogger negotiationLogger,
boolean autoFlowControl,
PingLimiter pingLimiter) {
super(channelUnused, decoder, encoder, initialSettings);
super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);
// During a graceful shutdown, wait until all streams are closed.
gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

View File

@ -16,7 +16,10 @@
package io.grpc.netty;
import static com.google.common.base.Preconditions.checkState;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.Internal;
import io.grpc.InternalChannelz;
import io.netty.channel.ChannelPromise;
@ -34,14 +37,17 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
@Nullable
protected final ChannelPromise channelUnused;
private final ChannelLogger negotiationLogger;
protected GrpcHttp2ConnectionHandler(
ChannelPromise channelUnused,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
Http2Settings initialSettings,
ChannelLogger negotiationLogger) {
super(decoder, encoder, initialSettings);
this.channelUnused = channelUnused;
this.negotiationLogger = negotiationLogger;
}
/**
@ -69,6 +75,14 @@ public abstract class GrpcHttp2ConnectionHandler extends Http2ConnectionHandler
Attributes attrs, InternalChannelz.Security securityInfo) {
}
/**
* Returns the channel logger for the given channel context.
*/
public ChannelLogger getNegotiationLogger() {
checkState(negotiationLogger != null, "NegotiationLogger must not be null");
return negotiationLogger;
}
/**
* Calling this method indicates that the channel will no longer be used. This method is roughly
* the same as calling {@link #close} on the channel, but leaving the channel alive. This is

View File

@ -21,7 +21,6 @@ import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
import io.grpc.netty.ProtocolNegotiators.GrpcNegotiationHandler;
import io.grpc.netty.ProtocolNegotiators.WaitUntilActiveHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
import io.netty.util.AsciiString;
@ -32,13 +31,6 @@ public final class InternalProtocolNegotiators {
private InternalProtocolNegotiators() {}
/**
* Returns the channel logger for the given channel context, or a Noop Logger if absent.
*/
public static ChannelLogger negotiationLogger(ChannelHandlerContext ctx) {
return ProtocolNegotiators.negotiationLogger(ctx);
}
/**
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
@ -146,8 +138,9 @@ public final class InternalProtocolNegotiators {
/**
* Internal version of {@link WaitUntilActiveHandler}.
*/
public static ChannelHandler waitUntilActiveHandler(ChannelHandler next) {
return new WaitUntilActiveHandler(next);
public static ChannelHandler waitUntilActiveHandler(ChannelHandler next,
ChannelLogger negotiationLogger) {
return new WaitUntilActiveHandler(next, negotiationLogger);
}
/**
@ -158,19 +151,21 @@ public final class InternalProtocolNegotiators {
}
public static ChannelHandler clientTlsHandler(
ChannelHandler next, SslContext sslContext, String authority) {
return new ClientTlsHandler(next, sslContext, authority, null);
ChannelHandler next, SslContext sslContext, String authority,
ChannelLogger negotiationLogger) {
return new ClientTlsHandler(next, sslContext, authority, null, negotiationLogger);
}
public static class ProtocolNegotiationHandler
extends ProtocolNegotiators.ProtocolNegotiationHandler {
protected ProtocolNegotiationHandler(ChannelHandler next, String negotiatorName) {
super(next, negotiatorName);
protected ProtocolNegotiationHandler(ChannelHandler next, String negotiatorName,
ChannelLogger negotiationLogger) {
super(next, negotiatorName, negotiationLogger);
}
protected ProtocolNegotiationHandler(ChannelHandler next) {
super(next);
protected ProtocolNegotiationHandler(ChannelHandler next, ChannelLogger negotiationLogger) {
super(next, negotiationLogger);
}
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.InternalChannelz;
import io.grpc.Metadata;
import io.grpc.Status;
@ -142,7 +143,8 @@ class NettyClientHandler extends AbstractNettyHandler {
Runnable tooManyPingsRunnable,
TransportTracer transportTracer,
Attributes eagAttributes,
String authority) {
String authority,
ChannelLogger negotiationLogger) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@ -167,7 +169,8 @@ class NettyClientHandler extends AbstractNettyHandler {
tooManyPingsRunnable,
transportTracer,
eagAttributes,
authority);
authority,
negotiationLogger);
}
@VisibleForTesting
@ -184,7 +187,8 @@ class NettyClientHandler extends AbstractNettyHandler {
Runnable tooManyPingsRunnable,
TransportTracer transportTracer,
Attributes eagAttributes,
String authority) {
String authority,
ChannelLogger negotiationLogger) {
Preconditions.checkNotNull(connection, "connection");
Preconditions.checkNotNull(frameReader, "frameReader");
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@ -235,6 +239,7 @@ class NettyClientHandler extends AbstractNettyHandler {
decoder,
encoder,
settings,
negotiationLogger,
lifecycleManager,
keepAliveManager,
stopwatchFactory,
@ -250,6 +255,7 @@ class NettyClientHandler extends AbstractNettyHandler {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings settings,
ChannelLogger negotiationLogger,
ClientTransportLifecycleManager lifecycleManager,
KeepAliveManager keepAliveManager,
Supplier<Stopwatch> stopwatchFactory,
@ -259,7 +265,8 @@ class NettyClientHandler extends AbstractNettyHandler {
String authority,
boolean autoFlowControl,
PingLimiter pingLimiter) {
super(/* channelUnused= */ null, decoder, encoder, settings, autoFlowControl, pingLimiter);
super(/* channelUnused= */ null, decoder, encoder, settings,
negotiationLogger, autoFlowControl, pingLimiter);
this.lifecycleManager = lifecycleManager;
this.keepAliveManager = keepAliveManager;
this.stopwatchFactory = stopwatchFactory;

View File

@ -54,7 +54,6 @@ import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
@ -69,20 +68,6 @@ import javax.annotation.Nullable;
*/
class NettyClientTransport implements ConnectionClientTransport {
/**
* Get the existing {@link ChannelLogger} key in case a separate, isolated class loader has
* already created {@link LOGGER_KEY}.
*/
private static final AttributeKey<ChannelLogger> getOrCreateChannelLogger() {
AttributeKey<ChannelLogger> key = AttributeKey.valueOf("channelLogger");
if (key == null) {
key = AttributeKey.newInstance("channelLogger");
}
return key;
}
static final AttributeKey<ChannelLogger> LOGGER_KEY = getOrCreateChannelLogger();
private final InternalLogId logId;
private final Map<ChannelOption<?>, ?> channelOptions;
private final SocketAddress remoteAddress;
@ -237,13 +222,13 @@ class NettyClientTransport implements ConnectionClientTransport {
tooManyPingsRunnable,
transportTracer,
eagAttributes,
authorityString);
authorityString,
channelLogger);
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
Bootstrap b = new Bootstrap();
b.option(ALLOCATOR, Utils.getByteBufAllocator(false));
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.

View File

@ -32,6 +32,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
@ -85,6 +87,7 @@ import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
@ -268,7 +271,8 @@ class NettyServerHandler extends AbstractNettyHandler {
final KeepAliveEnforcer keepAliveEnforcer,
boolean autoFlowControl,
Attributes eagAttributes) {
super(channelUnused, decoder, encoder, settings, autoFlowControl, null);
super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
autoFlowControl, null);
final MaxConnectionIdleManager maxConnectionIdleManager;
if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
@ -1045,4 +1049,29 @@ class NettyServerHandler extends AbstractNettyHandler {
padding, endStream, promise);
}
}
private static class ServerChannelLogger extends ChannelLogger {
private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
@Override
public void log(ChannelLogLevel level, String message) {
log.log(toJavaLogLevel(level), message);
}
@Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) {
log(level, MessageFormat.format(messageFormat, args));
}
}
private static Level toJavaLogLevel(ChannelLogLevel level) {
switch (level) {
case ERROR:
return Level.FINE;
case WARNING:
return Level.FINER;
default:
return Level.FINEST;
}
}
}

View File

@ -66,8 +66,6 @@ import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.AsciiString;
import io.netty.util.Attribute;
import io.netty.util.AttributeMap;
import java.io.ByteArrayInputStream;
import java.net.SocketAddress;
import java.net.URI;
@ -293,29 +291,6 @@ final class ProtocolNegotiators {
}
}
static ChannelLogger negotiationLogger(ChannelHandlerContext ctx) {
return negotiationLogger(ctx.channel());
}
private static ChannelLogger negotiationLogger(AttributeMap attributeMap) {
Attribute<ChannelLogger> attr = attributeMap.attr(NettyClientTransport.LOGGER_KEY);
final ChannelLogger channelLogger = attr.get();
if (channelLogger != null) {
return channelLogger;
}
// This is only for tests where there may not be a valid logger.
final class NoopChannelLogger extends ChannelLogger {
@Override
public void log(ChannelLogLevel level, String message) {}
@Override
public void log(ChannelLogLevel level, String messageFormat, Object... args) {}
}
return new NoopChannelLogger();
}
public static ProtocolNegotiator.ServerFactory fixedServerFactory(
ProtocolNegotiator negotiator) {
return new FixedProtocolNegotiatorServerFactory(negotiator);
@ -400,7 +375,7 @@ final class ProtocolNegotiators {
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler handler) {
ChannelHandler gnh = new GrpcNegotiationHandler(handler);
ChannelHandler sth = new ServerTlsHandler(gnh, sslContext, executorPool);
return new WaitUntilActiveHandler(sth);
return new WaitUntilActiveHandler(sth, handler.getNegotiationLogger());
}
@Override
@ -499,8 +474,10 @@ final class ProtocolNegotiators {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
ChannelHandler protocolNegotiationHandler = negotiator.newHandler(http2Handler);
ChannelLogger negotiationLogger = http2Handler.getNegotiationLogger();
return new ProxyProtocolNegotiationHandler(
proxyAddress, proxyUsername, proxyPassword, protocolNegotiationHandler);
proxyAddress, proxyUsername, proxyPassword, protocolNegotiationHandler,
negotiationLogger);
}
@Override
@ -534,8 +511,9 @@ final class ProtocolNegotiators {
SocketAddress address,
@Nullable String userName,
@Nullable String password,
ChannelHandler next) {
super(next);
ChannelHandler next,
ChannelLogger negotiationLogger) {
super(next, negotiationLogger);
this.address = checkNotNull(address, "address");
this.userName = userName;
this.password = password;
@ -585,9 +563,10 @@ final class ProtocolNegotiators {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelHandler gnh = new GrpcNegotiationHandler(grpcHandler);
ChannelLogger negotiationLogger = grpcHandler.getNegotiationLogger();
ChannelHandler cth = new ClientTlsHandler(gnh, sslContext, grpcHandler.getAuthority(),
this.executor);
return new WaitUntilActiveHandler(cth);
this.executor, negotiationLogger);
return new WaitUntilActiveHandler(cth, negotiationLogger);
}
@Override
@ -606,8 +585,8 @@ final class ProtocolNegotiators {
private Executor executor;
ClientTlsHandler(ChannelHandler next, SslContext sslContext, String authority,
Executor executor) {
super(next);
Executor executor, ChannelLogger negotiationLogger) {
super(next, negotiationLogger);
this.sslContext = checkNotNull(sslContext, "sslContext");
HostPort hostPort = parseAuthority(authority);
this.host = hostPort.host;
@ -784,7 +763,7 @@ final class ProtocolNegotiators {
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelHandler upgradeHandler =
new Http2UpgradeAndGrpcHandler(grpcHandler.getAuthority(), grpcHandler);
return new WaitUntilActiveHandler(upgradeHandler);
return new WaitUntilActiveHandler(upgradeHandler, grpcHandler.getNegotiationLogger());
}
@Override
@ -800,17 +779,19 @@ final class ProtocolNegotiators {
private final String authority;
private final GrpcHttp2ConnectionHandler next;
private final ChannelLogger negotiationLogger;
private ProtocolNegotiationEvent pne;
Http2UpgradeAndGrpcHandler(String authority, GrpcHttp2ConnectionHandler next) {
this.authority = checkNotNull(authority, "authority");
this.next = checkNotNull(next, "next");
this.negotiationLogger = next.getNegotiationLogger();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
negotiationLogger(ctx).log(ChannelLogLevel.INFO, "Http2Upgrade started");
negotiationLogger.log(ChannelLogLevel.INFO, "Http2Upgrade started");
HttpClientCodec httpClientCodec = new HttpClientCodec();
ctx.pipeline().addBefore(ctx.name(), null, httpClientCodec);
@ -836,7 +817,7 @@ final class ProtocolNegotiators {
pne = (ProtocolNegotiationEvent) evt;
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
checkState(pne != null, "negotiation not yet complete");
negotiationLogger(ctx).log(ChannelLogLevel.INFO, "Http2Upgrade finished");
negotiationLogger.log(ChannelLogLevel.INFO, "Http2Upgrade finished");
ctx.pipeline().remove(ctx.name());
next.handleProtocolNegotiationCompleted(pne.getAttributes(), pne.getSecurity());
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
@ -979,7 +960,8 @@ final class ProtocolNegotiators {
@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
ChannelHandler grpcNegotiationHandler = new GrpcNegotiationHandler(grpcHandler);
ChannelHandler activeHandler = new WaitUntilActiveHandler(grpcNegotiationHandler);
ChannelHandler activeHandler = new WaitUntilActiveHandler(grpcNegotiationHandler,
grpcHandler.getNegotiationLogger());
return activeHandler;
}
@ -1001,8 +983,8 @@ final class ProtocolNegotiators {
boolean protocolNegotiationEventReceived;
WaitUntilActiveHandler(ChannelHandler next) {
super(next);
WaitUntilActiveHandler(ChannelHandler next, ChannelLogger negotiationLogger) {
super(next, negotiationLogger);
}
@Override
@ -1045,20 +1027,24 @@ final class ProtocolNegotiators {
private final ChannelHandler next;
private final String negotiatorName;
private ProtocolNegotiationEvent pne;
private final ChannelLogger negotiationLogger;
protected ProtocolNegotiationHandler(ChannelHandler next, String negotiatorName) {
protected ProtocolNegotiationHandler(ChannelHandler next, String negotiatorName,
ChannelLogger negotiationLogger) {
this.next = checkNotNull(next, "next");
this.negotiatorName = negotiatorName;
this.negotiationLogger = checkNotNull(negotiationLogger, "negotiationLogger");
}
protected ProtocolNegotiationHandler(ChannelHandler next) {
protected ProtocolNegotiationHandler(ChannelHandler next, ChannelLogger negotiationLogger) {
this.next = checkNotNull(next, "next");
this.negotiatorName = getClass().getSimpleName().replace("Handler", "");
this.negotiationLogger = checkNotNull(negotiationLogger, "negotiationLogger");
}
@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
negotiationLogger(ctx).log(ChannelLogLevel.DEBUG, "{0} started", negotiatorName);
negotiationLogger.log(ChannelLogLevel.DEBUG, "{0} started", negotiatorName);
handlerAdded0(ctx);
}
@ -1099,7 +1085,7 @@ final class ProtocolNegotiators {
protected final void fireProtocolNegotiationEvent(ChannelHandlerContext ctx) {
checkState(pne != null, "previous protocol negotiation event hasn't triggered");
negotiationLogger(ctx).log(ChannelLogLevel.INFO, "{0} completed", negotiatorName);
negotiationLogger.log(ChannelLogLevel.INFO, "{0} completed", negotiatorName);
ctx.pipeline().replace(ctx.name(), /* newName= */ null, next);
ctx.fireUserEventTriggered(pne);
}

View File

@ -875,7 +875,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase<NettyClientHand
tooManyPingsRunnable,
transportTracer,
Attributes.EMPTY,
"someauthority");
"someauthority",
null);
}
@Override

View File

@ -53,13 +53,13 @@ import io.grpc.TlsChannelCredentials;
import io.grpc.TlsServerCredentials;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TestUtils.NoopChannelLogger;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
import io.grpc.netty.ProtocolNegotiators.ClientTlsProtocolNegotiator;
@ -183,6 +183,7 @@ public class ProtocolNegotiatorsTest {
private SslContext sslContext;
private SSLEngine engine;
private ChannelHandlerContext channelHandlerCtx;
private static ChannelLogger noopLogger = new NoopChannelLogger();
@Before
public void setUp() throws Exception {
@ -583,7 +584,7 @@ public class ProtocolNegotiatorsTest {
latch.countDown();
super.handlerAdded(ctx);
}
});
}, noopLogger);
ChannelHandler lateAddingHandler = new ChannelInboundHandlerAdapter() {
@Override
@ -627,7 +628,7 @@ public class ProtocolNegotiatorsTest {
latch.countDown();
super.handlerAdded(ctx);
}
});
}, noopLogger);
LocalAddress addr = new LocalAddress("local");
ChannelFuture cf = new Bootstrap()
@ -860,7 +861,8 @@ public class ProtocolNegotiatorsTest {
};
DefaultEventLoopGroup elg = new DefaultEventLoopGroup(1);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext, "authority", elg);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext,
"authority", elg, noopLogger);
pipeline.addLast(handler);
pipeline.replace(SslHandler.class, null, goodSslHandler);
pipeline.fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
@ -898,7 +900,8 @@ public class ProtocolNegotiatorsTest {
.ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(apn).build();
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext, "authority", elg);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext,
"authority", elg, noopLogger);
pipeline.addLast(handler);
pipeline.replace(SslHandler.class, null, goodSslHandler);
pipeline.fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
@ -921,7 +924,8 @@ public class ProtocolNegotiatorsTest {
};
DefaultEventLoopGroup elg = new DefaultEventLoopGroup(1);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext, "authority", elg);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext,
"authority", elg, noopLogger);
pipeline.addLast(handler);
final AtomicReference<Throwable> error = new AtomicReference<>();
@ -948,7 +952,8 @@ public class ProtocolNegotiatorsTest {
@Test
public void clientTlsHandler_closeDuringNegotiation() throws Exception {
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext, "authority", null);
ClientTlsHandler handler = new ClientTlsHandler(grpcHandler, sslContext,
"authority", null, noopLogger);
pipeline.addLast(new WriteBufferingAndExceptionHandler(handler));
ChannelFuture pendingWrite = channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE);
@ -1174,7 +1179,7 @@ public class ProtocolNegotiatorsTest {
.sync()
.channel();
Channel c = new Bootstrap()
.handler(new WaitUntilActiveHandler(next))
.handler(new WaitUntilActiveHandler(next, noopLogger))
.channel(LocalChannel.class).group(group)
.connect(addr)
.sync()
@ -1338,7 +1343,7 @@ public class ProtocolNegotiatorsTest {
new DefaultHttp2ConnectionDecoder(conn, encoder, new DefaultHttp2FrameReader());
Http2Settings settings = new Http2Settings();
return new FakeGrpcHttp2ConnectionHandler(
/*channelUnused=*/ null, decoder, encoder, settings, noop);
/*channelUnused=*/ null, decoder, encoder, settings, noop, noopLogger);
}
private final boolean noop;
@ -1351,8 +1356,9 @@ public class ProtocolNegotiatorsTest {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings,
boolean noop) {
super(channelUnused, decoder, encoder, initialSettings);
boolean noop,
ChannelLogger negotiationLogger) {
super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);
this.noop = noop;
}

View File

@ -252,7 +252,7 @@ public final class SdsProtocolNegotiators {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(this);
}
});
}, grpcHandler.getNegotiationLogger());
checkNotNull(grpcHandler, "grpcHandler");
this.grpcHandler = grpcHandler;
this.sslContextProviderSupplier = sslContextProviderSupplier;
@ -399,7 +399,7 @@ public final class SdsProtocolNegotiators {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(this);
}
});
}, grpcHandler.getNegotiationLogger());
checkNotNull(grpcHandler, "grpcHandler");
this.grpcHandler = grpcHandler;
this.downstreamTlsContext = downstreamTlsContext;

View File

@ -25,6 +25,9 @@ import static io.grpc.xds.internal.sds.CommonTlsContextTestsUtil.SERVER_1_PEM_FI
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -34,6 +37,9 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValida
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.TlsCertificate;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.internal.TestUtils.NoopChannelLogger;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalProtocolNegotiationEvent;
@ -179,6 +185,9 @@ public class SdsProtocolNegotiatorsTest {
ClientSdsProtocolNegotiator pn =
new ClientSdsProtocolNegotiator(InternalProtocolNegotiators.plaintext());
GrpcHttp2ConnectionHandler mockHandler = mock(GrpcHttp2ConnectionHandler.class);
ChannelLogger logger = mock(ChannelLogger.class);
doNothing().when(logger).log(any(ChannelLogLevel.class), anyString());
when(mockHandler.getNegotiationLogger()).thenReturn(logger);
TlsContextManager mockTlsContextManager = mock(TlsContextManager.class);
when(mockHandler.getEagAttributes())
.thenReturn(
@ -387,7 +396,7 @@ public class SdsProtocolNegotiatorsTest {
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(channelUnused, decoder, encoder, initialSettings);
super(channelUnused, decoder, encoder, initialSettings, new NoopChannelLogger());
}
static FakeGrpcHttp2ConnectionHandler newHandler() {