feat: preliminary dapr runtime support (#1518)

* Dapr runtime support

- installs Dapr cli in CI
- installs Dapr runtime on allocation of test cluster
- annotates services to enable dapr sidecar integration
- installs redis via helm, enabling state store, pub/sub and distributed
  lock
- integration test added for local invocation
- integration test added for service-to-service invocation via the
  sidecar

Note that Dapr runs metrics on port 9002 so as not to collide with
Knative metrics.

* create constants for knative service labels

* extract dapr annotations and use labels
This commit is contained in:
Luke Kingland 2023-02-03 01:01:31 +09:00 committed by GitHub
parent 6a539e40df
commit 10c9948a12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 326 additions and 42 deletions

View File

@ -409,9 +409,12 @@ func (c *Client) Runtimes() ([]string, error) {
// Apply (aka upsert)
//
// Invokes all lower-level methods as necessary to create a running function
// whose source code and metadata match that provided by the passed
// function instance.
// The general-purpose high-level method to initiate a synchronization of
// a function's source code and it's deployed instance(s).
//
// Invokes all lower-level methods, including initialization, as necessary to
// create a running function whose source code and metadata match that provided
// by the passed function instance, returning the final route and any errors.
func (c *Client) Apply(ctx context.Context, f Function) (route string, err error) {
if f, err = NewFunction(f.Root); err != nil {
return
@ -429,7 +432,7 @@ func (c *Client) Apply(ctx context.Context, f Function) (route string, err error
// source code.
//
// Use Init, Build, Push and Deploy independently for lower level control.
// Returns the primary route to the function or any errors.
// Returns final primary route to the Function and any errors.
func (c *Client) Update(ctx context.Context, root string) (route string, err error) {
f, err := NewFunction(root)
if err != nil {
@ -705,8 +708,7 @@ func (c *Client) printBuildActivity(ctx context.Context) {
}()
}
// Deploy the function at path. Errors if the function has not been
// initialized with an image tag.
// Deploy the function at path. Errors if the function has not been built.
func (c *Client) Deploy(ctx context.Context, path string) (err error) {
go func() {
<-ctx.Done()

View File

@ -5,7 +5,11 @@ package function_test
import (
"context"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"reflect"
"testing"
"time"
@ -18,32 +22,26 @@ import (
"knative.dev/pkg/ptr"
)
/*
NOTE: Running integration tests locally requires a configured test cluster.
Test failures may require manual removal of dangling resources.
// # Integration Tsets
//
// go test -tags integration ./...
//
// ## Cluster Required
//
// These integration tests require a properly configured cluster,
// such as that which is setup and configured in CI (see .github/workflows).
// Linux developers can set up the cluster via:
//
// ./hack/binaries.sh && ./hack/allocate.sh && ./hack/registry.sh
//
// ## Cluster Cleanup
//
// The test cluster and most resources can be removed with:
## Integration Cluster
These integration tests require a properly configured cluster,
such as that which is setup and configured in CI (see .github/workflows).
A local KinD cluster can be started via:
./hack/allocate.sh && ./hack/configure.sh
## Integration Testing
These tests can be run via the make target:
make test-integration
or manually by specifying the tag
go test -v -tags integration ./...
## Teardown and Cleanup
Tests should clean up after themselves. In the event of failures, one may
need to manually remove files:
rm -rf ./testdata/example.com
The test cluster is not automatically removed, as it can be reused. To remove:
./hack/delete.sh
*/
// ./hack/delete.sh
//
// NOTE: Downloaded images are not removed.
//
const (
// DefaultRegistry must contain both the registry host and
@ -237,16 +235,171 @@ func TestRemoteRepositories(t *testing.T) {
}
}
// TestInvoke_ClientToService ensures that the client can invoke a remotely
// deployed service, both by the route returned directly as well as using
// the invocation helper client.Invoke.
func TestInvoke_ClientToService(t *testing.T) {
var (
root, done = Mktemp(t)
verbose = true
ctx = context.Background()
client = newClient(verbose)
route string
err error
)
defer done()
// Create a function
f := fn.Function{Name: "f", Runtime: "go"}
if err = client.Init(f); err != nil {
t.Fatal(err)
}
source := `
package function
import (
"context"
"net/http"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
res.Write([]byte("TestInvoke_ClientToService OK"))
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)
if route, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)
// Invoke via the route
resp, err := http.Get(route)
if err != nil {
t.Fatal(err)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if string(b) != "TestInvoke_ClientToService OK" {
t.Fatalf("unexpected response from HTTP GET: %v", b)
}
// Invoke using the helper
_, body, err := client.Invoke(ctx, root, "", fn.NewInvokeMessage())
if err != nil {
t.Fatal(err)
}
if body != "TestInvoke_ClientToService OK" {
t.Fatalf("unexpected response from client.Invoke: %v", b)
}
}
// TestInvoke_ServiceToService ensures that a Function can invoke another
// service via localhost service discovery api provided by the Dapr sidecar.
func TestInvoke_ServiceToService(t *testing.T) {
var (
verbose = true
ctx = context.Background()
client = newClient(verbose)
err error
source string
route string
)
// Create function A
// A function which responds to GET requests with a static value.
root, done := Mktemp(t)
defer done()
f := fn.Function{Name: "a", Runtime: "go"}
if err := client.Init(f); err != nil {
t.Fatal(err)
}
source = `
package function
import (
"context"
"net/http"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
res.Write([]byte("TestInvoke_ServiceToService OK"))
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)
if _, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)
// Create Function B
// which responds with the response from an invocation of 'a' via the
// localhost service discovery and invocation API.
root, done = Mktemp(t)
defer done()
f = fn.Function{Name: "b", Runtime: "go"}
if err := client.Init(f); err != nil {
t.Fatal(err)
}
source = `
package function
import (
"context"
"net/http"
"fmt"
"io"
)
func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) {
r, err := http.Get("http://localhost:3500/v1.0/invoke/a/method/")
if err != nil {
fmt.Printf("unable to invoke function a", err)
http.Error(res, "unable to invoke function a", http.StatusServiceUnavailable)
}
defer r.Body.Close()
io.Copy(res,r.Body)
}
`
os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm)
if route, err = client.Apply(ctx, f); err != nil {
t.Fatal(err)
}
defer client.Remove(ctx, f, true)
resp, err := http.Get(route)
if err != nil {
t.Fatal(err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
fmt.Printf("### function a response body: %s\n", body)
if string(body) != "TestInvoke_ServiceToService OK" {
t.Fatalf("Unexpected response from Function B: %v", body)
}
}
// ***********
// Helpers
// ***********
// newClient creates an instance of the func client whose concrete impls
// match those created by the kn func plugin CLI.
// newClient creates an instance of the func client with concrete impls
// sufficient for running integration tests.
func newClient(verbose bool) *fn.Client {
builder := buildpacks.NewBuilder(buildpacks.WithVerbose(verbose))
pusher := docker.NewPusher(docker.WithVerbose(verbose))
deployer := knative.NewDeployer(knative.WithDeployerNamespace(DefaultNamespace), knative.WithDeployerVerbose(verbose))
describer := knative.NewDescriber(DefaultNamespace, verbose)
remover := knative.NewRemover(DefaultNamespace, verbose)
lister := knative.NewLister(DefaultNamespace, verbose)
@ -256,6 +409,7 @@ func newClient(verbose bool) *fn.Client {
fn.WithBuilder(builder),
fn.WithPusher(pusher),
fn.WithDeployer(deployer),
fn.WithDescriber(describer),
fn.WithRemover(remover),
fn.WithLister(lister),
)

View File

@ -40,7 +40,8 @@ main() {
eventing
networking
registry
configure
namespace
dapr_runtime
next_steps
echo "${em}DONE${me}"
@ -195,7 +196,7 @@ data:
EOF
}
configure() {
namespace() {
echo "${em}⑦ Configure Namespace${me}"
# Create Namespace
@ -243,10 +244,74 @@ EOF
}
dapr_runtime() {
echo "${em}⑦ Dapr${me}"
# Install Dapr Runtime
dapr init --kubernetes --wait
# Enalble Redis Persistence and Pub/Sub
#
# 1) Redis
# Creates a Redis leader with three replicas
# TODO: helm and the bitnami charts are likely not necessary. The Bitnami
# charts do tweak quite a few settings, but I am skeptical it is necessary
# in a CI/CD environment, as it does add nontrivial support overhead.
# TODO: If the bitnami redis chart seems worth the effort, munge this command
# to only start a single instance rather than four.
# helm repo add bitnami https://charts.bitnami.com/bitnami
echo "${em}- Redis ${me}"
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install redis bitnami/redis --set image.tag=6.2
helm repo update
# 2) Expose a Redis-backed Dapr State Storage component
echo "${em}- State Storage Component${me}"
kubectl apply -f - << EOF
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
EOF
# 3) Expose A Redis-backed Dapr Pub/Sub Component
echo "${em}- Pub/Sub Component${me}"
kubectl apply -f - << EOF
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: default
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
EOF
}
next_steps() {
local red=$(tput bold)$(tput setaf 1)
echo "${em}Configure Registry${me}"
echo "${em}Image Registry${me}"
echo "If not in CI (running ci.sh): "
echo " ${red}set registry as insecure${me} in the docker daemon config (/etc/docker/daemon.json on linux or ~/.docker/daemon.json on OSX):"
echo " { \"insecure-registries\": [ \"localhost:50000\" ] }"

View File

@ -14,6 +14,8 @@ export TERM="${TERM:-dumb}"
main() {
local kubectl_version=v1.24.6
local kind_version=v0.16.0
local dapr_version=v1.9.1
local helm_version=v3.11.0
local em=$(tput bold)$(tput setaf 2)
local me=$(tput sgr0)
@ -23,6 +25,8 @@ main() {
install_kubectl
install_kind
install_yq
install_dapr
install_helm
echo "${em}DONE${me}"
@ -32,7 +36,7 @@ install_kubectl() {
echo 'Installing kubectl...'
curl -sSLO "https://storage.googleapis.com/kubernetes-release/release/$kubectl_version/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/kubectl
sudo mv kubectl /usr/local/bin/
kubectl version --client=true
}
@ -40,7 +44,7 @@ install_kind() {
echo 'Installing kind...'
curl -sSLo kind "https://github.com/kubernetes-sigs/kind/releases/download/$kind_version/kind-linux-amd64"
chmod +x kind
sudo mv kind /usr/local/bin/kind
sudo mv kind /usr/local/bin/
kind version
}
@ -50,5 +54,22 @@ install_yq() {
yq --version
}
install_dapr() {
echo 'Installing dapr...'
curl -sSLo dapr.tgz "https://github.com/dapr/cli/releases/download/$dapr_version/dapr_linux_amd64.tar.gz"
tar -xvf dapr.tgz
chmod +x dapr
sudo mv dapr /usr/local/bin/
dapr version
}
install_helm() {
echo 'Installing helm (v3)...'
curl -sSLo helm.tgz "https://get.helm.sh/helm-$helm_version-linux-amd64.tar.gz"
tar -xvf helm.tgz
chmod +x linux-amd64/helm
sudo mv linux-amd64/helm /usr/local/bin/
helm version
}
main "$@"

View File

@ -342,10 +342,7 @@ func generateNewService(f fn.Function, decorator DeployDecorator) (*v1.Service,
labels = decorator.UpdateLabels(f, labels)
}
annotations := f.Deploy.Annotations
if decorator != nil {
annotations = decorator.UpdateAnnotations(f, annotations)
}
annotations := newServiceAnnotations(f, decorator)
// we need to create a separate map for Annotations specified in a Revision,
// in case we will need to specify autoscaling annotations -> these could be only in a Revision not in a Service
@ -388,6 +385,44 @@ func generateNewService(f fn.Function, decorator DeployDecorator) (*v1.Service,
return service, nil
}
// newServiceAnnotations creates a final map of service annotations based
// on static defaults plus the function's defined annotations plus the
// application of any provided annotation decorator.
func newServiceAnnotations(f fn.Function, d DeployDecorator) (aa map[string]string) {
aa = make(map[string]string)
// Enables Dapr support.
// Has no effect unless the target cluster has Dapr control plane installed.
for k, v := range daprAnnotations(f.Name) {
aa[k] = v
}
// Function-defined annotations
for k, v := range f.Deploy.Annotations {
aa[k] = v
}
// Decorator
if d != nil {
aa = d.UpdateAnnotations(f, aa)
}
return
}
// annotations which, if included and Dapr control plane is installed in
// the target cluster will result in a sidecar exposing the dapr HTTP API
// on localhost:3500 and metrics on 9092
func daprAnnotations(appid string) map[string]string {
aa := make(map[string]string)
aa["dapr.io/app-id"] = appid
aa["dapr.io/enabled"] = DaprEnabled
aa["dapr.io/metrics-port"] = DaprMetricsPort
aa["dapr.io/app-port"] = "8080"
aa["dapr.io/enable-api-logging"] = DaprEnableAPILogging
return aa
}
func updateService(f fn.Function, newEnv []corev1.EnvVar, newEnvFrom []corev1.EnvFromSource, newVolumes []corev1.Volume, newVolumeMounts []corev1.VolumeMount, decorator DeployDecorator) func(service *v1.Service) (*v1.Service, error) {
return func(service *v1.Service) (*v1.Service, error) {
// Removing the name so the k8s server can fill it in with generated name,

7
knative/labels.go Normal file
View File

@ -0,0 +1,7 @@
package knative
const (
DaprEnabled = "true"
DaprMetricsPort = "9092"
DaprEnableAPILogging = "true"
)