add netty

Signed-off-by: marsishandsome <marsishandsome@gmail.com>
marsishandsome 2022-01-14 21:37:42 +08:00
parent c3e585d555
commit 3f6ca69910
6 changed files with 2354 additions and 72 deletions

pom.xml

@@ -78,6 +78,16 @@
</properties>
<dependencies>
<dependency>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-api</artifactId>
<version>0.24.0</version>
</dependency>
<dependency>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-traceviewer</artifactId>
<version>0.24.0</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>

File diff suppressed because it is too large.

WriteQueue.java

@@ -0,0 +1,350 @@
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.netty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.perfmark.Link;
import io.perfmark.PerfMark;
import io.prometheus.client.Histogram;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
/** A queue of pending writes to a {@link Channel} that is flushed as a single unit. */
class WriteQueue {
// Dequeue in chunks, so we don't have to acquire the queue's lock too often.
@VisibleForTesting static final int DEQUE_CHUNK_SIZE = 128;
/** {@link Runnable} used to schedule work onto the tail of the event loop. */
private final Runnable later =
new Runnable() {
@Override
public void run() {
flush();
}
};
private final Channel channel;
private final Queue<Pair<QueuedCommand, Long>> queue;
private final AtomicBoolean scheduled = new AtomicBoolean();
private static final Logger logger = Logger.getLogger(WriteQueue.class.getName());
public static final Histogram writeQueuePendingDuration =
Histogram.build()
.name("grpc_netty_write_queue_pending_duration_ms")
.help("Pending duration of a task in the write queue.")
.register();
public static final Histogram writeQueueWaitBatchDuration =
Histogram.build()
.name("grpc_netty_write_queue_wait_batch_duration_seconds")
.help("Duration of waiting a batch filled in the write queue.")
.register();
public static final Histogram writeQueueBatchSize =
Histogram.build()
.name("grpc_netty_write_queue_batch_size")
.help("Number of tasks in a batch in the write queue.")
.register();
public static final Histogram writeQueueCmdRunDuration =
Histogram.build()
.name("grpc_netty_write_queue_cmd_run_duration_seconds")
.help("Duration of a task execution in the write queue.")
.labelNames("type")
.register();
public static final Histogram writeQueueChannelFlushDuration =
Histogram.build()
.name("grpc_netty_write_queue_channel_flush_duration_seconds")
.help("Duration of a channel flush in the write queue.")
.labelNames("phase")
.register();
public static final Histogram writeQueueFlushDuration =
Histogram.build()
.name("grpc_netty_write_queue_flush_duration_seconds")
.help("Duration of a flush of the write queue.")
.register();
public WriteQueue(Channel channel) {
this.channel = Preconditions.checkNotNull(channel, "channel");
queue = new ConcurrentLinkedQueue<>();
}
/** Schedule a flush on the channel. */
void scheduleFlush() {
if (scheduled.compareAndSet(false, true)) {
// Add the queue to the tail of the event loop so writes will be executed immediately
// inside the event loop. Note DO NOT do channel.write outside the event loop as
// it will not wake up immediately without a flush.
channel.eventLoop().execute(later);
}
}
/**
* Enqueue a write command on the channel.
*
* @param command a write to be executed on the channel.
* @param flush true if a flush of the write should be scheduled, false if a later call to enqueue
* will schedule the flush.
*/
@CanIgnoreReturnValue
ChannelFuture enqueue(QueuedCommand command, boolean flush) {
// Detect erroneous code that tries to reuse command objects.
Preconditions.checkArgument(command.promise() == null, "promise must not be set on command");
ChannelPromise promise = channel.newPromise();
command.promise(promise);
queue.add(Pair.of(command, System.nanoTime()));
if (flush) {
scheduleFlush();
}
return promise;
}
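// Illustrative only (headersCommand/dataCommand are hypothetical, not defined in this file):
// callers typically enqueue a group of commands without flushing and schedule a single flush
// with the last one, e.g.
//   writeQueue.enqueue(headersCommand, false);
//   writeQueue.enqueue(dataCommand, true); // one flush is scheduled for both writes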
/**
* Enqueue the runnable. It is not safe for another thread to queue a Runnable directly to the
* event loop, because it will be out-of-order with writes. This method allows the Runnable to be
* processed in-order with writes.
*/
void enqueue(Runnable runnable, boolean flush) {
Long now = System.nanoTime();
queue.add(Pair.<QueuedCommand, Long>of(new RunnableCommand(runnable), now));
if (flush) {
scheduleFlush();
}
}
/**
* Executes enqueued work directly on the current thread. This can be used to trigger writes
* before performing additional reads. Must be called from the event loop. This method makes no
* guarantee that the work queue is empty when it returns.
*/
void drainNow() {
Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
if (queue.peek() == null) {
return;
}
flush();
}
/**
* Process the queue of commands and dispatch them to the stream. This method is only called in
* the event loop.
*/
private void flush() {
Histogram.Timer flushTimer = writeQueueFlushDuration.startTimer();
List<Record> batch = new ArrayList<>();
PerfMark.startTask("WriteQueue.periodicFlush");
long start = System.nanoTime();
try {
Pair<QueuedCommand, Long> item;
int i = 0;
boolean flushedOnce = false;
Histogram.Timer waitBatchTimer = writeQueueWaitBatchDuration.startTimer();
while ((item = queue.poll()) != null) {
QueuedCommand cmd = item.getLeft();
writeQueuePendingDuration.observe((System.nanoTime() - item.getRight()) / 1_000_000.0);
Record cmdRecord = new Record(cmd.toString());
Histogram.Timer cmdTimer =
writeQueueCmdRunDuration.labels(cmd.getClass().getSimpleName()).startTimer();
// Run the command
cmd.run(channel);
cmdTimer.observeDuration();
cmdRecord.end();
batch.add(cmdRecord);
if (++i == DEQUE_CHUNK_SIZE) {
waitBatchTimer.observeDuration();
waitBatchTimer = writeQueueWaitBatchDuration.startTimer();
i = 0;
// Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM.
PerfMark.startTask("WriteQueue.flush0");
Histogram.Timer channelFlushTimer =
writeQueueChannelFlushDuration.labels("flush0").startTimer();
Record flushRecord = new Record("flush0");
try {
channel.flush();
} finally {
writeQueueBatchSize.observe(DEQUE_CHUNK_SIZE);
channelFlushTimer.observeDuration();
flushRecord.end();
batch.add(flushRecord);
PerfMark.stopTask("WriteQueue.flush0");
}
flushedOnce = true;
}
}
// Must flush at least once, even if there were no writes.
if (i != 0 || !flushedOnce) {
PerfMark.startTask("WriteQueue.flush1");
Histogram.Timer channelFlushTimer =
writeQueueChannelFlushDuration.labels("flush1").startTimer();
Record flushRecord = new Record("flush1");
try {
channel.flush();
} finally {
waitBatchTimer.observeDuration();
writeQueueBatchSize.observe(i);
channelFlushTimer.observeDuration();
flushRecord.end();
batch.add(flushRecord);
PerfMark.stopTask("WriteQueue.flush1");
}
}
} finally {
PerfMark.stopTask("WriteQueue.periodicFlush");
flushTimer.observeDuration();
// 50_000_000 ns == 50 ms: log batches that took unusually long to flush.
if (System.nanoTime() - start > 50_000_000) {
String msg = "Found slow batch. WriteQueue.flush: " + batch;
logger.log(Level.WARNING, msg);
System.out.println(msg);
}
// Mark the write as done, if the queue is non-empty after marking trigger a new write.
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush();
}
}
}
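// How the histograms above fit together: writeQueuePendingDuration is recorded per command,
// from enqueue() until it is dequeued in flush(); writeQueueCmdRunDuration and
// writeQueueChannelFlushDuration cover each cmd.run(channel) and each channel.flush()
// respectively; writeQueueFlushDuration spans one entire invocation of flush().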
private static class RunnableCommand implements QueuedCommand {
private final Runnable runnable;
private final Link link;
public RunnableCommand(Runnable runnable) {
this.link = PerfMark.linkOut();
this.runnable = runnable;
}
@Override
public final void promise(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public final ChannelPromise promise() {
throw new UnsupportedOperationException();
}
@Override
public final void run(Channel channel) {
runnable.run();
}
@Override
public Link getLink() {
return link;
}
}
abstract static class AbstractQueuedCommand implements QueuedCommand {
private ChannelPromise promise;
private final Link link;
AbstractQueuedCommand() {
this.link = PerfMark.linkOut();
}
@Override
public final void promise(ChannelPromise promise) {
this.promise = promise;
}
@Override
public final ChannelPromise promise() {
return promise;
}
@Override
public final void run(Channel channel) {
channel.write(this, promise);
}
@Override
public Link getLink() {
return link;
}
}
/** Simple wrapper type around a command and its optional completion listener. */
interface QueuedCommand {
/** Returns the promise being notified of the success/failure of the write. */
ChannelPromise promise();
/** Sets the promise. */
void promise(ChannelPromise promise);
void run(Channel channel);
Link getLink();
}
private static class Record {
long start;
long end;
long durationMS;
String name;
public Record(String name) {
this.name = name;
this.start = System.nanoTime();
}
public void end() {
this.end = System.nanoTime();
this.durationMS = (end - start) / 1_000_000;
}
public String toString() {
return "Record{"
+ "start="
+ start
+ ", end="
+ end
+ ", durationMS="
+ durationMS
+ ", name='"
+ name
+ '\''
+ '}';
}
}
}
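The histograms above register with the Prometheus default registry through Histogram.build()...register(). A minimal sketch of exposing them for scraping, assuming the io.prometheus simpleclient_httpserver exporter is on the classpath; the exporter class name and port below are arbitrary and not part of this commit:

import io.prometheus.client.exporter.HTTPServer;

public final class WriteQueueMetricsExporter {
  public static void main(String[] args) throws Exception {
    // HTTPServer serves CollectorRegistry.defaultRegistry (where the WriteQueue histograms
    // are registered) over HTTP at /metrics until stop() is called.
    HTTPServer server = new HTTPServer(9090);
    Runtime.getRuntime().addShutdownHook(new Thread(server::stop));
  }
}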

ClientCalls.java

@@ -0,0 +1,791 @@
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.stub;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.prometheus.client.Histogram;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Utility functions for processing different call idioms. We have one-to-one correspondence between
* utilities in this class and the potential signatures in a generated stub class so that the
* runtime can vary behavior without requiring regeneration of the stub.
*/
public final class ClientCalls {
private static final Logger logger = Logger.getLogger(ClientCalls.class.getName());
public static final Histogram asyncUnaryRequestCallDuration =
Histogram.build()
.name("grpc_client_async_unary_request_call_duration_seconds")
.help("Histogram of time spent in asyncUnaryRequestCall")
.labelNames("phase")
.register();
// Prevent instantiation
private ClientCalls() {}
/**
* Executes a unary call with a response {@link StreamObserver}. The {@code call} should not be
* already started. After calling this method, {@code call} should no longer be used.
*
* <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
* {@code beforeStart()} will be called.
*/
public static <ReqT, RespT> void asyncUnaryCall(
ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
asyncUnaryRequestCall(call, req, responseObserver, false);
}
/**
* Executes a server-streaming call with a response {@link StreamObserver}. The {@code call}
* should not be already started. After calling this method, {@code call} should no longer be
* used.
*
* <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
* {@code beforeStart()} will be called.
*/
public static <ReqT, RespT> void asyncServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
asyncUnaryRequestCall(call, req, responseObserver, true);
}
/**
* Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
* The {@code call} should not be already started. After calling this method, {@code call} should
* no longer be used.
*
* <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
* {@code beforeStart()} will be called.
*
* @return request stream observer. It will extend {@link ClientCallStreamObserver}
*/
public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
return asyncStreamingRequestCall(call, responseObserver, false);
}
/**
* Executes a bidirectional-streaming call. The {@code call} should not be already started. After
* calling this method, {@code call} should no longer be used.
*
* <p>If the provided {@code responseObserver} is an instance of {@link ClientResponseObserver},
* {@code beforeStart()} will be called.
*
* @return request stream observer. It will extend {@link ClientCallStreamObserver}
*/
public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
return asyncStreamingRequestCall(call, responseObserver, true);
}
/**
* Executes a unary call and blocks on the response. The {@code call} should not be already
* started. After calling this method, {@code call} should no longer be used.
*
* @return the single response message.
* @throws StatusRuntimeException on error
*/
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try {
return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e);
}
}
/**
* Executes a unary call and blocks on the response. The {@code call} should not be already
* started. After calling this method, {@code call} should no longer be used.
*
* @return the single response message.
* @throws StatusRuntimeException on error
*/
public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
ClientCall<ReqT, RespT> call =
channel.newCall(
method,
callOptions
.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
interrupt = true;
call.cancel("Thread interrupted", e);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}
/**
* Executes a server-streaming call returning a blocking {@link Iterator} over the response
* stream. The {@code call} should not be already started. After calling this method, {@code call}
* should no longer be used.
*
* <p>The returned iterator may throw {@link StatusRuntimeException} on error.
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT req) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
/**
* Executes a server-streaming call returning a blocking {@link Iterator} over the response
* stream. The {@code call} should not be already started. After calling this method, {@code call}
* should no longer be used.
*
* <p>The returned iterator may throw {@link StatusRuntimeException} on error.
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call =
channel.newCall(
method,
callOptions
.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
/**
* Executes a unary call and returns a {@link ListenableFuture} to the response. The {@code call}
* should not be already started. After calling this method, {@code call} should no longer be
* used.
*
* @return a future for the single response message.
*/
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call, ReqT req) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<>(call);
asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<>(responseFuture));
return responseFuture;
}
/**
* Returns the result of calling {@link Future#get()} interruptibly on a task known not to throw a
* checked exception.
*
* <p>If interrupted, the interrupt is restored before throwing an exception.
*
* @throws java.util.concurrent.CancellationException if {@code get} throws a {@code
* CancellationException}.
* @throws io.grpc.StatusRuntimeException if {@code get} throws an {@link ExecutionException} or
* an {@link InterruptedException}.
*/
private static <V> V getUnchecked(Future<V> future) {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Thread interrupted")
.withCause(e)
.asRuntimeException();
} catch (ExecutionException e) {
throw toStatusRuntimeException(e.getCause());
}
}
/**
* Wraps the given {@link Throwable} in a {@link StatusRuntimeException}. If it contains an
* embedded {@link StatusException} or {@link StatusRuntimeException}, the returned exception will
* contain the embedded trailers and status, with the given exception as the cause. Otherwise, an
* exception will be generated from a {@link Status#UNKNOWN} status.
*/
private static StatusRuntimeException toStatusRuntimeException(Throwable t) {
Throwable cause = checkNotNull(t, "t");
while (cause != null) {
// If we have an embedded status, use it and replace the cause
if (cause instanceof StatusException) {
StatusException se = (StatusException) cause;
return new StatusRuntimeException(se.getStatus(), se.getTrailers());
} else if (cause instanceof StatusRuntimeException) {
StatusRuntimeException se = (StatusRuntimeException) cause;
return new StatusRuntimeException(se.getStatus(), se.getTrailers());
}
cause = cause.getCause();
}
return Status.UNKNOWN.withDescription("unexpected exception").withCause(t).asRuntimeException();
}
/**
* Cancels a call, and throws the exception.
*
* @param t must be a RuntimeException or Error
*/
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable e) {
assert e instanceof RuntimeException || e instanceof Error;
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
}
// should be impossible
throw new AssertionError(t);
}
private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT req,
StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
asyncUnaryRequestCall(
call,
req,
new StreamObserverToCallListenerAdapter<>(
responseObserver, new CallToStreamObserverAdapter<>(call, streamingResponse)));
}
private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call, ReqT req, StartableListener<RespT> responseListener) {
Histogram.Timer startCallTimer =
asyncUnaryRequestCallDuration.labels("start_call").startTimer();
startCall(call, responseListener);
startCallTimer.observeDuration();
try {
Histogram.Timer sendMessageTimer =
asyncUnaryRequestCallDuration.labels("send_message").startTimer();
call.sendMessage(req);
sendMessageTimer.observeDuration();
Histogram.Timer halfCloseTimer =
asyncUnaryRequestCallDuration.labels("half_close").startTimer();
call.halfClose();
halfCloseTimer.observeDuration();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e);
}
}
private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter =
new CallToStreamObserverAdapter<>(call, streamingResponse);
startCall(call, new StreamObserverToCallListenerAdapter<>(responseObserver, adapter));
return adapter;
}
private static <ReqT, RespT> void startCall(
ClientCall<ReqT, RespT> call, StartableListener<RespT> responseListener) {
call.start(responseListener, new Metadata());
responseListener.onStart();
}
private abstract static class StartableListener<T> extends ClientCall.Listener<T> {
abstract void onStart();
}
private static final class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private boolean frozen;
private final ClientCall<T, ?> call;
private final boolean streamingResponse;
private Runnable onReadyHandler;
private int initialRequest = 1;
private boolean autoRequestEnabled = true;
private boolean aborted = false;
private boolean completed = false;
// Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call, boolean streamingResponse) {
this.call = call;
this.streamingResponse = streamingResponse;
}
private void freeze() {
this.frozen = true;
}
@Override
public void onNext(T value) {
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
call.sendMessage(value);
}
@Override
public void onError(Throwable t) {
call.cancel("Cancelled by client with StreamObserver.onError()", t);
aborted = true;
}
@Override
public void onCompleted() {
call.halfClose();
completed = true;
}
@Override
public boolean isReady() {
return call.isReady();
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
if (frozen) {
throw new IllegalStateException(
"Cannot alter onReadyHandler after call started. Use ClientResponseObserver");
}
this.onReadyHandler = onReadyHandler;
}
@Deprecated
@Override
public void disableAutoInboundFlowControl() {
disableAutoRequestWithInitial(1);
}
@Override
public void disableAutoRequestWithInitial(int request) {
if (frozen) {
throw new IllegalStateException(
"Cannot disable auto flow control after call started. Use ClientResponseObserver");
}
Preconditions.checkArgument(request >= 0, "Initial requests must be non-negative");
initialRequest = request;
autoRequestEnabled = false;
}
@Override
public void request(int count) {
if (!streamingResponse && count == 1) {
// Initially ask for two responses from flow-control so that if a misbehaving server
// sends more than one response, we can catch it and fail it in the listener.
call.request(2);
} else {
call.request(count);
}
}
@Override
public void setMessageCompression(boolean enable) {
call.setMessageCompression(enable);
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
call.cancel(message, cause);
}
}
private static final class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends StartableListener<RespT> {
private final StreamObserver<RespT> observer;
private final CallToStreamObserverAdapter<ReqT> adapter;
private boolean firstResponseReceived;
// Non private to avoid synthetic class
StreamObserverToCallListenerAdapter(
StreamObserver<RespT> observer, CallToStreamObserverAdapter<ReqT> adapter) {
this.observer = observer;
this.adapter = adapter;
if (observer instanceof ClientResponseObserver) {
@SuppressWarnings("unchecked")
ClientResponseObserver<ReqT, RespT> clientResponseObserver =
(ClientResponseObserver<ReqT, RespT>) observer;
clientResponseObserver.beforeStart(adapter);
}
adapter.freeze();
}
@Override
public void onHeaders(Metadata headers) {}
@Override
public void onMessage(RespT message) {
if (firstResponseReceived && !adapter.streamingResponse) {
throw Status.INTERNAL
.withDescription("More than one responses received for unary or client-streaming call")
.asRuntimeException();
}
firstResponseReceived = true;
observer.onNext(message);
if (adapter.streamingResponse && adapter.autoRequestEnabled) {
// Request delivery of the next inbound message.
adapter.request(1);
}
}
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
observer.onCompleted();
} else {
observer.onError(status.asRuntimeException(trailers));
}
}
@Override
public void onReady() {
if (adapter.onReadyHandler != null) {
adapter.onReadyHandler.run();
}
}
@Override
void onStart() {
if (adapter.initialRequest > 0) {
adapter.request(adapter.initialRequest);
}
}
}
/** Completes a {@link GrpcFuture} using {@link StreamObserver} events. */
private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;
// Non private to avoid synthetic class
UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
this.responseFuture = responseFuture;
}
@Override
public void onHeaders(Metadata headers) {}
@Override
public void onMessage(RespT value) {
if (this.value != null) {
throw Status.INTERNAL
.withDescription("More than one value received for unary call")
.asRuntimeException();
}
this.value = value;
}
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
if (value == null) {
// No value received so mark the future as an error
responseFuture.setException(
Status.INTERNAL
.withDescription("No value received for unary call")
.asRuntimeException(trailers));
}
// If setException() ran above, the future is already complete and this set() is a no-op.
responseFuture.set(value);
} else {
responseFuture.setException(status.asRuntimeException(trailers));
}
}
@Override
void onStart() {
responseFuture.call.request(2);
}
}
private static final class GrpcFuture<RespT> extends AbstractFuture<RespT> {
private final ClientCall<?, RespT> call;
// Non private to avoid synthetic class
GrpcFuture(ClientCall<?, RespT> call) {
this.call = call;
}
@Override
protected void interruptTask() {
call.cancel("GrpcFuture was cancelled", null);
}
@Override
protected boolean set(@Nullable RespT resp) {
return super.set(resp);
}
@Override
protected boolean setException(Throwable throwable) {
return super.setException(throwable);
}
@SuppressWarnings("MissingOverride") // Add @Override once Java 6 support is dropped
protected String pendingToString() {
return MoreObjects.toStringHelper(this).add("clientCall", call).toString();
}
}
/**
* Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
*
* <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
* separate thread from {@link Iterator} calls.
*/
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
private static final class BlockingResponseStream<T> implements Iterator<T> {
// Due to flow control, only needs to hold up to 3 items: 2 for value, 1 for close.
// (2 for value, not 1, because of early request() in next())
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;
// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}
// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
this.threadless = threadless;
}
StartableListener<T> listener() {
return listener;
}
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return next;
}
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}
@Override
public boolean hasNext() {
while (last == null) {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
}
if (last instanceof StatusRuntimeException) {
// Rethrow the exception with a new stacktrace.
StatusRuntimeException e = (StatusRuntimeException) last;
throw e.getStatus().asRuntimeException(e.getTrailers());
}
return last != this;
}
@Override
public T next() {
// Eagerly call request(1) so it can be processing the next message while we wait for the
// current one, which reduces latency for the next message. With MigratingThreadDeframer and
// if the data has already been received, every other message can be delivered instantly. This
// can be run after hasNext(), but just would be slower.
if (!(last instanceof StatusRuntimeException) && last != this) {
call.request(1);
}
if (!hasNext()) {
throw new NoSuchElementException();
}
@SuppressWarnings("unchecked")
T tmp = (T) last;
last = null;
return tmp;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
private final class QueuingListener extends StartableListener<T> {
// Non private to avoid synthetic class
QueuingListener() {}
private boolean done = false;
@Override
public void onHeaders(Metadata headers) {}
@Override
public void onMessage(T value) {
Preconditions.checkState(!done, "ClientCall already closed");
buffer.add(value);
}
@Override
public void onClose(Status status, Metadata trailers) {
Preconditions.checkState(!done, "ClientCall already closed");
if (status.isOk()) {
buffer.add(BlockingResponseStream.this);
} else {
buffer.add(status.asRuntimeException(trailers));
}
done = true;
}
@Override
void onStart() {
call.request(1);
}
}
}
@SuppressWarnings("serial")
private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
private volatile Thread waiter;
// Non private to avoid synthetic class
ThreadlessExecutor() {}
/**
* Waits until there is a Runnable, then executes it and all queued Runnables after it. Must
* only be called by one thread at a time.
*/
public void waitAndDrain() throws InterruptedException {
throwIfInterrupted();
Runnable runnable = poll();
if (runnable == null) {
waiter = Thread.currentThread();
try {
while ((runnable = poll()) == null) {
LockSupport.park(this);
throwIfInterrupted();
}
} finally {
waiter = null;
}
}
do {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
} while ((runnable = poll()) != null);
}
private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
@Override
public void execute(Runnable runnable) {
add(runnable);
LockSupport.unpark(waiter); // no-op if null
}
}
enum StubType {
BLOCKING,
FUTURE,
ASYNC
}
/** Internal {@link CallOptions.Key} to indicate stub types. */
static final CallOptions.Key<StubType> STUB_TYPE_OPTION =
CallOptions.Key.create("internal-stub-type");
}
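A minimal usage sketch for the blocking unary path above. ExampleGrpc, PingRequest and PingReply are hypothetical generated stub classes used only for illustration, and the address is arbitrary; ManagedChannelBuilder, CallOptions and ClientCalls.blockingUnaryCall are the real APIs exercised:

import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.ClientCalls;

public final class BlockingUnaryExample {
  public static void main(String[] args) {
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress("127.0.0.1", 2379).usePlaintext().build();
    // ExampleGrpc, PingRequest and PingReply stand in for generated stub classes.
    ClientCall<PingRequest, PingReply> call =
        channel.newCall(ExampleGrpc.getPingMethod(), CallOptions.DEFAULT);
    // blockingUnaryCall funnels through asyncUnaryRequestCall, so the start_call, send_message
    // and half_close phases are observed in grpc_client_async_unary_request_call_duration_seconds.
    PingReply reply = ClientCalls.blockingUnaryCall(call, PingRequest.getDefaultInstance());
    System.out.println(reply);
    channel.shutdown();
  }
}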

ChannelFactory.java

@@ -21,9 +21,9 @@ import io.grpc.ManagedChannelBuilder;
import java.net.URI;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.tikv.common.HostMapping;
import org.tikv.common.pd.PDUtils;
import org.tikv.grpc.ClientTracingInterceptor;
public class ChannelFactory implements AutoCloseable {
private final int maxFrameSize;

ClientTracingInterceptor.java

@@ -96,85 +96,87 @@ public class ClientTracingInterceptor implements ClientInterceptor {
}
}
ForwardingClientCall.SimpleForwardingClientCall result =
new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
private void end() {
slowLog.log();
}
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
SlowLogSpan span1 = slowLog.start("Started call");
Listener<RespT> tracingResponseListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onHeaders(Metadata headers) {
SlowLogSpan span =
slowLog.start("Response headers received" + headers.toString());
delegate().onHeaders(headers);
span.end();
}
@Override
public void onMessage(RespT message) {
SlowLogSpan span = slowLog.start("Response received");
delegate().onMessage(message);
span.end();
}
@Override
public void onClose(Status status, Metadata trailers) {
SlowLogSpan span = null;
if (status.getCode().value() == 0) {
span = slowLog.start("Call closed");
} else {
span = slowLog.start("Call failed: " + status.getDescription());
}
delegate().onClose(status, trailers);
span.end();
end();
}
};
delegate().start(tracingResponseListener, headers);
span1.end();
}
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
String errorMessage;
if (message == null) {
errorMessage = "Error";
} else {
errorMessage = message;
}
SlowLogSpan span = null;
if (cause == null) {
span = slowLog.start(errorMessage);
} else {
span = slowLog.start(errorMessage + cause.getMessage());
}
delegate().cancel(message, cause);
span.end();
end();
}
@Override
public void halfClose() {
SlowLogSpan span = slowLog.start("Finished sending messages");
delegate().halfClose();
span.end();
end();
}
@Override
public void sendMessage(ReqT message) {
SlowLogSpan span = slowLog.start("Message sent");
delegate().sendMessage(message);
span.end();
}
};
span.end();
return result;