From ce06b0d04c5a79347ee511753b18c949774a89c2 Mon Sep 17 00:00:00 2001 From: Aaron Wislang Date: Thu, 27 Jan 2022 17:22:51 -0500 Subject: [PATCH] Add TestStreamIsActive (#384) * Return IsActive error vs log and continue Signed-off-by: Aaron Wislang * Add TestStreamIsActive Signed-off-by: Aaron Wislang * Bump github.com/onsi/gomega from 1.17.0 to 1.18.0 (#378) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: Aaron Wislang * Configure WhiteSource Bolt for GitHub (#379) Co-authored-by: whitesource-bolt-for-github[bot] <42819689+whitesource-bolt-for-github[bot]@users.noreply.github.com> Co-authored-by: Tom Kerkhove Signed-off-by: Aaron Wislang * adding targetPendingRequests to the xkcd helm chart (#373) Signed-off-by: Aaron Schlesinger Signed-off-by: Aaron Wislang * fixing targetPendingRequests in the HTTPScaledObject v0.2.0 documentation (#372) Signed-off-by: Aaron Schlesinger Signed-off-by: Aaron Wislang * defer grpcServer.Stop() Co-authored-by: Aaron Schlesinger <70865+arschles@users.noreply.github.com> Signed-off-by: Aaron Wislang * Don't check error for gprServer Signed-off-by: Aaron Wislang Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: whitesource-bolt-for-github[bot] <42819689+whitesource-bolt-for-github[bot]@users.noreply.github.com> Co-authored-by: Tom Kerkhove Co-authored-by: Aaron Schlesinger <70865+arschles@users.noreply.github.com> --- scaler/handlers.go | 4 +- scaler/handlers_test.go | 137 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 136 insertions(+), 5 deletions(-) diff --git a/scaler/handlers.go b/scaler/handlers.go index 482039a..4920f2d 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -99,9 +99,9 @@ func (e *impl) StreamIsActive( if err != nil { e.lggr.Error( err, - "error getting active status in stream, continuing", + "error getting active status in stream", ) - continue + return err } server.Send(&externalscaler.IsActiveResponse{ Result: active.Result, diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index f6c4fa8..fd28954 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -3,6 +3,7 @@ package main import ( context "context" "fmt" + "net" "testing" "time" @@ -11,6 +12,8 @@ import ( "github.com/kedacore/http-add-on/pkg/routing" externalscaler "github.com/kedacore/http-add-on/proto" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" ) func standardTarget() routing.Target { @@ -22,6 +25,137 @@ func standardTarget() routing.Target { 123, ) } + +func TestStreamIsActive(t *testing.T) { + type testCase struct { + name string + host string + expected bool + expectedErr bool + setup func(*routing.Table, *queuePinger) + } + + testCases := []testCase{ + { + name: "Simple host inactive", + host: t.Name(), + expected: false, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts[t.Name()] = 0 + }, + }, + { + name: "Host is 'interceptor'", + host: "interceptor", + expected: true, + expectedErr: false, + setup: func(*routing.Table, *queuePinger) {}, + }, + { + name: "Simple host active", + host: t.Name(), + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts[t.Name()] = 1 + }, + }, + { + name: "No host present, but host in routing table", + host: t.Name(), + expected: false, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + table.AddTarget(t.Name(), standardTarget()) + }, + }, + { + name: "Host doesn't exist", + host: t.Name(), + expected: false, + expectedErr: true, + setup: func(*routing.Table, *queuePinger) {}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + r := require.New(t) + ctx := context.Background() + lggr := logr.Discard() + table := routing.NewTable() + ticker, pinger, err := newFakeQueuePinger(ctx, lggr) + r.NoError(err) + defer ticker.Stop() + tc.setup(table, pinger) + + hdl := newImpl( + lggr, + pinger, + table, + 123, + 200, + ) + + bufSize := 1024 * 1024 + lis := bufconn.Listen(bufSize) + grpcServer := grpc.NewServer() + defer grpcServer.Stop() + externalscaler.RegisterExternalScalerServer( + grpcServer, + hdl, + ) + go grpcServer.Serve(lis) + + bufDialFunc := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialFunc), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + client := externalscaler.NewExternalScalerClient(conn) + + testRef := &externalscaler.ScaledObjectRef{ + ScalerMetadata: map[string]string{ + "host": tc.host, + }, + } + + // First will see if we can establish the stream and handle this + // error. + streamClient, err := client.StreamIsActive(ctx, testRef) + if err != nil { + t.Fatalf("StreamIsActive failed: %v", err) + } + + // Next, as in TestIsActive, we check for any error, expected + // or unexpected, for each table test. + res, err := streamClient.Recv() + + if tc.expectedErr && err != nil { + return + } else if err != nil { + t.Fatalf("expected no error but got: %v", err) + } + + if tc.expected != res.Result { + t.Fatalf("Expected IsActive result %v, got: %v", tc.expected, res.Result) + } + }) + } +} + func TestIsActive(t *testing.T) { type testCase struct { name string @@ -88,9 +222,6 @@ func TestIsActive(t *testing.T) { lggr := logr.Discard() table := routing.NewTable() ticker, pinger, err := newFakeQueuePinger(ctx, lggr) - if err != nil { - t.Fatalf("failed to create queue pinger: %v", err) - } r.NoError(err) defer ticker.Stop() tc.setup(table, pinger)