mirror of https://github.com/grpc/grpc-java.git
interop-testing: update test proto to match grpc-proto. (#5003)
Update test proto to match stubby4 test server / grpc-proto repo. - Deprecated PayloadType since it only provide 1 option. - Change test cases to ignore deprecated field in Payload
This commit is contained in:
parent
7d19683018
commit
51ab5b9432
|
|
@ -57,7 +57,6 @@ public final class LongLivedChannel extends HttpServlet {
|
||||||
int responseSize = 5678;
|
int responseSize = 5678;
|
||||||
SimpleRequest request = SimpleRequest.newBuilder()
|
SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(responseSize)
|
.setResponseSize(responseSize)
|
||||||
.setResponseType(Messages.PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[requestSize])))
|
.setBody(ByteString.copyFrom(new byte[requestSize])))
|
||||||
.build();
|
.build();
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -18,7 +18,6 @@ package io.grpc.testing.integration;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
|
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
|
||||||
import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE;
|
|
||||||
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
|
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
|
||||||
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
|
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
@ -26,6 +25,7 @@ import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
@ -39,10 +39,8 @@ import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.io.ByteStreams;
|
import com.google.common.io.ByteStreams;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import com.google.protobuf.BoolValue;
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.MessageLite;
|
import com.google.protobuf.MessageLite;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
|
|
@ -84,9 +82,9 @@ import io.grpc.stub.MetadataUtils;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.TestUtils;
|
import io.grpc.testing.TestUtils;
|
||||||
import io.grpc.testing.integration.EmptyProtos.Empty;
|
import io.grpc.testing.integration.EmptyProtos.Empty;
|
||||||
|
import io.grpc.testing.integration.Messages.BoolValue;
|
||||||
import io.grpc.testing.integration.Messages.EchoStatus;
|
import io.grpc.testing.integration.Messages.EchoStatus;
|
||||||
import io.grpc.testing.integration.Messages.Payload;
|
import io.grpc.testing.integration.Messages.Payload;
|
||||||
import io.grpc.testing.integration.Messages.PayloadType;
|
|
||||||
import io.grpc.testing.integration.Messages.ResponseParameters;
|
import io.grpc.testing.integration.Messages.ResponseParameters;
|
||||||
import io.grpc.testing.integration.Messages.SimpleRequest;
|
import io.grpc.testing.integration.Messages.SimpleRequest;
|
||||||
import io.grpc.testing.integration.Messages.SimpleResponse;
|
import io.grpc.testing.integration.Messages.SimpleResponse;
|
||||||
|
|
@ -112,6 +110,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
|
@ -154,11 +153,11 @@ public abstract class AbstractInteropTest {
|
||||||
new FakeTagContextBinarySerializer();
|
new FakeTagContextBinarySerializer();
|
||||||
|
|
||||||
private final AtomicReference<ServerCall<?, ?>> serverCallCapture =
|
private final AtomicReference<ServerCall<?, ?>> serverCallCapture =
|
||||||
new AtomicReference<ServerCall<?, ?>>();
|
new AtomicReference<>();
|
||||||
private final AtomicReference<Metadata> requestHeadersCapture =
|
private final AtomicReference<Metadata> requestHeadersCapture =
|
||||||
new AtomicReference<Metadata>();
|
new AtomicReference<>();
|
||||||
private final AtomicReference<Context> contextCapture =
|
private final AtomicReference<Context> contextCapture =
|
||||||
new AtomicReference<Context>();
|
new AtomicReference<>();
|
||||||
private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
|
private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
|
||||||
private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder();
|
private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder();
|
||||||
|
|
||||||
|
|
@ -166,7 +165,7 @@ public abstract class AbstractInteropTest {
|
||||||
private Server server;
|
private Server server;
|
||||||
|
|
||||||
private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
|
private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
|
||||||
new LinkedBlockingQueue<ServerStreamTracerInfo>();
|
new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
private static final class ServerStreamTracerInfo {
|
private static final class ServerStreamTracerInfo {
|
||||||
final String fullMethodName;
|
final String fullMethodName;
|
||||||
|
|
@ -256,7 +255,7 @@ public abstract class AbstractInteropTest {
|
||||||
protected TestServiceGrpc.TestServiceStub asyncStub;
|
protected TestServiceGrpc.TestServiceStub asyncStub;
|
||||||
|
|
||||||
private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers =
|
private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers =
|
||||||
new LinkedBlockingQueue<TestClientStreamTracer>();
|
new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
private final ClientStreamTracer.Factory clientStreamTracerFactory =
|
private final ClientStreamTracer.Factory clientStreamTracerFactory =
|
||||||
new ClientStreamTracer.Factory() {
|
new ClientStreamTracer.Factory() {
|
||||||
|
|
@ -389,17 +388,15 @@ public abstract class AbstractInteropTest {
|
||||||
assumeEnoughMemory();
|
assumeEnoughMemory();
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(314159)
|
.setResponseSize(314159)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[271828])))
|
.setBody(ByteString.copyFrom(new byte[271828])))
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[314159])))
|
.setBody(ByteString.copyFrom(new byte[314159])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(request));
|
assertResponse(goldenResponse, blockingStub.unaryCall(request));
|
||||||
|
|
||||||
assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
|
assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
|
||||||
Collections.singleton(request), Collections.singleton(goldenResponse));
|
Collections.singleton(request), Collections.singleton(goldenResponse));
|
||||||
|
|
@ -440,7 +437,7 @@ public abstract class AbstractInteropTest {
|
||||||
assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT);
|
assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(
|
assertResponse(
|
||||||
goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest));
|
goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest));
|
||||||
assertStatsTrace(
|
assertStatsTrace(
|
||||||
"grpc.testing.TestService/UnaryCall",
|
"grpc.testing.TestService/UnaryCall",
|
||||||
|
|
@ -448,7 +445,7 @@ public abstract class AbstractInteropTest {
|
||||||
Collections.singleton(expectCompressedRequest),
|
Collections.singleton(expectCompressedRequest),
|
||||||
Collections.singleton(goldenResponse));
|
Collections.singleton(goldenResponse));
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest));
|
assertResponse(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest));
|
||||||
assertStatsTrace(
|
assertStatsTrace(
|
||||||
"grpc.testing.TestService/UnaryCall",
|
"grpc.testing.TestService/UnaryCall",
|
||||||
Status.Code.OK,
|
Status.Code.OK,
|
||||||
|
|
@ -482,14 +479,14 @@ public abstract class AbstractInteropTest {
|
||||||
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
|
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed));
|
assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed));
|
||||||
assertStatsTrace(
|
assertStatsTrace(
|
||||||
"grpc.testing.TestService/UnaryCall",
|
"grpc.testing.TestService/UnaryCall",
|
||||||
Status.Code.OK,
|
Status.Code.OK,
|
||||||
Collections.singleton(responseShouldBeCompressed),
|
Collections.singleton(responseShouldBeCompressed),
|
||||||
Collections.singleton(goldenResponse));
|
Collections.singleton(goldenResponse));
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed));
|
assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed));
|
||||||
assertStatsTrace(
|
assertStatsTrace(
|
||||||
"grpc.testing.TestService/UnaryCall",
|
"grpc.testing.TestService/UnaryCall",
|
||||||
Status.Code.OK,
|
Status.Code.OK,
|
||||||
|
|
@ -500,7 +497,6 @@ public abstract class AbstractInteropTest {
|
||||||
@Test
|
@Test
|
||||||
public void serverStreaming() throws Exception {
|
public void serverStreaming() throws Exception {
|
||||||
final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.addResponseParameters(ResponseParameters.newBuilder()
|
.addResponseParameters(ResponseParameters.newBuilder()
|
||||||
.setSize(31415))
|
.setSize(31415))
|
||||||
.addResponseParameters(ResponseParameters.newBuilder()
|
.addResponseParameters(ResponseParameters.newBuilder()
|
||||||
|
|
@ -513,22 +509,18 @@ public abstract class AbstractInteropTest {
|
||||||
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[31415])))
|
.setBody(ByteString.copyFrom(new byte[31415])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[9])))
|
.setBody(ByteString.copyFrom(new byte[9])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[2653])))
|
.setBody(ByteString.copyFrom(new byte[2653])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[58979])))
|
.setBody(ByteString.copyFrom(new byte[58979])))
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
|
|
@ -536,7 +528,7 @@ public abstract class AbstractInteropTest {
|
||||||
asyncStub.streamingOutputCall(request, recorder);
|
asyncStub.streamingOutputCall(request, recorder);
|
||||||
recorder.awaitCompletion();
|
recorder.awaitCompletion();
|
||||||
assertSuccess(recorder);
|
assertSuccess(recorder);
|
||||||
assertEquals(goldenResponses, recorder.getValues());
|
assertResponses(goldenResponses, recorder.getValues());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -659,7 +651,7 @@ public abstract class AbstractInteropTest {
|
||||||
asyncStub.streamingOutputCall(request, recorder);
|
asyncStub.streamingOutputCall(request, recorder);
|
||||||
recorder.awaitCompletion();
|
recorder.awaitCompletion();
|
||||||
assertSuccess(recorder);
|
assertSuccess(recorder);
|
||||||
assertEquals(goldenResponses, recorder.getValues());
|
assertResponses(goldenResponses, recorder.getValues());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -692,26 +684,22 @@ public abstract class AbstractInteropTest {
|
||||||
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[31415])))
|
.setBody(ByteString.copyFrom(new byte[31415])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[9])))
|
.setBody(ByteString.copyFrom(new byte[9])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[2653])))
|
.setBody(ByteString.copyFrom(new byte[2653])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[58979])))
|
.setBody(ByteString.copyFrom(new byte[58979])))
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(5);
|
final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(5);
|
||||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||||
= asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
|
= asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -735,9 +723,9 @@ public abstract class AbstractInteropTest {
|
||||||
Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||||
assertNotNull("Timed out waiting for response", actualResponse);
|
assertNotNull("Timed out waiting for response", actualResponse);
|
||||||
if (actualResponse instanceof Throwable) {
|
if (actualResponse instanceof Throwable) {
|
||||||
throw new AssertionError((Throwable) actualResponse);
|
throw new AssertionError(actualResponse);
|
||||||
}
|
}
|
||||||
assertEquals(goldenResponses.get(i), actualResponse);
|
assertResponse(goldenResponses.get(i), (StreamingOutputCallResponse) actualResponse);
|
||||||
}
|
}
|
||||||
requestObserver.onCompleted();
|
requestObserver.onCompleted();
|
||||||
assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
||||||
|
|
@ -787,7 +775,6 @@ public abstract class AbstractInteropTest {
|
||||||
.build();
|
.build();
|
||||||
final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
|
final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[31415])))
|
.setBody(ByteString.copyFrom(new byte[31415])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
@ -795,7 +782,7 @@ public abstract class AbstractInteropTest {
|
||||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||||
= asyncStub.fullDuplexCall(responseObserver);
|
= asyncStub.fullDuplexCall(responseObserver);
|
||||||
requestObserver.onNext(request);
|
requestObserver.onNext(request);
|
||||||
assertEquals(goldenResponse, responseObserver.firstValue().get());
|
assertResponse(goldenResponse, responseObserver.firstValue().get());
|
||||||
requestObserver.onError(new RuntimeException());
|
requestObserver.onError(new RuntimeException());
|
||||||
responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||||
assertEquals(1, responseObserver.getValues().size());
|
assertEquals(1, responseObserver.getValues().size());
|
||||||
|
|
@ -811,7 +798,6 @@ public abstract class AbstractInteropTest {
|
||||||
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
||||||
StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
||||||
StreamingOutputCallRequest.newBuilder();
|
StreamingOutputCallRequest.newBuilder();
|
||||||
streamingOutputBuilder.setResponseType(COMPRESSABLE);
|
|
||||||
for (Integer size : responseSizes) {
|
for (Integer size : responseSizes) {
|
||||||
streamingOutputBuilder.addResponseParameters(
|
streamingOutputBuilder.addResponseParameters(
|
||||||
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
||||||
|
|
@ -835,7 +821,6 @@ public abstract class AbstractInteropTest {
|
||||||
assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
|
assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
|
||||||
for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
|
for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
|
||||||
StreamingOutputCallResponse response = recorder.getValues().get(ix);
|
StreamingOutputCallResponse response = recorder.getValues().get(ix);
|
||||||
assertEquals(COMPRESSABLE, response.getPayload().getType());
|
|
||||||
int length = response.getPayload().getBody().size();
|
int length = response.getPayload().getBody().size();
|
||||||
int expectedSize = responseSizes.get(ix % responseSizes.size());
|
int expectedSize = responseSizes.get(ix % responseSizes.size());
|
||||||
assertEquals("comparison failed at index " + ix, expectedSize, length);
|
assertEquals("comparison failed at index " + ix, expectedSize, length);
|
||||||
|
|
@ -851,7 +836,6 @@ public abstract class AbstractInteropTest {
|
||||||
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
||||||
StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
||||||
StreamingOutputCallRequest.newBuilder();
|
StreamingOutputCallRequest.newBuilder();
|
||||||
streamingOutputBuilder.setResponseType(COMPRESSABLE);
|
|
||||||
for (Integer size : responseSizes) {
|
for (Integer size : responseSizes) {
|
||||||
streamingOutputBuilder.addResponseParameters(
|
streamingOutputBuilder.addResponseParameters(
|
||||||
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
||||||
|
|
@ -871,7 +855,6 @@ public abstract class AbstractInteropTest {
|
||||||
assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
|
assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
|
||||||
for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
|
for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
|
||||||
StreamingOutputCallResponse response = recorder.getValues().get(ix);
|
StreamingOutputCallResponse response = recorder.getValues().get(ix);
|
||||||
assertEquals(COMPRESSABLE, response.getPayload().getType());
|
|
||||||
int length = response.getPayload().getBody().size();
|
int length = response.getPayload().getBody().size();
|
||||||
int expectedSize = responseSizes.get(ix % responseSizes.size());
|
int expectedSize = responseSizes.get(ix % responseSizes.size());
|
||||||
assertEquals("comparison failed at index " + ix, expectedSize, length);
|
assertEquals("comparison failed at index " + ix, expectedSize, length);
|
||||||
|
|
@ -881,23 +864,20 @@ public abstract class AbstractInteropTest {
|
||||||
@Test
|
@Test
|
||||||
public void serverStreamingShouldBeFlowControlled() throws Exception {
|
public void serverStreamingShouldBeFlowControlled() throws Exception {
|
||||||
final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
||||||
.setResponseType(COMPRESSABLE)
|
|
||||||
.addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
|
.addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
|
||||||
.addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
|
.addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
|
||||||
.build();
|
.build();
|
||||||
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[100000]))).build(),
|
.setBody(ByteString.copyFrom(new byte[100000]))).build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[100001]))).build());
|
.setBody(ByteString.copyFrom(new byte[100001]))).build());
|
||||||
|
|
||||||
long start = System.nanoTime();
|
long start = System.nanoTime();
|
||||||
|
|
||||||
final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(10);
|
final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
|
||||||
ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
|
ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
|
||||||
channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
|
channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
|
||||||
call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
|
call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
|
||||||
|
|
@ -919,8 +899,9 @@ public abstract class AbstractInteropTest {
|
||||||
|
|
||||||
// Time how long it takes to get the first response.
|
// Time how long it takes to get the first response.
|
||||||
call.request(1);
|
call.request(1);
|
||||||
assertEquals(goldenResponses.get(0),
|
Object response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||||
queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
assertTrue(response instanceof StreamingOutputCallResponse);
|
||||||
|
assertResponse(goldenResponses.get(0), (StreamingOutputCallResponse) response);
|
||||||
long firstCallDuration = System.nanoTime() - start;
|
long firstCallDuration = System.nanoTime() - start;
|
||||||
|
|
||||||
// Without giving additional flow control, make sure that we don't get another response. We wait
|
// Without giving additional flow control, make sure that we don't get another response. We wait
|
||||||
|
|
@ -932,8 +913,9 @@ public abstract class AbstractInteropTest {
|
||||||
|
|
||||||
// Make sure that everything still completes.
|
// Make sure that everything still completes.
|
||||||
call.request(1);
|
call.request(1);
|
||||||
assertEquals(goldenResponses.get(1),
|
response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
|
||||||
queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
assertTrue(response instanceof StreamingOutputCallResponse);
|
||||||
|
assertResponse(goldenResponses.get(1), (StreamingOutputCallResponse) response);
|
||||||
assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -942,18 +924,15 @@ public abstract class AbstractInteropTest {
|
||||||
assumeEnoughMemory();
|
assumeEnoughMemory();
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
|
.setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
|
||||||
.setResponseSize(10)
|
.setResponseSize(10)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[10])))
|
.setBody(ByteString.copyFrom(new byte[10])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(request));
|
assertResponse(goldenResponse, blockingStub.unaryCall(request));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -961,15 +940,13 @@ public abstract class AbstractInteropTest {
|
||||||
assumeEnoughMemory();
|
assumeEnoughMemory();
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(unaryPayloadLength())
|
.setResponseSize(unaryPayloadLength())
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
|
.setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertEquals(goldenResponse, blockingStub.unaryCall(request));
|
assertResponse(goldenResponse, blockingStub.unaryCall(request));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -984,8 +961,8 @@ public abstract class AbstractInteropTest {
|
||||||
fixedHeaders.put(Util.METADATA_KEY, contextValue);
|
fixedHeaders.put(Util.METADATA_KEY, contextValue);
|
||||||
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
||||||
// .. and expect it to be echoed back in trailers
|
// .. and expect it to be echoed back in trailers
|
||||||
AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
|
||||||
AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
|
||||||
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
||||||
|
|
||||||
assertNotNull(stub.emptyCall(EMPTY));
|
assertNotNull(stub.emptyCall(EMPTY));
|
||||||
|
|
@ -1007,14 +984,13 @@ public abstract class AbstractInteropTest {
|
||||||
fixedHeaders.put(Util.METADATA_KEY, contextValue);
|
fixedHeaders.put(Util.METADATA_KEY, contextValue);
|
||||||
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
||||||
// .. and expect it to be echoed back in trailers
|
// .. and expect it to be echoed back in trailers
|
||||||
AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
|
||||||
AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
|
||||||
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
||||||
|
|
||||||
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
||||||
Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
|
||||||
Messages.StreamingOutputCallRequest.newBuilder();
|
Messages.StreamingOutputCallRequest.newBuilder();
|
||||||
streamingOutputBuilder.setResponseType(COMPRESSABLE);
|
|
||||||
for (Integer size : responseSizes) {
|
for (Integer size : responseSizes) {
|
||||||
streamingOutputBuilder.addResponseParameters(
|
streamingOutputBuilder.addResponseParameters(
|
||||||
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
|
||||||
|
|
@ -1107,7 +1083,6 @@ public abstract class AbstractInteropTest {
|
||||||
.setSize(1)
|
.setSize(1)
|
||||||
.setIntervalUs(10000);
|
.setIntervalUs(10000);
|
||||||
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.addResponseParameters(responseParameters)
|
.addResponseParameters(responseParameters)
|
||||||
.addResponseParameters(responseParameters)
|
.addResponseParameters(responseParameters)
|
||||||
.addResponseParameters(responseParameters)
|
.addResponseParameters(responseParameters)
|
||||||
|
|
@ -1189,7 +1164,7 @@ public abstract class AbstractInteropTest {
|
||||||
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
||||||
TestServiceGrpc.getStreamingOutputCallMethod();
|
TestServiceGrpc.getStreamingOutputCallMethod();
|
||||||
ByteSizeMarshaller<StreamingOutputCallResponse> mar =
|
ByteSizeMarshaller<StreamingOutputCallResponse> mar =
|
||||||
new ByteSizeMarshaller<StreamingOutputCallResponse>(md.getResponseMarshaller());
|
new ByteSizeMarshaller<>(md.getResponseMarshaller());
|
||||||
blockingServerStreamingCall(
|
blockingServerStreamingCall(
|
||||||
blockingStub.getChannel(),
|
blockingStub.getChannel(),
|
||||||
md.toBuilder(md.getRequestMarshaller(), mar).build(),
|
md.toBuilder(md.getRequestMarshaller(), mar).build(),
|
||||||
|
|
@ -1210,11 +1185,11 @@ public abstract class AbstractInteropTest {
|
||||||
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
|
||||||
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
|
.addResponseParameters(ResponseParameters.newBuilder().setSize(1))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
||||||
TestServiceGrpc.getStreamingOutputCallMethod();
|
TestServiceGrpc.getStreamingOutputCallMethod();
|
||||||
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
||||||
new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
|
new ByteSizeMarshaller<>(md.getRequestMarshaller());
|
||||||
blockingServerStreamingCall(
|
blockingServerStreamingCall(
|
||||||
blockingStub.getChannel(),
|
blockingStub.getChannel(),
|
||||||
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
||||||
|
|
@ -1246,7 +1221,7 @@ public abstract class AbstractInteropTest {
|
||||||
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
||||||
TestServiceGrpc.getStreamingOutputCallMethod();
|
TestServiceGrpc.getStreamingOutputCallMethod();
|
||||||
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
||||||
new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
|
new ByteSizeMarshaller<>(md.getRequestMarshaller());
|
||||||
blockingServerStreamingCall(
|
blockingServerStreamingCall(
|
||||||
blockingStub.getChannel(),
|
blockingStub.getChannel(),
|
||||||
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
||||||
|
|
@ -1273,7 +1248,7 @@ public abstract class AbstractInteropTest {
|
||||||
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
|
||||||
TestServiceGrpc.getStreamingOutputCallMethod();
|
TestServiceGrpc.getStreamingOutputCallMethod();
|
||||||
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
ByteSizeMarshaller<StreamingOutputCallRequest> mar =
|
||||||
new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
|
new ByteSizeMarshaller<>(md.getRequestMarshaller());
|
||||||
blockingServerStreamingCall(
|
blockingServerStreamingCall(
|
||||||
blockingStub.getChannel(),
|
blockingStub.getChannel(),
|
||||||
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
md.toBuilder(mar, md.getResponseMarshaller()).build(),
|
||||||
|
|
@ -1322,22 +1297,19 @@ public abstract class AbstractInteropTest {
|
||||||
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[3])))
|
.setBody(ByteString.copyFrom(new byte[3])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[1])))
|
.setBody(ByteString.copyFrom(new byte[1])))
|
||||||
.build(),
|
.build(),
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[4])))
|
.setBody(ByteString.copyFrom(new byte[4])))
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final ArrayBlockingQueue<StreamingOutputCallResponse> responses =
|
final ArrayBlockingQueue<StreamingOutputCallResponse> responses =
|
||||||
new ArrayBlockingQueue<StreamingOutputCallResponse>(3);
|
new ArrayBlockingQueue<>(3);
|
||||||
final SettableFuture<Void> completed = SettableFuture.create();
|
final SettableFuture<Void> completed = SettableFuture.create();
|
||||||
final SettableFuture<Void> errorSeen = SettableFuture.create();
|
final SettableFuture<Void> errorSeen = SettableFuture.create();
|
||||||
StreamObserver<StreamingOutputCallResponse> responseObserver =
|
StreamObserver<StreamingOutputCallResponse> responseObserver =
|
||||||
|
|
@ -1361,16 +1333,16 @@ public abstract class AbstractInteropTest {
|
||||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||||
= asyncStub.fullDuplexCall(responseObserver);
|
= asyncStub.fullDuplexCall(responseObserver);
|
||||||
requestObserver.onNext(requests.get(0));
|
requestObserver.onNext(requests.get(0));
|
||||||
assertEquals(
|
assertResponse(
|
||||||
goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
||||||
// Initiate graceful shutdown.
|
// Initiate graceful shutdown.
|
||||||
channel.shutdown();
|
channel.shutdown();
|
||||||
requestObserver.onNext(requests.get(1));
|
requestObserver.onNext(requests.get(1));
|
||||||
assertEquals(
|
assertResponse(
|
||||||
goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
||||||
// The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't.
|
// The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't.
|
||||||
requestObserver.onNext(requests.get(2));
|
requestObserver.onNext(requests.get(2));
|
||||||
assertEquals(
|
assertResponse(
|
||||||
goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
|
||||||
assertFalse(completed.isDone());
|
assertFalse(completed.isDone());
|
||||||
requestObserver.onCompleted();
|
requestObserver.onCompleted();
|
||||||
|
|
@ -1384,24 +1356,20 @@ public abstract class AbstractInteropTest {
|
||||||
final int requestSize = 271828;
|
final int requestSize = 271828;
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(responseSize)
|
.setResponseSize(responseSize)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[requestSize])))
|
.setBody(ByteString.copyFrom(new byte[requestSize])))
|
||||||
.build();
|
.build();
|
||||||
final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
|
final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
|
||||||
.addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize))
|
.addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize))
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize])))
|
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize])))
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
||||||
.build();
|
.build();
|
||||||
final StreamingOutputCallResponse goldenStreamingResponse =
|
final StreamingOutputCallResponse goldenStreamingResponse =
|
||||||
StreamingOutputCallResponse.newBuilder()
|
StreamingOutputCallResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
||||||
.build();
|
.build();
|
||||||
final byte[] trailingBytes =
|
final byte[] trailingBytes =
|
||||||
|
|
@ -1413,12 +1381,12 @@ public abstract class AbstractInteropTest {
|
||||||
metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
|
metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
|
||||||
TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub;
|
TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub;
|
||||||
blockingStub = MetadataUtils.attachHeaders(blockingStub, metadata);
|
blockingStub = MetadataUtils.attachHeaders(blockingStub, metadata);
|
||||||
AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> headersCapture = new AtomicReference<>();
|
||||||
AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
|
AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
|
||||||
blockingStub = MetadataUtils.captureMetadata(blockingStub, headersCapture, trailersCapture);
|
blockingStub = MetadataUtils.captureMetadata(blockingStub, headersCapture, trailersCapture);
|
||||||
SimpleResponse response = blockingStub.unaryCall(request);
|
SimpleResponse response = blockingStub.unaryCall(request);
|
||||||
|
|
||||||
assertEquals(goldenResponse, response);
|
assertResponse(goldenResponse, response);
|
||||||
assertEquals("test_initial_metadata_value",
|
assertEquals("test_initial_metadata_value",
|
||||||
headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
|
headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
|
||||||
assertTrue(
|
assertTrue(
|
||||||
|
|
@ -1432,8 +1400,8 @@ public abstract class AbstractInteropTest {
|
||||||
metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
|
metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
|
||||||
TestServiceGrpc.TestServiceStub stub = asyncStub;
|
TestServiceGrpc.TestServiceStub stub = asyncStub;
|
||||||
stub = MetadataUtils.attachHeaders(stub, metadata);
|
stub = MetadataUtils.attachHeaders(stub, metadata);
|
||||||
headersCapture = new AtomicReference<Metadata>();
|
headersCapture = new AtomicReference<>();
|
||||||
trailersCapture = new AtomicReference<Metadata>();
|
trailersCapture = new AtomicReference<>();
|
||||||
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
||||||
|
|
||||||
StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
|
StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
|
||||||
|
|
@ -1444,7 +1412,7 @@ public abstract class AbstractInteropTest {
|
||||||
recorder.awaitCompletion();
|
recorder.awaitCompletion();
|
||||||
|
|
||||||
assertSuccess(recorder);
|
assertSuccess(recorder);
|
||||||
assertEquals(goldenStreamingResponse, recorder.firstValue().get());
|
assertResponse(goldenStreamingResponse, recorder.firstValue().get());
|
||||||
assertEquals("test_initial_metadata_value",
|
assertEquals("test_initial_metadata_value",
|
||||||
headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
|
headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
|
||||||
assertTrue(
|
assertTrue(
|
||||||
|
|
@ -1632,14 +1600,13 @@ public abstract class AbstractInteropTest {
|
||||||
// cast to ServiceAccountCredentials to double-check the right type of object was created.
|
// cast to ServiceAccountCredentials to double-check the right type of object was created.
|
||||||
GoogleCredentials credentials =
|
GoogleCredentials credentials =
|
||||||
ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
|
ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
|
||||||
credentials = credentials.createScoped(Arrays.<String>asList(authScope));
|
credentials = credentials.createScoped(Arrays.asList(authScope));
|
||||||
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
|
TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
|
||||||
.withCallCredentials(MoreCallCredentials.from(credentials));
|
.withCallCredentials(MoreCallCredentials.from(credentials));
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setFillUsername(true)
|
.setFillUsername(true)
|
||||||
.setFillOauthScope(true)
|
.setFillOauthScope(true)
|
||||||
.setResponseSize(314159)
|
.setResponseSize(314159)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[271828])))
|
.setBody(ByteString.copyFrom(new byte[271828])))
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -1656,10 +1623,9 @@ public abstract class AbstractInteropTest {
|
||||||
.setOauthScope(response.getOauthScope())
|
.setOauthScope(response.getOauthScope())
|
||||||
.setUsername(response.getUsername())
|
.setUsername(response.getUsername())
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[314159])))
|
.setBody(ByteString.copyFrom(new byte[314159])))
|
||||||
.build();
|
.build();
|
||||||
assertEquals(goldenResponse, response);
|
assertResponse(goldenResponse, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Sends a large unary rpc with compute engine credentials. */
|
/** Sends a large unary rpc with compute engine credentials. */
|
||||||
|
|
@ -1671,7 +1637,6 @@ public abstract class AbstractInteropTest {
|
||||||
.setFillUsername(true)
|
.setFillUsername(true)
|
||||||
.setFillOauthScope(true)
|
.setFillOauthScope(true)
|
||||||
.setResponseSize(314159)
|
.setResponseSize(314159)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[271828])))
|
.setBody(ByteString.copyFrom(new byte[271828])))
|
||||||
.build();
|
.build();
|
||||||
|
|
@ -1686,16 +1651,14 @@ public abstract class AbstractInteropTest {
|
||||||
.setOauthScope(response.getOauthScope())
|
.setOauthScope(response.getOauthScope())
|
||||||
.setUsername(response.getUsername())
|
.setUsername(response.getUsername())
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[314159])))
|
.setBody(ByteString.copyFrom(new byte[314159])))
|
||||||
.build();
|
.build();
|
||||||
assertEquals(goldenResponse, response);
|
assertResponse(goldenResponse, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Test JWT-based auth. */
|
/** Test JWT-based auth. */
|
||||||
public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
|
public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setResponseSize(314159)
|
.setResponseSize(314159)
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[271828])))
|
.setBody(ByteString.copyFrom(new byte[271828])))
|
||||||
|
|
@ -1716,7 +1679,7 @@ public abstract class AbstractInteropTest {
|
||||||
throws Exception {
|
throws Exception {
|
||||||
GoogleCredentials utilCredentials =
|
GoogleCredentials utilCredentials =
|
||||||
GoogleCredentials.fromStream(credentialsStream);
|
GoogleCredentials.fromStream(credentialsStream);
|
||||||
utilCredentials = utilCredentials.createScoped(Arrays.<String>asList(authScope));
|
utilCredentials = utilCredentials.createScoped(Arrays.asList(authScope));
|
||||||
AccessToken accessToken = utilCredentials.refreshAccessToken();
|
AccessToken accessToken = utilCredentials.refreshAccessToken();
|
||||||
|
|
||||||
OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
|
OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
|
||||||
|
|
@ -1779,7 +1742,7 @@ public abstract class AbstractInteropTest {
|
||||||
|
|
||||||
stub.unaryCall(SimpleRequest.getDefaultInstance());
|
stub.unaryCall(SimpleRequest.getDefaultInstance());
|
||||||
|
|
||||||
List<Certificate> certificates = Lists.newArrayList();
|
List<Certificate> certificates;
|
||||||
SSLSession sslSession =
|
SSLSession sslSession =
|
||||||
serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
|
serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
|
||||||
try {
|
try {
|
||||||
|
|
@ -1995,7 +1958,7 @@ public abstract class AbstractInteropTest {
|
||||||
for (MessageLite msg : receivedMessages) {
|
for (MessageLite msg : receivedMessages) {
|
||||||
assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo));
|
assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo));
|
||||||
assertThat(tracer.nextInboundEvent()).matches(
|
assertThat(tracer.nextInboundEvent()).matches(
|
||||||
String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
|
String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
|
||||||
uncompressedReceivedSize += msg.getSerializedSize();
|
uncompressedReceivedSize += msg.getSerializedSize();
|
||||||
seqNo++;
|
seqNo++;
|
||||||
}
|
}
|
||||||
|
|
@ -2058,6 +2021,44 @@ public abstract class AbstractInteropTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper methods for responses containing Payload since proto equals does not ignore deprecated
|
||||||
|
// fields (PayloadType).
|
||||||
|
private void assertResponses(
|
||||||
|
Collection<StreamingOutputCallResponse> expected,
|
||||||
|
Collection<StreamingOutputCallResponse> actual) {
|
||||||
|
assertSame(expected.size(), actual.size());
|
||||||
|
Iterator<StreamingOutputCallResponse> expectedIter = expected.iterator();
|
||||||
|
Iterator<StreamingOutputCallResponse> actualIter = actual.iterator();
|
||||||
|
|
||||||
|
while (expectedIter.hasNext()) {
|
||||||
|
assertResponse(expectedIter.next(), actualIter.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertResponse(
|
||||||
|
StreamingOutputCallResponse expected, StreamingOutputCallResponse actual) {
|
||||||
|
if (expected == null || actual == null) {
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
} else {
|
||||||
|
assertPayload(expected.getPayload(), actual.getPayload());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertResponse(SimpleResponse expected, SimpleResponse actual) {
|
||||||
|
assertPayload(expected.getPayload(), actual.getPayload());
|
||||||
|
assertEquals(expected.getUsername(), actual.getUsername());
|
||||||
|
assertEquals(expected.getOauthScope(), actual.getOauthScope());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertPayload(Payload expected, Payload actual) {
|
||||||
|
// Compare non deprecated fields in Payload, to make this test forward compatible.
|
||||||
|
if (expected == null || actual == null) {
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
} else {
|
||||||
|
assertEquals(expected.getBody(), actual.getBody());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Captures the request attributes. Useful for testing ServerCalls.
|
* Captures the request attributes. Useful for testing ServerCalls.
|
||||||
* {@link ServerCall#getAttributes()}
|
* {@link ServerCall#getAttributes()}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ import io.grpc.netty.NegotiationType;
|
||||||
import io.grpc.netty.NettyChannelBuilder;
|
import io.grpc.netty.NettyChannelBuilder;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.integration.Messages.Payload;
|
import io.grpc.testing.integration.Messages.Payload;
|
||||||
import io.grpc.testing.integration.Messages.PayloadType;
|
|
||||||
import io.grpc.testing.integration.Messages.SimpleRequest;
|
import io.grpc.testing.integration.Messages.SimpleRequest;
|
||||||
import io.grpc.testing.integration.Messages.SimpleResponse;
|
import io.grpc.testing.integration.Messages.SimpleResponse;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
|
@ -205,12 +204,10 @@ public final class Http2Client {
|
||||||
private final int payloadSize = 271828;
|
private final int payloadSize = 271828;
|
||||||
private final SimpleRequest simpleRequest = SimpleRequest.newBuilder()
|
private final SimpleRequest simpleRequest = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(responseSize)
|
.setResponseSize(responseSize)
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[payloadSize])))
|
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[payloadSize])))
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
.setBody(ByteString.copyFrom(new byte[responseSize])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
@ -272,7 +269,7 @@ public final class Http2Client {
|
||||||
assertResponseEquals(blockingStub.unaryCall(simpleRequest), goldenResponse);
|
assertResponseEquals(blockingStub.unaryCall(simpleRequest), goldenResponse);
|
||||||
|
|
||||||
threadpool = MoreExecutors.listeningDecorator(newFixedThreadPool(numThreads));
|
threadpool = MoreExecutors.listeningDecorator(newFixedThreadPool(numThreads));
|
||||||
List<ListenableFuture<?>> workerFutures = new ArrayList<ListenableFuture<?>>();
|
List<ListenableFuture<?>> workerFutures = new ArrayList<>();
|
||||||
for (int i = 0; i < numThreads; i++) {
|
for (int i = 0; i < numThreads; i++) {
|
||||||
workerFutures.add(threadpool.submit(new MaxStreamsWorker(i, simpleRequest)));
|
workerFutures.add(threadpool.submit(new MaxStreamsWorker(i, simpleRequest)));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ import io.grpc.internal.LogExceptionRunnable;
|
||||||
import io.grpc.stub.ServerCallStreamObserver;
|
import io.grpc.stub.ServerCallStreamObserver;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.integration.Messages.Payload;
|
import io.grpc.testing.integration.Messages.Payload;
|
||||||
import io.grpc.testing.integration.Messages.PayloadType;
|
|
||||||
import io.grpc.testing.integration.Messages.ResponseParameters;
|
import io.grpc.testing.integration.Messages.ResponseParameters;
|
||||||
import io.grpc.testing.integration.Messages.SimpleRequest;
|
import io.grpc.testing.integration.Messages.SimpleRequest;
|
||||||
import io.grpc.testing.integration.Messages.SimpleResponse;
|
import io.grpc.testing.integration.Messages.SimpleResponse;
|
||||||
|
|
@ -37,8 +36,6 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
|
||||||
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
|
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
|
||||||
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
|
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
|
||||||
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
|
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
|
@ -56,12 +53,9 @@ import javax.annotation.concurrent.GuardedBy;
|
||||||
* sent in response streams.
|
* sent in response streams.
|
||||||
*/
|
*/
|
||||||
public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
private static final String UNCOMPRESSABLE_FILE =
|
|
||||||
"/io/grpc/testing/integration/testdata/uncompressable.bin";
|
|
||||||
private final Random random = new Random();
|
private final Random random = new Random();
|
||||||
|
|
||||||
private final ScheduledExecutorService executor;
|
private final ScheduledExecutorService executor;
|
||||||
private final ByteString uncompressableBuffer;
|
|
||||||
private final ByteString compressableBuffer;
|
private final ByteString compressableBuffer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -70,11 +64,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
public TestServiceImpl(ScheduledExecutorService executor) {
|
public TestServiceImpl(ScheduledExecutorService executor) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
|
this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
|
||||||
this.uncompressableBuffer = createBufferFromFile(UNCOMPRESSABLE_FILE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void emptyCall(EmptyProtos.Empty empty,
|
public void emptyCall(EmptyProtos.Empty request,
|
||||||
StreamObserver<EmptyProtos.Empty> responseObserver) {
|
StreamObserver<EmptyProtos.Empty> responseObserver) {
|
||||||
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
|
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
|
||||||
responseObserver.onCompleted();
|
responseObserver.onCompleted();
|
||||||
|
|
@ -103,16 +96,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.getResponseSize() != 0) {
|
if (req.getResponseSize() != 0) {
|
||||||
boolean compressable = compressableResponse(req.getResponseType());
|
|
||||||
ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
|
|
||||||
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
|
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
|
||||||
// TODO(wonderfly): whether or not this is a good approach needs further discussion.
|
// TODO(wonderfly): whether or not this is a good approach needs further discussion.
|
||||||
int offset = random.nextInt(
|
int offset = random.nextInt(compressableBuffer.size());
|
||||||
compressable ? compressableBuffer.size() : uncompressableBuffer.size());
|
ByteString payload = generatePayload(compressableBuffer, offset, req.getResponseSize());
|
||||||
ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());
|
|
||||||
responseBuilder.setPayload(
|
responseBuilder.setPayload(
|
||||||
Payload.newBuilder()
|
Payload.newBuilder()
|
||||||
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
|
|
||||||
.setBody(payload));
|
.setBody(payload));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -211,7 +200,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
|
public StreamObserver<Messages.StreamingOutputCallRequest> halfDuplexCall(
|
||||||
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
|
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
|
||||||
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
|
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
|
||||||
final Queue<Chunk> chunks = new ArrayDeque<Chunk>();
|
final Queue<Chunk> chunks = new ArrayDeque<>();
|
||||||
return new StreamObserver<StreamingOutputCallRequest>() {
|
return new StreamObserver<StreamingOutputCallRequest>() {
|
||||||
@Override
|
@Override
|
||||||
public void onNext(StreamingOutputCallRequest request) {
|
public void onNext(StreamingOutputCallRequest request) {
|
||||||
|
|
@ -237,7 +226,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
* available, the stream is half-closed.
|
* available, the stream is half-closed.
|
||||||
*/
|
*/
|
||||||
private class ResponseDispatcher {
|
private class ResponseDispatcher {
|
||||||
private final Chunk completionChunk = new Chunk(0, 0, 0, false);
|
private final Chunk completionChunk = new Chunk(0, 0, 0);
|
||||||
private final Queue<Chunk> chunks;
|
private final Queue<Chunk> chunks;
|
||||||
private final StreamObserver<StreamingOutputCallResponse> responseStream;
|
private final StreamObserver<StreamingOutputCallResponse> responseStream;
|
||||||
private boolean scheduled;
|
private boolean scheduled;
|
||||||
|
|
@ -385,14 +374,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
|
public Queue<Chunk> toChunkQueue(StreamingOutputCallRequest request) {
|
||||||
Queue<Chunk> chunkQueue = new ArrayDeque<Chunk>();
|
Queue<Chunk> chunkQueue = new ArrayDeque<Chunk>();
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
boolean compressable = compressableResponse(request.getResponseType());
|
|
||||||
for (ResponseParameters params : request.getResponseParametersList()) {
|
for (ResponseParameters params : request.getResponseParametersList()) {
|
||||||
chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize(), compressable));
|
chunkQueue.add(new Chunk(params.getIntervalUs(), offset, params.getSize()));
|
||||||
|
|
||||||
// Increment the offset past this chunk.
|
// Increment the offset past this chunk. Buffer need to be circular.
|
||||||
// Both buffers need to be circular.
|
offset = (offset + params.getSize()) % compressableBuffer.size();
|
||||||
offset = (offset + params.getSize())
|
|
||||||
% (compressable ? compressableBuffer.size() : uncompressableBuffer.size());
|
|
||||||
}
|
}
|
||||||
return chunkQueue;
|
return chunkQueue;
|
||||||
}
|
}
|
||||||
|
|
@ -400,20 +386,18 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
/**
|
/**
|
||||||
* A single chunk of a response stream. Contains delivery information for the dispatcher and can
|
* A single chunk of a response stream. Contains delivery information for the dispatcher and can
|
||||||
* be converted to a streaming response proto. A chunk just references it's payload in the
|
* be converted to a streaming response proto. A chunk just references it's payload in the
|
||||||
* {@link #uncompressableBuffer} array. The payload isn't actually created until {@link
|
* {@link #compressableBuffer} array. The payload isn't actually created until {@link
|
||||||
* #toResponse()} is called.
|
* #toResponse()} is called.
|
||||||
*/
|
*/
|
||||||
private class Chunk {
|
private class Chunk {
|
||||||
private final int delayMicroseconds;
|
private final int delayMicroseconds;
|
||||||
private final int offset;
|
private final int offset;
|
||||||
private final int length;
|
private final int length;
|
||||||
private final boolean compressable;
|
|
||||||
|
|
||||||
public Chunk(int delayMicroseconds, int offset, int length, boolean compressable) {
|
public Chunk(int delayMicroseconds, int offset, int length) {
|
||||||
this.delayMicroseconds = delayMicroseconds;
|
this.delayMicroseconds = delayMicroseconds;
|
||||||
this.offset = offset;
|
this.offset = offset;
|
||||||
this.length = length;
|
this.length = length;
|
||||||
this.compressable = compressable;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -422,57 +406,14 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
private StreamingOutputCallResponse toResponse() {
|
private StreamingOutputCallResponse toResponse() {
|
||||||
StreamingOutputCallResponse.Builder responseBuilder =
|
StreamingOutputCallResponse.Builder responseBuilder =
|
||||||
StreamingOutputCallResponse.newBuilder();
|
StreamingOutputCallResponse.newBuilder();
|
||||||
ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
|
ByteString payload = generatePayload(compressableBuffer, offset, length);
|
||||||
ByteString payload = generatePayload(dataBuffer, offset, length);
|
|
||||||
responseBuilder.setPayload(
|
responseBuilder.setPayload(
|
||||||
Payload.newBuilder()
|
Payload.newBuilder()
|
||||||
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
|
|
||||||
.setBody(payload));
|
.setBody(payload));
|
||||||
return responseBuilder.build();
|
return responseBuilder.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a buffer with data read from a file.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("Finally") // Not concerned about suppression; expected to be exceedingly rare
|
|
||||||
private ByteString createBufferFromFile(String fileClassPath) {
|
|
||||||
ByteString buffer = ByteString.EMPTY;
|
|
||||||
InputStream inputStream = getClass().getResourceAsStream(fileClassPath);
|
|
||||||
if (inputStream == null) {
|
|
||||||
throw new IllegalArgumentException("Unable to locate file on classpath: " + fileClassPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
buffer = ByteString.readFrom(inputStream);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
inputStream.close();
|
|
||||||
} catch (IOException ignorable) {
|
|
||||||
// ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates whether or not the response for this type should be compressable. If {@code RANDOM},
|
|
||||||
* picks a random boolean.
|
|
||||||
*/
|
|
||||||
private boolean compressableResponse(PayloadType responseType) {
|
|
||||||
switch (responseType) {
|
|
||||||
case COMPRESSABLE:
|
|
||||||
return true;
|
|
||||||
case RANDOM:
|
|
||||||
return random.nextBoolean();
|
|
||||||
case UNCOMPRESSABLE:
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Generates a payload of desired type and size. Reads compressableBuffer or
|
* Generates a payload of desired type and size. Reads compressableBuffer or
|
||||||
* uncompressableBuffer as a circular buffer.
|
* uncompressableBuffer as a circular buffer.
|
||||||
|
|
@ -506,7 +447,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
* testing end-to-end metadata propagation.
|
* testing end-to-end metadata propagation.
|
||||||
*/
|
*/
|
||||||
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
|
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
|
||||||
final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
|
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
|
||||||
return new ServerInterceptor() {
|
return new ServerInterceptor() {
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||||
|
|
@ -534,7 +475,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
* Echoes request headers with the specified key(s) from a client into response headers only.
|
* Echoes request headers with the specified key(s) from a client into response headers only.
|
||||||
*/
|
*/
|
||||||
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
|
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
|
||||||
final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
|
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
|
||||||
return new ServerInterceptor() {
|
return new ServerInterceptor() {
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||||
|
|
@ -561,7 +502,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
||||||
* Echoes request headers with the specified key(s) from a client into response trailers only.
|
* Echoes request headers with the specified key(s) from a client into response trailers only.
|
||||||
*/
|
*/
|
||||||
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
|
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
|
||||||
final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
|
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
|
||||||
return new ServerInterceptor() {
|
return new ServerInterceptor() {
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||||
|
|
|
||||||
|
|
@ -11,34 +11,25 @@
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// Message definitions to be used by integration test service definitions.
|
// Message definitions to be used by integration test service definitions.
|
||||||
|
|
||||||
syntax = "proto3";
|
syntax = "proto3";
|
||||||
|
|
||||||
import "google/protobuf/wrappers.proto";
|
|
||||||
|
|
||||||
package grpc.testing;
|
package grpc.testing;
|
||||||
|
|
||||||
option java_package = "io.grpc.testing.integration";
|
option java_package = "io.grpc.testing.integration";
|
||||||
|
|
||||||
// DEPRECATED, don't use. To be removed shortly.
|
// TODO(jihuncho): Use well-known types once all languages are synced.
|
||||||
// The type of payload that should be returned.
|
message BoolValue {
|
||||||
enum PayloadType {
|
// The bool value.
|
||||||
// Compressable text format.
|
bool value = 1;
|
||||||
COMPRESSABLE = 0;
|
|
||||||
|
|
||||||
// Uncompressable binary format.
|
|
||||||
UNCOMPRESSABLE = 1;
|
|
||||||
|
|
||||||
// Randomly chosen from all other formats defined in this enum.
|
|
||||||
RANDOM = 2;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A block of data, to simply increase gRPC message size.
|
// A block of data, to simply increase gRPC message size.
|
||||||
message Payload {
|
message Payload {
|
||||||
// DEPRECATED, don't use. To be removed shortly.
|
reserved 1;
|
||||||
// The type of data in body.
|
|
||||||
PayloadType type = 1;
|
|
||||||
// Primary contents of payload.
|
// Primary contents of payload.
|
||||||
bytes body = 2;
|
bytes body = 2;
|
||||||
}
|
}
|
||||||
|
|
@ -52,10 +43,7 @@ message EchoStatus {
|
||||||
|
|
||||||
// Unary request.
|
// Unary request.
|
||||||
message SimpleRequest {
|
message SimpleRequest {
|
||||||
// DEPRECATED, don't use. To be removed shortly.
|
reserved 1;
|
||||||
// Desired payload type in the response from the server.
|
|
||||||
// If response_type is RANDOM, server randomly chooses one from other formats.
|
|
||||||
PayloadType response_type = 1;
|
|
||||||
|
|
||||||
// Desired payload size in the response from the server.
|
// Desired payload size in the response from the server.
|
||||||
int32 response_size = 2;
|
int32 response_size = 2;
|
||||||
|
|
@ -73,13 +61,13 @@ message SimpleRequest {
|
||||||
// "nullable" in order to interoperate seamlessly with clients not able to
|
// "nullable" in order to interoperate seamlessly with clients not able to
|
||||||
// implement the full compression tests by introspecting the call to verify
|
// implement the full compression tests by introspecting the call to verify
|
||||||
// the response's compression status.
|
// the response's compression status.
|
||||||
google.protobuf.BoolValue response_compressed = 6;
|
BoolValue response_compressed = 6;
|
||||||
|
|
||||||
// Whether server should return a given status
|
// Whether server should return a given status
|
||||||
EchoStatus response_status = 7;
|
EchoStatus response_status = 7;
|
||||||
|
|
||||||
// Whether the server should expect this request to be compressed.
|
// Whether the server should expect this request to be compressed.
|
||||||
google.protobuf.BoolValue expect_compressed = 8;
|
BoolValue expect_compressed = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unary response, as configured by the request.
|
// Unary response, as configured by the request.
|
||||||
|
|
@ -106,7 +94,7 @@ message StreamingInputCallRequest {
|
||||||
// is "nullable" in order to interoperate seamlessly with servers not able to
|
// is "nullable" in order to interoperate seamlessly with servers not able to
|
||||||
// implement the full compression tests by introspecting the call to verify
|
// implement the full compression tests by introspecting the call to verify
|
||||||
// the request's compression status.
|
// the request's compression status.
|
||||||
google.protobuf.BoolValue expect_compressed = 2;
|
BoolValue expect_compressed = 2;
|
||||||
|
|
||||||
// Not expecting any payload from the response.
|
// Not expecting any payload from the response.
|
||||||
}
|
}
|
||||||
|
|
@ -130,17 +118,12 @@ message ResponseParameters {
|
||||||
// "nullable" in order to interoperate seamlessly with clients not able to
|
// "nullable" in order to interoperate seamlessly with clients not able to
|
||||||
// implement the full compression tests by introspecting the call to verify
|
// implement the full compression tests by introspecting the call to verify
|
||||||
// the response's compression status.
|
// the response's compression status.
|
||||||
google.protobuf.BoolValue compressed = 3;
|
BoolValue compressed = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server-streaming request.
|
// Server-streaming request.
|
||||||
message StreamingOutputCallRequest {
|
message StreamingOutputCallRequest {
|
||||||
// DEPRECATED, don't use. To be removed shortly.
|
reserved 1;
|
||||||
// Desired payload type in the response from the server.
|
|
||||||
// If response_type is RANDOM, the payload from each response in the stream
|
|
||||||
// might be of different types. This is to simulate a mixed type of payload
|
|
||||||
// stream.
|
|
||||||
PayloadType response_type = 1;
|
|
||||||
|
|
||||||
// Configuration for each expected response message.
|
// Configuration for each expected response message.
|
||||||
repeated ResponseParameters response_parameters = 2;
|
repeated ResponseParameters response_parameters = 2;
|
||||||
|
|
|
||||||
|
|
@ -68,7 +68,7 @@ service TestService {
|
||||||
// that case.
|
// that case.
|
||||||
service UnimplementedService {
|
service UnimplementedService {
|
||||||
// A call that no server should implement
|
// A call that no server should implement
|
||||||
rpc UnimplementedCall(grpc.testing.Empty) returns(grpc.testing.Empty);
|
rpc UnimplementedCall(grpc.testing.Empty) returns (grpc.testing.Empty);
|
||||||
}
|
}
|
||||||
|
|
||||||
// A service used to control reconnect server.
|
// A service used to control reconnect server.
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -26,7 +26,6 @@ import io.grpc.netty.NegotiationType;
|
||||||
import io.grpc.netty.NettyChannelBuilder;
|
import io.grpc.netty.NettyChannelBuilder;
|
||||||
import io.grpc.netty.NettyServerBuilder;
|
import io.grpc.netty.NettyServerBuilder;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.integration.Messages.PayloadType;
|
|
||||||
import io.grpc.testing.integration.Messages.ResponseParameters;
|
import io.grpc.testing.integration.Messages.ResponseParameters;
|
||||||
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
|
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
|
||||||
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
|
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
|
||||||
|
|
@ -110,8 +109,7 @@ public class ConcurrencyTest {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
// Prepare the request.
|
// Prepare the request.
|
||||||
StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder()
|
StreamingOutputCallRequest.Builder requestBuilder = StreamingOutputCallRequest.newBuilder();
|
||||||
.setResponseType(PayloadType.RANDOM);
|
|
||||||
for (int i = 0; i < NUM_RESPONSES_PER_REQUEST; i++) {
|
for (int i = 0; i < NUM_RESPONSES_PER_REQUEST; i++) {
|
||||||
requestBuilder.addResponseParameters(ResponseParameters.newBuilder()
|
requestBuilder.addResponseParameters(ResponseParameters.newBuilder()
|
||||||
.setSize(1000)
|
.setSize(1000)
|
||||||
|
|
|
||||||
|
|
@ -115,8 +115,7 @@ public class Http2OkHttpTest extends AbstractInteropTest {
|
||||||
Messages.ResponseParameters.newBuilder()
|
Messages.ResponseParameters.newBuilder()
|
||||||
.setSize(1);
|
.setSize(1);
|
||||||
Messages.StreamingOutputCallRequest.Builder requestBuilder =
|
Messages.StreamingOutputCallRequest.Builder requestBuilder =
|
||||||
Messages.StreamingOutputCallRequest.newBuilder()
|
Messages.StreamingOutputCallRequest.newBuilder();
|
||||||
.setResponseType(Messages.PayloadType.COMPRESSABLE);
|
|
||||||
for (int i = 0; i < 1000; i++) {
|
for (int i = 0; i < 1000; i++) {
|
||||||
requestBuilder.addResponseParameters(responseParameters);
|
requestBuilder.addResponseParameters(responseParameters);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ package io.grpc.testing.integration;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import com.google.protobuf.BoolValue;
|
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.Channel;
|
import io.grpc.Channel;
|
||||||
|
|
@ -41,8 +40,8 @@ import io.grpc.internal.AbstractServerImplBuilder;
|
||||||
import io.grpc.internal.GrpcUtil;
|
import io.grpc.internal.GrpcUtil;
|
||||||
import io.grpc.netty.NettyChannelBuilder;
|
import io.grpc.netty.NettyChannelBuilder;
|
||||||
import io.grpc.netty.NettyServerBuilder;
|
import io.grpc.netty.NettyServerBuilder;
|
||||||
|
import io.grpc.testing.integration.Messages.BoolValue;
|
||||||
import io.grpc.testing.integration.Messages.Payload;
|
import io.grpc.testing.integration.Messages.Payload;
|
||||||
import io.grpc.testing.integration.Messages.PayloadType;
|
|
||||||
import io.grpc.testing.integration.Messages.SimpleRequest;
|
import io.grpc.testing.integration.Messages.SimpleRequest;
|
||||||
import io.grpc.testing.integration.Messages.SimpleResponse;
|
import io.grpc.testing.integration.Messages.SimpleResponse;
|
||||||
import java.io.FilterInputStream;
|
import java.io.FilterInputStream;
|
||||||
|
|
@ -107,13 +106,11 @@ public class TransportCompressionTest extends AbstractInteropTest {
|
||||||
final SimpleRequest request = SimpleRequest.newBuilder()
|
final SimpleRequest request = SimpleRequest.newBuilder()
|
||||||
.setResponseSize(314159)
|
.setResponseSize(314159)
|
||||||
.setResponseCompressed(BoolValue.newBuilder().setValue(true))
|
.setResponseCompressed(BoolValue.newBuilder().setValue(true))
|
||||||
.setResponseType(PayloadType.COMPRESSABLE)
|
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setBody(ByteString.copyFrom(new byte[271828])))
|
.setBody(ByteString.copyFrom(new byte[271828])))
|
||||||
.build();
|
.build();
|
||||||
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
|
||||||
.setPayload(Payload.newBuilder()
|
.setPayload(Payload.newBuilder()
|
||||||
.setType(PayloadType.COMPRESSABLE)
|
|
||||||
.setBody(ByteString.copyFrom(new byte[314159])))
|
.setBody(ByteString.copyFrom(new byte[314159])))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue