Add support for Azure EventHub provider

Solves #190

Signed-off-by: Edvin Norling <edvin.norling@xenit.se>
This commit is contained in:
Edvin Norling 2021-04-28 10:53:36 +02:00 committed by Edvin Norling
parent 15d98c9afc
commit 6ffa2e91fe
6 changed files with 76 additions and 2 deletions

View File

@ -28,7 +28,7 @@ const (
// ProviderSpec defines the desired state of Provider
type ProviderSpec struct {
// Type of provider
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;github;gitlab;bitbucket;azuredevops;googlechat;webex;sentry
// +kubebuilder:validation:Enum=slack;discord;msteams;rocket;generic;github;gitlab;bitbucket;azuredevops;googlechat;webex;sentry;eventhub;
// +required
Type string `json:"type"`
@ -76,6 +76,7 @@ const (
GoogleChatProvider string = "googlechat"
WebexProvider string = "webex"
SentryProvider string = "sentry"
EventHubProvider string = "eventhub"
)
// ProviderStatus defines the observed state of Provider

View File

@ -92,6 +92,7 @@ spec:
- googlechat
- webex
- sentry
- eventhub
type: string
username:
description: Bot username for this provider

View File

@ -46,6 +46,7 @@ Notification providers:
* Google Chat
* Webex
* Sentry
* EventHub
* Generic webhook
Git commit status providers:
@ -106,7 +107,7 @@ kubectl create secret generic webhook-url \
Note that the secret must contain an `address` field.
The provider type can be: `slack`, `msteams`, `rocket`, `discord`, `googlechat`, `webex`, `sentry`, `github`, `gitlab`, `bitbucket`, `azuredevops` or `generic`.
The provider type can be: `slack`, `msteams`, `rocket`, `discord`, `googlechat`, `webex`, `sentry`, `eventhub`, `github`, `gitlab`, `bitbucket`, `azuredevops` or `generic`.
When type `generic` is specified, the notification controller will post the
incoming [event](event.md) in JSON format to the webhook address.

8
go.mod
View File

@ -5,6 +5,10 @@ go 1.15
replace github.com/fluxcd/notification-controller/api => ./api
require (
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.3.7
github.com/Azure/azure-sdk-for-go v53.4.0+incompatible // indirect
github.com/Azure/go-amqp v0.13.6 // indirect
github.com/fluxcd/notification-controller/api v0.13.0
github.com/fluxcd/pkg/apis/meta v0.9.0
github.com/fluxcd/pkg/runtime v0.11.0
@ -12,8 +16,10 @@ require (
github.com/go-logr/logr v0.3.0
github.com/google/go-github/v32 v32.1.0
github.com/hashicorp/go-retryablehttp v0.6.8
github.com/jpillora/backoff v1.0.0 // indirect
github.com/ktrysmt/go-bitbucket v0.6.5
github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/sethvargo/go-limiter v0.6.0
@ -22,6 +28,8 @@ require (
github.com/stretchr/testify v1.6.1
github.com/whilp/git-urls v1.0.0
github.com/xanzy/go-gitlab v0.38.2
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b // indirect
golang.org/x/net v0.0.0-20210427231257-85d9c07bbe3a // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
k8s.io/api v0.20.4
k8s.io/apimachinery v0.20.4

View File

@ -0,0 +1,61 @@
/*
Copyright 2020 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package notifier
import (
"context"
"encoding/json"
"fmt"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/fluxcd/pkg/runtime/events"
)
type EventHub struct {
Hub *eventhub.Hub
}
func NewEventHub(endpointURL string) (*EventHub, error) {
hub, err := eventhub.NewHubFromConnectionString(endpointURL)
if err != nil {
return nil, err
}
return &EventHub{
Hub: hub,
}, nil
}
// Post EventHub msg
func (e *EventHub) Post(event events.Event) error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
eventBytes, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("Unable to marshall event: %w", err)
}
//err := e.Hub.Send(ctx, eventhub.NewEventFromString("From, String"))
err = e.Hub.Send(ctx, eventhub.NewEvent(eventBytes))
if err != nil {
return fmt.Errorf("Failed to send msg: %w", err)
}
err = e.Hub.Close(context.Background())
if err != nil {
return fmt.Errorf("Unable to close connection: %w", err)
}
return nil
}

View File

@ -75,6 +75,8 @@ func (f Factory) Notifier(provider string) (Interface, error) {
n, err = NewWebex(f.URL, f.ProxyURL, f.CertPool)
case v1beta1.SentryProvider:
n, err = NewSentry(f.CertPool, f.URL)
case v1beta1.EventHubProvider:
n, err = NewEventHub(f.URL)
default:
err = fmt.Errorf("provider %s not supported", provider)
}