diff --git a/observability/build.gradle b/observability/build.gradle index 38197da722..1b826baf0c 100644 --- a/observability/build.gradle +++ b/observability/build.gradle @@ -7,15 +7,29 @@ plugins { } description = "gRPC: Observability" + +[compileJava].each() { + it.options.compilerArgs += [ + // only has AutoValue annotation processor + "-Xlint:-processing" + ] + appendToProperty( + it.options.errorprone.excludedPaths, + ".*/build/generated/sources/annotationProcessor/java/.*", + "|") +} + dependencies { def cloudLoggingVersion = '3.6.1' + annotationProcessor libraries.autovalue api project(':grpc-api') implementation project(':grpc-protobuf'), project(':grpc-stub'), project(':grpc-alts'), libraries.google_auth_oauth2_http, + libraries.autovalue_annotation, libraries.perfmark, ('com.google.guava:guava:31.0.1-jre'), ('com.google.errorprone:error_prone_annotations:2.11.0'), diff --git a/observability/src/main/java/io/grpc/observability/Observability.java b/observability/src/main/java/io/grpc/observability/Observability.java index a22016f1ad..29db4b9306 100644 --- a/observability/src/main/java/io/grpc/observability/Observability.java +++ b/observability/src/main/java/io/grpc/observability/Observability.java @@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import io.grpc.ExperimentalApi; import io.grpc.ManagedChannelProvider.ProviderNotFoundException; +import io.grpc.internal.TimeProvider; +import io.grpc.observability.interceptors.ConfigFilterHelper; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.observability.interceptors.LogHelper; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; import java.io.IOException; @@ -42,14 +45,13 @@ public final class Observability implements AutoCloseable { if (instance == null) { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); - Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId()); + Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), 10); + LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, - new InternalLoggingChannelInterceptor.FactoryImpl(sink, - globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), - observabilityConfig), - new InternalLoggingServerInterceptor.FactoryImpl(sink, - globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), - observabilityConfig)); + new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), + new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); } return instance; } diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java index 61c5f90835..e7d3c67146 100644 --- a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java @@ -17,6 +17,7 @@ package io.grpc.observability; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.util.List; public interface ObservabilityConfig { /** Is Cloud Logging enabled. */ @@ -26,10 +27,10 @@ public interface ObservabilityConfig { String getDestinationProjectId(); /** Get filters set for logging. */ - LogFilter[] getLogFilters(); + List getLogFilters(); /** Get event types to log. */ - EventType[] getEventTypes(); + List getEventTypes(); /** * POJO for representing a filter used in configuration. @@ -44,7 +45,14 @@ public interface ObservabilityConfig { /** Number of bytes of each header to log. */ public final Integer messageBytes; - LogFilter(String pattern, Integer headerBytes, Integer messageBytes) { + /** + * Object used to represent filter used in configuration. + * + * @param pattern Pattern indicating which service/method to log + * @param headerBytes Number of bytes of each header to log + * @param messageBytes Number of bytes of each header to log + */ + public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) { this.pattern = pattern; this.headerBytes = headerBytes; this.messageBytes = messageBytes; diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java index 52dbb5be40..8b67b96919 100644 --- a/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java @@ -18,6 +18,7 @@ package io.grpc.observability; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.ImmutableList; import io.grpc.internal.JsonParser; import io.grpc.internal.JsonUtil; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; @@ -25,14 +26,16 @@ import java.io.IOException; import java.util.List; import java.util.Map; -/** gRPC Observability configuration processor. */ +/** + * gRPC Observability configuration processor. + */ final class ObservabilityConfigImpl implements ObservabilityConfig { private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; private boolean enableCloudLogging = true; private String destinationProjectId = null; - private LogFilter[] logFilters; - private EventType[] eventTypes; + private List logFilters; + private List eventTypes; static ObservabilityConfigImpl getInstance() throws IOException { ObservabilityConfigImpl config = new ObservabilityConfigImpl(); @@ -46,7 +49,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { parseLoggingConfig((Map) JsonParser.parse(config)); } - private void parseLoggingConfig(Map loggingConfig) { + private void parseLoggingConfig(Map loggingConfig) { if (loggingConfig != null) { Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging"); if (value != null) { @@ -56,18 +59,20 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { List rawList = JsonUtil.getList(loggingConfig, "log_filters"); if (rawList != null) { List> jsonLogFilters = JsonUtil.checkObjectList(rawList); - this.logFilters = new LogFilter[jsonLogFilters.size()]; - for (int i = 0; i < jsonLogFilters.size(); i++) { - this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i)); + ImmutableList.Builder logFiltersBuilder = new ImmutableList.Builder<>(); + for (Map jsonLogFilter : jsonLogFilters) { + logFiltersBuilder.add(parseJsonLogFilter(jsonLogFilter)); } + this.logFilters = logFiltersBuilder.build(); } rawList = JsonUtil.getList(loggingConfig, "event_types"); if (rawList != null) { List jsonEventTypes = JsonUtil.checkStringList(rawList); - this.eventTypes = new EventType[jsonEventTypes.size()]; - for (int i = 0; i < jsonEventTypes.size(); i++) { - this.eventTypes[i] = convertEventType(jsonEventTypes.get(i)); + ImmutableList.Builder eventTypesBuilder = new ImmutableList.Builder<>(); + for (String jsonEventType : jsonEventTypes) { + eventTypesBuilder.add(convertEventType(jsonEventType)); } + this.eventTypes = eventTypesBuilder.build(); } } } @@ -80,7 +85,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { return EventType.GRPC_CALL_REQUEST_HEADER; case "GRPC_CALL_RESPONSE_HEADER": return EventType.GRPC_CALL_RESPONSE_HEADER; - case"GRPC_CALL_REQUEST_MESSAGE": + case "GRPC_CALL_REQUEST_MESSAGE": return EventType.GRPC_CALL_REQUEST_MESSAGE; case "GRPC_CALL_RESPONSE_MESSAGE": return EventType.GRPC_CALL_RESPONSE_MESSAGE; @@ -95,7 +100,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { } } - private LogFilter parseJsonLogFilter(Map logFilterMap) { + private LogFilter parseJsonLogFilter(Map logFilterMap) { return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"), JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes")); @@ -112,12 +117,12 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { } @Override - public LogFilter[] getLogFilters() { + public List getLogFilters() { return logFilters; } @Override - public EventType[] getEventTypes() { + public List getEventTypes() { return eventTypes; } } diff --git a/observability/src/main/java/io/grpc/observability/interceptors/ConfigFilterHelper.java b/observability/src/main/java/io/grpc/observability/interceptors/ConfigFilterHelper.java new file mode 100644 index 0000000000..7216a1d435 --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/interceptors/ConfigFilterHelper.java @@ -0,0 +1,221 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.grpc.Internal; +import io.grpc.MethodDescriptor; +import io.grpc.observability.ObservabilityConfig; +import io.grpc.observability.ObservabilityConfig.LogFilter; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Parses gRPC Observability configuration filters for interceptors usage. + */ +@Internal +public class ConfigFilterHelper { + + private static final Logger logger = Logger.getLogger(ConfigFilterHelper.class.getName()); + + public static final FilterParams NO_FILTER_PARAMS + = FilterParams.create(false, 0, 0); + public static final String globalPattern = "*"; + + private final ObservabilityConfig config; + @VisibleForTesting + boolean methodOrServiceFilterPresent; + // Flag to log every service and method + @VisibleForTesting + Map perServiceFilters; + @VisibleForTesting + Map perMethodFilters; + @VisibleForTesting + Set logEventTypeSet; + + @VisibleForTesting + ConfigFilterHelper(ObservabilityConfig config) { + this.config = config; + this.methodOrServiceFilterPresent = false; + this.perServiceFilters = new HashMap<>(); + this.perMethodFilters = new HashMap<>(); + } + + /** + * Creates and returns helper instance for log filtering. + * + * @param config processed ObservabilityConfig object + * @return helper instance for filtering + */ + public static ConfigFilterHelper factory(ObservabilityConfig config) { + ConfigFilterHelper filterHelper = new ConfigFilterHelper(config); + if (config.isEnableCloudLogging()) { + filterHelper.setMethodOrServiceFilterMaps(); + filterHelper.setEventFilterSet(); + } + return filterHelper; + } + + @VisibleForTesting + void setMethodOrServiceFilterMaps() { + List logFilters = config.getLogFilters(); + if (logFilters == null) { + return; + } + + Map perServiceFilters = new HashMap<>(); + Map perMethodFilters = new HashMap<>(); + + for (LogFilter currentFilter : logFilters) { + // '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified + String methodOrServicePattern = currentFilter.pattern; + int currentHeaderBytes + = currentFilter.headerBytes != null ? currentFilter.headerBytes : 0; + int currentMessageBytes + = currentFilter.messageBytes != null ? currentFilter.messageBytes : 0; + if (methodOrServicePattern.equals("*")) { + // parse config for global, e.g. "*" + if (perServiceFilters.containsKey(globalPattern)) { + logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern); + continue; + } + FilterParams params = FilterParams.create(true, + currentHeaderBytes, currentMessageBytes); + perServiceFilters.put(globalPattern, params); + } else if (methodOrServicePattern.endsWith("/*")) { + // TODO(DNVindhya): check if service name is a valid string for a service name + // parse config for a service, e.g. "service/*" + String service = MethodDescriptor.extractFullServiceName(methodOrServicePattern); + if (perServiceFilters.containsKey(service)) { + logger.log(Level.WARNING, "Duplicate entry : {0)", methodOrServicePattern); + continue; + } + FilterParams params = FilterParams.create(true, + currentHeaderBytes, currentMessageBytes); + perServiceFilters.put(service, params); + } else { + // TODO(DNVVindhya): check if methodOrServicePattern is a valid full qualified method name + // parse pattern for a fully qualified method, e.g "service/method" + if (perMethodFilters.containsKey(methodOrServicePattern)) { + logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern); + continue; + } + FilterParams params = FilterParams.create(true, + currentHeaderBytes, currentMessageBytes); + perMethodFilters.put(methodOrServicePattern, params); + } + } + this.perServiceFilters = ImmutableMap.copyOf(perServiceFilters); + this.perMethodFilters = ImmutableMap.copyOf(perMethodFilters); + if (!perServiceFilters.isEmpty() || !perMethodFilters.isEmpty()) { + this.methodOrServiceFilterPresent = true; + } + } + + @VisibleForTesting + void setEventFilterSet() { + List eventFilters = config.getEventTypes(); + if (eventFilters == null) { + return; + } + if (eventFilters.isEmpty()) { + this.logEventTypeSet = ImmutableSet.of(); + return; + } + this.logEventTypeSet = ImmutableSet.copyOf(eventFilters); + } + + /** + * Class containing results for method/service filter information, such as flag for logging + * method/service and payload limits to be used for filtering. + */ + @AutoValue + public abstract static class FilterParams { + + abstract boolean log(); + + abstract int headerBytes(); + + abstract int messageBytes(); + + @VisibleForTesting + public static FilterParams create(boolean log, int headerBytes, int messageBytes) { + return new AutoValue_ConfigFilterHelper_FilterParams( + log, headerBytes, messageBytes); + } + } + + /** + * Checks if the corresponding service/method passed needs to be logged as per the user provided + * configuration. + * + * @param method the fully qualified name of the method + * @return MethodFilterParams object 1. specifies if the corresponding method needs to be logged + * (log field will be set to true) 2. values of payload limits retrieved from configuration + */ + public FilterParams isMethodToBeLogged(MethodDescriptor method) { + FilterParams params = NO_FILTER_PARAMS; + if (methodOrServiceFilterPresent) { + String fullMethodName = method.getFullMethodName(); + if (perMethodFilters.containsKey(fullMethodName)) { + params = perMethodFilters.get(fullMethodName); + } else { + String serviceName = method.getServiceName(); + if (perServiceFilters.containsKey(serviceName)) { + params = perServiceFilters.get(serviceName); + } else if (perServiceFilters.containsKey(globalPattern)) { + params = perServiceFilters.get(globalPattern); + } + } + } + return params; + } + + /** + * Checks if the corresponding event passed needs to be logged as per the user provided + * configuration. + * + *

All events are logged by default if event_types is not specified or {} in configuration. + * If event_types is specified as [], no events will be logged. + * If events types is specified as a non-empty list, only the events specified in the + * list will be logged. + *

+ * + * @param event gRPC observability event + * @return true if event needs to be logged, false otherwise + */ + public boolean isEventToBeLogged(EventType event) { + if (logEventTypeSet == null) { + return true; + } + boolean logEvent; + if (logEventTypeSet.isEmpty()) { + logEvent = false; + } else { + logEvent = logEventTypeSet.contains(event); + } + return logEvent; + } +} diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java index 6c4ee63f5c..07a429d512 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -30,12 +30,9 @@ import io.grpc.Internal; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; -import io.grpc.internal.TimeProvider; -import io.grpc.observability.ObservabilityConfig; -import io.grpc.observability.logging.Sink; +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; -import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -47,44 +44,39 @@ import java.util.logging.Logger; */ @Internal public final class InternalLoggingChannelInterceptor implements ClientInterceptor { + private static final Logger logger = Logger .getLogger(InternalLoggingChannelInterceptor.class.getName()); private final LogHelper helper; + private final ConfigFilterHelper filterHelper; public interface Factory { ClientInterceptor create(); } public static class FactoryImpl implements Factory { - private final Sink sink; - private final LogHelper helper; - /** Create the {@link Factory} we need to create our {@link ClientInterceptor}s. */ - public FactoryImpl(Sink sink, Map locationTags, Map customTags, - ObservabilityConfig observabilityConfig) { - this.sink = sink; - this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, - observabilityConfig); + private final LogHelper helper; + private final ConfigFilterHelper filterHelper; + + /** + * Create the {@link Factory} we need to create our {@link ClientInterceptor}s. + */ + public FactoryImpl(LogHelper helper, ConfigFilterHelper filterHelper) { + this.helper = helper; + this.filterHelper = filterHelper; } @Override public ClientInterceptor create() { - return new InternalLoggingChannelInterceptor(helper); - } - - /** - * Closes the sink instance. - */ - public void close() { - if (sink != null) { - sink.close(); - } + return new InternalLoggingChannelInterceptor(helper, filterHelper); } } - private InternalLoggingChannelInterceptor(LogHelper helper) { + private InternalLoggingChannelInterceptor(LogHelper helper, ConfigFilterHelper filterHelper) { this.helper = helper; + this.filterHelper = filterHelper; } @Override @@ -100,12 +92,14 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto final Deadline deadline = LogHelper.min(callOptions.getDeadline(), Context.current().getDeadline()); - // TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged - // according to config. Until then always return true. - if (!helper.isMethodToBeLogged(method.getFullMethodName())) { + FilterParams filterParams = filterHelper.isMethodToBeLogged(method); + if (!filterParams.log()) { return next.newCall(method, callOptions); } + final int maxHeaderBytes = filterParams.headerBytes(); + final int maxMessageBytes = filterParams.messageBytes(); + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override @@ -116,25 +110,28 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto final Duration timeout = deadline == null ? null : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); - try { - helper.logRequestHeader( - seq.getAndIncrement(), - serviceName, - methodName, - authority, - timeout, - headers, - EventLogger.LOGGER_CLIENT, - rpcId, - null); - } catch (Exception e) { - // Catching generic exceptions instead of specific ones for all the events. - // This way we can catch both expected and unexpected exceptions instead of re-throwing - // exceptions to callers which will lead to RPC getting aborted. - // Expected exceptions to be caught: - // 1. IllegalArgumentException - // 2. NullPointerException - logger.log(Level.SEVERE, "Unable to log request header", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) { + try { + helper.logRequestHeader( + seq.getAndIncrement(), + serviceName, + methodName, + authority, + timeout, + headers, + maxHeaderBytes, + EventLogger.LOGGER_CLIENT, + rpcId, + null); + } catch (Exception e) { + // Catching generic exceptions instead of specific ones for all the events. + // This way we can catch both expected and unexpected exceptions instead of re-throwing + // exceptions to callers which will lead to RPC getting aborted. + // Expected exceptions to be caught: + // 1. IllegalArgumentException + // 2. NullPointerException + logger.log(Level.SEVERE, "Unable to log request header", e); + } } Listener observabilityListener = @@ -142,17 +139,21 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void onMessage(RespT message) { // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE - try { - helper.logRpcMessage( - seq.getAndIncrement(), - serviceName, - methodName, - EventType.GRPC_CALL_RESPONSE_MESSAGE, - message, - EventLogger.LOGGER_CLIENT, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log response message", e); + EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + if (filterHelper.isEventToBeLogged(responseMessageType)) { + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + responseMessageType, + message, + maxMessageBytes, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response message", e); + } } super.onMessage(message); } @@ -160,17 +161,20 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void onHeaders(Metadata headers) { // Event: EventType.GRPC_CALL_RESPONSE_HEADER - try { - helper.logResponseHeader( - seq.getAndIncrement(), - serviceName, - methodName, - headers, - EventLogger.LOGGER_CLIENT, - rpcId, - LogHelper.getPeerAddress(getAttributes())); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log response header", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { + try { + helper.logResponseHeader( + seq.getAndIncrement(), + serviceName, + methodName, + headers, + maxHeaderBytes, + EventLogger.LOGGER_CLIENT, + rpcId, + LogHelper.getPeerAddress(getAttributes())); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response header", e); + } } super.onHeaders(headers); } @@ -178,18 +182,21 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void onClose(Status status, Metadata trailers) { // Event: EventType.GRPC_CALL_TRAILER - try { - helper.logTrailer( - seq.getAndIncrement(), - serviceName, - methodName, - status, - trailers, - EventLogger.LOGGER_CLIENT, - rpcId, - LogHelper.getPeerAddress(getAttributes())); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log trailer", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { + try { + helper.logTrailer( + seq.getAndIncrement(), + serviceName, + methodName, + status, + trailers, + maxHeaderBytes, + EventLogger.LOGGER_CLIENT, + rpcId, + LogHelper.getPeerAddress(getAttributes())); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log trailer", e); + } } super.onClose(status, trailers); } @@ -200,17 +207,21 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void sendMessage(ReqT message) { // Event: EventType.GRPC_CALL_REQUEST_MESSAGE - try { - helper.logRpcMessage( - seq.getAndIncrement(), - serviceName, - methodName, - EventType.GRPC_CALL_REQUEST_MESSAGE, - message, - EventLogger.LOGGER_CLIENT, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log request message", e); + EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; + if (filterHelper.isEventToBeLogged(requestMessageType)) { + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + requestMessageType, + message, + maxMessageBytes, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log request message", e); + } } super.sendMessage(message); } @@ -218,15 +229,17 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void halfClose() { // Event: EventType.GRPC_CALL_HALF_CLOSE - try { - helper.logHalfClose( - seq.getAndIncrement(), - serviceName, - methodName, - EventLogger.LOGGER_CLIENT, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log half close", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { + try { + helper.logHalfClose( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log half close", e); + } } super.halfClose(); } @@ -234,15 +247,17 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto @Override public void cancel(String message, Throwable cause) { // Event: EventType.GRPC_CALL_CANCEL - try { - helper.logCancel( - seq.getAndIncrement(), - serviceName, - methodName, - EventLogger.LOGGER_CLIENT, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log cancel", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { + try { + helper.logCancel( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_CLIENT, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log cancel", e); + } } super.cancel(message, cause); } diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java index c9df0f65c4..ca4350bece 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java @@ -28,61 +28,55 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; -import io.grpc.internal.TimeProvider; -import io.grpc.observability.ObservabilityConfig; -import io.grpc.observability.logging.Sink; + +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.net.SocketAddress; -import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -/** A logging interceptor for {@code LoggingServerProvider}. */ +/** + * A logging interceptor for {@code LoggingServerProvider}. + */ @Internal public final class InternalLoggingServerInterceptor implements ServerInterceptor { + private static final Logger logger = Logger .getLogger(InternalLoggingServerInterceptor.class.getName()); private final LogHelper helper; + private final ConfigFilterHelper filterHelper; public interface Factory { ServerInterceptor create(); } public static class FactoryImpl implements Factory { - private final Sink sink; - private final LogHelper helper; - /** Create the {@link Factory} we need to create our {@link ServerInterceptor}s. */ - public FactoryImpl(Sink sink, Map locationTags, - Map customTags, - ObservabilityConfig observabilityConfig) { - this.sink = sink; - this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, - observabilityConfig); + private final LogHelper helper; + private final ConfigFilterHelper filterHelper; + + /** + * Create the {@link Factory} we need to create our {@link ServerInterceptor}s. + */ + public FactoryImpl(LogHelper helper, ConfigFilterHelper filterHelper) { + this.helper = helper; + this.filterHelper = filterHelper; } @Override public ServerInterceptor create() { - return new InternalLoggingServerInterceptor(helper); - } - - /** - * Closes the sink instance. - */ - public void close() { - if (sink != null) { - sink.close(); - } + return new InternalLoggingServerInterceptor(helper, filterHelper); } } - private InternalLoggingServerInterceptor(LogHelper helper) { + private InternalLoggingServerInterceptor(LogHelper helper, ConfigFilterHelper filterHelper) { this.helper = helper; + this.filterHelper = filterHelper; } @Override @@ -98,32 +92,37 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor final Duration timeout = deadline == null ? null : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); - // TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged - // according to config. Until then always return true. - if (!helper.isMethodToBeLogged(call.getMethodDescriptor().getFullMethodName())) { + FilterParams filterParams = filterHelper.isMethodToBeLogged(call.getMethodDescriptor()); + if (!filterParams.log()) { return next.startCall(call, headers); } + final int maxHeaderBytes = filterParams.headerBytes(); + final int maxMessageBytes = filterParams.messageBytes(); + // Event: EventType.GRPC_CALL_REQUEST_HEADER - try { - helper.logRequestHeader( - seq.getAndIncrement(), - serviceName, - methodName, - authority, - timeout, - headers, - EventLogger.LOGGER_SERVER, - rpcId, - peerAddress); - } catch (Exception e) { - // Catching generic exceptions instead of specific ones for all the events. - // This way we can catch both expected and unexpected exceptions instead of re-throwing - // exceptions to callers which will lead to RPC getting aborted. - // Expected exceptions to be caught: - // 1. IllegalArgumentException - // 2. NullPointerException - logger.log(Level.SEVERE, "Unable to log request header", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) { + try { + helper.logRequestHeader( + seq.getAndIncrement(), + serviceName, + methodName, + authority, + timeout, + headers, + maxHeaderBytes, + EventLogger.LOGGER_SERVER, + rpcId, + peerAddress); + } catch (Exception e) { + // Catching generic exceptions instead of specific ones for all the events. + // This way we can catch both expected and unexpected exceptions instead of re-throwing + // exceptions to callers which will lead to RPC getting aborted. + // Expected exceptions to be caught: + // 1. IllegalArgumentException + // 2. NullPointerException + logger.log(Level.SEVERE, "Unable to log request header", e); + } } ServerCall wrapperCall = @@ -131,17 +130,20 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void sendHeaders(Metadata headers) { // Event: EventType.GRPC_CALL_RESPONSE_HEADER - try { - helper.logResponseHeader( - seq.getAndIncrement(), - serviceName, - methodName, - headers, - EventLogger.LOGGER_SERVER, - rpcId, - null); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log response header", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) { + try { + helper.logResponseHeader( + seq.getAndIncrement(), + serviceName, + methodName, + headers, + maxHeaderBytes, + EventLogger.LOGGER_SERVER, + rpcId, + null); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response header", e); + } } super.sendHeaders(headers); } @@ -149,17 +151,21 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void sendMessage(RespT message) { // Event: EventType.GRPC_CALL_RESPONSE_MESSAGE - try { - helper.logRpcMessage( - seq.getAndIncrement(), - serviceName, - methodName, - EventType.GRPC_CALL_RESPONSE_MESSAGE, - message, - EventLogger.LOGGER_SERVER, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log response message", e); + EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + if (filterHelper.isEventToBeLogged(responseMessageType)) { + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + responseMessageType, + message, + maxMessageBytes, + EventLogger.LOGGER_SERVER, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log response message", e); + } } super.sendMessage(message); } @@ -167,18 +173,21 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void close(Status status, Metadata trailers) { // Event: EventType.GRPC_CALL_TRAILER - try { - helper.logTrailer( - seq.getAndIncrement(), - serviceName, - methodName, - status, - trailers, - EventLogger.LOGGER_SERVER, - rpcId, - null); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log trailer", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) { + try { + helper.logTrailer( + seq.getAndIncrement(), + serviceName, + methodName, + status, + trailers, + maxHeaderBytes, + EventLogger.LOGGER_SERVER, + rpcId, + null); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log trailer", e); + } } super.close(status, trailers); } @@ -189,17 +198,21 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void onMessage(ReqT message) { // Event: EventType.GRPC_CALL_REQUEST_MESSAGE - try { - helper.logRpcMessage( - seq.getAndIncrement(), - serviceName, - methodName, - EventType.GRPC_CALL_REQUEST_MESSAGE, - message, - EventLogger.LOGGER_SERVER, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log request message", e); + EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE; + if (filterHelper.isEventToBeLogged(requestMessageType)) { + try { + helper.logRpcMessage( + seq.getAndIncrement(), + serviceName, + methodName, + requestMessageType, + message, + maxMessageBytes, + EventLogger.LOGGER_SERVER, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log request message", e); + } } super.onMessage(message); } @@ -207,15 +220,17 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void onHalfClose() { // Event: EventType.GRPC_CALL_HALF_CLOSE - try { - helper.logHalfClose( - seq.getAndIncrement(), - serviceName, - methodName, - EventLogger.LOGGER_SERVER, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log half close", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) { + try { + helper.logHalfClose( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_SERVER, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log half close", e); + } } super.onHalfClose(); } @@ -223,15 +238,17 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor @Override public void onCancel() { // Event: EventType.GRPC_CALL_CANCEL - try { - helper.logCancel( - seq.getAndIncrement(), - serviceName, - methodName, - EventLogger.LOGGER_SERVER, - rpcId); - } catch (Exception e) { - logger.log(Level.SEVERE, "Unable to log cancel", e); + if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) { + try { + helper.logCancel( + seq.getAndIncrement(), + serviceName, + methodName, + EventLogger.LOGGER_SERVER, + rpcId); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unable to log cancel", e); + } } super.onCancel(); } diff --git a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java index f4e705b470..4373752d52 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java @@ -30,7 +30,6 @@ import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.TimeProvider; -import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; @@ -42,7 +41,9 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Map; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -50,10 +51,10 @@ import javax.annotation.Nullable; /** * Helper class for GCP observability logging. */ -class LogHelper { +public class LogHelper { private static final Logger logger = Logger.getLogger(LogHelper.class.getName()); - // TODO(dnvindhya): Define it in one places(TBD) to make it easily accessible from everywhere + // TODO(DNVindhya): Define it in one places(TBD) to make it easily accessible from everywhere static final Metadata.Key STATUS_DETAILS_KEY = Metadata.Key.of( "grpc-status-details-bin", @@ -61,18 +62,16 @@ class LogHelper { private final Sink sink; private final TimeProvider timeProvider; - // TODO(DNvindhya) remove unused annotation once the following 2 are actually used - @SuppressWarnings({"unused"}) private final Map locationTags; - @SuppressWarnings({"unused"}) private final Map customTags; - @SuppressWarnings({"unused"}) private final ObservabilityConfig observabilityConfig; - LogHelper(Sink sink, TimeProvider timeProvider, Map locationTags, - Map customTags, ObservabilityConfig observabilityConfig) { + /** + * Creates a LogHelper instance. + * + * @param sink sink + * @param timeProvider timeprovider + */ + public LogHelper(Sink sink, TimeProvider timeProvider) { this.sink = sink; this.timeProvider = timeProvider; - this.locationTags = locationTags; - this.customTags = customTags; - this.observabilityConfig = observabilityConfig; } /** @@ -85,6 +84,7 @@ class LogHelper { String authority, @Nullable Duration timeout, Metadata metadata, + int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, String rpcId, // null on client side @@ -96,7 +96,8 @@ class LogHelper { peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER, "peerAddress can only be specified by server"); - PayloadBuilder pair = createMetadataProto(metadata); + PayloadBuilder pair = + createMetadataProto(metadata, maxHeaderBytes); GrpcLogRecord.Builder logEntryBuilder = createTimestamp() .setSequenceId(seqId) .setServiceName(serviceName) @@ -107,6 +108,7 @@ class LogHelper { .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setMetadata(pair.payload) .setPayloadSize(pair.size) + .setPayloadTruncated(pair.truncated) .setRpcId(rpcId); if (timeout != null) { logEntryBuilder.setTimeout(timeout); @@ -118,13 +120,14 @@ class LogHelper { } /** - * Logs the reponse header. Binary logging equivalent of logServerHeader. + * Logs the response header. Binary logging equivalent of logServerHeader. */ void logResponseHeader( long seqId, String serviceName, String methodName, Metadata metadata, + int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, String rpcId, @Nullable SocketAddress peerAddress) { @@ -137,7 +140,8 @@ class LogHelper { peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, "peerAddress can only be specified for client"); - PayloadBuilder pair = createMetadataProto(metadata); + PayloadBuilder pair = + createMetadataProto(metadata, maxHeaderBytes); GrpcLogRecord.Builder logEntryBuilder = createTimestamp() .setSequenceId(seqId) .setServiceName(serviceName) @@ -147,6 +151,7 @@ class LogHelper { .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setMetadata(pair.payload) .setPayloadSize(pair.size) + .setPayloadTruncated(pair.truncated) .setRpcId(rpcId); if (peerAddress != null) { logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress)); @@ -163,6 +168,7 @@ class LogHelper { String methodName, Status status, Metadata metadata, + int maxHeaderBytes, GrpcLogRecord.EventLogger eventLogger, String rpcId, @Nullable SocketAddress peerAddress) { @@ -174,7 +180,8 @@ class LogHelper { peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT, "peerAddress can only be specified for client"); - PayloadBuilder pair = createMetadataProto(metadata); + PayloadBuilder pair = + createMetadataProto(metadata, maxHeaderBytes); GrpcLogRecord.Builder logEntryBuilder = createTimestamp() .setSequenceId(seqId) .setServiceName(serviceName) @@ -184,6 +191,7 @@ class LogHelper { .setLogLevel(LogLevel.LOG_LEVEL_DEBUG) .setMetadata(pair.payload) .setPayloadSize(pair.size) + .setPayloadTruncated(pair.truncated) .setStatusCode(status.getCode().value()) .setRpcId(rpcId); String statusDescription = status.getDescription(); @@ -209,6 +217,7 @@ class LogHelper { String methodName, EventType eventType, T message, + int maxMessageBytes, EventLogger eventLogger, String rpcId) { checkNotNull(serviceName, "serviceName"); @@ -220,13 +229,13 @@ class LogHelper { "event type must correspond to client message or server message"); checkNotNull(message, "message"); - // TODO(dnvindhya): Implement conversion of generics to ByteString + // TODO(DNVindhya): Implement conversion of generics to ByteString // Following is a temporary workaround to log if message is of following types : // 1. com.google.protobuf.Message // 2. byte[] byte[] messageBytesArray = null; if (message instanceof com.google.protobuf.Message) { - messageBytesArray = ((com.google.protobuf.Message)message).toByteArray(); + messageBytesArray = ((com.google.protobuf.Message) message).toByteArray(); } else if (message instanceof byte[]) { messageBytesArray = (byte[]) message; } else { @@ -235,7 +244,7 @@ class LogHelper { } PayloadBuilder pair = null; if (messageBytesArray != null) { - pair = createMesageProto(messageBytesArray); + pair = createMessageProto(messageBytesArray, maxMessageBytes); } GrpcLogRecord.Builder logEntryBuilder = createTimestamp() @@ -250,7 +259,8 @@ class LogHelper { logEntryBuilder.setPayloadSize(pair.size); } if (pair != null && pair.payload != null) { - logEntryBuilder.setMessage(pair.payload); + logEntryBuilder.setMessage(pair.payload) + .setPayloadTruncated(pair.truncated); } sink.write(logEntryBuilder.build()); } @@ -308,45 +318,79 @@ class LogHelper { return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos)); } + // TODO(DNVindhya): Evaluate if we need following clause for metadata logging in Observability + // Leaving the implementation for now as is to have same behavior across Java and Go + private static final Set NEVER_INCLUDED_METADATA = new HashSet<>( + Collections.singletonList( + // grpc-status-details-bin is already logged in `status_details` field of the + // observabilitylog proto + STATUS_DETAILS_KEY.name())); + private static final Set ALWAYS_INCLUDED_METADATA = new HashSet<>( + Collections.singletonList( + "grpc-trace-bin")); + static final class PayloadBuilder { T payload; int size; + boolean truncated; - private PayloadBuilder(T payload, int size) { + private PayloadBuilder(T payload, int size, boolean truncated) { this.payload = payload; this.size = size; + this.truncated = truncated; } } - // TODO(dnvindhya): Create a unit test for the metadata conversion - static PayloadBuilder createMetadataProto(Metadata metadata) { + static PayloadBuilder createMetadataProto(Metadata metadata, + int maxHeaderBytes) { checkNotNull(metadata, "metadata"); + checkArgument(maxHeaderBytes >= 0, + "maxHeaderBytes must be non negative"); GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder(); - // This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata's + // This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata // implementation byte[][] serialized = InternalMetadata.serialize(metadata); + boolean truncated = false; int totalMetadataBytes = 0; if (serialized != null) { - int singleMetadataEntryBytes = 0; // Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry for (int i = 0; i < serialized.length; i += 2) { String key = new String(serialized[i], Charsets.UTF_8); byte[] value = serialized[i + 1]; - singleMetadataEntryBytes = totalMetadataBytes + key.length() + value.length; + if (NEVER_INCLUDED_METADATA.contains(key)) { + continue; + } + boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key); + int metadataBytesAfterAdd = totalMetadataBytes + key.length() + value.length; + if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) { + truncated = true; + continue; + } metadataBuilder.addEntryBuilder() .setKey(key) .setValue(ByteString.copyFrom(value)); - totalMetadataBytes = singleMetadataEntryBytes; + if (!forceInclude) { + // force included keys do not count towards the size limit + totalMetadataBytes = metadataBytesAfterAdd; + } } } - return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes); + return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes, truncated); } - static PayloadBuilder createMesageProto(byte[] message) { + static PayloadBuilder createMessageProto(byte[] message, int maxMessageBytes) { + checkArgument(maxMessageBytes >= 0, + "maxMessageBytes must be non negative"); + int desiredBytes = 0; int messageLength = message.length; + if (maxMessageBytes > 0) { + desiredBytes = Math.min(maxMessageBytes, messageLength); + } ByteString messageData = - ByteString.copyFrom(message, 0, messageLength); - return new PayloadBuilder(messageData, messageLength); + ByteString.copyFrom(message, 0, desiredBytes); + + return new PayloadBuilder<>(messageData, messageLength, + maxMessageBytes < message.length); } static Address socketAddressToProto(SocketAddress address) { @@ -366,7 +410,7 @@ class LogHelper { } builder.setIpPort(((InetSocketAddress) address).getPort()); } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { - // To avoid a compile time dependency on grpc-netty, we check against the + // To avoid a compiled time dependency on grpc-netty, we check against the // runtime class name. builder.setType(Address.Type.TYPE_UNIX) .setAddress(address.toString()); @@ -395,10 +439,4 @@ class LogHelper { } return deadline0.minimum(deadline1); } - - // TODO (dnvindhya) : Implement service and method name filtering - // Add unit tests for the method as part of filtering implementation - boolean isMethodToBeLogged(String fullMethodName) { - return true; - } } diff --git a/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java b/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java index 419160062d..93f84ae93c 100644 --- a/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java +++ b/observability/src/main/java/io/grpc/observability/logging/GcpLogSink.java @@ -24,12 +24,15 @@ import com.google.cloud.logging.Payload.JsonPayload; import com.google.cloud.logging.Severity; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import com.google.protobuf.util.JsonFormat; import io.grpc.internal.JsonParser; import io.grpc.observabilitylog.v1.GrpcLogRecord; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,10 +42,19 @@ import java.util.logging.Logger; public class GcpLogSink implements Sink { private final Logger logger = Logger.getLogger(GcpLogSink.class.getName()); - // TODO (dnvindhya): Make cloud logging service a configurable value + // TODO(DNVindhya): Make cloud logging service a configurable value private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2"; private static final String DEFAULT_LOG_NAME = "grpc"; + private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container"; + private static final Set kubernetesResourceLabelSet + = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", + "pod_name", "container_name"); + private static final int FALLBACK_FLUSH_LIMIT = 100; + private final Map customTags; private final Logging gcpLoggingClient; + private final MonitoredResource kubernetesResource; + private final int flushLimit; + private int flushCounter; private static Logging createLoggingClient(String projectId) { LoggingOptions.Builder builder = LoggingOptions.newBuilder(); @@ -57,15 +69,27 @@ public class GcpLogSink implements Sink { * * @param destinationProjectId cloud project id to write logs */ - public GcpLogSink(String destinationProjectId) { - this(createLoggingClient(destinationProjectId)); + public GcpLogSink(String destinationProjectId, Map locationTags, + Map customTags, int flushLimit) { + this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit); + } @VisibleForTesting - GcpLogSink(Logging client) { + GcpLogSink(Logging client, Map locationTags, Map customTags, + int flushLimit) { this.gcpLoggingClient = client; + this.customTags = customTags != null ? customTags : new HashMap<>(); + this.kubernetesResource = getResource(locationTags); + this.flushLimit = flushLimit != 0 ? flushLimit : FALLBACK_FLUSH_LIMIT; + this.flushCounter = 0; } + /** + * Writes logs to GCP Cloud Logging. + * + * @param logProto gRPC logging proto containing the message to be logged + */ @Override public void write(GrpcLogRecord logProto) { if (gcpLoggingClient == null) { @@ -78,22 +102,45 @@ public class GcpLogSink implements Sink { try { GrpcLogRecord.EventType event = logProto.getEventType(); Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel()); - // TODO(vindhyan): make sure all (int, long) values are not displayed as double - LogEntry grpcLogEntry = + // TODO(DNVindhya): make sure all (int, long) values are not displayed as double + // For now, every value is being converted as string because of JsonFormat.printer().print + LogEntry.Builder grpcLogEntryBuilder = LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto))) .setSeverity(logEntrySeverity) .setLogName(DEFAULT_LOG_NAME) - .setResource(MonitoredResource.newBuilder("global").build()) - .build(); + .setResource(kubernetesResource); + if (!customTags.isEmpty()) { + grpcLogEntryBuilder.setLabels(customTags); + } + LogEntry grpcLogEntry = grpcLogEntryBuilder.build(); synchronized (this) { logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event); gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); + flushCounter += 1; + if (flushCounter >= flushLimit) { + gcpLoggingClient.flush(); + flushCounter = 0; + } } } catch (Exception e) { logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e); } } + @VisibleForTesting + static MonitoredResource getResource(Map resourceTags) { + MonitoredResource.Builder builder = MonitoredResource.newBuilder(K8S_MONITORED_RESOURCE_TYPE); + if ((resourceTags != null) && !resourceTags.isEmpty()) { + for (Map.Entry entry : resourceTags.entrySet()) { + String resourceKey = entry.getKey(); + if (kubernetesResourceLabelSet.contains(resourceKey)) { + builder.addLabel(resourceKey, entry.getValue()); + } + } + } + return builder.build(); + } + @SuppressWarnings("unchecked") private Map protoToMapConverter(GrpcLogRecord logProto) throws IOException { diff --git a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java index b8de06cf56..cf8bbdc950 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java @@ -34,9 +34,9 @@ import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelProvider; import io.grpc.MethodDescriptor; import io.grpc.TlsChannelCredentials; +import io.grpc.observability.interceptors.ConfigFilterHelper; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; -import io.grpc.observability.logging.GcpLogSink; -import io.grpc.observability.logging.Sink; +import io.grpc.observability.interceptors.LogHelper; import io.grpc.testing.TestMethodDescriptors; import org.junit.Rule; import org.junit.Test; @@ -58,13 +58,14 @@ public class LoggingChannelProviderTest { public void initTwiceCausesException() { ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); - Sink mockSink = mock(GcpLogSink.class); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper)); assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); try { LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); diff --git a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java index 8f837f44f4..7454286517 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java @@ -35,9 +35,9 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.ServerProvider; +import io.grpc.observability.interceptors.ConfigFilterHelper; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; -import io.grpc.observability.logging.GcpLogSink; -import io.grpc.observability.logging.Sink; +import io.grpc.observability.interceptors.LogHelper; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleRequest; @@ -59,13 +59,14 @@ public class LoggingServerProviderTest { public void initTwiceCausesException() { ServerProvider prevProvider = ServerProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class); - Sink mockSink = mock(GcpLogSink.class); + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); LoggingServerProvider.init( - new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper)); assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); try { LoggingServerProvider.init( - new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!"); diff --git a/observability/src/test/java/io/grpc/observability/LoggingTest.java b/observability/src/test/java/io/grpc/observability/LoggingTest.java index 8c2a0a6965..cd97188e9d 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingTest.java @@ -17,23 +17,37 @@ package io.grpc.observability; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; -import io.grpc.ManagedChannel; +import com.google.common.collect.ImmutableMap; import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.internal.TimeProvider; +import io.grpc.observability.interceptors.ConfigFilterHelper; +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.observability.interceptors.LogHelper; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; +import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.protobuf.SimpleServiceGrpc; import java.io.IOException; +import java.util.Map; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; @RunWith(JUnit4.class) public class LoggingTest { @@ -41,31 +55,129 @@ public class LoggingTest { @Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); - private static final String PROJECT_ID = "project-id"; + private static final String PROJECT_ID = "PROJECT"; + private static final Map locationTags = ImmutableMap.of( + "project_id", "PROJECT", + "location", "us-central1-c", + "cluster_name", "grpc-observability-cluster", + "namespace_name", "default" , + "pod_name", "app1-6c7c58f897-n92c5"); + private static final Map customTags = ImmutableMap.of( + "KEY1", "Value1", + "KEY2", "VALUE2"); + private static final int flushLimit = 100; /** * Cloud logging test using LoggingChannelProvider and LoggingServerProvider. + * + *

Ignoring test, because it calls external CLoud Logging APIs. + * To test cloud logging setup, + * 1. Set up Cloud Logging Auth credentials + * 2. Assign permissions to service account to write logs to project specified by + * variable PROJECT_ID + * 3. Comment @Ignore annotation + *

*/ @Ignore @Test - public void clientServer_interceptorCalled() + public void clientServer_interceptorCalled_logAlways() throws IOException { - Sink sink = new GcpLogSink(PROJECT_ID); + Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit); + LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + FilterParams logAlwaysFilterParams = + FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); LoggingServerProvider.init( - new InternalLoggingServerInterceptor.FactoryImpl(sink, null, null, null)); + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) .build().start(); int port = cleanupRule.register(server).getPort(); LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(sink, null, null, null)); - ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port) - .usePlaintext().build(); + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( - cleanupRule.register(channel)); + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) .isEqualTo("Hello buddy"); + assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11); sink.close(); LoggingChannelProvider.shutdown(); LoggingServerProvider.shutdown(); } + + @Test + public void clientServer_interceptorCalled_logNever() throws IOException { + Sink mockSink = mock(GcpLogSink.class); + LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER)); + ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class); + FilterParams logNeverFilterParams = + FilterParams.create(false, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logNeverFilterParams); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); + Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) + .build().start(); + int port = cleanupRule.register(server).getPort(); + LoggingChannelProvider.init( + new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper)); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + verifyNoInteractions(spyLogHelper); + verifyNoInteractions(mockSink); + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); + } + + @Test + public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException { + LogHelper mockLogHelper = mock(LogHelper.class); + ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class); + FilterParams logAlwaysFilterParams = + FilterParams.create(true, 0, 0); + when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class))) + .thenReturn(logAlwaysFilterParams); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) + .thenReturn(true); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE)) + .thenReturn(false); + when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE)) + .thenReturn(false); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2)); + Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) + .build().start(); + int port = cleanupRule.register(server).getPort(); + LoggingChannelProvider.init( + new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2)); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( + cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port) + .usePlaintext().build())); + assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) + .isEqualTo("Hello buddy"); + // Total number of calls should have been 14 (6 from client and 6 from server) + // Since cancel is not invoked, it will be 12. + // Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2) + // events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8 + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8); + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); + } } diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java index 5ca24af67c..cdaeb2e739 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java @@ -22,8 +22,11 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import io.grpc.observability.ObservabilityConfig.LogFilter; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.io.IOException; +import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -99,23 +102,23 @@ public class ObservabilityConfigImplTest { observabilityConfig.parse(LOG_FILTERS); assertTrue(observabilityConfig.isEnableCloudLogging()); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); - ObservabilityConfig.LogFilter[] logFilters = observabilityConfig.getLogFilters(); - assertThat(logFilters).hasLength(2); - assertThat(logFilters[0].pattern).isEqualTo("*/*"); - assertThat(logFilters[0].headerBytes).isEqualTo(4096); - assertThat(logFilters[0].messageBytes).isEqualTo(2048); - assertThat(logFilters[1].pattern).isEqualTo("service1/Method2"); - assertThat(logFilters[1].headerBytes).isNull(); - assertThat(logFilters[1].messageBytes).isNull(); + List logFilters = observabilityConfig.getLogFilters(); + assertThat(logFilters).hasSize(2); + assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); + assertThat(logFilters.get(0).headerBytes).isEqualTo(4096); + assertThat(logFilters.get(0).messageBytes).isEqualTo(2048); + assertThat(logFilters.get(1).pattern).isEqualTo("service1/Method2"); + assertThat(logFilters.get(1).headerBytes).isNull(); + assertThat(logFilters.get(1).messageBytes).isNull(); } @Test public void eventTypes() throws IOException { observabilityConfig.parse(EVENT_TYPES); assertFalse(observabilityConfig.isEnableCloudLogging()); - EventType[] eventTypes = observabilityConfig.getEventTypes(); + List eventTypes = observabilityConfig.getEventTypes(); assertThat(eventTypes).isEqualTo( - new EventType[]{EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE, - EventType.GRPC_CALL_TRAILER}); + ImmutableList.of(EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE, + EventType.GRPC_CALL_TRAILER)); } } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/ConfigFilterHelperTest.java b/observability/src/test/java/io/grpc/observability/interceptors/ConfigFilterHelperTest.java new file mode 100644 index 0000000000..30257b218a --- /dev/null +++ b/observability/src/test/java/io/grpc/observability/interceptors/ConfigFilterHelperTest.java @@ -0,0 +1,219 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability.interceptors; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import io.grpc.MethodDescriptor; +import io.grpc.observability.ObservabilityConfig; +import io.grpc.observability.ObservabilityConfig.LogFilter; +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import io.grpc.testing.TestMethodDescriptors; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; + +public class ConfigFilterHelperTest { + private static final ImmutableList configLogFilters = + ImmutableList.of( + new LogFilter("service1/Method2",1024,1024), + new LogFilter("service2/*",2048,1024), + new LogFilter("*",128,128), + new LogFilter("service2/*",2048,1024)); + + private static final ImmutableList configEventTypes = + ImmutableList.of( + EventType.GRPC_CALL_REQUEST_HEADER, + EventType.GRPC_CALL_HALF_CLOSE, + EventType.GRPC_CALL_TRAILER); + + private final MethodDescriptor.Builder builder = TestMethodDescriptors.voidMethod() + .toBuilder(); + private MethodDescriptor method; + + private ObservabilityConfig mockConfig; + private ConfigFilterHelper configFilterHelper; + + @Before + public void setup() { + mockConfig = mock(ObservabilityConfig.class); + configFilterHelper = new ConfigFilterHelper(mockConfig); + } + + @Test + public void disableCloudLogging_emptyLogFilters() { + when(mockConfig.isEnableCloudLogging()).thenReturn(false); + assertFalse(configFilterHelper.methodOrServiceFilterPresent); + assertThat(configFilterHelper.perServiceFilters).isEmpty(); + assertThat(configFilterHelper.perServiceFilters).isEmpty(); + assertThat(configFilterHelper.perMethodFilters).isEmpty(); + assertThat(configFilterHelper.logEventTypeSet).isNull(); + } + + @Test + public void enableCloudLogging_emptyLogFilters() { + when(mockConfig.isEnableCloudLogging()).thenReturn(true); + when(mockConfig.getLogFilters()).thenReturn(null); + when(mockConfig.getEventTypes()).thenReturn(null); + configFilterHelper.setMethodOrServiceFilterMaps(); + configFilterHelper.setEventFilterSet(); + + assertFalse(configFilterHelper.methodOrServiceFilterPresent); + assertThat(configFilterHelper.perServiceFilters).isEmpty(); + assertThat(configFilterHelper.perServiceFilters).isEmpty(); + assertThat(configFilterHelper.perMethodFilters).isEmpty(); + assertThat(configFilterHelper.logEventTypeSet).isNull(); + } + + @Test + public void enableCloudLogging_withLogFilters() { + when(mockConfig.isEnableCloudLogging()).thenReturn(true); + when(mockConfig.getLogFilters()).thenReturn(configLogFilters); + when(mockConfig.getEventTypes()).thenReturn(configEventTypes); + + configFilterHelper.setMethodOrServiceFilterMaps(); + configFilterHelper.setEventFilterSet(); + + assertTrue(configFilterHelper.methodOrServiceFilterPresent); + + Map expectedServiceFilters = new HashMap<>(); + expectedServiceFilters.put("*", + FilterParams.create(true, 128, 128)); + expectedServiceFilters.put("service2", + FilterParams.create(true, 2048, 1024)); + assertEquals(configFilterHelper.perServiceFilters, expectedServiceFilters); + + Map expectedMethodFilters = new HashMap<>(); + expectedMethodFilters.put("service1/Method2", + FilterParams.create(true, 1024, 1024)); + assertEquals(configFilterHelper.perMethodFilters, expectedMethodFilters); + + Set expectedLogEventTypeSet = ImmutableSet.copyOf(configEventTypes); + assertEquals(configFilterHelper.logEventTypeSet, expectedLogEventTypeSet); + } + + @Test + public void checkMethodAlwaysLogged() { + List sampleLogFilters = ImmutableList.of( + new LogFilter("*", 4096, 4096)); + when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters); + configFilterHelper.setMethodOrServiceFilterMaps(); + + FilterParams expectedParams = + FilterParams.create(true, 4096, 4096); + method = builder.setFullMethodName("service1/Method6").build(); + FilterParams resultParams + = configFilterHelper.isMethodToBeLogged(method); + assertEquals(resultParams, expectedParams); + } + + @Test + public void checkMethodNotToBeLogged() { + List sampleLogFilters = ImmutableList.of( + new LogFilter("service1/Method2", 1024, 1024), + new LogFilter("service2/*", 2048, 1024)); + when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters); + configFilterHelper.setMethodOrServiceFilterMaps(); + + FilterParams expectedParams = + FilterParams.create(false, 0, 0); + method = builder.setFullMethodName("service3/Method3").build(); + FilterParams resultParams + = configFilterHelper.isMethodToBeLogged(method); + assertEquals(resultParams, expectedParams); + } + + @Test + public void checkMethodToBeLoggedConditional() { + when(mockConfig.getLogFilters()).thenReturn(configLogFilters); + configFilterHelper.setMethodOrServiceFilterMaps(); + + FilterParams expectedParams = + FilterParams.create(true, 1024, 1024); + method = builder.setFullMethodName("service1/Method2").build(); + FilterParams resultParams + = configFilterHelper.isMethodToBeLogged(method); + assertEquals(resultParams, expectedParams); + + FilterParams expectedParamsWildCard = + FilterParams.create(true, 2048, 1024); + method = builder.setFullMethodName("service2/Method1").build(); + FilterParams resultParamsWildCard + = configFilterHelper.isMethodToBeLogged(method); + assertEquals(resultParamsWildCard, expectedParamsWildCard); + } + + @Test + public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() { + List eventList = new ArrayList<>(); + eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); + eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); + eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); + eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); + eventList.add(EventType.GRPC_CALL_HALF_CLOSE); + eventList.add(EventType.GRPC_CALL_TRAILER); + eventList.add(EventType.GRPC_CALL_CANCEL); + + for (EventType event : eventList) { + assertTrue(configFilterHelper.isEventToBeLogged(event)); + } + } + + + @Test + public void checkEventToBeLogged_emptyFilter_doNotLogEventTypes() { + when(mockConfig.getEventTypes()).thenReturn(new ArrayList<>()); + configFilterHelper.setEventFilterSet(); + + List eventList = new ArrayList<>(); + eventList.add(EventType.GRPC_CALL_REQUEST_HEADER); + eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER); + eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE); + eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE); + eventList.add(EventType.GRPC_CALL_HALF_CLOSE); + eventList.add(EventType.GRPC_CALL_TRAILER); + eventList.add(EventType.GRPC_CALL_CANCEL); + + for (EventType event : eventList) { + assertFalse(configFilterHelper.isEventToBeLogged(event)); + } + } + + @Test + public void checkEventToBeLogged_withEventTypesFromConfig() { + when(mockConfig.getEventTypes()).thenReturn(configEventTypes); + configFilterHelper.setEventFilterSet(); + + EventType logEventType = EventType.GRPC_CALL_REQUEST_HEADER; + assertTrue(configFilterHelper.isEventToBeLogged(logEventType)); + + EventType doNotLogEventType = EventType.GRPC_CALL_RESPONSE_MESSAGE; + assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType)); + } +} diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java index ae14557566..325b2e715d 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java @@ -18,16 +18,23 @@ package io.grpc.observability.interceptors; import static com.google.common.truth.Truth.assertThat; import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.common.util.concurrent.SettableFuture; -import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; import io.grpc.Attributes; @@ -42,16 +49,16 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.internal.NoopClientCall; -import io.grpc.observability.logging.GcpLogSink; -import io.grpc.observability.logging.Sink; +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; - -import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -60,7 +67,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.AdditionalMatchers; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -73,7 +83,7 @@ public class InternalLoggingChannelInterceptorTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - private static final Charset US_ASCII = Charset.forName("US-ASCII"); + private static final Charset US_ASCII = StandardCharsets.US_ASCII; private InternalLoggingChannelInterceptor.Factory factory; private AtomicReference> interceptedListener; @@ -82,20 +92,28 @@ public class InternalLoggingChannelInterceptorTest { private SettableFuture halfCloseCalled; private SettableFuture cancelCalled; private SocketAddress peer; - private final Sink mockSink = mock(GcpLogSink.class); + private LogHelper mockLogHelper; + private ConfigFilterHelper mockFilterHelper; + private FilterParams filterParams; @Before public void setup() throws Exception { - factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null); + mockLogHelper = mock(LogHelper.class); + mockFilterHelper = mock(ConfigFilterHelper.class); + factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper); interceptedListener = new AtomicReference<>(); actualClientInitial = new AtomicReference<>(); actualRequest = new AtomicReference<>(); halfCloseCalled = SettableFuture.create(); cancelCalled = SettableFuture.create(); peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + filterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); } @Test + @SuppressWarnings("unchecked") public void internalLoggingChannelInterceptor() throws Exception { Channel channel = new Channel() { @Override @@ -137,6 +155,382 @@ public class InternalLoggingChannelInterceptorTest { } }; + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(filterParams); + + ClientCall interceptedLoggingCall = + factory.create() + .interceptCall(method, + CallOptions.DEFAULT, + channel); + + // send request header + { + Metadata clientInitial = new Metadata(); + String dataA = "aaaaaaaaa"; + String dataB = "bbbbbbbbb"; + Metadata.Key keyA = + Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key keyB = + Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); + clientInitial.put(keyA, dataA); + clientInitial.put(keyB, dataB); + interceptedLoggingCall.start(mockListener, clientInitial); + verify(mockLogHelper).logRequestHeader( + /*seq=*/ eq(1L), + eq("service"), + eq("method"), + eq("the-authority"), + ArgumentMatchers.isNull(), + same(clientInitial), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_CLIENT), + anyString(), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockLogHelper); + assertSame(clientInitial, actualClientInitial.get()); + } + + reset(mockLogHelper); + reset(mockListener); + + // receive response header + { + Metadata serverInitial = new Metadata(); + interceptedListener.get().onHeaders(serverInitial); + verify(mockLogHelper).logResponseHeader( + /*seq=*/ eq(2L), + eq("service"), + eq("method"), + same(serverInitial), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_CLIENT), + anyString(), + same(peer)); + verifyNoMoreInteractions(mockLogHelper); + verify(mockListener).onHeaders(same(serverInitial)); + } + + reset(mockLogHelper); + reset(mockListener); + + // send request message + { + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedLoggingCall.sendMessage(request); + verify(mockLogHelper).logRpcMessage( + /*seq=*/ eq(3L), + eq("service"), + eq("method"), + eq(EventType.GRPC_CALL_REQUEST_MESSAGE), + same(request), + eq(filterParams.messageBytes()), + eq(EventLogger.LOGGER_CLIENT), + anyString()); + verifyNoMoreInteractions(mockLogHelper); + assertSame(request, actualRequest.get()); + } + + reset(mockLogHelper); + reset(mockListener); + + // client half close + { + interceptedLoggingCall.halfClose(); + verify(mockLogHelper).logHalfClose( + /*seq=*/ eq(4L), + eq("service"), + eq("method"), + eq(EventLogger.LOGGER_CLIENT), + anyString()); + halfCloseCalled.get(1, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(mockLogHelper); + } + + reset(mockLogHelper); + reset(mockListener); + + // receive response message + { + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + verify(mockLogHelper).logRpcMessage( + /*seq=*/ eq(5L), + eq("service"), + eq("method"), + eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), + same(response), + eq(filterParams.messageBytes()), + eq(EventLogger.LOGGER_CLIENT), + anyString()); + verifyNoMoreInteractions(mockLogHelper); + verify(mockListener).onMessage(same(response)); + } + + reset(mockLogHelper); + reset(mockListener); + + // receive trailer + { + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + interceptedListener.get().onClose(status, trailers); + verify(mockLogHelper).logTrailer( + /*seq=*/ eq(6L), + eq("service"), + eq("method"), + same(status), + same(trailers), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_CLIENT), + anyString(), + same(peer)); + verifyNoMoreInteractions(mockLogHelper); + verify(mockListener).onClose(same(status), same(trailers)); + } + + reset(mockLogHelper); + reset(mockListener); + + // cancel + { + interceptedLoggingCall.cancel(null, null); + verify(mockLogHelper).logCancel( + /*seq=*/ eq(7L), + eq("service"), + eq("method"), + eq(EventLogger.LOGGER_CLIENT), + anyString()); + cancelCalled.get(1, TimeUnit.MILLISECONDS); + } + } + + @Test + public void clientDeadLineLogged_deadlineSetViaCallOption() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(filterParams); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + ClientCall interceptedLoggingCall = + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + }); + interceptedLoggingCall.start(mockListener, new Metadata()); + ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockLogHelper, times(1)) + .logRequestHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + callOptTimeoutCaptor.capture(), + any(Metadata.class), + anyInt(), + any(GrpcLogRecord.EventLogger.class), + anyString(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + Duration timeout = callOptTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaContext() throws Exception { + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + Deadline.after(1, TimeUnit.SECONDS), + Executors.newSingleThreadScheduledExecutor()) + .run(() -> { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(filterParams); + + callFuture.set( + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + })); + }); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); + ArgumentCaptor contextTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockLogHelper, times(1)) + .logRequestHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + contextTimeoutCaptor.capture(), + any(Metadata.class), + anyInt(), + any(GrpcLogRecord.EventLogger.class), + anyString(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + Duration timeout = contextTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception { + Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS); + Deadline callOptionsDeadline = CallOptions.DEFAULT + .withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline(); + + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + contextDeadline, Executors.newSingleThreadScheduledExecutor()) + .run(() -> { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(filterParams); + + callFuture.set( + factory.create() + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall<>(); + } + + @Override + public String authority() { + return "the-authority"; + } + })); + }); + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata()); + ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockLogHelper, times(1)) + .logRequestHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + timeoutCaptor.capture(), + any(Metadata.class), + anyInt(), + any(GrpcLogRecord.EventLogger.class), + anyString(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + Duration timeout = timeoutCaptor.getValue(); + assertThat(LogHelper.min(contextDeadline, callOptionsDeadline)) + .isSameInstanceAs(contextDeadline); + assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout)) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + public void clientMethodOrServiceFilter_disabled() { + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public void cancel(String message, Throwable cause) { + cancelCalled.set(null); + } + + @Override + public void halfClose() { + halfCloseCalled.set(null); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + @SuppressWarnings("unchecked") ClientCall.Listener mockListener = mock(ClientCall.Listener.class); @@ -147,6 +541,8 @@ public class InternalLoggingChannelInterceptorTest { .setRequestMarshaller(BYTEARRAY_MARSHALLER) .setResponseMarshaller(BYTEARRAY_MARSHALLER) .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(FilterParams.create(false, 0, 0)); ClientCall interceptedLoggingCall = factory.create() @@ -154,130 +550,55 @@ public class InternalLoggingChannelInterceptorTest { CallOptions.DEFAULT, channel); - // send client header - { - EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - Metadata clientInitial = new Metadata(); - String dataA = "aaaaaaaaa"; - String dataB = "bbbbbbbbb"; - Metadata.Key keyA = - Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER); - Metadata.Key keyB = - Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER); - MetadataEntry entryA = - MetadataEntry - .newBuilder() - .setKey(keyA.name()) - .setValue(ByteString.copyFrom(dataA.getBytes(US_ASCII))) - .build(); - MetadataEntry entryB = - MetadataEntry - .newBuilder() - .setKey(keyB.name()) - .setValue(ByteString.copyFrom(dataB.getBytes(US_ASCII))) - .build(); - clientInitial.put(keyA, dataA); - clientInitial.put(keyB, dataB); - GrpcLogRecord.Metadata expectedMetadata = GrpcLogRecord.Metadata - .newBuilder() - .addEntry(entryA) - .addEntry(entryB) - .build(); - interceptedLoggingCall.start(mockListener, clientInitial); - verify(mockSink).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedRequestHeaderEvent); - assertEquals(captor.getValue().getSequenceId(), 1L); - assertEquals(captor.getValue().getServiceName(), "service"); - assertEquals(captor.getValue().getMethodName(), "method"); - assertEquals(captor.getValue().getAuthority(), "the-authority"); - assertEquals(captor.getValue().getMetadata(), expectedMetadata); - verifyNoMoreInteractions(mockSink); - assertSame(clientInitial, actualClientInitial.get()); - } - - // TODO(dnvindhya) : Add a helper method to verify other fields of GrpcLogRecord for all events - // receive server header - { - EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - Metadata serverInitial = new Metadata(); - interceptedListener.get().onHeaders(serverInitial); - verify(mockSink, times(2)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedResponseHeaderEvent); - verifyNoMoreInteractions(mockSink); - verify(mockListener).onHeaders(same(serverInitial)); - } - - // send client message - { - EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - byte[] request = "this is a request".getBytes(US_ASCII); - interceptedLoggingCall.sendMessage(request); - verify(mockSink, times(3)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedRequestMessageEvent); - verifyNoMoreInteractions(mockSink); - assertSame(request, actualRequest.get()); - } - - // client half close - { - EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - interceptedLoggingCall.halfClose(); - verify(mockSink, times(4)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedHalfCloseEvent); - halfCloseCalled.get(1, TimeUnit.SECONDS); - verifyNoMoreInteractions(mockSink); - } - - // receive server message - { - EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - byte[] response = "this is a response".getBytes(US_ASCII); - interceptedListener.get().onMessage(response); - verify(mockSink, times(5)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedResponseMessageEvent); - verifyNoMoreInteractions(mockSink); - verify(mockListener).onMessage(same(response)); - } - - // receive trailer - { - EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - Status status = Status.INTERNAL.withDescription("trailer description"); - Metadata trailers = new Metadata(); - - interceptedListener.get().onClose(status, trailers); - verify(mockSink, times(6)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedTrailerEvent); - verifyNoMoreInteractions(mockSink); - verify(mockListener).onClose(same(status), same(trailers)); - } - - // cancel - { - EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - interceptedLoggingCall.cancel(null, null); - verify(mockSink, times(7)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedCancelEvent); - cancelCalled.get(1, TimeUnit.SECONDS); - } + interceptedLoggingCall.start(mockListener, new Metadata()); + verifyNoInteractions(mockLogHelper); } @Test - public void clientDeadLineLogged_deadlineSetViaCallOption() { + public void clientMethodOrServiceFilter_enabled() { + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { + @Override + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); + } + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public void cancel(String message, Throwable cause) { + cancelCalled.set(null); + } + + @Override + public void halfClose() { + halfCloseCalled.set(null); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + + @SuppressWarnings("unchecked") + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + MethodDescriptor method = MethodDescriptor.newBuilder() .setType(MethodType.UNKNOWN) @@ -285,132 +606,129 @@ public class InternalLoggingChannelInterceptorTest { .setRequestMarshaller(BYTEARRAY_MARSHALLER) .setResponseMarshaller(BYTEARRAY_MARSHALLER) .build(); - @SuppressWarnings("unchecked") - ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(FilterParams.create(true, 10, 10)); ClientCall interceptedLoggingCall = factory.create() - .interceptCall( - method, - CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), - new Channel() { - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions) { - return new NoopClientCall<>(); - } + .interceptCall(method, + CallOptions.DEFAULT, + channel); - @Override - public String authority() { - return "the-authority"; - } - }); - interceptedLoggingCall.start(mockListener, new Metadata()); - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - verify(mockSink, times(1)).write(captor.capture()); - Duration timeout = captor.getValue().getTimeout(); - assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) - .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + { + interceptedLoggingCall.start(mockListener, new Metadata()); + interceptedListener.get().onHeaders(new Metadata()); + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedLoggingCall.sendMessage(request); + interceptedLoggingCall.halfClose(); + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + interceptedListener.get().onClose(status, trailers); + interceptedLoggingCall.cancel(null, null); + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7); + } } @Test - public void clientDeadlineLogged_deadlineSetViaContext() throws Exception { - final SettableFuture> callFuture = SettableFuture.create(); - Context.current() - .withDeadline( - Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor()) - .run(new Runnable() { + public void eventFilter_enabled() { + when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)).thenReturn(false); + when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)).thenReturn(false); + + Channel channel = new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + return new NoopClientCall() { @Override - public void run() { - MethodDescriptor method = - MethodDescriptor.newBuilder() - .setType(MethodType.UNKNOWN) - .setFullMethodName("service/method") - .setRequestMarshaller(BYTEARRAY_MARSHALLER) - .setResponseMarshaller(BYTEARRAY_MARSHALLER) - .build(); - - callFuture.set( - factory.create() - .interceptCall( - method, - CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), - new Channel() { - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions) { - return new NoopClientCall<>(); - } - - @Override - public String authority() { - return "the-authority"; - } - })); + @SuppressWarnings("unchecked") + public void start(Listener responseListener, Metadata headers) { + interceptedListener.set((Listener) responseListener); + actualClientInitial.set(headers); } - }); + + @Override + public void sendMessage(RequestT message) { + actualRequest.set(message); + } + + @Override + public void cancel(String message, Throwable cause) { + cancelCalled.set(null); + } + + @Override + public void halfClose() { + halfCloseCalled.set(null); + } + + @Override + public Attributes getAttributes() { + return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build(); + } + }; + } + + @Override + public String authority() { + return "the-authority"; + } + }; + @SuppressWarnings("unchecked") ClientCall.Listener mockListener = mock(ClientCall.Listener.class); - callFuture.get().start(mockListener, new Metadata()); - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - verify(mockSink, times(1)).write(captor.capture()); - Duration timeout = captor.getValue().getTimeout(); - assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) - .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); - } - @Test - public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception { - Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS); - Deadline callOptionsDeadline = CallOptions.DEFAULT - .withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline(); + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(FilterParams.create(true, 10, 10)); - final SettableFuture> callFuture = SettableFuture.create(); - Context.current() - .withDeadline( - contextDeadline, Executors.newSingleThreadScheduledExecutor()) - .run(new Runnable() { - @Override - public void run() { - MethodDescriptor method = - MethodDescriptor.newBuilder() - .setType(MethodType.UNKNOWN) - .setFullMethodName("service/method") - .setRequestMarshaller(BYTEARRAY_MARSHALLER) - .setResponseMarshaller(BYTEARRAY_MARSHALLER) - .build(); + ClientCall interceptedLoggingCall = + factory.create() + .interceptCall(method, + CallOptions.DEFAULT, + channel); - callFuture.set( - factory.create() - .interceptCall( - method, - CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS), - new Channel() { - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions) { - return new NoopClientCall<>(); - } - - @Override - public String authority() { - return "the-authority"; - } - })); - } - }); - @SuppressWarnings("unchecked") - ClientCall.Listener mockListener = mock(ClientCall.Listener.class); - callFuture.get().start(mockListener, new Metadata()); - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - verify(mockSink, times(1)).write(captor.capture()); - Duration timeout = captor.getValue().getTimeout(); - assertThat(LogHelper.min(contextDeadline, callOptionsDeadline)) - .isSameInstanceAs(contextDeadline); - assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout)) - .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + { + interceptedLoggingCall.start(mockListener, new Metadata()); + verify(mockLogHelper, never()).logRequestHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + any(Duration.class), + any(Metadata.class), + anyInt(), + any(GrpcLogRecord.EventLogger.class), + anyString(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), + ArgumentMatchers.any())); + interceptedListener.get().onHeaders(new Metadata()); + verify(mockLogHelper, never()).logResponseHeader( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + any(Metadata.class), + anyInt(), + any(GrpcLogRecord.EventLogger.class), + anyString(), + ArgumentMatchers.any()); + byte[] request = "this is a request".getBytes(US_ASCII); + interceptedLoggingCall.sendMessage(request); + interceptedLoggingCall.halfClose(); + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedListener.get().onMessage(response); + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + interceptedListener.get().onClose(status, trailers); + interceptedLoggingCall.cancel(null, null); + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(5); + } } } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java index 17813f0a27..f3a245f63a 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java @@ -18,13 +18,20 @@ package io.grpc.observability.interceptors; import static com.google.common.truth.Truth.assertThat; import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; @@ -37,14 +44,15 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.ServerCall; import io.grpc.Status; import io.grpc.internal.NoopServerCall; -import io.grpc.observability.logging.GcpLogSink; -import io.grpc.observability.logging.Sink; +import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams; import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -53,7 +61,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.AdditionalMatchers; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -66,7 +77,7 @@ public class InternalLoggingServerInterceptorTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); - private static final Charset US_ASCII = Charset.forName("US-ASCII"); + private static final Charset US_ASCII = StandardCharsets.US_ASCII; private InternalLoggingServerInterceptor.Factory factory; private AtomicReference> interceptedLoggingCall; @@ -77,13 +88,16 @@ public class InternalLoggingServerInterceptorTest { private AtomicReference actualResponse; private AtomicReference actualStatus; private AtomicReference actualTrailers; - private final Sink mockSink = mock(GcpLogSink.class); + private LogHelper mockLogHelper; + private ConfigFilterHelper mockFilterHelper; private SocketAddress peer; @Before @SuppressWarnings("unchecked") public void setup() throws Exception { - factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null); + mockLogHelper = mock(LogHelper.class); + mockFilterHelper = mock(ConfigFilterHelper.class); + factory = new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper); interceptedLoggingCall = new AtomicReference<>(); mockListener = mock(ServerCall.Listener.class); actualServerInitial = new AtomicReference<>(); @@ -91,9 +105,12 @@ public class InternalLoggingServerInterceptorTest { actualStatus = new AtomicReference<>(); actualTrailers = new AtomicReference<>(); peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234); + when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))) + .thenReturn(true); } @Test + @SuppressWarnings("unchecked") public void internalLoggingServerInterceptor() { Metadata clientInitial = new Metadata(); final MethodDescriptor method = @@ -103,6 +120,8 @@ public class InternalLoggingServerInterceptorTest { .setRequestMarshaller(BYTEARRAY_MARSHALLER) .setResponseMarshaller(BYTEARRAY_MARSHALLER) .build(); + FilterParams filterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams); capturedListener = factory.create() .interceptCall( @@ -148,89 +167,131 @@ public class InternalLoggingServerInterceptorTest { }); // receive request header { - EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - verify(mockSink).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedRequestHeaderEvent); - verifyNoMoreInteractions(mockSink); + verify(mockLogHelper).logRequestHeader( + /*seq=*/ eq(1L), + eq("service"), + eq("method"), + eq("the-authority"), + ArgumentMatchers.isNull(), + same(clientInitial), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString(), + same(peer)); + verifyNoMoreInteractions(mockLogHelper); } + reset(mockLogHelper); + reset(mockListener); + // send response header { - EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - Metadata serverInital = new Metadata(); - interceptedLoggingCall.get().sendHeaders(serverInital); - verify(mockSink, times(2)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedResponseHeaderEvent); - verifyNoMoreInteractions(mockSink); - assertSame(serverInital, actualServerInitial.get()); + Metadata serverInitial = new Metadata(); + interceptedLoggingCall.get().sendHeaders(serverInitial); + verify(mockLogHelper).logResponseHeader( + /*seq=*/ eq(2L), + eq("service"), + eq("method"), + same(serverInitial), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString(), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockLogHelper); + assertSame(serverInitial, actualServerInitial.get()); } + reset(mockLogHelper); + reset(mockListener); + // receive request message { - EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); byte[] request = "this is a request".getBytes(US_ASCII); capturedListener.onMessage(request); - verify(mockSink, times(3)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedRequestMessageEvent); - verifyNoMoreInteractions(mockSink); + verify(mockLogHelper).logRpcMessage( + /*seq=*/ eq(3L), + eq("service"), + eq("method"), + eq(EventType.GRPC_CALL_REQUEST_MESSAGE), + same(request), + eq(filterParams.messageBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString()); + verifyNoMoreInteractions(mockLogHelper); verify(mockListener).onMessage(same(request)); } + reset(mockLogHelper); + reset(mockListener); + // client half close { - EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); capturedListener.onHalfClose(); - verify(mockSink, times(4)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedHalfCloseEvent); - verifyNoMoreInteractions(mockSink); + verify(mockLogHelper).logHalfClose( + /*seq=*/ eq(4L), + eq("service"), + eq("method"), + eq(EventLogger.LOGGER_SERVER), + anyString()); + verifyNoMoreInteractions(mockLogHelper); verify(mockListener).onHalfClose(); } + reset(mockLogHelper); + reset(mockListener); + // send response message { - EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); byte[] response = "this is a response".getBytes(US_ASCII); interceptedLoggingCall.get().sendMessage(response); - verify(mockSink, times(5)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedResponseMessageEvent); - verifyNoMoreInteractions(mockSink); + verify(mockLogHelper).logRpcMessage( + /*seq=*/ eq(5L), + eq("service"), + eq("method"), + eq(EventType.GRPC_CALL_RESPONSE_MESSAGE), + same(response), + eq(filterParams.messageBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString()); + verifyNoMoreInteractions(mockLogHelper); assertSame(response, actualResponse.get()); } + reset(mockLogHelper); + reset(mockListener); + // send trailer { - EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); Status status = Status.INTERNAL.withDescription("trailer description"); Metadata trailers = new Metadata(); - interceptedLoggingCall.get().close(status, trailers); - verify(mockSink, times(6)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedTrailerEvent); - verifyNoMoreInteractions(mockSink); + verify(mockLogHelper).logTrailer( + /*seq=*/ eq(6L), + eq("service"), + eq("method"), + same(status), + same(trailers), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString(), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockLogHelper); assertSame(status, actualStatus.get()); assertSame(trailers, actualTrailers.get()); } + reset(mockLogHelper); + reset(mockListener); + // cancel { - EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL; - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); capturedListener.onCancel(); - verify(mockSink, times(7)).write(captor.capture()); - assertEquals(captor.getValue().getEventType(), - expectedCancelEvent); + verify(mockLogHelper).logCancel( + /*seq=*/ eq(7L), + eq("service"), + eq("method"), + eq(EventLogger.LOGGER_SERVER), + anyString()); verify(mockListener).onCancel(); } } @@ -244,6 +305,8 @@ public class InternalLoggingServerInterceptorTest { .setRequestMarshaller(BYTEARRAY_MARSHALLER) .setResponseMarshaller(BYTEARRAY_MARSHALLER) .build(); + FilterParams filterParams = FilterParams.create(true, 0, 0); + when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams); final ServerCall noopServerCall = new NoopServerCall() { @Override public MethodDescriptor getMethodDescriptor() { @@ -265,15 +328,233 @@ public class InternalLoggingServerInterceptorTest { factory.create() .interceptCall(noopServerCall, new Metadata(), - (call, headers) -> { - return new ServerCall.Listener() {}; - }); + (call, headers) -> new ServerCall.Listener() {}); }); - ArgumentCaptor captor = ArgumentCaptor.forClass(GrpcLogRecord.class); - verify(mockSink, times(1)).write(captor.capture()); - verifyNoMoreInteractions(mockSink); - Duration timeout = captor.getValue().getTimeout(); + ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockLogHelper, times(1)) + .logRequestHeader( + /*seq=*/ eq(1L), + eq("service"), + eq("method"), + eq("the-authority"), + timeoutCaptor.capture(), + any(Metadata.class), + eq(filterParams.headerBytes()), + eq(EventLogger.LOGGER_SERVER), + anyString(), + ArgumentMatchers.isNull()); + verifyNoMoreInteractions(mockLogHelper); + Duration timeout = timeoutCaptor.getValue(); assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout)) .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); } + + @Test + public void serverMethodOrServiceFilter_disabled() { + Metadata clientInitial = new Metadata(); + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(FilterParams.create(false, 0, 0)); + capturedListener = + factory.create() + .interceptCall( + new NoopServerCall() { + @Override + public void sendHeaders(Metadata headers) { + actualServerInitial.set(headers); + } + + @Override + public void sendMessage(byte[] message) { + actualResponse.set(message); + } + + @Override + public void close(Status status, Metadata trailers) { + actualStatus.set(status); + actualTrailers.set(trailers); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } + + @Override + public String getAuthority() { + return "the-authority"; + } + }, + clientInitial, + (call, headers) -> { + interceptedLoggingCall.set(call); + return mockListener; + }); + verifyNoInteractions(mockLogHelper); + } + + @Test + public void serverMethodOrServiceFilter_enabled() { + Metadata clientInitial = new Metadata(); + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(FilterParams.create(true, 10, 10)); + + capturedListener = + factory.create() + .interceptCall( + new NoopServerCall() { + @Override + public void sendHeaders(Metadata headers) { + actualServerInitial.set(headers); + } + + @Override + public void sendMessage(byte[] message) { + actualResponse.set(message); + } + + @Override + public void close(Status status, Metadata trailers) { + actualStatus.set(status); + actualTrailers.set(trailers); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } + + @Override + public String getAuthority() { + return "the-authority"; + } + }, + clientInitial, + (call, headers) -> { + interceptedLoggingCall.set(call); + return mockListener; + }); + + { + interceptedLoggingCall.get().sendHeaders(new Metadata()); + byte[] request = "this is a request".getBytes(US_ASCII); + capturedListener.onMessage(request); + capturedListener.onHalfClose(); + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedLoggingCall.get().sendMessage(response); + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + interceptedLoggingCall.get().close(status, trailers); + capturedListener.onCancel(); + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7); + } + } + + @Test + public void eventFilter_enabled() { + when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(false); + + Metadata clientInitial = new Metadata(); + final MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + when(mockFilterHelper.isMethodToBeLogged(method)) + .thenReturn(FilterParams.create(true, 10, 10)); + + capturedListener = + factory.create() + .interceptCall( + new NoopServerCall() { + @Override + public void sendHeaders(Metadata headers) { + actualServerInitial.set(headers); + } + + @Override + public void sendMessage(byte[] message) { + actualResponse.set(message); + } + + @Override + public void close(Status status, Metadata trailers) { + actualStatus.set(status); + actualTrailers.set(trailers); + } + + @Override + public MethodDescriptor getMethodDescriptor() { + return method; + } + + @Override + public Attributes getAttributes() { + return Attributes + .newBuilder() + .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer) + .build(); + } + + @Override + public String getAuthority() { + return "the-authority"; + } + }, + clientInitial, + (call, headers) -> { + interceptedLoggingCall.set(call); + return mockListener; + }); + + { + interceptedLoggingCall.get().sendHeaders(new Metadata()); + byte[] request = "this is a request".getBytes(US_ASCII); + capturedListener.onMessage(request); + capturedListener.onHalfClose(); + byte[] response = "this is a response".getBytes(US_ASCII); + interceptedLoggingCall.get().sendMessage(response); + Status status = Status.INTERNAL.withDescription("trailer description"); + Metadata trailers = new Metadata(); + interceptedLoggingCall.get().close(status, trailers); + verify(mockLogHelper, never()).logHalfClose( + anyLong(), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()), + any(GrpcLogRecord.EventLogger.class), + anyString()); + capturedListener.onCancel(); + assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(6); + } + } } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java index ec5ed6d3fd..e44c7ba45a 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java @@ -19,12 +19,15 @@ package io.grpc.observability.interceptors; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.Timestamp; @@ -35,7 +38,6 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.Status; import io.grpc.internal.TimeProvider; -import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.interceptors.LogHelper.PayloadBuilder; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; @@ -53,8 +55,8 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.nio.charset.Charset; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.Objects; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -66,8 +68,6 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class LogHelperTest { - - private static final Charset US_ASCII = Charset.forName("US-ASCII"); public static final Marshaller BYTEARRAY_MARSHALLER = new ByteArrayMarshaller(); private static final String DATA_A = "aaaaaaaaa"; private static final String DATA_B = "bbbbbbbbb"; @@ -82,38 +82,32 @@ public class LogHelperTest { MetadataEntry .newBuilder() .setKey(KEY_A.name()) - .setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII))) + .setValue(ByteString.copyFrom(DATA_A.getBytes(StandardCharsets.US_ASCII))) .build(); private static final MetadataEntry ENTRY_B = MetadataEntry .newBuilder() .setKey(KEY_B.name()) - .setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII))) + .setValue(ByteString.copyFrom(DATA_B.getBytes(StandardCharsets.US_ASCII))) .build(); private static final MetadataEntry ENTRY_C = MetadataEntry .newBuilder() .setKey(KEY_C.name()) - .setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII))) + .setValue(ByteString.copyFrom(DATA_C.getBytes(StandardCharsets.US_ASCII))) .build(); - + private static final int HEADER_LIMIT = 10; + private static final int MESSAGE_LIMIT = Integer.MAX_VALUE; private final Metadata nonEmptyMetadata = new Metadata(); - private final int nonEmptyMetadataSize = 30; private final Sink sink = mock(GcpLogSink.class); private final Timestamp timestamp = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321; - @SuppressWarnings("unchecked") private final Map locationTags = mock(Map.class); - @SuppressWarnings("unchecked") private final Map customTags = mock(Map.class); - private final ObservabilityConfig observabilityConfig = mock(ObservabilityConfig.class); - private final LogHelper logHelper = - new LogHelper( - sink, - timeProvider, locationTags, customTags, observabilityConfig); + private final LogHelper logHelper = new LogHelper(sink, timeProvider); @Before - public void setUp() throws Exception { + public void setUp() { nonEmptyMetadata.put(KEY_A, DATA_A); nonEmptyMetadata.put(KEY_B, DATA_B); nonEmptyMetadata.put(KEY_C, DATA_C); @@ -151,7 +145,7 @@ public class LogHelperTest { } @Test - public void socketToProto_unknown() throws Exception { + public void socketToProto_unknown() { SocketAddress unknownSocket = new SocketAddress() { @Override public String toString() { @@ -167,7 +161,7 @@ public class LogHelperTest { } @Test - public void metadataToProto_empty() throws Exception { + public void metadataToProto_empty() { assertEquals( GrpcLogRecord.newBuilder() .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) @@ -175,11 +169,12 @@ public class LogHelperTest { GrpcLogRecord.Metadata.getDefaultInstance()) .build(), metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, new Metadata())); + EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE)); } @Test - public void metadataToProto() throws Exception { + public void metadataToProto() { + int nonEmptyMetadataSize = 30; assertEquals( GrpcLogRecord.newBuilder() .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) @@ -193,9 +188,101 @@ public class LogHelperTest { .setPayloadSize(nonEmptyMetadataSize) .build(), metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata)); + EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)); } + @Test + public void metadataToProto_setsTruncated() { + assertTrue(LogHelper.createMetadataProto(nonEmptyMetadata, 0).truncated); + } + + @Test + public void metadataToProto_truncated() { + // 0 byte limit not enough for any metadata + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(), + LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build()); + // not enough bytes for first key value + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(), + LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build()); + // enough for first key value + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build()); + // Test edge cases for >= 2 key values + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .build(), + LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build()); + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build()); + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .build(), + LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build()); + + // not truncated: enough for all keys + assertEquals( + io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .newBuilder() + .addEntry(ENTRY_A) + .addEntry(ENTRY_B) + .addEntry(ENTRY_C) + .build(), + LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build()); + } + + @Test + public void messageToProto() { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes( + StandardCharsets.US_ASCII); + assertEquals( + GrpcLogRecord.newBuilder() + .setMessage(ByteString.copyFrom(bytes)) + .setPayloadSize(bytes.length) + .build(), + messageTestHelper(bytes, Integer.MAX_VALUE)); + } + + @Test + public void messageToProto_truncated() { + byte[] bytes + = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes( + StandardCharsets.US_ASCII); + assertEquals( + GrpcLogRecord.newBuilder() + .setPayloadSize(bytes.length) + .setPayloadTruncated(true) + .build(), + messageTestHelper(bytes, 0)); + + int limit = 10; + String truncatedMessage = "this is a "; + assertEquals( + GrpcLogRecord.newBuilder() + .setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII))) + .setPayloadSize(bytes.length) + .setPayloadTruncated(true) + .build(), + messageTestHelper(bytes, limit)); + } + + @Test public void logRequestHeader() throws Exception { long seqId = 1; @@ -209,7 +296,8 @@ public class LogHelperTest { InetSocketAddress peerAddress = new InetSocketAddress(address, port); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata) + metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, + HEADER_LIMIT) .toBuilder() .setTimestamp(timestamp) .setSequenceId(seqId) @@ -232,6 +320,7 @@ public class LogHelperTest { authority, timeout, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, null); @@ -247,6 +336,7 @@ public class LogHelperTest { authority, timeout, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_SERVER, rpcId, peerAddress); @@ -266,6 +356,7 @@ public class LogHelperTest { authority, null, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, null); @@ -284,6 +375,7 @@ public class LogHelperTest { authority, timeout, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, peerAddress); @@ -304,7 +396,8 @@ public class LogHelperTest { InetSocketAddress peerAddress = new InetSocketAddress(address, port); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata) + metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, + HEADER_LIMIT) .toBuilder() .setTimestamp(timestamp) .setSequenceId(seqId) @@ -324,6 +417,7 @@ public class LogHelperTest { serviceName, methodName, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, peerAddress); @@ -337,6 +431,7 @@ public class LogHelperTest { serviceName, methodName, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_SERVER, rpcId, null); @@ -354,9 +449,11 @@ public class LogHelperTest { serviceName, methodName, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_SERVER, rpcId, peerAddress); + fail(); } catch (IllegalArgumentException expected) { assertThat(expected).hasMessageThat() @@ -376,7 +473,8 @@ public class LogHelperTest { Status statusDescription = Status.INTERNAL.withDescription("test description"); GrpcLogRecord.Builder builder = - metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata) + metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata, + HEADER_LIMIT) .toBuilder() .setTimestamp(timestamp) .setSequenceId(seqId) @@ -399,6 +497,7 @@ public class LogHelperTest { methodName, statusDescription, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, peerAddress); @@ -413,6 +512,7 @@ public class LogHelperTest { methodName, statusDescription, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_SERVER, rpcId, null); @@ -431,6 +531,7 @@ public class LogHelperTest { methodName, statusDescription, nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, null); @@ -448,6 +549,7 @@ public class LogHelperTest { methodName, statusDescription.getCode().toStatus(), nonEmptyMetadata, + HEADER_LIMIT, EventLogger.LOGGER_CLIENT, rpcId, peerAddress); @@ -459,14 +561,43 @@ public class LogHelperTest { } @Test - public void logRpcMessage() throws Exception { + public void alwaysLoggedMetadata_grpcTraceBin() { + Metadata.Key key + = Metadata.Key.of("grpc-trace-bin", Metadata.BINARY_BYTE_MARSHALLER); + Metadata metadata = new Metadata(); + metadata.put(key, new byte[1]); + int zeroHeaderBytes = 0; + PayloadBuilder pair = + LogHelper.createMetadataProto(metadata, zeroHeaderBytes); + assertEquals( + key.name(), + Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList())) + .getKey()); + assertFalse(pair.truncated); + } + + @Test + public void neverLoggedMetadata_grpcStatusDetailsBin() { + Metadata.Key key + = Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER); + Metadata metadata = new Metadata(); + metadata.put(key, new byte[1]); + int unlimitedHeaderBytes = Integer.MAX_VALUE; + PayloadBuilder pair + = LogHelper.createMetadataProto(metadata, unlimitedHeaderBytes); + assertThat(pair.payload.getEntryBuilderList()).isEmpty(); + assertFalse(pair.truncated); + } + + @Test + public void logRpcMessage() { long seqId = 1; String serviceName = "service"; String methodName = "method"; String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; byte[] message = new byte[100]; - GrpcLogRecord.Builder builder = messageTestHelper(message) + GrpcLogRecord.Builder builder = messageTestHelper(message, MESSAGE_LIMIT) .toBuilder() .setTimestamp(timestamp) .setSequenceId(seqId) @@ -485,6 +616,7 @@ public class LogHelperTest { methodName, EventType.GRPC_CALL_REQUEST_MESSAGE, message, + MESSAGE_LIMIT, EventLogger.LOGGER_CLIENT, rpcId); verify(sink).write(base); @@ -497,6 +629,7 @@ public class LogHelperTest { methodName, EventType.GRPC_CALL_RESPONSE_MESSAGE, message, + MESSAGE_LIMIT, EventLogger.LOGGER_CLIENT, rpcId); verify(sink).write( @@ -512,6 +645,7 @@ public class LogHelperTest { methodName, EventType.GRPC_CALL_REQUEST_MESSAGE, message, + MESSAGE_LIMIT, EventLogger.LOGGER_SERVER, rpcId); verify(sink).write( @@ -527,6 +661,7 @@ public class LogHelperTest { methodName, EventType.GRPC_CALL_RESPONSE_MESSAGE, message, + MESSAGE_LIMIT, EventLogger.LOGGER_SERVER, rpcId); verify(sink).write( @@ -548,28 +683,31 @@ public class LogHelperTest { } private static GrpcLogRecord metadataToProtoTestHelper( - EventType type, Metadata metadata) { + EventType type, Metadata metadata, int maxHeaderBytes) { GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); PayloadBuilder pair - = LogHelper.createMetadataProto(metadata); + = LogHelper.createMetadataProto(metadata, maxHeaderBytes); builder.setMetadata(pair.payload); builder.setPayloadSize(pair.size); + builder.setPayloadTruncated(pair.truncated); builder.setEventType(type); return builder.build(); } - private static GrpcLogRecord messageTestHelper(byte[] message) { + private static GrpcLogRecord messageTestHelper(byte[] message, int maxMessageBytes) { GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder(); PayloadBuilder pair - = LogHelper.createMesageProto(message); + = LogHelper.createMessageProto(message, maxMessageBytes); builder.setMessage(pair.payload); builder.setPayloadSize(pair.size); + builder.setPayloadTruncated(pair.truncated); return builder.build(); } // Used only in tests // Copied from internal static final class ByteArrayMarshaller implements Marshaller { + @Override public InputStream stream(byte[] value) { return new ByteArrayInputStream(value); @@ -595,6 +733,7 @@ public class LogHelperTest { // Copied from internal static final class IoUtils { + /** maximum buffer to be read is 16 KB. */ private static final int MAX_BUFFER_LENGTH = 16384; diff --git a/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java b/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java index ab70fc4456..7aa5cd515f 100644 --- a/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java +++ b/observability/src/test/java/io/grpc/observability/logging/GcpLogSinkTest.java @@ -20,17 +20,26 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.cloud.MonitoredResource; import com.google.cloud.logging.LogEntry; import com.google.cloud.logging.Logging; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Duration; import com.google.protobuf.Struct; import com.google.protobuf.Value; +import com.google.protobuf.util.Durations; import io.grpc.observabilitylog.v1.GrpcLogRecord; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -49,6 +58,42 @@ public class GcpLogSinkTest { @Rule public final MockitoRule mockito = MockitoJUnit.rule(); + private static final Map locationTags = ImmutableMap.of("project_id", "PROJECT", + "location", "us-central1-c", + "cluster_name", "grpc-observability-cluster", + "namespace_name", "default" , + "pod_name", "app1-6c7c58f897-n92c5"); + private static final Map customTags = ImmutableMap.of("KEY1", "Value1", + "KEY2", "VALUE2"); + private static final int flushLimit = 10; + private final long seqId = 1; + private final String serviceName = "service"; + private final String methodName = "method"; + private final String authority = "authority"; + private final Duration timeout = Durations.fromMillis(1234); + private final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f"; + private final GrpcLogRecord logProto = GrpcLogRecord.newBuilder() + .setSequenceId(seqId) + .setServiceName(serviceName) + .setMethodName(methodName) + .setAuthority(authority) + .setTimeout(timeout) + .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) + .setEventLogger(EventLogger.LOGGER_CLIENT) + .setRpcId(rpcId) + .build(); + private final Struct expectedStructLogProto = Struct.newBuilder() + .putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build()) + .putFields("service_name", Value.newBuilder().setStringValue(serviceName).build()) + .putFields("method_name", Value.newBuilder().setStringValue(methodName).build()) + .putFields("authority", Value.newBuilder().setStringValue(authority).build()) + .putFields("timeout", Value.newBuilder().setStringValue("1.234s").build()) + .putFields("event_type", Value.newBuilder().setStringValue( + String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build()) + .putFields("event_logger", Value.newBuilder().setStringValue( + String.valueOf(EventLogger.LOGGER_CLIENT)).build()) + .putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build()) + .build(); private Logging mockLogging; @Before @@ -58,38 +103,81 @@ public class GcpLogSinkTest { @Test public void createSink() { - Sink mockSink = new GcpLogSink(mockLogging); + Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); assertThat(mockSink).isInstanceOf(GcpLogSink.class); } @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging); - GrpcLogRecord logProto = GrpcLogRecord.newBuilder() - .setRpcId("1234") - .build(); - Struct expectedStructLogProto = Struct.newBuilder().putFields( - "rpc_id", Value.newBuilder().setStringValue("1234").build() - ).build(); - + Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); mockSink.write(logProto); + ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( (Class) Collection.class); verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); + System.out.println(entry); assertEquals(entry.getPayload().getData(), expectedStructLogProto); } verifyNoMoreInteractions(mockLogging); } + @Test + @SuppressWarnings("unchecked") + public void verifyWriteWithTags() { + GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); + MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); + mockSink.write(logProto); + + ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( + (Class) Collection.class); + verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); + System.out.println(logEntrySetCaptor.getValue()); + for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { + LogEntry entry = it.next(); + assertEquals(entry.getResource(), expectedMonitoredResource); + assertEquals(entry.getLabels(), customTags); + assertEquals(entry.getPayload().getData(), expectedStructLogProto); + } + verifyNoMoreInteractions(mockLogging); + } + + @Test + @SuppressWarnings("unchecked") + public void emptyCustomTags_labelsNotSet() { + Map emptyCustomTags = null; + Map expectedEmptyLabels = new HashMap<>(); + GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, emptyCustomTags, flushLimit); + mockSink.write(logProto); + + ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( + (Class) Collection.class); + verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); + for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { + LogEntry entry = it.next(); + assertEquals(entry.getLabels(), expectedEmptyLabels); + assertEquals(entry.getPayload().getData(), expectedStructLogProto); + } + } + + @Test + public void verifyFlush() { + int lowerFlushLimit = 2; + GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, lowerFlushLimit); + mockSink.write(logProto); + verify(mockLogging, never()).flush(); + mockSink.write(logProto); + verify(mockLogging, times(1)).flush(); + mockSink.write(logProto); + mockSink.write(logProto); + verify(mockLogging, times(2)).flush(); + } + @Test public void verifyClose() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging); - GrpcLogRecord logProto = GrpcLogRecord.newBuilder() - .setRpcId("1234") - .build(); + Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); mockSink.write(logProto); verify(mockLogging, times(1)).write(anyIterable()); mockSink.close();