diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 72456e28fe..269506b450 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -75,6 +75,7 @@ import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.internal.testing.StatsTestUtils; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.TestClientStreamTracer; import io.grpc.internal.testing.TestServerStreamTracer; import io.grpc.internal.testing.TestStreamTracer; @@ -83,7 +84,6 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; -import io.grpc.testing.StreamRecorder; import io.grpc.testing.TestUtils; import io.grpc.testing.integration.Messages.EchoStatus; import io.grpc.testing.integration.Messages.Payload; diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java index ba6f0fa1c4..b3e9a1d75d 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java @@ -26,13 +26,13 @@ import com.squareup.okhttp.ConnectionSpec; import com.squareup.okhttp.TlsVersion; import io.grpc.ManagedChannel; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.TestUtils; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; import io.grpc.okhttp.OkHttpChannelBuilder; import io.grpc.okhttp.internal.Platform; import io.grpc.stub.StreamObserver; -import io.grpc.testing.StreamRecorder; import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; diff --git a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java index 11c09fd1cb..2e0a22360e 100644 --- a/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java +++ b/services/src/test/java/io/grpc/protobuf/services/ProtoReflectionServiceTest.java @@ -28,6 +28,7 @@ import io.grpc.Server; import io.grpc.ServerServiceDefinition; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.testing.StreamRecorder; import io.grpc.reflection.testing.AnotherDynamicServiceGrpc; import io.grpc.reflection.testing.DynamicReflectionTestDepthTwoProto; import io.grpc.reflection.testing.DynamicServiceGrpc; @@ -46,7 +47,6 @@ import io.grpc.reflection.v1alpha.ServiceResponse; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; -import io.grpc.testing.StreamRecorder; import io.grpc.util.MutableHandlerRegistry; import java.util.ArrayList; import java.util.Arrays; diff --git a/testing/src/main/java/io/grpc/internal/testing/StreamRecorder.java b/testing/src/main/java/io/grpc/internal/testing/StreamRecorder.java new file mode 100644 index 0000000000..4c4d07cd6b --- /dev/null +++ b/testing/src/main/java/io/grpc/internal/testing/StreamRecorder.java @@ -0,0 +1,113 @@ +/* + * Copyright 2014, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal.testing; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Utility implementation of {@link StreamObserver} used in testing. Records all the observed + * values produced by the stream as well as any errors. + */ +public class StreamRecorder implements StreamObserver { + + /** + * Creates a new recorder. + */ + public static StreamRecorder create() { + return new StreamRecorder(); + } + + private final CountDownLatch latch; + private final List results; + private Throwable error; + private final SettableFuture firstValue; + + private StreamRecorder() { + firstValue = SettableFuture.create(); + latch = new CountDownLatch(1); + results = Collections.synchronizedList(new ArrayList()); + } + + @Override + public void onNext(T value) { + if (!firstValue.isDone()) { + firstValue.set(value); + } + results.add(value); + } + + @Override + public void onError(Throwable t) { + if (!firstValue.isDone()) { + firstValue.setException(t); + } + error = t; + latch.countDown(); + } + + @Override + public void onCompleted() { + if (!firstValue.isDone()) { + firstValue.setException(new IllegalStateException("No first value provided")); + } + latch.countDown(); + } + + /** + * Waits for the stream to terminate. + */ + public void awaitCompletion() throws Exception { + latch.await(); + } + + /** + * Waits a fixed timeout for the stream to terminate. + */ + public boolean awaitCompletion(int timeout, TimeUnit unit) throws Exception { + return latch.await(timeout, unit); + } + + /** + * Returns the current set of received values. + */ + public List getValues() { + return Collections.unmodifiableList(results); + } + + /** + * Returns the stream terminating error. + */ + @Nullable public Throwable getError() { + return error; + } + + /** + * Returns a {@link ListenableFuture} for the first value received from the stream. Useful + * for testing unary call patterns. + */ + public ListenableFuture firstValue() { + return firstValue; + } +} diff --git a/testing/src/main/java/io/grpc/testing/StreamRecorder.java b/testing/src/main/java/io/grpc/testing/StreamRecorder.java index ca382f0abf..9e5df25263 100644 --- a/testing/src/main/java/io/grpc/testing/StreamRecorder.java +++ b/testing/src/main/java/io/grpc/testing/StreamRecorder.java @@ -30,7 +30,10 @@ import javax.annotation.Nullable; /** * Utility implementation of {@link StreamObserver} used in testing. Records all the observed * values produced by the stream as well as any errors. + * + * @deprecated Not for public use */ +@Deprecated @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1791") public class StreamRecorder implements StreamObserver {