diff --git a/core/src/main/java/io/grpc/Attributes.java b/core/src/main/java/io/grpc/Attributes.java new file mode 100644 index 0000000000..9508ecffec --- /dev/null +++ b/core/src/main/java/io/grpc/Attributes.java @@ -0,0 +1,116 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.base.Preconditions; + +import java.util.HashMap; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +/** + * An immutable type-safe container of attributes. + */ +@ExperimentalApi +@Immutable +public final class Attributes { + + private final HashMap data = new HashMap(); + + public static final Attributes EMPTY = new Attributes(); + + private Attributes() { + } + + /** + * Gets the value for the key, or {@code null} if it's not present. + */ + @SuppressWarnings("unchecked") + @Nullable + public T get(Key key) { + return (T) data.get(key.name); + } + + /** + * Create a new builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Key { + private final String name; + + /** + * Construct the key. + * + * @param name the name, which should be namespaced like com.foo.BarAttribute to avoid + * collision. + */ + public Key(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + + @Override + public String toString() { + return data.toString(); + } + + public static final class Builder { + private Attributes product; + + private Builder() { + this.product = new Attributes(); + } + + public void set(Key key, T value) { + product.data.put(key.name, value); + } + + /** + * Build the attributes. Can only be called once. + */ + public Attributes build() { + Preconditions.checkState(product != null, "Already built"); + Attributes result = product; + product = null; + return result; + } + } +} diff --git a/core/src/main/java/io/grpc/CallOptions.java b/core/src/main/java/io/grpc/CallOptions.java index f73c2ed203..5d7f2e21ad 100644 --- a/core/src/main/java/io/grpc/CallOptions.java +++ b/core/src/main/java/io/grpc/CallOptions.java @@ -62,6 +62,9 @@ public final class CallOptions { @Nullable private String authority; + @Nullable + private RequestKey requestKey; + /** * Override the HTTP/2 authority the channel claims to be connecting to. This is not * generally safe. Overriding allows advanced users to re-use a single Channel for multiple @@ -130,6 +133,25 @@ public final class CallOptions { return newOptions; } + /** + * Returns a new {@code CallOptions} with a request key for affinity-based routing. + */ + @ExperimentalApi + public CallOptions withRequestKey(@Nullable RequestKey requestKey) { + CallOptions newOptions = new CallOptions(this); + newOptions.requestKey = requestKey; + return newOptions; + } + + /** + * Returns the request key for affinity-based routing. + */ + @ExperimentalApi + @Nullable + public RequestKey getRequestKey() { + return requestKey; + } + /** * Override the HTTP/2 authority the channel claims to be connecting to. This is not * generally safe. Overriding allows advanced users to re-use a single Channel for multiple @@ -155,6 +177,7 @@ public final class CallOptions { deadlineNanoTime = other.deadlineNanoTime; compressor = other.compressor; authority = other.authority; + requestKey = other.requestKey; } @SuppressWarnings("deprecation") // guava 14.0 diff --git a/core/src/main/java/io/grpc/DnsNameResolverFactory.java b/core/src/main/java/io/grpc/DnsNameResolverFactory.java new file mode 100644 index 0000000000..e4367475c3 --- /dev/null +++ b/core/src/main/java/io/grpc/DnsNameResolverFactory.java @@ -0,0 +1,138 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.base.Preconditions; + +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.SharedResourceHolder; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +/** + * A factory for DNS-based {@link NameResolver}s. + * + *

The format of the target URI is {@code "[dns://[]/]"}. + */ +@ExperimentalApi +public final class DnsNameResolverFactory extends NameResolver.Factory { + + private static final DnsNameResolverFactory instance = new DnsNameResolverFactory(); + + @Override + public NameResolver newNameResolver(URI targetUri) { + String scheme = targetUri.getScheme(); + if (scheme == null) { + return new DnsNameResolver(null, targetUri.toString()); + } else if (scheme.equals("dns")) { + String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath"); + Preconditions.checkArgument(targetPath.startsWith("/"), + "the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri); + String name = targetPath.substring(1); + return new DnsNameResolver(targetUri.getAuthority(), name); + } else { + return null; + } + } + + private DnsNameResolverFactory() { + } + + public static DnsNameResolverFactory getInstance() { + return instance; + } + + private static class DnsNameResolver extends NameResolver { + private final String authority; + private final String host; + private final int port; + private ExecutorService executor; + + DnsNameResolver(@Nullable String nsAuthority, String name) { + // TODO: if a DNS server is provided as nsAuthority, use it. + // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java + + // Must prepend a "//" to the name when constructing a URI, otherwise + // the authority and host of the resulted URI would be null. + URI nameUri = URI.create("//" + name); + authority = Preconditions.checkNotNull(nameUri.getAuthority(), + "nameUri (%s) doesn't have an authority", nameUri); + host = Preconditions.checkNotNull(nameUri.getHost(), "host"); + port = nameUri.getPort(); + Preconditions.checkArgument(port > 0, "port (%s) must be positive", port); + } + + @Override + public String getServiceAuthority() { + return authority; + } + + @Override + public synchronized void start(final Listener listener) { + Preconditions.checkState(executor == null, "already started"); + executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR); + executor.execute(new Runnable() { + @Override + public void run() { + InetAddress[] inetAddrs; + try { + inetAddrs = InetAddress.getAllByName(host); + } catch (Exception e) { + listener.onError(Status.UNAVAILABLE.withCause(e)); + return; + } + ArrayList servers + = new ArrayList(inetAddrs.length); + for (int i = 0; i < inetAddrs.length; i++) { + InetAddress inetAddr = inetAddrs[i]; + servers.add( + new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY)); + } + listener.onUpdate(servers, Attributes.EMPTY); + } + }); + } + + @Override + public synchronized void shutdown() { + if (executor != null) { + executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor); + } + } + } +} diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java new file mode 100644 index 0000000000..3449021dab --- /dev/null +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -0,0 +1,103 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.util.concurrent.ListenableFuture; + +import io.grpc.internal.ClientTransport; + +import java.net.SocketAddress; +import java.util.List; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A pluggable component that receives resolved addresses from {@link NameResolver} and provides the + * channel a usable transport when asked. + * + *

Note to implementations: all methods are expected to return quickly. Any work that may block + * should be done asynchronously. + */ +// TODO(zhangkun83): since it's also used for non-loadbalancing cases like pick-first, +// "RequestRouter" might be a better name. +@ExperimentalApi +@ThreadSafe +public abstract class LoadBalancer { + /** + * Pick a transport that Channel will use for next RPC. + * + * @param requestKey for affinity-based routing + */ + public abstract ListenableFuture pickTransport( + @Nullable RequestKey requestKey); + + /** + * Shuts down this {@code LoadBalancer}. + */ + public void shutdown() { } + + /** + * Handles newly resolved addresses and service config from name resolution system. + * + *

Implementations should not modify the given {@code servers}. + */ + public void handleResolvedAddresses(List servers, Attributes config) { } + + /** + * Handles an error from the name resolution system. + * + * @param error a non-OK status + */ + public void handleNameResolutionError(Status error) { } + + /** + * Called when a transport is fully connected and ready to accept traffic. + */ + public void transportReady(SocketAddress addr, ClientTransport transport) { } + + /** + * Called when a transport is shutting down. + */ + public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { } + + public abstract static class Factory { + /** + * Creates a {@link LoadBalancer} that will be used inside a channel. + * + * @param serviceName the DNS-style service name, which is also the authority + * @param tm the interface where an {@code LoadBalancer} implementation gets connected + * transports from + */ + public abstract LoadBalancer newLoadBalancer(String serviceName, TransportManager tm); + } +} diff --git a/core/src/main/java/io/grpc/ManagedChannelBuilder.java b/core/src/main/java/io/grpc/ManagedChannelBuilder.java index 0e5e33b7c1..17c3091880 100644 --- a/core/src/main/java/io/grpc/ManagedChannelBuilder.java +++ b/core/src/main/java/io/grpc/ManagedChannelBuilder.java @@ -44,6 +44,11 @@ public abstract class ManagedChannelBuilder> return ManagedChannelProvider.provider().builderForAddress(name, port); } + @ExperimentalApi + public static ManagedChannelBuilder forTarget(String target) { + return ManagedChannelProvider.provider().builderForTarget(target); + } + /** * Provides a custom executor. * @@ -99,6 +104,24 @@ public abstract class ManagedChannelBuilder> @ExperimentalApi("primarily for testing") public abstract T usePlaintext(boolean skipNegotiation); + /* + * Provides a custom {@link NameResolver.Factory} for the channel. + * + *

If this method is not called, the builder will look up in the global resolver registry for + * a factory for the provided target. + */ + @ExperimentalApi + public abstract T nameResolverFactory(NameResolver.Factory resolverFactory); + + /** + * Provides a custom {@link LoadBalancer.Factory} for the channel. + * + *

If this method is not called, the builder will use {@link SimpleLoadBalancerFactory} for the + * channel. + */ + @ExperimentalApi + public abstract T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory); + /** * Builds a channel using the given parameters. */ diff --git a/core/src/main/java/io/grpc/ManagedChannelProvider.java b/core/src/main/java/io/grpc/ManagedChannelProvider.java index 991b8e1ec0..434a007fcd 100644 --- a/core/src/main/java/io/grpc/ManagedChannelProvider.java +++ b/core/src/main/java/io/grpc/ManagedChannelProvider.java @@ -106,6 +106,12 @@ public abstract class ManagedChannelProvider { */ protected abstract ManagedChannelBuilder builderForAddress(String name, int port); + /** + * Creates a new builder with the given target URI. + */ + @ExperimentalApi + protected abstract ManagedChannelBuilder builderForTarget(String target); + public static final class ProviderNotFoundException extends RuntimeException { public ProviderNotFoundException(String msg) { super(msg); diff --git a/core/src/main/java/io/grpc/NameResolver.java b/core/src/main/java/io/grpc/NameResolver.java new file mode 100644 index 0000000000..22a5e61801 --- /dev/null +++ b/core/src/main/java/io/grpc/NameResolver.java @@ -0,0 +1,101 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.net.URI; +import java.util.List; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A pluggable component that resolves a target URI (which is broken down into 3 parts as described + * below) and return addresses to the caller. + * + *

The format of the target URI is {@code "[:]"} + * + *

{@code NameResolver} has no knowledge of load-balancing. The addresses of a target may be + * changed over time, thus the caller registers a {@link Listener} to receive continuous updates. + */ +@ExperimentalApi +@ThreadSafe +public abstract class NameResolver { + /** + * Returns the authority, which is also the name of the service. + * + *

An implementation must generate it locally and must keep it unchanged. + */ + public abstract String getServiceAuthority(); + + /** + * Starts the resolution. + * + * @param listener used to receive updates on the target + */ + public abstract void start(Listener listener); + + /** + * Stops the resolution. Updates to the Listener will stop. + */ + public abstract void shutdown(); + + public abstract static class Factory { + /** + * Creates a {@link NameResolver} for the given target URI, or {@code null} if the given URI + * cannot be resolved by this factory. + */ + @Nullable + public abstract NameResolver newNameResolver(URI targetUri); + } + + /** + * Receives address updates. + * + *

All methods are expected to return quickly. + */ + @ThreadSafe + public interface Listener { + /** + * Handles updates on resolved addresses and config. + * + *

Implementations will not modify the given {@code servers}. + */ + void onUpdate(List servers, Attributes config); + + /** + * Handles an error from the resolver. + * + * @param error a non-OK status + */ + void onError(Status error); + } +} diff --git a/core/src/main/java/io/grpc/RequestKey.java b/core/src/main/java/io/grpc/RequestKey.java new file mode 100644 index 0000000000..6ba6bcfb9a --- /dev/null +++ b/core/src/main/java/io/grpc/RequestKey.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A key generated from an RPC request, and to be used for affinity-based + * routing. + */ +@ExperimentalApi +public final class RequestKey { + + // TODO(zhangkun83): materialize this class once we decide the form of the affinity key. + private RequestKey() { + } +} diff --git a/core/src/main/java/io/grpc/ResolvedServerInfo.java b/core/src/main/java/io/grpc/ResolvedServerInfo.java new file mode 100644 index 0000000000..2e914de3ab --- /dev/null +++ b/core/src/main/java/io/grpc/ResolvedServerInfo.java @@ -0,0 +1,76 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.net.SocketAddress; + +import javax.annotation.concurrent.Immutable; + +/** + * The information about a server from a {@link NameResolver}. + */ +@ExperimentalApi +@Immutable +public final class ResolvedServerInfo { + private final SocketAddress address; + private final Attributes attributes; + + /** + * Constructor. + * + * @param address the address object + * @param attributes attributes associated with this address. + */ + public ResolvedServerInfo(SocketAddress address, Attributes attributes) { + this.address = address; + this.attributes = attributes; + } + + /** + * Returns the address. + */ + public SocketAddress getAddress() { + return address; + } + + /** + * Returns the associated attributes. + */ + public Attributes getAttributes() { + return attributes; + } + + @Override + public String toString() { + return "[address=" + address + ", attrs=" + attributes + "]"; + } +} diff --git a/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java new file mode 100644 index 0000000000..b8d07f0f9e --- /dev/null +++ b/core/src/main/java/io/grpc/SimpleLoadBalancerFactory.java @@ -0,0 +1,179 @@ +/* + * Copyright 2014, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import io.grpc.internal.ClientTransport; + +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * A {@link LoadBalancer} that provides simple round-robin and pick-first routing mechanism over the + * addresses from the {@link NameResolver}. + */ +// TODO(zhangkun83): Only pick-first is implemented. We need to implement round-robin. +public final class SimpleLoadBalancerFactory extends LoadBalancer.Factory { + + private static final SimpleLoadBalancerFactory instance = new SimpleLoadBalancerFactory(); + + private SimpleLoadBalancerFactory() { + } + + public static SimpleLoadBalancerFactory getInstance() { + return instance; + } + + @Override + public LoadBalancer newLoadBalancer(String serviceName, TransportManager tm) { + return new SimpleLoadBalancer(tm); + } + + private static class SimpleLoadBalancer extends LoadBalancer { + @GuardedBy("servers") + private final List servers = new ArrayList(); + @GuardedBy("servers") + private int currentServerIndex; + // TODO(zhangkun83): virtually any LoadBalancer would need to handle picks before name + // resolution is done, we may want to move the related logic into ManagedChannelImpl. + @GuardedBy("servers") + private List> pendingPicks; + @GuardedBy("servers") + private StatusException nameResolutionError; + + private final TransportManager tm; + + private SimpleLoadBalancer(TransportManager tm) { + this.tm = tm; + } + + @Override + public ListenableFuture pickTransport(@Nullable RequestKey requestKey) { + ResolvedServerInfo currentServer; + synchronized (servers) { + if (servers.isEmpty()) { + if (nameResolutionError != null) { + return Futures.immediateFailedFuture(nameResolutionError); + } + SettableFuture future = SettableFuture.create(); + if (pendingPicks == null) { + pendingPicks = new ArrayList>(); + } + pendingPicks.add(future); + return future; + } + currentServer = servers.get(currentServerIndex); + } + return tm.getTransport(currentServer.getAddress()); + } + + @Override + public void handleResolvedAddresses( + List updatedServers, Attributes config) { + List> pendingPicksCopy = null; + ResolvedServerInfo currentServer = null; + synchronized (servers) { + nameResolutionError = null; + servers.clear(); + for (ResolvedServerInfo addr : updatedServers) { + servers.add(addr); + } + if (!servers.isEmpty()) { + pendingPicksCopy = pendingPicks; + pendingPicks = null; + if (currentServerIndex >= servers.size()) { + currentServerIndex = 0; + } + currentServer = servers.get(currentServerIndex); + } + } + if (pendingPicksCopy != null) { + // If pendingPicksCopy != null, then servers.isEmpty() == false, then + // currentServer must have been assigned. + Preconditions.checkState(currentServer != null, "currentServer is null"); + for (final SettableFuture pendingPick : pendingPicksCopy) { + ListenableFuture future = tm.getTransport(currentServer.getAddress()); + Futures.addCallback(future, new FutureCallback() { + @Override public void onSuccess(ClientTransport result) { + pendingPick.set(result); + } + + @Override public void onFailure(Throwable t) { + pendingPick.setException(t); + } + }); + } + } + } + + @Override + public void handleNameResolutionError(Status error) { + List> pendingPicksCopy = null; + StatusException statusException = error.asException(); + synchronized (servers) { + pendingPicksCopy = pendingPicks; + pendingPicks = null; + nameResolutionError = statusException; + } + if (pendingPicksCopy != null) { + for (SettableFuture pendingPick : pendingPicksCopy) { + pendingPick.setException(statusException); + } + } + } + + @Override + public void transportShutdown(SocketAddress addr, ClientTransport transport, Status s) { + if (!s.isOk()) { + // If the current transport is shut down due to error, move on to the next address in the + // list + synchronized (servers) { + if (addr.equals(servers.get(currentServerIndex).getAddress())) { + currentServerIndex++; + if (currentServerIndex >= servers.size()) { + currentServerIndex = 0; + } + } + } + } + } + } +} diff --git a/core/src/main/java/io/grpc/TransportManager.java b/core/src/main/java/io/grpc/TransportManager.java new file mode 100644 index 0000000000..1848bc70a3 --- /dev/null +++ b/core/src/main/java/io/grpc/TransportManager.java @@ -0,0 +1,60 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import com.google.common.util.concurrent.ListenableFuture; + +import io.grpc.internal.ClientTransport; + +import java.net.SocketAddress; + +/** + * Manages transport life-cycles and provide ready-to-use transports. + */ +@ExperimentalApi +public abstract class TransportManager { + /** + * Advises this {@code TransportManager} to retain transports only to these servers, for warming + * up connections and discarding unused connections. + */ + public abstract void updateRetainedTransports(SocketAddress[] addrs); + + /** + * Returns the future of a transport for the given server. + * + *

If the channel has been shut down, the value of the future will be {@code null}. + */ + // TODO(zhangkun83): GrpcLoadBalancer will use this to get transport to connect to LB servers, + // which would have a different authority than the primary servers. We need to figure out how to + // do it. + public abstract ListenableFuture getTransport(SocketAddress addr); +} diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 4fb903843e..4eeb1c96ac 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -39,6 +39,8 @@ import io.grpc.internal.AbstractReferenceCounted; import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory; +import java.net.SocketAddress; + /** * Builder for a channel that issues in-process requests. Clients identify the in-process server by * its name. @@ -59,9 +61,9 @@ public class InProcessChannelBuilder extends } private final String name; - private String authority = "localhost"; private InProcessChannelBuilder(String name) { + super(new InProcessSocketAddress(name), "localhost"); this.name = Preconditions.checkNotNull(name); } @@ -73,40 +75,40 @@ public class InProcessChannelBuilder extends return this; } - @Override - public InProcessChannelBuilder overrideAuthority(String authority) { - this.authority = authority; - return this; - } - @Override protected ClientTransportFactory buildTransportFactory() { - return new InProcessClientTransportFactory(name, authority); + return new InProcessClientTransportFactory(name); } private static class InProcessClientTransportFactory extends AbstractReferenceCounted implements ClientTransportFactory { private final String name; - private final String authority; - private InProcessClientTransportFactory(String name, String authority) { + private InProcessClientTransportFactory(String name) { this.name = name; - this.authority = authority; } @Override - public ClientTransport newClientTransport() { + public ClientTransport newClientTransport(SocketAddress addr, String authority) { return new InProcessTransport(name); } - @Override - public String authority() { - return authority; - } - @Override protected void deallocate() { // Do nothing. } } + + private static class InProcessSocketAddress extends SocketAddress { + final String name; + + InProcessSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } } diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 5097c9fc24..b2d4766781 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -31,12 +31,23 @@ package io.grpc.internal; -import io.grpc.ClientInterceptor; -import io.grpc.Internal; -import io.grpc.ManagedChannelBuilder; +import com.google.common.base.Preconditions; +import io.grpc.Attributes; +import io.grpc.ClientInterceptor; +import io.grpc.DnsNameResolverFactory; +import io.grpc.Internal; +import io.grpc.LoadBalancer; +import io.grpc.ManagedChannelBuilder; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfo; +import io.grpc.SimpleLoadBalancerFactory; + +import java.net.SocketAddress; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; @@ -54,9 +65,34 @@ public abstract class AbstractManagedChannelImplBuilder private Executor executor; private final List interceptors = new ArrayList(); + private final URI target; + + @Nullable + private final SocketAddress directServerAddress; + @Nullable private String userAgent; + @Nullable + private String authorityOverride; + + @Nullable + private NameResolver.Factory nameResolverFactory; + + @Nullable + private LoadBalancer.Factory loadBalancerFactory; + + protected AbstractManagedChannelImplBuilder(URI target) { + this.target = Preconditions.checkNotNull(target); + this.directServerAddress = null; + } + + protected AbstractManagedChannelImplBuilder(SocketAddress directServerAddress, String authority) { + this.target = URI.create("direct-address:///" + directServerAddress); + this.directServerAddress = directServerAddress; + this.nameResolverFactory = new DirectAddressNameResolverFactory(directServerAddress, authority); + } + @Override public final T executor(Executor executor) { this.executor = executor; @@ -74,6 +110,24 @@ public abstract class AbstractManagedChannelImplBuilder return intercept(Arrays.asList(interceptors)); } + @Override + public final T nameResolverFactory(NameResolver.Factory resolverFactory) { + Preconditions.checkState(directServerAddress == null, + "directServerAddress is set (%s), which forbids the use of NameResolverFactory", + directServerAddress); + this.nameResolverFactory = resolverFactory; + return thisT(); + } + + @Override + public final T loadBalancerFactory(LoadBalancer.Factory loadBalancerFactory) { + Preconditions.checkState(directServerAddress == null, + "directServerAddress is set (%s), which forbids the use of LoadBalancerFactory", + directServerAddress); + this.loadBalancerFactory = loadBalancerFactory; + return thisT(); + } + private T thisT() { @SuppressWarnings("unchecked") T thisT = (T) this; @@ -86,10 +140,33 @@ public abstract class AbstractManagedChannelImplBuilder return thisT(); } + @Override + public final T overrideAuthority(String authority) { + this.authorityOverride = checkAuthority(authority); + return thisT(); + } + + /** + * Verifies the authority is valid. This method exists as an escape hatch for putting in an + * authority that is valid, but would fail the default validation provided by this + * implementation. + */ + protected String checkAuthority(String authority) { + return GrpcUtil.checkAuthority(authority); + } + @Override public ManagedChannelImpl build() { - ClientTransportFactory transportFactory = buildTransportFactory(); - return new ManagedChannelImpl(transportFactory, executor, userAgent, interceptors); + ClientTransportFactory transportFactory = new AuthorityOverridingTransportFactory( + buildTransportFactory(), authorityOverride); + return new ManagedChannelImpl( + target, + // TODO(carl-mastrangelo): Allow clients to pass this in + new ExponentialBackoffPolicy.Provider(), + // TODO(zhangkun83): use a NameResolver registry for the "nameResolverFactory == null" case + nameResolverFactory == null ? DnsNameResolverFactory.getInstance() : nameResolverFactory, + loadBalancerFactory == null ? SimpleLoadBalancerFactory.getInstance() : loadBalancerFactory, + transportFactory, executor, userAgent, interceptors); } /** @@ -99,4 +176,68 @@ public abstract class AbstractManagedChannelImplBuilder */ @Internal protected abstract ClientTransportFactory buildTransportFactory(); + + private static class AuthorityOverridingTransportFactory implements ClientTransportFactory { + final ClientTransportFactory factory; + @Nullable final String authorityOverride; + + AuthorityOverridingTransportFactory( + ClientTransportFactory factory, @Nullable String authorityOverride) { + this.factory = factory; + this.authorityOverride = authorityOverride; + } + + @Override + public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) { + return factory.newClientTransport( + serverAddress, authorityOverride != null ? authorityOverride : authority); + } + + @Override + public int referenceCount() { + return factory.referenceCount(); + } + + @Override + public ReferenceCounted retain() { + factory.retain(); + return this; + } + + @Override + public ReferenceCounted release() { + factory.release(); + return this; + } + } + + private static class DirectAddressNameResolverFactory extends NameResolver.Factory { + final SocketAddress address; + final String authority; + + DirectAddressNameResolverFactory(SocketAddress address, String authority) { + this.address = address; + this.authority = authority; + } + + @Override + public NameResolver newNameResolver(URI notUsedUri) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return authority; + } + + @Override + public void start(final Listener listener) { + listener.onUpdate( + Collections.singletonList(new ResolvedServerInfo(address, Attributes.EMPTY)), + Attributes.EMPTY); + } + + @Override + public void shutdown() {} + }; + } + } } diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index de7c0aadd9..7c7a684532 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -96,7 +96,7 @@ final class ClientCallImpl extends ClientCall { * @return a future for client transport. If no more transports can be created, e.g., channel is * shut down, the future's value will be {@code null}. */ - ListenableFuture get(); + ListenableFuture get(CallOptions callOptions); } @@ -139,7 +139,7 @@ final class ClientCallImpl extends ClientCall { } ClientStreamListener listener = new ClientStreamListenerImpl(observer); - ListenableFuture transportFuture = clientTransportProvider.get(); + ListenableFuture transportFuture = clientTransportProvider.get(callOptions); if (transportFuture.isDone()) { // Try to skip DelayedStream when possible to avoid the overhead of a volatile read in the // fast path. If that fails, stream will stay null and DelayedStream will be created. @@ -409,7 +409,7 @@ final class ClientCallImpl extends ClientCall { StreamCreationTask() { this.transportFuture = Preconditions.checkNotNull( - clientTransportProvider.get(), "transportFuture"); + clientTransportProvider.get(callOptions), "transportFuture"); } @Override diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java index c2c88e47b0..fd9fa859f7 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ClientTransport.java @@ -108,6 +108,8 @@ public interface ClientTransport { * that this method is called without {@link #shutdown} being called. If the argument to this * function is {@link Status#isOk}, it is safe to immediately reconnect. * + *

This is called exactly once, and must be called prior to {@link #transportTerminated}. + * * @param s the reason for the shutdown. */ void transportShutdown(Status s); @@ -115,7 +117,8 @@ public interface ClientTransport { /** * The transport completed shutting down. All resources have been released. * - *

{@link #transportShutdown(Status)} must be called before calling this method. + *

This is called exactly once, and must be called after {@link #transportShutdown} has been + * called. */ void transportTerminated(); diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index a36ade1322..bc06a10966 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -31,14 +31,15 @@ package io.grpc.internal; +import java.net.SocketAddress; + /** Pre-configured factory for creating {@link ClientTransport} instances. */ public interface ClientTransportFactory extends ReferenceCounted { - /** Creates an unstarted transport for exclusive use. */ - ClientTransport newClientTransport(); - /** - * Returns the authority of the channel. Typically, this should be in the form {@code host:port}. - * Note that since there is not a scheme, there can't be a default port. + * Creates an unstarted transport for exclusive use. + * + * @param serverAddress the address that the transport is connected to + * @param authority the HTTP/2 authority of the server */ - String authority(); + ClientTransport newClientTransport(SocketAddress serverAddress, String authority); } diff --git a/core/src/main/java/io/grpc/internal/GrpcUtil.java b/core/src/main/java/io/grpc/internal/GrpcUtil.java index 7f22fb328d..b2b16608e4 100644 --- a/core/src/main/java/io/grpc/internal/GrpcUtil.java +++ b/core/src/main/java/io/grpc/internal/GrpcUtil.java @@ -351,7 +351,7 @@ public final class GrpcUtil { /** * Shared executor for channels. */ - static final Resource SHARED_CHANNEL_EXECUTOR = + public static final Resource SHARED_CHANNEL_EXECUTOR = new Resource() { private static final String name = "grpc-default-executor"; @Override diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index bc01591d76..7c05c646be 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -33,9 +33,11 @@ package io.grpc.internal; import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -45,11 +47,21 @@ import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.DecompressorRegistry; import io.grpc.ExperimentalApi; +import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfo; +import io.grpc.Status; +import io.grpc.TransportManager; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import java.net.SocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -90,8 +102,15 @@ public final class ManagedChannelImpl extends ManagedChannel { */ private final Channel interceptorChannel; + private final NameResolver nameResolver; + private final LoadBalancer loadBalancer; + + /** + * Maps addresses to transports for that server. + */ @GuardedBy("lock") - private TransportSet transportSet; + private final Map transports = + new HashMap(); @GuardedBy("lock") private boolean shutdown; @@ -101,55 +120,21 @@ public final class ManagedChannelImpl extends ManagedChannel { private volatile Compressor defaultCompressor; private final ClientTransportProvider transportProvider = new ClientTransportProvider() { - @Override - public ListenableFuture get() { - TransportSet transportSetCopy; + public ListenableFuture get(CallOptions callOptions) { synchronized (lock) { if (shutdown) { return NULL_VALUE_TRANSPORT_FUTURE; } - if (transportSet == null) { - transportSet = new TransportSet(backoffPolicyProvider, transportFactory, - scheduledExecutor, new TransportSet.Callback() { - @Override - public void onTerminated() { - synchronized (lock) { - if (shutdown) { - transportSet = null; - if (terminated) { - log.warning("transportTerminated called after already terminated"); - } - terminated = true; - lock.notifyAll(); - onChannelTerminated(); - } - } - } - }); - } - transportSetCopy = transportSet; } - return transportSetCopy.obtainActiveTransport(); + return loadBalancer.pickTransport(callOptions.getRequestKey()); } }; - // TODO(zhangkun83): remove this in favor of the one that accepts BackoffPolicy.Provider - ManagedChannelImpl(ClientTransportFactory transportFactory, @Nullable Executor executor, - @Nullable String userAgent, List interceptors) { - this(new ExponentialBackoffPolicy.Provider(), transportFactory, executor, userAgent, - interceptors); - } - - ManagedChannelImpl(BackoffPolicy.Provider backoffPolicyProvider, + ManagedChannelImpl(URI targetUri, BackoffPolicy.Provider backoffPolicyProvider, + NameResolver.Factory nameResolverFactory, LoadBalancer.Factory loadBalancerFactory, ClientTransportFactory transportFactory, @Nullable Executor executor, @Nullable String userAgent, List interceptors) { - this.backoffPolicyProvider = backoffPolicyProvider; - this.transportFactory = transportFactory; - this.userAgent = userAgent; - this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); - scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE); - if (executor == null) { usingSharedExecutor = true; this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR); @@ -157,7 +142,28 @@ public final class ManagedChannelImpl extends ManagedChannel { usingSharedExecutor = false; this.executor = executor; } + this.backoffPolicyProvider = backoffPolicyProvider; + this.nameResolver = nameResolverFactory.newNameResolver(targetUri); + Preconditions.checkArgument(this.nameResolver != null, + "The given NameResolverFactory cannot resolve %s", targetUri); + this.loadBalancer = loadBalancerFactory.newLoadBalancer(nameResolver.getServiceAuthority(), tm); + this.transportFactory = transportFactory; + this.userAgent = userAgent; + this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); + scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE); + this.nameResolver.start(new NameResolver.Listener() { + @Override + public void onUpdate(List servers, Attributes config) { + loadBalancer.handleResolvedAddresses(servers, config); + } + + @Override + public void onError(Status error) { + Preconditions.checkArgument(!error.isOk(), "the error status must not be OK"); + loadBalancer.handleNameResolutionError(error); + } + }); } /** @@ -180,7 +186,7 @@ public final class ManagedChannelImpl extends ManagedChannel { */ @Override public ManagedChannelImpl shutdown() { - TransportSet transportSetCopy = null; + ArrayList transportsCopy = new ArrayList(); synchronized (lock) { if (shutdown) { return this; @@ -188,16 +194,16 @@ public final class ManagedChannelImpl extends ManagedChannel { shutdown = true; // After shutdown there are no new calls, so no new cancellation tasks are needed scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor); - if (transportSet == null) { + if (transports.isEmpty()) { terminated = true; lock.notifyAll(); onChannelTerminated(); } else { - transportSetCopy = transportSet; + transportsCopy.addAll(transports.values()); } } - if (transportSetCopy != null) { - transportSetCopy.shutdown(); + for (TransportSet ts : transportsCopy) { + ts.shutdown(); } return this; } @@ -276,7 +282,8 @@ public final class ManagedChannelImpl extends ManagedChannel { @Override public String authority() { - return transportFactory.authority(); + String authority = nameResolver.getServiceAuthority(); + return Preconditions.checkNotNull(authority, "authority"); } } @@ -290,4 +297,43 @@ public final class ManagedChannelImpl extends ManagedChannel { // Release the transport factory so that it can deallocate any resources. transportFactory.release(); } + + private final TransportManager tm = new TransportManager() { + @Override + public void updateRetainedTransports(SocketAddress[] addrs) { + // TODO(zhangkun83): warm-up new servers and discard removed servers. + } + + @Override + public ListenableFuture getTransport(final SocketAddress addr) { + TransportSet ts; + synchronized (lock) { + if (shutdown) { + return null; + } + ts = transports.get(addr); + if (ts == null) { + ts = new TransportSet(addr, authority(), loadBalancer, backoffPolicyProvider, + transportFactory, scheduledExecutor, new TransportSet.Callback() { + @Override + public void onTerminated() { + synchronized (lock) { + transports.remove(addr); + if (shutdown && transports.isEmpty()) { + if (terminated) { + log.warning("transportTerminated called after already terminated"); + } + terminated = true; + lock.notifyAll(); + onChannelTerminated(); + } + } + } + }); + transports.put(addr, ts); + } + } + return ts.obtainActiveTransport(); + } + }; } diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index fc4e046a61..9d7effe18b 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -36,13 +36,16 @@ import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import io.grpc.LoadBalancer; import io.grpc.Status; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -63,6 +66,8 @@ final class TransportSet { } private final Object lock = new Object(); + private final SocketAddress server; + private final String authority; private final BackoffPolicy.Provider backoffPolicyProvider; private final Callback callback; private final ClientTransportFactory transportFactory; @@ -84,6 +89,8 @@ final class TransportSet { @GuardedBy("lock") private final Collection transports = new ArrayList(); + private final LoadBalancer loadBalancer; + @GuardedBy("lock") private boolean shutdown; @@ -93,9 +100,12 @@ final class TransportSet { */ private volatile SettableFuture activeTransportFuture; - TransportSet(BackoffPolicy.Provider backoffPolicyProvider, - ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor, - Callback callback) { + TransportSet(SocketAddress server, String authority, LoadBalancer loadBalancer, + BackoffPolicy.Provider backoffPolicyProvider, ClientTransportFactory transportFactory, + ScheduledExecutorService scheduledExecutor, Callback callback) { + this.server = server; + this.authority = authority; + this.loadBalancer = loadBalancer; this.backoffPolicyProvider = backoffPolicyProvider; this.transportFactory = transportFactory; this.scheduledExecutor = scheduledExecutor; @@ -156,8 +166,10 @@ final class TransportSet { if (shutdown) { return; } - ClientTransport newActiveTransport = transportFactory.newClientTransport(); - log.info("Created transport '" + newActiveTransport); + ClientTransport newActiveTransport = transportFactory.newClientTransport( + server, authority); + log.log(Level.INFO, "Created transport {0} for {1}", + new Object[] {newActiveTransport, server}); transports.add(newActiveTransport); newActiveTransport.start( new TransportListener(newActiveTransport, activeTransportFuture)); @@ -222,30 +234,34 @@ final class TransportSet { @Override public void transportReady() { synchronized (lock) { - log.info("Transport '" + transport + " is ready"); + log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, server}); Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { reconnectPolicy = null; } } + loadBalancer.transportReady(server, transport); } @Override public void transportShutdown(Status s) { synchronized (lock) { - log.info("Transport '" + transport + " is being shutdown"); + log.log(Level.INFO, "Transport {0} for {1} is being shutdown", + new Object[] {transport, server}); Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { createActiveTransportFuture(); } } + loadBalancer.transportShutdown(server, transport, s); } @Override public void transportTerminated() { boolean runCallback = false; synchronized (lock) { - log.info("Transport '" + transport + " is terminated"); + log.log(Level.INFO, "Transport {0} for {1} is terminated", + new Object[] {transport, server}); Preconditions.checkState(!isAttachedToActiveTransport(), "Listener is still attached to activeTransportFuture. " + "Seems transportTerminated was not called."); diff --git a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java index ab03cafc70..c8eae61828 100644 --- a/core/src/test/java/io/grpc/ManagedChannelProviderTest.java +++ b/core/src/test/java/io/grpc/ManagedChannelProviderTest.java @@ -113,6 +113,11 @@ public class ManagedChannelProviderTest { protected ManagedChannelBuilder builderForAddress(String host, int port) { throw new UnsupportedOperationException(); } + + @Override + protected ManagedChannelBuilder builderForTarget(String target) { + throw new UnsupportedOperationException(); + } } public static class Available0Provider extends BaseProvider { diff --git a/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java new file mode 100644 index 0000000000..f79f6189f2 --- /dev/null +++ b/core/src/test/java/io/grpc/SimpleLoadBalancerTest.java @@ -0,0 +1,144 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import io.grpc.internal.ClientTransport; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.net.SocketAddress; +import java.util.ArrayList; + +/** Unit test for {@link SimpleLoadBalancerFactory}. */ +@RunWith(JUnit4.class) +public class SimpleLoadBalancerTest { + private LoadBalancer loadBalancer; + + private ArrayList servers; + + @Mock + private TransportManager mockTransportManager; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + loadBalancer = SimpleLoadBalancerFactory.getInstance().newLoadBalancer( + "fakeservice", mockTransportManager); + servers = new ArrayList(); + for (int i = 0; i < 3; i++) { + servers.add(new ResolvedServerInfo(new FakeSocketAddress("server" + i), Attributes.EMPTY)); + } + } + + @Test + public void pickBeforeResolved() throws Exception { + ClientTransport mockTransport = mock(ClientTransport.class); + SettableFuture sourceFuture = SettableFuture.create(); + when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))) + .thenReturn(sourceFuture); + ListenableFuture f1 = loadBalancer.pickTransport(null); + ListenableFuture f2 = loadBalancer.pickTransport(null); + assertNotNull(f1); + assertNotNull(f2); + assertNotSame(f1, f2); + assertFalse(f1.isDone()); + assertFalse(f2.isDone()); + verify(mockTransportManager, never()).getTransport(any(SocketAddress.class)); + loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY); + verify(mockTransportManager, times(2)).getTransport(same(servers.get(0).getAddress())); + assertFalse(f1.isDone()); + assertFalse(f2.isDone()); + assertNotSame(sourceFuture, f1); + assertNotSame(sourceFuture, f2); + sourceFuture.set(mockTransport); + assertSame(mockTransport, f1.get()); + assertSame(mockTransport, f2.get()); + ListenableFuture f3 = loadBalancer.pickTransport(null); + assertSame(sourceFuture, f3); + verify(mockTransportManager, times(3)).getTransport(same(servers.get(0).getAddress())); + verifyNoMoreInteractions(mockTransportManager); + } + + @Test + public void transportFailed() throws Exception { + ClientTransport mockTransport1 = mock(ClientTransport.class); + ClientTransport mockTransport2 = mock(ClientTransport.class); + when(mockTransportManager.getTransport(same(servers.get(0).getAddress()))).thenReturn( + Futures.immediateFuture(mockTransport1)); + when(mockTransportManager.getTransport(same(servers.get(1).getAddress()))).thenReturn( + Futures.immediateFuture(mockTransport2)); + loadBalancer.handleResolvedAddresses(servers, Attributes.EMPTY); + ListenableFuture f1 = loadBalancer.pickTransport(null); + ListenableFuture f2 = loadBalancer.pickTransport(null); + assertSame(mockTransport1, f1.get()); + assertSame(mockTransport1, f2.get()); + loadBalancer.transportShutdown(servers.get(0).getAddress(), mockTransport1, Status.INTERNAL); + ListenableFuture f3 = loadBalancer.pickTransport(null); + assertSame(mockTransport2, f3.get()); + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return "FakeSocketAddress-" + name; + } + } + +} diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index b81f6390d8..5ede4c1469 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -94,7 +94,7 @@ public class ClientCallImplTest { final ClientStream stream = mock(ClientStream.class); ClientTransportProvider provider = new ClientTransportProvider() { @Override - public ListenableFuture get() { + public ListenableFuture get(CallOptions callOptions) { return Futures.immediateFuture(transport); } }; diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 1ed153f79d..3e52395107 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -37,6 +37,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; @@ -47,6 +48,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -56,6 +58,9 @@ import io.grpc.IntegerMarshaller; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.NameResolver; +import io.grpc.ResolvedServerInfo; +import io.grpc.SimpleLoadBalancerFactory; import io.grpc.Status; import io.grpc.StringMarshaller; @@ -70,6 +75,8 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.net.SocketAddress; +import java.net.URI; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -81,15 +88,23 @@ import java.util.concurrent.atomic.AtomicLong; /** Unit tests for {@link ManagedChannelImpl}. */ @RunWith(JUnit4.class) public class ManagedChannelImplTest { + private static final List NO_INTERCEPTOR = + Collections.emptyList(); private final MethodDescriptor method = MethodDescriptor.create( MethodDescriptor.MethodType.UNKNOWN, "/service/method", new StringMarshaller(), new IntegerMarshaller()); private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final String serviceName = "fake.example.com"; + private final URI target = URI.create("//" + serviceName); + private final String authority = serviceName; + private final SocketAddress socketAddress = new SocketAddress() {}; + private final ResolvedServerInfo server = new ResolvedServerInfo(socketAddress, Attributes.EMPTY); @Mock private ClientTransport mockTransport; @Mock private ClientTransportFactory mockTransportFactory; + private ManagedChannel channel; @Mock @@ -104,16 +119,18 @@ public class ManagedChannelImplTest { private ArgumentCaptor streamListenerCaptor = ArgumentCaptor.forClass(ClientStreamListener.class); - private void createChannel(List interceptors) throws Exception { - channel = new ManagedChannelImpl(new FakeBackoffPolicyProvider(), mockTransportFactory, - executor, null, interceptors); + private ManagedChannel createChannel( + NameResolver.Factory nameResolverFactory, List interceptors) { + return new ManagedChannelImpl(target, new FakeBackoffPolicyProvider(), + nameResolverFactory, SimpleLoadBalancerFactory.getInstance(), + mockTransportFactory, executor, null, interceptors); } @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - createChannel(Collections.emptyList()); - when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport); + when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class))) + .thenReturn(mockTransport); } @After @@ -123,6 +140,7 @@ public class ManagedChannelImplTest { @Test public void immediateDeadlineExceeded() { + ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR); ClientCall call = channel.newCall(method, CallOptions.DEFAULT.withDeadlineNanoTime(System.nanoTime())); call.start(mockCallListener, new Metadata()); @@ -132,6 +150,7 @@ public class ManagedChannelImplTest { @Test public void shutdownWithNoTransportsEverCreated() { + ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR); verifyNoMoreInteractions(mockTransportFactory); channel.shutdown(); assertTrue(channel.isShutdown()); @@ -140,6 +159,8 @@ public class ManagedChannelImplTest { @Test public void twoCallsAndGracefulShutdown() { + ManagedChannel channel = createChannel( + new FakeNameResolverFactory(server), Collections.emptyList()); verifyNoMoreInteractions(mockTransportFactory); ClientCall call = channel.newCall(method, CallOptions.DEFAULT); verifyNoMoreInteractions(mockTransportFactory); @@ -148,11 +169,13 @@ public class ManagedChannelImplTest { ClientTransport mockTransport = mock(ClientTransport.class); ClientStream mockStream = mock(ClientStream.class); Metadata headers = new Metadata(); - when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport); + when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class))) + .thenReturn(mockTransport); when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class))) .thenReturn(mockStream); call.start(mockCallListener, headers); - verify(mockTransportFactory, timeout(1000)).newClientTransport(); + verify(mockTransportFactory, timeout(1000)) + .newClientTransport(same(socketAddress), eq(authority)); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); ClientTransport.Listener transportListener = transportListenerCaptor.getValue(); verify(mockTransport, timeout(1000)) @@ -206,6 +229,7 @@ public class ManagedChannelImplTest { @Test public void transportFailsOnStart() { + ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR); Status goldenStatus = Status.INTERNAL.withDescription("wanted it to fail"); // mockTransport2 shuts immediately during start @@ -230,10 +254,12 @@ public class ManagedChannelImplTest { when(mockTransport2.newStream(same(method), same(headers2), any(ClientStreamListener.class))) .thenReturn(mockStream2); // The factory returns the immediately shut-down transport first, then the normal one - when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2, mockTransport); + when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class))) + .thenReturn(mockTransport2, mockTransport); call.start(mockCallListener2, headers2); - verify(mockTransportFactory, timeout(1000).times(2)).newClientTransport(); + verify(mockTransportFactory, timeout(1000).times(2)) + .newClientTransport(same(socketAddress), eq(authority)); verify(mockTransport2, timeout(1000)).start(any(ClientTransport.Listener.class)); verify(mockTransport2, timeout(1000)) .newStream(same(method), same(headers2), streamListenerCaptor.capture()); @@ -268,7 +294,7 @@ public class ManagedChannelImplTest { transportListenerCaptor.getValue().transportTerminated(); assertTrue(channel.isTerminated()); - verify(mockTransportFactory, times(2)).newClientTransport(); + verify(mockTransportFactory, times(2)).newClientTransport(same(socketAddress), eq(authority)); verifyNoMoreInteractions(mockTransport); verifyNoMoreInteractions(mockTransport2); verifyNoMoreInteractions(mockStream2); @@ -286,13 +312,15 @@ public class ManagedChannelImplTest { return next.newCall(method, callOptions); } }; - createChannel(Arrays.asList(interceptor)); + ManagedChannel channel = createChannel( + new FakeNameResolverFactory(server), Arrays.asList(interceptor)); assertNotNull(channel.newCall(method, CallOptions.DEFAULT)); assertEquals(1, atomic.get()); } @Test public void testNoDeadlockOnShutdown() { + ManagedChannel channel = createChannel(new FakeNameResolverFactory(server), NO_INTERCEPTOR); // Force creation of transport ClientCall call = channel.newCall(method, CallOptions.DEFAULT); Metadata headers = new Metadata(); @@ -341,6 +369,18 @@ public class ManagedChannelImplTest { transportListener.transportTerminated(); } + @Test + public void nameResolutionFailed() { + Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error")); + ManagedChannel channel = createChannel(new FailingNameResolverFactory(error), NO_INTERCEPTOR); + ClientCall call = channel.newCall(method, CallOptions.DEFAULT); + call.start(mockCallListener, new Metadata()); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockCallListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); + Status status = statusCaptor.getValue(); + assertSame(error, status); + } + private static class FakeBackoffPolicyProvider implements BackoffPolicy.Provider { @Override public BackoffPolicy get() { @@ -352,4 +392,53 @@ public class ManagedChannelImplTest { }; } } + + private class FakeNameResolverFactory extends NameResolver.Factory { + final ResolvedServerInfo server; + + FakeNameResolverFactory(ResolvedServerInfo server) { + this.server = server; + } + + @Override + public NameResolver newNameResolver(final URI targetUri) { + assertEquals(null, targetUri.getScheme()); + assertEquals(serviceName, targetUri.getAuthority()); + return new NameResolver() { + @Override public String getServiceAuthority() { + assertNotNull(targetUri.toString() + " has authority", targetUri.getAuthority()); + return targetUri.getAuthority(); + } + + @Override public void start(final Listener listener) { + listener.onUpdate(Collections.singletonList(server), Attributes.EMPTY); + } + + @Override public void shutdown() {} + }; + } + } + + private class FailingNameResolverFactory extends NameResolver.Factory { + final Status error; + + FailingNameResolverFactory(Status error) { + this.error = error; + } + + @Override + public NameResolver newNameResolver(URI notUsedUri) { + return new NameResolver() { + @Override public String getServiceAuthority() { + return "irrelevant-authority"; + } + + @Override public void start(final Listener listener) { + listener.onError(error); + } + + @Override public void shutdown() {} + }; + } + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 8acac651e9..580f8080cf 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -50,6 +50,7 @@ import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URI; import javax.annotation.Nullable; import javax.net.ssl.SSLException; @@ -61,8 +62,6 @@ import javax.net.ssl.SSLException; public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder { public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB - private final SocketAddress serverAddress; - private String authority; private NegotiationType negotiationType = NegotiationType.TLS; private Class channelType = NioSocketChannel.class; @Nullable @@ -78,30 +77,39 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder channelType; + private final NegotiationType negotiationType; + private final SslContext sslContext; private final EventLoopGroup group; private final boolean usingSharedGroup; private final int flowControlWindow; - private final ProtocolNegotiator negotiator; private final int maxMessageSize; - private final String authority; - private NettyTransportFactory(SocketAddress serverAddress, - String authority, - Class channelType, + private NettyTransportFactory(Class channelType, + NegotiationType negotiationType, + SslContext sslContext, EventLoopGroup group, int flowControlWindow, - ProtocolNegotiator negotiator, int maxMessageSize) { - this.serverAddress = serverAddress; this.channelType = channelType; + this.negotiationType = negotiationType; + this.sslContext = sslContext; this.flowControlWindow = flowControlWindow; - this.negotiator = negotiator; this.maxMessageSize = maxMessageSize; - this.authority = authority; - usingSharedGroup = group == null; if (usingSharedGroup) { // The group was unspecified, using the shared group. @@ -256,14 +246,11 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilderShould only used by tests. - * - * @deprecated use {@link #overrideAuthority} instead - */ - @Deprecated - public final OkHttpChannelBuilder overrideHostForAuthority(String host) { - this.authority = GrpcUtil.authorityFromHostAndPort(host, this.port); - return this; - } - - @Override - public final OkHttpChannelBuilder overrideAuthority(String authority) { - this.authority = checkAuthority(authority); - return this; - } - /** * Sets the negotiation type for the HTTP/2 connection. * @@ -205,18 +195,10 @@ public class OkHttpChannelBuilder extends @Override protected final ClientTransportFactory buildTransportFactory() { - return new OkHttpTransportFactory(host, port, authority, transportExecutor, + return new OkHttpTransportFactory(transportExecutor, createSocketFactory(), connectionSpec, maxMessageSize); } - /** - * Verifies the authority is valid. This method exists as an escape hatch for putting in an - * authority that is valid, but would fail the default validation provided by this implementation. - */ - protected String checkAuthority(String authority) { - return GrpcUtil.checkAuthority(authority); - } - private SSLSocketFactory createSocketFactory() { switch (negotiationType) { case TLS: @@ -231,25 +213,16 @@ public class OkHttpChannelBuilder extends private static class OkHttpTransportFactory extends AbstractReferenceCounted implements ClientTransportFactory { - private final String host; - private final int port; - private final String authority; private final Executor executor; private final boolean usingSharedExecutor; private final SSLSocketFactory socketFactory; private final ConnectionSpec connectionSpec; private final int maxMessageSize; - private OkHttpTransportFactory(String host, - int port, - String authority, - Executor executor, + private OkHttpTransportFactory(Executor executor, SSLSocketFactory socketFactory, ConnectionSpec connectionSpec, int maxMessageSize) { - this.host = host; - this.port = port; - this.authority = authority; this.socketFactory = socketFactory; this.connectionSpec = connectionSpec; this.maxMessageSize = maxMessageSize; @@ -264,14 +237,10 @@ public class OkHttpChannelBuilder extends } @Override - public ClientTransport newClientTransport() { - return new OkHttpClientTransport(host, port, authority, executor, socketFactory, - Utils.convertSpec(connectionSpec), maxMessageSize); - } - - @Override - public String authority() { - return authority; + public ClientTransport newClientTransport(SocketAddress addr, String authority) { + InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; + return new OkHttpClientTransport(inetSocketAddr.getHostString(), inetSocketAddr.getPort(), + authority, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize); } @Override diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java index c915dcc1ea..9b3e34f3b4 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelProvider.java @@ -61,4 +61,9 @@ public class OkHttpChannelProvider extends ManagedChannelProvider { protected OkHttpChannelBuilder builderForAddress(String name, int port) { return OkHttpChannelBuilder.forAddress(name, port); } + + @Override + protected OkHttpChannelBuilder builderForTarget(String target) { + return OkHttpChannelBuilder.forTarget(target); + } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java index a1fe95ddaa..b80a7fd14b 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpChannelBuilderTest.java @@ -31,8 +31,6 @@ package io.grpc.okhttp; -import static org.junit.Assert.assertEquals; - import io.grpc.internal.ClientTransportFactory; import org.junit.Rule; @@ -55,11 +53,20 @@ public class OkHttpChannelBuilderTest { } }; - ClientTransportFactory factory = builder.overrideAuthority("invalid_authority") + ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority") .negotiationType(NegotiationType.PLAINTEXT) .buildTransportFactory(); + } - assertEquals("invalid_authority", factory.authority()); + @Test + public void failOverrideInvalidAuthority() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Invalid authority:"); + OkHttpChannelBuilder builder = new OkHttpChannelBuilder("good", 1234); + + ClientTransportFactory factory = builder.overrideAuthority("[invalidauthority") + .negotiationType(NegotiationType.PLAINTEXT) + .buildTransportFactory(); } @Test