Finish the initial Alerter support for Mako (#645)

* add slack operations and the main alerter

* function renaming

* solve the codereview issues

* address feedbacks

* add unit test and fix some old unit tests
This commit is contained in:
Chi Zhang 2019-09-11 14:35:31 -07:00 committed by Knative Prow Robot
parent 7f77962556
commit d484d03f55
11 changed files with 479 additions and 83 deletions

View File

@ -16,7 +16,7 @@ limitations under the License.
// error.go helps with error handling
package alerter
package helpers
import (
"errors"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package alerter
package helpers
import (
"log"

View File

@ -0,0 +1,68 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package alerter
import (
qpb "github.com/google/mako/proto/quickstore/quickstore_go_proto"
"knative.dev/pkg/test/mako/alerter/github"
"knative.dev/pkg/test/mako/alerter/slack"
)
// Alerter controls alert for performance regressions detected by Mako.
type Alerter struct {
githubIssueHandler *github.IssueHandler
slackMessageHandler *slack.MessageHandler
}
// SetupGitHub will setup SetupGitHub for the alerter.
func (alerter *Alerter) SetupGitHub(org, repo, githubTokenPath string) error {
issueHandler, err := github.Setup(org, repo, githubTokenPath, false)
if err != nil {
return err
}
alerter.githubIssueHandler = issueHandler
return nil
}
// SetupSlack will setup Slack for the alerter.
func (alerter *Alerter) SetupSlack(repo, userName, readTokenPath, writeTokenPath string) error {
messageHandler, err := slack.Setup(userName, readTokenPath, writeTokenPath, repo, false)
if err != nil {
return err
}
alerter.slackMessageHandler = messageHandler
return nil
}
// HandleBenchmarkResult will handle the benchmark result which returns from `q.Store()`
func (alerter *Alerter) HandleBenchmarkResult(testName string, output qpb.QuickstoreOutput, err error) {
if err != nil {
if output.GetStatus() == qpb.QuickstoreOutput_ANALYSIS_FAIL {
summary := output.GetSummaryOutput()
if alerter.githubIssueHandler != nil {
alerter.githubIssueHandler.CreateIssueForTest(testName, summary)
}
if alerter.slackMessageHandler != nil {
alerter.slackMessageHandler.SendAlert(summary)
}
}
return
}
if alerter.githubIssueHandler != nil {
alerter.githubIssueHandler.CloseIssueForTest(testName)
}
}

View File

@ -23,7 +23,7 @@ import (
"github.com/google/go-github/github"
"knative.dev/pkg/test/ghutil"
"knative.dev/pkg/test/mako/alerter"
"knative.dev/pkg/test/helpers"
)
const (
@ -39,14 +39,14 @@ const (
### Auto-generated issue tracking performance regression
* **Test name**: %s`
// reopenIssueCommentTemplate is a template for the comment of an issue that is reopened
reopenIssueCommentTemplate = `
New regression has been detected, reopening this issue:
%s`
// newIssueCommentTemplate is a template for the comment of an issue that has been quiet for a long time
newIssueCommentTemplate = `
A new regression for this test has been detected:
%s`
// reopenIssueCommentTemplate is a template for the comment of an issue that is reopened
reopenIssueCommentTemplate = `
New regression has been detected, reopening this issue:
%s`
// closeIssueComment is the comment of an issue when we close it
@ -54,8 +54,8 @@ A new regression for this test has been detected:
The performance regression goes way for this test, closing this issue.`
)
// issueHandler handles methods for github issues
type issueHandler struct {
// IssueHandler handles methods for github issues
type IssueHandler struct {
client ghutil.GithubOperations
config config
}
@ -68,17 +68,18 @@ type config struct {
}
// Setup creates the necessary setup to make calls to work with github issues
func Setup(githubToken string, config config) (*issueHandler, error) {
ghc, err := ghutil.NewGithubClient(githubToken)
func Setup(org, repo, githubTokenPath string, dryrun bool) (*IssueHandler, error) {
ghc, err := ghutil.NewGithubClient(githubTokenPath)
if err != nil {
return nil, fmt.Errorf("cannot authenticate to github: %v", err)
}
return &issueHandler{client: ghc, config: config}, nil
conf := config{org: org, repo: repo, dryrun: dryrun}
return &IssueHandler{client: ghc, config: conf}, nil
}
// CreateIssueForTest will try to add an issue with the given testName and description.
// If there is already an issue related to the test, it will try to update that issue.
func (gih *issueHandler) CreateIssueForTest(testName, desc string) error {
func (gih *IssueHandler) CreateIssueForTest(testName, desc string) error {
org := gih.config.org
repo := gih.config.repo
dryrun := gih.config.dryrun
@ -87,7 +88,8 @@ func (gih *issueHandler) CreateIssueForTest(testName, desc string) error {
// If the issue hasn't been created, create one
if issue == nil {
body := fmt.Sprintf(issueBodyTemplate, testName)
if err := gih.createNewIssue(org, repo, title, body, dryrun); err != nil {
issue, err := gih.createNewIssue(org, repo, title, body, dryrun)
if err != nil {
return err
}
comment := fmt.Sprintf(newIssueCommentTemplate, desc)
@ -118,9 +120,9 @@ func (gih *issueHandler) CreateIssueForTest(testName, desc string) error {
}
// createNewIssue will create a new issue, and add perfLabel for it.
func (gih *issueHandler) createNewIssue(org, repo, title, body string, dryrun bool) error {
func (gih *IssueHandler) createNewIssue(org, repo, title, body string, dryrun bool) (*github.Issue, error) {
var newIssue *github.Issue
if err := alerter.Run(
if err := helpers.Run(
"creating issue",
func() error {
var err error
@ -129,20 +131,23 @@ func (gih *issueHandler) createNewIssue(org, repo, title, body string, dryrun bo
},
dryrun,
); nil != err {
return err
return nil, err
}
return alerter.Run(
if err := helpers.Run(
"adding perf label",
func() error {
return gih.client.AddLabelsToIssue(org, repo, *newIssue.Number, []string{perfLabel})
},
dryrun,
)
); nil != err {
return nil, err
}
return newIssue, nil
}
// CloseIssueForTest will try to close the issue for the given testName.
// If there is no issue related to the test or the issue is already closed, the function will do nothing.
func (gih *issueHandler) CloseIssueForTest(testName string) error {
func (gih *IssueHandler) CloseIssueForTest(testName string) error {
org := gih.config.org
repo := gih.config.repo
dryrun := gih.config.dryrun
@ -153,7 +158,7 @@ func (gih *issueHandler) CloseIssueForTest(testName string) error {
}
issueNumber := *issue.Number
if err := alerter.Run(
if err := helpers.Run(
"add comment for the issue to close",
func() error {
_, cErr := gih.client.CreateComment(org, repo, issueNumber, closeIssueComment)
@ -163,7 +168,7 @@ func (gih *issueHandler) CloseIssueForTest(testName string) error {
); err != nil {
return err
}
return alerter.Run(
return helpers.Run(
"closing issue",
func() error {
return gih.client.CloseIssue(org, repo, issueNumber)
@ -173,8 +178,8 @@ func (gih *issueHandler) CloseIssueForTest(testName string) error {
}
// reopenIssue will reopen the given issue.
func (gih *issueHandler) reopenIssue(org, repo string, issueNumber int, dryrun bool) error {
return alerter.Run(
func (gih *IssueHandler) reopenIssue(org, repo string, issueNumber int, dryrun bool) error {
return helpers.Run(
"reopen the issue",
func() error {
return gih.client.ReopenIssue(org, repo, issueNumber)
@ -184,9 +189,9 @@ func (gih *issueHandler) reopenIssue(org, repo string, issueNumber int, dryrun b
}
// findIssue will return the issue in the given repo if it exists.
func (gih *issueHandler) findIssue(org, repo, title string, dryrun bool) *github.Issue {
func (gih *IssueHandler) findIssue(org, repo, title string, dryrun bool) *github.Issue {
var issues []*github.Issue
alerter.Run(
helpers.Run(
"list issues in the repo",
func() error {
var err error
@ -204,8 +209,8 @@ func (gih *issueHandler) findIssue(org, repo, title string, dryrun bool) *github
}
// addComment will add comment for the given issue.
func (gih *issueHandler) addComment(org, repo string, issueNumber int, commentBody string, dryrun bool) error {
return alerter.Run(
func (gih *IssueHandler) addComment(org, repo string, issueNumber int, commentBody string, dryrun bool) error {
return helpers.Run(
"add comment for issue",
func() error {
_, err := gih.client.CreateComment(org, repo, issueNumber, commentBody)

View File

@ -18,19 +18,21 @@ package github
import (
"fmt"
"os"
"testing"
"knative.dev/pkg/test/ghutil"
"knative.dev/pkg/test/ghutil/fakeghutil"
)
var gih issueHandler
var gih IssueHandler
func TestMain(m *testing.M) {
gih = issueHandler{
gih = IssueHandler{
client: fakeghutil.NewFakeGithubClient(),
config: config{org: "test_org", repo: "test_repo", dryrun: false},
}
os.Exit(m.Run())
}
func TestNewIssueWillBeAdded(t *testing.T) {
@ -70,7 +72,9 @@ func TestIssueCanBeClosed(t *testing.T) {
testName := "test closing existed issue"
testDesc := "test closing existed issue desc"
issueTitle := fmt.Sprintf(issueTitleTemplate, testName)
gih.client.CreateIssue(org, repo, issueTitle, testDesc)
if err := gih.CreateIssueForTest(testName, testDesc); err != nil {
t.Fatalf("expected to create a new issue %v, but failed", testName)
}
if err := gih.CloseIssueForTest(testName); err != nil {
t.Fatalf("tried to close the existed issue %v, but got an error %v", testName, err)

View File

@ -17,30 +17,40 @@ limitations under the License.
package slack
import (
"flag"
"fmt"
"sync"
"time"
"knative.dev/pkg/test/mako/alerter"
"knative.dev/pkg/test/helpers"
"knative.dev/pkg/test/slackutil"
)
const messageTemplate = `
var minInterval = flag.Duration("min-alert-interval", 24*time.Hour, "The minimum interval of sending Slack alerts.")
const (
messageTemplate = `
As of %s, there is a new performance regression detected from automation test:
%s`
)
// messageHandler handles methods for slack messages
type messageHandler struct {
client slackutil.Operations
config repoConfig
dryrun bool
// MessageHandler handles methods for slack messages
type MessageHandler struct {
readClient slackutil.ReadOperations
writeClient slackutil.WriteOperations
config repoConfig
dryrun bool
}
// Setup creates the necessary setup to make calls to work with slack
func Setup(userName, tokenPath, repo string, dryrun bool) (*messageHandler, error) {
client, err := slackutil.NewClient(userName, tokenPath)
func Setup(userName, readTokenPath, writeTokenPath, repo string, dryrun bool) (*MessageHandler, error) {
readClient, err := slackutil.NewReadClient(userName, readTokenPath)
if err != nil {
return nil, fmt.Errorf("cannot authenticate to slack: %v", err)
return nil, fmt.Errorf("cannot authenticate to slack read client: %v", err)
}
writeClient, err := slackutil.NewWriteClient(userName, writeTokenPath)
if err != nil {
return nil, fmt.Errorf("cannot authenticate to slack write client: %v", err)
}
var config *repoConfig
for _, repoConfig := range repoConfigs {
@ -52,30 +62,66 @@ func Setup(userName, tokenPath, repo string, dryrun bool) (*messageHandler, erro
if config == nil {
return nil, fmt.Errorf("no channel configuration found for repo %v", repo)
}
return &messageHandler{client: client, config: *config, dryrun: dryrun}, nil
return &MessageHandler{
readClient: readClient,
writeClient: writeClient,
config: *config,
dryrun: dryrun,
}, nil
}
// Post will post the given text to the slack channel(s)
func (smh *messageHandler) Post(text string) error {
// TODO(Fredy-Z): add deduplication logic, maybe do not send more than one alert within 24 hours?
errs := make([]error, 0)
// SendAlert will send the alert text to the slack channel(s)
func (smh *MessageHandler) SendAlert(text string) error {
dryrun := smh.dryrun
channels := smh.config.channels
mux := &sync.Mutex{}
errCh := make(chan error)
var wg sync.WaitGroup
for i := range channels {
channel := channels[i]
wg.Add(1)
go func() {
defer wg.Done()
// get the recent message history in the channel for this user
startTime := time.Now().Add(-1 * *minInterval)
var messageHistory []string
if err := helpers.Run(
fmt.Sprintf("retrieving message history in channel %q", channel.name),
func() error {
var err error
messageHistory, err = smh.readClient.MessageHistory(channel.identity, startTime)
return err
},
dryrun,
); err != nil {
errCh <- fmt.Errorf("failed to retrieve message history in channel %q", channel.name)
}
// do not send message again if messages were sent on the same channel a short while ago
if len(messageHistory) != 0 {
return
}
// send the alert message to the channel
message := fmt.Sprintf(messageTemplate, time.Now(), text)
if err := smh.client.Post(message, channel.identity); err != nil {
mux.Lock()
errs = append(errs, fmt.Errorf("failed to send message to channel %v", channel))
mux.Unlock()
if err := helpers.Run(
fmt.Sprintf("sending message %q to channel %q", message, channel.name),
func() error {
return smh.writeClient.Post(message, channel.identity)
},
dryrun,
); err != nil {
errCh <- fmt.Errorf("failed to send message to channel %q", channel.name)
}
}()
}
wg.Wait()
return alerter.CombineErrors(errs)
go func() {
wg.Wait()
close(errCh)
}()
errs := make([]error, 0)
for err := range errCh {
errs = append(errs, err)
}
return helpers.CombineErrors(errs)
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package slack
import (
"os"
"testing"
"time"
"knative.dev/pkg/test/slackutil/fakeslackutil"
)
var mh MessageHandler
func TestMain(m *testing.M) {
client := fakeslackutil.NewFakeSlackClient()
mh = MessageHandler{
readClient: client,
writeClient: client,
config: repoConfig{
repo: "test_repo",
channels: []channel{
channel{name: "test_channel1", identity: "fsfdsf"},
channel{name: "test_channel2", identity: "fdsfhfdh"},
},
},
dryrun: false,
}
os.Exit(m.Run())
}
func TestMessaging(t *testing.T) {
firstMsg := "first message"
if err := mh.SendAlert(firstMsg); err != nil {
t.Fatalf("expected to send the message, but failed: %v", err)
}
for _, channel := range mh.config.channels {
history, err := mh.readClient.MessageHistory(channel.identity, time.Now().Add(-1*time.Hour))
if err != nil {
t.Fatalf("expected to get the message history, but failed: %v", err)
}
if len(history) != 1 {
t.Fatalf("the message is expected to be successfully sent, but failed: %v", err)
}
}
secondMsg := "second message"
if err := mh.SendAlert(secondMsg); err != nil {
t.Fatalf("expected to send the message, but failed: %v", err)
}
for _, channel := range mh.config.channels {
history, err := mh.readClient.MessageHistory(channel.identity, time.Now().Add(-1*time.Hour))
if err != nil {
t.Fatalf("expected to get the message history, but failed: %v", err)
}
if len(history) != 1 {
t.Fatalf("the message history is expected to still be 1, but now it's: %d", len(history))
}
}
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// fakeslackutil.go fakes SlackClient for testing purpose
package fakeslackutil
import (
"sync"
"time"
)
type messageEntry struct {
text string
sentTime time.Time
}
// FakeSlackClient is a faked client, implements all functions of slackutil.ReadOperations and slackutil.WriteOperations
type FakeSlackClient struct {
History map[string][]messageEntry
mutex sync.RWMutex
}
// NewFakeSlackClient creates a FakeSlackClient and initialize it's maps
func NewFakeSlackClient() *FakeSlackClient {
return &FakeSlackClient{
History: make(map[string][]messageEntry), // map of channel name: slice of messages sent to the channel
mutex: sync.RWMutex{},
}
}
// MessageHistory returns the messages to the channel from the given startTime
func (c *FakeSlackClient) MessageHistory(channel string, startTime time.Time) ([]string, error) {
c.mutex.Lock()
messages := make([]string, 0)
if history, ok := c.History[channel]; ok {
for _, msg := range history {
if time.Now().After(startTime) {
messages = append(messages, msg.text)
}
}
}
c.mutex.Unlock()
return messages, nil
}
// Post sends the text as a message to the given channel
func (c *FakeSlackClient) Post(text, channel string) error {
c.mutex.Lock()
messages := make([]messageEntry, 0)
if history, ok := c.History[channel]; ok {
messages = history
}
messages = append(messages, messageEntry{text: text, sentTime: time.Now()})
c.History[channel] = messages
c.mutex.Unlock()
return nil
}

53
test/slackutil/http.go Normal file
View File

@ -0,0 +1,53 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// http.go includes functions to send HTTP requests.
package slackutil
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
)
// post sends an HTTP post request
func post(url string, uv url.Values) ([]byte, error) {
resp, err := http.PostForm(url, uv)
if err != nil {
return nil, err
}
return handleResponse(resp)
}
// get sends an HTTP get request
func get(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
return handleResponse(resp)
}
// handleResponse handles the HTTP response and returns the body content
func handleResponse(resp *http.Response) ([]byte, error) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http response code is not StatusOK: '%v'", resp.StatusCode)
}
return ioutil.ReadAll(resp.Body)
}

View File

@ -0,0 +1,88 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// message_read.go includes functions to read messages from Slack.
package slackutil
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"strconv"
"strings"
"time"
)
const conversationHistoryURL = "https://slack.com/api/conversations.history"
// ReadOperations defines the read operations that can be done to Slack
type ReadOperations interface {
MessageHistory(channel string, startTime time.Time) ([]string, error)
}
// readClient contains Slack bot related information to perform read operations
type readClient struct {
userName string
tokenStr string
}
// NewReadClient reads token file and stores it for later authentication
func NewReadClient(userName, tokenPath string) (ReadOperations, error) {
b, err := ioutil.ReadFile(tokenPath)
if err != nil {
return nil, err
}
return &readClient{
userName: userName,
tokenStr: strings.TrimSpace(string(b)),
}, nil
}
func (c *readClient) MessageHistory(channel string, startTime time.Time) ([]string, error) {
u, _ := url.Parse(conversationHistoryURL)
q := u.Query()
q.Add("username", c.userName)
q.Add("token", c.tokenStr)
q.Add("channel", channel)
q.Add("oldest", strconv.FormatInt(startTime.Unix(), 10))
u.RawQuery = q.Encode()
content, err := get(u.String())
if err != nil {
return nil, err
}
// response code could also be 200 if channel doesn't exist, parse response body to find out
type m struct {
Text string `json:"text"`
}
var r struct {
OK bool `json:"ok"`
Messages []m `json:"messages"`
}
if err = json.Unmarshal(content, &r); nil != err || !r.OK {
return nil, fmt.Errorf("response not ok '%s'", string(content))
}
res := make([]string, len(r.Messages))
for i, message := range r.Messages {
res[i] = message.Text
}
return res, nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// messaging.go includes functions to send message to Slack channel.
// message_write.go includes functions to send messages to Slack.
package slackutil
@ -24,67 +24,54 @@ import (
"io/ioutil"
"strings"
"net/http"
"net/url"
)
const postMessageURL = "https://slack.com/api/chat.postMessage"
// Operations defines the operations that can be done to Slack
type Operations interface {
// WriteOperations defines the write operations that can be done to Slack
type WriteOperations interface {
Post(text, channel string) error
}
// client contains Slack bot related information
type client struct {
userName string
tokenStr string
iconEmoji *string
// writeClient contains Slack bot related information to perform write operations
type writeClient struct {
userName string
tokenStr string
}
// NewClient reads token file and stores it for later authentication
func NewClient(userName, tokenPath string) (Operations, error) {
// NewWriteClient reads token file and stores it for later authentication
func NewWriteClient(userName, tokenPath string) (WriteOperations, error) {
b, err := ioutil.ReadFile(tokenPath)
if err != nil {
return nil, err
}
return &client{
return &writeClient{
userName: userName,
tokenStr: strings.TrimSpace(string(b)),
}, nil
}
// Post posts the given text to channel
func (c *client) Post(text, channel string) error {
func (c *writeClient) Post(text, channel string) error {
uv := url.Values{}
uv.Add("username", c.userName)
uv.Add("token", c.tokenStr)
if nil != c.iconEmoji {
uv.Add("icon_emoji", *c.iconEmoji)
}
uv.Add("channel", channel)
uv.Add("text", text)
return c.postMessage(uv)
}
// postMessage does http post
func (c *client) postMessage(uv url.Values) error {
resp, err := http.PostForm(postMessageURL, uv)
content, err := post(postMessageURL, uv)
if err != nil {
return err
}
defer resp.Body.Close()
t, _ := ioutil.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http response code is not '%d': '%s'", http.StatusOK, string(t))
}
// response code could also be 200 if channel doesn't exist, parse response body to find out
var b struct {
OK bool `json:"ok"`
}
if err = json.Unmarshal(t, &b); nil != err || !b.OK {
return fmt.Errorf("response not ok '%s'", string(t))
if err = json.Unmarshal(content, &b); nil != err || !b.OK {
return fmt.Errorf("response not ok '%s'", string(content))
}
return nil
}