Use token & channel in azureevent

For JWT
* channel = eventhub namespace
* address = eventhub name
* token   = jwt token

For SAS
* address = connectionString, including endpoint tokens etc

Signed-off-by: Edvin Norling <edvin.norling@xenit.se>
This commit is contained in:
Edvin Norling 2021-05-05 10:18:19 +02:00
parent 45a77978b5
commit cfa71bf7a8
2 changed files with 8 additions and 8 deletions

View File

@ -17,7 +17,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/Azure/azure-amqp-common-go/v3/auth" "github.com/Azure/azure-amqp-common-go/v3/auth"
@ -31,12 +30,13 @@ type AzureEventHub struct {
} }
// NewAzureEventHub creates a eventhub client // NewAzureEventHub creates a eventhub client
func NewAzureEventHub(endpointURL string) (*AzureEventHub, error) { func NewAzureEventHub(endpointURL, token, eventhubNamespace string) (*AzureEventHub, error) {
var hub *eventhub.Hub var hub *eventhub.Hub
var err error var err error
if strings.ToLower(endpointURL[:8]) != "endpoint" { // token should only be defined if JWT is used
hub, err = newJWTHub(endpointURL) if token != "" {
hub, err = newJWTHub(endpointURL, token, eventhubNamespace)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create a eventhub using JWT %v", err) return nil, fmt.Errorf("failed to create a eventhub using JWT %v", err)
} }
@ -96,10 +96,10 @@ func (j *PureJWT) GetToken(uri string) (*auth.Token, error) {
} }
// newJWTHub used when address is a JWT token // newJWTHub used when address is a JWT token
func newJWTHub(address string) (*eventhub.Hub, error) { func newJWTHub(eventhubName, token, eventhubNamespace string) (*eventhub.Hub, error) {
provider := NewJWTProvider(address) provider := NewJWTProvider(token)
hub, err := eventhub.NewHub("fluxv2", "fluxv2", provider) hub, err := eventhub.NewHub(eventhubNamespace, eventhubName, provider)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -76,7 +76,7 @@ func (f Factory) Notifier(provider string) (Interface, error) {
case v1beta1.SentryProvider: case v1beta1.SentryProvider:
n, err = NewSentry(f.CertPool, f.URL) n, err = NewSentry(f.CertPool, f.URL)
case v1beta1.AzureEventHubProvider: case v1beta1.AzureEventHubProvider:
n, err = NewAzureEventHub(f.URL) n, err = NewAzureEventHub(f.URL, f.Token, f.Channel)
default: default:
err = fmt.Errorf("provider %s not supported", provider) err = fmt.Errorf("provider %s not supported", provider)
} }