diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java index 0a7f381f8a..e65f4d8ef0 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/GcpObservability.java @@ -46,7 +46,8 @@ public final class GcpObservability implements AutoCloseable { GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(), - globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), 10); + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig.getFlushMessageCount()); LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig); instance = grpcInit(sink, diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java index bd846a1dd5..c2f72a9623 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfig.java @@ -26,6 +26,9 @@ public interface ObservabilityConfig { /** Get destination project ID - where logs will go. */ String getDestinationProjectId(); + /** Get message count threshold to flush - flush once message count is reached. */ + Long getFlushMessageCount(); + /** Get filters set for logging. */ List getLogFilters(); diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java index b7e55fcd9d..7068254611 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/ObservabilityConfigImpl.java @@ -34,6 +34,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { private boolean enableCloudLogging = true; private String destinationProjectId = null; + private Long flushMessageCount = null; private List logFilters; private List eventTypes; @@ -56,6 +57,7 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { enableCloudLogging = value; } destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id"); + flushMessageCount = JsonUtil.getNumberAsLong(loggingConfig, "flush_message_count"); List rawList = JsonUtil.getList(loggingConfig, "log_filters"); if (rawList != null) { List> jsonLogFilters = JsonUtil.checkObjectList(rawList); @@ -116,6 +118,11 @@ final class ObservabilityConfigImpl implements ObservabilityConfig { return destinationProjectId; } + @Override + public Long getFlushMessageCount() { + return flushMessageCount; + } + @Override public List getLogFilters() { return logFilters; diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelper.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelper.java index 54f1acbb61..38a3c80861 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelper.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelper.java @@ -210,12 +210,6 @@ public class ConfigFilterHelper { if (logEventTypeSet == null) { return true; } - boolean logEvent; - if (logEventTypeSet.isEmpty()) { - logEvent = false; - } else { - logEvent = logEventTypeSet.contains(event); - } - return logEvent; + return logEventTypeSet.contains(event); } } diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/CloudLoggingHandler.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/CloudLoggingHandler.java deleted file mode 100644 index 96625da7e1..0000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/CloudLoggingHandler.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability.logging; - -import com.google.cloud.MonitoredResource; -import com.google.cloud.logging.LogEntry; -import com.google.cloud.logging.Logging; -import com.google.cloud.logging.LoggingOptions; -import com.google.cloud.logging.Payload.JsonPayload; -import com.google.cloud.logging.Severity; -import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; -import io.grpc.Internal; -import io.grpc.internal.JsonParser; -import io.grpc.observabilitylog.v1.GrpcLogRecord; -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; - -/** - * Custom logging handler that outputs logs generated using {@link java.util.logging.Logger} to - * Cloud Logging. - */ -// TODO(vindhyan): replace custom JUL handler with internal sink implementation to eliminate -// JUL dependency -@Internal -public class CloudLoggingHandler extends Handler { - - private static final String DEFAULT_LOG_NAME = "grpc-observability"; - private static final Level DEFAULT_LOG_LEVEL = Level.ALL; - - private final LoggingOptions loggingOptions; - private final Logging loggingClient; - private final Level baseLevel; - private final String cloudLogName; - - /** - * Creates a custom logging handler that publishes message to Cloud logging. Default log level is - * set to Level.FINEST if level is not passed. - */ - public CloudLoggingHandler() { - this(DEFAULT_LOG_LEVEL, null, null); - } - - /** - * Creates a custom logging handler that publishes message to Cloud logging. - * - * @param level set the level for which message levels will be logged by the custom logger - */ - public CloudLoggingHandler(Level level) { - this(level, null, null); - } - - /** - * Creates a custom logging handler that publishes message to Cloud logging. - * - * @param level set the level for which message levels will be logged by the custom logger - * @param logName the name of the log to which log entries are written - */ - public CloudLoggingHandler(Level level, String logName) { - this(level, logName, null); - } - - /** - * Creates a custom logging handler that publishes message to Cloud logging. - * - * @param level set the level for which message levels will be logged by the custom logger - * @param logName the name of the log to which log entries are written - * @param destinationProjectId the value of cloud project id to which logs are sent to by the - * custom logger - */ - public CloudLoggingHandler(Level level, String logName, String destinationProjectId) { - baseLevel = - (level != null) ? (level.equals(DEFAULT_LOG_LEVEL) ? Level.FINEST : level) : Level.FINEST; - setLevel(baseLevel); - cloudLogName = logName != null ? logName : DEFAULT_LOG_NAME; - - // TODO(dnvindhya) read the value from config instead of taking it as an argument - if (Strings.isNullOrEmpty(destinationProjectId)) { - loggingOptions = LoggingOptions.getDefaultInstance(); - } else { - loggingOptions = LoggingOptions.newBuilder().setProjectId(destinationProjectId).build(); - } - loggingClient = loggingOptions.getService(); - } - - @Override - public void publish(LogRecord record) { - if (!(record instanceof LogRecordExtension)) { - throw new IllegalArgumentException("Expected record of type LogRecordExtension"); - } - Level logLevel = record.getLevel(); - GrpcLogRecord protoRecord = ((LogRecordExtension) record).getGrpcLogRecord(); - writeLog(protoRecord, logLevel); - } - - private void writeLog(GrpcLogRecord logProto, Level logLevel) { - if (loggingClient == null) { - throw new IllegalStateException("Logging client not initialized"); - } - try { - Severity cloudLogLevel = getCloudLoggingLevel(logLevel); - Map mapPayload = protoToMapConverter(logProto); - - // TODO(vindhyan): make sure all (int, long) values are not displayed as double - LogEntry grpcLogEntry = - LogEntry.newBuilder(JsonPayload.of(mapPayload)) - .setSeverity(cloudLogLevel) - .setLogName(cloudLogName) - .setResource(MonitoredResource.newBuilder("global").build()) - .build(); - loggingClient.write(Collections.singleton(grpcLogEntry)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("unchecked") - private Map protoToMapConverter(GrpcLogRecord logProto) - throws InvalidProtocolBufferException, IOException { - JsonFormat.Printer printer = JsonFormat.printer().preservingProtoFieldNames(); - String recordJson = printer.print(logProto); - return (Map) JsonParser.parse(recordJson); - } - - @Override - public void flush() { - if (loggingClient == null) { - throw new IllegalStateException("Logging client not initialized"); - } - loggingClient.flush(); - } - - @Override - public synchronized void close() throws SecurityException { - if (loggingClient == null) { - throw new IllegalStateException("Logging client not initialized"); - } - try { - loggingClient.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private Severity getCloudLoggingLevel(Level recordLevel) { - switch (recordLevel.intValue()) { - case 300: // FINEST - case 400: // FINER - case 500: // FINE - return Severity.DEBUG; - case 700: // CONFIG - case 800: // INFO - return Severity.INFO; - case 900: // WARNING - return Severity.WARNING; - case 1000: // SEVERE - return Severity.ERROR; - default: - return Severity.DEFAULT; - } - } -} diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 4bd0b1148e..9b301f653d 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -49,12 +49,12 @@ public class GcpLogSink implements Sink { private static final Set kubernetesResourceLabelSet = ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name", "pod_name", "container_name"); - private static final int FALLBACK_FLUSH_LIMIT = 100; + private static final long FALLBACK_FLUSH_LIMIT = 100L; private final Map customTags; private final Logging gcpLoggingClient; private final MonitoredResource kubernetesResource; - private final int flushLimit; - private int flushCounter; + private final Long flushLimit; + private long flushCounter; private static Logging createLoggingClient(String projectId) { LoggingOptions.Builder builder = LoggingOptions.newBuilder(); @@ -70,19 +70,19 @@ public class GcpLogSink implements Sink { * @param destinationProjectId cloud project id to write logs */ public GcpLogSink(String destinationProjectId, Map locationTags, - Map customTags, int flushLimit) { + Map customTags, Long flushLimit) { this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit); } @VisibleForTesting GcpLogSink(Logging client, Map locationTags, Map customTags, - int flushLimit) { + Long flushLimit) { this.gcpLoggingClient = client; this.customTags = customTags != null ? customTags : new HashMap<>(); this.kubernetesResource = getResource(locationTags); - this.flushLimit = flushLimit != 0 ? flushLimit : FALLBACK_FLUSH_LIMIT; - this.flushCounter = 0; + this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; + this.flushCounter = 0L; } /** @@ -116,10 +116,10 @@ public class GcpLogSink implements Sink { synchronized (this) { logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", event); gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); - flushCounter += 1; + flushCounter = ++flushCounter; if (flushCounter >= flushLimit) { gcpLoggingClient.flush(); - flushCounter = 0; + flushCounter = 0L; } } } catch (Exception e) { diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/LogRecordExtension.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/LogRecordExtension.java deleted file mode 100644 index e046499121..0000000000 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/LogRecordExtension.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.gcp.observability.logging; - -import io.grpc.Internal; -import io.grpc.observabilitylog.v1.GrpcLogRecord; -import java.util.logging.Level; -import java.util.logging.LogRecord; - -/** - * An extension of java.util.logging.LogRecord which includes gRPC observability logging specific - * fields. - */ -@Internal -public final class LogRecordExtension extends LogRecord { - - private final GrpcLogRecord grpcLogRecord; - - public LogRecordExtension(Level recordLevel, GrpcLogRecord record) { - super(recordLevel, null); - this.grpcLogRecord = record; - } - - public GrpcLogRecord getGrpcLogRecord() { - return grpcLogRecord; - } - - // Adding a serial version UID since base class i.e LogRecord is Serializable - private static final long serialVersionUID = 1L; -} diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java index e64c4f7891..2b41c83fe3 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/LoggingTest.java @@ -65,7 +65,7 @@ public class LoggingTest { private static final Map customTags = ImmutableMap.of( "KEY1", "Value1", "KEY2", "VALUE2"); - private static final int flushLimit = 100; + private static final long flushLimit = 100L; /** * Cloud logging test using LoggingChannelProvider and LoggingServerProvider. diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java index 7c49fc77ce..ff70d219fc 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/ObservabilityConfigImplTest.java @@ -42,6 +42,7 @@ public class ObservabilityConfigImplTest { private static final String LOG_FILTERS = "{\n" + " \"enable_cloud_logging\": true,\n" + " \"destination_project_id\": \"grpc-testing\",\n" + + " \"flush_message_count\": 1000,\n" + " \"log_filters\": [{\n" + " \"pattern\": \"*/*\",\n" + " \"header_bytes\": 4096,\n" @@ -58,6 +59,11 @@ public class ObservabilityConfigImplTest { + " \"destination_project_id\": \"grpc-testing\"\n" + "}"; + private static final String FLUSH_MESSAGE_COUNT = "{\n" + + " \"enable_cloud_logging\": true,\n" + + " \"flush_message_count\": 500\n" + + "}"; + private static final String DISABLE_CLOUD_LOGGING = "{\n" + " \"enable_cloud_logging\": false\n" + "}"; @@ -79,7 +85,9 @@ public class ObservabilityConfigImplTest { observabilityConfig.parse("{}"); assertTrue(observabilityConfig.isEnableCloudLogging()); assertNull(observabilityConfig.getDestinationProjectId()); + assertNull(observabilityConfig.getFlushMessageCount()); assertNull(observabilityConfig.getLogFilters()); + assertNull(observabilityConfig.getEventTypes()); } @Test @@ -87,7 +95,9 @@ public class ObservabilityConfigImplTest { observabilityConfig.parse(DISABLE_CLOUD_LOGGING); assertFalse(observabilityConfig.isEnableCloudLogging()); assertNull(observabilityConfig.getDestinationProjectId()); + assertNull(observabilityConfig.getFlushMessageCount()); assertNull(observabilityConfig.getLogFilters()); + assertNull(observabilityConfig.getEventTypes()); } @Test @@ -97,11 +107,19 @@ public class ObservabilityConfigImplTest { assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); } + @Test + public void flushMessageCount() throws Exception { + observabilityConfig.parse(FLUSH_MESSAGE_COUNT); + assertTrue(observabilityConfig.isEnableCloudLogging()); + assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(500L); + } + @Test public void logFilters() throws IOException { observabilityConfig.parse(LOG_FILTERS); assertTrue(observabilityConfig.isEnableCloudLogging()); assertThat(observabilityConfig.getDestinationProjectId()).isEqualTo("grpc-testing"); + assertThat(observabilityConfig.getFlushMessageCount()).isEqualTo(1000L); List logFilters = observabilityConfig.getLogFilters(); assertThat(logFilters).hasSize(2); assertThat(logFilters.get(0).pattern).isEqualTo("*/*"); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 07fe9b1d3f..626e8c4619 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -65,7 +65,7 @@ public class GcpLogSinkTest { "pod_name", "app1-6c7c58f897-n92c5"); private static final Map customTags = ImmutableMap.of("KEY1", "Value1", "KEY2", "VALUE2"); - private static final int flushLimit = 10; + private static final long flushLimit = 10L; private final long seqId = 1; private final String serviceName = "service"; private final String methodName = "method"; @@ -164,7 +164,7 @@ public class GcpLogSinkTest { @Test public void verifyFlush() { - int lowerFlushLimit = 2; + long lowerFlushLimit = 2L; GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, lowerFlushLimit); mockSink.write(logProto); verify(mockLogging, never()).flush();