Load-balancing ManagedChannelImpl.

- Add NameResolver and LoadBalancer interfaces.
- ManagedChannelImpl now uses NameResolver and LoadBalancer for
  transport selection, which may return transports to multiple
  addresses.
- Transports are still owned by ManagedChannelImpl, which implements
  TransportManager interface that is provided to LoadBalancer, so that
  LoadBalancer doesn't worry about Transport lifecycles.
- Channel builders can be created by forTarget() that accepts the fully
  qualified target name, which is an URI. (TODO) it's not tested.
- The old address-based construction pattern is supported by using
  AbstractManagedChannelImplBuilder.DirectAddressNameResolver.
- (TODO) DnsNameResolver and SimpleLoadBalancer are currently
  incomplete. They merely work for the single-address scenario.
This commit is contained in:
Kun Zhang 2015-09-04 17:21:44 -07:00
parent bfbd6e1ec5
commit 942f4c99d8
29 changed files with 1515 additions and 218 deletions

View File

@ -0,0 +1,116 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
* An immutable type-safe container of attributes.
*/
@ExperimentalApi
@Immutable
public final class Attributes {
private final HashMap<String, Object> data = new HashMap<String, Object>();
public static final Attributes EMPTY = new Attributes();
private Attributes() {
}
/**
* Gets the value for the key, or {@code null} if it's not present.
*/
@SuppressWarnings("unchecked")
@Nullable
public <T> T get(Key<T> key) {
return (T) data.get(key.name);
}
/**
* Create a new builder.
*/
public static Builder newBuilder() {
return new Builder();
}
public static final class Key<T> {
private final String name;
/**
* Construct the key.
*
* @param name the name, which should be namespaced like com.foo.BarAttribute to avoid
* collision.
*/
public Key(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
@Override
public String toString() {
return data.toString();
}
public static final class Builder {
private Attributes product;
private Builder() {
this.product = new Attributes();
}
public <T> void set(Key<T> key, T value) {
product.data.put(key.name, value);
}
/**
* Build the attributes. Can only be called once.
*/
public Attributes build() {
Preconditions.checkState(product != null, "Already built");
Attributes result = product;
product = null;
return result;
}
}
}

View File

@ -62,6 +62,9 @@ public final class CallOptions {
@Nullable
private String authority;
@Nullable
private RequestKey requestKey;
/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@ -130,6 +133,25 @@ public final class CallOptions {
return newOptions;
}
/**
* Returns a new {@code CallOptions} with a request key for affinity-based routing.
*/
@ExperimentalApi
public CallOptions withRequestKey(@Nullable RequestKey requestKey) {
CallOptions newOptions = new CallOptions(this);
newOptions.requestKey = requestKey;
return newOptions;
}
/**
* Returns the request key for affinity-based routing.
*/
@ExperimentalApi
@Nullable
public RequestKey getRequestKey() {
return requestKey;
}
/**
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
@ -155,6 +177,7 @@ public final class CallOptions {
deadlineNanoTime = other.deadlineNanoTime;
compressor = other.compressor;
authority = other.authority;
requestKey = other.requestKey;
}
@SuppressWarnings("deprecation") // guava 14.0

View File

@ -0,0 +1,138 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
/**
* A factory for DNS-based {@link NameResolver}s.
*
* <p>The format of the target URI is {@code "[dns://[<DNS_server_address>]/]<name>"}.
*/
@ExperimentalApi
public final class DnsNameResolverFactory extends NameResolver.Factory {
private static final DnsNameResolverFactory instance = new DnsNameResolverFactory();
@Override
public NameResolver newNameResolver(URI targetUri) {
String scheme = targetUri.getScheme();
if (scheme == null) {
return new DnsNameResolver(null, targetUri.toString());
} else if (scheme.equals("dns")) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(targetUri.getAuthority(), name);
} else {
return null;
}
}
private DnsNameResolverFactory() {
}
public static DnsNameResolverFactory getInstance() {
return instance;
}
private static class DnsNameResolver extends NameResolver {
private final String authority;
private final String host;
private final int port;
private ExecutorService executor;
DnsNameResolver(@Nullable String nsAuthority, String name) {
// TODO: if a DNS server is provided as nsAuthority, use it.
// https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
// Must prepend a "//" to the name when constructing a URI, otherwise
// the authority and host of the resulted URI would be null.
URI nameUri = URI.create("//" + name);
authority = Preconditions.checkNotNull(nameUri.getAuthority(),
"nameUri (%s) doesn't have an authority", nameUri);
host = Preconditions.checkNotNull(nameUri.getHost(), "host");
port = nameUri.getPort();
Preconditions.checkArgument(port > 0, "port (%s) must be positive", port);
}
@Override
public String getServiceAuthority() {
return authority;
}
@Override
public synchronized void start(final Listener listener) {
Preconditions.checkState(executor == null, "already started");
executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
executor.execute(new Runnable() {
@Override
public void run() {
InetAddress[] inetAddrs;
try {
inetAddrs = InetAddress.getAllByName(host);
} catch (Exception e) {
listener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
ArrayList<ResolvedServerInfo> servers
= new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
}
listener.onUpdate(servers, Attributes.EMPTY);
}
});
}
@Override
public synchronized void shutdown() {
if (executor != null) {
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.internal.ClientTransport;
import java.net.SocketAddress;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
* A pluggable component that receives resolved addresses from {@link NameResolver} and provides the
* channel a usable transport when asked.
*
* <p>Note to implementations: all methods are expected to return quickly. Any work that may block
* should be done asynchronously.
*/
// TODO(zhangkun83): since it's also used for non-loadbalancing cases like pick-first,
// "RequestRouter" might be a better name.
@ExperimentalApi
@ThreadSafe
public abstract class LoadBalancer {
/**
* Pick a transport that Channel will use for next RPC.
*
* @param requestKey for affinity-based routing
*/
public abstract ListenableFuture<ClientTransport> pickTransport(
@Nullable RequestKey requestKey);
/**
* Shuts down this {@code LoadBalancer}.
*/
public void shutdown() { }
/**
* Handles newly resolved addresses and service config from name resolution system.
*
* <p>Implementations should not modify the given {@code servers}.
*/
public void handleResolvedAddresses(List<ResolvedServerInfo> servers, Attributes config) { }
/**
* Handles an error from the name resolution system.
*
* @param error a non-OK status
*/
public void handleNameResolutionError(Status error) { }
/**
* Called when a transport is fully connected and ready to accept traffic.
*/
public void transportReady(SocketAddress addr, ClientTransport transport) { }
/**
* Called when a transport is shutting down.
*/
public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { }
public abstract static class Factory {
/**
* Creates a {@link LoadBalancer} that will be used inside a channel.
*
* @param serviceName the DNS-style service name, which is also the authority
* @param tm the interface where an {@code LoadBalancer} implementation gets connected
* transports from
*/
public abstract LoadBalancer newLoadBalancer(String serviceName, TransportManager tm);
}
}

View File

@ -44,6 +44,11 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
return ManagedChannelProvider.provider().builderForAddress(name, port);
}
@ExperimentalApi
public static ManagedChannelBuilder<?> forTarget(String target) {
return ManagedChannelProvider.provider().builderForTarget(target);
}
/**
* Provides a custom executor.
*
@ -99,6 +104,24 @@ public abstract class ManagedChannelBuilder<T extends ManagedChannelBuilder<T>>
@ExperimentalApi("primarily for testing")
public abstract T usePlaintext(boolean skipNegotiation);
/*
* Provides a custom {@link NameResolver.Factory} for the channel.
*
* <p>If this method is not called, the builder will look up in the global resolver registry for
* a factory for the provided target.
*/
@ExperimentalApi
public abstract T nameResolverFactory(NameResolver.Factory resolverFactory);
/**
* Provides a custom {@link LoadBalancer.Factory} for the channel.
*
* <p>If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the
* channel.
*/
@ExperimentalApi
public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory);
/**
* Builds a channel using the given parameters.
*/

View File

@ -106,6 +106,12 @@ public abstract class ManagedChannelProvider {
*/
protected abstract ManagedChannelBuilder<?> builderForAddress(String name, int port);
/**
* Creates a new builder with the given target URI.
*/
@ExperimentalApi
protected abstract ManagedChannelBuilder<?> builderForTarget(String target);
public static final class ProviderNotFoundException extends RuntimeException {
public ProviderNotFoundException(String msg) {
super(msg);

View File

@ -0,0 +1,101 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.net.URI;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/**
* A pluggable component that resolves a target URI (which is broken down into 3 parts as described
* below) and return addresses to the caller.
*
* <p>The format of the target URI is {@code "[<scheme>:]<scheme-specific-string>"}
*
* <p>{@code NameResolver} has no knowledge of load-balancing. The addresses of a target may be
* changed over time, thus the caller registers a {@link Listener} to receive continuous updates.
*/
@ExperimentalApi
@ThreadSafe
public abstract class NameResolver {
/**
* Returns the authority, which is also the name of the service.
*
* <p>An implementation must generate it locally and must keep it unchanged.
*/
public abstract String getServiceAuthority();
/**
* Starts the resolution.
*
* @param listener used to receive updates on the target
*/
public abstract void start(Listener listener);
/**
* Stops the resolution. Updates to the Listener will stop.
*/
public abstract void shutdown();
public abstract static class Factory {
/**
* Creates a {@link NameResolver} for the given target URI, or {@code null} if the given URI
* cannot be resolved by this factory.
*/
@Nullable
public abstract NameResolver newNameResolver(URI targetUri);
}
/**
* Receives address updates.
*
* <p>All methods are expected to return quickly.
*/
@ThreadSafe
public interface Listener {
/**
* Handles updates on resolved addresses and config.
*
* <p>Implementations will not modify the given {@code servers}.
*/
void onUpdate(List<ResolvedServerInfo> servers, Attributes config);
/**
* Handles an error from the resolver.
*
* @param error a non-OK status
*/
void onError(Status error);
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* A key generated from an RPC request, and to be used for affinity-based
* routing.
*/
@ExperimentalApi
public final class RequestKey {
// TODO(zhangkun83): materialize this class once we decide the form of the affinity key.
private RequestKey() {
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import java.net.SocketAddress;
import javax.annotation.concurrent.Immutable;
/**
* The information about a server from a {@link NameResolver}.
*/
@ExperimentalApi
@Immutable
public final class ResolvedServerInfo {
private final SocketAddress address;
private final Attributes attributes;
/**
* Constructor.
*
* @param address the address object
* @param attributes attributes associated with this address.
*/
public ResolvedServerInfo(SocketAddress address, Attributes attributes) {
this.address = address;
this.attributes = attributes;
}
/**
* Returns the address.
*/
public SocketAddress getAddress() {
return address;
}
/**
* Returns the associated attributes.
*/
public Attributes getAttributes() {
return attributes;
}
@Override
public String toString() {
return "[address=" + address + ", attrs=" + attributes + "]";
}
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.internal.ClientTransport;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the
* addresses from the {@link NameResolver}.
*/
// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin.
public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory {
private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory();
private SimpleLoadBalancerFactory() {
}
public static SimpleLoadBalancerFactory getInstance() {
return instance;
}
@Override
public LoadBalancer newLoadBalancer(String serviceName, TransportManager tm) {
return new SimpleLoadBalancer(tm);
}
private static class SimpleLoadBalancer extends LoadBalancer {
@GuardedBy("servers")
private final List<ResolvedServerInfo> servers = new ArrayList<ResolvedServerInfo>();
@GuardedBy("servers")
private int currentServerIndex;
// TODO(zhangkun83): virtually any LoadBalancer would need to handle picks before name
// resolution is done, we may want to move the related logic into ManagedChannelImpl.
@GuardedBy("servers")
private List<SettableFuture<ClientTransport>> pendingPicks;
@GuardedBy("servers")
private StatusException nameResolutionError;
private final TransportManager tm;
private SimpleLoadBalancer(TransportManager tm) {
this.tm = tm;
}
@Override
public ListenableFuture<ClientTransport> pickTransport(@Nullable RequestKey requestKey) {
ResolvedServerInfo currentServer;
synchronized (servers) {
if (servers.isEmpty()) {
if (nameResolutionError != null) {
return Futures.immediateFailedFuture(nameResolutionError);
}
SettableFuture<ClientTransport> future = SettableFuture.create();
if (pendingPicks == null) {
pendingPicks = new ArrayList<SettableFuture<ClientTransport>>();
}
pendingPicks.add(future);
return future;
}
currentServer = servers.get(currentServerIndex);
}
return tm.getTransport(currentServer.getAddress());
}
@Override
public void handleResolvedAddresses(
List<ResolvedServerInfo> updatedServers, Attributes config) {
List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
ResolvedServerInfo currentServer = null;
synchronized (servers) {
nameResolutionError = null;
servers.clear();
for (ResolvedServerInfo addr : updatedServers) {
servers.add(addr);
}
if (!servers.isEmpty()) {
pendingPicksCopy = pendingPicks;
pendingPicks = null;
if (currentServerIndex >= servers.size()) {
currentServerIndex = 0;
}
currentServer = servers.get(currentServerIndex);
}
}
if (pendingPicksCopy != null) {
// If pendingPicksCopy != null, then servers.isEmpty() == false, then
// currentServer must have been assigned.
Preconditions.checkState(currentServer != null, "currentServer is null");
for (final SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
ListenableFuture<ClientTransport> future = tm.getTransport(currentServer.getAddress());
Futures.addCallback(future, new FutureCallback<ClientTransport>() {
@Override public void onSuccess(ClientTransport result) {
pendingPick.set(result);
}
@Override public void onFailure(Throwable t) {
pendingPick.setException(t);
}
});
}
}
}
@Override
public void handleNameResolutionError(Status error) {
List<SettableFuture<ClientTransport>> pendingPicksCopy = null;
StatusException statusException = error.asException();
synchronized (servers) {
pendingPicksCopy = pendingPicks;
pendingPicks = null;
nameResolutionError = statusException;
}
if (pendingPicksCopy != null) {
for (SettableFuture<ClientTransport> pendingPick : pendingPicksCopy) {
pendingPick.setException(statusException);
}
}
}
@Override
public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) {
if (!s.isOk()) {
// If the current transport is shut down due to error, move on to the next address in the
// list
synchronized (servers) {
if (addr.equals(servers.get(currentServerIndex).getAddress())) {
currentServerIndex++;
if (currentServerIndex >= servers.size()) {
currentServerIndex = 0;
}
}
}
}
}
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.internal.ClientTransport;
import java.net.SocketAddress;
/**
* Manages transport life-cycles and provide ready-to-use transports.
*/
@ExperimentalApi
public abstract class TransportManager {
/**
* Advises this {@code TransportManager} to retain transports only to these servers, for warming
* up connections and discarding unused connections.
*/
public abstract void updateRetainedTransports(SocketAddress[] addrs);
/**
* Returns the future of a transport for the given server.
*
* <p>If the channel has been shut down, the value of the future will be {@code null}.
*/
// TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers,
// which would have a different authority than the primary servers. We need to figure out how to
// do it.
public abstract ListenableFuture<ClientTransport> getTransport(SocketAddress addr);
}

View File

@ -39,6 +39,8 @@ import io.grpc.internal.AbstractReferenceCounted;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import java.net.SocketAddress;
/**
* Builder for a channel that issues in-process requests. Clients identify the in-process server by
* its name.
@ -59,9 +61,9 @@ public class InProcessChannelBuilder extends
}
private final String name;
private String authority = "localhost";
private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
this.name = Preconditions.checkNotNull(name);
}
@ -73,40 +75,40 @@ public class InProcessChannelBuilder extends
return this;
}
@Override
public InProcessChannelBuilder overrideAuthority(String authority) {
this.authority = authority;
return this;
}
@Override
protected ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(name, authority);
return new InProcessClientTransportFactory(name);
}
private static class InProcessClientTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
private final String name;
private final String authority;
private InProcessClientTransportFactory(String name, String authority) {
private InProcessClientTransportFactory(String name) {
this.name = name;
this.authority = authority;
}
@Override
public ClientTransport newClientTransport() {
public ClientTransport newClientTransport(SocketAddress addr, String authority) {
return new InProcessTransport(name);
}
@Override
public String authority() {
return authority;
}
@Override
protected void deallocate() {
// Do nothing.
}
}
private static class InProcessSocketAddress extends SocketAddress {
final String name;
InProcessSocketAddress(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
}

View File

@ -31,12 +31,23 @@
package io.grpc.internal;
import io.grpc.ClientInterceptor;
import io.grpc.Internal;
import io.grpc.ManagedChannelBuilder;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.ClientInterceptor;
import io.grpc.DnsNameResolverFactory;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.SimpleLoadBalancerFactory;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
@ -54,9 +65,34 @@ public abstract class AbstractManagedChannelImplBuilder
private Executor executor;
private final List<ClientInterceptor> interceptors = new ArrayList<ClientInterceptor>();
private final URI target;
@Nullable
private final SocketAddress directServerAddress;
@Nullable
private String userAgent;
@Nullable
private String authorityOverride;
@Nullable
private NameResolver.Factory nameResolverFactory;
@Nullable
private LoadBalancer.Factory loadBalancerFactory;
protected AbstractManagedChannelImplBuilder(URI target) {
this.target = Preconditions.checkNotNull(target);
this.directServerAddress = null;
}
protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) {
this.target = URI.create("direct-address:///" + directServerAddress);
this.directServerAddress = directServerAddress;
this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority);
}
@Override
public final T executor(Executor executor) {
this.executor = executor;
@ -74,6 +110,24 @@ public abstract class AbstractManagedChannelImplBuilder
return intercept(Arrays.asList(interceptors));
}
@Override
public final T nameResolverFactory(NameResolver.Factory resolverFactory) {
Preconditions.checkState(directServerAddress == null,
"directServerAddress is set (%s), which forbids the use of NameResolverFactory",
directServerAddress);
this.nameResolverFactory = resolverFactory;
return thisT();
}
@Override
public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) {
Preconditions.checkState(directServerAddress == null,
"directServerAddress is set (%s), which forbids the use of LoadBalancerFactory",
directServerAddress);
this.loadBalancerFactory = loadBalancerFactory;
return thisT();
}
private T thisT() {
@SuppressWarnings("unchecked")
T thisT = (T) this;
@ -86,10 +140,33 @@ public abstract class AbstractManagedChannelImplBuilder
return thisT();
}
@Override
public final T overrideAuthority(String authority) {
this.authorityOverride = checkAuthority(authority);
return thisT();
}
/**
* Verifies the authority is valid. This method exists as an escape hatch for putting in an
* authority that is valid, but would fail the default validation provided by this
* implementation.
*/
protected String checkAuthority(String authority) {
return GrpcUtil.checkAuthority(authority);
}
@Override
public ManagedChannelImpl build() {
ClientTransportFactory transportFactory = buildTransportFactory();
return new ManagedChannelImpl(transportFactory, executor, userAgent, interceptors);
ClientTransportFactory transportFactory = new AuthorityOverridingTransportFactory(
buildTransportFactory(), authorityOverride);
return new ManagedChannelImpl(
target,
// TODO(carl-mastrangelo): Allow clients to pass this in
new ExponentialBackoffPolicy.Provider(),
// TODO(zhangkun83): use a NameResolver registry for the "nameResolverFactory == null" case
nameResolverFactory == null ? DnsNameResolverFactory.getInstance() : nameResolverFactory,
loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance() : loadBalancerFactory,
transportFactory, executor, userAgent, interceptors);
}
/**
@ -99,4 +176,68 @@ public abstract class AbstractManagedChannelImplBuilder
*/
@Internal
protected abstract ClientTransportFactory buildTransportFactory();
private static class AuthorityOverridingTransportFactory implements ClientTransportFactory {
final ClientTransportFactory factory;
@Nullable final String authorityOverride;
AuthorityOverridingTransportFactory(
ClientTransportFactory factory, @Nullable String authorityOverride) {
this.factory = factory;
this.authorityOverride = authorityOverride;
}
@Override
public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
return factory.newClientTransport(
serverAddress, authorityOverride != null ? authorityOverride : authority);
}
@Override
public int referenceCount() {
return factory.referenceCount();
}
@Override
public ReferenceCounted retain() {
factory.retain();
return this;
}
@Override
public ReferenceCounted release() {
factory.release();
return this;
}
}
private static class DirectAddressNameResolverFactory extends NameResolver.Factory {
final SocketAddress address;
final String authority;
DirectAddressNameResolverFactory(SocketAddress address, String authority) {
this.address = address;
this.authority = authority;
}
@Override
public NameResolver newNameResolver(URI notUsedUri) {
return new NameResolver() {
@Override
public String getServiceAuthority() {
return authority;
}
@Override
public void start(final Listener listener) {
listener.onUpdate(
Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)),
Attributes.EMPTY);
}
@Override
public void shutdown() {}
};
}
}
}

View File

@ -96,7 +96,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
* @return a future for client transport. If no more transports can be created, e.g., channel is
* shut down, the future's value will be {@code null}.
*/
ListenableFuture<ClientTransport> get();
ListenableFuture<ClientTransport> get(CallOptions callOptions);
}
@ -139,7 +139,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
ClientStreamListener listener = new ClientStreamListenerImpl(observer);
ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get();
ListenableFuture<ClientTransport> transportFuture = clientTransportProvider.get(callOptions);
if (transportFuture.isDone()) {
// Try to skip DelayedStream when possible to avoid the overhead of a volatile read in the
// fast path. If that fails, stream will stay null and DelayedStream will be created.
@ -409,7 +409,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
StreamCreationTask() {
this.transportFuture = Preconditions.checkNotNull(
clientTransportProvider.get(), "transportFuture");
clientTransportProvider.get(callOptions), "transportFuture");
}
@Override

View File

@ -108,6 +108,8 @@ public interface ClientTransport {
* that this method is called without {@link #shutdown} being called. If the argument to this
* function is {@link Status#isOk}, it is safe to immediately reconnect.
*
* <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
*
* @param s the reason for the shutdown.
*/
void transportShutdown(Status s);
@ -115,7 +117,8 @@ public interface ClientTransport {
/**
* The transport completed shutting down. All resources have been released.
*
* <p> {@link #transportShutdown(Status)} must be called before calling this method.
* <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
* called.
*/
void transportTerminated();

View File

@ -31,14 +31,15 @@
package io.grpc.internal;
import java.net.SocketAddress;
/** Pre-configured factory for creating {@link ClientTransport} instances. */
public interface ClientTransportFactory extends ReferenceCounted {
/** Creates an unstarted transport for exclusive use. */
ClientTransport newClientTransport();
/**
* Returns the authority of the channel. Typically, this should be in the form {@code host:port}.
* Note that since there is not a scheme, there can't be a default port.
* Creates an unstarted transport for exclusive use.
*
* @param serverAddress the address that the transport is connected to
* @param authority the HTTP/2 authority of the server
*/
String authority();
ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
}

View File

@ -351,7 +351,7 @@ public final class GrpcUtil {
/**
* Shared executor for channels.
*/
static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
public static final Resource<ExecutorService> SHARED_CHANNEL_EXECUTOR =
new Resource<ExecutorService>() {
private static final String name = "grpc-default-executor";
@Override

View File

@ -33,9 +33,11 @@ package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@ -45,11 +47,21 @@ import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.LoadBalancer;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.Status;
import io.grpc.TransportManager;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
@ -90,8 +102,15 @@ public final class ManagedChannelImpl extends ManagedChannel {
*/
private final Channel interceptorChannel;
private final NameResolver nameResolver;
private final LoadBalancer loadBalancer;
/**
* Maps addresses to transports for that server.
*/
@GuardedBy("lock")
private TransportSet transportSet;
private final Map<SocketAddress, TransportSet> transports =
new HashMap<SocketAddress, TransportSet>();
@GuardedBy("lock")
private boolean shutdown;
@ -101,55 +120,21 @@ public final class ManagedChannelImpl extends ManagedChannel {
private volatile Compressor defaultCompressor;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override
public ListenableFuture<ClientTransport> get() {
TransportSet transportSetCopy;
public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
synchronized (lock) {
if (shutdown) {
return NULL_VALUE_TRANSPORT_FUTURE;
}
if (transportSet == null) {
transportSet = new TransportSet(backoffPolicyProvider, transportFactory,
scheduledExecutor, new TransportSet.Callback() {
@Override
public void onTerminated() {
synchronized (lock) {
if (shutdown) {
transportSet = null;
if (terminated) {
log.warning("transportTerminated called after already terminated");
}
terminated = true;
lock.notifyAll();
onChannelTerminated();
}
}
}
});
}
transportSetCopy = transportSet;
}
return transportSetCopy.obtainActiveTransport();
return loadBalancer.pickTransport(callOptions.getRequestKey());
}
};
// TODO(zhangkun83): remove this in favor of the one that accepts BackoffPolicy.Provider
ManagedChannelImpl(ClientTransportFactory transportFactory, @Nullable Executor executor,
@Nullable String userAgent, List<ClientInterceptor> interceptors) {
this(new ExponentialBackoffPolicy.Provider(), transportFactory, executor, userAgent,
interceptors);
}
ManagedChannelImpl(BackoffPolicy.Provider backoffPolicyProvider,
ManagedChannelImpl(URI targetUri, BackoffPolicy.Provider backoffPolicyProvider,
NameResolver.Factory nameResolverFactory, LoadBalancer.Factory loadBalancerFactory,
ClientTransportFactory transportFactory, @Nullable Executor executor,
@Nullable String userAgent, List<ClientInterceptor> interceptors) {
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory = transportFactory;
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
if (executor == null) {
usingSharedExecutor = true;
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
@ -157,7 +142,28 @@ public final class ManagedChannelImpl extends ManagedChannel {
usingSharedExecutor = false;
this.executor = executor;
}
this.backoffPolicyProvider = backoffPolicyProvider;
this.nameResolver = nameResolverFactory.newNameResolver(targetUri);
Preconditions.checkArgument(this.nameResolver != null,
"The given NameResolverFactory cannot resolve %s", targetUri);
this.loadBalancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm);
this.transportFactory = transportFactory;
this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
this.nameResolver.start(new NameResolver.Listener() {
@Override
public void onUpdate(List<ResolvedServerInfo> servers, Attributes config) {
loadBalancer.handleResolvedAddresses(servers, config);
}
@Override
public void onError(Status error) {
Preconditions.checkArgument(!error.isOk(), "the error status must not be OK");
loadBalancer.handleNameResolutionError(error);
}
});
}
/**
@ -180,7 +186,7 @@ public final class ManagedChannelImpl extends ManagedChannel {
*/
@Override
public ManagedChannelImpl shutdown() {
TransportSet transportSetCopy = null;
ArrayList<TransportSet> transportsCopy = new ArrayList<TransportSet>();
synchronized (lock) {
if (shutdown) {
return this;
@ -188,16 +194,16 @@ public final class ManagedChannelImpl extends ManagedChannel {
shutdown = true;
// After shutdown there are no new calls, so no new cancellation tasks are needed
scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor);
if (transportSet == null) {
if (transports.isEmpty()) {
terminated = true;
lock.notifyAll();
onChannelTerminated();
} else {
transportSetCopy = transportSet;
transportsCopy.addAll(transports.values());
}
}
if (transportSetCopy != null) {
transportSetCopy.shutdown();
for (TransportSet ts : transportsCopy) {
ts.shutdown();
}
return this;
}
@ -276,7 +282,8 @@ public final class ManagedChannelImpl extends ManagedChannel {
@Override
public String authority() {
return transportFactory.authority();
String authority = nameResolver.getServiceAuthority();
return Preconditions.checkNotNull(authority, "authority");
}
}
@ -290,4 +297,43 @@ public final class ManagedChannelImpl extends ManagedChannel {
// Release the transport factory so that it can deallocate any resources.
transportFactory.release();
}
private final TransportManager tm = new TransportManager() {
@Override
public void updateRetainedTransports(SocketAddress[] addrs) {
// TODO(zhangkun83): warm-up new servers and discard removed servers.
}
@Override
public ListenableFuture<ClientTransport> getTransport(final SocketAddress addr) {
TransportSet ts;
synchronized (lock) {
if (shutdown) {
return null;
}
ts = transports.get(addr);
if (ts == null) {
ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider,
transportFactory, scheduledExecutor, new TransportSet.Callback() {
@Override
public void onTerminated() {
synchronized (lock) {
transports.remove(addr);
if (shutdown && transports.isEmpty()) {
if (terminated) {
log.warning("transportTerminated called after already terminated");
}
terminated = true;
lock.notifyAll();
onChannelTerminated();
}
}
}
});
transports.put(addr, ts);
}
}
return ts.obtainActiveTransport();
}
};
}

View File

@ -36,13 +36,16 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.LoadBalancer;
import io.grpc.Status;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -63,6 +66,8 @@ final class TransportSet {
}
private final Object lock = new Object();
private final SocketAddress server;
private final String authority;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Callback callback;
private final ClientTransportFactory transportFactory;
@ -84,6 +89,8 @@ final class TransportSet {
@GuardedBy("lock")
private final Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
private final LoadBalancer loadBalancer;
@GuardedBy("lock")
private boolean shutdown;
@ -93,9 +100,12 @@ final class TransportSet {
*/
private volatile SettableFuture<ClientTransport> activeTransportFuture;
TransportSet(BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
Callback callback) {
TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer,
BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory,
ScheduledExecutorService scheduledExecutor, Callback callback) {
this.server = server;
this.authority = authority;
this.loadBalancer = loadBalancer;
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory = transportFactory;
this.scheduledExecutor = scheduledExecutor;
@ -156,8 +166,10 @@ final class TransportSet {
if (shutdown) {
return;
}
ClientTransport newActiveTransport = transportFactory.newClientTransport();
log.info("Created transport '" + newActiveTransport);
ClientTransport newActiveTransport = transportFactory.newClientTransport(
server, authority);
log.log(Level.INFO, "Created transport {0} for {1}",
new Object[] {newActiveTransport, server});
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, activeTransportFuture));
@ -222,30 +234,34 @@ final class TransportSet {
@Override
public void transportReady() {
synchronized (lock) {
log.info("Transport '" + transport + " is ready");
log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
reconnectPolicy = null;
}
}
loadBalancer.transportReady(server, transport);
}
@Override
public void transportShutdown(Status s) {
synchronized (lock) {
log.info("Transport '" + transport + " is being shutdown");
log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
new Object[] {transport, server});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
createActiveTransportFuture();
}
}
loadBalancer.transportShutdown(server, transport, s);
}
@Override
public void transportTerminated() {
boolean runCallback = false;
synchronized (lock) {
log.info("Transport '" + transport + " is terminated");
log.log(Level.INFO, "Transport {0} for {1} is terminated",
new Object[] {transport, server});
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransportFuture. "
+ "Seems transportTerminated was not called.");

View File

@ -113,6 +113,11 @@ public class ManagedChannelProviderTest {
protected ManagedChannelBuilder<?> builderForAddress(String host, int port) {
throw new UnsupportedOperationException();
}
@Override
protected ManagedChannelBuilder<?> builderForTarget(String target) {
throw new UnsupportedOperationException();
}
}
public static class Available0Provider extends BaseProvider {

View File

@ -0,0 +1,144 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.internal.ClientTransport;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.net.SocketAddress;
import java.util.ArrayList;
/** Unit test for {@link SimpleLoadBalancerFactory}. */
@RunWith(JUnit4.class)
public class SimpleLoadBalancerTest {
private LoadBalancer loadBalancer;
private ArrayList<ResolvedServerInfo> servers;
@Mock
private TransportManager mockTransportManager;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer(
"fakeservice", mockTransportManager);
servers = new ArrayList<ResolvedServerInfo>();
for (int i = 0; i < 3; i++) {
servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY));
}
}
@Test
public void pickBeforeResolved() throws Exception {
ClientTransport mockTransport = mock(ClientTransport.class);
SettableFuture<ClientTransport> sourceFuture = SettableFuture.create();
when(mockTransportManager.getTransport(same(servers.get(0).getAddress())))
.thenReturn(sourceFuture);
ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
assertNotNull(f1);
assertNotNull(f2);
assertNotSame(f1, f2);
assertFalse(f1.isDone());
assertFalse(f2.isDone());
verify(mockTransportManager, never()).getTransport(any(SocketAddress.class));
loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress()));
assertFalse(f1.isDone());
assertFalse(f2.isDone());
assertNotSame(sourceFuture, f1);
assertNotSame(sourceFuture, f2);
sourceFuture.set(mockTransport);
assertSame(mockTransport, f1.get());
assertSame(mockTransport, f2.get());
ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
assertSame(sourceFuture, f3);
verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress()));
verifyNoMoreInteractions(mockTransportManager);
}
@Test
public void transportFailed() throws Exception {
ClientTransport mockTransport1 = mock(ClientTransport.class);
ClientTransport mockTransport2 = mock(ClientTransport.class);
when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn(
Futures.immediateFuture(mockTransport1));
when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn(
Futures.immediateFuture(mockTransport2));
loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY);
ListenableFuture<ClientTransport> f1 = loadBalancer.pickTransport(null);
ListenableFuture<ClientTransport> f2 = loadBalancer.pickTransport(null);
assertSame(mockTransport1, f1.get());
assertSame(mockTransport1, f2.get());
loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL);
ListenableFuture<ClientTransport> f3 = loadBalancer.pickTransport(null);
assertSame(mockTransport2, f3.get());
}
private static class FakeSocketAddress extends SocketAddress {
final String name;
FakeSocketAddress(String name) {
this.name = name;
}
@Override
public String toString() {
return "FakeSocketAddress-" + name;
}
}
}

View File

@ -94,7 +94,7 @@ public class ClientCallImplTest {
final ClientStream stream = mock(ClientStream.class);
ClientTransportProvider provider = new ClientTransportProvider() {
@Override
public ListenableFuture<ClientTransport> get() {
public ListenableFuture<ClientTransport> get(CallOptions callOptions) {
return Futures.immediateFuture(transport);
}
};

View File

@ -37,6 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
@ -47,6 +48,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@ -56,6 +58,9 @@ import io.grpc.IntegerMarshaller;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolver;
import io.grpc.ResolvedServerInfo;
import io.grpc.SimpleLoadBalancerFactory;
import io.grpc.Status;
import io.grpc.StringMarshaller;
@ -70,6 +75,8 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.net.SocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -81,15 +88,23 @@ import java.util.concurrent.atomic.AtomicLong;
/** Unit tests for {@link ManagedChannelImpl}. */
@RunWith(JUnit4.class)
public class ManagedChannelImplTest {
private static final List<ClientInterceptor> NO_INTERCEPTOR =
Collections.<ClientInterceptor>emptyList();
private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final String serviceName = "fake.example.com";
private final URI target = URI.create("//" + serviceName);
private final String authority = serviceName;
private final SocketAddress socketAddress = new SocketAddress() {};
private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY);
@Mock
private ClientTransport mockTransport;
@Mock
private ClientTransportFactory mockTransportFactory;
private ManagedChannel channel;
@Mock
@ -104,16 +119,18 @@ public class ManagedChannelImplTest {
private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
private void createChannel(List<ClientInterceptor> interceptors) throws Exception {
channel = new ManagedChannelImpl(new FakeBackoffPolicyProvider(), mockTransportFactory,
executor, null, interceptors);
private ManagedChannel createChannel(
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(),
nameResolverFactory, SimpleLoadBalancerFactory.getInstance(),
mockTransportFactory, executor, null, interceptors);
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
createChannel(Collections.<ClientInterceptor>emptyList());
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
.thenReturn(mockTransport);
}
@After
@ -123,6 +140,7 @@ public class ManagedChannelImplTest {
@Test
public void immediateDeadlineExceeded() {
ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withDeadlineNanoTime(System.nanoTime()));
call.start(mockCallListener, new Metadata());
@ -132,6 +150,7 @@ public class ManagedChannelImplTest {
@Test
public void shutdownWithNoTransportsEverCreated() {
ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
verifyNoMoreInteractions(mockTransportFactory);
channel.shutdown();
assertTrue(channel.isShutdown());
@ -140,6 +159,8 @@ public class ManagedChannelImplTest {
@Test
public void twoCallsAndGracefulShutdown() {
ManagedChannel channel = createChannel(
new FakeNameResolverFactory(server), Collections.<ClientInterceptor>emptyList());
verifyNoMoreInteractions(mockTransportFactory);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
verifyNoMoreInteractions(mockTransportFactory);
@ -148,11 +169,13 @@ public class ManagedChannelImplTest {
ClientTransport mockTransport = mock(ClientTransport.class);
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
.thenReturn(mockTransport);
when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class)))
.thenReturn(mockStream);
call.start(mockCallListener, headers);
verify(mockTransportFactory, timeout(1000)).newClientTransport();
verify(mockTransportFactory, timeout(1000))
.newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
verify(mockTransport, timeout(1000))
@ -206,6 +229,7 @@ public class ManagedChannelImplTest {
@Test
public void transportFailsOnStart() {
ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
Status goldenStatus = Status.INTERNAL.withDescription("wanted it to fail");
// mockTransport2 shuts immediately during start
@ -230,10 +254,12 @@ public class ManagedChannelImplTest {
when(mockTransport2.newStream(same(method), same(headers2), any(ClientStreamListener.class)))
.thenReturn(mockStream2);
// The factory returns the immediately shut-down transport first, then the normal one
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2, mockTransport);
when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
.thenReturn(mockTransport2, mockTransport);
call.start(mockCallListener2, headers2);
verify(mockTransportFactory, timeout(1000).times(2)).newClientTransport();
verify(mockTransportFactory, timeout(1000).times(2))
.newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport2, timeout(1000)).start(any(ClientTransport.Listener.class));
verify(mockTransport2, timeout(1000))
.newStream(same(method), same(headers2), streamListenerCaptor.capture());
@ -268,7 +294,7 @@ public class ManagedChannelImplTest {
transportListenerCaptor.getValue().transportTerminated();
assertTrue(channel.isTerminated());
verify(mockTransportFactory, times(2)).newClientTransport();
verify(mockTransportFactory, times(2)).newClientTransport(same(socketAddress), eq(authority));
verifyNoMoreInteractions(mockTransport);
verifyNoMoreInteractions(mockTransport2);
verifyNoMoreInteractions(mockStream2);
@ -286,13 +312,15 @@ public class ManagedChannelImplTest {
return next.newCall(method, callOptions);
}
};
createChannel(Arrays.asList(interceptor));
ManagedChannel channel = createChannel(
new FakeNameResolverFactory(server), Arrays.asList(interceptor));
assertNotNull(channel.newCall(method, CallOptions.DEFAULT));
assertEquals(1, atomic.get());
}
@Test
public void testNoDeadlockOnShutdown() {
ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR);
// Force creation of transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
@ -341,6 +369,18 @@ public class ManagedChannelImplTest {
transportListener.transportTerminated();
}
@Test
public void nameResolutionFailed() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
ManagedChannel channel = createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR);
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
Status status = statusCaptor.getValue();
assertSame(error, status);
}
private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider {
@Override
public BackoffPolicy get() {
@ -352,4 +392,53 @@ public class ManagedChannelImplTest {
};
}
}
private class FakeNameResolverFactory extends NameResolver.Factory {
final ResolvedServerInfo server;
FakeNameResolverFactory(ResolvedServerInfo server) {
this.server = server;
}
@Override
public NameResolver newNameResolver(final URI targetUri) {
assertEquals(null, targetUri.getScheme());
assertEquals(serviceName, targetUri.getAuthority());
return new NameResolver() {
@Override public String getServiceAuthority() {
assertNotNull(targetUri.toString() + " has authority", targetUri.getAuthority());
return targetUri.getAuthority();
}
@Override public void start(final Listener listener) {
listener.onUpdate(Collections.singletonList(server), Attributes.EMPTY);
}
@Override public void shutdown() {}
};
}
}
private class FailingNameResolverFactory extends NameResolver.Factory {
final Status error;
FailingNameResolverFactory(Status error) {
this.error = error;
}
@Override
public NameResolver newNameResolver(URI notUsedUri) {
return new NameResolver() {
@Override public String getServiceAuthority() {
return "irrelevant-authority";
}
@Override public void start(final Listener listener) {
listener.onError(error);
}
@Override public void shutdown() {}
};
}
}
}

View File

@ -50,6 +50,7 @@ import io.netty.handler.ssl.SslContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -61,8 +62,6 @@ import javax.net.ssl.SSLException;
public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<NettyChannelBuilder> {
public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB
private final SocketAddress serverAddress;
private String authority;
private NegotiationType negotiationType = NegotiationType.TLS;
private Class<? extends Channel> channelType = NioSocketChannel.class;
@Nullable
@ -78,30 +77,39 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<Netty
* noticing changes to DNS.
*/
public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
String authority;
if (serverAddress instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) serverAddress;
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
} else {
// Specialized address types are allowed to support custom Channel types so just assume their
// toString() values are valid :authority values. We defer checking validity of authority
// until buildTransportFactory() to provide the user an opportunity to override the value.
authority = serverAddress.toString();
}
return new NettyChannelBuilder(serverAddress, authority);
return new NettyChannelBuilder(serverAddress);
}
/**
* Creates a new builder with the given host and port.
*/
public static NettyChannelBuilder forAddress(String host, int port) {
return new NettyChannelBuilder(
new InetSocketAddress(host, port), GrpcUtil.authorityFromHostAndPort(host, port));
return forAddress(new InetSocketAddress(host, port));
}
protected NettyChannelBuilder(SocketAddress serverAddress, String authority) {
this.serverAddress = serverAddress;
this.authority = authority;
/**
* Creates a new builder with the given target URI that will be resolved by
* {@link io.grpc.NameResolver}.
*/
public static NettyChannelBuilder forTarget(String targetUri) {
return new NettyChannelBuilder(URI.create(targetUri));
}
private NettyChannelBuilder(URI target) {
super(target);
}
protected NettyChannelBuilder(SocketAddress address) {
super(address, getAuthorityFromAddress(address));
}
private static String getAuthorityFromAddress(SocketAddress address) {
if (address instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) address;
return GrpcUtil.authorityFromHostAndPort(inetAddress.getHostString(), inetAddress.getPort());
} else {
return address.toString();
}
}
/**
@ -180,28 +188,14 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<Netty
}
@Override
public final NettyChannelBuilder overrideAuthority(String authority) {
this.authority = checkAuthority(authority);
return this;
protected ClientTransportFactory buildTransportFactory() {
return new NettyTransportFactory(channelType, negotiationType, sslContext,
eventLoopGroup, flowControlWindow, maxMessageSize);
}
/**
* Verifies the authority is valid. This method exists as an escape hatch for putting in an
* authority that is valid, but would fail the default validation provided by this implementation.
*/
protected String checkAuthority(String authority) {
return GrpcUtil.checkAuthority(authority);
}
@Override
protected final ClientTransportFactory buildTransportFactory() {
// Check authority, since non-inet ServerAddresses delay the authority check.
checkAuthority(authority);
return new NettyTransportFactory(serverAddress, authority, channelType, eventLoopGroup,
flowControlWindow, createProtocolNegotiator(), maxMessageSize);
}
private ProtocolNegotiator createProtocolNegotiator() {
private static ProtocolNegotiator createProtocolNegotiator(
String authority, NegotiationType negotiationType, SslContext sslContext) {
ProtocolNegotiator negotiator;
switch (negotiationType) {
case PLAINTEXT:
return ProtocolNegotiators.plaintext();
@ -223,29 +217,25 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<Netty
private static class NettyTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
private final SocketAddress serverAddress;
private final Class<? extends Channel> channelType;
private final NegotiationType negotiationType;
private final SslContext sslContext;
private final EventLoopGroup group;
private final boolean usingSharedGroup;
private final int flowControlWindow;
private final ProtocolNegotiator negotiator;
private final int maxMessageSize;
private final String authority;
private NettyTransportFactory(SocketAddress serverAddress,
String authority,
Class<? extends Channel> channelType,
private NettyTransportFactory(Class<? extends Channel> channelType,
NegotiationType negotiationType,
SslContext sslContext,
EventLoopGroup group,
int flowControlWindow,
ProtocolNegotiator negotiator,
int maxMessageSize) {
this.serverAddress = serverAddress;
this.channelType = channelType;
this.negotiationType = negotiationType;
this.sslContext = sslContext;
this.flowControlWindow = flowControlWindow;
this.negotiator = negotiator;
this.maxMessageSize = maxMessageSize;
this.authority = authority;
usingSharedGroup = group == null;
if (usingSharedGroup) {
// The group was unspecified, using the shared group.
@ -256,14 +246,11 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<Netty
}
@Override
public ClientTransport newClientTransport() {
return new NettyClientTransport(serverAddress, channelType, group, negotiator,
flowControlWindow, maxMessageSize, authority);
}
@Override
public String authority() {
return authority;
public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
GrpcUtil.checkAuthority(authority);
return new NettyClientTransport(serverAddress, channelType, group,
createProtocolNegotiator(authority, negotiationType, sslContext),
flowControlWindow, maxMessageSize, authority);
}
@Override

View File

@ -51,4 +51,9 @@ public class NettyChannelProvider extends ManagedChannelProvider {
protected NettyChannelBuilder builderForAddress(String name, int port) {
return NettyChannelBuilder.forAddress(name, port);
}
@Override
protected NettyChannelBuilder builderForTarget(String target) {
return NettyChannelBuilder.forTarget(target);
}
}

View File

@ -31,8 +31,6 @@
package io.grpc.netty;
import static org.junit.Assert.assertEquals;
import io.grpc.internal.ClientTransportFactory;
import org.junit.Rule;
@ -51,18 +49,28 @@ public class NettyChannelBuilderTest {
@Test
public void overrideAllowsInvalidAuthority() {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}, "") {
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){}) {
@Override
protected String checkAuthority(String authority) {
return authority;
}
};
ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
assertEquals("invalid_authority", factory.authority());
@Test
public void failOverrideInvalidAuthority() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid authority:");
NettyChannelBuilder builder = new NettyChannelBuilder(new SocketAddress(){});
ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
@Test

View File

@ -50,6 +50,9 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -99,19 +102,27 @@ public class OkHttpChannelBuilder extends
return new OkHttpChannelBuilder(host, port);
}
/**
* Creates a new builder for the given target URI that will be resolved by
* {@link io.grpc.NameResolver}.
*/
public static OkHttpChannelBuilder forTarget(String targetUri) {
return new OkHttpChannelBuilder(URI.create(targetUri));
}
private Executor transportExecutor;
private final String host;
private final int port;
private String authority;
private SSLSocketFactory sslSocketFactory;
private ConnectionSpec connectionSpec = DEFAULT_CONNECTION_SPEC;
private NegotiationType negotiationType = NegotiationType.TLS;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
protected OkHttpChannelBuilder(String host, int port) {
this.host = Preconditions.checkNotNull(host);
this.port = port;
this.authority = GrpcUtil.authorityFromHostAndPort(host, port);
this(URI.create("dns:///" + GrpcUtil.authorityFromHostAndPort(host, port)));
}
private OkHttpChannelBuilder(URI target) {
super(target);
}
/**
@ -125,27 +136,6 @@ public class OkHttpChannelBuilder extends
return this;
}
/**
* Overrides the host used with TLS and HTTP virtual hosting. It does not change what host is
* actually connected to. This method differs from {@link #overrideAuthority(String)} in that it
* appends the port number to the host provided.
*
* <p>Should only used by tests.
*
* @deprecated use {@link #overrideAuthority} instead
*/
@Deprecated
public final OkHttpChannelBuilder overrideHostForAuthority(String host) {
this.authority = GrpcUtil.authorityFromHostAndPort(host, this.port);
return this;
}
@Override
public final OkHttpChannelBuilder overrideAuthority(String authority) {
this.authority = checkAuthority(authority);
return this;
}
/**
* Sets the negotiation type for the HTTP/2 connection.
*
@ -205,18 +195,10 @@ public class OkHttpChannelBuilder extends
@Override
protected final ClientTransportFactory buildTransportFactory() {
return new OkHttpTransportFactory(host, port, authority, transportExecutor,
return new OkHttpTransportFactory(transportExecutor,
createSocketFactory(), connectionSpec, maxMessageSize);
}
/**
* Verifies the authority is valid. This method exists as an escape hatch for putting in an
* authority that is valid, but would fail the default validation provided by this implementation.
*/
protected String checkAuthority(String authority) {
return GrpcUtil.checkAuthority(authority);
}
private SSLSocketFactory createSocketFactory() {
switch (negotiationType) {
case TLS:
@ -231,25 +213,16 @@ public class OkHttpChannelBuilder extends
private static class OkHttpTransportFactory extends AbstractReferenceCounted
implements ClientTransportFactory {
private final String host;
private final int port;
private final String authority;
private final Executor executor;
private final boolean usingSharedExecutor;
private final SSLSocketFactory socketFactory;
private final ConnectionSpec connectionSpec;
private final int maxMessageSize;
private OkHttpTransportFactory(String host,
int port,
String authority,
Executor executor,
private OkHttpTransportFactory(Executor executor,
SSLSocketFactory socketFactory,
ConnectionSpec connectionSpec,
int maxMessageSize) {
this.host = host;
this.port = port;
this.authority = authority;
this.socketFactory = socketFactory;
this.connectionSpec = connectionSpec;
this.maxMessageSize = maxMessageSize;
@ -264,14 +237,10 @@ public class OkHttpChannelBuilder extends
}
@Override
public ClientTransport newClientTransport() {
return new OkHttpClientTransport(host, port, authority, executor, socketFactory,
Utils.convertSpec(connectionSpec), maxMessageSize);
}
@Override
public String authority() {
return authority;
public ClientTransport newClientTransport(SocketAddress addr, String authority) {
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new OkHttpClientTransport(inetSocketAddr.getHostString(), inetSocketAddr.getPort(),
authority, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize);
}
@Override

View File

@ -61,4 +61,9 @@ public class OkHttpChannelProvider extends ManagedChannelProvider {
protected OkHttpChannelBuilder builderForAddress(String name, int port) {
return OkHttpChannelBuilder.forAddress(name, port);
}
@Override
protected OkHttpChannelBuilder builderForTarget(String target) {
return OkHttpChannelBuilder.forTarget(target);
}
}

View File

@ -31,8 +31,6 @@
package io.grpc.okhttp;
import static org.junit.Assert.assertEquals;
import io.grpc.internal.ClientTransportFactory;
import org.junit.Rule;
@ -55,11 +53,20 @@ public class OkHttpChannelBuilderTest {
}
};
ClientTransportFactory factory = builder.overrideAuthority("invalid_authority")
ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
assertEquals("invalid_authority", factory.authority());
@Test
public void failOverrideInvalidAuthority() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid authority:");
OkHttpChannelBuilder builder = new OkHttpChannelBuilder("good", 1234);
ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority")
.negotiationType(NegotiationType.PLAINTEXT)
.buildTransportFactory();
}
@Test