From 45da9c576667d43ad2d8ac8bf4c8d67fd9f48a9b Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 18 Jun 2015 09:53:23 -0700 Subject: [PATCH] Add a few ChannelImpl tests and improve error status --- core/src/main/java/io/grpc/ChannelImpl.java | 18 +- .../test/java/io/grpc/ChannelImplTest.java | 240 ++++++++++++++++++ .../test/java/io/grpc/IntegerMarshaller.java | 49 ++++ .../src/test/java/io/grpc/ServerImplTest.java | 36 +-- .../test/java/io/grpc/StringMarshaller.java | 59 +++++ .../okhttp/OkHttpClientTransport.java | 3 +- 6 files changed, 365 insertions(+), 40 deletions(-) create mode 100644 core/src/test/java/io/grpc/ChannelImplTest.java create mode 100644 core/src/test/java/io/grpc/IntegerMarshaller.java create mode 100644 core/src/test/java/io/grpc/StringMarshaller.java diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index dacd911468..b65f312240 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -192,8 +192,17 @@ public final class ChannelImpl extends Channel { * * @see ClientTransport#ping(PingCallback, Executor) */ - public void ping(PingCallback callback, Executor executor) { - obtainActiveTransport().ping(callback, executor); + public void ping(final PingCallback callback, final Executor executor) { + try { + obtainActiveTransport().ping(callback, executor); + } catch (final RuntimeException ex) { + executor.execute(new Runnable() { + @Override + public void run() { + callback.pingFailed(ex); + } + }); + } } /* @@ -300,13 +309,12 @@ public final class ChannelImpl extends Channel { transport = obtainActiveTransport(); } catch (RuntimeException ex) { stream = new NoopClientStream(); - listener.closed(Status.INTERNAL.withDescription("Failed starting transport").withCause(ex), - new Metadata.Trailers()); + listener.closed(Status.fromThrowable(ex), new Metadata.Trailers()); return; } if (transport == null) { stream = new NoopClientStream(); - listener.closed(Status.CANCELLED.withDescription("Channel is shutdown"), + listener.closed(Status.UNAVAILABLE.withDescription("Channel is shutdown"), new Metadata.Trailers()); return; } diff --git a/core/src/test/java/io/grpc/ChannelImplTest.java b/core/src/test/java/io/grpc/ChannelImplTest.java new file mode 100644 index 0000000000..17503c5681 --- /dev/null +++ b/core/src/test/java/io/grpc/ChannelImplTest.java @@ -0,0 +1,240 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.transport.ClientStream; +import io.grpc.transport.ClientStreamListener; +import io.grpc.transport.ClientTransport; +import io.grpc.transport.ClientTransportFactory; + +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ChannelImpl}. */ +@RunWith(JUnit4.class) +public class ChannelImplTest { + private MethodDescriptor method = MethodDescriptor.create( + MethodType.UNKNOWN, "/service/method", 1, TimeUnit.SECONDS, + new StringMarshaller(), new IntegerMarshaller()); + private ExecutorService executor = Executors.newSingleThreadExecutor(); + private ClientTransportFactory mockTransportFactory = mock(ClientTransportFactory.class); + private ChannelImpl channel = new ChannelImpl(mockTransportFactory, executor); + @SuppressWarnings("unchecked") + private ClientCall.Listener mockCallListener = mock(ClientCall.Listener.class); + @SuppressWarnings("unchecked") + private ClientCall.Listener mockCallListener2 = mock(ClientCall.Listener.class); + @SuppressWarnings("unchecked") + private ClientCall.Listener mockCallListener3 = mock(ClientCall.Listener.class); + private ArgumentCaptor transportListenerCaptor = + ArgumentCaptor.forClass(ClientTransport.Listener.class); + private ArgumentCaptor streamListenerCaptor = + ArgumentCaptor.forClass(ClientStreamListener.class); + + @After + public void tearDown() { + executor.shutdownNow(); + } + + @Test + public void shutdownWithNoTransportsEverCreated() { + verifyNoMoreInteractions(mockTransportFactory); + channel.shutdown(); + assertTrue(channel.isShutdown()); + assertTrue(channel.isTerminated()); + } + + @Test + public void twoCallsAndGracefulShutdown() { + verifyNoMoreInteractions(mockTransportFactory); + ClientCall call = channel.newCall(method); + verifyNoMoreInteractions(mockTransportFactory); + + // Create transport and call + ClientTransport mockTransport = mock(ClientTransport.class); + ClientStream mockStream = mock(ClientStream.class); + Metadata.Headers headers = new Metadata.Headers(); + when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport); + when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class))) + .thenReturn(mockStream); + call.start(mockCallListener, headers); + verify(mockTransportFactory).newClientTransport(); + verify(mockTransport).start(transportListenerCaptor.capture()); + ClientTransport.Listener transportListener = transportListenerCaptor.getValue(); + verify(mockTransport) + .newStream(same(method), same(headers), streamListenerCaptor.capture()); + ClientStreamListener streamListener = streamListenerCaptor.getValue(); + + // Second call + ClientCall call2 = channel.newCall(method); + ClientStream mockStream2 = mock(ClientStream.class); + Metadata.Headers headers2 = new Metadata.Headers(); + when(mockTransport.newStream(same(method), same(headers2), any(ClientStreamListener.class))) + .thenReturn(mockStream2); + call2.start(mockCallListener2, headers2); + verify(mockTransport) + .newStream(same(method), same(headers2), streamListenerCaptor.capture()); + ClientStreamListener streamListener2 = streamListenerCaptor.getValue(); + Metadata.Trailers trailers = new Metadata.Trailers(); + streamListener2.closed(Status.CANCELLED, trailers); + verify(mockCallListener2, timeout(1000)).onClose(Status.CANCELLED, trailers); + + // Shutdown + channel.shutdown(); + assertTrue(channel.isShutdown()); + assertFalse(channel.isTerminated()); + verify(mockTransport).shutdown(); + + // Further calls should fail without going to the transport + ClientCall call3 = channel.newCall(method); + call3.start(mockCallListener3, new Metadata.Headers()); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockCallListener3, timeout(1000)) + .onClose(statusCaptor.capture(), any(Metadata.Trailers.class)); + assertSame(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + + // Finish shutdown + transportListener.transportShutdown(); + assertFalse(channel.isTerminated()); + streamListener.closed(Status.CANCELLED, trailers); + verify(mockCallListener, timeout(1000)).onClose(Status.CANCELLED, trailers); + assertFalse(channel.isTerminated()); + + transportListener.transportTerminated(); + assertTrue(channel.isTerminated()); + + verifyNoMoreInteractions(mockTransportFactory); + verifyNoMoreInteractions(mockTransport); + verifyNoMoreInteractions(mockStream); + } + + @Test + public void transportFailsOnStart() { + Status goldenStatus = Status.INTERNAL.withDescription("wanted it to fail"); + + // Have transport throw exception on start + ClientCall call = channel.newCall(method); + ClientTransport mockTransport = mock(ClientTransport.class); + when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport); + doThrow(goldenStatus.asRuntimeException()) + .when(mockTransport).start(any(ClientTransport.Listener.class)); + call.start(mockCallListener, new Metadata.Headers()); + verify(mockTransportFactory).newClientTransport(); + verify(mockTransport).start(any(ClientTransport.Listener.class)); + ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); + verify(mockCallListener, timeout(1000)) + .onClose(statusCaptor.capture(), any(Metadata.Trailers.class)); + assertSame(goldenStatus, statusCaptor.getValue()); + + // Have transport shutdown immediately during start + call = channel.newCall(method); + ClientTransport mockTransport2 = mock(ClientTransport.class); + ClientStream mockStream2 = mock(ClientStream.class); + Metadata.Headers headers2 = new Metadata.Headers(); + when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + ClientTransport.Listener listener = (ClientTransport.Listener) invocation.getArguments()[0]; + listener.transportShutdown(); + listener.transportTerminated(); + return null; + } + }).when(mockTransport2).start(any(ClientTransport.Listener.class)); + Exception ex = new IllegalStateException("Transport shutdown"); + when(mockTransport2.newStream(same(method), same(headers2), any(ClientStreamListener.class))) + .thenReturn(mockStream2); + call.start(mockCallListener2, headers2); + verify(mockTransportFactory, times(2)).newClientTransport(); + verify(mockTransport2).start(any(ClientTransport.Listener.class)); + verify(mockTransport2).newStream(same(method), same(headers2), streamListenerCaptor.capture()); + Metadata.Trailers trailers2 = new Metadata.Trailers(); + streamListenerCaptor.getValue().closed(Status.CANCELLED, trailers2); + verify(mockCallListener2, timeout(1000)).onClose(Status.CANCELLED, trailers2); + + // Make sure the Channel can still handle new calls + call = channel.newCall(method); + ClientTransport mockTransport3 = mock(ClientTransport.class); + ClientStream mockStream3 = mock(ClientStream.class); + Metadata.Headers headers3 = new Metadata.Headers(); + when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport3); + when(mockTransport3.newStream(same(method), same(headers3), any(ClientStreamListener.class))) + .thenReturn(mockStream3); + call.start(mockCallListener3, headers3); + verify(mockTransportFactory, times(3)).newClientTransport(); + verify(mockTransport3).start(transportListenerCaptor.capture()); + verify(mockTransport3).newStream(same(method), same(headers3), streamListenerCaptor.capture()); + Metadata.Trailers trailers3 = new Metadata.Trailers(); + streamListenerCaptor.getValue().closed(Status.CANCELLED, trailers3); + verify(mockCallListener3, timeout(1000)).onClose(Status.CANCELLED, trailers3); + + // Make sure shutdown still works + channel.shutdown(); + assertTrue(channel.isShutdown()); + assertFalse(channel.isTerminated()); + verify(mockTransport3).shutdown(); + transportListenerCaptor.getValue().transportShutdown(); + assertFalse(channel.isTerminated()); + + transportListenerCaptor.getValue().transportTerminated(); + assertTrue(channel.isTerminated()); + + verifyNoMoreInteractions(mockTransportFactory); + verifyNoMoreInteractions(mockTransport); + verifyNoMoreInteractions(mockTransport2); + verifyNoMoreInteractions(mockTransport3); + verifyNoMoreInteractions(mockStream2); + verifyNoMoreInteractions(mockStream3); + } +} diff --git a/core/src/test/java/io/grpc/IntegerMarshaller.java b/core/src/test/java/io/grpc/IntegerMarshaller.java new file mode 100644 index 0000000000..7059a58cec --- /dev/null +++ b/core/src/test/java/io/grpc/IntegerMarshaller.java @@ -0,0 +1,49 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.io.InputStream; + +/** Marshalls decimal-encoded integers. */ +class IntegerMarshaller implements Marshaller { + public static IntegerMarshaller INSTANCE = new IntegerMarshaller(); + + @Override + public InputStream stream(Integer value) { + return StringMarshaller.INSTANCE.stream(value.toString()); + } + + @Override + public Integer parse(InputStream stream) { + return Integer.valueOf(StringMarshaller.INSTANCE.parse(stream)); + } +} diff --git a/core/src/test/java/io/grpc/ServerImplTest.java b/core/src/test/java/io/grpc/ServerImplTest.java index 5a34fbfc04..ef2398b0eb 100644 --- a/core/src/test/java/io/grpc/ServerImplTest.java +++ b/core/src/test/java/io/grpc/ServerImplTest.java @@ -31,7 +31,6 @@ package io.grpc; -import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -45,8 +44,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import com.google.common.io.ByteStreams; - import io.grpc.transport.ServerListener; import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStreamListener; @@ -62,7 +59,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.concurrent.BrokenBarrierException; @@ -75,8 +71,8 @@ import java.util.concurrent.atomic.AtomicReference; /** Unit tests for {@link ServerImpl}. */ @RunWith(JUnit4.class) public class ServerImplTest { - private static final IntegerMarshaller INTEGER_MARSHALLER = new IntegerMarshaller(); - private static final StringMarshaller STRING_MARSHALLER = new StringMarshaller(); + private static final IntegerMarshaller INTEGER_MARSHALLER = IntegerMarshaller.INSTANCE; + private static final StringMarshaller STRING_MARSHALLER = StringMarshaller.INSTANCE; private ExecutorService executor = Executors.newSingleThreadExecutor(); private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl(); @@ -300,32 +296,4 @@ public class ServerImplTest { listener.transportTerminated(); } } - - private static class StringMarshaller implements Marshaller { - @Override - public InputStream stream(String value) { - return new ByteArrayInputStream(value.getBytes(UTF_8)); - } - - @Override - public String parse(InputStream stream) { - try { - return new String(ByteStreams.toByteArray(stream), UTF_8); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - } - - private static class IntegerMarshaller implements Marshaller { - @Override - public InputStream stream(Integer value) { - return STRING_MARSHALLER.stream(value.toString()); - } - - @Override - public Integer parse(InputStream stream) { - return Integer.valueOf(STRING_MARSHALLER.parse(stream)); - } - } } diff --git a/core/src/test/java/io/grpc/StringMarshaller.java b/core/src/test/java/io/grpc/StringMarshaller.java new file mode 100644 index 0000000000..78450d99f4 --- /dev/null +++ b/core/src/test/java/io/grpc/StringMarshaller.java @@ -0,0 +1,59 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static com.google.common.base.Charsets.UTF_8; + +import com.google.common.io.ByteStreams; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** Marshalls UTF-8 encoded strings. */ +class StringMarshaller implements Marshaller { + public static StringMarshaller INSTANCE = new StringMarshaller(); + + @Override + public InputStream stream(String value) { + return new ByteArrayInputStream(value.getBytes(UTF_8)); + } + + @Override + public String parse(InputStream stream) { + try { + return new String(ByteStreams.toByteArray(stream), UTF_8); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java index 9b5539417b..90801569cf 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java @@ -337,7 +337,8 @@ class OkHttpClientTransport implements ClientTransport { } catch (IOException e) { // TODO(jhump): should we instead notify the listener of shutdown+terminated? // (and probably do all of this work asynchronously instead of in calling thread) - throw new RuntimeException(e); + throw Status.UNAVAILABLE.withDescription("Failed connecting").withCause(e) + .asRuntimeException(); } Variant variant = new Http2(); frameReader = variant.newReader(source, true);