diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadClient.java index e82e28398f..df1ec46843 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/driver/LoadClient.java @@ -46,7 +46,6 @@ import io.grpc.benchmarks.proto.Control; import io.grpc.benchmarks.proto.Messages; import io.grpc.benchmarks.proto.Payloads; import io.grpc.benchmarks.proto.Stats; -import io.grpc.internal.ManagedChannelImpl; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; import io.netty.buffer.ByteBuf; @@ -96,7 +95,7 @@ class LoadClient { log.log(Level.INFO, "Client Config \n" + config.toString()); this.config = config; // Create the channels - channels = new ManagedChannelImpl[config.getClientChannels()]; + channels = new ManagedChannel[config.getClientChannels()]; for (int i = 0; i < config.getClientChannels(); i++) { channels[i] = Utils.newClientChannel( diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index de22269a49..7ccfaa78ce 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -46,6 +46,8 @@ import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; import io.grpc.Internal; import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer2; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.NameResolver; import io.grpc.NameResolverProvider; @@ -94,6 +96,7 @@ public abstract class AbstractManagedChannelImplBuilder @Nullable private Executor executor; + private final List interceptors = new ArrayList(); private final String target; @@ -113,6 +116,9 @@ public abstract class AbstractManagedChannelImplBuilder @Nullable private LoadBalancer.Factory loadBalancerFactory; + @Nullable + private LoadBalancer2.Factory loadBalancer2Factory; + @Nullable private DecompressorRegistry decompressorRegistry; @@ -204,6 +210,17 @@ public abstract class AbstractManagedChannelImplBuilder return thisT(); } + /** + * DO NOT CALL THIS, as its argument type will soon be renamed. + */ + public final T loadBalancerFactory(LoadBalancer2.Factory loadBalancerFactory) { + Preconditions.checkState(directServerAddress == null, + "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory", + directServerAddress); + this.loadBalancer2Factory = loadBalancerFactory; + return thisT(); + } + @Override public final T decompressorRegistry(DecompressorRegistry registry) { this.decompressorRegistry = registry; @@ -266,7 +283,7 @@ public abstract class AbstractManagedChannelImplBuilder } @Override - public ManagedChannelImpl build() { + public ManagedChannel build() { ClientTransportFactory transportFactory = buildTransportFactory(); if (authorityOverride != null) { transportFactory = new AuthorityOverridingTransportFactory( @@ -279,20 +296,39 @@ public abstract class AbstractManagedChannelImplBuilder // getResource(), then this shouldn't be a problem unless called on the UI thread. nameResolverFactory = NameResolverProvider.asFactory(); } - return new ManagedChannelImpl( - target, - // TODO(carl-mastrangelo): Allow clients to pass this in - new ExponentialBackoffPolicy.Provider(), - nameResolverFactory, - getNameResolverParams(), - firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()), - transportFactory, - firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), - firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()), - GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis, - executor, userAgent, interceptors, - firstNonNull(statsFactory, - firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE))); + if (loadBalancer2Factory != null) { + return new ManagedChannelImpl2( + target, + // TODO(carl-mastrangelo): Allow clients to pass this in + new ExponentialBackoffPolicy.Provider(), + nameResolverFactory, + getNameResolverParams(), + loadBalancer2Factory, + transportFactory, + firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), + firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()), + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE), + getExecutorPool(executor), + SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR), + GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis, + userAgent, interceptors, firstNonNull(statsFactory, + firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE))); + } else { + return new ManagedChannelImpl( + target, + // TODO(carl-mastrangelo): Allow clients to pass this in + new ExponentialBackoffPolicy.Provider(), + nameResolverFactory, + getNameResolverParams(), + firstNonNull(loadBalancerFactory, PickFirstBalancerFactory.getInstance()), + transportFactory, + firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), + firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()), + GrpcUtil.TIMER_SERVICE, GrpcUtil.STOPWATCH_SUPPLIER, idleTimeoutMillis, + executor, userAgent, interceptors, + firstNonNull(statsFactory, + firstNonNull(Stats.getStatsContextFactory(), NoopStatsContextFactory.INSTANCE))); + } } /** @@ -311,6 +347,24 @@ public abstract class AbstractManagedChannelImplBuilder return Attributes.EMPTY; } + private static ObjectPool getExecutorPool(final @Nullable Executor executor) { + if (executor != null) { + return new ObjectPool() { + @Override + public Executor getObject() { + return executor; + } + + @Override + public Executor returnObject(Object returned) { + return null; + } + }; + } else { + return SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR); + } + } + private static class AuthorityOverridingTransportFactory implements ClientTransportFactory { final ClientTransportFactory factory; final String authorityOverride; diff --git a/core/src/main/java/io/grpc/internal/SharedResourcePool.java b/core/src/main/java/io/grpc/internal/SharedResourcePool.java new file mode 100644 index 0000000000..fbcef4343a --- /dev/null +++ b/core/src/main/java/io/grpc/internal/SharedResourcePool.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +/** + * An ObjectPool backed by a {@link SharedResourceHolder.Resource}. + */ +public final class SharedResourcePool implements ObjectPool { + private final SharedResourceHolder.Resource resource; + + private SharedResourcePool(SharedResourceHolder.Resource resource) { + this.resource = resource; + } + + public static SharedResourcePool forResource(SharedResourceHolder.Resource resource) { + return new SharedResourcePool(resource); + } + + @Override + public T getObject() { + return SharedResourceHolder.get(resource); + } + + @Override + @SuppressWarnings("unchecked") + public T returnObject(Object object) { + SharedResourceHolder.release(resource, (T) object); + return null; + } +} diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java index adaf5412a3..c1e65338f0 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/CascadingTest.java @@ -43,6 +43,7 @@ import com.google.common.util.concurrent.SettableFuture; import io.grpc.Context; import io.grpc.Context.CancellableContext; import io.grpc.Deadline; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; @@ -52,7 +53,6 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.internal.ManagedChannelImpl; import io.grpc.internal.ServerImpl; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; @@ -84,7 +84,7 @@ public class CascadingTest { @Mock TestServiceGrpc.TestServiceImplBase service; - private ManagedChannelImpl channel; + private ManagedChannel channel; private ServerImpl server; private CountDownLatch observedCancellations; private CountDownLatch receivedCancellations;