Upstream the logstream package from knative/serving. (#472)

This also upstreams the utility for generating test resource names upon which it builds.
This commit is contained in:
Matt Moore 2019-06-18 20:29:46 -07:00 committed by Knative Prow Robot
parent 55ded05b4e
commit d90a9bc97d
5 changed files with 300 additions and 0 deletions

36
test/helpers/name.go Normal file
View File

@ -0,0 +1,36 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helpers
import (
"strings"
"testing"
)
const (
testNamePrefix = "Test"
)
// ObjectPrefixForTest returns the name prefix for this test's random names.
func ObjectPrefixForTest(t *testing.T) string {
return MakeK8sNamePrefix(strings.TrimPrefix(t.Name(), testNamePrefix))
}
// ObjectNameForTest generates a random object name based on the test name.
func ObjectNameForTest(t *testing.T) string {
return AppendRandomString(ObjectPrefixForTest(t))
}

21
test/logstream/doc.go Normal file
View File

@ -0,0 +1,21 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package logstream lets end-to-end tests incorporate controller logs
// into the error output of tests. It is enabled by setting the
// SYSTEM_NAMESPACE environment variable, which tells this package
// what namespace to stream logs from.
package logstream

View File

@ -0,0 +1,52 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logstream
import (
"os"
"testing"
"github.com/knative/pkg/system"
)
// Canceler is the type of a function returned when a logstream is started to be
// deferred so that the logstream can be stopped when the test is complete.
type Canceler func()
// Start begins streaming the logs from system components with a `key:` matching
// `test.ObjectNameForTest(t)` to `t.Log`. It returns a Canceler, which must
// be called before the test completes.
func Start(t *testing.T) Canceler {
return stream.Start(t)
}
type streamer interface {
Start(t *testing.T) Canceler
}
var stream streamer
func init() {
ns := os.Getenv(system.NamespaceEnvKey)
if ns != "" {
// If SYSTEM_NAMESPACE is set, then start the stream.
stream = &kubelogs{namespace: ns}
} else {
// Otherwise set up a null stream.
stream = &null{}
}
}

161
test/logstream/kubelogs.go Normal file
View File

@ -0,0 +1,161 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logstream
import (
"bufio"
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/knative/pkg/ptr"
"github.com/knative/pkg/test"
"github.com/knative/pkg/test/helpers"
)
type kubelogs struct {
namespace string
once sync.Once
m sync.RWMutex
keys map[string]logger
err error
}
type logger func(string, ...interface{})
var _ streamer = (*kubelogs)(nil)
func (k *kubelogs) init(t *testing.T) {
k.keys = make(map[string]logger)
kc, err := test.NewKubeClient(test.Flags.Kubeconfig, test.Flags.Cluster)
if err != nil {
t.Errorf("Error loading client config: %v", err)
}
// List the pods in the given namespace.
pl, err := kc.Kube.CoreV1().Pods(k.namespace).List(metav1.ListOptions{})
if err != nil {
t.Errorf("Error listing pods: %v", err)
}
eg := errgroup.Group{}
for _, pod := range pl.Items {
// Grab data from all containers in the pods. We need this in case
// an envoy sidecar is injected for mesh installs. This should be
// equivalent to --all-containers.
for _, container := range pod.Spec.Containers {
// Required for capture below.
pod, container := pod, container
eg.Go(func() error {
options := &corev1.PodLogOptions{
Container: container.Name,
// Follow directs the api server to continuously stream logs back.
Follow: true,
// Only return new logs (this value is being used for "epsilon").
SinceSeconds: ptr.Int64(1),
}
req := kc.Kube.CoreV1().Pods(k.namespace).GetLogs(pod.Name, options)
stream, err := req.Stream()
if err != nil {
return err
}
defer stream.Close()
// Read this container's stream.
for scanner := bufio.NewScanner(stream); scanner.Scan(); {
k.handleLine(scanner.Text())
}
return fmt.Errorf("logstream completed prematurely for: %s/%s", pod.Name, container.Name)
})
}
}
// Monitor the error group in the background and surface an error on the kubelogs
// in case anything had an active stream open.
go func() {
if err := eg.Wait(); err != nil {
k.m.Lock()
defer k.m.Unlock()
k.err = err
}
}()
}
func (k *kubelogs) handleLine(l string) {
// This holds the standard structure of our logs.
var line struct {
Level string `json:"level"`
Timestamp time.Time `json:"ts"`
Controller string `json:"knative.dev/controller"`
Caller string `json:"caller"`
Key string `json:"knative.dev/key"`
Message string `json:"msg"`
// TODO(mattmoor): Parse out more context.
}
if err := json.Unmarshal([]byte(l), &line); err != nil {
// Ignore malformed lines.
return
}
if line.Key == "" {
return
}
k.m.RLock()
defer k.m.RUnlock()
for name, logf := range k.keys {
// TODO(mattmoor): Do a slightly smarter match.
if !strings.Contains(line.Key, name) {
continue
}
// TODO(mattmoor): What information do we want to display?
logf("[%s] %s", line.Controller, line.Message)
}
}
// Start implements streamer
func (k *kubelogs) Start(t *testing.T) Canceler {
k.once.Do(func() { k.init(t) })
name := helpers.ObjectPrefixForTest(t)
// Register a key
k.m.Lock()
defer k.m.Unlock()
k.keys[name] = t.Logf
// Return a function that unregisters that key.
return func() {
k.m.Lock()
defer k.m.Unlock()
delete(k.keys, name)
if k.err != nil {
t.Errorf("error during logstream: %v", k.err)
}
}
}

30
test/logstream/null.go Normal file
View File

@ -0,0 +1,30 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logstream
import (
"testing"
)
type null struct{}
var _ streamer = (*null)(nil)
// Start implements streamer
func (*null) Start(t *testing.T) Canceler {
return func() {}
}