changed row iteration to use generic method from pgxv5

Signed-off-by: akhilac1 <chetlapalle.akhila@gmail.com>
This commit is contained in:
akhilac1 2022-09-23 17:30:09 +05:30
parent 86b77cc269
commit d013c7505e
1 changed files with 33 additions and 28 deletions

View File

@ -27,6 +27,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"k8s.io/utils/strings/slices"
@ -40,7 +41,7 @@ type ConfigurationStore struct {
client *pgxpool.Pool
logger logger.Logger
configLock sync.Mutex
subscribeStopChanMap map[string]interface{}
subscribeStopChanMap map[string]chan struct{}
ActiveSubscriptions map[string]*subscription
}
@ -49,6 +50,11 @@ type subscription struct {
keys []string
}
type pgResponse struct {
key string
item *configuration.Item
}
const (
configtablekey = "table"
connMaxIdleTimeKey = "connMaxIdleTime"
@ -77,7 +83,7 @@ func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store {
logger.Debug("Instantiating PostgreSQL configuration store")
return &ConfigurationStore{
logger: logger,
subscribeStopChanMap: make(map[string]interface{}),
subscribeStopChanMap: make(map[string]chan struct{}),
}
}
@ -100,9 +106,9 @@ func (p *ConfigurationStore) Init(metadata configuration.Metadata) error {
return fmt.Errorf("error connecting to configuration store: '%w'", err)
}
p.client = client
pingErr := p.client.Ping(ctx)
if pingErr != nil {
return fmt.Errorf("unable to connect to configuration store: '%w'", pingErr)
err = p.client.Ping(ctx)
if err != nil {
return fmt.Errorf("unable to connect to configuration store: '%w'", err)
}
// check if table exists
exists := false
@ -111,7 +117,7 @@ func (p *ConfigurationStore) Init(metadata configuration.Metadata) error {
if err == sql.ErrNoRows {
return fmt.Errorf(ErrorMissingTable, p.metadata.configTable)
}
return fmt.Errorf("error in checking if configtable exists - '%v'", p.metadata.configTable)
return fmt.Errorf("error in checking if configtable '%s' exists - '%w'", p.metadata.configTable, err)
}
return nil
}
@ -134,25 +140,24 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
}
return nil, fmt.Errorf("error in querying configuration store: '%w'", err)
}
items := make(map[string]*configuration.Item)
for i := 0; rows.Next(); i++ {
var item configuration.Item
var key string
var metadata []byte
v := make(map[string]string)
if err := rows.Scan(&key, &item.Value, &item.Version, &metadata); err != nil {
return nil, fmt.Errorf("error in reading data from configuration store: '%w'", err)
items, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (pgResponse, error) {
a := pgResponse{
item: new(configuration.Item),
}
if err := json.Unmarshal(metadata, &v); err != nil {
return nil, fmt.Errorf("error in unmarshalling response from configuration store: '%w'", err)
}
item.Metadata = v
if item.Value != "" {
items[key] = &item
if err := row.Scan(&a.key, &a.item.Value, &a.item.Version, &a.item.Metadata); err != nil {
return pgResponse{}, fmt.Errorf("error in reading data from configuration store: '%w'", err)
}
return a, nil
})
if err != nil {
return nil, fmt.Errorf("unable to parse response from configuration store - %w", err)
}
result := make(map[string]*configuration.Item)
for _, v := range items {
result[v.key] = v.item
}
return &configuration.GetResponse{
Items: items,
Items: result,
}, nil
}
@ -176,7 +181,7 @@ func (p *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration
if v.uuid == req.ID {
if oldStopChan, ok := p.subscribeStopChanMap[req.ID]; ok {
delete(p.subscribeStopChanMap, req.ID)
close(oldStopChan.(chan struct{}))
close(oldStopChan)
pgChannel := fmt.Sprintf(unlistenTemplate, k)
conn, err := p.client.Acquire(ctx)
if err != nil {
@ -186,7 +191,7 @@ func (p *ConfigurationStore) Unsubscribe(ctx context.Context, req *configuration
defer conn.Release()
_, err = conn.Exec(ctx, pgChannel)
if err != nil {
p.logger.Errorf("error listening to channel:", err)
p.logger.Errorf("error un-listening to channel:", err)
return fmt.Errorf("error listening to channel: %w", err)
}
delete(p.ActiveSubscriptions, k)
@ -212,7 +217,7 @@ func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
for {
notification, err := conn.Conn().WaitForNotification(ctx)
if err != nil {
if !pgconn.Timeout(err) && !errors.Is(ctx.Err(), context.Canceled) {
if !pgconn.Timeout(err) && !errors.Is(err, context.Canceled) {
p.logger.Errorf("error waiting for notification:", err)
}
return
@ -317,9 +322,9 @@ func Connect(ctx context.Context, conn string, maxTimeout time.Duration) (*pgxpo
if err != nil {
return nil, fmt.Errorf("postgres configuration store connection error : %w", err)
}
pingErr := pool.Ping(ctx)
if pingErr != nil {
return nil, fmt.Errorf("postgres configuration store ping error : %w", pingErr)
err = pool.Ping(ctx)
if err != nil {
return nil, fmt.Errorf("postgres configuration store ping error : %w", err)
}
return pool, nil
}
@ -397,7 +402,7 @@ func (p *ConfigurationStore) subscribeToChannel(ctx context.Context, pgNotifyCha
pgNotifyCmd := fmt.Sprintf(listenTemplate, channel)
if sub, isActive := p.isSubscriptionActive(req); isActive {
if oldStopChan, ok := p.subscribeStopChanMap[sub]; ok {
close(oldStopChan.(chan struct{}))
close(oldStopChan)
delete(p.subscribeStopChanMap, sub)
delete(p.ActiveSubscriptions, channel)
}