Add composite provider to support multiple network providers (#224)

* add composite provider

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* fix nginx lua script and add E2E

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* fix test case

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* revert image

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* fix indent

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* follow latest interface change

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

* move e2e to v1beta1 file and add workflow

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>

---------

Signed-off-by: Megrez Lu <lujiajing1126@gmail.com>
This commit is contained in:
Jiajing LU 2024-11-01 11:28:58 +08:00 committed by GitHub
parent f0363f28c0
commit 6854752435
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 358 additions and 21 deletions

View File

@ -0,0 +1,85 @@
name: E2E-Multiple-NetworkProvider
on:
push:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
env:
# Common versions
GO_VERSION: '1.17'
KIND_IMAGE: 'kindest/node:v1.23.3'
KIND_CLUSTER_NAME: 'ci-testing'
jobs:
rollout:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Setup Go
uses: actions/setup-go@v2
with:
go-version: ${{ env.GO_VERSION }}
- name: Setup Kind Cluster
uses: helm/kind-action@v1.2.0
with:
node_image: ${{ env.KIND_IMAGE }}
cluster_name: ${{ env.KIND_CLUSTER_NAME }}
config: ./test/kind-conf.yaml
- name: Build image
run: |
export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}"
docker build --pull --no-cache . -t $IMAGE
kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; }
- name: Install Kruise Rollout
run: |
set -ex
kubectl cluster-info
IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
for ((i=1;i<10;i++));
do
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
set -e
if [ "$PODS" -eq "1" ]; then
break
fi
sleep 3
done
set +e
PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l)
kubectl get node -o yaml
kubectl get all -n kruise-rollout -o yaml
set -e
if [ "$PODS" -eq "1" ]; then
echo "Wait for kruise-rollout ready successfully"
else
echo "Timeout to wait for kruise-rollout ready"
exit 1
fi
- name: Run E2E Tests
run: |
export KUBECONFIG=/home/runner/.kube/config
kubectl apply -f ./test/e2e/test_data/customNetworkProvider/istio_crd.yaml
kubectl apply -f ./test/e2e/test_data/customNetworkProvider/lua_script_configmap.yaml
make ginkgo
set +e
./bin/ginkgo -timeout 60m -v --focus='Canary rollout with multiple network providers' test/e2e
retVal=$?
# kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout
restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}')
if [ "${restartCount}" -eq "0" ];then
echo "Kruise-rollout has not restarted"
else
kubectl get pod -n kruise-rollout --no-headers
echo "Kruise-rollout has restarted, abort!!!"
kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout
exit 1
fi
exit $retVal

View File

@ -26,6 +26,7 @@ end
-- headers & cookie apis
-- traverse matches
for _,match in ipairs(obj.matches) do
if match.headers and next(match.headers) ~= nil then
local header = match.headers[1]
-- cookie
if ( header.name == "canary-by-cookie" )
@ -42,5 +43,6 @@ for _,match in ipairs(obj.matches) do
end
end
end
end
-- must be return annotations
return annotations

View File

@ -336,8 +336,9 @@ func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) {
trafficRouting := con.ObjectRef[0]
networkProviders := make([]network.NetworkProvider, 0, 3)
if trafficRouting.CustomNetworkRefs != nil {
return custom.NewCustomController(c, custom.Config{
np, innerErr := custom.NewCustomController(c, custom.Config{
Key: con.Key,
RolloutNs: con.Namespace,
CanaryService: cService,
@ -347,9 +348,13 @@ func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, c
//only set for CustomController, never work for Ingress and Gateway
DisableGenerateCanaryService: con.DisableGenerateCanaryService,
})
if innerErr != nil {
return nil, innerErr
}
networkProviders = append(networkProviders, np)
}
if trafficRouting.Ingress != nil {
return ingress.NewIngressTrafficRouting(c, ingress.Config{
np, innerErr := ingress.NewIngressTrafficRouting(c, ingress.Config{
Key: con.Key,
Namespace: con.Namespace,
CanaryService: cService,
@ -357,17 +362,30 @@ func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, c
TrafficConf: trafficRouting.Ingress,
OwnerRef: con.OwnerRef,
})
if innerErr != nil {
return nil, innerErr
}
networkProviders = append(networkProviders, np)
}
if trafficRouting.Gateway != nil {
return gateway.NewGatewayTrafficRouting(c, gateway.Config{
np, innerErr := gateway.NewGatewayTrafficRouting(c, gateway.Config{
Key: con.Key,
Namespace: con.Namespace,
CanaryService: cService,
StableService: sService,
TrafficConf: trafficRouting.Gateway,
})
if innerErr != nil {
return nil, innerErr
}
return nil, fmt.Errorf("TrafficRouting current only support Ingress or Gateway API")
networkProviders = append(networkProviders, np)
}
if len(networkProviders) == 0 {
return nil, fmt.Errorf("TrafficRouting current only supports Ingress, Gateway API and CustomNetworkRefs")
} else if len(networkProviders) == 1 {
return networkProviders[0], nil
}
return network.CompositeController(networkProviders), nil
}
func (m *Manager) createCanaryService(c *TrafficRoutingContext, cService string, spec corev1.ServiceSpec) (*corev1.Service, error) {

View File

@ -0,0 +1,66 @@
/*
Copyright 2024 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"context"
"k8s.io/apimachinery/pkg/util/validation/field"
"github.com/openkruise/rollouts/api/v1beta1"
)
var (
_ NetworkProvider = (CompositeController)(nil)
)
// CompositeController is a set of NetworkProvider
type CompositeController []NetworkProvider
func (c CompositeController) Initialize(ctx context.Context) error {
for _, provider := range c {
if err := provider.Initialize(ctx); err != nil {
return err
}
}
return nil
}
func (c CompositeController) EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) {
done := true
for _, provider := range c {
if innerDone, innerErr := provider.EnsureRoutes(ctx, strategy); innerErr != nil {
return false, innerErr
} else if !innerDone {
done = false
}
}
return done, nil
}
func (c CompositeController) Finalise(ctx context.Context) (bool, error) {
modified := false
errList := field.ErrorList{}
for _, provider := range c {
if updated, innerErr := provider.Finalise(ctx); innerErr != nil {
errList = append(errList, field.InternalError(field.NewPath("FinalizeChildNetworkProvider"), innerErr))
} else if updated {
modified = true
}
}
return modified, errList.ToAggregate()
}

View File

@ -27,12 +27,11 @@ type NetworkProvider interface {
// Initialize only determine if the network resources(ingress & gateway api) exist.
// If error is nil, then the network resources exist.
Initialize(ctx context.Context) error
// EnsureRoutes check and set canary weight and matches.
// weight indicates percentage of traffic to canary service, and range of values[0,100]
// matches indicates A/B Testing release for headers, cookies
// 1. check if canary has been set desired weight.
// 2. If not, set canary desired weight
// When the first set weight is returned false, mainly to give the provider some time to process, only when again ensure, will return true
// EnsureRoutes check and set routes, e.g. canary weight and match conditions.
// 1. Canary weight specifies the relative proportion of traffic to be forwarded to the canary service within the range of [0,100]
// 2. Match conditions indicates rules to be satisfied for A/B testing scenarios, such as header, cookie, queryParams etc.
// Return true if and only if the route resources have been correctly updated and does not change in this round of reconciliation.
// Otherwise, return false to wait for the eventual consistency.
EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error)
// Finalise will do some cleanup work after the canary rollout complete, such as delete canary ingress.
// if error is nil, the return bool value means if the resources are modified

View File

@ -34,6 +34,7 @@ import (
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/util/retry"
@ -3982,6 +3983,133 @@ var _ = SIGDescribe("Rollout v1beta1", func() {
})
})
KruiseDescribe("Canary rollout with multiple network providers", func() {
It("V1->V2: Route traffic with header/queryParams/path matches and weight using rollout for Istio and Ingress", func() {
By("Creating Rollout...")
rollout := &v1beta1.Rollout{}
Expect(ReadYamlToObject("./test_data/customNetworkProvider/rollout_with_multi_trafficrouting.yaml", rollout)).ToNot(HaveOccurred())
CreateObject(rollout)
By("Creating workload and waiting for all pods ready...")
// service
service := &v1.Service{}
Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred())
CreateObject(service)
// istio api
vs := &unstructured.Unstructured{}
Expect(ReadYamlToObject("./test_data/customNetworkProvider/virtualservice_without_destinationrule.yaml", vs)).ToNot(HaveOccurred())
vs.SetAPIVersion("networking.istio.io/v1alpha3")
vs.SetKind("VirtualService")
CreateObject(vs)
// ingress
ingress := &netv1.Ingress{}
Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred())
CreateObject(ingress)
// workload
workload := &apps.Deployment{}
Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred())
workload.Spec.Replicas = utilpointer.Int32(4)
CreateObject(workload)
WaitDeploymentAllPodsReady(workload)
// check rollout status
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
Expect(rollout.Status.Phase).Should(Equal(v1beta1.RolloutPhaseHealthy))
By("check rollout status & paused success")
// v1 -> v2, start rollout action
newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"})
workload.Spec.Template.Spec.Containers[0].Env = newEnvs
UpdateDeployment(workload)
By("Update deployment env NODE_NAME from(version1) -> to(version2), routing traffic with header agent:pc to new version pods")
time.Sleep(time.Second * 2)
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
Expect(workload.Spec.Paused).Should(BeTrue())
// wait step 1 complete
WaitRolloutCanaryStepPaused(rollout.Name, 1)
// check rollout status
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
Expect(rollout.Status.CanaryStatus.CanaryReplicas).Should(BeNumerically("==", 1))
Expect(rollout.Status.CanaryStatus.CanaryReadyReplicas).Should(BeNumerically("==", 1))
// check virtualservice spec
Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred())
expectedSpec := `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"match":[{"uri":{"prefix":"/pc"}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"match":[{"queryParams":{"user-agent":{"exact":"pc"}}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"match":[{"headers":{"user-agent":{"exact":"pc"}}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"route":[{"destination":{"host":"echoserver"}}]}]}`
Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec))
// check original spec annotation
expectedAnno := `{"spec":{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"}}]}]}}`
Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal(expectedAnno))
// check canary-ingress spec
cIngress := &netv1.Ingress{}
Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred())
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true"))
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty())
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("user-agent"))
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header-value", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("pc"))
// resume rollout canary
ResumeRolloutCanary(rollout.Name)
By("Resume rollout, and wait next step(2), routing 50% traffic to new version pods")
WaitRolloutCanaryStepPaused(rollout.Name, 2)
// check rollout status
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
Expect(rollout.Status.CanaryStatus.CanaryReplicas).Should(BeNumerically("==", 2))
Expect(rollout.Status.CanaryStatus.CanaryReadyReplicas).Should(BeNumerically("==", 2))
// check virtualservice spec
Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred())
expectedSpec = `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"},"weight":50},{"destination":{"host":"echoserver-canary"},"weight":50}]}]}`
Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec))
// check original spec annotation
Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal(expectedAnno))
// check canary-ingress spec
Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred())
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true"))
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(removePercentageSign(*rollout.Spec.Strategy.Canary.Steps[1].Traffic)))
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty())
Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header-value", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty())
// resume rollout
ResumeRolloutCanary(rollout.Name)
WaitRolloutStatusPhase(rollout.Name, v1beta1.RolloutPhaseHealthy)
By("rollout completed, and check")
// check ingress & service & virtualservice & deployment
// ingress
Expect(GetObject(ingress.Name, ingress)).NotTo(HaveOccurred())
cIngress = &netv1.Ingress{}
Expect(GetObject(fmt.Sprintf("%s-canary", ingress.Name), cIngress)).To(HaveOccurred())
// virtualservice
Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred())
expectedSpec = `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"}}]}]}`
Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec))
Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal(""))
// service
Expect(GetObject(service.Name, service)).NotTo(HaveOccurred())
Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal(""))
cService := &v1.Service{}
Expect(GetObject(fmt.Sprintf("%s-canary", service.Name), cService)).To(HaveOccurred())
// deployment
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
Expect(workload.Spec.Paused).Should(BeFalse())
Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", *workload.Spec.Replicas))
Expect(workload.Status.Replicas).Should(BeNumerically("==", *workload.Spec.Replicas))
Expect(workload.Status.ReadyReplicas).Should(BeNumerically("==", *workload.Spec.Replicas))
for _, env := range workload.Spec.Template.Spec.Containers[0].Env {
if env.Name == "NODE_NAME" {
Expect(env.Value).Should(Equal("version2"))
}
}
// check progressing succeed
Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred())
cond := getRolloutConditionV1beta1(rollout.Status, v1beta1.RolloutConditionProgressing)
Expect(cond.Reason).Should(Equal(v1beta1.ProgressingReasonCompleted))
Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionFalse)))
cond = getRolloutConditionV1beta1(rollout.Status, v1beta1.RolloutConditionSucceeded)
Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionTrue)))
Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred())
WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation)
})
})
})
func removePercentageSign(input string) string {

View File

@ -0,0 +1,37 @@
apiVersion: rollouts.kruise.io/v1beta1
kind: Rollout
metadata:
name: rollouts-demo
spec:
disabled: false
workloadRef:
apiVersion: apps/v1
kind: Deployment
name: echoserver
strategy:
canary:
enableExtraWorkloadForCanary: true
steps:
- replicas: 1
matches:
- headers:
- type: Exact
name: user-agent
value: pc
- queryParams:
- type: Exact
name: user-agent
value: pc
- path:
value: /pc
- replicas: "50%"
traffic: "50%"
trafficRoutings:
- service: echoserver
ingress:
classType: nginx
name: echoserver
customNetworkRefs:
- apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
name: vs-demo

View File

@ -22,6 +22,8 @@ spec:
containers:
- name: echoserver
image: cilium/echoserver:latest
# For ARM-based env
# image: jmalloc/echo-server:latest
# imagePullPolicy: IfNotPresent
ports:
- containerPort: 8080