Support for Postgres configuration store
Signed-off-by: akhilac1 <chetlapalle.akhila@gmail.com>
This commit is contained in:
parent
e6a8156c4b
commit
cc81a9e26a
|
|
@ -32,9 +32,9 @@ import (
|
|||
_ "github.com/jackc/pgx/v4/stdlib"
|
||||
)
|
||||
|
||||
type PostgresConfigStore struct {
|
||||
type ConfigurationStore struct {
|
||||
metadata metadata
|
||||
pool *pgxpool.Pool
|
||||
client *pgxpool.Pool
|
||||
logger logger.Logger
|
||||
subscribeStopChanMap sync.Map
|
||||
}
|
||||
|
|
@ -51,16 +51,16 @@ const (
|
|||
QueryTableExists = "SELECT EXISTS (SELECT FROM pg_tables where tablename = $1)"
|
||||
)
|
||||
|
||||
func NewPostgresConfigurationStore(logger logger.Logger) *PostgresConfigStore {
|
||||
func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store {
|
||||
logger.Debug("Instantiating PostgreSQL configuration store")
|
||||
return &PostgresConfigStore{
|
||||
return &ConfigurationStore{
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) Init(metadata configuration.Metadata) error {
|
||||
func (p *ConfigurationStore) Init(metadata configuration.Metadata) error {
|
||||
p.logger.Debug(InfoStartInit)
|
||||
if p.pool != nil {
|
||||
if p.client != nil {
|
||||
return fmt.Errorf(ErrorAlreadyInitialized)
|
||||
}
|
||||
if m, err := parseMetadata(metadata); err != nil {
|
||||
|
|
@ -71,33 +71,33 @@ func (p *PostgresConfigStore) Init(metadata configuration.Metadata) error {
|
|||
}
|
||||
|
||||
ctx := context.Background()
|
||||
pool, err := Connect(ctx, p.metadata.connectionString)
|
||||
client, err := Connect(ctx, p.metadata.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.pool = pool
|
||||
pingErr := p.pool.Ping(ctx)
|
||||
p.client = client
|
||||
pingErr := p.client.Ping(ctx)
|
||||
if pingErr != nil {
|
||||
return pingErr
|
||||
}
|
||||
|
||||
// check if table exists
|
||||
exists := false
|
||||
err = p.pool.QueryRow(ctx, QueryTableExists, p.metadata.configTable).Scan(&exists)
|
||||
err = p.client.QueryRow(ctx, QueryTableExists, p.metadata.configTable).Scan(&exists)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) {
|
||||
func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequest) (*configuration.GetResponse, error) {
|
||||
query, err := buildQuery(req, p.metadata.configTable)
|
||||
if err != nil {
|
||||
p.logger.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := p.pool.Query(ctx, query)
|
||||
rows, err := p.client.Query(ctx, query)
|
||||
if err != nil {
|
||||
// If no rows exist, return an empty response, otherwise return the error.
|
||||
if err == sql.ErrNoRows {
|
||||
|
|
@ -124,7 +124,7 @@ func (p *PostgresConfigStore) Get(ctx context.Context, req *configuration.GetReq
|
|||
return &response, nil
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) (string, error) {
|
||||
func (p *ConfigurationStore) Subscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler) (string, error) {
|
||||
subscribeID := uuid.New().String()
|
||||
key := "listen " + p.metadata.configTable
|
||||
// subscribe to events raised on the configTable
|
||||
|
|
@ -137,7 +137,7 @@ func (p *PostgresConfigStore) Subscribe(ctx context.Context, req *configuration.
|
|||
return subscribeID, nil
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) Unsubscribe(ctx context.Context, req *configuration.UnsubscribeRequest) error {
|
||||
func (p *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration.UnsubscribeRequest) error {
|
||||
if oldStopChan, ok := p.subscribeStopChanMap.Load(req.ID); ok {
|
||||
p.subscribeStopChanMap.Delete(req.ID)
|
||||
close(oldStopChan.(chan struct{}))
|
||||
|
|
@ -145,8 +145,8 @@ func (p *PostgresConfigStore) Unsubscribe(ctx context.Context, req *configuratio
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, channel string, id string, stop chan struct{}) {
|
||||
conn, err := p.pool.Acquire(ctx)
|
||||
func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration.SubscribeRequest, handler configuration.UpdateHandler, channel string, id string, stop chan struct{}) {
|
||||
conn, err := p.client.Acquire(ctx)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, "Error acquiring connection:", err)
|
||||
}
|
||||
|
|
@ -168,7 +168,7 @@ func (p *PostgresConfigStore) doSubscribe(ctx context.Context, req *configuratio
|
|||
}
|
||||
}
|
||||
|
||||
func (p *PostgresConfigStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, msg *pgconn.Notification, id string) {
|
||||
func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, msg *pgconn.Notification, id string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
p.logger.Errorf("panic in handleSubscribedChange()method and recovered: %s", err)
|
||||
|
|
|
|||
Loading…
Reference in New Issue