Fix Slack chat.postMessage error handling

Signed-off-by: Hidehito Yabuuchi <hdht.ybuc@gmail.com>
This commit is contained in:
Hidehito Yabuuchi 2025-03-24 18:56:17 +09:00
parent 8dd496a132
commit 882383e44c
16 changed files with 339 additions and 101 deletions

View File

@ -137,15 +137,22 @@ func (s *Alertmanager) Post(ctx context.Context, event eventv1.Event) error {
},
}
var opts []requestOptFunc
if s.Token != "" {
opts = append(opts, func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
})
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, opts...)
if err != nil {
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if s.Token != "" {
opts = append(opts, withRequestModifier(func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
}))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}

View File

@ -32,29 +32,118 @@ import (
"github.com/hashicorp/go-retryablehttp"
)
type requestOptFunc func(*retryablehttp.Request)
type postOptions struct {
proxy string
certPool *x509.CertPool
requestModifier func(*retryablehttp.Request)
responseValidator func(statusCode int, body []byte) error
}
func postMessage(ctx context.Context, address, proxy string, certPool *x509.CertPool, payload interface{}, reqOpts ...requestOptFunc) error {
type postOption func(*postOptions)
func postMessage(ctx context.Context, address string, payload interface{}, opts ...postOption) error {
options := &postOptions{
// Default validateResponse function verifies that the response status code is 200, 202 or 201.
responseValidator: func(statusCode int, body []byte) error {
if statusCode == http.StatusOK ||
statusCode == http.StatusAccepted ||
statusCode == http.StatusCreated {
return nil
}
return fmt.Errorf("request failed with status code %d, %s", statusCode, string(body))
},
}
for _, o := range opts {
o(options)
}
httpClient, err := newHTTPClient(options)
if err != nil {
return err
}
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}
req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if options.requestModifier != nil {
options.requestModifier(req)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
if err := options.responseValidator(resp.StatusCode, body); err != nil {
return fmt.Errorf("request failed: %w", err)
}
return nil
}
func withProxy(proxy string) postOption {
return func(opts *postOptions) {
opts.proxy = proxy
}
}
func withCertPool(certPool *x509.CertPool) postOption {
return func(opts *postOptions) {
opts.certPool = certPool
}
}
func withRequestModifier(reqModifier func(*retryablehttp.Request)) postOption {
return func(opts *postOptions) {
opts.requestModifier = reqModifier
}
}
func withResponseValidator(respValidator func(statusCode int, body []byte) error) postOption {
return func(opts *postOptions) {
opts.responseValidator = respValidator
}
}
func newHTTPClient(opts *postOptions) (*retryablehttp.Client, error) {
httpClient := retryablehttp.NewClient()
if certPool != nil {
if opts.certPool != nil {
httpClient.HTTPClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
RootCAs: opts.certPool,
},
}
}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
if opts.proxy != "" {
proxyURL, err := url.Parse(opts.proxy)
if err != nil {
return fmt.Errorf("unable to parse proxy URL '%s', error: %w", proxy, err)
return nil, fmt.Errorf("unable to parse proxy URL: %w", err)
}
var tlsConfig *tls.Config
if certPool != nil {
if opts.certPool != nil {
tlsConfig = &tls.Config{
RootCAs: certPool,
RootCAs: opts.certPool,
}
}
httpClient.HTTPClient.Transport = &http.Transport{
Proxy: http.ProxyURL(proxyURL),
TLSClientConfig: tlsConfig,
@ -79,34 +168,5 @@ func postMessage(ctx context.Context, address, proxy string, certPool *x509.Cert
httpClient.RetryMax = 4
httpClient.Logger = nil
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshalling notification payload failed: %w", err)
}
req, err := retryablehttp.NewRequest(http.MethodPost, address, data)
if err != nil {
return fmt.Errorf("failed to create a new request: %w", err)
}
if ctx != nil {
req = req.WithContext(ctx)
}
req.Header.Set("Content-Type", "application/json")
for _, o := range reqOpts {
o(req)
}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusCreated {
b, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("unable to read response body, %s", err)
}
return fmt.Errorf("request failed with status code %d, %s", resp.StatusCode, string(b))
}
return nil
return httpClient, nil
}

View File

@ -20,13 +20,16 @@ import (
"context"
"crypto/x509"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/hashicorp/go-retryablehttp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -45,7 +48,7 @@ func Test_postMessage(t *testing.T) {
require.Equal(t, "success", payload["status"])
}))
defer ts.Close()
err := postMessage(context.Background(), ts.URL, "", nil, map[string]string{"status": "success"})
err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"})
require.NoError(t, err)
}
@ -56,7 +59,7 @@ func Test_postMessage_timeout(t *testing.T) {
defer ts.Close()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := postMessage(ctx, ts.URL, "", nil, map[string]string{"status": "success"})
err := postMessage(ctx, ts.URL, map[string]string{"status": "success"})
require.Error(t, err, "context deadline exceeded")
}
@ -77,10 +80,42 @@ func Test_postSelfSignedCert(t *testing.T) {
require.NoError(t, err)
certpool := x509.NewCertPool()
certpool.AddCert(cert)
err = postMessage(context.Background(), ts.URL, "", certpool, map[string]string{"status": "success"})
err = postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withCertPool(certpool))
require.NoError(t, err)
}
func Test_postMessage_requestModifier(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
require.Equal(t, "Bearer token", r.Header.Get("Authorization"))
}))
defer ts.Close()
err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "Bearer token")
}))
require.NoError(t, err)
}
func Test_postMessage_responseValidator(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Default response validator determines success, but the custom validator below will determine failure .
w.WriteHeader(http.StatusOK)
w.Write([]byte("error: bad request"))
}))
defer ts.Close()
err := postMessage(context.Background(), ts.URL, map[string]string{"status": "success"})
require.NoError(t, err)
err = postMessage(context.Background(), ts.URL, map[string]string{"status": "success"}, withResponseValidator(func(_ int, body []byte) error {
if strings.HasPrefix(string(body), "error:") {
return errors.New(string(body))
}
return nil
}))
require.ErrorContains(t, err, "request failed: error: bad request")
}
func testEvent() eventv1.Event {
return eventv1.Event{
InvolvedObject: corev1.ObjectReference{

View File

@ -90,8 +90,12 @@ func (s *Discord) Post(ctx context.Context, event eventv1.Event) error {
payload.Attachments = []SlackAttachment{a}
err := postMessage(ctx, s.URL, s.ProxyURL, nil, payload)
if err != nil {
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}

View File

@ -77,18 +77,28 @@ func (f *Forwarder) Post(ctx context.Context, event eventv1.Event) error {
}
sig = fmt.Sprintf("sha256=%s", sign(eventJSON, f.HMACKey))
}
err := postMessage(ctx, f.URL, f.ProxyURL, f.CertPool, event, func(req *retryablehttp.Request) {
req.Header.Set(NotificationHeader, event.ReportingController)
for key, val := range f.Headers {
req.Header.Set(key, val)
}
if sig != "" {
req.Header.Set("X-Signature", sig)
}
})
if err != nil {
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Set(NotificationHeader, event.ReportingController)
for key, val := range f.Headers {
req.Header.Set(key, val)
}
if sig != "" {
req.Header.Set("X-Signature", sig)
}
}),
}
if f.ProxyURL != "" {
opts = append(opts, withProxy(f.ProxyURL))
}
if f.CertPool != nil {
opts = append(opts, withCertPool(f.CertPool))
}
if err := postMessage(ctx, f.URL, event, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}

View File

@ -142,8 +142,12 @@ func (s *GoogleChat) Post(ctx context.Context, event eventv1.Event) error {
Cards: []GoogleChatCard{card},
}
err := postMessage(ctx, s.URL, s.ProxyURL, nil, payload)
if err != nil {
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}

View File

@ -84,15 +84,24 @@ func (g *Grafana) Post(ctx context.Context, event eventv1.Event) error {
Tags: sfields,
}
err := postMessage(ctx, g.URL, g.ProxyURL, g.CertPool, payload, func(request *retryablehttp.Request) {
if (g.Username != "" && g.Password != "") && g.Token == "" {
request.Header.Add("Authorization", "Basic "+basicAuth(g.Username, g.Password))
}
if g.Token != "" {
request.Header.Add("Authorization", "Bearer "+g.Token)
}
})
if err != nil {
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
if (g.Username != "" && g.Password != "") && g.Token == "" {
req.Header.Add("Authorization", "Basic "+basicAuth(g.Username, g.Password))
}
if g.Token != "" {
req.Header.Add("Authorization", "Bearer "+g.Token)
}
}),
}
if g.ProxyURL != "" {
opts = append(opts, withProxy(g.ProxyURL))
}
if g.CertPool != nil {
opts = append(opts, withCertPool(g.CertPool))
}
if err := postMessage(ctx, g.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil

View File

@ -109,5 +109,5 @@ func (l *Lark) Post(ctx context.Context, event eventv1.Event) error {
Card: card,
}
return postMessage(ctx, l.URL, "", nil, payload)
return postMessage(ctx, l.URL, payload)
}

View File

@ -65,11 +65,17 @@ func (m *Matrix) Post(ctx context.Context, event eventv1.Event) error {
MsgType: "m.text",
}
err = postMessage(ctx, fullURL, "", m.CertPool, payload, func(request *retryablehttp.Request) {
request.Method = http.MethodPut
request.Header.Add("Authorization", "Bearer "+m.Token)
})
if err != nil {
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Method = http.MethodPut
req.Header.Add("Authorization", "Bearer "+m.Token)
}),
}
if m.CertPool != nil {
opts = append(opts, withCertPool(m.CertPool))
}
if err := postMessage(ctx, fullURL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}

View File

@ -78,12 +78,21 @@ func (s *Opsgenie) Post(ctx context.Context, event eventv1.Event) error {
Details: details,
}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "GenieKey "+s.ApiKey)
})
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Set("Authorization", "GenieKey "+s.ApiKey)
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err != nil {
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}

View File

@ -54,19 +54,36 @@ func (p *PagerDuty) Post(ctx context.Context, event eventv1.Event) error {
if event.HasMetadata(eventv1.MetaCommitStatusKey, eventv1.MetaCommitStatusUpdateValue) || event.HasReason(meta.ProgressingReason) {
return nil
}
e := toPagerDutyV2Event(event, p.RoutingKey)
err := postMessage(ctx, p.Endpoint+"/v2/enqueue", p.ProxyURL, p.CertPool, e)
if err != nil {
var opts []postOption
if p.ProxyURL != "" {
opts = append(opts, withProxy(p.ProxyURL))
}
if p.CertPool != nil {
opts = append(opts, withCertPool(p.CertPool))
}
if err := postMessage(
ctx,
p.Endpoint+"/v2/enqueue",
toPagerDutyV2Event(event, p.RoutingKey),
opts...,
); err != nil {
return fmt.Errorf("failed sending event: %w", err)
}
// Send a change event for info events
if event.Severity == eventv1.EventSeverityInfo {
ce := toPagerDutyChangeEvent(event, p.RoutingKey)
err = postMessage(ctx, p.Endpoint+"/v2/change/enqueue", p.ProxyURL, p.CertPool, ce)
if err != nil {
if err := postMessage(
ctx,
p.Endpoint+"/v2/change/enqueue",
toPagerDutyChangeEvent(event, p.RoutingKey),
opts...,
); err != nil {
return fmt.Errorf("failed sending change event: %w", err)
}
}
return nil
}

View File

@ -83,8 +83,15 @@ func (s *Rocket) Post(ctx context.Context, event eventv1.Event) error {
payload.Attachments = []SlackAttachment{a}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload)
if err != nil {
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil

View File

@ -19,6 +19,7 @@ package notifier
import (
"context"
"crypto/x509"
"encoding/json"
"fmt"
"net/url"
"strings"
@ -118,13 +119,49 @@ func (s *Slack) Post(ctx context.Context, event eventv1.Event) error {
payload.Attachments = []SlackAttachment{a}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, func(request *retryablehttp.Request) {
if s.Token != "" {
request.Header.Add("Authorization", "Bearer "+s.Token)
}
})
if err != nil {
opts := []postOption{
withRequestModifier(func(request *retryablehttp.Request) {
if s.Token != "" {
request.Header.Add("Authorization", "Bearer "+s.Token)
}
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if s.URL == "https://slack.com/api/chat.postMessage" {
opts = append(opts, withResponseValidator(validateSlackResponse))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}
// validateSlackResponse validates that a chat.postMessage API response is successful.
// chat.postMessage API always returns 200 OK.
// See https://api.slack.com/methods/chat.postMessage.
//
// On the other hand, incoming webhooks return more expressive HTTP status codes.
// See https://api.slack.com/messaging/webhooks#handling_errors.
func validateSlackResponse(_ int, body []byte) error {
type slackResponse struct {
Ok bool `json:"ok"`
Error string `json:"error"`
}
slackResp := slackResponse{}
if err := json.Unmarshal(body, &slackResp); err != nil {
return fmt.Errorf("unable to unmarshal response body: %w", err)
}
if slackResp.Ok {
return nil
}
return fmt.Errorf("Slack responded with error: %s", slackResp.Error)
}

View File

@ -58,3 +58,18 @@ func TestSlack_PostUpdate(t *testing.T) {
err = slack.Post(context.TODO(), event)
require.NoError(t, err)
}
func TestSlack_ValidateResponse(t *testing.T) {
body := []byte(`{
"ok": true
}`)
err := validateSlackResponse(http.StatusOK, body)
require.NoError(t, err)
body = []byte(`{
"ok": false,
"error": "too_many_attachments"
}`)
err = validateSlackResponse(http.StatusOK, body)
require.ErrorContains(t, err, "Slack responded with error: too_many_attachments")
}

View File

@ -161,8 +161,15 @@ func (s *MSTeams) Post(ctx context.Context, event eventv1.Event) error {
payload = buildMSTeamsAdaptiveCardPayload(&event, objName)
}
err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload)
if err != nil {
var opts []postOption
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}

View File

@ -108,10 +108,21 @@ func (s *Webex) Post(ctx context.Context, event eventv1.Event) error {
Markdown: s.CreateMarkdown(&event),
}
if err := postMessage(ctx, s.URL, s.ProxyURL, s.CertPool, payload, func(request *retryablehttp.Request) {
request.Header.Add("Authorization", "Bearer "+s.Token)
}); err != nil {
opts := []postOption{
withRequestModifier(func(req *retryablehttp.Request) {
req.Header.Add("Authorization", "Bearer "+s.Token)
}),
}
if s.ProxyURL != "" {
opts = append(opts, withProxy(s.ProxyURL))
}
if s.CertPool != nil {
opts = append(opts, withCertPool(s.CertPool))
}
if err := postMessage(ctx, s.URL, payload, opts...); err != nil {
return fmt.Errorf("postMessage failed: %w", err)
}
return nil
}