diff --git a/test/mako/config/benchmark.go b/test/mako/config/benchmark.go index 4c437f149..4a1f2e0bc 100644 --- a/test/mako/config/benchmark.go +++ b/test/mako/config/benchmark.go @@ -28,36 +28,40 @@ import ( ) const koDataPathEnvName = "KO_DATA_PATH" +const configMako = "/etc/config-mako" // MustGetBenchmark wraps getBenchmark in log.Fatalf -func MustGetBenchmark() (*string, *string) { - benchmarkKey, benchmarkName, err := getBenchmark() +func MustGetBenchmark() *mpb.BenchmarkInfo { + bench, err := getBenchmark() if err != nil { - log.Fatalf("unable to determine benchmark_key: %v", err) + log.Fatalf("unable to determine benchmark info: %v", err) } - return benchmarkKey, benchmarkName + return bench } // getBenchmark fetches the appropriate benchmark_key for this configured environment. -func getBenchmark() (*string, *string, error) { +func getBenchmark() (*mpb.BenchmarkInfo, error) { // Figure out what environment we're running in from the Mako configmap. env, err := getEnvironment() if err != nil { - return nil, nil, err + return nil, err } // Read the Mako config file for this environment. - data, err := readFileFromKoData(env + ".config") - if err != nil { - return nil, nil, err + data, koerr := readFileFromKoData(env + ".config") + if koerr != nil { + data, err = ioutil.ReadFile(filepath.Join(configMako, env+".config")) + if err != nil { + return nil, fmt.Errorf("cannot load both from kodata and from config mako config map: %s, %s", koerr.Error(), err.Error()) + } } // Parse the Mako config file. bi := &mpb.BenchmarkInfo{} if err := proto.UnmarshalText(string(data), bi); err != nil { - return nil, nil, err + return nil, err } // Return the benchmark_key from this environment's config file. - return bi.BenchmarkKey, bi.BenchmarkName, nil + return bi, nil } // readFileFromKoData reads the named file from kodata. diff --git a/test/mako/sidecar.go b/test/mako/sidecar.go index 90c637758..9447f2bde 100644 --- a/test/mako/sidecar.go +++ b/test/mako/sidecar.go @@ -176,12 +176,8 @@ func SetupHelper(ctx context.Context, benchmarkKey *string, benchmarkName *strin } func Setup(ctx context.Context, extraTags ...string) (*Client, error) { - benchmarkKey, benchmarkName := config.MustGetBenchmark() - return SetupHelper(ctx, benchmarkKey, benchmarkName, extraTags...) -} - -func SetupWithBenchmarkConfig(ctx context.Context, benchmarkKey *string, benchmarkName *string, extraTags ...string) (*Client, error) { - return SetupHelper(ctx, benchmarkKey, benchmarkName, extraTags...) + bench := config.MustGetBenchmark() + return SetupHelper(ctx, bench.BenchmarkKey, bench.BenchmarkName, extraTags...) } func tokenPath(token string) string { diff --git a/test/mako/stub-sidecar/main.go b/test/mako/stub-sidecar/main.go index e9d618d14..1b5809784 100644 --- a/test/mako/stub-sidecar/main.go +++ b/test/mako/stub-sidecar/main.go @@ -19,104 +19,150 @@ package main import ( "context" "encoding/csv" + "flag" "fmt" - "log" - "net" - "os" + "net/http" "strings" - "sync" "github.com/golang/protobuf/jsonpb" + mako "github.com/google/mako/spec/proto/mako_go_proto" + + "log" + "net" + "sync" "google.golang.org/grpc" + + "knative.dev/pkg/test/mako/config" qspb "knative.dev/pkg/third_party/mako/proto/quickstore_go_proto" ) const ( - port = ":9813" - // A 10 minutes run at 1000 rps of eventing perf tests is usually ~= 70 MBi, so 100MBi is reasonable - defaultServerMaxReceiveMessageSize = 1024 * 1024 * 100 + port = ":9813" + defaultServerMaxReceiveMessageSize = 1024 * 1024 * 1024 ) type server struct { + info *mako.BenchmarkInfo stopOnce sync.Once stopCh chan struct{} + sb *strings.Builder } func (s *server) Store(ctx context.Context, in *qspb.StoreInput) (*qspb.StoreOutput, error) { m := jsonpb.Marshaler{} qi, _ := m.MarshalToString(in.GetQuickstoreInput()) - fmt.Printf("# %s\n", qi) - cols := []string{"inputValue", "errorMessage"} - writer := csv.NewWriter(os.Stdout) + fmt.Printf("# Received input") + + fmt.Fprintf(s.sb, "# %s\n", qi) + writer := csv.NewWriter(s.sb) + + kv := calculateKeyIndexColumnsMap(s.info) + cols := make([]string, len(kv)) + for k, i := range kv { + cols[i] = k + } + fmt.Fprintf(s.sb, "# %s\n", strings.Join(cols, ",")) for _, sp := range in.GetSamplePoints() { for _, mv := range sp.GetMetricValueList() { - cols = updateCols(cols, mv.GetValueKey()) vals := map[string]string{"inputValue": fmt.Sprintf("%f", sp.GetInputValue())} vals[mv.GetValueKey()] = fmt.Sprintf("%f", mv.GetValue()) - writer.Write(makeRow(cols, vals)) + writer.Write(makeRow(vals, kv)) } } for _, ra := range in.GetRunAggregates() { - cols = updateCols(cols, ra.GetValueKey()) vals := map[string]string{ra.GetValueKey(): fmt.Sprintf("%f", ra.GetValue())} - writer.Write(makeRow(cols, vals)) + writer.Write(makeRow(vals, kv)) } for _, sa := range in.GetSampleErrors() { vals := map[string]string{"inputValue": fmt.Sprintf("%f", sa.GetInputValue()), "errorMessage": sa.GetErrorMessage()} - writer.Write(makeRow(cols, vals)) + writer.Write(makeRow(vals, kv)) } writer.Flush() - fmt.Printf("# %s\n", strings.Join(cols, ",")) + + fmt.Fprintf(s.sb, "# CSV end\n") + fmt.Printf("# Input completed") return &qspb.StoreOutput{}, nil } -func updateCols(prototype []string, k string) []string { - for _, n := range prototype { - if n == k { - return prototype - } - } - prototype = append(prototype, k) - return prototype -} - -func makeRow(prototype []string, points map[string]string) []string { - row := make([]string, len(prototype)) - // n^2 but whatever +func makeRow(points map[string]string, kv map[string]int) []string { + row := make([]string, len(kv)) for k, v := range points { - for i, n := range prototype { - if k == n { - row[i] = v - } - } + row[kv[k]] = v } return row } +func calculateKeyIndexColumnsMap(info *mako.BenchmarkInfo) map[string]int { + kv := make(map[string]int) + kv["inputValue"] = 0 + kv["errorMessage"] = 1 + for i, m := range info.MetricInfoList { + kv[*m.ValueKey] = i + 2 + } + return kv +} + func (s *server) ShutdownMicroservice(ctx context.Context, in *qspb.ShutdownInput) (*qspb.ShutdownOutput, error) { s.stopOnce.Do(func() { close(s.stopCh) }) return &qspb.ShutdownOutput{}, nil } +var httpPort int + +func init() { + flag.IntVar(&httpPort, "p", 0, "Port to use for using stub in HTTP mode. 0 means print to logs and quit") +} + func main() { + flag.Parse() + lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer(grpc.MaxRecvMsgSize(defaultServerMaxReceiveMessageSize)) stopCh := make(chan struct{}) + info := config.MustGetBenchmark() + var sb strings.Builder + + fmt.Fprintf(&sb, "# Benchmark %s - %s\n", *info.BenchmarkKey, *info.BenchmarkName) + go func() { - qspb.RegisterQuickstoreServer(s, &server{stopCh: stopCh}) + qspb.RegisterQuickstoreServer(s, &server{info: info, stopCh: stopCh, sb: &sb}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }() <-stopCh s.GracefulStop() + + results := sb.String() + + if httpPort != 0 { + m := http.NewServeMux() + s := http.Server{Addr: fmt.Sprintf(":%d", httpPort), Handler: m} + + m.HandleFunc("/results", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "text/csv") + _, err := fmt.Fprint(w, results) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + }) + m.HandleFunc("/close", func(writer http.ResponseWriter, request *http.Request) { + s.Shutdown(context.Background()) + }) + if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatal(err) + } + fmt.Print("Successfully served the results") + } else { + fmt.Print(sb.String()) + } } diff --git a/test/mako/stub-sidecar/read_results.sh b/test/mako/stub-sidecar/read_results.sh new file mode 100644 index 000000000..ca6efb0d7 --- /dev/null +++ b/test/mako/stub-sidecar/read_results.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This scripts helps to read results of mako-stub from http + +check_command_exists() { + CMD_NAME=$1 + command -v "$CMD_NAME" > /dev/null || { + echo "Command $CMD_NAME does not exist" + exit 1 + } +} + +check_command_exists kubectl +check_command_exists curl + +if [[ $# -lt 7 ]] +then + echo "Usage: $0 " + exit 1 +fi + +MAKO_STUB_POD_NAME="$1" +MAKO_STUB_NAMESPACE="$2" +MAKO_STUB_PORT="$3" +TIMEOUT="$4" +RETRIES="$5" +RETRIES_INTERVAL="$6" +OUTPUT_FILE="$7" + +# Find port ready to use + +port=10000 +isfree=$(netstat -tapln | grep $port) + +while [[ -n "$isfree" ]]; do + port=$((port + 1)) + isfree=$(netstat -tapln | grep $port) +done + +for i in $(seq $RETRIES); do + kubectl port-forward -n "$MAKO_STUB_NAMESPACE" "$MAKO_STUB_POD_NAME" $port:$MAKO_STUB_PORT & + PORT_FORWARD_PID=$! + + sleep 10 + + curl --connect-timeout $TIMEOUT "http://localhost:$port/results" > $OUTPUT_FILE + curl_exit_status=$? + + kill $PORT_FORWARD_PID + wait $PORT_FORWARD_PID 2>/dev/null + + if [ 0 -eq $curl_exit_status ]; then + exit 0 + else + sleep $RETRIES_INTERVAL + fi + +done + +exit 1