func/test/e2e/scenario_subscribe_test.go

217 lines
7.2 KiB
Go

//go:build e2e && linux
package e2e
import (
"os"
"path/filepath"
"strings"
"testing"
"time"
"gotest.tools/v3/assert"
"k8s.io/apimachinery/pkg/util/rand"
"knative.dev/func/test/oncluster"
"knative.dev/func/test/testhttp"
common "knative.dev/func/test/common"
)
type FuncSubscribeTestType struct {
T *testing.T
TestBrokerName string
TestBrokerUrl string
FuncProducerUrl string
SubscribeToEventType string
}
func (f *FuncSubscribeTestType) newKnCli() *common.TestExecCmd {
knCli := &common.TestExecCmd{
Binary: "kn",
ShouldFailOnError: true,
ShouldDumpCmdLine: true,
ShouldDumpOnSuccess: true,
T: f.T,
}
return knCli
}
func (f *FuncSubscribeTestType) newKubectlCli() *common.TestExecCmd {
kubectl := &common.TestExecCmd{
Binary: "kubectl",
ShouldDumpCmdLine: true,
ShouldDumpOnSuccess: false,
T: f.T,
}
return kubectl
}
func (f *FuncSubscribeTestType) setupBroker() {
f.TestBrokerName = "broker-" + rand.String(5)
knCli := f.newKnCli()
kubectl := f.newKubectlCli()
knCli.Exec("broker", "create", f.TestBrokerName, "--class", "MTChannelBasedBroker")
kubectl.Exec("wait", "broker/"+f.TestBrokerName, "--for=condition=TriggerChannelReady", "--timeout=15s")
cmd := knCli.Exec("broker", "describe", f.TestBrokerName, "-o", "url")
f.TestBrokerUrl = cmd.Out
f.TestBrokerUrl = strings.TrimRight(f.TestBrokerUrl, "\n")
assert.Assert(f.T, strings.HasPrefix(f.TestBrokerUrl, "http"))
f.T.Cleanup(func() {
f.newKnCli().Exec("broker", "delete", f.TestBrokerName)
})
}
// setupProducerFunc creates and deploy a knative function that produces events
// It will take 'type' and 'message' from query string to build and send an event to a TARGET_SINK (env var)
// Example: https://func-producer.default.127.0.0.1.sslip.io?type=HelloEvent&message=HELLO+EVENT+1
func (f *FuncSubscribeTestType) setupProducerFunc() {
var funcProducerName = "func-producer"
var funcProducerPath = filepath.Join(f.T.TempDir(), funcProducerName)
knFunc := common.NewKnFuncShellCli(f.T)
knFunc.Exec("create", "--language", "node", "--template", "http", funcProducerPath)
knFunc.SourceDir = funcProducerPath
indexJsContent := `
const { httpTransport, emitterFor, CloudEvent } = require("cloudevents");
const handle = async (context, body) => {
const ce = new CloudEvent({
source: "test.source",
type: context.query.type,
data: { message: context.query.message }
});
const emit = emitterFor(httpTransport(process.env.TARGET_SINK));
emit(ce);
}
module.exports = { handle };
`
err := os.WriteFile(filepath.Join(funcProducerPath, "index.js"), []byte(indexJsContent), 0644)
oncluster.AssertNoError(f.T, err)
knFunc.Exec("config", "env", "add", "--name", "TARGET_SINK", "--value", f.TestBrokerUrl, "-p", funcProducerPath)
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcProducerPath)
f.FuncProducerUrl = knFunc.Exec("describe", "-o", "url", "-p", funcProducerPath).Out
f.FuncProducerUrl = strings.TrimRight(f.FuncProducerUrl, "\n")
f.T.Cleanup(func() {
knFunc.Exec("delete", funcProducerName)
})
}
// setupConsumerFunc creates and deploy the function that subscribe to events of type HelloEvent
func (f *FuncSubscribeTestType) setupConsumerFunc() {
var funcConsumerName = "func-consumer"
var funcConsumerPath = filepath.Join(f.T.TempDir(), funcConsumerName)
knFunc := common.NewKnFuncShellCli(f.T)
knFunc.Exec("create", "--language", "node", "--template", "cloudevents", funcConsumerPath)
knFunc.SourceDir = funcConsumerPath
indexJsContent := `
const { CloudEvent } = require('cloudevents');
const handle = async (context, event) => {
context.log.warn(event);
console.log(event);
return new CloudEvent({
source: 'consumer.processor',
type: 'consumer.processed'
})
};
module.exports = { handle };
`
err := os.WriteFile(filepath.Join(funcConsumerPath, "index.js"), []byte(indexJsContent), 0644)
oncluster.AssertNoError(f.T, err)
knFunc.Exec("subscribe", "--filter", "type="+f.SubscribeToEventType, "--source", f.TestBrokerName)
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcConsumerPath)
f.T.Cleanup(func() {
knFunc.Exec("delete", funcConsumerName)
})
}
// TestFunctionSubscribeEvents tests the func integration with Kn Events by subscribing to events
// In other words, it tests `func subscribe` command
// To accomplish that the test steps consists in:
// - Deploy a function that produces events and emits to the broker
// - Deploy a function that subscribes to a specific Event Type (HelloEvent)
// - Make the producer func to send events of the expected (HelloEvent) and unexpected (DiscardEvent) CE Type
// - Assert the consumer function only receives the event it has subscribed to
func TestFunctionSubscribeEvents(t *testing.T) {
funcSubTest := &FuncSubscribeTestType{T: t, SubscribeToEventType: "HelloEvent"}
// ----------------------------------
// 1. Setup test Broker
// ----------------------------------
funcSubTest.setupBroker()
// ----------------------------------
// 2. Deploy test functions
// -----------------------------------
deploymentChan := make(chan string)
go func() {
funcSubTest.setupProducerFunc() // "kn function" that emits test events
deploymentChan <- "producer"
}()
go func() {
funcSubTest.setupConsumerFunc() // "kn function" that subscribe to events
deploymentChan <- "consumer"
}()
<-deploymentChan
<-deploymentChan
// ----------------------------------
// 3. Test
// ON WHEN a new event of a specific type is received by the broker
// ASSERT THAT the func-consumer receives the event it has subscribed to
// ----------------------------------
// Watch the logs of func-consumer and inspects for received Events
var gotEventA, gotEventB, c bool
var podReached, podNotFound bool
var doCheck = true
waitChan := make(chan bool)
go func() {
kubectl := funcSubTest.newKubectlCli()
for i := 0; doCheck; i++ {
result := kubectl.Exec("logs", "-l", "function.knative.dev/name=func-consumer", "-c", "user-container")
podNotFound = strings.Contains(result.Out, "No resources found")
podReached = podReached || !podNotFound
gotEventA = gotEventA || strings.Contains(result.Out, "EVENT_A_CATCH_ME")
gotEventB = gotEventB || strings.Contains(result.Out, "EVENT_B_DISCARD_ME")
doCheck = !(i > 20 || (podReached && podNotFound)) // check until function pod is Terminated
if doCheck {
if gotEventA && !c {
c = true
t.Log("Expected EVENT_A received. Watching for non-EVENT_B until function pod is Terminated")
}
kubectl.ShouldDumpCmdLine = false
time.Sleep(6 * time.Second) // 1.5 minutes max wait.
}
}
waitChan <- true
}()
time.Sleep(2 * time.Second)
// Invoke Producer func to force Event A to be emitted. The event should be received by func
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type="+funcSubTest.SubscribeToEventType+"&message=EVENT_A_CATCH_ME")
// Invoke Producer func to force Event B to be emitted. The event should NOT be received by func
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type=DiscardEvent&message=EVENT_B_DISCARD_ME")
<-waitChan
assert.Assert(t, gotEventA, "Event A was not received by the consumer function")
assert.Assert(t, !gotEventB, "Event B was received but it should not be")
}