mirror of https://github.com/grpc/grpc-java.git
Remove Headers
This commit is contained in:
parent
13c14df055
commit
a508c1d4f5
|
|
@ -59,7 +59,7 @@ public class ClientAuthInterceptor implements ClientInterceptor {
|
|||
|
||||
private final Credentials credentials;
|
||||
|
||||
private Metadata.Headers cached;
|
||||
private Metadata cached;
|
||||
private Map<String, List<String>> lastMetadata;
|
||||
// TODO(louiscryan): refresh token asynchronously with this executor.
|
||||
private Executor executor;
|
||||
|
|
@ -76,9 +76,9 @@ public class ClientAuthInterceptor implements ClientInterceptor {
|
|||
// would be in WWW-Authenticate, because it does not yet have access to the header.
|
||||
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
|
||||
@Override
|
||||
protected void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
|
||||
protected void checkedStart(Listener<RespT> responseListener, Metadata headers)
|
||||
throws Exception {
|
||||
Metadata.Headers cachedSaved;
|
||||
Metadata cachedSaved;
|
||||
synchronized (ClientAuthInterceptor.this) {
|
||||
// TODO(louiscryan): This is icky but the current auth library stores the same
|
||||
// metadata map until the next refresh cycle. This will be fixed once
|
||||
|
|
@ -104,8 +104,8 @@ public class ClientAuthInterceptor implements ClientInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
private static final Metadata.Headers toHeaders(Map<String, List<String>> metadata) {
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
private static final Metadata toHeaders(Map<String, List<String>> metadata) {
|
||||
Metadata headers = new Metadata();
|
||||
if (metadata != null) {
|
||||
for (String key : metadata.keySet()) {
|
||||
Metadata.Key<String> headerKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
|
|
|||
|
|
@ -31,10 +31,10 @@
|
|||
|
||||
package io.grpc.auth;
|
||||
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.isA;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.same;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
|
@ -114,7 +114,7 @@ public class ClientAuthInterceptorTests {
|
|||
when(credentials.getRequestMetadata()).thenReturn(Multimaps.asMap(values));
|
||||
ClientCall<String, Integer> interceptedCall =
|
||||
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
interceptedCall.start(listener, headers);
|
||||
verify(call).start(listener, headers);
|
||||
|
||||
|
|
@ -131,7 +131,7 @@ public class ClientAuthInterceptorTests {
|
|||
when(credentials.getRequestMetadata()).thenThrow(new IOException("Broken"));
|
||||
ClientCall<String, Integer> interceptedCall =
|
||||
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
interceptedCall.start(listener, headers);
|
||||
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
|
||||
Mockito.verify(listener).onClose(statusCaptor.capture(), isA(Metadata.class));
|
||||
|
|
@ -142,7 +142,7 @@ public class ClientAuthInterceptorTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testWithOAuth2Credential() throws IOException {
|
||||
public void testWithOAuth2Credential() {
|
||||
final AccessToken token = new AccessToken("allyourbase", new Date(Long.MAX_VALUE));
|
||||
final OAuth2Credentials oAuth2Credentials = new OAuth2Credentials() {
|
||||
@Override
|
||||
|
|
@ -153,7 +153,7 @@ public class ClientAuthInterceptorTests {
|
|||
interceptor = new ClientAuthInterceptor(oAuth2Credentials, Executors.newSingleThreadExecutor());
|
||||
ClientCall<String, Integer> interceptedCall =
|
||||
interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
interceptedCall.start(listener, headers);
|
||||
verify(call).start(listener, headers);
|
||||
Iterable<String> authorization = headers.getAll(AUTHORIZATION);
|
||||
|
|
|
|||
|
|
@ -242,7 +242,7 @@ public abstract class AbstractBenchmark {
|
|||
public ServerCall.Listener<ByteBuf> startCall(
|
||||
MethodDescriptor<ByteBuf, ByteBuf> method,
|
||||
final ServerCall<ByteBuf> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ByteBuf>() {
|
||||
@Override
|
||||
|
|
@ -274,7 +274,7 @@ public abstract class AbstractBenchmark {
|
|||
public ServerCall.Listener<ByteBuf> startCall(
|
||||
MethodDescriptor<ByteBuf, ByteBuf> method,
|
||||
final ServerCall<ByteBuf> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ByteBuf>() {
|
||||
@Override
|
||||
|
|
@ -308,7 +308,7 @@ public abstract class AbstractBenchmark {
|
|||
public ServerCall.Listener<ByteBuf> startCall(
|
||||
MethodDescriptor<ByteBuf, ByteBuf> method,
|
||||
final ServerCall<ByteBuf> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ByteBuf>() {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -57,9 +57,29 @@ public final class CallOptions {
|
|||
// unnamed arguments, which is undesirable.
|
||||
private Long deadlineNanoTime;
|
||||
|
||||
|
||||
@Nullable
|
||||
private Compressor compressor;
|
||||
|
||||
@Nullable
|
||||
private String authority;
|
||||
|
||||
/**
|
||||
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
|
||||
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
|
||||
* services, even if those services are hosted on different domain names. That assumes the
|
||||
* server is virtually hosting multiple domains and is guaranteed to continue doing so. It is
|
||||
* rare for a service provider to make such a guarantee. <em>At this time, there is no security
|
||||
* verification of the overridden value, such as making sure the authority matches the server's
|
||||
* TLS certificate.</em>
|
||||
*/
|
||||
@ExperimentalApi
|
||||
public CallOptions withAuthority(@Nullable String authority) {
|
||||
CallOptions newOptions = new CallOptions(this);
|
||||
newOptions.authority = authority;
|
||||
return newOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new {@code CallOptions} with the given absolute deadline in nanoseconds in the clock
|
||||
* as per {@link System#nanoTime()}.
|
||||
|
|
@ -110,6 +130,20 @@ public final class CallOptions {
|
|||
return newOptions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
|
||||
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
|
||||
* services, even if those services are hosted on different domain names. That assumes the
|
||||
* server is virtually hosting multiple domains and is guaranteed to continue doing so. It is
|
||||
* rare for a service provider to make such a guarantee. <em>At this time, there is no security
|
||||
* verification of the overridden value, such as making sure the authority matches the server's
|
||||
* TLS certificate.</em>
|
||||
*/
|
||||
@Nullable
|
||||
public String getAuthority() {
|
||||
return authority;
|
||||
}
|
||||
|
||||
private CallOptions() {
|
||||
}
|
||||
|
||||
|
|
@ -119,6 +153,7 @@ public final class CallOptions {
|
|||
private CallOptions(CallOptions other) {
|
||||
deadlineNanoTime = other.deadlineNanoTime;
|
||||
compressor = other.compressor;
|
||||
authority = other.authority;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation") // guava 14.0
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
|
|||
|
||||
import io.grpc.ClientCallImpl.ClientTransportProvider;
|
||||
import io.grpc.MessageEncoding.Compressor;
|
||||
import io.grpc.Metadata.Headers;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.internal.ClientStream;
|
||||
import io.grpc.internal.ClientStreamListener;
|
||||
import io.grpc.internal.ClientTransport;
|
||||
|
|
@ -393,7 +393,7 @@ public final class ChannelImpl extends Channel {
|
|||
|
||||
@Override
|
||||
public ClientStream newStream(
|
||||
MethodDescriptor<?, ?> method, Headers headers, ClientStreamListener listener) {
|
||||
MethodDescriptor<?, ?> method, Metadata headers, ClientStreamListener listener) {
|
||||
listener.closed(shutdownStatus, new Metadata());
|
||||
return new ClientCallImpl.NoopClientStream();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ public abstract class ClientCall<RequestT, ResponseT> {
|
|||
*
|
||||
* @param headers containing metadata sent by the server at the start of the response.
|
||||
*/
|
||||
public abstract void onHeaders(Metadata.Headers headers);
|
||||
public abstract void onHeaders(Metadata headers);
|
||||
|
||||
/**
|
||||
* A response message has been received. May be called zero or more times depending on whether
|
||||
|
|
@ -123,7 +123,7 @@ public abstract class ClientCall<RequestT, ResponseT> {
|
|||
* @throws IllegalStateException if a method (including {@code start()}) on this class has been
|
||||
* called.
|
||||
*/
|
||||
public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
|
||||
public abstract void start(Listener<ResponseT> responseListener, Metadata headers);
|
||||
|
||||
/**
|
||||
* Requests up to the given number of messages from the call to be delivered to
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
package io.grpc;
|
||||
|
||||
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
|
||||
|
|
@ -94,7 +95,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void start(Listener<RespT> observer, Metadata.Headers headers) {
|
||||
public void start(Listener<RespT> observer, Metadata headers) {
|
||||
Preconditions.checkState(stream == null, "Already started");
|
||||
Long deadlineNanoTime = callOptions.getDeadlineNanoTime();
|
||||
ClientStreamListener listener = new ClientStreamListenerImpl(observer, deadlineNanoTime);
|
||||
|
|
@ -124,6 +125,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
headers.put(TIMEOUT_KEY, timeoutMicros);
|
||||
}
|
||||
|
||||
// Hack to propagate authority. This should be properly pass to the transport.newStream
|
||||
// somehow.
|
||||
headers.removeAll(AUTHORITY_KEY);
|
||||
if (callOptions.getAuthority() != null) {
|
||||
headers.put(AUTHORITY_KEY, callOptions.getAuthority());
|
||||
}
|
||||
|
||||
// Fill out the User-Agent header.
|
||||
headers.removeAll(USER_AGENT_KEY);
|
||||
if (userAgent != null) {
|
||||
|
|
@ -241,7 +249,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void headersRead(final Metadata.Headers headers) {
|
||||
public void headersRead(final Metadata headers) {
|
||||
callExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ public class ClientInterceptors {
|
|||
|
||||
private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
|
||||
@Override
|
||||
public void start(Listener<Object> responseListener, Metadata.Headers headers) {}
|
||||
public void start(Listener<Object> responseListener, Metadata headers) {}
|
||||
|
||||
@Override
|
||||
public void request(int numMessages) {}
|
||||
|
|
@ -120,7 +120,7 @@ public class ClientInterceptors {
|
|||
* A {@link io.grpc.ForwardingClientCall} that delivers exceptions from its start logic to the
|
||||
* call listener.
|
||||
*
|
||||
* <p>{@link ClientCall#start(ClientCall.Listener, Metadata.Headers)} should not throw any
|
||||
* <p>{@link ClientCall#start(ClientCall.Listener, Metadata)} should not throw any
|
||||
* exception other than those caused by misuse, e.g., {@link IllegalStateException}. {@code
|
||||
* CheckedForwardingClientCall} provides {@code checkedStart()} in which throwing exceptions is
|
||||
* allowed.
|
||||
|
|
@ -140,7 +140,7 @@ public class ClientInterceptors {
|
|||
* this.delegate().start()}, as this can result in {@link ClientCall.Listener#onClose} being
|
||||
* called multiple times.
|
||||
*/
|
||||
protected abstract void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
|
||||
protected abstract void checkedStart(Listener<RespT> responseListener, Metadata headers)
|
||||
throws Exception;
|
||||
|
||||
protected CheckedForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
|
||||
|
|
@ -154,7 +154,7 @@ public class ClientInterceptors {
|
|||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public final void start(Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public final void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
try {
|
||||
checkedStart(responseListener, headers);
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT,
|
|||
protected abstract ClientCall<ReqT, RespT> delegate();
|
||||
|
||||
@Override
|
||||
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
delegate().start(responseListener, headers);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ public abstract class ForwardingClientCallListener<RespT> extends ClientCall.Lis
|
|||
protected abstract ClientCall.Listener<RespT> delegate();
|
||||
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
delegate().onHeaders(headers);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public abstract class ForwardingServerCall<RespT> extends ServerCall<RespT> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendHeaders(Metadata.Headers headers) {
|
||||
public void sendHeaders(Metadata headers) {
|
||||
delegate().sendHeaders(headers);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ import com.google.common.base.Function;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
|
|
@ -239,6 +241,11 @@ public class Metadata {
|
|||
// One *2 for keys+values, one *2 to prevent resizing if a single key has multiple values
|
||||
List<byte[]> serialized = new ArrayList<byte[]>(store.size() * 2 * 2);
|
||||
for (Map.Entry<String, List<MetadataEntry>> keyEntry : store.entrySet()) {
|
||||
// Intentionally skip this field on serialization. It must be handled special by the
|
||||
// transport.
|
||||
if (keyEntry.getKey().equals(GrpcUtil.AUTHORITY_KEY.name())) {
|
||||
continue;
|
||||
}
|
||||
for (int i = 0; i < keyEntry.getValue().size(); i++) {
|
||||
MetadataEntry entry = keyEntry.getValue().get(i);
|
||||
byte[] asciiName;
|
||||
|
|
@ -297,84 +304,17 @@ public class Metadata {
|
|||
|
||||
/**
|
||||
* Concrete instance for metadata attached to the start of a call.
|
||||
*/
|
||||
public static class Headers extends Metadata {
|
||||
private String path;
|
||||
private String authority;
|
||||
|
||||
/**
|
||||
* Called by the transport layer to create headers from their binary serialized values.
|
||||
*
|
||||
* <p>This method does not copy the provided byte arrays. The byte arrays must not be mutated.
|
||||
* @deprecated use Metadata instead.
|
||||
*/
|
||||
public Headers(byte[]... headers) {
|
||||
super(headers);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static class Headers extends Metadata {
|
||||
/**
|
||||
* Called by the application layer to construct headers prior to passing them to the
|
||||
* transport for serialization.
|
||||
*/
|
||||
public Headers() {
|
||||
}
|
||||
|
||||
/**
|
||||
* The path for the operation.
|
||||
*/
|
||||
public String getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public void setPath(String path) {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
/**
|
||||
* The serving authority for the operation.
|
||||
*/
|
||||
public String getAuthority() {
|
||||
return authority;
|
||||
}
|
||||
|
||||
/**
|
||||
* Override the HTTP/2 authority the channel claims to be connecting to. <em>This is not
|
||||
* generally safe.</em> Overriding allows advanced users to re-use a single Channel for multiple
|
||||
* services, even if those services are hosted on different domain names. That assumes the
|
||||
* server is virtually hosting multiple domains and is guaranteed to continue doing so. It is
|
||||
* rare for a service provider to make such a guarantee. <em>At this time, there is no security
|
||||
* verification of the overridden value, such as making sure the authority matches the server's
|
||||
* TLS certificate.</em>
|
||||
*/
|
||||
public void setAuthority(String authority) {
|
||||
this.authority = authority;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(Metadata other) {
|
||||
super.merge(other);
|
||||
mergePathAndAuthority(other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void merge(Metadata other, Set<Key<?>> keys) {
|
||||
super.merge(other, keys);
|
||||
mergePathAndAuthority(other);
|
||||
}
|
||||
|
||||
private void mergePathAndAuthority(Metadata other) {
|
||||
if (other instanceof Headers) {
|
||||
Headers otherHeaders = (Headers) other;
|
||||
path = otherHeaders.path != null ? otherHeaders.path : path;
|
||||
authority = otherHeaders.authority != null ? otherHeaders.authority : authority;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Headers(path=" + path
|
||||
+ ",authority=" + authority
|
||||
+ ",metadata=" + super.toStringInternal() + ")";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -121,7 +121,7 @@ public abstract class ServerCall<ResponseT> {
|
|||
* @throws IllegalStateException if {@code close} has been called, a message has been sent, or
|
||||
* headers have already been sent
|
||||
*/
|
||||
public abstract void sendHeaders(Metadata.Headers headers);
|
||||
public abstract void sendHeaders(Metadata headers);
|
||||
|
||||
/**
|
||||
* Send a response message. Messages are the primary form of communication associated with
|
||||
|
|
|
|||
|
|
@ -54,5 +54,5 @@ public interface ServerCallHandler<RequestT, ResponseT> {
|
|||
ServerCall.Listener<RequestT> startCall(
|
||||
MethodDescriptor<RequestT, ResponseT> method,
|
||||
ServerCall<ResponseT> call,
|
||||
Metadata.Headers headers);
|
||||
Metadata headers);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -293,7 +293,7 @@ public final class ServerImpl extends Server {
|
|||
|
||||
@Override
|
||||
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
|
||||
final Metadata.Headers headers) {
|
||||
final Metadata headers) {
|
||||
final Future<?> timeout = scheduleTimeout(stream, headers);
|
||||
SerializingExecutor serializingExecutor = new SerializingExecutor(executor);
|
||||
final JumpToApplicationThreadServerStreamListener jumpListener
|
||||
|
|
@ -328,7 +328,7 @@ public final class ServerImpl extends Server {
|
|||
return jumpListener;
|
||||
}
|
||||
|
||||
private Future<?> scheduleTimeout(final ServerStream stream, Metadata.Headers headers) {
|
||||
private Future<?> scheduleTimeout(final ServerStream stream, Metadata headers) {
|
||||
Long timeoutMicros = headers.get(TIMEOUT_KEY);
|
||||
if (timeoutMicros == null) {
|
||||
return DEFAULT_TIMEOUT_FUTURE;
|
||||
|
|
@ -348,7 +348,7 @@ public final class ServerImpl extends Server {
|
|||
/** Never returns {@code null}. */
|
||||
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
|
||||
ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
|
||||
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
|
||||
stream, methodDef.getMethodDescriptor());
|
||||
|
|
@ -489,7 +489,7 @@ public final class ServerImpl extends Server {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendHeaders(Metadata.Headers headers) {
|
||||
public void sendHeaders(Metadata headers) {
|
||||
Preconditions.checkState(!sendHeadersCalled, "sendHeaders has already been called");
|
||||
Preconditions.checkState(!closeCalled, "call is closed");
|
||||
Preconditions.checkState(!sendMessageCalled, "sendMessage has already been called");
|
||||
|
|
|
|||
|
|
@ -64,6 +64,6 @@ public interface ServerInterceptor {
|
|||
<RequestT, ResponseT> ServerCall.Listener<RequestT> interceptCall(
|
||||
MethodDescriptor<RequestT, ResponseT> method,
|
||||
ServerCall<ResponseT> call,
|
||||
Metadata.Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<RequestT, ResponseT> next);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ public class ServerInterceptors {
|
|||
public ServerCall.Listener<ReqT> startCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
return interceptor.interceptCall(method, call, headers, callHandler);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import io.grpc.Status;
|
|||
import io.grpc.internal.ClientStream;
|
||||
import io.grpc.internal.ClientStreamListener;
|
||||
import io.grpc.internal.ClientTransport;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ServerStream;
|
||||
import io.grpc.internal.ServerStreamListener;
|
||||
import io.grpc.internal.ServerTransport;
|
||||
|
|
@ -106,11 +107,12 @@ class InProcessTransport implements ServerTransport, ClientTransport {
|
|||
|
||||
@Override
|
||||
public synchronized ClientStream newStream(MethodDescriptor<?, ?> method,
|
||||
Metadata.Headers headers, ClientStreamListener clientStreamListener) {
|
||||
Metadata headers, ClientStreamListener clientStreamListener) {
|
||||
if (shutdownStatus != null) {
|
||||
clientStreamListener.closed(shutdownStatus, new Metadata());
|
||||
return new NoopClientStream();
|
||||
}
|
||||
headers.removeAll(GrpcUtil.AUTHORITY_KEY);
|
||||
InProcessStream stream = new InProcessStream();
|
||||
stream.serverStream.setListener(clientStreamListener);
|
||||
ServerStreamListener serverStreamListener = serverTransportListener.streamCreated(
|
||||
|
|
@ -265,7 +267,7 @@ class InProcessTransport implements ServerTransport, ClientTransport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void writeHeaders(Metadata.Headers headers) {
|
||||
public synchronized void writeHeaders(Metadata headers) {
|
||||
if (closed) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
|
|||
*
|
||||
* @param headers the parsed headers
|
||||
*/
|
||||
protected void inboundHeadersReceived(Metadata.Headers headers) {
|
||||
protected void inboundHeadersReceived(Metadata headers) {
|
||||
if (inboundPhase() == Phase.STATUS) {
|
||||
log.log(Level.INFO, "Received headers on closed stream {0} {1}",
|
||||
new Object[]{id(), headers});
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
|
|||
}
|
||||
|
||||
@Override
|
||||
public final void writeHeaders(Metadata.Headers headers) {
|
||||
public final void writeHeaders(Metadata headers) {
|
||||
Preconditions.checkNotNull(headers, "headers");
|
||||
outboundPhase(Phase.HEADERS);
|
||||
headersSent = true;
|
||||
|
|
@ -102,7 +102,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
|
|||
@Override
|
||||
public final void writeMessage(InputStream message) {
|
||||
if (!headersSent) {
|
||||
writeHeaders(new Metadata.Headers());
|
||||
writeHeaders(new Metadata());
|
||||
headersSent = true;
|
||||
}
|
||||
super.writeMessage(message);
|
||||
|
|
@ -169,7 +169,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
|
|||
*
|
||||
* @param headers the headers to be sent to client.
|
||||
*/
|
||||
protected abstract void internalSendHeaders(Metadata.Headers headers);
|
||||
protected abstract void internalSendHeaders(Metadata headers);
|
||||
|
||||
/**
|
||||
* Sends an outbound frame to the remote end point.
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public interface ClientStreamListener extends StreamListener {
|
|||
*
|
||||
* @param headers the fully buffered received headers.
|
||||
*/
|
||||
void headersRead(Metadata.Headers headers);
|
||||
void headersRead(Metadata headers);
|
||||
|
||||
/**
|
||||
* Called when the stream is fully closed. {@link
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ public interface ClientTransport {
|
|||
*/
|
||||
// TODO(nmittler): Consider also throwing for stopping.
|
||||
ClientStream newStream(MethodDescriptor<?, ?> method,
|
||||
Metadata.Headers headers,
|
||||
Metadata headers,
|
||||
ClientStreamListener listener);
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -61,6 +61,17 @@ public final class GrpcUtil {
|
|||
public static final Metadata.Key<String> MESSAGE_ENCODING_KEY =
|
||||
Metadata.Key.of(GrpcUtil.MESSAGE_ENCODING, Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
||||
/**
|
||||
* {@link io.grpc.Metadata.Key} for the :authority pseudo header.
|
||||
*
|
||||
* <p> Don't actually serialized this.
|
||||
*
|
||||
* <p>TODO(carl-mastrangelo): This is a hack and should exist as shortly as possible. Remove it
|
||||
* once a cleaner alternative exists (passing it directly into the transport, etc.)
|
||||
*/
|
||||
public static final Metadata.Key<String> AUTHORITY_KEY =
|
||||
Metadata.Key.of("grpc-authority", Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
||||
/**
|
||||
* {@link io.grpc.Metadata.Key} for the Content-Type request/response header.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
|
|||
*
|
||||
* @param headers the received headers
|
||||
*/
|
||||
protected void transportHeadersReceived(Metadata.Headers headers) {
|
||||
protected void transportHeadersReceived(Metadata headers) {
|
||||
Preconditions.checkNotNull(headers);
|
||||
if (transportError != null) {
|
||||
// Already received a transport error so just augment it.
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ public interface ServerStream extends Stream {
|
|||
*
|
||||
* @param headers to send to client.
|
||||
*/
|
||||
void writeHeaders(Metadata.Headers headers);
|
||||
void writeHeaders(Metadata headers);
|
||||
|
||||
/**
|
||||
* Closes the stream for both reading and writing. A status code of
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ public interface ServerTransportListener {
|
|||
* @return a listener for events on the new stream.
|
||||
*/
|
||||
ServerStreamListener streamCreated(ServerStream stream, String method,
|
||||
Metadata.Headers headers);
|
||||
Metadata headers);
|
||||
|
||||
/**
|
||||
* The transport completed shutting down. All resources have been released.
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ public class ChannelImplTest {
|
|||
public void immediateDeadlineExceeded() {
|
||||
ClientCall<String, Integer> call =
|
||||
channel.newCall(method, CallOptions.DEFAULT.withDeadlineNanoTime(System.nanoTime()));
|
||||
call.start(mockCallListener, new Metadata.Headers());
|
||||
call.start(mockCallListener, new Metadata());
|
||||
verify(mockCallListener, timeout(1000)).onClose(
|
||||
same(Status.DEADLINE_EXCEEDED), any(Metadata.class));
|
||||
}
|
||||
|
|
@ -135,7 +135,7 @@ public class ChannelImplTest {
|
|||
// Create transport and call
|
||||
ClientTransport mockTransport = mock(ClientTransport.class);
|
||||
ClientStream mockStream = mock(ClientStream.class);
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
|
||||
when(mockTransport.newStream(same(method), same(headers), any(ClientStreamListener.class)))
|
||||
.thenReturn(mockStream);
|
||||
|
|
@ -150,7 +150,7 @@ public class ChannelImplTest {
|
|||
// Second call
|
||||
ClientCall<String, Integer> call2 = channel.newCall(method, CallOptions.DEFAULT);
|
||||
ClientStream mockStream2 = mock(ClientStream.class);
|
||||
Metadata.Headers headers2 = new Metadata.Headers();
|
||||
Metadata headers2 = new Metadata();
|
||||
when(mockTransport.newStream(same(method), same(headers2), any(ClientStreamListener.class)))
|
||||
.thenReturn(mockStream2);
|
||||
call2.start(mockCallListener2, headers2);
|
||||
|
|
@ -169,7 +169,7 @@ public class ChannelImplTest {
|
|||
|
||||
// Further calls should fail without going to the transport
|
||||
ClientCall<String, Integer> call3 = channel.newCall(method, CallOptions.DEFAULT);
|
||||
call3.start(mockCallListener3, new Metadata.Headers());
|
||||
call3.start(mockCallListener3, new Metadata());
|
||||
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
|
||||
verify(mockCallListener3, timeout(1000))
|
||||
.onClose(statusCaptor.capture(), any(Metadata.class));
|
||||
|
|
@ -200,7 +200,7 @@ public class ChannelImplTest {
|
|||
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport);
|
||||
doThrow(goldenStatus.asRuntimeException())
|
||||
.when(mockTransport).start(any(ClientTransport.Listener.class));
|
||||
call.start(mockCallListener, new Metadata.Headers());
|
||||
call.start(mockCallListener, new Metadata());
|
||||
verify(mockTransportFactory).newClientTransport();
|
||||
verify(mockTransport).start(any(ClientTransport.Listener.class));
|
||||
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
|
||||
|
|
@ -212,7 +212,7 @@ public class ChannelImplTest {
|
|||
call = channel.newCall(method, CallOptions.DEFAULT);
|
||||
ClientTransport mockTransport2 = mock(ClientTransport.class);
|
||||
ClientStream mockStream2 = mock(ClientStream.class);
|
||||
Metadata.Headers headers2 = new Metadata.Headers();
|
||||
Metadata headers2 = new Metadata();
|
||||
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport2);
|
||||
doAnswer(new Answer<Void>() {
|
||||
@Override
|
||||
|
|
@ -237,7 +237,7 @@ public class ChannelImplTest {
|
|||
call = channel.newCall(method, CallOptions.DEFAULT);
|
||||
ClientTransport mockTransport3 = mock(ClientTransport.class);
|
||||
ClientStream mockStream3 = mock(ClientStream.class);
|
||||
Metadata.Headers headers3 = new Metadata.Headers();
|
||||
Metadata headers3 = new Metadata();
|
||||
when(mockTransportFactory.newClientTransport()).thenReturn(mockTransport3);
|
||||
when(mockTransport3.newStream(same(method), same(headers3), any(ClientStreamListener.class)))
|
||||
.thenReturn(mockStream3);
|
||||
|
|
@ -289,7 +289,7 @@ public class ChannelImplTest {
|
|||
public void testNoDeadlockOnShutdown() {
|
||||
// Force creation of transport
|
||||
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
|
||||
call.start(mockCallListener, new Metadata.Headers());
|
||||
call.start(mockCallListener, new Metadata());
|
||||
call.cancel();
|
||||
|
||||
verify(mockTransport).start(transportListenerCaptor.capture());
|
||||
|
|
|
|||
|
|
@ -34,11 +34,11 @@ package io.grpc;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.anyInt;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
|
@ -91,8 +91,7 @@ public class ClientInterceptorsTest {
|
|||
Answer<Void> checkStartCalled = new Answer<Void>() {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) {
|
||||
verify(call).start(Mockito.<ClientCall.Listener<Integer>>any(),
|
||||
Mockito.<Metadata.Headers>any());
|
||||
verify(call).start(Mockito.<ClientCall.Listener<Integer>>any(), Mockito.<Metadata>any());
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
|
@ -228,7 +227,7 @@ public class ClientInterceptorsTest {
|
|||
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
|
||||
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
|
||||
@Override
|
||||
public void start(ClientCall.Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
|
||||
headers.put(credKey, "abcd");
|
||||
super.start(responseListener, headers);
|
||||
}
|
||||
|
|
@ -240,8 +239,8 @@ public class ClientInterceptorsTest {
|
|||
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
|
||||
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
|
||||
// start() on the intercepted call will eventually reach the call created by the real channel
|
||||
interceptedCall.start(listener, new Metadata.Headers());
|
||||
ArgumentCaptor<Metadata.Headers> captor = ArgumentCaptor.forClass(Metadata.Headers.class);
|
||||
interceptedCall.start(listener, new Metadata());
|
||||
ArgumentCaptor<Metadata> captor = ArgumentCaptor.forClass(Metadata.class);
|
||||
// The headers passed to the real channel call will contain the information inserted by the
|
||||
// interceptor.
|
||||
verify(call).start(same(listener), captor.capture());
|
||||
|
|
@ -250,7 +249,7 @@ public class ClientInterceptorsTest {
|
|||
|
||||
@Test
|
||||
public void examineInboundHeaders() {
|
||||
final List<Metadata.Headers> examinedHeaders = new ArrayList<Metadata.Headers>();
|
||||
final List<Metadata> examinedHeaders = new ArrayList<Metadata>();
|
||||
ClientInterceptor interceptor = new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
|
|
@ -260,10 +259,10 @@ public class ClientInterceptorsTest {
|
|||
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
|
||||
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
|
||||
@Override
|
||||
public void start(ClientCall.Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
|
||||
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
examinedHeaders.add(headers);
|
||||
super.onHeaders(headers);
|
||||
}
|
||||
|
|
@ -276,11 +275,11 @@ public class ClientInterceptorsTest {
|
|||
@SuppressWarnings("unchecked")
|
||||
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
|
||||
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
|
||||
interceptedCall.start(listener, new Metadata.Headers());
|
||||
interceptedCall.start(listener, new Metadata());
|
||||
// Capture the underlying call listener that will receive headers from the transport.
|
||||
ArgumentCaptor<ClientCall.Listener<Integer>> captor = ArgumentCaptor.forClass(null);
|
||||
verify(call).start(captor.capture(), Mockito.<Metadata.Headers>any());
|
||||
Metadata.Headers inboundHeaders = new Metadata.Headers();
|
||||
verify(call).start(captor.capture(), Mockito.<Metadata>any());
|
||||
Metadata inboundHeaders = new Metadata();
|
||||
// Simulate that a headers arrives on the underlying call listener.
|
||||
captor.getValue().onHeaders(inboundHeaders);
|
||||
assertEquals(Arrays.asList(inboundHeaders), examinedHeaders);
|
||||
|
|
@ -303,7 +302,7 @@ public class ClientInterceptorsTest {
|
|||
assertNotSame(call, interceptedCall);
|
||||
@SuppressWarnings("unchecked")
|
||||
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
interceptedCall.start(listener, headers);
|
||||
verify(call).start(same(listener), same(headers));
|
||||
interceptedCall.sendMessage("request");
|
||||
|
|
@ -326,8 +325,8 @@ public class ClientInterceptorsTest {
|
|||
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
|
||||
return new CheckedForwardingClientCall<ReqT, RespT>(call) {
|
||||
@Override
|
||||
protected void checkedStart(ClientCall.Listener<RespT> responseListener,
|
||||
Metadata.Headers headers) throws Exception {
|
||||
protected void checkedStart(ClientCall.Listener<RespT> responseListener, Metadata headers)
|
||||
throws Exception {
|
||||
throw error;
|
||||
// delegate().start will not be called
|
||||
}
|
||||
|
|
@ -339,7 +338,7 @@ public class ClientInterceptorsTest {
|
|||
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
|
||||
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
|
||||
assertNotSame(call, interceptedCall);
|
||||
interceptedCall.start(listener, new Metadata.Headers());
|
||||
interceptedCall.start(listener, new Metadata());
|
||||
interceptedCall.sendMessage("request");
|
||||
interceptedCall.halfClose();
|
||||
interceptedCall.request(1);
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ public class MetadataTest {
|
|||
public void testMutations() {
|
||||
Fish lance = new Fish(LANCE);
|
||||
Fish cat = new Fish("cat");
|
||||
Metadata.Headers metadata = new Metadata.Headers();
|
||||
Metadata metadata = new Metadata();
|
||||
|
||||
assertEquals(null, metadata.get(KEY));
|
||||
metadata.put(KEY, lance);
|
||||
|
|
@ -107,7 +107,7 @@ public class MetadataTest {
|
|||
@Test
|
||||
public void testWriteParsed() {
|
||||
Fish lance = new Fish(LANCE);
|
||||
Metadata.Headers metadata = new Metadata.Headers();
|
||||
Metadata metadata = new Metadata();
|
||||
metadata.put(KEY, lance);
|
||||
// Should be able to read same instance out
|
||||
assertSame(lance, metadata.get(KEY));
|
||||
|
|
@ -127,7 +127,7 @@ public class MetadataTest {
|
|||
|
||||
@Test
|
||||
public void testWriteRaw() {
|
||||
Metadata.Headers raw = new Metadata.Headers(KEY.asciiName(), LANCE_BYTES);
|
||||
Metadata raw = new Metadata(KEY.asciiName(), LANCE_BYTES);
|
||||
Fish lance = raw.get(KEY);
|
||||
assertEquals(lance, new Fish(LANCE));
|
||||
// Reading again should return the same parsed instance
|
||||
|
|
@ -136,7 +136,7 @@ public class MetadataTest {
|
|||
|
||||
@Test
|
||||
public void testSerializeRaw() {
|
||||
Metadata.Headers raw = new Metadata.Headers(KEY.asciiName(), LANCE_BYTES);
|
||||
Metadata raw = new Metadata(KEY.asciiName(), LANCE_BYTES);
|
||||
byte[][] serialized = raw.serialize();
|
||||
assertArrayEquals(serialized[0], KEY.asciiName());
|
||||
assertArrayEquals(serialized[1], LANCE_BYTES);
|
||||
|
|
@ -144,8 +144,8 @@ public class MetadataTest {
|
|||
|
||||
@Test
|
||||
public void testMergeByteConstructed() {
|
||||
Metadata.Headers raw = new Metadata.Headers(KEY.asciiName(), LANCE_BYTES);
|
||||
Metadata.Headers serializable = new Metadata.Headers();
|
||||
Metadata raw = new Metadata(KEY.asciiName(), LANCE_BYTES);
|
||||
Metadata serializable = new Metadata();
|
||||
serializable.merge(raw);
|
||||
|
||||
byte[][] serialized = serializable.serialize();
|
||||
|
|
@ -157,11 +157,9 @@ public class MetadataTest {
|
|||
@Test
|
||||
public void headerMergeShouldCopyValues() {
|
||||
Fish lance = new Fish(LANCE);
|
||||
Metadata.Headers h1 = new Metadata.Headers();
|
||||
Metadata h1 = new Metadata();
|
||||
|
||||
Metadata.Headers h2 = new Metadata.Headers();
|
||||
h2.setPath("/some/path");
|
||||
h2.setAuthority("authority");
|
||||
Metadata h2 = new Metadata();
|
||||
h2.put(KEY, lance);
|
||||
|
||||
h1.merge(h2);
|
||||
|
|
@ -170,8 +168,6 @@ public class MetadataTest {
|
|||
assertTrue(fishes.hasNext());
|
||||
assertSame(fishes.next(), lance);
|
||||
assertFalse(fishes.hasNext());
|
||||
assertEquals("/some/path", h1.getPath());
|
||||
assertEquals("authority", h1.getAuthority());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -195,13 +191,10 @@ public class MetadataTest {
|
|||
|
||||
@Test
|
||||
public void verifyToString() {
|
||||
Metadata.Headers h = new Metadata.Headers();
|
||||
h.setPath("/path");
|
||||
h.setAuthority("myauthority");
|
||||
Metadata h = new Metadata();
|
||||
h.put(KEY, new Fish("binary"));
|
||||
h.put(Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER), "ascii");
|
||||
assertEquals("Headers(path=/path,authority=myauthority,"
|
||||
+ "metadata={test-bin=[Fish(binary)], test=[ascii]})", h.toString());
|
||||
assertEquals("Metadata({test-bin=[Fish(binary)], test=[ascii]})", h.toString());
|
||||
|
||||
Metadata t = new Metadata();
|
||||
t.put(Metadata.Key.of("test", Metadata.ASCII_STRING_MARSHALLER), "ascii");
|
||||
|
|
@ -229,7 +222,7 @@ public class MetadataTest {
|
|||
Metadata.Key<String> keyUpperCase
|
||||
= Metadata.Key.of("IF-MODIFIED-SINCE", Metadata.ASCII_STRING_MARSHALLER);
|
||||
|
||||
Metadata metadata = new Metadata.Headers();
|
||||
Metadata metadata = new Metadata();
|
||||
metadata.put(keyTitleCase, "plain string");
|
||||
assertEquals("plain string", metadata.get(keyTitleCase));
|
||||
assertEquals("plain string", metadata.get(keyLowerCase));
|
||||
|
|
|
|||
|
|
@ -186,7 +186,7 @@ public class ServerImplTest {
|
|||
public ServerCall.Listener<String> startCall(
|
||||
MethodDescriptor<String, Integer> method,
|
||||
ServerCall<Integer> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
assertEquals("Waiter/serve", method.getFullMethodName());
|
||||
assertNotNull(call);
|
||||
assertNotNull(headers);
|
||||
|
|
@ -198,7 +198,7 @@ public class ServerImplTest {
|
|||
ServerTransportListener transportListener
|
||||
= transportServer.registerNewServerTransport(new SimpleServerTransport());
|
||||
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(metadataKey, 0);
|
||||
ServerStreamListener streamListener
|
||||
= transportListener.streamCreated(stream, "Waiter/serve", headers);
|
||||
|
|
@ -254,7 +254,7 @@ public class ServerImplTest {
|
|||
public ServerCall.Listener<String> startCall(
|
||||
MethodDescriptor<String, Integer> method,
|
||||
ServerCall<Integer> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
throw status.asRuntimeException();
|
||||
}
|
||||
}).build());
|
||||
|
|
@ -262,7 +262,7 @@ public class ServerImplTest {
|
|||
= transportServer.registerNewServerTransport(new SimpleServerTransport());
|
||||
|
||||
ServerStreamListener streamListener
|
||||
= transportListener.streamCreated(stream, "Waiter/serve", new Metadata.Headers());
|
||||
= transportListener.streamCreated(stream, "Waiter/serve", new Metadata());
|
||||
assertNotNull(streamListener);
|
||||
verifyNoMoreInteractions(stream);
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
import io.grpc.Metadata.Headers;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor.Marshaller;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.ServerCall.Listener;
|
||||
|
|
@ -80,7 +80,7 @@ public class ServerInterceptorsTest {
|
|||
MethodDescriptor.create(
|
||||
MethodType.UNKNOWN, "basic/flow", requestMarshaller, responseMarshaller),
|
||||
handler).build();
|
||||
private Headers headers = new Headers();
|
||||
private Metadata headers = new Metadata();
|
||||
|
||||
/** Set up for test. */
|
||||
@Before
|
||||
|
|
@ -88,7 +88,7 @@ public class ServerInterceptorsTest {
|
|||
MockitoAnnotations.initMocks(this);
|
||||
Mockito.when(handler.startCall(
|
||||
Mockito.<MethodDescriptor<String, Integer>>any(),
|
||||
Mockito.<ServerCall<Integer>>any(), Mockito.<Headers>any()))
|
||||
Mockito.<ServerCall<Integer>>any(), Mockito.<Metadata>any()))
|
||||
.thenReturn(listener);
|
||||
}
|
||||
|
||||
|
|
@ -172,7 +172,7 @@ public class ServerInterceptorsTest {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
// Calling next twice is permitted, although should only rarely be useful.
|
||||
assertSame(listener, next.startCall(method, call, headers));
|
||||
|
|
@ -195,7 +195,7 @@ public class ServerInterceptorsTest {
|
|||
public ServerCall.Listener<String> startCall(
|
||||
MethodDescriptor<String, Integer> method,
|
||||
ServerCall<Integer> call,
|
||||
Headers headers) {
|
||||
Metadata headers) {
|
||||
order.add("handler");
|
||||
return listener;
|
||||
}
|
||||
|
|
@ -205,7 +205,7 @@ public class ServerInterceptorsTest {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
order.add("i1");
|
||||
return next.startCall(method, call, headers);
|
||||
|
|
@ -216,7 +216,7 @@ public class ServerInterceptorsTest {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
order.add("i2");
|
||||
return next.startCall(method, call, headers);
|
||||
|
|
@ -247,7 +247,7 @@ public class ServerInterceptorsTest {
|
|||
public <R1, R2> ServerCall.Listener<R1> interceptCall(
|
||||
MethodDescriptor<R1, R2> methodDescriptor,
|
||||
ServerCall<R2> call,
|
||||
Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<R1, R2> next) {
|
||||
assertSame(method, methodDescriptor);
|
||||
assertSame(call, ServerInterceptorsTest.this.call);
|
||||
|
|
@ -287,7 +287,7 @@ public class ServerInterceptorsTest {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Headers headers,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
return next.startCall(method, call, headers);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,6 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
import io.grpc.MessageEncoding;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Metadata.Headers;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.internal.AbstractStream.Phase;
|
||||
|
|
@ -172,7 +171,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_notifiesListener() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
|
||||
stream.inboundHeadersReceived(headers);
|
||||
verify(mockListener).headersRead(headers);
|
||||
|
|
@ -182,7 +181,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_failsOnPhaseStatus() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
stream.inboundPhase(Phase.STATUS);
|
||||
|
||||
thrown.expect(IllegalStateException.class);
|
||||
|
|
@ -194,7 +193,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_succeedsOnPhaseMessage() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
stream.inboundPhase(Phase.MESSAGE);
|
||||
|
||||
stream.inboundHeadersReceived(headers);
|
||||
|
|
@ -206,7 +205,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_acceptsGzipEncoding() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new MessageEncoding.Gzip().getMessageEncoding());
|
||||
|
||||
stream.inboundHeadersReceived(headers);
|
||||
|
|
@ -217,7 +216,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_acceptsIdentityEncoding() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, MessageEncoding.NONE.getMessageEncoding());
|
||||
|
||||
stream.inboundHeadersReceived(headers);
|
||||
|
|
@ -228,7 +227,7 @@ public class AbstractClientStreamTest {
|
|||
public void inboundHeadersReceived_notifiesListenerOnBadEncoding() {
|
||||
AbstractClientStream<Integer> stream =
|
||||
new BaseAbstractClientStream<Integer>(allocator, mockListener);
|
||||
Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, "bad");
|
||||
|
||||
stream.inboundHeadersReceived(headers);
|
||||
|
|
@ -275,7 +274,7 @@ public class AbstractClientStreamTest {
|
|||
public void onReady() {}
|
||||
|
||||
@Override
|
||||
public void headersRead(Headers headers) {}
|
||||
public void headersRead(Metadata headers) {}
|
||||
|
||||
@Override
|
||||
public void closed(Status status, Metadata trailers) {}
|
||||
|
|
|
|||
|
|
@ -41,8 +41,8 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import com.google.common.io.BaseEncoding;
|
||||
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Metadata.BinaryMarshaller;
|
||||
import io.grpc.Metadata.Headers;
|
||||
import io.grpc.Metadata.Key;
|
||||
|
||||
import org.junit.Test;
|
||||
|
|
@ -79,7 +79,7 @@ public class TransportFrameUtilTest {
|
|||
|
||||
@Test
|
||||
public void testToHttp2Headers() {
|
||||
Headers headers = new Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(PLAIN_STRING, COMPLIANT_ASCII_STRING);
|
||||
headers.put(BINARY_STRING, NONCOMPLIANT_ASCII_STRING);
|
||||
headers.put(BINARY_STRING_WITHOUT_SUFFIX, NONCOMPLIANT_ASCII_STRING);
|
||||
|
|
@ -104,13 +104,13 @@ public class TransportFrameUtilTest {
|
|||
|
||||
@Test
|
||||
public void testToAndFromHttp2Headers() {
|
||||
Headers headers = new Headers();
|
||||
Metadata headers = new Metadata();
|
||||
headers.put(PLAIN_STRING, COMPLIANT_ASCII_STRING);
|
||||
headers.put(BINARY_STRING, NONCOMPLIANT_ASCII_STRING);
|
||||
headers.put(BINARY_STRING_WITHOUT_SUFFIX, NONCOMPLIANT_ASCII_STRING);
|
||||
byte[][] http2Headers = TransportFrameUtil.toHttp2Headers(headers);
|
||||
byte[][] rawSerialized = TransportFrameUtil.toRawSerializedHeaders(http2Headers);
|
||||
Headers recoveredHeaders = new Headers(rawSerialized);
|
||||
Metadata recoveredHeaders = new Metadata(rawSerialized);
|
||||
assertEquals(COMPLIANT_ASCII_STRING, recoveredHeaders.get(PLAIN_STRING));
|
||||
assertEquals(NONCOMPLIANT_ASCII_STRING, recoveredHeaders.get(BINARY_STRING));
|
||||
assertNull(recoveredHeaders.get(BINARY_STRING_WITHOUT_SUFFIX));
|
||||
|
|
|
|||
|
|
@ -58,12 +58,12 @@ public class HeaderClientInterceptor implements ClientInterceptor {
|
|||
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
|
||||
|
||||
@Override
|
||||
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
/* put custom header */
|
||||
headers.put(customHeadKey, "customRequestValue");
|
||||
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
/**
|
||||
* if you don't need receive header from server,
|
||||
* you can use {@link io.grpc.stub.MetadataUtils attachHeaders}
|
||||
|
|
|
|||
|
|
@ -56,14 +56,14 @@ public class HeaderServerInterceptor implements ServerInterceptor {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
final Metadata.Headers requestHeaders,
|
||||
final Metadata requestHeaders,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
logger.info("header received from client:" + requestHeaders.toString());
|
||||
return next.startCall(method, new SimpleForwardingServerCall<RespT>(call) {
|
||||
boolean sentHeaders = false;
|
||||
|
||||
@Override
|
||||
public void sendHeaders(Metadata.Headers responseHeaders) {
|
||||
public void sendHeaders(Metadata responseHeaders) {
|
||||
responseHeaders.put(customHeadKey, "customRespondValue");
|
||||
super.sendHeaders(responseHeaders);
|
||||
sentHeaders = true;
|
||||
|
|
@ -72,7 +72,7 @@ public class HeaderServerInterceptor implements ServerInterceptor {
|
|||
@Override
|
||||
public void sendMessage(RespT message) {
|
||||
if (!sentHeaders) {
|
||||
sendHeaders(new Metadata.Headers());
|
||||
sendHeaders(new Metadata());
|
||||
}
|
||||
super.sendMessage(message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,8 +99,8 @@ public abstract class AbstractTransportTest {
|
|||
|
||||
public static final Metadata.Key<Messages.SimpleContext> METADATA_KEY =
|
||||
ProtoUtils.keyForProto(Messages.SimpleContext.getDefaultInstance());
|
||||
private static final AtomicReference<Metadata.Headers> requestHeadersCapture =
|
||||
new AtomicReference<Metadata.Headers>();
|
||||
private static final AtomicReference<Metadata> requestHeadersCapture =
|
||||
new AtomicReference<Metadata>();
|
||||
private static ScheduledExecutorService testServiceExecutor;
|
||||
private static ServerImpl server;
|
||||
private static int OPERATION_TIMEOUT = 5000;
|
||||
|
|
@ -449,7 +449,7 @@ public abstract class AbstractTransportTest {
|
|||
channel.newCall(TestServiceGrpc.METHOD_STREAMING_OUTPUT_CALL, CallOptions.DEFAULT);
|
||||
call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {}
|
||||
public void onHeaders(Metadata headers) {}
|
||||
|
||||
@Override
|
||||
public void onMessage(final StreamingOutputCallResponse message) {
|
||||
|
|
@ -460,7 +460,7 @@ public abstract class AbstractTransportTest {
|
|||
public void onClose(Status status, Metadata trailers) {
|
||||
queue.add(status);
|
||||
}
|
||||
}, new Metadata.Headers());
|
||||
}, new Metadata());
|
||||
call.sendMessage(request);
|
||||
call.halfClose();
|
||||
|
||||
|
|
@ -521,7 +521,7 @@ public abstract class AbstractTransportTest {
|
|||
TestServiceGrpc.newBlockingStub(channel);
|
||||
|
||||
// Capture the context exchange
|
||||
Metadata.Headers fixedHeaders = new Metadata.Headers();
|
||||
Metadata fixedHeaders = new Metadata();
|
||||
// Send a context proto (as it's in the default extension registry)
|
||||
Messages.SimpleContext contextValue =
|
||||
Messages.SimpleContext.newBuilder().setValue("dog").build();
|
||||
|
|
@ -529,7 +529,7 @@ public abstract class AbstractTransportTest {
|
|||
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
||||
// .. and expect it to be echoed back in trailers
|
||||
AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
|
||||
AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<Metadata.Headers>();
|
||||
AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
|
||||
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
||||
|
||||
Assert.assertNotNull(stub.emptyCall(Empty.getDefaultInstance()));
|
||||
|
|
@ -544,7 +544,7 @@ public abstract class AbstractTransportTest {
|
|||
TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
|
||||
|
||||
// Capture the context exchange
|
||||
Metadata.Headers fixedHeaders = new Metadata.Headers();
|
||||
Metadata fixedHeaders = new Metadata();
|
||||
// Send a context proto (as it's in the default extension registry)
|
||||
Messages.SimpleContext contextValue =
|
||||
Messages.SimpleContext.newBuilder().setValue("dog").build();
|
||||
|
|
@ -552,7 +552,7 @@ public abstract class AbstractTransportTest {
|
|||
stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
|
||||
// .. and expect it to be echoed back in trailers
|
||||
AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
|
||||
AtomicReference<Metadata.Headers> headersCapture = new AtomicReference<Metadata.Headers>();
|
||||
AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
|
||||
stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
|
||||
|
||||
List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
package io.grpc.netty;
|
||||
|
||||
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
|
||||
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
|
@ -120,7 +121,7 @@ class NettyClientTransport implements ClientTransport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata.Headers headers,
|
||||
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers,
|
||||
ClientStreamListener listener) {
|
||||
Preconditions.checkNotNull(method, "method");
|
||||
Preconditions.checkNotNull(headers, "headers");
|
||||
|
|
@ -131,8 +132,11 @@ class NettyClientTransport implements ClientTransport {
|
|||
|
||||
// Convert the headers into Netty HTTP/2 headers.
|
||||
AsciiString defaultPath = new AsciiString("/" + method.getFullMethodName());
|
||||
AsciiString defaultAuthority = new AsciiString(headers.containsKey(AUTHORITY_KEY)
|
||||
? headers.get(AUTHORITY_KEY) : authority);
|
||||
headers.removeAll(AUTHORITY_KEY);
|
||||
Http2Headers http2Headers = Utils.convertClientHeaders(headers, negotiationHandler.scheme(),
|
||||
defaultPath, authority);
|
||||
defaultPath, defaultAuthority);
|
||||
|
||||
ChannelFutureListener failureListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -39,12 +39,11 @@ import static io.grpc.netty.Utils.TE_TRAILERS;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import io.grpc.Metadata.Headers;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ServerStreamListener;
|
||||
import io.grpc.internal.ServerTransportListener;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
|
|
@ -151,7 +150,7 @@ class NettyServerHandler extends Http2ConnectionHandler {
|
|||
http2Stream.setProperty(streamKey, stream);
|
||||
String method = determineMethod(streamId, headers);
|
||||
|
||||
Headers metadata = Utils.convertHeaders(headers);
|
||||
Metadata metadata = Utils.convertHeaders(headers);
|
||||
ServerStreamListener listener =
|
||||
transportListener.streamCreated(stream, method, metadata);
|
||||
stream.setListener(listener);
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ class NettyServerStream extends AbstractServerStream<Integer> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void internalSendHeaders(Metadata.Headers headers) {
|
||||
protected void internalSendHeaders(Metadata headers) {
|
||||
writeQueue.enqueue(new SendResponseHeadersCommand(id(),
|
||||
Utils.convertServerHeaders(headers), false),
|
||||
true);
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
package io.grpc.netty;
|
||||
|
||||
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
|
||||
import static io.netty.util.CharsetUtil.UTF_8;
|
||||
|
|
@ -79,16 +80,8 @@ class Utils {
|
|||
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP =
|
||||
new DefaultEventLoopGroupResource(0, "grpc-default-worker-ELG");
|
||||
|
||||
public static Metadata.Headers convertHeaders(Http2Headers http2Headers) {
|
||||
Metadata.Headers headers = new Metadata.Headers(convertHeadersToArray(http2Headers));
|
||||
if (http2Headers.authority() != null) {
|
||||
// toString() here is safe since it doesn't use the default Charset.
|
||||
headers.setAuthority(http2Headers.authority().toString());
|
||||
}
|
||||
if (http2Headers.path() != null) {
|
||||
headers.setPath(http2Headers.path().toString());
|
||||
}
|
||||
return headers;
|
||||
public static Metadata convertHeaders(Http2Headers http2Headers) {
|
||||
return new Metadata(convertHeadersToArray(http2Headers));
|
||||
}
|
||||
|
||||
private static byte[][] convertHeadersToArray(Http2Headers http2Headers) {
|
||||
|
|
@ -103,7 +96,7 @@ class Utils {
|
|||
return TransportFrameUtil.toRawSerializedHeaders(headerValues);
|
||||
}
|
||||
|
||||
public static Http2Headers convertClientHeaders(Metadata.Headers headers,
|
||||
public static Http2Headers convertClientHeaders(Metadata headers,
|
||||
ByteString scheme,
|
||||
ByteString defaultPath,
|
||||
ByteString defaultAuthority) {
|
||||
|
|
@ -121,11 +114,8 @@ class Utils {
|
|||
.set(TE_HEADER, TE_TRAILERS);
|
||||
|
||||
// Override the default authority and path if provided by the headers.
|
||||
if (headers.getAuthority() != null) {
|
||||
http2Headers.authority(new ByteString(headers.getAuthority().getBytes(UTF_8)));
|
||||
}
|
||||
if (headers.getPath() != null) {
|
||||
http2Headers.path(new ByteString(headers.getPath().getBytes(UTF_8)));
|
||||
if (headers.containsKey(AUTHORITY_KEY)) {
|
||||
http2Headers.authority(new ByteString(headers.get(AUTHORITY_KEY).getBytes(UTF_8)));
|
||||
}
|
||||
|
||||
// Set the User-Agent header.
|
||||
|
|
@ -135,7 +125,7 @@ class Utils {
|
|||
return http2Headers;
|
||||
}
|
||||
|
||||
public static Http2Headers convertServerHeaders(Metadata.Headers headers) {
|
||||
public static Http2Headers convertServerHeaders(Metadata headers) {
|
||||
Http2Headers http2Headers = convertMetadata(headers);
|
||||
http2Headers.set(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
|
||||
http2Headers.status(STATUS_OK);
|
||||
|
|
|
|||
|
|
@ -206,7 +206,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
|
|||
stream().id(STREAM_ID);
|
||||
Http2Headers headers = grpcResponseHeaders();
|
||||
stream().transportHeadersReceived(headers, false);
|
||||
verify(listener).headersRead(any(Metadata.Headers.class));
|
||||
verify(listener).headersRead(any(Metadata.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -129,7 +129,7 @@ public class NettyClientTransportTest {
|
|||
// Verify that the received headers contained the User-Agent.
|
||||
assertEquals(1, serverListener.streamListeners.size());
|
||||
|
||||
Metadata.Headers headers = serverListener.streamListeners.get(0).headers;
|
||||
Metadata headers = serverListener.streamListeners.get(0).headers;
|
||||
assertEquals(GrpcUtil.getGrpcUserAgent("netty", null), headers.get(USER_AGENT_KEY));
|
||||
}
|
||||
|
||||
|
|
@ -141,13 +141,13 @@ public class NettyClientTransportTest {
|
|||
|
||||
// Send a single RPC and wait for the response.
|
||||
String userAgent = "testUserAgent";
|
||||
Metadata.Headers sentHeaders = new Metadata.Headers();
|
||||
Metadata sentHeaders = new Metadata();
|
||||
sentHeaders.put(USER_AGENT_KEY, userAgent);
|
||||
new Rpc(transport, sentHeaders).halfClose().waitForResponse();
|
||||
|
||||
// Verify that the received headers contained the User-Agent.
|
||||
assertEquals(1, serverListener.streamListeners.size());
|
||||
Metadata.Headers receivedHeaders = serverListener.streamListeners.get(0).headers;
|
||||
Metadata receivedHeaders = serverListener.streamListeners.get(0).headers;
|
||||
assertEquals(GrpcUtil.getGrpcUserAgent("netty", userAgent),
|
||||
receivedHeaders.get(USER_AGENT_KEY));
|
||||
}
|
||||
|
|
@ -249,10 +249,10 @@ public class NettyClientTransportTest {
|
|||
final TestClientStreamListener listener = new TestClientStreamListener();
|
||||
|
||||
Rpc(NettyClientTransport transport) {
|
||||
this(transport, new Metadata.Headers());
|
||||
this(transport, new Metadata());
|
||||
}
|
||||
|
||||
Rpc(NettyClientTransport transport, Metadata.Headers headers) {
|
||||
Rpc(NettyClientTransport transport, Metadata headers) {
|
||||
stream = transport.newStream(METHOD, headers, listener);
|
||||
stream.request(1);
|
||||
stream.writeMessage(new ByteArrayInputStream(MESSAGE.getBytes()));
|
||||
|
|
@ -278,7 +278,7 @@ public class NettyClientTransportTest {
|
|||
private final SettableFuture<Void> responseFuture = SettableFuture.create();
|
||||
|
||||
@Override
|
||||
public void headersRead(Metadata.Headers headers) {
|
||||
public void headersRead(Metadata headers) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -305,9 +305,9 @@ public class NettyClientTransportTest {
|
|||
private static final class EchoServerStreamListener implements ServerStreamListener {
|
||||
final ServerStream stream;
|
||||
final String method;
|
||||
final Metadata.Headers headers;
|
||||
final Metadata headers;
|
||||
|
||||
EchoServerStreamListener(ServerStream stream, String method, Metadata.Headers headers) {
|
||||
EchoServerStreamListener(ServerStream stream, String method, Metadata headers) {
|
||||
this.stream = stream;
|
||||
this.method = method;
|
||||
this.headers = headers;
|
||||
|
|
@ -348,7 +348,7 @@ public class NettyClientTransportTest {
|
|||
|
||||
@Override
|
||||
public ServerStreamListener streamCreated(final ServerStream stream, String method,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
EchoServerStreamListener listener = new EchoServerStreamListener(stream, method, headers);
|
||||
streamListeners.add(listener);
|
||||
return listener;
|
||||
|
|
|
|||
|
|
@ -127,7 +127,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
|
|||
|
||||
when(transportListener.streamCreated(any(ServerStream.class),
|
||||
any(String.class),
|
||||
any(Metadata.Headers.class)))
|
||||
any(Metadata.class)))
|
||||
.thenReturn(streamListener);
|
||||
handler = newHandler(transportListener);
|
||||
frameWriter = new DefaultHttp2FrameWriter();
|
||||
|
|
@ -358,7 +358,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
|
|||
ArgumentCaptor.forClass(NettyServerStream.class);
|
||||
ArgumentCaptor<String> methodCaptor = ArgumentCaptor.forClass(String.class);
|
||||
verify(transportListener).streamCreated(streamCaptor.capture(), methodCaptor.capture(),
|
||||
any(Metadata.Headers.class));
|
||||
any(Metadata.class));
|
||||
stream = streamCaptor.getValue();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
|
|||
|
||||
@Test
|
||||
public void writeHeadersShouldSendHeaders() throws Exception {
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
stream().writeHeaders(headers);
|
||||
verify(writeQueue).enqueue(new SendResponseHeadersCommand(STREAM_ID,
|
||||
Utils.convertServerHeaders(headers), false), true);
|
||||
|
|
@ -112,7 +112,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
|
|||
|
||||
@Test
|
||||
public void duplicateWriteHeadersShouldFail() throws Exception {
|
||||
Metadata.Headers headers = new Metadata.Headers();
|
||||
Metadata headers = new Metadata();
|
||||
stream().writeHeaders(headers);
|
||||
verify(writeQueue).enqueue(new SendResponseHeadersCommand(STREAM_ID,
|
||||
Utils.convertServerHeaders(headers), false), true);
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
package io.grpc.okhttp;
|
||||
|
||||
import static io.grpc.internal.GrpcUtil.AUTHORITY_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
|
||||
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
|
||||
|
||||
|
|
@ -63,7 +64,7 @@ public class Headers {
|
|||
* creating a stream. Since this serializes the headers, this method should be called in the
|
||||
* application thread context.
|
||||
*/
|
||||
public static List<Header> createRequestHeaders(Metadata.Headers headers, String defaultPath,
|
||||
public static List<Header> createRequestHeaders(Metadata headers, String defaultPath,
|
||||
String defaultAuthority) {
|
||||
Preconditions.checkNotNull(headers, "headers");
|
||||
Preconditions.checkNotNull(defaultPath, "defaultPath");
|
||||
|
|
@ -74,9 +75,12 @@ public class Headers {
|
|||
// Set GRPC-specific headers.
|
||||
okhttpHeaders.add(SCHEME_HEADER);
|
||||
okhttpHeaders.add(METHOD_HEADER);
|
||||
String authority = headers.getAuthority() != null ? headers.getAuthority() : defaultAuthority;
|
||||
|
||||
String authority = headers.containsKey(AUTHORITY_KEY)
|
||||
? headers.get(AUTHORITY_KEY) : defaultAuthority;
|
||||
headers.removeAll(AUTHORITY_KEY);
|
||||
okhttpHeaders.add(new Header(Header.TARGET_AUTHORITY, authority));
|
||||
String path = headers.getPath() != null ? headers.getPath() : defaultPath;
|
||||
String path = defaultPath;
|
||||
okhttpHeaders.add(new Header(Header.TARGET_PATH, path));
|
||||
|
||||
String userAgent = GrpcUtil.getGrpcUserAgent("okhttp", headers.get(USER_AGENT_KEY));
|
||||
|
|
|
|||
|
|
@ -241,7 +241,7 @@ class OkHttpClientTransport implements ClientTransport {
|
|||
|
||||
@Override
|
||||
public OkHttpClientStream newStream(MethodDescriptor<?, ?> method,
|
||||
Metadata.Headers headers,
|
||||
Metadata headers,
|
||||
ClientStreamListener listener) {
|
||||
Preconditions.checkNotNull(method, "method");
|
||||
Preconditions.checkNotNull(headers, "headers");
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ class Utils {
|
|||
static final int DEFAULT_WINDOW_SIZE = 65535;
|
||||
static final int CONNECTION_STREAM_ID = 0;
|
||||
|
||||
public static Metadata.Headers convertHeaders(List<Header> http2Headers) {
|
||||
return new Metadata.Headers(convertHeadersToArray(http2Headers));
|
||||
public static Metadata convertHeaders(List<Header> http2Headers) {
|
||||
return new Metadata(convertHeadersToArray(http2Headers));
|
||||
}
|
||||
|
||||
public static Metadata convertTrailers(List<Header> http2Headers) {
|
||||
|
|
|
|||
|
|
@ -196,8 +196,8 @@ public class OkHttpClientTransportTest {
|
|||
initTransport();
|
||||
MockStreamListener listener1 = new MockStreamListener();
|
||||
MockStreamListener listener2 = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1);
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener1).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener2).request(1);
|
||||
assertEquals(2, activeStreamCount());
|
||||
assertContainStream(3);
|
||||
assertContainStream(5);
|
||||
|
|
@ -219,7 +219,7 @@ public class OkHttpClientTransportTest {
|
|||
final int numMessages = 10;
|
||||
final String message = "Hello Client";
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(numMessages);
|
||||
assertContainStream(3);
|
||||
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
|
||||
assertNotNull(listener.headers);
|
||||
|
|
@ -263,7 +263,7 @@ public class OkHttpClientTransportTest {
|
|||
public void invalidInboundHeadersCancelStream() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
assertContainStream(3);
|
||||
// Empty headers block without correct content type or status
|
||||
frameHandler().headers(false, false, 3, 0, new ArrayList<Header>(),
|
||||
|
|
@ -281,7 +281,7 @@ public class OkHttpClientTransportTest {
|
|||
public void readStatus() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
assertContainStream(3);
|
||||
frameHandler().headers(true, true, 3, 0, grpcResponseTrailers(), HeadersMode.HTTP_20_HEADERS);
|
||||
listener.waitUntilStreamClosed();
|
||||
|
|
@ -292,7 +292,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveReset() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
assertContainStream(3);
|
||||
frameHandler().rstStream(3, ErrorCode.PROTOCOL_ERROR);
|
||||
listener.waitUntilStreamClosed();
|
||||
|
|
@ -303,7 +303,7 @@ public class OkHttpClientTransportTest {
|
|||
public void cancelStream() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
getStream(3).cancel(Status.CANCELLED);
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
|
||||
listener.waitUntilStreamClosed();
|
||||
|
|
@ -315,7 +315,7 @@ public class OkHttpClientTransportTest {
|
|||
public void headersShouldAddDefaultUserAgent() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(),
|
||||
GrpcUtil.getGrpcUserAgent("okhttp", null));
|
||||
List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER,
|
||||
|
|
@ -332,7 +332,7 @@ public class OkHttpClientTransportTest {
|
|||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
String userAgent = "fakeUserAgent";
|
||||
Metadata.Headers metadata = new Metadata.Headers();
|
||||
Metadata metadata = new Metadata();
|
||||
metadata.put(GrpcUtil.USER_AGENT_KEY, userAgent);
|
||||
clientTransport.newStream(method, metadata, listener);
|
||||
List<Header> expectedHeaders = Arrays.asList(SCHEME_HEADER, METHOD_HEADER,
|
||||
|
|
@ -350,7 +350,7 @@ public class OkHttpClientTransportTest {
|
|||
public void cancelStreamForDeadlineExceeded() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
getStream(3).cancel(Status.DEADLINE_EXCEEDED);
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
|
||||
listener.waitUntilStreamClosed();
|
||||
|
|
@ -362,7 +362,7 @@ public class OkHttpClientTransportTest {
|
|||
final String message = "Hello Server";
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream =
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
|
||||
assertEquals(12, input.available());
|
||||
stream.writeMessage(input);
|
||||
|
|
@ -380,8 +380,8 @@ public class OkHttpClientTransportTest {
|
|||
initTransport();
|
||||
MockStreamListener listener1 = new MockStreamListener();
|
||||
MockStreamListener listener2 = new MockStreamListener();
|
||||
clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2);
|
||||
clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2);
|
||||
clientTransport.newStream(method,new Metadata(), listener1).request(2);
|
||||
clientTransport.newStream(method,new Metadata(), listener2).request(2);
|
||||
assertEquals(2, activeStreamCount());
|
||||
OkHttpClientStream stream1 = getStream(3);
|
||||
OkHttpClientStream stream2 = getStream(5);
|
||||
|
|
@ -438,7 +438,7 @@ public class OkHttpClientTransportTest {
|
|||
public void windowUpdateWithInboundFlowControl() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
|
||||
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1;
|
||||
byte[] fakeMessage = new byte[messageLength];
|
||||
|
|
@ -462,7 +462,7 @@ public class OkHttpClientTransportTest {
|
|||
public void outboundFlowControl() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
|
||||
// The first message should be sent out.
|
||||
int messageLength = Utils.DEFAULT_WINDOW_SIZE / 2 + 1;
|
||||
|
|
@ -496,7 +496,7 @@ public class OkHttpClientTransportTest {
|
|||
public void outboundFlowControlWithInitialWindowSizeChange() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
int messageLength = 20;
|
||||
setInitialWindowSize(HEADER_LENGTH + 10);
|
||||
InputStream input = new ByteArrayInputStream(new byte[messageLength]);
|
||||
|
|
@ -541,9 +541,9 @@ public class OkHttpClientTransportTest {
|
|||
MockStreamListener listener1 = new MockStreamListener();
|
||||
MockStreamListener listener2 = new MockStreamListener();
|
||||
OkHttpClientStream stream1
|
||||
= clientTransport.newStream(method, new Metadata.Headers(), listener1);
|
||||
= clientTransport.newStream(method, new Metadata(), listener1);
|
||||
OkHttpClientStream stream2
|
||||
= clientTransport.newStream(method, new Metadata.Headers(), listener2);
|
||||
= clientTransport.newStream(method, new Metadata(), listener2);
|
||||
assertEquals(2, activeStreamCount());
|
||||
clientTransport.shutdown();
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
|
||||
|
|
@ -567,8 +567,8 @@ public class OkHttpClientTransportTest {
|
|||
// start 2 streams.
|
||||
MockStreamListener listener1 = new MockStreamListener();
|
||||
MockStreamListener listener2 = new MockStreamListener();
|
||||
clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1);
|
||||
clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1);
|
||||
clientTransport.newStream(method,new Metadata(), listener1).request(1);
|
||||
clientTransport.newStream(method,new Metadata(), listener2).request(1);
|
||||
assertEquals(2, activeStreamCount());
|
||||
|
||||
// Receive goAway, max good id is 3.
|
||||
|
|
@ -620,7 +620,7 @@ public class OkHttpClientTransportTest {
|
|||
initTransport(startId);
|
||||
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
|
||||
// New stream should be failed.
|
||||
assertNewStreamFail();
|
||||
|
|
@ -653,11 +653,11 @@ public class OkHttpClientTransportTest {
|
|||
final MockStreamListener listener1 = new MockStreamListener();
|
||||
final MockStreamListener listener2 = new MockStreamListener();
|
||||
OkHttpClientStream stream1
|
||||
= clientTransport.newStream(method, new Metadata.Headers(), listener1);
|
||||
= clientTransport.newStream(method, new Metadata(), listener1);
|
||||
|
||||
// The second stream should be pending.
|
||||
OkHttpClientStream stream2 =
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener2);
|
||||
clientTransport.newStream(method, new Metadata(), listener2);
|
||||
String sentMessage = "hello";
|
||||
InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
|
||||
assertEquals(5, input.available());
|
||||
|
|
@ -690,7 +690,7 @@ public class OkHttpClientTransportTest {
|
|||
setMaxConcurrentStreams(0);
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream
|
||||
= clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
= clientTransport.newStream(method, new Metadata(), listener);
|
||||
waitForStreamPending(1);
|
||||
stream.sendCancel(Status.CANCELLED);
|
||||
// The second cancel should be an no-op.
|
||||
|
|
@ -706,9 +706,9 @@ public class OkHttpClientTransportTest {
|
|||
setMaxConcurrentStreams(1);
|
||||
final MockStreamListener listener1 = new MockStreamListener();
|
||||
final MockStreamListener listener2 = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener1);
|
||||
clientTransport.newStream(method, new Metadata(), listener1);
|
||||
// The second stream should be pending.
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener2);
|
||||
clientTransport.newStream(method, new Metadata(), listener2);
|
||||
|
||||
waitForStreamPending(1);
|
||||
assertEquals(1, activeStreamCount());
|
||||
|
|
@ -731,7 +731,7 @@ public class OkHttpClientTransportTest {
|
|||
setMaxConcurrentStreams(0);
|
||||
final MockStreamListener listener = new MockStreamListener();
|
||||
// The second stream should be pending.
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
waitForStreamPending(1);
|
||||
|
||||
clientTransport.shutdown();
|
||||
|
|
@ -752,11 +752,11 @@ public class OkHttpClientTransportTest {
|
|||
final MockStreamListener listener3 = new MockStreamListener();
|
||||
|
||||
OkHttpClientStream stream1 =
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener1);
|
||||
clientTransport.newStream(method, new Metadata(), listener1);
|
||||
|
||||
// The second and third stream should be pending.
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener2);
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener3);
|
||||
clientTransport.newStream(method, new Metadata(), listener2);
|
||||
clientTransport.newStream(method, new Metadata(), listener3);
|
||||
|
||||
waitForStreamPending(2);
|
||||
assertEquals(1, activeStreamCount());
|
||||
|
|
@ -777,7 +777,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receivingWindowExceeded() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
|
||||
frameHandler().headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
|
||||
|
||||
|
|
@ -821,7 +821,7 @@ public class OkHttpClientTransportTest {
|
|||
private void shouldHeadersBeFlushed(boolean shouldBeFlushed) throws Exception {
|
||||
initTransport();
|
||||
OkHttpClientStream stream = clientTransport.newStream(
|
||||
method, new Metadata.Headers(), new MockStreamListener());
|
||||
method, new Metadata(), new MockStreamListener());
|
||||
verify(frameWriter, timeout(TIME_OUT_MS)).synStream(
|
||||
eq(false), eq(false), eq(3), eq(0), Matchers.anyListOf(Header.class));
|
||||
if (shouldBeFlushed) {
|
||||
|
|
@ -836,7 +836,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveDataWithoutHeader() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method,new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method,new Metadata(), listener).request(1);
|
||||
Buffer buffer = createMessageFrame(new byte[1]);
|
||||
frameHandler().data(false, 3, buffer, (int) buffer.size());
|
||||
|
||||
|
|
@ -854,7 +854,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveDataWithoutHeaderAndTrailer() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
Buffer buffer = createMessageFrame(new byte[1]);
|
||||
frameHandler().data(false, 3, buffer, (int) buffer.size());
|
||||
|
||||
|
|
@ -872,7 +872,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
|
||||
clientTransport.newStream(method, new Metadata(), listener).request(1);
|
||||
Buffer buffer = createMessageFrame(new byte[1000]);
|
||||
frameHandler().data(false, 3, buffer, (int) buffer.size());
|
||||
|
||||
|
|
@ -889,7 +889,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveDataForUnknownStreamUpdateConnectionWindow() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
stream.cancel(Status.CANCELLED);
|
||||
|
||||
Buffer buffer = createMessageFrame(
|
||||
|
|
@ -913,7 +913,7 @@ public class OkHttpClientTransportTest {
|
|||
public void receiveWindowUpdateForUnknownStream() throws Exception {
|
||||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
stream.cancel(Status.CANCELLED);
|
||||
// This should be ignored.
|
||||
frameHandler().windowUpdate(3, 73);
|
||||
|
|
@ -931,7 +931,7 @@ public class OkHttpClientTransportTest {
|
|||
initTransport();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(
|
||||
method, new Metadata.Headers(), listener);
|
||||
method, new Metadata(), listener);
|
||||
assertTrue(stream.isReady());
|
||||
assertTrue(listener.isOnReadyCalled());
|
||||
stream.cancel(Status.CANCELLED);
|
||||
|
|
@ -946,7 +946,7 @@ public class OkHttpClientTransportTest {
|
|||
setInitialWindowSize(0);
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(
|
||||
method, new Metadata.Headers(), listener);
|
||||
method, new Metadata(), listener);
|
||||
assertTrue(stream.isReady());
|
||||
// Be notified at the beginning.
|
||||
assertTrue(listener.isOnReadyCalled());
|
||||
|
|
@ -1084,7 +1084,7 @@ public class OkHttpClientTransportTest {
|
|||
initTransportAndDelayConnected();
|
||||
final String message = "Hello Server";
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
|
||||
stream.writeMessage(input);
|
||||
stream.flush();
|
||||
|
|
@ -1107,7 +1107,7 @@ public class OkHttpClientTransportTest {
|
|||
initTransportAndDelayConnected();
|
||||
final String message = "Hello Server";
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata(), listener);
|
||||
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
|
||||
stream.writeMessage(input);
|
||||
stream.flush();
|
||||
|
|
@ -1122,7 +1122,7 @@ public class OkHttpClientTransportTest {
|
|||
public void shutdownDuringConnecting() throws Exception {
|
||||
initTransportAndDelayConnected();
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
|
||||
clientTransport.shutdown();
|
||||
allowTransportConnected();
|
||||
|
|
@ -1162,7 +1162,7 @@ public class OkHttpClientTransportTest {
|
|||
|
||||
private void assertNewStreamFail() throws Exception {
|
||||
MockStreamListener listener = new MockStreamListener();
|
||||
clientTransport.newStream(method, new Metadata.Headers(), listener);
|
||||
clientTransport.newStream(method, new Metadata(), listener);
|
||||
listener.waitUntilStreamClosed();
|
||||
assertFalse(listener.status.isOk());
|
||||
}
|
||||
|
|
@ -1261,7 +1261,7 @@ public class OkHttpClientTransportTest {
|
|||
|
||||
private static class MockStreamListener implements ClientStreamListener {
|
||||
Status status;
|
||||
Metadata.Headers headers;
|
||||
Metadata headers;
|
||||
Metadata trailers;
|
||||
CountDownLatch closed = new CountDownLatch(1);
|
||||
ArrayList<String> messages = new ArrayList<String>();
|
||||
|
|
@ -1271,7 +1271,7 @@ public class OkHttpClientTransportTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void headersRead(Metadata.Headers headers) {
|
||||
public void headersRead(Metadata headers) {
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ public class ClientCalls {
|
|||
|
||||
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
|
||||
ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
|
||||
call.start(responseListener, new Metadata.Headers());
|
||||
call.start(responseListener, new Metadata());
|
||||
if (streamingResponse) {
|
||||
call.request(1);
|
||||
} else {
|
||||
|
|
@ -237,7 +237,7 @@ public class ClientCalls {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -278,7 +278,7 @@ public class ClientCalls {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -393,7 +393,7 @@ public class ClientCalls {
|
|||
private boolean done = false;
|
||||
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ public class MetadataUtils {
|
|||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public static <T extends AbstractStub> T attachHeaders(
|
||||
T stub,
|
||||
final Metadata.Headers extraHeaders) {
|
||||
final Metadata extraHeaders) {
|
||||
return (T) stub.withInterceptors(newAttachHeadersInterceptor(extraHeaders));
|
||||
}
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ public class MetadataUtils {
|
|||
* @param extraHeaders the headers to be passed by each call that is processed by the returned
|
||||
* interceptor
|
||||
*/
|
||||
public static ClientInterceptor newAttachHeadersInterceptor(final Metadata.Headers extraHeaders) {
|
||||
public static ClientInterceptor newAttachHeadersInterceptor(final Metadata extraHeaders) {
|
||||
return new ClientInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
|
||||
|
|
@ -77,7 +77,7 @@ public class MetadataUtils {
|
|||
Channel next) {
|
||||
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
|
||||
@Override
|
||||
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
headers.merge(extraHeaders);
|
||||
super.start(responseListener, headers);
|
||||
}
|
||||
|
|
@ -97,7 +97,7 @@ public class MetadataUtils {
|
|||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public static <T extends AbstractStub> T captureMetadata(
|
||||
T stub,
|
||||
AtomicReference<Metadata.Headers> headersCapture,
|
||||
AtomicReference<Metadata> headersCapture,
|
||||
AtomicReference<Metadata> trailersCapture) {
|
||||
return (T) stub.withInterceptors(
|
||||
newCaptureMetadataInterceptor(headersCapture, trailersCapture));
|
||||
|
|
@ -111,7 +111,7 @@ public class MetadataUtils {
|
|||
* @return an implementation of the channel with captures installed.
|
||||
*/
|
||||
public static ClientInterceptor newCaptureMetadataInterceptor(
|
||||
final AtomicReference<Metadata.Headers> headersCapture,
|
||||
final AtomicReference<Metadata> headersCapture,
|
||||
final AtomicReference<Metadata> trailersCapture) {
|
||||
return new ClientInterceptor() {
|
||||
@Override
|
||||
|
|
@ -121,12 +121,12 @@ public class MetadataUtils {
|
|||
Channel next) {
|
||||
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
|
||||
@Override
|
||||
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
|
||||
public void start(Listener<RespT> responseListener, Metadata headers) {
|
||||
headersCapture.set(null);
|
||||
trailersCapture.set(null);
|
||||
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
|
||||
@Override
|
||||
public void onHeaders(Metadata.Headers headers) {
|
||||
public void onHeaders(Metadata headers) {
|
||||
headersCapture.set(headers);
|
||||
super.onHeaders(headers);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ public class ServerCalls {
|
|||
public ServerCall.Listener<ReqT> startCall(
|
||||
MethodDescriptor<ReqT, RespT> methodDescriptor,
|
||||
final ServerCall<RespT> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
|
||||
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
|
||||
// sends more than 1 requests, we will catch it in onMessage() and emit INVALID_ARGUMENT.
|
||||
|
|
@ -177,7 +177,7 @@ public class ServerCalls {
|
|||
public ServerCall.Listener<ReqT> startCall(
|
||||
MethodDescriptor<ReqT, RespT> methodDescriptor,
|
||||
final ServerCall<RespT> call,
|
||||
Metadata.Headers headers) {
|
||||
Metadata headers) {
|
||||
call.request(1);
|
||||
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
|
||||
final StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ public class ClientCallsTest {
|
|||
Integer req = 2;
|
||||
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
|
||||
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
|
||||
ClientCall.Listener<String> listener = listenerCaptor.getValue();
|
||||
verify(call).sendMessage(req);
|
||||
verify(call).halfClose();
|
||||
|
|
@ -82,7 +82,7 @@ public class ClientCallsTest {
|
|||
Integer req = 2;
|
||||
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
|
||||
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
|
||||
ClientCall.Listener<String> listener = listenerCaptor.getValue();
|
||||
listener.onClose(Status.INVALID_ARGUMENT, new Metadata());
|
||||
try {
|
||||
|
|
@ -98,7 +98,7 @@ public class ClientCallsTest {
|
|||
Integer req = 2;
|
||||
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
|
||||
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
|
||||
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
|
||||
ClientCall.Listener<String> listener = listenerCaptor.getValue();
|
||||
future.cancel(true);
|
||||
verify(call).cancel();
|
||||
|
|
|
|||
|
|
@ -84,14 +84,14 @@ public class TestUtils {
|
|||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
final Metadata.Headers requestHeaders,
|
||||
final Metadata requestHeaders,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
return next.startCall(method,
|
||||
new SimpleForwardingServerCall<RespT>(call) {
|
||||
boolean sentHeaders;
|
||||
|
||||
@Override
|
||||
public void sendHeaders(Metadata.Headers responseHeaders) {
|
||||
public void sendHeaders(Metadata responseHeaders) {
|
||||
responseHeaders.merge(requestHeaders, keySet);
|
||||
super.sendHeaders(responseHeaders);
|
||||
sentHeaders = true;
|
||||
|
|
@ -100,7 +100,7 @@ public class TestUtils {
|
|||
@Override
|
||||
public void sendMessage(RespT message) {
|
||||
if (!sentHeaders) {
|
||||
sendHeaders(new Metadata.Headers());
|
||||
sendHeaders(new Metadata());
|
||||
}
|
||||
super.sendMessage(message);
|
||||
}
|
||||
|
|
@ -121,13 +121,13 @@ public class TestUtils {
|
|||
* {@link #echoRequestHeadersInterceptor}.
|
||||
*/
|
||||
public static ServerInterceptor recordRequestHeadersInterceptor(
|
||||
final AtomicReference<Metadata.Headers> headersCapture) {
|
||||
final AtomicReference<Metadata> headersCapture) {
|
||||
return new ServerInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
ServerCall<RespT> call,
|
||||
Metadata.Headers requestHeaders,
|
||||
Metadata requestHeaders,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
headersCapture.set(requestHeaders);
|
||||
return next.startCall(method, call, requestHeaders);
|
||||
|
|
|
|||
Loading…
Reference in New Issue