observability: implement client interceptor for logging (#8956)

This commit is contained in:
DNVindhya 2022-03-16 22:09:25 -07:00 committed by GitHub
parent 1a6840accd
commit 0628cab226
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1995 additions and 9 deletions

View File

@ -20,11 +20,14 @@ import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
/** The main class for gRPC Observability features. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class Observability {
private static boolean initialized = false;
private static final String PROJECT_ID = "PROJECT";
/**
* Initialize grpc-observability.
@ -35,13 +38,17 @@ public final class Observability {
if (initialized) {
throw new IllegalStateException("Observability already initialized!");
}
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl());
// TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId
Sink sink = new GcpLogSink(PROJECT_ID);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink));
LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl());
// TODO(sanjaypujare): initialize customTags map
initialized = true;
}
/** Un-initialize or finish grpc-observability. */
// TODO(sanjaypujare): Once Observability is made into Singleton object,
// close() on sink will be called as part of grpcFinish()
public static synchronized void grpcFinish() {
if (!initialized) {
throw new IllegalStateException("Observability not initialized!");

View File

@ -0,0 +1,94 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.primitives.Ints;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.util.Arrays;
// This is copied from guava 20.0 because it is a @Beta api
final class InetAddressUtil {
private static final int IPV6_PART_COUNT = 8;
public static String toAddrString(InetAddress ip) {
checkNotNull(ip);
if (ip instanceof Inet4Address) {
// For IPv4, Java's formatting is good enough.
return ip.getHostAddress();
}
checkArgument(ip instanceof Inet6Address);
byte[] bytes = ip.getAddress();
int[] hextets = new int[IPV6_PART_COUNT];
for (int i = 0; i < hextets.length; i++) {
hextets[i] = Ints.fromBytes((byte) 0, (byte) 0, bytes[2 * i], bytes[2 * i + 1]);
}
compressLongestRunOfZeroes(hextets);
return hextetsToIPv6String(hextets);
}
private static void compressLongestRunOfZeroes(int[] hextets) {
int bestRunStart = -1;
int bestRunLength = -1;
int runStart = -1;
for (int i = 0; i < hextets.length + 1; i++) {
if (i < hextets.length && hextets[i] == 0) {
if (runStart < 0) {
runStart = i;
}
} else if (runStart >= 0) {
int runLength = i - runStart;
if (runLength > bestRunLength) {
bestRunStart = runStart;
bestRunLength = runLength;
}
runStart = -1;
}
}
if (bestRunLength >= 2) {
Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
}
}
private static String hextetsToIPv6String(int[] hextets) {
// While scanning the array, handle these state transitions:
// start->num => "num" start->gap => "::"
// num->num => ":num" num->gap => "::"
// gap->num => "num" gap->gap => ""
StringBuilder buf = new StringBuilder(39);
boolean lastWasNumber = false;
for (int i = 0; i < hextets.length; i++) {
boolean thisIsNumber = hextets[i] >= 0;
if (thisIsNumber) {
if (lastWasNumber) {
buf.append(':');
}
buf.append(Integer.toHexString(hextets[i]));
} else {
if (i == 0 || lastWasNumber) {
buf.append("::");
}
}
lastWasNumber = thisIsNumber;
}
return buf.toString();
}
}

View File

@ -16,33 +16,235 @@
package io.grpc.observability.interceptors;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Internal;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/** A logging interceptor for {@code LoggingChannelProvider}. */
/**
* A logging interceptor for {@code LoggingChannelProvider}.
*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
private static final Logger logger = Logger
.getLogger(InternalLoggingChannelInterceptor.class.getName());
private final LogHelper helper;
public interface Factory {
ClientInterceptor create();
}
public static class FactoryImpl implements Factory {
private final Sink sink;
private final LogHelper helper;
static LogHelper createLogHelper(Sink sink, TimeProvider provider) {
return new LogHelper(sink, provider);
}
public FactoryImpl(Sink sink) {
this.sink = sink;
this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
}
@Override
public ClientInterceptor create() {
return new InternalLoggingChannelInterceptor();
return new InternalLoggingChannelInterceptor(helper);
}
/**
* Closes the sink instance.
*/
public void close() {
if (sink != null) {
sink.close();
}
}
}
private InternalLoggingChannelInterceptor(LogHelper helper) {
this.helper = helper;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// TODO(dnvindhya) implement the interceptor
return null;
final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString();
final String authority = next.authority();
final String serviceName = method.getServiceName();
final String methodName = method.getBareMethodName();
// Get the stricter deadline to calculate the timeout once the call starts
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
// TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
// according to config. Until then always return true.
if (!helper.isMethodToBeLogged(method.getFullMethodName())) {
return next.newCall(method, callOptions);
}
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// Event: EventType.GRPC_CALL_REQUEST_HEADER
// The timeout should reflect the time remaining when the call is started, so compute
// remaining time here.
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
try {
helper.logRequestHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
Listener<RespT> observabilityListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
super.onMessage(message);
}
@Override
public void onHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
try {
helper.logResponseHeader(
seq.getAndIncrement(),
serviceName,
methodName,
headers,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
super.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
status,
trailers,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
super.onClose(status, trailers);
}
};
super.start(observabilityListener, headers);
}
@Override
public void sendMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
super.sendMessage(message);
}
@Override
public void halfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
super.halfClose();
}
@Override
public void cancel(String message, Throwable cause) {
// Event: EventType.GRPC_CALL_CANCEL
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
super.cancel(message, cause);
}
};
}
}

View File

@ -0,0 +1,371 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Charsets;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Timestamps;
import io.grpc.Attributes;
import io.grpc.Deadline;
import io.grpc.Grpc;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* Helper class for GCP observability logging.
*/
class LogHelper {
private static final Logger logger = Logger.getLogger(LogHelper.class.getName());
// TODO(dnvindhya): Define it in one places(TBD) to make it easily accessible from everywhere
static final Metadata.Key<byte[]> STATUS_DETAILS_KEY =
Metadata.Key.of(
"grpc-status-details-bin",
Metadata.BINARY_BYTE_MARSHALLER);
private final Sink sink;
private final TimeProvider timeProvider;
LogHelper(Sink sink, TimeProvider timeProvider) {
this.sink = sink;
this.timeProvider = timeProvider;
}
/**
* Logs the request header.
* Binary logging equivalent of logClientHeader.
*/
void logRequestHeader(
long seqId,
String serviceName,
String methodName,
String authority,
@Nullable Duration timeout,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
// null on client side
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER,
"peerAddress can only be specified by server");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setAuthority(authority)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (timeout != null) {
logEntryBuilder.setTimeout(timeout);
}
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the reponse header.
* Binary logging equivalent of logServerHeader.
*/
void logResponseHeader(
long seqId,
String serviceName,
String methodName,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
// Logging peer address only on the first incoming event. On server side, peer address will
// of logging request header
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setPayloadSize(pair.size)
.setRpcId(rpcId);
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the server trailer.
*/
void logTrailer(
long seqId,
String serviceName,
String methodName,
Status status,
Metadata metadata,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(status, "status");
checkNotNull(rpcId, "rpcId");
checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.proto)
.setPayloadSize(pair.size)
.setStatusCode(status.getCode().value())
.setRpcId(rpcId);
String statusDescription = status.getDescription();
if (statusDescription != null) {
logEntryBuilder.setStatusMessage(statusDescription);
}
byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY);
if (statusDetailBytes != null) {
logEntryBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes));
}
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
}
sink.write(logEntryBuilder.build());
}
/**
* Logs the RPC message.
*/
<T> void logRpcMessage(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventType eventType,
T message,
GrpcLogRecord.EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
checkArgument(
eventType == EventType.GRPC_CALL_REQUEST_MESSAGE
|| eventType == EventType.GRPC_CALL_RESPONSE_MESSAGE,
"event type must correspond to client message or server message");
checkNotNull(message, "message");
// TODO(dnvindhya): Convert message to bystestring and also log the message
// byte[] messageArray = (byte[])message;
// int messageLength = messageArray.length;
// ByteString messageData =
// ByteString.copyFrom((byte[]) message, 0, ((byte[]) message).length);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(eventType)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
sink.write(logEntryBuilder.build());
}
/**
* Logs half close.
*/
void logHalfClose(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_HALF_CLOSE)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
sink.write(logEntryBuilder.build());
}
/**
* Logs cancellation.
*/
void logCancel(
long seqId,
String serviceName,
String methodName,
GrpcLogRecord.EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_CANCEL)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
sink.write(logEntryBuilder.build());
}
GrpcLogRecord.Builder createTimestamp() {
long nanos = timeProvider.currentTimeNanos();
return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos));
}
static final class PayloadBuilder<T> {
T proto;
int size;
private PayloadBuilder(T proto, int size) {
this.proto = proto;
this.size = size;
}
}
// TODO(dnvindhya): Create a unit test for the metadata conversion
static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metadata metadata) {
checkNotNull(metadata, "metadata");
GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder();
// This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata's
// implementation
byte[][] serialized = InternalMetadata.serialize(metadata);
int totalMetadataBytes = 0;
if (serialized != null) {
int singleMetadataEntryBytes = 0;
// Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry
for (int i = 0; i < serialized.length; i += 2) {
String key = new String(serialized[i], Charsets.UTF_8);
byte[] value = serialized[i + 1];
singleMetadataEntryBytes = totalMetadataBytes + key.length() + value.length;
metadataBuilder.addEntryBuilder()
.setKey(key)
.setValue(ByteString.copyFrom(value));
totalMetadataBytes = singleMetadataEntryBytes;
}
}
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes);
}
static Address socketAddressToProto(SocketAddress address) {
checkNotNull(address, "address");
Address.Builder builder = Address.newBuilder();
if (address instanceof InetSocketAddress) {
InetAddress inetAddress = ((InetSocketAddress) address).getAddress();
if (inetAddress instanceof Inet4Address) {
builder.setType(Address.Type.TYPE_IPV4)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else if (inetAddress instanceof Inet6Address) {
builder.setType(Address.Type.TYPE_IPV6)
.setAddress(InetAddressUtil.toAddrString(inetAddress));
} else {
logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address);
builder.setAddress(address.toString());
}
builder.setIpPort(((InetSocketAddress) address).getPort());
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
// To avoid a compile time dependency on grpc-netty, we check against the
// runtime class name.
builder.setType(Address.Type.TYPE_UNIX)
.setAddress(address.toString());
} else {
builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString());
}
return builder.build();
}
/**
* Retrieves socket address.
*/
static SocketAddress getPeerAddress(Attributes streamAttributes) {
return streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
}
/**
* Checks deadline for timeout.
*/
static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return deadline1;
}
if (deadline1 == null) {
return deadline0;
}
return deadline0.minimum(deadline1);
}
// TODO (dnvindhya) : Implement service and method name filtering
// Add unit tests for the method as part of filtering implementation
boolean isMethodToBeLogged(String fullMethodName) {
return true;
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.logging;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.cloud.logging.Payload.JsonPayload;
import com.google.cloud.logging.Severity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.protobuf.util.JsonFormat;
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Sink for Google Cloud Logging.
*/
public class GcpLogSink implements Sink {
private final Logger logger = Logger.getLogger(GcpLogSink.class.getName());
// TODO (dnvindhya): Make cloud logging service a configurable value
private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2";
private static final String DEFAULT_LOG_NAME = "grpc";
private final Logging gcpLoggingClient;
private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
}
/**
* Retrieves a single instance of GcpLogSink.
*
* @param destinationProjectId cloud project id to write logs
*/
public GcpLogSink(String destinationProjectId) {
this(createLoggingClient(destinationProjectId));
}
@VisibleForTesting
GcpLogSink(Logging client) {
this.gcpLoggingClient = client;
}
@Override
public void write(GrpcLogRecord logProto) {
if (gcpLoggingClient == null) {
logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed.");
return;
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
return;
}
try {
GrpcLogRecord.EventType event = logProto.getEventType();
Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel());
// TODO(vindhyan): make sure all (int, long) values are not displayed as double
LogEntry grpcLogEntry =
LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto)))
.setSeverity(logEntrySeverity)
.setLogName(DEFAULT_LOG_NAME)
.setResource(MonitoredResource.newBuilder("global").build())
.build();
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> protoToMapConverter(GrpcLogRecord logProto)
throws IOException {
JsonFormat.Printer printer = JsonFormat.printer().preservingProtoFieldNames();
String recordJson = printer.print(logProto);
return (Map<String, Object>) JsonParser.parse(recordJson);
}
private Severity getCloudLoggingLevel(GrpcLogRecord.LogLevel recordLevel) {
switch (recordLevel.getNumber()) {
case 1: // GrpcLogRecord.LogLevel.LOG_LEVEL_TRACE
case 2: // GrpcLogRecord.LogLevel.LOG_LEVEL_DEBUG
return Severity.DEBUG;
case 3: // GrpcLogRecord.LogLevel.LOG_LEVEL_INFO
return Severity.INFO;
case 4: // GrpcLogRecord.LogLevel.LOG_LEVEL_WARN
return Severity.WARNING;
case 5: // GrpcLogRecord.LogLevel.LOG_LEVEL_ERROR
return Severity.ERROR;
case 6: // GrpcLogRecord.LogLevel.LOG_LEVEL_CRITICAL
return Severity.CRITICAL;
default:
return Severity.DEFAULT;
}
}
/**
* Closes Cloud Logging Client.
*/
@Override
public synchronized void close() {
if (gcpLoggingClient == null) {
logger.log(Level.WARNING, "Attempt to close after GcpLogSink is closed.");
return;
}
try {
gcpLoggingClient.close();
} catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while closing", e);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.logging;
import io.grpc.ExperimentalApi;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
/**
* Sink for GCP observability.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public interface Sink {
/**
* Writes the {@code message} to the destination.
*/
void write(GrpcLogRecord message);
/**
* Closes the sink.
*/
void close();
}

View File

@ -69,7 +69,7 @@ message GrpcLogRecord {
// Each call may have several log entries. They will all have the same rpc_id.
// Nothing is guaranteed about their value other than they are unique across
// different RPCs in the same gRPC process.
uint64 rpc_id = 2;
string rpc_id = 2;
EventType event_type = 3; // one of the above EventType enum
EventLogger event_logger = 4; // one of the above EventLogger enum

View File

@ -35,6 +35,8 @@ import io.grpc.ManagedChannelProvider;
import io.grpc.MethodDescriptor;
import io.grpc.TlsChannelCredentials;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.testing.TestMethodDescriptors;
import org.junit.Rule;
import org.junit.Test;
@ -46,6 +48,7 @@ import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class LoggingChannelProviderTest {
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@ -55,10 +58,12 @@ public class LoggingChannelProviderTest {
public void initTwiceCausesException() {
ManagedChannelProvider prevProvider = ManagedChannelProvider.provider();
assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl());
Sink mockSink = mock(GcpLogSink.class);
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(mockSink));
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
try {
LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl());
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(mockSink));
fail("should have failed for calling init() again");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!");

View File

@ -38,7 +38,7 @@ public class ObservabilityTest {
Observability.grpcFinish();
try {
Observability.grpcFinish();
fail("should have failed for calling grpcFinit() on uninitialized");
fail("should have failed for calling grpcFinish() on uninitialized");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("Observability not initialized!");
}

View File

@ -0,0 +1,416 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.internal.NoopClientCall;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Tests for {@link InternalLoggingChannelInterceptor}.
*/
@RunWith(JUnit4.class)
public class InternalLoggingChannelInterceptorTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Charset US_ASCII = Charset.forName("US-ASCII");
private InternalLoggingChannelInterceptor.Factory factory;
private AtomicReference<ClientCall.Listener<byte[]>> interceptedListener;
private AtomicReference<Metadata> actualClientInitial;
private AtomicReference<Object> actualRequest;
private SettableFuture<Void> halfCloseCalled;
private SettableFuture<Void> cancelCalled;
private SocketAddress peer;
private final Sink mockSink = mock(GcpLogSink.class);
@Before
public void setup() throws Exception {
factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink);
interceptedListener = new AtomicReference<>();
actualClientInitial = new AtomicReference<>();
actualRequest = new AtomicReference<>();
halfCloseCalled = SettableFuture.create();
cancelCalled = SettableFuture.create();
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
}
@Test
public void internalLoggingChannelInterceptor() throws Exception {
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(method,
CallOptions.DEFAULT,
channel);
// send client header
{
EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Metadata clientInitial = new Metadata();
String dataA = "aaaaaaaaa";
String dataB = "bbbbbbbbb";
Metadata.Key<String> keyA =
Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> keyB =
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
MetadataEntry entryA =
MetadataEntry
.newBuilder()
.setKey(keyA.name())
.setValue(ByteString.copyFrom(dataA.getBytes(US_ASCII)))
.build();
MetadataEntry entryB =
MetadataEntry
.newBuilder()
.setKey(keyB.name())
.setValue(ByteString.copyFrom(dataB.getBytes(US_ASCII)))
.build();
clientInitial.put(keyA, dataA);
clientInitial.put(keyB, dataB);
GrpcLogRecord.Metadata expectedMetadata = GrpcLogRecord.Metadata
.newBuilder()
.addEntry(entryA)
.addEntry(entryB)
.build();
interceptedLoggingCall.start(mockListener, clientInitial);
verify(mockSink).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestHeaderEvent);
assertEquals(captor.getValue().getSequenceId(), 1L);
assertEquals(captor.getValue().getServiceName(), "service");
assertEquals(captor.getValue().getMethodName(), "method");
assertEquals(captor.getValue().getAuthority(), "the-authority");
assertEquals(captor.getValue().getMetadata(), expectedMetadata);
verifyNoMoreInteractions(mockSink);
assertSame(clientInitial, actualClientInitial.get());
}
// TODO(dnvindhya) : Add a helper method to verify other fields of GrpcLogRecord for all events
// receive server header
{
EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial);
verify(mockSink, times(2)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseHeaderEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onHeaders(same(serverInitial));
}
// send client message
{
EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
verify(mockSink, times(3)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestMessageEvent);
verifyNoMoreInteractions(mockSink);
assertSame(request, actualRequest.get());
}
// client half close
{
EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
interceptedLoggingCall.halfClose();
verify(mockSink, times(4)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedHalfCloseEvent);
halfCloseCalled.get(1, TimeUnit.SECONDS);
verifyNoMoreInteractions(mockSink);
}
// receive server message
{
EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
verify(mockSink, times(5)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseMessageEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onMessage(same(response));
}
// receive trailer
{
EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
verify(mockSink, times(6)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedTrailerEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onClose(same(status), same(trailers));
}
// cancel
{
EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
interceptedLoggingCall.cancel(null, null);
verify(mockSink, times(7)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedCancelEvent);
cancelCalled.get(1, TimeUnit.SECONDS);
}
}
@Test
public void clientDeadLineLogged_deadlineSetViaCallOption() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
});
interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
}));
}
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception {
Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS);
Deadline callOptionsDeadline = CallOptions.DEFAULT
.withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline();
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
contextDeadline, Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
}));
}
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(LogHelper.min(contextDeadline, callOptionsDeadline))
.isSameInstanceAs(contextDeadline);
assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
}

View File

@ -0,0 +1,618 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.interceptors.LogHelper.PayloadBuilder;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel;
import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link LogHelper}.
*/
@RunWith(JUnit4.class)
public class LogHelperTest {
private static final Charset US_ASCII = Charset.forName("US-ASCII");
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
private static final String DATA_A = "aaaaaaaaa";
private static final String DATA_B = "bbbbbbbbb";
private static final String DATA_C = "ccccccccc";
private static final Metadata.Key<String> KEY_A =
Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> KEY_B =
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> KEY_C =
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final MetadataEntry ENTRY_A =
MetadataEntry
.newBuilder()
.setKey(KEY_A.name())
.setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII)))
.build();
private static final MetadataEntry ENTRY_B =
MetadataEntry
.newBuilder()
.setKey(KEY_B.name())
.setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII)))
.build();
private static final MetadataEntry ENTRY_C =
MetadataEntry
.newBuilder()
.setKey(KEY_C.name())
.setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII)))
.build();
private final Metadata nonEmptyMetadata = new Metadata();
private final int nonEmptyMetadataSize = 30;
private final Sink sink = mock(GcpLogSink.class);
private final Timestamp timestamp
= Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build();
private final TimeProvider timeProvider = new TimeProvider() {
@Override
public long currentTimeNanos() {
return TimeUnit.SECONDS.toNanos(9876) + 54321;
}
};
private final LogHelper logHelper =
new LogHelper(
sink,
timeProvider);
private final byte[] message = new byte[100];
@Before
public void setUp() throws Exception {
nonEmptyMetadata.put(KEY_A, DATA_A);
nonEmptyMetadata.put(KEY_B, DATA_B);
nonEmptyMetadata.put(KEY_C, DATA_C);
}
@Test
public void socketToProto_ipv4() throws Exception {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
.newBuilder()
.setType(Address.Type.TYPE_IPV4)
.setAddress("127.0.0.1")
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
}
@Test
public void socketToProto_ipv6() throws Exception {
// this is a ipv6 link local address
InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
.newBuilder()
.setType(Address.Type.TYPE_IPV6)
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
}
@Test
public void socketToProto_unknown() throws Exception {
SocketAddress unknownSocket = new SocketAddress() {
@Override
public String toString() {
return "some-socket-address";
}
};
assertEquals(
Address.newBuilder()
.setType(Address.Type.TYPE_UNKNOWN)
.setAddress("some-socket-address")
.build(),
LogHelper.socketAddressToProto(unknownSocket));
}
@Test
public void metadataToProto_empty() throws Exception {
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata.getDefaultInstance())
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata()));
}
@Test
public void metadataToProto() throws Exception {
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.addEntry(ENTRY_C)
.build())
.setPayloadSize(nonEmptyMetadataSize)
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata));
}
@Test
public void logRequestHeader() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String authority = "authority";
Duration timeout = Durations.fromMillis(1234);
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
builder.setAuthority(authority)
.setTimeout(timeout);
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(base);
}
// logged on server
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
verify(sink).write(
base.toBuilder()
.setPeerAddress(LogHelper.socketAddressToProto(peerAddress))
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// timeout is null
{
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
null,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearTimeout()
.build());
}
// peerAddress is not null (error on client)
try {
logHelper.logRequestHeader(
seqId,
serviceName,
methodName,
authority,
timeout,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().contains("peerAddress can only be specified by server");
}
}
@Test
public void logResponseHeader() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(base);
}
// logged on server
{
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER)
.clearPeerAddress()
.build());
}
// peerAddress is not null (error on server)
try {
logHelper.logResponseHeader(
seqId,
serviceName,
methodName,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat()
.contains("peerAddress can only be specified for client");
}
}
@Test
public void logTrailer() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
Status statusDescription = Status.INTERNAL.withDescription("test description");
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setStatusCode(Status.INTERNAL.getCode().value())
.setStatusMessage("test description")
.setRpcId(rpcId);
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build();
// logged on client
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(base);
}
// logged on server
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_SERVER,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearPeerAddress()
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// peer address is null
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription,
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
verify(sink).write(
base.toBuilder()
.clearPeerAddress()
.build());
}
// status description is null
{
logHelper.logTrailer(
seqId,
serviceName,
methodName,
statusDescription.getCode().toStatus(),
nonEmptyMetadata,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
verify(sink).write(
base.toBuilder()
.clearStatusMessage()
.build());
}
}
@Test
public void logRpcMessage() throws Exception {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_MESSAGE)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setRpcId(rpcId);
GrpcLogRecord base = builder.build();
// request message
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(base);
}
// response message, logged on client
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE)
.build());
}
// request message, logged on server
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
// response message, logged on server
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE)
.setEventLogger(EventLogger.LOGGER_SERVER)
.build());
}
}
@Test
public void getPeerAddressTest() throws Exception {
SocketAddress peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
assertNull(LogHelper.getPeerAddress(Attributes.EMPTY));
assertSame(
peer,
LogHelper.getPeerAddress(
Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build()));
}
private static GrpcLogRecord metadataToProtoTestHelper(
EventType type, Metadata metadata) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair
= LogHelper.createMetadataProto(metadata);
builder.setMetadata(pair.proto);
builder.setPayloadSize(pair.size);
builder.setEventType(type);
return builder.build();
}
// Used only in tests
// Copied from internal
static final class ByteArrayMarshaller implements Marshaller<byte[]> {
@Override
public InputStream stream(byte[] value) {
return new ByteArrayInputStream(value);
}
@Override
public byte[] parse(InputStream stream) {
try {
return parseHelper(stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] parseHelper(InputStream stream) throws IOException {
try {
return IoUtils.toByteArray(stream);
} finally {
stream.close();
}
}
}
// Copied from internal
static final class IoUtils {
/** maximum buffer to be read is 16 KB. */
private static final int MAX_BUFFER_LENGTH = 16384;
/** Returns the byte array. */
public static byte[] toByteArray(InputStream in) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
copy(in, out);
return out.toByteArray();
}
/** Copies the data from input stream to output stream. */
public static long copy(InputStream from, OutputStream to) throws IOException {
// Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
checkNotNull(from);
checkNotNull(to);
byte[] buf = new byte[MAX_BUFFER_LENGTH];
long total = 0;
while (true) {
int r = from.read(buf);
if (r == -1) {
break;
}
to.write(buf, 0, r);
total += r;
}
return total;
}
}
}

View File

@ -0,0 +1,99 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.logging;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.util.Collection;
import java.util.Iterator;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
/**
* Tests for {@link io.grpc.observability.logging.GcpLogSink}.
*/
@RunWith(JUnit4.class)
public class GcpLogSinkTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private Logging mockLogging;
@Before
public void setUp() {
mockLogging = mock(Logging.class);
}
@Test
public void createSink() {
Sink mockSink = new GcpLogSink(mockLogging);
assertThat(mockSink).isInstanceOf(GcpLogSink.class);
}
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging);
GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setRpcId("1234")
.build();
Struct expectedStructLogProto = Struct.newBuilder().putFields(
"rpc_id", Value.newBuilder().setStringValue("1234").build()
).build();
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
}
verifyNoMoreInteractions(mockLogging);
}
@Test
public void verifyClose() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging);
GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setRpcId("1234")
.build();
mockSink.write(logProto);
verify(mockLogging, times(1)).write(anyIterable());
mockSink.close();
verify(mockLogging).close();
verifyNoMoreInteractions(mockLogging);
}
}