Mako stub improvements (#868)

* mako-stub now writes columns before the message output
mako-stub now REQUIRES the dev.config in the config-mako ConfigMap
mako-stub waits a configurable time for the stdout flush

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* mako-stub serves results with http AFTER the results are received
Added an handy script to read the results with http

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Trailing new line

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Applied suggested changes + fixed read_results.sh script

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Changes from review

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
This commit is contained in:
Francesco Guardiani 2019-11-22 20:33:09 +01:00 committed by Knative Prow Robot
parent 8c1fb5fe04
commit 2ec0f8da50
4 changed files with 172 additions and 52 deletions

View File

@ -28,36 +28,40 @@ import (
)
const koDataPathEnvName = "KO_DATA_PATH"
const configMako = "/etc/config-mako"
// MustGetBenchmark wraps getBenchmark in log.Fatalf
func MustGetBenchmark() (*string, *string) {
benchmarkKey, benchmarkName, err := getBenchmark()
func MustGetBenchmark() *mpb.BenchmarkInfo {
bench, err := getBenchmark()
if err != nil {
log.Fatalf("unable to determine benchmark_key: %v", err)
log.Fatalf("unable to determine benchmark info: %v", err)
}
return benchmarkKey, benchmarkName
return bench
}
// getBenchmark fetches the appropriate benchmark_key for this configured environment.
func getBenchmark() (*string, *string, error) {
func getBenchmark() (*mpb.BenchmarkInfo, error) {
// Figure out what environment we're running in from the Mako configmap.
env, err := getEnvironment()
if err != nil {
return nil, nil, err
return nil, err
}
// Read the Mako config file for this environment.
data, err := readFileFromKoData(env + ".config")
if err != nil {
return nil, nil, err
data, koerr := readFileFromKoData(env + ".config")
if koerr != nil {
data, err = ioutil.ReadFile(filepath.Join(configMako, env+".config"))
if err != nil {
return nil, fmt.Errorf("cannot load both from kodata and from config mako config map: %s, %s", koerr.Error(), err.Error())
}
}
// Parse the Mako config file.
bi := &mpb.BenchmarkInfo{}
if err := proto.UnmarshalText(string(data), bi); err != nil {
return nil, nil, err
return nil, err
}
// Return the benchmark_key from this environment's config file.
return bi.BenchmarkKey, bi.BenchmarkName, nil
return bi, nil
}
// readFileFromKoData reads the named file from kodata.

View File

@ -176,12 +176,8 @@ func SetupHelper(ctx context.Context, benchmarkKey *string, benchmarkName *strin
}
func Setup(ctx context.Context, extraTags ...string) (*Client, error) {
benchmarkKey, benchmarkName := config.MustGetBenchmark()
return SetupHelper(ctx, benchmarkKey, benchmarkName, extraTags...)
}
func SetupWithBenchmarkConfig(ctx context.Context, benchmarkKey *string, benchmarkName *string, extraTags ...string) (*Client, error) {
return SetupHelper(ctx, benchmarkKey, benchmarkName, extraTags...)
bench := config.MustGetBenchmark()
return SetupHelper(ctx, bench.BenchmarkKey, bench.BenchmarkName, extraTags...)
}
func tokenPath(token string) string {

View File

@ -19,104 +19,150 @@ package main
import (
"context"
"encoding/csv"
"flag"
"fmt"
"log"
"net"
"os"
"net/http"
"strings"
"sync"
"github.com/golang/protobuf/jsonpb"
mako "github.com/google/mako/spec/proto/mako_go_proto"
"log"
"net"
"sync"
"google.golang.org/grpc"
"knative.dev/pkg/test/mako/config"
qspb "knative.dev/pkg/third_party/mako/proto/quickstore_go_proto"
)
const (
port = ":9813"
// A 10 minutes run at 1000 rps of eventing perf tests is usually ~= 70 MBi, so 100MBi is reasonable
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 100
port = ":9813"
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 1024
)
type server struct {
info *mako.BenchmarkInfo
stopOnce sync.Once
stopCh chan struct{}
sb *strings.Builder
}
func (s *server) Store(ctx context.Context, in *qspb.StoreInput) (*qspb.StoreOutput, error) {
m := jsonpb.Marshaler{}
qi, _ := m.MarshalToString(in.GetQuickstoreInput())
fmt.Printf("# %s\n", qi)
cols := []string{"inputValue", "errorMessage"}
writer := csv.NewWriter(os.Stdout)
fmt.Printf("# Received input")
fmt.Fprintf(s.sb, "# %s\n", qi)
writer := csv.NewWriter(s.sb)
kv := calculateKeyIndexColumnsMap(s.info)
cols := make([]string, len(kv))
for k, i := range kv {
cols[i] = k
}
fmt.Fprintf(s.sb, "# %s\n", strings.Join(cols, ","))
for _, sp := range in.GetSamplePoints() {
for _, mv := range sp.GetMetricValueList() {
cols = updateCols(cols, mv.GetValueKey())
vals := map[string]string{"inputValue": fmt.Sprintf("%f", sp.GetInputValue())}
vals[mv.GetValueKey()] = fmt.Sprintf("%f", mv.GetValue())
writer.Write(makeRow(cols, vals))
writer.Write(makeRow(vals, kv))
}
}
for _, ra := range in.GetRunAggregates() {
cols = updateCols(cols, ra.GetValueKey())
vals := map[string]string{ra.GetValueKey(): fmt.Sprintf("%f", ra.GetValue())}
writer.Write(makeRow(cols, vals))
writer.Write(makeRow(vals, kv))
}
for _, sa := range in.GetSampleErrors() {
vals := map[string]string{"inputValue": fmt.Sprintf("%f", sa.GetInputValue()), "errorMessage": sa.GetErrorMessage()}
writer.Write(makeRow(cols, vals))
writer.Write(makeRow(vals, kv))
}
writer.Flush()
fmt.Printf("# %s\n", strings.Join(cols, ","))
fmt.Fprintf(s.sb, "# CSV end\n")
fmt.Printf("# Input completed")
return &qspb.StoreOutput{}, nil
}
func updateCols(prototype []string, k string) []string {
for _, n := range prototype {
if n == k {
return prototype
}
}
prototype = append(prototype, k)
return prototype
}
func makeRow(prototype []string, points map[string]string) []string {
row := make([]string, len(prototype))
// n^2 but whatever
func makeRow(points map[string]string, kv map[string]int) []string {
row := make([]string, len(kv))
for k, v := range points {
for i, n := range prototype {
if k == n {
row[i] = v
}
}
row[kv[k]] = v
}
return row
}
func calculateKeyIndexColumnsMap(info *mako.BenchmarkInfo) map[string]int {
kv := make(map[string]int)
kv["inputValue"] = 0
kv["errorMessage"] = 1
for i, m := range info.MetricInfoList {
kv[*m.ValueKey] = i + 2
}
return kv
}
func (s *server) ShutdownMicroservice(ctx context.Context, in *qspb.ShutdownInput) (*qspb.ShutdownOutput, error) {
s.stopOnce.Do(func() { close(s.stopCh) })
return &qspb.ShutdownOutput{}, nil
}
var httpPort int
func init() {
flag.IntVar(&httpPort, "p", 0, "Port to use for using stub in HTTP mode. 0 means print to logs and quit")
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer(grpc.MaxRecvMsgSize(defaultServerMaxReceiveMessageSize))
stopCh := make(chan struct{})
info := config.MustGetBenchmark()
var sb strings.Builder
fmt.Fprintf(&sb, "# Benchmark %s - %s\n", *info.BenchmarkKey, *info.BenchmarkName)
go func() {
qspb.RegisterQuickstoreServer(s, &server{stopCh: stopCh})
qspb.RegisterQuickstoreServer(s, &server{info: info, stopCh: stopCh, sb: &sb})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
<-stopCh
s.GracefulStop()
results := sb.String()
if httpPort != 0 {
m := http.NewServeMux()
s := http.Server{Addr: fmt.Sprintf(":%d", httpPort), Handler: m}
m.HandleFunc("/results", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "text/csv")
_, err := fmt.Fprint(w, results)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
})
m.HandleFunc("/close", func(writer http.ResponseWriter, request *http.Request) {
s.Shutdown(context.Background())
})
if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
fmt.Print("Successfully served the results")
} else {
fmt.Print(sb.String())
}
}

View File

@ -0,0 +1,74 @@
#!/bin/bash
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This scripts helps to read results of mako-stub from http
check_command_exists() {
CMD_NAME=$1
command -v "$CMD_NAME" > /dev/null || {
echo "Command $CMD_NAME does not exist"
exit 1
}
}
check_command_exists kubectl
check_command_exists curl
if [[ $# -lt 7 ]]
then
echo "Usage: $0 <mako_stub_pod_name> <mako_stub_namespace> <mako_stub_port> <timeout> <retries> <retries_interval> <out_file>"
exit 1
fi
MAKO_STUB_POD_NAME="$1"
MAKO_STUB_NAMESPACE="$2"
MAKO_STUB_PORT="$3"
TIMEOUT="$4"
RETRIES="$5"
RETRIES_INTERVAL="$6"
OUTPUT_FILE="$7"
# Find port ready to use
port=10000
isfree=$(netstat -tapln | grep $port)
while [[ -n "$isfree" ]]; do
port=$((port + 1))
isfree=$(netstat -tapln | grep $port)
done
for i in $(seq $RETRIES); do
kubectl port-forward -n "$MAKO_STUB_NAMESPACE" "$MAKO_STUB_POD_NAME" $port:$MAKO_STUB_PORT &
PORT_FORWARD_PID=$!
sleep 10
curl --connect-timeout $TIMEOUT "http://localhost:$port/results" > $OUTPUT_FILE
curl_exit_status=$?
kill $PORT_FORWARD_PID
wait $PORT_FORWARD_PID 2>/dev/null
if [ 0 -eq $curl_exit_status ]; then
exit 0
else
sleep $RETRIES_INTERVAL
fi
done
exit 1