From bf3830a8b9c6ca12d92ce8786ba4ff966a6d6a2b Mon Sep 17 00:00:00 2001 From: jason plumb <75337021+breedx-splk@users.noreply.github.com> Date: Tue, 22 Jun 2021 00:27:03 -0700 Subject: [PATCH] support logs in the fake test backend (#3374) * support logs in the fake test backend * fix copypasta --- .../fakebackend/FakeBackendMain.java | 22 +++++++++++++ .../fakebackend/FakeLogsCollectorService.java | 31 +++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeLogsCollectorService.java diff --git a/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeBackendMain.java b/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeBackendMain.java index 790dd047af..e3370fb3cd 100644 --- a/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeBackendMain.java +++ b/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeBackendMain.java @@ -30,6 +30,7 @@ import com.linecorp.armeria.server.Server; import com.linecorp.armeria.server.grpc.GrpcService; import com.linecorp.armeria.server.healthcheck.HealthCheckService; import io.netty.buffer.ByteBufOutputStream; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import java.io.IOException; @@ -68,6 +69,15 @@ public class FakeBackendMain { marshaller.writeValue(value, gen); } }); + serializers.addSerializer( + new StdSerializer<>(ExportLogsServiceRequest.class) { + @Override + public void serialize( + ExportLogsServiceRequest value, JsonGenerator gen, SerializerProvider provider) + throws IOException { + marshaller.writeValue(value, gen); + } + }); module.setSerializers(serializers); mapper.addModule(module); OBJECT_MAPPER = mapper.build(); @@ -76,18 +86,21 @@ public class FakeBackendMain { public static void main(String[] args) { var traceCollector = new FakeTraceCollectorService(); var metricsCollector = new FakeMetricsCollectorService(); + var logsCollector = new FakeLogsCollectorService(); var server = Server.builder() .http(8080) .service(GrpcService.builder() .addService(traceCollector) .addService(metricsCollector) + .addService(logsCollector) .build()) .service( "/clear", (ctx, req) -> { traceCollector.clearRequests(); metricsCollector.clearRequests(); + logsCollector.clearRequests(); return HttpResponse.of(HttpStatus.OK); }) .service( @@ -108,6 +121,15 @@ public class FakeBackendMain { return HttpResponse.of( HttpStatus.OK, MediaType.JSON, HttpData.wrap(buf.buffer())); }) + .service( + "/get-logs", + (ctx, req) -> { + var requests = logsCollector.getRequests(); + var buf = new ByteBufOutputStream(ctx.alloc().buffer()); + OBJECT_MAPPER.writeValue((OutputStream) buf, requests); + return HttpResponse.of( + HttpStatus.OK, MediaType.JSON, HttpData.wrap(buf.buffer())); + }) .service("/health", HealthCheckService.of()) .build(); diff --git a/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeLogsCollectorService.java b/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeLogsCollectorService.java new file mode 100644 index 0000000000..ee550a61d4 --- /dev/null +++ b/smoke-tests/fake-backend/src/main/java/io/opentelemetry/smoketest/fakebackend/FakeLogsCollectorService.java @@ -0,0 +1,31 @@ +package io.opentelemetry.smoketest.fakebackend; + +import com.google.common.collect.ImmutableList; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest; +import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse; +import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +public class FakeLogsCollectorService extends LogsServiceGrpc.LogsServiceImplBase{ + + private BlockingQueue exportRequests = new LinkedBlockingDeque<>(); + + List getRequests() { + return ImmutableList.copyOf(exportRequests); + } + + void clearRequests() { + exportRequests.clear(); + } + + @Override + public void export(ExportLogsServiceRequest request, + StreamObserver responseObserver) { + exportRequests.add(request); + responseObserver.onNext(ExportLogsServiceResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } +}