mirror of https://github.com/knative/client.git
feature(service update): Add generic mechanism to include retries on conflicts (#660)
* feature(service update): Add generic mechanism to include retries on conflicts Move the retry logic for retrying in case of a resource conflict into the KnClient. * chore: Update unit tests
This commit is contained in:
parent
12d718e7af
commit
06d4dffd1e
|
|
@ -25,7 +25,6 @@ import (
|
|||
|
||||
clientserving "knative.dev/client/pkg/serving"
|
||||
clientservingv1 "knative.dev/client/pkg/serving/v1"
|
||||
|
||||
"knative.dev/client/pkg/util"
|
||||
)
|
||||
|
||||
|
|
@ -52,10 +51,7 @@ func TestServiceUpdateEnvMock(t *testing.T) {
|
|||
template.Annotations = map[string]string{clientserving.UserImageAnnotationKey: "gcr.io/foo/bar:baz"}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService("foo", nil, errors.NewNotFound(servingv1.Resource("service"), "foo"))
|
||||
r.CreateService(service, nil)
|
||||
r.GetService("foo", service, nil)
|
||||
r.UpdateService(updated, nil)
|
||||
recordServiceUpdateWithSuccess(r, "foo", service, updated)
|
||||
|
||||
output, err := executeServiceCommand(client, "create", "foo", "--image", "gcr.io/foo/bar:baz", "-e", "a=mouse", "--env", "b=cookie", "--env=empty", "--no-wait", "--revision-name=")
|
||||
assert.NilError(t, err)
|
||||
|
|
@ -100,10 +96,7 @@ func TestServiceUpdateAnnotationsMock(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -127,6 +120,13 @@ func TestServiceUpdateAnnotationsMock(t *testing.T) {
|
|||
r.Validate()
|
||||
}
|
||||
|
||||
func recordServiceUpdateWithSuccess(r *clientservingv1.ServingRecorder, svcName string, newService *servingv1.Service, updatedService *servingv1.Service) {
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
}
|
||||
|
||||
func TestServiceUpdateEnvFromAddingWithConfigMap(t *testing.T) {
|
||||
client := clientservingv1.NewMockKnServiceClient(t)
|
||||
svcName := "svc1"
|
||||
|
|
@ -170,10 +170,7 @@ func TestServiceUpdateEnvFromAddingWithConfigMap(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -275,10 +272,7 @@ func TestServiceUpdateEnvFromRemovalWithConfigMap(t *testing.T) {
|
|||
template.Spec.Containers[0].EnvFrom = nil
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService1, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService1)
|
||||
r.GetService(svcName, updatedService1, nil)
|
||||
//r.UpdateService(updatedService2, nil) // since an error happens, update is not triggered here
|
||||
r.GetService(svcName, updatedService2, nil)
|
||||
|
|
@ -454,10 +448,7 @@ func TestServiceUpdateEnvFromExistingWithConfigMap(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -523,10 +514,7 @@ func TestServiceUpdateEnvFromAddingWithSecret(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -733,10 +721,7 @@ func TestServiceUpdateEnvFromExistingWithSecret(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -835,10 +820,7 @@ func TestServiceUpdateWithAddingVolume(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -937,10 +919,7 @@ func TestServiceUpdateWithUpdatingVolume(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -1041,10 +1020,7 @@ func TestServiceUpdateWithRemovingVolume(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -1119,10 +1095,7 @@ func TestServiceUpdateWithAddingMount(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
@ -1227,10 +1200,7 @@ func TestServiceUpdateWithUpdatingMount(t *testing.T) {
|
|||
}
|
||||
|
||||
r := client.Recorder()
|
||||
r.GetService(svcName, nil, errors.NewNotFound(servingv1.Resource("service"), svcName))
|
||||
r.CreateService(newService, nil)
|
||||
r.GetService(svcName, newService, nil)
|
||||
r.UpdateService(updatedService, nil)
|
||||
recordServiceUpdateWithSuccess(r, svcName, newService, updatedService)
|
||||
|
||||
output, err := executeServiceCommand(client,
|
||||
"create", svcName, "--image", "gcr.io/foo/bar:baz",
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
"knative.dev/client/pkg/kn/commands/flags"
|
||||
"knative.dev/client/pkg/kn/traffic"
|
||||
|
|
@ -74,15 +73,12 @@ func NewServiceUpdateCommand(p *commands.KnParams) *cobra.Command {
|
|||
return err
|
||||
}
|
||||
|
||||
var retries = 0
|
||||
for {
|
||||
name := args[0]
|
||||
service, err := client.GetService(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
service = service.DeepCopy()
|
||||
latestRevisionBeforeUpdate := service.Status.LatestReadyRevisionName
|
||||
// Use to store the latest revision name
|
||||
var latestRevisionBeforeUpdate string
|
||||
name := args[0]
|
||||
|
||||
updateFunc := func(service *servingv1.Service) (*servingv1.Service, error) {
|
||||
latestRevisionBeforeUpdate = service.Status.LatestReadyRevisionName
|
||||
var baseRevision *servingv1.Revision
|
||||
if !cmd.Flags().Changed("image") && editFlags.LockToDigest {
|
||||
baseRevision, err = client.GetBaseRevision(service)
|
||||
|
|
@ -92,48 +88,46 @@ func NewServiceUpdateCommand(p *commands.KnParams) *cobra.Command {
|
|||
}
|
||||
err = editFlags.Apply(service, baseRevision, cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if trafficFlags.Changed(cmd) {
|
||||
traffic, err := traffic.Compute(cmd, service.Spec.Traffic, &trafficFlags)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service.Spec.Traffic = traffic
|
||||
}
|
||||
return service, nil
|
||||
}
|
||||
|
||||
err = client.UpdateService(service)
|
||||
// Do the actual update with retry in case of conflicts
|
||||
err = client.UpdateServiceWithRetry(name, updateFunc, MaxUpdateRetries)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
out := cmd.OutOrStdout()
|
||||
//TODO: deprecated condition should be once --async is gone
|
||||
if !waitFlags.Async && !waitFlags.NoWait {
|
||||
fmt.Fprintf(out, "Updating Service '%s' in namespace '%s':\n", args[0], namespace)
|
||||
fmt.Fprintln(out, "")
|
||||
err := waitForService(client, name, out, waitFlags.TimeoutInSeconds)
|
||||
if err != nil {
|
||||
// Retry to update when a resource version conflict exists
|
||||
if apierrors.IsConflict(err) && retries < MaxUpdateRetries {
|
||||
retries++
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
out := cmd.OutOrStdout()
|
||||
//TODO: deprecated condition should be once --async is gone
|
||||
if !waitFlags.Async && !waitFlags.NoWait {
|
||||
fmt.Fprintf(out, "Updating Service '%s' in namespace '%s':\n", args[0], namespace)
|
||||
fmt.Fprintln(out, "")
|
||||
err := waitForService(client, name, out, waitFlags.TimeoutInSeconds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintln(out, "")
|
||||
return showUrl(client, name, latestRevisionBeforeUpdate, "updated", out)
|
||||
} else {
|
||||
if waitFlags.Async {
|
||||
fmt.Fprintf(out, "\nWARNING: flag --async is deprecated and going to be removed in future release, please use --no-wait instead.\n\n")
|
||||
}
|
||||
fmt.Fprintf(out, "Service '%s' updated in namespace '%s'.\n", args[0], namespace)
|
||||
fmt.Fprintln(out, "")
|
||||
return showUrl(client, name, latestRevisionBeforeUpdate, "updated", out)
|
||||
} else {
|
||||
if waitFlags.Async {
|
||||
fmt.Fprintf(out, "\nWARNING: flag --async is deprecated and going to be removed in future release, please use --no-wait instead.\n\n")
|
||||
}
|
||||
|
||||
return nil
|
||||
fmt.Fprintf(out, "Service '%s' updated in namespace '%s'.\n", args[0], namespace)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
},
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
return preCheck(cmd, args)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"knative.dev/pkg/apis"
|
||||
"knative.dev/serving/pkg/client/clientset/versioned/scheme"
|
||||
|
|
@ -37,6 +38,10 @@ import (
|
|||
"knative.dev/client/pkg/errors"
|
||||
)
|
||||
|
||||
// Func signature for an updating function which returns the updated service object
|
||||
// or an error
|
||||
type serviceUpdateFunc func(origService *servingv1.Service) (*servingv1.Service, error)
|
||||
|
||||
// Kn interface to serving. All methods are relative to the
|
||||
// namespace specified during construction
|
||||
type KnServingClient interface {
|
||||
|
|
@ -53,9 +58,15 @@ type KnServingClient interface {
|
|||
// Create a new service
|
||||
CreateService(service *servingv1.Service) error
|
||||
|
||||
// Update the given service
|
||||
// UpdateService updates the given service. For a more robust variant with automatic
|
||||
// conflict resolution see UpdateServiceWithRetry
|
||||
UpdateService(service *servingv1.Service) error
|
||||
|
||||
// UpdateServiceWithRetry updates service and retries if there is a version conflict.
|
||||
// The updateFunc receives a deep copy of the existing service and can add update it in
|
||||
// place.
|
||||
UpdateServiceWithRetry(name string, updateFunc serviceUpdateFunc, nrRetries int) error
|
||||
|
||||
// Delete a service by name
|
||||
DeleteService(name string) error
|
||||
|
||||
|
|
@ -207,6 +218,37 @@ func (cl *knServingClient) UpdateService(service *servingv1.Service) error {
|
|||
return updateServingGvk(service)
|
||||
}
|
||||
|
||||
// Update the given service with a retry in case of a conflict
|
||||
func (cl *knServingClient) UpdateServiceWithRetry(name string, updateFunc serviceUpdateFunc, nrRetries int) error {
|
||||
return updateServiceWithRetry(cl, name, updateFunc, nrRetries)
|
||||
}
|
||||
|
||||
// Extracted to be usable with the Mocking client
|
||||
func updateServiceWithRetry(cl KnServingClient, name string, updateFunc serviceUpdateFunc, nrRetries int) error {
|
||||
var retries = 0
|
||||
for {
|
||||
service, err := cl.GetService(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
updatedService, err := updateFunc(service.DeepCopy())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = cl.UpdateService(updatedService)
|
||||
if err != nil {
|
||||
// Retry to update when a resource version conflict exists
|
||||
if apierrors.IsConflict(err) && retries < nrRetries {
|
||||
retries++
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Delete a service by name
|
||||
func (cl *knServingClient) DeleteService(serviceName string) error {
|
||||
err := cl.client.Services(cl.namespace).Delete(
|
||||
|
|
|
|||
|
|
@ -100,6 +100,11 @@ func (c *MockKnServingClient) UpdateService(service *servingv1.Service) error {
|
|||
return mock.ErrorOrNil(call.Result[0])
|
||||
}
|
||||
|
||||
// Delegate to shared retry method
|
||||
func (c *MockKnServingClient) UpdateServiceWithRetry(name string, updateFunc serviceUpdateFunc, maxRetry int) error {
|
||||
return updateServiceWithRetry(c, name, updateFunc, maxRetry)
|
||||
}
|
||||
|
||||
// Delete a service by name
|
||||
func (sr *ServingRecorder) DeleteService(name interface{}, err error) {
|
||||
sr.r.Add("DeleteService", []interface{}{name}, []interface{}{err})
|
||||
|
|
|
|||
Loading…
Reference in New Issue