gcp-observability: updated config to public preview config (#9622)

This commit is contained in:
DNVindhya 2022-10-18 14:23:54 -07:00 committed by GitHub
parent 43942623fb
commit aeb90e3855
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 838 additions and 941 deletions

View File

@ -76,15 +76,15 @@ public final class GcpObservability implements AutoCloseable {
if (instance == null) {
GlobalLocationTags globalLocationTags = new GlobalLocationTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
Sink sink = new GcpLogSink(observabilityConfig.getProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
SERVICES_TO_EXCLUDE);
LogHelper helper = new LogHelper(sink);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
instance.registerStackDriverExporter(observabilityConfig.getDestinationProjectId(),
instance.registerStackDriverExporter(observabilityConfig.getProjectId(),
observabilityConfig.getCustomTags());
}
return instance;

View File

@ -17,10 +17,11 @@
package io.grpc.gcp.observability;
import io.grpc.Internal;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
@Internal
public interface ObservabilityConfig {
@ -33,14 +34,14 @@ public interface ObservabilityConfig {
/** Is Cloud Tracing enabled. */
boolean isEnableCloudTracing();
/** Get destination project ID - where logs will go. */
String getDestinationProjectId();
/** Get project ID - where logs will go. */
String getProjectId();
/** Get filters set for logging. */
List<LogFilter> getLogFilters();
/** Get filters for client logging. */
List<LogFilter> getClientLogFilters();
/** Get event types to log. */
List<EventType> getEventTypes();
/** Get filters for server logging. */
List<LogFilter> getServerLogFilters();
/** Get sampler for TraceConfig - when Cloud Tracing is enabled. */
Sampler getSampler();
@ -51,27 +52,44 @@ public interface ObservabilityConfig {
/**
* POJO for representing a filter used in configuration.
*/
@ThreadSafe
class LogFilter {
/** Pattern indicating which service/method to log. */
public final String pattern;
/** Set of services. */
public final Set<String> services;
/** Number of bytes of each header to log. */
public final Integer headerBytes;
/* Set of fullMethodNames. */
public final Set<String> methods;
/** Number of bytes of each header to log. */
public final Integer messageBytes;
/** Boolean to indicate all services and methods. */
public final boolean matchAll;
/** Number of bytes of header to log. */
public final int headerBytes;
/** Number of bytes of message to log. */
public final int messageBytes;
/** Boolean to indicate if services and methods matching pattern needs to be excluded. */
public final boolean excludePattern;
/**
* Object used to represent filter used in configuration.
*
* @param pattern Pattern indicating which service/method to log
* @param headerBytes Number of bytes of each header to log
* @param messageBytes Number of bytes of each header to log
* @param services Set of services derived from pattern
* @param serviceMethods Set of fullMethodNames derived from pattern
* @param matchAll If true, match all services and methods
* @param headerBytes Total number of bytes of header to log
* @param messageBytes Total number of bytes of message to log
* @param excludePattern If true, services and methods matching pattern be excluded
*/
public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) {
this.pattern = pattern;
public LogFilter(Set<String> services, Set<String> serviceMethods, boolean matchAll,
int headerBytes, int messageBytes,
boolean excludePattern) {
this.services = services;
this.methods = serviceMethods;
this.matchAll = matchAll;
this.headerBytes = headerBytes;
this.messageBytes = messageBytes;
this.excludePattern = excludePattern;
}
}
}

View File

@ -18,35 +18,47 @@ package io.grpc.gcp.observability;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.cloud.ServiceOptions;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* gRPC GcpObservability configuration processor.
*/
final class ObservabilityConfigImpl implements ObservabilityConfig {
private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY";
private static final String CONFIG_FILE_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY_JSON";
private static final Logger logger = Logger
.getLogger(ObservabilityConfigImpl.class.getName());
private static final String CONFIG_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG";
private static final String CONFIG_FILE_ENV_VAR_NAME = "GRPC_GCP_OBSERVABILITY_CONFIG_FILE";
// Tolerance for floating-point comparisons.
private static final double EPSILON = 1e-6;
private static final Pattern METHOD_NAME_REGEX =
Pattern.compile("^([*])|((([\\w]+)/((?:\\w+)|[*])))$");
private boolean enableCloudLogging = false;
private boolean enableCloudMonitoring = false;
private boolean enableCloudTracing = false;
private String destinationProjectId = null;
private List<LogFilter> logFilters;
private List<EventType> eventTypes;
private String projectId = null;
private List<LogFilter> clientLogFilters;
private List<LogFilter> serverLogFilters;
private Sampler sampler;
private Map<String, String> customTags;
@ -62,7 +74,10 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
}
void parseFile(String configFile) throws IOException {
parse(new String(Files.readAllBytes(Paths.get(configFile)), Charsets.UTF_8));
String configFileContent =
new String(Files.readAllBytes(Paths.get(configFile)), Charsets.UTF_8);
checkArgument(!configFileContent.isEmpty(), CONFIG_FILE_ENV_VAR_NAME + " is empty!");
parse(configFileContent);
}
@SuppressWarnings("unchecked")
@ -72,73 +87,151 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
}
private void parseConfig(Map<String, ?> config) {
if (config != null) {
Boolean value = JsonUtil.getBoolean(config, "enable_cloud_logging");
if (value != null) {
enableCloudLogging = value;
}
value = JsonUtil.getBoolean(config, "enable_cloud_monitoring");
if (value != null) {
enableCloudMonitoring = value;
}
value = JsonUtil.getBoolean(config, "enable_cloud_trace");
if (value != null) {
enableCloudTracing = value;
}
destinationProjectId = JsonUtil.getString(config, "destination_project_id");
List<?> rawList = JsonUtil.getList(config, "log_filters");
if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
ImmutableList.Builder<LogFilter> logFiltersBuilder = new ImmutableList.Builder<>();
for (Map<String, ?> jsonLogFilter : jsonLogFilters) {
logFiltersBuilder.add(parseJsonLogFilter(jsonLogFilter));
}
this.logFilters = logFiltersBuilder.build();
}
rawList = JsonUtil.getList(config, "event_types");
if (rawList != null) {
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>();
for (String jsonEventType : jsonEventTypes) {
eventTypesBuilder.add(EventType.valueOf(jsonEventType));
}
this.eventTypes = eventTypesBuilder.build();
}
Double samplingRate = JsonUtil.getNumberAsDouble(config, "global_trace_sampling_rate");
if (samplingRate == null) {
this.sampler = Samplers.probabilitySampler(0.0);
} else {
checkArgument(
samplingRate >= 0.0 && samplingRate <= 1.0,
"'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
// Using alwaysSample() instead of probabilitySampler() because according to
// {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
// there is a (very) small chance of *not* sampling if probability = 1.00.
if (1 - samplingRate < EPSILON) {
this.sampler = Samplers.alwaysSample();
} else {
this.sampler = Samplers.probabilitySampler(samplingRate);
}
}
Map<String, ?> rawCustomTags = JsonUtil.getObject(config, "custom_tags");
if (rawCustomTags != null) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
for (Map.Entry<String, ?> entry: rawCustomTags.entrySet()) {
checkArgument(
entry.getValue() instanceof String,
"'custom_tags' needs to be a map of <string, string>");
builder.put(entry.getKey(), (String) entry.getValue());
}
customTags = builder.build();
}
checkArgument(config != null, "Invalid configuration");
if (config.isEmpty()) {
clientLogFilters = Collections.emptyList();
serverLogFilters = Collections.emptyList();
customTags = Collections.emptyMap();
return;
}
projectId = fetchProjectId(JsonUtil.getString(config, "project_id"));
Map<String, ?> rawCloudLoggingObject = JsonUtil.getObject(config, "cloud_logging");
if (rawCloudLoggingObject != null) {
enableCloudLogging = true;
ImmutableList.Builder<LogFilter> clientFiltersBuilder = new ImmutableList.Builder<>();
ImmutableList.Builder<LogFilter> serverFiltersBuilder = new ImmutableList.Builder<>();
parseLoggingObject(rawCloudLoggingObject, clientFiltersBuilder, serverFiltersBuilder);
clientLogFilters = clientFiltersBuilder.build();
serverLogFilters = serverFiltersBuilder.build();
}
Map<String, ?> rawCloudMonitoringObject = JsonUtil.getObject(config, "cloud_monitoring");
if (rawCloudMonitoringObject != null) {
enableCloudMonitoring = true;
}
Map<String, ?> rawCloudTracingObject = JsonUtil.getObject(config, "cloud_trace");
if (rawCloudTracingObject != null) {
enableCloudTracing = true;
sampler = parseTracingObject(rawCloudTracingObject);
}
Map<String, ?> rawCustomTagsObject = JsonUtil.getObject(config, "labels");
if (rawCustomTagsObject != null) {
customTags = parseCustomTags(rawCustomTagsObject);
}
if (clientLogFilters == null) {
clientLogFilters = Collections.emptyList();
}
if (serverLogFilters == null) {
serverLogFilters = Collections.emptyList();
}
if (customTags == null) {
customTags = Collections.emptyMap();
}
}
private static String fetchProjectId(String configProjectId) {
// If project_id is not specified in config, get default GCP project id from the environment
String projectId = configProjectId != null ? configProjectId : getDefaultGcpProjectId();
checkArgument(projectId != null, "Unable to detect project_id");
logger.log(Level.FINEST, "Found project ID : ", projectId);
return projectId;
}
private LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) {
return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"),
JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"),
JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes"));
private static String getDefaultGcpProjectId() {
return ServiceOptions.getDefaultProjectId();
}
private static void parseLoggingObject(
Map<String, ?> rawLoggingConfig,
ImmutableList.Builder<LogFilter> clientFilters,
ImmutableList.Builder<LogFilter> serverFilters) {
parseRpcEvents(JsonUtil.getList(rawLoggingConfig, "client_rpc_events"), clientFilters);
parseRpcEvents(JsonUtil.getList(rawLoggingConfig, "server_rpc_events"), serverFilters);
}
private static Sampler parseTracingObject(Map<String, ?> rawCloudTracingConfig) {
Sampler defaultSampler = Samplers.probabilitySampler(0.0);
Double samplingRate = JsonUtil.getNumberAsDouble(rawCloudTracingConfig, "sampling_rate");
if (samplingRate == null) {
return defaultSampler;
}
checkArgument(samplingRate >= 0.0 && samplingRate <= 1.0,
"'sampling_rate' needs to be between [0.0, 1.0]");
// Using alwaysSample() instead of probabilitySampler() because according to
// {@link io.opencensus.trace.samplers.ProbabilitySampler#shouldSample}
// there is a (very) small chance of *not* sampling if probability = 1.00.
return 1 - samplingRate < EPSILON ? Samplers.alwaysSample()
: Samplers.probabilitySampler(samplingRate);
}
private static Map<String, String> parseCustomTags(Map<String, ?> rawCustomTags) {
ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
for (Map.Entry<String, ?> entry: rawCustomTags.entrySet()) {
checkArgument(
entry.getValue() instanceof String,
"'labels' needs to be a map of <string, string>");
builder.put(entry.getKey(), (String) entry.getValue());
}
return builder.build();
}
private static void parseRpcEvents(List<?> rpcEvents, ImmutableList.Builder<LogFilter> filters) {
if (rpcEvents == null) {
return;
}
List<Map<String, ?>> jsonRpcEvents = JsonUtil.checkObjectList(rpcEvents);
for (Map<String, ?> jsonClientRpcEvent : jsonRpcEvents) {
filters.add(parseJsonLogFilter(jsonClientRpcEvent));
}
}
private static LogFilter parseJsonLogFilter(Map<String, ?> logFilterMap) {
ImmutableSet.Builder<String> servicesSetBuilder = new ImmutableSet.Builder<>();
ImmutableSet.Builder<String> methodsSetBuilder = new ImmutableSet.Builder<>();
boolean wildCardFilter = false;
boolean excludeFilter =
Boolean.TRUE.equals(JsonUtil.getBoolean(logFilterMap, "exclude"));
List<String> methodsList = JsonUtil.getListOfStrings(logFilterMap, "methods");
if (methodsList != null) {
wildCardFilter = extractMethodOrServicePattern(
methodsList, excludeFilter, servicesSetBuilder, methodsSetBuilder);
}
Integer maxHeaderBytes = JsonUtil.getNumberAsInteger(logFilterMap, "max_metadata_bytes");
Integer maxMessageBytes = JsonUtil.getNumberAsInteger(logFilterMap, "max_message_bytes");
return new LogFilter(
servicesSetBuilder.build(),
methodsSetBuilder.build(),
wildCardFilter,
maxHeaderBytes != null ? maxHeaderBytes.intValue() : 0,
maxMessageBytes != null ? maxMessageBytes.intValue() : 0,
excludeFilter);
}
private static boolean extractMethodOrServicePattern(List<String> patternList, boolean exclude,
ImmutableSet.Builder<String> servicesSetBuilder,
ImmutableSet.Builder<String> methodsSetBuilder) {
boolean globalFilter = false;
for (String methodOrServicePattern : patternList) {
Matcher matcher = METHOD_NAME_REGEX.matcher(methodOrServicePattern);
checkArgument(
matcher.matches(), "invalid service or method filter : " + methodOrServicePattern);
if ("*".equals(methodOrServicePattern)) {
checkArgument(!exclude, "cannot have 'exclude' and '*' wildcard in the same filter");
globalFilter = true;
} else if ("*".equals(matcher.group(5))) {
String service = matcher.group(4);
servicesSetBuilder.add(service);
} else {
methodsSetBuilder.add(methodOrServicePattern);
}
}
return globalFilter;
}
@Override
@ -157,18 +250,18 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
}
@Override
public String getDestinationProjectId() {
return destinationProjectId;
public String getProjectId() {
return projectId;
}
@Override
public List<LogFilter> getLogFilters() {
return logFilters;
public List<LogFilter> getClientLogFilters() {
return clientLogFilters;
}
@Override
public List<EventType> getEventTypes() {
return eventTypes;
public List<LogFilter> getServerLogFilters() {
return serverLogFilters;
}
@Override

View File

@ -16,51 +16,27 @@
package io.grpc.gcp.observability.interceptors;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.Internal;
import io.grpc.MethodDescriptor;
import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.gcp.observability.ObservabilityConfig.LogFilter;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Parses gRPC GcpObservability configuration filters for interceptors usage.
*/
@Internal
public class ConfigFilterHelper {
private static final Logger logger = Logger.getLogger(ConfigFilterHelper.class.getName());
public static final FilterParams NO_FILTER_PARAMS
= FilterParams.create(false, 0, 0);
public static final String globalPattern = "*";
private final ObservabilityConfig config;
@VisibleForTesting
boolean methodOrServiceFilterPresent;
// Flag to log every service and method
@VisibleForTesting
Map<String, FilterParams> perServiceFilters;
@VisibleForTesting
Map<String, FilterParams> perMethodFilters;
@VisibleForTesting
Set<EventType> logEventTypeSet;
@VisibleForTesting
ConfigFilterHelper(ObservabilityConfig config) {
private ConfigFilterHelper(ObservabilityConfig config) {
this.config = config;
this.methodOrServiceFilterPresent = false;
this.perServiceFilters = new HashMap<>();
this.perMethodFilters = new HashMap<>();
}
/**
@ -69,82 +45,44 @@ public class ConfigFilterHelper {
* @param config processed ObservabilityConfig object
* @return helper instance for filtering
*/
public static ConfigFilterHelper factory(ObservabilityConfig config) {
ConfigFilterHelper filterHelper = new ConfigFilterHelper(config);
if (config.isEnableCloudLogging()) {
filterHelper.setMethodOrServiceFilterMaps();
filterHelper.setEventFilterSet();
}
return filterHelper;
public static ConfigFilterHelper getInstance(ObservabilityConfig config) {
return new ConfigFilterHelper(config);
}
@VisibleForTesting
void setMethodOrServiceFilterMaps() {
List<LogFilter> logFilters = config.getLogFilters();
if (logFilters == null) {
return;
}
Map<String, FilterParams> perServiceFilters = new HashMap<>();
Map<String, FilterParams> perMethodFilters = new HashMap<>();
/**
* Checks if the corresponding service/method passed needs to be logged according to user provided
* observability configuration.
* Filters are evaluated in text order, first match is used.
*
* @param fullMethodName the fully qualified name of the method
* @param client set to true if method being checked is a client method; false otherwise
* @return FilterParams object 1. specifies if the corresponding method needs to be logged
* (log field will be set to true) 2. values of payload limits retrieved from configuration
*/
public FilterParams logRpcMethod(String fullMethodName, boolean client) {
FilterParams params = NO_FILTER_PARAMS;
for (LogFilter currentFilter : logFilters) {
// '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified
String methodOrServicePattern = currentFilter.pattern;
int currentHeaderBytes
= currentFilter.headerBytes != null ? currentFilter.headerBytes : 0;
int currentMessageBytes
= currentFilter.messageBytes != null ? currentFilter.messageBytes : 0;
if (methodOrServicePattern.equals("*")) {
// parse config for global, e.g. "*"
if (perServiceFilters.containsKey(globalPattern)) {
logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern);
continue;
int index = checkNotNull(fullMethodName, "fullMethodName").lastIndexOf('/');
String serviceName = fullMethodName.substring(0, index);
List<LogFilter> logFilters =
client ? config.getClientLogFilters() : config.getServerLogFilters();
// TODO (dnvindhya): Optimize by caching results for fullMethodName.
for (LogFilter logFilter : logFilters) {
if (logFilter.matchAll
|| logFilter.services.contains(serviceName)
|| logFilter.methods.contains(fullMethodName)) {
if (logFilter.excludePattern) {
return params;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perServiceFilters.put(globalPattern, params);
} else if (methodOrServicePattern.endsWith("/*")) {
// TODO(DNVindhya): check if service name is a valid string for a service name
// parse config for a service, e.g. "service/*"
String service = MethodDescriptor.extractFullServiceName(methodOrServicePattern);
if (perServiceFilters.containsKey(service)) {
logger.log(Level.WARNING, "Duplicate entry : {0)", methodOrServicePattern);
continue;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perServiceFilters.put(service, params);
} else {
// TODO(DNVVindhya): check if methodOrServicePattern is a valid full qualified method name
// parse pattern for a fully qualified method, e.g "service/method"
if (perMethodFilters.containsKey(methodOrServicePattern)) {
logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern);
continue;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perMethodFilters.put(methodOrServicePattern, params);
int currentHeaderBytes = logFilter.headerBytes;
int currentMessageBytes = logFilter.messageBytes;
return FilterParams.create(true, currentHeaderBytes, currentMessageBytes);
}
}
this.perServiceFilters = ImmutableMap.copyOf(perServiceFilters);
this.perMethodFilters = ImmutableMap.copyOf(perMethodFilters);
if (!perServiceFilters.isEmpty() || !perMethodFilters.isEmpty()) {
this.methodOrServiceFilterPresent = true;
}
}
@VisibleForTesting
void setEventFilterSet() {
List<EventType> eventFilters = config.getEventTypes();
if (eventFilters == null) {
return;
}
if (eventFilters.isEmpty()) {
this.logEventTypeSet = ImmutableSet.of();
return;
}
this.logEventTypeSet = ImmutableSet.copyOf(eventFilters);
return params;
}
/**
@ -166,50 +104,4 @@ public class ConfigFilterHelper {
log, headerBytes, messageBytes);
}
}
/**
* Checks if the corresponding service/method passed needs to be logged as per the user provided
* configuration.
*
* @param method the fully qualified name of the method
* @return MethodFilterParams object 1. specifies if the corresponding method needs to be logged
* (log field will be set to true) 2. values of payload limits retrieved from configuration
*/
public FilterParams isMethodToBeLogged(MethodDescriptor<?, ?> method) {
FilterParams params = NO_FILTER_PARAMS;
if (methodOrServiceFilterPresent) {
String fullMethodName = method.getFullMethodName();
if (perMethodFilters.containsKey(fullMethodName)) {
params = perMethodFilters.get(fullMethodName);
} else {
String serviceName = method.getServiceName();
if (perServiceFilters.containsKey(serviceName)) {
params = perServiceFilters.get(serviceName);
} else if (perServiceFilters.containsKey(globalPattern)) {
params = perServiceFilters.get(globalPattern);
}
}
}
return params;
}
/**
* Checks if the corresponding event passed needs to be logged as per the user provided
* configuration.
*
* <p> All events are logged by default if event_types is not specified or {} in configuration.
* If event_types is specified as [], no events will be logged.
* If events types is specified as a non-empty list, only the events specified in the
* list will be logged.
* </p>
*
* @param event gRPC observability event
* @return true if event needs to be logged, false otherwise
*/
public boolean isEventToBeLogged(EventType event) {
if (logEventTypeSet == null) {
return true;
}
return logEventTypeSet.contains(event);
}
}

View File

@ -93,7 +93,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
FilterParams filterParams = filterHelper.isMethodToBeLogged(method);
FilterParams filterParams = filterHelper.logRpcMethod(method.getFullMethodName(), true);
if (!filterParams.log()) {
return next.newCall(method, callOptions);
}
@ -111,28 +111,26 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
try {
helper.logClientHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
null);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
try {
helper.logClientHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
null);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
Listener<RespT> observabilityListener =
@ -140,22 +138,19 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void onMessage(RespT message) {
// Event: EventType.SERVER_MESSAGE
EventType responseMessageType = EventType.SERVER_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
responseMessageType,
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventType.SERVER_MESSAGE,
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
super.onMessage(message);
}
@ -163,21 +158,19 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void onHeaders(Metadata headers) {
// Event: EventType.SERVER_HEADER
if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
try {
helper.logServerHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
headers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
try {
helper.logServerHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
headers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
super.onHeaders(headers);
}
@ -185,22 +178,20 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void onClose(Status status, Metadata trailers) {
// Event: EventType.SERVER_TRAILER
if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
status,
trailers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
status,
trailers,
maxHeaderBytes,
EventLogger.CLIENT,
callId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
super.onClose(status, trailers);
}
@ -211,22 +202,19 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void sendMessage(ReqT message) {
// Event: EventType.CLIENT_MESSAGE
EventType requestMessageType = EventType.CLIENT_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
requestMessageType,
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventType.CLIENT_MESSAGE,
message,
maxMessageBytes,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
super.sendMessage(message);
}
@ -234,18 +222,16 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void halfClose() {
// Event: EventType.CLIENT_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
super.halfClose();
}
@ -253,18 +239,16 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void cancel(String message, Throwable cause) {
// Event: EventType.CANCEL
if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.CLIENT,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
super.cancel(message, cause);
}

View File

@ -93,7 +93,8 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
FilterParams filterParams = filterHelper.isMethodToBeLogged(call.getMethodDescriptor());
FilterParams filterParams =
filterHelper.logRpcMethod(call.getMethodDescriptor().getFullMethodName(), false);
if (!filterParams.log()) {
return next.startCall(call, headers);
}
@ -102,28 +103,26 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
final int maxMessageBytes = filterParams.messageBytes();
// Event: EventType.CLIENT_HEADER
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)) {
try {
helper.logClientHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
peerAddress);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
try {
helper.logClientHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
peerAddress);
} catch (Exception e) {
// Catching generic exceptions instead of specific ones for all the events.
// This way we can catch both expected and unexpected exceptions instead of re-throwing
// exceptions to callers which will lead to RPC getting aborted.
// Expected exceptions to be caught:
// 1. IllegalArgumentException
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
ServerCall<ReqT, RespT> wrapperCall =
@ -131,21 +130,19 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override
public void sendHeaders(Metadata headers) {
// Event: EventType.SERVER_HEADER
if (filterHelper.isEventToBeLogged(EventType.SERVER_HEADER)) {
try {
helper.logServerHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
headers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
try {
helper.logServerHeader(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
headers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
super.sendHeaders(headers);
}
@ -154,21 +151,19 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
public void sendMessage(RespT message) {
// Event: EventType.SERVER_MESSAGE
EventType responseMessageType = EventType.SERVER_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
responseMessageType,
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
responseMessageType,
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
super.sendMessage(message);
}
@ -176,22 +171,20 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override
public void close(Status status, Metadata trailers) {
// Event: EventType.SERVER_TRAILER
if (filterHelper.isEventToBeLogged(EventType.SERVER_TRAILER)) {
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
status,
trailers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
try {
helper.logTrailer(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
status,
trailers,
maxHeaderBytes,
EventLogger.SERVER,
callId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
super.close(status, trailers);
}
@ -201,23 +194,22 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
return new SimpleForwardingServerCallListener<ReqT>(listener) {
@Override
public void onMessage(ReqT message) {
// Event: EventType.CLIENT_MESSAGE
EventType requestMessageType = EventType.CLIENT_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
requestMessageType,
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
requestMessageType,
message,
maxMessageBytes,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
super.onMessage(message);
}
@ -225,18 +217,16 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override
public void onHalfClose() {
// Event: EventType.CLIENT_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)) {
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
try {
helper.logHalfClose(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
super.onHalfClose();
}
@ -244,18 +234,16 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override
public void onCancel() {
// Event: EventType.CANCEL
if (filterHelper.isEventToBeLogged(EventType.CANCEL)) {
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
try {
helper.logCancel(
seq.getAndIncrement(),
serviceName,
methodName,
authority,
EventLogger.SERVER,
callId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
super.onCancel();
}

View File

@ -65,21 +65,22 @@ public class GcpLogSink implements Sink {
private final Collection<String> servicesToExclude;
@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
GcpLogSink(Logging loggingClient, String projectId, Map<String, String> locationTags,
Map<String, String> customTags, Collection<String> servicesToExclude) {
this(destinationProjectId, locationTags, customTags, servicesToExclude);
this(projectId, locationTags, customTags, servicesToExclude);
this.gcpLoggingClient = loggingClient;
}
/**
* Retrieves a single instance of GcpLogSink.
* @param destinationProjectId cloud project id to write logs
*
* @param projectId GCP project id to write logs
* @param servicesToExclude service names for which log entries should not be generated
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
public GcpLogSink(String projectId, Map<String, String> locationTags,
Map<String, String> customTags, Collection<String> servicesToExclude) {
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.projectId = projectId;
this.customTags = getCustomTags(customTags, locationTags, projectId);
this.kubernetesResource = getResource(locationTags);
this.servicesToExclude = checkNotNull(servicesToExclude, "servicesToExclude");
}
@ -136,12 +137,12 @@ public class GcpLogSink implements Sink {
@VisibleForTesting
static Map<String, String> getCustomTags(Map<String, String> customTags,
Map<String, String> locationTags, String destinationProjectId) {
Map<String, String> locationTags, String projectId) {
ImmutableMap.Builder<String, String> tagsBuilder = ImmutableMap.builder();
String sourceProjectId = locationTags.get("project_id");
if (!Strings.isNullOrEmpty(destinationProjectId)
if (!Strings.isNullOrEmpty(projectId)
&& !Strings.isNullOrEmpty(sourceProjectId)
&& !Objects.equals(sourceProjectId, destinationProjectId)) {
&& !Objects.equals(sourceProjectId, projectId)) {
tagsBuilder.put("source_project_id", sourceProjectId);
}
if (customTags != null) {

View File

@ -17,15 +17,17 @@
package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StaticTestingClassLoader;
@ -37,7 +39,6 @@ import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
@ -48,6 +49,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@RunWith(JUnit4.class)
@ -98,9 +100,9 @@ public class LoggingTest {
}
@Test
public void clientServer_interceptorCalled_logFewEvents() throws Exception {
public void clientServer_interceptorCalled_logEvents_usingMockSink() throws Exception {
Class<?> runnable =
classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName());
classLoader.loadClass(StaticTestingClassLogEventsUsingMockSink.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
@ -122,16 +124,17 @@ public class LoggingTest {
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logAlwaysFilterParams = FilterParams.create(true, 1024, 10);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
when(mockFilterHelper.logRpcMethod(anyString(), eq(true)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.logRpcMethod(anyString(), eq(false)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true);
try (GcpObservability unused =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.addService(new ObservabilityTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
@ -139,7 +142,7 @@ public class LoggingTest {
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
assertThat(ObservabilityTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11);
} catch (IOException e) {
@ -163,16 +166,17 @@ public class LoggingTest {
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logNeverFilterParams = FilterParams.create(false, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
when(mockFilterHelper.logRpcMethod(anyString(), eq(true)))
.thenReturn(logNeverFilterParams);
when(mockFilterHelper.logRpcMethod(anyString(), eq(false)))
.thenReturn(logNeverFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true);
try (GcpObservability unused =
GcpObservability.grpcInit(
mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.addService(new ObservabilityTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
@ -180,7 +184,7 @@ public class LoggingTest {
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
assertThat(ObservabilityTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
verifyNoInteractions(spyLogHelper);
verifyNoInteractions(mockSink);
@ -190,41 +194,32 @@ public class LoggingTest {
}
}
public static final class StaticTestingClassLogFewEvents implements Runnable {
public static final class StaticTestingClassLogEventsUsingMockSink implements Runnable {
@Override
public void run() {
Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper mockLogHelper = mock(LogHelper.class);
LogHelper spyLogHelper = spy(new LogHelper(mockSink));
ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2);
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper2);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2);
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper2);
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class)))
when(mockFilterHelper2.logRpcMethod(anyString(), eq(true)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.logRpcMethod(anyString(), eq(false)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_TRAILER)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.CANCEL)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.CLIENT_MESSAGE))
.thenReturn(false);
when(mockFilterHelper2.isEventToBeLogged(EventType.SERVER_MESSAGE))
.thenReturn(false);
try (GcpObservability observability =
GcpObservability.grpcInit(
mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.addService(new ObservabilityTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
@ -232,7 +227,7 @@ public class LoggingTest {
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
assertThat(ObservabilityTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Total number of calls should have been 14 (6 from client and 6 from server)
// Since cancel is not invoked, it will be 12.
@ -240,9 +235,15 @@ public class LoggingTest {
// message(count:2)
// events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg)
// = 8
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8);
assertThat(Mockito.mockingDetails(mockSink).getInvocations().size()).isEqualTo(12);
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(12)).write(captor.capture());
for (GrpcLogRecord record : captor.getAllValues()) {
assertThat(record.getType()).isInstanceOf(GrpcLogRecord.EventType.class);
assertThat(record.getLogger()).isInstanceOf(GrpcLogRecord.EventLogger.class);
}
} catch (IOException e) {
throw new AssertionError("Exception while testing logging event filter", e);
throw new AssertionError("Exception while testing logging using mock sink", e);
}
}
}

View File

@ -97,7 +97,7 @@ public class MetricsTest {
mock(InternalLoggingServerInterceptor.Factory.class);
when(mockConfig.isEnableCloudMonitoring()).thenReturn(true);
when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID);
when(mockConfig.getProjectId()).thenReturn(PROJECT_ID);
try {
GcpObservability observability =
@ -107,7 +107,7 @@ public class MetricsTest {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.addService(new ObservabilityTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
@ -115,7 +115,7 @@ public class MetricsTest {
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
assertThat(ObservabilityTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Adding sleep to ensure metrics are exported before querying cloud monitoring backend
TimeUnit.SECONDS.sleep(40);

View File

@ -18,23 +18,25 @@ package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import io.grpc.gcp.observability.ObservabilityConfig.LogFilter;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.opencensus.trace.Sampler;
import io.opencensus.trace.samplers.Samplers;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@ -43,82 +45,159 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ObservabilityConfigImplTest {
private static final String EVENT_TYPES = "{\n"
+ " \"enable_cloud_logging\": false,\n"
+ " \"event_types\": "
+ "[\"CLIENT_HEADER\", \"CLIENT_HALF_CLOSE\", \"SERVER_TRAILER\"]\n"
+ "}";
private static final String LOG_FILTERS = "{\n"
+ " \"enable_cloud_logging\": true,\n"
+ " \"destination_project_id\": \"grpc-testing\",\n"
+ " \"log_filters\": [{\n"
+ " \"pattern\": \"*/*\",\n"
+ " \"header_bytes\": 4096,\n"
+ " \"message_bytes\": 2048\n"
+ " },"
+ " {\n"
+ " \"pattern\": \"service1/Method2\"\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {\n"
+ " \"client_rpc_events\": [{\n"
+ " \"methods\": [\"*\"],\n"
+ " \"max_metadata_bytes\": 4096\n"
+ " }"
+ " ],\n"
+ " \"server_rpc_events\": [{\n"
+ " \"methods\": [\"*\"],\n"
+ " \"max_metadata_bytes\": 32,\n"
+ " \"max_message_bytes\": 64\n"
+ " }"
+ " ]\n"
+ " }\n"
+ "}";
private static final String DEST_PROJECT_ID = "{\n"
+ " \"enable_cloud_logging\": true,\n"
+ " \"destination_project_id\": \"grpc-testing\"\n"
private static final String CLIENT_LOG_FILTERS = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {\n"
+ " \"client_rpc_events\": [{\n"
+ " \"methods\": [\"*\"],\n"
+ " \"max_metadata_bytes\": 4096,\n"
+ " \"max_message_bytes\": 2048\n"
+ " },"
+ " {\n"
+ " \"methods\": [\"service1/Method2\", \"Service2/*\"],\n"
+ " \"exclude\": true\n"
+ " }"
+ " ]\n"
+ " }\n"
+ "}";
private static final String DISABLE_CLOUD_LOGGING = "{\n"
+ " \"enable_cloud_logging\": false\n"
private static final String SERVER_LOG_FILTERS = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {\n"
+ " \"server_rpc_events\": [{\n"
+ " \"methods\": [\"service1/method4\", \"service2/method234\"],\n"
+ " \"max_metadata_bytes\": 32,\n"
+ " \"max_message_bytes\": 64\n"
+ " },"
+ " {\n"
+ " \"methods\": [\"service4/*\", \"Service2/*\"],\n"
+ " \"exclude\": true\n"
+ " }"
+ " ]\n"
+ " }\n"
+ "}";
private static final String PROJECT_ID = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {},\n"
+ " \"project_id\": \"grpc-testing\"\n"
+ "}";
private static final String EMPTY_CONFIG = "{}";
private static final String ENABLE_CLOUD_MONITORING_AND_TRACING = "{\n"
+ " \"enable_cloud_monitoring\": true,\n"
+ " \"enable_cloud_trace\": true\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_monitoring\": {},\n"
+ " \"cloud_trace\": {}\n"
+ "}";
private static final String GLOBAL_TRACING_ALWAYS_SAMPLER = "{\n"
+ " \"enable_cloud_trace\": true,\n"
+ " \"global_trace_sampling_rate\": 1.00\n"
private static final String ENABLE_CLOUD_MONITORING = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_monitoring\": {}\n"
+ "}";
private static final String GLOBAL_TRACING_NEVER_SAMPLER = "{\n"
+ " \"enable_cloud_trace\": true,\n"
+ " \"global_trace_sampling_rate\": 0.00\n"
private static final String ENABLE_CLOUD_TRACE = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {}\n"
+ "}";
private static final String GLOBAL_TRACING_PROBABILISTIC_SAMPLER = "{\n"
+ " \"enable_cloud_trace\": true,\n"
+ " \"global_trace_sampling_rate\": 0.75\n"
private static final String TRACING_ALWAYS_SAMPLER = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {\n"
+ " \"sampling_rate\": 1.00\n"
+ " }\n"
+ "}";
private static final String GLOBAL_TRACING_DEFAULT_SAMPLER = "{\n"
+ " \"enable_cloud_trace\": true\n"
private static final String TRACING_NEVER_SAMPLER = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {\n"
+ " \"sampling_rate\": 0.00\n"
+ " }\n"
+ "}";
private static final String GLOBAL_TRACING_BADPROBABILISTIC_SAMPLER = "{\n"
+ " \"enable_cloud_tracing\": true,\n"
+ " \"global_trace_sampling_rate\": -0.75\n"
private static final String TRACING_PROBABILISTIC_SAMPLER = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {\n"
+ " \"sampling_rate\": 0.75\n"
+ " }\n"
+ "}";
private static final String TRACING_DEFAULT_SAMPLER = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {}\n"
+ "}";
private static final String GLOBAL_TRACING_BAD_PROBABILISTIC_SAMPLER = "{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_trace\": {\n"
+ " \"sampling_rate\": -0.75\n"
+ " }\n"
+ "}";
private static final String CUSTOM_TAGS = "{\n"
+ " \"enable_cloud_logging\": true,\n"
+ " \"custom_tags\": {\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {},\n"
+ " \"labels\": {\n"
+ " \"SOURCE_VERSION\" : \"J2e1Cf\",\n"
+ " \"SERVICE_NAME\" : \"payment-service\",\n"
+ " \"ENTRYPOINT_SCRIPT\" : \"entrypoint.sh\"\n"
+ " }\n"
+ "}";
private static final String BAD_CUSTOM_TAGS = "{\n"
+ " \"enable_cloud_monitoring\": true,\n"
+ " \"custom_tags\": {\n"
private static final String BAD_CUSTOM_TAGS =
"{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_monitoring\": {},\n"
+ " \"labels\": {\n"
+ " \"SOURCE_VERSION\" : \"J2e1Cf\",\n"
+ " \"SERVICE_NAME\" : { \"SUB_SERVICE_NAME\" : \"payment-service\"},\n"
+ " \"ENTRYPOINT_SCRIPT\" : \"entrypoint.sh\"\n"
+ " }\n"
+ "}";
private static final String LOG_FILTER_GLOBAL_EXCLUDE =
"{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {\n"
+ " \"client_rpc_events\": [{\n"
+ " \"methods\": [\"service1/Method2\", \"*\"],\n"
+ " \"max_metadata_bytes\": 20,\n"
+ " \"max_message_bytes\": 15,\n"
+ " \"exclude\": true\n"
+ " }"
+ " ]\n"
+ " }\n"
+ "}";
private static final String LOG_FILTER_INVALID_METHOD =
"{\n"
+ " \"project_id\": \"grpc-testing\",\n"
+ " \"cloud_logging\": {\n"
+ " \"client_rpc_events\": [{\n"
+ " \"methods\": [\"s*&%ervice1/Method2\", \"*\"],\n"
+ " \"max_metadata_bytes\": 20\n"
+ " }"
+ " ]\n"
+ " }\n"
+ "}";
ObservabilityConfigImpl observabilityConfig = new ObservabilityConfigImpl();
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@ -129,62 +208,126 @@ public class ObservabilityConfigImplTest {
observabilityConfig.parse(null);
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).isEqualTo("GRPC_CONFIG_OBSERVABILITY value is null!");
assertThat(iae.getMessage()).isEqualTo("GRPC_GCP_OBSERVABILITY_CONFIG value is null!");
}
}
@Test
public void emptyConfig() throws IOException {
observabilityConfig.parse("{}");
observabilityConfig.parse(EMPTY_CONFIG);
assertFalse(observabilityConfig.isEnableCloudLogging());
assertFalse(observabilityConfig.isEnableCloudMonitoring());
assertFalse(observabilityConfig.isEnableCloudTracing());
assertNull(observabilityConfig.getDestinationProjectId());
assertNull(observabilityConfig.getLogFilters());
assertNull(observabilityConfig.getEventTypes());
assertThat(observabilityConfig.getClientLogFilters()).isEmpty();
assertThat(observabilityConfig.getServerLogFilters()).isEmpty();
assertThat(observabilityConfig.getSampler()).isNull();
assertThat(observabilityConfig.getProjectId()).isNull();
assertThat(observabilityConfig.getCustomTags()).isEmpty();
}
@Test
public void disableCloudLogging() throws IOException {
observabilityConfig.parse(DISABLE_CLOUD_LOGGING);
assertFalse(observabilityConfig.isEnableCloudLogging());
assertFalse(observabilityConfig.isEnableCloudMonitoring());
assertFalse(observabilityConfig.isEnableCloudTracing());
assertNull(observabilityConfig.getDestinationProjectId());
assertNull(observabilityConfig.getLogFilters());
assertNull(observabilityConfig.getEventTypes());
public void emptyConfigFile() throws IOException {
File configFile = tempFolder.newFile();
try {
observabilityConfig.parseFile(configFile.getAbsolutePath());
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).isEqualTo(
"GRPC_GCP_OBSERVABILITY_CONFIG_FILE is empty!");
}
}
@Test
public void destProjectId() throws IOException {
observabilityConfig.parse(DEST_PROJECT_ID);
public void setProjectId() throws IOException {
observabilityConfig.parse(PROJECT_ID);
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
assertThat(observabilityConfig.getProjectId()).isEqualTo("grpc-testing");
}
@Test
public void logFilters() throws IOException {
observabilityConfig.parse(LOG_FILTERS);
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
List<LogFilter> logFilters = observabilityConfig.getLogFilters();
assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).pattern).isEqualTo("*/*");
assertThat(logFilters.get(0).headerBytes).isEqualTo(4096);
assertThat(logFilters.get(0).messageBytes).isEqualTo(2048);
assertThat(logFilters.get(1).pattern).isEqualTo("service1/Method2");
assertThat(logFilters.get(1).headerBytes).isNull();
assertThat(logFilters.get(1).messageBytes).isNull();
assertThat(observabilityConfig.getProjectId()).isEqualTo("grpc-testing");
List<LogFilter> clientLogFilters = observabilityConfig.getClientLogFilters();
assertThat(clientLogFilters).hasSize(1);
assertThat(clientLogFilters.get(0).headerBytes).isEqualTo(4096);
assertThat(clientLogFilters.get(0).messageBytes).isEqualTo(0);
assertThat(clientLogFilters.get(0).excludePattern).isFalse();
assertThat(clientLogFilters.get(0).matchAll).isTrue();
assertThat(clientLogFilters.get(0).services).isEmpty();
assertThat(clientLogFilters.get(0).methods).isEmpty();
List<LogFilter> serverLogFilters = observabilityConfig.getServerLogFilters();
assertThat(serverLogFilters).hasSize(1);
assertThat(serverLogFilters.get(0).headerBytes).isEqualTo(32);
assertThat(serverLogFilters.get(0).messageBytes).isEqualTo(64);
assertThat(serverLogFilters.get(0).excludePattern).isFalse();
assertThat(serverLogFilters.get(0).matchAll).isTrue();
assertThat(serverLogFilters.get(0).services).isEmpty();
assertThat(serverLogFilters.get(0).methods).isEmpty();
}
@Test
public void eventTypes() throws IOException {
observabilityConfig.parse(EVENT_TYPES);
assertFalse(observabilityConfig.isEnableCloudLogging());
List<EventType> eventTypes = observabilityConfig.getEventTypes();
assertThat(eventTypes).isEqualTo(
ImmutableList.of(EventType.CLIENT_HEADER, EventType.CLIENT_HALF_CLOSE,
EventType.SERVER_TRAILER));
public void setClientLogFilters() throws IOException {
observabilityConfig.parse(CLIENT_LOG_FILTERS);
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getProjectId()).isEqualTo("grpc-testing");
List<LogFilter> logFilterList = observabilityConfig.getClientLogFilters();
assertThat(logFilterList).hasSize(2);
assertThat(logFilterList.get(0).headerBytes).isEqualTo(4096);
assertThat(logFilterList.get(0).messageBytes).isEqualTo(2048);
assertThat(logFilterList.get(0).excludePattern).isFalse();
assertThat(logFilterList.get(0).matchAll).isTrue();
assertThat(logFilterList.get(0).services).isEmpty();
assertThat(logFilterList.get(0).methods).isEmpty();
assertThat(logFilterList.get(1).headerBytes).isEqualTo(0);
assertThat(logFilterList.get(1).messageBytes).isEqualTo(0);
assertThat(logFilterList.get(1).excludePattern).isTrue();
assertThat(logFilterList.get(1).matchAll).isFalse();
assertThat(logFilterList.get(1).services).isEqualTo(Collections.singleton("Service2"));
assertThat(logFilterList.get(1).methods)
.isEqualTo(Collections.singleton("service1/Method2"));
}
@Test
public void setServerLogFilters() throws IOException {
Set<String> expectedMethods = Stream.of("service1/method4", "service2/method234")
.collect(Collectors.toCollection(HashSet::new));
observabilityConfig.parse(SERVER_LOG_FILTERS);
assertTrue(observabilityConfig.isEnableCloudLogging());
List<LogFilter> logFilterList = observabilityConfig.getServerLogFilters();
assertThat(logFilterList).hasSize(2);
assertThat(logFilterList.get(0).headerBytes).isEqualTo(32);
assertThat(logFilterList.get(0).messageBytes).isEqualTo(64);
assertThat(logFilterList.get(0).excludePattern).isFalse();
assertThat(logFilterList.get(0).matchAll).isFalse();
assertThat(logFilterList.get(0).services).isEmpty();
assertThat(logFilterList.get(0).methods)
.isEqualTo(expectedMethods);
Set<String> expectedServices = Stream.of("service4", "Service2")
.collect(Collectors.toCollection(HashSet::new));
assertThat(logFilterList.get(1).headerBytes).isEqualTo(0);
assertThat(logFilterList.get(1).messageBytes).isEqualTo(0);
assertThat(logFilterList.get(1).excludePattern).isTrue();
assertThat(logFilterList.get(1).matchAll).isFalse();
assertThat(logFilterList.get(1).services).isEqualTo(expectedServices);
assertThat(logFilterList.get(1).methods).isEmpty();
}
@Test
public void enableCloudMonitoring() throws IOException {
observabilityConfig.parse(ENABLE_CLOUD_MONITORING);
assertTrue(observabilityConfig.isEnableCloudMonitoring());
}
@Test
public void enableCloudTracing() throws IOException {
observabilityConfig.parse(ENABLE_CLOUD_TRACE);
assertTrue(observabilityConfig.isEnableCloudTracing());
}
@Test
@ -197,7 +340,7 @@ public class ObservabilityConfigImplTest {
@Test
public void alwaysSampler() throws IOException {
observabilityConfig.parse(GLOBAL_TRACING_ALWAYS_SAMPLER);
observabilityConfig.parse(TRACING_ALWAYS_SAMPLER);
assertTrue(observabilityConfig.isEnableCloudTracing());
Sampler sampler = observabilityConfig.getSampler();
assertThat(sampler).isNotNull();
@ -206,7 +349,7 @@ public class ObservabilityConfigImplTest {
@Test
public void neverSampler() throws IOException {
observabilityConfig.parse(GLOBAL_TRACING_NEVER_SAMPLER);
observabilityConfig.parse(TRACING_NEVER_SAMPLER);
assertTrue(observabilityConfig.isEnableCloudTracing());
Sampler sampler = observabilityConfig.getSampler();
assertThat(sampler).isNotNull();
@ -215,7 +358,7 @@ public class ObservabilityConfigImplTest {
@Test
public void probabilisticSampler() throws IOException {
observabilityConfig.parse(GLOBAL_TRACING_PROBABILISTIC_SAMPLER);
observabilityConfig.parse(TRACING_PROBABILISTIC_SAMPLER);
assertTrue(observabilityConfig.isEnableCloudTracing());
Sampler sampler = observabilityConfig.getSampler();
assertThat(sampler).isNotNull();
@ -224,7 +367,7 @@ public class ObservabilityConfigImplTest {
@Test
public void defaultSampler() throws IOException {
observabilityConfig.parse(GLOBAL_TRACING_DEFAULT_SAMPLER);
observabilityConfig.parse(TRACING_DEFAULT_SAMPLER);
assertTrue(observabilityConfig.isEnableCloudTracing());
Sampler sampler = observabilityConfig.getSampler();
assertThat(sampler).isNotNull();
@ -234,29 +377,44 @@ public class ObservabilityConfigImplTest {
@Test
public void badProbabilisticSampler_error() throws IOException {
try {
observabilityConfig.parse(GLOBAL_TRACING_BADPROBABILISTIC_SAMPLER);
observabilityConfig.parse(GLOBAL_TRACING_BAD_PROBABILISTIC_SAMPLER);
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).isEqualTo(
"'global_trace_sampling_rate' needs to be between [0.0, 1.0]");
"'sampling_rate' needs to be between [0.0, 1.0]");
}
}
@Test
public void configFileLogFilters() throws Exception {
File configFile = tempFolder.newFile();
Files.write(Paths.get(configFile.getAbsolutePath()), LOG_FILTERS.getBytes(Charsets.US_ASCII));
Files.write(
Paths.get(configFile.getAbsolutePath()), CLIENT_LOG_FILTERS.getBytes(Charsets.US_ASCII));
observabilityConfig.parseFile(configFile.getAbsolutePath());
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
List<LogFilter> logFilters = observabilityConfig.getLogFilters();
assertThat(observabilityConfig.getProjectId()).isEqualTo("grpc-testing");
List<LogFilter> logFilters = observabilityConfig.getClientLogFilters();
assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).pattern).isEqualTo("*/*");
assertThat(logFilters.get(0).headerBytes).isEqualTo(4096);
assertThat(logFilters.get(0).messageBytes).isEqualTo(2048);
assertThat(logFilters.get(1).pattern).isEqualTo("service1/Method2");
assertThat(logFilters.get(1).headerBytes).isNull();
assertThat(logFilters.get(1).messageBytes).isNull();
assertThat(logFilters.get(1).headerBytes).isEqualTo(0);
assertThat(logFilters.get(1).messageBytes).isEqualTo(0);
assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).headerBytes).isEqualTo(4096);
assertThat(logFilters.get(0).messageBytes).isEqualTo(2048);
assertThat(logFilters.get(0).excludePattern).isFalse();
assertThat(logFilters.get(0).matchAll).isTrue();
assertThat(logFilters.get(0).services).isEmpty();
assertThat(logFilters.get(0).methods).isEmpty();
assertThat(logFilters.get(1).headerBytes).isEqualTo(0);
assertThat(logFilters.get(1).messageBytes).isEqualTo(0);
assertThat(logFilters.get(1).excludePattern).isTrue();
assertThat(logFilters.get(1).matchAll).isFalse();
assertThat(logFilters.get(1).services).isEqualTo(Collections.singleton("Service2"));
assertThat(logFilters.get(1).methods)
.isEqualTo(Collections.singleton("service1/Method2"));
}
@Test
@ -277,7 +435,29 @@ public class ObservabilityConfigImplTest {
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).isEqualTo(
"'custom_tags' needs to be a map of <string, string>");
"'labels' needs to be a map of <string, string>");
}
}
}
@Test
public void globalLogFilterExclude() throws IOException {
try {
observabilityConfig.parse(LOG_FILTER_GLOBAL_EXCLUDE);
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).isEqualTo(
"cannot have 'exclude' and '*' wildcard in the same filter");
}
}
@Test
public void logFilterInvalidMethod() throws IOException {
try {
observabilityConfig.parse(LOG_FILTER_INVALID_METHOD);
fail("exception expected!");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage()).contains(
"invalid service or method filter");
}
}
}

View File

@ -21,7 +21,7 @@ import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
public class LoggingTestHelper {
public class ObservabilityTestHelper {
static String makeUnaryRpcViaClientStub(
String requestMessage, SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub) {

View File

@ -100,7 +100,7 @@ public class TracesTest {
when(mockConfig.isEnableCloudTracing()).thenReturn(true);
when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample());
when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID);
when(mockConfig.getProjectId()).thenReturn(PROJECT_ID);
try {
GcpObservability observability =
@ -110,7 +110,7 @@ public class TracesTest {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.addService(new ObservabilityTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
@ -118,7 +118,7 @@ public class TracesTest {
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
assertThat(ObservabilityTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Adding sleep to ensure traces are exported before querying cloud tracing backend
TimeUnit.SECONDS.sleep(10);

View File

@ -17,44 +17,32 @@
package io.grpc.gcp.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.MethodDescriptor;
import io.grpc.gcp.observability.ObservabilityConfig;
import io.grpc.gcp.observability.ObservabilityConfig.LogFilter;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.TestMethodDescriptors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
public class ConfigFilterHelperTest {
private static final ImmutableList<LogFilter> configLogFilters =
ImmutableList.of(
new LogFilter("service1/Method2",1024,1024),
new LogFilter("service2/*",2048,1024),
new LogFilter("*",128,128),
new LogFilter("service2/*",2048,1024));
private static final ImmutableList<EventType> configEventTypes =
ImmutableList.of(
EventType.CLIENT_HEADER,
EventType.CLIENT_HALF_CLOSE,
EventType.SERVER_TRAILER);
private final MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod()
.toBuilder();
private MethodDescriptor<Void, Void> method;
new LogFilter(Collections.emptySet(), Collections.singleton("service1/Method2"), false,
1024, 1024, false),
new LogFilter(
Collections.singleton("service2"), Collections.singleton("service4/method2"), false,
2048, 1024, false),
new LogFilter(
Collections.singleton("service2"), Collections.singleton("service4/method3"), false,
2048, 1024, true),
new LogFilter(
Collections.emptySet(), Collections.emptySet(), true,
128, 128, false));
private ObservabilityConfig mockConfig;
private ConfigFilterHelper configFilterHelper;
@ -62,157 +50,100 @@ public class ConfigFilterHelperTest {
@Before
public void setup() {
mockConfig = mock(ObservabilityConfig.class);
configFilterHelper = new ConfigFilterHelper(mockConfig);
configFilterHelper = ConfigFilterHelper.getInstance(mockConfig);
}
@Test
public void disableCloudLogging_emptyLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(false);
assertFalse(configFilterHelper.methodOrServiceFilterPresent);
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perMethodFilters).isEmpty();
assertThat(configFilterHelper.logEventTypeSet).isNull();
}
@Test
public void enableCloudLogging_emptyLogFilters() {
public void enableCloudLogging_withoutLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(true);
when(mockConfig.getLogFilters()).thenReturn(null);
when(mockConfig.getEventTypes()).thenReturn(null);
configFilterHelper.setMethodOrServiceFilterMaps();
configFilterHelper.setEventFilterSet();
assertFalse(configFilterHelper.methodOrServiceFilterPresent);
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perMethodFilters).isEmpty();
assertThat(configFilterHelper.logEventTypeSet).isNull();
assertThat(mockConfig.getClientLogFilters()).isEmpty();
assertThat(mockConfig.getServerLogFilters()).isEmpty();
}
@Test
public void enableCloudLogging_withLogFilters() {
public void checkMethodLog_withoutLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(true);
when(mockConfig.getLogFilters()).thenReturn(configLogFilters);
when(mockConfig.getEventTypes()).thenReturn(configEventTypes);
assertThat(mockConfig.getClientLogFilters()).isEmpty();
assertThat(mockConfig.getServerLogFilters()).isEmpty();
configFilterHelper.setMethodOrServiceFilterMaps();
configFilterHelper.setEventFilterSet();
assertTrue(configFilterHelper.methodOrServiceFilterPresent);
Map<String, FilterParams> expectedServiceFilters = new HashMap<>();
expectedServiceFilters.put("*",
FilterParams.create(true, 128, 128));
expectedServiceFilters.put("service2",
FilterParams.create(true, 2048, 1024));
assertThat(configFilterHelper.perServiceFilters).isEqualTo(expectedServiceFilters);
Map<String, FilterParams> expectedMethodFilters = new HashMap<>();
expectedMethodFilters.put("service1/Method2",
FilterParams.create(true, 1024, 1024));
assertThat(configFilterHelper.perMethodFilters).isEqualTo(expectedMethodFilters);
Set<EventType> expectedLogEventTypeSet = ImmutableSet.copyOf(configEventTypes);
assertThat(configFilterHelper.logEventTypeSet).isEqualTo(expectedLogEventTypeSet);
FilterParams expectedParams =
FilterParams.create(false, 0, 0);
FilterParams clientResultParams
= configFilterHelper.logRpcMethod("service3/Method3", true);
assertThat(clientResultParams).isEqualTo(expectedParams);
FilterParams serverResultParams
= configFilterHelper.logRpcMethod("service3/Method3", false);
assertThat(serverResultParams).isEqualTo(expectedParams);
}
@Test
public void checkMethodAlwaysLogged() {
List<LogFilter> sampleLogFilters = ImmutableList.of(
new LogFilter("*", 4096, 4096));
when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
List<LogFilter> sampleLogFilters =
ImmutableList.of(
new LogFilter(
Collections.emptySet(), Collections.emptySet(), true,
4096, 4096, false));
when(mockConfig.getClientLogFilters()).thenReturn(sampleLogFilters);
when(mockConfig.getServerLogFilters()).thenReturn(sampleLogFilters);
FilterParams expectedParams =
FilterParams.create(true, 4096, 4096);
method = builder.setFullMethodName("service1/Method6").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertThat(resultParams).isEqualTo(expectedParams);
FilterParams clientResultParams
= configFilterHelper.logRpcMethod("service1/Method6", true);
assertThat(clientResultParams).isEqualTo(expectedParams);
FilterParams serverResultParams
= configFilterHelper.logRpcMethod("service1/Method6", false);
assertThat(serverResultParams).isEqualTo(expectedParams);
}
@Test
public void checkMethodNotToBeLogged() {
List<LogFilter> sampleLogFilters = ImmutableList.of(
new LogFilter("service1/Method2", 1024, 1024),
new LogFilter("service2/*", 2048, 1024));
when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
List<LogFilter> sampleLogFilters =
ImmutableList.of(
new LogFilter(Collections.emptySet(), Collections.singleton("service2/*"), false,
1024, 1024, true),
new LogFilter(
Collections.singleton("service2/Method1"), Collections.emptySet(), false,
2048, 1024, false));
when(mockConfig.getClientLogFilters()).thenReturn(sampleLogFilters);
when(mockConfig.getServerLogFilters()).thenReturn(sampleLogFilters);
FilterParams expectedParams =
FilterParams.create(false, 0, 0);
method = builder.setFullMethodName("service3/Method3").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertThat(resultParams).isEqualTo(expectedParams);
FilterParams clientResultParams1
= configFilterHelper.logRpcMethod("service3/Method3", true);
assertThat(clientResultParams1).isEqualTo(expectedParams);
FilterParams clientResultParams2
= configFilterHelper.logRpcMethod("service2/Method1", true);
assertThat(clientResultParams2).isEqualTo(expectedParams);
FilterParams serverResultParams
= configFilterHelper.logRpcMethod("service2/Method1", false);
assertThat(serverResultParams).isEqualTo(expectedParams);
}
@Test
public void checkMethodToBeLoggedConditional() {
when(mockConfig.getLogFilters()).thenReturn(configLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
when(mockConfig.getClientLogFilters()).thenReturn(configLogFilters);
when(mockConfig.getServerLogFilters()).thenReturn(configLogFilters);
FilterParams expectedParams =
FilterParams.create(true, 1024, 1024);
method = builder.setFullMethodName("service1/Method2").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
= configFilterHelper.logRpcMethod("service1/Method2", true);
assertThat(resultParams).isEqualTo(expectedParams);
FilterParams expectedParamsWildCard =
FilterParams.create(true, 2048, 1024);
method = builder.setFullMethodName("service2/Method1").build();
FilterParams resultParamsWildCard
= configFilterHelper.isMethodToBeLogged(method);
= configFilterHelper.logRpcMethod("service2/Method1", true);
assertThat(resultParamsWildCard).isEqualTo(expectedParamsWildCard);
}
@Test
public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() {
List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.CLIENT_HEADER);
eventList.add(EventType.SERVER_HEADER);
eventList.add(EventType.CLIENT_MESSAGE);
eventList.add(EventType.SERVER_MESSAGE);
eventList.add(EventType.CLIENT_HALF_CLOSE);
eventList.add(EventType.SERVER_TRAILER);
eventList.add(EventType.CANCEL);
for (EventType event : eventList) {
assertTrue(configFilterHelper.isEventToBeLogged(event));
}
}
@Test
public void checkEventToBeLogged_emptyFilter_doNotLogEventTypes() {
when(mockConfig.getEventTypes()).thenReturn(new ArrayList<>());
configFilterHelper.setEventFilterSet();
List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.CLIENT_HEADER);
eventList.add(EventType.SERVER_HEADER);
eventList.add(EventType.CLIENT_MESSAGE);
eventList.add(EventType.SERVER_MESSAGE);
eventList.add(EventType.CLIENT_HALF_CLOSE);
eventList.add(EventType.SERVER_TRAILER);
eventList.add(EventType.CANCEL);
for (EventType event : eventList) {
assertFalse(configFilterHelper.isEventToBeLogged(event));
}
}
@Test
public void checkEventToBeLogged_withEventTypesFromConfig() {
when(mockConfig.getEventTypes()).thenReturn(configEventTypes);
configFilterHelper.setEventFilterSet();
EventType logEventType = EventType.CLIENT_HEADER;
assertTrue(configFilterHelper.isEventToBeLogged(logEventType));
EventType doNotLogEventType = EventType.SERVER_MESSAGE;
assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType));
FilterParams excludeParams =
FilterParams.create(false, 0, 0);
FilterParams serverResultParams
= configFilterHelper.logRpcMethod("service4/method3", false);
assertThat(serverResultParams).isEqualTo(excludeParams);
}
}

View File

@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -108,8 +107,6 @@ public class InternalLoggingChannelInterceptorTest {
cancelCalled = SettableFuture.create();
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
}
@Test
@ -164,7 +161,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(filterParams);
ClientCall<byte[], byte[]> interceptedLoggingCall =
@ -329,7 +326,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(filterParams);
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
@ -387,7 +384,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(filterParams);
callFuture.set(
@ -449,7 +446,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(filterParams);
callFuture.set(
@ -547,7 +544,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(FilterParams.create(false, 0, 0));
ClientCall<byte[], byte[]> interceptedLoggingCall =
@ -612,7 +609,7 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), true))
.thenReturn(FilterParams.create(true, 10, 10));
ClientCall<byte[], byte[]> interceptedLoggingCall =
@ -636,106 +633,4 @@ public class InternalLoggingChannelInterceptorTest {
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
@Test
public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HEADER)).thenReturn(false);
when(mockFilterHelper.isEventToBeLogged(EventType.SERVER_HEADER)).thenReturn(false);
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(method,
CallOptions.DEFAULT,
channel);
{
interceptedLoggingCall.start(mockListener, new Metadata());
verify(mockLogHelper, never()).logClientHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(Duration.class),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
interceptedListener.get().onHeaders(new Metadata());
verify(mockLogHelper, never()).logServerHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
ArgumentMatchers.any());
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
interceptedLoggingCall.halfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
interceptedLoggingCall.cancel(null, null);
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(5);
}
}
}

View File

@ -20,12 +20,10 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.gcp.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -45,7 +43,6 @@ import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.internal.NoopServerCall;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.net.InetAddress;
@ -61,7 +58,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@ -105,8 +101,6 @@ public class InternalLoggingServerInterceptorTest {
actualStatus = new AtomicReference<>();
actualTrailers = new AtomicReference<>();
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
}
@Test
@ -121,7 +115,7 @@ public class InternalLoggingServerInterceptorTest {
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
FilterParams filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams);
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), false)).thenReturn(filterParams);
capturedListener =
factory.create()
.interceptCall(
@ -312,7 +306,7 @@ public class InternalLoggingServerInterceptorTest {
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
FilterParams filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams);
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), false)).thenReturn(filterParams);
final ServerCall<byte[], byte[]> noopServerCall = new NoopServerCall<byte[], byte[]>() {
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
@ -365,7 +359,8 @@ public class InternalLoggingServerInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(FilterParams.create(false, 0, 0));
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), false))
.thenReturn(FilterParams.create(false, 0, 0));
capturedListener =
factory.create()
.interceptCall(
@ -422,7 +417,7 @@ public class InternalLoggingServerInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
when(mockFilterHelper.logRpcMethod(method.getFullMethodName(), false))
.thenReturn(FilterParams.create(true, 10, 10));
capturedListener =
@ -483,85 +478,4 @@ public class InternalLoggingServerInterceptorTest {
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
@Test
public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.CLIENT_HALF_CLOSE)).thenReturn(false);
Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
capturedListener =
factory.create()
.interceptCall(
new NoopServerCall<byte[], byte[]>() {
@Override
public void sendHeaders(Metadata headers) {
actualServerInitial.set(headers);
}
@Override
public void sendMessage(byte[] message) {
actualResponse.set(message);
}
@Override
public void close(Status status, Metadata trailers) {
actualStatus.set(status);
actualTrailers.set(trailers);
}
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
return method;
}
@Override
public Attributes getAttributes() {
return Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer)
.build();
}
@Override
public String getAuthority() {
return "the-authority";
}
},
clientInitial,
(call, headers) -> {
interceptedLoggingCall.set(call);
return mockListener;
});
{
interceptedLoggingCall.get().sendHeaders(new Metadata());
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
capturedListener.onHalfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedLoggingCall.get().sendMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedLoggingCall.get().close(status, trailers);
verify(mockLogHelper, never()).logHalfClose(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(GrpcLogRecord.EventLogger.class),
anyString());
capturedListener.onCancel();
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(6);
}
}
}

View File

@ -169,10 +169,10 @@ public class GcpLogSinkTest {
@SuppressWarnings("unchecked")
public void emptyCustomTags_setSourceProject() {
Map<String, String> emptyCustomTags = null;
String destinationProjectId = "DESTINATION_PROJECT";
String projectId = "PROJECT";
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS,
destinationProjectId);
GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS,
projectId);
GcpLogSink sink = new GcpLogSink(mockLogging, projectId, LOCATION_TAGS,
emptyCustomTags, Collections.emptySet());
sink.write(LOG_PROTO);