From 2a56f07c18ea373bffe15b48e7c84e5ddb90a634 Mon Sep 17 00:00:00 2001 From: Gunjan Vyas Date: Thu, 26 May 2022 16:36:05 +0530 Subject: [PATCH] Added delivery options to broker create (#1670) * Added delivery flags * Added test cases * Added client tests * Added unit tests and updated flag usage * Added broker update command * Added unit tests for update command * Added mock and update retry tests --- docs/cmd/kn_broker.md | 1 + docs/cmd/kn_broker_create.md | 12 +- docs/cmd/kn_broker_update.md | 47 ++++ pkg/eventing/v1/client.go | 132 ++++++++++- pkg/eventing/v1/client_mock.go | 16 +- pkg/eventing/v1/client_mock_test.go | 7 + pkg/eventing/v1/client_test.go | 213 ++++++++++++++++++ pkg/kn/commands/broker/broker.go | 1 + pkg/kn/commands/broker/broker_test.go | 48 ++++ pkg/kn/commands/broker/create.go | 25 +- pkg/kn/commands/broker/create_test.go | 81 +++++++ .../commands/broker/delivery_option_flags.go | 60 +++++ pkg/kn/commands/broker/update.go | 115 ++++++++++ pkg/kn/commands/broker/update_test.go | 160 +++++++++++++ 14 files changed, 908 insertions(+), 10 deletions(-) create mode 100644 docs/cmd/kn_broker_update.md create mode 100644 pkg/kn/commands/broker/delivery_option_flags.go create mode 100644 pkg/kn/commands/broker/update.go create mode 100644 pkg/kn/commands/broker/update_test.go diff --git a/docs/cmd/kn_broker.md b/docs/cmd/kn_broker.md index c08b12348..4237f2e71 100644 --- a/docs/cmd/kn_broker.md +++ b/docs/cmd/kn_broker.md @@ -29,4 +29,5 @@ kn broker * [kn broker delete](kn_broker_delete.md) - Delete a broker * [kn broker describe](kn_broker_describe.md) - Describe broker * [kn broker list](kn_broker_list.md) - List brokers +* [kn broker update](kn_broker_update.md) - Update a broker diff --git a/docs/cmd/kn_broker_create.md b/docs/cmd/kn_broker_create.md index 20b997a46..90bf9b17f 100644 --- a/docs/cmd/kn_broker_create.md +++ b/docs/cmd/kn_broker_create.md @@ -21,9 +21,15 @@ kn broker create NAME ### Options ``` - --class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available) - -h, --help help for create - -n, --namespace string Specify the namespace to operate in. + --backoff-delay string The delay before retrying. + --backoff-policy string The retry backoff policy (linear, exponential). + --class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available). + --dl-sink string The sink receiving event that could not be sent to a destination. + -h, --help help for create + -n, --namespace string Specify the namespace to operate in. + --retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. + --retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability. + --timeout string The timeout of each single request. The value must be greater than 0. ``` ### Options inherited from parent commands diff --git a/docs/cmd/kn_broker_update.md b/docs/cmd/kn_broker_update.md new file mode 100644 index 000000000..499344640 --- /dev/null +++ b/docs/cmd/kn_broker_update.md @@ -0,0 +1,47 @@ +## kn broker update + +Update a broker + +``` +kn broker update NAME +``` + +### Examples + +``` + + # Update a broker 'mybroker' in the current namespace with delivery sink svc1 + kn broker update mybroker --dl-sink svc1 + + # Update a broker 'mybroker' in the 'myproject' namespace and with retry 2 seconds + kn broker update mybroker --namespace myproject --retry 2 + +``` + +### Options + +``` + --backoff-delay string The delay before retrying. + --backoff-policy string The retry backoff policy (linear, exponential). + --dl-sink string The sink receiving event that could not be sent to a destination. + -h, --help help for update + -n, --namespace string Specify the namespace to operate in. + --retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink. + --retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability. + --timeout string The timeout of each single request. The value must be greater than 0. +``` + +### Options inherited from parent commands + +``` + --cluster string name of the kubeconfig cluster to use + --config string kn configuration file (default: ~/.config/kn/config.yaml) + --context string name of the kubeconfig context to use + --kubeconfig string kubectl configuration file (default: ~/.kube/config) + --log-http log http traffic +``` + +### SEE ALSO + +* [kn broker](kn_broker.md) - Manage message brokers + diff --git a/pkg/eventing/v1/client.go b/pkg/eventing/v1/client.go index 0d6effb63..2349936ea 100644 --- a/pkg/eventing/v1/client.go +++ b/pkg/eventing/v1/client.go @@ -19,14 +19,13 @@ import ( "fmt" "time" - "knative.dev/client/pkg/config" - - "k8s.io/client-go/util/retry" - apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/util/retry" + "knative.dev/client/pkg/config" + v1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/client/clientset/versioned/scheme" clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" @@ -38,6 +37,7 @@ import ( ) type TriggerUpdateFunc func(origTrigger *eventingv1.Trigger) (*eventingv1.Trigger, error) +type BrokerUpdateFunc func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) // KnEventingClient to Eventing Sources. All methods are relative to the // namespace specified during construction @@ -64,6 +64,10 @@ type KnEventingClient interface { DeleteBroker(ctx context.Context, name string, timeout time.Duration) error // ListBrokers returns list of broker CRDs ListBrokers(ctx context.Context) (*eventingv1.BrokerList, error) + // UpdateBroker is used to update an instance of broker + UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error + // UpdateBrokerWithRetry is used to update an instance of broker + UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error } // KnEventingClient is a combination of Sources client interface and namespace @@ -349,6 +353,45 @@ func (c *knEventingClient) ListBrokers(ctx context.Context) (*eventingv1.BrokerL return brokerListNew, nil } +// UpdateBroker is used to update an instance of broker +func (c *knEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error { + _, err := c.client.Brokers(c.namespace).Update(ctx, broker, meta_v1.UpdateOptions{}) + if err != nil { + return kn_errors.GetError(err) + } + return nil +} + +func (c *knEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error { + return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries) +} + +func updateBrokerWithRetry(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc, nrRetries int) error { + b := config.DefaultRetry + b.Steps = nrRetries + updateBrokerFunc := func() error { + return updateBroker(ctx, c, name, updateFunc) + } + err := retry.RetryOnConflict(b, updateBrokerFunc) + return err +} + +func updateBroker(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc) error { + broker, err := c.GetBroker(ctx, name) + if err != nil { + return err + } + if broker.GetDeletionTimestamp() != nil { + return fmt.Errorf("can't update broker %s because it has been marked for deletion", name) + } + updatedBroker, err := updateFunc(broker.DeepCopy()) + if err != nil { + return err + } + + return c.UpdateBroker(ctx, updatedBroker) +} + // BrokerBuilder is for building the broker type BrokerBuilder struct { broker *eventingv1.Broker @@ -363,6 +406,13 @@ func NewBrokerBuilder(name string) *BrokerBuilder { }} } +// NewBrokerBuilderFromExisting returns broker builder from original broker +func NewBrokerBuilderFromExisting(broker *eventingv1.Broker) *BrokerBuilder { + return &BrokerBuilder{ + broker: broker, + } +} + // WithGvk add the GVK coordinates for read tests func (b *BrokerBuilder) WithGvk() *BrokerBuilder { _ = updateEventingGVK(b.broker) @@ -387,6 +437,80 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder { return b } +// DlSink for the broker builder +func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder { + empty := duckv1.Destination{} + if dlSink == nil || *dlSink == empty { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.DeadLetterSink = dlSink + return b +} + +// Retry for the broker builder +func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder { + if retry == nil || *retry == 0 { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.Retry = retry + return b +} + +// Timeout for the broker builder +func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder { + if timeout == nil || *timeout == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.Timeout = timeout + return b +} + +// BackoffPolicy for the broker builder +func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder { + if policyType == nil || *policyType == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.BackoffPolicy = policyType + return b +} + +// BackoffDelay for the broker builder +func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder { + if backoffDelay == nil || *backoffDelay == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.BackoffDelay = backoffDelay + return b +} + +// RetryAfterMax for the broker builder +func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder { + if max == nil || *max == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.RetryAfterMax = max + return b + +} + // Build to return an instance of broker object func (b *BrokerBuilder) Build() *eventingv1.Broker { return b.broker diff --git a/pkg/eventing/v1/client_mock.go b/pkg/eventing/v1/client_mock.go index d4df61864..e8cea8559 100644 --- a/pkg/eventing/v1/client_mock.go +++ b/pkg/eventing/v1/client_mock.go @@ -104,7 +104,7 @@ func (c *MockKnEventingClient) ListTriggers(context.Context) (*eventingv1.Trigge return call.Result[0].(*eventingv1.TriggerList), mock.ErrorOrNil(call.Result[1]) } -// UpdateTrigger records a call for ListTriggers with the expected result and error (nil if none) +// UpdateTrigger records a call for UpdateTrigger with the expected result and error (nil if none) func (sr *EventingRecorder) UpdateTrigger(trigger interface{}, err error) { sr.r.Add("UpdateTrigger", []interface{}{trigger}, []interface{}{err}) } @@ -163,6 +163,20 @@ func (c *MockKnEventingClient) ListBrokers(context.Context) (*eventingv1.BrokerL return call.Result[0].(*eventingv1.BrokerList), mock.ErrorOrNil(call.Result[1]) } +// UpdateBroker records a call for UpdateBroker with the expected result and error (nil if none) +func (sr *EventingRecorder) UpdateBroker(broker *eventingv1.Broker, err error) { + sr.r.Add("UpdateBroker", []interface{}{broker}, []interface{}{err}) +} + +func (c *MockKnEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error { + call := c.recorder.r.VerifyCall("UpdateBroker") + return mock.ErrorOrNil(call.Result[0]) +} + +func (c *MockKnEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error { + return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries) +} + // Validate validates whether every recorded action has been called func (sr *EventingRecorder) Validate() { sr.r.CheckThatAllRecordedMethodsHaveBeenCalled() diff --git a/pkg/eventing/v1/client_mock_test.go b/pkg/eventing/v1/client_mock_test.go index 0be2e130f..7a84792df 100644 --- a/pkg/eventing/v1/client_mock_test.go +++ b/pkg/eventing/v1/client_mock_test.go @@ -41,6 +41,9 @@ func TestMockKnClient(t *testing.T) { recorder.GetBroker("foo", nil, nil) recorder.DeleteBroker("foo", time.Duration(10)*time.Second, nil) recorder.ListBrokers(nil, nil) + recorder.GetBroker("foo", &eventingv1.Broker{}, nil) + recorder.UpdateBroker(&eventingv1.Broker{}, nil) + recorder.UpdateBroker(&eventingv1.Broker{}, nil) // Call all service ctx := context.Background() @@ -57,6 +60,10 @@ func TestMockKnClient(t *testing.T) { client.GetBroker(ctx, "foo") client.DeleteBroker(ctx, "foo", time.Duration(10)*time.Second) client.ListBrokers(ctx) + client.UpdateBroker(ctx, &eventingv1.Broker{}) + client.UpdateBrokerWithRetry(ctx, "foo", func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) { + return origBroker, nil + }, 10) // Validate recorder.Validate() diff --git a/pkg/eventing/v1/client_test.go b/pkg/eventing/v1/client_test.go index 8cefd7bff..258ec2147 100644 --- a/pkg/eventing/v1/client_test.go +++ b/pkg/eventing/v1/client_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + v1 "knative.dev/eventing/pkg/apis/duck/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -334,6 +335,8 @@ func TestBrokerCreate(t *testing.T) { objNew := newBroker(name) brokerObjWithClass := newBrokerWithClass(name) + brokerObjWithDeliveryOptions := newBrokerWithDeliveryOptions(name) + brokerObjWithNilDeliveryOptions := newBrokerWithNilDeliveryOptions(name) server.AddReactor("create", "brokers", func(a client_testing.Action) (bool, runtime.Object, error) { @@ -360,6 +363,56 @@ func TestBrokerCreate(t *testing.T) { err := client.CreateBroker(context.Background(), newBroker("unknown")) assert.ErrorContains(t, err, "unknown") }) + + t.Run("create broker with delivery options", func(t *testing.T) { + err := client.CreateBroker(context.Background(), brokerObjWithDeliveryOptions) + assert.NilError(t, err) + }) + + t.Run("create broker with nil delivery options", func(t *testing.T) { + err := client.CreateBroker(context.Background(), brokerObjWithNilDeliveryOptions) + assert.NilError(t, err) + }) + + t.Run("create broker with nil delivery spec", func(t *testing.T) { + builderFuncs := []func(builder *BrokerBuilder) *BrokerBuilder{ + func(builder *BrokerBuilder) *BrokerBuilder { + var sink = &duckv1.Destination{ + Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"}, + } + return builder.DlSink(sink) + }, + func(builder *BrokerBuilder) *BrokerBuilder { + var retry int32 = 5 + return builder.Retry(&retry) + }, + func(builder *BrokerBuilder) *BrokerBuilder { + var timeout = "PT5S" + return builder.Timeout(&timeout) + }, + func(builder *BrokerBuilder) *BrokerBuilder { + var policy = v1.BackoffPolicyType("linear") + return builder.BackoffPolicy(&policy) + }, + func(builder *BrokerBuilder) *BrokerBuilder { + var delay = "PT5S" + return builder.BackoffDelay(&delay) + }, + func(builder *BrokerBuilder) *BrokerBuilder { + var max = "PT5S" + return builder.RetryAfterMax(&max) + }, + } + for _, bf := range builderFuncs { + brokerBuilder := NewBrokerBuilder(name) + brokerBuilder.broker.Spec.Delivery = nil + updatedBuilder := bf(brokerBuilder) + + broker := updatedBuilder.Build() + err := client.CreateBroker(context.Background(), broker) + assert.NilError(t, err) + } + }) } func TestBrokerGet(t *testing.T) { @@ -489,6 +542,136 @@ func TestBrokerList(t *testing.T) { }) } +func TestBrokerUpdate(t *testing.T) { + var name = "broker" + server, client := setup() + + obj := newBroker(name) + errorObj := newBroker("error-obj") + updatedObj := newBrokerWithDeliveryOptions(name) + + server.AddReactor("update", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + assert.Equal(t, testNamespace, a.GetNamespace()) + name := a.(client_testing.UpdateAction).GetObject().(metav1.Object).GetName() + if name == "error-obj" { + return true, nil, fmt.Errorf("error while creating broker %s", name) + } + return true, updatedObj, nil + }) + server.AddReactor("get", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + assert.Equal(t, testNamespace, a.GetNamespace()) + return true, obj, nil + }) + + t.Run("update broker without error", func(t *testing.T) { + err := client.UpdateBroker(context.Background(), updatedObj) + assert.NilError(t, err) + }) + + t.Run("create broker with an error returns an error object", func(t *testing.T) { + err := client.UpdateBroker(context.Background(), errorObj) + assert.ErrorContains(t, err, "error while creating broker") + }) +} + +func TestUpdateBrokerWithRetry(t *testing.T) { + serving, client := setup() + var attemptCount, maxAttempts = 0, 5 + serving.AddReactor("get", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + name := a.(client_testing.GetAction).GetName() + if name == "deletedBroker" { + broker := newBroker(name) + now := metav1.Now() + broker.DeletionTimestamp = &now + return true, broker, nil + } + if name == "getErrorBroker" { + return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error")) + } + return true, newBroker(name), nil + }) + + serving.AddReactor("update", "brokers", + func(a client_testing.Action) (bool, runtime.Object, error) { + newBroker := a.(client_testing.UpdateAction).GetObject() + name := newBroker.(metav1.Object).GetName() + + if name == "testBroker" && attemptCount > 0 { + attemptCount-- + return true, nil, errors.NewConflict(eventingv1.Resource("broker"), "errorBroker", fmt.Errorf("error updating because of conflict")) + } + if name == "errorBroker" { + return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error")) + } + return true, NewBrokerBuilderFromExisting(newBroker.(*eventingv1.Broker)).Build(), nil + }) + + t.Run("Update broker successfully without any retries", func(t *testing.T) { + err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.NilError(t, err, "No retries required as no conflict error occurred") + }) + + t.Run("Update broker with retry after max retries", func(t *testing.T) { + attemptCount = maxAttempts - 1 + err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.NilError(t, err, "Update retried %d times and succeeded", maxAttempts) + assert.Equal(t, attemptCount, 0) + }) + + t.Run("Update broker with retry and fail with conflict after exhausting max retries", func(t *testing.T) { + attemptCount = maxAttempts + err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts) + assert.Equal(t, attemptCount, 0) + }) + + t.Run("Update broker with retry and fail with conflict after exhausting max retries", func(t *testing.T) { + attemptCount = maxAttempts + err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts) + assert.Equal(t, attemptCount, 0) + }) + + t.Run("Update broker with retry fails with a non conflict error", func(t *testing.T) { + err := client.UpdateBrokerWithRetry(context.Background(), "errorBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.ErrorType(t, err, errors.IsInternalError) + }) + + t.Run("Update broker with retry fails with resource already deleted error", func(t *testing.T) { + err := client.UpdateBrokerWithRetry(context.Background(), "deletedBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.ErrorContains(t, err, "marked for deletion") + }) + + t.Run("Update broker with retry fails with error from updateFunc", func(t *testing.T) { + err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, fmt.Errorf("error updating object") + }, maxAttempts) + assert.ErrorContains(t, err, "error updating object") + }) + + t.Run("Update broker with retry fails with error from GetBroker", func(t *testing.T) { + err := client.UpdateBrokerWithRetry(context.Background(), "getErrorBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) { + return broker, nil + }, maxAttempts) + assert.ErrorType(t, err, errors.IsInternalError) + }) +} + func newTrigger(name string) *eventingv1.Trigger { return NewTriggerBuilder(name). Namespace(testNamespace). @@ -518,6 +701,36 @@ func newBrokerWithClass(name string) *eventingv1.Broker { Build() } +func newBrokerWithDeliveryOptions(name string) *eventingv1.Broker { + sink := &duckv1.Destination{ + Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"}, + } + testTimeout := "PT10S" + retry := int32(2) + policy := v1.BackoffPolicyType("linear") + return NewBrokerBuilder(name). + Namespace(testNamespace). + DlSink(sink). + Timeout(&testTimeout). + Retry(&retry). + BackoffDelay(&testTimeout). + BackoffPolicy(&policy). + RetryAfterMax(&testTimeout). + Build() +} + +func newBrokerWithNilDeliveryOptions(name string) *eventingv1.Broker { + return NewBrokerBuilder(name). + Namespace(testNamespace). + DlSink(nil). + Timeout(nil). + Retry(nil). + BackoffDelay(nil). + BackoffPolicy(nil). + RetryAfterMax(nil). + Build() +} + func getBrokerDeleteEvents(name string) []watch.Event { return []watch.Event{ {Type: watch.Added, Object: createBrokerWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")}, diff --git a/pkg/kn/commands/broker/broker.go b/pkg/kn/commands/broker/broker.go index b5202f25c..52e45478a 100644 --- a/pkg/kn/commands/broker/broker.go +++ b/pkg/kn/commands/broker/broker.go @@ -33,5 +33,6 @@ func NewBrokerCommand(p *commands.KnParams) *cobra.Command { brokerCmd.AddCommand(NewBrokerDescribeCommand(p)) brokerCmd.AddCommand(NewBrokerDeleteCommand(p)) brokerCmd.AddCommand(NewBrokerListCommand(p)) + brokerCmd.AddCommand(NewBrokerUpdateCommand(p)) return brokerCmd } diff --git a/pkg/kn/commands/broker/broker_test.go b/pkg/kn/commands/broker/broker_test.go index ace84b4c0..94455f239 100644 --- a/pkg/kn/commands/broker/broker_test.go +++ b/pkg/kn/commands/broker/broker_test.go @@ -18,7 +18,13 @@ package broker import ( "bytes" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" + "knative.dev/client/pkg/dynamic" + dynamicfake "knative.dev/client/pkg/dynamic/fake" + v1 "knative.dev/eventing/pkg/apis/duck/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" + servingv1 "knative.dev/serving/pkg/apis/serving/v1" clientv1beta1 "knative.dev/client/pkg/eventing/v1" "knative.dev/client/pkg/kn/commands" @@ -28,6 +34,12 @@ import ( // Helper methods var blankConfig clientcmd.ClientConfig +const ( + testSvc = "test-svc" + testTimeout = "PT10S" + testRetry = int32(5) +) + func init() { var err error blankConfig, err = clientcmd.NewClientConfigFromBytes([]byte(`kind: Config @@ -61,6 +73,14 @@ func executeBrokerCommand(brokerClient clientv1beta1.KnEventingClient, args ...s return brokerClient, nil } + mysvc := &servingv1.Service{ + TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "serving.knative.dev/v1"}, + ObjectMeta: metav1.ObjectMeta{Name: testSvc, Namespace: "default"}, + } + knParams.NewDynamicClient = func(namespace string) (dynamic.KnDynamicClient, error) { + return dynamicfake.CreateFakeKnDynamicClient("default", mysvc), nil + } + cmd := NewBrokerCommand(knParams) cmd.SetArgs(args) cmd.SetOutput(output) @@ -85,3 +105,31 @@ func createBrokerWithNamespace(brokerName, namespace string) *v1beta1.Broker { func createBrokerWithClass(brokerName, class string) *v1beta1.Broker { return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Class(class).Build() } + +func createBrokerWithDlSink(brokerName, service string) *v1beta1.Broker { + sink := &duckv1.Destination{ + Ref: &duckv1.KReference{Name: service, Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"}, + } + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").DlSink(sink).Build() +} + +func createBrokerWithTimeout(brokerName, timeout string) *v1beta1.Broker { + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Timeout(&timeout).Build() +} + +func createBrokerWithRetry(brokerName string, retry int32) *v1beta1.Broker { + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Retry(&retry).Build() +} + +func createBrokerWithBackoffPolicy(brokerName, policy string) *v1beta1.Broker { + boPolicy := v1.BackoffPolicyType(policy) + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffPolicy(&boPolicy).Build() +} + +func createBrokerWithBackoffDelay(brokerName, delay string) *v1beta1.Broker { + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffDelay(&delay).Build() +} + +func createBrokerWithRetryAfterMax(brokerName, timeout string) *v1beta1.Broker { + return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").RetryAfterMax(&timeout).Build() +} diff --git a/pkg/kn/commands/broker/create.go b/pkg/kn/commands/broker/create.go index bb60c2e6c..c90828975 100644 --- a/pkg/kn/commands/broker/create.go +++ b/pkg/kn/commands/broker/create.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/spf13/cobra" + v1 "knative.dev/eventing/pkg/apis/duck/v1" clientv1beta1 "knative.dev/client/pkg/eventing/v1" "knative.dev/client/pkg/kn/commands" @@ -39,6 +40,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { var className string + var deliveryFlags DeliveryOptionFlags cmd := &cobra.Command{ Use: "create NAME", Short: "Create a broker", @@ -59,10 +61,28 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { return err } + dynamicClient, err := p.NewDynamicClient(namespace) + if err != nil { + return err + } + + destination, err := deliveryFlags.GetDlSink(cmd, dynamicClient, namespace) + if err != nil { + return err + } + + backoffPolicy := v1.BackoffPolicyType(deliveryFlags.BackoffPolicy) + brokerBuilder := clientv1beta1. NewBrokerBuilder(name). Namespace(namespace). - Class(className) + Class(className). + DlSink(destination). + Retry(&deliveryFlags.RetryCount). + Timeout(&deliveryFlags.Timeout). + BackoffPolicy(&backoffPolicy). + BackoffDelay(&deliveryFlags.BackoffDelay). + RetryAfterMax(&deliveryFlags.RetryAfterMax) err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build()) if err != nil { @@ -75,6 +95,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { }, } commands.AddNamespaceFlags(cmd.Flags(), false) - cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)") + cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).") + deliveryFlags.Add(cmd) return cmd } diff --git a/pkg/kn/commands/broker/create_test.go b/pkg/kn/commands/broker/create_test.go index 2ff3e7fba..02f0b394f 100644 --- a/pkg/kn/commands/broker/create_test.go +++ b/pkg/kn/commands/broker/create_test.go @@ -68,3 +68,84 @@ func TestBrokerCreateWithError(t *testing.T) { assert.ErrorContains(t, err, "broker create") assert.Assert(t, util.ContainsAll(err.Error(), "broker create", "requires", "name", "argument")) } + +func TestBrokerCreateWithDlSink(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + eventingRecorder.CreateBroker(createBrokerWithDlSink(brokerName, testSvc), nil) + + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--dl-sink", testSvc) + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerCreateWithTimeout(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + eventingRecorder.CreateBroker(createBrokerWithTimeout(brokerName, testTimeout), nil) + + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--timeout", testTimeout) + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerCreateWithRetry(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + eventingRecorder.CreateBroker(createBrokerWithRetry(brokerName, testRetry), nil) + + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--retry", "5") + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerCreateWithBackoffPolicy(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + + policies := []string{"linear", "exponential"} + for _, p := range policies { + eventingRecorder.CreateBroker(createBrokerWithBackoffPolicy(brokerName, p), nil) + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--backoff-policy", p) + + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + } + eventingRecorder.Validate() +} + +func TestBrokerCreateWithBackoffDelay(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + eventingRecorder.CreateBroker(createBrokerWithBackoffDelay(brokerName, testTimeout), nil) + + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--backoff-delay", testTimeout) + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerCreateWithRetryAfterMax(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + eventingRecorder.CreateBroker(createBrokerWithRetryAfterMax(brokerName, testTimeout), nil) + + out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--retry-after-max", testTimeout) + assert.NilError(t, err, "Broker should be created") + assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default")) + + eventingRecorder.Validate() +} diff --git a/pkg/kn/commands/broker/delivery_option_flags.go b/pkg/kn/commands/broker/delivery_option_flags.go new file mode 100644 index 000000000..6a9d5caed --- /dev/null +++ b/pkg/kn/commands/broker/delivery_option_flags.go @@ -0,0 +1,60 @@ +// Copyright © 2022 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import ( + "github.com/spf13/cobra" + "knative.dev/client/pkg/dynamic" + "knative.dev/client/pkg/kn/commands/flags" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +type DeliveryOptionFlags struct { + SinkFlags flags.SinkFlags + RetryCount int32 + Timeout string + BackoffPolicy string + BackoffDelay string + RetryAfterMax string +} + +func (d *DeliveryOptionFlags) Add(cmd *cobra.Command) { + d.SinkFlags.AddWithFlagName(cmd, "dl-sink", "") + cmd.Flag("dl-sink").Usage = "The sink receiving event that could not be sent to a destination." + + cmd.Flags().Int32Var(&d.RetryCount, "retry", 0, "The minimum number of retries the sender should attempt when "+ + "sending an event before moving it to the dead letter sink.") + cmd.Flags().StringVar(&d.Timeout, "timeout", "", "The timeout of each single request. The value must be greater than 0.") + cmd.Flags().StringVar(&d.BackoffPolicy, "backoff-policy", "", "The retry backoff policy (linear, exponential).") + cmd.Flags().StringVar(&d.BackoffDelay, "backoff-delay", "", "The delay before retrying.") + cmd.Flags().StringVar(&d.RetryAfterMax, "retry-after-max", "", "An optional upper bound on the duration specified in a "+ + "\"Retry-After\" header when calculating backoff times for retrying 429 and 503 response codes. "+ + "Setting the value to zero (\"PT0S\") can be used to opt-out of respecting \"Retry-After\" header values altogether. "+ + "This value only takes effect if \"Retry\" is configured, and also depends on specific implementations (Channels, Sources, etc.) "+ + "choosing to provide this capability.") +} + +func (d *DeliveryOptionFlags) GetDlSink(cmd *cobra.Command, dynamicClient dynamic.KnDynamicClient, namespace string) (*duckv1.Destination, error) { + var empty = flags.SinkFlags{} + var destination *duckv1.Destination + var err error + if d.SinkFlags != empty { + destination, err = d.SinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace) + if err != nil { + return nil, err + } + } + return destination, err +} diff --git a/pkg/kn/commands/broker/update.go b/pkg/kn/commands/broker/update.go new file mode 100644 index 000000000..d81ae25cd --- /dev/null +++ b/pkg/kn/commands/broker/update.go @@ -0,0 +1,115 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broker + +import ( + "errors" + "fmt" + + "github.com/spf13/cobra" + "knative.dev/client/pkg/config" + v1 "knative.dev/client/pkg/eventing/v1" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + + "knative.dev/client/pkg/kn/commands" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" +) + +var updateExample = ` + # Update a broker 'mybroker' in the current namespace with delivery sink svc1 + kn broker update mybroker --dl-sink svc1 + + # Update a broker 'mybroker' in the 'myproject' namespace and with retry 2 seconds + kn broker update mybroker --namespace myproject --retry 2 +` + +func NewBrokerUpdateCommand(p *commands.KnParams) *cobra.Command { + var deliveryFlags DeliveryOptionFlags + + cmd := &cobra.Command{ + Use: "update NAME", + Short: "Update a broker", + Example: updateExample, + RunE: func(cmd *cobra.Command, args []string) (err error) { + if len(args) != 1 { + return errors.New("'broker update' requires the broker name given as single argument") + } + name := args[0] + + namespace, err := p.GetNamespace(cmd) + if err != nil { + return err + } + + eventingClient, err := p.NewEventingClient(namespace) + if err != nil { + return err + } + + dynamicClient, err := p.NewDynamicClient(namespace) + if err != nil { + return err + } + + updateFunc := func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) { + b := v1.NewBrokerBuilderFromExisting(origBroker) + if cmd.Flags().Changed("dl-sink") { + destination, err := deliveryFlags.GetDlSink(cmd, dynamicClient, namespace) + if err != nil { + return nil, err + } + b.DlSink(destination) + } + if cmd.Flags().Changed("retry") { + b.Retry(&deliveryFlags.RetryCount) + } + if cmd.Flags().Changed("timeout") { + b.Timeout(&deliveryFlags.Timeout) + } + if cmd.Flags().Changed("backoff-policy") { + backoffPolicy := duckv1.BackoffPolicyType(deliveryFlags.BackoffPolicy) + b.BackoffPolicy(&backoffPolicy) + } + if cmd.Flags().Changed("backoff-delay") { + b.BackoffDelay(&deliveryFlags.BackoffDelay) + } + if cmd.Flags().Changed("retry-after-max") { + b.RetryAfterMax(&deliveryFlags.RetryAfterMax) + } + return b.Build(), nil + } + err = eventingClient.UpdateBrokerWithRetry(cmd.Context(), name, updateFunc, config.DefaultRetry.Steps) + if err == nil { + fmt.Fprintf(cmd.OutOrStdout(), "Broker '%s' updated in namespace '%s'.\n", name, namespace) + } + return err + }, + PreRunE: func(cmd *cobra.Command, args []string) error { + return preCheck(cmd) + }} + commands.AddNamespaceFlags(cmd.Flags(), false) + deliveryFlags.Add(cmd) + return cmd +} + +func preCheck(cmd *cobra.Command) error { + if cmd.Flags().NFlag() == 0 { + return fmt.Errorf("flag(s) not set\nUsage: %s", cmd.Use) + } + + return nil +} diff --git a/pkg/kn/commands/broker/update_test.go b/pkg/kn/commands/broker/update_test.go new file mode 100644 index 000000000..d7adc31b9 --- /dev/null +++ b/pkg/kn/commands/broker/update_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2022 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broker + +import ( + "testing" + + "gotest.tools/v3/assert" + clienteventingv1 "knative.dev/client/pkg/eventing/v1" + "knative.dev/client/pkg/util" +) + +func TestBrokerUpdateWithDlSink(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithDlSink("test-broker", testSvc) + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--dl-sink", testSvc) + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateWithTimeout(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithTimeout("test-broker", "10") + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--timeout", "10") + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateWithRetry(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithRetry("test-broker", 5) + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--retry", "5") + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateWithBackoffPolicy(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithBackoffPolicy("test-broker", "linear") + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--backoff-policy", "linear") + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateWithBackoffDelay(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithBackoffDelay("test-broker", "PT10S") + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--backoff-delay", "PT10S") + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateWithRetryAfterMax(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + updated := createBrokerWithRetryAfterMax("test-broker", "PT10S") + eventingRecorder.GetBroker("test-broker", present, nil) + eventingRecorder.UpdateBroker(updated, nil) + + out, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--retry-after-max", "PT10S") + assert.NilError(t, err, "Broker should be updated") + assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default")) + + eventingRecorder.Validate() +} + +func TestBrokerUpdateError(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + present := createBroker("test-broker") + eventingRecorder.GetBroker("test-broker", present, nil) + + _, err := executeBrokerCommand(eventingClient, "update", "test-broker", + "--dl-sink", "absent-svc") + assert.ErrorContains(t, err, "not found") + eventingRecorder.Validate() +} + +func TestBrokerUpdateErrorNoFlags(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + + _, err := executeBrokerCommand(eventingClient, "update", "test-broker") + assert.ErrorContains(t, err, "flag(s) not set") + eventingRecorder.Validate() +} + +func TestBrokerUpdateErrorNoName(t *testing.T) { + eventingClient := clienteventingv1.NewMockKnEventingClient(t) + + eventingRecorder := eventingClient.Recorder() + + _, err := executeBrokerCommand(eventingClient, "update", "--dl-sink", testSvc) + assert.ErrorContains(t, err, "requires the broker name") + eventingRecorder.Validate() +}