Added delivery options to broker create (#1670)

* Added delivery flags

* Added test cases

* Added client tests

* Added unit tests and updated flag usage

* Added broker update command

* Added unit tests for update command

* Added mock and update retry tests
This commit is contained in:
Gunjan Vyas 2022-05-26 16:36:05 +05:30 committed by GitHub
parent 4df601027b
commit 2a56f07c18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 908 additions and 10 deletions

View File

@ -29,4 +29,5 @@ kn broker
* [kn broker delete](kn_broker_delete.md) - Delete a broker * [kn broker delete](kn_broker_delete.md) - Delete a broker
* [kn broker describe](kn_broker_describe.md) - Describe broker * [kn broker describe](kn_broker_describe.md) - Describe broker
* [kn broker list](kn_broker_list.md) - List brokers * [kn broker list](kn_broker_list.md) - List brokers
* [kn broker update](kn_broker_update.md) - Update a broker

View File

@ -21,9 +21,15 @@ kn broker create NAME
### Options ### Options
``` ```
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available) --backoff-delay string The delay before retrying.
--backoff-policy string The retry backoff policy (linear, exponential).
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).
--dl-sink string The sink receiving event that could not be sent to a destination.
-h, --help help for create -h, --help help for create
-n, --namespace string Specify the namespace to operate in. -n, --namespace string Specify the namespace to operate in.
--retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
--retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability.
--timeout string The timeout of each single request. The value must be greater than 0.
``` ```
### Options inherited from parent commands ### Options inherited from parent commands

View File

@ -0,0 +1,47 @@
## kn broker update
Update a broker
```
kn broker update NAME
```
### Examples
```
# Update a broker 'mybroker' in the current namespace with delivery sink svc1
kn broker update mybroker --dl-sink svc1
# Update a broker 'mybroker' in the 'myproject' namespace and with retry 2 seconds
kn broker update mybroker --namespace myproject --retry 2
```
### Options
```
--backoff-delay string The delay before retrying.
--backoff-policy string The retry backoff policy (linear, exponential).
--dl-sink string The sink receiving event that could not be sent to a destination.
-h, --help help for update
-n, --namespace string Specify the namespace to operate in.
--retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
--retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability.
--timeout string The timeout of each single request. The value must be greater than 0.
```
### Options inherited from parent commands
```
--cluster string name of the kubeconfig cluster to use
--config string kn configuration file (default: ~/.config/kn/config.yaml)
--context string name of the kubeconfig context to use
--kubeconfig string kubectl configuration file (default: ~/.kube/config)
--log-http log http traffic
```
### SEE ALSO
* [kn broker](kn_broker.md) - Manage message brokers

View File

@ -19,14 +19,13 @@ import (
"fmt" "fmt"
"time" "time"
"knative.dev/client/pkg/config"
"k8s.io/client-go/util/retry"
apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"knative.dev/client/pkg/config"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme" "knative.dev/eventing/pkg/client/clientset/versioned/scheme"
clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
@ -38,6 +37,7 @@ import (
) )
type TriggerUpdateFunc func(origTrigger *eventingv1.Trigger) (*eventingv1.Trigger, error) type TriggerUpdateFunc func(origTrigger *eventingv1.Trigger) (*eventingv1.Trigger, error)
type BrokerUpdateFunc func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error)
// KnEventingClient to Eventing Sources. All methods are relative to the // KnEventingClient to Eventing Sources. All methods are relative to the
// namespace specified during construction // namespace specified during construction
@ -64,6 +64,10 @@ type KnEventingClient interface {
DeleteBroker(ctx context.Context, name string, timeout time.Duration) error DeleteBroker(ctx context.Context, name string, timeout time.Duration) error
// ListBrokers returns list of broker CRDs // ListBrokers returns list of broker CRDs
ListBrokers(ctx context.Context) (*eventingv1.BrokerList, error) ListBrokers(ctx context.Context) (*eventingv1.BrokerList, error)
// UpdateBroker is used to update an instance of broker
UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error
// UpdateBrokerWithRetry is used to update an instance of broker
UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error
} }
// KnEventingClient is a combination of Sources client interface and namespace // KnEventingClient is a combination of Sources client interface and namespace
@ -349,6 +353,45 @@ func (c *knEventingClient) ListBrokers(ctx context.Context) (*eventingv1.BrokerL
return brokerListNew, nil return brokerListNew, nil
} }
// UpdateBroker is used to update an instance of broker
func (c *knEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error {
_, err := c.client.Brokers(c.namespace).Update(ctx, broker, meta_v1.UpdateOptions{})
if err != nil {
return kn_errors.GetError(err)
}
return nil
}
func (c *knEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries)
}
func updateBrokerWithRetry(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
b := config.DefaultRetry
b.Steps = nrRetries
updateBrokerFunc := func() error {
return updateBroker(ctx, c, name, updateFunc)
}
err := retry.RetryOnConflict(b, updateBrokerFunc)
return err
}
func updateBroker(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc) error {
broker, err := c.GetBroker(ctx, name)
if err != nil {
return err
}
if broker.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update broker %s because it has been marked for deletion", name)
}
updatedBroker, err := updateFunc(broker.DeepCopy())
if err != nil {
return err
}
return c.UpdateBroker(ctx, updatedBroker)
}
// BrokerBuilder is for building the broker // BrokerBuilder is for building the broker
type BrokerBuilder struct { type BrokerBuilder struct {
broker *eventingv1.Broker broker *eventingv1.Broker
@ -363,6 +406,13 @@ func NewBrokerBuilder(name string) *BrokerBuilder {
}} }}
} }
// NewBrokerBuilderFromExisting returns broker builder from original broker
func NewBrokerBuilderFromExisting(broker *eventingv1.Broker) *BrokerBuilder {
return &BrokerBuilder{
broker: broker,
}
}
// WithGvk add the GVK coordinates for read tests // WithGvk add the GVK coordinates for read tests
func (b *BrokerBuilder) WithGvk() *BrokerBuilder { func (b *BrokerBuilder) WithGvk() *BrokerBuilder {
_ = updateEventingGVK(b.broker) _ = updateEventingGVK(b.broker)
@ -387,6 +437,80 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder {
return b return b
} }
// DlSink for the broker builder
func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder {
empty := duckv1.Destination{}
if dlSink == nil || *dlSink == empty {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.DeadLetterSink = dlSink
return b
}
// Retry for the broker builder
func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder {
if retry == nil || *retry == 0 {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Retry = retry
return b
}
// Timeout for the broker builder
func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder {
if timeout == nil || *timeout == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Timeout = timeout
return b
}
// BackoffPolicy for the broker builder
func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder {
if policyType == nil || *policyType == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffPolicy = policyType
return b
}
// BackoffDelay for the broker builder
func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder {
if backoffDelay == nil || *backoffDelay == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffDelay = backoffDelay
return b
}
// RetryAfterMax for the broker builder
func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder {
if max == nil || *max == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.RetryAfterMax = max
return b
}
// Build to return an instance of broker object // Build to return an instance of broker object
func (b *BrokerBuilder) Build() *eventingv1.Broker { func (b *BrokerBuilder) Build() *eventingv1.Broker {
return b.broker return b.broker

View File

@ -104,7 +104,7 @@ func (c *MockKnEventingClient) ListTriggers(context.Context) (*eventingv1.Trigge
return call.Result[0].(*eventingv1.TriggerList), mock.ErrorOrNil(call.Result[1]) return call.Result[0].(*eventingv1.TriggerList), mock.ErrorOrNil(call.Result[1])
} }
// UpdateTrigger records a call for ListTriggers with the expected result and error (nil if none) // UpdateTrigger records a call for UpdateTrigger with the expected result and error (nil if none)
func (sr *EventingRecorder) UpdateTrigger(trigger interface{}, err error) { func (sr *EventingRecorder) UpdateTrigger(trigger interface{}, err error) {
sr.r.Add("UpdateTrigger", []interface{}{trigger}, []interface{}{err}) sr.r.Add("UpdateTrigger", []interface{}{trigger}, []interface{}{err})
} }
@ -163,6 +163,20 @@ func (c *MockKnEventingClient) ListBrokers(context.Context) (*eventingv1.BrokerL
return call.Result[0].(*eventingv1.BrokerList), mock.ErrorOrNil(call.Result[1]) return call.Result[0].(*eventingv1.BrokerList), mock.ErrorOrNil(call.Result[1])
} }
// UpdateBroker records a call for UpdateBroker with the expected result and error (nil if none)
func (sr *EventingRecorder) UpdateBroker(broker *eventingv1.Broker, err error) {
sr.r.Add("UpdateBroker", []interface{}{broker}, []interface{}{err})
}
func (c *MockKnEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error {
call := c.recorder.r.VerifyCall("UpdateBroker")
return mock.ErrorOrNil(call.Result[0])
}
func (c *MockKnEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries)
}
// Validate validates whether every recorded action has been called // Validate validates whether every recorded action has been called
func (sr *EventingRecorder) Validate() { func (sr *EventingRecorder) Validate() {
sr.r.CheckThatAllRecordedMethodsHaveBeenCalled() sr.r.CheckThatAllRecordedMethodsHaveBeenCalled()

View File

@ -41,6 +41,9 @@ func TestMockKnClient(t *testing.T) {
recorder.GetBroker("foo", nil, nil) recorder.GetBroker("foo", nil, nil)
recorder.DeleteBroker("foo", time.Duration(10)*time.Second, nil) recorder.DeleteBroker("foo", time.Duration(10)*time.Second, nil)
recorder.ListBrokers(nil, nil) recorder.ListBrokers(nil, nil)
recorder.GetBroker("foo", &eventingv1.Broker{}, nil)
recorder.UpdateBroker(&eventingv1.Broker{}, nil)
recorder.UpdateBroker(&eventingv1.Broker{}, nil)
// Call all service // Call all service
ctx := context.Background() ctx := context.Background()
@ -57,6 +60,10 @@ func TestMockKnClient(t *testing.T) {
client.GetBroker(ctx, "foo") client.GetBroker(ctx, "foo")
client.DeleteBroker(ctx, "foo", time.Duration(10)*time.Second) client.DeleteBroker(ctx, "foo", time.Duration(10)*time.Second)
client.ListBrokers(ctx) client.ListBrokers(ctx)
client.UpdateBroker(ctx, &eventingv1.Broker{})
client.UpdateBrokerWithRetry(ctx, "foo", func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) {
return origBroker, nil
}, 10)
// Validate // Validate
recorder.Validate() recorder.Validate()

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -334,6 +335,8 @@ func TestBrokerCreate(t *testing.T) {
objNew := newBroker(name) objNew := newBroker(name)
brokerObjWithClass := newBrokerWithClass(name) brokerObjWithClass := newBrokerWithClass(name)
brokerObjWithDeliveryOptions := newBrokerWithDeliveryOptions(name)
brokerObjWithNilDeliveryOptions := newBrokerWithNilDeliveryOptions(name)
server.AddReactor("create", "brokers", server.AddReactor("create", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) { func(a client_testing.Action) (bool, runtime.Object, error) {
@ -360,6 +363,56 @@ func TestBrokerCreate(t *testing.T) {
err := client.CreateBroker(context.Background(), newBroker("unknown")) err := client.CreateBroker(context.Background(), newBroker("unknown"))
assert.ErrorContains(t, err, "unknown") assert.ErrorContains(t, err, "unknown")
}) })
t.Run("create broker with delivery options", func(t *testing.T) {
err := client.CreateBroker(context.Background(), brokerObjWithDeliveryOptions)
assert.NilError(t, err)
})
t.Run("create broker with nil delivery options", func(t *testing.T) {
err := client.CreateBroker(context.Background(), brokerObjWithNilDeliveryOptions)
assert.NilError(t, err)
})
t.Run("create broker with nil delivery spec", func(t *testing.T) {
builderFuncs := []func(builder *BrokerBuilder) *BrokerBuilder{
func(builder *BrokerBuilder) *BrokerBuilder {
var sink = &duckv1.Destination{
Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
return builder.DlSink(sink)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var retry int32 = 5
return builder.Retry(&retry)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var timeout = "PT5S"
return builder.Timeout(&timeout)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var policy = v1.BackoffPolicyType("linear")
return builder.BackoffPolicy(&policy)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var delay = "PT5S"
return builder.BackoffDelay(&delay)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var max = "PT5S"
return builder.RetryAfterMax(&max)
},
}
for _, bf := range builderFuncs {
brokerBuilder := NewBrokerBuilder(name)
brokerBuilder.broker.Spec.Delivery = nil
updatedBuilder := bf(brokerBuilder)
broker := updatedBuilder.Build()
err := client.CreateBroker(context.Background(), broker)
assert.NilError(t, err)
}
})
} }
func TestBrokerGet(t *testing.T) { func TestBrokerGet(t *testing.T) {
@ -489,6 +542,136 @@ func TestBrokerList(t *testing.T) {
}) })
} }
func TestBrokerUpdate(t *testing.T) {
var name = "broker"
server, client := setup()
obj := newBroker(name)
errorObj := newBroker("error-obj")
updatedObj := newBrokerWithDeliveryOptions(name)
server.AddReactor("update", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
assert.Equal(t, testNamespace, a.GetNamespace())
name := a.(client_testing.UpdateAction).GetObject().(metav1.Object).GetName()
if name == "error-obj" {
return true, nil, fmt.Errorf("error while creating broker %s", name)
}
return true, updatedObj, nil
})
server.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
assert.Equal(t, testNamespace, a.GetNamespace())
return true, obj, nil
})
t.Run("update broker without error", func(t *testing.T) {
err := client.UpdateBroker(context.Background(), updatedObj)
assert.NilError(t, err)
})
t.Run("create broker with an error returns an error object", func(t *testing.T) {
err := client.UpdateBroker(context.Background(), errorObj)
assert.ErrorContains(t, err, "error while creating broker")
})
}
func TestUpdateBrokerWithRetry(t *testing.T) {
serving, client := setup()
var attemptCount, maxAttempts = 0, 5
serving.AddReactor("get", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.GetAction).GetName()
if name == "deletedBroker" {
broker := newBroker(name)
now := metav1.Now()
broker.DeletionTimestamp = &now
return true, broker, nil
}
if name == "getErrorBroker" {
return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error"))
}
return true, newBroker(name), nil
})
serving.AddReactor("update", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
newBroker := a.(client_testing.UpdateAction).GetObject()
name := newBroker.(metav1.Object).GetName()
if name == "testBroker" && attemptCount > 0 {
attemptCount--
return true, nil, errors.NewConflict(eventingv1.Resource("broker"), "errorBroker", fmt.Errorf("error updating because of conflict"))
}
if name == "errorBroker" {
return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error"))
}
return true, NewBrokerBuilderFromExisting(newBroker.(*eventingv1.Broker)).Build(), nil
})
t.Run("Update broker successfully without any retries", func(t *testing.T) {
err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.NilError(t, err, "No retries required as no conflict error occurred")
})
t.Run("Update broker with retry after max retries", func(t *testing.T) {
attemptCount = maxAttempts - 1
err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.NilError(t, err, "Update retried %d times and succeeded", maxAttempts)
assert.Equal(t, attemptCount, 0)
})
t.Run("Update broker with retry and fail with conflict after exhausting max retries", func(t *testing.T) {
attemptCount = maxAttempts
err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts)
assert.Equal(t, attemptCount, 0)
})
t.Run("Update broker with retry and fail with conflict after exhausting max retries", func(t *testing.T) {
attemptCount = maxAttempts
err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts)
assert.Equal(t, attemptCount, 0)
})
t.Run("Update broker with retry fails with a non conflict error", func(t *testing.T) {
err := client.UpdateBrokerWithRetry(context.Background(), "errorBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsInternalError)
})
t.Run("Update broker with retry fails with resource already deleted error", func(t *testing.T) {
err := client.UpdateBrokerWithRetry(context.Background(), "deletedBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.ErrorContains(t, err, "marked for deletion")
})
t.Run("Update broker with retry fails with error from updateFunc", func(t *testing.T) {
err := client.UpdateBrokerWithRetry(context.Background(), "testBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, fmt.Errorf("error updating object")
}, maxAttempts)
assert.ErrorContains(t, err, "error updating object")
})
t.Run("Update broker with retry fails with error from GetBroker", func(t *testing.T) {
err := client.UpdateBrokerWithRetry(context.Background(), "getErrorBroker", func(broker *eventingv1.Broker) (*eventingv1.Broker, error) {
return broker, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsInternalError)
})
}
func newTrigger(name string) *eventingv1.Trigger { func newTrigger(name string) *eventingv1.Trigger {
return NewTriggerBuilder(name). return NewTriggerBuilder(name).
Namespace(testNamespace). Namespace(testNamespace).
@ -518,6 +701,36 @@ func newBrokerWithClass(name string) *eventingv1.Broker {
Build() Build()
} }
func newBrokerWithDeliveryOptions(name string) *eventingv1.Broker {
sink := &duckv1.Destination{
Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
testTimeout := "PT10S"
retry := int32(2)
policy := v1.BackoffPolicyType("linear")
return NewBrokerBuilder(name).
Namespace(testNamespace).
DlSink(sink).
Timeout(&testTimeout).
Retry(&retry).
BackoffDelay(&testTimeout).
BackoffPolicy(&policy).
RetryAfterMax(&testTimeout).
Build()
}
func newBrokerWithNilDeliveryOptions(name string) *eventingv1.Broker {
return NewBrokerBuilder(name).
Namespace(testNamespace).
DlSink(nil).
Timeout(nil).
Retry(nil).
BackoffDelay(nil).
BackoffPolicy(nil).
RetryAfterMax(nil).
Build()
}
func getBrokerDeleteEvents(name string) []watch.Event { func getBrokerDeleteEvents(name string) []watch.Event {
return []watch.Event{ return []watch.Event{
{Type: watch.Added, Object: createBrokerWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")}, {Type: watch.Added, Object: createBrokerWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")},

View File

@ -33,5 +33,6 @@ func NewBrokerCommand(p *commands.KnParams) *cobra.Command {
brokerCmd.AddCommand(NewBrokerDescribeCommand(p)) brokerCmd.AddCommand(NewBrokerDescribeCommand(p))
brokerCmd.AddCommand(NewBrokerDeleteCommand(p)) brokerCmd.AddCommand(NewBrokerDeleteCommand(p))
brokerCmd.AddCommand(NewBrokerListCommand(p)) brokerCmd.AddCommand(NewBrokerListCommand(p))
brokerCmd.AddCommand(NewBrokerUpdateCommand(p))
return brokerCmd return brokerCmd
} }

View File

@ -18,7 +18,13 @@ package broker
import ( import (
"bytes" "bytes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
"knative.dev/client/pkg/dynamic"
dynamicfake "knative.dev/client/pkg/dynamic/fake"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
clientv1beta1 "knative.dev/client/pkg/eventing/v1" clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands" "knative.dev/client/pkg/kn/commands"
@ -28,6 +34,12 @@ import (
// Helper methods // Helper methods
var blankConfig clientcmd.ClientConfig var blankConfig clientcmd.ClientConfig
const (
testSvc = "test-svc"
testTimeout = "PT10S"
testRetry = int32(5)
)
func init() { func init() {
var err error var err error
blankConfig, err = clientcmd.NewClientConfigFromBytes([]byte(`kind: Config blankConfig, err = clientcmd.NewClientConfigFromBytes([]byte(`kind: Config
@ -61,6 +73,14 @@ func executeBrokerCommand(brokerClient clientv1beta1.KnEventingClient, args ...s
return brokerClient, nil return brokerClient, nil
} }
mysvc := &servingv1.Service{
TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "serving.knative.dev/v1"},
ObjectMeta: metav1.ObjectMeta{Name: testSvc, Namespace: "default"},
}
knParams.NewDynamicClient = func(namespace string) (dynamic.KnDynamicClient, error) {
return dynamicfake.CreateFakeKnDynamicClient("default", mysvc), nil
}
cmd := NewBrokerCommand(knParams) cmd := NewBrokerCommand(knParams)
cmd.SetArgs(args) cmd.SetArgs(args)
cmd.SetOutput(output) cmd.SetOutput(output)
@ -85,3 +105,31 @@ func createBrokerWithNamespace(brokerName, namespace string) *v1beta1.Broker {
func createBrokerWithClass(brokerName, class string) *v1beta1.Broker { func createBrokerWithClass(brokerName, class string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Class(class).Build() return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Class(class).Build()
} }
func createBrokerWithDlSink(brokerName, service string) *v1beta1.Broker {
sink := &duckv1.Destination{
Ref: &duckv1.KReference{Name: service, Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").DlSink(sink).Build()
}
func createBrokerWithTimeout(brokerName, timeout string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Timeout(&timeout).Build()
}
func createBrokerWithRetry(brokerName string, retry int32) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Retry(&retry).Build()
}
func createBrokerWithBackoffPolicy(brokerName, policy string) *v1beta1.Broker {
boPolicy := v1.BackoffPolicyType(policy)
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffPolicy(&boPolicy).Build()
}
func createBrokerWithBackoffDelay(brokerName, delay string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffDelay(&delay).Build()
}
func createBrokerWithRetryAfterMax(brokerName, timeout string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").RetryAfterMax(&timeout).Build()
}

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"github.com/spf13/cobra" "github.com/spf13/cobra"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
clientv1beta1 "knative.dev/client/pkg/eventing/v1" clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands" "knative.dev/client/pkg/kn/commands"
@ -39,6 +40,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
var className string var className string
var deliveryFlags DeliveryOptionFlags
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "create NAME", Use: "create NAME",
Short: "Create a broker", Short: "Create a broker",
@ -59,10 +61,28 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
return err return err
} }
dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return err
}
destination, err := deliveryFlags.GetDlSink(cmd, dynamicClient, namespace)
if err != nil {
return err
}
backoffPolicy := v1.BackoffPolicyType(deliveryFlags.BackoffPolicy)
brokerBuilder := clientv1beta1. brokerBuilder := clientv1beta1.
NewBrokerBuilder(name). NewBrokerBuilder(name).
Namespace(namespace). Namespace(namespace).
Class(className) Class(className).
DlSink(destination).
Retry(&deliveryFlags.RetryCount).
Timeout(&deliveryFlags.Timeout).
BackoffPolicy(&backoffPolicy).
BackoffDelay(&deliveryFlags.BackoffDelay).
RetryAfterMax(&deliveryFlags.RetryAfterMax)
err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build()) err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build())
if err != nil { if err != nil {
@ -75,6 +95,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
}, },
} }
commands.AddNamespaceFlags(cmd.Flags(), false) commands.AddNamespaceFlags(cmd.Flags(), false)
cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)") cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).")
deliveryFlags.Add(cmd)
return cmd return cmd
} }

View File

@ -68,3 +68,84 @@ func TestBrokerCreateWithError(t *testing.T) {
assert.ErrorContains(t, err, "broker create") assert.ErrorContains(t, err, "broker create")
assert.Assert(t, util.ContainsAll(err.Error(), "broker create", "requires", "name", "argument")) assert.Assert(t, util.ContainsAll(err.Error(), "broker create", "requires", "name", "argument"))
} }
func TestBrokerCreateWithDlSink(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
eventingRecorder.CreateBroker(createBrokerWithDlSink(brokerName, testSvc), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--dl-sink", testSvc)
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerCreateWithTimeout(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
eventingRecorder.CreateBroker(createBrokerWithTimeout(brokerName, testTimeout), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--timeout", testTimeout)
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerCreateWithRetry(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
eventingRecorder.CreateBroker(createBrokerWithRetry(brokerName, testRetry), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--retry", "5")
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerCreateWithBackoffPolicy(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
policies := []string{"linear", "exponential"}
for _, p := range policies {
eventingRecorder.CreateBroker(createBrokerWithBackoffPolicy(brokerName, p), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--backoff-policy", p)
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
}
eventingRecorder.Validate()
}
func TestBrokerCreateWithBackoffDelay(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
eventingRecorder.CreateBroker(createBrokerWithBackoffDelay(brokerName, testTimeout), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--backoff-delay", testTimeout)
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerCreateWithRetryAfterMax(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
eventingRecorder.CreateBroker(createBrokerWithRetryAfterMax(brokerName, testTimeout), nil)
out, err := executeBrokerCommand(eventingClient, "create", brokerName, "--retry-after-max", testTimeout)
assert.NilError(t, err, "Broker should be created")
assert.Assert(t, util.ContainsAll(out, "Broker", brokerName, "created", "namespace", "default"))
eventingRecorder.Validate()
}

View File

@ -0,0 +1,60 @@
// Copyright © 2022 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package broker
import (
"github.com/spf13/cobra"
"knative.dev/client/pkg/dynamic"
"knative.dev/client/pkg/kn/commands/flags"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
type DeliveryOptionFlags struct {
SinkFlags flags.SinkFlags
RetryCount int32
Timeout string
BackoffPolicy string
BackoffDelay string
RetryAfterMax string
}
func (d *DeliveryOptionFlags) Add(cmd *cobra.Command) {
d.SinkFlags.AddWithFlagName(cmd, "dl-sink", "")
cmd.Flag("dl-sink").Usage = "The sink receiving event that could not be sent to a destination."
cmd.Flags().Int32Var(&d.RetryCount, "retry", 0, "The minimum number of retries the sender should attempt when "+
"sending an event before moving it to the dead letter sink.")
cmd.Flags().StringVar(&d.Timeout, "timeout", "", "The timeout of each single request. The value must be greater than 0.")
cmd.Flags().StringVar(&d.BackoffPolicy, "backoff-policy", "", "The retry backoff policy (linear, exponential).")
cmd.Flags().StringVar(&d.BackoffDelay, "backoff-delay", "", "The delay before retrying.")
cmd.Flags().StringVar(&d.RetryAfterMax, "retry-after-max", "", "An optional upper bound on the duration specified in a "+
"\"Retry-After\" header when calculating backoff times for retrying 429 and 503 response codes. "+
"Setting the value to zero (\"PT0S\") can be used to opt-out of respecting \"Retry-After\" header values altogether. "+
"This value only takes effect if \"Retry\" is configured, and also depends on specific implementations (Channels, Sources, etc.) "+
"choosing to provide this capability.")
}
func (d *DeliveryOptionFlags) GetDlSink(cmd *cobra.Command, dynamicClient dynamic.KnDynamicClient, namespace string) (*duckv1.Destination, error) {
var empty = flags.SinkFlags{}
var destination *duckv1.Destination
var err error
if d.SinkFlags != empty {
destination, err = d.SinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return nil, err
}
}
return destination, err
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package broker
import (
"errors"
"fmt"
"github.com/spf13/cobra"
"knative.dev/client/pkg/config"
v1 "knative.dev/client/pkg/eventing/v1"
duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/client/pkg/kn/commands"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
)
var updateExample = `
# Update a broker 'mybroker' in the current namespace with delivery sink svc1
kn broker update mybroker --dl-sink svc1
# Update a broker 'mybroker' in the 'myproject' namespace and with retry 2 seconds
kn broker update mybroker --namespace myproject --retry 2
`
func NewBrokerUpdateCommand(p *commands.KnParams) *cobra.Command {
var deliveryFlags DeliveryOptionFlags
cmd := &cobra.Command{
Use: "update NAME",
Short: "Update a broker",
Example: updateExample,
RunE: func(cmd *cobra.Command, args []string) (err error) {
if len(args) != 1 {
return errors.New("'broker update' requires the broker name given as single argument")
}
name := args[0]
namespace, err := p.GetNamespace(cmd)
if err != nil {
return err
}
eventingClient, err := p.NewEventingClient(namespace)
if err != nil {
return err
}
dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return err
}
updateFunc := func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) {
b := v1.NewBrokerBuilderFromExisting(origBroker)
if cmd.Flags().Changed("dl-sink") {
destination, err := deliveryFlags.GetDlSink(cmd, dynamicClient, namespace)
if err != nil {
return nil, err
}
b.DlSink(destination)
}
if cmd.Flags().Changed("retry") {
b.Retry(&deliveryFlags.RetryCount)
}
if cmd.Flags().Changed("timeout") {
b.Timeout(&deliveryFlags.Timeout)
}
if cmd.Flags().Changed("backoff-policy") {
backoffPolicy := duckv1.BackoffPolicyType(deliveryFlags.BackoffPolicy)
b.BackoffPolicy(&backoffPolicy)
}
if cmd.Flags().Changed("backoff-delay") {
b.BackoffDelay(&deliveryFlags.BackoffDelay)
}
if cmd.Flags().Changed("retry-after-max") {
b.RetryAfterMax(&deliveryFlags.RetryAfterMax)
}
return b.Build(), nil
}
err = eventingClient.UpdateBrokerWithRetry(cmd.Context(), name, updateFunc, config.DefaultRetry.Steps)
if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Broker '%s' updated in namespace '%s'.\n", name, namespace)
}
return err
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return preCheck(cmd)
}}
commands.AddNamespaceFlags(cmd.Flags(), false)
deliveryFlags.Add(cmd)
return cmd
}
func preCheck(cmd *cobra.Command) error {
if cmd.Flags().NFlag() == 0 {
return fmt.Errorf("flag(s) not set\nUsage: %s", cmd.Use)
}
return nil
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package broker
import (
"testing"
"gotest.tools/v3/assert"
clienteventingv1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/util"
)
func TestBrokerUpdateWithDlSink(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithDlSink("test-broker", testSvc)
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--dl-sink", testSvc)
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateWithTimeout(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithTimeout("test-broker", "10")
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--timeout", "10")
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateWithRetry(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithRetry("test-broker", 5)
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--retry", "5")
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateWithBackoffPolicy(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithBackoffPolicy("test-broker", "linear")
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--backoff-policy", "linear")
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateWithBackoffDelay(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithBackoffDelay("test-broker", "PT10S")
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--backoff-delay", "PT10S")
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateWithRetryAfterMax(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
updated := createBrokerWithRetryAfterMax("test-broker", "PT10S")
eventingRecorder.GetBroker("test-broker", present, nil)
eventingRecorder.UpdateBroker(updated, nil)
out, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--retry-after-max", "PT10S")
assert.NilError(t, err, "Broker should be updated")
assert.Assert(t, util.ContainsAll(out, "Broker", "test-broker", "updated", "namespace", "default"))
eventingRecorder.Validate()
}
func TestBrokerUpdateError(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
present := createBroker("test-broker")
eventingRecorder.GetBroker("test-broker", present, nil)
_, err := executeBrokerCommand(eventingClient, "update", "test-broker",
"--dl-sink", "absent-svc")
assert.ErrorContains(t, err, "not found")
eventingRecorder.Validate()
}
func TestBrokerUpdateErrorNoFlags(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
_, err := executeBrokerCommand(eventingClient, "update", "test-broker")
assert.ErrorContains(t, err, "flag(s) not set")
eventingRecorder.Validate()
}
func TestBrokerUpdateErrorNoName(t *testing.T) {
eventingClient := clienteventingv1.NewMockKnEventingClient(t)
eventingRecorder := eventingClient.Recorder()
_, err := executeBrokerCommand(eventingClient, "update", "--dl-sink", testSvc)
assert.ErrorContains(t, err, "requires the broker name")
eventingRecorder.Validate()
}