mirror of https://github.com/knative/func.git
217 lines
7.2 KiB
Go
217 lines
7.2 KiB
Go
//go:build e2e && linux
|
|
|
|
package e2e
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"gotest.tools/v3/assert"
|
|
"k8s.io/apimachinery/pkg/util/rand"
|
|
"knative.dev/func/test/oncluster"
|
|
"knative.dev/func/test/testhttp"
|
|
|
|
common "knative.dev/func/test/common"
|
|
)
|
|
|
|
// FuncSubscribeTestType carries the state shared by the steps of the
// `func subscribe` end-to-end test.
type FuncSubscribeTestType struct {
	// T is the test handle passed to the CLI wrappers, assertions and cleanups.
	T *testing.T

	// TestBrokerName is the randomly suffixed broker name chosen by setupBroker.
	TestBrokerName string

	// TestBrokerUrl is the broker URL reported by `kn broker describe -o url`.
	TestBrokerUrl string

	// FuncProducerUrl is the URL of the deployed producer function.
	FuncProducerUrl string

	// SubscribeToEventType is the event type the consumer function filters on.
	SubscribeToEventType string
}
|
|
|
|
func (f *FuncSubscribeTestType) newKnCli() *common.TestExecCmd {
|
|
knCli := &common.TestExecCmd{
|
|
Binary: "kn",
|
|
ShouldFailOnError: true,
|
|
ShouldDumpCmdLine: true,
|
|
ShouldDumpOnSuccess: true,
|
|
T: f.T,
|
|
}
|
|
return knCli
|
|
}
|
|
|
|
func (f *FuncSubscribeTestType) newKubectlCli() *common.TestExecCmd {
|
|
kubectl := &common.TestExecCmd{
|
|
Binary: "kubectl",
|
|
ShouldDumpCmdLine: true,
|
|
ShouldDumpOnSuccess: false,
|
|
T: f.T,
|
|
}
|
|
return kubectl
|
|
}
|
|
|
|
func (f *FuncSubscribeTestType) setupBroker() {
|
|
f.TestBrokerName = "broker-" + rand.String(5)
|
|
|
|
knCli := f.newKnCli()
|
|
kubectl := f.newKubectlCli()
|
|
knCli.Exec("broker", "create", f.TestBrokerName, "--class", "MTChannelBasedBroker")
|
|
kubectl.Exec("wait", "broker/"+f.TestBrokerName, "--for=condition=TriggerChannelReady", "--timeout=15s")
|
|
cmd := knCli.Exec("broker", "describe", f.TestBrokerName, "-o", "url")
|
|
|
|
f.TestBrokerUrl = cmd.Out
|
|
f.TestBrokerUrl = strings.TrimRight(f.TestBrokerUrl, "\n")
|
|
assert.Assert(f.T, strings.HasPrefix(f.TestBrokerUrl, "http"))
|
|
|
|
f.T.Cleanup(func() {
|
|
f.newKnCli().Exec("broker", "delete", f.TestBrokerName)
|
|
})
|
|
}
|
|
|
|
// setupProducerFunc creates and deploy a knative function that produces events
|
|
// It will take 'type' and 'message' from query string to build and send an event to a TARGET_SINK (env var)
|
|
// Example: https://func-producer.default.127.0.0.1.sslip.io?type=HelloEvent&message=HELLO+EVENT+1
|
|
func (f *FuncSubscribeTestType) setupProducerFunc() {
|
|
|
|
var funcProducerName = "func-producer"
|
|
var funcProducerPath = filepath.Join(f.T.TempDir(), funcProducerName)
|
|
|
|
knFunc := common.NewKnFuncShellCli(f.T)
|
|
knFunc.Exec("create", "--language", "node", "--template", "http", funcProducerPath)
|
|
knFunc.SourceDir = funcProducerPath
|
|
|
|
indexJsContent := `
|
|
const { httpTransport, emitterFor, CloudEvent } = require("cloudevents");
|
|
const handle = async (context, body) => {
|
|
const ce = new CloudEvent({
|
|
source: "test.source",
|
|
type: context.query.type,
|
|
data: { message: context.query.message }
|
|
});
|
|
const emit = emitterFor(httpTransport(process.env.TARGET_SINK));
|
|
emit(ce);
|
|
}
|
|
module.exports = { handle };
|
|
`
|
|
err := os.WriteFile(filepath.Join(funcProducerPath, "index.js"), []byte(indexJsContent), 0644)
|
|
oncluster.AssertNoError(f.T, err)
|
|
|
|
knFunc.Exec("config", "env", "add", "--name", "TARGET_SINK", "--value", f.TestBrokerUrl, "-p", funcProducerPath)
|
|
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcProducerPath)
|
|
f.FuncProducerUrl = knFunc.Exec("describe", "-o", "url", "-p", funcProducerPath).Out
|
|
f.FuncProducerUrl = strings.TrimRight(f.FuncProducerUrl, "\n")
|
|
|
|
f.T.Cleanup(func() {
|
|
knFunc.Exec("delete", funcProducerName)
|
|
})
|
|
}
|
|
|
|
// setupConsumerFunc creates and deploy the function that subscribe to events of type HelloEvent
|
|
func (f *FuncSubscribeTestType) setupConsumerFunc() {
|
|
var funcConsumerName = "func-consumer"
|
|
var funcConsumerPath = filepath.Join(f.T.TempDir(), funcConsumerName)
|
|
|
|
knFunc := common.NewKnFuncShellCli(f.T)
|
|
knFunc.Exec("create", "--language", "node", "--template", "cloudevents", funcConsumerPath)
|
|
knFunc.SourceDir = funcConsumerPath
|
|
|
|
indexJsContent := `
|
|
const { CloudEvent } = require('cloudevents');
|
|
const handle = async (context, event) => {
|
|
context.log.warn(event);
|
|
console.log(event);
|
|
return new CloudEvent({
|
|
source: 'consumer.processor',
|
|
type: 'consumer.processed'
|
|
})
|
|
};
|
|
module.exports = { handle };
|
|
`
|
|
err := os.WriteFile(filepath.Join(funcConsumerPath, "index.js"), []byte(indexJsContent), 0644)
|
|
oncluster.AssertNoError(f.T, err)
|
|
|
|
knFunc.Exec("subscribe", "--filter", "type="+f.SubscribeToEventType, "--source", f.TestBrokerName)
|
|
knFunc.Exec("deploy", "-r", common.GetRegistry(), "-p", funcConsumerPath)
|
|
|
|
f.T.Cleanup(func() {
|
|
knFunc.Exec("delete", funcConsumerName)
|
|
})
|
|
}
|
|
|
|
// TestFunctionSubscribeEvents tests the func integration with Kn Events by subscribing to events
|
|
// In other words, it tests `func subscribe` command
|
|
// To accomplish that the test steps consists in:
|
|
// - Deploy a function that produces events and emits to the broker
|
|
// - Deploy a function that subscribes to a specific Event Type (HelloEvent)
|
|
// - Make the producer func to send events of the expected (HelloEvent) and unexpected (DiscardEvent) CE Type
|
|
// - Assert the consumer function only receives the event it has subscribed to
|
|
func TestFunctionSubscribeEvents(t *testing.T) {
|
|
|
|
funcSubTest := &FuncSubscribeTestType{T: t, SubscribeToEventType: "HelloEvent"}
|
|
|
|
// ----------------------------------
|
|
// 1. Setup test Broker
|
|
// ----------------------------------
|
|
funcSubTest.setupBroker()
|
|
|
|
// ----------------------------------
|
|
// 2. Deploy test functions
|
|
// -----------------------------------
|
|
deploymentChan := make(chan string)
|
|
|
|
go func() {
|
|
funcSubTest.setupProducerFunc() // "kn function" that emits test events
|
|
deploymentChan <- "producer"
|
|
}()
|
|
go func() {
|
|
funcSubTest.setupConsumerFunc() // "kn function" that subscribe to events
|
|
deploymentChan <- "consumer"
|
|
}()
|
|
<-deploymentChan
|
|
<-deploymentChan
|
|
|
|
// ----------------------------------
|
|
// 3. Test
|
|
// ON WHEN a new event of a specific type is received by the broker
|
|
// ASSERT THAT the func-consumer receives the event it has subscribed to
|
|
// ----------------------------------
|
|
|
|
// Watch the logs of func-consumer and inspects for received Events
|
|
|
|
var gotEventA, gotEventB, c bool
|
|
var podReached, podNotFound bool
|
|
var doCheck = true
|
|
|
|
waitChan := make(chan bool)
|
|
go func() {
|
|
kubectl := funcSubTest.newKubectlCli()
|
|
for i := 0; doCheck; i++ {
|
|
result := kubectl.Exec("logs", "-l", "function.knative.dev/name=func-consumer", "-c", "user-container")
|
|
|
|
podNotFound = strings.Contains(result.Out, "No resources found")
|
|
podReached = podReached || !podNotFound
|
|
gotEventA = gotEventA || strings.Contains(result.Out, "EVENT_A_CATCH_ME")
|
|
gotEventB = gotEventB || strings.Contains(result.Out, "EVENT_B_DISCARD_ME")
|
|
doCheck = !(i > 20 || (podReached && podNotFound)) // check until function pod is Terminated
|
|
if doCheck {
|
|
if gotEventA && !c {
|
|
c = true
|
|
t.Log("Expected EVENT_A received. Watching for non-EVENT_B until function pod is Terminated")
|
|
}
|
|
kubectl.ShouldDumpCmdLine = false
|
|
time.Sleep(6 * time.Second) // 1.5 minutes max wait.
|
|
}
|
|
}
|
|
waitChan <- true
|
|
}()
|
|
time.Sleep(2 * time.Second)
|
|
|
|
// Invoke Producer func to force Event A to be emitted. The event should be received by func
|
|
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type="+funcSubTest.SubscribeToEventType+"&message=EVENT_A_CATCH_ME")
|
|
|
|
// Invoke Producer func to force Event B to be emitted. The event should NOT be received by func
|
|
testhttp.TestGet(t, funcSubTest.FuncProducerUrl+"?type=DiscardEvent&message=EVENT_B_DISCARD_ME")
|
|
|
|
<-waitChan
|
|
|
|
assert.Assert(t, gotEventA, "Event A was not received by the consumer function")
|
|
assert.Assert(t, !gotEventB, "Event B was received but it should not be")
|
|
|
|
}
|