276 lines
8.4 KiB
Go
276 lines
8.4 KiB
Go
/*
|
|
Copyright 2018 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package dynamic
|
|
|
|
import (
|
|
"fmt"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"reflect"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
auditregv1alpha1 "k8s.io/api/auditregistration/v1alpha1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
auditinternal "k8s.io/apiserver/pkg/apis/audit"
|
|
auditv1 "k8s.io/apiserver/pkg/apis/audit/v1"
|
|
"k8s.io/apiserver/pkg/audit"
|
|
webhook "k8s.io/apiserver/pkg/util/webhook"
|
|
informers "k8s.io/client-go/informers"
|
|
"k8s.io/client-go/kubernetes/fake"
|
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
)
|
|
|
|
func TestDynamic(t *testing.T) {
|
|
eventList1 := &atomic.Value{}
|
|
eventList1.Store(auditinternal.EventList{})
|
|
eventList2 := &atomic.Value{}
|
|
eventList2.Store(auditinternal.EventList{})
|
|
|
|
// start test servers
|
|
server1 := httptest.NewServer(buildTestHandler(t, eventList1))
|
|
defer server1.Close()
|
|
server2 := httptest.NewServer(buildTestHandler(t, eventList2))
|
|
defer server2.Close()
|
|
|
|
testPolicy := auditregv1alpha1.Policy{
|
|
Level: auditregv1alpha1.LevelMetadata,
|
|
Stages: []auditregv1alpha1.Stage{
|
|
auditregv1alpha1.StageResponseStarted,
|
|
},
|
|
}
|
|
testEvent := auditinternal.Event{
|
|
Level: auditinternal.LevelMetadata,
|
|
Stage: auditinternal.StageResponseStarted,
|
|
Verb: "get",
|
|
RequestURI: "/test/path",
|
|
}
|
|
testConfig1 := &auditregv1alpha1.AuditSink{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test1",
|
|
UID: types.UID("test1"),
|
|
},
|
|
Spec: auditregv1alpha1.AuditSinkSpec{
|
|
Policy: testPolicy,
|
|
Webhook: auditregv1alpha1.Webhook{
|
|
ClientConfig: auditregv1alpha1.WebhookClientConfig{
|
|
URL: &server1.URL,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
testConfig2 := &auditregv1alpha1.AuditSink{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: "test2",
|
|
UID: types.UID("test2"),
|
|
},
|
|
Spec: auditregv1alpha1.AuditSinkSpec{
|
|
Policy: testPolicy,
|
|
Webhook: auditregv1alpha1.Webhook{
|
|
ClientConfig: auditregv1alpha1.WebhookClientConfig{
|
|
URL: &server2.URL,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
config, stopChan := defaultTestConfig()
|
|
config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond
|
|
|
|
b, err := NewBackend(config)
|
|
require.NoError(t, err)
|
|
d := b.(*backend)
|
|
err = b.Run(stopChan)
|
|
require.NoError(t, err)
|
|
|
|
t.Run("find none", func(t *testing.T) {
|
|
require.Len(t, d.GetDelegates(), 0)
|
|
})
|
|
|
|
t.Run("find one", func(t *testing.T) {
|
|
d.addSink(testConfig1)
|
|
delegates := d.GetDelegates()
|
|
require.Len(t, delegates, 1)
|
|
require.Contains(t, delegates, types.UID("test1"))
|
|
require.Equal(t, testConfig1, delegates["test1"].configuration)
|
|
|
|
// send event and check that it arrives
|
|
b.ProcessEvents(&testEvent)
|
|
err := checkForEvent(eventList1, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink")
|
|
})
|
|
|
|
t.Run("find two", func(t *testing.T) {
|
|
eventList1.Store(auditinternal.EventList{})
|
|
d.addSink(testConfig2)
|
|
delegates := d.GetDelegates()
|
|
require.Len(t, delegates, 2)
|
|
require.Contains(t, delegates, types.UID("test1"))
|
|
require.Contains(t, delegates, types.UID("test2"))
|
|
require.Equal(t, testConfig1, delegates["test1"].configuration)
|
|
require.Equal(t, testConfig2, delegates["test2"].configuration)
|
|
|
|
// send event to both delegates and check that it arrives in both places
|
|
b.ProcessEvents(&testEvent)
|
|
err := checkForEvent(eventList1, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink 1")
|
|
err = checkForEvent(eventList2, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink 2")
|
|
})
|
|
|
|
t.Run("delete one", func(t *testing.T) {
|
|
eventList2.Store(auditinternal.EventList{})
|
|
d.deleteSink(testConfig1)
|
|
delegates := d.GetDelegates()
|
|
require.Len(t, delegates, 1)
|
|
require.Contains(t, delegates, types.UID("test2"))
|
|
require.Equal(t, testConfig2, delegates["test2"].configuration)
|
|
|
|
// send event and check that it arrives to remaining sink
|
|
b.ProcessEvents(&testEvent)
|
|
err := checkForEvent(eventList2, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink")
|
|
})
|
|
|
|
t.Run("update one", func(t *testing.T) {
|
|
eventList1.Store(auditinternal.EventList{})
|
|
oldConfig := *testConfig2
|
|
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
|
|
testConfig2.UID = types.UID("test2.1")
|
|
d.updateSink(&oldConfig, testConfig2)
|
|
delegates := d.GetDelegates()
|
|
require.Len(t, delegates, 1)
|
|
require.Contains(t, delegates, types.UID("test2.1"))
|
|
require.Equal(t, testConfig2, delegates["test2.1"].configuration)
|
|
|
|
// send event and check that it arrives to updated sink
|
|
b.ProcessEvents(&testEvent)
|
|
err := checkForEvent(eventList1, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink")
|
|
})
|
|
|
|
t.Run("update meta only", func(t *testing.T) {
|
|
eventList1.Store(auditinternal.EventList{})
|
|
oldConfig := *testConfig2
|
|
testConfig2.UID = types.UID("test2.2")
|
|
testConfig2.Labels = map[string]string{"my": "label"}
|
|
d.updateSink(&oldConfig, testConfig2)
|
|
delegates := d.GetDelegates()
|
|
require.Len(t, delegates, 1)
|
|
require.Contains(t, delegates, types.UID("test2.2"))
|
|
|
|
// send event and check that it arrives to same sink
|
|
b.ProcessEvents(&testEvent)
|
|
err := checkForEvent(eventList1, testEvent)
|
|
require.NoError(t, err, "unable to find events sent to sink")
|
|
})
|
|
|
|
t.Run("shutdown", func(t *testing.T) {
|
|
// if the stop signal is not propagated correctly the buffers will not
|
|
// close down gracefully, and the shutdown method will hang causing
|
|
// the test will timeout.
|
|
timeoutChan := make(chan struct{})
|
|
successChan := make(chan struct{})
|
|
go func() {
|
|
time.Sleep(1 * time.Second)
|
|
timeoutChan <- struct{}{}
|
|
}()
|
|
go func() {
|
|
close(stopChan)
|
|
d.Shutdown()
|
|
successChan <- struct{}{}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-timeoutChan:
|
|
t.Error("shutdown timed out")
|
|
return
|
|
case <-successChan:
|
|
return
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
// checkForEvent will poll to check for an audit event in an atomic event list
|
|
func checkForEvent(a *atomic.Value, evSent auditinternal.Event) error {
|
|
return wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
|
|
el := a.Load().(auditinternal.EventList)
|
|
if len(el.Items) != 1 {
|
|
return false, nil
|
|
}
|
|
evFound := el.Items[0]
|
|
eq := reflect.DeepEqual(evSent, evFound)
|
|
if !eq {
|
|
return false, fmt.Errorf("event mismatch -- sent: %+v found: %+v", evSent, evFound)
|
|
}
|
|
return true, nil
|
|
})
|
|
}
|
|
|
|
// buildTestHandler returns a handler that will update the atomic value passed in
|
|
// with the event list it receives
|
|
func buildTestHandler(t *testing.T, a *atomic.Value) http.HandlerFunc {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
body, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
t.Fatalf("could not read request body: %v", err)
|
|
}
|
|
el := auditinternal.EventList{}
|
|
decoder := audit.Codecs.UniversalDecoder(auditv1.SchemeGroupVersion)
|
|
if err := runtime.DecodeInto(decoder, body, &el); err != nil {
|
|
t.Fatalf("failed decoding buf: %b, apiVersion: %s", body, auditv1.SchemeGroupVersion)
|
|
}
|
|
defer r.Body.Close()
|
|
a.Store(el)
|
|
w.WriteHeader(200)
|
|
})
|
|
}
|
|
|
|
// defaultTestConfig returns a Config object suitable for testing along with its
|
|
// associated stopChan
|
|
func defaultTestConfig() (*Config, chan struct{}) {
|
|
authWrapper := webhook.AuthenticationInfoResolverWrapper(
|
|
func(a webhook.AuthenticationInfoResolver) webhook.AuthenticationInfoResolver { return a },
|
|
)
|
|
client := fake.NewSimpleClientset()
|
|
informerFactory := informers.NewSharedInformerFactory(client, 0)
|
|
stop := make(chan struct{})
|
|
|
|
eventSink := &v1core.EventSinkImpl{Interface: client.CoreV1().Events("")}
|
|
|
|
informerFactory.Start(stop)
|
|
informerFactory.WaitForCacheSync(stop)
|
|
informer := informerFactory.Auditregistration().V1alpha1().AuditSinks()
|
|
return &Config{
|
|
Informer: informer,
|
|
EventConfig: EventConfig{Sink: eventSink},
|
|
BufferedConfig: NewDefaultWebhookBatchConfig(),
|
|
WebhookConfig: WebhookConfig{
|
|
AuthInfoResolverWrapper: authWrapper,
|
|
ServiceResolver: webhook.NewDefaultServiceResolver(),
|
|
},
|
|
}, stop
|
|
}
|