Enable multi namespace log collection in e2e test (#1959)

* enable multi namespace log collection in e2e test

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* Update test/logstream/interface.go

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* var grouping of global variables

* fix broken build

* Update test/logstream/README.md

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* Update test/logstream/v2/stream_test.go

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* Update test/logstream/v2/stream_test.go

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* Update test/logstream/v2/stream.go

Co-authored-by: Victor Agababov <vagababov@gmail.com>

* minor misc style fixes

* closing brackets spacing

* Update test/logstream/v2/stream.go

Co-authored-by: Victor Agababov <vagababov@gmail.com>

Co-authored-by: Victor Agababov <vagababov@gmail.com>
This commit is contained in:
Igor Belianski 2020-12-15 12:24:58 -08:00 committed by GitHub
parent a1d2289bb5
commit ef8048c0ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 237 additions and 86 deletions

View File

@ -12,11 +12,16 @@ This is a guide to start using `logstream` in your e2e testing.
and linking it like
[this](https://github.com/knative/serving/blob/e797247322b5aa35001152d2a2715dbc20a86cc4/test/conformance.go#L20-L23)
2) Test resources must be named with
2. Test resources must be named with
[`test.ObjectNameForTest(t)`](https://github.com/knative/networking/blob/40ef99aa5db0d38730a89a1de7e5b28b8ef6eed5/vendor/knative.dev/pkg/test/helpers/name.go#L50)
3. At the start of your test add: `t.Cleanup(logstream.Start(t))`
4. To enable log capture from containers across multiple namespaces, configure SYSTEM_NAMESPACE
to contain a csv list of namespaces (e.g. `knative-serving,knative-testing`). Specific, well
known containers that do not produce key decorated logs (see detailed description below) need
to be enumerated in WellKnownContainers in stream.go.
With that, you will start getting logs from the processes in the system
namespace interleaved into your test output via `t.Log`.

View File

@ -19,6 +19,7 @@ package logstream
import (
"context"
"os"
"strings"
"sync"
"knative.dev/pkg/system"
@ -51,7 +52,9 @@ func Start(t ti) Canceler {
return
}
stream = &shim{logstreamv2.FromNamespace(context.TODO(), kc, ns)}
// handle case when ns contains a csv list
namespaces := strings.Split(ns, ",")
stream = &shim{logstreamv2.FromNamespaces(context.Background(), kc, namespaces)}
} else {
// Otherwise set up a null stream.

View File

@ -34,19 +34,28 @@ import (
"knative.dev/pkg/ptr"
)
func FromNamespaces(ctx context.Context, c kubernetes.Interface, namespaces []string) Source {
return &namespaceSource{
ctx: ctx,
kc: c,
namespaces: namespaces,
keys: make(map[string]Callback, 1),
}
}
func FromNamespace(ctx context.Context, c kubernetes.Interface, namespace string) Source {
return &namespaceSource{
ctx: ctx,
kc: c,
namespace: namespace,
keys: make(map[string]Callback, 1),
ctx: ctx,
kc: c,
namespaces: []string{namespace},
keys: make(map[string]Callback, 1),
}
}
type namespaceSource struct {
namespace string
kc kubernetes.Interface
ctx context.Context
namespaces []string
kc kubernetes.Interface
ctx context.Context
m sync.RWMutex
once sync.Once
@ -57,7 +66,7 @@ type namespaceSource struct {
func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error) {
s.once.Do(func() { s.watchErr = s.watchPods() })
if s.watchErr != nil {
return nil, fmt.Errorf("failed to watch pods in namespace %q: %w", s.namespace, s.watchErr)
return nil, fmt.Errorf("failed to watch pods in one of the namespace(s) %q: %w", s.namespaces, s.watchErr)
}
// Register a key
@ -74,38 +83,40 @@ func (s *namespaceSource) StartStream(name string, l Callback) (Canceler, error)
}
func (s *namespaceSource) watchPods() error {
wi, err := s.kc.CoreV1().Pods(s.namespace).Watch(s.ctx, metav1.ListOptions{})
if err != nil {
return err
}
go func() {
defer wi.Stop()
watchedPods := sets.NewString()
for {
select {
case <-s.ctx.Done():
return
case ev := <-wi.ResultChan():
// We have reports of this being randomly nil.
if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() {
continue
}
p := ev.Object.(*corev1.Pod)
switch ev.Type {
case watch.Deleted:
watchedPods.Delete(p.Name)
case watch.Added, watch.Modified:
if !watchedPods.Has(p.Name) && isPodReady(p) {
watchedPods.Insert(p.Name)
s.startForPod(p)
}
}
}
for _, ns := range s.namespaces {
wi, err := s.kc.CoreV1().Pods(ns).Watch(s.ctx, metav1.ListOptions{})
if err != nil {
return err
}
}()
go func() {
defer wi.Stop()
watchedPods := sets.NewString()
for {
select {
case <-s.ctx.Done():
return
case ev := <-wi.ResultChan():
// We have reports of this being randomly nil.
if ev.Object == nil || reflect.ValueOf(ev.Object).IsNil() {
continue
}
p := ev.Object.(*corev1.Pod)
switch ev.Type {
case watch.Deleted:
watchedPods.Delete(p.Name)
case watch.Added, watch.Modified:
if !watchedPods.Has(p.Name) && isPodReady(p) {
watchedPods.Insert(p.Name)
s.startForPod(p)
}
}
}
}
}()
}
return nil
}
@ -119,9 +130,12 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) {
psn, pn, cn := pod.Namespace, pod.Name, container.Name
handleLine := s.handleLine
if cn == ChaosDuck {
// Specialcase logs from chaosduck to be able to easily see when pods
// have been killed throughout all tests.
if wellKnownContainers.Has(cn) {
// Specialcase logs from chaosduck, queueproxy etc.
// - ChaosDuck logs enable easy
// monitoring of killed pods throughout all tests.
// - QueueProxy logs enable
// troubleshooting of data plane request handling issues.
handleLine = s.handleGenericLine
}
@ -137,13 +151,13 @@ func (s *namespaceSource) startForPod(pod *corev1.Pod) {
req := s.kc.CoreV1().Pods(psn).GetLogs(pn, options)
stream, err := req.Stream(context.Background())
if err != nil {
s.handleGenericLine([]byte(err.Error()), pn)
s.handleGenericLine([]byte(err.Error()), pn, cn)
return
}
defer stream.Close()
// Read this container's stream.
for scanner := bufio.NewScanner(stream); scanner.Scan(); {
handleLine(scanner.Bytes(), pn)
handleLine(scanner.Bytes(), pn, cn)
}
// Pods get killed with chaos duck, so logs might end
// before the test does. So don't report an error here.
@ -167,9 +181,16 @@ const (
timeFormat = "15:04:05.000"
// ChaosDuck is the well known name for the chaosduck.
ChaosDuck = "chaosduck"
// QueueProxy is the well known name for the queueproxy.
QueueProxy = "queueproxy"
)
func (s *namespaceSource) handleLine(l []byte, pod string) {
// Names of well known containers that do not produce nicely formatted logs that
// could be easily filtered and parsed by handleLine. Logs from these containers
// are captured without filtering.
var wellKnownContainers = sets.NewString(ChaosDuck, QueueProxy)
func (s *namespaceSource) handleLine(l []byte, pod string, _ string) {
// This holds the standard structure of our logs.
var line struct {
Level string `json:"severity"`
@ -231,12 +252,12 @@ func (s *namespaceSource) handleLine(l []byte, pod string) {
// handleGenericLine prints the given logline to all active tests as it cannot be parsed
// and/or doesn't contain any correlation data (like the chaosduck for example).
func (s *namespaceSource) handleGenericLine(l []byte, pod string) {
func (s *namespaceSource) handleGenericLine(l []byte, pod string, cn string) {
s.m.RLock()
defer s.m.RUnlock()
for _, logf := range s.keys {
// I 15:04:05.000 webhook-699b7b668d-9smk2 this is my message
logf("I %s %s %s", time.Now().Format(timeFormat), pod, string(l))
logf("I %s %s %s %s", time.Now().Format(timeFormat), pod, cn, string(l))
}
}

View File

@ -29,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
@ -41,19 +42,79 @@ import (
)
const (
noLogTimeout = 100 * time.Millisecond
testKey = "horror-movie-2020"
testLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/horror-movie-2020", "error":"el-otoño-eternal" }`
knativeContainer = "knativeContainer"
userContainer = "userContainer"
noLogTimeout = 100 * time.Millisecond
testKey = "horror-movie-2020"
// default test controller line with all matching keys and attributes
testLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/horror-movie-2020", "error":"el-otoño-eternal" }`
// test controller line with mismatched key entry (knative.dev/key)
testLineWithMissmatchedKey = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller","knative.dev/key":"default/romcom-1990", "error":"el-otoño-eternal" }`
// test controller line with missing key entry (knative.dev/key)
testLineWithMissingKey = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"controller/controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/controller":"revision-controller", "error":"el-otoño-eternal" }`
testNonJSONLine = `Some non-json string produced by controller`
// this line doesn't have json entry for knative.dev/controller so we expect
// log parsing to fall back to using the "caller" attribute.
testNonControllerLine = `{"severity":"debug","timestamp":"2020-10-20T18:42:28.553Z","logger":"controller.revision-controller.knative.dev-serving-pkg-reconciler-revision.Reconciler","caller":"non_controller.go:397","message":"Adding to queue default/s2-nhjv6 (depth: 1)","commit":"4411bf3","knative.dev/pod":"controller-f95b977c-4wlh4","knative.dev/key":"default/horror-movie-2020", "error":"non_controller_error" }`
testChaosDuckLine = `Some non-json Chaos Duck string`
testQueueProxyLine = `Some non-json Queueproxy string`
testUserContainerLine = `Some non-json user container string`
testLinePattern = "el-otoño-eternal"
testNonControllerLinePattern = "non_controller_error"
)
var pod = &corev1.Pod{
// This map determines test log lines to be produced by each fake container
var (
logProductionMap = map[string][]string{
knativeContainer: {testLine, testLineWithMissmatchedKey, testLineWithMissingKey, testNonJSONLine, testNonControllerLine},
logstream.ChaosDuck: {testChaosDuckLine},
logstream.QueueProxy: {testQueueProxyLine},
userContainer: {testUserContainerLine},
}
singlePod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "RandomPodName",
Namespace: "defaultNameSpace",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: knativeContainer,
}},
},
}
knativePod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "RandomPodName",
Namespace: "defaultNameSpace",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: knativeContainer,
}, {
Name: logstream.ChaosDuck,
}},
},
}
)
var userPod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: logstream.ChaosDuck,
Namespace: "default",
Name: "SomeOtherRandomPodName",
Namespace: "usertestNamespace",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: logstream.ChaosDuck,
Name: logstream.QueueProxy,
}, {
Name: userContainer,
}},
},
}
@ -69,15 +130,15 @@ var readyStatus = corev1.PodStatus{
func TestWatchErr(t *testing.T) {
f := newK8sFake(fake.NewSimpleClientset(), errors.New("lookin' good"), nil)
stream := logstream.FromNamespace(context.Background(), f, "a-namespace")
_, err := stream.StartStream(pod.Name, nil)
_, err := stream.StartStream(knativePod.Name, nil)
if err == nil {
t.Fatal("LogStream creation should have failed")
}
}
func TestFailToStartStream(t *testing.T) {
pod := pod.DeepCopy()
pod.Status = readyStatus
singlePod := singlePod.DeepCopy()
singlePod.Status = readyStatus
const want = "hungry for apples"
f := newK8sFake(fake.NewSimpleClientset(), nil, /*watcher*/
@ -92,8 +153,8 @@ func TestFailToStartStream(t *testing.T) {
close(logFuncInvoked)
}
ctx, cancel := context.WithCancel(context.Background())
stream := logstream.FromNamespace(ctx, f, pod.Namespace)
streamC, err := stream.StartStream(pod.Name, logFunc)
stream := logstream.FromNamespace(ctx, f, singlePod.Namespace)
streamC, err := stream.StartStream(singlePod.Name, logFunc)
if err != nil {
t.Fatal("Failed to start the stream: ", err)
}
@ -101,8 +162,8 @@ func TestFailToStartStream(t *testing.T) {
streamC()
cancel()
})
podClient := f.CoreV1().Pods(pod.Namespace)
if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
podClient := f.CoreV1().Pods(singlePod.Namespace)
if _, err := podClient.Create(context.Background(), singlePod, metav1.CreateOptions{}); err != nil {
t.Fatal("CreatePod()=", err)
}
@ -113,27 +174,64 @@ func TestFailToStartStream(t *testing.T) {
}
}
func processLogEntries(t *testing.T, logFuncInvoked <-chan string, patterns []string) {
expectedLogMatchesSet := sets.NewString(patterns...)
OUTER:
for len(expectedLogMatchesSet) > 0 {
// we expect exactly len(expectedLogMatchesSet) log entries
// each need to be matched with exactly one pattern from
// patterns...
select {
case <-time.After(noLogTimeout):
t.Error("Timed out: log message wasn't received")
case logLine := <-logFuncInvoked:
// classify string that we got here
for _, s := range sets.StringKeySet(expectedLogMatchesSet).List() {
if strings.Contains(logLine, s) {
expectedLogMatchesSet.Delete(s)
continue OUTER
}
}
t.Fatal("Unexpected log entry received:", logLine)
}
}
// now we expect a timeout without any further logs
select {
case <-time.After(noLogTimeout):
case logLine := <-logFuncInvoked:
t.Fatal("No more logs expected at this point, got:", logLine)
}
}
func TestNamespaceStream(t *testing.T) {
pod := pod.DeepCopy() // Needed to run the test multiple times in a row
knativePod := knativePod.DeepCopy() // Needed to run the test multiple times in a row
userPod := userPod.DeepCopy()
f := newK8sFake(fake.NewSimpleClientset(), nil, nil)
logFuncInvoked := make(chan struct{})
logFuncInvoked := make(chan string)
t.Cleanup(func() { close(logFuncInvoked) })
logFunc := func(format string, args ...interface{}) {
logFuncInvoked <- struct{}{}
logFuncInvoked <- fmt.Sprintf(format, args)
}
ctx, cancel := context.WithCancel(context.Background())
stream := logstream.FromNamespace(ctx, f, pod.Namespace)
stream := logstream.FromNamespaces(ctx, f, []string{knativePod.Namespace, userPod.Namespace})
streamC, err := stream.StartStream(testKey, logFunc)
if err != nil {
t.Fatal("Failed to start the stream: ", err)
}
t.Cleanup(streamC)
podClient := f.CoreV1().Pods(pod.Namespace)
if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
podClient := f.CoreV1().Pods(knativePod.Namespace)
if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil {
t.Fatal("CreatePod()=", err)
}
userPodClient := f.CoreV1().Pods(userPod.Namespace)
if _, err := userPodClient.Create(context.Background(), userPod, metav1.CreateOptions{}); err != nil {
t.Fatal("CreatePod()=", err)
}
@ -143,18 +241,26 @@ func TestNamespaceStream(t *testing.T) {
t.Error("Unready pod should not report logs")
}
pod.Status = readyStatus
if _, err := podClient.Update(context.Background(), pod, metav1.UpdateOptions{}); err != nil {
knativePod.Status = readyStatus
if _, err := podClient.Update(context.Background(), knativePod, metav1.UpdateOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
userPod.Status = readyStatus
if _, err := userPodClient.Update(context.Background(), userPod, metav1.UpdateOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
select {
case <-time.After(noLogTimeout):
t.Error("Timed out: log message wasn't received")
case <-logFuncInvoked:
}
// We are expecting to get back 4 log entries:
// 1. non filtered non json entries from queueproxy
// 2. non filtered non json entries from chaosduck
// 3. nicely formatted, filtered (with matching key) entry from knativeContainer
// 4. nicely formatted, filtered (with matching key) entry from knativeContainer (fallback to caller attribute)
processLogEntries(t, logFuncInvoked, []string{testLinePattern, testNonControllerLinePattern, testChaosDuckLine, testQueueProxyLine})
if _, err := podClient.Update(context.Background(), pod, metav1.UpdateOptions{}); err != nil {
if _, err := podClient.Update(context.Background(), knativePod, metav1.UpdateOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
if _, err := userPodClient.Update(context.Background(), userPod, metav1.UpdateOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
@ -164,7 +270,10 @@ func TestNamespaceStream(t *testing.T) {
t.Error("Repeat updates to the same pod should not trigger GetLogs")
}
if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
if err := userPodClient.Delete(context.Background(), userPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
@ -174,9 +283,9 @@ func TestNamespaceStream(t *testing.T) {
t.Error("Deletion should not trigger GetLogs")
}
pod.Spec.Containers[0].Name = "goose-with-a-flair"
knativePod.Spec.Containers[0].Name = "goose-with-a-flair"
// Create pod with the same name? Why not. And let's make it ready from the get go.
if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil {
t.Fatal("CreatePod()=", err)
}
@ -187,7 +296,7 @@ func TestNamespaceStream(t *testing.T) {
}
// Delete again.
if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatal("UpdatePod()=", err)
}
// Kill the context.
@ -196,7 +305,7 @@ func TestNamespaceStream(t *testing.T) {
// We can't assume that the cancel signal doesn't race the pod creation signal, so
// we retry a few times to give some leeway.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if _, err := podClient.Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil {
return false, err
}
@ -205,7 +314,7 @@ func TestNamespaceStream(t *testing.T) {
return true, nil
case <-logFuncInvoked:
t.Log("Log was still produced, trying again...")
if err := podClient.Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
if err := podClient.Delete(context.Background(), knativePod.Name, metav1.DeleteOptions{}); err != nil {
return false, err
}
return false, nil
@ -258,19 +367,32 @@ func (f *fakeclient) Pods(ns string) v1.PodInterface {
}
}
func (f *fakePods) GetLogs(name string, opts *corev1.PodLogOptions) *restclient.Request {
func logsForContainer(container string) string {
result := ""
for _, s := range logProductionMap[container] {
if len(result) > 0 {
result += "\n"
}
result += s
}
return result
}
func (f *fakePods) GetLogs(podName string, opts *corev1.PodLogOptions) *restclient.Request {
fakeClient := &fakerest.RESTClient{
Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) {
resp := &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(
strings.NewReader(testLine)),
strings.NewReader(logsForContainer(opts.Container))),
}
return resp, nil
}),
NegotiatedSerializer: scheme.Codecs.WithoutConversion(),
GroupVersion: schema.GroupVersion{Version: "v1"},
VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", f.ns, name),
VersionedAPIPath: fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/log", f.ns, podName),
}
ret := fakeClient.Request()
if f.logsErr != nil {