diff --git a/configuration/azure/appconfig/appconfig.go b/configuration/azure/appconfig/appconfig.go index b91601ad6..849763c5c 100644 --- a/configuration/azure/appconfig/appconfig.go +++ b/configuration/azure/appconfig/appconfig.go @@ -15,6 +15,7 @@ package appconfig import ( "context" + "errors" "fmt" "strconv" "sync" @@ -189,24 +190,19 @@ func parseMetadata(meta configuration.Metadata) (metadata, error) { } func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) { - timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout) - defer cancel() - keys := req.Keys var items map[string]*configuration.Item if len(keys) == 0 { var err error - if items, err = r.getAll(timeoutContext, req); err != nil { + if items, err = r.getAll(ctx, req); err != nil { return &configuration.GetResponse{}, err } } else { items = make(map[string]*configuration.Item, len(keys)) for _, key := range keys { - // TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in GetSetting - // Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . Needs to be modified to use timeoutContext once the SDK is fixed - resp, err := r.client.GetSetting( - context.TODO(), + resp, err := r.getSettings( + ctx, key, &azappconfig.GetSettingOptions{ Label: r.getLabelFromMetadata(req.Metadata), @@ -248,10 +244,10 @@ func (r *ConfigurationStore) getAll(ctx context.Context, req *configuration.GetR }, nil) - // TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in NextPage - // Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . It needs to be modified to use ctx once the SDK is fixed for allSettingsPgr.More() { - if revResp, err := allSettingsPgr.NextPage(context.TODO()); err == nil { + timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout) + defer cancel() + if revResp, err := allSettingsPgr.NextPage(timeoutContext); err == nil { for _, setting := range revResp.Settings { item := &configuration.Item{ Metadata: map[string]string{}, @@ -295,20 +291,33 @@ func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S } func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, sentinelKey string, id string) { + var etagVal *azcore.ETag for { - // get sentinel key changes - _, err := r.Get(ctx, &configuration.GetRequest{ - Keys: []string{sentinelKey}, - Metadata: req.Metadata, - }) + // get sentinel key changes. + resp, err := r.getSettings( + ctx, + sentinelKey, + &azappconfig.GetSettingOptions{ + Label: r.getLabelFromMetadata(req.Metadata), + OnlyIfChanged: etagVal, + }, + ) if err != nil { - r.logger.Debugf("azure appconfig error: fail to get sentinel key changes or sentinel key's value is unchanged: %s", err) + if errors.Is(err, context.Canceled) { + return + } + r.logger.Debugf("azure appconfig error: fail to get sentinel key or sentinel's key %s value is unchanged: %s", sentinelKey, err) } else { + // if sentinel key has changed then update the Etag value. + etagVal = resp.ETag items, err := r.Get(ctx, &configuration.GetRequest{ Keys: req.Keys, Metadata: req.Metadata, }) if err != nil { + if errors.Is(err, context.Canceled) { + return + } r.logger.Errorf("azure appconfig error: fail to get configuration key changes: %s", err) } else { r.handleSubscribedChange(ctx, handler, items, id) @@ -322,6 +331,13 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration } } +func (r *ConfigurationStore) getSettings(ctx context.Context, key string, getSettingsOptions *azappconfig.GetSettingOptions) (azappconfig.GetSettingResponse, error) { + timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout) + defer cancel() + resp, err := r.client.GetSetting(timeoutContext, key, getSettingsOptions) + return resp, err +} + func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, items *configuration.GetResponse, id string) { e := &configuration.UpdateEvent{ Items: items.Items, diff --git a/go.mod b/go.mod index 84ce825b3..ca7e935c9 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/Azure/azure-sdk-for-go v65.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 - github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3 + github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0 github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.10.1 diff --git a/go.sum b/go.sum index 72cb6d3dc..615004ee0 100644 --- a/go.sum +++ b/go.sum @@ -103,8 +103,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.4/go.mod h1:uGG2W01BaETf0Ozp+Q github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= -github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3 h1:QzjiMJn/pBxOq1xA3F6ODUvO1agmt7+mI+DZEx6dPtc= -github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.3/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U= +github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0 h1:OrKZybbyagpgJiREiIVzH5mV/z9oS4rXqdX7i31DSF0= +github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.5.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U= github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw= github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss= diff --git a/pubsub/jetstream/jetstream.go b/pubsub/jetstream/jetstream.go index 28555d9e8..37e2a610d 100644 --- a/pubsub/jetstream/jetstream.go +++ b/pubsub/jetstream/jetstream.go @@ -57,6 +57,9 @@ func (js *jetstreamPubSub) Init(metadata pubsub.Metadata) error { } else if js.meta.tlsClientCert != "" && js.meta.tlsClientKey != "" { js.l.Debug("Configure nats for tls client authentication") opts = append(opts, nats.ClientCert(js.meta.tlsClientCert, js.meta.tlsClientKey)) + } else if js.meta.token != "" { + js.l.Debug("Configure nats for token authentication") + opts = append(opts, nats.Token(js.meta.token)) } js.nc, err = nats.Connect(js.meta.natsURL, opts...) diff --git a/pubsub/jetstream/metadata.go b/pubsub/jetstream/metadata.go index 890959013..e35708952 100644 --- a/pubsub/jetstream/metadata.go +++ b/pubsub/jetstream/metadata.go @@ -27,6 +27,7 @@ type metadata struct { jwt string seedKey string + token string tlsClientCert string tlsClientKey string @@ -58,6 +59,7 @@ func parseMetadata(psm pubsub.Metadata) (metadata, error) { return metadata{}, fmt.Errorf("missing nats URL") } + m.token = psm.Properties["token"] m.jwt = psm.Properties["jwt"] m.seedKey = psm.Properties["seedKey"] diff --git a/pubsub/jetstream/metadata_test.go b/pubsub/jetstream/metadata_test.go index 94314a2d2..0ad9007bc 100644 --- a/pubsub/jetstream/metadata_test.go +++ b/pubsub/jetstream/metadata_test.go @@ -71,6 +71,50 @@ func TestParseMetadata(t *testing.T) { }, expectErr: false, }, + { + desc: "Valid Metadata with token", + input: pubsub.Metadata{Base: mdata.Base{ + Properties: map[string]string{ + "natsURL": "nats://localhost:4222", + "name": "myName", + "durableName": "myDurable", + "queueGroupName": "myQueue", + "startSequence": "1", + "startTime": "1629328511", + "deliverAll": "true", + "flowControl": "true", + "ackWait": "2s", + "maxDeliver": "10", + "backOff": "500ms, 2s, 10s", + "maxAckPending": "5000", + "replicas": "3", + "memoryStorage": "true", + "rateLimit": "20000", + "hearbeat": "1s", + "token": "myToken", + }, + }}, + want: metadata{ + natsURL: "nats://localhost:4222", + name: "myName", + durableName: "myDurable", + queueGroupName: "myQueue", + startSequence: 1, + startTime: time.Unix(1629328511, 0), + deliverAll: true, + flowControl: true, + ackWait: 2 * time.Second, + maxDeliver: 10, + backOff: []time.Duration{time.Millisecond * 500, time.Second * 2, time.Second * 10}, + maxAckPending: 5000, + replicas: 3, + memoryStorage: true, + rateLimit: 20000, + hearbeat: time.Second * 1, + token: "myToken", + }, + expectErr: false, + }, { desc: "Invalid metadata with missing seed key", input: pubsub.Metadata{Base: mdata.Base{