diff --git a/bindings/azure/storagequeues/storagequeues.go b/bindings/azure/storagequeues/storagequeues.go index a26841d02..88bd06d46 100644 --- a/bindings/azure/storagequeues/storagequeues.go +++ b/bindings/azure/storagequeues/storagequeues.go @@ -98,7 +98,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s var clientErr error queueServiceClient, clientErr = azqueue.NewServiceClient(m.GetQueueURL(azEnvSettings), credential, &options) if clientErr != nil { - return nil, fmt.Errorf("cannot init storage queue client with Azure AD token: %w", err) + return nil, fmt.Errorf("cannot init storage queue client with Azure AD token: %w", clientErr) } } @@ -108,7 +108,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s d.queueClient = queueServiceClient.NewQueueClient(m.QueueName) createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute) - _, err = d.queueClient.Create(createCtx, &azqueue.CreateOptions{}) + _, err = d.queueClient.Create(createCtx, nil) createCancel() if err != nil { return nil, err @@ -121,6 +121,8 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur var ttlSeconds *int32 if ttl != nil { ttlSeconds = ptr.Of(int32(ttl.Seconds())) + } else { + ttlSeconds = ptr.Of(int32(defaultTTL.Seconds())) } s, err := strconv.Unquote(string(data)) @@ -132,10 +134,6 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur s = base64.StdEncoding.EncodeToString([]byte(s)) } - if ttl == nil { - ttlToUse := defaultTTL - ttl = &ttlToUse - } _, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{ TimeToLive: ttlSeconds, }) @@ -161,16 +159,17 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error { } mt := res.Messages[0].MessageText - var data []byte - - if d.decodeBase64 { - decoded, decodeError := base64.StdEncoding.DecodeString(*mt) - if decodeError != nil { - return decodeError + data := []byte("") + if mt != nil { + if d.decodeBase64 { + decoded, decodeError := base64.StdEncoding.DecodeString(*mt) + if decodeError != nil { + return decodeError + } + data = decoded + } else { + data = []byte(*mt) } - data = decoded - } else { - data = []byte(*mt) } _, err = consumer.callback(ctx, &bindings.ReadResponse{ @@ -181,12 +180,15 @@ func (d *AzureQueueHelper) Read(ctx context.Context, consumer *consumer) error { return err } - _, err = d.queueClient.DeleteMessage(ctx, *res.Messages[0].MessageID, *res.Messages[0].PopReceipt, &azqueue.DeleteMessageOptions{}) - if err != nil { - return err + if res.Messages[0].MessageID != nil && res.Messages[0].PopReceipt != nil { + _, err = d.queueClient.DeleteMessage(ctx, *res.Messages[0].MessageID, *res.Messages[0].PopReceipt, nil) + if err != nil { + return err + } + return nil + } else { + return fmt.Errorf("could not delete message from queue: message ID or pop receipt is nil") } - - return nil } func (d *AzureQueueHelper) Close() error {