servlet: Implement gRPC server as a Servlet (#8596)

Full end to end implementation of gRPC server as a Servlet including tests and examples

Co-authored-by: Penn (Dapeng) Zhang <zdapeng@google.com>
Co-authored-by: Chengyuan Zhang <chengyuanzhang@google.com>
This commit is contained in:
Colin Alworth 2023-01-20 15:17:58 -06:00 committed by GitHub
parent 44847bf4e9
commit 706646f8bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 3387 additions and 5 deletions

View File

@ -33,6 +33,7 @@ $ VERSION_FILES=(
examples/example-jwt-auth/pom.xml
examples/example-hostname/build.gradle
examples/example-hostname/pom.xml
examples/example-servlet/build.gradle
examples/example-tls/build.gradle
examples/example-tls/pom.xml
examples/example-xds/build.gradle

View File

@ -19,6 +19,8 @@ def subprojects = [
project(':grpc-protobuf-lite'),
project(':grpc-rls'),
project(':grpc-services'),
project(':grpc-servlet'),
project(':grpc-servlet-jakarta'),
project(':grpc-stub'),
project(':grpc-testing'),
project(':grpc-xds'),

View File

@ -0,0 +1,37 @@
# Hello World Example using Servlets
This example uses Java Servlets instead of Netty for the gRPC server. This example requires `grpc-java`
and `protoc-gen-grpc-java` to already be built. You are strongly encouraged to check out a git release
tag, since these builds will already be available.
```bash
git checkout v<major>.<minor>.<patch>
```
Otherwise, you must follow [COMPILING](../../COMPILING.md).
To build the example,
1. **[Install gRPC Java library SNAPSHOT locally, including code generation plugin](../../COMPILING.md) (Only need this step for non-released versions, e.g. master HEAD).**
2. In this directory, build the war file
```bash
$ ../gradlew war
```
To run this, deploy the war, now found in `build/libs/example-servlet.war` to your choice of servlet
container. Note that this container must support the Servlet 4.0 spec, for this particular example must
use `javax.servlet` packages instead of the more modern `jakarta.servlet`, though there is a `grpc-servlet-jakarta`
artifact that can be used for Jakarta support. Be sure to enable http/2 support in the servlet container,
or clients will not be able to connect.
To test that this is working properly, build the HelloWorldClient example and direct it to connect to your
http/2 server. From the parent directory:
1. Build the executables:
```bash
$ ../gradlew installDist
```
2. Run the client app, specifying the name to say hello to and the server's address:
```bash
$ ./build/install/examples/bin/hello-world-client World localhost:8080
```

View File

@ -0,0 +1,46 @@
plugins {
// ASSUMES GRADLE 5.6 OR HIGHER. Use plugin version 0.8.10 with earlier gradle versions
id 'com.google.protobuf' version '0.8.17'
// Generate IntelliJ IDEA's .idea & .iml project files
id 'idea'
id 'war'
}
repositories {
maven { // The google mirror is less flaky than mavenCentral()
url "https://maven-central.storage-download.googleapis.com/maven2/" }
mavenLocal()
}
sourceCompatibility = 1.8
targetCompatibility = 1.8
def grpcVersion = '1.53.0-SNAPSHOT' // CURRENT_GRPC_VERSION
def protocVersion = '3.21.7'
dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}",
"io.grpc:grpc-servlet:${grpcVersion}",
"io.grpc:grpc-stub:${grpcVersion}"
providedImplementation "javax.servlet:javax.servlet-api:4.0.1",
"org.apache.tomcat:annotations-api:6.0.53"
}
protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins { grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } }
generateProtoTasks {
all()*.plugins { grpc {} }
}
}
// Inform IDEs like IntelliJ IDEA, Eclipse or NetBeans about the generated code.
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}

View File

@ -0,0 +1,8 @@
pluginManagement {
repositories {
maven { // The google mirror is less flaky than mavenCentral()
url "https://maven-central.storage-download.googleapis.com/maven2/"
}
gradlePluginPortal()
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet.examples.helloworld;
import io.grpc.stub.StreamObserver;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.servlet.ServletAdapter;
import io.grpc.servlet.ServletServerBuilder;
import java.io.IOException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* A servlet that hosts a gRPC server over HTTP/2 and shares the resource URI for the normal servlet
* clients over HTTP/1.0+.
*
* <p>For creating a servlet that solely serves gRPC services, do not follow this example, simply
* extend or register a {@link io.grpc.servlet.GrpcServlet} instead.
*/
@WebServlet(urlPatterns = {"/helloworld.Greeter/SayHello"}, asyncSupported = true)
public class HelloWorldServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private final ServletAdapter servletAdapter =
new ServletServerBuilder().addService(new GreeterImpl()).buildServletAdapter();
private static final class GreeterImpl extends GreeterGrpc.GreeterImplBase {
GreeterImpl() {}
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
response.setContentType("text/html");
response.getWriter().println("<p>Hello World!</p>");
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
if (ServletAdapter.isGrpc(request)) {
servletAdapter.doPost(request, response);
} else {
response.setContentType("text/html");
response.getWriter().println("<p>Hello non-gRPC client!</p>");
}
}
@Override
public void destroy() {
servletAdapter.destroy();
super.destroy();
}
}

View File

@ -0,0 +1,37 @@
// Copyright 2015 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Need this file for deployment to GlassFish -->
<!DOCTYPE glassfish-web-app PUBLIC "-//GlassFish.org//DTD GlassFish Application Server 3.1 Servlet 3.0//EN" "http://glassfish.org/dtds/glassfish-web-app_3_0-1.dtd">
<glassfish-web-app error-url="">
<class-loader delegate="false"/>
</glassfish-web-app>

View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Need this file for deployment to WildFly -->
<jboss-web xmlns="http://www.jboss.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.jboss.com/xml/ns/javaee
http://www.jboss.org/j2ee/schema/jboss-web_5_1.xsd">
<context-root>/</context-root>
</jboss-web>

View File

@ -244,11 +244,8 @@ public abstract class AbstractInteropTest {
protected static final Empty EMPTY = Empty.getDefaultInstance();
private void startServer() {
maybeStartHandshakerServer();
ServerBuilder<?> builder = getServerBuilder();
private void configBuilder(@Nullable ServerBuilder<?> builder) {
if (builder == null) {
server = null;
return;
}
testServiceExecutor = Executors.newScheduledThreadPool(2);
@ -266,6 +263,14 @@ public abstract class AbstractInteropTest {
new TestServiceImpl(testServiceExecutor),
allInterceptors))
.addStreamTracerFactory(serverStreamTracerFactory);
}
protected void startServer(@Nullable ServerBuilder<?> builder) {
maybeStartHandshakerServer();
if (builder == null) {
server = null;
return;
}
try {
server = builder.build().start();
@ -333,7 +338,9 @@ public abstract class AbstractInteropTest {
*/
@Before
public void setUp() {
startServer();
ServerBuilder<?> serverBuilder = getServerBuilder();
configBuilder(serverBuilder);
startServer(serverBuilder);
channel = createChannel();
blockingStub =

View File

@ -16,10 +16,12 @@
package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
@ -107,5 +109,11 @@ public final class InternalNettyChannelBuilder {
return builder.buildTransportFactory();
}
@VisibleForTesting
public static void setTransportTracerFactory(
NettyChannelBuilder builder, TransportTracer.Factory factory) {
builder.setTransportTracerFactory(factory);
}
private InternalNettyChannelBuilder() {}
}

111
servlet/build.gradle Normal file
View File

@ -0,0 +1,111 @@
plugins {
id "java-library"
id "maven-publish"
}
description = "gRPC: Servlet"
// javax.servlet-api 4.0 requires a minimum of Java 8, so we might as well use that source level
sourceCompatibility = 1.8
targetCompatibility = 1.8
def jettyVersion = '10.0.7'
configurations {
itImplementation.extendsFrom(implementation)
undertowTestImplementation.extendsFrom(itImplementation)
tomcatTestImplementation.extendsFrom(itImplementation)
jettyTestImplementation.extendsFrom(itImplementation)
}
sourceSets {
// Create a test sourceset for each classpath - could be simplified if we made new test directories
undertowTest {}
tomcatTest {}
// Only compile these tests if java 11+ is being used
if (JavaVersion.current().isJava11Compatible()) {
jettyTest {}
}
}
dependencies {
api project(':grpc-api')
compileOnly 'javax.servlet:javax.servlet-api:4.0.1',
libraries.javax.annotation // java 9, 10 needs it
implementation project(':grpc-core'),
libraries.guava
testImplementation 'javax.servlet:javax.servlet-api:4.0.1',
'org.jetbrains.kotlinx:lincheck:2.14.1'
itImplementation project(':grpc-servlet'),
project(':grpc-netty'),
project(':grpc-core').sourceSets.test.runtimeClasspath,
libraries.junit
itImplementation(project(':grpc-interop-testing')) {
// Avoid grpc-netty-shaded dependency
exclude group: 'io.grpc', module: 'grpc-alts'
exclude group: 'io.grpc', module: 'grpc-xds'
}
undertowTestImplementation 'io.undertow:undertow-servlet:2.2.14.Final'
tomcatTestImplementation 'org.apache.tomcat.embed:tomcat-embed-core:9.0.56'
jettyTestImplementation "org.eclipse.jetty:jetty-servlet:${jettyVersion}",
"org.eclipse.jetty.http2:http2-server:${jettyVersion}",
"org.eclipse.jetty:jetty-client:${jettyVersion}"
project(':grpc-testing')
}
test {
if (JavaVersion.current().isJava9Compatible()) {
jvmArgs += [
// required for Lincheck
'--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-exports=java.base/jdk.internal.util=ALL-UNNAMED',
]
}
}
// Set up individual classpaths for each test, to avoid any mismatch,
// and ensure they are only used when supported by the current jvm
check.dependsOn(tasks.register('undertowTest', Test) {
classpath = sourceSets.undertowTest.runtimeClasspath
testClassesDirs = sourceSets.undertowTest.output.classesDirs
})
check.dependsOn(tasks.register('tomcat9Test', Test) {
classpath = sourceSets.tomcatTest.runtimeClasspath
testClassesDirs = sourceSets.tomcatTest.output.classesDirs
// Provide a temporary directory for tomcat to be deleted after test finishes
def tomcatTempDir = "$buildDir/tomcat_catalina_base"
systemProperty 'catalina.base', tomcatTempDir
doLast {
file(tomcatTempDir).deleteDir()
}
// tomcat-embed-core 9 presently performs illegal reflective access on
// java.io.ObjectStreamClass$Caches.localDescs and sun.rmi.transport.Target.ccl,
// see https://lists.apache.org/thread/s0xr7tk2kfkkxfjps9n7dhh4cypfdhyy
if (JavaVersion.current().isJava9Compatible()) {
jvmArgs += ['--add-opens=java.base/java.io=ALL-UNNAMED', '--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED']
}
})
// Only run these tests if java 11+ is being used
if (JavaVersion.current().isJava11Compatible()) {
check.dependsOn(tasks.register('jettyTest', Test) {
classpath = sourceSets.jettyTest.runtimeClasspath
testClassesDirs = sourceSets.jettyTest.output.classesDirs
})
}
jacocoTestReport {
executionData undertowTest, tomcat9Test
if (JavaVersion.current().isJava11Compatible()) {
executionData jettyTest
}
}

View File

@ -0,0 +1,128 @@
plugins {
id "java-library"
id "maven-publish"
}
description = "gRPC: Jakarta Servlet"
sourceCompatibility = 1.8
targetCompatibility = 1.8
// Set up classpaths and source directories for different servlet tests
configurations {
itImplementation.extendsFrom(implementation)
jettyTestImplementation.extendsFrom(itImplementation)
tomcatTestImplementation.extendsFrom(itImplementation)
undertowTestImplementation.extendsFrom(itImplementation)
}
sourceSets {
undertowTest {
java {
include '**/Undertow*.java'
}
}
tomcatTest {
java {
include '**/Tomcat*.java'
}
}
// Only run these tests if java 11+ is being used
if (JavaVersion.current().isJava11Compatible()) {
jettyTest {
java {
include '**/Jetty*.java'
}
}
}
}
// Mechanically transform sources from grpc-servlet to use the corrected packages
def migrate(String name, String inputDir, SourceSet sourceSet) {
def outputDir = layout.buildDirectory.dir('generated/sources/jakarta-' + name)
sourceSet.java.srcDir outputDir
return tasks.register('migrateSources' + name.capitalize(), Sync) { task ->
into(outputDir)
from("$inputDir/io/grpc/servlet") {
into('io/grpc/servlet/jakarta')
filter { String line ->
line.replaceAll('javax\\.servlet', 'jakarta.servlet')
.replaceAll('io\\.grpc\\.servlet', 'io.grpc.servlet.jakarta')
}
}
}
}
compileJava.dependsOn migrate('main', '../src/main/java', sourceSets.main)
sourcesJar.dependsOn migrateSourcesMain
// Build the set of sourceSets and classpaths to modify, since Jetty 11 requires Java 11
// and must be skipped
compileUndertowTestJava.dependsOn(migrate('undertowTest', '../src/undertowTest/java', sourceSets.undertowTest))
compileTomcatTestJava.dependsOn(migrate('tomcatTest', '../src/tomcatTest/java', sourceSets.tomcatTest))
if (JavaVersion.current().isJava11Compatible()) {
compileJettyTestJava.dependsOn(migrate('jettyTest', '../src/jettyTest/java', sourceSets.jettyTest))
}
// Disable checkstyle for this project, since it consists only of generated code
tasks.withType(Checkstyle) {
enabled = false
}
dependencies {
api project(':grpc-api')
compileOnly 'jakarta.servlet:jakarta.servlet-api:5.0.0',
libraries.javax.annotation
implementation project(':grpc-core'),
libraries.guava
itImplementation project(':grpc-servlet-jakarta'),
project(':grpc-netty'),
project(':grpc-core').sourceSets.test.runtimeClasspath,
libraries.junit
itImplementation(project(':grpc-interop-testing')) {
// Avoid grpc-netty-shaded dependency
exclude group: 'io.grpc', module: 'grpc-alts'
exclude group: 'io.grpc', module: 'grpc-xds'
}
tomcatTestImplementation 'org.apache.tomcat.embed:tomcat-embed-core:10.0.14'
jettyTestImplementation "org.eclipse.jetty:jetty-servlet:11.0.7",
"org.eclipse.jetty.http2:http2-server:11.0.7"
undertowTestImplementation 'io.undertow:undertow-servlet-jakartaee9:2.2.13.Final'
}
// Set up individual classpaths for each test, to avoid any mismatch,
// and ensure they are only used when supported by the current jvm
check.dependsOn(tasks.register('undertowTest', Test) {
classpath = sourceSets.undertowTest.runtimeClasspath
testClassesDirs = sourceSets.undertowTest.output.classesDirs
})
check.dependsOn(tasks.register('tomcat10Test', Test) {
classpath = sourceSets.tomcatTest.runtimeClasspath
testClassesDirs = sourceSets.tomcatTest.output.classesDirs
// Provide a temporary directory for tomcat to be deleted after test finishes
def tomcatTempDir = "$buildDir/tomcat_catalina_base"
systemProperty 'catalina.base', tomcatTempDir
doLast {
file(tomcatTempDir).deleteDir()
}
// tomcat-embed-core 10 presently performs illegal reflective access on
// java.io.ObjectStreamClass$Caches.localDescs and sun.rmi.transport.Target.ccl,
// see https://lists.apache.org/thread/s0xr7tk2kfkkxfjps9n7dhh4cypfdhyy
if (JavaVersion.current().isJava9Compatible()) {
jvmArgs += ['--add-opens=java.base/java.io=ALL-UNNAMED', '--add-opens=java.rmi/sun.rmi.transport=ALL-UNNAMED']
}
})
// Only run these tests if java 11+ is being used
if (JavaVersion.current().isJava11Compatible()) {
check.dependsOn(tasks.register('jetty11Test', Test) {
classpath = sourceSets.jettyTest.runtimeClasspath
testClassesDirs = sourceSets.jettyTest.output.classesDirs
})
}

View File

@ -0,0 +1,124 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.TestServiceGrpc;
import io.grpc.testing.integration.TestServiceImpl;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.http2.parser.RateControl;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Smoke test for {@link GrpcServlet}. */
@RunWith(JUnit4.class)
public class GrpcServletSmokeTest {
private static final String HOST = "localhost";
private static final String MYAPP = "/grpc.testing.TestService";
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
private int port;
private Server server;
@Before
public void startServer() {
BindableService service = new TestServiceImpl(scheduledExecutorService);
GrpcServlet grpcServlet = new GrpcServlet(ImmutableList.of(service));
server = new Server(0);
ServerConnector sc = (ServerConnector)server.getConnectors()[0];
HTTP2CServerConnectionFactory factory =
new HTTP2CServerConnectionFactory(new HttpConfiguration());
// Explicitly disable safeguards against malicious clients, as some unit tests trigger this
factory.setRateControlFactory(new RateControl.Factory() {});
sc.addConnectionFactory(factory);
ServletContextHandler context =
new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(MYAPP);
context.addServlet(new ServletHolder(grpcServlet), "/*");
server.setHandler(context);
try {
server.start();
} catch (Exception e) {
throw new AssertionError(e);
}
port = sc.getLocalPort();
}
@After
public void tearDown() {
scheduledExecutorService.shutdown();
try {
server.stop();
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Test
public void unaryCall() {
Channel channel = cleanupRule.register(
ManagedChannelBuilder.forAddress(HOST, port).usePlaintext().build());
SimpleResponse response = TestServiceGrpc.newBlockingStub(channel).unaryCall(
SimpleRequest.newBuilder()
.setResponseSize(1234)
.setPayload(Payload.newBuilder().setBody(ByteString.copyFromUtf8("hello foo")))
.build());
assertThat(response.getPayload().getBody().size()).isEqualTo(1234);
}
@Test
public void httpGetRequest() throws Exception {
HttpClient httpClient = new HttpClient();
try {
httpClient.start();
ContentResponse response =
httpClient.GET("http://" + HOST + ":" + port + MYAPP + "/UnaryCall");
assertThat(response.getStatus()).isEqualTo(405);
assertThat(response.getContentAsString()).contains("GET method not supported");
} finally {
httpClient.stop();
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.integration.AbstractInteropTest;
import org.eclipse.jetty.http2.parser.RateControl;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
public class JettyInteropTest extends AbstractInteropTest {
private static final String HOST = "localhost";
private static final String MYAPP = "/grpc.testing.TestService";
private int port;
private Server server;
@After
@Override
public void tearDown() {
super.tearDown();
try {
server.stop();
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Override
protected ServerBuilder<?> getServerBuilder() {
return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
}
@Override
protected void startServer(ServerBuilder<?> builer) {
GrpcServlet grpcServlet =
new GrpcServlet(((ServletServerBuilder) builer).buildServletAdapter());
server = new Server(0);
ServerConnector sc = (ServerConnector)server.getConnectors()[0];
HTTP2CServerConnectionFactory factory =
new HTTP2CServerConnectionFactory(new HttpConfiguration());
// Explicitly disable safeguards against malicious clients, as some unit tests trigger this
factory.setRateControlFactory(new RateControl.Factory() {});
sc.addConnectionFactory(factory);
ServletContextHandler context =
new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(MYAPP);
context.addServlet(new ServletHolder(grpcServlet), "/*");
server.setHandler(context);
try {
server.start();
} catch (Exception e) {
throw new AssertionError(e);
}
port = sc.getLocalPort();
}
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
NettyChannelBuilder builder =
(NettyChannelBuilder) ManagedChannelBuilder.forAddress(HOST, port)
.usePlaintext()
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
builder.intercept(createCensusStatsClientInterceptor());
return builder;
}
}

View File

@ -0,0 +1,249 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import io.grpc.InternalChannelz;
import io.grpc.InternalInstrumented;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.eclipse.jetty.http2.parser.RateControl;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Ignore;
import org.junit.Test;
public class JettyTransportTest extends AbstractTransportTest {
private static final String MYAPP = "/service";
private final FakeClock fakeClock = new FakeClock();
private Server jettyServer;
private int port;
@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
return new InternalServer() {
final InternalServer delegate =
new ServletServerBuilder().buildTransportServers(streamTracerFactories);
@Override
public void start(ServerListener listener) throws IOException {
delegate.start(listener);
ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServletServerBuilder.ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories,
Integer.MAX_VALUE);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
jettyServer = new Server(0);
ServerConnector sc = (ServerConnector) jettyServer.getConnectors()[0];
HttpConfiguration httpConfiguration = new HttpConfiguration();
// Must be set for several tests to pass, so that the request handling can begin before
// content arrives.
httpConfiguration.setDelayDispatchUntilContent(false);
HTTP2CServerConnectionFactory factory =
new HTTP2CServerConnectionFactory(httpConfiguration);
factory.setRateControlFactory(new RateControl.Factory() {
});
sc.addConnectionFactory(factory);
ServletContextHandler context =
new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(MYAPP);
context.addServlet(new ServletHolder(grpcServlet), "/*");
jettyServer.setHandler(context);
try {
jettyServer.start();
} catch (Exception e) {
throw new AssertionError(e);
}
port = sc.getLocalPort();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public SocketAddress getListenSocketAddress() {
return delegate.getListenSocketAddress();
}
@Override
public InternalInstrumented<InternalChannelz.SocketStats> getListenSocketStats() {
return delegate.getListenSocketStats();
}
@Override
public List<? extends SocketAddress> getListenSocketAddresses() {
return delegate.getListenSocketAddresses();
}
@Nullable
@Override
public List<InternalInstrumented<InternalChannelz.SocketStats>> getListenSocketStatsList() {
return delegate.getListenSocketStatsList();
}
};
}
@Override
protected InternalServer newServer(int port,
List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder
// Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0)
.flowControlWindow(65 * 1024)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder
.setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer);
ClientTransportFactory clientFactory =
InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder);
return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port),
new ClientTransportFactory.ClientTransportOptions()
.setAuthority(testAuthority(server))
.setEagAttributes(eagAttrs()),
transportLogger());
}
@Override
protected String testAuthority(InternalServer server) {
return "localhost:" + port;
}
@Override
protected void advanceClock(long offset, TimeUnit unit) {
fakeClock.forwardTime(offset, unit);
}
@Override
protected long fakeCurrentTimeNanos() {
return fakeClock.getTicker().read();
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverAlreadyListening() {
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void openStreamPreventsTermination() {
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void shutdownNowKillsServerStream() {
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverNotListening() {
}
// FIXME
@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlPushBack() {
}
// FIXME
@Override
@Ignore("Jetty is broken on client RST_STREAM")
@Test
public void shutdownNowKillsClientStream() {
}
@Override
@Ignore("Server side sockets are managed by the servlet container")
@Test
public void socketStats() {
}
@Override
@Ignore("serverTransportListener will not terminate")
@Test
public void clientStartAndStopOnceConnected() {
}
@Override
@Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply")
@Test
public void serverCancel() {
}
@Override
@Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops")
@Test
public void interactionsAfterServerStreamCloseAreNoops() {
}
@Override
@Ignore("listeners.poll() doesn't apply")
@Test
public void interactionsAfterClientStreamCancelAreNoops() {
}
@Override
@Ignore("assertNull(serverStatus.getCause()) isn't true")
@Test
public void clientCancel() {
}
@Override
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
public void messageProducerOnlyProducesRequestedMessages() {}
}

View File

@ -0,0 +1,282 @@
/*
* Copyright 2019 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.servlet.ServletServerStream.toHexString;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.InternalLogId;
import io.grpc.servlet.ServletServerStream.ServletTransportState;
import java.io.IOException;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.logging.Logger;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.ServletOutputStream;
/** Handles write actions from the container thread and the application thread. */
final class AsyncServletOutputStreamWriter {
/**
* Memory boundary for write actions.
*
* <pre>
* WriteState curState = writeState.get(); // mark a boundary
* doSomething(); // do something within the boundary
* boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
* if (successful) {
* // state has not changed since
* return;
* } else {
* // state is changed by another thread while doSomething(), need recompute
* }
* </pre>
*
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
* application thread (calling {@code runOrBuffer()}) that read and update the
* writeState. Only onWritePossible() may turn {@code readyAndDrained} from false to true, and
* only runOrBuffer() may turn it from true to false.
*/
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
private final Log log;
private final BiFunction<byte[], Integer, ActionItem> writeAction;
private final ActionItem flushAction;
private final ActionItem completeAction;
private final BooleanSupplier isReady;
/**
* New write actions will be buffered into this queue if the servlet output stream is not ready or
* the queue is not drained.
*/
// SPSC queue would do
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
// for a theoretical race condition that onWritePossible() is called immediately after isReady()
// returns false and before writeState.compareAndSet()
@Nullable
private volatile Thread parkingThread;
AsyncServletOutputStreamWriter(
AsyncContext asyncContext,
ServletTransportState transportState,
InternalLogId logId) throws IOException {
Logger logger = Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
this.log = new Log() {
@Override
public void fine(String str, Object... params) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, "[" + logId + "]" + str, params);
}
}
@Override
public void finest(String str, Object... params) {
if (logger.isLoggable(FINEST)) {
logger.log(FINEST, "[" + logId + "] " + str, params);
}
}
};
ServletOutputStream outputStream = asyncContext.getResponse().getOutputStream();
this.writeAction = (byte[] bytes, Integer numBytes) -> () -> {
outputStream.write(bytes, 0, numBytes);
transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
log.finest("outbound data: length={0}, bytes={1}", numBytes, toHexString(bytes, numBytes));
};
this.flushAction = () -> {
log.finest("flushBuffer");
asyncContext.getResponse().flushBuffer();
};
this.completeAction = () -> {
log.fine("call is completing");
transportState.runOnTransportThread(
() -> {
transportState.complete();
asyncContext.complete();
log.fine("call completed");
});
};
this.isReady = () -> outputStream.isReady();
}
/**
* Constructor without java.util.logging and javax.servlet.* dependency, so that Lincheck can run.
*
* @param writeAction Provides an {@link ActionItem} to write given bytes with specified length.
* @param isReady Indicates whether the writer can write bytes at the moment (asynchronously).
*/
@VisibleForTesting
AsyncServletOutputStreamWriter(
BiFunction<byte[], Integer, ActionItem> writeAction,
ActionItem flushAction,
ActionItem completeAction,
BooleanSupplier isReady,
Log log) {
this.writeAction = writeAction;
this.flushAction = flushAction;
this.completeAction = completeAction;
this.isReady = isReady;
this.log = log;
}
/** Called from application thread. */
void writeBytes(byte[] bytes, int numBytes) throws IOException {
runOrBuffer(writeAction.apply(bytes, numBytes));
}
/** Called from application thread. */
void flush() throws IOException {
runOrBuffer(flushAction);
}
/** Called from application thread. */
void complete() {
try {
runOrBuffer(completeAction);
} catch (IOException ignore) {
// actually completeAction does not throw IOException
}
}
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
void onWritePossible() throws IOException {
log.finest("onWritePossible: ENTRY. The servlet output stream becomes ready");
assureReadyAndDrainedTurnsFalse();
while (isReady.getAsBoolean()) {
WriteState curState = writeState.get();
ActionItem actionItem = writeChain.poll();
if (actionItem != null) {
actionItem.run();
continue;
}
if (writeState.compareAndSet(curState, curState.withReadyAndDrained(true))) {
// state has not changed since.
log.finest(
"onWritePossible: EXIT. All data available now is sent out and the servlet output"
+ " stream is still ready");
return;
}
// else, state changed by another thread (runOrBuffer()), need to drain the writeChain
// again
}
log.finest("onWritePossible: EXIT. The servlet output stream becomes not ready");
}
private void assureReadyAndDrainedTurnsFalse() {
// readyAndDrained should have been set to false already.
// Just in case due to a race condition readyAndDrained is still true at this moment and is
// being set to false by runOrBuffer() concurrently.
while (writeState.get().readyAndDrained) {
parkingThread = Thread.currentThread();
LockSupport.parkNanos(Duration.ofMinutes(1).toNanos()); // should return immediately
}
parkingThread = null;
}
/**
* Either execute the write action directly, or buffer the action and let the container thread
* drain it.
*
* <p>Called from application thread.
*/
private void runOrBuffer(ActionItem actionItem) throws IOException {
WriteState curState = writeState.get();
if (curState.readyAndDrained) { // write to the outputStream directly
actionItem.run();
if (actionItem == completeAction) {
return;
}
if (!isReady.getAsBoolean()) {
boolean successful =
writeState.compareAndSet(curState, curState.withReadyAndDrained(false));
LockSupport.unpark(parkingThread);
checkState(successful, "Bug: curState is unexpectedly changed by another thread");
log.finest("the servlet output stream becomes not ready");
}
} else { // buffer to the writeChain
writeChain.offer(actionItem);
if (!writeState.compareAndSet(curState, curState.withReadyAndDrained(false))) {
checkState(
writeState.get().readyAndDrained,
"Bug: onWritePossible() should have changed readyAndDrained to true, but not");
ActionItem lastItem = writeChain.poll();
if (lastItem != null) {
checkState(lastItem == actionItem, "Bug: lastItem != actionItem");
runOrBuffer(lastItem);
}
} // state has not changed since
}
}
/** Write actions, e.g. writeBytes, flush, complete. */
@FunctionalInterface
@VisibleForTesting
interface ActionItem {
void run() throws IOException;
}
@VisibleForTesting // Lincheck test can not run with java.util.logging dependency.
interface Log {
default void fine(String str, Object...params) {}
default void finest(String str, Object...params) {}
}
private static final class WriteState {
static final WriteState DEFAULT = new WriteState(false);
/**
* The servlet output stream is ready and the writeChain is empty.
*
* <p>readyAndDrained turns from false to true when:
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
*
* <p>readyAndDrained turns from true to false when:
* {@code runOrBuffer()} exits while either the action item is written directly to the
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
* right after that returns false, or the action item is buffered into the writeChain.
*/
final boolean readyAndDrained;
WriteState(boolean readyAndDrained) {
this.readyAndDrained = readyAndDrained;
}
/**
* Only {@code onWritePossible()} can set readyAndDrained to true, and only {@code
* runOrBuffer()} can set it to false.
*/
@CheckReturnValue
WriteState withReadyAndDrained(boolean readyAndDrained) {
return new WriteState(readyAndDrained);
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.BindableService;
import io.grpc.ExperimentalApi;
import java.io.IOException;
import java.util.List;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* A simple servlet backed by a gRPC server. Must set {@code asyncSupported} to true. The {@code
* /contextRoot/urlPattern} must match the gRPC services' path, which is
* "/full-service-name/short-method-name".
*
* <p>The API is experimental. The authors would like to know more about the real usecases. Users
* are welcome to provide feedback by commenting on
* <a href=https://github.com/grpc/grpc-java/issues/5066>the tracking issue</a>.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
public class GrpcServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private final ServletAdapter servletAdapter;
@VisibleForTesting
GrpcServlet(ServletAdapter servletAdapter) {
this.servletAdapter = servletAdapter;
}
/**
* Instantiate the servlet serving the given list of gRPC services. ServerInterceptors can be
* added on each gRPC service by {@link
* io.grpc.ServerInterceptors#intercept(BindableService, io.grpc.ServerInterceptor...)}
*/
public GrpcServlet(List<? extends BindableService> bindableServices) {
this(loadServices(bindableServices));
}
private static ServletAdapter loadServices(List<? extends BindableService> bindableServices) {
ServletServerBuilder serverBuilder = new ServletServerBuilder();
bindableServices.forEach(serverBuilder::addService);
return serverBuilder.buildServletAdapter();
}
@Override
protected final void doGet(HttpServletRequest request, HttpServletResponse response)
throws IOException {
servletAdapter.doGet(request, response);
}
@Override
protected final void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
servletAdapter.doPost(request, response);
}
@Override
public void destroy() {
servletAdapter.destroy();
super.destroy();
}
}

View File

@ -0,0 +1,333 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.Grpc;
import io.grpc.InternalLogId;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server
* process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be
* instantiated by {@link ServletServerBuilder#buildServletAdapter()}.
*
* <p>In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link
* javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet
* backed by the gRPC server associated with the adapter. The servlet must support Asynchronous
* Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2.
*
* <p>The API is experimental. The authors would like to know more about the real usecases. Users
* are welcome to provide feedback by commenting on
* <a href=https://github.com/grpc/grpc-java/issues/5066>the tracking issue</a>.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
public final class ServletAdapter {
static final Logger logger = Logger.getLogger(ServletAdapter.class.getName());
private final ServerTransportListener transportListener;
private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
private final int maxInboundMessageSize;
private final Attributes attributes;
ServletAdapter(
ServerTransportListener transportListener,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
int maxInboundMessageSize) {
this.transportListener = transportListener;
this.streamTracerFactories = streamTracerFactories;
this.maxInboundMessageSize = maxInboundMessageSize;
attributes = transportListener.transportReady(Attributes.EMPTY);
}
/**
* Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
* HttpServletResponse)} to serve gRPC GET request.
*
* <p>This method is currently not implemented.
*
* <p>Note that in rare case gRPC client sends GET requests.
*
* <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
* calling {@code resp.setBufferSize()} before invocation is allowed.
*/
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED, "GET method not supported");
}
/**
* Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
* HttpServletResponse)} to serve gRPC POST request.
*
* <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
* calling {@code resp.setBufferSize()} before invocation is allowed.
*/
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation");
checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request");
InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null);
logger.log(FINE, "[{0}] RPC started", logId);
AsyncContext asyncCtx = req.startAsync(req, resp);
String method = req.getRequestURI().substring(1); // remove the leading "/"
Metadata headers = getHeaders(req);
if (logger.isLoggable(FINEST)) {
logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method});
logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers});
}
Long timeoutNanos = headers.get(TIMEOUT_KEY);
if (timeoutNanos == null) {
timeoutNanos = 0L;
}
asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
StatsTraceContext statsTraceCtx =
StatsTraceContext.newServerContext(streamTracerFactories, method, headers);
ServletServerStream stream = new ServletServerStream(
asyncCtx,
statsTraceCtx,
maxInboundMessageSize,
attributes.toBuilder()
.set(
Grpc.TRANSPORT_ATTR_REMOTE_ADDR,
new InetSocketAddress(req.getRemoteHost(), req.getRemotePort()))
.set(
Grpc.TRANSPORT_ATTR_LOCAL_ADDR,
new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
.build(),
getAuthority(req),
logId);
transportListener.streamCreated(stream, method, headers);
stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);
asyncCtx.getRequest().getInputStream()
.setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
asyncCtx.addListener(new GrpcAsyncListener(stream, logId));
}
// This method must use Enumeration and its members, since that is the only way to read headers
// from the servlet api.
@SuppressWarnings("JdkObsolete")
private static Metadata getHeaders(HttpServletRequest req) {
Enumeration<String> headerNames = req.getHeaderNames();
checkNotNull(
headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()");
List<byte[]> byteArrays = new ArrayList<>();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
Enumeration<String> values = req.getHeaders(headerName);
if (values == null) {
continue;
}
while (values.hasMoreElements()) {
String value = values.nextElement();
if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
byteArrays.add(BaseEncoding.base64().decode(value));
} else {
byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
byteArrays.add(value.getBytes(StandardCharsets.US_ASCII));
}
}
}
return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][]{}));
}
// This method must use HttpRequest#getRequestURL or HttpUtils#getRequestURL, both of which
// can only return StringBuffer instances
@SuppressWarnings("JdkObsolete")
private static String getAuthority(HttpServletRequest req) {
try {
return new URI(req.getRequestURL().toString()).getAuthority();
} catch (URISyntaxException e) {
logger.log(FINE, "Error getting authority from the request URL {0}", req.getRequestURL());
return req.getServerName() + ":" + req.getServerPort();
}
}
/**
* Call this method when the adapter is no longer needed. The gRPC server will be terminated.
*/
public void destroy() {
transportListener.transportTerminated();
}
private static final class GrpcAsyncListener implements AsyncListener {
final InternalLogId logId;
final ServletServerStream stream;
GrpcAsyncListener(ServletServerStream stream, InternalLogId logId) {
this.stream = stream;
this.logId = logId;
}
@Override
public void onComplete(AsyncEvent event) {}
@Override
public void onTimeout(AsyncEvent event) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable());
}
// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM in the end.
if (!event.getAsyncContext().getResponse().isCommitted()) {
stream.cancel(Status.DEADLINE_EXCEEDED);
} else {
stream.transportState().runOnTransportThread(
() -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED));
}
}
@Override
public void onError(AsyncEvent event) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable());
}
// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM at the end.
if (!event.getAsyncContext().getResponse().isCommitted()) {
stream.cancel(Status.fromThrowable(event.getThrowable()));
} else {
stream.transportState().runOnTransportThread(
() -> stream.transportState().transportReportStatus(
Status.fromThrowable(event.getThrowable())));
}
}
@Override
public void onStartAsync(AsyncEvent event) {}
}
private static final class GrpcReadListener implements ReadListener {
final ServletServerStream stream;
final AsyncContext asyncCtx;
final ServletInputStream input;
final InternalLogId logId;
GrpcReadListener(
ServletServerStream stream,
AsyncContext asyncCtx,
InternalLogId logId) throws IOException {
this.stream = stream;
this.asyncCtx = asyncCtx;
input = asyncCtx.getRequest().getInputStream();
this.logId = logId;
}
final byte[] buffer = new byte[4 * 1024];
@Override
public void onDataAvailable() throws IOException {
logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId);
while (input.isReady()) {
int length = input.read(buffer);
if (length == -1) {
logger.log(FINEST, "[{0}] inbound data: read end of stream", logId);
return;
} else {
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
"[{0}] inbound data: length = {1}, bytes = {2}",
new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)});
}
byte[] copy = Arrays.copyOf(buffer, length);
stream.transportState().runOnTransportThread(
() -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false));
}
}
logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId);
}
@Override
public void onAllDataRead() {
logger.log(FINE, "[{0}] onAllDataRead", logId);
stream.transportState().runOnTransportThread(() ->
stream.transportState().inboundDataReceived(ReadableBuffers.empty(), true));
}
@Override
public void onError(Throwable t) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
}
// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM at the end.
if (!asyncCtx.getResponse().isCommitted()) {
stream.cancel(Status.fromThrowable(t));
} else {
stream.transportState().runOnTransportThread(
() -> stream.transportState()
.transportReportStatus(Status.fromThrowable(t)));
}
}
}
/**
* Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client.
*
* @return true if the request comes from a gRPC client
*/
public static boolean isGrpc(HttpServletRequest request) {
return request.getContentType() != null
&& request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC);
}
}

View File

@ -0,0 +1,268 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
/**
* Builder to build a gRPC server that can run as a servlet. This is for advanced custom settings.
* Normally, users should consider extending the out-of-box {@link GrpcServlet} directly instead.
*
* <p>The API is experimental. The authors would like to know more about the real usecases. Users
* are welcome to provide feedback by commenting on
* <a href=https://github.com/grpc/grpc-java/issues/5066>the tracking issue</a>.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
@NotThreadSafe
public final class ServletServerBuilder extends ForwardingServerBuilder<ServletServerBuilder> {
List<? extends ServerStreamTracer.Factory> streamTracerFactories;
int maxInboundMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private final ServerImplBuilder serverImplBuilder;
private ScheduledExecutorService scheduler;
private boolean internalCaller;
private boolean usingCustomScheduler;
private InternalServerImpl internalServer;
public ServletServerBuilder() {
serverImplBuilder = new ServerImplBuilder(this::buildTransportServers);
}
/**
* Builds a gRPC server that can run as a servlet.
*
* <p>The returned server will not be started or bound to a port.
*
* <p>Users should not call this method directly. Instead users should call
* {@link #buildServletAdapter()} which internally will call {@code build()} and {@code start()}
* appropriately.
*
* @throws IllegalStateException if this method is called by users directly
*/
@Override
public Server build() {
checkState(internalCaller, "build() method should not be called directly by an application");
return super.build();
}
/**
* Creates a {@link ServletAdapter}.
*/
public ServletAdapter buildServletAdapter() {
return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize);
}
private ServerTransportListener buildAndStart() {
Server server;
try {
internalCaller = true;
server = build().start();
} catch (IOException e) {
// actually this should never happen
throw new RuntimeException(e);
} finally {
internalCaller = false;
}
if (!usingCustomScheduler) {
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
}
// Create only one "transport" for all requests because it has no knowledge of which request is
// associated with which client socket. This "transport" does not do socket connection, the
// container does.
ServerTransportImpl serverTransport = new ServerTransportImpl(scheduler);
ServerTransportListener delegate =
internalServer.serverListener.transportCreated(serverTransport);
return new ServerTransportListener() {
@Override
public void streamCreated(ServerStream stream, String method, Metadata headers) {
delegate.streamCreated(stream, method, headers);
}
@Override
public Attributes transportReady(Attributes attributes) {
return delegate.transportReady(attributes);
}
@Override
public void transportTerminated() {
server.shutdown();
delegate.transportTerminated();
if (!usingCustomScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
}
};
}
@VisibleForTesting
InternalServer buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
checkNotNull(streamTracerFactories, "streamTracerFactories");
this.streamTracerFactories = streamTracerFactories;
internalServer = new InternalServerImpl();
return internalServer;
}
@Internal
@Override
protected ServerBuilder<?> delegate() {
return serverImplBuilder;
}
/**
* Throws {@code UnsupportedOperationException}. TLS should be configured by the servlet
* container.
*/
@Override
public ServletServerBuilder useTransportSecurity(File certChain, File privateKey) {
throw new UnsupportedOperationException("TLS should be configured by the servlet container");
}
@Override
public ServletServerBuilder maxInboundMessageSize(int bytes) {
checkArgument(bytes >= 0, "bytes must be >= 0");
maxInboundMessageSize = bytes;
return this;
}
/**
* Provides a custom scheduled executor service to the server builder.
*
* @return this
*/
public ServletServerBuilder scheduledExecutorService(ScheduledExecutorService scheduler) {
this.scheduler = checkNotNull(scheduler, "scheduler");
usingCustomScheduler = true;
return this;
}
private static final class InternalServerImpl implements InternalServer {
ServerListener serverListener;
InternalServerImpl() {}
@Override
public void start(ServerListener listener) {
serverListener = listener;
}
@Override
public void shutdown() {
if (serverListener != null) {
serverListener.serverShutdown();
}
}
@Override
public SocketAddress getListenSocketAddress() {
return new SocketAddress() {
@Override
public String toString() {
return "ServletServer";
}
};
}
@Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
// sockets are managed by the servlet container, grpc is ignorant of that
return null;
}
@Override
public List<? extends SocketAddress> getListenSocketAddresses() {
return Collections.emptyList();
}
@Nullable
@Override
public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
return null;
}
}
@VisibleForTesting
static final class ServerTransportImpl implements ServerTransport {
private final InternalLogId logId = InternalLogId.allocate(ServerTransportImpl.class, null);
private final ScheduledExecutorService scheduler;
ServerTransportImpl(ScheduledExecutorService scheduler) {
this.scheduler = checkNotNull(scheduler, "scheduler");
}
@Override
public void shutdown() {}
@Override
public void shutdownNow(Status reason) {}
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return scheduler;
}
@Override
public ListenableFuture<SocketStats> getStats() {
// does not support instrumentation
return null;
}
@Override
public InternalLogId getLogId() {
return logId;
}
}
}

View File

@ -0,0 +1,336 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;
import static java.util.logging.Level.WARNING;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.InternalLogId;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.AbstractServerStream;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportFrameUtil;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletResponse;
final class ServletServerStream extends AbstractServerStream {
private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
private final ServletTransportState transportState;
private final Sink sink = new Sink();
private final AsyncContext asyncCtx;
private final HttpServletResponse resp;
private final Attributes attributes;
private final String authority;
private final InternalLogId logId;
private final AsyncServletOutputStreamWriter writer;
ServletServerStream(
AsyncContext asyncCtx,
StatsTraceContext statsTraceCtx,
int maxInboundMessageSize,
Attributes attributes,
String authority,
InternalLogId logId) throws IOException {
super(ByteArrayWritableBuffer::new, statsTraceCtx);
transportState =
new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
this.attributes = attributes;
this.authority = authority;
this.logId = logId;
this.asyncCtx = asyncCtx;
this.resp = (HttpServletResponse) asyncCtx.getResponse();
this.writer = new AsyncServletOutputStreamWriter(
asyncCtx, transportState, logId);
resp.getOutputStream().setWriteListener(new GrpcWriteListener());
}
@Override
protected ServletTransportState transportState() {
return transportState;
}
@Override
public Attributes getAttributes() {
return attributes;
}
@Override
public String getAuthority() {
return authority;
}
@Override
public int streamId() {
return -1;
}
@Override
protected Sink abstractServerStreamSink() {
return sink;
}
private void writeHeadersToServletResponse(Metadata metadata) {
// Discard any application supplied duplicates of the reserved headers
metadata.discardAll(CONTENT_TYPE_KEY);
metadata.discardAll(GrpcUtil.TE_HEADER);
metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
if (logger.isLoggable(FINE)) {
logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
}
resp.setStatus(HttpServletResponse.SC_OK);
resp.setContentType(CONTENT_TYPE_GRPC);
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
for (int i = 0; i < serializedHeaders.length; i += 2) {
resp.addHeader(
new String(serializedHeaders[i], StandardCharsets.US_ASCII),
new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
}
}
final class ServletTransportState extends TransportState {
private final SerializingExecutor transportThreadExecutor =
new SerializingExecutor(MoreExecutors.directExecutor());
private ServletTransportState(
int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
super(maxMessageSize, statsTraceCtx, transportTracer);
}
@Override
public void runOnTransportThread(Runnable r) {
transportThreadExecutor.execute(r);
}
@Override
public void bytesRead(int numBytes) {
// no-op
// no flow control yet
}
@Override
public void deframeFailed(Throwable cause) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause);
}
cancel(Status.fromThrowable(cause));
}
}
private static final class ByteArrayWritableBuffer implements WritableBuffer {
private final int capacity;
final byte[] bytes;
private int index;
ByteArrayWritableBuffer(int capacityHint) {
this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))];
this.capacity = bytes.length;
}
@Override
public void write(byte[] src, int srcIndex, int length) {
System.arraycopy(src, srcIndex, bytes, index, length);
index += length;
}
@Override
public void write(byte b) {
bytes[index++] = b;
}
@Override
public int writableBytes() {
return capacity - index;
}
@Override
public int readableBytes() {
return index;
}
@Override
public void release() {}
}
private final class GrpcWriteListener implements WriteListener {
@Override
public void onError(Throwable t) {
if (logger.isLoggable(FINE)) {
logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
}
// If the resp is not committed, cancel() to avoid being redirected to an error page.
// Else, the container will send RST_STREAM at the end.
if (!resp.isCommitted()) {
cancel(Status.fromThrowable(t));
} else {
transportState.runOnTransportThread(
() -> transportState.transportReportStatus(Status.fromThrowable(t)));
}
}
@Override
public void onWritePossible() throws IOException {
writer.onWritePossible();
}
}
private final class Sink implements AbstractServerStream.Sink {
final TrailerSupplier trailerSupplier = new TrailerSupplier();
@Override
public void writeHeaders(Metadata headers) {
writeHeadersToServletResponse(headers);
resp.setTrailerFields(trailerSupplier);
try {
writer.flush();
} catch (IOException e) {
logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
cancel(Status.fromThrowable(e));
}
}
@Override
public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
if (frame == null && !flush) {
return;
}
if (logger.isLoggable(FINEST)) {
logger.log(
FINEST,
"[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
}
try {
if (frame != null) {
int numBytes = frame.readableBytes();
if (numBytes > 0) {
onSendingBytes(numBytes);
}
writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
}
if (flush) {
writer.flush();
}
} catch (IOException e) {
logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
cancel(Status.fromThrowable(e));
}
}
@Override
public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
if (logger.isLoggable(FINE)) {
logger.log(
FINE,
"[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
new Object[] {logId, trailers, headersSent, status});
}
if (!headersSent) {
writeHeadersToServletResponse(trailers);
} else {
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
trailerSupplier.get().putIfAbsent(key, newValue);
}
}
writer.complete();
}
@Override
public void cancel(Status status) {
if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {
return; // let the servlet timeout, the container will sent RST_STREAM automatically
}
transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
// There is no way to RST_STREAM with CANCEL code, so write trailers instead
close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata());
CountDownLatch countDownLatch = new CountDownLatch(1);
transportState.runOnTransportThread(() -> {
asyncCtx.complete();
countDownLatch.countDown();
});
try {
countDownLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private static final class TrailerSupplier implements Supplier<Map<String, String>> {
final Map<String, String> trailers = Collections.synchronizedMap(new HashMap<>());
TrailerSupplier() {}
@Override
public Map<String, String> get() {
return trailers;
}
}
static String toHexString(byte[] bytes, int length) {
String hex = BaseEncoding.base16().encode(bytes, 0, min(length, 64));
if (length > 80) {
hex += "...";
}
if (length > 64) {
int offset = max(64, length - 16);
hex += BaseEncoding.base16().encode(bytes, offset, length - offset);
}
return hex;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* API that implements gRPC server as a servlet. The API requires that the application container
* supports Servlet 4.0 and enables HTTP/2.
*
* <p>The API is experimental. The authors would like to know more about the real usecases. Users
* are welcome to provide feedback by commenting on
* <a href=https://github.com/grpc/grpc-java/issues/5066>the tracking issue</a>.
*/
@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
package io.grpc.servlet;

View File

@ -0,0 +1,174 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.jetbrains.kotlinx.lincheck.strategy.managed.ManagedStrategyGuaranteeKt.forClasses;
import io.grpc.servlet.AsyncServletOutputStreamWriter.ActionItem;
import io.grpc.servlet.AsyncServletOutputStreamWriter.Log;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.jetbrains.kotlinx.lincheck.LinChecker;
import org.jetbrains.kotlinx.lincheck.annotations.OpGroupConfig;
import org.jetbrains.kotlinx.lincheck.annotations.Operation;
import org.jetbrains.kotlinx.lincheck.annotations.Param;
import org.jetbrains.kotlinx.lincheck.paramgen.BooleanGen;
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingCTest;
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions;
import org.jetbrains.kotlinx.lincheck.verifier.VerifierState;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Test concurrency correctness of {@link AsyncServletOutputStreamWriter} using model checking with
* Lincheck.
*
* <p>This test should only call AsyncServletOutputStreamWriter's API surface and not rely on any
* implementation detail such as whether it's using a lock-free approach or not.
*
* <p>The test executes two threads concurrently, one for write and flush, and the other for
* onWritePossible up to {@link #OPERATIONS_PER_THREAD} operations on each thread. Lincheck will
* test all possibly interleaves (on context switch) between the two threads, and then verify the
* operations are linearizable in each interleave scenario.
*/
@ModelCheckingCTest
@OpGroupConfig(name = "update", nonParallel = true)
@OpGroupConfig(name = "write", nonParallel = true)
@Param(name = "keepReady", gen = BooleanGen.class)
@RunWith(JUnit4.class)
public class AsyncServletOutputStreamWriterConcurrencyTest extends VerifierState {
private static final int OPERATIONS_PER_THREAD = 6;
private final AsyncServletOutputStreamWriter writer;
private final boolean[] keepReadyArray = new boolean[OPERATIONS_PER_THREAD];
private volatile boolean isReady;
// when isReadyReturnedFalse, writer.onWritePossible() will be called.
private volatile boolean isReadyReturnedFalse;
private int producerIndex;
private int consumerIndex;
private int bytesWritten;
/** Public no-args constructor. */
public AsyncServletOutputStreamWriterConcurrencyTest() {
BiFunction<byte[], Integer, ActionItem> writeAction =
(bytes, numBytes) -> () -> {
assertWithMessage("write should only be called while isReady() is true")
.that(isReady)
.isTrue();
// The byte to be written must equal to consumerIndex, otherwise execution order is wrong
assertWithMessage("write in wrong order").that(bytes[0]).isEqualTo((byte) consumerIndex);
bytesWritten++;
writeOrFlush();
};
ActionItem flushAction = () -> {
assertWithMessage("flush must only be called while isReady() is true").that(isReady).isTrue();
writeOrFlush();
};
writer = new AsyncServletOutputStreamWriter(
writeAction,
flushAction,
() -> { },
this::isReady,
new Log() {});
}
private void writeOrFlush() {
boolean keepReady = keepReadyArray[consumerIndex];
if (!keepReady) {
isReady = false;
}
consumerIndex++;
}
private boolean isReady() {
if (!isReady) {
assertWithMessage("isReady() already returned false, onWritePossible() will be invoked")
.that(isReadyReturnedFalse).isFalse();
isReadyReturnedFalse = true;
}
return isReady;
}
/**
* Writes a single byte with value equal to {@link #producerIndex}.
*
* @param keepReady when the byte is written:
* the ServletOutputStream should remain ready if keepReady == true;
* the ServletOutputStream should become unready if keepReady == false.
*/
// @com.google.errorprone.annotations.Keep
@Operation(group = "write")
public void write(@Param(name = "keepReady") boolean keepReady) throws IOException {
keepReadyArray[producerIndex] = keepReady;
writer.writeBytes(new byte[]{(byte) producerIndex}, 1);
producerIndex++;
}
/**
* Flushes the writer.
*
* @param keepReady when flushing:
* the ServletOutputStream should remain ready if keepReady == true;
* the ServletOutputStream should become unready if keepReady == false.
*/
// @com.google.errorprone.annotations.Keep // called by lincheck reflectively
@Operation(group = "write")
public void flush(@Param(name = "keepReady") boolean keepReady) throws IOException {
keepReadyArray[producerIndex] = keepReady;
writer.flush();
producerIndex++;
}
/** If the writer is not ready, let it turn ready and call writer.onWritePossible(). */
// @com.google.errorprone.annotations.Keep // called by lincheck reflectively
@Operation(group = "update")
public void maybeOnWritePossible() throws IOException {
if (isReadyReturnedFalse) {
isReadyReturnedFalse = false;
isReady = true;
writer.onWritePossible();
}
}
@Override
protected Object extractState() {
return bytesWritten;
}
@Test
public void linCheck() {
ModelCheckingOptions options = new ModelCheckingOptions()
.actorsBefore(0)
.threads(2)
.actorsPerThread(OPERATIONS_PER_THREAD)
.actorsAfter(0)
.addGuarantee(
forClasses(
ConcurrentLinkedQueue.class.getName(),
AtomicReference.class.getName())
.allMethods()
.treatAsAtomic());
LinChecker.check(AsyncServletOutputStreamWriterConcurrencyTest.class, options);
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import java.util.Enumeration;
import java.util.StringTokenizer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Test for {@link ServletServerBuilder}. */
@RunWith(JUnit4.class)
public class ServletServerBuilderTest {
@Test
public void scheduledExecutorService() throws Exception {
ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class);
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
AsyncContext asyncContext = mock(AsyncContext.class);
ServletInputStream inputStream = mock(ServletInputStream.class);
ServletOutputStream outputStream = mock(ServletOutputStream.class);
ScheduledFuture<?> future = mock(ScheduledFuture.class);
doReturn(future).when(scheduler).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
doReturn(true).when(request).isAsyncSupported();
doReturn(asyncContext).when(request).startAsync(request, response);
doReturn("application/grpc").when(request).getContentType();
doReturn("/hello/world").when(request).getRequestURI();
@SuppressWarnings({"JdkObsolete", "unchecked"}) // Required by servlet API signatures.
// StringTokenizer is actually Enumeration<String>
Enumeration<String> headerNames =
(Enumeration<String>) ((Enumeration<?>) new StringTokenizer("grpc-timeout"));
@SuppressWarnings({"JdkObsolete", "unchecked"})
Enumeration<String> headers =
(Enumeration<String>) ((Enumeration<?>) new StringTokenizer("1m"));
doReturn(headerNames).when(request).getHeaderNames();
doReturn(headers).when(request).getHeaders("grpc-timeout");
doReturn(new StringBuffer("localhost:8080")).when(request).getRequestURL();
doReturn(inputStream).when(request).getInputStream();
doReturn("1.1.1.1").when(request).getLocalAddr();
doReturn(8080).when(request).getLocalPort();
doReturn("remote").when(request).getRemoteHost();
doReturn(80).when(request).getRemotePort();
doReturn(outputStream).when(response).getOutputStream();
doReturn(request).when(asyncContext).getRequest();
doReturn(response).when(asyncContext).getResponse();
ServletServerBuilder serverBuilder =
new ServletServerBuilder().scheduledExecutorService(scheduler);
ServletAdapter servletAdapter = serverBuilder.buildServletAdapter();
servletAdapter.doPost(request, response);
verify(asyncContext).setTimeout(1);
// The following just verifies that scheduler is populated to the transport.
// It doesn't matter what tasks (such as handshake timeout and request deadline) are actually
// scheduled.
verify(scheduler, timeout(5000).atLeastOnce())
.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.integration.AbstractInteropTest;
import java.io.File;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.http2.Http2Protocol;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Ignore;
import org.junit.Test;
/**
* Interop test for Tomcat server and Netty client.
*/
public class TomcatInteropTest extends AbstractInteropTest {
private static final String HOST = "localhost";
private static final String MYAPP = "/grpc.testing.TestService";
private int port;
private Tomcat server;
@After
@Override
public void tearDown() {
super.tearDown();
try {
server.stop();
} catch (LifecycleException e) {
throw new AssertionError(e);
}
}
@AfterClass
public static void cleanUp() throws Exception {
FileUtils.deleteDirectory(new File("tomcat.0"));
}
@Override
protected ServerBuilder<?> getServerBuilder() {
return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
}
@Override
protected void startServer(ServerBuilder<?> builer) {
server = new Tomcat();
server.setPort(0);
Context ctx = server.addContext(MYAPP, new File("build/tmp").getAbsolutePath());
Tomcat
.addServlet(
ctx, "io.grpc.servlet.TomcatInteropTest",
new GrpcServlet(((ServletServerBuilder) builer).buildServletAdapter()))
.setAsyncSupported(true);
ctx.addServletMappingDecoded("/*", "io.grpc.servlet.TomcatInteropTest");
// Explicitly disable safeguards against malicious clients, as some unit tests trigger these
Http2Protocol http2Protocol = new Http2Protocol();
http2Protocol.setOverheadCountFactor(0);
http2Protocol.setOverheadWindowUpdateThreshold(0);
http2Protocol.setOverheadContinuationThreshold(0);
http2Protocol.setOverheadDataThreshold(0);
server.getConnector().addUpgradeProtocol(http2Protocol);
try {
server.start();
} catch (LifecycleException e) {
throw new RuntimeException(e);
}
port = server.getConnector().getLocalPort();
}
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
NettyChannelBuilder builder =
(NettyChannelBuilder) ManagedChannelBuilder.forAddress(HOST, port)
.usePlaintext()
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
builder.intercept(createCensusStatsClientInterceptor());
return builder;
}
@Override
protected boolean metricsExpected() {
return false; // otherwise re-test will not work
}
// FIXME
@Override
@Ignore("Tomcat is broken on client GOAWAY")
@Test
public void gracefulShutdown() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void specialStatusMessage() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void unimplementedMethod() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void statusCodeAndMessage() {}
// FIXME
@Override
@Ignore("Tomcat is not able to send trailer only")
@Test
public void emptyStream() {}
}

View File

@ -0,0 +1,270 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.servlet.ServletServerBuilder.ServerTransportImpl;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
import org.apache.coyote.http2.Http2Protocol;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
/**
* Transport test for Tomcat server and Netty client.
*/
public class TomcatTransportTest extends AbstractTransportTest {
private static final String MYAPP = "/service";
private final FakeClock fakeClock = new FakeClock();
private Tomcat tomcatServer;
private int port;
@After
@Override
public void tearDown() throws InterruptedException {
super.tearDown();
try {
tomcatServer.stop();
} catch (LifecycleException e) {
throw new AssertionError(e);
}
}
@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
return new InternalServer() {
final InternalServer delegate =
new ServletServerBuilder().buildTransportServers(streamTracerFactories);
@Override
public void start(ServerListener listener) throws IOException {
delegate.start(listener);
ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
tomcatServer = new Tomcat();
tomcatServer.setPort(0);
Context ctx = tomcatServer.addContext(MYAPP, new File("build/tmp").getAbsolutePath());
Tomcat.addServlet(ctx, "TomcatTransportTest", grpcServlet)
.setAsyncSupported(true);
ctx.addServletMappingDecoded("/*", "TomcatTransportTest");
tomcatServer.getConnector().addUpgradeProtocol(new Http2Protocol());
try {
tomcatServer.start();
} catch (LifecycleException e) {
throw new RuntimeException(e);
}
port = tomcatServer.getConnector().getLocalPort();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public SocketAddress getListenSocketAddress() {
return delegate.getListenSocketAddress();
}
@Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
return delegate.getListenSocketStats();
}
@Override
public List<? extends SocketAddress> getListenSocketAddresses() {
return delegate.getListenSocketAddresses();
}
@Nullable
@Override
public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
return delegate.getListenSocketStatsList();
}
};
}
@Override
protected InternalServer newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder
// Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0)
.flowControlWindow(65 * 1024)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder
.setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer);
ClientTransportFactory clientFactory =
InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder);
return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port),
new ClientTransportFactory.ClientTransportOptions()
.setAuthority(testAuthority(server))
.setEagAttributes(eagAttrs()),
transportLogger());
}
@Override
protected String testAuthority(InternalServer server) {
return "localhost:" + port;
}
@Override
protected void advanceClock(long offset, TimeUnit unit) {
fakeClock.forwardNanos(unit.toNanos(offset));
}
@Override
protected long fakeCurrentTimeNanos() {
return fakeClock.getTicker().read();
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverAlreadyListening() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void openStreamPreventsTermination() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void shutdownNowKillsServerStream() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverNotListening() {}
// FIXME
@Override
@Ignore("Tomcat is broken on client GOAWAY")
@Test
public void newStream_duringShutdown() {}
// FIXME
@Override
@Ignore("Tomcat is broken on client GOAWAY")
@Test
public void ping_duringShutdown() {}
// FIXME
@Override
@Ignore("Tomcat is broken on client RST_STREAM")
@Test
public void frameAfterRstStreamShouldNotBreakClientChannel() {}
// FIXME
@Override
@Ignore("Tomcat is broken on client RST_STREAM")
@Test
public void shutdownNowKillsClientStream() {}
// FIXME
@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlPushBack() {}
@Override
@Ignore("Server side sockets are managed by the servlet container")
@Test
public void socketStats() {}
@Override
@Ignore("serverTransportListener will not terminate")
@Test
public void clientStartAndStopOnceConnected() {}
@Override
@Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply")
@Test
public void serverCancel() {}
@Override
@Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops")
@Test
public void interactionsAfterServerStreamCloseAreNoops() {}
@Override
@Ignore("listeners.poll() doesn't apply")
@Test
public void interactionsAfterClientStreamCancelAreNoops() {}
@Override
@Ignore("assertNull(serverStatus.getCause()) isn't true")
@Test
public void clientCancel() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_noServerHeaders() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure() {}
@Override
@Ignore("Tomcat does not support trailers only")
@Test
public void earlyServerClose_serverFailure_withClientCancelOnListenerClosed() {}
@Override
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
public void messageProducerOnlyProducesRequestedMessages() {}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static io.undertow.servlet.Servlets.defaultContainer;
import static io.undertow.servlet.Servlets.deployment;
import static io.undertow.servlet.Servlets.servlet;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.integration.AbstractInteropTest;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.servlet.api.DeploymentInfo;
import io.undertow.servlet.api.DeploymentManager;
import io.undertow.servlet.api.InstanceFactory;
import io.undertow.servlet.util.ImmediateInstanceHandle;
import java.net.InetSocketAddress;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
/**
* Interop test for Undertow server and Netty client.
*/
public class UndertowInteropTest extends AbstractInteropTest {
private static final String HOST = "localhost";
private static final String MYAPP = "/grpc.testing.TestService";
private int port;
private Undertow server;
private DeploymentManager manager;
@After
@Override
public void tearDown() {
super.tearDown();
if (server != null) {
server.stop();
}
if (manager != null) {
try {
manager.stop();
} catch (ServletException e) {
throw new AssertionError("failed to stop container", e);
}
}
}
@Override
protected ServletServerBuilder getServerBuilder() {
return new ServletServerBuilder().maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
}
@Override
protected void startServer(ServerBuilder<?> builder) {
GrpcServlet grpcServlet =
new GrpcServlet(((ServletServerBuilder) builder).buildServletAdapter());
InstanceFactory<? extends Servlet> instanceFactory =
() -> new ImmediateInstanceHandle<>(grpcServlet);
DeploymentInfo servletBuilder =
deployment()
.setClassLoader(UndertowInteropTest.class.getClassLoader())
.setContextPath(MYAPP)
.setDeploymentName("UndertowInteropTest.war")
.addServlets(
servlet("InteropTestServlet", GrpcServlet.class, instanceFactory)
.addMapping("/*")
.setAsyncSupported(true));
manager = defaultContainer().addDeployment(servletBuilder);
manager.deploy();
HttpHandler servletHandler;
try {
servletHandler = manager.start();
} catch (ServletException e) {
throw new RuntimeException(e);
}
PathHandler path = Handlers.path(Handlers.redirect(MYAPP))
.addPrefixPath("/", servletHandler); // for unimplementedService test
server = Undertow.builder()
.setServerOption(UndertowOptions.ENABLE_HTTP2, true)
.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 5000 /* 5 sec */)
.addHttpListener(0, HOST)
.setHandler(path)
.build();
server.start();
port = ((InetSocketAddress) server.getListenerInfo().get(0).getAddress()).getPort();
}
@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
NettyChannelBuilder builder = (NettyChannelBuilder) ManagedChannelBuilder
.forAddress(HOST, port)
.usePlaintext()
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE);
InternalNettyChannelBuilder.setStatsEnabled(builder, false);
builder.intercept(createCensusStatsClientInterceptor());
return builder;
}
// FIXME
@Override
@Ignore("Undertow is broken on client GOAWAY")
@Test
public void gracefulShutdown() {}
}

View File

@ -0,0 +1,304 @@
/*
* Copyright 2018 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.servlet;
import static io.undertow.servlet.Servlets.defaultContainer;
import static io.undertow.servlet.Servlets.deployment;
import static io.undertow.servlet.Servlets.servlet;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransportListener;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.servlet.ServletServerBuilder.ServerTransportImpl;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.UndertowOptions;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.servlet.api.DeploymentInfo;
import io.undertow.servlet.api.DeploymentManager;
import io.undertow.servlet.api.InstanceFactory;
import io.undertow.servlet.util.ImmediateInstanceHandle;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Test;
/**
* Transport test for Undertow server and Netty client.
*/
public class UndertowTransportTest extends AbstractTransportTest {
private static final String HOST = "localhost";
private static final String MYAPP = "/service";
private final FakeClock fakeClock = new FakeClock();
private Undertow undertowServer;
private DeploymentManager manager;
private int port;
@After
@Override
public void tearDown() throws InterruptedException {
super.tearDown();
if (undertowServer != null) {
undertowServer.stop();
}
if (manager != null) {
try {
manager.stop();
} catch (ServletException e) {
throw new AssertionError("failed to stop container", e);
}
}
}
@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory>
streamTracerFactories) {
return new InternalServer() {
final InternalServer delegate =
new ServletServerBuilder().buildTransportServers(streamTracerFactories);
@Override
public void start(ServerListener listener) throws IOException {
delegate.start(listener);
ScheduledExecutorService scheduler = fakeClock.getScheduledExecutorService();
ServerTransportListener serverTransportListener =
listener.transportCreated(new ServerTransportImpl(scheduler));
ServletAdapter adapter =
new ServletAdapter(serverTransportListener, streamTracerFactories, Integer.MAX_VALUE);
GrpcServlet grpcServlet = new GrpcServlet(adapter);
InstanceFactory<? extends Servlet> instanceFactory =
() -> new ImmediateInstanceHandle<>(grpcServlet);
DeploymentInfo servletBuilder =
deployment()
.setClassLoader(UndertowInteropTest.class.getClassLoader())
.setContextPath(MYAPP)
.setDeploymentName("UndertowTransportTest.war")
.addServlets(
servlet("TransportTestServlet", GrpcServlet.class, instanceFactory)
.addMapping("/*")
.setAsyncSupported(true));
manager = defaultContainer().addDeployment(servletBuilder);
manager.deploy();
HttpHandler servletHandler;
try {
servletHandler = manager.start();
} catch (ServletException e) {
throw new RuntimeException(e);
}
PathHandler path =
Handlers.path(Handlers.redirect(MYAPP))
.addPrefixPath("/", servletHandler); // for unimplementedService test
undertowServer =
Undertow.builder()
.setServerOption(UndertowOptions.ENABLE_HTTP2, true)
.setServerOption(UndertowOptions.SHUTDOWN_TIMEOUT, 5000 /* 5 sec */)
.addHttpListener(0, HOST)
.setHandler(path)
.build();
undertowServer.start();
port = ((InetSocketAddress) undertowServer.getListenerInfo().get(0).getAddress()).getPort();
}
@Override
public void shutdown() {
delegate.shutdown();
}
@Override
public SocketAddress getListenSocketAddress() {
return delegate.getListenSocketAddress();
}
@Override
public InternalInstrumented<SocketStats> getListenSocketStats() {
return delegate.getListenSocketStats();
}
@Override
public List<? extends SocketAddress> getListenSocketAddresses() {
return delegate.getListenSocketAddresses();
}
@Nullable
@Override
public List<InternalInstrumented<SocketStats>> getListenSocketStatsList() {
return delegate.getListenSocketStatsList();
}
};
}
@Override
protected InternalServer newServer(int port,
List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder
// Although specified here, address is ignored because we never call build.
.forAddress("localhost", 0)
.flowControlWindow(65 * 1024)
.negotiationType(NegotiationType.PLAINTEXT);
InternalNettyChannelBuilder
.setTransportTracerFactory(nettyChannelBuilder, fakeClockTransportTracer);
ClientTransportFactory clientFactory =
InternalNettyChannelBuilder.buildTransportFactory(nettyChannelBuilder);
return clientFactory.newClientTransport(
new InetSocketAddress("localhost", port),
new ClientTransportFactory.ClientTransportOptions()
.setAuthority(testAuthority(server))
.setEagAttributes(eagAttrs()),
transportLogger());
}
@Override
protected String testAuthority(InternalServer server) {
return "localhost:" + port;
}
@Override
protected void advanceClock(long offset, TimeUnit unit) {
fakeClock.forwardNanos(unit.toNanos(offset));
}
@Override
protected long fakeCurrentTimeNanos() {
return fakeClock.getTicker().read();
}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverAlreadyListening() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void openStreamPreventsTermination() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void shutdownNowKillsServerStream() {}
@Override
@Ignore("Skip the test, server lifecycle is managed by the container")
@Test
public void serverNotListening() {}
@Override
@Ignore("Skip the test, can not set HTTP/2 SETTINGS_MAX_HEADER_LIST_SIZE")
@Test
public void serverChecksInboundMetadataSize() {}
// FIXME
@Override
@Ignore("Undertow is broken on client GOAWAY")
@Test
public void newStream_duringShutdown() {}
// FIXME
@Override
@Ignore("Undertow is broken on client GOAWAY")
@Test
public void ping_duringShutdown() {}
// FIXME
@Override
@Ignore("Undertow is broken on client RST_STREAM")
@Test
public void frameAfterRstStreamShouldNotBreakClientChannel() {}
// FIXME
@Override
@Ignore("Undertow is broken on client RST_STREAM")
@Test
public void shutdownNowKillsClientStream() {}
// FIXME: https://github.com/grpc/grpc-java/issues/8925
@Override
@Ignore("flaky")
@Test
public void clientCancelFromWithinMessageRead() {}
// FIXME
@Override
@Ignore("Servlet flow control not implemented yet")
@Test
public void flowControlPushBack() {}
@Override
@Ignore("Server side sockets are managed by the servlet container")
@Test
public void socketStats() {}
@Override
@Ignore("serverTransportListener will not terminate")
@Test
public void clientStartAndStopOnceConnected() {}
@Override
@Ignore("clientStreamTracer1.getInboundTrailers() is not null; listeners.poll() doesn't apply")
@Test
public void serverCancel() {}
@Override
@Ignore("This doesn't apply: Ensure that for a closed ServerStream, interactions are noops")
@Test
public void interactionsAfterServerStreamCloseAreNoops() {}
@Override
@Ignore("listeners.poll() doesn't apply")
@Test
public void interactionsAfterClientStreamCancelAreNoops() {}
@Override
@Ignore("assertNull(serverStatus.getCause()) isn't true")
@Test
public void clientCancel() {}
@Override
@Ignore("regression since bumping grpc v1.46 to v1.53")
@Test
public void messageProducerOnlyProducesRequestedMessages() {}
}

View File

@ -48,6 +48,8 @@ include ":grpc-all"
include ":grpc-alts"
include ":grpc-benchmarks"
include ":grpc-services"
include ":grpc-servlet"
include ":grpc-servlet-jakarta"
include ":grpc-xds"
include ":grpc-bom"
include ":grpc-rls"
@ -76,6 +78,8 @@ project(':grpc-all').projectDir = "$rootDir/all" as File
project(':grpc-alts').projectDir = "$rootDir/alts" as File
project(':grpc-benchmarks').projectDir = "$rootDir/benchmarks" as File
project(':grpc-services').projectDir = "$rootDir/services" as File
project(':grpc-servlet').projectDir = "$rootDir/servlet" as File
project(':grpc-servlet-jakarta').projectDir = "$rootDir/servlet/jakarta" as File
project(':grpc-xds').projectDir = "$rootDir/xds" as File
project(':grpc-bom').projectDir = "$rootDir/bom" as File
project(':grpc-rls').projectDir = "$rootDir/rls" as File