Standardize metadata parsing in configstores and lockstore (#2710)
Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
3dc2691635
commit
2266e7ad44
|
@ -17,7 +17,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ import (
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/configuration"
|
"github.com/dapr/components-contrib/configuration"
|
||||||
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
|
azauth "github.com/dapr/components-contrib/internal/authentication/azure"
|
||||||
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||||
|
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
@ -85,9 +86,9 @@ func (r *ConfigurationStore) Init(_ context.Context, metadata configuration.Meta
|
||||||
ApplicationID: "dapr-" + logger.DaprVersion,
|
ApplicationID: "dapr-" + logger.DaprVersion,
|
||||||
},
|
},
|
||||||
Retry: policy.RetryOptions{
|
Retry: policy.RetryOptions{
|
||||||
MaxRetries: int32(m.maxRetries),
|
MaxRetries: int32(m.MaxRetries),
|
||||||
RetryDelay: m.maxRetryDelay,
|
RetryDelay: m.internalMaxRetryDelay,
|
||||||
MaxRetryDelay: m.maxRetryDelay,
|
MaxRetryDelay: m.internalMaxRetryDelay,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,8 +96,8 @@ func (r *ConfigurationStore) Init(_ context.Context, metadata configuration.Meta
|
||||||
ClientOptions: coreClientOpts,
|
ClientOptions: coreClientOpts,
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.metadata.connectionString != "" {
|
if r.metadata.ConnectionString != "" {
|
||||||
r.client, err = azappconfig.NewClientFromConnectionString(r.metadata.connectionString, &options)
|
r.client, err = azappconfig.NewClientFromConnectionString(r.metadata.ConnectionString, &options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -113,7 +114,7 @@ func (r *ConfigurationStore) Init(_ context.Context, metadata configuration.Meta
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r.client, err = azappconfig.NewClient(r.metadata.host, cred, &options)
|
r.client, err = azappconfig.NewClient(r.metadata.Host, cred, &options)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -123,67 +124,37 @@ func (r *ConfigurationStore) Init(_ context.Context, metadata configuration.Meta
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseMetadata(meta configuration.Metadata) (metadata, error) {
|
func parseMetadata(meta configuration.Metadata) (metadata, error) {
|
||||||
m := metadata{}
|
m := metadata{
|
||||||
|
MaxRetries: defaultMaxRetries,
|
||||||
if val, ok := meta.Properties[host]; ok && val != "" {
|
internalMaxRetryDelay: defaultMaxRetryDelay,
|
||||||
m.host = val
|
internalRetryDelay: defaultRetryDelay,
|
||||||
|
internalSubscribePollInterval: defaultSubscribePollInterval,
|
||||||
|
internalRequestTimeout: defaultRequestTimeout,
|
||||||
|
}
|
||||||
|
decodeErr := contribMetadata.DecodeMetadata(meta.Properties, &m)
|
||||||
|
if decodeErr != nil {
|
||||||
|
return m, decodeErr
|
||||||
}
|
}
|
||||||
|
|
||||||
if val, ok := meta.Properties[connectionString]; ok && val != "" {
|
if m.ConnectionString != "" && m.Host != "" {
|
||||||
m.connectionString = val
|
|
||||||
}
|
|
||||||
|
|
||||||
if m.connectionString != "" && m.host != "" {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't set both %s and %s fields in metadata", host, connectionString)
|
return m, fmt.Errorf("azure appconfig error: can't set both %s and %s fields in metadata", host, connectionString)
|
||||||
}
|
}
|
||||||
|
|
||||||
if m.connectionString == "" && m.host == "" {
|
if m.ConnectionString == "" && m.Host == "" {
|
||||||
return m, fmt.Errorf("azure appconfig error: specify %s or %s field in metadata", host, connectionString)
|
return m, fmt.Errorf("azure appconfig error: specify %s or %s field in metadata", host, connectionString)
|
||||||
}
|
}
|
||||||
|
|
||||||
m.maxRetries = defaultMaxRetries
|
if m.MaxRetryDelay != nil {
|
||||||
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
|
m.internalMaxRetryDelay = time.Duration(*m.MaxRetryDelay)
|
||||||
parsedVal, err := strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't parse maxRetries field: %w", err)
|
|
||||||
}
|
|
||||||
m.maxRetries = parsedVal
|
|
||||||
}
|
}
|
||||||
|
if m.RetryDelay != nil {
|
||||||
m.maxRetryDelay = defaultMaxRetryDelay
|
m.internalRetryDelay = time.Duration(*m.RetryDelay)
|
||||||
if val, ok := meta.Properties[maxRetryDelay]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't parse maxRetryDelay field: %w", err)
|
|
||||||
}
|
|
||||||
m.maxRetryDelay = time.Duration(parsedVal)
|
|
||||||
}
|
}
|
||||||
|
if m.SubscribePollInterval != nil {
|
||||||
m.retryDelay = defaultRetryDelay
|
m.internalSubscribePollInterval = time.Duration(*m.SubscribePollInterval)
|
||||||
if val, ok := meta.Properties[retryDelay]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't parse retryDelay field: %w", err)
|
|
||||||
}
|
|
||||||
m.retryDelay = time.Duration(parsedVal)
|
|
||||||
}
|
}
|
||||||
|
if m.RequestTimeout != nil {
|
||||||
m.subscribePollInterval = defaultSubscribePollInterval
|
m.internalRequestTimeout = time.Duration(*m.RequestTimeout)
|
||||||
if val, ok := meta.Properties[subscribePollInterval]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't parse subscribePollInterval field: %w", err)
|
|
||||||
}
|
|
||||||
m.subscribePollInterval = time.Duration(parsedVal)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.requestTimeout = defaultRequestTimeout
|
|
||||||
if val, ok := meta.Properties[requestTimeout]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.Atoi(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("azure appconfig error: can't parse requestTimeout field: %w", err)
|
|
||||||
}
|
|
||||||
m.requestTimeout = time.Duration(parsedVal)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
|
@ -245,7 +216,7 @@ func (r *ConfigurationStore) getAll(ctx context.Context, req *configuration.GetR
|
||||||
nil)
|
nil)
|
||||||
|
|
||||||
for allSettingsPgr.More() {
|
for allSettingsPgr.More() {
|
||||||
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
|
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.internalRequestTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if revResp, err := allSettingsPgr.NextPage(timeoutContext); err == nil {
|
if revResp, err := allSettingsPgr.NextPage(timeoutContext); err == nil {
|
||||||
for _, setting := range revResp.Settings {
|
for _, setting := range revResp.Settings {
|
||||||
|
@ -326,13 +297,13 @@ func (r *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-time.After(r.metadata.subscribePollInterval):
|
case <-time.After(r.metadata.internalSubscribePollInterval):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ConfigurationStore) getSettings(ctx context.Context, key string, getSettingsOptions *azappconfig.GetSettingOptions) (azappconfig.GetSettingResponse, error) {
|
func (r *ConfigurationStore) getSettings(ctx context.Context, key string, getSettingsOptions *azappconfig.GetSettingOptions) (azappconfig.GetSettingResponse, error) {
|
||||||
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.requestTimeout)
|
timeoutContext, cancel := context.WithTimeout(ctx, r.metadata.internalRequestTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := r.client.GetSetting(timeoutContext, key, getSettingsOptions)
|
resp, err := r.client.GetSetting(timeoutContext, key, getSettingsOptions)
|
||||||
return resp, err
|
return resp, err
|
||||||
|
@ -365,3 +336,11 @@ func (r *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration
|
||||||
}
|
}
|
||||||
return fmt.Errorf("azure appconfig error: subscription with id %s does not exist", req.ID)
|
return fmt.Errorf("azure appconfig error: subscription with id %s does not exist", req.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetComponentMetadata returns the metadata of the component.
|
||||||
|
func (r *ConfigurationStore) GetComponentMetadata() map[string]string {
|
||||||
|
metadataStruct := metadata{}
|
||||||
|
metadataInfo := map[string]string{}
|
||||||
|
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
|
||||||
|
return metadataInfo
|
||||||
|
}
|
||||||
|
|
|
@ -16,7 +16,6 @@ package appconfig
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -206,12 +205,12 @@ func TestInit(t *testing.T) {
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
cs, ok := s.(*ConfigurationStore)
|
cs, ok := s.(*ConfigurationStore)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
assert.Equal(t, testProperties[host], cs.metadata.host)
|
assert.Equal(t, testProperties[host], cs.metadata.Host)
|
||||||
assert.Equal(t, 3, cs.metadata.maxRetries)
|
assert.Equal(t, 3, cs.metadata.MaxRetries)
|
||||||
assert.Equal(t, time.Second*4, cs.metadata.retryDelay)
|
assert.Equal(t, time.Second*4, cs.metadata.internalRetryDelay)
|
||||||
assert.Equal(t, time.Second*120, cs.metadata.maxRetryDelay)
|
assert.Equal(t, time.Second*120, cs.metadata.internalMaxRetryDelay)
|
||||||
assert.Equal(t, time.Second*30, cs.metadata.subscribePollInterval)
|
assert.Equal(t, time.Second*30, cs.metadata.internalSubscribePollInterval)
|
||||||
assert.Equal(t, time.Second*30, cs.metadata.requestTimeout)
|
assert.Equal(t, time.Second*30, cs.metadata.internalRequestTimeout)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Init with valid appConfigConnectionString metadata", func(t *testing.T) {
|
t.Run("Init with valid appConfigConnectionString metadata", func(t *testing.T) {
|
||||||
|
@ -231,12 +230,12 @@ func TestInit(t *testing.T) {
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
cs, ok := s.(*ConfigurationStore)
|
cs, ok := s.(*ConfigurationStore)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
assert.Equal(t, testProperties[connectionString], cs.metadata.connectionString)
|
assert.Equal(t, testProperties[connectionString], cs.metadata.ConnectionString)
|
||||||
assert.Equal(t, 3, cs.metadata.maxRetries)
|
assert.Equal(t, 3, cs.metadata.MaxRetries)
|
||||||
assert.Equal(t, time.Second*4, cs.metadata.retryDelay)
|
assert.Equal(t, time.Second*4, cs.metadata.internalRetryDelay)
|
||||||
assert.Equal(t, time.Second*120, cs.metadata.maxRetryDelay)
|
assert.Equal(t, time.Second*120, cs.metadata.internalMaxRetryDelay)
|
||||||
assert.Equal(t, time.Second*30, cs.metadata.subscribePollInterval)
|
assert.Equal(t, time.Second*30, cs.metadata.internalSubscribePollInterval)
|
||||||
assert.Equal(t, time.Second*30, cs.metadata.requestTimeout)
|
assert.Equal(t, time.Second*30, cs.metadata.internalRequestTimeout)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,19 +254,22 @@ func Test_parseMetadata(t *testing.T) {
|
||||||
}}
|
}}
|
||||||
|
|
||||||
want := metadata{
|
want := metadata{
|
||||||
host: "testHost",
|
Host: "testHost",
|
||||||
maxRetries: 3,
|
MaxRetries: 3,
|
||||||
retryDelay: time.Second * 4,
|
internalRetryDelay: time.Second * 4,
|
||||||
maxRetryDelay: time.Second * 120,
|
internalMaxRetryDelay: time.Second * 120,
|
||||||
subscribePollInterval: time.Second * 30,
|
internalSubscribePollInterval: time.Second * 30,
|
||||||
requestTimeout: time.Second * 30,
|
internalRequestTimeout: time.Second * 30,
|
||||||
}
|
}
|
||||||
|
|
||||||
m, _ := parseMetadata(meta)
|
m, _ := parseMetadata(meta)
|
||||||
assert.NotNil(t, m)
|
assert.NotNil(t, m)
|
||||||
if !reflect.DeepEqual(m, want) {
|
assert.Equal(t, want.Host, m.Host)
|
||||||
t.Errorf("parseMetadata() got = %v, want %v", m, want)
|
assert.Equal(t, want.MaxRetries, m.MaxRetries)
|
||||||
}
|
assert.Equal(t, want.internalRetryDelay, m.internalRetryDelay)
|
||||||
|
assert.Equal(t, want.internalMaxRetryDelay, m.internalMaxRetryDelay)
|
||||||
|
assert.Equal(t, want.internalSubscribePollInterval, m.internalSubscribePollInterval)
|
||||||
|
assert.Equal(t, want.internalRequestTimeout, m.internalRequestTimeout)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run(fmt.Sprintf("parse metadata with %s", connectionString), func(t *testing.T) {
|
t.Run(fmt.Sprintf("parse metadata with %s", connectionString), func(t *testing.T) {
|
||||||
|
@ -284,19 +286,22 @@ func Test_parseMetadata(t *testing.T) {
|
||||||
}}
|
}}
|
||||||
|
|
||||||
want := metadata{
|
want := metadata{
|
||||||
connectionString: "testConnectionString",
|
ConnectionString: "testConnectionString",
|
||||||
maxRetries: 3,
|
MaxRetries: 3,
|
||||||
retryDelay: time.Second * 4,
|
internalRetryDelay: time.Second * 4,
|
||||||
maxRetryDelay: time.Second * 120,
|
internalMaxRetryDelay: time.Second * 120,
|
||||||
subscribePollInterval: time.Second * 30,
|
internalSubscribePollInterval: time.Second * 30,
|
||||||
requestTimeout: time.Second * 30,
|
internalRequestTimeout: time.Second * 30,
|
||||||
}
|
}
|
||||||
|
|
||||||
m, _ := parseMetadata(meta)
|
m, _ := parseMetadata(meta)
|
||||||
assert.NotNil(t, m)
|
assert.NotNil(t, m)
|
||||||
if !reflect.DeepEqual(m, want) {
|
assert.Equal(t, want.ConnectionString, m.ConnectionString)
|
||||||
t.Errorf("parseMetadata() got = %v, want %v", m, want)
|
assert.Equal(t, want.MaxRetries, m.MaxRetries)
|
||||||
}
|
assert.Equal(t, want.internalRetryDelay, m.internalRetryDelay)
|
||||||
|
assert.Equal(t, want.internalMaxRetryDelay, m.internalMaxRetryDelay)
|
||||||
|
assert.Equal(t, want.internalSubscribePollInterval, m.internalSubscribePollInterval)
|
||||||
|
assert.Equal(t, want.internalRequestTimeout, m.internalRequestTimeout)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run(fmt.Sprintf("both %s and %s fields set in metadata", host, connectionString), func(t *testing.T) {
|
t.Run(fmt.Sprintf("both %s and %s fields set in metadata", host, connectionString), func(t *testing.T) {
|
||||||
|
|
|
@ -16,11 +16,16 @@ package appconfig
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type metadata struct {
|
type metadata struct {
|
||||||
host string
|
Host string `mapstructure:"host"`
|
||||||
connectionString string
|
ConnectionString string `mapstructure:"connectionString"`
|
||||||
maxRetries int
|
MaxRetries int `mapstructure:"maxRetries"`
|
||||||
maxRetryDelay time.Duration
|
MaxRetryDelay *int `mapstructure:"maxRetryDelay"`
|
||||||
retryDelay time.Duration
|
RetryDelay *int `mapstructure:"retryDelay"`
|
||||||
subscribePollInterval time.Duration
|
SubscribePollInterval *int `mapstructure:"subscribePollInterval"`
|
||||||
requestTimeout time.Duration
|
RequestTimeout *int `mapstructure:"requestTimeout"`
|
||||||
|
|
||||||
|
internalRequestTimeout time.Duration `mapstructure:"-"`
|
||||||
|
internalMaxRetryDelay time.Duration `mapstructure:"-"`
|
||||||
|
internalSubscribePollInterval time.Duration `mapstructure:"-"`
|
||||||
|
internalRetryDelay time.Duration `mapstructure:"-"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ package postgres
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type metadata struct {
|
type metadata struct {
|
||||||
maxIdleTimeout time.Duration
|
MaxIdleTimeout time.Duration `mapstructure:"connMaxIdleTime"`
|
||||||
connectionString string
|
ConnectionString string `mapstructure:"connectionString"`
|
||||||
configTable string
|
ConfigTable string `mapstructure:"table"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import (
|
||||||
"k8s.io/utils/strings/slices"
|
"k8s.io/utils/strings/slices"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/configuration"
|
"github.com/dapr/components-contrib/configuration"
|
||||||
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -98,9 +99,9 @@ func (p *ConfigurationStore) Init(parentCtx context.Context, metadata configurat
|
||||||
p.metadata = m
|
p.metadata = m
|
||||||
}
|
}
|
||||||
p.ActiveSubscriptions = make(map[string]*subscription)
|
p.ActiveSubscriptions = make(map[string]*subscription)
|
||||||
ctx, cancel := context.WithTimeout(parentCtx, p.metadata.maxIdleTimeout)
|
ctx, cancel := context.WithTimeout(parentCtx, p.metadata.MaxIdleTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
client, err := Connect(ctx, p.metadata.connectionString, p.metadata.maxIdleTimeout)
|
client, err := Connect(ctx, p.metadata.ConnectionString, p.metadata.MaxIdleTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error connecting to configuration store: '%w'", err)
|
return fmt.Errorf("error connecting to configuration store: '%w'", err)
|
||||||
}
|
}
|
||||||
|
@ -111,12 +112,12 @@ func (p *ConfigurationStore) Init(parentCtx context.Context, metadata configurat
|
||||||
}
|
}
|
||||||
// check if table exists
|
// check if table exists
|
||||||
exists := false
|
exists := false
|
||||||
err = p.client.QueryRow(ctx, QueryTableExists, p.metadata.configTable).Scan(&exists)
|
err = p.client.QueryRow(ctx, QueryTableExists, p.metadata.ConfigTable).Scan(&exists)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
return fmt.Errorf(ErrorMissingTable, p.metadata.configTable)
|
return fmt.Errorf(ErrorMissingTable, p.metadata.ConfigTable)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("error in checking if configtable '%s' exists - '%w'", p.metadata.configTable, err)
|
return fmt.Errorf("error in checking if configtable '%s' exists - '%w'", p.metadata.ConfigTable, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -126,7 +127,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
||||||
p.logger.Error(err)
|
p.logger.Error(err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
query, params, err := buildQuery(req, p.metadata.configTable)
|
query, params, err := buildQuery(req, p.metadata.ConfigTable)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.logger.Error(err)
|
p.logger.Error(err)
|
||||||
return nil, fmt.Errorf("error in configuration store query: '%w' ", err)
|
return nil, fmt.Errorf("error in configuration store query: '%w' ", err)
|
||||||
|
@ -168,7 +169,7 @@ func (p *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(pgNotifyChannels) == 0 {
|
if len(pgNotifyChannels) == 0 {
|
||||||
return "", fmt.Errorf("unable to subscribe to '%s'.pgNotifyChannel attribute cannot be empty", p.metadata.configTable)
|
return "", fmt.Errorf("unable to subscribe to '%s'.pgNotifyChannel attribute cannot be empty", p.metadata.ConfigTable)
|
||||||
}
|
}
|
||||||
return p.subscribeToChannel(ctx, pgNotifyChannels, req, handler)
|
return p.subscribeToChannel(ctx, pgNotifyChannels, req, handler)
|
||||||
}
|
}
|
||||||
|
@ -278,31 +279,30 @@ func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseMetadata(cmetadata configuration.Metadata) (metadata, error) {
|
func parseMetadata(cmetadata configuration.Metadata) (metadata, error) {
|
||||||
m := metadata{}
|
m := metadata{
|
||||||
if val, ok := cmetadata.Properties[connectionStringKey]; ok && val != "" {
|
MaxIdleTimeout: defaultMaxConnIdleTime,
|
||||||
m.connectionString = val
|
}
|
||||||
} else {
|
decodeErr := contribMetadata.DecodeMetadata(cmetadata.Properties, &m)
|
||||||
|
if decodeErr != nil {
|
||||||
|
return m, decodeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.ConnectionString == "" {
|
||||||
return m, fmt.Errorf(ErrorMissingConnectionString)
|
return m, fmt.Errorf(ErrorMissingConnectionString)
|
||||||
}
|
}
|
||||||
if tbl, ok := cmetadata.Properties[configtablekey]; ok && tbl != "" {
|
|
||||||
if !allowedChars.MatchString(tbl) {
|
if m.ConfigTable != "" {
|
||||||
return m, fmt.Errorf("invalid table name : '%v'. non-alphanumerics are not supported", tbl)
|
if !allowedChars.MatchString(m.ConfigTable) {
|
||||||
|
return m, fmt.Errorf("invalid table name : '%v'. non-alphanumerics are not supported", m.ConfigTable)
|
||||||
}
|
}
|
||||||
if len(tbl) > maxIdentifierLength {
|
if len(m.ConfigTable) > maxIdentifierLength {
|
||||||
return m, fmt.Errorf(ErrorTooLongFieldLength+" - tableName : '%v'. max allowed field length is %v ", tbl, maxIdentifierLength)
|
return m, fmt.Errorf(ErrorTooLongFieldLength+" - tableName : '%v'. max allowed field length is %v ", m.ConfigTable, maxIdentifierLength)
|
||||||
}
|
}
|
||||||
m.configTable = tbl
|
|
||||||
} else {
|
} else {
|
||||||
return m, fmt.Errorf(ErrorMissingTableName)
|
return m, fmt.Errorf(ErrorMissingTableName)
|
||||||
}
|
}
|
||||||
// configure maxTimeout if provided
|
if m.MaxIdleTimeout <= 0 {
|
||||||
if mxTimeout, ok := cmetadata.Properties[connMaxIdleTimeKey]; ok && mxTimeout != "" {
|
m.MaxIdleTimeout = defaultMaxConnIdleTime
|
||||||
if t, err := time.ParseDuration(mxTimeout); err == nil && t > 0 {
|
|
||||||
m.maxIdleTimeout = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if m.maxIdleTimeout <= 0 {
|
|
||||||
m.maxIdleTimeout = defaultMaxConnIdleTime
|
|
||||||
}
|
}
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
@ -416,3 +416,11 @@ func (p *ConfigurationStore) subscribeToChannel(ctx context.Context, pgNotifyCha
|
||||||
}
|
}
|
||||||
return subscribeID, nil
|
return subscribeID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetComponentMetadata returns the metadata of the component.
|
||||||
|
func (p *ConfigurationStore) GetComponentMetadata() map[string]string {
|
||||||
|
metadataStruct := metadata{}
|
||||||
|
metadataInfo := map[string]string{}
|
||||||
|
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
|
||||||
|
return metadataInfo
|
||||||
|
}
|
||||||
|
|
|
@ -80,15 +80,15 @@ func TestPostgresbuildQuery(t *testing.T) {
|
||||||
|
|
||||||
func TestConnectAndQuery(t *testing.T) {
|
func TestConnectAndQuery(t *testing.T) {
|
||||||
m := metadata{
|
m := metadata{
|
||||||
connectionString: "mockConnectionString",
|
ConnectionString: "mockConnectionString",
|
||||||
configTable: "mockConfigTable",
|
ConfigTable: "mockConfigTable",
|
||||||
}
|
}
|
||||||
|
|
||||||
mock, err := pgxmock.NewPool()
|
mock, err := pgxmock.NewPool()
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
defer mock.Close()
|
defer mock.Close()
|
||||||
|
|
||||||
query := "SELECT EXISTS (SELECT FROM pg_tables where tablename = '" + m.configTable + "'"
|
query := "SELECT EXISTS (SELECT FROM pg_tables where tablename = '" + m.ConfigTable + "'"
|
||||||
mock.ExpectQuery(regexp.QuoteMeta(query)).
|
mock.ExpectQuery(regexp.QuoteMeta(query)).
|
||||||
WillReturnRows(pgxmock.NewRows(
|
WillReturnRows(pgxmock.NewRows(
|
||||||
[]string{"exists"}).
|
[]string{"exists"}).
|
||||||
|
|
|
@ -16,12 +16,14 @@ package redis
|
||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type metadata struct {
|
type metadata struct {
|
||||||
Host string
|
Host string `mapstructure:"redishost"`
|
||||||
Password string
|
Password string `mapstructure:"redisPassword"`
|
||||||
SentinelMasterName string
|
SentinelMasterName string `mapstructure:"sentinelMasterName"`
|
||||||
MaxRetries int
|
MaxRetries int `mapstructure:"maxRetries"`
|
||||||
MaxRetryBackoff time.Duration
|
MaxRetryBackoff *int `mapstructure:"maxRetryBackoff"`
|
||||||
EnableTLS bool
|
EnableTLS bool `mapstructure:"enableTLS"`
|
||||||
Failover bool
|
Failover bool `mapstructure:"failover"`
|
||||||
DB int
|
DB int `mapstructure:"redisDB"`
|
||||||
|
|
||||||
|
internalMaxRetryBackoff time.Duration `mapstructure:"-"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -29,6 +30,7 @@ import (
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/configuration"
|
"github.com/dapr/components-contrib/configuration"
|
||||||
"github.com/dapr/components-contrib/configuration/redis/internal"
|
"github.com/dapr/components-contrib/configuration/redis/internal"
|
||||||
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -74,69 +76,29 @@ func NewRedisConfigurationStore(logger logger.Logger) configuration.Store {
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseRedisMetadata(meta configuration.Metadata) (metadata, error) {
|
func parseRedisMetadata(meta configuration.Metadata) (metadata, error) {
|
||||||
m := metadata{}
|
m := metadata{
|
||||||
|
EnableTLS: defaultEnableTLS,
|
||||||
|
MaxRetries: defaultMaxRetries,
|
||||||
|
internalMaxRetryBackoff: defaultMaxRetryBackoff,
|
||||||
|
Failover: false,
|
||||||
|
DB: defaultDB,
|
||||||
|
}
|
||||||
|
decodeErr := contribMetadata.DecodeMetadata(meta.Properties, &m)
|
||||||
|
if decodeErr != nil {
|
||||||
|
return m, decodeErr
|
||||||
|
}
|
||||||
|
|
||||||
if val, ok := meta.Properties[host]; ok && val != "" {
|
if m.Host == "" {
|
||||||
m.Host = val
|
|
||||||
} else {
|
|
||||||
return m, errors.New("redis store error: missing host address")
|
return m, errors.New("redis store error: missing host address")
|
||||||
}
|
}
|
||||||
|
|
||||||
if val, ok := meta.Properties[password]; ok && val != "" {
|
|
||||||
m.Password = val
|
|
||||||
}
|
|
||||||
|
|
||||||
m.EnableTLS = defaultEnableTLS
|
|
||||||
if val, ok := meta.Properties[enableTLS]; ok && val != "" {
|
|
||||||
tls, err := strconv.ParseBool(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("redis store error: can't parse enableTLS field: %s", err)
|
|
||||||
}
|
|
||||||
m.EnableTLS = tls
|
|
||||||
}
|
|
||||||
|
|
||||||
m.MaxRetries = defaultMaxRetries
|
|
||||||
if val, ok := meta.Properties[maxRetries]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err)
|
|
||||||
}
|
|
||||||
m.MaxRetries = int(parsedVal)
|
|
||||||
}
|
|
||||||
|
|
||||||
m.MaxRetryBackoff = defaultMaxRetryBackoff
|
|
||||||
if val, ok := meta.Properties[maxRetryBackoff]; ok && val != "" {
|
|
||||||
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err)
|
|
||||||
}
|
|
||||||
m.MaxRetryBackoff = time.Duration(parsedVal)
|
|
||||||
}
|
|
||||||
|
|
||||||
if val, ok := meta.Properties[failover]; ok && val != "" {
|
|
||||||
failover, err := strconv.ParseBool(val)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("redis store error: can't parse failover field: %s", err)
|
|
||||||
}
|
|
||||||
m.Failover = failover
|
|
||||||
}
|
|
||||||
|
|
||||||
// set the sentinelMasterName only with failover == true.
|
// set the sentinelMasterName only with failover == true.
|
||||||
if m.Failover {
|
if m.Failover && m.SentinelMasterName == "" {
|
||||||
if val, ok := meta.Properties[sentinelMasterName]; ok && val != "" {
|
return m, errors.New("redis store error: missing sentinelMasterName")
|
||||||
m.SentinelMasterName = val
|
|
||||||
} else {
|
|
||||||
return m, errors.New("redis store error: missing sentinelMasterName")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m.DB = defaultDB
|
if m.MaxRetryBackoff != nil {
|
||||||
if val, ok := meta.Properties[redisDB]; ok && val != "" {
|
m.internalMaxRetryBackoff = time.Duration(*m.MaxRetryBackoff)
|
||||||
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
|
|
||||||
if err != nil {
|
|
||||||
return m, fmt.Errorf("redis store error: can't parse redisDB field from metadata: %w", err)
|
|
||||||
}
|
|
||||||
m.DB = int(parsedVal)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
|
@ -171,7 +133,7 @@ func (r *ConfigurationStore) newClient(m metadata) *redis.Client {
|
||||||
Password: m.Password,
|
Password: m.Password,
|
||||||
DB: m.DB,
|
DB: m.DB,
|
||||||
MaxRetries: m.MaxRetries,
|
MaxRetries: m.MaxRetries,
|
||||||
MaxRetryBackoff: m.MaxRetryBackoff,
|
MaxRetryBackoff: m.internalMaxRetryBackoff,
|
||||||
}
|
}
|
||||||
|
|
||||||
// tell the linter to skip a check here.
|
// tell the linter to skip a check here.
|
||||||
|
@ -191,7 +153,7 @@ func (r *ConfigurationStore) newFailoverClient(m metadata) *redis.Client {
|
||||||
SentinelAddrs: []string{r.metadata.Host},
|
SentinelAddrs: []string{r.metadata.Host},
|
||||||
DB: m.DB,
|
DB: m.DB,
|
||||||
MaxRetries: m.MaxRetries,
|
MaxRetries: m.MaxRetries,
|
||||||
MaxRetryBackoff: m.MaxRetryBackoff,
|
MaxRetryBackoff: m.internalMaxRetryBackoff,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* #nosec */
|
/* #nosec */
|
||||||
|
@ -369,3 +331,11 @@ func (r *ConfigurationStore) handleSubscribedChange(ctx context.Context, req *co
|
||||||
r.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %s", err)
|
r.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetComponentMetadata returns the metadata of the component.
|
||||||
|
func (r *ConfigurationStore) GetComponentMetadata() map[string]string {
|
||||||
|
metadataStruct := metadata{}
|
||||||
|
metadataInfo := map[string]string{}
|
||||||
|
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
|
||||||
|
return metadataInfo
|
||||||
|
}
|
||||||
|
|
|
@ -15,7 +15,6 @@ package redis
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -252,27 +251,27 @@ func Test_parseRedisMetadata(t *testing.T) {
|
||||||
testProperties[sentinelMasterName] = "tesSentinelMasterName"
|
testProperties[sentinelMasterName] = "tesSentinelMasterName"
|
||||||
testProperties[redisDB] = "1"
|
testProperties[redisDB] = "1"
|
||||||
testMetadata := metadata{
|
testMetadata := metadata{
|
||||||
Host: "testHost",
|
Host: "testHost",
|
||||||
Password: "testPassword",
|
Password: "testPassword",
|
||||||
EnableTLS: true,
|
EnableTLS: true,
|
||||||
MaxRetries: 10,
|
MaxRetries: 10,
|
||||||
MaxRetryBackoff: time.Second,
|
internalMaxRetryBackoff: time.Second,
|
||||||
Failover: true,
|
Failover: true,
|
||||||
SentinelMasterName: "tesSentinelMasterName",
|
SentinelMasterName: "tesSentinelMasterName",
|
||||||
DB: 1,
|
DB: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
testDefaultProperties := make(map[string]string)
|
testDefaultProperties := make(map[string]string)
|
||||||
testDefaultProperties[host] = "testHost"
|
testDefaultProperties[host] = "testHost"
|
||||||
defaultMetadata := metadata{
|
defaultMetadata := metadata{
|
||||||
Host: "testHost",
|
Host: "testHost",
|
||||||
Password: "",
|
Password: "",
|
||||||
EnableTLS: defaultEnableTLS,
|
EnableTLS: defaultEnableTLS,
|
||||||
MaxRetries: defaultMaxRetries,
|
MaxRetries: defaultMaxRetries,
|
||||||
MaxRetryBackoff: defaultMaxRetryBackoff,
|
internalMaxRetryBackoff: defaultMaxRetryBackoff,
|
||||||
Failover: false,
|
Failover: false,
|
||||||
SentinelMasterName: "",
|
SentinelMasterName: "",
|
||||||
DB: defaultDB,
|
DB: defaultDB,
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -305,9 +304,14 @@ func Test_parseRedisMetadata(t *testing.T) {
|
||||||
t.Errorf("parseRedisMetadata() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("parseRedisMetadata() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(got, tt.want) {
|
assert.Equal(t, tt.want.Host, got.Host)
|
||||||
t.Errorf("parseRedisMetadata() got = %v, want %v", got, tt.want)
|
assert.Equal(t, tt.want.Password, got.Password)
|
||||||
}
|
assert.Equal(t, tt.want.EnableTLS, got.EnableTLS)
|
||||||
|
assert.Equal(t, tt.want.MaxRetries, got.MaxRetries)
|
||||||
|
assert.Equal(t, tt.want.internalMaxRetryBackoff, got.internalMaxRetryBackoff)
|
||||||
|
assert.Equal(t, tt.want.Failover, got.Failover)
|
||||||
|
assert.Equal(t, tt.want.SentinelMasterName, got.SentinelMasterName)
|
||||||
|
assert.Equal(t, tt.want.DB, got.DB)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,9 @@ type Store interface {
|
||||||
|
|
||||||
// Unsubscribe configuration with keys
|
// Unsubscribe configuration with keys
|
||||||
Unsubscribe(ctx context.Context, req *UnsubscribeRequest) error
|
Unsubscribe(ctx context.Context, req *UnsubscribeRequest) error
|
||||||
|
|
||||||
|
// GetComponentMetadata returns information on the component's metadata.
|
||||||
|
GetComponentMetadata() map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateHandler is the handler used to send event to daprd.
|
// UpdateHandler is the handler used to send event to daprd.
|
||||||
|
|
|
@ -16,12 +16,14 @@ package redis
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
|
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
|
||||||
"github.com/dapr/components-contrib/lock"
|
"github.com/dapr/components-contrib/lock"
|
||||||
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -186,3 +188,11 @@ func (r *StandaloneRedisLock) Close() error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetComponentMetadata returns the metadata of the component.
|
||||||
|
func (r *StandaloneRedisLock) GetComponentMetadata() map[string]string {
|
||||||
|
metadataStruct := rediscomponent.Metadata{}
|
||||||
|
metadataInfo := map[string]string{}
|
||||||
|
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo)
|
||||||
|
return metadataInfo
|
||||||
|
}
|
||||||
|
|
|
@ -24,4 +24,7 @@ type Store interface {
|
||||||
|
|
||||||
// Unlock tries to release a lock.
|
// Unlock tries to release a lock.
|
||||||
Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error)
|
Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error)
|
||||||
|
|
||||||
|
// GetComponentMetadata returns information on the component's metadata.
|
||||||
|
GetComponentMetadata() map[string]string
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,14 @@ func GetMetadataProperty(props map[string]string, keys ...string) (val string, o
|
||||||
// DecodeMetadata decodes metadata into a struct
|
// DecodeMetadata decodes metadata into a struct
|
||||||
// This is an extension of mitchellh/mapstructure which also supports decoding durations
|
// This is an extension of mitchellh/mapstructure which also supports decoding durations
|
||||||
func DecodeMetadata(input interface{}, result interface{}) error {
|
func DecodeMetadata(input interface{}, result interface{}) error {
|
||||||
|
// avoids a common mistake of passing the metadata object, instead of the properties map
|
||||||
|
// if input is not of type map[string]string, then cast to metadata.Base and access the Properties
|
||||||
|
if _, ok := input.(map[string]string); !ok {
|
||||||
|
if base, ok := input.(Base); ok {
|
||||||
|
input = base.Properties
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
|
||||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||||
toTimeDurationHookFunc(),
|
toTimeDurationHookFunc(),
|
||||||
|
|
Loading…
Reference in New Issue