Merge branch 'master' into azblobbindingtrack2

This commit is contained in:
Alessandro (Ale) Segala 2022-11-18 14:46:22 -08:00 committed by GitHub
commit cf01ee81bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 85 additions and 20 deletions

View File

@ -15,6 +15,7 @@ package appconfig
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
@ -189,24 +190,19 @@ func parseMetadata(meta configuration.Metadata) (metadata, error) {
}
func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) {
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
defer cancel()
keys := req.Keys
var items map[string]*configuration.Item
if len(keys) == 0 {
var err error
if items, err = r.getAll(timeoutContext, req); err != nil {
if items, err = r.getAll(ctx, req); err != nil {
return &configuration.GetResponse{}, err
}
} else {
items = make(map[string]*configuration.Item, len(keys))
for _, key := range keys {
// TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in GetSetting
// Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . Needs to be modified to use timeoutContext once the SDK is fixed
resp, err := r.client.GetSetting(
context.TODO(),
resp, err := r.getSettings(
ctx,
key,
&azappconfig.GetSettingOptions{
Label: r.getLabelFromMetadata(req.Metadata),
@ -248,10 +244,10 @@ func (r *ConfigurationStore) getAll(ctx context.Context, req *configuration.GetR
},
nil)
// TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in NextPage
// Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . It needs to be modified to use ctx once the SDK is fixed
for allSettingsPgr.More() {
if revResp, err := allSettingsPgr.NextPage(context.TODO()); err == nil {
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
defer cancel()
if revResp, err := allSettingsPgr.NextPage(timeoutContext); err == nil {
for _, setting := range revResp.Settings {
item := &configuration.Item{
Metadata: map[string]string{},
@ -295,20 +291,33 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S
}
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, sentinelKey string, id string) {
var etagVal *azcore.ETag
for {
// get sentinel key changes
_, err := r.Get(ctx, &configuration.GetRequest{
Keys: []string{sentinelKey},
Metadata: req.Metadata,
})
// get sentinel key changes.
resp, err := r.getSettings(
ctx,
sentinelKey,
&azappconfig.GetSettingOptions{
Label: r.getLabelFromMetadata(req.Metadata),
OnlyIfChanged: etagVal,
},
)
if err != nil {
r.logger.Debugf("azure appconfig error: fail to get sentinel key changes or sentinel key's value is unchanged: %s", err)
if errors.Is(err, context.Canceled) {
return
}
r.logger.Debugf("azure appconfig error: fail to get sentinel key or sentinel's key %s value is unchanged: %s", sentinelKey, err)
} else {
// if sentinel key has changed then update the Etag value.
etagVal = resp.ETag
items, err := r.Get(ctx, &configuration.GetRequest{
Keys: req.Keys,
Metadata: req.Metadata,
})
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.logger.Errorf("azure appconfig error: fail to get configuration key changes: %s", err)
} else {
r.handleSubscribedChange(ctx, handler, items, id)
@ -322,6 +331,13 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
}
}
func (r *ConfigurationStore) getSettings(ctx context.Context, key string, getSettingsOptions *azappconfig.GetSettingOptions) (azappconfig.GetSettingResponse, error) {
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
defer cancel()
resp, err := r.client.GetSetting(timeoutContext, key, getSettingsOptions)
return resp, err
}
func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, items *configuration.GetResponse, id string) {
e := &configuration.UpdateEvent{
Items: items.Items,

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.10.1

4
go.sum
View File

@ -103,8 +103,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4/go.mod h1:uGG2W01BaETf0Ozp+Q
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3 h1:QzjiMJn/pBxOq1xA3F6ODUvO1agmt7+mI+DZEx6dPtc=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0 h1:OrKZybbyagpgJiREiIVzH5mV/z9oS4rXqdX7i31DSF0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=

View File

@ -57,6 +57,9 @@ func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error {
} else if js.meta.tlsClientCert != "" && js.meta.tlsClientKey != "" {
js.l.Debug("Configure nats for tls client authentication")
opts = append(opts, nats.ClientCert(js.meta.tlsClientCert, js.meta.tlsClientKey))
} else if js.meta.token != "" {
js.l.Debug("Configure nats for token authentication")
opts = append(opts, nats.Token(js.meta.token))
}
js.nc, err = nats.Connect(js.meta.natsURL, opts...)

View File

@ -27,6 +27,7 @@ type metadata struct {
jwt string
seedKey string
token string
tlsClientCert string
tlsClientKey string
@ -58,6 +59,7 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) {
return metadata{}, fmt.Errorf("missing nats URL")
}
m.token = psm.Properties["token"]
m.jwt = psm.Properties["jwt"]
m.seedKey = psm.Properties["seedKey"]

View File

@ -71,6 +71,50 @@ func TestParseMetadata(t *testing.T) {
},
expectErr: false,
},
{
desc: "Valid Metadata with token",
input: pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
"natsURL": "nats://localhost:4222",
"name": "myName",
"durableName": "myDurable",
"queueGroupName": "myQueue",
"startSequence": "1",
"startTime": "1629328511",
"deliverAll": "true",
"flowControl": "true",
"ackWait": "2s",
"maxDeliver": "10",
"backOff": "500ms, 2s, 10s",
"maxAckPending": "5000",
"replicas": "3",
"memoryStorage": "true",
"rateLimit": "20000",
"hearbeat": "1s",
"token": "myToken",
},
}},
want: metadata{
natsURL: "nats://localhost:4222",
name: "myName",
durableName: "myDurable",
queueGroupName: "myQueue",
startSequence: 1,
startTime: time.Unix(1629328511, 0),
deliverAll: true,
flowControl: true,
ackWait: 2 * time.Second,
maxDeliver: 10,
backOff: []time.Duration{time.Millisecond * 500, time.Second * 2, time.Second * 10},
maxAckPending: 5000,
replicas: 3,
memoryStorage: true,
rateLimit: 20000,
hearbeat: time.Second * 1,
token: "myToken",
},
expectErr: false,
},
{
desc: "Invalid metadata with missing seed key",
input: pubsub.Metadata{Base: mdata.Base{