diff --git a/client.go b/client.go index e49305f1..8dbc6ddf 100644 --- a/client.go +++ b/client.go @@ -409,9 +409,12 @@ func (c *Client) Runtimes() ([]string, error) { // Apply (aka upsert) // -// Invokes all lower-level methods as necessary to create a running function -// whose source code and metadata match that provided by the passed -// function instance. +// The general-purpose high-level method to initiate a synchronization of +// a function's source code and it's deployed instance(s). +// +// Invokes all lower-level methods, including initialization, as necessary to +// create a running function whose source code and metadata match that provided +// by the passed function instance, returning the final route and any errors. func (c *Client) Apply(ctx context.Context, f Function) (route string, err error) { if f, err = NewFunction(f.Root); err != nil { return @@ -429,7 +432,7 @@ func (c *Client) Apply(ctx context.Context, f Function) (route string, err error // source code. // // Use Init, Build, Push and Deploy independently for lower level control. -// Returns the primary route to the function or any errors. +// Returns final primary route to the Function and any errors. func (c *Client) Update(ctx context.Context, root string) (route string, err error) { f, err := NewFunction(root) if err != nil { @@ -705,8 +708,7 @@ func (c *Client) printBuildActivity(ctx context.Context) { }() } -// Deploy the function at path. Errors if the function has not been -// initialized with an image tag. +// Deploy the function at path. Errors if the function has not been built. func (c *Client) Deploy(ctx context.Context, path string) (err error) { go func() { <-ctx.Done() diff --git a/client_int_test.go b/client_int_test.go index 14192b60..89e83a44 100644 --- a/client_int_test.go +++ b/client_int_test.go @@ -5,7 +5,11 @@ package function_test import ( "context" + "fmt" + "io" + "net/http" "os" + "path/filepath" "reflect" "testing" "time" @@ -18,32 +22,26 @@ import ( "knative.dev/pkg/ptr" ) -/* - NOTE: Running integration tests locally requires a configured test cluster. - Test failures may require manual removal of dangling resources. +// # Integration Tsets +// +// go test -tags integration ./... +// +// ## Cluster Required +// +// These integration tests require a properly configured cluster, +// such as that which is setup and configured in CI (see .github/workflows). +// Linux developers can set up the cluster via: +// +// ./hack/binaries.sh && ./hack/allocate.sh && ./hack/registry.sh +// +// ## Cluster Cleanup +// +// The test cluster and most resources can be removed with: - ## Integration Cluster - - These integration tests require a properly configured cluster, - such as that which is setup and configured in CI (see .github/workflows). - A local KinD cluster can be started via: - ./hack/allocate.sh && ./hack/configure.sh - - ## Integration Testing - - These tests can be run via the make target: - make test-integration - or manually by specifying the tag - go test -v -tags integration ./... - - ## Teardown and Cleanup - - Tests should clean up after themselves. In the event of failures, one may - need to manually remove files: - rm -rf ./testdata/example.com - The test cluster is not automatically removed, as it can be reused. To remove: - ./hack/delete.sh -*/ +// ./hack/delete.sh +// +// NOTE: Downloaded images are not removed. +// const ( // DefaultRegistry must contain both the registry host and @@ -237,16 +235,171 @@ func TestRemoteRepositories(t *testing.T) { } } +// TestInvoke_ClientToService ensures that the client can invoke a remotely +// deployed service, both by the route returned directly as well as using +// the invocation helper client.Invoke. +func TestInvoke_ClientToService(t *testing.T) { + var ( + root, done = Mktemp(t) + verbose = true + ctx = context.Background() + client = newClient(verbose) + route string + err error + ) + defer done() + + // Create a function + f := fn.Function{Name: "f", Runtime: "go"} + if err = client.Init(f); err != nil { + t.Fatal(err) + } + source := ` +package function + +import ( + "context" + "net/http" +) + +func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) { + res.Write([]byte("TestInvoke_ClientToService OK")) +} +` + os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm) + + if route, err = client.Apply(ctx, f); err != nil { + t.Fatal(err) + } + defer client.Remove(ctx, f, true) + + // Invoke via the route + resp, err := http.Get(route) + if err != nil { + t.Fatal(err) + } + b, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if string(b) != "TestInvoke_ClientToService OK" { + t.Fatalf("unexpected response from HTTP GET: %v", b) + } + + // Invoke using the helper + _, body, err := client.Invoke(ctx, root, "", fn.NewInvokeMessage()) + if err != nil { + t.Fatal(err) + } + + if body != "TestInvoke_ClientToService OK" { + t.Fatalf("unexpected response from client.Invoke: %v", b) + } +} + +// TestInvoke_ServiceToService ensures that a Function can invoke another +// service via localhost service discovery api provided by the Dapr sidecar. +func TestInvoke_ServiceToService(t *testing.T) { + var ( + verbose = true + ctx = context.Background() + client = newClient(verbose) + err error + source string + route string + ) + + // Create function A + // A function which responds to GET requests with a static value. + root, done := Mktemp(t) + defer done() + f := fn.Function{Name: "a", Runtime: "go"} + if err := client.Init(f); err != nil { + t.Fatal(err) + } + source = ` +package function + +import ( + "context" + "net/http" +) + +func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) { + res.Write([]byte("TestInvoke_ServiceToService OK")) +} +` + os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm) + if _, err = client.Apply(ctx, f); err != nil { + t.Fatal(err) + } + defer client.Remove(ctx, f, true) + + // Create Function B + // which responds with the response from an invocation of 'a' via the + // localhost service discovery and invocation API. + root, done = Mktemp(t) + defer done() + f = fn.Function{Name: "b", Runtime: "go"} + if err := client.Init(f); err != nil { + t.Fatal(err) + } + + source = ` +package function + +import ( + "context" + "net/http" + "fmt" + "io" +) + +func Handle(ctx context.Context, res http.ResponseWriter, req *http.Request) { + r, err := http.Get("http://localhost:3500/v1.0/invoke/a/method/") + if err != nil { + fmt.Printf("unable to invoke function a", err) + http.Error(res, "unable to invoke function a", http.StatusServiceUnavailable) + } + defer r.Body.Close() + io.Copy(res,r.Body) +} +` + os.WriteFile(filepath.Join(root, "handle.go"), []byte(source), os.ModePerm) + if route, err = client.Apply(ctx, f); err != nil { + t.Fatal(err) + } + defer client.Remove(ctx, f, true) + + resp, err := http.Get(route) + if err != nil { + t.Fatal(err) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + fmt.Printf("### function a response body: %s\n", body) + + if string(body) != "TestInvoke_ServiceToService OK" { + t.Fatalf("Unexpected response from Function B: %v", body) + } +} + // *********** // Helpers // *********** -// newClient creates an instance of the func client whose concrete impls -// match those created by the kn func plugin CLI. +// newClient creates an instance of the func client with concrete impls +// sufficient for running integration tests. func newClient(verbose bool) *fn.Client { builder := buildpacks.NewBuilder(buildpacks.WithVerbose(verbose)) pusher := docker.NewPusher(docker.WithVerbose(verbose)) deployer := knative.NewDeployer(knative.WithDeployerNamespace(DefaultNamespace), knative.WithDeployerVerbose(verbose)) + describer := knative.NewDescriber(DefaultNamespace, verbose) remover := knative.NewRemover(DefaultNamespace, verbose) lister := knative.NewLister(DefaultNamespace, verbose) @@ -256,6 +409,7 @@ func newClient(verbose bool) *fn.Client { fn.WithBuilder(builder), fn.WithPusher(pusher), fn.WithDeployer(deployer), + fn.WithDescriber(describer), fn.WithRemover(remover), fn.WithLister(lister), ) diff --git a/hack/allocate.sh b/hack/allocate.sh index d54dcd87..4909cf89 100755 --- a/hack/allocate.sh +++ b/hack/allocate.sh @@ -40,7 +40,8 @@ main() { eventing networking registry - configure + namespace + dapr_runtime next_steps echo "${em}DONE${me}" @@ -195,7 +196,7 @@ data: EOF } -configure() { +namespace() { echo "${em}⑦ Configure Namespace${me}" # Create Namespace @@ -243,10 +244,74 @@ EOF } +dapr_runtime() { + echo "${em}⑦ Dapr${me}" + + # Install Dapr Runtime + dapr init --kubernetes --wait + + # Enalble Redis Persistence and Pub/Sub + # + # 1) Redis + # Creates a Redis leader with three replicas + # TODO: helm and the bitnami charts are likely not necessary. The Bitnami + # charts do tweak quite a few settings, but I am skeptical it is necessary + # in a CI/CD environment, as it does add nontrivial support overhead. + # TODO: If the bitnami redis chart seems worth the effort, munge this command + # to only start a single instance rather than four. + # helm repo add bitnami https://charts.bitnami.com/bitnami + echo "${em}- Redis ${me}" + helm repo add bitnami https://charts.bitnami.com/bitnami + helm install redis bitnami/redis --set image.tag=6.2 + helm repo update + + # 2) Expose a Redis-backed Dapr State Storage component + echo "${em}- State Storage Component${me}" + kubectl apply -f - << EOF +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore + namespace: default +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: redis-master.default.svc.cluster.local:6379 + - name: redisPassword + secretKeyRef: + name: redis + key: redis-password +EOF + + # 3) Expose A Redis-backed Dapr Pub/Sub Component + echo "${em}- Pub/Sub Component${me}" + kubectl apply -f - << EOF +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: pubsub + namespace: default +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: redis-master.default.svc.cluster.local:6379 + - name: redisPassword + secretKeyRef: + name: redis + key: redis-password +EOF + +} + + next_steps() { local red=$(tput bold)$(tput setaf 1) - echo "${em}Configure Registry${me}" + echo "${em}Image Registry${me}" echo "If not in CI (running ci.sh): " echo " ${red}set registry as insecure${me} in the docker daemon config (/etc/docker/daemon.json on linux or ~/.docker/daemon.json on OSX):" echo " { \"insecure-registries\": [ \"localhost:50000\" ] }" diff --git a/hack/binaries.sh b/hack/binaries.sh index c2bf1ac2..b8b32f8c 100755 --- a/hack/binaries.sh +++ b/hack/binaries.sh @@ -14,6 +14,8 @@ export TERM="${TERM:-dumb}" main() { local kubectl_version=v1.24.6 local kind_version=v0.16.0 + local dapr_version=v1.9.1 + local helm_version=v3.11.0 local em=$(tput bold)$(tput setaf 2) local me=$(tput sgr0) @@ -23,6 +25,8 @@ main() { install_kubectl install_kind install_yq + install_dapr + install_helm echo "${em}DONE${me}" @@ -32,7 +36,7 @@ install_kubectl() { echo 'Installing kubectl...' curl -sSLO "https://storage.googleapis.com/kubernetes-release/release/$kubectl_version/bin/linux/amd64/kubectl" chmod +x kubectl - sudo mv kubectl /usr/local/bin/kubectl + sudo mv kubectl /usr/local/bin/ kubectl version --client=true } @@ -40,7 +44,7 @@ install_kind() { echo 'Installing kind...' curl -sSLo kind "https://github.com/kubernetes-sigs/kind/releases/download/$kind_version/kind-linux-amd64" chmod +x kind - sudo mv kind /usr/local/bin/kind + sudo mv kind /usr/local/bin/ kind version } @@ -50,5 +54,22 @@ install_yq() { yq --version } +install_dapr() { + echo 'Installing dapr...' + curl -sSLo dapr.tgz "https://github.com/dapr/cli/releases/download/$dapr_version/dapr_linux_amd64.tar.gz" + tar -xvf dapr.tgz + chmod +x dapr + sudo mv dapr /usr/local/bin/ + dapr version +} + +install_helm() { + echo 'Installing helm (v3)...' + curl -sSLo helm.tgz "https://get.helm.sh/helm-$helm_version-linux-amd64.tar.gz" + tar -xvf helm.tgz + chmod +x linux-amd64/helm + sudo mv linux-amd64/helm /usr/local/bin/ + helm version +} main "$@" diff --git a/knative/deployer.go b/knative/deployer.go index 298f894b..14abefea 100644 --- a/knative/deployer.go +++ b/knative/deployer.go @@ -342,10 +342,7 @@ func generateNewService(f fn.Function, decorator DeployDecorator) (*v1.Service, labels = decorator.UpdateLabels(f, labels) } - annotations := f.Deploy.Annotations - if decorator != nil { - annotations = decorator.UpdateAnnotations(f, annotations) - } + annotations := newServiceAnnotations(f, decorator) // we need to create a separate map for Annotations specified in a Revision, // in case we will need to specify autoscaling annotations -> these could be only in a Revision not in a Service @@ -388,6 +385,44 @@ func generateNewService(f fn.Function, decorator DeployDecorator) (*v1.Service, return service, nil } +// newServiceAnnotations creates a final map of service annotations based +// on static defaults plus the function's defined annotations plus the +// application of any provided annotation decorator. +func newServiceAnnotations(f fn.Function, d DeployDecorator) (aa map[string]string) { + aa = make(map[string]string) + + // Enables Dapr support. + // Has no effect unless the target cluster has Dapr control plane installed. + for k, v := range daprAnnotations(f.Name) { + aa[k] = v + } + + // Function-defined annotations + for k, v := range f.Deploy.Annotations { + aa[k] = v + } + + // Decorator + if d != nil { + aa = d.UpdateAnnotations(f, aa) + } + + return +} + +// annotations which, if included and Dapr control plane is installed in +// the target cluster will result in a sidecar exposing the dapr HTTP API +// on localhost:3500 and metrics on 9092 +func daprAnnotations(appid string) map[string]string { + aa := make(map[string]string) + aa["dapr.io/app-id"] = appid + aa["dapr.io/enabled"] = DaprEnabled + aa["dapr.io/metrics-port"] = DaprMetricsPort + aa["dapr.io/app-port"] = "8080" + aa["dapr.io/enable-api-logging"] = DaprEnableAPILogging + return aa +} + func updateService(f fn.Function, newEnv []corev1.EnvVar, newEnvFrom []corev1.EnvFromSource, newVolumes []corev1.Volume, newVolumeMounts []corev1.VolumeMount, decorator DeployDecorator) func(service *v1.Service) (*v1.Service, error) { return func(service *v1.Service) (*v1.Service, error) { // Removing the name so the k8s server can fill it in with generated name, diff --git a/knative/labels.go b/knative/labels.go new file mode 100644 index 00000000..2dc00214 --- /dev/null +++ b/knative/labels.go @@ -0,0 +1,7 @@ +package knative + +const ( + DaprEnabled = "true" + DaprMetricsPort = "9092" + DaprEnableAPILogging = "true" +)