gcp-observability: update observability logging proto (#9608)

This commit is contained in:
DNVindhya 2022-10-17 22:47:54 -07:00 committed by GitHub
parent e16f1436a9
commit 43942623fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 596 additions and 699 deletions

View File

@ -35,7 +35,6 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink; import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.common.Duration; import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViewConstants; import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
@ -79,8 +78,8 @@ public final class GcpObservability implements AutoCloseable {
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(), globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount(), SERVICES_TO_EXCLUDE); SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig, instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),

View File

@ -36,9 +36,6 @@ public interface ObservabilityConfig {
/** Get destination project ID - where logs will go. */ /** Get destination project ID - where logs will go. */
String getDestinationProjectId(); String getDestinationProjectId();
/** Get message count threshold to flush - flush once message count is reached. */
Long getFlushMessageCount();
/** Get filters set for logging. */ /** Get filters set for logging. */
List<LogFilter> getLogFilters(); List<LogFilter> getLogFilters();

View File

@ -45,7 +45,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
private boolean enableCloudMonitoring = false; private boolean enableCloudMonitoring = false;
private boolean enableCloudTracing = false; private boolean enableCloudTracing = false;
private String destinationProjectId = null; private String destinationProjectId = null;
private Long flushMessageCount = null;
private List<LogFilter> logFilters; private List<LogFilter> logFilters;
private List<EventType> eventTypes; private List<EventType> eventTypes;
private Sampler sampler; private Sampler sampler;
@ -87,7 +86,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
enableCloudTracing = value; enableCloudTracing = value;
} }
destinationProjectId = JsonUtil.getString(config, "destination_project_id"); destinationProjectId = JsonUtil.getString(config, "destination_project_id");
flushMessageCount = JsonUtil.getNumberAsLong(config, "flush_message_count");
List<?> rawList = JsonUtil.getList(config, "log_filters"); List<?> rawList = JsonUtil.getList(config, "log_filters");
if (rawList != null) { if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList); List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
@ -102,7 +100,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList); List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>(); ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>();
for (String jsonEventType : jsonEventTypes) { for (String jsonEventType : jsonEventTypes) {
eventTypesBuilder.add(convertEventType(jsonEventType)); eventTypesBuilder.add(EventType.valueOf(jsonEventType));
} }
this.eventTypes = eventTypesBuilder.build(); this.eventTypes = eventTypesBuilder.build();
} }
@ -136,28 +134,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
} }
} }
private EventType convertEventType(String val) {
switch (val) {
case "GRPC_CALL_UNKNOWN":
return EventType.GRPC_CALL_UNKNOWN;
case "GRPC_CALL_REQUEST_HEADER":
return EventType.GRPC_CALL_REQUEST_HEADER;
case "GRPC_CALL_RESPONSE_HEADER":
return EventType.GRPC_CALL_RESPONSE_HEADER;
case "GRPC_CALL_REQUEST_MESSAGE":
return EventType.GRPC_CALL_REQUEST_MESSAGE;
case "GRPC_CALL_RESPONSE_MESSAGE":
return EventType.GRPC_CALL_RESPONSE_MESSAGE;
case "GRPC_CALL_TRAILER":
return EventType.GRPC_CALL_TRAILER;
case "GRPC_CALL_HALF_CLOSE":
return EventType.GRPC_CALL_HALF_CLOSE;
case "GRPC_CALL_CANCEL":
return EventType.GRPC_CALL_CANCEL;
default:
throw new IllegalArgumentException("Unknown event type value:" + val);
}
}
private LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) { private LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) {
return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"),
@ -185,11 +161,6 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
return destinationProjectId; return destinationProjectId;
} }
@Override
public Long getFlushMessageCount() {
return flushMessageCount;
}
@Override @Override
public List<LogFilter> getLogFilters() { public List<LogFilter> getLogFilters() {
return logFilters; return logFilters;

View File

@ -85,7 +85,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
CallOptions callOptions, Channel next) { CallOptions callOptions, Channel next) {
final AtomicLong seq = new AtomicLong(1); final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString(); final String callId = UUID.randomUUID().toString();
final String authority = next.authority(); final String authority = next.authority();
final String serviceName = method.getServiceName(); final String serviceName = method.getServiceName();
final String methodName = method.getBareMethodName(); final String methodName = method.getBareMethodName();
@ -105,15 +105,15 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void start(Listener<RespT> responseListener, Metadata headers) { public void start(Listener<RespT> responseListener, Metadata headers) {
// Event: EventType.GRPC_CALL_REQUEST_HEADER // Event: EventType.CLIENT_HEADER
// The timeout should reflect the time remaining when the call is started, so compute // The timeout should reflect the time remaining when the call is started, so compute
// remaining time here. // remaining time here.
final Duration timeout = deadline == null ? null final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) { if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
try { try {
helper.logRequestHeader( helper.logClientHeader(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
@ -121,8 +121,8 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
timeout, timeout,
headers, headers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
null); null);
} catch (Exception e) { } catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events. // Catching generic exceptions instead of specific ones for all the events.
@ -139,19 +139,20 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
new SimpleForwardingClientCallListener<RespT>(responseListener) { new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override @Override
public void onMessage(RespT message) { public void onMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE // Event: EventType.SERVER_MESSAGE
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; EventType responseMessageType = EventType.SERVER_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) { if (filterHelper.isEventToBeLogged(responseMessageType)) {
try { try {
helper.logRpcMessage( helper.logRpcMessage(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
responseMessageType, responseMessageType,
message, message,
maxMessageBytes, maxMessageBytes,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId); callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e); logger.log(Level.SEVERE, "Unable to log response message", e);
} }
@ -161,17 +162,18 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void onHeaders(Metadata headers) { public void onHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER // Event: EventType.SERVER_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
try { try {
helper.logResponseHeader( helper.logServerHeader(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
headers, headers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
LogHelper.getPeerAddress(getAttributes())); LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e); logger.log(Level.SEVERE, "Unable to log response header", e);
@ -182,18 +184,19 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void onClose(Status status, Metadata trailers) { public void onClose(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER // Event: EventType.SERVER_TRAILER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
try { try {
helper.logTrailer( helper.logTrailer(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
status, status,
trailers, trailers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
LogHelper.getPeerAddress(getAttributes())); LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e); logger.log(Level.SEVERE, "Unable to log trailer", e);
@ -207,19 +210,20 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void sendMessage(ReqT message) { public void sendMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE // Event: EventType.CLIENT_MESSAGE
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; EventType requestMessageType = EventType.CLIENT_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) { if (filterHelper.isEventToBeLogged(requestMessageType)) {
try { try {
helper.logRpcMessage( helper.logRpcMessage(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
requestMessageType, requestMessageType,
message, message,
maxMessageBytes, maxMessageBytes,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId); callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e); logger.log(Level.SEVERE, "Unable to log request message", e);
} }
@ -229,15 +233,16 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void halfClose() { public void halfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE // Event: EventType.CLIENT_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
try { try {
helper.logHalfClose( helper.logHalfClose(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
EventLogger.LOGGER_CLIENT, authority,
rpcId); EventLogger.CLIENT,
callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e); logger.log(Level.SEVERE, "Unable to log half close", e);
} }
@ -247,15 +252,16 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override @Override
public void cancel(String message, Throwable cause) { public void cancel(String message, Throwable cause) {
// Event: EventType.GRPC_CALL_CANCEL // Event: EventType.CANCEL
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
try { try {
helper.logCancel( helper.logCancel(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
EventLogger.LOGGER_CLIENT, authority,
rpcId); EventLogger.CLIENT,
callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e); logger.log(Level.SEVERE, "Unable to log cancel", e);
} }

View File

@ -84,7 +84,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) { Metadata headers, ServerCallHandler<ReqT, RespT> next) {
final AtomicLong seq = new AtomicLong(1); final AtomicLong seq = new AtomicLong(1);
final String rpcId = UUID.randomUUID().toString(); final String callId = UUID.randomUUID().toString();
final String authority = call.getAuthority(); final String authority = call.getAuthority();
final String serviceName = call.getMethodDescriptor().getServiceName(); final String serviceName = call.getMethodDescriptor().getServiceName();
final String methodName = call.getMethodDescriptor().getBareMethodName(); final String methodName = call.getMethodDescriptor().getBareMethodName();
@ -101,10 +101,10 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
final int maxHeaderBytes = filterParams.headerBytes(); final int maxHeaderBytes = filterParams.headerBytes();
final int maxMessageBytes = filterParams.messageBytes(); final int maxMessageBytes = filterParams.messageBytes();
// Event: EventType.GRPC_CALL_REQUEST_HEADER // Event: EventType.CLIENT_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) { if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
try { try {
helper.logRequestHeader( helper.logClientHeader(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
@ -112,8 +112,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
timeout, timeout,
headers, headers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
peerAddress); peerAddress);
} catch (Exception e) { } catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events. // Catching generic exceptions instead of specific ones for all the events.
@ -130,17 +130,18 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
new SimpleForwardingServerCall<ReqT, RespT>(call) { new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override @Override
public void sendHeaders(Metadata headers) { public void sendHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER // Event: EventType.SERVER_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
try { try {
helper.logResponseHeader( helper.logServerHeader(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
headers, headers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
null); null);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e); logger.log(Level.SEVERE, "Unable to log response header", e);
@ -151,19 +152,20 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override @Override
public void sendMessage(RespT message) { public void sendMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE // Event: EventType.SERVER_MESSAGE
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; EventType responseMessageType = EventType.SERVER_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) { if (filterHelper.isEventToBeLogged(responseMessageType)) {
try { try {
helper.logRpcMessage( helper.logRpcMessage(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
responseMessageType, responseMessageType,
message, message,
maxMessageBytes, maxMessageBytes,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId); callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e); logger.log(Level.SEVERE, "Unable to log response message", e);
} }
@ -173,18 +175,19 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override @Override
public void close(Status status, Metadata trailers) { public void close(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER // Event: EventType.SERVER_TRAILER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
try { try {
helper.logTrailer( helper.logTrailer(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
status, status,
trailers, trailers,
maxHeaderBytes, maxHeaderBytes,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
null); null);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e); logger.log(Level.SEVERE, "Unable to log trailer", e);
@ -198,19 +201,20 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
return new SimpleForwardingServerCallListener<ReqT>(listener) { return new SimpleForwardingServerCallListener<ReqT>(listener) {
@Override @Override
public void onMessage(ReqT message) { public void onMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE // Event: EventType.CLIENT_MESSAGE
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; EventType requestMessageType = EventType.CLIENT_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) { if (filterHelper.isEventToBeLogged(requestMessageType)) {
try { try {
helper.logRpcMessage( helper.logRpcMessage(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
authority,
requestMessageType, requestMessageType,
message, message,
maxMessageBytes, maxMessageBytes,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId); callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e); logger.log(Level.SEVERE, "Unable to log request message", e);
} }
@ -220,15 +224,16 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override @Override
public void onHalfClose() { public void onHalfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE // Event: EventType.CLIENT_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
try { try {
helper.logHalfClose( helper.logHalfClose(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
EventLogger.LOGGER_SERVER, authority,
rpcId); EventLogger.SERVER,
callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e); logger.log(Level.SEVERE, "Unable to log half close", e);
} }
@ -238,15 +243,16 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override @Override
public void onCancel() { public void onCancel() {
// Event: EventType.GRPC_CALL_CANCEL // Event: EventType.CANCEL
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
try { try {
helper.logCancel( helper.logCancel(
seq.getAndIncrement(), seq.getAndIncrement(),
serviceName, serviceName,
methodName, methodName,
EventLogger.LOGGER_SERVER, authority,
rpcId); EventLogger.SERVER,
callId);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e); logger.log(Level.SEVERE, "Unable to log cancel", e);
} }

View File

@ -18,32 +18,32 @@ package io.grpc.gcp.observability.interceptors;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.InternalMetadata.BASE64_ENCODING_OMIT_PADDING;
import com.google.common.base.Charsets; import com.google.common.base.Joiner;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Duration; import com.google.protobuf.Duration;
import com.google.protobuf.util.Timestamps;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Deadline; import io.grpc.Deadline;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.InternalMetadata;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.gcp.observability.logging.Sink; import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider; import io.grpc.observabilitylog.v1.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; import io.grpc.observabilitylog.v1.Payload;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.Inet6Address; import java.net.Inet6Address;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -63,23 +63,20 @@ public class LogHelper {
Metadata.BINARY_BYTE_MARSHALLER); Metadata.BINARY_BYTE_MARSHALLER);
private final Sink sink; private final Sink sink;
private final TimeProvider timeProvider;
/** /**
* Creates a LogHelper instance. * Creates a LogHelper instance.
* @param sink sink
* *
* @param sink sink
* @param timeProvider timeprovider
*/ */
public LogHelper(Sink sink, TimeProvider timeProvider) { public LogHelper(Sink sink) {
this.sink = sink; this.sink = sink;
this.timeProvider = timeProvider;
} }
/** /**
* Logs the request header. Binary logging equivalent of logClientHeader. * Logs the request header. Binary logging equivalent of logClientHeader.
*/ */
void logRequestHeader( void logClientHeader(
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
@ -88,35 +85,33 @@ public class LogHelper {
Metadata metadata, Metadata metadata,
int maxHeaderBytes, int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger, GrpcLogRecord.EventLogger eventLogger,
String rpcId, String callId,
// null on client side // null on client side
@Nullable SocketAddress peerAddress) { @Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId"); checkNotNull(authority, "authority");
checkNotNull(callId, "callId");
checkArgument( checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER, peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.SERVER,
"peerAddress can only be specified by server"); "peerAddress can only be specified by server");
PayloadBuilderHelper<Payload.Builder> pair =
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes); createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp() if (timeout != null) {
pair.payloadBuilder.setTimeout(timeout);
}
GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setAuthority(authority) .setAuthority(authority)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setType(EventType.CLIENT_HEADER)
.setEventLogger(eventLogger) .setLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setPayload(pair.payloadBuilder)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated) .setPayloadTruncated(pair.truncated)
.setRpcId(rpcId); .setCallId(callId);
if (timeout != null) {
logEntryBuilder.setTimeout(timeout);
}
if (peerAddress != null) { if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
} }
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
} }
@ -124,39 +119,40 @@ public class LogHelper {
/** /**
* Logs the response header. Binary logging equivalent of logServerHeader. * Logs the response header. Binary logging equivalent of logServerHeader.
*/ */
void logResponseHeader( void logServerHeader(
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
String authority,
Metadata metadata, Metadata metadata,
int maxHeaderBytes, int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger, GrpcLogRecord.EventLogger eventLogger,
String rpcId, String callId,
@Nullable SocketAddress peerAddress) { @Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId"); checkNotNull(authority, "authority");
checkNotNull(callId, "callId");
// Logging peer address only on the first incoming event. On server side, peer address will // Logging peer address only on the first incoming event. On server side, peer address will
// of logging request header // of logging request header
checkArgument( checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.CLIENT,
"peerAddress can only be specified for client"); "peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = PayloadBuilderHelper<Payload.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes); createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp() GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) .setAuthority(authority)
.setEventLogger(eventLogger) .setType(EventType.SERVER_HEADER)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(eventLogger)
.setMetadata(pair.payload) .setPayload(pair.payloadBuilder)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated) .setPayloadTruncated(pair.truncated)
.setRpcId(rpcId); .setCallId(callId);
if (peerAddress != null) { if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
} }
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
} }
@ -168,44 +164,45 @@ public class LogHelper {
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
String authority,
Status status, Status status,
Metadata metadata, Metadata metadata,
int maxHeaderBytes, int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger, GrpcLogRecord.EventLogger eventLogger,
String rpcId, String callId,
@Nullable SocketAddress peerAddress) { @Nullable SocketAddress peerAddress) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(authority, "authority");
checkNotNull(status, "status"); checkNotNull(status, "status");
checkNotNull(rpcId, "rpcId"); checkNotNull(callId, "callId");
checkArgument( checkArgument(
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.CLIENT,
"peerAddress can only be specified for client"); "peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = PayloadBuilderHelper<Payload.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes); createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp() pair.payloadBuilder.setStatusCode(status.getCode().value());
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER)
.setEventLogger(eventLogger)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated)
.setStatusCode(status.getCode().value())
.setRpcId(rpcId);
String statusDescription = status.getDescription(); String statusDescription = status.getDescription();
if (statusDescription != null) { if (statusDescription != null) {
logEntryBuilder.setStatusMessage(statusDescription); pair.payloadBuilder.setStatusMessage(statusDescription);
} }
byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY); byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY);
if (statusDetailBytes != null) { if (statusDetailBytes != null) {
logEntryBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes)); pair.payloadBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes));
} }
GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setAuthority(authority)
.setType(EventType.SERVER_TRAILER)
.setLogger(eventLogger)
.setPayload(pair.payloadBuilder)
.setPayloadTruncated(pair.truncated)
.setCallId(callId);
if (peerAddress != null) { if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); logEntryBuilder.setPeer(socketAddressToProto(peerAddress));
} }
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
} }
@ -217,17 +214,19 @@ public class LogHelper {
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
String authority,
EventType eventType, EventType eventType,
T message, T message,
int maxMessageBytes, int maxMessageBytes,
EventLogger eventLogger, EventLogger eventLogger,
String rpcId) { String callId) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId"); checkNotNull(authority, "authority");
checkNotNull(callId, "callId");
checkArgument( checkArgument(
eventType == EventType.GRPC_CALL_REQUEST_MESSAGE eventType == EventType.CLIENT_MESSAGE
|| eventType == EventType.GRPC_CALL_RESPONSE_MESSAGE, || eventType == EventType.SERVER_MESSAGE,
"event type must correspond to client message or server message"); "event type must correspond to client message or server message");
checkNotNull(message, "message"); checkNotNull(message, "message");
@ -241,27 +240,23 @@ public class LogHelper {
} else if (message instanceof byte[]) { } else if (message instanceof byte[]) {
messageBytesArray = (byte[]) message; messageBytesArray = (byte[]) message;
} else { } else {
logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields" logger.log(Level.WARNING, "message is of UNKNOWN type, message and payload_size fields "
+ "of GrpcLogRecord proto will not be logged"); + "of GrpcLogRecord proto will not be logged");
} }
PayloadBuilder<ByteString> pair = null; PayloadBuilderHelper<Payload.Builder> pair = null;
if (messageBytesArray != null) { if (messageBytesArray != null) {
pair = createMessageProto(messageBytesArray, maxMessageBytes); pair = createMessageProto(messageBytesArray, maxMessageBytes);
} }
GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(eventType) .setAuthority(authority)
.setEventLogger(eventLogger) .setType(eventType)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(eventLogger)
.setRpcId(rpcId); .setCallId(callId);
if (pair != null && pair.size != 0) { if (pair != null) {
logEntryBuilder.setPayloadSize(pair.size); logEntryBuilder.setPayload(pair.payloadBuilder)
}
if (pair != null && pair.payload != null) {
logEntryBuilder.setMessage(pair.payload)
.setPayloadTruncated(pair.truncated); .setPayloadTruncated(pair.truncated);
} }
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
@ -274,20 +269,22 @@ public class LogHelper {
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger, GrpcLogRecord.EventLogger eventLogger,
String rpcId) { String callId) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId"); checkNotNull(authority, "authority");
checkNotNull(callId, "callId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp() GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_HALF_CLOSE) .setAuthority(authority)
.setEventLogger(eventLogger) .setType(EventType.CLIENT_HALF_CLOSE)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(eventLogger)
.setRpcId(rpcId); .setCallId(callId);
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
} }
@ -298,28 +295,25 @@ public class LogHelper {
long seqId, long seqId,
String serviceName, String serviceName,
String methodName, String methodName,
String authority,
GrpcLogRecord.EventLogger eventLogger, GrpcLogRecord.EventLogger eventLogger,
String rpcId) { String callId) {
checkNotNull(serviceName, "serviceName"); checkNotNull(serviceName, "serviceName");
checkNotNull(methodName, "methodName"); checkNotNull(methodName, "methodName");
checkNotNull(rpcId, "rpcId"); checkNotNull(authority, "authority");
checkNotNull(callId, "callId");
GrpcLogRecord.Builder logEntryBuilder = createTimestamp() GrpcLogRecord.Builder logEntryBuilder = GrpcLogRecord.newBuilder()
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_CANCEL) .setAuthority(authority)
.setEventLogger(eventLogger) .setType(EventType.CANCEL)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(eventLogger)
.setRpcId(rpcId); .setCallId(callId);
sink.write(logEntryBuilder.build()); sink.write(logEntryBuilder.build());
} }
GrpcLogRecord.Builder createTimestamp() {
long nanos = timeProvider.currentTimeNanos();
return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos));
}
// TODO(DNVindhya): Evaluate if we need following clause for metadata logging in GcpObservability // TODO(DNVindhya): Evaluate if we need following clause for metadata logging in GcpObservability
// Leaving the implementation for now as is to have same behavior across Java and Go // Leaving the implementation for now as is to have same behavior across Java and Go
private static final Set<String> NEVER_INCLUDED_METADATA = new HashSet<>( private static final Set<String> NEVER_INCLUDED_METADATA = new HashSet<>(
@ -331,58 +325,65 @@ public class LogHelper {
Collections.singletonList( Collections.singletonList(
"grpc-trace-bin")); "grpc-trace-bin"));
static final class PayloadBuilder<T> { static final class PayloadBuilderHelper<T> {
T payload; T payloadBuilder;
int size;
boolean truncated; boolean truncated;
private PayloadBuilder(T payload, int size, boolean truncated) { private PayloadBuilderHelper(T payload, boolean truncated) {
this.payload = payload; this.payloadBuilder = payload;
this.size = size;
this.truncated = truncated; this.truncated = truncated;
} }
} }
static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metadata metadata, static PayloadBuilderHelper<Payload.Builder> createMetadataProto(Metadata metadata,
int maxHeaderBytes) { int maxHeaderBytes) {
checkNotNull(metadata, "metadata"); checkNotNull(metadata, "metadata");
checkArgument(maxHeaderBytes >= 0, checkArgument(maxHeaderBytes >= 0,
"maxHeaderBytes must be non negative"); "maxHeaderBytes must be non negative");
GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder(); Joiner joiner = Joiner.on(",").skipNulls();
// This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata Payload.Builder payloadBuilder = Payload.newBuilder();
// implementation
byte[][] serialized = InternalMetadata.serialize(metadata);
boolean truncated = false; boolean truncated = false;
int totalMetadataBytes = 0; int totalMetadataBytes = 0;
if (serialized != null) { for (String key : metadata.keys()) {
// Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry if (NEVER_INCLUDED_METADATA.contains(key)) {
for (int i = 0; i < serialized.length; i += 2) { continue;
String key = new String(serialized[i], Charsets.UTF_8); }
byte[] value = serialized[i + 1]; boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key);
if (NEVER_INCLUDED_METADATA.contains(key)) { String metadataValue;
continue; if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
} Iterable<byte[]> metadataValues =
boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key); metadata.getAll(Metadata.Key.of(key, Metadata.BINARY_BYTE_MARSHALLER));
int metadataBytesAfterAdd = totalMetadataBytes + key.length() + value.length; List<String> numList = new ArrayList<String>();
if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) { metadataValues.forEach(
truncated = true; (element) -> {
continue; numList.add(BASE64_ENCODING_OMIT_PADDING.encode(element));
} });
metadataBuilder.addEntryBuilder() metadataValue = joiner.join(numList);
.setKey(key) } else {
.setValue(ByteString.copyFrom(value)); Iterable<String> metadataValues = metadata.getAll(
if (!forceInclude) { Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
// force included keys do not count towards the size limit metadataValue = joiner.join(metadataValues);
totalMetadataBytes = metadataBytesAfterAdd; }
}
int metadataBytesAfterAdd = totalMetadataBytes + key.length() + metadataValue.length();
if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) {
truncated = true;
continue;
}
payloadBuilder.putMetadata(key, metadataValue);
if (!forceInclude) {
// force included keys do not count towards the size limit
totalMetadataBytes = metadataBytesAfterAdd;
} }
} }
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes, truncated); return new PayloadBuilderHelper<>(payloadBuilder, truncated);
} }
static PayloadBuilder<ByteString> createMessageProto(byte[] message, int maxMessageBytes) { static PayloadBuilderHelper<Payload.Builder> createMessageProto(
byte[] message, int maxMessageBytes) {
checkArgument(maxMessageBytes >= 0, checkArgument(maxMessageBytes >= 0,
"maxMessageBytes must be non negative"); "maxMessageBytes must be non negative");
Payload.Builder payloadBuilder = Payload.newBuilder();
int desiredBytes = 0; int desiredBytes = 0;
int messageLength = message.length; int messageLength = message.length;
if (maxMessageBytes > 0) { if (maxMessageBytes > 0) {
@ -390,8 +391,10 @@ public class LogHelper {
} }
ByteString messageData = ByteString messageData =
ByteString.copyFrom(message, 0, desiredBytes); ByteString.copyFrom(message, 0, desiredBytes);
payloadBuilder.setMessage(messageData);
payloadBuilder.setMessageLength(messageLength);
return new PayloadBuilder<>(messageData, messageLength, return new PayloadBuilderHelper<>(payloadBuilder,
maxMessageBytes < message.length); maxMessageBytes < message.length);
} }

View File

@ -33,6 +33,7 @@ import io.grpc.Internal;
import io.grpc.internal.JsonParser; import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException; import java.io.IOException;
import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@ -50,41 +51,36 @@ public class GcpLogSink implements Sink {
private static final String DEFAULT_LOG_NAME = private static final String DEFAULT_LOG_NAME =
"microservices.googleapis.com%2Fobservability%2Fgrpc"; "microservices.googleapis.com%2Fobservability%2Fgrpc";
private static final Severity DEFAULT_LOG_LEVEL = Severity.DEBUG;
private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container"; private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container";
private static final Set<String> kubernetesResourceLabelSet private static final Set<String> kubernetesResourceLabelSet
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name"); "pod_name", "container_name");
private static final long FALLBACK_FLUSH_LIMIT = 100L;
private final String projectId; private final String projectId;
private final Map<String, String> customTags; private final Map<String, String> customTags;
private final MonitoredResource kubernetesResource; private final MonitoredResource kubernetesResource;
private final Long flushLimit;
/** Lazily initialize cloud logging client to avoid circular initialization. Because cloud /** Lazily initialize cloud logging client to avoid circular initialization. Because cloud
* logging APIs also uses gRPC. */ * logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient; private volatile Logging gcpLoggingClient;
private long flushCounter;
private final Collection<String> servicesToExclude; private final Collection<String> servicesToExclude;
@VisibleForTesting @VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags, GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) { Map<String, String> customTags, Collection<String> servicesToExclude) {
this(destinationProjectId, locationTags, customTags, flushLimit, servicesToExclude); this(destinationProjectId, locationTags, customTags, servicesToExclude);
this.gcpLoggingClient = loggingClient; this.gcpLoggingClient = loggingClient;
} }
/** /**
* Retrieves a single instance of GcpLogSink. * Retrieves a single instance of GcpLogSink.
* * @param destinationProjectId cloud project id to write logs
* @param destinationProjectId cloud project id to write logs
* @param servicesToExclude service names for which log entries should not be generated * @param servicesToExclude service names for which log entries should not be generated
*/ */
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags, public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit, Collection<String> servicesToExclude) { Map<String, String> customTags, Collection<String> servicesToExclude) {
this.projectId = destinationProjectId; this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags); this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0L;
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude"); this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
} }
@ -106,28 +102,24 @@ public class GcpLogSink implements Sink {
return; return;
} }
try { try {
GrpcLogRecord.EventType event = logProto.getEventType(); GrpcLogRecord.EventType eventType = logProto.getType();
Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel());
// TODO(DNVindhya): make sure all (int, long) values are not displayed as double // TODO(DNVindhya): make sure all (int, long) values are not displayed as double
// For now, every value is being converted as string because of JsonFormat.printer().print // For now, every value is being converted as string because of JsonFormat.printer().print
Map<String, Object> logProtoMap = protoToMapConverter(logProto);
LogEntry.Builder grpcLogEntryBuilder = LogEntry.Builder grpcLogEntryBuilder =
LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto))) LogEntry.newBuilder(JsonPayload.of(logProtoMap))
.setSeverity(logEntrySeverity) .setSeverity(DEFAULT_LOG_LEVEL)
.setLogName(DEFAULT_LOG_NAME) .setLogName(DEFAULT_LOG_NAME)
.setResource(kubernetesResource); .setResource(kubernetesResource)
.setTimestamp(Instant.now());
if (!customTags.isEmpty()) { if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags); grpcLogEntryBuilder.setLabels(customTags);
} }
LogEntry grpcLogEntry = grpcLogEntryBuilder.build(); LogEntry grpcLogEntry = grpcLogEntryBuilder.build();
synchronized (this) { synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event); logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
flushCounter = ++flushCounter;
if (flushCounter >= flushLimit) {
gcpLoggingClient.flush();
flushCounter = 0L;
}
} }
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e); logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
@ -175,29 +167,11 @@ public class GcpLogSink implements Sink {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Map<String, Object> protoToMapConverter(GrpcLogRecord logProto) private Map<String, Object> protoToMapConverter(GrpcLogRecord logProto)
throws IOException { throws IOException {
JsonFormat.Printer printer = JsonFormat.printer().preservingProtoFieldNames(); JsonFormat.Printer printer = JsonFormat.printer();
String recordJson = printer.print(logProto); String recordJson = printer.print(logProto);
return (Map<String, Object>) JsonParser.parse(recordJson); return (Map<String, Object>) JsonParser.parse(recordJson);
} }
private Severity getCloudLoggingLevel(GrpcLogRecord.LogLevel recordLevel) {
switch (recordLevel.getNumber()) {
case 1: // GrpcLogRecord.LogLevel.LOG_LEVEL_TRACE
case 2: // GrpcLogRecord.LogLevel.LOG_LEVEL_DEBUG
return Severity.DEBUG;
case 3: // GrpcLogRecord.LogLevel.LOG_LEVEL_INFO
return Severity.INFO;
case 4: // GrpcLogRecord.LogLevel.LOG_LEVEL_WARN
return Severity.WARNING;
case 5: // GrpcLogRecord.LogLevel.LOG_LEVEL_ERROR
return Severity.ERROR;
case 6: // GrpcLogRecord.LogLevel.LOG_LEVEL_CRITICAL
return Severity.CRITICAL;
default:
return Severity.DEFAULT;
}
}
/** /**
* Closes Cloud Logging Client. * Closes Cloud Logging Client.
*/ */

View File

@ -28,151 +28,99 @@ option java_outer_classname = "ObservabilityLogProto";
message GrpcLogRecord { message GrpcLogRecord {
// List of event types // List of event types
enum EventType { enum EventType {
GRPC_CALL_UNKNOWN = 0; EVENT_TYPE_UNKNOWN = 0;
// Header sent from client to server // Header sent from client to server
GRPC_CALL_REQUEST_HEADER = 1; CLIENT_HEADER = 1;
// Header sent from server to client // Header sent from server to client
GRPC_CALL_RESPONSE_HEADER = 2; SERVER_HEADER = 2;
// Message sent from client to server // Message sent from client to server
GRPC_CALL_REQUEST_MESSAGE = 3; CLIENT_MESSAGE = 3;
// Message sent from server to client // Message sent from server to client
GRPC_CALL_RESPONSE_MESSAGE = 4; SERVER_MESSAGE = 4;
// Trailer indicates the end of the gRPC call
GRPC_CALL_TRAILER = 5;
// A signal that client is done sending // A signal that client is done sending
GRPC_CALL_HALF_CLOSE = 6; CLIENT_HALF_CLOSE = 5;
// Trailer indicates the end of the gRPC call
SERVER_TRAILER = 6;
// A signal that the rpc is canceled // A signal that the rpc is canceled
GRPC_CALL_CANCEL = 7; CANCEL = 7;
} }
// The entity that generates the log entry // The entity that generates the log entry
enum EventLogger { enum EventLogger {
LOGGER_UNKNOWN = 0; LOGGER_UNKNOWN = 0;
LOGGER_CLIENT = 1; CLIENT = 1;
LOGGER_SERVER = 2; SERVER = 2;
}
// The log severity level of the log entry
enum LogLevel {
LOG_LEVEL_UNKNOWN = 0;
LOG_LEVEL_TRACE = 1;
LOG_LEVEL_DEBUG = 2;
LOG_LEVEL_INFO = 3;
LOG_LEVEL_WARN = 4;
LOG_LEVEL_ERROR = 5;
LOG_LEVEL_CRITICAL = 6;
} }
// The timestamp of the log event // Uniquely identifies a call.
google.protobuf.Timestamp timestamp = 1; // Each call may have several log entries. They will all have the same call_id.
// Uniquely identifies a call. The value must not be 0 in order to disambiguate
// from an unset value.
// Each call may have several log entries. They will all have the same rpc_id.
// Nothing is guaranteed about their value other than they are unique across // Nothing is guaranteed about their value other than they are unique across
// different RPCs in the same gRPC process. // different RPCs in the same gRPC process.
string rpc_id = 2; string call_id = 2;
EventType event_type = 3; // one of the above EventType enum
EventLogger event_logger = 4; // one of the above EventLogger enum
// the name of the service
string service_name = 5;
// the name of the RPC method
string method_name = 6;
LogLevel log_level = 7; // one of the above LogLevel enum
// Peer address information. On client side, peer is logged on server
// header event or trailer event (if trailer-only). On server side, peer
// is always logged on the client header event.
Address peer_address = 8;
// the RPC timeout value
google.protobuf.Duration timeout = 11;
// A single process may be used to run multiple virtual servers with
// different identities.
// The authority is the name of such a server identify. It is typically a
// portion of the URI in the form of <host> or <host>:<port>.
string authority = 12;
// Size of the message or metadata, depending on the event type,
// regardless of whether the full message or metadata is being logged
// (i.e. could be truncated or omitted).
uint32 payload_size = 13;
// true if message or metadata field is either truncated or omitted due
// to config options
bool payload_truncated = 14;
// Used by header event or trailer event
Metadata metadata = 15;
// The entry sequence ID for this call. The first message has a value of 1, // The entry sequence ID for this call. The first message has a value of 1,
// to disambiguate from an unset value. The purpose of this field is to // to disambiguate from an unset value. The purpose of this field is to
// detect missing entries in environments where durability or ordering is // detect missing entries in environments where durability or ordering is
// not guaranteed. // not guaranteed.
uint64 sequence_id = 16; uint64 sequence_id = 3;
// Used by message event EventType type = 4; // one of the above EventType enum
bytes message = 17; EventLogger logger = 5; // one of the above EventLogger enum
// Payload for log entry.
// It can include a combination of {metadata, message, status based on type of
// the event event being logged and config options.
Payload payload = 6;
// true if message or metadata field is either truncated or omitted due
// to config options
bool payload_truncated = 7;
// Peer address information. On client side, peer is logged on server
// header event or trailer event (if trailer-only). On server side, peer
// is always logged on the client header event.
Address peer = 8;
// A single process may be used to run multiple virtual servers with
// different identities.
// The authority is the name of such a server identify. It is typically a
// portion of the URI in the form of <host> or <host>:<port>.
string authority = 10;
// the name of the service
string service_name = 11;
// the name of the RPC method
string method_name = 12;
}
message Payload {
// A list of metadata pairs
map<string, string> metadata = 1;
// the RPC timeout value
google.protobuf.Duration timeout = 2;
// The gRPC status code // The gRPC status code
uint32 status_code = 18; uint32 status_code = 3;
// The gRPC status message // The gRPC status message
string status_message = 19; string status_message = 4;
// The value of the grpc-status-details-bin metadata key, if any. // The value of the grpc-status-details-bin metadata key, if any.
// This is always an encoded google.rpc.Status message // This is always an encoded google.rpc.Status message
bytes status_details = 20; bytes status_details = 5;
// Size of the message or metadata, depending on the event type,
// Attributes of the environment generating log record. The purpose of this // regardless of whether the full message or metadata is being logged
// field is to identify the source environment. // (i.e. could be truncated or omitted).
EnvironmentTags env_tags = 21; uint32 message_length = 6;
// Used by message event
// A list of non-gRPC custom values specified by the application bytes message = 7;
repeated CustomTags custom_tags = 22; }
// A list of metadata pairs // Address information
message Metadata { message Address {
repeated MetadataEntry entry = 1; enum Type {
} TYPE_UNKNOWN = 0;
TYPE_IPV4 = 1; // in 1.2.3.4 form
// One metadata key value pair TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
message MetadataEntry { TYPE_UNIX = 3; // UDS string
string key = 1; }
bytes value = 2; Type type = 1;
} string address = 2;
// only for TYPE_IPV4 and TYPE_IPV6
// Address information uint32 ip_port = 3;
message Address {
enum Type {
TYPE_UNKNOWN = 0;
TYPE_IPV4 = 1; // in 1.2.3.4 form
TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4)
TYPE_UNIX = 3; // UDS string
}
Type type = 1;
string address = 2;
// only for TYPE_IPV4 and TYPE_IPV6
uint32 ip_port = 3;
}
// Source Environment information
message EnvironmentTags {
string gcp_project_id = 1;
string gcp_numeric_project_id = 2;
string gce_instance_id = 3;
string gce_instance_hostname = 4;
string gce_instance_zone = 5;
string gke_cluster_uid = 6;
string gke_cluster_name = 7;
string gke_cluster_location = 8;
}
// Custom key value pair
message CustomTags {
string key = 1;
string value = 2;
}
} }

View File

@ -36,7 +36,6 @@ import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper; import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink; import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcCleanupRule;
@ -67,7 +66,6 @@ public class LoggingTest {
private static final ImmutableMap<String, String> CUSTOM_TAGS = ImmutableMap.of( private static final ImmutableMap<String, String> CUSTOM_TAGS = ImmutableMap.of(
"KEY1", "Value1", "KEY1", "Value1",
"KEY2", "VALUE2"); "KEY2", "VALUE2");
private static final long FLUSH_LIMIT = 100L;
private final StaticTestingClassLoader classLoader = private final StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*")); new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*"));
@ -113,9 +111,9 @@ public class LoggingTest {
public void run() { public void run() {
Sink sink = Sink sink =
new GcpLogSink( new GcpLogSink(
PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, Collections.emptySet());
ObservabilityConfig config = mock(ObservabilityConfig.class); ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); LogHelper spyLogHelper = spy(new LogHelper(sink));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
@ -123,7 +121,7 @@ public class LoggingTest {
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
when(config.isEnableCloudLogging()).thenReturn(true); when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); FilterParams logAlwaysFilterParams = FilterParams.create(true, 1024, 10);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams); .thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true); when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true);
@ -156,7 +154,7 @@ public class LoggingTest {
public void run() { public void run() {
Sink mockSink = mock(GcpLogSink.class); Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class); ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); LogHelper spyLogHelper = spy(new LogHelper(mockSink));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper); new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
@ -209,16 +207,16 @@ public class LoggingTest {
FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0); FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams); .thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HEADER))
.thenReturn(true); .thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_HEADER))
.thenReturn(true); .thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true); when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true); when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_TRAILER)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true); when(mockFilterHelper2.isEventToBeLogged(EventType.CANCEL)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_MESSAGE))
.thenReturn(false); .thenReturn(false);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_MESSAGE))
.thenReturn(false); .thenReturn(false);
try (GcpObservability observability = try (GcpObservability observability =

View File

@ -46,13 +46,12 @@ public class ObservabilityConfigImplTest {
private static final String EVENT_TYPES = "{\n" private static final String EVENT_TYPES = "{\n"
+ " \"enable_cloud_logging\": false,\n" + " \"enable_cloud_logging\": false,\n"
+ " \"event_types\": " + " \"event_types\": "
+ "[\"GRPC_CALL_REQUEST_HEADER\", \"GRPC_CALL_HALF_CLOSE\", \"GRPC_CALL_TRAILER\"]\n" + "[\"CLIENT_HEADER\", \"CLIENT_HALF_CLOSE\", \"SERVER_TRAILER\"]\n"
+ "}"; + "}";
private static final String LOG_FILTERS = "{\n" private static final String LOG_FILTERS = "{\n"
+ " \"enable_cloud_logging\": true,\n" + " \"enable_cloud_logging\": true,\n"
+ " \"destination_project_id\": \"grpc-testing\",\n" + " \"destination_project_id\": \"grpc-testing\",\n"
+ " \"flush_message_count\": 1000,\n"
+ " \"log_filters\": [{\n" + " \"log_filters\": [{\n"
+ " \"pattern\": \"*/*\",\n" + " \"pattern\": \"*/*\",\n"
+ " \"header_bytes\": 4096,\n" + " \"header_bytes\": 4096,\n"
@ -69,11 +68,6 @@ public class ObservabilityConfigImplTest {
+ " \"destination_project_id\": \"grpc-testing\"\n" + " \"destination_project_id\": \"grpc-testing\"\n"
+ "}"; + "}";
private static final String FLUSH_MESSAGE_COUNT = "{\n"
+ " \"enable_cloud_logging\": true,\n"
+ " \"flush_message_count\": 500\n"
+ "}";
private static final String DISABLE_CLOUD_LOGGING = "{\n" private static final String DISABLE_CLOUD_LOGGING = "{\n"
+ " \"enable_cloud_logging\": false\n" + " \"enable_cloud_logging\": false\n"
+ "}"; + "}";
@ -146,7 +140,6 @@ public class ObservabilityConfigImplTest {
assertFalse(observabilityConfig.isEnableCloudMonitoring()); assertFalse(observabilityConfig.isEnableCloudMonitoring());
assertFalse(observabilityConfig.isEnableCloudTracing()); assertFalse(observabilityConfig.isEnableCloudTracing());
assertNull(observabilityConfig.getDestinationProjectId()); assertNull(observabilityConfig.getDestinationProjectId());
assertNull(observabilityConfig.getFlushMessageCount());
assertNull(observabilityConfig.getLogFilters()); assertNull(observabilityConfig.getLogFilters());
assertNull(observabilityConfig.getEventTypes()); assertNull(observabilityConfig.getEventTypes());
} }
@ -158,7 +151,6 @@ public class ObservabilityConfigImplTest {
assertFalse(observabilityConfig.isEnableCloudMonitoring()); assertFalse(observabilityConfig.isEnableCloudMonitoring());
assertFalse(observabilityConfig.isEnableCloudTracing()); assertFalse(observabilityConfig.isEnableCloudTracing());
assertNull(observabilityConfig.getDestinationProjectId()); assertNull(observabilityConfig.getDestinationProjectId());
assertNull(observabilityConfig.getFlushMessageCount());
assertNull(observabilityConfig.getLogFilters()); assertNull(observabilityConfig.getLogFilters());
assertNull(observabilityConfig.getEventTypes()); assertNull(observabilityConfig.getEventTypes());
} }
@ -170,19 +162,11 @@ public class ObservabilityConfigImplTest {
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
} }
@Test
public void flushMessageCount() throws Exception {
observabilityConfig.parse(FLUSH_MESSAGE_COUNT);
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(500L);
}
@Test @Test
public void logFilters() throws IOException { public void logFilters() throws IOException {
observabilityConfig.parse(LOG_FILTERS); observabilityConfig.parse(LOG_FILTERS);
assertTrue(observabilityConfig.isEnableCloudLogging()); assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(1000L);
List<LogFilter> logFilters = observabilityConfig.getLogFilters(); List<LogFilter> logFilters = observabilityConfig.getLogFilters();
assertThat(logFilters).hasSize(2); assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); assertThat(logFilters.get(0).pattern).isEqualTo("*/*");
@ -199,8 +183,8 @@ public class ObservabilityConfigImplTest {
assertFalse(observabilityConfig.isEnableCloudLogging()); assertFalse(observabilityConfig.isEnableCloudLogging());
List<EventType> eventTypes = observabilityConfig.getEventTypes(); List<EventType> eventTypes = observabilityConfig.getEventTypes();
assertThat(eventTypes).isEqualTo( assertThat(eventTypes).isEqualTo(
ImmutableList.of(EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE, ImmutableList.of(EventType.CLIENT_HEADER, EventType.CLIENT_HALF_CLOSE,
EventType.GRPC_CALL_TRAILER)); EventType.SERVER_TRAILER));
} }
@Test @Test
@ -265,7 +249,6 @@ public class ObservabilityConfigImplTest {
observabilityConfig.parseFile(configFile.getAbsolutePath()); observabilityConfig.parseFile(configFile.getAbsolutePath());
assertTrue(observabilityConfig.isEnableCloudLogging()); assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(1000L);
List<LogFilter> logFilters = observabilityConfig.getLogFilters(); List<LogFilter> logFilters = observabilityConfig.getLogFilters();
assertThat(logFilters).hasSize(2); assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); assertThat(logFilters.get(0).pattern).isEqualTo("*/*");

View File

@ -48,9 +48,9 @@ public class ConfigFilterHelperTest {
private static final ImmutableList<EventType> configEventTypes = private static final ImmutableList<EventType> configEventTypes =
ImmutableList.of( ImmutableList.of(
EventType.GRPC_CALL_REQUEST_HEADER, EventType.CLIENT_HEADER,
EventType.GRPC_CALL_HALF_CLOSE, EventType.CLIENT_HALF_CLOSE,
EventType.GRPC_CALL_TRAILER); EventType.SERVER_TRAILER);
private final MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod() private final MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod()
.toBuilder(); .toBuilder();
@ -171,13 +171,13 @@ public class ConfigFilterHelperTest {
@Test @Test
public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() { public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() {
List<EventType> eventList = new ArrayList<>(); List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); eventList.add(EventType.CLIENT_HEADER);
eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); eventList.add(EventType.SERVER_HEADER);
eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); eventList.add(EventType.CLIENT_MESSAGE);
eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); eventList.add(EventType.SERVER_MESSAGE);
eventList.add(EventType.GRPC_CALL_HALF_CLOSE); eventList.add(EventType.CLIENT_HALF_CLOSE);
eventList.add(EventType.GRPC_CALL_TRAILER); eventList.add(EventType.SERVER_TRAILER);
eventList.add(EventType.GRPC_CALL_CANCEL); eventList.add(EventType.CANCEL);
for (EventType event : eventList) { for (EventType event : eventList) {
assertTrue(configFilterHelper.isEventToBeLogged(event)); assertTrue(configFilterHelper.isEventToBeLogged(event));
@ -191,13 +191,13 @@ public class ConfigFilterHelperTest {
configFilterHelper.setEventFilterSet(); configFilterHelper.setEventFilterSet();
List<EventType> eventList = new ArrayList<>(); List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); eventList.add(EventType.CLIENT_HEADER);
eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); eventList.add(EventType.SERVER_HEADER);
eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); eventList.add(EventType.CLIENT_MESSAGE);
eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); eventList.add(EventType.SERVER_MESSAGE);
eventList.add(EventType.GRPC_CALL_HALF_CLOSE); eventList.add(EventType.CLIENT_HALF_CLOSE);
eventList.add(EventType.GRPC_CALL_TRAILER); eventList.add(EventType.SERVER_TRAILER);
eventList.add(EventType.GRPC_CALL_CANCEL); eventList.add(EventType.CANCEL);
for (EventType event : eventList) { for (EventType event : eventList) {
assertFalse(configFilterHelper.isEventToBeLogged(event)); assertFalse(configFilterHelper.isEventToBeLogged(event));
@ -209,10 +209,10 @@ public class ConfigFilterHelperTest {
when(mockConfig.getEventTypes()).thenReturn(configEventTypes); when(mockConfig.getEventTypes()).thenReturn(configEventTypes);
configFilterHelper.setEventFilterSet(); configFilterHelper.setEventFilterSet();
EventType logEventType = EventType.GRPC_CALL_REQUEST_HEADER; EventType logEventType = EventType.CLIENT_HEADER;
assertTrue(configFilterHelper.isEventToBeLogged(logEventType)); assertTrue(configFilterHelper.isEventToBeLogged(logEventType));
EventType doNotLogEventType = EventType.GRPC_CALL_RESPONSE_MESSAGE; EventType doNotLogEventType = EventType.SERVER_MESSAGE;
assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType)); assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType));
} }
} }

View File

@ -185,7 +185,7 @@ public class InternalLoggingChannelInterceptorTest {
clientInitial.put(keyA, dataA); clientInitial.put(keyA, dataA);
clientInitial.put(keyB, dataB); clientInitial.put(keyB, dataB);
interceptedLoggingCall.start(mockListener, clientInitial); interceptedLoggingCall.start(mockListener, clientInitial);
verify(mockLogHelper).logRequestHeader( verify(mockLogHelper).logClientHeader(
/*seq=*/ eq(1L), /*seq=*/ eq(1L),
eq("service"), eq("service"),
eq("method"), eq("method"),
@ -193,7 +193,7 @@ public class InternalLoggingChannelInterceptorTest {
ArgumentMatchers.isNull(), ArgumentMatchers.isNull(),
same(clientInitial), same(clientInitial),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT), eq(EventLogger.CLIENT),
anyString(), anyString(),
ArgumentMatchers.isNull()); ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -207,13 +207,14 @@ public class InternalLoggingChannelInterceptorTest {
{ {
Metadata serverInitial = new Metadata(); Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial); interceptedListener.get().onHeaders(serverInitial);
verify(mockLogHelper).logResponseHeader( verify(mockLogHelper).logServerHeader(
/*seq=*/ eq(2L), /*seq=*/ eq(2L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq("the-authority"),
same(serverInitial), same(serverInitial),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT), eq(EventLogger.CLIENT),
anyString(), anyString(),
same(peer)); same(peer));
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -231,10 +232,11 @@ public class InternalLoggingChannelInterceptorTest {
/*seq=*/ eq(3L), /*seq=*/ eq(3L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventType.GRPC_CALL_REQUEST_MESSAGE), eq("the-authority"),
eq(EventType.CLIENT_MESSAGE),
same(request), same(request),
eq(filterParams.messageBytes()), eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_CLIENT), eq(EventLogger.CLIENT),
anyString()); anyString());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
assertSame(request, actualRequest.get()); assertSame(request, actualRequest.get());
@ -250,7 +252,8 @@ public class InternalLoggingChannelInterceptorTest {
/*seq=*/ eq(4L), /*seq=*/ eq(4L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventLogger.LOGGER_CLIENT), eq("the-authority"),
eq(EventLogger.CLIENT),
anyString()); anyString());
halfCloseCalled.get(1, TimeUnit.MILLISECONDS); halfCloseCalled.get(1, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -267,10 +270,11 @@ public class InternalLoggingChannelInterceptorTest {
/*seq=*/ eq(5L), /*seq=*/ eq(5L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), eq("the-authority"),
eq(EventType.SERVER_MESSAGE),
same(response), same(response),
eq(filterParams.messageBytes()), eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_CLIENT), eq(EventLogger.CLIENT),
anyString()); anyString());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(response)); verify(mockListener).onMessage(same(response));
@ -288,10 +292,11 @@ public class InternalLoggingChannelInterceptorTest {
/*seq=*/ eq(6L), /*seq=*/ eq(6L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq("the-authority"),
same(status), same(status),
same(trailers), same(trailers),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT), eq(EventLogger.CLIENT),
anyString(), anyString(),
same(peer)); same(peer));
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -308,7 +313,8 @@ public class InternalLoggingChannelInterceptorTest {
/*seq=*/ eq(7L), /*seq=*/ eq(7L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventLogger.LOGGER_CLIENT), eq("the-authority"),
eq(EventLogger.CLIENT),
anyString()); anyString());
cancelCalled.get(1, TimeUnit.MILLISECONDS); cancelCalled.get(1, TimeUnit.MILLISECONDS);
} }
@ -349,7 +355,7 @@ public class InternalLoggingChannelInterceptorTest {
interceptedLoggingCall.start(mockListener, new Metadata()); interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1)) verify(mockLogHelper, times(1))
.logRequestHeader( .logClientHeader(
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
@ -408,7 +414,7 @@ public class InternalLoggingChannelInterceptorTest {
Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata());
ArgumentCaptor<Duration> contextTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); ArgumentCaptor<Duration> contextTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1)) verify(mockLogHelper, times(1))
.logRequestHeader( .logClientHeader(
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
@ -470,7 +476,7 @@ public class InternalLoggingChannelInterceptorTest {
Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata());
ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class); ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1)) verify(mockLogHelper, times(1))
.logRequestHeader( .logClientHeader(
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
@ -633,8 +639,8 @@ public class InternalLoggingChannelInterceptorTest {
@Test @Test
public void eventFilter_enabled() { public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)).thenReturn(false); when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)).thenReturn(false);
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)).thenReturn(false); when(mockFilterHelper.isEventToBeLogged(EventType.SERVER_HEADER)).thenReturn(false);
Channel channel = new Channel() { Channel channel = new Channel() {
@Override @Override
@ -697,7 +703,7 @@ public class InternalLoggingChannelInterceptorTest {
{ {
interceptedLoggingCall.start(mockListener, new Metadata()); interceptedLoggingCall.start(mockListener, new Metadata());
verify(mockLogHelper, never()).logRequestHeader( verify(mockLogHelper, never()).logClientHeader(
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
@ -710,10 +716,11 @@ public class InternalLoggingChannelInterceptorTest {
AdditionalMatchers.or(ArgumentMatchers.isNull(), AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any())); ArgumentMatchers.any()));
interceptedListener.get().onHeaders(new Metadata()); interceptedListener.get().onHeaders(new Metadata());
verify(mockLogHelper, never()).logResponseHeader( verify(mockLogHelper, never()).logServerHeader(
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(Metadata.class), any(Metadata.class),
anyInt(), anyInt(),
any(GrpcLogRecord.EventLogger.class), any(GrpcLogRecord.EventLogger.class),

View File

@ -167,7 +167,7 @@ public class InternalLoggingServerInterceptorTest {
}); });
// receive request header // receive request header
{ {
verify(mockLogHelper).logRequestHeader( verify(mockLogHelper).logClientHeader(
/*seq=*/ eq(1L), /*seq=*/ eq(1L),
eq("service"), eq("service"),
eq("method"), eq("method"),
@ -175,7 +175,7 @@ public class InternalLoggingServerInterceptorTest {
ArgumentMatchers.isNull(), ArgumentMatchers.isNull(),
same(clientInitial), same(clientInitial),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString(), anyString(),
same(peer)); same(peer));
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -188,13 +188,14 @@ public class InternalLoggingServerInterceptorTest {
{ {
Metadata serverInitial = new Metadata(); Metadata serverInitial = new Metadata();
interceptedLoggingCall.get().sendHeaders(serverInitial); interceptedLoggingCall.get().sendHeaders(serverInitial);
verify(mockLogHelper).logResponseHeader( verify(mockLogHelper).logServerHeader(
/*seq=*/ eq(2L), /*seq=*/ eq(2L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq("the-authority"),
same(serverInitial), same(serverInitial),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString(), anyString(),
ArgumentMatchers.isNull()); ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -212,10 +213,11 @@ public class InternalLoggingServerInterceptorTest {
/*seq=*/ eq(3L), /*seq=*/ eq(3L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventType.GRPC_CALL_REQUEST_MESSAGE), eq("the-authority"),
eq(EventType.CLIENT_MESSAGE),
same(request), same(request),
eq(filterParams.messageBytes()), eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString()); anyString());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(request)); verify(mockListener).onMessage(same(request));
@ -231,7 +233,8 @@ public class InternalLoggingServerInterceptorTest {
/*seq=*/ eq(4L), /*seq=*/ eq(4L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventLogger.LOGGER_SERVER), eq("the-authority"),
eq(EventLogger.SERVER),
anyString()); anyString());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHalfClose(); verify(mockListener).onHalfClose();
@ -248,10 +251,11 @@ public class InternalLoggingServerInterceptorTest {
/*seq=*/ eq(5L), /*seq=*/ eq(5L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), eq("the-authority"),
eq(EventType.SERVER_MESSAGE),
same(response), same(response),
eq(filterParams.messageBytes()), eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString()); anyString());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
assertSame(response, actualResponse.get()); assertSame(response, actualResponse.get());
@ -269,10 +273,11 @@ public class InternalLoggingServerInterceptorTest {
/*seq=*/ eq(6L), /*seq=*/ eq(6L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq("the-authority"),
same(status), same(status),
same(trailers), same(trailers),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString(), anyString(),
ArgumentMatchers.isNull()); ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -290,7 +295,8 @@ public class InternalLoggingServerInterceptorTest {
/*seq=*/ eq(7L), /*seq=*/ eq(7L),
eq("service"), eq("service"),
eq("method"), eq("method"),
eq(EventLogger.LOGGER_SERVER), eq("the-authority"),
eq(EventLogger.SERVER),
anyString()); anyString());
verify(mockListener).onCancel(); verify(mockListener).onCancel();
} }
@ -332,7 +338,7 @@ public class InternalLoggingServerInterceptorTest {
}); });
ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class); ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1)) verify(mockLogHelper, times(1))
.logRequestHeader( .logClientHeader(
/*seq=*/ eq(1L), /*seq=*/ eq(1L),
eq("service"), eq("service"),
eq("method"), eq("method"),
@ -340,7 +346,7 @@ public class InternalLoggingServerInterceptorTest {
timeoutCaptor.capture(), timeoutCaptor.capture(),
any(Metadata.class), any(Metadata.class),
eq(filterParams.headerBytes()), eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER), eq(EventLogger.SERVER),
anyString(), anyString(),
ArgumentMatchers.isNull()); ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper); verifyNoMoreInteractions(mockLogHelper);
@ -480,7 +486,7 @@ public class InternalLoggingServerInterceptorTest {
@Test @Test
public void eventFilter_enabled() { public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(false); when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(false);
Metadata clientInitial = new Metadata(); Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method = final MethodDescriptor<byte[], byte[]> method =
@ -551,6 +557,7 @@ public class InternalLoggingServerInterceptorTest {
anyLong(), anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(GrpcLogRecord.EventLogger.class), any(GrpcLogRecord.EventLogger.class),
anyString()); anyString());
capturedListener.onCancel(); capturedListener.onCancel();

View File

@ -26,26 +26,22 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Duration; import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations; import com.google.protobuf.util.Durations;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.LogHelper.PayloadBuilder; import io.grpc.gcp.observability.interceptors.LogHelper.PayloadBuilderHelper;
import io.grpc.gcp.observability.logging.GcpLogSink; import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink; import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider; import io.grpc.observabilitylog.v1.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.LogLevel; import io.grpc.observabilitylog.v1.Payload;
import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -55,8 +51,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -77,33 +72,12 @@ public class LogHelperTest {
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
private static final Metadata.Key<String> KEY_C = private static final Metadata.Key<String> KEY_C =
Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER); Metadata.Key.of("c", Metadata.ASCII_STRING_MARSHALLER);
private static final MetadataEntry ENTRY_A =
MetadataEntry
.newBuilder()
.setKey(KEY_A.name())
.setValue(ByteString.copyFrom(DATA_A.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final MetadataEntry ENTRY_B =
MetadataEntry
.newBuilder()
.setKey(KEY_B.name())
.setValue(ByteString.copyFrom(DATA_B.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final MetadataEntry ENTRY_C =
MetadataEntry
.newBuilder()
.setKey(KEY_C.name())
.setValue(ByteString.copyFrom(DATA_C.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final int HEADER_LIMIT = 10; private static final int HEADER_LIMIT = 10;
private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
private final Metadata nonEmptyMetadata = new Metadata(); private final Metadata nonEmptyMetadata = new Metadata();
private final Sink sink = mock(GcpLogSink.class); private final Sink sink = mock(GcpLogSink.class);
private final Timestamp timestamp private final LogHelper logHelper = new LogHelper(sink);
= Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build();
private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321;
private final LogHelper logHelper = new LogHelper(sink, timeProvider);
@Before @Before
public void setUp() { public void setUp() {
@ -159,29 +133,26 @@ public class LogHelperTest {
@Test @Test
public void metadataToProto_empty() { public void metadataToProto_empty() {
assertThat(metadataToProtoTestHelper( assertThat(metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE)) EventType.CLIENT_HEADER, new Metadata(), Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder() .isEqualTo(GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setType(EventType.CLIENT_HEADER)
.setMetadata( .setPayload(
GrpcLogRecord.Metadata.getDefaultInstance()) Payload.newBuilder().putAllMetadata(new HashMap<>()))
.build()); .build());
} }
@Test @Test
public void metadataToProto() { public void metadataToProto() {
int nonEmptyMetadataSize = 30; Payload.Builder payloadBuilder = Payload.newBuilder()
.putMetadata("a", DATA_A)
.putMetadata("b", DATA_B)
.putMetadata("c", DATA_C);
assertThat(metadataToProtoTestHelper( assertThat(metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)) EventType.CLIENT_HEADER, nonEmptyMetadata, Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder() .isEqualTo(GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setType(EventType.CLIENT_HEADER)
.setMetadata( .setPayload(payloadBuilder)
GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.addEntry(ENTRY_C)
.build())
.setPayloadSize(nonEmptyMetadataSize)
.build()); .build());
} }
@ -193,44 +164,45 @@ public class LogHelperTest {
@Test @Test
public void metadataToProto_truncated() { public void metadataToProto_truncated() {
// 0 byte limit not enough for any metadata // 0 byte limit not enough for any metadata
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build()) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payloadBuilder.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); .isEqualTo(
io.grpc.observabilitylog.v1.Payload.newBuilder()
.putAllMetadata(new HashMap<>())
.build());
// not enough bytes for first key value // not enough bytes for first key value
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build()) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payloadBuilder.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); .isEqualTo(
io.grpc.observabilitylog.v1.Payload.newBuilder()
.putAllMetadata(new HashMap<>())
.build());
// enough for first key value // enough for first key value
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build()) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payloadBuilder.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .isEqualTo(
.newBuilder() io.grpc.observabilitylog.v1.Payload.newBuilder().putMetadata("a", DATA_A).build());
.addEntry(ENTRY_A)
.build());
// Test edge cases for >= 2 key values // Test edge cases for >= 2 key values
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build()) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payloadBuilder.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .isEqualTo(
.newBuilder() io.grpc.observabilitylog.v1.Payload.newBuilder().putMetadata("a", DATA_A).build());
.addEntry(ENTRY_A) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payloadBuilder.build())
.build()); .isEqualTo(
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build()) io.grpc.observabilitylog.v1.Payload.newBuilder()
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .putMetadata("a", DATA_A)
.newBuilder() .putMetadata("b", DATA_B)
.addEntry(ENTRY_A) .build());
.addEntry(ENTRY_B) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payloadBuilder.build())
.build()); .isEqualTo(
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build()) io.grpc.observabilitylog.v1.Payload.newBuilder()
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .putMetadata("a", DATA_A)
.newBuilder() .putMetadata("b", DATA_B)
.addEntry(ENTRY_A) .build());
.addEntry(ENTRY_B)
.build());
// not truncated: enough for all keys // not truncated: enough for all keys
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build()) assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payloadBuilder.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .isEqualTo(
.newBuilder() io.grpc.observabilitylog.v1.Payload.newBuilder()
.addEntry(ENTRY_A) .putMetadata("a", DATA_A)
.addEntry(ENTRY_B) .putMetadata("b", DATA_B)
.addEntry(ENTRY_C) .putMetadata("c", DATA_C)
.build()); .build());
} }
@Test @Test
@ -240,8 +212,11 @@ public class LogHelperTest {
StandardCharsets.US_ASCII); StandardCharsets.US_ASCII);
assertThat(messageTestHelper(bytes, Integer.MAX_VALUE)) assertThat(messageTestHelper(bytes, Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder() .isEqualTo(GrpcLogRecord.newBuilder()
.setMessage(ByteString.copyFrom(bytes)) .setPayload(
.setPayloadSize(bytes.length) Payload.newBuilder()
.setMessage(
ByteString.copyFrom(bytes))
.setMessageLength(bytes.length))
.build()); .build());
} }
@ -252,18 +227,25 @@ public class LogHelperTest {
StandardCharsets.US_ASCII); StandardCharsets.US_ASCII);
assertThat(messageTestHelper(bytes, 0)) assertThat(messageTestHelper(bytes, 0))
.isEqualTo(GrpcLogRecord.newBuilder() .isEqualTo(GrpcLogRecord.newBuilder()
.setPayloadSize(bytes.length) .setPayload(
Payload.newBuilder()
.setMessageLength(bytes.length))
.setPayloadTruncated(true) .setPayloadTruncated(true)
.build()); .build());
int limit = 10; int limit = 10;
String truncatedMessage = "this is a "; String truncatedMessage = "this is a ";
assertThat(messageTestHelper(bytes, limit)) assertThat(messageTestHelper(bytes, limit))
.isEqualTo(GrpcLogRecord.newBuilder() .isEqualTo(
.setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII))) GrpcLogRecord.newBuilder()
.setPayloadSize(bytes.length) .setPayload(
.setPayloadTruncated(true) Payload.newBuilder()
.build()); .setMessage(
ByteString.copyFrom(
truncatedMessage.getBytes(StandardCharsets.US_ASCII)))
.setMessageLength(bytes.length))
.setPayloadTruncated(true)
.build());
} }
@ -274,30 +256,28 @@ public class LogHelperTest {
String methodName = "method"; String methodName = "method";
String authority = "authority"; String authority = "authority";
Duration timeout = Durations.fromMillis(1234); Duration timeout = Durations.fromMillis(1234);
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1"); InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345; int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port); InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder = GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, metadataToProtoTestHelper(EventType.CLIENT_HEADER, nonEmptyMetadata,
HEADER_LIMIT) HEADER_LIMIT)
.toBuilder() .toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setType(EventType.CLIENT_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT) .setLogger(EventLogger.CLIENT)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setCallId(callId);
.setRpcId(rpcId); builder.setAuthority(authority);
builder.setAuthority(authority) builder.setPayload(builder.getPayload().toBuilder().setTimeout(timeout).build());
.setTimeout(timeout);
GrpcLogRecord base = builder.build(); GrpcLogRecord base = builder.build();
// logged on client // logged on client
{ {
logHelper.logRequestHeader( logHelper.logClientHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
@ -305,15 +285,15 @@ public class LogHelperTest {
timeout, timeout,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
null); null);
verify(sink).write(base); verify(sink).write(base);
} }
// logged on server // logged on server
{ {
logHelper.logRequestHeader( logHelper.logClientHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
@ -321,19 +301,19 @@ public class LogHelperTest {
timeout, timeout,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
peerAddress); peerAddress);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)) .setPeer(LogHelper.socketAddressToProto(peerAddress))
.setEventLogger(EventLogger.LOGGER_SERVER) .setLogger(EventLogger.SERVER)
.build()); .build());
} }
// timeout is null // timeout is null
{ {
logHelper.logRequestHeader( logHelper.logClientHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
@ -341,18 +321,18 @@ public class LogHelperTest {
null, null,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
null); null);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.clearTimeout() .setPayload(base.getPayload().toBuilder().clearTimeout().build())
.build()); .build());
} }
// peerAddress is not null (error on client) // peerAddress is not null (error on client)
try { try {
logHelper.logRequestHeader( logHelper.logClientHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
@ -360,8 +340,8 @@ public class LogHelperTest {
timeout, timeout,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
peerAddress); peerAddress);
fail(); fail();
} catch (IllegalArgumentException expected) { } catch (IllegalArgumentException expected) {
@ -374,68 +354,71 @@ public class LogHelperTest {
long seqId = 1; long seqId = 1;
String serviceName = "service"; String serviceName = "service";
String methodName = "method"; String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; String authority = "authority";
String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1"); InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345; int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port); InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder = GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, metadataToProtoTestHelper(EventType.SERVER_HEADER, nonEmptyMetadata,
HEADER_LIMIT) HEADER_LIMIT)
.toBuilder() .toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_RESPONSE_HEADER) .setAuthority(authority)
.setEventLogger(EventLogger.LOGGER_CLIENT) .setType(EventType.SERVER_HEADER)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(EventLogger.CLIENT)
.setRpcId(rpcId); .setCallId(callId);
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress)); builder.setPeer(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build(); GrpcLogRecord base = builder.build();
// logged on client // logged on client
{ {
logHelper.logResponseHeader( logHelper.logServerHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
peerAddress); peerAddress);
verify(sink).write(base); verify(sink).write(base);
} }
// logged on server // logged on server
{ {
logHelper.logResponseHeader( logHelper.logServerHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
null); null);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER) .setLogger(EventLogger.SERVER)
.clearPeerAddress() .clearPeer()
.build()); .build());
} }
// peerAddress is not null (error on server) // peerAddress is not null (error on server)
try { try {
logHelper.logResponseHeader( logHelper.logServerHeader(
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
peerAddress); peerAddress);
fail(); fail();
@ -450,27 +433,30 @@ public class LogHelperTest {
long seqId = 1; long seqId = 1;
String serviceName = "service"; String serviceName = "service";
String methodName = "method"; String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; String authority = "authority";
String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
InetAddress address = InetAddress.getByName("127.0.0.1"); InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345; int port = 12345;
InetSocketAddress peerAddress = new InetSocketAddress(address, port); InetSocketAddress peer = new InetSocketAddress(address, port);
Status statusDescription = Status.INTERNAL.withDescription("test description"); Status statusDescription = Status.INTERNAL.withDescription("test description");
GrpcLogRecord.Builder builder = GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, metadataToProtoTestHelper(EventType.SERVER_HEADER, nonEmptyMetadata,
HEADER_LIMIT) HEADER_LIMIT)
.toBuilder() .toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_TRAILER) .setAuthority(authority)
.setEventLogger(EventLogger.LOGGER_CLIENT) .setType(EventType.SERVER_TRAILER)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(EventLogger.CLIENT)
.setCallId(callId);
builder.setPeer(LogHelper.socketAddressToProto(peer));
builder.setPayload(
builder.getPayload().toBuilder()
.setStatusCode(Status.INTERNAL.getCode().value()) .setStatusCode(Status.INTERNAL.getCode().value())
.setStatusMessage("test description") .setStatusMessage("test description")
.setRpcId(rpcId); .build());
builder.setPeerAddress(LogHelper.socketAddressToProto(peerAddress));
GrpcLogRecord base = builder.build(); GrpcLogRecord base = builder.build();
// logged on client // logged on client
@ -479,12 +465,13 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
statusDescription, statusDescription,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
peerAddress); peer);
verify(sink).write(base); verify(sink).write(base);
} }
@ -494,16 +481,17 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
statusDescription, statusDescription,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId, callId,
null); null);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.clearPeerAddress() .clearPeer()
.setEventLogger(EventLogger.LOGGER_SERVER) .setLogger(EventLogger.SERVER)
.build()); .build());
} }
@ -513,15 +501,16 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
statusDescription, statusDescription,
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
null); null);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.clearPeerAddress() .clearPeer()
.build()); .build());
} }
@ -531,15 +520,16 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
authority,
statusDescription.getCode().toStatus(), statusDescription.getCode().toStatus(),
nonEmptyMetadata, nonEmptyMetadata,
HEADER_LIMIT, HEADER_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId, callId,
peerAddress); peer);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.clearStatusMessage() .setPayload(base.getPayload().toBuilder().clearStatusMessage().build())
.build()); .build());
} }
} }
@ -551,10 +541,9 @@ public class LogHelperTest {
Metadata metadata = new Metadata(); Metadata metadata = new Metadata();
metadata.put(key, new byte[1]); metadata.put(key, new byte[1]);
int zeroHeaderBytes = 0; int zeroHeaderBytes = 0;
PayloadBuilder<io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.Builder> pair = PayloadBuilderHelper<Payload.Builder> pair =
LogHelper.createMetadataProto(metadata, zeroHeaderBytes); LogHelper.createMetadataProto(metadata, zeroHeaderBytes);
assertThat(Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList())) assertThat(pair.payloadBuilder.build().getMetadataMap().containsKey(key.name())).isTrue();
.getKey()).isEqualTo(key.name());
assertFalse(pair.truncated); assertFalse(pair.truncated);
} }
@ -565,9 +554,9 @@ public class LogHelperTest {
Metadata metadata = new Metadata(); Metadata metadata = new Metadata();
metadata.put(key, new byte[1]); metadata.put(key, new byte[1]);
int unlimitedHeaderBytes = Integer.MAX_VALUE; int unlimitedHeaderBytes = Integer.MAX_VALUE;
PayloadBuilder<io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.Builder> pair PayloadBuilderHelper<Payload.Builder> pair
= LogHelper.createMetadataProto(metadata, unlimitedHeaderBytes); = LogHelper.createMetadataProto(metadata, unlimitedHeaderBytes);
assertThat(pair.payload.getEntryBuilderList()).isEmpty(); assertThat(pair.payloadBuilder.getMetadataMap()).isEmpty();
assertFalse(pair.truncated); assertFalse(pair.truncated);
} }
@ -576,19 +565,19 @@ public class LogHelperTest {
long seqId = 1; long seqId = 1;
String serviceName = "service"; String serviceName = "service";
String methodName = "method"; String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; String authority = "authority";
String callId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
byte[] message = new byte[100]; byte[] message = new byte[100];
GrpcLogRecord.Builder builder = messageTestHelper(message, MESSAGE_LIMIT) GrpcLogRecord.Builder builder = messageTestHelper(message, MESSAGE_LIMIT)
.toBuilder() .toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId) .setSequenceId(seqId)
.setServiceName(serviceName) .setServiceName(serviceName)
.setMethodName(methodName) .setMethodName(methodName)
.setEventType(EventType.GRPC_CALL_REQUEST_MESSAGE) .setAuthority(authority)
.setEventLogger(EventLogger.LOGGER_CLIENT) .setType(EventType.CLIENT_MESSAGE)
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setLogger(EventLogger.CLIENT)
.setRpcId(rpcId); .setCallId(callId);
GrpcLogRecord base = builder.build(); GrpcLogRecord base = builder.build();
// request message // request message
{ {
@ -596,11 +585,12 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE, authority,
EventType.CLIENT_MESSAGE,
message, message,
MESSAGE_LIMIT, MESSAGE_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId); callId);
verify(sink).write(base); verify(sink).write(base);
} }
// response message, logged on client // response message, logged on client
@ -609,14 +599,15 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE, authority,
EventType.SERVER_MESSAGE,
message, message,
MESSAGE_LIMIT, MESSAGE_LIMIT,
EventLogger.LOGGER_CLIENT, EventLogger.CLIENT,
rpcId); callId);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) .setType(EventType.SERVER_MESSAGE)
.build()); .build());
} }
// request message, logged on server // request message, logged on server
@ -625,14 +616,15 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE, authority,
EventType.CLIENT_MESSAGE,
message, message,
MESSAGE_LIMIT, MESSAGE_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId); callId);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.setEventLogger(EventLogger.LOGGER_SERVER) .setLogger(EventLogger.SERVER)
.build()); .build());
} }
// response message, logged on server // response message, logged on server
@ -641,15 +633,34 @@ public class LogHelperTest {
seqId, seqId,
serviceName, serviceName,
methodName, methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE, authority,
EventType.SERVER_MESSAGE,
message, message,
MESSAGE_LIMIT, MESSAGE_LIMIT,
EventLogger.LOGGER_SERVER, EventLogger.SERVER,
rpcId); callId);
verify(sink).write( verify(sink).write(
base.toBuilder() base.toBuilder()
.setEventType(EventType.GRPC_CALL_RESPONSE_MESSAGE) .setType(EventType.SERVER_MESSAGE)
.setEventLogger(EventLogger.LOGGER_SERVER) .setLogger(EventLogger.SERVER)
.build());
}
// message is not of type : com.google.protobuf.Message or byte[]
{
logHelper.logRpcMessage(
seqId,
serviceName,
methodName,
authority,
EventType.CLIENT_MESSAGE,
"message",
MESSAGE_LIMIT,
EventLogger.CLIENT,
callId);
verify(sink).write(
base.toBuilder()
.clearPayload()
.clearPayloadTruncated()
.build()); .build());
} }
} }
@ -667,21 +678,19 @@ public class LogHelperTest {
private static GrpcLogRecord metadataToProtoTestHelper( private static GrpcLogRecord metadataToProtoTestHelper(
EventType type, Metadata metadata, int maxHeaderBytes) { EventType type, Metadata metadata, int maxHeaderBytes) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair PayloadBuilderHelper<Payload.Builder> pair
= LogHelper.createMetadataProto(metadata, maxHeaderBytes); = LogHelper.createMetadataProto(metadata, maxHeaderBytes);
builder.setMetadata(pair.payload); builder.setPayload(pair.payloadBuilder);
builder.setPayloadSize(pair.size);
builder.setPayloadTruncated(pair.truncated); builder.setPayloadTruncated(pair.truncated);
builder.setEventType(type); builder.setType(type);
return builder.build(); return builder.build();
} }
private static GrpcLogRecord messageTestHelper(byte[] message, int maxMessageBytes) { private static GrpcLogRecord messageTestHelper(byte[] message, int maxMessageBytes) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<ByteString> pair PayloadBuilderHelper<Payload.Builder> pair
= LogHelper.createMessageProto(message, maxMessageBytes); = LogHelper.createMessageProto(message, maxMessageBytes);
builder.setMessage(pair.payload); builder.setPayload(pair.payloadBuilder);
builder.setPayloadSize(pair.size);
builder.setPayloadTruncated(pair.truncated); builder.setPayloadTruncated(pair.truncated);
return builder.build(); return builder.build();
} }

View File

@ -18,7 +18,6 @@ package io.grpc.gcp.observability.logging;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoInteractions;
@ -67,7 +66,6 @@ public class GcpLogSinkTest {
private static final ImmutableMap<String, String> CUSTOM_TAGS = private static final ImmutableMap<String, String> CUSTOM_TAGS =
ImmutableMap.of("KEY1", "Value1", ImmutableMap.of("KEY1", "Value1",
"KEY2", "VALUE2"); "KEY2", "VALUE2");
private static final long FLUSH_LIMIT = 10L;
// gRPC is expected to always use this log name when reporting to GCP cloud logging. // gRPC is expected to always use this log name when reporting to GCP cloud logging.
private static final String EXPECTED_LOG_NAME = private static final String EXPECTED_LOG_NAME =
"microservices.googleapis.com%2Fobservability%2Fgrpc"; "microservices.googleapis.com%2Fobservability%2Fgrpc";
@ -77,28 +75,33 @@ public class GcpLogSinkTest {
private static final String METHOD_NAME = "method"; private static final String METHOD_NAME = "method";
private static final String AUTHORITY = "authority"; private static final String AUTHORITY = "authority";
private static final Duration TIMEOUT = Durations.fromMillis(1234); private static final Duration TIMEOUT = Durations.fromMillis(1234);
private static final String RPC_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f"; private static final String CALL_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f";
private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder() private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder()
.setSequenceId(SEQ_ID) .setSequenceId(SEQ_ID)
.setServiceName(SERVICE_NAME) .setServiceName(SERVICE_NAME)
.setMethodName(METHOD_NAME) .setMethodName(METHOD_NAME)
.setAuthority(AUTHORITY) .setAuthority(AUTHORITY)
.setTimeout(TIMEOUT) .setPayload(io.grpc.observabilitylog.v1.Payload.newBuilder().setTimeout(TIMEOUT))
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setType(EventType.CLIENT_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT) .setLogger(EventLogger.CLIENT)
.setRpcId(RPC_ID) .setCallId(CALL_ID)
.build(); .build();
// .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build())
private static final Struct struct =
Struct.newBuilder()
.putFields("timeout", Value.newBuilder().setStringValue("1.234s").build())
.build();
private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder() private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder()
.putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build()) .putFields("sequenceId", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build())
.putFields("service_name", Value.newBuilder().setStringValue(SERVICE_NAME).build()) .putFields("serviceName", Value.newBuilder().setStringValue(SERVICE_NAME).build())
.putFields("method_name", Value.newBuilder().setStringValue(METHOD_NAME).build()) .putFields("methodName", Value.newBuilder().setStringValue(METHOD_NAME).build())
.putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build()) .putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build())
.putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) .putFields("payload", Value.newBuilder().setStructValue(struct).build())
.putFields("event_type", Value.newBuilder().setStringValue( .putFields("type", Value.newBuilder().setStringValue(
String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build()) String.valueOf(EventType.CLIENT_HEADER)).build())
.putFields("event_logger", Value.newBuilder().setStringValue( .putFields("logger", Value.newBuilder().setStringValue(
String.valueOf(EventLogger.LOGGER_CLIENT)).build()) String.valueOf(EventLogger.CLIENT)).build())
.putFields("rpc_id", Value.newBuilder().setStringValue(RPC_ID).build()) .putFields("callId", Value.newBuilder().setStringValue(CALL_ID).build())
.build(); .build();
@Mock @Mock
private Logging mockLogging; private Logging mockLogging;
@ -107,7 +110,7 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void verifyWrite() throws Exception { public void verifyWrite() throws Exception {
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); CUSTOM_TAGS, Collections.emptySet());
sink.write(LOG_PROTO); sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass( ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
@ -125,7 +128,7 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void verifyWriteWithTags() { public void verifyWriteWithTags() {
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); CUSTOM_TAGS, Collections.emptySet());
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS);
sink.write(LOG_PROTO); sink.write(LOG_PROTO);
@ -149,7 +152,7 @@ public class GcpLogSinkTest {
Map<String, String> emptyCustomTags = null; Map<String, String> emptyCustomTags = null;
Map<String, String> expectedEmptyLabels = new HashMap<>(); Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); emptyCustomTags, Collections.emptySet());
sink.write(LOG_PROTO); sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass( ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
@ -170,7 +173,7 @@ public class GcpLogSinkTest {
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS, Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS,
destinationProjectId); destinationProjectId);
GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS, GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS,
emptyCustomTags, FLUSH_LIMIT, Collections.emptySet()); emptyCustomTags, Collections.emptySet());
sink.write(LOG_PROTO); sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass( ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
@ -183,24 +186,10 @@ public class GcpLogSinkTest {
} }
} }
@Test
public void verifyFlush() {
long lowerFlushLimit = 2L;
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, lowerFlushLimit, Collections.emptySet());
sink.write(LOG_PROTO);
verify(mockLogging, never()).flush();
sink.write(LOG_PROTO);
verify(mockLogging, times(1)).flush();
sink.write(LOG_PROTO);
sink.write(LOG_PROTO);
verify(mockLogging, times(2)).flush();
}
@Test @Test
public void verifyClose() throws Exception { public void verifyClose() throws Exception {
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT, Collections.emptySet()); CUSTOM_TAGS, Collections.emptySet());
sink.write(LOG_PROTO); sink.write(LOG_PROTO);
verify(mockLogging, times(1)).write(anyIterable()); verify(mockLogging, times(1)).write(anyIterable());
sink.close(); sink.close();
@ -211,7 +200,7 @@ public class GcpLogSinkTest {
@Test @Test
public void verifyExclude() throws Exception { public void verifyExclude() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS, Sink mockSink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT, Collections.singleton("service")); CUSTOM_TAGS, Collections.singleton("service"));
mockSink.write(LOG_PROTO); mockSink.write(LOG_PROTO);
verifyNoInteractions(mockLogging); verifyNoInteractions(mockLogging);
} }