feat(service create): Wait for a service to be ready when its created (#156)

* feat(service create): Added --no-wait and --wait-timeout

By default, `kn service create` blocks until the service is either
created or an error occured during service creation.

With the option --no-wait the behaviour can be switched to an
async mode so that that kn returns immediately after the service is
created without waiting for a successful Ready status condition.

The timeout for how long to wait can be configured with --wait-timeout
If a timeout occur, that doesn't mean that the service is not created,
but the wait just returns. The default value is 60 seconds.

In wait mode, print out the service URL as a last line (so that it can be used together with `tail -1`) to extract the service URL after the service is created.

Fixes #54

* chore(service create): Tolerate if obeservedGeneration has not been set yet during startup

* chore(service create): Refactored based on review comments

* Introduced an --async flag (replacing --wait and --no-wait)
* Added proper retry handling on the list watch
* Updated help message

* chore(service wait): Added a new test for sync behaviour
This commit is contained in:
Roland Huß 2019-06-28 14:57:06 +02:00 committed by Knative Prow Robot
parent 483979bba4
commit a7d1bc9dc0
10 changed files with 739 additions and 56 deletions

View File

@ -36,6 +36,7 @@ kn service create NAME --image IMAGE [flags]
### Options
```
--async Create service and don't wait for it to become ready.
--concurrency-limit int Hard Limit of concurrent requests to be processed by a single replica.
--concurrency-target int Recommendation for when to scale up based on the concurrent number of incoming request. Defaults to --concurrency-limit when given.
-e, --env stringArray Environment variable to set. NAME=value; you may provide this flag any number of times to set multiple environment variables.
@ -49,6 +50,7 @@ kn service create NAME --image IMAGE [flags]
-n, --namespace string List the requested object(s) in given namespace.
--requests-cpu string The requested CPU (e.g., 250m).
--requests-memory string The requested CPU (e.g., 64Mi).
--wait-timeout int Seconds to wait before giving up on waiting for service to be ready (default: 60). (default 60)
```
### Options inherited from parent commands

View File

@ -17,17 +17,22 @@ package service
import (
"errors"
"fmt"
"github.com/knative/client/pkg/kn/commands"
servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
serving_v1alpha1_api "github.com/knative/serving/pkg/apis/serving/v1alpha1"
serving_v1alpha1_client "github.com/knative/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
"github.com/spf13/cobra"
"io"
"time"
corev1 "k8s.io/api/core/v1"
api_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func NewServiceCreateCommand(p *commands.KnParams) *cobra.Command {
var editFlags ConfigurationEditFlags
var waitFlags commands.WaitFlags
serviceCreateCommand := &cobra.Command{
Use: "create NAME --image IMAGE",
@ -53,10 +58,11 @@ func NewServiceCreateCommand(p *commands.KnParams) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) (err error) {
if len(args) != 1 {
return errors.New("requires the service name.")
return errors.New("'service create' requires the service name given as single argument")
}
name := args[0]
if editFlags.Image == "" {
return errors.New("requires the image name to run.")
return errors.New("'service create' requires the image name to run provided with the --image option")
}
namespace, err := p.GetNamespace(cmd)
@ -64,55 +70,121 @@ func NewServiceCreateCommand(p *commands.KnParams) *cobra.Command {
return err
}
service := servingv1alpha1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: args[0],
Namespace: namespace,
},
}
service.Spec.DeprecatedRunLatest = &servingv1alpha1.RunLatestType{
Configuration: servingv1alpha1.ConfigurationSpec{
DeprecatedRevisionTemplate: &servingv1alpha1.RevisionTemplateSpec{
Spec: servingv1alpha1.RevisionSpec{
DeprecatedContainer: &corev1.Container{},
},
},
},
}
err = editFlags.Apply(&service, cmd)
service, err := constructService(cmd, editFlags, name, namespace)
if err != nil {
return err
}
client, err := p.ServingFactory()
if err != nil {
return err
}
var serviceExists bool = false
if editFlags.ForceCreate {
existingService, err := client.Services(namespace).Get(args[0], v1.GetOptions{})
if err == nil {
serviceExists = true
service.ResourceVersion = existingService.ResourceVersion
_, err = client.Services(namespace).Update(&service)
if err != nil {
return err
}
fmt.Fprintf(cmd.OutOrStdout(), "Service '%s' successfully replaced in namespace '%s'.\n", args[0], namespace)
}
serviceExists, err := serviceExists(client, service.Name, namespace)
if err != nil {
return err
}
if !serviceExists {
_, err = client.Services(namespace).Create(&service)
if editFlags.ForceCreate && serviceExists {
err = replaceService(client, service, namespace, cmd.OutOrStdout())
} else {
err = createService(client, service, namespace, cmd.OutOrStdout())
}
if err != nil {
return err
}
if !waitFlags.Async {
waitForReady := newServiceWaitForReady(client, namespace)
err := waitForReady.Wait(name, time.Duration(waitFlags.TimeoutInSeconds)*time.Second, cmd.OutOrStdout())
if err != nil {
return err
}
fmt.Fprintf(cmd.OutOrStdout(), "Service '%s' successfully created in namespace '%s'.\n", args[0], namespace)
return showUrl(client, name, namespace, cmd.OutOrStdout())
}
return nil
},
}
commands.AddNamespaceFlags(serviceCreateCommand.Flags(), false)
editFlags.AddCreateFlags(serviceCreateCommand)
waitFlags.AddConditionWaitFlags(serviceCreateCommand, 60, "service")
return serviceCreateCommand
}
func createService(client serving_v1alpha1_client.ServingV1alpha1Interface, service *serving_v1alpha1_api.Service, namespace string, out io.Writer) error {
_, err := client.Services(namespace).Create(service)
if err != nil {
return err
}
fmt.Fprintf(out, "Service '%s' successfully created in namespace '%s'.\n", service.Name, namespace)
return nil
}
func replaceService(client serving_v1alpha1_client.ServingV1alpha1Interface, service *serving_v1alpha1_api.Service, namespace string, out io.Writer) error {
existingService, err := client.Services(namespace).Get(service.Name, v1.GetOptions{})
if err != nil {
return err
}
service.ResourceVersion = existingService.ResourceVersion
_, err = client.Services(namespace).Update(service)
if err != nil {
return err
}
fmt.Fprintf(out, "Service '%s' successfully replaced in namespace '%s'.\n", service.Name, namespace)
return nil
}
func serviceExists(client serving_v1alpha1_client.ServingV1alpha1Interface, name string, namespace string) (bool, error) {
_, err := client.Services(namespace).Get(name, v1.GetOptions{})
if api_errors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}
// Create service struct from provided options
func constructService(cmd *cobra.Command, editFlags ConfigurationEditFlags, name string, namespace string) (*serving_v1alpha1_api.Service,
error) {
service := serving_v1alpha1_api.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
// TODO: Should it always be `runLatest` ?
service.Spec.DeprecatedRunLatest = &serving_v1alpha1_api.RunLatestType{
Configuration: serving_v1alpha1_api.ConfigurationSpec{
DeprecatedRevisionTemplate: &serving_v1alpha1_api.RevisionTemplateSpec{
Spec: serving_v1alpha1_api.RevisionSpec{
DeprecatedContainer: &corev1.Container{},
},
},
},
}
err := editFlags.Apply(&service, cmd)
if err != nil {
return nil, err
}
return &service, nil
}
func showUrl(client serving_v1alpha1_client.ServingV1alpha1Interface, serviceName string, namespace string, out io.Writer) error {
service, err := client.Services(namespace).Get(serviceName, v1.GetOptions{})
if err != nil {
return fmt.Errorf("cannot fetch service '%s' in namespace '%s' for extracting the URL: %v", serviceName, namespace, err)
}
url := service.Status.URL.String()
if url == "" {
url = service.Status.DeprecatedDomain
}
fmt.Fprintln(out, "\nService URL:")
fmt.Fprintf(out, "%s\n", url)
return nil
}

View File

@ -21,8 +21,14 @@ import (
"strings"
"testing"
api_errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"github.com/knative/client/pkg/kn/commands"
servinglib "github.com/knative/client/pkg/serving"
"github.com/knative/client/pkg/wait"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -30,19 +36,26 @@ import (
client_testing "k8s.io/client-go/testing"
)
func fakeServiceCreate(args []string) (
func fakeServiceCreate(args []string, withExistingService bool, sync bool) (
action client_testing.Action,
created *v1alpha1.Service,
output string,
err error) {
knParams := &commands.KnParams{}
cmd, fakeServing, buf := commands.CreateTestKnCommand(NewServiceCommand(knParams), knParams)
fakeServing.AddReactor("*", "*",
fakeServing.AddReactor("get", "services",
func(a client_testing.Action) (bool, runtime.Object, error) {
if withExistingService {
return true, &v1alpha1.Service{}, nil
}
return true, nil, api_errors.NewNotFound(schema.GroupResource{}, "")
})
fakeServing.AddReactor("create", "services",
func(a client_testing.Action) (bool, runtime.Object, error) {
createAction, ok := a.(client_testing.CreateAction)
action = createAction
if !ok {
return true, nil, fmt.Errorf("wrong kind of action %v", action)
return true, nil, fmt.Errorf("wrong kind of action %v", a)
}
created, ok = createAction.GetObject().(*v1alpha1.Service)
if !ok {
@ -50,6 +63,23 @@ func fakeServiceCreate(args []string) (
}
return true, created, nil
})
if sync {
fakeServing.AddWatchReactor("services",
func(a client_testing.Action) (bool, watch.Interface, error) {
watchAction := a.(client_testing.WatchAction)
_, found := watchAction.GetWatchRestrictions().Fields.RequiresExactMatch("metadata.name")
if !found {
return true, nil, errors.New("no field selector on metadata.name found")
}
w := wait.NewFakeWatch(getServiceEvents())
w.Start()
return true, w, nil
})
fakeServing.AddReactor("get", "services",
func(a client_testing.Action) (bool, runtime.Object, error) {
return true, &v1alpha1.Service{}, nil
})
}
cmd.SetArgs(args)
err = cmd.Execute()
if err != nil {
@ -59,9 +89,17 @@ func fakeServiceCreate(args []string) (
return
}
func getServiceEvents() []watch.Event {
return []watch.Event{
{watch.Added, wait.CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionUnknown, "")},
{watch.Modified, wait.CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionTrue, "")},
{watch.Modified, wait.CreateTestServiceWithConditions(corev1.ConditionTrue, corev1.ConditionTrue, "")},
}
}
func TestServiceCreateImage(t *testing.T) {
action, created, output, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--async"}, false, false)
if err != nil {
t.Fatal(err)
} else if !action.Matches("create", "services") {
@ -78,9 +116,33 @@ func TestServiceCreateImage(t *testing.T) {
}
}
func TestServiceCreateImageSync(t *testing.T) {
action, created, output, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz"}, true, true)
if err != nil {
t.Fatal(err)
} else if !action.Matches("create", "services") {
t.Fatalf("Bad action %v", action)
}
template, err := servinglib.GetRevisionTemplate(created)
if err != nil {
t.Fatal(err)
}
if template.Spec.DeprecatedContainer.Image != "gcr.io/foo/bar:baz" {
t.Fatalf("wrong image set: %v", template.Spec.DeprecatedContainer.Image)
}
if !strings.Contains(output, "foo") || !strings.Contains(output, "created") ||
!strings.Contains(output, commands.FakeNamespace) {
t.Fatalf("wrong stdout message: %v", output)
}
if !strings.Contains(output, "OK") || !strings.Contains(output, "Waiting") {
t.Fatalf("not running in sync mode")
}
}
func TestServiceCreateEnv(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "-e", "A=DOGS", "--env", "B=WOLVES"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "-e", "A=DOGS", "--env", "B=WOLVES", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -111,7 +173,7 @@ func TestServiceCreateEnv(t *testing.T) {
func TestServiceCreateWithRequests(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-cpu", "250m", "--requests-memory", "64Mi"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-cpu", "250m", "--requests-memory", "64Mi", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -137,7 +199,7 @@ func TestServiceCreateWithRequests(t *testing.T) {
func TestServiceCreateWithLimits(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--limits-cpu", "1000m", "--limits-memory", "1024Mi"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--limits-cpu", "1000m", "--limits-memory", "1024Mi", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -163,7 +225,7 @@ func TestServiceCreateWithLimits(t *testing.T) {
func TestServiceCreateRequestsLimitsCPU(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-cpu", "250m", "--limits-cpu", "1000m"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-cpu", "250m", "--limits-cpu", "1000m", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -200,7 +262,7 @@ func TestServiceCreateRequestsLimitsCPU(t *testing.T) {
func TestServiceCreateRequestsLimitsMemory(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-memory", "64Mi", "--limits-memory", "1024Mi"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz", "--requests-memory", "64Mi", "--limits-memory", "1024Mi", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -238,7 +300,7 @@ func TestServiceCreateRequestsLimitsMemory(t *testing.T) {
func TestServiceCreateMaxMinScale(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz",
"--min-scale", "1", "--max-scale", "5", "--concurrency-target", "10", "--concurrency-limit", "100"})
"--min-scale", "1", "--max-scale", "5", "--concurrency-target", "10", "--concurrency-limit", "100", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -275,7 +337,7 @@ func TestServiceCreateRequestsLimitsCPUMemory(t *testing.T) {
action, created, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:baz",
"--requests-cpu", "250m", "--limits-cpu", "1000m",
"--requests-memory", "64Mi", "--limits-memory", "1024Mi"})
"--requests-memory", "64Mi", "--limits-memory", "1024Mi", "--async"}, false, false)
if err != nil {
t.Fatal(err)
@ -322,12 +384,12 @@ func parseQuantity(t *testing.T, quantityString string) resource.Quantity {
func TestServiceCreateImageForce(t *testing.T) {
_, _, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:v1"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:v1", "--async"}, false, false)
if err != nil {
t.Fatal(err)
}
action, created, output, err := fakeServiceCreate([]string{
"service", "create", "foo", "--force", "--image", "gcr.io/foo/bar:v2"})
"service", "create", "foo", "--force", "--image", "gcr.io/foo/bar:v2", "--async"}, false, false)
if err != nil {
t.Fatal(err)
} else if !action.Matches("create", "services") {
@ -345,12 +407,12 @@ func TestServiceCreateImageForce(t *testing.T) {
func TestServiceCreateEnvForce(t *testing.T) {
_, _, _, err := fakeServiceCreate([]string{
"service", "create", "foo", "--image", "gcr.io/foo/bar:v1", "-e", "A=DOGS", "--env", "B=WOLVES"})
"service", "create", "foo", "--image", "gcr.io/foo/bar:v1", "-e", "A=DOGS", "--env", "B=WOLVES", "--async"}, false, false)
if err != nil {
t.Fatal(err)
}
action, created, output, err := fakeServiceCreate([]string{
"service", "create", "foo", "--force", "--image", "gcr.io/foo/bar:v2", "-e", "A=CATS", "--env", "B=LIONS"})
"service", "create", "foo", "--force", "--image", "gcr.io/foo/bar:v2", "-e", "A=CATS", "--env", "B=LIONS", "--async"}, false, false)
if err != nil {
t.Fatal(err)

View File

@ -0,0 +1,42 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package service
import (
"fmt"
"github.com/knative/client/pkg/wait"
"github.com/knative/pkg/apis"
serving_v1alpha1_api "github.com/knative/serving/pkg/apis/serving/v1alpha1"
serving_v1alpha1_client "github.com/knative/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
)
// Create wait arguments for a Knative service which can be used to wait for
// a create/update options to be finished
// Can be used by `service_create` and `service_update`, hence this extra file
func newServiceWaitForReady(client serving_v1alpha1_client.ServingV1alpha1Interface, namespace string) wait.WaitForReady {
return wait.NewWaitForReady(
"service",
client.Services(namespace).Watch,
serviceConditionExtractor)
}
func serviceConditionExtractor(obj runtime.Object) (apis.Conditions, error) {
service, ok := obj.(*serving_v1alpha1_api.Service)
if !ok {
return nil, fmt.Errorf("%v is not a service", obj)
}
return apis.Conditions(service.Status.Conditions), nil
}

View File

@ -0,0 +1,40 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package commands
import (
"fmt"
"github.com/spf13/cobra"
)
// Flags for tuning wait behaviour
type WaitFlags struct {
// Timeout in seconds for how long to wait for a command to return
TimeoutInSeconds int
// If set then just apply resources and don't wait
Async bool
}
// Add flags which influence the sync/async behaviour when creating or updating
// resources. Set `waitDefault` argument if the default behaviour is synchronous.
// Use `what` for describing what is waited for.
func (p *WaitFlags) AddConditionWaitFlags(command *cobra.Command, waitTimeoutDefault int, what string) {
waitUsage := fmt.Sprintf("Create %s and don't wait for it to become ready.", what)
command.Flags().BoolVar(&p.Async, "async", false, waitUsage)
timeoutUsage := fmt.Sprintf("Seconds to wait before giving up on waiting for %s to be ready (default: %d).", what, waitTimeoutDefault)
command.Flags().IntVar(&p.TimeoutInSeconds, "wait-timeout", waitTimeoutDefault, timeoutUsage)
}

View File

@ -0,0 +1,78 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package commands
import (
"github.com/spf13/cobra"
"strings"
"testing"
)
type waitTestCase struct {
args []string
timeoutExpected int
isAsyncExpected bool
isParseErrorExpected bool
}
func TestAddWaitForReadyFlags(t *testing.T) {
for i, tc := range []waitTestCase{
{[]string{"--async"}, 60, true, false},
{[]string{}, 60, false, false},
{[]string{"--wait-timeout=120"}, 120, false, false},
// Can't be easily prevented, the timeout is just ignored in this case:
{[]string{"--async", "--wait-timeout=120"}, 120, true, false},
{[]string{"--wait-timeout=bla"}, 0, true, true},
} {
flags := &WaitFlags{}
cmd := cobra.Command{}
flags.AddConditionWaitFlags(&cmd, 60, "service")
err := cmd.ParseFlags(tc.args)
if err != nil && !tc.isParseErrorExpected {
t.Errorf("%d: parse flags: %v", i, err)
}
if err == nil && tc.isParseErrorExpected {
t.Errorf("%d: parse error expected, but got none: %v", i, err)
}
if tc.isParseErrorExpected {
continue
}
if flags.Async != tc.isAsyncExpected {
t.Errorf("%d: wrong async mode detected: %t (expected) != %t (actual)", i, tc.isAsyncExpected, flags.Async)
}
if flags.TimeoutInSeconds != tc.timeoutExpected {
t.Errorf("%d: Invalid timeout set. %d (expected) != %d (actual)", i, tc.timeoutExpected, flags.TimeoutInSeconds)
}
}
}
func TestAddWaitUsageMessage(t *testing.T) {
flags := &WaitFlags{}
cmd := cobra.Command{}
flags.AddConditionWaitFlags(&cmd, 60, "blub")
if !strings.Contains(cmd.UsageString(), "blub") {
t.Error("no type returned in usage")
}
if !strings.Contains(cmd.UsageString(), "don't wait") {
t.Error("wrong usage message")
}
if !strings.Contains(cmd.UsageString(), "60") {
t.Error("default timeout not contained")
}
}

View File

@ -0,0 +1,80 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"github.com/knative/pkg/apis"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
// Helper for testing watch functionality
type FakeWatch struct {
eventChan chan watch.Event
events []watch.Event
// Record how often stop was called
StopCalled int
}
// Create a new fake watch with the given events which will be send when
// on start
func NewFakeWatch(events []watch.Event) *FakeWatch {
return &FakeWatch{
eventChan: make(chan watch.Event),
events: events,
}
}
// Stop the watch challend
func (f *FakeWatch) Stop() {
f.StopCalled++
}
// Start and fire events
func (f *FakeWatch) Start() {
go f.fireEvents()
}
// Channel for getting the events
func (f *FakeWatch) ResultChan() <-chan watch.Event {
return f.eventChan
}
func (f *FakeWatch) fireEvents() {
for _, ev := range f.events {
f.eventChan <- ev
}
}
// Create a service skeletion with a given ConditionReady status and all other statuses set to otherReadyStatus. Optionally a single generation can be added.
func CreateTestServiceWithConditions(readyStatus corev1.ConditionStatus, otherReadyStatus corev1.ConditionStatus, reason string, generations ...int64) runtime.Object {
service := v1alpha1.Service{}
if len(generations) == 2 {
service.Generation = generations[0]
service.Status.ObservedGeneration = generations[1]
} else {
service.Generation = 1
service.Status.ObservedGeneration = 1
}
service.Status.Conditions = []apis.Condition{
{Type: "RoutesReady", Status: otherReadyStatus},
{Type: apis.ConditionReady, Status: readyStatus, Reason: reason},
{Type: "ConfigurationsReady", Status: otherReadyStatus},
}
return &service
}

188
pkg/wait/wait_for_ready.go Normal file
View File

@ -0,0 +1,188 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"fmt"
"github.com/knative/pkg/apis"
"io"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"time"
)
// Callbacks and configuration used while waiting
type waitForReadyConfig struct {
watchFunc WatchFunc
conditionsExtractor ConditionsExtractor
kind string
}
// Interface used for waiting of a resource of a given name to reach a definitive
// state in its "Ready" condition.
type WaitForReady interface {
// Wait on resource the resource with this name until a given timeout
// and write status out on writer
Wait(name string, timeout time.Duration, out io.Writer) error
}
// Create watch which is used when waiting for Ready condition
type WatchFunc func(opts v1.ListOptions) (watch.Interface, error)
// Extract conditions from a runtime object
type ConditionsExtractor func(obj runtime.Object) (apis.Conditions, error)
// Constructor with resource type specific configuration
func NewWaitForReady(kind string, watchFunc WatchFunc, extractor ConditionsExtractor) WaitForReady {
return &waitForReadyConfig{
kind: kind,
watchFunc: watchFunc,
conditionsExtractor: extractor,
}
}
// Wait until a resource enters condition of type "Ready" to "False" or "True".
// `watchFunc` creates the actual watch, `kind` is the type what your are watching for
// (e.g. "service"), `timeout` is a timeout after which the watch should be cancelled if no
// target state has been entered yet and `out` is used for printing out status messages
func (w *waitForReadyConfig) Wait(name string, timeout time.Duration, out io.Writer) error {
opts := v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
}
addWatchTimeout(&opts, timeout)
fmt.Fprintf(out, "Waiting for %s '%s' to become ready ... ", w.kind, name)
flush(out)
floatingTimeout := timeout
for {
start := time.Now()
retry, timeoutReached, err := w.waitForReadyCondition(opts, name, floatingTimeout)
if err != nil {
fmt.Fprintln(out)
return err
}
floatingTimeout = floatingTimeout - time.Since(start)
if timeoutReached || floatingTimeout < 0 {
return fmt.Errorf("timeout: %s '%s' not ready after %d seconds", w.kind, name, timeout)
}
if retry {
// restart loop
continue
}
fmt.Fprintln(out, "OK")
return nil
}
}
func addWatchTimeout(opts *v1.ListOptions, timeout time.Duration) {
if timeout == 0 {
return
}
// Wait for service to enter 'Ready' state, with a timeout of which is slightly larger than
// the provided timeout. We have our own timeout which fires after "timeout" seconds
// and stops the watch
timeOutWatchSeconds := int64((timeout + 30*time.Second) / time.Second)
opts.TimeoutSeconds = &timeOutWatchSeconds
}
// Duck type for writers having a flush
type flusher interface {
Flush() error
}
func flush(out io.Writer) {
if flusher, ok := out.(flusher); ok {
flusher.Flush()
}
}
func (w *waitForReadyConfig) waitForReadyCondition(opts v1.ListOptions, name string, timeout time.Duration) (bool, bool, error) {
watcher, err := w.watchFunc(opts)
if err != nil {
return false, false, err
}
defer watcher.Stop()
for {
select {
case <-time.After(timeout):
return false, true, nil
case event, ok := <-watcher.ResultChan():
if !ok || event.Object == nil {
return true, false, nil
}
// Skip event if generations has not yet been consolidated
inSync, err := isGivenEqualsObservedGeneration(event.Object)
if err != nil {
return false, false, err
}
if !inSync {
continue
}
conditions, err := w.conditionsExtractor(event.Object)
if err != nil {
return false, false, err
}
for _, cond := range conditions {
if cond.Type == apis.ConditionReady {
switch cond.Status {
case corev1.ConditionTrue:
return false, false, nil
case corev1.ConditionFalse:
return false, false, fmt.Errorf("%s: %s", cond.Reason, cond.Message)
}
}
}
}
}
}
// Going over Unstructured to keep that function generally applicable.
// Alternative implemenentation: Add a func-field to waitForReadyConfig which has to be
// provided for every resource (like the conditions extractor)
func isGivenEqualsObservedGeneration(object runtime.Object) (bool, error) {
unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
if err != nil {
return false, err
}
meta, ok := unstructured["metadata"].(map[string]interface{})
if !ok {
return false, fmt.Errorf("cannot extract metadata from %v", object)
}
status, ok := unstructured["status"].(map[string]interface{})
if !ok {
return false, fmt.Errorf("cannot extract status from %v", object)
}
observedGeneration, ok := status["observedGeneration"]
if !ok {
// Can be the case if not status has been attached yet
return false, nil
}
givenGeneration, ok := meta["generation"]
if !ok {
return false, fmt.Errorf("no field 'generation' in metadata of %v", object)
}
return givenGeneration == observedGeneration, nil
}

View File

@ -0,0 +1,119 @@
// Copyright © 2019 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"bytes"
"strings"
"testing"
"time"
"github.com/knative/pkg/apis"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
type waitForReadyTestCase struct {
events []watch.Event
timeout time.Duration
errorExpected bool
messageContent []string
}
func TestAddWaitForReady(t *testing.T) {
for i, tc := range prepareTestCases() {
fakeWatchApi := NewFakeWatch(tc.events)
outBuffer := new(bytes.Buffer)
waitForReady := NewWaitForReady(
"blub",
func(opts v1.ListOptions) (watch.Interface, error) {
return fakeWatchApi, nil
},
func(obj runtime.Object) (apis.Conditions, error) {
return apis.Conditions(obj.(*v1alpha1.Service).Status.Conditions), nil
})
fakeWatchApi.Start()
err := waitForReady.Wait("foobar", tc.timeout, outBuffer)
close(fakeWatchApi.eventChan)
if !tc.errorExpected && err != nil {
t.Errorf("%d: Error received %v", i, err)
continue
}
if tc.errorExpected && err == nil {
t.Errorf("%d: No error but expected one", i)
}
txtToCheck := outBuffer.String()
if err != nil {
txtToCheck = err.Error()
}
for _, msg := range tc.messageContent {
if !strings.Contains(txtToCheck, msg) {
t.Errorf("%d: '%s' does not contain expected part %s", i, txtToCheck, msg)
}
}
if fakeWatchApi.StopCalled != 1 {
t.Errorf("%d: Exactly one 'stop' should be called, but got %d", i, fakeWatchApi.StopCalled)
}
}
}
// Test cases which consists of a series of events to send and the expected behaviour.
func prepareTestCases() []waitForReadyTestCase {
return []waitForReadyTestCase{
{peNormal(), time.Second, false, []string{"OK", "foobar", "blub"}},
{peError(), time.Second, true, []string{"FakeError"}},
{peTimeout(), time.Second, true, []string{"timeout"}},
{peWrongGeneration(), time.Second, true, []string{"timeout"}},
}
}
// =============================================================================
func peNormal() []watch.Event {
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionUnknown, "")},
{watch.Modified, CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionTrue, "")},
{watch.Modified, CreateTestServiceWithConditions(corev1.ConditionTrue, corev1.ConditionTrue, "")},
}
}
func peError() []watch.Event {
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionUnknown, "")},
{watch.Modified, CreateTestServiceWithConditions(corev1.ConditionFalse, corev1.ConditionTrue, "FakeError")},
}
}
func peTimeout() []watch.Event {
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionUnknown, "")},
}
}
func peWrongGeneration() []watch.Event {
return []watch.Event{
{watch.Added, CreateTestServiceWithConditions(corev1.ConditionUnknown, corev1.ConditionUnknown, "")},
{watch.Modified, CreateTestServiceWithConditions(corev1.ConditionTrue, corev1.ConditionTrue, "", 1, 2)},
}
}

8
vendor/modules.txt vendored
View File

@ -203,19 +203,19 @@ k8s.io/apimachinery/pkg/apis/meta/v1beta1
k8s.io/apimachinery/pkg/labels
k8s.io/apimachinery/pkg/runtime
k8s.io/apimachinery/pkg/runtime/schema
k8s.io/apimachinery/pkg/api/errors
k8s.io/apimachinery/pkg/api/resource
k8s.io/apimachinery/pkg/api/meta
k8s.io/apimachinery/pkg/util/runtime
k8s.io/apimachinery/pkg/fields
k8s.io/apimachinery/pkg/watch
k8s.io/apimachinery/pkg/api/equality
k8s.io/apimachinery/pkg/api/validation
k8s.io/apimachinery/pkg/runtime/serializer
k8s.io/apimachinery/pkg/types
k8s.io/apimachinery/pkg/watch
k8s.io/apimachinery/pkg/conversion
k8s.io/apimachinery/pkg/fields
k8s.io/apimachinery/pkg/selection
k8s.io/apimachinery/pkg/util/intstr
k8s.io/apimachinery/pkg/api/errors
k8s.io/apimachinery/pkg/util/json
k8s.io/apimachinery/pkg/util/strategicpatch
k8s.io/apimachinery/pkg/util/errors
@ -224,10 +224,10 @@ k8s.io/apimachinery/pkg/util/sets
k8s.io/apimachinery/pkg/apis/meta/v1/unstructured
k8s.io/apimachinery/pkg/conversion/queryparams
k8s.io/apimachinery/pkg/util/naming
k8s.io/apimachinery/pkg/util/validation/field
k8s.io/apimachinery/pkg/util/net
k8s.io/apimachinery/pkg/util/yaml
k8s.io/apimachinery/pkg/apis/meta/v1/validation
k8s.io/apimachinery/pkg/util/validation/field
k8s.io/apimachinery/pkg/runtime/serializer/json
k8s.io/apimachinery/pkg/runtime/serializer/protobuf
k8s.io/apimachinery/pkg/runtime/serializer/recognizer