updating review comments
Signed-off-by: akhilac1 <chetlapalle.akhila@gmail.com>
This commit is contained in:
parent
ddc130640e
commit
cf413bf8fc
|
@ -17,6 +17,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -30,6 +31,7 @@ import (
|
|||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
_ "github.com/jackc/pgx/v4/stdlib"
|
||||
"golang.org/x/exp/utf8string"
|
||||
)
|
||||
|
||||
type ConfigurationStore struct {
|
||||
|
@ -49,6 +51,8 @@ const (
|
|||
ErrorAlreadyInitialized = "PostgreSQL configuration store already initialized"
|
||||
ErrorMissinMaxTimeout = "missing PostgreSQL maxTimeout setting in configuration"
|
||||
QueryTableExists = "SELECT EXISTS (SELECT FROM pg_tables where tablename = $1)"
|
||||
maxIdentifierLength = 64 // https://www.postgresql.org/docs/current/limits.html
|
||||
ErrorTooLongFieldLength = "field name is too long"
|
||||
)
|
||||
|
||||
func NewPostgresConfigurationStore(logger logger.Logger) configuration.Store {
|
||||
|
@ -70,8 +74,9 @@ func (p *ConfigurationStore) Init(metadata configuration.Metadata) error {
|
|||
p.metadata = m
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
client, err := Connect(ctx, p.metadata.connectionString)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), p.metadata.maxIdleTime)
|
||||
defer cancel()
|
||||
client, err := Connect(ctx, p.metadata.connectionString, p.metadata.maxIdleTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -110,7 +115,7 @@ func (p *ConfigurationStore) Get(ctx context.Context, req *configuration.GetRequ
|
|||
var item configuration.Item
|
||||
var key string
|
||||
var metadata []byte
|
||||
var v = make(map[string]string)
|
||||
v := make(map[string]string)
|
||||
|
||||
if err := rows.Scan(key, &item.Value, &item.Version, &metadata); err != nil {
|
||||
return nil, err
|
||||
|
@ -151,17 +156,20 @@ func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
|
|||
fmt.Fprintln(os.Stderr, "Error acquiring connection:", err)
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
_, err = conn.Exec(context.Background(), channel)
|
||||
ctxTimeout, cancel := context.WithTimeout(ctx, p.metadata.maxIdleTime)
|
||||
defer cancel()
|
||||
_, err = conn.Exec(ctxTimeout, channel)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Error listening to channel:", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
notification, err := conn.Conn().WaitForNotification(ctx)
|
||||
notification, err := conn.Conn().WaitForNotification(ctxTimeout)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Error waiting for notification:", err)
|
||||
if !(pgconn.Timeout(err) || errors.Is(ctxTimeout.Err(), context.Canceled)) {
|
||||
p.logger.Errorf("Error waiting for notification:", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
p.handleSubscribedChange(ctx, handler, notification, id)
|
||||
|
@ -171,13 +179,13 @@ func (p *ConfigurationStore) doSubscribe(ctx context.Context, req *configuration
|
|||
func (p *ConfigurationStore) handleSubscribedChange(ctx context.Context, handler configuration.UpdateHandler, msg *pgconn.Notification, id string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
p.logger.Errorf("panic in handleSubscribedChange()method and recovered: %s", err)
|
||||
p.logger.Errorf("panic in handleSubscribedChange method and recovered: %s", err)
|
||||
}
|
||||
}()
|
||||
payload := make(map[string]interface{})
|
||||
err := json.Unmarshal([]byte(msg.Payload), &payload)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Error in UnMarshall: ", err)
|
||||
p.logger.Errorf("Error in UnMarshal: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -230,6 +238,12 @@ func parseMetadata(cmetadata configuration.Metadata) (metadata, error) {
|
|||
}
|
||||
|
||||
if tbl, ok := cmetadata.Properties[configtablekey]; ok && tbl != "" {
|
||||
if !utf8string.NewString(tbl).IsASCII() {
|
||||
return m, fmt.Errorf("invalid table name : '%v'. non-ascii characters are not supported", tbl)
|
||||
}
|
||||
if len(tbl) > maxIdentifierLength {
|
||||
return m, fmt.Errorf(ErrorTooLongFieldLength+" - tableName : '%v'. max allowed field length is %v ", tbl, maxIdentifierLength)
|
||||
}
|
||||
m.configTable = tbl
|
||||
} else {
|
||||
return m, fmt.Errorf(ErrorMissingTableName)
|
||||
|
@ -246,12 +260,12 @@ func parseMetadata(cmetadata configuration.Metadata) (metadata, error) {
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func Connect(ctx context.Context, conn string) (*pgxpool.Pool, error) {
|
||||
func Connect(ctx context.Context, conn string, maxTimeout time.Duration) (*pgxpool.Pool, error) {
|
||||
pool, err := pgxpool.Connect(ctx, conn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("postgres configuration store connection error : %s", err)
|
||||
}
|
||||
pingErr := pool.Ping(context.Background())
|
||||
pingErr := pool.Ping(ctx)
|
||||
if pingErr != nil {
|
||||
return nil, fmt.Errorf("postgres configuration store ping error : %s", pingErr)
|
||||
}
|
||||
|
|
|
@ -67,5 +67,4 @@ func TestConnectAndQuery(t *testing.T) {
|
|||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("there were unfulfilled expectations: %s", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue