observability: implement filtering for logging and integrate tags to GcpLogSink (#9016)

This commit is contained in:
DNVindhya 2022-03-29 21:16:47 -07:00 committed by GitHub
parent 957079194a
commit 898e03b700
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 2194 additions and 665 deletions

View File

@ -7,15 +7,29 @@ plugins {
}
description = "gRPC: Observability"
[compileJava].each() {
it.options.compilerArgs += [
// only has AutoValue annotation processor
"-Xlint:-processing"
]
appendToProperty(
it.options.errorprone.excludedPaths,
".*/build/generated/sources/annotationProcessor/java/.*",
"|")
}
dependencies {
def cloudLoggingVersion = '3.6.1'
annotationProcessor libraries.autovalue
api project(':grpc-api')
implementation project(':grpc-protobuf'),
project(':grpc-stub'),
project(':grpc-alts'),
libraries.google_auth_oauth2_http,
libraries.autovalue_annotation,
libraries.perfmark,
('com.google.guava:guava:31.0.1-jre'),
('com.google.errorprone:error_prone_annotations:2.11.0'),

View File

@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ExperimentalApi;
import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.interceptors.ConfigFilterHelper;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.interceptors.LogHelper;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import java.io.IOException;
@ -42,14 +45,13 @@ public final class Observability implements AutoCloseable {
if (instance == null) {
GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags();
ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId());
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), 10);
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink,
new InternalLoggingChannelInterceptor.FactoryImpl(sink,
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig),
new InternalLoggingServerInterceptor.FactoryImpl(sink,
globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(),
observabilityConfig));
new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
}
return instance;
}

View File

@ -17,6 +17,7 @@
package io.grpc.observability;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.List;
public interface ObservabilityConfig {
/** Is Cloud Logging enabled. */
@ -26,10 +27,10 @@ public interface ObservabilityConfig {
String getDestinationProjectId();
/** Get filters set for logging. */
LogFilter[] getLogFilters();
List<LogFilter> getLogFilters();
/** Get event types to log. */
EventType[] getEventTypes();
List<EventType> getEventTypes();
/**
* POJO for representing a filter used in configuration.
@ -44,7 +45,14 @@ public interface ObservabilityConfig {
/** Number of bytes of each header to log. */
public final Integer messageBytes;
LogFilter(String pattern, Integer headerBytes, Integer messageBytes) {
/**
* Object used to represent filter used in configuration.
*
* @param pattern Pattern indicating which service/method to log
* @param headerBytes Number of bytes of each header to log
* @param messageBytes Number of bytes of each header to log
*/
public LogFilter(String pattern, Integer headerBytes, Integer messageBytes) {
this.pattern = pattern;
this.headerBytes = headerBytes;
this.messageBytes = messageBytes;

View File

@ -18,6 +18,7 @@ package io.grpc.observability;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableList;
import io.grpc.internal.JsonParser;
import io.grpc.internal.JsonUtil;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
@ -25,14 +26,16 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/** gRPC Observability configuration processor. */
/**
* gRPC Observability configuration processor.
*/
final class ObservabilityConfigImpl implements ObservabilityConfig {
private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY";
private boolean enableCloudLogging = true;
private String destinationProjectId = null;
private LogFilter[] logFilters;
private EventType[] eventTypes;
private List<LogFilter> logFilters;
private List<EventType> eventTypes;
static ObservabilityConfigImpl getInstance() throws IOException {
ObservabilityConfigImpl config = new ObservabilityConfigImpl();
@ -56,18 +59,20 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
List<?> rawList = JsonUtil.getList(loggingConfig, "log_filters");
if (rawList != null) {
List<Map<String, ?>> jsonLogFilters = JsonUtil.checkObjectList(rawList);
this.logFilters = new LogFilter[jsonLogFilters.size()];
for (int i = 0; i < jsonLogFilters.size(); i++) {
this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i));
ImmutableList.Builder<LogFilter> logFiltersBuilder = new ImmutableList.Builder<>();
for (Map<String, ?> jsonLogFilter : jsonLogFilters) {
logFiltersBuilder.add(parseJsonLogFilter(jsonLogFilter));
}
this.logFilters = logFiltersBuilder.build();
}
rawList = JsonUtil.getList(loggingConfig, "event_types");
if (rawList != null) {
List<String> jsonEventTypes = JsonUtil.checkStringList(rawList);
this.eventTypes = new EventType[jsonEventTypes.size()];
for (int i = 0; i < jsonEventTypes.size(); i++) {
this.eventTypes[i] = convertEventType(jsonEventTypes.get(i));
ImmutableList.Builder<EventType> eventTypesBuilder = new ImmutableList.Builder<>();
for (String jsonEventType : jsonEventTypes) {
eventTypesBuilder.add(convertEventType(jsonEventType));
}
this.eventTypes = eventTypesBuilder.build();
}
}
}
@ -112,12 +117,12 @@ final class ObservabilityConfigImpl implements ObservabilityConfig {
}
@Override
public LogFilter[] getLogFilters() {
public List<LogFilter> getLogFilters() {
return logFilters;
}
@Override
public EventType[] getEventTypes() {
public List<EventType> getEventTypes() {
return eventTypes;
}
}

View File

@ -0,0 +1,221 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.Internal;
import io.grpc.MethodDescriptor;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.ObservabilityConfig.LogFilter;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Parses gRPC Observability configuration filters for interceptors usage.
*/
@Internal
public class ConfigFilterHelper {
private static final Logger logger = Logger.getLogger(ConfigFilterHelper.class.getName());
public static final FilterParams NO_FILTER_PARAMS
= FilterParams.create(false, 0, 0);
public static final String globalPattern = "*";
private final ObservabilityConfig config;
@VisibleForTesting
boolean methodOrServiceFilterPresent;
// Flag to log every service and method
@VisibleForTesting
Map<String, FilterParams> perServiceFilters;
@VisibleForTesting
Map<String, FilterParams> perMethodFilters;
@VisibleForTesting
Set<EventType> logEventTypeSet;
@VisibleForTesting
ConfigFilterHelper(ObservabilityConfig config) {
this.config = config;
this.methodOrServiceFilterPresent = false;
this.perServiceFilters = new HashMap<>();
this.perMethodFilters = new HashMap<>();
}
/**
* Creates and returns helper instance for log filtering.
*
* @param config processed ObservabilityConfig object
* @return helper instance for filtering
*/
public static ConfigFilterHelper factory(ObservabilityConfig config) {
ConfigFilterHelper filterHelper = new ConfigFilterHelper(config);
if (config.isEnableCloudLogging()) {
filterHelper.setMethodOrServiceFilterMaps();
filterHelper.setEventFilterSet();
}
return filterHelper;
}
@VisibleForTesting
void setMethodOrServiceFilterMaps() {
List<LogFilter> logFilters = config.getLogFilters();
if (logFilters == null) {
return;
}
Map<String, FilterParams> perServiceFilters = new HashMap<>();
Map<String, FilterParams> perMethodFilters = new HashMap<>();
for (LogFilter currentFilter : logFilters) {
// '*' for global, 'service/*' for service glob, or 'service/method' for fully qualified
String methodOrServicePattern = currentFilter.pattern;
int currentHeaderBytes
= currentFilter.headerBytes != null ? currentFilter.headerBytes : 0;
int currentMessageBytes
= currentFilter.messageBytes != null ? currentFilter.messageBytes : 0;
if (methodOrServicePattern.equals("*")) {
// parse config for global, e.g. "*"
if (perServiceFilters.containsKey(globalPattern)) {
logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern);
continue;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perServiceFilters.put(globalPattern, params);
} else if (methodOrServicePattern.endsWith("/*")) {
// TODO(DNVindhya): check if service name is a valid string for a service name
// parse config for a service, e.g. "service/*"
String service = MethodDescriptor.extractFullServiceName(methodOrServicePattern);
if (perServiceFilters.containsKey(service)) {
logger.log(Level.WARNING, "Duplicate entry : {0)", methodOrServicePattern);
continue;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perServiceFilters.put(service, params);
} else {
// TODO(DNVVindhya): check if methodOrServicePattern is a valid full qualified method name
// parse pattern for a fully qualified method, e.g "service/method"
if (perMethodFilters.containsKey(methodOrServicePattern)) {
logger.log(Level.WARNING, "Duplicate entry : {0}", methodOrServicePattern);
continue;
}
FilterParams params = FilterParams.create(true,
currentHeaderBytes, currentMessageBytes);
perMethodFilters.put(methodOrServicePattern, params);
}
}
this.perServiceFilters = ImmutableMap.copyOf(perServiceFilters);
this.perMethodFilters = ImmutableMap.copyOf(perMethodFilters);
if (!perServiceFilters.isEmpty() || !perMethodFilters.isEmpty()) {
this.methodOrServiceFilterPresent = true;
}
}
@VisibleForTesting
void setEventFilterSet() {
List<EventType> eventFilters = config.getEventTypes();
if (eventFilters == null) {
return;
}
if (eventFilters.isEmpty()) {
this.logEventTypeSet = ImmutableSet.of();
return;
}
this.logEventTypeSet = ImmutableSet.copyOf(eventFilters);
}
/**
* Class containing results for method/service filter information, such as flag for logging
* method/service and payload limits to be used for filtering.
*/
@AutoValue
public abstract static class FilterParams {
abstract boolean log();
abstract int headerBytes();
abstract int messageBytes();
@VisibleForTesting
public static FilterParams create(boolean log, int headerBytes, int messageBytes) {
return new AutoValue_ConfigFilterHelper_FilterParams(
log, headerBytes, messageBytes);
}
}
/**
* Checks if the corresponding service/method passed needs to be logged as per the user provided
* configuration.
*
* @param method the fully qualified name of the method
* @return MethodFilterParams object 1. specifies if the corresponding method needs to be logged
* (log field will be set to true) 2. values of payload limits retrieved from configuration
*/
public FilterParams isMethodToBeLogged(MethodDescriptor<?, ?> method) {
FilterParams params = NO_FILTER_PARAMS;
if (methodOrServiceFilterPresent) {
String fullMethodName = method.getFullMethodName();
if (perMethodFilters.containsKey(fullMethodName)) {
params = perMethodFilters.get(fullMethodName);
} else {
String serviceName = method.getServiceName();
if (perServiceFilters.containsKey(serviceName)) {
params = perServiceFilters.get(serviceName);
} else if (perServiceFilters.containsKey(globalPattern)) {
params = perServiceFilters.get(globalPattern);
}
}
}
return params;
}
/**
* Checks if the corresponding event passed needs to be logged as per the user provided
* configuration.
*
* <p> All events are logged by default if event_types is not specified or {} in configuration.
* If event_types is specified as [], no events will be logged.
* If events types is specified as a non-empty list, only the events specified in the
* list will be logged.
* </p>
*
* @param event gRPC observability event
* @return true if event needs to be logged, false otherwise
*/
public boolean isEventToBeLogged(EventType event) {
if (logEventTypeSet == null) {
return true;
}
boolean logEvent;
if (logEventTypeSet.isEmpty()) {
logEvent = false;
} else {
logEvent = logEventTypeSet.contains(event);
}
return logEvent;
}
}

View File

@ -30,12 +30,9 @@ import io.grpc.Internal;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -47,44 +44,39 @@ import java.util.logging.Logger;
*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
private static final Logger logger = Logger
.getLogger(InternalLoggingChannelInterceptor.class.getName());
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
public interface Factory {
ClientInterceptor create();
}
public static class FactoryImpl implements Factory {
private final Sink sink;
private final LogHelper helper;
/** Create the {@link Factory} we need to create our {@link ClientInterceptor}s. */
public FactoryImpl(Sink sink, Map<String, String> locationTags, Map<String, String> customTags,
ObservabilityConfig observabilityConfig) {
this.sink = sink;
this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags,
observabilityConfig);
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
/**
* Create the {@link Factory} we need to create our {@link ClientInterceptor}s.
*/
public FactoryImpl(LogHelper helper, ConfigFilterHelper filterHelper) {
this.helper = helper;
this.filterHelper = filterHelper;
}
@Override
public ClientInterceptor create() {
return new InternalLoggingChannelInterceptor(helper);
}
/**
* Closes the sink instance.
*/
public void close() {
if (sink != null) {
sink.close();
}
return new InternalLoggingChannelInterceptor(helper, filterHelper);
}
}
private InternalLoggingChannelInterceptor(LogHelper helper) {
private InternalLoggingChannelInterceptor(LogHelper helper, ConfigFilterHelper filterHelper) {
this.helper = helper;
this.filterHelper = filterHelper;
}
@Override
@ -100,12 +92,14 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
final Deadline deadline = LogHelper.min(callOptions.getDeadline(),
Context.current().getDeadline());
// TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
// according to config. Until then always return true.
if (!helper.isMethodToBeLogged(method.getFullMethodName())) {
FilterParams filterParams = filterHelper.isMethodToBeLogged(method);
if (!filterParams.log()) {
return next.newCall(method, callOptions);
}
final int maxHeaderBytes = filterParams.headerBytes();
final int maxMessageBytes = filterParams.messageBytes();
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
@ -116,6 +110,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) {
try {
helper.logRequestHeader(
seq.getAndIncrement(),
@ -124,6 +119,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
@ -136,48 +132,57 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
}
Listener<RespT> observabilityListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
responseMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
}
super.onMessage(message);
}
@Override
public void onHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) {
try {
helper.logResponseHeader(
seq.getAndIncrement(),
serviceName,
methodName,
headers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
}
super.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) {
try {
helper.logTrailer(
seq.getAndIncrement(),
@ -185,12 +190,14 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
methodName,
status,
trailers,
maxHeaderBytes,
EventLogger.LOGGER_CLIENT,
rpcId,
LogHelper.getPeerAddress(getAttributes()));
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
}
super.onClose(status, trailers);
}
};
@ -200,24 +207,29 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
@Override
public void sendMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
requestMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_CLIENT,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
}
super.sendMessage(message);
}
@Override
public void halfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) {
try {
helper.logHalfClose(
seq.getAndIncrement(),
@ -228,12 +240,14 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
}
super.halfClose();
}
@Override
public void cancel(String message, Throwable cause) {
// Event: EventType.GRPC_CALL_CANCEL
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) {
try {
helper.logCancel(
seq.getAndIncrement(),
@ -244,6 +258,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
}
super.cancel(message, cause);
}
};

View File

@ -28,61 +28,55 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.net.SocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/** A logging interceptor for {@code LoggingServerProvider}. */
/**
* A logging interceptor for {@code LoggingServerProvider}.
*/
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
private static final Logger logger = Logger
.getLogger(InternalLoggingServerInterceptor.class.getName());
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
public interface Factory {
ServerInterceptor create();
}
public static class FactoryImpl implements Factory {
private final Sink sink;
private final LogHelper helper;
/** Create the {@link Factory} we need to create our {@link ServerInterceptor}s. */
public FactoryImpl(Sink sink, Map<String, String> locationTags,
Map<String, String> customTags,
ObservabilityConfig observabilityConfig) {
this.sink = sink;
this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags,
observabilityConfig);
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
/**
* Create the {@link Factory} we need to create our {@link ServerInterceptor}s.
*/
public FactoryImpl(LogHelper helper, ConfigFilterHelper filterHelper) {
this.helper = helper;
this.filterHelper = filterHelper;
}
@Override
public ServerInterceptor create() {
return new InternalLoggingServerInterceptor(helper);
}
/**
* Closes the sink instance.
*/
public void close() {
if (sink != null) {
sink.close();
}
return new InternalLoggingServerInterceptor(helper, filterHelper);
}
}
private InternalLoggingServerInterceptor(LogHelper helper) {
private InternalLoggingServerInterceptor(LogHelper helper, ConfigFilterHelper filterHelper) {
this.helper = helper;
this.filterHelper = filterHelper;
}
@Override
@ -98,13 +92,16 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
// TODO (dnvindhya): implement isMethodToBeLogged() to check for methods to be logged
// according to config. Until then always return true.
if (!helper.isMethodToBeLogged(call.getMethodDescriptor().getFullMethodName())) {
FilterParams filterParams = filterHelper.isMethodToBeLogged(call.getMethodDescriptor());
if (!filterParams.log()) {
return next.startCall(call, headers);
}
final int maxHeaderBytes = filterParams.headerBytes();
final int maxMessageBytes = filterParams.messageBytes();
// Event: EventType.GRPC_CALL_REQUEST_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)) {
try {
helper.logRequestHeader(
seq.getAndIncrement(),
@ -113,6 +110,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
authority,
timeout,
headers,
maxHeaderBytes,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
@ -125,48 +123,57 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
// 2. NullPointerException
logger.log(Level.SEVERE, "Unable to log request header", e);
}
}
ServerCall<ReqT, RespT> wrapperCall =
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata headers) {
// Event: EventType.GRPC_CALL_RESPONSE_HEADER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)) {
try {
helper.logResponseHeader(
seq.getAndIncrement(),
serviceName,
methodName,
headers,
maxHeaderBytes,
EventLogger.LOGGER_SERVER,
rpcId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response header", e);
}
}
super.sendHeaders(headers);
}
@Override
public void sendMessage(RespT message) {
// Event: EventType.GRPC_CALL_RESPONSE_MESSAGE
EventType responseMessageType = EventType.GRPC_CALL_RESPONSE_MESSAGE;
if (filterHelper.isEventToBeLogged(responseMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
responseMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log response message", e);
}
}
super.sendMessage(message);
}
@Override
public void close(Status status, Metadata trailers) {
// Event: EventType.GRPC_CALL_TRAILER
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)) {
try {
helper.logTrailer(
seq.getAndIncrement(),
@ -174,12 +181,14 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
methodName,
status,
trailers,
maxHeaderBytes,
EventLogger.LOGGER_SERVER,
rpcId,
null);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log trailer", e);
}
}
super.close(status, trailers);
}
};
@ -189,24 +198,29 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
@Override
public void onMessage(ReqT message) {
// Event: EventType.GRPC_CALL_REQUEST_MESSAGE
EventType requestMessageType = EventType.GRPC_CALL_REQUEST_MESSAGE;
if (filterHelper.isEventToBeLogged(requestMessageType)) {
try {
helper.logRpcMessage(
seq.getAndIncrement(),
serviceName,
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
requestMessageType,
message,
maxMessageBytes,
EventLogger.LOGGER_SERVER,
rpcId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log request message", e);
}
}
super.onMessage(message);
}
@Override
public void onHalfClose() {
// Event: EventType.GRPC_CALL_HALF_CLOSE
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)) {
try {
helper.logHalfClose(
seq.getAndIncrement(),
@ -217,12 +231,14 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log half close", e);
}
}
super.onHalfClose();
}
@Override
public void onCancel() {
// Event: EventType.GRPC_CALL_CANCEL
if (filterHelper.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)) {
try {
helper.logCancel(
seq.getAndIncrement(),
@ -233,6 +249,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
} catch (Exception e) {
logger.log(Level.SEVERE, "Unable to log cancel", e);
}
}
super.onCancel();
}
};

View File

@ -30,7 +30,6 @@ import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.Address;
@ -42,7 +41,9 @@ import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -50,10 +51,10 @@ import javax.annotation.Nullable;
/**
* Helper class for GCP observability logging.
*/
class LogHelper {
public class LogHelper {
private static final Logger logger = Logger.getLogger(LogHelper.class.getName());
// TODO(dnvindhya): Define it in one places(TBD) to make it easily accessible from everywhere
// TODO(DNVindhya): Define it in one places(TBD) to make it easily accessible from everywhere
static final Metadata.Key<byte[]> STATUS_DETAILS_KEY =
Metadata.Key.of(
"grpc-status-details-bin",
@ -61,18 +62,16 @@ class LogHelper {
private final Sink sink;
private final TimeProvider timeProvider;
// TODO(DNvindhya) remove unused annotation once the following 2 are actually used
@SuppressWarnings({"unused"}) private final Map<String, String> locationTags;
@SuppressWarnings({"unused"}) private final Map<String, String> customTags;
@SuppressWarnings({"unused"}) private final ObservabilityConfig observabilityConfig;
LogHelper(Sink sink, TimeProvider timeProvider, Map<String, String> locationTags,
Map<String, String> customTags, ObservabilityConfig observabilityConfig) {
/**
* Creates a LogHelper instance.
*
* @param sink sink
* @param timeProvider timeprovider
*/
public LogHelper(Sink sink, TimeProvider timeProvider) {
this.sink = sink;
this.timeProvider = timeProvider;
this.locationTags = locationTags;
this.customTags = customTags;
this.observabilityConfig = observabilityConfig;
}
/**
@ -85,6 +84,7 @@ class LogHelper {
String authority,
@Nullable Duration timeout,
Metadata metadata,
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
// null on client side
@ -96,7 +96,8 @@ class LogHelper {
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_SERVER,
"peerAddress can only be specified by server");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
@ -107,6 +108,7 @@ class LogHelper {
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated)
.setRpcId(rpcId);
if (timeout != null) {
logEntryBuilder.setTimeout(timeout);
@ -118,13 +120,14 @@ class LogHelper {
}
/**
* Logs the reponse header. Binary logging equivalent of logServerHeader.
* Logs the response header. Binary logging equivalent of logServerHeader.
*/
void logResponseHeader(
long seqId,
String serviceName,
String methodName,
Metadata metadata,
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
@ -137,7 +140,8 @@ class LogHelper {
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
@ -147,6 +151,7 @@ class LogHelper {
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated)
.setRpcId(rpcId);
if (peerAddress != null) {
logEntryBuilder.setPeerAddress(socketAddressToProto(peerAddress));
@ -163,6 +168,7 @@ class LogHelper {
String methodName,
Status status,
Metadata metadata,
int maxHeaderBytes,
GrpcLogRecord.EventLogger eventLogger,
String rpcId,
@Nullable SocketAddress peerAddress) {
@ -174,7 +180,8 @@ class LogHelper {
peerAddress == null || eventLogger == GrpcLogRecord.EventLogger.LOGGER_CLIENT,
"peerAddress can only be specified for client");
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair = createMetadataProto(metadata);
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair =
createMetadataProto(metadata, maxHeaderBytes);
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
.setSequenceId(seqId)
.setServiceName(serviceName)
@ -184,6 +191,7 @@ class LogHelper {
.setLogLevel(LogLevel.LOG_LEVEL_DEBUG)
.setMetadata(pair.payload)
.setPayloadSize(pair.size)
.setPayloadTruncated(pair.truncated)
.setStatusCode(status.getCode().value())
.setRpcId(rpcId);
String statusDescription = status.getDescription();
@ -209,6 +217,7 @@ class LogHelper {
String methodName,
EventType eventType,
T message,
int maxMessageBytes,
EventLogger eventLogger,
String rpcId) {
checkNotNull(serviceName, "serviceName");
@ -220,7 +229,7 @@ class LogHelper {
"event type must correspond to client message or server message");
checkNotNull(message, "message");
// TODO(dnvindhya): Implement conversion of generics to ByteString
// TODO(DNVindhya): Implement conversion of generics to ByteString
// Following is a temporary workaround to log if message is of following types :
// 1. com.google.protobuf.Message
// 2. byte[]
@ -235,7 +244,7 @@ class LogHelper {
}
PayloadBuilder<ByteString> pair = null;
if (messageBytesArray != null) {
pair = createMesageProto(messageBytesArray);
pair = createMessageProto(messageBytesArray, maxMessageBytes);
}
GrpcLogRecord.Builder logEntryBuilder = createTimestamp()
@ -250,7 +259,8 @@ class LogHelper {
logEntryBuilder.setPayloadSize(pair.size);
}
if (pair != null && pair.payload != null) {
logEntryBuilder.setMessage(pair.payload);
logEntryBuilder.setMessage(pair.payload)
.setPayloadTruncated(pair.truncated);
}
sink.write(logEntryBuilder.build());
}
@ -308,45 +318,79 @@ class LogHelper {
return GrpcLogRecord.newBuilder().setTimestamp(Timestamps.fromNanos(nanos));
}
// TODO(DNVindhya): Evaluate if we need following clause for metadata logging in Observability
// Leaving the implementation for now as is to have same behavior across Java and Go
private static final Set<String> NEVER_INCLUDED_METADATA = new HashSet<>(
Collections.singletonList(
// grpc-status-details-bin is already logged in `status_details` field of the
// observabilitylog proto
STATUS_DETAILS_KEY.name()));
private static final Set<String> ALWAYS_INCLUDED_METADATA = new HashSet<>(
Collections.singletonList(
"grpc-trace-bin"));
static final class PayloadBuilder<T> {
T payload;
int size;
boolean truncated;
private PayloadBuilder(T payload, int size) {
private PayloadBuilder(T payload, int size, boolean truncated) {
this.payload = payload;
this.size = size;
this.truncated = truncated;
}
}
// TODO(dnvindhya): Create a unit test for the metadata conversion
static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metadata metadata) {
static PayloadBuilder<GrpcLogRecord.Metadata.Builder> createMetadataProto(Metadata metadata,
int maxHeaderBytes) {
checkNotNull(metadata, "metadata");
checkArgument(maxHeaderBytes >= 0,
"maxHeaderBytes must be non negative");
GrpcLogRecord.Metadata.Builder metadataBuilder = GrpcLogRecord.Metadata.newBuilder();
// This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata's
// This code is tightly coupled with io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
// implementation
byte[][] serialized = InternalMetadata.serialize(metadata);
boolean truncated = false;
int totalMetadataBytes = 0;
if (serialized != null) {
int singleMetadataEntryBytes = 0;
// Calculate bytes for each GrpcLogRecord.Metadata.MetadataEntry
for (int i = 0; i < serialized.length; i += 2) {
String key = new String(serialized[i], Charsets.UTF_8);
byte[] value = serialized[i + 1];
singleMetadataEntryBytes = totalMetadataBytes + key.length() + value.length;
if (NEVER_INCLUDED_METADATA.contains(key)) {
continue;
}
boolean forceInclude = ALWAYS_INCLUDED_METADATA.contains(key);
int metadataBytesAfterAdd = totalMetadataBytes + key.length() + value.length;
if (!forceInclude && metadataBytesAfterAdd > maxHeaderBytes) {
truncated = true;
continue;
}
metadataBuilder.addEntryBuilder()
.setKey(key)
.setValue(ByteString.copyFrom(value));
totalMetadataBytes = singleMetadataEntryBytes;
if (!forceInclude) {
// force included keys do not count towards the size limit
totalMetadataBytes = metadataBytesAfterAdd;
}
}
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes);
}
return new PayloadBuilder<>(metadataBuilder, totalMetadataBytes, truncated);
}
static PayloadBuilder<ByteString> createMesageProto(byte[] message) {
static PayloadBuilder<ByteString> createMessageProto(byte[] message, int maxMessageBytes) {
checkArgument(maxMessageBytes >= 0,
"maxMessageBytes must be non negative");
int desiredBytes = 0;
int messageLength = message.length;
if (maxMessageBytes > 0) {
desiredBytes = Math.min(maxMessageBytes, messageLength);
}
ByteString messageData =
ByteString.copyFrom(message, 0, messageLength);
return new PayloadBuilder<ByteString>(messageData, messageLength);
ByteString.copyFrom(message, 0, desiredBytes);
return new PayloadBuilder<>(messageData, messageLength,
maxMessageBytes < message.length);
}
static Address socketAddressToProto(SocketAddress address) {
@ -366,7 +410,7 @@ class LogHelper {
}
builder.setIpPort(((InetSocketAddress) address).getPort());
} else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) {
// To avoid a compile time dependency on grpc-netty, we check against the
// To avoid a compiled time dependency on grpc-netty, we check against the
// runtime class name.
builder.setType(Address.Type.TYPE_UNIX)
.setAddress(address.toString());
@ -395,10 +439,4 @@ class LogHelper {
}
return deadline0.minimum(deadline1);
}
// TODO (dnvindhya) : Implement service and method name filtering
// Add unit tests for the method as part of filtering implementation
boolean isMethodToBeLogged(String fullMethodName) {
return true;
}
}

View File

@ -24,12 +24,15 @@ import com.google.cloud.logging.Payload.JsonPayload;
import com.google.cloud.logging.Severity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.util.JsonFormat;
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -39,10 +42,19 @@ import java.util.logging.Logger;
public class GcpLogSink implements Sink {
private final Logger logger = Logger.getLogger(GcpLogSink.class.getName());
// TODO (dnvindhya): Make cloud logging service a configurable value
// TODO(DNVindhya): Make cloud logging service a configurable value
private static final String SERVICE_TO_EXCLUDE = "google.logging.v2.LoggingServiceV2";
private static final String DEFAULT_LOG_NAME = "grpc";
private static final String K8S_MONITORED_RESOURCE_TYPE = "k8s_container";
private static final Set<String> kubernetesResourceLabelSet
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name");
private static final int FALLBACK_FLUSH_LIMIT = 100;
private final Map<String, String> customTags;
private final Logging gcpLoggingClient;
private final MonitoredResource kubernetesResource;
private final int flushLimit;
private int flushCounter;
private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
@ -57,15 +69,27 @@ public class GcpLogSink implements Sink {
*
* @param destinationProjectId cloud project id to write logs
*/
public GcpLogSink(String destinationProjectId) {
this(createLoggingClient(destinationProjectId));
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, int flushLimit) {
this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit);
}
@VisibleForTesting
GcpLogSink(Logging client) {
GcpLogSink(Logging client, Map<String, String> locationTags, Map<String, String> customTags,
int flushLimit) {
this.gcpLoggingClient = client;
this.customTags = customTags != null ? customTags : new HashMap<>();
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != 0 ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0;
}
/**
* Writes logs to GCP Cloud Logging.
*
* @param logProto gRPC logging proto containing the message to be logged
*/
@Override
public void write(GrpcLogRecord logProto) {
if (gcpLoggingClient == null) {
@ -78,22 +102,45 @@ public class GcpLogSink implements Sink {
try {
GrpcLogRecord.EventType event = logProto.getEventType();
Severity logEntrySeverity = getCloudLoggingLevel(logProto.getLogLevel());
// TODO(vindhyan): make sure all (int, long) values are not displayed as double
LogEntry grpcLogEntry =
// TODO(DNVindhya): make sure all (int, long) values are not displayed as double
// For now, every value is being converted as string because of JsonFormat.printer().print
LogEntry.Builder grpcLogEntryBuilder =
LogEntry.newBuilder(JsonPayload.of(protoToMapConverter(logProto)))
.setSeverity(logEntrySeverity)
.setLogName(DEFAULT_LOG_NAME)
.setResource(MonitoredResource.newBuilder("global").build())
.build();
.setResource(kubernetesResource);
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
LogEntry grpcLogEntry = grpcLogEntryBuilder.build();
synchronized (this) {
logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event);
gcpLoggingClient.write(Collections.singleton(grpcLogEntry));
flushCounter += 1;
if (flushCounter >= flushLimit) {
gcpLoggingClient.flush();
flushCounter = 0;
}
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e);
}
}
@VisibleForTesting
static MonitoredResource getResource(Map<String, String> resourceTags) {
MonitoredResource.Builder builder = MonitoredResource.newBuilder(K8S_MONITORED_RESOURCE_TYPE);
if ((resourceTags != null) && !resourceTags.isEmpty()) {
for (Map.Entry<String, String> entry : resourceTags.entrySet()) {
String resourceKey = entry.getKey();
if (kubernetesResourceLabelSet.contains(resourceKey)) {
builder.addLabel(resourceKey, entry.getValue());
}
}
}
return builder.build();
}
@SuppressWarnings("unchecked")
private Map<String, Object> protoToMapConverter(GrpcLogRecord logProto)
throws IOException {

View File

@ -34,9 +34,9 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannelProvider;
import io.grpc.MethodDescriptor;
import io.grpc.TlsChannelCredentials;
import io.grpc.observability.interceptors.ConfigFilterHelper;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.LogHelper;
import io.grpc.testing.TestMethodDescriptors;
import org.junit.Rule;
import org.junit.Test;
@ -58,13 +58,14 @@ public class LoggingChannelProviderTest {
public void initTwiceCausesException() {
ManagedChannelProvider prevProvider = ManagedChannelProvider.provider();
assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class);
Sink mockSink = mock(GcpLogSink.class);
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null));
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper));
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
try {
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null));
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper));
fail("should have failed for calling init() again");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!");

View File

@ -35,9 +35,9 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerProvider;
import io.grpc.observability.interceptors.ConfigFilterHelper;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.LogHelper;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleRequest;
@ -59,13 +59,14 @@ public class LoggingServerProviderTest {
public void initTwiceCausesException() {
ServerProvider prevProvider = ServerProvider.provider();
assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class);
Sink mockSink = mock(GcpLogSink.class);
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
LoggingServerProvider.init(
new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null));
new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper));
assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class);
try {
LoggingServerProvider.init(
new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null));
new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper));
fail("should have failed for calling init() again");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!");

View File

@ -17,23 +17,37 @@
package io.grpc.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.grpc.ManagedChannel;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.interceptors.ConfigFilterHelper;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.observability.interceptors.LogHelper;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.Map;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
@RunWith(JUnit4.class)
public class LoggingTest {
@ -41,31 +55,129 @@ public class LoggingTest {
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private static final String PROJECT_ID = "project-id";
private static final String PROJECT_ID = "PROJECT";
private static final Map<String, String> locationTags = ImmutableMap.of(
"project_id", "PROJECT",
"location", "us-central1-c",
"cluster_name", "grpc-observability-cluster",
"namespace_name", "default" ,
"pod_name", "app1-6c7c58f897-n92c5");
private static final Map<String, String> customTags = ImmutableMap.of(
"KEY1", "Value1",
"KEY2", "VALUE2");
private static final int flushLimit = 100;
/**
* Cloud logging test using LoggingChannelProvider and LoggingServerProvider.
*
* <p> Ignoring test, because it calls external CLoud Logging APIs.
* To test cloud logging setup,
* 1. Set up Cloud Logging Auth credentials
* 2. Assign permissions to service account to write logs to project specified by
* variable PROJECT_ID
* 3. Comment @Ignore annotation
* </p>
*/
@Ignore
@Test
public void clientServer_interceptorCalled()
public void clientServer_interceptorCalled_logAlways()
throws IOException {
Sink sink = new GcpLogSink(PROJECT_ID);
Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
FilterParams logAlwaysFilterParams =
FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
LoggingServerProvider.init(
new InternalLoggingServerInterceptor.FactoryImpl(sink, null, null, null));
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(sink, null, null, null));
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build();
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(channel));
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11);
sink.close();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
}
@Test
public void clientServer_interceptorCalled_logNever() throws IOException {
Sink mockSink = mock(GcpLogSink.class);
LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
FilterParams logNeverFilterParams =
FilterParams.create(false, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logNeverFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
LoggingServerProvider.init(
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
verifyNoInteractions(spyLogHelper);
verifyNoInteractions(mockSink);
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
}
@Test
public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException {
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class);
FilterParams logAlwaysFilterParams =
FilterParams.create(true, 0, 0);
when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE))
.thenReturn(false);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE))
.thenReturn(false);
LoggingServerProvider.init(
new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Total number of calls should have been 14 (6 from client and 6 from server)
// Since cancel is not invoked, it will be 12.
// Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2)
// events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8);
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
}
}

View File

@ -22,8 +22,11 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
import io.grpc.observability.ObservabilityConfig.LogFilter;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.io.IOException;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -99,23 +102,23 @@ public class ObservabilityConfigImplTest {
observabilityConfig.parse(LOG_FILTERS);
assertTrue(observabilityConfig.isEnableCloudLogging());
assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing");
ObservabilityConfig.LogFilter[] logFilters = observabilityConfig.getLogFilters();
assertThat(logFilters).hasLength(2);
assertThat(logFilters[0].pattern).isEqualTo("*/*");
assertThat(logFilters[0].headerBytes).isEqualTo(4096);
assertThat(logFilters[0].messageBytes).isEqualTo(2048);
assertThat(logFilters[1].pattern).isEqualTo("service1/Method2");
assertThat(logFilters[1].headerBytes).isNull();
assertThat(logFilters[1].messageBytes).isNull();
List<LogFilter> logFilters = observabilityConfig.getLogFilters();
assertThat(logFilters).hasSize(2);
assertThat(logFilters.get(0).pattern).isEqualTo("*/*");
assertThat(logFilters.get(0).headerBytes).isEqualTo(4096);
assertThat(logFilters.get(0).messageBytes).isEqualTo(2048);
assertThat(logFilters.get(1).pattern).isEqualTo("service1/Method2");
assertThat(logFilters.get(1).headerBytes).isNull();
assertThat(logFilters.get(1).messageBytes).isNull();
}
@Test
public void eventTypes() throws IOException {
observabilityConfig.parse(EVENT_TYPES);
assertFalse(observabilityConfig.isEnableCloudLogging());
EventType[] eventTypes = observabilityConfig.getEventTypes();
List<EventType> eventTypes = observabilityConfig.getEventTypes();
assertThat(eventTypes).isEqualTo(
new EventType[]{EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE,
EventType.GRPC_CALL_TRAILER});
ImmutableList.of(EventType.GRPC_CALL_REQUEST_HEADER, EventType.GRPC_CALL_HALF_CLOSE,
EventType.GRPC_CALL_TRAILER));
}
}

View File

@ -0,0 +1,219 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import io.grpc.MethodDescriptor;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.ObservabilityConfig.LogFilter;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.TestMethodDescriptors;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
public class ConfigFilterHelperTest {
private static final ImmutableList<LogFilter> configLogFilters =
ImmutableList.of(
new LogFilter("service1/Method2",1024,1024),
new LogFilter("service2/*",2048,1024),
new LogFilter("*",128,128),
new LogFilter("service2/*",2048,1024));
private static final ImmutableList<EventType> configEventTypes =
ImmutableList.of(
EventType.GRPC_CALL_REQUEST_HEADER,
EventType.GRPC_CALL_HALF_CLOSE,
EventType.GRPC_CALL_TRAILER);
private final MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod()
.toBuilder();
private MethodDescriptor<Void, Void> method;
private ObservabilityConfig mockConfig;
private ConfigFilterHelper configFilterHelper;
@Before
public void setup() {
mockConfig = mock(ObservabilityConfig.class);
configFilterHelper = new ConfigFilterHelper(mockConfig);
}
@Test
public void disableCloudLogging_emptyLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(false);
assertFalse(configFilterHelper.methodOrServiceFilterPresent);
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perMethodFilters).isEmpty();
assertThat(configFilterHelper.logEventTypeSet).isNull();
}
@Test
public void enableCloudLogging_emptyLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(true);
when(mockConfig.getLogFilters()).thenReturn(null);
when(mockConfig.getEventTypes()).thenReturn(null);
configFilterHelper.setMethodOrServiceFilterMaps();
configFilterHelper.setEventFilterSet();
assertFalse(configFilterHelper.methodOrServiceFilterPresent);
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perServiceFilters).isEmpty();
assertThat(configFilterHelper.perMethodFilters).isEmpty();
assertThat(configFilterHelper.logEventTypeSet).isNull();
}
@Test
public void enableCloudLogging_withLogFilters() {
when(mockConfig.isEnableCloudLogging()).thenReturn(true);
when(mockConfig.getLogFilters()).thenReturn(configLogFilters);
when(mockConfig.getEventTypes()).thenReturn(configEventTypes);
configFilterHelper.setMethodOrServiceFilterMaps();
configFilterHelper.setEventFilterSet();
assertTrue(configFilterHelper.methodOrServiceFilterPresent);
Map<String, FilterParams> expectedServiceFilters = new HashMap<>();
expectedServiceFilters.put("*",
FilterParams.create(true, 128, 128));
expectedServiceFilters.put("service2",
FilterParams.create(true, 2048, 1024));
assertEquals(configFilterHelper.perServiceFilters, expectedServiceFilters);
Map<String, FilterParams> expectedMethodFilters = new HashMap<>();
expectedMethodFilters.put("service1/Method2",
FilterParams.create(true, 1024, 1024));
assertEquals(configFilterHelper.perMethodFilters, expectedMethodFilters);
Set<EventType> expectedLogEventTypeSet = ImmutableSet.copyOf(configEventTypes);
assertEquals(configFilterHelper.logEventTypeSet, expectedLogEventTypeSet);
}
@Test
public void checkMethodAlwaysLogged() {
List<LogFilter> sampleLogFilters = ImmutableList.of(
new LogFilter("*", 4096, 4096));
when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
FilterParams expectedParams =
FilterParams.create(true, 4096, 4096);
method = builder.setFullMethodName("service1/Method6").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
}
@Test
public void checkMethodNotToBeLogged() {
List<LogFilter> sampleLogFilters = ImmutableList.of(
new LogFilter("service1/Method2", 1024, 1024),
new LogFilter("service2/*", 2048, 1024));
when(mockConfig.getLogFilters()).thenReturn(sampleLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
FilterParams expectedParams =
FilterParams.create(false, 0, 0);
method = builder.setFullMethodName("service3/Method3").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
}
@Test
public void checkMethodToBeLoggedConditional() {
when(mockConfig.getLogFilters()).thenReturn(configLogFilters);
configFilterHelper.setMethodOrServiceFilterMaps();
FilterParams expectedParams =
FilterParams.create(true, 1024, 1024);
method = builder.setFullMethodName("service1/Method2").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
FilterParams expectedParamsWildCard =
FilterParams.create(true, 2048, 1024);
method = builder.setFullMethodName("service2/Method1").build();
FilterParams resultParamsWildCard
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParamsWildCard, expectedParamsWildCard);
}
@Test
public void checkEventToBeLogged_noFilter_defaultLogAllEventTypes() {
List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.GRPC_CALL_REQUEST_HEADER);
eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER);
eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE);
eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE);
eventList.add(EventType.GRPC_CALL_HALF_CLOSE);
eventList.add(EventType.GRPC_CALL_TRAILER);
eventList.add(EventType.GRPC_CALL_CANCEL);
for (EventType event : eventList) {
assertTrue(configFilterHelper.isEventToBeLogged(event));
}
}
@Test
public void checkEventToBeLogged_emptyFilter_doNotLogEventTypes() {
when(mockConfig.getEventTypes()).thenReturn(new ArrayList<>());
configFilterHelper.setEventFilterSet();
List<EventType> eventList = new ArrayList<>();
eventList.add(EventType.GRPC_CALL_REQUEST_HEADER);
eventList.add(EventType.GRPC_CALL_RESPONSE_HEADER);
eventList.add(EventType.GRPC_CALL_REQUEST_MESSAGE);
eventList.add(EventType.GRPC_CALL_RESPONSE_MESSAGE);
eventList.add(EventType.GRPC_CALL_HALF_CLOSE);
eventList.add(EventType.GRPC_CALL_TRAILER);
eventList.add(EventType.GRPC_CALL_CANCEL);
for (EventType event : eventList) {
assertFalse(configFilterHelper.isEventToBeLogged(event));
}
}
@Test
public void checkEventToBeLogged_withEventTypesFromConfig() {
when(mockConfig.getEventTypes()).thenReturn(configEventTypes);
configFilterHelper.setEventFilterSet();
EventType logEventType = EventType.GRPC_CALL_REQUEST_HEADER;
assertTrue(configFilterHelper.isEventToBeLogged(logEventType));
EventType doNotLogEventType = EventType.GRPC_CALL_RESPONSE_MESSAGE;
assertFalse(configFilterHelper.isEventToBeLogged(doNotLogEventType));
}
}

View File

@ -18,16 +18,23 @@ package io.grpc.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
@ -42,16 +49,16 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.internal.NoopClientCall;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.observabilitylog.v1.GrpcLogRecord.MetadataEntry;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -60,7 +67,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@ -73,7 +83,7 @@ public class InternalLoggingChannelInterceptorTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Charset US_ASCII = Charset.forName("US-ASCII");
private static final Charset US_ASCII = StandardCharsets.US_ASCII;
private InternalLoggingChannelInterceptor.Factory factory;
private AtomicReference<ClientCall.Listener<byte[]>> interceptedListener;
@ -82,20 +92,28 @@ public class InternalLoggingChannelInterceptorTest {
private SettableFuture<Void> halfCloseCalled;
private SettableFuture<Void> cancelCalled;
private SocketAddress peer;
private final Sink mockSink = mock(GcpLogSink.class);
private LogHelper mockLogHelper;
private ConfigFilterHelper mockFilterHelper;
private FilterParams filterParams;
@Before
public void setup() throws Exception {
factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null);
mockLogHelper = mock(LogHelper.class);
mockFilterHelper = mock(ConfigFilterHelper.class);
factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper);
interceptedListener = new AtomicReference<>();
actualClientInitial = new AtomicReference<>();
actualRequest = new AtomicReference<>();
halfCloseCalled = SettableFuture.create();
cancelCalled = SettableFuture.create();
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
}
@Test
@SuppressWarnings("unchecked")
public void internalLoggingChannelInterceptor() throws Exception {
Channel channel = new Channel() {
@Override
@ -137,6 +155,382 @@ public class InternalLoggingChannelInterceptorTest {
}
};
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(filterParams);
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(method,
CallOptions.DEFAULT,
channel);
// send request header
{
Metadata clientInitial = new Metadata();
String dataA = "aaaaaaaaa";
String dataB = "bbbbbbbbb";
Metadata.Key<String> keyA =
Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> keyB =
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
clientInitial.put(keyA, dataA);
clientInitial.put(keyB, dataB);
interceptedLoggingCall.start(mockListener, clientInitial);
verify(mockLogHelper).logRequestHeader(
/*seq=*/ eq(1L),
eq("service"),
eq("method"),
eq("the-authority"),
ArgumentMatchers.isNull(),
same(clientInitial),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT),
anyString(),
ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper);
assertSame(clientInitial, actualClientInitial.get());
}
reset(mockLogHelper);
reset(mockListener);
// receive response header
{
Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial);
verify(mockLogHelper).logResponseHeader(
/*seq=*/ eq(2L),
eq("service"),
eq("method"),
same(serverInitial),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT),
anyString(),
same(peer));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHeaders(same(serverInitial));
}
reset(mockLogHelper);
reset(mockListener);
// send request message
{
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
verify(mockLogHelper).logRpcMessage(
/*seq=*/ eq(3L),
eq("service"),
eq("method"),
eq(EventType.GRPC_CALL_REQUEST_MESSAGE),
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_CLIENT),
anyString());
verifyNoMoreInteractions(mockLogHelper);
assertSame(request, actualRequest.get());
}
reset(mockLogHelper);
reset(mockListener);
// client half close
{
interceptedLoggingCall.halfClose();
verify(mockLogHelper).logHalfClose(
/*seq=*/ eq(4L),
eq("service"),
eq("method"),
eq(EventLogger.LOGGER_CLIENT),
anyString());
halfCloseCalled.get(1, TimeUnit.MILLISECONDS);
verifyNoMoreInteractions(mockLogHelper);
}
reset(mockLogHelper);
reset(mockListener);
// receive response message
{
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
verify(mockLogHelper).logRpcMessage(
/*seq=*/ eq(5L),
eq("service"),
eq("method"),
eq(EventType.GRPC_CALL_RESPONSE_MESSAGE),
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_CLIENT),
anyString());
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(response));
}
reset(mockLogHelper);
reset(mockListener);
// receive trailer
{
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
verify(mockLogHelper).logTrailer(
/*seq=*/ eq(6L),
eq("service"),
eq("method"),
same(status),
same(trailers),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_CLIENT),
anyString(),
same(peer));
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onClose(same(status), same(trailers));
}
reset(mockLogHelper);
reset(mockListener);
// cancel
{
interceptedLoggingCall.cancel(null, null);
verify(mockLogHelper).logCancel(
/*seq=*/ eq(7L),
eq("service"),
eq("method"),
eq(EventLogger.LOGGER_CLIENT),
anyString());
cancelCalled.get(1, TimeUnit.MILLISECONDS);
}
}
@Test
public void clientDeadLineLogged_deadlineSetViaCallOption() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(filterParams);
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
});
interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1))
.logRequestHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS),
Executors.newSingleThreadScheduledExecutor())
.run(() -> {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(filterParams);
callFuture.set(
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
}));
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata());
ArgumentCaptor<Duration> contextTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1))
.logRequestHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
contextTimeoutCaptor.capture(),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
Duration timeout = contextTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception {
Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS);
Deadline callOptionsDeadline = CallOptions.DEFAULT
.withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline();
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
contextDeadline, Executors.newSingleThreadScheduledExecutor())
.run(() -> {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(filterParams);
callFuture.set(
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
}));
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
Objects.requireNonNull(callFuture.get()).start(mockListener, new Metadata());
ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1))
.logRequestHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
timeoutCaptor.capture(),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
Duration timeout = timeoutCaptor.getValue();
assertThat(LogHelper.min(contextDeadline, callOptionsDeadline))
.isSameInstanceAs(contextDeadline);
assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientMethodOrServiceFilter_disabled() {
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
@ -147,6 +541,8 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(false, 0, 0));
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
@ -154,130 +550,55 @@ public class InternalLoggingChannelInterceptorTest {
CallOptions.DEFAULT,
channel);
// send client header
{
EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Metadata clientInitial = new Metadata();
String dataA = "aaaaaaaaa";
String dataB = "bbbbbbbbb";
Metadata.Key<String> keyA =
Metadata.Key.of("a", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<String> keyB =
Metadata.Key.of("b", Metadata.ASCII_STRING_MARSHALLER);
MetadataEntry entryA =
MetadataEntry
.newBuilder()
.setKey(keyA.name())
.setValue(ByteString.copyFrom(dataA.getBytes(US_ASCII)))
.build();
MetadataEntry entryB =
MetadataEntry
.newBuilder()
.setKey(keyB.name())
.setValue(ByteString.copyFrom(dataB.getBytes(US_ASCII)))
.build();
clientInitial.put(keyA, dataA);
clientInitial.put(keyB, dataB);
GrpcLogRecord.Metadata expectedMetadata = GrpcLogRecord.Metadata
.newBuilder()
.addEntry(entryA)
.addEntry(entryB)
.build();
interceptedLoggingCall.start(mockListener, clientInitial);
verify(mockSink).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestHeaderEvent);
assertEquals(captor.getValue().getSequenceId(), 1L);
assertEquals(captor.getValue().getServiceName(), "service");
assertEquals(captor.getValue().getMethodName(), "method");
assertEquals(captor.getValue().getAuthority(), "the-authority");
assertEquals(captor.getValue().getMetadata(), expectedMetadata);
verifyNoMoreInteractions(mockSink);
assertSame(clientInitial, actualClientInitial.get());
}
// TODO(dnvindhya) : Add a helper method to verify other fields of GrpcLogRecord for all events
// receive server header
{
EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial);
verify(mockSink, times(2)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseHeaderEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onHeaders(same(serverInitial));
}
// send client message
{
EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
verify(mockSink, times(3)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestMessageEvent);
verifyNoMoreInteractions(mockSink);
assertSame(request, actualRequest.get());
}
// client half close
{
EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
interceptedLoggingCall.halfClose();
verify(mockSink, times(4)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedHalfCloseEvent);
halfCloseCalled.get(1, TimeUnit.SECONDS);
verifyNoMoreInteractions(mockSink);
}
// receive server message
{
EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
verify(mockSink, times(5)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseMessageEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onMessage(same(response));
}
// receive trailer
{
EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
verify(mockSink, times(6)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedTrailerEvent);
verifyNoMoreInteractions(mockSink);
verify(mockListener).onClose(same(status), same(trailers));
}
// cancel
{
EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
interceptedLoggingCall.cancel(null, null);
verify(mockSink, times(7)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedCancelEvent);
cancelCalled.get(1, TimeUnit.SECONDS);
}
interceptedLoggingCall.start(mockListener, new Metadata());
verifyNoInteractions(mockLogHelper);
}
@Test
public void clientDeadLineLogged_deadlineSetViaCallOption() {
public void clientMethodOrServiceFilter_enabled() {
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
@ -285,44 +606,79 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
.interceptCall(method,
CallOptions.DEFAULT,
channel);
@Override
public String authority() {
return "the-authority";
}
});
{
interceptedLoggingCall.start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
interceptedListener.get().onHeaders(new Metadata());
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
interceptedLoggingCall.halfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
interceptedLoggingCall.cancel(null, null);
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
@Test
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER)).thenReturn(false);
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER)).thenReturn(false);
Channel channel = new Channel() {
@Override
public void run() {
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>() {
@Override
@SuppressWarnings("unchecked")
public void start(Listener<ResponseT> responseListener, Metadata headers) {
interceptedListener.set((Listener<byte[]>) responseListener);
actualClientInitial.set(headers);
}
@Override
public void sendMessage(RequestT message) {
actualRequest.set(message);
}
@Override
public void cancel(String message, Throwable cause) {
cancelCalled.set(null);
}
@Override
public void halfClose() {
halfCloseCalled.set(null);
}
@Override
public Attributes getAttributes() {
return Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
}
};
}
@Override
public String authority() {
return "the-authority";
}
};
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
@ -330,87 +686,49 @@ public class InternalLoggingChannelInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
callFuture.set(
ClientCall<byte[], byte[]> interceptedLoggingCall =
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
.interceptCall(method,
CallOptions.DEFAULT,
channel);
@Override
public String authority() {
return "the-authority";
}
}));
}
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void clientDeadlineLogged_deadlineSetViaContextAndCallOptions() throws Exception {
Deadline contextDeadline = Deadline.after(10, TimeUnit.SECONDS);
Deadline callOptionsDeadline = CallOptions.DEFAULT
.withDeadlineAfter(15, TimeUnit.SECONDS).getDeadline();
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
contextDeadline, Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(
factory.create()
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(15, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<>();
}
@Override
public String authority() {
return "the-authority";
}
}));
}
});
@SuppressWarnings("unchecked")
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
Duration timeout = captor.getValue().getTimeout();
assertThat(LogHelper.min(contextDeadline, callOptionsDeadline))
.isSameInstanceAs(contextDeadline);
assertThat(TimeUnit.SECONDS.toNanos(10) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
{
interceptedLoggingCall.start(mockListener, new Metadata());
verify(mockLogHelper, never()).logRequestHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(Duration.class),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
AdditionalMatchers.or(ArgumentMatchers.isNull(),
ArgumentMatchers.any()));
interceptedListener.get().onHeaders(new Metadata());
verify(mockLogHelper, never()).logResponseHeader(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(Metadata.class),
anyInt(),
any(GrpcLogRecord.EventLogger.class),
anyString(),
ArgumentMatchers.any());
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedLoggingCall.sendMessage(request);
interceptedLoggingCall.halfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedListener.get().onClose(status, trailers);
interceptedLoggingCall.cancel(null, null);
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(5);
}
}
}

View File

@ -18,13 +18,20 @@ package io.grpc.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.observability.interceptors.LogHelperTest.BYTEARRAY_MARSHALLER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
@ -37,14 +44,15 @@ import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.NoopServerCall;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
import io.grpc.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -53,7 +61,10 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@ -66,7 +77,7 @@ public class InternalLoggingServerInterceptorTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Charset US_ASCII = Charset.forName("US-ASCII");
private static final Charset US_ASCII = StandardCharsets.US_ASCII;
private InternalLoggingServerInterceptor.Factory factory;
private AtomicReference<ServerCall<byte[], byte[]>> interceptedLoggingCall;
@ -77,13 +88,16 @@ public class InternalLoggingServerInterceptorTest {
private AtomicReference<byte[]> actualResponse;
private AtomicReference<Status> actualStatus;
private AtomicReference<Metadata> actualTrailers;
private final Sink mockSink = mock(GcpLogSink.class);
private LogHelper mockLogHelper;
private ConfigFilterHelper mockFilterHelper;
private SocketAddress peer;
@Before
@SuppressWarnings("unchecked")
public void setup() throws Exception {
factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null);
mockLogHelper = mock(LogHelper.class);
mockFilterHelper = mock(ConfigFilterHelper.class);
factory = new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper);
interceptedLoggingCall = new AtomicReference<>();
mockListener = mock(ServerCall.Listener.class);
actualServerInitial = new AtomicReference<>();
@ -91,9 +105,12 @@ public class InternalLoggingServerInterceptorTest {
actualStatus = new AtomicReference<>();
actualTrailers = new AtomicReference<>();
peer = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
}
@Test
@SuppressWarnings("unchecked")
public void internalLoggingServerInterceptor() {
Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method =
@ -103,6 +120,8 @@ public class InternalLoggingServerInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
FilterParams filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams);
capturedListener =
factory.create()
.interceptCall(
@ -148,89 +167,131 @@ public class InternalLoggingServerInterceptorTest {
});
// receive request header
{
EventType expectedRequestHeaderEvent = EventType.GRPC_CALL_REQUEST_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestHeaderEvent);
verifyNoMoreInteractions(mockSink);
verify(mockLogHelper).logRequestHeader(
/*seq=*/ eq(1L),
eq("service"),
eq("method"),
eq("the-authority"),
ArgumentMatchers.isNull(),
same(clientInitial),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString(),
same(peer));
verifyNoMoreInteractions(mockLogHelper);
}
reset(mockLogHelper);
reset(mockListener);
// send response header
{
EventType expectedResponseHeaderEvent = EventType.GRPC_CALL_RESPONSE_HEADER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Metadata serverInital = new Metadata();
interceptedLoggingCall.get().sendHeaders(serverInital);
verify(mockSink, times(2)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseHeaderEvent);
verifyNoMoreInteractions(mockSink);
assertSame(serverInital, actualServerInitial.get());
Metadata serverInitial = new Metadata();
interceptedLoggingCall.get().sendHeaders(serverInitial);
verify(mockLogHelper).logResponseHeader(
/*seq=*/ eq(2L),
eq("service"),
eq("method"),
same(serverInitial),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString(),
ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper);
assertSame(serverInitial, actualServerInitial.get());
}
reset(mockLogHelper);
reset(mockListener);
// receive request message
{
EventType expectedRequestMessageEvent = EventType.GRPC_CALL_REQUEST_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
verify(mockSink, times(3)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedRequestMessageEvent);
verifyNoMoreInteractions(mockSink);
verify(mockLogHelper).logRpcMessage(
/*seq=*/ eq(3L),
eq("service"),
eq("method"),
eq(EventType.GRPC_CALL_REQUEST_MESSAGE),
same(request),
eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString());
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onMessage(same(request));
}
reset(mockLogHelper);
reset(mockListener);
// client half close
{
EventType expectedHalfCloseEvent = EventType.GRPC_CALL_HALF_CLOSE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
capturedListener.onHalfClose();
verify(mockSink, times(4)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedHalfCloseEvent);
verifyNoMoreInteractions(mockSink);
verify(mockLogHelper).logHalfClose(
/*seq=*/ eq(4L),
eq("service"),
eq("method"),
eq(EventLogger.LOGGER_SERVER),
anyString());
verifyNoMoreInteractions(mockLogHelper);
verify(mockListener).onHalfClose();
}
reset(mockLogHelper);
reset(mockListener);
// send response message
{
EventType expectedResponseMessageEvent = EventType.GRPC_CALL_RESPONSE_MESSAGE;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedLoggingCall.get().sendMessage(response);
verify(mockSink, times(5)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedResponseMessageEvent);
verifyNoMoreInteractions(mockSink);
verify(mockLogHelper).logRpcMessage(
/*seq=*/ eq(5L),
eq("service"),
eq("method"),
eq(EventType.GRPC_CALL_RESPONSE_MESSAGE),
same(response),
eq(filterParams.messageBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString());
verifyNoMoreInteractions(mockLogHelper);
assertSame(response, actualResponse.get());
}
reset(mockLogHelper);
reset(mockListener);
// send trailer
{
EventType expectedTrailerEvent = EventType.GRPC_CALL_TRAILER;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedLoggingCall.get().close(status, trailers);
verify(mockSink, times(6)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedTrailerEvent);
verifyNoMoreInteractions(mockSink);
verify(mockLogHelper).logTrailer(
/*seq=*/ eq(6L),
eq("service"),
eq("method"),
same(status),
same(trailers),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString(),
ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper);
assertSame(status, actualStatus.get());
assertSame(trailers, actualTrailers.get());
}
reset(mockLogHelper);
reset(mockListener);
// cancel
{
EventType expectedCancelEvent = EventType.GRPC_CALL_CANCEL;
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
capturedListener.onCancel();
verify(mockSink, times(7)).write(captor.capture());
assertEquals(captor.getValue().getEventType(),
expectedCancelEvent);
verify(mockLogHelper).logCancel(
/*seq=*/ eq(7L),
eq("service"),
eq("method"),
eq(EventLogger.LOGGER_SERVER),
anyString());
verify(mockListener).onCancel();
}
}
@ -244,6 +305,8 @@ public class InternalLoggingServerInterceptorTest {
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
FilterParams filterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(filterParams);
final ServerCall<byte[], byte[]> noopServerCall = new NoopServerCall<byte[], byte[]>() {
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
@ -265,15 +328,233 @@ public class InternalLoggingServerInterceptorTest {
factory.create()
.interceptCall(noopServerCall,
new Metadata(),
(call, headers) -> {
return new ServerCall.Listener<byte[]>() {};
(call, headers) -> new ServerCall.Listener<byte[]>() {});
});
});
ArgumentCaptor<GrpcLogRecord> captor = ArgumentCaptor.forClass(GrpcLogRecord.class);
verify(mockSink, times(1)).write(captor.capture());
verifyNoMoreInteractions(mockSink);
Duration timeout = captor.getValue().getTimeout();
ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockLogHelper, times(1))
.logRequestHeader(
/*seq=*/ eq(1L),
eq("service"),
eq("method"),
eq("the-authority"),
timeoutCaptor.capture(),
any(Metadata.class),
eq(filterParams.headerBytes()),
eq(EventLogger.LOGGER_SERVER),
anyString(),
ArgumentMatchers.isNull());
verifyNoMoreInteractions(mockLogHelper);
Duration timeout = timeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
public void serverMethodOrServiceFilter_disabled() {
Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method)).thenReturn(FilterParams.create(false, 0, 0));
capturedListener =
factory.create()
.interceptCall(
new NoopServerCall<byte[], byte[]>() {
@Override
public void sendHeaders(Metadata headers) {
actualServerInitial.set(headers);
}
@Override
public void sendMessage(byte[] message) {
actualResponse.set(message);
}
@Override
public void close(Status status, Metadata trailers) {
actualStatus.set(status);
actualTrailers.set(trailers);
}
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
return method;
}
@Override
public Attributes getAttributes() {
return Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer)
.build();
}
@Override
public String getAuthority() {
return "the-authority";
}
},
clientInitial,
(call, headers) -> {
interceptedLoggingCall.set(call);
return mockListener;
});
verifyNoInteractions(mockLogHelper);
}
@Test
public void serverMethodOrServiceFilter_enabled() {
Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
capturedListener =
factory.create()
.interceptCall(
new NoopServerCall<byte[], byte[]>() {
@Override
public void sendHeaders(Metadata headers) {
actualServerInitial.set(headers);
}
@Override
public void sendMessage(byte[] message) {
actualResponse.set(message);
}
@Override
public void close(Status status, Metadata trailers) {
actualStatus.set(status);
actualTrailers.set(trailers);
}
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
return method;
}
@Override
public Attributes getAttributes() {
return Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer)
.build();
}
@Override
public String getAuthority() {
return "the-authority";
}
},
clientInitial,
(call, headers) -> {
interceptedLoggingCall.set(call);
return mockListener;
});
{
interceptedLoggingCall.get().sendHeaders(new Metadata());
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
capturedListener.onHalfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedLoggingCall.get().sendMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedLoggingCall.get().close(status, trailers);
capturedListener.onCancel();
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(7);
}
}
@Test
public void eventFilter_enabled() {
when(mockFilterHelper.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(false);
Metadata clientInitial = new Metadata();
final MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
when(mockFilterHelper.isMethodToBeLogged(method))
.thenReturn(FilterParams.create(true, 10, 10));
capturedListener =
factory.create()
.interceptCall(
new NoopServerCall<byte[], byte[]>() {
@Override
public void sendHeaders(Metadata headers) {
actualServerInitial.set(headers);
}
@Override
public void sendMessage(byte[] message) {
actualResponse.set(message);
}
@Override
public void close(Status status, Metadata trailers) {
actualStatus.set(status);
actualTrailers.set(trailers);
}
@Override
public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
return method;
}
@Override
public Attributes getAttributes() {
return Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer)
.build();
}
@Override
public String getAuthority() {
return "the-authority";
}
},
clientInitial,
(call, headers) -> {
interceptedLoggingCall.set(call);
return mockListener;
});
{
interceptedLoggingCall.get().sendHeaders(new Metadata());
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
capturedListener.onHalfClose();
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedLoggingCall.get().sendMessage(response);
Status status = Status.INTERNAL.withDescription("trailer description");
Metadata trailers = new Metadata();
interceptedLoggingCall.get().close(status, trailers);
verify(mockLogHelper, never()).logHalfClose(
anyLong(),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
AdditionalMatchers.or(ArgumentMatchers.isNull(), anyString()),
any(GrpcLogRecord.EventLogger.class),
anyString());
capturedListener.onCancel();
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(6);
}
}
}

View File

@ -19,12 +19,15 @@ package io.grpc.observability.interceptors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
@ -35,7 +38,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.Status;
import io.grpc.internal.TimeProvider;
import io.grpc.observability.ObservabilityConfig;
import io.grpc.observability.interceptors.LogHelper.PayloadBuilder;
import io.grpc.observability.logging.GcpLogSink;
import io.grpc.observability.logging.Sink;
@ -53,8 +55,8 @@ import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.Map;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
@ -66,8 +68,6 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class LogHelperTest {
private static final Charset US_ASCII = Charset.forName("US-ASCII");
public static final Marshaller<byte[]> BYTEARRAY_MARSHALLER = new ByteArrayMarshaller();
private static final String DATA_A = "aaaaaaaaa";
private static final String DATA_B = "bbbbbbbbb";
@ -82,38 +82,32 @@ public class LogHelperTest {
MetadataEntry
.newBuilder()
.setKey(KEY_A.name())
.setValue(ByteString.copyFrom(DATA_A.getBytes(US_ASCII)))
.setValue(ByteString.copyFrom(DATA_A.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final MetadataEntry ENTRY_B =
MetadataEntry
.newBuilder()
.setKey(KEY_B.name())
.setValue(ByteString.copyFrom(DATA_B.getBytes(US_ASCII)))
.setValue(ByteString.copyFrom(DATA_B.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final MetadataEntry ENTRY_C =
MetadataEntry
.newBuilder()
.setKey(KEY_C.name())
.setValue(ByteString.copyFrom(DATA_C.getBytes(US_ASCII)))
.setValue(ByteString.copyFrom(DATA_C.getBytes(StandardCharsets.US_ASCII)))
.build();
private static final int HEADER_LIMIT = 10;
private static final int MESSAGE_LIMIT = Integer.MAX_VALUE;
private final Metadata nonEmptyMetadata = new Metadata();
private final int nonEmptyMetadataSize = 30;
private final Sink sink = mock(GcpLogSink.class);
private final Timestamp timestamp
= Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build();
private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321;
@SuppressWarnings("unchecked") private final Map<String, String> locationTags = mock(Map.class);
@SuppressWarnings("unchecked") private final Map<String, String> customTags = mock(Map.class);
private final ObservabilityConfig observabilityConfig = mock(ObservabilityConfig.class);
private final LogHelper logHelper =
new LogHelper(
sink,
timeProvider, locationTags, customTags, observabilityConfig);
private final LogHelper logHelper = new LogHelper(sink, timeProvider);
@Before
public void setUp() throws Exception {
public void setUp() {
nonEmptyMetadata.put(KEY_A, DATA_A);
nonEmptyMetadata.put(KEY_B, DATA_B);
nonEmptyMetadata.put(KEY_C, DATA_C);
@ -151,7 +145,7 @@ public class LogHelperTest {
}
@Test
public void socketToProto_unknown() throws Exception {
public void socketToProto_unknown() {
SocketAddress unknownSocket = new SocketAddress() {
@Override
public String toString() {
@ -167,7 +161,7 @@ public class LogHelperTest {
}
@Test
public void metadataToProto_empty() throws Exception {
public void metadataToProto_empty() {
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
@ -175,11 +169,12 @@ public class LogHelperTest {
GrpcLogRecord.Metadata.getDefaultInstance())
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata()));
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE));
}
@Test
public void metadataToProto() throws Exception {
public void metadataToProto() {
int nonEmptyMetadataSize = 30;
assertEquals(
GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
@ -193,9 +188,101 @@ public class LogHelperTest {
.setPayloadSize(nonEmptyMetadataSize)
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata));
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE));
}
@Test
public void metadataToProto_setsTruncated() {
assertTrue(LogHelper.createMetadataProto(nonEmptyMetadata, 0).truncated);
}
@Test
public void metadataToProto_truncated() {
// 0 byte limit not enough for any metadata
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(),
LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build());
// not enough bytes for first key value
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(),
LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build());
// enough for first key value
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build());
// Test edge cases for >= 2 key values
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build());
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build());
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build());
// not truncated: enough for all keys
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.addEntry(ENTRY_C)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build());
}
@Test
public void messageToProto() {
byte[] bytes
= "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(
StandardCharsets.US_ASCII);
assertEquals(
GrpcLogRecord.newBuilder()
.setMessage(ByteString.copyFrom(bytes))
.setPayloadSize(bytes.length)
.build(),
messageTestHelper(bytes, Integer.MAX_VALUE));
}
@Test
public void messageToProto_truncated() {
byte[] bytes
= "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(
StandardCharsets.US_ASCII);
assertEquals(
GrpcLogRecord.newBuilder()
.setPayloadSize(bytes.length)
.setPayloadTruncated(true)
.build(),
messageTestHelper(bytes, 0));
int limit = 10;
String truncatedMessage = "this is a ";
assertEquals(
GrpcLogRecord.newBuilder()
.setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII)))
.setPayloadSize(bytes.length)
.setPayloadTruncated(true)
.build(),
messageTestHelper(bytes, limit));
}
@Test
public void logRequestHeader() throws Exception {
long seqId = 1;
@ -209,7 +296,8 @@ public class LogHelperTest {
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata)
metadataToProtoTestHelper(EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata,
HEADER_LIMIT)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
@ -232,6 +320,7 @@ public class LogHelperTest {
authority,
timeout,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
@ -247,6 +336,7 @@ public class LogHelperTest {
authority,
timeout,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
@ -266,6 +356,7 @@ public class LogHelperTest {
authority,
null,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
@ -284,6 +375,7 @@ public class LogHelperTest {
authority,
timeout,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
@ -304,7 +396,8 @@ public class LogHelperTest {
InetSocketAddress peerAddress = new InetSocketAddress(address, port);
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata,
HEADER_LIMIT)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
@ -324,6 +417,7 @@ public class LogHelperTest {
serviceName,
methodName,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
@ -337,6 +431,7 @@ public class LogHelperTest {
serviceName,
methodName,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId,
null);
@ -354,9 +449,11 @@ public class LogHelperTest {
serviceName,
methodName,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId,
peerAddress);
fail();
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat()
@ -376,7 +473,8 @@ public class LogHelperTest {
Status statusDescription = Status.INTERNAL.withDescription("test description");
GrpcLogRecord.Builder builder =
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata)
metadataToProtoTestHelper(EventType.GRPC_CALL_RESPONSE_HEADER, nonEmptyMetadata,
HEADER_LIMIT)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
@ -399,6 +497,7 @@ public class LogHelperTest {
methodName,
statusDescription,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
@ -413,6 +512,7 @@ public class LogHelperTest {
methodName,
statusDescription,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId,
null);
@ -431,6 +531,7 @@ public class LogHelperTest {
methodName,
statusDescription,
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
null);
@ -448,6 +549,7 @@ public class LogHelperTest {
methodName,
statusDescription.getCode().toStatus(),
nonEmptyMetadata,
HEADER_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId,
peerAddress);
@ -459,14 +561,43 @@ public class LogHelperTest {
}
@Test
public void logRpcMessage() throws Exception {
public void alwaysLoggedMetadata_grpcTraceBin() {
Metadata.Key<byte[]> key
= Metadata.Key.of("grpc-trace-bin", Metadata.BINARY_BYTE_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(key, new byte[1]);
int zeroHeaderBytes = 0;
PayloadBuilder<io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.Builder> pair =
LogHelper.createMetadataProto(metadata, zeroHeaderBytes);
assertEquals(
key.name(),
Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList()))
.getKey());
assertFalse(pair.truncated);
}
@Test
public void neverLoggedMetadata_grpcStatusDetailsBin() {
Metadata.Key<byte[]> key
= Metadata.Key.of("grpc-status-details-bin", Metadata.BINARY_BYTE_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(key, new byte[1]);
int unlimitedHeaderBytes = Integer.MAX_VALUE;
PayloadBuilder<io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.Builder> pair
= LogHelper.createMetadataProto(metadata, unlimitedHeaderBytes);
assertThat(pair.payload.getEntryBuilderList()).isEmpty();
assertFalse(pair.truncated);
}
@Test
public void logRpcMessage() {
long seqId = 1;
String serviceName = "service";
String methodName = "method";
String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
byte[] message = new byte[100];
GrpcLogRecord.Builder builder = messageTestHelper(message)
GrpcLogRecord.Builder builder = messageTestHelper(message, MESSAGE_LIMIT)
.toBuilder()
.setTimestamp(timestamp)
.setSequenceId(seqId)
@ -485,6 +616,7 @@ public class LogHelperTest {
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
MESSAGE_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(base);
@ -497,6 +629,7 @@ public class LogHelperTest {
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
MESSAGE_LIMIT,
EventLogger.LOGGER_CLIENT,
rpcId);
verify(sink).write(
@ -512,6 +645,7 @@ public class LogHelperTest {
methodName,
EventType.GRPC_CALL_REQUEST_MESSAGE,
message,
MESSAGE_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
@ -527,6 +661,7 @@ public class LogHelperTest {
methodName,
EventType.GRPC_CALL_RESPONSE_MESSAGE,
message,
MESSAGE_LIMIT,
EventLogger.LOGGER_SERVER,
rpcId);
verify(sink).write(
@ -548,28 +683,31 @@ public class LogHelperTest {
}
private static GrpcLogRecord metadataToProtoTestHelper(
EventType type, Metadata metadata) {
EventType type, Metadata metadata, int maxHeaderBytes) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<GrpcLogRecord.Metadata.Builder> pair
= LogHelper.createMetadataProto(metadata);
= LogHelper.createMetadataProto(metadata, maxHeaderBytes);
builder.setMetadata(pair.payload);
builder.setPayloadSize(pair.size);
builder.setPayloadTruncated(pair.truncated);
builder.setEventType(type);
return builder.build();
}
private static GrpcLogRecord messageTestHelper(byte[] message) {
private static GrpcLogRecord messageTestHelper(byte[] message, int maxMessageBytes) {
GrpcLogRecord.Builder builder = GrpcLogRecord.newBuilder();
PayloadBuilder<ByteString> pair
= LogHelper.createMesageProto(message);
= LogHelper.createMessageProto(message, maxMessageBytes);
builder.setMessage(pair.payload);
builder.setPayloadSize(pair.size);
builder.setPayloadTruncated(pair.truncated);
return builder.build();
}
// Used only in tests
// Copied from internal
static final class ByteArrayMarshaller implements Marshaller<byte[]> {
@Override
public InputStream stream(byte[] value) {
return new ByteArrayInputStream(value);
@ -595,6 +733,7 @@ public class LogHelperTest {
// Copied from internal
static final class IoUtils {
/** maximum buffer to be read is 16 KB. */
private static final int MAX_BUFFER_LENGTH = 16384;

View File

@ -20,17 +20,26 @@ import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.cloud.MonitoredResource;
import com.google.cloud.logging.LogEntry;
import com.google.cloud.logging.Logging;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Duration;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.Durations;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger;
import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -49,6 +58,42 @@ public class GcpLogSinkTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Map<String, String> locationTags = ImmutableMap.of("project_id", "PROJECT",
"location", "us-central1-c",
"cluster_name", "grpc-observability-cluster",
"namespace_name", "default" ,
"pod_name", "app1-6c7c58f897-n92c5");
private static final Map<String, String> customTags = ImmutableMap.of("KEY1", "Value1",
"KEY2", "VALUE2");
private static final int flushLimit = 10;
private final long seqId = 1;
private final String serviceName = "service";
private final String methodName = "method";
private final String authority = "authority";
private final Duration timeout = Durations.fromMillis(1234);
private final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
private final GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setAuthority(authority)
.setTimeout(timeout)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setRpcId(rpcId)
.build();
private final Struct expectedStructLogProto = Struct.newBuilder()
.putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build())
.putFields("service_name", Value.newBuilder().setStringValue(serviceName).build())
.putFields("method_name", Value.newBuilder().setStringValue(methodName).build())
.putFields("authority", Value.newBuilder().setStringValue(authority).build())
.putFields("timeout", Value.newBuilder().setStringValue("1.234s").build())
.putFields("event_type", Value.newBuilder().setStringValue(
String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build())
.putFields("event_logger", Value.newBuilder().setStringValue(
String.valueOf(EventLogger.LOGGER_CLIENT)).build())
.putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build())
.build();
private Logging mockLogging;
@Before
@ -58,38 +103,81 @@ public class GcpLogSinkTest {
@Test
public void createSink() {
Sink mockSink = new GcpLogSink(mockLogging);
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
assertThat(mockSink).isInstanceOf(GcpLogSink.class);
}
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging);
GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setRpcId("1234")
.build();
Struct expectedStructLogProto = Struct.newBuilder().putFields(
"rpc_id", Value.newBuilder().setStringValue("1234").build()
).build();
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
System.out.println(entry);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
}
verifyNoMoreInteractions(mockLogging);
}
@Test
@SuppressWarnings("unchecked")
public void verifyWriteWithTags() {
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
System.out.println(logEntrySetCaptor.getValue());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertEquals(entry.getResource(), expectedMonitoredResource);
assertEquals(entry.getLabels(), customTags);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
}
verifyNoMoreInteractions(mockLogging);
}
@Test
@SuppressWarnings("unchecked")
public void emptyCustomTags_labelsNotSet() {
Map<String, String> emptyCustomTags = null;
Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, emptyCustomTags, flushLimit);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertEquals(entry.getLabels(), expectedEmptyLabels);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
}
}
@Test
public void verifyFlush() {
int lowerFlushLimit = 2;
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, lowerFlushLimit);
mockSink.write(logProto);
verify(mockLogging, never()).flush();
mockSink.write(logProto);
verify(mockLogging, times(1)).flush();
mockSink.write(logProto);
mockSink.write(logProto);
verify(mockLogging, times(2)).flush();
}
@Test
public void verifyClose() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging);
GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setRpcId("1234")
.build();
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
mockSink.write(logProto);
verify(mockLogging, times(1)).write(anyIterable());
mockSink.close();