Address review comments
Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
358ebbf0bc
commit
4b01912b0f
|
@ -98,7 +98,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s
|
||||||
var clientErr error
|
var clientErr error
|
||||||
queueServiceClient, clientErr = azqueue.NewServiceClient(m.GetQueueURL(azEnvSettings), credential, &options)
|
queueServiceClient, clientErr = azqueue.NewServiceClient(m.GetQueueURL(azEnvSettings), credential, &options)
|
||||||
if clientErr != nil {
|
if clientErr != nil {
|
||||||
return nil, fmt.Errorf("cannot init storage queue client with Azure AD token: %w", err)
|
return nil, fmt.Errorf("cannot init storage queue client with Azure AD token: %w", clientErr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +108,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s
|
||||||
d.queueClient = queueServiceClient.NewQueueClient(m.QueueName)
|
d.queueClient = queueServiceClient.NewQueueClient(m.QueueName)
|
||||||
|
|
||||||
createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute)
|
createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||||
_, err = d.queueClient.Create(createCtx, &azqueue.CreateOptions{})
|
_, err = d.queueClient.Create(createCtx, nil)
|
||||||
createCancel()
|
createCancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -121,6 +121,8 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur
|
||||||
var ttlSeconds *int32
|
var ttlSeconds *int32
|
||||||
if ttl != nil {
|
if ttl != nil {
|
||||||
ttlSeconds = ptr.Of(int32(ttl.Seconds()))
|
ttlSeconds = ptr.Of(int32(ttl.Seconds()))
|
||||||
|
} else {
|
||||||
|
ttlSeconds = ptr.Of(int32(defaultTTL.Seconds()))
|
||||||
}
|
}
|
||||||
|
|
||||||
s, err := strconv.Unquote(string(data))
|
s, err := strconv.Unquote(string(data))
|
||||||
|
@ -132,10 +134,6 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur
|
||||||
s = base64.StdEncoding.EncodeToString([]byte(s))
|
s = base64.StdEncoding.EncodeToString([]byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
if ttl == nil {
|
|
||||||
ttlToUse := defaultTTL
|
|
||||||
ttl = &ttlToUse
|
|
||||||
}
|
|
||||||
_, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{
|
_, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{
|
||||||
TimeToLive: ttlSeconds,
|
TimeToLive: ttlSeconds,
|
||||||
})
|
})
|
||||||
|
@ -161,16 +159,17 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error {
|
||||||
}
|
}
|
||||||
mt := res.Messages[0].MessageText
|
mt := res.Messages[0].MessageText
|
||||||
|
|
||||||
var data []byte
|
data := []byte("")
|
||||||
|
if mt != nil {
|
||||||
if d.decodeBase64 {
|
if d.decodeBase64 {
|
||||||
decoded, decodeError := base64.StdEncoding.DecodeString(*mt)
|
decoded, decodeError := base64.StdEncoding.DecodeString(*mt)
|
||||||
if decodeError != nil {
|
if decodeError != nil {
|
||||||
return decodeError
|
return decodeError
|
||||||
|
}
|
||||||
|
data = decoded
|
||||||
|
} else {
|
||||||
|
data = []byte(*mt)
|
||||||
}
|
}
|
||||||
data = decoded
|
|
||||||
} else {
|
|
||||||
data = []byte(*mt)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = consumer.callback(ctx, &bindings.ReadResponse{
|
_, err = consumer.callback(ctx, &bindings.ReadResponse{
|
||||||
|
@ -181,12 +180,15 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = d.queueClient.DeleteMessage(ctx, *res.Messages[0].MessageID, *res.Messages[0].PopReceipt, &azqueue.DeleteMessageOptions{})
|
if res.Messages[0].MessageID != nil && res.Messages[0].PopReceipt != nil {
|
||||||
if err != nil {
|
_, err = d.queueClient.DeleteMessage(ctx, *res.Messages[0].MessageID, *res.Messages[0].PopReceipt, nil)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("could not delete message from queue: message ID or pop receipt is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *AzureQueueHelper) Close() error {
|
func (d *AzureQueueHelper) Close() error {
|
||||||
|
|
Loading…
Reference in New Issue