client/pkg/sources/v1/ping_client.go

250 lines
7.5 KiB
Go

// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v1
import (
"context"
"fmt"
"knative.dev/client/pkg/config"
"k8s.io/client-go/util/retry"
"k8s.io/apimachinery/pkg/runtime"
"knative.dev/client/pkg/util"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
knerrors "knative.dev/client/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/sources/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
type PingSourceUpdateFunc func(origSource *sourcesv1.PingSource) (*sourcesv1.PingSource, error)
// Interface for interacting with a Ping source
type KnPingSourcesClient interface {
// GetPingSource fetches a Ping source by its name
GetPingSource(ctx context.Context, name string) (*sourcesv1.PingSource, error)
// CreatePingSource creates a Ping source
CreatePingSource(ctx context.Context, pingSource *sourcesv1.PingSource) error
// UpdatePingSource updates a Ping source
UpdatePingSource(ctx context.Context, pingSource *sourcesv1.PingSource) error
// UpdatePingSourceWithRetry updates a Ping source and retries on conflict
UpdatePingSourceWithRetry(ctx context.Context, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error
// DeletePingSource deletes a Ping source
DeletePingSource(ctx context.Context, name string) error
// ListPingSource lists all Ping sources
// TODO: Support list configs like in service list
ListPingSource(ctx context.Context) (*sourcesv1.PingSourceList, error)
// Get namespace for this source
Namespace() string
}
// knSourcesClient is a combination of Sources client interface and namespace
// Temporarily help to add sources dependencies
// May be changed when adding real sources features
type pingSourcesClient struct {
client clientv1.PingSourceInterface
namespace string
}
// NewKnSourcesClient is to invoke Eventing Sources Client API to create object
func newKnPingSourcesClient(client clientv1.PingSourceInterface, namespace string) KnPingSourcesClient {
return &pingSourcesClient{
client: client,
namespace: namespace,
}
}
// Get the namespace for which this client has been created
func (c *pingSourcesClient) Namespace() string {
return c.namespace
}
func (c *pingSourcesClient) CreatePingSource(ctx context.Context, pingsource *sourcesv1.PingSource) error {
if pingsource.Spec.Sink.Ref == nil && pingsource.Spec.Sink.URI == nil {
return fmt.Errorf("a sink is required for creating a source")
}
_, err := c.client.Create(ctx, pingsource, metav1.CreateOptions{})
if err != nil {
return knerrors.GetError(err)
}
return nil
}
func (c *pingSourcesClient) UpdatePingSource(ctx context.Context, pingSource *sourcesv1.PingSource) error {
_, err := c.client.Update(ctx, pingSource, metav1.UpdateOptions{})
if err != nil {
return knerrors.GetError(err)
}
return nil
}
func (c *pingSourcesClient) UpdatePingSourceWithRetry(ctx context.Context, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error {
return updatePingSourceWithRetry(ctx, c, name, updateFunc, nrRetries)
}
func updatePingSourceWithRetry(ctx context.Context, c KnPingSourcesClient, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error {
b := config.DefaultRetry
b.Steps = nrRetries
err := retry.RetryOnConflict(b, func() error {
return updatePingSource(ctx, c, name, updateFunc)
})
return err
}
func updatePingSource(ctx context.Context, c KnPingSourcesClient, name string, updateFunc PingSourceUpdateFunc) error {
source, err := c.GetPingSource(ctx, name)
if err != nil {
return err
}
if source.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update ping source %s because it has been marked for deletion", name)
}
updatedSource, err := updateFunc(source.DeepCopy())
if err != nil {
return err
}
return c.UpdatePingSource(ctx, updatedSource)
}
func (c *pingSourcesClient) DeletePingSource(ctx context.Context, name string) error {
err := c.client.Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
return knerrors.GetError(err)
}
return nil
}
func (c *pingSourcesClient) GetPingSource(ctx context.Context, name string) (*sourcesv1.PingSource, error) {
source, err := c.client.Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, knerrors.GetError(err)
}
err = updatePingSourceGVK(source)
if err != nil {
return nil, err
}
return source, nil
}
// ListPingSource returns the available Ping sources
func (c *pingSourcesClient) ListPingSource(ctx context.Context) (*sourcesv1.PingSourceList, error) {
sourceList, err := c.client.List(ctx, metav1.ListOptions{})
if err != nil {
return nil, knerrors.GetError(err)
}
return updatePingSourceListGVK(sourceList)
}
func updatePingSourceGVK(obj runtime.Object) error {
return util.UpdateGroupVersionKindWithScheme(obj, sourcesv1.SchemeGroupVersion, scheme.Scheme)
}
func updatePingSourceListGVK(sourceList *sourcesv1.PingSourceList) (*sourcesv1.PingSourceList, error) {
sourceListNew := sourceList.DeepCopy()
err := updatePingSourceGVK(sourceListNew)
if err != nil {
return nil, err
}
sourceListNew.Items = make([]sourcesv1.PingSource, len(sourceList.Items))
for idx, source := range sourceList.Items {
sourceClone := source.DeepCopy()
err := updatePingSourceGVK(sourceClone)
if err != nil {
return nil, err
}
sourceListNew.Items[idx] = *sourceClone
}
return sourceListNew, nil
}
// Builder for building up Ping sources
type PingSourceBuilder struct {
pingSource *sourcesv1.PingSource
}
func NewPingSourceBuilder(name string) *PingSourceBuilder {
return &PingSourceBuilder{pingSource: &sourcesv1.PingSource{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}}
}
func NewPingSourceBuilderFromExisting(pingsource *sourcesv1.PingSource) *PingSourceBuilder {
return &PingSourceBuilder{pingSource: pingsource.DeepCopy()}
}
func (b *PingSourceBuilder) Schedule(schedule string) *PingSourceBuilder {
b.pingSource.Spec.Schedule = schedule
return b
}
func (b *PingSourceBuilder) Data(data string) *PingSourceBuilder {
b.pingSource.Spec.Data = data
return b
}
func (b *PingSourceBuilder) DataBase64(data string) *PingSourceBuilder {
b.pingSource.Spec.DataBase64 = data
return b
}
func (b *PingSourceBuilder) Sink(sink duckv1.Destination) *PingSourceBuilder {
b.pingSource.Spec.Sink = sink
return b
}
// CloudEventOverrides adds given Cloud Event override extensions map to source spec
func (b *PingSourceBuilder) CloudEventOverrides(ceo map[string]string, toRemove []string) *PingSourceBuilder {
if ceo == nil && len(toRemove) == 0 {
return b
}
ceOverrides := b.pingSource.Spec.CloudEventOverrides
if ceOverrides == nil {
ceOverrides = &duckv1.CloudEventOverrides{Extensions: map[string]string{}}
b.pingSource.Spec.CloudEventOverrides = ceOverrides
}
for k, v := range ceo {
ceOverrides.Extensions[k] = v
}
for _, r := range toRemove {
delete(ceOverrides.Extensions, r)
}
return b
}
func (b *PingSourceBuilder) Build() *sourcesv1.PingSource {
return b.pingSource
}