chore(trigger): Cleanup Trigger create API (#561)

* further updates

* chore: Fix calls to apiserver create

* fix namespace issue

* fix
This commit is contained in:
Roland Huß 2019-12-17 22:39:04 +01:00 committed by Knative Prow Robot
parent e793090399
commit b10580ff72
20 changed files with 145 additions and 129 deletions

View File

@ -26,9 +26,8 @@ kn source apiserver create NAME --resource RESOURCE --service-account ACCOUNTNAM
"Ref" sends only the reference to the resource,
"Resource" send the full resource. (default "Ref")
-n, --namespace string Specify the namespace to operate in.
--resource strings Comma seperate Kind:APIVersion:isController list, e.g. Event:v1:true.
"APIVersion" and "isControler" can be omitted.
"APIVersion" is "v1" by default, "isController" is "false" by default.
--resource strings Specification for which events to listen, in the format Kind:APIVersion:isController, e.g. Deployment:apps/v1:true.
"isController" can be omitted and is "false" by default.
--service-account string Name of the service account to use to run this source
-s, --sink string Addressable sink for events
```

View File

@ -26,9 +26,8 @@ kn source apiserver update NAME --resource RESOURCE --service-account ACCOUNTNAM
"Ref" sends only the reference to the resource,
"Resource" send the full resource. (default "Ref")
-n, --namespace string Specify the namespace to operate in.
--resource strings Comma seperate Kind:APIVersion:isController list, e.g. Event:v1:true.
"APIVersion" and "isControler" can be omitted.
"APIVersion" is "v1" by default, "isController" is "false" by default.
--resource strings Specification for which events to listen, in the format Kind:APIVersion:isController, e.g. Deployment:apps/v1:true.
"isController" can be omitted and is "false" by default.
--service-account string Name of the service account to use to run this source
-s, --sink string Addressable sink for events
```

View File

@ -18,12 +18,13 @@ import (
apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kn_errors "knative.dev/client/pkg/errors"
"knative.dev/client/pkg/util"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
client_v1alpha1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
duckv1 "knative.dev/pkg/apis/duck/v1"
kn_errors "knative.dev/client/pkg/errors"
"knative.dev/client/pkg/util"
)
const (
@ -36,7 +37,7 @@ type KnEventingClient interface {
// Namespace in which this client is operating for
Namespace() string
// CreateTrigger is used to create an instance of trigger
CreateTrigger(trigger *v1alpha1.Trigger) (*v1alpha1.Trigger, error)
CreateTrigger(trigger *v1alpha1.Trigger) error
// DeleteTrigger is used to delete an instance of trigger
DeleteTrigger(name string) error
// GetTrigger is used to get an instance of trigger
@ -64,12 +65,12 @@ func NewKnEventingClient(client client_v1alpha1.EventingV1alpha1Interface, names
}
//CreateTrigger is used to create an instance of trigger
func (c *knEventingClient) CreateTrigger(trigger *v1alpha1.Trigger) (*v1alpha1.Trigger, error) {
func (c *knEventingClient) CreateTrigger(trigger *v1alpha1.Trigger) error {
trigger, err := c.client.Triggers(c.namespace).Create(trigger)
if err != nil {
return nil, kn_errors.GetError(err)
return kn_errors.GetError(err)
}
return trigger, nil
return nil
}
//DeleteTrigger is used to delete an instance of trigger
@ -147,50 +148,53 @@ func NewTriggerBuilder(name string) *TriggerBuilder {
}
// NewTriggerBuilderFromExisting for building the object from existing Trigger object
func NewTriggerBuilderFromExisting(tr *v1alpha1.Trigger) *TriggerBuilder {
return &TriggerBuilder{trigger: tr.DeepCopy()}
func NewTriggerBuilderFromExisting(trigger *v1alpha1.Trigger) *TriggerBuilder {
return &TriggerBuilder{trigger: trigger.DeepCopy()}
}
// Namespace for this trigger
func (b *TriggerBuilder) Namespace(ns string) *TriggerBuilder {
b.trigger.Namespace = ns
return b
}
// Subscriber for the trigger to send to (it's a Sink actually)
func (b *TriggerBuilder) Subscriber(subscriber *duckv1.Destination) *TriggerBuilder {
b.trigger.Spec.Subscriber = subscriber
return b
}
// Broker to set the broker of trigger object
func (b *TriggerBuilder) Broker(broker string) *TriggerBuilder {
if broker != "" {
b.trigger.Spec.Broker = broker
}
b.trigger.Spec.Broker = broker
return b
}
// Filters to set the filters of trigger object
func (b *TriggerBuilder) Filters(filters map[string]string) *TriggerBuilder {
if filters != nil {
triggerFilterAttributes := v1alpha1.TriggerFilterAttributes(filters)
b.trigger.Spec.Filter = &v1alpha1.TriggerFilter{
Attributes: &triggerFilterAttributes,
}
func (b *TriggerBuilder) AddFilter(key, value string) *TriggerBuilder {
filter := b.trigger.Spec.Filter
if filter == nil {
filter = &v1alpha1.TriggerFilter{}
b.trigger.Spec.Filter = filter
}
attributes := filter.Attributes
if attributes == nil {
attributes = &v1alpha1.TriggerFilterAttributes{}
filter.Attributes = attributes
}
(*attributes)[key] = value
return b
}
// UpdateFilters to update the filters of trigger object
func (b *TriggerBuilder) UpdateFilters(toUpdate map[string]string, toRemove []string) *TriggerBuilder {
if b.trigger.Spec.Filter == nil {
b.Filters(toUpdate)
func (b *TriggerBuilder) RemoveFilter(key string) *TriggerBuilder {
filter := b.trigger.Spec.Filter
if filter == nil {
return b
}
existing := map[string]string(*b.trigger.Spec.Filter.Attributes)
for key, value := range toUpdate {
existing[key] = value
attributes := filter.Attributes
if attributes == nil {
return b
}
for _, key := range toRemove {
delete(existing, key)
}
b.Filters(existing)
return b
}
// Sink to set the subscriber of trigger object
func (b *TriggerBuilder) Sink(sink *duckv1.Destination) *TriggerBuilder {
b.trigger.Spec.Subscriber = sink
delete(*attributes, key)
return b
}

View File

@ -60,13 +60,13 @@ func (c *MockKnEventingClient) Namespace() string {
// CreateTrigger records a call for CreateCronJobSource with the expected error
func (sr *EventingRecorder) CreateTrigger(trigger interface{}, err error) {
sr.r.Add("CreateTrigger", []interface{}{trigger}, []interface{}{trigger, err})
sr.r.Add("CreateTrigger", []interface{}{trigger}, []interface{}{err})
}
// CreateTrigger performs a previously recorded action
func (c *MockKnEventingClient) CreateTrigger(trigger *v1alpha1.Trigger) (*v1alpha1.Trigger, error) {
func (c *MockKnEventingClient) CreateTrigger(trigger *v1alpha1.Trigger) error {
call := c.recorder.r.VerifyCall("CreateTrigger", trigger)
return call.Result[0].(*v1alpha1.Trigger), mock.ErrorOrNil(call.Result[1])
return mock.ErrorOrNil(call.Result[0])
}
// GetTrigger records a call for GetTrigger with the expected object or error. Either trigger or err should be nil

View File

@ -73,14 +73,12 @@ func TestCreateTrigger(t *testing.T) {
})
t.Run("create trigger without error", func(t *testing.T) {
ins, err := client.CreateTrigger(objNew)
err := client.CreateTrigger(objNew)
assert.NilError(t, err)
assert.Equal(t, ins.Name, name)
assert.Equal(t, ins.Namespace, testNamespace)
})
t.Run("create trigger with an error returns an error object", func(t *testing.T) {
_, err := client.CreateTrigger(newTrigger("unknown"))
err := client.CreateTrigger(newTrigger("unknown"))
assert.ErrorContains(t, err, "unknown")
})
}
@ -130,12 +128,12 @@ func TestListTrigger(t *testing.T) {
func TestTriggerBuilder(t *testing.T) {
a := NewTriggerBuilder("testtrigger")
a.Filters(map[string]string{"type": "foo", "source": "bar"})
a.AddFilter("type", "foo").AddFilter("source", "bar")
t.Run("update filter", func(t *testing.T) {
b := NewTriggerBuilderFromExisting(a.Build())
assert.DeepEqual(t, b.Build(), a.Build())
b.UpdateFilters(map[string]string{"type": "new"}, []string{"source"})
b.AddFilter("type", "new").RemoveFilter("source")
expected := &v1alpha1.TriggerFilter{
Attributes: &v1alpha1.TriggerFilterAttributes{
"type": "new",
@ -147,7 +145,7 @@ func TestTriggerBuilder(t *testing.T) {
t.Run("update filter with only deletions", func(t *testing.T) {
b := NewTriggerBuilderFromExisting(a.Build())
assert.DeepEqual(t, b.Build(), a.Build())
b.UpdateFilters(nil, []string{"source"})
b.RemoveFilter("source")
expected := &v1alpha1.TriggerFilter{
Attributes: &v1alpha1.TriggerFilterAttributes{
"type": "foo",
@ -159,7 +157,7 @@ func TestTriggerBuilder(t *testing.T) {
t.Run("update filter with only updates", func(t *testing.T) {
b := NewTriggerBuilderFromExisting(a.Build())
assert.DeepEqual(t, b.Build(), a.Build())
b.UpdateFilters(map[string]string{"type": "new"}, nil)
b.AddFilter("type", "new")
expected := &v1alpha1.TriggerFilter{
Attributes: &v1alpha1.TriggerFilterAttributes{
"type": "new",
@ -168,14 +166,12 @@ func TestTriggerBuilder(t *testing.T) {
}
assert.DeepEqual(t, expected, b.Build().Spec.Filter)
})
}
func newTrigger(name string) *v1alpha1.Trigger {
b := NewTriggerBuilder(name)
b.Filters(map[string]string{"type": "foo"})
b.Broker("default")
b.trigger.Name = name
b.trigger.Namespace = testNamespace
return b.Build()
return NewTriggerBuilder(name).
Namespace(testNamespace).
Broker("default").
AddFilter("type", "foo").
Build()
}

View File

@ -61,22 +61,27 @@ func NewAPIServerCreateCommand(p *commands.KnParams) *cobra.Command {
if err != nil {
return fmt.Errorf(
"cannot create ApiServerSource '%s' in namespace '%s' "+
"because %s", name, namespace, err)
"because: %s", name, namespace, err)
}
// create
resources, err := updateFlags.GetAPIServerResourceArray()
if err != nil {
return err
}
err = apiSourceClient.CreateAPIServerSource(
v1alpha1.NewAPIServerSourceBuilder(name).
Resources(updateFlags.GetAPIServerResourceArray()).
ServiceAccount(updateFlags.ServiceAccountName).
Mode(updateFlags.Mode).
Resources(*resources).
Sink(objectRef).
Build())
if err != nil {
return fmt.Errorf(
"cannot create ApiServerSource '%s' in namespace '%s' "+
"because %s", name, namespace, err)
"because: %s", name, namespace, err)
}
if err == nil {

View File

@ -28,8 +28,8 @@ func TestGetAPIServerResourceArray(t *testing.T) {
Mode: "Ref",
Resources: []string{"Service:serving.knative.dev/v1alpha1:true"},
}
created := createFlag.GetAPIServerResourceArray()
wanted := []sources_v1alpha1.ApiServerResource{{
created, _ := createFlag.GetAPIServerResourceArray()
wanted := &[]sources_v1alpha1.ApiServerResource{{
APIVersion: "serving.knative.dev/v1alpha1",
Kind: "Service",
Controller: true,
@ -42,8 +42,8 @@ func TestGetAPIServerResourceArray(t *testing.T) {
Mode: "Ref",
Resources: []string{"Service:serving.knative.dev/v1alpha1"},
}
created = createFlag.GetAPIServerResourceArray()
wanted = []sources_v1alpha1.ApiServerResource{{
created, _ = createFlag.GetAPIServerResourceArray()
wanted = &[]sources_v1alpha1.ApiServerResource{{
APIVersion: "serving.knative.dev/v1alpha1",
Kind: "Service",
Controller: false,
@ -54,10 +54,10 @@ func TestGetAPIServerResourceArray(t *testing.T) {
createFlag = APIServerSourceUpdateFlags{
ServiceAccountName: "test-sa",
Mode: "Ref",
Resources: []string{"Service"},
Resources: []string{"Service:v1"},
}
created = createFlag.GetAPIServerResourceArray()
wanted = []sources_v1alpha1.ApiServerResource{{
created, _ = createFlag.GetAPIServerResourceArray()
wanted = &[]sources_v1alpha1.ApiServerResource{{
APIVersion: "v1",
Kind: "Service",
Controller: false,
@ -71,8 +71,8 @@ func TestGetAPIServerResourceArray(t *testing.T) {
Mode: "Resource",
Resources: []string{"Event:v1:true", "Pod:v2:false"},
}
created := createFlag.GetAPIServerResourceArray()
wanted := []sources_v1alpha1.ApiServerResource{{
created, _ := createFlag.GetAPIServerResourceArray()
wanted := &[]sources_v1alpha1.ApiServerResource{{
APIVersion: "v1",
Kind: "Event",
Controller: true,
@ -87,11 +87,11 @@ func TestGetAPIServerResourceArray(t *testing.T) {
createFlag = APIServerSourceUpdateFlags{
ServiceAccountName: "test-sa",
Mode: "Resource",
Resources: []string{"Event", "Pod"},
Resources: []string{"Event:v1", "Pod:v1"},
}
created = createFlag.GetAPIServerResourceArray()
created, _ = createFlag.GetAPIServerResourceArray()
wanted = []sources_v1alpha1.ApiServerResource{{
wanted = &[]sources_v1alpha1.ApiServerResource{{
APIVersion: "v1",
Kind: "Event",
Controller: false,

View File

@ -52,7 +52,7 @@ func TestSinkNotFoundError(t *testing.T) {
servingClient := knserving_client.NewMockKnServiceClient(t)
apiServerClient := knsources_v1alpha1.NewMockKnAPIServerSourceClient(t)
errorMsg := "cannot create ApiServerSource 'testsource' in namespace 'default' because no Service svc found"
errorMsg := "cannot create ApiServerSource 'testsource' in namespace 'default' because: no Service svc found"
servingRecorder := servingClient.Recorder()
servingRecorder.GetService("testsvc", nil, errors.New("no Service svc found"))

View File

@ -31,7 +31,6 @@ import (
const (
apiVersionSplitChar = ":"
defaultAPIVersion = "v1"
)
// APIServerSourceUpdateFlags are flags for create and update a ApiServerSource
@ -41,38 +40,47 @@ type APIServerSourceUpdateFlags struct {
Resources []string
}
type resourceSpec struct {
kind string
apiVersion string
isController bool
}
// GetAPIServerResourceArray is to return an array of ApiServerResource from a string. A sample is Event:v1:true,Pod:v2:false
func (f *APIServerSourceUpdateFlags) GetAPIServerResourceArray() []v1alpha1.ApiServerResource {
func (f *APIServerSourceUpdateFlags) GetAPIServerResourceArray() (*[]v1alpha1.ApiServerResource, error) {
var resourceList []v1alpha1.ApiServerResource
for _, r := range f.Resources {
version, kind, controller := getValidResource(r)
resourceSpec, err := getValidResource(r)
if err != nil {
return nil, err
}
resourceRef := v1alpha1.ApiServerResource{
APIVersion: version,
Kind: kind,
Controller: controller,
APIVersion: resourceSpec.apiVersion,
Kind: resourceSpec.kind,
Controller: resourceSpec.isController,
}
resourceList = append(resourceList, resourceRef)
}
return resourceList
return &resourceList, nil
}
func getValidResource(resource string) (string, string, bool) {
var version = defaultAPIVersion // v1 as default
var isController = false //false as default
func getValidResource(resource string) (*resourceSpec, error) {
var isController = false //false as default
var err error
parts := strings.Split(resource, apiVersionSplitChar)
kind := parts[0]
if len(parts) >= 2 {
version = parts[1]
if len(parts) < 2 {
return nil, fmt.Errorf("no APIVersion given for resource %s", resource)
}
version := parts[1]
if len(parts) >= 3 {
isController, err = strconv.ParseBool(parts[2])
if err != nil {
isController = false
return nil, fmt.Errorf("cannot parse controller flage in resource specification %s", resource)
}
}
return version, kind, isController
return &resourceSpec{apiVersion: version, kind: kind, isController: isController}, nil
}
//Add is to set parameters
@ -90,9 +98,8 @@ func (f *APIServerSourceUpdateFlags) Add(cmd *cobra.Command) {
cmd.Flags().StringSliceVar(&f.Resources,
"resource",
nil,
`Comma seperate Kind:APIVersion:isController list, e.g. Event:v1:true.
"APIVersion" and "isControler" can be omitted.
"APIVersion" is "v1" by default, "isController" is "false" by default.`)
`Specification for which events to listen, in the format Kind:APIVersion:isController, e.g. Deployment:apps/v1:true.
"isController" can be omitted and is "false" by default.`)
}
// APIServerSourceListHandlers handles printing human readable table for `kn source apiserver list` command's output

View File

@ -74,7 +74,11 @@ func NewAPIServerUpdateCommand(p *commands.KnParams) *cobra.Command {
}
if cmd.Flags().Changed("resource") {
b.Resources(apiServerUpdateFlags.GetAPIServerResourceArray())
resources, err := apiServerUpdateFlags.GetAPIServerResourceArray()
if err != nil {
return err
}
b.Resources(*resources)
}
if cmd.Flags().Changed("sink") {

View File

@ -39,7 +39,7 @@ func NewCronJobCreateCommand(p *commands.KnParams) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) (err error) {
if len(args) != 1 {
return errors.New("requires the name of the crobjob source to create as single argument")
return errors.New("requires the name of the crojob source to create as single argument")
}
name := args[0]

View File

@ -32,7 +32,7 @@ func NewCronJobDeleteCommand(p *commands.KnParams) *cobra.Command {
kn source cronjob delete my-cron-trigger`,
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
return errors.New("'requires the name of the crobjob source to delete as single argument")
return errors.New("'requires the name of the cronjob source to delete as single argument")
}
name := args[0]

View File

@ -64,7 +64,7 @@ func NewTriggerCreateCommand(p *commands.KnParams) *cobra.Command {
if err != nil {
return fmt.Errorf(
"cannot create trigger '%s' in namespace '%s' "+
"because %s", name, namespace, err)
"because: %s", name, namespace, err)
}
filters, err := triggerUpdateFlags.GetFilters()
@ -85,11 +85,11 @@ func NewTriggerCreateCommand(p *commands.KnParams) *cobra.Command {
URI: objectRef.URI,
}
_, err = eventingClient.CreateTrigger(trigger)
err = eventingClient.CreateTrigger(trigger)
if err != nil {
return fmt.Errorf(
"cannot create trigger '%s' in namespace '%s' "+
"because %s", name, namespace, err)
"because: %s", name, namespace, err)
}
fmt.Fprintf(cmd.OutOrStdout(), "Trigger '%s' successfully created in namespace '%s'.\n", args[0], namespace)
return nil

View File

@ -57,7 +57,7 @@ func TestSinkNotFoundError(t *testing.T) {
eventingClient := eventing_client.NewMockKnEventingClient(t)
servingClient := knserving_client.NewMockKnServiceClient(t)
errorMsg := fmt.Sprintf("cannot create trigger '%s' in namespace 'default' because no Service mysvc found", triggerName)
errorMsg := fmt.Sprintf("cannot create trigger '%s' in namespace 'default' because: no Service mysvc found", triggerName)
servingRecorder := servingClient.Recorder()
servingRecorder.GetService("mysvc", nil, errors.New("no Service mysvc found"))

View File

@ -18,15 +18,15 @@ import (
"bytes"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
eventc_v1alpha1 "knative.dev/client/pkg/eventing/v1alpha1"
"knative.dev/client/pkg/kn/commands"
serving_client_v1alpha1 "knative.dev/client/pkg/serving/v1alpha1"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
// Helper methods
@ -77,26 +77,21 @@ func executeTriggerCommand(triggerClient eventc_v1alpha1.KnEventingClient, servi
}
func createTrigger(namespace string, name string, filters map[string]string, broker string, svcname string) *v1alpha1.Trigger {
triggerFilterAttributes := v1alpha1.TriggerFilterAttributes(filters)
wanted := &v1alpha1.Trigger{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: v1alpha1.TriggerSpec{
Broker: broker,
Filter: &v1alpha1.TriggerFilter{
Attributes: &triggerFilterAttributes,
},
Subscriber: &duckv1.Destination{
Ref: &corev1.ObjectReference{
Name: svcname,
Kind: "Service",
},
},
},
triggerBuilder := eventc_v1alpha1.NewTriggerBuilder(name).
Namespace(namespace).
Broker(broker)
for k, v := range filters {
triggerBuilder.AddFilter(k, v)
}
return wanted
triggerBuilder.Subscriber(&duckv1.Destination{
Ref: &corev1.ObjectReference{
Name: svcname,
Kind: "Service",
},
})
return triggerBuilder.Build()
}
func createTriggerWithStatus(namespace string, name string, filters map[string]string, broker string, svcname string) *v1alpha1.Trigger {

View File

@ -83,14 +83,19 @@ func NewTriggerUpdateCommand(p *commands.KnParams) *cobra.Command {
return fmt.Errorf(
"cannot update trigger '%s' because %s", name, err)
}
b.UpdateFilters(updated, removed)
for k, v := range updated {
b.AddFilter(k, v)
}
for _, k := range removed {
b.RemoveFilter(k)
}
}
if cmd.Flags().Changed("sink") {
destination, err := sinkFlags.ResolveSink(servingClient)
if err != nil {
return err
}
b.Sink(&duckv1.Destination{
b.Subscriber(&duckv1.Destination{
Ref: destination.Ref,
URI: destination.URI,
})

View File

@ -63,7 +63,7 @@ func (f *TriggerUpdateFlags) GetFilters() (map[string]string, error) {
// GetFilter to return a map type of filters
func (f *TriggerUpdateFlags) GetUpdateFilters() (map[string]string, []string, error) {
filters := map[string]string{}
removes := []string{}
var removes []string
for _, item := range f.Filters {
if strings.HasSuffix(item, "-") {
removes = append(removes, item[:len(item)-1])

View File

@ -33,6 +33,7 @@ var apiVersions = map[string][]string{
},
"eventing": {
"sources.eventing.knative.dev/v1alpha1 (knative-eventing v0.11.0)",
"eventing.knative.dev/v1alpha1 (knative-eventing v0.11.0)",
},
}

View File

@ -39,6 +39,7 @@ Supported APIs:
- serving.knative.dev/v1alpha1 (knative-serving v0.11.0)
* Eventing
- sources.eventing.knative.dev/v1alpha1 (knative-eventing v0.11.0)
- eventing.knative.dev/v1alpha1 (knative-eventing v0.11.0)
`
const (

View File

@ -36,7 +36,7 @@ func TestSourceApiServer(t *testing.T) {
t.Run("create apiserver sources with a sink to a service", func(t *testing.T) {
test.apiServerSourceCreate(t, "testapisource0", "Event:v1:true", "testsa", "svc:testsvc0")
test.apiServerSourceCreate(t, "testapisource1", "Event", "testsa", "svc:testsvc0")
test.apiServerSourceCreate(t, "testapisource1", "Event:v1", "testsa", "svc:testsvc0")
})
t.Run("delete apiserver sources", func(t *testing.T) {
@ -49,7 +49,7 @@ func TestSourceApiServer(t *testing.T) {
})
t.Run("update apiserver source sink service", func(t *testing.T) {
test.apiServerSourceCreate(t, "testapisource3", "Event:1:true", "testsa", "svc:testsvc0")
test.apiServerSourceCreate(t, "testapisource3", "Event:v1:true", "testsa", "svc:testsvc0")
test.serviceCreate(t, "testsvc1")
test.apiServerSourceUpdateSink(t, "testapisource3", "svc:testsvc1")
jpSinkRefNameInSpec := "jsonpath={.spec.sink.ref.name}"
@ -82,7 +82,7 @@ func (test *e2eTest) apiServerSourceDelete(t *testing.T, sourceName string) {
func (test *e2eTest) setupServiceAccountForApiserver(t *testing.T, name string) {
kubectl := kubectl{t, Logger{}}
_, err := kubectl.RunWithOpts([]string{"create", "serviceaccount", name}, runOpts{})
_, err := kubectl.RunWithOpts([]string{"create", "serviceaccount", name, "--namespace", test.kn.namespace}, runOpts{})
if err != nil {
t.Fatalf(fmt.Sprintf("Error executing 'kubectl create serviceaccount test-sa'. Error: %s", err.Error()))
}