fixed wrapping error from %s to %w
Signed-off-by: akhilac1 <chetlapalle.akhila@gmail.com>
This commit is contained in:
parent
925e8f32a0
commit
d569c1776d
|
@ -70,7 +70,7 @@ const (
|
|||
|
||||
var (
|
||||
allowedChars = regexp.MustCompile(`^[a-zA-Z0-9./_]*$`)
|
||||
defaultMaxConnIdleTime = time.Minute * 30
|
||||
defaultMaxConnIdleTime = time.Second * 30
|
||||
)
|
||||
|
||||
func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store {
|
||||
|
@ -78,7 +78,6 @@ func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store {
|
|||
return &ConfigurationStore{
|
||||
logger: logger,
|
||||
subscribeStopChanMap: make(map[string]interface{}),
|
||||
configLock: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,17 +93,16 @@ func (p *ConfigurationStore) Init(metadata configuration.Metadata) error {
|
|||
p.metadata = m
|
||||
}
|
||||
p.ActiveSubscriptions = make(map[string]*subscription)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultMaxConnIdleTime)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.metadata.maxIdleTimeout)
|
||||
defer cancel()
|
||||
// ctx := context.Background()
|
||||
client, err := Connect(ctx, p.metadata.connectionString)
|
||||
client, err := Connect(ctx, p.metadata.connectionString, p.metadata.maxIdleTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error connecting to configuration store: '%s'", err)
|
||||
return fmt.Errorf("error connecting to configuration store: '%w'", err)
|
||||
}
|
||||
p.client = client
|
||||
pingErr := p.client.Ping(ctx)
|
||||
if pingErr != nil {
|
||||
return fmt.Errorf("unable to connect to configuration store: '%s'", pingErr)
|
||||
return fmt.Errorf("unable to connect to configuration store: '%w'", pingErr)
|
||||
}
|
||||
// check if table exists
|
||||
exists := false
|
||||
|
@ -126,7 +124,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
|||
query, params, err := buildQuery(req, p.metadata.configTable)
|
||||
if err != nil {
|
||||
p.logger.Error(err)
|
||||
return nil, fmt.Errorf("error in configuration store query: '%s' ", err)
|
||||
return nil, fmt.Errorf("error in configuration store query: '%w' ", err)
|
||||
}
|
||||
rows, err := p.client.Query(ctx, query, params...)
|
||||
if err != nil {
|
||||
|
@ -134,7 +132,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
|||
if err == sql.ErrNoRows {
|
||||
return &configuration.GetResponse{}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("error in querying configuration store: '%s'", err)
|
||||
return nil, fmt.Errorf("error in querying configuration store: '%w'", err)
|
||||
}
|
||||
items := make(map[string]*configuration.Item)
|
||||
for i := 0; rows.Next(); i++ {
|
||||
|
@ -143,10 +141,10 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
|||
var metadata []byte
|
||||
v := make(map[string]string)
|
||||
if err := rows.Scan(&key, &item.Value, &item.Version, &metadata); err != nil {
|
||||
return nil, fmt.Errorf("error in reading data from configuration store: '%s'", err)
|
||||
return nil, fmt.Errorf("error in reading data from configuration store: '%w'", err)
|
||||
}
|
||||
if err := json.Unmarshal(metadata, &v); err != nil {
|
||||
return nil, fmt.Errorf("error in unmarshalling response from configuration store: '%s'", err)
|
||||
return nil, fmt.Errorf("error in unmarshalling response from configuration store: '%w'", err)
|
||||
}
|
||||
item.Metadata = v
|
||||
if item.Value != "" {
|
||||
|
@ -183,13 +181,13 @@ func (p *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration
|
|||
conn, err := p.client.Acquire(ctx)
|
||||
if err != nil {
|
||||
p.logger.Errorf("error acquiring connection:", err)
|
||||
return fmt.Errorf("error acquiring connection: %s ", err)
|
||||
return fmt.Errorf("error acquiring connection: %w ", err)
|
||||
}
|
||||
defer conn.Release()
|
||||
_, err = conn.Exec(ctx, pgChannel)
|
||||
if err != nil {
|
||||
p.logger.Errorf("error listening to channel:", err)
|
||||
return fmt.Errorf("error listening to channel: %s", err)
|
||||
return fmt.Errorf("error listening to channel: %w", err)
|
||||
}
|
||||
delete(p.ActiveSubscriptions, k)
|
||||
return nil
|
||||
|
@ -226,7 +224,7 @@ func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
|
|||
func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, msg *pgconn.Notification, channel string, subscriptionID string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
p.logger.Errorf("panic in handlesubscribedchange method and recovered: %s", err)
|
||||
p.logger.Errorf("panic in handlesubscribedchange method and recovered: %w", err)
|
||||
}
|
||||
}()
|
||||
payload := make(map[string]interface{})
|
||||
|
@ -273,7 +271,7 @@ func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler
|
|||
}
|
||||
err = handler(ctx, e)
|
||||
if err != nil {
|
||||
p.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %s", err)
|
||||
p.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %w", err)
|
||||
}
|
||||
} else {
|
||||
p.logger.Info("unknown format of data received in notify event - '%s'", msg.Payload)
|
||||
|
@ -310,18 +308,18 @@ func parseMetadata(cmetadata configuration.Metadata) (metadata, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func Connect(ctx context.Context, conn string) (*pgxpool.Pool, error) {
|
||||
func Connect(ctx context.Context, conn string, maxTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||
config, err := pgxpool.ParseConfig(conn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres configuration store connection error : %s", err)
|
||||
return nil, fmt.Errorf("postgres configuration store connection error : %w", err)
|
||||
}
|
||||
pool, err := pgxpool.NewWithConfig(ctx, config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres configuration store connection error : %s", err)
|
||||
return nil, fmt.Errorf("postgres configuration store connection error : %w", err)
|
||||
}
|
||||
pingErr := pool.Ping(ctx)
|
||||
if pingErr != nil {
|
||||
return nil, fmt.Errorf("postgres configuration store ping error : %s", pingErr)
|
||||
return nil, fmt.Errorf("postgres configuration store ping error : %w", pingErr)
|
||||
}
|
||||
return pool, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue