diff --git a/configuration/postgres/postgres.go b/configuration/postgres/postgres.go index bb0f795b8..bb6f3e5b0 100644 --- a/configuration/postgres/postgres.go +++ b/configuration/postgres/postgres.go @@ -70,7 +70,7 @@ const ( var ( allowedChars = regexp.MustCompile(`^[a-zA-Z0-9./_]*$`) - defaultMaxConnIdleTime = time.Minute * 30 + defaultMaxConnIdleTime = time.Second * 30 ) func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store { @@ -78,7 +78,6 @@ func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store { return &ConfigurationStore{ logger: logger, subscribeStopChanMap: make(map[string]interface{}), - configLock: sync.Mutex{}, } } @@ -94,17 +93,16 @@ func (p *ConfigurationStore) Init(metadata configuration.Metadata) error { p.metadata = m } p.ActiveSubscriptions = make(map[string]*subscription) - ctx, cancel := context.WithTimeout(context.Background(), defaultMaxConnIdleTime) + ctx, cancel := context.WithTimeout(context.Background(), p.metadata.maxIdleTimeout) defer cancel() - // ctx := context.Background() - client, err := Connect(ctx, p.metadata.connectionString) + client, err := Connect(ctx, p.metadata.connectionString, p.metadata.maxIdleTimeout) if err != nil { - return fmt.Errorf("error connecting to configuration store: '%s'", err) + return fmt.Errorf("error connecting to configuration store: '%w'", err) } p.client = client pingErr := p.client.Ping(ctx) if pingErr != nil { - return fmt.Errorf("unable to connect to configuration store: '%s'", pingErr) + return fmt.Errorf("unable to connect to configuration store: '%w'", pingErr) } // check if table exists exists := false @@ -126,7 +124,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ query, params, err := buildQuery(req, p.metadata.configTable) if err != nil { p.logger.Error(err) - return nil, fmt.Errorf("error in configuration store query: '%s' ", err) + return nil, fmt.Errorf("error in configuration store query: '%w' ", err) } rows, err := p.client.Query(ctx, query, params...) if err != nil { @@ -134,7 +132,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ if err == sql.ErrNoRows { return &configuration.GetResponse{}, nil } - return nil, fmt.Errorf("error in querying configuration store: '%s'", err) + return nil, fmt.Errorf("error in querying configuration store: '%w'", err) } items := make(map[string]*configuration.Item) for i := 0; rows.Next(); i++ { @@ -143,10 +141,10 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ var metadata []byte v := make(map[string]string) if err := rows.Scan(&key, &item.Value, &item.Version, &metadata); err != nil { - return nil, fmt.Errorf("error in reading data from configuration store: '%s'", err) + return nil, fmt.Errorf("error in reading data from configuration store: '%w'", err) } if err := json.Unmarshal(metadata, &v); err != nil { - return nil, fmt.Errorf("error in unmarshalling response from configuration store: '%s'", err) + return nil, fmt.Errorf("error in unmarshalling response from configuration store: '%w'", err) } item.Metadata = v if item.Value != "" { @@ -183,13 +181,13 @@ func (p *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration conn, err := p.client.Acquire(ctx) if err != nil { p.logger.Errorf("error acquiring connection:", err) - return fmt.Errorf("error acquiring connection: %s ", err) + return fmt.Errorf("error acquiring connection: %w ", err) } defer conn.Release() _, err = conn.Exec(ctx, pgChannel) if err != nil { p.logger.Errorf("error listening to channel:", err) - return fmt.Errorf("error listening to channel: %s", err) + return fmt.Errorf("error listening to channel: %w", err) } delete(p.ActiveSubscriptions, k) return nil @@ -226,7 +224,7 @@ func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, msg *pgconn.Notification, channel string, subscriptionID string) { defer func() { if err := recover(); err != nil { - p.logger.Errorf("panic in handlesubscribedchange method and recovered: %s", err) + p.logger.Errorf("panic in handlesubscribedchange method and recovered: %w", err) } }() payload := make(map[string]interface{}) @@ -273,7 +271,7 @@ func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler } err = handler(ctx, e) if err != nil { - p.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %s", err) + p.logger.Errorf("fail to call handler to notify event for configuration update subscribe: %w", err) } } else { p.logger.Info("unknown format of data received in notify event - '%s'", msg.Payload) @@ -310,18 +308,18 @@ func parseMetadata(cmetadata configuration.Metadata) (metadata, error) { return m, nil } -func Connect(ctx context.Context, conn string) (*pgxpool.Pool, error) { +func Connect(ctx context.Context, conn string, maxTimeout time.Duration) (*pgxpool.Pool, error) { config, err := pgxpool.ParseConfig(conn) if err != nil { - return nil, fmt.Errorf("postgres configuration store connection error : %s", err) + return nil, fmt.Errorf("postgres configuration store connection error : %w", err) } pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { - return nil, fmt.Errorf("postgres configuration store connection error : %s", err) + return nil, fmt.Errorf("postgres configuration store connection error : %w", err) } pingErr := pool.Ping(ctx) if pingErr != nil { - return nil, fmt.Errorf("postgres configuration store ping error : %s", pingErr) + return nil, fmt.Errorf("postgres configuration store ping error : %w", pingErr) } return pool, nil }