add rollout history controller

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
Signed-off-by: yike21 <yike21@qq.com>

Signed-off-by: yike21 <yike21@qq.com>
This commit is contained in:
berg 2022-07-22 18:47:37 +08:00 committed by yike21
parent f21c3fb763
commit 3421d8fbd8
12 changed files with 6420 additions and 8 deletions

View File

@ -0,0 +1,110 @@
name: E2E-RolloutHistory-CloneSet-1.23
on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
env:
# Common versions
GO_VERSION: '1.17'
KIND_IMAGE: 'kindest/node:v1.23.3'
KIND_CLUSTER_NAME: 'ci-testing'
jobs:
rollout:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/kind-action@v1.2.0
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
- name: Build image
run: |
export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise
run: |
set -ex
kubectl cluster-info
make helm
helm repo add openkruise https://openkruise.github.io/charts/
helm repo update
helm install kruise openkruise/kruise
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
echo "Wait for kruise-manager ready successfully"
else
echo "Timeout to wait for kruise-manager ready"
exit 1
fi
- name: Install Kruise Rollout
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "1" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-rollout -o yaml
set -e
if [ "$PODS" -eq "1" ]; then
echo "Wait for kruise-rollout ready successfully"
else
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='CloneSet canary rollout with RolloutHistory' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal

View File

@ -0,0 +1,110 @@
name: E2E-RolloutHistory-StatefulSet-1.23
on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
env:
# Common versions
GO_VERSION: '1.17'
KIND_IMAGE: 'kindest/node:v1.23.3'
KIND_CLUSTER_NAME: 'ci-testing'
jobs:
rollout:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/kind-action@v1.2.0
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
- name: Build image
run: |
export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise
run: |
set -ex
kubectl cluster-info
make helm
helm repo add openkruise https://openkruise.github.io/charts/
helm repo update
helm install kruise openkruise/kruise
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-system | grep '1/1' | grep kruise-controller-manager | wc -l)
set -e
if [ "$PODS" -eq "2" ]; then
echo "Wait for kruise-manager ready successfully"
else
echo "Timeout to wait for kruise-manager ready"
exit 1
fi
- name: Install Kruise Rollout
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "1" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-rollout -o yaml
set -e
if [ "$PODS" -eq "1" ]; then
echo "Wait for kruise-rollout ready successfully"
else
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='StatefulSet canary rollout with RolloutHistory' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal

View File

@ -0,0 +1,205 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// RolloutHistorySpec defines the desired state of RolloutHistory
type RolloutHistorySpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// RolloutWrapper indicates information of the rollout related with rollouthistory
RolloutWrapper RolloutWrapper `json:"rolloutWrapper,omitempty"`
// Workload indicates information of the workload, such as cloneset, deployment, advanced statefulset
Workload Workload `json:"workload,omitempty"`
// ServiceWrapper indicates information of the service related with workload
ServiceWrapper ServiceWrapper `json:"serviceWrapper,omitempty"`
// TrafficRoutingWrapper indicates information of traffic route related with workload
TrafficRoutingWrapper TrafficRoutingWrapper `json:"trafficRoutingWrapper,omitempty"`
}
// RolloutWrapper indicates information of the rollout related
type RolloutWrapper struct {
// Name indicates the rollout name
Name string `json:"name,omitempty"`
// Rollout indecates the related rollout
Rollout Rollout `json:"rollout,omitempty"`
}
// ServiceWrapper indicates information of the service related
type ServiceWrapper struct {
// Name indicates the service name
Name string `json:"name,omitempty"`
// Service indicates the service
Service *v1.Service `json:"service,omitempty"`
}
// TrafficRoutingWrapper indicates information of Gateway API or Ingress
type TrafficRoutingWrapper struct {
// Ingress indicates information of ingress
// +optional
IngressWrapper *IngressWrapper `json:"ingressWrapper,omitempty"`
// HTTPRouteWrapper indacates information of Gateway API
// +optional
HTTPRouteWrapper *HTTPRouteWrapper `json:"httpRouteWrapper,omitempty"`
}
// IngressWrapper indicates information of the ingress related
type IngressWrapper struct {
// Name indicates the ingress name
Name string `json:"name,omitempty"`
// Ingress indicates the ingress
Ingress *networkingv1.Ingress `json:"ingress,omitempty"`
}
// HTTPRouteWrapper indicates information of gateway API
type HTTPRouteWrapper struct {
//Name indicates the httproute name
Name string `json:"name,omitempty"`
// HTTPRoute indicates the HTTPRoute
HTTPRoute *v1alpha2.HTTPRoute `json:"httpRoute,omitempty"`
}
// Workload indicates information of the workload, such as cloneset, deployment, advanced statefulset
type Workload struct {
metav1.TypeMeta `json:",inline"`
// Name indicates the workload name
Name string `json:"name,omitempty"`
// Label selector for pods.
// It must match the pod template's labels.
Selector *metav1.LabelSelector `json:"selector" protobuf:"bytes,2,opt,name=selector"`
// Number of desired pods. This is a pointer to distinguish between explicit
// zero and not specified. Defaults to 1.
// +optional
Replicas *int32 `json:"replicas,omitempty" protobuf:"varint,1,opt,name=replicas"`
// Type of deployment or CloneSetUpdateStrategy. Can be "Recreate" or "RollingUpdate".
// +optional
Type string `json:"type,omitempty" protobuf:"bytes,1,opt,name=type,casttype=DeploymentStrategyType"`
// Template describes the pods that will be created.
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Schemaless
Template v1.PodTemplateSpec `json:"template"`
}
// RolloutHistoryStatus defines the observed state of RolloutHistory
type RolloutHistoryStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Phase indicates phase of RolloutHistory, such as "pending", "progressing", "completed"
Phase string `json:"phase,omitempty"`
// CanaryStepIndex indicates the current step
CanaryStepIndex *int32 `json:"canaryStepIndex,omitempty"`
// CanaryStepState indicates state of this rollout revision, such as "init", "pending", "update", "terminated", "completed", "cancelled", whick is upon rollout canary_step_state
CanaryStepState string `json:"canaryStepState,omitempty"`
// RolloutState indicates the rollouts status
RolloutState RolloutState `json:"rolloutState,omitempty"`
// canaryStepPods indicates the pods released
CanaryStepPods []CanaryStepPods `json:"canaryStepPods,omitempty"`
}
// RolloutState indicates the rollouts status
type RolloutState struct {
// RolloutPhase is the rollout phase.
RolloutPhase RolloutPhase `json:"rolloutPhase,omitempty"`
// Message provides details on why the rollout is in its current phase
Message string `json:"message,omitempty"`
}
// CanaryStepPods indicates the pods for a revision
type CanaryStepPods struct {
// StepIndex indicates the step index
StepIndex int32 `json:"stepIndex,omitempty"`
// Pods indicates the pods information
Pods []Pod `json:"pods,omitempty"`
// PodsInTotal indicates the num of new pods released by now
PodsInTotal int32 `json:"podsInTotal"`
// PodsInStep indicates the num of new pods released this step
PodsInStep int32 `json:"podsInStep"`
}
// Pod indicates the information of a pod, including name, ip, node_name.
type Pod struct {
// Name indicates the node name
Name string `json:"name,omitempty"`
// IP indicates the pod ip
IP string `json:"ip,omitempty"`
// Node indicates the node which pod is located at
Node string `json:"node,omitempty"`
}
// CanaryStepState indicates canary-phase of rollouthistory when user do a rollout
const (
CanaryStateInit string = "init"
CanaryStatePending string = "pending"
CanaryStateUpdated string = "updated"
CanaryStateTerminated string = "terminated"
CanaryStateCompleted string = "completed"
)
// Phase indicates rollouthistory status/phase
const (
PhaseInit string = "init"
PhaseCompleted string = "completed"
PhaseProgressing string = "progressing"
)
// MaxRolloutHistoryNum indicates how many rollouthistories there can be at most
const MaxRolloutHistoryNum int = 10
// +genclient
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="PHASE",type="string",JSONPath=".status.phase",description="Phase indicates phase of RolloutHistory, such as pending, progressing, completed"
//+kubebuilder:printcolumn:name="CURRENT_STEP",type="integer",JSONPath=".status.canaryStepIndex",description="CanaryStepIndex indicates the current step"
//+kubebuilder:printcolumn:name="CURRENT_STATE",type="string",JSONPath=".status.canaryStepState",description="CanaryStepState indicates state of this rollout revision, such as init, pending, update, terminated, completed, cancelled, whick is upon rollout canary_step_state"
//+kubebuilder:printcolumn:name="MESSAGE",type="string",JSONPath=".status.rolloutState.message",description="Message provides details on why the rollout is in its current phase"
//+kubebuilder:printcolumn:name="AGE",type=date,JSONPath=".metadata.creationTimestamp"
// RolloutHistory is the Schema for the rollouthistories API
type RolloutHistory struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec RolloutHistorySpec `json:"spec,omitempty"`
Status RolloutHistoryStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
// RolloutHistoryList contains a list of RolloutHistory
type RolloutHistoryList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []RolloutHistory `json:"items"`
}
func init() {
SchemeBuilder.Register(&RolloutHistory{}, &RolloutHistoryList{})
}

View File

@ -1,4 +1,3 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
@ -22,8 +21,12 @@ limitations under the License.
package v1alpha1
import (
"k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/gateway-api/apis/v1alpha2"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@ -199,6 +202,26 @@ func (in *CanaryStep) DeepCopy() *CanaryStep {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CanaryStepPods) DeepCopyInto(out *CanaryStepPods) {
*out = *in
if in.Pods != nil {
in, out := &in.Pods, &out.Pods
*out = make([]Pod, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CanaryStepPods.
func (in *CanaryStepPods) DeepCopy() *CanaryStepPods {
if in == nil {
return nil
}
out := new(CanaryStepPods)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *CanaryStrategy) DeepCopyInto(out *CanaryStrategy) {
*out = *in
@ -252,6 +275,26 @@ func (in *GatewayTrafficRouting) DeepCopy() *GatewayTrafficRouting {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HTTPRouteWrapper) DeepCopyInto(out *HTTPRouteWrapper) {
*out = *in
if in.HTTPRoute != nil {
in, out := &in.HTTPRoute, &out.HTTPRoute
*out = new(v1alpha2.HTTPRoute)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPRouteWrapper.
func (in *HTTPRouteWrapper) DeepCopy() *HTTPRouteWrapper {
if in == nil {
return nil
}
out := new(HTTPRouteWrapper)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *IngressTrafficRouting) DeepCopyInto(out *IngressTrafficRouting) {
*out = *in
@ -267,6 +310,26 @@ func (in *IngressTrafficRouting) DeepCopy() *IngressTrafficRouting {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *IngressWrapper) DeepCopyInto(out *IngressWrapper) {
*out = *in
if in.Ingress != nil {
in, out := &in.Ingress, &out.Ingress
*out = new(networkingv1.Ingress)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IngressWrapper.
func (in *IngressWrapper) DeepCopy() *IngressWrapper {
if in == nil {
return nil
}
out := new(IngressWrapper)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ObjectRef) DeepCopyInto(out *ObjectRef) {
*out = *in
@ -287,6 +350,21 @@ func (in *ObjectRef) DeepCopy() *ObjectRef {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Pod) DeepCopyInto(out *Pod) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Pod.
func (in *Pod) DeepCopy() *Pod {
if in == nil {
return nil
}
out := new(Pod)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ReleaseBatch) DeepCopyInto(out *ReleaseBatch) {
*out = *in
@ -372,6 +450,112 @@ func (in *RolloutCondition) DeepCopy() *RolloutCondition {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutHistory) DeepCopyInto(out *RolloutHistory) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
in.Status.DeepCopyInto(&out.Status)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutHistory.
func (in *RolloutHistory) DeepCopy() *RolloutHistory {
if in == nil {
return nil
}
out := new(RolloutHistory)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *RolloutHistory) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutHistoryList) DeepCopyInto(out *RolloutHistoryList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]RolloutHistory, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutHistoryList.
func (in *RolloutHistoryList) DeepCopy() *RolloutHistoryList {
if in == nil {
return nil
}
out := new(RolloutHistoryList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *RolloutHistoryList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutHistorySpec) DeepCopyInto(out *RolloutHistorySpec) {
*out = *in
in.RolloutWrapper.DeepCopyInto(&out.RolloutWrapper)
in.Workload.DeepCopyInto(&out.Workload)
in.ServiceWrapper.DeepCopyInto(&out.ServiceWrapper)
in.TrafficRoutingWrapper.DeepCopyInto(&out.TrafficRoutingWrapper)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutHistorySpec.
func (in *RolloutHistorySpec) DeepCopy() *RolloutHistorySpec {
if in == nil {
return nil
}
out := new(RolloutHistorySpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutHistoryStatus) DeepCopyInto(out *RolloutHistoryStatus) {
*out = *in
if in.CanaryStepIndex != nil {
in, out := &in.CanaryStepIndex, &out.CanaryStepIndex
*out = new(int32)
**out = **in
}
out.RolloutState = in.RolloutState
if in.CanaryStepPods != nil {
in, out := &in.CanaryStepPods, &out.CanaryStepPods
*out = make([]CanaryStepPods, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutHistoryStatus.
func (in *RolloutHistoryStatus) DeepCopy() *RolloutHistoryStatus {
if in == nil {
return nil
}
out := new(RolloutHistoryStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutList) DeepCopyInto(out *RolloutList) {
*out = *in
@ -441,6 +625,21 @@ func (in *RolloutSpec) DeepCopy() *RolloutSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutState) DeepCopyInto(out *RolloutState) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutState.
func (in *RolloutState) DeepCopy() *RolloutState {
if in == nil {
return nil
}
out := new(RolloutState)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutStatus) DeepCopyInto(out *RolloutStatus) {
*out = *in
@ -488,6 +687,42 @@ func (in *RolloutStrategy) DeepCopy() *RolloutStrategy {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RolloutWrapper) DeepCopyInto(out *RolloutWrapper) {
*out = *in
in.Rollout.DeepCopyInto(&out.Rollout)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutWrapper.
func (in *RolloutWrapper) DeepCopy() *RolloutWrapper {
if in == nil {
return nil
}
out := new(RolloutWrapper)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServiceWrapper) DeepCopyInto(out *ServiceWrapper) {
*out = *in
if in.Service != nil {
in, out := &in.Service, &out.Service
*out = new(v1.Service)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServiceWrapper.
func (in *ServiceWrapper) DeepCopy() *ServiceWrapper {
if in == nil {
return nil
}
out := new(ServiceWrapper)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TrafficRouting) DeepCopyInto(out *TrafficRouting) {
*out = *in
@ -513,6 +748,58 @@ func (in *TrafficRouting) DeepCopy() *TrafficRouting {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TrafficRoutingWrapper) DeepCopyInto(out *TrafficRoutingWrapper) {
*out = *in
if in.IngressWrapper != nil {
in, out := &in.IngressWrapper, &out.IngressWrapper
*out = new(IngressWrapper)
(*in).DeepCopyInto(*out)
}
if in.HTTPRouteWrapper != nil {
in, out := &in.HTTPRouteWrapper, &out.HTTPRouteWrapper
*out = new(HTTPRouteWrapper)
(*in).DeepCopyInto(*out)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TrafficRoutingWrapper.
func (in *TrafficRoutingWrapper) DeepCopy() *TrafficRoutingWrapper {
if in == nil {
return nil
}
out := new(TrafficRoutingWrapper)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Workload) DeepCopyInto(out *Workload) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Selector != nil {
in, out := &in.Selector, &out.Selector
*out = new(metav1.LabelSelector)
(*in).DeepCopyInto(*out)
}
if in.Replicas != nil {
in, out := &in.Replicas, &out.Replicas
*out = new(int32)
**out = **in
}
in.Template.DeepCopyInto(&out.Template)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Workload.
func (in *Workload) DeepCopy() *Workload {
if in == nil {
return nil
}
out := new(Workload)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkloadRef) DeepCopyInto(out *WorkloadRef) {
*out = *in

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@
resources:
- bases/rollouts.kruise.io_rollouts.yaml
- bases/rollouts.kruise.io_batchreleases.yaml
- bases/rollouts.kruise.io_rollouthistories.yaml
#+kubebuilder:scaffold:crdkustomizeresource
patchesStrategicMerge:
@ -11,12 +12,14 @@ patchesStrategicMerge:
# patches here are for enabling the conversion webhook for each CRD
#- patches/webhook_in_rollouts.yaml
#- patches/webhook_in_batchreleases.yaml
#- patches/webhook_in_rollouthistories.yaml
#+kubebuilder:scaffold:crdkustomizewebhookpatch
# [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix.
# patches here are for enabling the CA injection for each CRD
#- patches/cainjection_in_rollouts.yaml
#- patches/cainjection_in_batchreleases.yaml
#- patches/cainjection_in_rollouthistories.yaml
#+kubebuilder:scaffold:crdkustomizecainjectionpatch
# the following config is for teaching kustomize how to do kustomization for CRDs.

View File

@ -262,6 +262,32 @@ rules:
- get
- patch
- update
- apiGroups:
- rollouts.kruise.io
resources:
- rollouthistories
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- rollouts.kruise.io
resources:
- rollouthistories/finalizers
verbs:
- update
- apiGroups:
- rollouts.kruise.io
resources:
- rollouthistories/status
verbs:
- get
- patch
- update
- apiGroups:
- rollouts.kruise.io
resources:

22
main.go
View File

@ -22,12 +22,6 @@ import (
kruisev1aplphal1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
br "github.com/openkruise/rollouts/pkg/controller/batchrelease"
"github.com/openkruise/rollouts/pkg/controller/rollout"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
"github.com/openkruise/rollouts/pkg/webhook"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@ -37,6 +31,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
rolloutsv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
br "github.com/openkruise/rollouts/pkg/controller/batchrelease"
"github.com/openkruise/rollouts/pkg/controller/rollout"
"github.com/openkruise/rollouts/pkg/controller/rollouthistory"
"github.com/openkruise/rollouts/pkg/util"
utilclient "github.com/openkruise/rollouts/pkg/util/client"
"github.com/openkruise/rollouts/pkg/webhook"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
@ -109,6 +111,14 @@ func main() {
os.Exit(1)
}
if err = (&rollouthistory.RolloutHistoryReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rollouthistory-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RolloutHistory")
os.Exit(1)
}
//+kubebuilder:scaffold:builder
setupLog.Info("setup webhook")
if err = webhook.SetupWithManager(mgr); err != nil {

View File

@ -0,0 +1,100 @@
/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollouthistory
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
)
// RolloutHistoryReconciler reconciles a RolloutHistory object
type RolloutHistoryReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
}
//+kubebuilder:rbac:groups=rollouts.kruise.io,resources=rollouthistories,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=rollouts.kruise.io,resources=rollouthistories/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=rollouts.kruise.io,resources=rollouthistories/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the RolloutHistory object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile
func (r *RolloutHistoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// get rollout
rollout := &rolloutv1alpha1.Rollout{}
err := r.Get(ctx, req.NamespacedName, rollout)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// get RolloutHistory which is not completed and related to the rollout (only one or zero)
var rollouthistory *rolloutv1alpha1.RolloutHistory
if rollouthistory, err = r.getRHByRollout(rollout); err != nil {
// if not find RolloutHistory related this rollout, skip some situations
if rollout.Status.CanaryStatus == nil || rollout.Spec.Strategy.Canary == nil || rollout.Spec.ObjectRef.WorkloadRef == nil || rollout.Spec.RolloutID == "" || rollout.Status.CanaryStatus.CurrentStepIndex != 0 {
return ctrl.Result{}, nil
}
// create a rollouthistory when user do a new rollout
if rollouthistory, err = r.createRHByRollout(rollout); err != nil {
return ctrl.Result{}, err
}
}
// handle rollouthistory according to rollouthistory.status.canaryStepState
switch rollouthistory.Status.CanaryStepState {
case rolloutv1alpha1.CanaryStateInit:
err = r.handleInit(rollout, rollouthistory)
case rolloutv1alpha1.CanaryStatePending:
err = r.handlePending(rollout, rollouthistory)
case rolloutv1alpha1.CanaryStateUpdated:
err = r.handleUpdated(rollout, rollouthistory)
case rolloutv1alpha1.CanaryStateTerminated:
err = r.handleTerminated(rollout, rollouthistory)
case rolloutv1alpha1.CanaryStateCompleted:
err = r.handleCompleted(rollout, rollouthistory)
}
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *RolloutHistoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rolloutv1alpha1.Rollout{}).
Complete(r)
}

View File

@ -0,0 +1,669 @@
/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollouthistory
import (
"context"
"errors"
"fmt"
"sort"
kruiseapi "github.com/openkruise/kruise-api/apps/v1alpha1"
kruiseapi_beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
v1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
rolloutv1alpha1 "github.com/openkruise/rollouts/api/v1alpha1"
"github.com/openkruise/rollouts/pkg/util"
)
// get rollouthistory according to rollout. The rollouthistory should be not completed now
func (r *RolloutHistoryReconciler) getRHByRollout(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.RolloutHistory, error) {
// get all rollouthistories
rhList := &rolloutv1alpha1.RolloutHistoryList{}
err := r.List(context.TODO(), rhList, &client.ListOptions{}, client.InNamespace(rollout.Namespace))
if err != nil {
return nil, err
}
// if the num of rollouthistory is greater than MaxRolloutHistoryNum, it will delete excess rollouthistories.
// sort rollouthistories by creationTimeStamp
result := rhList.Items
sort.Slice(result, func(i, j int) bool {
return result[i].CreationTimestamp.Before(&result[j].CreationTimestamp)
})
lenth := len(result)
if lenth > rolloutv1alpha1.MaxRolloutHistoryNum {
for _, rollouthistory := range result[:lenth-rolloutv1alpha1.MaxRolloutHistoryNum] {
thisRolloutHistory := rollouthistory
err = r.Delete(context.TODO(), &thisRolloutHistory, &client.DeleteOptions{})
if err != nil {
return nil, err
}
}
result = append(result[:0], result[lenth-rolloutv1alpha1.MaxRolloutHistoryNum:]...)
}
// get target rollouthistory according to rollout.Spec.RolloutID and rollout.Name
for i := range result {
RH := result[i]
if rollout.Spec.RolloutID == RH.Spec.RolloutWrapper.Rollout.Spec.RolloutID &&
rollout.Name == RH.Spec.RolloutWrapper.Name &&
RH.Status.Phase != rolloutv1alpha1.PhaseCompleted {
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.getRHByRollout successed", " ")
return &RH, nil
}
}
return nil, errors.New("getRHByRollout can't find RH")
}
// create a rollouthistory which happens when user do a rollout
func (r *RolloutHistoryReconciler) createRHByRollout(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.RolloutHistory, error) {
logger := log.FromContext(context.TODO())
// init .spec.rolloutWrapper
rolloutWrapper, err := r.initRolloutWrapper(rollout)
if err != nil {
logger.Info(err.Error() + " after r.initRolloutInfo")
return nil, err
}
// init .spec.workload
workload, err := r.initWorkloadInfo(rollout)
if err != nil {
logger.Info(err.Error() + " after r.initWorkloadInfo")
return nil, err
}
// init .spec.serviceWrapper
serviceWrapper, err := r.initServiceWrapper(rollout, workload)
if err != nil {
logger.Info(err.Error() + " after r.initServiceInfo")
return nil, err
}
// init .spec.traffcRoutingWrapper
trafficRoutingWrapper, err := r.initTrafficRoutingWrapper(rollout)
if err != nil {
logger.Info(err.Error() + " after r.initIngressInfo")
return nil, err
}
// init rollouthistory
RH := &rolloutv1alpha1.RolloutHistory{
TypeMeta: v1.TypeMeta{
Kind: "RolloutHistory",
APIVersion: "rollouts.kruise.io/v1alpha1",
},
ObjectMeta: v1.ObjectMeta{
Name: rollout.Name + "-rh-" + RandAllString(6),
Namespace: rollout.Namespace,
},
Spec: rolloutv1alpha1.RolloutHistorySpec{
RolloutWrapper: *rolloutWrapper,
Workload: *workload,
ServiceWrapper: *serviceWrapper,
TrafficRoutingWrapper: *trafficRoutingWrapper,
},
Status: rolloutv1alpha1.RolloutHistoryStatus{},
}
// create this rollouthistory
err = r.Create(context.TODO(), RH, &client.CreateOptions{})
if err != nil {
logger.Info(err.Error() + " after r.Create")
return nil, err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.createRHByRollout.1 successed", RH.Status.RolloutState.Message)
// init .status for rollouthistory
if err = r.initRHStatus(rollout, RH); err != nil {
logger.Info(err.Error() + " after r.initRHStatus")
return nil, err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.createRHByRollout.2 successed", RH.Status.RolloutState.Message)
return RH, nil
}
// handle function, when rollouthistory .status.canaryStepState is CanaryStateInit
func (r *RolloutHistoryReconciler) handleInit(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
// when rollouthistory is meet the need in CanaryStateInit, move it to next canaryStepState
if *RH.Status.CanaryStepIndex == 0 &&
RT.Status.CanaryStatus.CurrentStepState == rolloutv1alpha1.CanaryStepStateUpgrade &&
*RH.Status.CanaryStepIndex+1 == RT.Status.CanaryStatus.CurrentStepIndex {
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStatePending
*RH.Status.CanaryStepIndex += 1
RH.Status.Phase = rolloutv1alpha1.PhaseProgressing
// update this rollouthistory
err := r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handleInit successed", RH.Status.RolloutState.Message)
}
return nil
}
// handle function, when rollouthistory .status.canaryStepState is CanaryStatePending
func (r *RolloutHistoryReconciler) handlePending(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
// if rollouthistory .status.canaryStepIndex is equal to rollout .status.canaryStatus.currentStepIndex, and
// rollout .status.canaryStatus.currentStepState is CanaryStepStatePaused, rollouthistory need to record pod released information
// in this canary step
if RT.Status.CanaryStatus.CurrentStepIndex == int32(*RH.Status.CanaryStepIndex) &&
RT.Status.CanaryStatus.CurrentStepState == rolloutv1alpha1.CanaryStepStatePaused {
// rollout is in StepPaused, update info
var err error
// update .status.rolloutState
err = r.updateStatusRolloutState(RT, RH)
if err != nil {
return err
}
// update .status.canaryStepPods information in this step
err = r.updateCanaryStepPods(RT, RH)
if err != nil {
return err
}
// when update done, rollouthistory .status.canaryStepState should be CanaryStateUpdated
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStateUpdated
err = r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handlePending successed", RH.Status.RolloutState.Message)
}
// if rollout .status.canaryStatus.currentStepIndex is 0 now, it means that user do a continuous rollout v1 -> v2(not completed) -> v3
if RT.Status.CanaryStatus.CurrentStepIndex == 0 {
err := r.doTerminated(RT, RH)
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.doTerminated successed", RH.Name)
}
// if rollout .status.phase is RolloutPhaseHealthy now, it means that user do a continuous rollout v1 -> v2(not completed) -> v1(back)
if RT.Status.Phase == rolloutv1alpha1.RolloutPhaseHealthy &&
RT.Status.CanaryStatus.CurrentStepState != rolloutv1alpha1.CanaryStepStateCompleted {
err := r.doCancelled(RT, RH)
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.doCancelled successed", RH.Name)
}
return nil
}
// handle function, when rollouthistory .status.canaryStepState is CanaryStateUpdated
func (r *RolloutHistoryReconciler) handleUpdated(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
// if rollout .status.canaryStatus.currentStepState is canaryStepStateUpgrade, it means that the last canary step has been approved
if RT.Status.CanaryStatus.CurrentStepState == rolloutv1alpha1.CanaryStepStateUpgrade &&
RT.Status.CanaryStatus.CurrentStepIndex == int32(*RH.Status.CanaryStepIndex)+1 {
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStatePending
*RH.Status.CanaryStepIndex++
err := r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handleUpdate successed", RH.Status.RolloutState.Message)
}
// if rollout .status.canaryStatus.currentStepState is CanaryStepStateCompleted now , it means that this rollout has completed
if RT.Status.CanaryStatus.CurrentStepState == rolloutv1alpha1.CanaryStepStateCompleted &&
RT.Status.CanaryStatus.CurrentStepIndex == int32(*RH.Status.CanaryStepIndex) {
err := r.doCompleted(RT, RH)
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.doCompleted successed", RH.Status.RolloutState.Message)
}
// if rollout .status.phase is RolloutPhaseHealthy now, it means that user do a continuous rollout v1 -> v2(not completed) -> v1(back)
if RT.Status.Phase == rolloutv1alpha1.RolloutPhaseHealthy &&
RT.Status.CanaryStatus.CurrentStepState != rolloutv1alpha1.CanaryStepStateCompleted {
err := r.doCancelled(RT, RH)
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.doCancelled successed", RH.Name)
}
// if rollout .status.canaryStatus.currentStepIndex is 0 now, it means that user do a continuous rollout v1 -> v2(not completed) -> v3
if RT.Status.CanaryStatus.CurrentStepIndex == 0 {
err := r.doTerminated(RT, RH)
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.doTerminated successed", RH.Name)
}
return nil
}
// handle function, when rollouthistory .status.canaryStepState is CanaryStateTerminated, when user do a continuous rollout v1 -> v2(not completed) -> v3
func (r *RolloutHistoryReconciler) handleTerminated(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
if RH.Status.Phase == rolloutv1alpha1.PhaseCompleted {
return nil
}
// update this terminated-rollout related rollouthistory(v2 not completed), make .status.phase completed
RH.Status.Phase = rolloutv1alpha1.PhaseCompleted
err := r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
// when user do a continuous rollout v1 -> v2(not completed) -> v3, it will create a new rollouthistory for this rollout
newWorkload, err := r.initWorkloadInfo(RT)
if err != nil {
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handleTerminated and initWorKload failed", RH.Status.RolloutState.Message)
return err
}
// get this new rollout which rolloutID is different and rollout name is same
newRollout := &rolloutv1alpha1.Rollout{}
err = r.Get(context.TODO(), types.NamespacedName{Namespace: RT.Namespace, Name: RT.Name}, newRollout)
if err != nil {
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handleTerminated and get newRollout failed", RH.Status.RolloutState.Message)
return err
}
// init rolloutWrapper for the new rollouthistory
newRolloutWrapper, err := r.initRolloutWrapper(newRollout)
if err != nil {
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.handleTerminated and initRolloutInfo failed", RH.Status.RolloutState.Message)
return err
}
// init this new rollouthistory for rollout(v3)
newRolloutHistory := &rolloutv1alpha1.RolloutHistory{
TypeMeta: v1.TypeMeta{
Kind: RH.Kind,
APIVersion: RH.APIVersion,
},
ObjectMeta: v1.ObjectMeta{
Name: RT.Name + "-rh-" + RandAllString(6),
Namespace: RT.Namespace,
},
Spec: rolloutv1alpha1.RolloutHistorySpec{
RolloutWrapper: *newRolloutWrapper,
Workload: *newWorkload,
TrafficRoutingWrapper: RH.Spec.TrafficRoutingWrapper,
ServiceWrapper: RH.Spec.ServiceWrapper,
},
Status: rolloutv1alpha1.RolloutHistoryStatus{},
}
// create this rollouthistory, which record information of rollout(v3)
err = r.Create(context.TODO(), newRolloutHistory, &client.CreateOptions{})
if err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.createRHByRollout.1 successed", RH.Status.RolloutState.Message)
// init .status for rollouthistory
if err = r.initRHStatus(RT, newRolloutHistory); err != nil {
return err
}
r.Recorder.Event(newRolloutHistory.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.createRHByRollout.2 successed", RH.Status.RolloutState.Message)
return nil
}
// handle function, when rollouthistory .status.canaryStepState is CanaryStateCompleted
func (r *RolloutHistoryReconciler) handleCompleted(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
if RH.Status.Phase != rolloutv1alpha1.PhaseCompleted && RT.Status.Phase == rolloutv1alpha1.RolloutPhaseHealthy {
var err error
err = r.updateStatusRolloutState(RT, RH)
if err != nil {
return err
}
// update the final step status
if *RT.Spec.Strategy.Canary.Steps[*RH.Status.CanaryStepIndex-1].Weight != 100 {
*RH.Status.CanaryStepIndex += 1
err = r.updateCanaryStepPods(RT, RH)
if err != nil {
return err
}
}
RH.Status.Phase = rolloutv1alpha1.PhaseCompleted
err = r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
}
return nil
}
// handle function when rollout process is terminated, which means that user do a new rollout
func (r *RolloutHistoryReconciler) doTerminated(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
// just make .status.canaryStepState CanaryStateTerminated
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStateTerminated
err := r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
return nil
}
// handle function when rollout process is cancelled, which means that user do a rollback
func (r *RolloutHistoryReconciler) doCancelled(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
// if rollout is cancelled, make rollouthistory .status.canaryStepState CanaryStateCompleted and .status.phase PhaseCompleted
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStateCompleted
RH.Status.Phase = rolloutv1alpha1.PhaseCompleted
err := r.updateStatusRolloutState(RT, RH)
if err != nil {
return err
}
// if rollout is cancelled, make rollouthistory .status.canaryStepIndex value "-1"
*RH.Status.CanaryStepIndex = -1
err = r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
return nil
}
// handle function doComplated, usually it be called when something unexpected happens and rollout .status.canaryStatus.currentStepState become CanaryStateCompleted
func (r *RolloutHistoryReconciler) doCompleted(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStateCompleted
err := r.updateStatusRolloutState(RT, RH)
if err != nil {
return err
}
err = r.Status().Update(context.TODO(), RH, &client.UpdateOptions{})
if err != nil {
return err
}
return nil
}
// init .status for rollouthistory
func (r *RolloutHistoryReconciler) initRHStatus(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
RH.Status.Phase = rolloutv1alpha1.PhaseInit
RH.Status.CanaryStepState = rolloutv1alpha1.CanaryStateInit
RH.Status.CanaryStepIndex = new(int32)
*RH.Status.CanaryStepIndex = 0
RH.Status.RolloutState = rolloutv1alpha1.RolloutState{RolloutPhase: RT.Status.Phase, Message: RT.Status.Message}
RH.Status.CanaryStepPods = make([]rolloutv1alpha1.CanaryStepPods, 0)
if err := r.Status().Update(context.TODO(), RH, &client.UpdateOptions{}); err != nil {
return err
}
r.Recorder.Event(RH.DeepCopy().DeepCopyObject(), corev1.EventTypeNormal, "r.initRHStatus succeed", " ")
return nil
}
// init .spec.rolloutWrapper
func (r *RolloutHistoryReconciler) initRolloutWrapper(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.RolloutWrapper, error) {
RolloutWrapper := &rolloutv1alpha1.RolloutWrapper{
Name: rollout.Name,
Rollout: *rollout,
}
return RolloutWrapper, nil
}
// init .spec.traffcRoutingWrapper
func (r *RolloutHistoryReconciler) initTrafficRoutingWrapper(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.TrafficRoutingWrapper, error) {
trafficRoutingWrapper := &rolloutv1alpha1.TrafficRoutingWrapper{}
var err error
// if gateway is configured, init it
if rollout.Spec.Strategy.Canary.TrafficRoutings[0].Gateway != nil &&
rollout.Spec.Strategy.Canary.TrafficRoutings[0].Gateway.HTTPRouteName != nil {
trafficRoutingWrapper.HTTPRouteWrapper, err = r.initGateWayInfo(rollout)
if err != nil {
return nil, err
}
}
// if ingress is configured, init it
if rollout.Spec.Strategy.Canary.TrafficRoutings[0].Ingress.Name != "" {
trafficRoutingWrapper.IngressWrapper, err = r.initIngressInfo(rollout)
if err != nil {
return nil, err
}
}
return trafficRoutingWrapper, nil
}
// init Gateway, especially HTTPRoute
func (r *RolloutHistoryReconciler) initGateWayInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.HTTPRouteWrapper, error) {
GatewayName := *rollout.Spec.Strategy.Canary.TrafficRoutings[0].Gateway.HTTPRouteName
HTTPRoute := &v1alpha2.HTTPRoute{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: GatewayName}, HTTPRoute)
if err != nil {
return nil, errors.New("initGateway error: HTTPRoute " + GatewayName + " not find")
}
return &rolloutv1alpha1.HTTPRouteWrapper{Name: GatewayName, HTTPRoute: HTTPRoute}, err
}
// init Ingress
func (r *RolloutHistoryReconciler) initIngressInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.IngressWrapper, error) {
IngressName := rollout.Spec.Strategy.Canary.TrafficRoutings[0].Ingress.Name
Ingress := &networkingv1.Ingress{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: IngressName}, Ingress)
if err != nil {
return nil, errors.New("initIngressInfo error: Ingress " + IngressName + " not find")
}
return &rolloutv1alpha1.IngressWrapper{Name: IngressName, Ingress: Ingress}, err
}
// init .spec.workload
func (r *RolloutHistoryReconciler) initWorkloadInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.Workload, error) {
// init specific workload, according to rollout.spec.objectRef.workloadRef.kind, now rollouthistory support Deployment, CloneSet, native StatefulSet and Advanced StatefulSet
switch rollout.Spec.ObjectRef.WorkloadRef.Kind {
case "Deployment":
return r.initDeploymentInfo(rollout)
case "CloneSet":
return r.initCloneSetInfo(rollout)
case "StatefulSet":
// According to rollout.spec.objectRef.workload.APIVersion, select Advanced StatefulSet or Native StatefulSet
if rollout.Spec.ObjectRef.WorkloadRef.APIVersion == "apps.kruise.io/v1beta1" {
return r.initAdvancedStatefulSetInfo(rollout)
}
if rollout.Spec.ObjectRef.WorkloadRef.APIVersion == "apps/v1" {
return r.initNativeStatefulSetInfo(rollout)
}
return nil, errors.New("StatefulSet is invalid")
default:
return nil, errors.New("workload is invalid")
}
}
// init workload deployment
func (r *RolloutHistoryReconciler) initDeploymentInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.Workload, error) {
deployment := &appsv1.Deployment{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: rollout.Spec.ObjectRef.WorkloadRef.Name}, deployment)
if err != nil {
return nil, errors.New("deployment not find")
}
DeploymentInfo := rolloutv1alpha1.Workload{
TypeMeta: deployment.TypeMeta,
Name: deployment.Name,
Selector: deployment.Spec.Selector,
Replicas: deployment.Spec.Replicas,
Type: string(deployment.Spec.Strategy.Type),
Template: deployment.Spec.Template,
}
return &DeploymentInfo, nil
}
// init workload cloneset
func (r *RolloutHistoryReconciler) initCloneSetInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.Workload, error) {
cloneSet := &kruiseapi.CloneSet{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: rollout.Spec.ObjectRef.WorkloadRef.Name}, cloneSet)
if err != nil {
return &rolloutv1alpha1.Workload{}, err
}
CloneSetInfo := rolloutv1alpha1.Workload{
TypeMeta: cloneSet.TypeMeta,
Name: cloneSet.Name,
Selector: cloneSet.Spec.Selector,
Replicas: cloneSet.Spec.Replicas,
Type: string(cloneSet.Spec.UpdateStrategy.Type),
Template: cloneSet.Spec.Template,
}
return &CloneSetInfo, nil
}
// init Native StatefulSet
func (r *RolloutHistoryReconciler) initNativeStatefulSetInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.Workload, error) {
cloneSet := &appsv1.StatefulSet{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: rollout.Spec.ObjectRef.WorkloadRef.Name}, cloneSet)
if err != nil {
return &rolloutv1alpha1.Workload{}, err
}
CloneSetInfo := rolloutv1alpha1.Workload{
TypeMeta: cloneSet.TypeMeta,
Name: cloneSet.Name,
Selector: cloneSet.Spec.Selector,
Replicas: cloneSet.Spec.Replicas,
Type: string(cloneSet.Spec.UpdateStrategy.Type),
Template: cloneSet.Spec.Template,
}
return &CloneSetInfo, nil
}
// init Advanced StatefulSet
func (r *RolloutHistoryReconciler) initAdvancedStatefulSetInfo(rollout *rolloutv1alpha1.Rollout) (*rolloutv1alpha1.Workload, error) {
cloneSet := &kruiseapi_beta1.StatefulSet{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: rollout.Spec.ObjectRef.WorkloadRef.Name}, cloneSet)
if err != nil {
return &rolloutv1alpha1.Workload{}, err
}
CloneSetInfo := rolloutv1alpha1.Workload{
TypeMeta: cloneSet.TypeMeta,
Name: cloneSet.Name,
Selector: cloneSet.Spec.Selector,
Replicas: cloneSet.Spec.Replicas,
Type: string(cloneSet.Spec.UpdateStrategy.Type),
Template: cloneSet.Spec.Template,
}
return &CloneSetInfo, nil
}
// init .spec.serviceWrapper
func (r *RolloutHistoryReconciler) initServiceWrapper(rollout *rolloutv1alpha1.Rollout, workloadInfo *rolloutv1alpha1.Workload) (*rolloutv1alpha1.ServiceWrapper, error) {
Service := &corev1.Service{}
err := r.Get(context.TODO(), types.NamespacedName{Namespace: rollout.Namespace, Name: rollout.Spec.Strategy.Canary.TrafficRoutings[0].Service}, Service)
if err != nil {
return nil, errors.New("service not find")
}
return &rolloutv1alpha1.ServiceWrapper{Name: Service.Name, Service: Service}, nil
}
// update rollouthistory .status.rolloutState
func (r *RolloutHistoryReconciler) updateStatusRolloutState(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
RH.Status.RolloutState = rolloutv1alpha1.RolloutState{RolloutPhase: RT.Status.Phase, Message: RT.Status.Message}
return nil
}
// update canaryStepPods,
func (r *RolloutHistoryReconciler) updateCanaryStepPods(RT *rolloutv1alpha1.Rollout, RH *rolloutv1alpha1.RolloutHistory) error {
podList := &corev1.PodList{}
var err error
var extraSelector labels.Selector
// get labelSelector including rolloutBathID, rolloutID and workload selector
selector, _ := v1.LabelSelectorAsSelector(RH.Spec.Workload.Selector)
lableSelectorString := fmt.Sprintf("%v=%v,%v=%v,%v", util.RolloutBatchIDLabel, *RH.Status.CanaryStepIndex, util.RolloutIDLabel, RH.Spec.RolloutWrapper.Rollout.Spec.RolloutID, selector.String())
extraSelector, err = labels.Parse(lableSelectorString)
if err != nil {
return err
}
// get pods according to lableSelector
err = r.List(context.TODO(), podList, &client.ListOptions{LabelSelector: extraSelector}, client.InNamespace(RT.Namespace))
if err != nil {
return err
}
// if num of pods is empty, append a empty CanaryStepPods{}
if len(podList.Items) == 0 {
RH.Status.CanaryStepPods = append(RH.Status.CanaryStepPods, rolloutv1alpha1.CanaryStepPods{})
return nil
}
// get currentStepPods
currentStepPods := rolloutv1alpha1.CanaryStepPods{}
var pods []rolloutv1alpha1.Pod
// get pods name, ip, node and add them to CanaryStepPods.Pods
for i := range podList.Items {
pod := &podList.Items[i]
if pod.DeletionTimestamp.IsZero() {
cur := rolloutv1alpha1.Pod{Name: pod.Name, IP: pod.Status.PodIP, Node: pod.Spec.NodeName}
pods = append(pods, cur)
}
}
currentStepPods.Pods = pods
currentStepPods.StepIndex = *RH.Status.CanaryStepIndex
currentStepPods.PodsInStep = int32(len(pods))
// sum pods released by now
currentStepPods.PodsInTotal = int32(len(pods))
for _, stepStatus := range RH.Status.CanaryStepPods {
currentStepPods.PodsInTotal += stepStatus.PodsInStep
}
// add currentStepPods to .status.canaryStepPods
RH.Status.CanaryStepPods = append(RH.Status.CanaryStepPods, currentStepPods)
return nil
}

View File

@ -0,0 +1,39 @@
/*
Copyright 2022.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rollouthistory
import (
"crypto/rand"
"math/big"
"strings"
)
var CHARS = []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
"1", "2", "3", "4", "5", "6", "7", "8", "9", "0"}
func RandAllString(lenNum int) string {
str := strings.Builder{}
for i := 0; i < lenNum; i++ {
n, err := rand.Int(rand.Reader, big.NewInt(36))
if err != nil {
return ""
}
l := CHARS[n.Int64()]
str.WriteString(l)
}
return str.String()
}

File diff suppressed because it is too large Load Diff