Auto-update dependencies (#113)

Produced via:
  `dep ensure -update knative.dev/test-infra knative.dev/pkg`
/assign mattmoor
This commit is contained in:
mattmoor-sockpuppet 2019-10-02 07:11:07 -07:00 committed by Knative Prow Robot
parent a087d59d01
commit ee274c73a2
18 changed files with 293 additions and 97 deletions

6
Gopkg.lock generated
View File

@ -927,7 +927,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:cf1f8c7dd0c71d2e26bce8889404ede818a53aff0453aa187ead7c9a48103ea9" digest = "1:2a1af5e20affdf0508f4296328c812f1448bff1c5d7a8d74a2a8a56b5913ff72"
name = "knative.dev/pkg" name = "knative.dev/pkg"
packages = [ packages = [
"apis", "apis",
@ -946,7 +946,7 @@
"metrics/metricskey", "metrics/metricskey",
] ]
pruneopts = "T" pruneopts = "T"
revision = "43f0d8fdb9183d6fd95f021d2888c560d2a8c8ff" revision = "849fcc967b598b47945067cfe0536377bac6bd19"
[[projects]] [[projects]]
branch = "master" branch = "master"
@ -957,7 +957,7 @@
"tools/dep-collector", "tools/dep-collector",
] ]
pruneopts = "UT" pruneopts = "UT"
revision = "f0bc0373119bd2d8f7a059d377438b64deb32ff1" revision = "12f3c6a839ace07e8171d00e3057c11ed33c465c"
[[projects]] [[projects]]
digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c" digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c"

View File

@ -56,7 +56,9 @@ required = [
[[constraint]] [[constraint]]
name = "contrib.go.opencensus.io/exporter/stackdriver" name = "contrib.go.opencensus.io/exporter/stackdriver"
version = "v0.12.2" # The build fails against 0.12.6 and newer because
# stackdriver.Options.GetMonitoredResource was removed.
version = "<=v0.12.5"
[[constraint]] [[constraint]]
name = "github.com/google/mako" name = "github.com/google/mako"

View File

@ -19,6 +19,7 @@ package kmeta
import ( import (
"fmt" "fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -59,3 +60,16 @@ func DeletionHandlingAccessor(obj interface{}) (Accessor, error) {
return accessor, nil return accessor, nil
} }
// ObjectReference returns an core/v1.ObjectReference for the given object
func ObjectReference(obj Accessor) corev1.ObjectReference {
gvk := obj.GroupVersionKind()
apiVersion, kind := gvk.ToAPIVersionAndKind()
return corev1.ObjectReference{
APIVersion: apiVersion,
Kind: kind,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}

View File

@ -79,7 +79,7 @@ type TableRow struct {
// WantEvents holds the ordered list of events we expect during reconciliation. // WantEvents holds the ordered list of events we expect during reconciliation.
WantEvents []string WantEvents []string
// WantServiceReadyStats holds the ServiceReady stats we exepect during reconciliation. // WantServiceReadyStats holds the ServiceReady stats we expect during reconciliation.
WantServiceReadyStats map[string]int WantServiceReadyStats map[string]int
// WithReactors is a set of functions that are installed as Reactors for the execution // WithReactors is a set of functions that are installed as Reactors for the execution

View File

@ -17,18 +17,42 @@ limitations under the License.
package testing package testing
import ( import (
"sync"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/tracker" "knative.dev/pkg/tracker"
) )
// NullTracker implements Tracker. // NullTracker implements Tracker
type NullTracker struct{} //
// Alias is preserved for backwards compatibility
type NullTracker = FakeTracker
var _ tracker.Interface = (*NullTracker)(nil) // FakeTracker implements Tracker.
type FakeTracker struct {
sync.Mutex
references []corev1.ObjectReference
}
var _ tracker.Interface = (*FakeTracker)(nil)
// OnChanged implements OnChanged. // OnChanged implements OnChanged.
func (*NullTracker) OnChanged(interface{}) {} func (*FakeTracker) OnChanged(interface{}) {}
// Track implements Track. // Track implements Track.
func (*NullTracker) Track(corev1.ObjectReference, interface{}) error { return nil } func (n *FakeTracker) Track(ref corev1.ObjectReference, obj interface{}) error {
n.Lock()
defer n.Unlock()
n.references = append(n.references, ref)
return nil
}
// References returns the list of objects being tracked
func (n *FakeTracker) References() []corev1.ObjectReference {
n.Lock()
defer n.Unlock()
return append(n.references[:0:0], n.references...)
}

View File

@ -18,8 +18,10 @@ package alerter
import ( import (
qpb "github.com/google/mako/proto/quickstore/quickstore_go_proto" qpb "github.com/google/mako/proto/quickstore/quickstore_go_proto"
"knative.dev/pkg/test/helpers"
"knative.dev/pkg/test/mako/alerter/github" "knative.dev/pkg/test/mako/alerter/github"
"knative.dev/pkg/test/mako/alerter/slack" "knative.dev/pkg/test/mako/alerter/slack"
"knative.dev/pkg/test/mako/config"
) )
// Alerter controls alert for performance regressions detected by Mako. // Alerter controls alert for performance regressions detected by Mako.
@ -39,7 +41,7 @@ func (alerter *Alerter) SetupGitHub(org, repo, githubTokenPath string) error {
} }
// SetupSlack will setup Slack for the alerter. // SetupSlack will setup Slack for the alerter.
func (alerter *Alerter) SetupSlack(userName, readTokenPath, writeTokenPath string, channels []slack.Channel) error { func (alerter *Alerter) SetupSlack(userName, readTokenPath, writeTokenPath string, channels []config.Channel) error {
messageHandler, err := slack.Setup(userName, readTokenPath, writeTokenPath, channels, false) messageHandler, err := slack.Setup(userName, readTokenPath, writeTokenPath, channels, false)
if err != nil { if err != nil {
return err return err
@ -49,20 +51,28 @@ func (alerter *Alerter) SetupSlack(userName, readTokenPath, writeTokenPath strin
} }
// HandleBenchmarkResult will handle the benchmark result which returns from `q.Store()` // HandleBenchmarkResult will handle the benchmark result which returns from `q.Store()`
func (alerter *Alerter) HandleBenchmarkResult(testName string, output qpb.QuickstoreOutput, err error) { func (alerter *Alerter) HandleBenchmarkResult(testName string, output qpb.QuickstoreOutput, err error) error {
if err != nil { if err != nil {
if output.GetStatus() == qpb.QuickstoreOutput_ANALYSIS_FAIL { if output.GetStatus() == qpb.QuickstoreOutput_ANALYSIS_FAIL {
var errs []error
summary := output.GetSummaryOutput() summary := output.GetSummaryOutput()
if alerter.githubIssueHandler != nil { if alerter.githubIssueHandler != nil {
alerter.githubIssueHandler.CreateIssueForTest(testName, summary) if err := alerter.githubIssueHandler.CreateIssueForTest(testName, summary); err != nil {
errs = append(errs, err)
}
} }
if alerter.slackMessageHandler != nil { if alerter.slackMessageHandler != nil {
alerter.slackMessageHandler.SendAlert(summary) if err := alerter.slackMessageHandler.SendAlert(summary); err != nil {
errs = append(errs, err)
}
} }
return helpers.CombineErrors(errs)
} }
return return err
} }
if alerter.githubIssueHandler != nil { if alerter.githubIssueHandler != nil {
alerter.githubIssueHandler.CloseIssueForTest(testName) return alerter.githubIssueHandler.CloseIssueForTest(testName)
} }
return nil
} }

View File

@ -23,6 +23,7 @@ import (
"time" "time"
"knative.dev/pkg/test/helpers" "knative.dev/pkg/test/helpers"
"knative.dev/pkg/test/mako/config"
"knative.dev/pkg/test/slackutil" "knative.dev/pkg/test/slackutil"
) )
@ -34,22 +35,16 @@ As of %s, there is a new performance regression detected from automation test:
%s` %s`
) )
// Channel contains Slack channel's info
type Channel struct {
Name string
Identity string
}
// MessageHandler handles methods for slack messages // MessageHandler handles methods for slack messages
type MessageHandler struct { type MessageHandler struct {
readClient slackutil.ReadOperations readClient slackutil.ReadOperations
writeClient slackutil.WriteOperations writeClient slackutil.WriteOperations
channels []Channel channels []config.Channel
dryrun bool dryrun bool
} }
// Setup creates the necessary setup to make calls to work with slack // Setup creates the necessary setup to make calls to work with slack
func Setup(userName, readTokenPath, writeTokenPath string, channels []Channel, dryrun bool) (*MessageHandler, error) { func Setup(userName, readTokenPath, writeTokenPath string, channels []config.Channel, dryrun bool) (*MessageHandler, error) {
readClient, err := slackutil.NewReadClient(userName, readTokenPath) readClient, err := slackutil.NewReadClient(userName, readTokenPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot authenticate to slack read client: %v", err) return nil, fmt.Errorf("cannot authenticate to slack read client: %v", err)

View File

@ -30,34 +30,34 @@ import (
const koDataPathEnvName = "KO_DATA_PATH" const koDataPathEnvName = "KO_DATA_PATH"
// MustGetBenchmark wraps getBenchmark in log.Fatalf // MustGetBenchmark wraps getBenchmark in log.Fatalf
func MustGetBenchmark() *string { func MustGetBenchmark() (*string, *string) {
b, err := getBenchmark() benchmarkKey, benchmarkName, err := getBenchmark()
if err != nil { if err != nil {
log.Fatalf("unable to determine benchmark_key: %v", err) log.Fatalf("unable to determine benchmark_key: %v", err)
} }
return b return benchmarkKey, benchmarkName
} }
// getBenchmark fetches the appropriate benchmark_key for this configured environment. // getBenchmark fetches the appropriate benchmark_key for this configured environment.
func getBenchmark() (*string, error) { func getBenchmark() (*string, *string, error) {
// Figure out what environment we're running in from the Mako configmap. // Figure out what environment we're running in from the Mako configmap.
env, err := getEnvironment() env, err := getEnvironment()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// Read the Mako config file for this environment. // Read the Mako config file for this environment.
data, err := readFileFromKoData(env + ".config") data, err := readFileFromKoData(env + ".config")
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// Parse the Mako config file. // Parse the Mako config file.
bi := &mpb.BenchmarkInfo{} bi := &mpb.BenchmarkInfo{}
if err := proto.UnmarshalText(string(data), bi); err != nil { if err := proto.UnmarshalText(string(data), bi); err != nil {
return nil, err return nil, nil, err
} }
// Return the benchmark_key from this environment's config file. // Return the benchmark_key from this environment's config file.
return bi.BenchmarkKey, nil return bi.BenchmarkKey, bi.BenchmarkName, nil
} }
// readFileFromKoData reads the named file from kodata. // readFileFromKoData reads the named file from kodata.

View File

@ -17,9 +17,11 @@ limitations under the License.
package config package config
import ( import (
"path/filepath"
"strings" "strings"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/configmap"
) )
const ( const (
@ -29,12 +31,19 @@ const (
// Config defines the mako configuration options. // Config defines the mako configuration options.
type Config struct { type Config struct {
// Repository holds the name of the repository that runs the benchmarks.
Repository string
// Environment holds the name of the environement, // Environment holds the name of the environement,
// where the test runs, e.g. `dev`. // where the test runs, e.g. `dev`.
Environment string Environment string
// List of additional tags to apply to the run. // List of additional tags to apply to the run.
AdditionalTags []string AdditionalTags []string
// SlackConfig holds the slack configurations for the benchmarks,
// it's used to determine which slack channels to alert on if there is performance regression.
SlackConfig string
} }
// NewConfigFromMap creates a Config from the supplied map // NewConfigFromMap creates a Config from the supplied map
@ -44,12 +53,18 @@ func NewConfigFromMap(data map[string]string) (*Config, error) {
AdditionalTags: []string{}, AdditionalTags: []string{},
} }
if raw, ok := data["repository"]; ok {
lc.Repository = raw
}
if raw, ok := data["environment"]; ok { if raw, ok := data["environment"]; ok {
lc.Environment = raw lc.Environment = raw
} }
if raw, ok := data["additionalTags"]; ok && raw != "" { if raw, ok := data["additionalTags"]; ok && raw != "" {
lc.AdditionalTags = strings.Split(raw, ",") lc.AdditionalTags = strings.Split(raw, ",")
} }
if raw, ok := data["slackConfig"]; ok {
lc.SlackConfig = raw
}
return lc, nil return lc, nil
} }
@ -58,3 +73,15 @@ func NewConfigFromMap(data map[string]string) (*Config, error) {
func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) {
return NewConfigFromMap(configMap.Data) return NewConfigFromMap(configMap.Data)
} }
func loadConfig() (*Config, error) {
makoCM, err := configmap.Load(filepath.Join("/etc", ConfigName))
if err != nil {
return nil, err
}
cfg, err := NewConfigFromMap(makoCM)
if err != nil {
return nil, err
}
return cfg, nil
}

View File

@ -18,33 +18,34 @@ package config
import ( import (
"log" "log"
"path/filepath"
"knative.dev/pkg/configmap"
) )
// TODO: perhaps cache the loaded CM. // TODO: perhaps cache the loaded CM.
// MustGetRepository returns the repository from the configmap, or dies.
func MustGetRepository() string {
cfg, err := loadConfig()
if err != nil {
log.Fatalf("unable to load config from the configmap: %v", err)
}
if cfg.Repository == "" {
log.Fatal("unable to get repository from the configmap")
}
return cfg.Repository
}
// MustGetTags returns the additional tags from the configmap, or dies. // MustGetTags returns the additional tags from the configmap, or dies.
func MustGetTags() []string { func MustGetTags() []string {
makoCM, err := configmap.Load(filepath.Join("/etc", ConfigName)) cfg, err := loadConfig()
if err != nil { if err != nil {
log.Fatalf("unable to load configmap: %v", err) log.Fatalf("unable to load config from the configmap: %v", err)
}
cfg, err := NewConfigFromMap(makoCM)
if err != nil {
log.Fatalf("unable to parse configmap: %v", err)
} }
return cfg.AdditionalTags return cfg.AdditionalTags
} }
// getEnvironment fetches the Mako config environment to which this cluster should publish. // getEnvironment fetches the Mako config environment to which this cluster should publish.
func getEnvironment() (string, error) { func getEnvironment() (string, error) {
makoCM, err := configmap.Load(filepath.Join("/etc", ConfigName)) cfg, err := loadConfig()
if err != nil {
return "", err
}
cfg, err := NewConfigFromMap(makoCM)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -0,0 +1,56 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
yaml "gopkg.in/yaml.v2"
)
var defaultChannel = Channel{Name: "performance", Identity: "CBDMABCTF"}
// Channel contains Slack channel's info.
type Channel struct {
Name string `yaml:"name,omitempty"`
Identity string `yaml:"identity,omitempty"`
}
// SlackConfig contains slack configuration for the benchmarks.
type SlackConfig struct {
BenchmarkChannels map[string][]Channel `yaml:"benchmarkChannels,omitempty"`
}
// GetSlackChannels returns the slack channels to alert on for the given benchmark.
// If any error happens, or the config is not found, return the default channel.
func GetSlackChannels(benchmarkName string) []Channel {
cfg, err := loadConfig()
if err != nil {
return []Channel{defaultChannel}
}
return getSlackChannels(cfg.SlackConfig, benchmarkName)
}
func getSlackChannels(configStr, benchmarkName string) []Channel {
slackConfig := &SlackConfig{}
if err := yaml.Unmarshal([]byte(configStr), slackConfig); err != nil {
return []Channel{defaultChannel}
}
if channels, ok := slackConfig.BenchmarkChannels[benchmarkName]; ok {
return channels
}
return []Channel{defaultChannel}
}

View File

@ -41,6 +41,9 @@ data:
# by adding `foo.config` under their benchmark's kodata directory. # by adding `foo.config` under their benchmark's kodata directory.
environment: dev environment: dev
# Repository name that holds the benchmark.
repository: test-repo
# Additional tags to tag the runs. These tags are added # Additional tags to tag the runs. These tags are added
# to the list that the binary itself publishes (Kubernetes version, etc). # to the list that the binary itself publishes (Kubernetes version, etc).
# It is a comma separated list of tags. # It is a comma separated list of tags.

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"path/filepath"
"runtime" "runtime"
"strings" "strings"
@ -33,6 +34,7 @@ import (
"knative.dev/pkg/changeset" "knative.dev/pkg/changeset"
"knative.dev/pkg/controller" "knative.dev/pkg/controller"
"knative.dev/pkg/injection" "knative.dev/pkg/injection"
"knative.dev/pkg/test/mako/alerter"
"knative.dev/pkg/test/mako/config" "knative.dev/pkg/test/mako/config"
) )
@ -41,8 +43,29 @@ const (
// write results, and it authenticates and publishes them to Mako after // write results, and it authenticates and publishes them to Mako after
// assorted preprocessing. // assorted preprocessing.
sidecarAddress = "localhost:9813" sidecarAddress = "localhost:9813"
// org is the orgnization name that is used by Github client
org = "knative"
// slackUserName is the slack user name that is used by Slack client
slackUserName = "Knative Testgrid Robot"
) )
// Client is a wrapper that wraps all Mako related operations
type Client struct {
Quickstore *quickstore.Quickstore
Context context.Context
ShutDownFunc func(context.Context)
benchmarkName string
alerter *alerter.Alerter
}
// StoreAndHandleResult stores the benchmarking data and handles the result.
func (c *Client) StoreAndHandleResult() error {
out, err := c.Quickstore.Store()
return c.alerter.HandleBenchmarkResult(c.benchmarkName, out, err)
}
// EscapeTag replaces characters that Mako doesn't accept with ones it does. // EscapeTag replaces characters that Mako doesn't accept with ones it does.
func EscapeTag(tag string) string { func EscapeTag(tag string) string {
return strings.ReplaceAll(tag, ".", "_") return strings.ReplaceAll(tag, ".", "_")
@ -52,36 +75,36 @@ func EscapeTag(tag string) string {
// It will add a few common tags and allow each benchmark to add custm tags as well. // It will add a few common tags and allow each benchmark to add custm tags as well.
// It returns the mako client handle to store metrics, a method to close the connection // It returns the mako client handle to store metrics, a method to close the connection
// to mako server once done and error in case of failures. // to mako server once done and error in case of failures.
func Setup(ctx context.Context, extraTags ...string) (context.Context, *quickstore.Quickstore, func(context.Context), error) { func Setup(ctx context.Context, extraTags ...string) (*Client, error) {
tags := append(config.MustGetTags(), extraTags...) tags := append(config.MustGetTags(), extraTags...)
// Get the commit of the benchmarks // Get the commit of the benchmarks
commitID, err := changeset.Get() commitID, err := changeset.Get()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
// Setup a deployment informer, so that we can use the lister to track // Setup a deployment informer, so that we can use the lister to track
// desired and available pod counts. // desired and available pod counts.
cfg, err := rest.InClusterConfig() cfg, err := rest.InClusterConfig()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
ctx, informers := injection.Default.SetupInformers(ctx, cfg) ctx, informers := injection.Default.SetupInformers(ctx, cfg)
if err := controller.StartInformers(ctx.Done(), informers...); err != nil { if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
return nil, nil, nil, err return nil, err
} }
// Get the Kubernetes version from the API server. // Get the Kubernetes version from the API server.
kc := kubeclient.Get(ctx) kc := kubeclient.Get(ctx)
version, err := kc.Discovery().ServerVersion() version, err := kc.Discovery().ServerVersion()
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
// Determine the number of Kubernetes nodes through the kubernetes client. // Determine the number of Kubernetes nodes through the kubernetes client.
nodes, err := kc.CoreV1().Nodes().List(metav1.ListOptions{}) nodes, err := kc.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
tags = append(tags, "nodes="+fmt.Sprintf("%d", len(nodes.Items))) tags = append(tags, "nodes="+fmt.Sprintf("%d", len(nodes.Items)))
@ -102,8 +125,10 @@ func Setup(ctx context.Context, extraTags ...string) (context.Context, *quicksto
tags = append(tags, "instanceType="+EscapeTag(parts[3])) tags = append(tags, "instanceType="+EscapeTag(parts[3]))
} }
benchmarkKey, benchmarkName := config.MustGetBenchmark()
// Create a new Quickstore that connects to the microservice
qs, qclose, err := quickstore.NewAtAddress(ctx, &qpb.QuickstoreInput{ qs, qclose, err := quickstore.NewAtAddress(ctx, &qpb.QuickstoreInput{
BenchmarkKey: config.MustGetBenchmark(), BenchmarkKey: benchmarkKey,
Tags: append(tags, Tags: append(tags,
"commit="+commitID, "commit="+commitID,
"kubernetes="+EscapeTag(version.String()), "kubernetes="+EscapeTag(version.String()),
@ -111,7 +136,34 @@ func Setup(ctx context.Context, extraTags ...string) (context.Context, *quicksto
), ),
}, sidecarAddress) }, sidecarAddress)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
return ctx, qs, qclose, nil
// Create a new Alerter that alerts for performance regressions
alerter := &alerter.Alerter{}
alerter.SetupGitHub(
org,
config.MustGetRepository(),
tokenPath("github-token"),
)
alerter.SetupSlack(
slackUserName,
tokenPath("slack-read-token"),
tokenPath("slack-write-token"),
config.GetSlackChannels(*benchmarkName),
)
client := &Client{
Quickstore: qs,
Context: ctx,
ShutDownFunc: qclose,
alerter: alerter,
benchmarkName: *benchmarkName,
}
return client, nil
}
func tokenPath(token string) string {
return filepath.Join("/var/secrets", token)
} }

View File

@ -24,6 +24,7 @@ import (
"net" "net"
"os" "os"
"strings" "strings"
"sync"
"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/jsonpb"
@ -33,10 +34,13 @@ import (
const ( const (
port = ":9813" port = ":9813"
// A 10 minutes run at 1000 rps of eventing perf tests is usually ~= 70 MBi, so 100MBi is reasonable
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 100
) )
type server struct { type server struct {
stopCh chan struct{} stopOnce sync.Once
stopCh chan struct{}
} }
func (s *server) Store(ctx context.Context, in *qspb.StoreInput) (*qspb.StoreOutput, error) { func (s *server) Store(ctx context.Context, in *qspb.StoreInput) (*qspb.StoreOutput, error) {
@ -96,7 +100,7 @@ func makeRow(prototype []string, points map[string]string) []string {
} }
func (s *server) ShutdownMicroservice(ctx context.Context, in *qspb.ShutdownInput) (*qspb.ShutdownOutput, error) { func (s *server) ShutdownMicroservice(ctx context.Context, in *qspb.ShutdownInput) (*qspb.ShutdownOutput, error) {
close(s.stopCh) s.stopOnce.Do(func() { close(s.stopCh) })
return &qspb.ShutdownOutput{}, nil return &qspb.ShutdownOutput{}, nil
} }
@ -105,7 +109,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
s := grpc.NewServer() s := grpc.NewServer(grpc.MaxRecvMsgSize(defaultServerMaxReceiveMessageSize))
stopCh := make(chan struct{}) stopCh := make(chan struct{})
go func() { go func() {
qspb.RegisterQuickstoreServer(s, &server{stopCh: stopCh}) qspb.RegisterQuickstoreServer(s, &server{stopCh: stopCh})

View File

@ -87,8 +87,7 @@ type SpoofingClient struct {
Client *http.Client Client *http.Client
RequestInterval time.Duration RequestInterval time.Duration
RequestTimeout time.Duration RequestTimeout time.Duration
Logf logging.FormatLogger
logf logging.FormatLogger
} }
// TransportOption allows callers to customize the http.Transport used by a SpoofingClient // TransportOption allows callers to customize the http.Transport used by a SpoofingClient
@ -139,7 +138,7 @@ func New(
Client: &http.Client{Transport: roundTripper}, Client: &http.Client{Transport: roundTripper},
RequestInterval: requestInterval, RequestInterval: requestInterval,
RequestTimeout: RequestTimeout, RequestTimeout: RequestTimeout,
logf: logf, Logf: logf,
} }
return &sc, nil return &sc, nil
} }
@ -215,17 +214,17 @@ func (sc *SpoofingClient) Poll(req *http.Request, inState ResponseChecker) (*Res
resp, err = sc.Do(req) resp, err = sc.Do(req)
if err != nil { if err != nil {
if isTCPTimeout(err) { if isTCPTimeout(err) {
sc.logf("Retrying %s for TCP timeout %v", req.URL.String(), err) sc.Logf("Retrying %s for TCP timeout %v", req.URL.String(), err)
return false, nil return false, nil
} }
// Retrying on DNS error, since we may be using xip.io or nip.io in tests. // Retrying on DNS error, since we may be using xip.io or nip.io in tests.
if isDNSError(err) { if isDNSError(err) {
sc.logf("Retrying %s for DNS error %v", req.URL.String(), err) sc.Logf("Retrying %s for DNS error %v", req.URL.String(), err)
return false, nil return false, nil
} }
// Repeat the poll on `connection refused` errors, which are usually transient Istio errors. // Repeat the poll on `connection refused` errors, which are usually transient Istio errors.
if isTCPConnectRefuse(err) { if isTCPConnectRefuse(err) {
sc.logf("Retrying %s for connection refused %v", req.URL.String(), err) sc.Logf("Retrying %s for connection refused %v", req.URL.String(), err)
return false, nil return false, nil
} }
return true, err return true, err
@ -252,14 +251,14 @@ func (sc *SpoofingClient) logZipkinTrace(spoofResp *Response) {
} }
traceID := spoofResp.Header.Get(zipkin.ZipkinTraceIDHeader) traceID := spoofResp.Header.Get(zipkin.ZipkinTraceIDHeader)
sc.logf("Logging Zipkin Trace for: %s", traceID) sc.Logf("Logging Zipkin Trace for: %s", traceID)
json, err := zipkin.JSONTrace(traceID /* We don't know the expected number of spans */, -1, 5*time.Second) json, err := zipkin.JSONTrace(traceID /* We don't know the expected number of spans */, -1, 5*time.Second)
if err != nil { if err != nil {
if _, ok := err.(*zipkin.TimeoutError); !ok { if _, ok := err.(*zipkin.TimeoutError); !ok {
sc.logf("Error getting zipkin trace: %v", err) sc.Logf("Error getting zipkin trace: %v", err)
} }
} }
sc.logf("%s", json) sc.Logf("%s", json)
} }

View File

@ -24,7 +24,6 @@ type Client interface {
// ClusterOperations contains all provider specific logics // ClusterOperations contains all provider specific logics
type ClusterOperations interface { type ClusterOperations interface {
Provider() string Provider() string
Initialize() error
Acquire() error Acquire() error
Delete() error Delete() error
} }

View File

@ -87,6 +87,13 @@ type GKERequest struct {
// Addons: cluster addons to be added to cluster, such as istio // Addons: cluster addons to be added to cluster, such as istio
Addons []string Addons []string
// SkipCreation: skips cluster creation
SkipCreation bool
// NeedsCleanup: enforce clean up if given this option, used when running
// locally
NeedsCleanup bool
} }
// GKECluster implements ClusterOperations // GKECluster implements ClusterOperations
@ -94,12 +101,12 @@ type GKECluster struct {
Request *GKERequest Request *GKERequest
// Project might be GKE specific, so put it here // Project might be GKE specific, so put it here
Project *string Project *string
// NeedCleanup tells whether the cluster needs to be deleted afterwards // NeedsCleanup tells whether the cluster needs to be deleted afterwards
// This probably should be part of task wrapper's logic // This probably should be part of task wrapper's logic
NeedCleanup bool NeedsCleanup bool
Cluster *container.Cluster Cluster *container.Cluster
operations GKESDKOperations operations GKESDKOperations
boskosOps boskos.Operation boskosOps boskos.Operation
} }
// GKESDKOperations wraps GKE SDK related functions // GKESDKOperations wraps GKE SDK related functions
@ -153,7 +160,7 @@ func (gs *GKEClient) Setup(r GKERequest) ClusterOperations {
if r.Project != "" { // use provided project and create cluster if r.Project != "" { // use provided project and create cluster
gc.Project = &r.Project gc.Project = &r.Project
gc.NeedCleanup = true gc.NeedsCleanup = true
} }
if r.MinNodes == 0 { if r.MinNodes == 0 {
@ -207,15 +214,21 @@ func (gs *GKEClient) Setup(r GKERequest) ClusterOperations {
return gc return gc
} }
// Initialize checks environment for cluster and projects to decide whether using // initialize checks environment for cluster and projects to decide whether using
// existing cluster/project or creating new ones. // existing cluster/project or creating new ones.
func (gc *GKECluster) Initialize() error { func (gc *GKECluster) initialize() error {
// Try obtain project name via `kubectl`, `gcloud` // Try obtain project name via `kubectl`, `gcloud`
if gc.Project == nil { if gc.Project == nil {
if err := gc.checkEnvironment(); err != nil { if err := gc.checkEnvironment(); err != nil {
return fmt.Errorf("failed checking existing cluster: '%v'", err) return fmt.Errorf("failed checking existing cluster: '%v'", err)
} else if gc.Cluster != nil { // return if Cluster was already set by kubeconfig } else if gc.Cluster != nil { // Return if Cluster was already set by kubeconfig
return nil // If clustername provided and kubeconfig set, ignore kubeconfig
if gc.Request != nil && gc.Request.ClusterName != "" && gc.Cluster.Name != gc.Request.ClusterName {
gc.Cluster = nil
}
if gc.Cluster != nil {
return nil
}
} }
} }
// Get project name from boskos if running in Prow // Get project name from boskos if running in Prow
@ -230,7 +243,7 @@ func (gc *GKECluster) Initialize() error {
return errors.New("gcp project must be set") return errors.New("gcp project must be set")
} }
if !common.IsProw() && gc.Cluster == nil { if !common.IsProw() && gc.Cluster == nil {
gc.NeedCleanup = true gc.NeedsCleanup = true
} }
log.Printf("Using project %q for running test", *gc.Project) log.Printf("Using project %q for running test", *gc.Project)
return nil return nil
@ -246,21 +259,26 @@ func (gc *GKECluster) Provider() string {
// in us-central1, and default BackupRegions are us-west1 and us-east1. If // in us-central1, and default BackupRegions are us-west1 and us-east1. If
// Region or Zone is provided then there is no retries // Region or Zone is provided then there is no retries
func (gc *GKECluster) Acquire() error { func (gc *GKECluster) Acquire() error {
if err := gc.initialize(); err != nil {
return fmt.Errorf("failed initialing with environment: '%v'", err)
}
gc.ensureProtected() gc.ensureProtected()
var clusterName string clusterName := gc.Request.ClusterName
var err error var err error
// Check if using existing cluster // Check if using existing cluster
if gc.Cluster != nil { if gc.Cluster != nil {
return nil return nil
} }
if gc.Request.SkipCreation {
log.Println("Skipping cluster creation as SkipCreation is set")
return nil
}
// Perform GKE specific cluster creation logics // Perform GKE specific cluster creation logics
if gc.Request.ClusterName == "" { if gc.Request.ClusterName == "" {
clusterName, err = getResourceName(ClusterResource) clusterName, err = getResourceName(ClusterResource)
if err != nil { if err != nil {
return fmt.Errorf("failed getting cluster name: '%v'", err) return fmt.Errorf("failed getting cluster name: '%v'", err)
} }
} else {
clusterName = gc.Request.ClusterName
} }
regions := []string{gc.Request.Region} regions := []string{gc.Request.Region}
@ -341,7 +359,7 @@ func (gc *GKECluster) Acquire() error {
} }
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Error during cluster creation: '%v'. ", err) errMsg := fmt.Sprintf("Error during cluster creation: '%v'. ", err)
if gc.NeedCleanup { // Delete half created cluster if it's user created if gc.NeedsCleanup { // Delete half created cluster if it's user created
errMsg = fmt.Sprintf("%sDeleting cluster %q in %q in background...\n", errMsg, clusterName, clusterLoc) errMsg = fmt.Sprintf("%sDeleting cluster %q in %q in background...\n", errMsg, clusterName, clusterLoc)
go gc.operations.delete(*gc.Project, clusterName, clusterLoc) go gc.operations.delete(*gc.Project, clusterName, clusterLoc)
} }
@ -364,6 +382,9 @@ func (gc *GKECluster) Acquire() error {
// Delete takes care of GKE cluster resource cleanup. It only release Boskos resource if running in // Delete takes care of GKE cluster resource cleanup. It only release Boskos resource if running in
// Prow, otherwise deletes the cluster if marked NeedsCleanup // Prow, otherwise deletes the cluster if marked NeedsCleanup
func (gc *GKECluster) Delete() error { func (gc *GKECluster) Delete() error {
if err := gc.initialize(); err != nil {
return fmt.Errorf("failed initialing with environment: '%v'", err)
}
gc.ensureProtected() gc.ensureProtected()
// Release Boskos if running in Prow, will let Janitor taking care of // Release Boskos if running in Prow, will let Janitor taking care of
// clusters deleting // clusters deleting
@ -372,9 +393,9 @@ func (gc *GKECluster) Delete() error {
return gc.boskosOps.ReleaseGKEProject(nil, *gc.Project) return gc.boskosOps.ReleaseGKEProject(nil, *gc.Project)
} }
// NeedCleanup is only true if running locally and cluster created by the // NeedsCleanup is only true if running locally and cluster created by the
// process // process
if !gc.NeedCleanup { if !gc.NeedsCleanup && !gc.Request.NeedsCleanup {
return nil return nil
} }
// Should only get here if running locally and cluster created by this // Should only get here if running locally and cluster created by this

View File

@ -121,17 +121,6 @@ func (i *impl) Track(ref corev1.ObjectReference, obj interface{}) error {
return nil return nil
} }
func objectReference(item kmeta.Accessor) corev1.ObjectReference {
gvk := item.GroupVersionKind()
apiVersion, kind := gvk.ToAPIVersionAndKind()
return corev1.ObjectReference{
APIVersion: apiVersion,
Kind: kind,
Namespace: item.GetNamespace(),
Name: item.GetName(),
}
}
func isExpired(expiry time.Time) bool { func isExpired(expiry time.Time) bool {
return time.Now().After(expiry) return time.Now().After(expiry)
} }
@ -144,7 +133,7 @@ func (i *impl) OnChanged(obj interface{}) {
return return
} }
or := objectReference(item) or := kmeta.ObjectReference(item)
// TODO(mattmoor): Consider locking the mapping (global) for a // TODO(mattmoor): Consider locking the mapping (global) for a
// smaller scope and leveraging a per-set lock to guard its access. // smaller scope and leveraging a per-set lock to guard its access.