Azure Appconfig configuration store - Subscribe and Unsubscribe impl (#2088)

This commit is contained in:
Pravin Pushkar 2022-09-28 23:22:12 +05:30 committed by GitHub
parent f15a50581d
commit 23acfa50ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 220 additions and 38 deletions

View File

@ -17,6 +17,7 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@ -24,6 +25,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig"
"github.com/google/uuid"
"github.com/dapr/components-contrib/configuration"
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
@ -32,14 +34,18 @@ import (
)
const (
host = "appConfigHost"
connectionString = "appConfigConnectionString"
maxRetries = "maxRetries"
retryDelay = "retryDelay"
maxRetryDelay = "maxRetryDelay"
defaultMaxRetries = 3
defaultRetryDelay = time.Second * 4
defaultMaxRetryDelay = time.Second * 120
host = "host"
connectionString = "connectionString"
maxRetries = "maxRetries"
retryDelay = "retryDelay"
maxRetryDelay = "maxRetryDelay"
subscribePollInterval = "subscribePollInterval"
requestTimeout = "requestTimeout"
defaultMaxRetries = 3
defaultRetryDelay = time.Second * 4
defaultMaxRetryDelay = time.Second * 120
defaultSubscribePollInterval = time.Hour * 24
defaultRequestTimeout = time.Second * 15
)
type azAppConfigClient interface {
@ -49,8 +55,9 @@ type azAppConfigClient interface {
// ConfigurationStore is a Azure App Configuration store.
type ConfigurationStore struct {
client azAppConfigClient
metadata metadata
client azAppConfigClient
metadata metadata
subscribeCancelCtxMap sync.Map
logger logger.Logger
}
@ -137,7 +144,7 @@ func parseMetadata(meta configuration.Metadata) (metadata, error) {
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("azure appconfig error: can't parse maxRetries field: %s", err)
return m, fmt.Errorf("azure appconfig error: can't parse maxRetries field: %w", err)
}
m.maxRetries = parsedVal
}
@ -146,7 +153,7 @@ func parseMetadata(meta configuration.Metadata) (metadata, error) {
if val, ok := meta.Properties[maxRetryDelay]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("azure appconfig error: can't parse maxRetryDelay field: %s", err)
return m, fmt.Errorf("azure appconfig error: can't parse maxRetryDelay field: %w", err)
}
m.maxRetryDelay = time.Duration(parsedVal)
}
@ -155,28 +162,51 @@ func parseMetadata(meta configuration.Metadata) (metadata, error) {
if val, ok := meta.Properties[retryDelay]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("azure appconfig error: can't parse retryDelay field: %s", err)
return m, fmt.Errorf("azure appconfig error: can't parse retryDelay field: %w", err)
}
m.retryDelay = time.Duration(parsedVal)
}
m.subscribePollInterval = defaultSubscribePollInterval
if val, ok := meta.Properties[subscribePollInterval]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("azure appconfig error: can't parse subscribePollInterval field: %w", err)
}
m.subscribePollInterval = time.Duration(parsedVal)
}
m.requestTimeout = defaultRequestTimeout
if val, ok := meta.Properties[requestTimeout]; ok && val != "" {
parsedVal, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("azure appconfig error: can't parse requestTimeout field: %w", err)
}
m.requestTimeout = time.Duration(parsedVal)
}
return m, nil
}
func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) {
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
defer cancel()
keys := req.Keys
var items map[string]*configuration.Item
if len(keys) == 0 {
var err error
if items, err = r.getAll(ctx, req); err != nil {
if items, err = r.getAll(timeoutContext, req); err != nil {
return &configuration.GetResponse{}, err
}
} else {
items = make(map[string]*configuration.Item, len(keys))
for _, key := range keys {
// TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in GetSetting
// Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . Needs to be modified to use timeoutContext once the SDK is fixed
resp, err := r.client.GetSetting(
ctx,
context.TODO(),
key,
&azappconfig.GetSettingOptions{
Label: r.getLabelFromMetadata(req.Metadata),
@ -197,7 +227,6 @@ func (r *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
items[key] = item
}
}
return &configuration.GetResponse{
Items: items,
}, nil
@ -219,8 +248,10 @@ func (r *ConfigurationStore) getAll(ctx context.Context, req *configuration.GetR
},
nil)
// TODO: here contxt.TODO() is used because the SDK panics when a cancelled context is passed in NextPage
// Issue - https://github.com/Azure/azure-sdk-for-go/issues/19223 . It needs to be modified to use ctx once the SDK is fixed
for allSettingsPgr.More() {
if revResp, err := allSettingsPgr.NextPage(ctx); err == nil {
if revResp, err := allSettingsPgr.NextPage(context.TODO()); err == nil {
for _, setting := range revResp.Settings {
item := &configuration.Item{
Metadata: map[string]string{},
@ -233,7 +264,7 @@ func (r *ConfigurationStore) getAll(ctx context.Context, req *configuration.GetR
items[*setting.Key] = item
}
} else {
return nil, fmt.Errorf("failed to load all keys, error is %s", err)
return nil, fmt.Errorf("failed to load all keys, error is %w", err)
}
}
return items, nil
@ -248,9 +279,73 @@ func (r *ConfigurationStore) getLabelFromMetadata(metadata map[string]string) *s
}
func (r *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) (string, error) {
return "", fmt.Errorf("Subscribe is not implemented by this configuration store")
sentinelKey := r.getSentinelKeyFromMetadata(req.Metadata)
if sentinelKey == "" {
return "", fmt.Errorf("azure appconfig error: sentinel key is not provided in metadata")
}
uuid, err := uuid.NewRandom()
if err != nil {
return "", fmt.Errorf("azure appconfig error: failed to generate uuid, error is %w", err)
}
subscribeID := uuid.String()
childContext, cancel := context.WithCancel(ctx)
r.subscribeCancelCtxMap.Store(subscribeID, cancel)
go r.doSubscribe(childContext, req, handler, sentinelKey, subscribeID)
return subscribeID, nil
}
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, sentinelKey string, id string) {
for {
// get sentinel key changes
_, err := r.Get(ctx, &configuration.GetRequest{
Keys: []string{sentinelKey},
Metadata: req.Metadata,
})
if err != nil {
r.logger.Debugf("azure appconfig error: fail to get sentinel key changes or sentinel key's value is unchanged: %s", err)
} else {
items, err := r.Get(ctx, &configuration.GetRequest{
Keys: req.Keys,
Metadata: req.Metadata,
})
if err != nil {
r.logger.Errorf("azure appconfig error: fail to get configuration key changes: %s", err)
} else {
r.handleSubscribedChange(ctx, handler, items, id)
}
}
select {
case <-ctx.Done():
return
case <-time.After(r.metadata.subscribePollInterval):
}
}
}
func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, items *configuration.GetResponse, id string) {
e := &configuration.UpdateEvent{
Items: items.Items,
ID: id,
}
err := handler(ctx, e)
if err != nil {
r.logger.Errorf("azure appconfig error: fail to call handler to notify event for configuration update subscribe: %s", err)
}
}
func (r *ConfigurationStore) getSentinelKeyFromMetadata(metadata map[string]string) string {
if s, ok := metadata["sentinelKey"]; ok && s != "" {
return s
}
return ""
}
func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnsubscribeRequest) error {
return fmt.Errorf("Unsubscribe is not implemented by this configuration store")
if cancelContext, ok := r.subscribeCancelCtxMap.Load(req.ID); ok {
// already exist subscription
r.subscribeCancelCtxMap.Delete(req.ID)
cancelContext.(context.CancelFunc)()
return nil
}
return fmt.Errorf("azure appconfig error: subscription with id %s does not exist", req.ID)
}

View File

@ -35,7 +35,7 @@ type MockConfigurationStore struct {
}
func (m *MockConfigurationStore) GetSetting(ctx context.Context, key string, options *azappconfig.GetSettingOptions) (azappconfig.GetSettingResponse, error) {
if key == "testKey" {
if key == "testKey" || key == "test_sentinel_key" {
settings := azappconfig.Setting{}
settings.Key = to.StringPtr("testKey")
@ -113,6 +113,66 @@ func Test_getConfigurationWithProvidedKeys(t *testing.T) {
})
}
func Test_subscribeConfigurationWithProvidedKeys(t *testing.T) {
s := NewAzureAppConfigurationStore(logger.NewLogger("test")).(*ConfigurationStore)
s.client = &MockConfigurationStore{}
metadata := make(map[string]string)
metadata["sentinelKey"] = "test_sentinel_key"
t.Run("call subscribe with sentinel key", func(t *testing.T) {
req := configuration.SubscribeRequest{
Keys: []string{"testKey"},
Metadata: metadata,
}
subID, err := s.Subscribe(context.Background(), &req, updateEventHandler)
assert.True(t, len(subID) > 0)
assert.Nil(t, err)
unReq := &configuration.UnsubscribeRequest{
ID: subID,
}
s.Unsubscribe(context.Background(), unReq)
})
t.Run("call subscribe w/o sentinel key", func(t *testing.T) {
req := configuration.SubscribeRequest{
Keys: []string{"testKey"},
Metadata: make(map[string]string),
}
_, err := s.Subscribe(context.Background(), &req, updateEventHandler)
assert.NotNil(t, err)
})
}
func Test_unsubscribeConfigurationWithProvidedKeys(t *testing.T) {
s := NewAzureAppConfigurationStore(logger.NewLogger("test")).(*ConfigurationStore)
s.client = &MockConfigurationStore{}
cancelContext, cancel := context.WithCancel(context.Background())
s.subscribeCancelCtxMap.Store("id1", cancel)
t.Run("call unsubscribe with incorrect subId", func(t *testing.T) {
req := configuration.UnsubscribeRequest{
ID: "id_not_exist",
}
err := s.Unsubscribe(cancelContext, &req)
assert.NotNil(t, err)
_, ok := s.subscribeCancelCtxMap.Load("id1")
assert.True(t, ok)
})
t.Run("call unsubscribe with correct subId", func(t *testing.T) {
req := configuration.UnsubscribeRequest{
ID: "id1",
}
err := s.Unsubscribe(cancelContext, &req)
assert.Nil(t, err)
_, ok := s.subscribeCancelCtxMap.Load("id1")
assert.False(t, ok)
})
}
func Test_getConfigurationWithNoProvidedKeys(t *testing.T) {
s := NewAzureAppConfigurationStore(logger.NewLogger("test")).(*ConfigurationStore)
@ -137,6 +197,8 @@ func TestInit(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
m := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
@ -150,6 +212,8 @@ func TestInit(t *testing.T) {
assert.Equal(t, 3, cs.metadata.maxRetries)
assert.Equal(t, time.Second*4, cs.metadata.retryDelay)
assert.Equal(t, time.Second*120, cs.metadata.maxRetryDelay)
assert.Equal(t, time.Second*30, cs.metadata.subscribePollInterval)
assert.Equal(t, time.Second*30, cs.metadata.requestTimeout)
})
t.Run("Init with valid appConfigConnectionString metadata", func(t *testing.T) {
@ -158,6 +222,8 @@ func TestInit(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
m := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
@ -171,6 +237,8 @@ func TestInit(t *testing.T) {
assert.Equal(t, 3, cs.metadata.maxRetries)
assert.Equal(t, time.Second*4, cs.metadata.retryDelay)
assert.Equal(t, time.Second*120, cs.metadata.maxRetryDelay)
assert.Equal(t, time.Second*30, cs.metadata.subscribePollInterval)
assert.Equal(t, time.Second*30, cs.metadata.requestTimeout)
})
}
@ -181,16 +249,20 @@ func Test_parseMetadata(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
meta := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
}}
want := metadata{
host: "testHost",
maxRetries: 3,
retryDelay: time.Second * 4,
maxRetryDelay: time.Second * 120,
host: "testHost",
maxRetries: 3,
retryDelay: time.Second * 4,
maxRetryDelay: time.Second * 120,
subscribePollInterval: time.Second * 30,
requestTimeout: time.Second * 30,
}
m, _ := parseMetadata(meta)
@ -206,16 +278,20 @@ func Test_parseMetadata(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
meta := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
}}
want := metadata{
connectionString: "testConnectionString",
maxRetries: 3,
retryDelay: time.Second * 4,
maxRetryDelay: time.Second * 120,
connectionString: "testConnectionString",
maxRetries: 3,
retryDelay: time.Second * 4,
maxRetryDelay: time.Second * 120,
subscribePollInterval: time.Second * 30,
requestTimeout: time.Second * 30,
}
m, _ := parseMetadata(meta)
@ -232,6 +308,8 @@ func Test_parseMetadata(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
meta := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
@ -248,6 +326,8 @@ func Test_parseMetadata(t *testing.T) {
testProperties[maxRetries] = "3"
testProperties[retryDelay] = "4000000000"
testProperties[maxRetryDelay] = "120000000000"
testProperties[subscribePollInterval] = "30000000000"
testProperties[requestTimeout] = "30000000000"
meta := configuration.Metadata{Base: mdata.Base{
Properties: testProperties,
@ -257,3 +337,7 @@ func Test_parseMetadata(t *testing.T) {
assert.Error(t, err)
})
}
func updateEventHandler(ctx context.Context, e *configuration.UpdateEvent) error {
return nil
}

View File

@ -16,9 +16,11 @@ package appconfig
import "time"
type metadata struct {
host string
connectionString string
maxRetries int
maxRetryDelay time.Duration
retryDelay time.Duration
host string
connectionString string
maxRetries int
maxRetryDelay time.Duration
retryDelay time.Duration
subscribePollInterval time.Duration
requestTimeout time.Duration
}

View File

@ -291,8 +291,9 @@ func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration
// already exist subscription
r.subscribeStopChanMap.Delete(req.ID)
close(oldStopChan.(chan struct{}))
return nil
}
return nil
return fmt.Errorf("subscription with id %s does not exist", req.ID)
}
func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, redisChannel4revision string, id string, stop chan struct{}) {

2
go.mod
View File

@ -155,7 +155,7 @@ require (
require (
cloud.google.com/go/secretmanager v1.4.0
dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1-0.20220914204640-4d445978fe75
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1
github.com/aliyun/aliyun-log-go-sdk v0.1.37
github.com/apache/dubbo-go-hessian2 v1.11.0
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.21.12+incompatible

4
go.sum
View File

@ -108,8 +108,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.3 h1:8LoU8N2lIUzkmstvwXvVfniMZ
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.3/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1-0.20220914204640-4d445978fe75 h1:fXg1PuTP8+xhOnxrPtteHGbreNsTRFvu+NsbLCqQrWg=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1-0.20220914204640-4d445978fe75/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1 h1:tuDqyaz/iP/1vkmu4aWwhKDe2nSuMMpVm0FERxwMC60=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=