From 0628cab226ae75c5103c322440d302fa76dbfc6d Mon Sep 17 00:00:00 2001 From: DNVindhya Date: Wed, 16 Mar 2022 22:09:25 -0700 Subject: [PATCH] observability: implement client interceptor for logging (#8956) --- .../io/grpc/observability/Observability.java | 9 +- .../interceptors/InetAddressUtil.java | 94 +++ .../InternalLoggingChannelInterceptor.java | 210 +++++- .../observability/interceptors/LogHelper.java | 371 +++++++++++ .../observability/logging/GcpLogSink.java | 138 ++++ .../io/grpc/observability/logging/Sink.java | 36 + .../v1/observabilitylog.proto | 2 +- .../LoggingChannelProviderTest.java | 9 +- .../grpc/observability/ObservabilityTest.java | 2 +- ...InternalLoggingChannelInterceptorTest.java | 416 ++++++++++++ .../interceptors/LogHelperTest.java | 618 ++++++++++++++++++ .../observability/logging/GcpLogSinkTest.java | 99 +++ 12 files changed, 1995 insertions(+), 9 deletions(-) create mode 100644 observability/src/main/java/io/grpc/observability/interceptors/InetAddressUtil.java create mode 100644 observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java create mode 100644 observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java create mode 100644 observability/src/main/java/io/grpc/observability/logging/Sink.java create mode 100644 observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java create mode 100644 observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java create mode 100644 observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java diff --git a/observability/src/main/java/io/grpc/observability/Observability.java b/observability/src/main/java/io/grpc/observability/Observability.java index a7a6b04ad0..a6e6de7f36 100644 --- a/observability/src/main/java/io/grpc/observability/Observability.java +++ b/observability/src/main/java/io/grpc/observability/Observability.java @@ -20,11 +20,14 @@ import io.grpc.ExperimentalApi; import io.grpc.ManagedChannelProvider.ProviderNotFoundException; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.observability.logging.GcpLogSink; +import io.grpc.observability.logging.Sink; /** The main class for gRPC Observability features. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class Observability { private static boolean initialized = false; + private static final String PROJECT_ID = "PROJECT"; /** * Initialize grpc-observability. @@ -35,13 +38,17 @@ public final class Observability { if (initialized) { throw new IllegalStateException("Observability already initialized!"); } - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl()); + // TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId + Sink sink = new GcpLogSink(PROJECT_ID); + LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink)); LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl()); // TODO(sanjaypujare): initialize customTags map initialized = true; } /** Un-initialize or finish grpc-observability. */ + // TODO(sanjaypujare): Once Observability is made into Singleton object, + // close() on sink will be called as part of grpcFinish() public static synchronized void grpcFinish() { if (!initialized) { throw new IllegalStateException("Observability not initialized!"); diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InetAddressUtil.java b/observability/src/main/java/io/grpc/observability/interceptors/InetAddressUtil.java new file mode 100644 index 0000000000..9e6ff2ac89 --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/interceptors/InetAddressUtil.java @@ -0,0 +1,94 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.primitives.Ints; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.util.Arrays; + +// This is copied from guava 20.0 because it is a @Beta api +final class InetAddressUtil { + private static final int IPV6_PART_COUNT = 8; + + public static String toAddrString(InetAddress ip) { + checkNotNull(ip); + if (ip instanceof Inet4Address) { + // For IPv4, Java's formatting is good enough. + return ip.getHostAddress(); + } + checkArgument(ip instanceof Inet6Address); + byte[] bytes = ip.getAddress(); + int[] hextets = new int[IPV6_PART_COUNT]; + for (int i = 0; i < hextets.length; i++) { + hextets[i] = Ints.fromBytes((byte) 0, (byte) 0, bytes[2 * i], bytes[2 * i + 1]); + } + compressLongestRunOfZeroes(hextets); + return hextetsToIPv6String(hextets); + } + + private static void compressLongestRunOfZeroes(int[] hextets) { + int bestRunStart = -1; + int bestRunLength = -1; + int runStart = -1; + for (int i = 0; i < hextets.length + 1; i++) { + if (i < hextets.length && hextets[i] == 0) { + if (runStart < 0) { + runStart = i; + } + } else if (runStart >= 0) { + int runLength = i - runStart; + if (runLength > bestRunLength) { + bestRunStart = runStart; + bestRunLength = runLength; + } + runStart = -1; + } + } + if (bestRunLength >= 2) { + Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1); + } + } + + private static String hextetsToIPv6String(int[] hextets) { + // While scanning the array, handle these state transitions: + // start->num => "num" start->gap => "::" + // num->num => ":num" num->gap => "::" + // gap->num => "num" gap->gap => "" + StringBuilder buf = new StringBuilder(39); + boolean lastWasNumber = false; + for (int i = 0; i < hextets.length; i++) { + boolean thisIsNumber = hextets[i] >= 0; + if (thisIsNumber) { + if (lastWasNumber) { + buf.append(':'); + } + buf.append(Integer.toHexString(hextets[i])); + } else { + if (i == 0 || lastWasNumber) { + buf.append("::"); + } + } + lastWasNumber = thisIsNumber; + } + return buf.toString(); + } +} diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java index 3e535de381..63d5280584 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -16,33 +16,235 @@ package io.grpc.observability.interceptors; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.Internal; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.internal.TimeProvider; +import io.grpc.observability.logging.Sink; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; -/** A logging interceptor for {@code LoggingChannelProvider}. */ +/** + * A logging interceptor for {@code LoggingChannelProvider}. + */ @Internal public final class InternalLoggingChannelInterceptor implements ClientInterceptor { + private static final Logger logger = Logger + .getLogger(InternalLoggingChannelInterceptor.class.getName()); + + private final LogHelper helper; public interface Factory { ClientInterceptor create(); } public static class FactoryImpl implements Factory { + private final Sink sink; + private final LogHelper helper; + + static LogHelper createLogHelper(Sink sink, TimeProvider provider) { + return new LogHelper(sink, provider); + } + + public FactoryImpl(Sink sink) { + this.sink = sink; + this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + } @Override public ClientInterceptor create() { - return new InternalLoggingChannelInterceptor(); + return new InternalLoggingChannelInterceptor(helper); } + + /** + * Closes the sink instance. + */ + public void close() { + if (sink != null) { + sink.close(); + } + } + } + + private InternalLoggingChannelInterceptor(LogHelper helper) { + this.helper = helper; } @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - // TODO(dnvindhya) implement the interceptor - return null; + + final AtomicLong seq = new AtomicLong(1); + final String rpcId = UUID.randomUUID().toString(); + final String authority = next.authority(); + final String serviceName = method.getServiceName(); + final String methodName = method.getBareMethodName(); + // Get the stricter deadline to calculate the timeout once the call starts + final Deadline deadline = LogHelper.min(callOptions.getDeadline(), + Context.current().getDeadline()); + + // TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged + // according to config. Until then always return true. + if (!helper.isMethodToBeLogged(method.getFullMethodName())) { + return next.newCall(method, callOptions); + } + + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + + @Override + public void start(Listener responseListener, Metadata headers) { + // Event: EventType.GRPC_CALL_REQUEST_HEADER + // The timeout should reflect the time remaining when the call is started, so compute + // remaining time here. + final Duration timeout = deadline == null ? null + : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); + + try { + helper.logRequestHeader( + seq.getAndIncrement(), + serviceName, + methodName, + authority, + timeout, + headers, + EventLogger.LOGGER_CLIENT, + rpcId, + null); + } catch (Exception e) { + // Catching generic exceptions instead of specific ones for all the events. + // This way we can catch both expected and unexpected exceptions instead of re-throwing + // exceptions to callers which will lead to RPC getting aborted. + // Expected exceptions to be caught: + // 1. IllegalArgumentException + // 2. NullPointerException + logger.log(Level.SEVERE, "Unable to log request header", e); + } + + Listener observabilityListener = + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onMessage(RespT message) { + // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + EventType.GRPC_CALL_RESPONSE_MESSAGE, + message, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response message", e); + } + super.onMessage(message); + } + + @Override + public void onHeaders(Metadata headers) { + // Event: EventType.GRPC_CALL_RESPONSE_HEADER + try { + helper.logResponseHeader( + seq.getAndIncrement(), + serviceName, + methodName, + headers, + EventLogger.LOGGER_CLIENT, + rpcId, + LogHelper.getPeerAddress(getAttributes())); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response header", e); + } + super.onHeaders(headers); + } + + @Override + public void onClose(Status status, Metadata trailers) { + // Event: EventType.GRPC_CALL_TRAILER + try { + helper.logTrailer( + seq.getAndIncrement(), + serviceName, + methodName, + status, + trailers, + EventLogger.LOGGER_CLIENT, + rpcId, + LogHelper.getPeerAddress(getAttributes())); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log trailer", e); + } + super.onClose(status, trailers); + } + }; + super.start(observabilityListener, headers); + } + + @Override + public void sendMessage(ReqT message) { + // Event: EventType.GRPC_CALL_REQUEST_MESSAGE + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + EventType.GRPC_CALL_REQUEST_MESSAGE, + message, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log request message", e); + } + super.sendMessage(message); + } + + @Override + public void halfClose() { + // Event: EventType.GRPC_CALL_HALF_CLOSE + try { + helper.logHalfClose( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log half close", e); + } + super.halfClose(); + } + + @Override + public void cancel(String message, Throwable cause) { + // Event: EventType.GRPC_CALL_CANCEL + try { + helper.logCancel( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log cancel", e); + } + super.cancel(message, cause); + } + }; } } diff --git a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java new file mode 100644 index 0000000000..be02113174 --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java @@ -0,0 +1,371 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Charsets; +import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Timestamps; +import io.grpc.Attributes; +import io.grpc.Deadline; +import io.grpc.Grpc; +import io.grpc.InternalMetadata; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.internal.TimeProvider; +import io.grpc.observability.logging.Sink; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Helper class for GCP observability logging. + */ +class LogHelper { + private static final Logger logger = Logger.getLogger(LogHelper.class.getName()); + + // TODO(dnvindhya): Define it in one places(TBD) to make it easily accessible from everywhere + static final Metadata.Key STATUS_DETAILS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", + Metadata.BINARY_BYTE_MARSHALLER); + + private final Sink sink; + private final TimeProvider timeProvider; + + LogHelper(Sink sink, TimeProvider timeProvider) { + this.sink = sink; + this.timeProvider = timeProvider; + } + + /** + * Logs the request header. + * Binary logging equivalent of logClientHeader. + */ + void logRequestHeader( + long seqId, + String serviceName, + String methodName, + String authority, + @Nullable Duration timeout, + Metadata metadata, + GrpcLogRecord.EventLogger eventLogger, + String rpcId, + // null on client side + @Nullable SocketAddress peerAddress) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(rpcId, "rpcId"); + checkArgument( + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER, + "peerAddress can only be specified by server"); + + PayloadBuilder pair = createMetadataProto(metadata); + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setAuthority(authority) + .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setMetadata(pair.proto) + .setPayloadSize(pair.size) + .setRpcId(rpcId); + if (timeout != null) { + logEntryBuilder.setTimeout(timeout); + } + if (peerAddress != null) { + logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + } + sink.write(logEntryBuilder.build()); + } + + /** + * Logs the reponse header. + * Binary logging equivalent of logServerHeader. + */ + void logResponseHeader( + long seqId, + String serviceName, + String methodName, + Metadata metadata, + GrpcLogRecord.EventLogger eventLogger, + String rpcId, + @Nullable SocketAddress peerAddress) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(rpcId, "rpcId"); + // Logging peer address only on the first incoming event. On server side, peer address will + // of logging request header + checkArgument( + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, + "peerAddress can only be specified for client"); + + PayloadBuilder pair = createMetadataProto(metadata); + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setMetadata(pair.proto) + .setPayloadSize(pair.size) + .setRpcId(rpcId); + if (peerAddress != null) { + logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + } + sink.write(logEntryBuilder.build()); + } + + /** + * Logs the server trailer. + */ + void logTrailer( + long seqId, + String serviceName, + String methodName, + Status status, + Metadata metadata, + GrpcLogRecord.EventLogger eventLogger, + String rpcId, + @Nullable SocketAddress peerAddress) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(status, "status"); + checkNotNull(rpcId, "rpcId"); + checkArgument( + peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, + "peerAddress can only be specified for client"); + + PayloadBuilder pair = createMetadataProto(metadata); + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_TRAILER) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setMetadata(pair.proto) + .setPayloadSize(pair.size) + .setStatusCode(status.getCode().value()) + .setRpcId(rpcId); + String statusDescription = status.getDescription(); + if (statusDescription != null) { + logEntryBuilder.setStatusMessage(statusDescription); + } + byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY); + if (statusDetailBytes != null) { + logEntryBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes)); + } + if (peerAddress != null) { + logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); + } + sink.write(logEntryBuilder.build()); + } + + /** + * Logs the RPC message. + */ + void logRpcMessage( + long seqId, + String serviceName, + String methodName, + GrpcLogRecord.EventType eventType, + T message, + GrpcLogRecord.EventLogger eventLogger, + String rpcId) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(rpcId, "rpcId"); + checkArgument( + eventType == EventType.GRPC_CALL_REQUEST_MESSAGE + || eventType == EventType.GRPC_CALL_RESPONSE_MESSAGE, + "event type must correspond to client message or server message"); + checkNotNull(message, "message"); + + // TODO(dnvindhya): Convert message to bystestring and also log the message + // byte[] messageArray = (byte[])message; + // int messageLength = messageArray.length; + // ByteString messageData = + // ByteString.copyFrom((byte[]) message, 0, ((byte[]) message).length); + + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(eventType) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + sink.write(logEntryBuilder.build()); + } + + /** + * Logs half close. + */ + void logHalfClose( + long seqId, + String serviceName, + String methodName, + GrpcLogRecord.EventLogger eventLogger, + String rpcId) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(rpcId, "rpcId"); + + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_HALF_CLOSE) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + sink.write(logEntryBuilder.build()); + } + + /** + * Logs cancellation. + */ + void logCancel( + long seqId, + String serviceName, + String methodName, + GrpcLogRecord.EventLogger eventLogger, + String rpcId) { + checkNotNull(serviceName, "serviceName"); + checkNotNull(methodName, "methodName"); + checkNotNull(rpcId, "rpcId"); + + GrpcLogRecord.Builder logEntryBuilder = createTimestamp() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_CANCEL) + .setEventLogger(eventLogger) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + sink.write(logEntryBuilder.build()); + } + + GrpcLogRecord.Builder createTimestamp() { + long nanos = timeProvider.currentTimeNanos(); + return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos)); + } + + static final class PayloadBuilder { + T proto; + int size; + + private PayloadBuilder(T proto, int size) { + this.proto = proto; + this.size = size; + } + } + + // TODO(dnvindhya): Create a unit test for the metadata conversion + static PayloadBuilder createMetadataProto(Metadata metadata) { + checkNotNull(metadata, "metadata"); + GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder(); + // This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata's + // implementation + byte[][] serialized = InternalMetadata.serialize(metadata); + int totalMetadataBytes = 0; + if (serialized != null) { + int singleMetadataEntryBytes = 0; + // Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry + for (int i = 0; i < serialized.length; i += 2) { + String key = new String(serialized[i], Charsets.UTF_8); + byte[] value = serialized[i + 1]; + singleMetadataEntryBytes = totalMetadataBytes + key.length() + value.length; + metadataBuilder.addEntryBuilder() + .setKey(key) + .setValue(ByteString.copyFrom(value)); + totalMetadataBytes = singleMetadataEntryBytes; + } + } + return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes); + } + + static Address socketAddressToProto(SocketAddress address) { + checkNotNull(address, "address"); + Address.Builder builder = Address.newBuilder(); + if (address instanceof InetSocketAddress) { + InetAddress inetAddress = ((InetSocketAddress) address).getAddress(); + if (inetAddress instanceof Inet4Address) { + builder.setType(Address.Type.TYPE_IPV4) + .setAddress(InetAddressUtil.toAddrString(inetAddress)); + } else if (inetAddress instanceof Inet6Address) { + builder.setType(Address.Type.TYPE_IPV6) + .setAddress(InetAddressUtil.toAddrString(inetAddress)); + } else { + logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address); + builder.setAddress(address.toString()); + } + builder.setIpPort(((InetSocketAddress) address).getPort()); + } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { + // To avoid a compile time dependency on grpc-netty, we check against the + // runtime class name. + builder.setType(Address.Type.TYPE_UNIX) + .setAddress(address.toString()); + } else { + builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString()); + } + return builder.build(); + } + + /** + * Retrieves socket address. + */ + static SocketAddress getPeerAddress(Attributes streamAttributes) { + return streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); + } + + /** + * Checks deadline for timeout. + */ + static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { + if (deadline0 == null) { + return deadline1; + } + if (deadline1 == null) { + return deadline0; + } + return deadline0.minimum(deadline1); + } + + // TODO (dnvindhya) : Implement service and method name filtering + // Add unit tests for the method as part of filtering implementation + boolean isMethodToBeLogged(String fullMethodName) { + return true; + } + +} diff --git a/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java b/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java new file mode 100644 index 0000000000..419160062d --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java @@ -0,0 +1,138 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.logging; + +import com.google.cloud.MonitoredResource; +import com.google.cloud.logging.LogEntry; +import com.google.cloud.logging.Logging; +import com.google.cloud.logging.LoggingOptions; +import com.google.cloud.logging.Payload.JsonPayload; +import com.google.cloud.logging.Severity; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.protobuf.util.JsonFormat; +import io.grpc.internal.JsonParser; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Sink for Google Cloud Logging. + */ +public class GcpLogSink implements Sink { + private final Logger logger = Logger.getLogger(GcpLogSink.class.getName()); + + // TODO (dnvindhya): Make cloud logging service a configurable value + private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2"; + private static final String DEFAULT_LOG_NAME = "grpc"; + private final Logging gcpLoggingClient; + + private static Logging createLoggingClient(String projectId) { + LoggingOptions.Builder builder = LoggingOptions.newBuilder(); + if (!Strings.isNullOrEmpty(projectId)) { + builder.setProjectId(projectId); + } + return builder.build().getService(); + } + + /** + * Retrieves a single instance of GcpLogSink. + * + * @param destinationProjectId cloud project id to write logs + */ + public GcpLogSink(String destinationProjectId) { + this(createLoggingClient(destinationProjectId)); + } + + @VisibleForTesting + GcpLogSink(Logging client) { + this.gcpLoggingClient = client; + } + + @Override + public void write(GrpcLogRecord logProto) { + if (gcpLoggingClient == null) { + logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed."); + return; + } + if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) { + return; + } + try { + GrpcLogRecord.EventType event = logProto.getEventType(); + Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel()); + // TODO(vindhyan): make sure all (int, long) values are not displayed as double + LogEntry grpcLogEntry = + LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto))) + .setSeverity(logEntrySeverity) + .setLogName(DEFAULT_LOG_NAME) + .setResource(MonitoredResource.newBuilder("global").build()) + .build(); + synchronized (this) { + logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event); + gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); + } + } catch (Exception e) { + logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e); + } + } + + @SuppressWarnings("unchecked") + private Map protoToMapConverter(GrpcLogRecord logProto) + throws IOException { + JsonFormat.Printer printer = JsonFormat.printer().preservingProtoFieldNames(); + String recordJson = printer.print(logProto); + return (Map) JsonParser.parse(recordJson); + } + + private Severity getCloudLoggingLevel(GrpcLogRecord.LogLevel recordLevel) { + switch (recordLevel.getNumber()) { + case 1: // GrpcLogRecord.LogLevel.LOG_LEVEL_TRACE + case 2: // GrpcLogRecord.LogLevel.LOG_LEVEL_DEBUG + return Severity.DEBUG; + case 3: // GrpcLogRecord.LogLevel.LOG_LEVEL_INFO + return Severity.INFO; + case 4: // GrpcLogRecord.LogLevel.LOG_LEVEL_WARN + return Severity.WARNING; + case 5: // GrpcLogRecord.LogLevel.LOG_LEVEL_ERROR + return Severity.ERROR; + case 6: // GrpcLogRecord.LogLevel.LOG_LEVEL_CRITICAL + return Severity.CRITICAL; + default: + return Severity.DEFAULT; + } + } + + /** + * Closes Cloud Logging Client. + */ + @Override + public synchronized void close() { + if (gcpLoggingClient == null) { + logger.log(Level.WARNING, "Attempt to close after GcpLogSink is closed."); + return; + } + try { + gcpLoggingClient.close(); + } catch (Exception e) { + logger.log(Level.SEVERE, "Caught exception while closing", e); + } + } +} \ No newline at end of file diff --git a/observability/src/main/java/io/grpc/observability/logging/Sink.java b/observability/src/main/java/io/grpc/observability/logging/Sink.java new file mode 100644 index 0000000000..78b4e50c57 --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/logging/Sink.java @@ -0,0 +1,36 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.logging; + +import io.grpc.ExperimentalApi; +import io.grpc.observabilitylog.v1.GrpcLogRecord; + +/** + * Sink for GCP observability. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") +public interface Sink { + /** + * Writes the {@code message} to the destination. + */ + void write(GrpcLogRecord message); + + /** + * Closes the sink. + */ + void close(); +} diff --git a/observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto b/observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto index d2e72329cd..a37ac6f43d 100644 --- a/observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto +++ b/observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto @@ -69,7 +69,7 @@ message GrpcLogRecord { // Each call may have several log entries. They will all have the same rpc_id. // Nothing is guaranteed about their value other than they are unique across // different RPCs in the same gRPC process. - uint64 rpc_id = 2; + string rpc_id = 2; EventType event_type = 3; // one of the above EventType enum EventLogger event_logger = 4; // one of the above EventLogger enum diff --git a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java index 639bcbc6d0..31e1268262 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java @@ -35,6 +35,8 @@ import io.grpc.ManagedChannelProvider; import io.grpc.MethodDescriptor; import io.grpc.TlsChannelCredentials; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.observability.logging.GcpLogSink; +import io.grpc.observability.logging.Sink; import io.grpc.testing.TestMethodDescriptors; import org.junit.Rule; import org.junit.Test; @@ -46,6 +48,7 @@ import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class LoggingChannelProviderTest { + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @@ -55,10 +58,12 @@ public class LoggingChannelProviderTest { public void initTwiceCausesException() { ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl()); + Sink mockSink = mock(GcpLogSink.class); + LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); try { - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl()); + LoggingChannelProvider.init( + new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java index 6c71a0e264..37fadb6a00 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java @@ -38,7 +38,7 @@ public class ObservabilityTest { Observability.grpcFinish(); try { Observability.grpcFinish(); - fail("should have failed for calling grpcFinit() on uninitialized"); + fail("should have failed for calling grpcFinish() on uninitialized"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("Observability not initialized!"); } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java new file mode 100644 index 0000000000..6ddb2abc32 --- /dev/null +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java @@ -0,0 +1,416 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.Context; +import io.grpc.Deadline; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.MethodType; +import io.grpc.Status; +import io.grpc.internal.NoopClientCall; +import io.grpc.observability.logging.GcpLogSink; +import io.grpc.observability.logging.Sink; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; + +import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** + * Tests for {@link InternalLoggingChannelInterceptor}. + */ +@RunWith(JUnit4.class) +public class InternalLoggingChannelInterceptorTest { + + @Rule + public final MockitoRule mockito = MockitoJUnit.rule(); + + private static final Charset US_ASCII = Charset.forName("US-ASCII"); + + private InternalLoggingChannelInterceptor.Factory factory; + private AtomicReference> interceptedListener; + private AtomicReference actualClientInitial; + private AtomicReference actualRequest; + private SettableFuture halfCloseCalled; + private SettableFuture cancelCalled; + private SocketAddress peer; + private final Sink mockSink = mock(GcpLogSink.class); + + @Before + public void setup() throws Exception { + factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink); + interceptedListener = new AtomicReference<>(); + actualClientInitial = new AtomicReference<>(); + actualRequest = new AtomicReference<>(); + halfCloseCalled = SettableFuture.create(); + cancelCalled = SettableFuture.create(); + peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + } + + @Test + public void internalLoggingChannelInterceptor() throws Exception { + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public void cancel(String message, Throwable cause) { + cancelCalled.set(null); + } + + @Override + public void halfClose() { + halfCloseCalled.set(null); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + ClientCall interceptedLoggingCall = + factory.create() + .interceptCall(method, + CallOptions.DEFAULT, + channel); + + // send client header + { + EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + Metadata clientInitial = new Metadata(); + String dataA = "aaaaaaaaa"; + String dataB = "bbbbbbbbb"; + Metadata.Key keyA = + Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key keyB = + Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); + MetadataEntry entryA = + MetadataEntry + .newBuilder() + .setKey(keyA.name()) + .setValue(ByteString.copyFrom(dataA.getBytes(US_ASCII))) + .build(); + MetadataEntry entryB = + MetadataEntry + .newBuilder() + .setKey(keyB.name()) + .setValue(ByteString.copyFrom(dataB.getBytes(US_ASCII))) + .build(); + clientInitial.put(keyA, dataA); + clientInitial.put(keyB, dataB); + GrpcLogRecord.Metadata expectedMetadata = GrpcLogRecord.Metadata + .newBuilder() + .addEntry(entryA) + .addEntry(entryB) + .build(); + interceptedLoggingCall.start(mockListener, clientInitial); + verify(mockSink).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedRequestHeaderEvent); + assertEquals(captor.getValue().getSequenceId(), 1L); + assertEquals(captor.getValue().getServiceName(), "service"); + assertEquals(captor.getValue().getMethodName(), "method"); + assertEquals(captor.getValue().getAuthority(), "the-authority"); + assertEquals(captor.getValue().getMetadata(), expectedMetadata); + verifyNoMoreInteractions(mockSink); + assertSame(clientInitial, actualClientInitial.get()); + } + + // TODO(dnvindhya) : Add a helper method to verify other fields of GrpcLogRecord for all events + // receive server header + { + EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + Metadata serverInitial = new Metadata(); + interceptedListener.get().onHeaders(serverInitial); + verify(mockSink, times(2)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedResponseHeaderEvent); + verifyNoMoreInteractions(mockSink); + verify(mockListener).onHeaders(same(serverInitial)); + } + + // send client message + { + EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedLoggingCall.sendMessage(request); + verify(mockSink, times(3)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedRequestMessageEvent); + verifyNoMoreInteractions(mockSink); + assertSame(request, actualRequest.get()); + } + + // client half close + { + EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + interceptedLoggingCall.halfClose(); + verify(mockSink, times(4)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedHalfCloseEvent); + halfCloseCalled.get(1, TimeUnit.SECONDS); + verifyNoMoreInteractions(mockSink); + } + + // receive server message + { + EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + verify(mockSink, times(5)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedResponseMessageEvent); + verifyNoMoreInteractions(mockSink); + verify(mockListener).onMessage(same(response)); + } + + // receive trailer + { + EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + + interceptedListener.get().onClose(status, trailers); + verify(mockSink, times(6)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedTrailerEvent); + verifyNoMoreInteractions(mockSink); + verify(mockListener).onClose(same(status), same(trailers)); + } + + // cancel + { + EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL; + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + interceptedLoggingCall.cancel(null, null); + verify(mockSink, times(7)).write(captor.capture()); + assertEquals(captor.getValue().getEventType(), + expectedCancelEvent); + cancelCalled.get(1, TimeUnit.SECONDS); + } + } + + @Test + public void clientDeadLineLogged_deadlineSetViaCallOption() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + ClientCall interceptedLoggingCall = + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + }); + interceptedLoggingCall.start(mockListener, new Metadata()); + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + verify(mockSink, times(1)).write(captor.capture()); + Duration timeout = captor.getValue().getTimeout(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaContext() throws Exception { + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + callFuture.set( + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + })); + } + }); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + callFuture.get().start(mockListener, new Metadata()); + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + verify(mockSink, times(1)).write(captor.capture()); + Duration timeout = captor.getValue().getTimeout(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception { + Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS); + Deadline callOptionsDeadline = CallOptions.DEFAULT + .withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline(); + + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + contextDeadline, Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + callFuture.set( + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + })); + } + }); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + callFuture.get().start(mockListener, new Metadata()); + ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); + verify(mockSink, times(1)).write(captor.capture()); + Duration timeout = captor.getValue().getTimeout(); + assertThat(LogHelper.min(contextDeadline, callOptionsDeadline)) + .isSameInstanceAs(contextDeadline); + assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } +} diff --git a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java new file mode 100644 index 0000000000..284dadb927 --- /dev/null +++ b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java @@ -0,0 +1,618 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.Timestamp; +import com.google.protobuf.util.Durations; +import io.grpc.Attributes; +import io.grpc.Grpc; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.Status; +import io.grpc.internal.TimeProvider; +import io.grpc.observability.interceptors.LogHelper.PayloadBuilder; +import io.grpc.observability.logging.GcpLogSink; +import io.grpc.observability.logging.Sink; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; +import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.Charset; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link LogHelper}. + */ +@RunWith(JUnit4.class) +public class LogHelperTest { + + private static final Charset US_ASCII = Charset.forName("US-ASCII"); + public static final Marshaller BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); + private static final String DATA_A = "aaaaaaaaa"; + private static final String DATA_B = "bbbbbbbbb"; + private static final String DATA_C = "ccccccccc"; + private static final Metadata.Key KEY_A = + Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_B = + Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key KEY_C = + Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); + private static final MetadataEntry ENTRY_A = + MetadataEntry + .newBuilder() + .setKey(KEY_A.name()) + .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_B = + MetadataEntry + .newBuilder() + .setKey(KEY_B.name()) + .setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII))) + .build(); + private static final MetadataEntry ENTRY_C = + MetadataEntry + .newBuilder() + .setKey(KEY_C.name()) + .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) + .build(); + + + private final Metadata nonEmptyMetadata = new Metadata(); + private final int nonEmptyMetadataSize = 30; + private final Sink sink = mock(GcpLogSink.class); + private final Timestamp timestamp + = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); + private final TimeProvider timeProvider = new TimeProvider() { + @Override + public long currentTimeNanos() { + return TimeUnit.SECONDS.toNanos(9876) + 54321; + } + }; + private final LogHelper logHelper = + new LogHelper( + sink, + timeProvider); + private final byte[] message = new byte[100]; + + + @Before + public void setUp() throws Exception { + nonEmptyMetadata.put(KEY_A, DATA_A); + nonEmptyMetadata.put(KEY_B, DATA_B); + nonEmptyMetadata.put(KEY_C, DATA_C); + } + + @Test + public void socketToProto_ipv4() throws Exception { + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + assertEquals( + Address + .newBuilder() + .setType(Address.Type.TYPE_IPV4) + .setAddress("127.0.0.1") + .setIpPort(12345) + .build(), + LogHelper.socketAddressToProto(socketAddress)); + } + + @Test + public void socketToProto_ipv6() throws Exception { + // this is a ipv6 link local address + InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1"); + int port = 12345; + InetSocketAddress socketAddress = new InetSocketAddress(address, port); + assertEquals( + Address + .newBuilder() + .setType(Address.Type.TYPE_IPV6) + .setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required + .setIpPort(12345) + .build(), + LogHelper.socketAddressToProto(socketAddress)); + } + + @Test + public void socketToProto_unknown() throws Exception { + SocketAddress unknownSocket = new SocketAddress() { + @Override + public String toString() { + return "some-socket-address"; + } + }; + assertEquals( + Address.newBuilder() + .setType(Address.Type.TYPE_UNKNOWN) + .setAddress("some-socket-address") + .build(), + LogHelper.socketAddressToProto(unknownSocket)); + } + + @Test + public void metadataToProto_empty() throws Exception { + assertEquals( + GrpcLogRecord.newBuilder() + .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) + .setMetadata( + GrpcLogRecord.Metadata.getDefaultInstance()) + .build(), + metadataToProtoTestHelper( + EventType.GRPC_CALL_REQUEST_HEADER, new Metadata())); + } + + @Test + public void metadataToProto() throws Exception { + assertEquals( + GrpcLogRecord.newBuilder() + .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) + .setMetadata( + GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build()) + .setPayloadSize(nonEmptyMetadataSize) + .build(), + metadataToProtoTestHelper( + EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata)); + } + + @Test + public void logRequestHeader() throws Exception { + long seqId = 1; + String serviceName = "service"; + String methodName = "method"; + String authority = "authority"; + Duration timeout = Durations.fromMillis(1234); + String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + + GrpcLogRecord.Builder builder = + metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) + .setEventLogger(EventLogger.LOGGER_CLIENT) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + builder.setAuthority(authority) + .setTimeout(timeout); + GrpcLogRecord base = builder.build(); + + // logged on client + { + logHelper.logRequestHeader( + seqId, + serviceName, + methodName, + authority, + timeout, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + null); + verify(sink).write(base); + } + + // logged on server + { + logHelper.logRequestHeader( + seqId, + serviceName, + methodName, + authority, + timeout, + nonEmptyMetadata, + EventLogger.LOGGER_SERVER, + rpcId, + peerAddress); + verify(sink).write( + base.toBuilder() + .setPeerAddress(LogHelper.socketAddressToProto(peerAddress)) + .setEventLogger(EventLogger.LOGGER_SERVER) + .build()); + } + + // timeout is null + { + logHelper.logRequestHeader( + seqId, + serviceName, + methodName, + authority, + null, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + null); + verify(sink).write( + base.toBuilder() + .clearTimeout() + .build()); + } + + // peerAddress is not null (error on client) + try { + logHelper.logRequestHeader( + seqId, + serviceName, + methodName, + authority, + timeout, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + peerAddress); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat().contains("peerAddress can only be specified by server"); + } + } + + @Test + public void logResponseHeader() throws Exception { + long seqId = 1; + String serviceName = "service"; + String methodName = "method"; + String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + + GrpcLogRecord.Builder builder = + metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) + .setEventLogger(EventLogger.LOGGER_CLIENT) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)); + GrpcLogRecord base = builder.build(); + + // logged on client + { + logHelper.logResponseHeader( + seqId, + serviceName, + methodName, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + peerAddress); + verify(sink).write(base); + } + + // logged on server + { + logHelper.logResponseHeader( + seqId, + serviceName, + methodName, + nonEmptyMetadata, + EventLogger.LOGGER_SERVER, + rpcId, + null); + verify(sink).write( + base.toBuilder() + .setEventLogger(EventLogger.LOGGER_SERVER) + .clearPeerAddress() + .build()); + } + + // peerAddress is not null (error on server) + try { + logHelper.logResponseHeader( + seqId, + serviceName, + methodName, + nonEmptyMetadata, + EventLogger.LOGGER_SERVER, + rpcId, + peerAddress); + fail(); + } catch (IllegalArgumentException expected) { + assertThat(expected).hasMessageThat() + .contains("peerAddress can only be specified for client"); + } + } + + @Test + public void logTrailer() throws Exception { + long seqId = 1; + String serviceName = "service"; + String methodName = "method"; + String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + InetAddress address = InetAddress.getByName("127.0.0.1"); + int port = 12345; + InetSocketAddress peerAddress = new InetSocketAddress(address, port); + Status statusDescription = Status.INTERNAL.withDescription("test description"); + + GrpcLogRecord.Builder builder = + metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata) + .toBuilder() + .setTimestamp(timestamp) + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_TRAILER) + .setEventLogger(EventLogger.LOGGER_CLIENT) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setStatusCode(Status.INTERNAL.getCode().value()) + .setStatusMessage("test description") + .setRpcId(rpcId); + builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)); + GrpcLogRecord base = builder.build(); + + // logged on client + { + logHelper.logTrailer( + seqId, + serviceName, + methodName, + statusDescription, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + peerAddress); + verify(sink).write(base); + } + + // logged on server + { + logHelper.logTrailer( + seqId, + serviceName, + methodName, + statusDescription, + nonEmptyMetadata, + EventLogger.LOGGER_SERVER, + rpcId, + null); + verify(sink).write( + base.toBuilder() + .clearPeerAddress() + .setEventLogger(EventLogger.LOGGER_SERVER) + .build()); + } + + // peer address is null + { + logHelper.logTrailer( + seqId, + serviceName, + methodName, + statusDescription, + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + null); + verify(sink).write( + base.toBuilder() + .clearPeerAddress() + .build()); + } + + // status description is null + { + logHelper.logTrailer( + seqId, + serviceName, + methodName, + statusDescription.getCode().toStatus(), + nonEmptyMetadata, + EventLogger.LOGGER_CLIENT, + rpcId, + peerAddress); + verify(sink).write( + base.toBuilder() + .clearStatusMessage() + .build()); + } + } + + @Test + public void logRpcMessage() throws Exception { + long seqId = 1; + String serviceName = "service"; + String methodName = "method"; + String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + + GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder() + .setTimestamp(timestamp) + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setEventType(EventType.GRPC_CALL_REQUEST_MESSAGE) + .setEventLogger(EventLogger.LOGGER_CLIENT) + .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) + .setRpcId(rpcId); + GrpcLogRecord base = builder.build(); + // request message + { + logHelper.logRpcMessage( + seqId, + serviceName, + methodName, + EventType.GRPC_CALL_REQUEST_MESSAGE, + message, + EventLogger.LOGGER_CLIENT, + rpcId); + verify(sink).write(base); + } + // response message, logged on client + { + logHelper.logRpcMessage( + seqId, + serviceName, + methodName, + EventType.GRPC_CALL_RESPONSE_MESSAGE, + message, + EventLogger.LOGGER_CLIENT, + rpcId); + verify(sink).write( + base.toBuilder() + .setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) + .build()); + } + // request message, logged on server + { + logHelper.logRpcMessage( + seqId, + serviceName, + methodName, + EventType.GRPC_CALL_REQUEST_MESSAGE, + message, + EventLogger.LOGGER_SERVER, + rpcId); + verify(sink).write( + base.toBuilder() + .setEventLogger(EventLogger.LOGGER_SERVER) + .build()); + } + // response message, logged on server + { + logHelper.logRpcMessage( + seqId, + serviceName, + methodName, + EventType.GRPC_CALL_RESPONSE_MESSAGE, + message, + EventLogger.LOGGER_SERVER, + rpcId); + verify(sink).write( + base.toBuilder() + .setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) + .setEventLogger(EventLogger.LOGGER_SERVER) + .build()); + } + } + + @Test + public void getPeerAddressTest() throws Exception { + SocketAddress peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + assertNull(LogHelper.getPeerAddress(Attributes.EMPTY)); + assertSame( + peer, + LogHelper.getPeerAddress( + Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); + } + + private static GrpcLogRecord metadataToProtoTestHelper( + EventType type, Metadata metadata) { + GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); + PayloadBuilder pair + = LogHelper.createMetadataProto(metadata); + builder.setMetadata(pair.proto); + builder.setPayloadSize(pair.size); + builder.setEventType(type); + return builder.build(); + } + + // Used only in tests + // Copied from internal + static final class ByteArrayMarshaller implements Marshaller { + @Override + public InputStream stream(byte[] value) { + return new ByteArrayInputStream(value); + } + + @Override + public byte[] parse(InputStream stream) { + try { + return parseHelper(stream); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private byte[] parseHelper(InputStream stream) throws IOException { + try { + return IoUtils.toByteArray(stream); + } finally { + stream.close(); + } + } + } + + // Copied from internal + static final class IoUtils { + /** maximum buffer to be read is 16 KB. */ + private static final int MAX_BUFFER_LENGTH = 16384; + + /** Returns the byte array. */ + public static byte[] toByteArray(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copy(in, out); + return out.toByteArray(); + } + + /** Copies the data from input stream to output stream. */ + public static long copy(InputStream from, OutputStream to) throws IOException { + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + checkNotNull(from); + checkNotNull(to); + byte[] buf = new byte[MAX_BUFFER_LENGTH]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; + } + } + +} diff --git a/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java b/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java new file mode 100644 index 0000000000..ab70fc4456 --- /dev/null +++ b/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.logging; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import com.google.cloud.logging.LogEntry; +import com.google.cloud.logging.Logging; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import java.util.Collection; +import java.util.Iterator; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** + * Tests for {@link io.grpc.observability.logging.GcpLogSink}. + */ +@RunWith(JUnit4.class) +public class GcpLogSinkTest { + + @Rule + public final MockitoRule mockito = MockitoJUnit.rule(); + + private Logging mockLogging; + + @Before + public void setUp() { + mockLogging = mock(Logging.class); + } + + @Test + public void createSink() { + Sink mockSink = new GcpLogSink(mockLogging); + assertThat(mockSink).isInstanceOf(GcpLogSink.class); + } + + @Test + @SuppressWarnings("unchecked") + public void verifyWrite() throws Exception { + Sink mockSink = new GcpLogSink(mockLogging); + GrpcLogRecord logProto = GrpcLogRecord.newBuilder() + .setRpcId("1234") + .build(); + Struct expectedStructLogProto = Struct.newBuilder().putFields( + "rpc_id", Value.newBuilder().setStringValue("1234").build() + ).build(); + + mockSink.write(logProto); + ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( + (Class) Collection.class); + verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); + for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { + LogEntry entry = it.next(); + assertEquals(entry.getPayload().getData(), expectedStructLogProto); + } + verifyNoMoreInteractions(mockLogging); + } + + @Test + public void verifyClose() throws Exception { + Sink mockSink = new GcpLogSink(mockLogging); + GrpcLogRecord logProto = GrpcLogRecord.newBuilder() + .setRpcId("1234") + .build(); + mockSink.write(logProto); + verify(mockLogging, times(1)).write(anyIterable()); + mockSink.close(); + verify(mockLogging).close(); + verifyNoMoreInteractions(mockLogging); + } +}