* webhook

Signed-off-by: veophi <vec.g.sun@gmail.com>

* improve webhook

Signed-off-by: veophi <vec.g.sun@gmail.com>
This commit is contained in:
Wei-Xiang Sun 2022-03-09 17:59:41 +08:00 committed by GitHub
parent 4bac9d9cbe
commit 62544eb1de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 3656 additions and 0 deletions

25
webhook/add_rollout.go Normal file
View File

@ -0,0 +1,25 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"github.com/openkruise/rollouts/webhook/rollout/validating"
)
func init() {
addHandlers(validating.HandlerMap)
}

25
webhook/add_workload.go Normal file
View File

@ -0,0 +1,25 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"github.com/openkruise/rollouts/webhook/workload/mutating"
)
func init() {
addHandlers(mutating.HandlerMap)
}

View File

@ -0,0 +1,275 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validating
import (
"context"
"fmt"
"net/http"
"reflect"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
addmissionv1 "k8s.io/api/admission/v1"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// RolloutCreateUpdateHandler handles Rollout
type RolloutCreateUpdateHandler struct {
// To use the client, you need to do the following:
// - uncomment it
// - import sigs.k8s.io/controller-runtime/pkg/client
// - uncomment the InjectClient method at the bottom of this file.
Client client.Client
// Decoder decodes objects
Decoder *admission.Decoder
}
var _ admission.Handler = &RolloutCreateUpdateHandler{}
// Handle handles admission requests.
func (h *RolloutCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
switch req.Operation {
case addmissionv1.Create:
obj := &appsv1alpha1.Rollout{}
if err := h.Decoder.Decode(req, obj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
errList := h.validateRollout(obj)
if len(errList) != 0 {
return admission.Errored(http.StatusUnprocessableEntity, errList.ToAggregate())
}
case addmissionv1.Update:
obj := &appsv1alpha1.Rollout{}
if err := h.Decoder.Decode(req, obj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldObj := &appsv1alpha1.Rollout{}
if err := h.Decoder.DecodeRaw(req.AdmissionRequest.OldObject, oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
errList := h.validateRolloutUpdate(oldObj, obj)
if len(errList) != 0 {
return admission.Errored(http.StatusUnprocessableEntity, errList.ToAggregate())
}
}
return admission.ValidationResponse(true, "")
}
func (h *RolloutCreateUpdateHandler) validateRolloutUpdate(oldObj, newObj *appsv1alpha1.Rollout) field.ErrorList {
latestObject := &appsv1alpha1.Rollout{}
err := h.Client.Get(context.TODO(), client.ObjectKeyFromObject(newObj), latestObject)
if err != nil {
return field.ErrorList{field.InternalError(field.NewPath("Rollout"), err)}
}
switch latestObject.Status.Phase {
case "", appsv1alpha1.RolloutPhaseInitial, appsv1alpha1.RolloutPhaseHealthy:
if !reflect.DeepEqual(oldObj.Spec.ObjectRef, newObj.Spec.ObjectRef) {
return field.ErrorList{field.Forbidden(field.NewPath("Spec.ObjectRef"), "Rollout 'ObjectRef' field is immutable")}
}
default:
// except spec.paused
oldObj.Spec.Strategy.Paused = false
newObj.Spec.Strategy.Paused = false
if !reflect.DeepEqual(oldObj.Spec, newObj.Spec) {
return field.ErrorList{field.Forbidden(field.NewPath("Status.Phase"), "Rollout is immutable because it is at Progressing/Terminating phase")}
}
}
if newObj.Status.CanaryStatus != nil && newObj.Status.CanaryStatus.CurrentStepState == appsv1alpha1.CanaryStepStateCompleted {
if oldObj.Status.CanaryStatus != nil {
switch oldObj.Status.CanaryStatus.CurrentStepState {
case appsv1alpha1.CanaryStepStateCompleted, appsv1alpha1.CanaryStepStatePaused:
default:
return field.ErrorList{field.Forbidden(field.NewPath("Status"), "CanaryStatus.CurrentStepState only allow to translate to 'StepInCompleted' from 'StepInPaused'")}
}
}
}
return h.validateRollout(newObj)
}
func (h *RolloutCreateUpdateHandler) validateRollout(rollout *appsv1alpha1.Rollout) field.ErrorList {
errList := validateRolloutSpec(rollout, field.NewPath("Spec"))
errList = append(errList, h.validateRolloutConflict(rollout, field.NewPath("Conflict Checker"))...)
return errList
}
func (h *RolloutCreateUpdateHandler) validateRolloutConflict(rollout *appsv1alpha1.Rollout, path *field.Path) field.ErrorList {
errList := field.ErrorList{}
rolloutList := &appsv1alpha1.RolloutList{}
err := h.Client.List(context.TODO(), rolloutList, client.InNamespace(rollout.Namespace))
if err != nil {
return append(errList, field.InternalError(path, err))
}
for i := range rolloutList.Items {
r := &rolloutList.Items[i]
if r.Name == rollout.Name || !IsSameWorkloadRefGVKName(r.Spec.ObjectRef.WorkloadRef, rollout.Spec.ObjectRef.WorkloadRef) {
continue
}
return field.ErrorList{field.Invalid(path, rollout.Name,
fmt.Sprintf("This rollout conflict with Rollout(%v), one workload only have less than one Rollout", client.ObjectKeyFromObject(r)))}
}
return nil
}
func validateRolloutSpec(rollout *appsv1alpha1.Rollout, fldPath *field.Path) field.ErrorList {
errList := validateRolloutSpecObjectRef(&rollout.Spec.ObjectRef, fldPath.Child("ObjectRef"))
errList = append(errList, validateRolloutSpecStrategy(&rollout.Spec.Strategy, fldPath.Child("Strategy"))...)
return errList
}
func validateRolloutSpecObjectRef(objectRef *appsv1alpha1.ObjectRef, fldPath *field.Path) field.ErrorList {
switch objectRef.Type {
case "", appsv1alpha1.WorkloadRefType:
if objectRef.WorkloadRef == nil ||
objectRef.WorkloadRef.Kind != "Deployment" ||
objectRef.WorkloadRef.APIVersion != apps.SchemeGroupVersion.String() {
return field.ErrorList{field.Invalid(fldPath.Child("WorkloadRef"), objectRef.WorkloadRef, "WorkloadRef only support 'Deployments.apps/v1'")}
}
default:
return field.ErrorList{field.Invalid(fldPath.Child("Type"), objectRef.Type, "ObjectRef only support 'workloadRef' type")}
}
return nil
}
func validateRolloutSpecStrategy(strategy *appsv1alpha1.RolloutStrategy, fldPath *field.Path) field.ErrorList {
switch strategy.Type {
case "", appsv1alpha1.RolloutStrategyCanary:
return validateRolloutSpecCanaryStrategy(strategy.Canary, fldPath.Child("Canary"))
default:
return field.ErrorList{field.Invalid(fldPath.Child("Type"), strategy.Type, "Strategy type only support 'canary'")}
}
}
func validateRolloutSpecCanaryStrategy(canary *appsv1alpha1.CanaryStrategy, fldPath *field.Path) field.ErrorList {
if canary == nil {
return field.ErrorList{field.Invalid(fldPath, nil, "Canary cannot be empty")}
}
errList := validateRolloutSpecCanarySteps(canary.Steps, fldPath.Child("Steps"))
errList = append(errList, validateRolloutSpecCanaryTraffic(canary.TrafficRouting, fldPath.Child("TrafficRouting"))...)
return errList
}
func validateRolloutSpecCanaryTraffic(traffic *appsv1alpha1.TrafficRouting, fldPath *field.Path) field.ErrorList {
if traffic == nil {
return field.ErrorList{field.Invalid(fldPath, nil, "Canary.TrafficRouting cannot be empty")}
}
errList := field.ErrorList{}
if len(traffic.Service) == 0 {
errList = append(errList, field.Invalid(fldPath.Child("Service"), traffic.Service, "TrafficRouting.Service cannot be empty"))
}
switch traffic.Type {
case "", appsv1alpha1.TrafficRoutingNginx:
if traffic.Nginx == nil ||
len(traffic.Nginx.Ingress) == 0 {
errList = append(errList, field.Invalid(fldPath.Child("Nginx"), traffic.Nginx, "TrafficRouting.Nginx.Ingress cannot be empty"))
}
default:
errList = append(errList, field.Invalid(fldPath.Child("Type"), traffic.Type, "TrafficRouting only support 'nginx' type"))
}
return errList
}
func validateRolloutSpecCanarySteps(steps []appsv1alpha1.CanaryStep, fldPath *field.Path) field.ErrorList {
stepCount := len(steps)
if stepCount == 0 {
return field.ErrorList{field.Invalid(fldPath, steps, "The number of Canary.Steps cannot be empty")}
}
for i := range steps {
s := &steps[i]
if s.Weight <= 0 || s.Weight > 100 {
return field.ErrorList{field.Invalid(fldPath.Index(i).Child("Weight"),
s.Weight, `Weight must be a positive number with "0" < weight <= "100"`)}
}
if s.Replicas != nil {
canaryReplicas, err := intstr.GetScaledValueFromIntOrPercent(s.Replicas, 100, true)
if err != nil || canaryReplicas <= 0 || canaryReplicas > 100 {
return field.ErrorList{field.Invalid(fldPath.Index(i).Child("CanaryReplicas"),
s.Replicas, `canaryReplicas must be positive number with with "0" < canaryReplicas <= "100", or a percentage with "0%" < canaryReplicas <= "100%"`)}
}
}
}
for i := 1; i < stepCount; i++ {
prev := &steps[i-1]
curr := &steps[i]
if curr.Weight < prev.Weight {
return field.ErrorList{field.Invalid(fldPath.Child("Weight"), steps, `Steps.Weight must be a non decreasing sequence`)}
}
// if they are comparable, then compare them
if IsPercentageCanaryReplicasType(prev.Replicas) != IsPercentageCanaryReplicasType(curr.Replicas) {
continue
}
prevCanaryReplicas, _ := intstr.GetScaledValueFromIntOrPercent(prev.Replicas, 100, true)
currCanaryReplicas, _ := intstr.GetScaledValueFromIntOrPercent(curr.Replicas, 100, true)
if prev.Replicas == nil {
prevCanaryReplicas = int(prev.Weight)
}
if curr.Replicas == nil {
currCanaryReplicas = int(curr.Weight)
}
if currCanaryReplicas < prevCanaryReplicas {
return field.ErrorList{field.Invalid(fldPath.Child("CanaryReplicas"), steps, `Steps.CanaryReplicas must be a non decreasing sequence`)}
}
}
return nil
}
func IsPercentageCanaryReplicasType(replicas *intstr.IntOrString) bool {
return replicas == nil || replicas.Type == intstr.String
}
func IsSameWorkloadRefGVKName(a, b *appsv1alpha1.WorkloadRef) bool {
if a == nil || b == nil {
return false
}
return reflect.DeepEqual(a, b)
}
var _ inject.Client = &RolloutCreateUpdateHandler{}
// InjectClient injects the client into the RolloutCreateUpdateHandler
func (h *RolloutCreateUpdateHandler) InjectClient(c client.Client) error {
h.Client = c
return nil
}
var _ admission.DecoderInjector = &RolloutCreateUpdateHandler{}
// InjectDecoder injects the decoder into the RolloutCreateUpdateHandler
func (h *RolloutCreateUpdateHandler) InjectDecoder(d *admission.Decoder) error {
h.Decoder = d
return nil
}

View File

@ -0,0 +1,542 @@
package validating
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)
var (
scheme = runtime.NewScheme()
rollout = appsv1alpha1.Rollout{
TypeMeta: metav1.TypeMeta{
APIVersion: appsv1alpha1.SchemeGroupVersion.String(),
Kind: "Rollout",
},
ObjectMeta: metav1.ObjectMeta{
Name: "rollout-demo",
Namespace: "namespace-unit-test",
},
Spec: appsv1alpha1.RolloutSpec{
ObjectRef: appsv1alpha1.ObjectRef{
Type: appsv1alpha1.WorkloadRefType,
WorkloadRef: &appsv1alpha1.WorkloadRef{
APIVersion: apps.SchemeGroupVersion.String(),
Kind: "Deployment",
Name: "deployment-demo",
},
},
Strategy: appsv1alpha1.RolloutStrategy{
Type: appsv1alpha1.RolloutStrategyCanary,
Canary: &appsv1alpha1.CanaryStrategy{
Steps: []appsv1alpha1.CanaryStep{
{
Weight: 10,
Replicas: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(1)},
Pause: appsv1alpha1.RolloutPause{},
},
{
Weight: 10,
Replicas: &intstr.IntOrString{Type: intstr.Int, IntVal: int32(3)},
Pause: appsv1alpha1.RolloutPause{Duration: pointer.Int32(1 * 24 * 60 * 60)},
},
{
Weight: 30,
Pause: appsv1alpha1.RolloutPause{Duration: pointer.Int32(7 * 24 * 60 * 60)},
},
{
Weight: 100,
},
},
TrafficRouting: &appsv1alpha1.TrafficRouting{
Type: appsv1alpha1.TrafficRoutingNginx,
Service: "service-demo",
Nginx: &appsv1alpha1.NginxTrafficRouting{
Ingress: "ingress-nginx-demo",
},
},
},
},
},
Status: appsv1alpha1.RolloutStatus{
CanaryStatus: &appsv1alpha1.CanaryStatus{
CurrentStepState: appsv1alpha1.CanaryStepStateCompleted,
},
},
}
)
func init() {
_ = appsv1alpha1.AddToScheme(scheme)
}
func TestRolloutValidateCreate(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
Succeed bool
GetObject func() []client.Object
}{
{
Name: "Normal case",
Succeed: true,
GetObject: func() []client.Object {
return []client.Object{rollout.DeepCopy()}
},
},
/***********************************************************
The following cases may lead to controller panic
**********************************************************/
{
Name: "WorkloadRef is nil",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.ObjectRef.WorkloadRef = nil
return []client.Object{object}
},
},
{
Name: "Canary is nil",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary = nil
return []client.Object{object}
},
},
{
Name: "Traffic is nil",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.TrafficRouting = nil
return []client.Object{object}
},
},
{
Name: "Nginx is nil",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.TrafficRouting.Nginx = nil
return []client.Object{object}
},
},
/****************************************************************
The following cases may lead to that controller cannot work
***************************************************************/
{
Name: "Service name is empty",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.TrafficRouting.Service = ""
return []client.Object{object}
},
},
{
Name: "Nginx ingress name is empty",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.TrafficRouting.Nginx.Ingress = ""
return []client.Object{object}
},
},
{
Name: "Steps is empty",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps = []appsv1alpha1.CanaryStep{}
return []client.Object{object}
},
},
{
Name: "WorkloadRef is not Deployment kind",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.ObjectRef.WorkloadRef = &appsv1alpha1.WorkloadRef{
APIVersion: "apps/v1",
Kind: "StatefulSet",
Name: "whatever",
}
return []client.Object{object}
},
},
{
Name: "Steps.Weight is a decreasing sequence",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[2].Weight = 5
return []client.Object{object}
},
},
{
Name: "Steps.Replicas is a decreasing sequence",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Replicas = &intstr.IntOrString{Type: intstr.String, StrVal: "50%"}
return []client.Object{object}
},
},
{
Name: "Steps.Replicas is illegal value, '50'",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Replicas = &intstr.IntOrString{Type: intstr.String, StrVal: "50"}
return []client.Object{object}
},
},
{
Name: "Steps.Replicas is illegal value, '101%'",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Replicas = &intstr.IntOrString{Type: intstr.String, StrVal: "101%"}
return []client.Object{object}
},
},
{
Name: "Steps.Replicas is illegal value, '0%'",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Replicas = &intstr.IntOrString{Type: intstr.String, StrVal: "0%"}
return []client.Object{object}
},
},
{
Name: "Steps.Weight is illegal value, 0",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Weight = 0
return []client.Object{object}
},
},
{
Name: "Steps.Weight is illegal value, 101",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[1].Weight = 101
return []client.Object{object}
},
},
//{
// Name: "The last Steps.Weight is not 100",
// Succeed: false,
// GetObject: func() []client.Object {
// object := rollout.DeepCopy()
// n := len(object.Spec.Strategy.Canary.Steps)
// object.Spec.Strategy.Canary.Steps[n-1].Weight = 80
// return []client.Object{object}
// },
//},
{
Name: "Wrong objectRef type",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.ObjectRef.Type = "Whatever"
return []client.Object{object}
},
},
{
Name: "Wrong strategy type",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Type = "Whatever"
return []client.Object{object}
},
},
{
Name: "Wrong Traffic type",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.TrafficRouting.Type = "Whatever"
return []client.Object{object}
},
},
/****************************************************************
The following cases are conflict cases
***************************************************************/
{
Name: "Without conflict",
Succeed: true,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object1 := rollout.DeepCopy()
object1.Name = "object-1"
object1.Spec.ObjectRef.WorkloadRef.Name = "another"
object2 := rollout.DeepCopy()
object2.Name = "object-2"
object2.Spec.ObjectRef.WorkloadRef.APIVersion = "another"
object3 := rollout.DeepCopy()
object3.Name = "object-3"
object3.Spec.ObjectRef.WorkloadRef.Kind = "another"
return []client.Object{
object, object1, object2, object3,
}
},
},
{
Name: "With conflict",
Succeed: false,
GetObject: func() []client.Object {
object := rollout.DeepCopy()
object1 := rollout.DeepCopy()
object1.Name = "object-1"
object1.Spec.ObjectRef.WorkloadRef.Name = "another"
object2 := rollout.DeepCopy()
object2.Name = "object-2"
object2.Spec.ObjectRef.WorkloadRef.APIVersion = "another"
object3 := rollout.DeepCopy()
object3.Name = "object-3"
object3.Spec.ObjectRef.WorkloadRef.Kind = "another"
object4 := rollout.DeepCopy()
object4.Name = "object-4"
return []client.Object{
object, object1, object2, object3, object4,
}
},
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
objects := cs.GetObject()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objects...).Build()
handler := RolloutCreateUpdateHandler{
Client: cli,
}
errList := handler.validateRollout(objects[0].(*appsv1alpha1.Rollout))
t.Log(errList)
Expect(len(errList) == 0).Should(Equal(cs.Succeed))
})
}
}
func TestRolloutValidateUpdate(t *testing.T) {
RegisterFailHandler(Fail)
cases := []struct {
Name string
Succeed bool
GetOldObject func() client.Object
GetNewObject func() client.Object
}{
{
Name: "Normal case",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Spec.Strategy.Canary.Steps[0].Weight = 5
return object
},
},
{
Name: "Rollout is progressing, but spec not changed",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing
object.Status.CanaryStatus.CurrentStepIndex = 1
return object
},
},
{
Name: "Rollout is progressing, and spec changed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseProgressing
object.Spec.Strategy.Canary.Steps[0].Weight = 5
return object
},
},
{
Name: "Rollout is terminating, and spec changed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseTerminating
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseTerminating
object.Spec.Strategy.Canary.Steps[0].Weight = 5
return object
},
},
{
Name: "Rollout is initial, and spec changed",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseInitial
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseInitial
object.Spec.Strategy.Canary.Steps[0].Weight = 5
return object
},
},
{
Name: "Rollout is healthy, and spec changed",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseHealthy
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.Phase = appsv1alpha1.RolloutPhaseHealthy
object.Spec.Strategy.Canary.Steps[0].Weight = 5
return object
},
},
{
Name: "Rollout canary state: paused -> completed",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStatePaused
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
{
Name: "Rollout canary state: completed -> completed",
Succeed: true,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
{
Name: "Rollout canary state: upgrade -> completed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateUpgrade
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
{
Name: "Rollout canary state: routing -> completed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateTrafficRouting
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
{
Name: "Rollout canary state: analysis -> completed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateMetricsAnalysis
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
{
Name: "Rollout canary state: others -> completed",
Succeed: false,
GetOldObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = "Whatever"
return object
},
GetNewObject: func() client.Object {
object := rollout.DeepCopy()
object.Status.CanaryStatus.CurrentStepState = appsv1alpha1.CanaryStepStateCompleted
return object
},
},
}
for _, cs := range cases {
t.Run(cs.Name, func(t *testing.T) {
oldObject := cs.GetOldObject()
newObject := cs.GetNewObject()
cli := fake.NewClientBuilder().WithScheme(scheme).WithObjects(oldObject).Build()
handler := RolloutCreateUpdateHandler{
Client: cli,
}
errList := handler.validateRolloutUpdate(oldObject.(*appsv1alpha1.Rollout), newObject.(*appsv1alpha1.Rollout))
t.Log(errList)
Expect(len(errList) == 0).Should(Equal(cs.Succeed))
})
}
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validating
import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// +kubebuilder:webhook:path=/validate-rollouts-kruise-io-rollout,mutating=false,failurePolicy=fail,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=rollouts.kruise.io,resources=rollouts,verbs=create;update,versions=v1alpha1,name=vrollout.kb.io
var (
// HandlerMap contains admission webhook handlers
HandlerMap = map[string]admission.Handler{
"validate-rollouts-kruise-io-rollout": &RolloutCreateUpdateHandler{},
}
)

124
webhook/server.go Normal file
View File

@ -0,0 +1,124 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package webhook
import (
"context"
"fmt"
"time"
webhookutil "github.com/openkruise/rollouts/webhook/util"
webhookcontroller "github.com/openkruise/rollouts/webhook/util/controller"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
type GateFunc func() (enabled bool)
var (
// HandlerMap contains all admission webhook handlers.
HandlerMap = map[string]admission.Handler{}
handlerGates = map[string]GateFunc{}
)
func addHandlers(m map[string]admission.Handler) {
addHandlersWithGate(m, nil)
}
func addHandlersWithGate(m map[string]admission.Handler, fn GateFunc) {
for path, handler := range m {
if len(path) == 0 {
klog.Warningf("Skip handler with empty path.")
continue
}
if path[0] != '/' {
path = "/" + path
}
_, found := HandlerMap[path]
if found {
klog.V(1).Infof("conflicting webhook builder path %v in handler map", path)
}
klog.Infof("add webhook path(%s)", path)
HandlerMap[path] = handler
if fn != nil {
handlerGates[path] = fn
}
}
}
func filterActiveHandlers() {
disablePaths := sets.NewString()
for path := range HandlerMap {
if fn, ok := handlerGates[path]; ok {
if !fn() {
disablePaths.Insert(path)
}
}
}
for _, path := range disablePaths.List() {
delete(HandlerMap, path)
}
}
func SetupWithManager(mgr manager.Manager) error {
server := mgr.GetWebhookServer()
server.Host = "0.0.0.0"
server.Port = webhookutil.GetPort()
server.CertDir = webhookutil.GetCertDir()
// register admission handlers
filterActiveHandlers()
for path, handler := range HandlerMap {
server.Register(path, &webhook.Admission{Handler: handler})
klog.V(3).Infof("Registered webhook handler %s", path)
}
err := initialize(context.TODO(), mgr.GetConfig())
if err != nil {
return err
}
klog.Infof("webhook init done")
return nil
}
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=mutatingwebhookconfigurations,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=admissionregistration.k8s.io,resources=validatingwebhookconfigurations,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get;list;watch;update;patch
func initialize(ctx context.Context, cfg *rest.Config) error {
c, err := webhookcontroller.New(cfg, HandlerMap)
if err != nil {
return err
}
go func() {
c.Start(ctx)
}()
timer := time.NewTimer(time.Second * 20)
defer timer.Stop()
select {
case <-webhookcontroller.Inited():
return nil
case <-timer.C:
return fmt.Errorf("failed to start webhook controller for waiting more than 20s")
}
}

View File

@ -0,0 +1,179 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package configuration
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
webhookutil "github.com/openkruise/rollouts/webhook/util"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
const (
mutatingWebhookConfigurationName = "kruise-rollout-mutating-webhook-configuration"
validatingWebhookConfigurationName = "kruise-rollout-validating-webhook-configuration"
)
func Ensure(kubeClient clientset.Interface, handlers map[string]admission.Handler, caBundle []byte) error {
mutatingConfig, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Get(context.TODO(), mutatingWebhookConfigurationName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("not found MutatingWebhookConfiguration %s", mutatingWebhookConfigurationName)
}
validatingConfig, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Get(context.TODO(), validatingWebhookConfigurationName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("not found ValidatingWebhookConfiguration %s", validatingWebhookConfigurationName)
}
oldMutatingConfig := mutatingConfig.DeepCopy()
oldValidatingConfig := validatingConfig.DeepCopy()
mutatingTemplate, err := parseMutatingTemplate(mutatingConfig)
if err != nil {
return err
}
validatingTemplate, err := parseValidatingTemplate(validatingConfig)
if err != nil {
return err
}
var mutatingWHs []admissionregistrationv1.MutatingWebhook
for i := range mutatingTemplate {
wh := &mutatingTemplate[i]
wh.ClientConfig.CABundle = caBundle
path, err := getPath(&wh.ClientConfig)
if err != nil {
return err
}
if _, ok := handlers[path]; !ok {
klog.Warningf("Ignore webhook for %s in configuration", path)
continue
}
if wh.ClientConfig.Service != nil {
wh.ClientConfig.Service.Namespace = webhookutil.GetNamespace()
wh.ClientConfig.Service.Name = webhookutil.GetServiceName()
}
if host := webhookutil.GetHost(); len(host) > 0 && wh.ClientConfig.Service != nil {
convertClientConfig(&wh.ClientConfig, host, webhookutil.GetPort())
}
mutatingWHs = append(mutatingWHs, *wh)
}
mutatingConfig.Webhooks = mutatingWHs
var validatingWHs []admissionregistrationv1.ValidatingWebhook
for i := range validatingTemplate {
wh := &validatingTemplate[i]
wh.ClientConfig.CABundle = caBundle
path, err := getPath(&wh.ClientConfig)
if err != nil {
return err
}
if _, ok := handlers[path]; !ok {
klog.Warningf("Ignore webhook for %s in configuration", path)
continue
}
if wh.ClientConfig.Service != nil {
wh.ClientConfig.Service.Namespace = webhookutil.GetNamespace()
wh.ClientConfig.Service.Name = webhookutil.GetServiceName()
}
if host := webhookutil.GetHost(); len(host) > 0 && wh.ClientConfig.Service != nil {
convertClientConfig(&wh.ClientConfig, host, webhookutil.GetPort())
}
validatingWHs = append(validatingWHs, *wh)
}
validatingConfig.Webhooks = validatingWHs
if !reflect.DeepEqual(mutatingConfig, oldMutatingConfig) {
if _, err := kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Update(context.TODO(), mutatingConfig, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update %s: %v", mutatingWebhookConfigurationName, err)
}
}
if !reflect.DeepEqual(validatingConfig, oldValidatingConfig) {
if _, err := kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Update(context.TODO(), validatingConfig, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to update %s: %v", validatingWebhookConfigurationName, err)
}
}
return nil
}
func getPath(clientConfig *admissionregistrationv1.WebhookClientConfig) (string, error) {
if clientConfig.Service != nil {
return *clientConfig.Service.Path, nil
} else if clientConfig.URL != nil {
u, err := url.Parse(*clientConfig.URL)
if err != nil {
return "", err
}
return u.Path, nil
}
return "", fmt.Errorf("invalid clientConfig: %+v", clientConfig)
}
func convertClientConfig(clientConfig *admissionregistrationv1.WebhookClientConfig, host string, port int) {
url := fmt.Sprintf("https://%s:%d%s", host, port, *clientConfig.Service.Path)
clientConfig.URL = &url
clientConfig.Service = nil
}
func parseMutatingTemplate(mutatingConfig *admissionregistrationv1.MutatingWebhookConfiguration) ([]admissionregistrationv1.MutatingWebhook, error) {
if templateStr := mutatingConfig.Annotations["template"]; len(templateStr) > 0 {
var mutatingWHs []admissionregistrationv1.MutatingWebhook
if err := json.Unmarshal([]byte(templateStr), &mutatingWHs); err != nil {
return nil, err
}
return mutatingWHs, nil
}
templateBytes, err := json.Marshal(mutatingConfig.Webhooks)
if err != nil {
return nil, err
}
if mutatingConfig.Annotations == nil {
mutatingConfig.Annotations = make(map[string]string, 1)
}
mutatingConfig.Annotations["template"] = string(templateBytes)
return mutatingConfig.Webhooks, nil
}
func parseValidatingTemplate(validatingConfig *admissionregistrationv1.ValidatingWebhookConfiguration) ([]admissionregistrationv1.ValidatingWebhook, error) {
if templateStr := validatingConfig.Annotations["template"]; len(templateStr) > 0 {
var validatingWHs []admissionregistrationv1.ValidatingWebhook
if err := json.Unmarshal([]byte(templateStr), &validatingWHs); err != nil {
return nil, err
}
return validatingWHs, nil
}
templateBytes, err := json.Marshal(validatingConfig.Webhooks)
if err != nil {
return nil, err
}
if validatingConfig.Annotations == nil {
validatingConfig.Annotations = make(map[string]string, 1)
}
validatingConfig.Annotations["template"] = string(templateBytes)
return validatingConfig.Webhooks, nil
}

View File

@ -0,0 +1,273 @@
/*
Copyright 2020 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"sync"
"time"
webhookutil "github.com/openkruise/rollouts/webhook/util"
"github.com/openkruise/rollouts/webhook/util/configuration"
"github.com/openkruise/rollouts/webhook/util/generator"
"github.com/openkruise/rollouts/webhook/util/writer"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
apiextensionslisters "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
admissionregistrationinformers "k8s.io/client-go/informers/admissionregistration/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
const (
mutatingWebhookConfigurationName = "kruise-rollout-mutating-webhook-configuration"
validatingWebhookConfigurationName = "kruise-rollout-validating-webhook-configuration"
defaultResyncPeriod = time.Minute
)
var (
namespace = webhookutil.GetNamespace()
secretName = webhookutil.GetSecretName()
uninit = make(chan struct{})
onceInit = sync.Once{}
)
func Inited() chan struct{} {
return uninit
}
type Controller struct {
kubeClient clientset.Interface
handlers map[string]admission.Handler
informerFactory informers.SharedInformerFactory
//secretLister corelisters.SecretNamespaceLister
//mutatingWCLister admissionregistrationlisters.MutatingWebhookConfigurationLister
//validatingWCLister admissionregistrationlisters.ValidatingWebhookConfigurationLister
crdClient apiextensionsclientset.Interface
crdInformer cache.SharedIndexInformer
crdLister apiextensionslisters.CustomResourceDefinitionLister
synced []cache.InformerSynced
queue workqueue.RateLimitingInterface
}
func New(cfg *rest.Config, handlers map[string]admission.Handler) (*Controller, error) {
kubeClient, err := clientset.NewForConfig(cfg)
if err != nil {
return nil, err
}
c := &Controller{
kubeClient: kubeClient,
handlers: handlers,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "webhook-controller"),
}
c.informerFactory = informers.NewSharedInformerFactory(c.kubeClient, 0)
secretInformer := coreinformers.New(c.informerFactory, namespace, nil).Secrets()
admissionRegistrationInformer := admissionregistrationinformers.New(c.informerFactory, v1.NamespaceAll, nil)
//c.secretLister = secretInformer.Lister().Secrets(namespace)
//c.mutatingWCLister = admissionRegistrationInformer.MutatingWebhookConfigurations().Lister()
//c.validatingWCLister = admissionRegistrationInformer.ValidatingWebhookConfigurations().Lister()
secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
secret := obj.(*v1.Secret)
if secret.Name == secretName {
klog.Infof("Secret %s added", secretName)
c.queue.Add("")
}
},
UpdateFunc: func(old, cur interface{}) {
secret := cur.(*v1.Secret)
if secret.Name == secretName {
klog.Infof("Secret %s updated", secretName)
c.queue.Add("")
}
},
})
admissionRegistrationInformer.MutatingWebhookConfigurations().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
conf := obj.(*admissionregistrationv1.MutatingWebhookConfiguration)
if conf.Name == mutatingWebhookConfigurationName {
klog.Infof("MutatingWebhookConfiguration %s added", mutatingWebhookConfigurationName)
c.queue.Add("")
}
},
UpdateFunc: func(old, cur interface{}) {
conf := cur.(*admissionregistrationv1.MutatingWebhookConfiguration)
if conf.Name == mutatingWebhookConfigurationName {
klog.Infof("MutatingWebhookConfiguration %s update", mutatingWebhookConfigurationName)
c.queue.Add("")
}
},
})
admissionRegistrationInformer.ValidatingWebhookConfigurations().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
conf := obj.(*admissionregistrationv1.ValidatingWebhookConfiguration)
if conf.Name == validatingWebhookConfigurationName {
klog.Infof("ValidatingWebhookConfiguration %s added", validatingWebhookConfigurationName)
c.queue.Add("")
}
},
UpdateFunc: func(old, cur interface{}) {
conf := cur.(*admissionregistrationv1.ValidatingWebhookConfiguration)
if conf.Name == validatingWebhookConfigurationName {
klog.Infof("ValidatingWebhookConfiguration %s updated", validatingWebhookConfigurationName)
c.queue.Add("")
}
},
})
c.crdClient = apiextensionsclientset.NewForConfigOrDie(cfg)
c.crdInformer = apiextensionsinformers.NewCustomResourceDefinitionInformer(c.crdClient, 0, cache.Indexers{})
c.crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
crd := obj.(*apiextensionsv1.CustomResourceDefinition)
if crd.Spec.Group == "rollouts.kruise.io" {
klog.Infof("CustomResourceDefinition %s added", crd.Name)
c.queue.Add("")
}
},
UpdateFunc: func(old, cur interface{}) {
crd := cur.(*apiextensionsv1.CustomResourceDefinition)
if crd.Spec.Group == "rollouts.kruise.io" {
klog.Infof("CustomResourceDefinition %s updated", crd.Name)
c.queue.Add("")
}
},
})
c.crdLister = apiextensionslisters.NewCustomResourceDefinitionLister(c.crdInformer.GetIndexer())
c.synced = []cache.InformerSynced{
secretInformer.Informer().HasSynced,
admissionRegistrationInformer.MutatingWebhookConfigurations().Informer().HasSynced,
admissionRegistrationInformer.ValidatingWebhookConfigurations().Informer().HasSynced,
c.crdInformer.HasSynced,
}
return c, nil
}
func (c *Controller) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting webhook-controller")
defer klog.Infof("Shutting down webhook-controller")
c.informerFactory.Start(ctx.Done())
go func() {
c.crdInformer.Run(ctx.Done())
}()
if !cache.WaitForNamedCacheSync("webhook-controller", ctx.Done(), c.synced...) {
return
}
go wait.Until(func() {
for c.processNextWorkItem() {
}
}, time.Second, ctx.Done())
klog.Infof("Started webhook-controller")
<-ctx.Done()
}
func (c *Controller) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.sync()
if err == nil {
c.queue.AddAfter(key, defaultResyncPeriod)
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *Controller) sync() error {
klog.Infof("Starting to sync webhook certs and configurations")
defer func() {
klog.Infof("Finished to sync webhook certs and configurations")
}()
var dnsName string
var certWriter writer.CertWriter
var err error
if dnsName = webhookutil.GetHost(); len(dnsName) == 0 {
dnsName = generator.ServiceToCommonName(webhookutil.GetNamespace(), webhookutil.GetServiceName())
}
certWriterType := webhookutil.GetCertWriter()
if certWriterType == writer.FsCertWriter || (len(certWriterType) == 0 && len(webhookutil.GetHost()) != 0) {
certWriter, err = writer.NewFSCertWriter(writer.FSCertWriterOptions{
Path: webhookutil.GetCertDir(),
})
} else {
certWriter, err = writer.NewSecretCertWriter(writer.SecretCertWriterOptions{
Clientset: c.kubeClient,
Secret: &types.NamespacedName{Namespace: webhookutil.GetNamespace(), Name: webhookutil.GetSecretName()},
})
}
if err != nil {
return fmt.Errorf("failed to ensure certs: %v", err)
}
certs, _, err := certWriter.EnsureCert(dnsName)
if err != nil {
return fmt.Errorf("failed to ensure certs: %v", err)
}
if err := writer.WriteCertsToDir(webhookutil.GetCertDir(), certs); err != nil {
return fmt.Errorf("failed to write certs to dir: %v", err)
}
if err := configuration.Ensure(c.kubeClient, c.handlers, certs.CACert); err != nil {
return fmt.Errorf("failed to ensure configuration: %v", err)
}
onceInit.Do(func() {
close(uninit)
})
return nil
}

View File

@ -0,0 +1,41 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generator
// Artifacts hosts a private key, its corresponding serving certificate and
// the CA certificate that signs the serving certificate.
type Artifacts struct {
// PEM encoded private key
Key []byte
// PEM encoded serving certificate
Cert []byte
// PEM encoded CA private key
CAKey []byte
// PEM encoded CA certificate
CACert []byte
// Resource version of the certs
ResourceVersion string
}
// CertGenerator is an interface to provision the serving certificate.
type CertGenerator interface {
// Generate returns a Artifacts struct.
Generate(CommonName string) (*Artifacts, error)
// SetCA sets the PEM-encoded CA private key and CA cert for signing the generated serving cert.
SetCA(caKey, caCert []byte)
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"bytes"
"fmt"
"github.com/openkruise/rollouts/webhook/util/generator"
)
// CertGenerator is a certGenerator for testing.
type CertGenerator struct {
CAKey []byte
CACert []byte
DNSNameToCertArtifacts map[string]*generator.Artifacts
}
var _ generator.CertGenerator = &CertGenerator{}
// SetCA sets the PEM-encoded CA private key and CA cert for signing the generated serving cert.
func (cp *CertGenerator) SetCA(CAKey, CACert []byte) {
cp.CAKey = CAKey
cp.CACert = CACert
}
// Generate generates certificates by matching a common name.
func (cp *CertGenerator) Generate(commonName string) (*generator.Artifacts, error) {
certs, found := cp.DNSNameToCertArtifacts[commonName]
if !found {
return nil, fmt.Errorf("failed to find common name %q in the certGenerator", commonName)
}
if cp.CAKey != nil && cp.CACert != nil &&
!bytes.Contains(cp.CAKey, []byte("invalid")) && !bytes.Contains(cp.CACert, []byte("invalid")) {
certs.CAKey = cp.CAKey
certs.CACert = cp.CACert
}
return certs, nil
}

View File

@ -0,0 +1,197 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generator
import (
"crypto"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"errors"
"fmt"
"math"
"math/big"
"net"
"time"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
)
const (
rsaKeySize = 2048
)
// ServiceToCommonName generates the CommonName for the certificate when using a k8s service.
func ServiceToCommonName(serviceNamespace, serviceName string) string {
return fmt.Sprintf("%s.%s.svc", serviceName, serviceNamespace)
}
// SelfSignedCertGenerator implements the certGenerator interface.
// It provisions self-signed certificates.
type SelfSignedCertGenerator struct {
caKey []byte
caCert []byte
}
var _ CertGenerator = &SelfSignedCertGenerator{}
// SetCA sets the PEM-encoded CA private key and CA cert for signing the generated serving cert.
func (cp *SelfSignedCertGenerator) SetCA(caKey, caCert []byte) {
cp.caKey = caKey
cp.caCert = caCert
}
// Generate creates and returns a CA certificate, certificate and
// key for the server. serverKey and serverCert are used by the server
// to establish trust for clients, CA certificate is used by the
// client to verify the server authentication chain.
// The cert will be valid for 365 days.
func (cp *SelfSignedCertGenerator) Generate(commonName string) (*Artifacts, error) {
var signingKey *rsa.PrivateKey
var signingCert *x509.Certificate
var valid bool
var err error
valid, signingKey, signingCert = cp.validCACert()
if !valid {
signingKey, err = NewPrivateKey()
if err != nil {
return nil, fmt.Errorf("failed to create the CA private key: %v", err)
}
signingCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "webhook-cert-ca"}, signingKey)
if err != nil {
return nil, fmt.Errorf("failed to create the CA cert: %v", err)
}
}
hostIP := net.ParseIP(commonName)
var altIPs []net.IP
DNSNames := []string{"localhost"}
if hostIP.To4() != nil {
altIPs = append(altIPs, hostIP.To4())
} else {
DNSNames = append(DNSNames, commonName)
}
key, err := NewPrivateKey()
if err != nil {
return nil, fmt.Errorf("failed to create the private key: %v", err)
}
signedCert, err := NewSignedCert(
cert.Config{
CommonName: commonName,
Usages: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
AltNames: cert.AltNames{IPs: altIPs, DNSNames: DNSNames},
},
key, signingCert, signingKey,
)
if err != nil {
return nil, fmt.Errorf("failed to create the cert: %v", err)
}
return &Artifacts{
Key: EncodePrivateKeyPEM(key),
Cert: EncodeCertPEM(signedCert),
CAKey: EncodePrivateKeyPEM(signingKey),
CACert: EncodeCertPEM(signingCert),
}, nil
}
func (cp *SelfSignedCertGenerator) validCACert() (bool, *rsa.PrivateKey, *x509.Certificate) {
if !ValidCACert(cp.caKey, cp.caCert, cp.caCert, "",
time.Now().AddDate(1, 0, 0)) {
return false, nil, nil
}
var ok bool
key, err := keyutil.ParsePrivateKeyPEM(cp.caKey)
if err != nil {
return false, nil, nil
}
privateKey, ok := key.(*rsa.PrivateKey)
if !ok {
return false, nil, nil
}
certs, err := cert.ParseCertsPEM(cp.caCert)
if err != nil {
return false, nil, nil
}
if len(certs) != 1 {
return false, nil, nil
}
return true, privateKey, certs[0]
}
// NewPrivateKey creates an RSA private key
func NewPrivateKey() (*rsa.PrivateKey, error) {
return rsa.GenerateKey(cryptorand.Reader, rsaKeySize)
}
// NewSignedCert creates a signed certificate using the given CA certificate and key
func NewSignedCert(cfg cert.Config, key crypto.Signer, caCert *x509.Certificate, caKey crypto.Signer) (*x509.Certificate, error) {
serial, err := cryptorand.Int(cryptorand.Reader, new(big.Int).SetInt64(math.MaxInt64))
if err != nil {
return nil, err
}
if len(cfg.CommonName) == 0 {
return nil, errors.New("must specify a CommonName")
}
if len(cfg.Usages) == 0 {
return nil, errors.New("must specify at least one ExtKeyUsage")
}
certTmpl := x509.Certificate{
Subject: pkix.Name{
CommonName: cfg.CommonName,
Organization: cfg.Organization,
},
DNSNames: cfg.AltNames.DNSNames,
IPAddresses: cfg.AltNames.IPs,
SerialNumber: serial,
NotBefore: caCert.NotBefore,
NotAfter: time.Now().Add(time.Hour * 24 * 365 * 10).UTC(),
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
ExtKeyUsage: cfg.Usages,
}
certDERBytes, err := x509.CreateCertificate(cryptorand.Reader, &certTmpl, caCert, key.Public(), caKey)
if err != nil {
return nil, err
}
return x509.ParseCertificate(certDERBytes)
}
// EncodePrivateKeyPEM returns PEM-encoded private key data
func EncodePrivateKeyPEM(key *rsa.PrivateKey) []byte {
block := pem.Block{
Type: keyutil.RSAPrivateKeyBlockType,
Bytes: x509.MarshalPKCS1PrivateKey(key),
}
return pem.EncodeToMemory(&block)
}
// EncodeCertPEM returns PEM-encoded certificate data
func EncodeCertPEM(ct *x509.Certificate) []byte {
block := pem.Block{
Type: cert.CertificateBlockType,
Bytes: ct.Raw,
}
return pem.EncodeToMemory(&block)
}

View File

@ -0,0 +1,62 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generator
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"time"
)
// ValidCACert think cert and key are valid if they meet the following requirements:
// - key and cert are valid pair
// - caCert is the root ca of cert
// - cert is for dnsName
// - cert won't expire before time
func ValidCACert(key, cert, caCert []byte, dnsName string, time time.Time) bool {
if len(key) == 0 || len(cert) == 0 || len(caCert) == 0 {
return false
}
// Verify key and cert are valid pair
_, err := tls.X509KeyPair(cert, key)
if err != nil {
return false
}
// Verify cert is valid for at least 1 year.
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(caCert) {
return false
}
block, _ := pem.Decode(cert)
if block == nil {
return false
}
c, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return false
}
ops := x509.VerifyOptions{
DNSName: dnsName,
Roots: pool,
CurrentTime: time,
}
_, err = c.Verify(ops)
return err == nil
}

72
webhook/util/util.go Normal file
View File

@ -0,0 +1,72 @@
/*
Copyright 2020 The rollout Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"os"
"strconv"
"k8s.io/klog/v2"
)
func GetHost() string {
return os.Getenv("WEBHOOK_HOST")
}
func GetNamespace() string {
if ns := os.Getenv("POD_NAMESPACE"); len(ns) > 0 {
return ns
}
return "kruise-rollout"
}
func GetSecretName() string {
if name := os.Getenv("SECRET_NAME"); len(name) > 0 {
return name
}
return "kruise-rollout-webhook-certs"
}
func GetServiceName() string {
if name := os.Getenv("SERVICE_NAME"); len(name) > 0 {
return name
}
return "kruise-rollout-webhook-service"
}
func GetPort() int {
port := 9876
if p := os.Getenv("WEBHOOK_PORT"); len(p) > 0 {
if p, err := strconv.ParseInt(p, 10, 32); err == nil {
port = int(p)
} else {
klog.Fatalf("failed to convert WEBHOOK_PORT=%v in env: %v", p, err)
}
}
return port
}
func GetCertDir() string {
if p := os.Getenv("WEBHOOK_CERT_DIR"); len(p) > 0 {
return p
}
return "/tmp/kruise-rollout-webhook-certs"
}
func GetCertWriter() string {
return os.Getenv("WEBHOOK_CERT_WRITER")
}

View File

@ -0,0 +1,452 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package atomic
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
const (
maxFileNameLength = 255
maxPathLength = 4096
)
// Writer handles atomically projecting content for a set of files into
// a target directory.
//
// Note:
//
// 1. Writer reserves the set of pathnames starting with `..`.
// 2. Writer offers no concurrency guarantees and must be synchronized
// by the caller.
//
// The visible files in this volume are symlinks to files in the writer's data
// directory. Actual files are stored in a hidden timestamped directory which
// is symlinked to by the data directory. The timestamped directory and
// data directory symlink are created in the writer's target dir.  This scheme
// allows the files to be atomically updated by changing the target of the
// data directory symlink.
//
// Consumers of the target directory can monitor the ..data symlink using
// inotify or fanotify to receive events when the content in the volume is
// updated.
type Writer struct {
targetDir string
}
type FileProjection struct {
Data []byte
Mode int32
}
// NewAtomicWriter creates a new Writer configured to write to the given
// target directory, or returns an error if the target directory does not exist.
func NewAtomicWriter(targetDir string) (*Writer, error) {
_, err := os.Stat(targetDir)
if os.IsNotExist(err) {
return nil, err
}
return &Writer{targetDir: targetDir}, nil
}
const (
dataDirName = "..data"
newDataDirName = "..data_tmp"
)
// Write does an atomic projection of the given payload into the writer's target
// directory. Input paths must not begin with '..'.
//
// The Write algorithm is:
//
// 1. The payload is validated; if the payload is invalid, the function returns
// 2.  The current timestamped directory is detected by reading the data directory
// symlink
// 3. The old version of the volume is walked to determine whether any
// portion of the payload was deleted and is still present on disk.
// 4. The data in the current timestamped directory is compared to the projected
// data to determine if an update is required.
// 5.  A new timestamped dir is created
// 6. The payload is written to the new timestamped directory
// 7.  Symlinks and directory for new user-visible files are created (if needed).
//
// For example, consider the files:
// <target-dir>/podName
// <target-dir>/user/labels
// <target-dir>/k8s/annotations
//
// The user visible files are symbolic links into the internal data directory:
// <target-dir>/podName -> ..data/podName
// <target-dir>/usr -> ..data/usr
// <target-dir>/k8s -> ..data/k8s
//
// The data directory itself is a link to a timestamped directory with
// the real data:
// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
// 8.  A symlink to the new timestamped directory ..data_tmp is created that will
// become the new data directory
// 9.  The new data directory symlink is renamed to the data directory; rename is atomic
// 10. Old paths are removed from the user-visible portion of the target directory
// 11.  The previous timestamped directory is removed, if it exists
func (w *Writer) Write(payload map[string]FileProjection) error {
// (1)
cleanPayload, err := validatePayload(payload)
if err != nil {
klog.Error(err, "invalid payload")
return err
}
// (2)
dataDirPath := path.Join(w.targetDir, dataDirName)
oldTsDir, err := os.Readlink(dataDirPath)
if err != nil {
if !os.IsNotExist(err) {
klog.Error(err, "unable to read link for data directory")
return err
}
// although Readlink() returns "" on err, don't be fragile by relying on it (since it's not specified in docs)
// empty oldTsDir indicates that it didn't exist
oldTsDir = ""
}
oldTsPath := path.Join(w.targetDir, oldTsDir)
var pathsToRemove sets.String
// if there was no old version, there's nothing to remove
if len(oldTsDir) != 0 {
// (3)
pathsToRemove, err = w.pathsToRemove(cleanPayload, oldTsPath)
if err != nil {
klog.Error(err, "unable to determine user-visible files to remove")
return err
}
// (4)
if should, err := shouldWritePayload(cleanPayload, oldTsPath); err != nil {
klog.Error(err, "unable to determine whether payload should be written to disk")
return err
} else if !should && len(pathsToRemove) == 0 {
klog.V(6).Info("no update required for target directory", "directory", w.targetDir)
return nil
} else {
klog.V(1).Info("write required for target directory", "directory", w.targetDir)
}
}
// (5)
tsDir, err := w.newTimestampDir()
if err != nil {
klog.Error(err, "error creating new ts data directory")
return err
}
tsDirName := filepath.Base(tsDir)
// (6)
if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
klog.Error(err, "unable to write payload to ts data directory", "ts directory", tsDir)
return err
}
klog.V(1).Info("performed write of new data to ts data directory", "ts directory", tsDir)
// (7)
if err = w.createUserVisibleFiles(cleanPayload); err != nil {
klog.Error(err, "unable to create visible symlinks in target directory", "target directory", w.targetDir)
return err
}
// (8)
newDataDirPath := path.Join(w.targetDir, newDataDirName)
if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
os.RemoveAll(tsDir)
klog.Error(err, "unable to create symbolic link for atomic update")
return err
}
// (9)
if runtime.GOOS == "windows" {
os.Remove(dataDirPath)
err = os.Symlink(tsDirName, dataDirPath)
os.Remove(newDataDirPath)
} else {
err = os.Rename(newDataDirPath, dataDirPath)
}
if err != nil {
os.Remove(newDataDirPath)
os.RemoveAll(tsDir)
klog.Error(err, "unable to rename symbolic link for data directory", "data directory", newDataDirPath)
return err
}
// (10)
if err = w.removeUserVisiblePaths(pathsToRemove); err != nil {
klog.Error(err, "unable to remove old visible symlinks")
return err
}
// (11)
if len(oldTsDir) > 0 {
if err = os.RemoveAll(oldTsPath); err != nil {
klog.Error(err, "unable to remove old data directory", "data directory", oldTsDir)
return err
}
}
return nil
}
// validatePayload returns an error if any path in the payload returns a copy of the payload with the paths cleaned.
func validatePayload(payload map[string]FileProjection) (map[string]FileProjection, error) {
cleanPayload := make(map[string]FileProjection)
for k, content := range payload {
if err := validatePath(k); err != nil {
return nil, err
}
cleanPayload[filepath.Clean(k)] = content
}
return cleanPayload, nil
}
// validatePath validates a single path, returning an error if the path is
// invalid. paths may not:
//
// 1. be absolute
// 2. contain '..' as an element
// 3. start with '..'
// 4. contain filenames larger than 255 characters
// 5. be longer than 4096 characters
func validatePath(targetPath string) error {
// TODO: somehow unify this with the similar api validation,
// validateVolumeSourcePath; the error semantics are just different enough
// from this that it was time-prohibitive trying to find the right
// refactoring to re-use.
if targetPath == "" {
return fmt.Errorf("invalid path: must not be empty: %q", targetPath)
}
if path.IsAbs(targetPath) {
return fmt.Errorf("invalid path: must be relative path: %s", targetPath)
}
if len(targetPath) > maxPathLength {
return fmt.Errorf("invalid path: must be less than or equal to %d characters", maxPathLength)
}
items := strings.Split(targetPath, string(os.PathSeparator))
for _, item := range items {
if item == ".." {
return fmt.Errorf("invalid path: must not contain '..': %s", targetPath)
}
if len(item) > maxFileNameLength {
return fmt.Errorf("invalid path: filenames must be less than or equal to %d characters", maxFileNameLength)
}
}
if strings.HasPrefix(items[0], "..") && len(items[0]) > 2 {
return fmt.Errorf("invalid path: must not start with '..': %s", targetPath)
}
return nil
}
// shouldWritePayload returns whether the payload should be written to disk.
func shouldWritePayload(payload map[string]FileProjection, oldTsDir string) (bool, error) {
for userVisiblePath, fileProjection := range payload {
shouldWrite, err := shouldWriteFile(path.Join(oldTsDir, userVisiblePath), fileProjection.Data)
if err != nil {
return false, err
}
if shouldWrite {
return true, nil
}
}
return false, nil
}
// shouldWriteFile returns whether a new version of a file should be written to disk.
func shouldWriteFile(path string, content []byte) (bool, error) {
_, err := os.Lstat(path)
if os.IsNotExist(err) {
return true, nil
}
contentOnFs, err := ioutil.ReadFile(path)
if err != nil {
return false, err
}
return !bytes.Equal(content, contentOnFs), nil
}
// pathsToRemove walks the current version of the data directory and
// determines which paths should be removed (if any) after the payload is
// written to the target directory.
func (w *Writer) pathsToRemove(payload map[string]FileProjection, oldTsDir string) (sets.String, error) {
paths := sets.NewString()
visitor := func(path string, info os.FileInfo, err error) error {
relativePath := strings.TrimPrefix(path, oldTsDir)
relativePath = strings.TrimPrefix(relativePath, string(os.PathSeparator))
if relativePath == "" {
return nil
}
paths.Insert(relativePath)
return nil
}
err := filepath.Walk(oldTsDir, visitor)
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
return nil, err
}
newPaths := sets.NewString()
for file := range payload {
// add all subpaths for the payload to the set of new paths
// to avoid attempting to remove non-empty dirs
for subPath := file; subPath != ""; {
newPaths.Insert(subPath)
subPath, _ = filepath.Split(subPath)
subPath = strings.TrimSuffix(subPath, string(os.PathSeparator))
}
}
result := paths.Difference(newPaths)
if len(result) > 0 {
klog.V(1).Info("paths to remove", "target directory", w.targetDir, "paths", result)
}
return result, nil
}
// newTimestampDir creates a new timestamp directory
func (w *Writer) newTimestampDir() (string, error) {
tsDir, err := ioutil.TempDir(w.targetDir, time.Now().UTC().Format("..2006_01_02_15_04_05."))
if err != nil {
klog.Error(err, "unable to create new temp directory")
return "", err
}
// 0755 permissions are needed to allow 'group' and 'other' to recurse the
// directory tree. do a chmod here to ensure that permissions are set correctly
// regardless of the process' umask.
err = os.Chmod(tsDir, 0755)
if err != nil {
klog.Error(err, "unable to set mode on new temp directory")
return "", err
}
return tsDir, nil
}
// writePayloadToDir writes the given payload to the given directory. The
// directory must exist.
func (w *Writer) writePayloadToDir(payload map[string]FileProjection, dir string) error {
for userVisiblePath, fileProjection := range payload {
content := fileProjection.Data
mode := os.FileMode(fileProjection.Mode)
fullPath := path.Join(dir, userVisiblePath)
baseDir, _ := filepath.Split(fullPath)
err := os.MkdirAll(baseDir, os.ModePerm)
if err != nil {
klog.Error(err, "unable to create directory", "directory", baseDir)
return err
}
err = ioutil.WriteFile(fullPath, content, mode)
if err != nil {
klog.Error(err, "unable to write file", "file", fullPath, "mode", mode)
return err
}
// Chmod is needed because ioutil.WriteFile() ends up calling
// open(2) to create the file, so the final mode used is "mode &
// ~umask". But we want to make sure the specified mode is used
// in the file no matter what the umask is.
err = os.Chmod(fullPath, mode)
if err != nil {
klog.Error(err, "unable to write file", "file", fullPath, "mode", mode)
}
}
return nil
}
// createUserVisibleFiles creates the relative symlinks for all the
// files configured in the payload. If the directory in a file path does not
// exist, it is created.
//
// Viz:
// For files: "bar", "foo/bar", "baz/bar", "foo/baz/blah"
// the following symlinks are created:
// bar -> ..data/bar
// foo -> ..data/foo
// baz -> ..data/baz
func (w *Writer) createUserVisibleFiles(payload map[string]FileProjection) error {
for userVisiblePath := range payload {
slashpos := strings.Index(userVisiblePath, string(os.PathSeparator))
if slashpos == -1 {
slashpos = len(userVisiblePath)
}
linkname := userVisiblePath[:slashpos]
_, err := os.Readlink(path.Join(w.targetDir, linkname))
if err != nil && os.IsNotExist(err) {
// The link into the data directory for this path doesn't exist; create it
visibleFile := path.Join(w.targetDir, linkname)
dataDirFile := path.Join(dataDirName, linkname)
err = os.Symlink(dataDirFile, visibleFile)
if err != nil {
return err
}
}
}
return nil
}
// removeUserVisiblePaths removes the set of paths from the user-visible
// portion of the writer's target directory.
func (w *Writer) removeUserVisiblePaths(paths sets.String) error {
ps := string(os.PathSeparator)
var lasterr error
for p := range paths {
// only remove symlinks from the volume root directory (i.e. items that don't contain '/')
if strings.Contains(p, ps) {
continue
}
if err := os.Remove(path.Join(w.targetDir, p)); err != nil {
klog.Error(err, "unable to prune old user-visible path", "path", p)
lasterr = err
}
}
return lasterr
}

View File

@ -0,0 +1,137 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package writer
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"time"
"github.com/openkruise/rollouts/webhook/util/generator"
"k8s.io/klog/v2"
)
const (
// CAKeyName is the name of the CA private key
CAKeyName = "ca-key.pem"
// CACertName is the name of the CA certificate
CACertName = "ca-cert.pem"
// ServerKeyName is the name of the server private key
ServerKeyName = "key.pem"
ServerKeyName2 = "tls.key"
// ServerCertName is the name of the serving certificate
ServerCertName = "cert.pem"
ServerCertName2 = "tls.crt"
)
// CertWriter provides method to handle webhooks.
type CertWriter interface {
// EnsureCert provisions the cert for the webhookClientConfig.
EnsureCert(dnsName string) (*generator.Artifacts, bool, error)
}
// handleCommon ensures the given webhook has a proper certificate.
// It uses the given certReadWriter to read and (or) write the certificate.
func handleCommon(dnsName string, ch certReadWriter) (*generator.Artifacts, bool, error) {
if len(dnsName) == 0 {
return nil, false, errors.New("dnsName should not be empty")
}
if ch == nil {
return nil, false, errors.New("certReaderWriter should not be nil")
}
certs, changed, err := createIfNotExists(ch)
if err != nil {
return nil, changed, err
}
// Recreate the cert if it's invalid.
valid := validCert(certs, dnsName)
if !valid {
klog.Info("cert is invalid or expiring, regenerating a new one")
certs, err = ch.overwrite(certs.ResourceVersion)
if err != nil {
return nil, false, err
}
changed = true
}
return certs, changed, nil
}
func createIfNotExists(ch certReadWriter) (*generator.Artifacts, bool, error) {
// Try to read first
certs, err := ch.read()
if isNotFound(err) {
// Create if not exists
certs, err = ch.write()
switch {
// This may happen if there is another racer.
case isAlreadyExists(err):
certs, err = ch.read()
return certs, true, err
default:
return certs, true, err
}
}
return certs, false, err
}
// certReadWriter provides methods for reading and writing certificates.
type certReadWriter interface {
// read reads a webhook name and returns the certs for it.
read() (*generator.Artifacts, error)
// write writes the certs and return the certs it wrote.
write() (*generator.Artifacts, error)
// overwrite overwrites the existing certs and return the certs it wrote.
overwrite(resourceVersion string) (*generator.Artifacts, error)
}
func validCert(certs *generator.Artifacts, dnsName string) bool {
if certs == nil || certs.Cert == nil || certs.Key == nil || certs.CACert == nil {
return false
}
// Verify key and cert are valid pair
_, err := tls.X509KeyPair(certs.Cert, certs.Key)
if err != nil {
return false
}
// Verify cert is good for desired DNS name and signed by CA and will be valid for desired period of time.
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(certs.CACert) {
return false
}
block, _ := pem.Decode(certs.Cert)
if block == nil {
return false
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return false
}
ops := x509.VerifyOptions{
DNSName: dnsName,
Roots: pool,
CurrentTime: time.Now().AddDate(0, 6, 0),
}
_, err = cert.Verify(ops)
return err == nil
}

View File

@ -0,0 +1,44 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package writer
type notFoundError struct {
err error
}
func (e notFoundError) Error() string {
return e.err.Error()
}
func isNotFound(err error) bool {
_, ok := err.(notFoundError)
return ok
}
type alreadyExistError struct {
err error
}
func (e alreadyExistError) Error() string {
return e.err.Error()
}
func isAlreadyExists(err error) bool {
_, ok := err.(alreadyExistError)
return ok
}

230
webhook/util/writer/fs.go Normal file
View File

@ -0,0 +1,230 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package writer
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"github.com/openkruise/rollouts/webhook/util/generator"
"github.com/openkruise/rollouts/webhook/util/writer/atomic"
"k8s.io/klog/v2"
)
const (
FsCertWriter = "fs"
)
// fsCertWriter provisions the certificate by reading and writing to the filesystem.
type fsCertWriter struct {
// dnsName is the DNS name that the certificate is for.
dnsName string
*FSCertWriterOptions
}
// FSCertWriterOptions are options for constructing a FSCertWriter.
type FSCertWriterOptions struct {
// certGenerator generates the certificates.
CertGenerator generator.CertGenerator
// path is the directory that the certificate and private key and CA certificate will be written.
Path string
}
var _ CertWriter = &fsCertWriter{}
func (ops *FSCertWriterOptions) setDefaults() {
if ops.CertGenerator == nil {
ops.CertGenerator = &generator.SelfSignedCertGenerator{}
}
}
func (ops *FSCertWriterOptions) validate() error {
if len(ops.Path) == 0 {
return errors.New("path must be set in FSCertWriterOptions")
}
return nil
}
// NewFSCertWriter constructs a CertWriter that persists the certificate on filesystem.
func NewFSCertWriter(ops FSCertWriterOptions) (CertWriter, error) {
ops.setDefaults()
err := ops.validate()
if err != nil {
return nil, err
}
return &fsCertWriter{
FSCertWriterOptions: &ops,
}, nil
}
// EnsureCert provisions certificates for a webhookClientConfig by writing the certificates in the filesystem.
func (f *fsCertWriter) EnsureCert(dnsName string) (*generator.Artifacts, bool, error) {
// create or refresh cert and write it to fs
f.dnsName = dnsName
return handleCommon(f.dnsName, f)
}
func (f *fsCertWriter) write() (*generator.Artifacts, error) {
return f.doWrite()
}
func (f *fsCertWriter) overwrite(_ string) (*generator.Artifacts, error) {
return f.doWrite()
}
func (f *fsCertWriter) doWrite() (*generator.Artifacts, error) {
certs, err := f.CertGenerator.Generate(f.dnsName)
if err != nil {
return nil, err
}
if err = WriteCertsToDir(f.Path, certs); err != nil {
return nil, err
}
return certs, nil
}
func WriteCertsToDir(path string, certs *generator.Artifacts) error {
// Writer's algorithm only manages files using symbolic link.
// If a file is not a symbolic link, will ignore the update for it.
// We want to cleanup for Writer by removing old files that are not symbolic links.
err := prepareToWrite(path)
if err != nil {
return err
}
aw, err := atomic.NewAtomicWriter(path)
if err != nil {
return err
}
return aw.Write(certToProjectionMap(certs))
}
// prepareToWrite ensures it directory is compatible with the atomic.Writer library.
func prepareToWrite(dir string) error {
_, err := os.Stat(dir)
switch {
case os.IsNotExist(err):
klog.Info("cert directory doesn't exist, creating", "directory", dir)
// TODO: figure out if we can reduce the permission. (Now it's 0777)
err = os.MkdirAll(dir, 0777)
if err != nil {
return fmt.Errorf("can't create dir: %v", dir)
}
case err != nil:
return err
}
filenames := []string{CAKeyName, CACertName, ServerCertName, ServerCertName2, ServerKeyName, ServerKeyName2}
for _, f := range filenames {
abspath := path.Join(dir, f)
_, err := os.Stat(abspath)
if os.IsNotExist(err) {
continue
} else if err != nil {
klog.Error(err, "unable to stat file", "file", abspath)
}
_, err = os.Readlink(abspath)
// if it's not a symbolic link
if err != nil {
err = os.Remove(abspath)
if err != nil {
klog.Error(err, "unable to remove old file", "file", abspath)
}
}
}
return nil
}
func (f *fsCertWriter) read() (*generator.Artifacts, error) {
if err := ensureExist(f.Path); err != nil {
return nil, err
}
caKeyBytes, err := ioutil.ReadFile(path.Join(f.Path, CAKeyName))
if err != nil {
return nil, err
}
caCertBytes, err := ioutil.ReadFile(path.Join(f.Path, CACertName))
if err != nil {
return nil, err
}
certBytes, err := ioutil.ReadFile(path.Join(f.Path, ServerCertName))
if err != nil {
return nil, err
}
keyBytes, err := ioutil.ReadFile(path.Join(f.Path, ServerKeyName))
if err != nil {
return nil, err
}
return &generator.Artifacts{
CAKey: caKeyBytes,
CACert: caCertBytes,
Cert: certBytes,
Key: keyBytes,
}, nil
}
func ensureExist(dir string) error {
filenames := []string{CAKeyName, CACertName, ServerCertName, ServerCertName2, ServerKeyName, ServerKeyName2}
for _, filename := range filenames {
_, err := os.Stat(path.Join(dir, filename))
switch {
case err == nil:
continue
case os.IsNotExist(err):
return notFoundError{err}
default:
return err
}
}
return nil
}
func certToProjectionMap(cert *generator.Artifacts) map[string]atomic.FileProjection {
// TODO: figure out if we can reduce the permission. (Now it's 0666)
return map[string]atomic.FileProjection{
CAKeyName: {
Data: cert.CAKey,
Mode: 0666,
},
CACertName: {
Data: cert.CACert,
Mode: 0666,
},
ServerCertName: {
Data: cert.Cert,
Mode: 0666,
},
ServerCertName2: {
Data: cert.Cert,
Mode: 0666,
},
ServerKeyName: {
Data: cert.Key,
Mode: 0666,
},
ServerKeyName2: {
Data: cert.Key,
Mode: 0666,
},
}
}

View File

@ -0,0 +1,185 @@
/*
Copyright 2020 The Kruise Authors.
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package writer
import (
"context"
"errors"
"github.com/openkruise/rollouts/webhook/util/generator"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)
const (
SecretCertWriter = "secret"
)
// secretCertWriter provisions the certificate by reading and writing to the k8s secrets.
type secretCertWriter struct {
*SecretCertWriterOptions
// dnsName is the DNS name that the certificate is for.
dnsName string
}
// SecretCertWriterOptions is options for constructing a secretCertWriter.
type SecretCertWriterOptions struct {
// client talks to a kubernetes cluster for creating the secret.
Clientset clientset.Interface
// certGenerator generates the certificates.
CertGenerator generator.CertGenerator
// secret points the secret that contains certificates that written by the CertWriter.
Secret *types.NamespacedName
}
var _ CertWriter = &secretCertWriter{}
func (ops *SecretCertWriterOptions) setDefaults() {
if ops.CertGenerator == nil {
ops.CertGenerator = &generator.SelfSignedCertGenerator{}
}
}
func (ops *SecretCertWriterOptions) validate() error {
if ops.Clientset == nil {
return errors.New("client must be set in SecretCertWriterOptions")
}
if ops.Secret == nil {
return errors.New("secret must be set in SecretCertWriterOptions")
}
return nil
}
// NewSecretCertWriter constructs a CertWriter that persists the certificate in a k8s secret.
func NewSecretCertWriter(ops SecretCertWriterOptions) (CertWriter, error) {
ops.setDefaults()
err := ops.validate()
if err != nil {
return nil, err
}
return &secretCertWriter{
SecretCertWriterOptions: &ops,
}, nil
}
// EnsureCert provisions certificates for a webhookClientConfig by writing the certificates to a k8s secret.
func (s *secretCertWriter) EnsureCert(dnsName string) (*generator.Artifacts, bool, error) {
// Create or refresh the certs based on clientConfig
s.dnsName = dnsName
return handleCommon(s.dnsName, s)
}
var _ certReadWriter = &secretCertWriter{}
func (s *secretCertWriter) buildSecret() (*corev1.Secret, *generator.Artifacts, error) {
certs, err := s.CertGenerator.Generate(s.dnsName)
if err != nil {
return nil, nil, err
}
secret := certsToSecret(certs, *s.Secret)
return secret, certs, err
}
func (s *secretCertWriter) write() (*generator.Artifacts, error) {
secret, certs, err := s.buildSecret()
if err != nil {
return nil, err
}
_, err = s.Clientset.CoreV1().Secrets(secret.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
if apierrors.IsAlreadyExists(err) {
return nil, alreadyExistError{err}
}
return certs, err
}
func (s *secretCertWriter) overwrite(resourceVersion string) (
*generator.Artifacts, error) {
secret, certs, err := s.buildSecret()
if err != nil {
return nil, err
}
secret.ResourceVersion = resourceVersion
secret, err = s.Clientset.CoreV1().Secrets(secret.Namespace).Update(context.TODO(), secret, metav1.UpdateOptions{})
if err != nil {
klog.Infof("Cert writer update secret failed: %v", err)
return certs, err
}
klog.Infof("Cert writer update secret %s resourceVersion from %s to %s",
secret.Name, resourceVersion, secret.ResourceVersion)
return certs, err
}
func (s *secretCertWriter) read() (*generator.Artifacts, error) {
//secret := &corev1.Secret{
// TypeMeta: metav1.TypeMeta{
// APIVersion: "v1",
// Kind: "Secret",
// },
//}
secret, err := s.Clientset.CoreV1().Secrets(s.Secret.Namespace).Get(context.TODO(), s.Secret.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return nil, notFoundError{err}
} else if err != nil {
return nil, err
}
certs := secretToCerts(secret)
if certs != nil && certs.CACert != nil && certs.CAKey != nil {
// Store the CA for next usage.
s.CertGenerator.SetCA(certs.CAKey, certs.CACert)
}
return certs, nil
}
func secretToCerts(secret *corev1.Secret) *generator.Artifacts {
if secret.Data == nil {
return &generator.Artifacts{ResourceVersion: secret.ResourceVersion}
}
return &generator.Artifacts{
CAKey: secret.Data[CAKeyName],
CACert: secret.Data[CACertName],
Cert: secret.Data[ServerCertName],
Key: secret.Data[ServerKeyName],
ResourceVersion: secret.ResourceVersion,
}
}
func certsToSecret(certs *generator.Artifacts, sec types.NamespacedName) *corev1.Secret {
return &corev1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: sec.Namespace,
Name: sec.Name,
},
Data: map[string][]byte{
CAKeyName: certs.CAKey,
CACertName: certs.CACert,
ServerKeyName: certs.Key,
ServerKeyName2: certs.Key,
ServerCertName: certs.Cert,
ServerCertName2: certs.Cert,
},
}
}

View File

@ -0,0 +1,32 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// +kubebuilder:webhook:path=/mutate-apps-kruise-io-v1alpha1-cloneset,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps.kruise.io,resources=clonesets,verbs=update,versions=v1alpha1,name=mcloneset.kb.io
// +kubebuilder:webhook:path=/mutate-apps-v1-deployment,mutating=true,failurePolicy=Ignore,sideEffects=None,admissionReviewVersions=v1;v1beta1,groups=apps,resources=deployments,verbs=update,versions=v1,name=mdeployment.kb.io
var (
// HandlerMap contains admission webhook handlers
HandlerMap = map[string]admission.Handler{
"mutate-apps-kruise-io-v1alpha1-cloneset": &WorkloadHandler{},
"mutate-apps-v1-deployment": &WorkloadHandler{},
}
)

View File

@ -0,0 +1,258 @@
/*
Copyright 2019 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"context"
"encoding/json"
"net/http"
kruiseappsv1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
admissionv1 "k8s.io/api/admission/v1"
apps "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// WorkloadHandler handles Pod
type WorkloadHandler struct {
// To use the client, you need to do the following:
// - uncomment it
// - import sigs.k8s.io/controller-runtime/pkg/client
// - uncomment the InjectClient method at the bottom of this file.
Client client.Client
// Decoder decodes objects
Decoder *admission.Decoder
Finder *util.ControllerFinder
}
var _ admission.Handler = &WorkloadHandler{}
// Handle handles admission requests.
// TODO
// Currently there is an implicit condition for rollout: the workload must be currently in a stable version (only one version of Pods),
// if not, it will not enter the rollout process. There is an additional problem here, the user may not be aware of this.
// when user does a release and thinks it enters the rollout process, but due to the implicit condition above,
// it actually goes through the normal release process. No good idea to solve this problem has been found yet.
func (h *WorkloadHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
// if subResources, then ignore
if req.Operation != admissionv1.Update || req.SubResource != "" {
return admission.Allowed("")
}
switch req.Kind.Group {
// kruise cloneSet
case kruiseappsv1alpha1.GroupVersion.Group:
if req.Kind.Kind != util.ControllerKruiseKindCS.Kind {
return admission.Allowed("")
}
// check cloneset
newObj := &kruiseappsv1alpha1.CloneSet{}
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldObj := &kruiseappsv1alpha1.CloneSet{}
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
err, changed := h.handlerCloneSet(newObj, oldObj)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !changed {
return admission.Allowed("")
}
marshalled, err := json.Marshal(newObj)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
// native k8s deloyment
case apps.SchemeGroupVersion.Group:
if req.Kind.Kind != util.ControllerKindDep.Kind {
return admission.Allowed("")
}
// check deployment
newObj := &apps.Deployment{}
if err := h.Decoder.Decode(req, newObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
oldObj := &apps.Deployment{}
if err := h.Decoder.Decode(
admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}},
oldObj); err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
err, changed := h.handlerDeployment(newObj, oldObj)
if err != nil {
return admission.Errored(http.StatusBadRequest, err)
}
if !changed {
return admission.Allowed("")
}
marshalled, err := json.Marshal(newObj)
if err != nil {
return admission.Errored(http.StatusInternalServerError, err)
}
return admission.PatchResponseFromRaw(req.AdmissionRequest.Object.Raw, marshalled)
}
return admission.Allowed("")
}
func (h *WorkloadHandler) handlerDeployment(newObj, oldObj *apps.Deployment) (err error, changed bool) {
// in rollout progressing
if state, _ := util.GetRolloutState(newObj.Annotations); state != nil {
// deployment paused=false is not allowed until the rollout is completed
if newObj.Spec.Paused == false {
changed = true
newObj.Spec.Paused = true
klog.Warningf("deployment(%s/%s) is in rollout(%s) progressing, and set paused=true", newObj.Namespace, newObj.Name, state.RolloutName)
}
return
}
// indicate whether the workload can enter the rollout process
// 1. replicas > 0
if newObj.Spec.Replicas != nil && *newObj.Spec.Replicas == 0 {
return
}
// 2. deployment.spec.strategy.type must be RollingUpdate
if newObj.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
klog.Warningf("deployment(%s/%s) strategy type is 'Recreate', rollout will not work on it", newObj.Namespace, newObj.Name)
return
}
// 3. deployment.spec.PodTemplate not change
if util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) {
return
}
// 4. the deployment must be in a stable version (only one version of rs)
stableRs, err := h.Finder.GetDeploymentStableRs(newObj)
if err != nil {
return
} else if stableRs == nil {
return
}
// 5. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
if err != nil {
return
} else if rollout == nil {
return
}
klog.Infof("deployment(%s/%s) will be in rollout progressing, and set paused=true", newObj.Namespace, newObj.Name)
changed = true
// need set workload paused = true
newObj.Spec.Paused = true
state := &util.RolloutState{RolloutName: rollout.Name}
by, _ := json.Marshal(state)
newObj.Annotations[util.InRolloutProgressingAnnotation] = string(by)
return
}
func (h *WorkloadHandler) fetchMatchedRollout(obj client.Object) (*appsv1alpha1.Rollout, error) {
oGv := obj.GetObjectKind().GroupVersionKind()
rolloutList := &appsv1alpha1.RolloutList{}
if err := h.Client.List(context.TODO(), rolloutList, &client.ListOptions{Namespace: obj.GetNamespace()}); err != nil {
klog.Errorf("WorkloadHandler List rollout failed: %s", err.Error())
return nil, err
}
for i := range rolloutList.Items {
rollout := &rolloutList.Items[i]
if !rollout.DeletionTimestamp.IsZero() || rollout.Spec.ObjectRef.Type == appsv1alpha1.RevisionRefType ||
rollout.Spec.ObjectRef.WorkloadRef == nil {
continue
}
ref := rollout.Spec.ObjectRef.WorkloadRef
gv, err := schema.ParseGroupVersion(ref.APIVersion)
if err != nil {
klog.Warningf("ParseGroupVersion rollout(%s/%s) ref failed: %s", rollout.Namespace, rollout.Name, err.Error())
continue
}
if oGv.Group == gv.Group && oGv.Kind == ref.Kind && obj.GetName() == ref.Name {
return rollout, nil
}
}
return nil, nil
}
func (h *WorkloadHandler) handlerCloneSet(newObj, oldObj *kruiseappsv1alpha1.CloneSet) (err error, changed bool) {
// in rollout progressing
if state, _ := util.GetRolloutState(newObj.Annotations); state != nil {
if newObj.Spec.UpdateStrategy.Paused == false {
changed = true
newObj.Spec.UpdateStrategy.Paused = true
klog.Warningf("cloneSet(%s/%s) is in rollout(%s) progressing, and set paused=true", newObj.Namespace, newObj.Name, state.RolloutName)
}
return
}
// indicate whether the workload can enter the rollout process
// 1. replicas > 0
if newObj.Spec.Replicas != nil && *newObj.Spec.Replicas == 0 {
return
}
// 2. cloneSet.spec.PodTemplate is changed
if util.EqualIgnoreHash(&oldObj.Spec.Template, &newObj.Spec.Template) {
return
}
// 3. the cloneSet must be in a stable version (only one version of pods)
if newObj.Status.UpdatedReplicas != newObj.Status.Replicas {
return
}
// 4. have matched rollout crd
rollout, err := h.fetchMatchedRollout(newObj)
if err != nil {
return
} else if rollout == nil {
return
}
klog.Infof("cloneSet(%s/%s) will be in rollout progressing, and paused", newObj.Namespace, newObj.Name)
changed = true
// need set workload paused = true
newObj.Spec.UpdateStrategy.Paused = true
state := &util.RolloutState{RolloutName: rollout.Name}
by, _ := json.Marshal(state)
newObj.Annotations[util.InRolloutProgressingAnnotation] = string(by)
return
}
var _ inject.Client = &WorkloadHandler{}
// InjectClient injects the client into the WorkloadHandler
func (h *WorkloadHandler) InjectClient(c client.Client) error {
h.Client = c
h.Finder = util.NewControllerFinder(c)
return nil
}
var _ admission.DecoderInjector = &WorkloadHandler{}
// InjectDecoder injects the decoder into the WorkloadHandler
func (h *WorkloadHandler) InjectDecoder(d *admission.Decoder) error {
h.Decoder = d
return nil
}

View File

@ -0,0 +1,419 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mutating
import (
"context"
"encoding/json"
"reflect"
"testing"
kruisev1aplphal "github.com/openkruise/kruise-api/apps/v1alpha1"
appsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
var (
scheme *runtime.Scheme
deploymentDemo = &apps.Deployment{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "Deployment",
},
ObjectMeta: metav1.ObjectMeta{
Name: "echoserver",
Labels: map[string]string{},
Annotations: map[string]string{},
UID: types.UID("281ba6f7-ff28-4779-940b-e966640c201f"),
},
Spec: apps.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "echoserver",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "echoserver",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "echoserver",
Image: "echoserver:v1",
},
},
},
},
},
}
rsDemo = &apps.ReplicaSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: "echoserver-v1",
Labels: map[string]string{
"app": "echoserver",
"pod-template-hash": "5b494f7bf",
},
Annotations: map[string]string{},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(deploymentDemo, schema.GroupVersionKind{
Group: apps.SchemeGroupVersion.Group,
Version: apps.SchemeGroupVersion.Version,
Kind: "Deployment",
}),
},
},
Spec: apps.ReplicaSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "echoserver",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "echoserver",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "echoserver",
Image: "echoserver:v1",
},
},
},
},
},
}
cloneSetDemo = &kruisev1aplphal.CloneSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
},
ObjectMeta: metav1.ObjectMeta{
Name: "echoserver",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Spec: kruisev1aplphal.CloneSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "echoserver",
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "echoserver",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "echoserver",
Image: "echoserver:v1",
},
},
},
},
},
}
rolloutDemo = &appsv1alpha1.Rollout{
ObjectMeta: metav1.ObjectMeta{
Name: "rollout-demo",
Labels: map[string]string{},
},
Spec: appsv1alpha1.RolloutSpec{
ObjectRef: appsv1alpha1.ObjectRef{
Type: appsv1alpha1.WorkloadRefType,
WorkloadRef: &appsv1alpha1.WorkloadRef{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: "echoserver",
},
},
},
}
)
func init() {
scheme = runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = kruisev1aplphal.AddToScheme(scheme)
_ = appsv1alpha1.AddToScheme(scheme)
}
func TestHandlerDeployment(t *testing.T) {
cases := []struct {
name string
getObjs func() (*apps.Deployment, *apps.Deployment)
expectObj func() *apps.Deployment
getRollout func() *appsv1alpha1.Rollout
getRs func() []*apps.ReplicaSet
isError bool
}{
{
name: "deployment image v1->v2, matched rollout",
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo"}`
obj.Spec.Paused = true
return obj
},
getRs: func() []*apps.ReplicaSet {
rs := rsDemo.DeepCopy()
return []*apps.ReplicaSet{rs}
},
getRollout: func() *appsv1alpha1.Rollout {
return rolloutDemo.DeepCopy()
},
},
{
name: "deployment image v1->v2, no matched rollout",
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return obj
},
getRs: func() []*apps.ReplicaSet {
rs := rsDemo.DeepCopy()
return []*apps.ReplicaSet{rs}
},
getRollout: func() *appsv1alpha1.Rollout {
obj := rolloutDemo.DeepCopy()
obj.Spec.ObjectRef.WorkloadRef = &appsv1alpha1.WorkloadRef{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: "other",
}
return obj
},
},
{
name: "deployment image v2->v3, matched rollout, but multiple rss",
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
oldObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
return obj
},
getRs: func() []*apps.ReplicaSet {
rs1 := rsDemo.DeepCopy()
rs2 := rsDemo.DeepCopy()
rs2.Name = "echoserver-v2"
rs2.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return []*apps.ReplicaSet{rs1, rs2}
},
getRollout: func() *appsv1alpha1.Rollout {
return rolloutDemo.DeepCopy()
},
},
{
name: "set deployment paused = false, matched rollout, in progressing, reject",
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
oldObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
oldObj.Spec.Paused = true
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
newObj.Spec.Paused = false
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":false}`
obj.Spec.Paused = true
return obj
},
getRs: func() []*apps.ReplicaSet {
rs := rsDemo.DeepCopy()
return []*apps.ReplicaSet{rs}
},
getRollout: func() *appsv1alpha1.Rollout {
return rolloutDemo.DeepCopy()
},
},
{
name: "set deployment paused = false, matched rollout, in finalising, allow",
getObjs: func() (*apps.Deployment, *apps.Deployment) {
oldObj := deploymentDemo.DeepCopy()
oldObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
oldObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
oldObj.Spec.Paused = true
newObj := deploymentDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
newObj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
newObj.Spec.Paused = false
return oldObj, newObj
},
expectObj: func() *apps.Deployment {
obj := deploymentDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo","RolloutDone":true}`
obj.Spec.Paused = true
return obj
},
getRs: func() []*apps.ReplicaSet {
rs := rsDemo.DeepCopy()
return []*apps.ReplicaSet{rs}
},
getRollout: func() *appsv1alpha1.Rollout {
obj := rolloutDemo.DeepCopy()
return obj
},
},
}
decoder, _ := admission.NewDecoder(scheme)
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).Build()
h := WorkloadHandler{
Client: client,
Decoder: decoder,
Finder: util.NewControllerFinder(client),
}
rollout := cs.getRollout()
if err := client.Create(context.TODO(), rollout); err != nil {
t.Errorf(err.Error())
}
for _, rs := range cs.getRs() {
if err := client.Create(context.TODO(), rs); err != nil {
t.Errorf(err.Error())
}
}
oldObj, newObj := cs.getObjs()
err, _ := h.handlerDeployment(newObj, oldObj)
if cs.isError && err == nil {
t.Fatal("handlerDeployment failed")
} else if !cs.isError && err != nil {
t.Fatalf(err.Error())
}
if !reflect.DeepEqual(newObj, cs.expectObj()) {
by, _ := json.Marshal(newObj)
t.Fatalf("handlerDeployment failed, and new(%s)", string(by))
}
})
}
}
func TestHandlerCloneSet(t *testing.T) {
cases := []struct {
name string
getObjs func() (*kruisev1aplphal.CloneSet, *kruisev1aplphal.CloneSet)
expectObj func() *kruisev1aplphal.CloneSet
getRollout func() *appsv1alpha1.Rollout
isError bool
}{
{
name: "cloneSet image v1->v2, matched rollout",
getObjs: func() (*kruisev1aplphal.CloneSet, *kruisev1aplphal.CloneSet) {
oldObj := cloneSetDemo.DeepCopy()
newObj := cloneSetDemo.DeepCopy()
newObj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return oldObj, newObj
},
expectObj: func() *kruisev1aplphal.CloneSet {
obj := cloneSetDemo.DeepCopy()
obj.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
obj.Annotations[util.InRolloutProgressingAnnotation] = `{"RolloutName":"rollout-demo"}`
obj.Spec.UpdateStrategy.Paused = true
return obj
},
getRollout: func() *appsv1alpha1.Rollout {
obj := rolloutDemo.DeepCopy()
obj.Spec.ObjectRef.WorkloadRef = &appsv1alpha1.WorkloadRef{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
Name: "echoserver",
}
return obj
},
},
}
decoder, _ := admission.NewDecoder(scheme)
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
client := fake.NewClientBuilder().WithScheme(scheme).Build()
h := WorkloadHandler{
Client: client,
Decoder: decoder,
Finder: util.NewControllerFinder(client),
}
rollout := cs.getRollout()
if err := client.Create(context.TODO(), rollout); err != nil {
t.Errorf(err.Error())
}
oldObj, newObj := cs.getObjs()
err, _ := h.handlerCloneSet(newObj, oldObj)
if cs.isError && err == nil {
t.Fatal("handlerCloneSet failed")
} else if !cs.isError && err != nil {
t.Fatalf(err.Error())
}
if !reflect.DeepEqual(newObj, cs.expectObj()) {
by, _ := json.Marshal(newObj)
t.Fatalf("handlerCloneSet failed, and new(%s)", string(by))
}
})
}
}