pkg/metrics/e2e_test.go

336 lines
9.4 KiB
Go

/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"sort"
"sync"
"testing"
"time"
ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
ocresource "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/resource"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
proto "google.golang.org/protobuf/proto"
logtesting "knative.dev/pkg/logging/testing"
"knative.dev/pkg/metrics/metricskey"
"knative.dev/pkg/metrics/metricstest"
)
// NamespaceTagKey is the OpenCensus tag key built from
// metricskey.LabelNamespaceName.
var NamespaceTagKey tag.Key = tag.MustNewKey(metricskey.LabelNamespaceName)
// metricExtract is a flattened view of one exported data point: the metric
// name, the resource labels it was reported under, and its int64 value.
// Field order matters: the test builds these with unkeyed literals.
type metricExtract struct {
	Name   string
	Labels map[string]string
	Value  int64
}

// Key identifies the series as "name<labels>", where labels are encoded
// with resource.EncodeLabels.
func (m metricExtract) Key() string {
	return m.Name + "<" + resource.EncodeLabels(m.Labels) + ">"
}

// String renders the point as "<Key()>:<Value>".
func (m metricExtract) String() string {
	return fmt.Sprintf("%v:%v", m.Key(), m.Value)
}
// sortMetrics returns a cmp option that compares []metricExtract values as
// sorted lists of their String() renderings. When several entries share the
// same Key, only the last one in the slice survives, so repeated reports of
// one series collapse to its newest value.
func sortMetrics() cmp.Option {
	return cmp.Transformer("Sort", func(in []metricExtract) []string {
		// Later entries overwrite earlier ones for the same key.
		newest := make(map[string]string, len(in))
		for _, m := range in {
			newest[m.Key()] = m.String()
		}
		// Always non-nil, so empty inputs compare equal under cmp.
		rendered := make([]string, 0, len(newest))
		for _, s := range newest {
			rendered = append(rendered, s)
		}
		sort.Strings(rendered)
		return rendered
	})
}
// Begin table tests for exporters

// TestMetricsExport records the same measurements through each exporter
// backend (Prometheus and OpenCensus) and checks what that backend exposes.
// The measurements are a counter exported through both a plain OpenCensus
// view and RegisterResourceView, plus a gauge recorded once per resource.
func TestMetricsExport(t *testing.T) {
	// NOTE(review): TestOverrideBundleCount is a package-level test hook
	// defined elsewhere; presumably 1 forces per-record bundling — confirm.
	TestOverrideBundleCount = 1
	t.Cleanup(func() { TestOverrideBundleCount = 0 })
	ocFake := openCensusFake{address: "localhost:12345"}
	prometheusPort := 19090
	// configForBackend selects backend and points it at the fake OpenCensus
	// collector and the local Prometheus port.
	configForBackend := func(backend metricsBackend) ExporterOptions {
		return ExporterOptions{
			Domain:         metricsDomain,
			Component:      testComponent,
			PrometheusPort: prometheusPort,
			ConfigMap: map[string]string{
				BackendDestinationKey: string(backend),
				collectorAddressKey:   ocFake.address,
				reportingPeriodKey:    "1",
			},
		}
	}
	// Two revision resources; the gauge is recorded once against each.
	resources := []*resource.Resource{{
		Type: "revision",
		Labels: map[string]string{
			"project":  "p1",
			"revision": "r1",
		},
	}, {
		Type: "revision",
		Labels: map[string]string{
			"project":  "p1",
			"revision": "r2",
		},
	}}
	gauge := stats.Int64("testing/value", "Stored value", stats.UnitDimensionless)
	counter := stats.Int64("export counts", "Times through the export", stats.UnitDimensionless)
	gaugeView := &view.View{
		Name:        "testing/value",
		Description: "Test value",
		Measure:     gauge,
		Aggregation: view.LastValue(),
	}
	resourceCounter := &view.View{
		Name:        "resource_global_export_count",
		Description: "Count of exports via RegisterResourceView.",
		Measure:     counter,
		Aggregation: view.Count(),
	}
	globalCounter := &view.View{
		Name:        "global_export_counts",
		Description: "Count of exports via standard OpenCensus view.",
		Measure:     counter,
		Aggregation: view.Count(),
	}
	// The counter is recorded once per resource (2 total). The gauge for each
	// resource holds that resource's index in the resources slice.
	expected := []metricExtract{
		{"knative.dev/project/testComponent/global_export_counts", map[string]string{}, 2},
		{"knative.dev/project/testComponent/resource_global_export_count", map[string]string{}, 2},
		{"knative.dev/project/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r1"}, 0},
		{"knative.dev/project/testComponent/testing/value", map[string]string{"project": "p1", "revision": "r2"}, 1},
	}
	harnesses := []struct {
		name     string
		init     func(t *testing.T) error
		validate func(t *testing.T)
	}{{
		name: "Prometheus",
		init: func(t *testing.T) error {
			if err := UpdateExporter(context.Background(), configForBackend(prometheus), logtesting.TestLogger(t)); err != nil {
				return err
			}
			// Wait for the webserver to actually start serving metrics. Each
			// probe is bound to the poll context so a hung connection cannot
			// outlive the poll timeout.
			metricsURL := fmt.Sprintf("http://localhost:%d/metrics", prometheusPort)
			return wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
				req, err := http.NewRequestWithContext(ctx, http.MethodGet, metricsURL, nil)
				if err != nil {
					return false, err
				}
				resp, err := http.DefaultClient.Do(req)
				if err != nil {
					// Not serving yet (resp is nil here); keep polling.
					return false, nil
				}
				defer resp.Body.Close()
				return resp.StatusCode == http.StatusOK, nil
			})
		},
		validate: func(t *testing.T) {
			metricstest.EnsureRecorded()
			resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort))
			if err != nil {
				t.Fatalf("failed to fetch prometheus metrics: %+v", err)
			}
			defer resp.Body.Close()
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				t.Fatalf("failed to read prometheus response: %+v", err)
			}
			const want = `# HELP testComponent_global_export_counts Count of exports via standard OpenCensus view.
# TYPE testComponent_global_export_counts counter
testComponent_global_export_counts 2
# HELP testComponent_resource_global_export_count Count of exports via RegisterResourceView.
# TYPE testComponent_resource_global_export_count counter
testComponent_resource_global_export_count 2
# HELP testComponent_testing_value Test value
# TYPE testComponent_testing_value gauge
testComponent_testing_value{project="p1",revision="r1"} 0
testComponent_testing_value{project="p1",revision="r2"} 1
`
			if diff := cmp.Diff(want, string(body)); diff != "" {
				t.Errorf("Unexpected prometheus output (-want +got):\n%s", diff)
			}
		},
	}, {
		name: "OpenCensus",
		init: func(t *testing.T) error {
			// NOTE(review): presumably one export stream per resource plus one
			// for the resource-less metrics — confirm against the exporter.
			if err := ocFake.start(len(resources) + 1); err != nil {
				return err
			}
			t.Log("Created exporter at", ocFake.address)
			return UpdateExporter(context.Background(), configForBackend(openCensus), logtesting.TestLogger(t))
		},
		validate: func(t *testing.T) {
			metricstest.EnsureRecorded()
			records := []metricExtract{}
			// Each Resource has an independent thread invoking reportView; this
			// set avoids the race condition where we get two reports for the
			// same metric on the channel before we get any reports for another
			// metric.
			keys := map[string]struct{}{}
			timeout := time.After(5 * time.Second)
		loop:
			for {
				select {
				case record, ok := <-ocFake.published:
					if !ok {
						// The fake closed the channel: nothing more will arrive,
						// so stop instead of spinning until the timeout.
						break loop
					}
					for _, m := range record.Metrics {
						if len(m.Timeseries) == 0 || len(m.Timeseries[0].Points) == 0 {
							continue
						}
						labels := map[string]string{}
						if record.Resource != nil {
							labels = record.Resource.Labels
						}
						metric := metricExtract{
							Name:   m.MetricDescriptor.Name,
							Labels: labels,
							Value:  m.Timeseries[0].Points[0].GetInt64Value(),
						}
						records = append(records, metric)
						keys[metric.Key()] = struct{}{}
					}
					if len(keys) >= len(expected) {
						break loop
					}
				case <-timeout:
					t.Error("Timeout reading input")
					break loop
				}
			}
			if diff := cmp.Diff(expected, records, sortMetrics()); diff != "" {
				t.Errorf("Unexpected OpenCensus exports (-want +got):\n%s", diff)
			}
		},
	}}
	for _, c := range harnesses {
		t.Run(c.name, func(t *testing.T) {
			ClearMetersForTest()
			if err := c.init(t); err != nil {
				t.Fatalf("unable to init: %+v", err)
			}
			if err := view.Register(globalCounter); err != nil {
				t.Fatal("Unable to register global view:", err)
			}
			// Register each cleanup as soon as its registration succeeds so a
			// later failure does not leak views into the next subtest.
			t.Cleanup(func() { view.Unregister(globalCounter) })
			if err := RegisterResourceView(gaugeView, resourceCounter); err != nil {
				t.Fatal("Unable to register views:", err)
			}
			t.Cleanup(func() { UnregisterResourceView(gaugeView, resourceCounter) })
			for i, r := range resources {
				ctx := context.Background()
				Record(ctx, counter.M(int64(1)))
				if r != nil {
					ctx = metricskey.WithResource(ctx, *r)
				}
				Record(ctx, gauge.M(int64(i)))
			}
			c.validate(t)
		})
	}
}
// openCensusFake is an in-process gRPC server implementing the OpenCensus
// MetricsService. Export requests carrying metrics are copied onto published.
type openCensusFake struct {
	// Embedded so RPCs not overridden here get the generated default behavior.
	ocmetrics.UnimplementedMetricsServiceServer

	// address is the host:port that start listens on.
	address string
	// srv is the gRPC server created by start.
	srv *grpc.Server
	// exports counts down as each expected stream delivers its first metrics.
	// wg tracks the Serve goroutine plus every running Export handler.
	exports, wg sync.WaitGroup
	// published receives a clone of each metrics-bearing request. It is
	// closed by start's goroutine once Serve and all handlers have returned.
	published chan *ocmetrics.ExportMetricsServiceRequest
}
// start listens on oc.address and serves the fake metrics service in the
// background. Once expectedStreams distinct Export streams have delivered
// metrics, stop shuts the server down. After Serve and every Export handler
// have returned, oc.published is closed.
//
// Returns the error from net.Listen, if any.
func (oc *openCensusFake) start(expectedStreams int) error {
	ln, err := net.Listen("tcp", oc.address)
	if err != nil {
		return err
	}
	oc.published = make(chan *ocmetrics.ExportMetricsServiceRequest, 100)
	// Arm the stream counter before the server can accept connections.
	// Otherwise an early Export could call exports.Done on a zero counter,
	// which panics.
	oc.exports.Add(expectedStreams)
	oc.srv = grpc.NewServer()
	ocmetrics.RegisterMetricsServiceServer(oc.srv, oc)
	// The Serve goroutine holds one wg slot; each Export handler adds its own.
	oc.wg.Add(1)
	go func() {
		// Serve returns once the server is stopped (or its listener fails);
		// the test only cares that it has returned.
		_ = oc.srv.Serve(ln)
		oc.wg.Done()
		// Wait for in-flight Export handlers before closing the channel they
		// send on.
		oc.wg.Wait()
		close(oc.published)
	}()
	go oc.stop()
	return nil
}
// stop blocks until every expected stream has delivered metrics (see start),
// then stops the gRPC server.
func (oc *openCensusFake) stop() {
	// oc.srv is read only when the deferred call runs, i.e. after the wait.
	defer func() { oc.srv.Stop() }()
	oc.exports.Wait()
}
// Export implements the MetricsService streaming RPC. Each received request
// that carries metrics is cloned and published on oc.published. When the
// request omits a Resource, the stream's most recent Resource is filled in.
// The first metrics-bearing request on a stream marks that stream as done in
// oc.exports.
//
// Returns nil when the client closes the stream (io.EOF). Returns the Recv
// error otherwise, or the stream context's error if the RPC ends while a
// publish is blocked.
func (oc *openCensusFake) Export(stream ocmetrics.MetricsService_ExportServer) error {
	var streamResource *ocresource.Resource
	oc.wg.Add(1)
	defer oc.wg.Done()
	metricSeen := false
	for {
		in, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if in.Resource != nil {
			// The stream is stateful, keep track of the last Resource seen.
			streamResource = in.Resource
		}
		if len(in.Metrics) == 0 {
			continue
		}
		if in.Resource == nil {
			in.Resource = streamResource
		}
		// Don't block forever on a full channel once the RPC is gone.
		// A stuck handler would hold oc.wg, so published would never close.
		select {
		case oc.published <- proto.Clone(in).(*ocmetrics.ExportMetricsServiceRequest):
		case <-stream.Context().Done():
			return stream.Context().Err()
		}
		if !metricSeen {
			oc.exports.Done()
			metricSeen = true
		}
	}
}