From 1b799adc1955d3b4f3bfcf70205feb39db0e188d Mon Sep 17 00:00:00 2001 From: DNVindhya Date: Fri, 17 Mar 2023 15:53:46 -0700 Subject: [PATCH] gcp-observability: Update logging fields for GA and use custom BatchingSettings (#9959) This commit updates the following in gcp observability logging schema * `payload.status_code` will be of type `google.rpc.Code` instead of `uint32`. * names in enum `Address.TYPE` Use custom batching settings for [LoggingOptions](https://javadoc.io/doc/com.google.cloud/google-cloud-logging/latest/com/google/cloud/logging/LoggingOptions.html) Note: Upgraded `com.google.cloud:google-cloud-logging` from `3.6.1` to `3.14.5`. --- gcp-observability/build.gradle | 16 ++++++------- .../observability/interceptors/LogHelper.java | 9 +++---- .../gcp/observability/logging/GcpLogSink.java | 24 ++++++++++++++++++- .../v1/observabilitylog.proto | 9 +++---- .../interceptors/LogHelperTest.java | 7 +++--- 5 files changed, 45 insertions(+), 20 deletions(-) diff --git a/gcp-observability/build.gradle b/gcp-observability/build.gradle index 85e6e9eeb0..63850efc4e 100644 --- a/gcp-observability/build.gradle +++ b/gcp-observability/build.gradle @@ -20,11 +20,13 @@ tasks.named("compileJava").configure { } dependencies { - def cloudLoggingVersion = '3.6.1' + def cloudLoggingVersion = '3.14.5' annotationProcessor libraries.auto.value api project(':grpc-api') - + + // TODO(dnvindhya): Prefer using our own libraries, update the dependencies + // in gradle/libs.versions instead implementation project(':grpc-protobuf'), project(':grpc-stub'), project(':grpc-alts'), @@ -35,12 +37,10 @@ dependencies { libraries.opencensus.exporter.trace.stackdriver, project(':grpc-xds'), // Align grpc versions project(':grpc-services'), // Align grpc versions - libraries.animalsniffer.annotations, // Prefer our version - libraries.google.auth.credentials, // Prefer our version - libraries.protobuf.java.util, // Prefer our version - libraries.gson, // Prefer our version - libraries.perfmark.api, // Prefer our version - libraries.re2j, // Prefer our version + ('com.google.protobuf:protobuf-java:3.21.12'), + ('com.google.api.grpc:proto-google-common-protos:2.14.2'), + ('com.google.auth:google-auth-library-oauth2-http:1.16.0'), + ('io.opencensus:opencensus-api:0.31.1'), ('com.google.guava:guava:31.1-jre') runtimeOnly libraries.opencensus.impl diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java index 9b46699efa..abd44c4365 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/interceptors/LogHelper.java @@ -23,6 +23,7 @@ import static io.grpc.InternalMetadata.BASE64_ENCODING_OMIT_PADDING; import com.google.common.base.Joiner; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; +import com.google.rpc.Code; import io.grpc.Attributes; import io.grpc.Deadline; import io.grpc.Grpc; @@ -182,7 +183,7 @@ public class LogHelper { PayloadBuilderHelper pair = createMetadataProto(metadata, maxHeaderBytes); - pair.payloadBuilder.setStatusCode(status.getCode().value()); + pair.payloadBuilder.setStatusCode(Code.forNumber(status.getCode().value())); String statusDescription = status.getDescription(); if (statusDescription != null) { pair.payloadBuilder.setStatusMessage(statusDescription); @@ -404,10 +405,10 @@ public class LogHelper { if (address instanceof InetSocketAddress) { InetAddress inetAddress = ((InetSocketAddress) address).getAddress(); if (inetAddress instanceof Inet4Address) { - builder.setType(Address.Type.TYPE_IPV4) + builder.setType(Address.Type.IPV4) .setAddress(InetAddressUtil.toAddrString(inetAddress)); } else if (inetAddress instanceof Inet6Address) { - builder.setType(Address.Type.TYPE_IPV6) + builder.setType(Address.Type.IPV6) .setAddress(InetAddressUtil.toAddrString(inetAddress)); } else { logger.log(Level.SEVERE, "unknown type of InetSocketAddress: {}", address); @@ -417,7 +418,7 @@ public class LogHelper { } else if (address.getClass().getName().equals("io.netty.channel.unix.DomainSocketAddress")) { // To avoid a compiled time dependency on grpc-netty, we check against the // runtime class name. - builder.setType(Address.Type.TYPE_UNIX) + builder.setType(Address.Type.UNIX) .setAddress(address.toString()); } else { builder.setType(Address.Type.TYPE_UNKNOWN).setAddress(address.toString()); diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index e91f310e64..02ff4049eb 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -18,12 +18,15 @@ package io.grpc.gcp.observability.logging; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowController; import com.google.cloud.MonitoredResource; import com.google.cloud.logging.LogEntry; import com.google.cloud.logging.Logging; import com.google.cloud.logging.LoggingOptions; import com.google.cloud.logging.Payload.JsonPayload; import com.google.cloud.logging.Severity; +import com.google.cloud.logging.v2.stub.LoggingServiceV2StubSettings; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -41,6 +44,7 @@ import java.util.Objects; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import org.threeten.bp.Duration; /** * Sink for Google Cloud Logging. @@ -102,6 +106,7 @@ public class GcpLogSink implements Sink { if (servicesToExclude.contains(logProto.getServiceName())) { return; } + LogEntry grpcLogEntry = null; try { GrpcLogRecord.EventType eventType = logProto.getType(); // TODO(DNVindhya): make sure all (int, long) values are not displayed as double @@ -117,11 +122,18 @@ public class GcpLogSink implements Sink { if (!customTags.isEmpty()) { grpcLogEntryBuilder.setLabels(customTags); } - LogEntry grpcLogEntry = grpcLogEntryBuilder.build(); + grpcLogEntry = grpcLogEntryBuilder.build(); synchronized (this) { logger.log(Level.FINEST, "Writing gRPC event : {0} to Cloud Logging", eventType); gcpLoggingClient.write(Collections.singleton(grpcLogEntry)); } + } catch (FlowController.FlowControlRuntimeException e) { + String grpcLogEntryString = null; + if (grpcLogEntry != null) { + grpcLogEntryString = grpcLogEntry.toStructuredJsonString(); + } + logger.log(Level.SEVERE, "Limit exceeded while writing log entry to cloud logging"); + logger.log(Level.SEVERE, "Log entry = ", grpcLogEntryString); } catch (Exception e) { logger.log(Level.SEVERE, "Caught exception while writing to Cloud Logging", e); } @@ -132,6 +144,16 @@ public class GcpLogSink implements Sink { if (!Strings.isNullOrEmpty(projectId)) { builder.setProjectId(projectId); } + BatchingSettings loggingDefaultBatchingSettings = LoggingServiceV2StubSettings.newBuilder() + .writeLogEntriesSettings().getBatchingSettings(); + // Custom batching settings + BatchingSettings grpcLoggingVBatchingSettings = loggingDefaultBatchingSettings.toBuilder() + .setDelayThreshold(Duration.ofSeconds(1L)).setFlowControlSettings( + loggingDefaultBatchingSettings.getFlowControlSettings().toBuilder() + .setMaxOutstandingRequestBytes(52428800L) //50 MiB + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .build()).build(); + builder.setBatchingSettings(grpcLoggingVBatchingSettings); return builder.build().getService(); } diff --git a/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto b/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto index 85ef00ac2d..7d278aa08a 100644 --- a/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto +++ b/gcp-observability/src/main/proto/grpc/observabilitylog/v1/observabilitylog.proto @@ -20,6 +20,7 @@ package grpc.observabilitylog.v1; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; +import "google/rpc/code.proto"; option java_multiple_files = true; option java_package = "io.grpc.observabilitylog.v1"; @@ -97,7 +98,7 @@ message Payload { // the RPC timeout value google.protobuf.Duration timeout = 2; // The gRPC status code - uint32 status_code = 3; + google.rpc.Code status_code = 3; // The gRPC status message string status_message = 4; // The value of the grpc-status-details-bin metadata key, if any. @@ -115,9 +116,9 @@ message Payload { message Address { enum Type { TYPE_UNKNOWN = 0; - TYPE_IPV4 = 1; // in 1.2.3.4 form - TYPE_IPV6 = 2; // IPv6 canonical form (RFC5952 section 4) - TYPE_UNIX = 3; // UDS string + IPV4 = 1; // in 1.2.3.4 form + IPV6 = 2; // IPv6 canonical form (RFC5952 section 4) + UNIX = 3; // UDS string } Type type = 1; string address = 2; diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java index 73704eb418..a6d9fab702 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java @@ -29,6 +29,7 @@ import static org.mockito.Mockito.verify; import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; +import com.google.rpc.Code; import io.grpc.Attributes; import io.grpc.Grpc; import io.grpc.Metadata; @@ -94,7 +95,7 @@ public class LogHelperTest { assertThat(LogHelper.socketAddressToProto(socketAddress)) .isEqualTo(Address .newBuilder() - .setType(Address.Type.TYPE_IPV4) + .setType(Address.Type.IPV4) .setAddress("127.0.0.1") .setIpPort(12345) .build()); @@ -109,7 +110,7 @@ public class LogHelperTest { assertThat(LogHelper.socketAddressToProto(socketAddress)) .isEqualTo(Address .newBuilder() - .setType(Address.Type.TYPE_IPV6) + .setType(Address.Type.IPV6) .setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required .setIpPort(12345) .build()); @@ -454,7 +455,7 @@ public class LogHelperTest { builder.setPeer(LogHelper.socketAddressToProto(peer)); builder.setPayload( builder.getPayload().toBuilder() - .setStatusCode(Status.INTERNAL.getCode().value()) + .setStatusCode(Code.forNumber(Status.INTERNAL.getCode().value())) .setStatusMessage("test description") .build()); GrpcLogRecord base = builder.build();