Adding initialVisibilityDelay option to Azure Storage Queue binding component
Signed-off-by: Will Velida <willvelida@hotmail.co.uk>
This commit is contained in:
parent
294dd75354
commit
8829ed0245
|
@ -69,7 +69,7 @@ metadata:
|
|||
default: 'false'
|
||||
binding:
|
||||
output: true
|
||||
input: false
|
||||
input: false
|
||||
- name: "encodeBase64"
|
||||
type: bool
|
||||
description: |
|
||||
|
@ -88,4 +88,13 @@ metadata:
|
|||
binding:
|
||||
output: false
|
||||
input: true
|
||||
- name: "initialVisibilityDelay"
|
||||
type: duration
|
||||
description: |
|
||||
Sets a delay before a message becomes visible in the queue after being added.
|
||||
It can also be specified per message by setting the `initialVisibilityDelay` property in the invocation request's metadata.
|
||||
example: '30s'
|
||||
binding:
|
||||
output: true
|
||||
input: false
|
||||
|
||||
|
|
|
@ -36,15 +36,16 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultTTL = 10 * time.Minute
|
||||
defaultVisibilityTimeout = 30 * time.Second
|
||||
defaultPollingInterval = 10 * time.Second
|
||||
dequeueCount = "dequeueCount"
|
||||
insertionTime = "insertionTime"
|
||||
expirationTime = "expirationTime"
|
||||
nextVisibleTime = "nextVisibleTime"
|
||||
popReceipt = "popReceipt"
|
||||
messageID = "messageID"
|
||||
defaultTTL = 10 * time.Minute
|
||||
defaultVisibilityTimeout = 30 * time.Second
|
||||
defaultPollingInterval = 10 * time.Second
|
||||
defaultInitialVisibilityDelay = 30 * time.Second
|
||||
dequeueCount = "dequeueCount"
|
||||
insertionTime = "insertionTime"
|
||||
expirationTime = "expirationTime"
|
||||
nextVisibleTime = "nextVisibleTime"
|
||||
popReceipt = "popReceipt"
|
||||
messageID = "messageID"
|
||||
)
|
||||
|
||||
type consumer struct {
|
||||
|
@ -54,7 +55,7 @@ type consumer struct {
|
|||
// QueueHelper enables injection for testnig.
|
||||
type QueueHelper interface {
|
||||
Init(ctx context.Context, metadata bindings.Metadata) (*storageQueuesMetadata, error)
|
||||
Write(ctx context.Context, data []byte, ttl *time.Duration) error
|
||||
Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error
|
||||
Read(ctx context.Context, consumer *consumer) error
|
||||
Close() error
|
||||
}
|
||||
|
@ -129,7 +130,7 @@ func (d *AzureQueueHelper) Init(ctx context.Context, meta bindings.Metadata) (*s
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
|
||||
func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
|
||||
var ttlSeconds *int32
|
||||
if ttl != nil {
|
||||
ttlSeconds = ptr.Of(int32(ttl.Seconds()))
|
||||
|
@ -146,9 +147,16 @@ func (d *AzureQueueHelper) Write(ctx context.Context, data []byte, ttl *time.Dur
|
|||
s = base64.StdEncoding.EncodeToString([]byte(s))
|
||||
}
|
||||
|
||||
_, err = d.queueClient.EnqueueMessage(ctx, s, &azqueue.EnqueueMessageOptions{
|
||||
options := &azqueue.EnqueueMessageOptions{
|
||||
TimeToLive: ttlSeconds,
|
||||
})
|
||||
}
|
||||
|
||||
// Add the initial visibility delay if specified
|
||||
if initialVisibilityDelay != nil {
|
||||
options.VisibilityTimeout = ptr.Of(int32(initialVisibilityDelay.Seconds()))
|
||||
}
|
||||
|
||||
_, err = d.queueClient.EnqueueMessage(ctx, s, options)
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -248,15 +256,16 @@ type AzureStorageQueues struct {
|
|||
}
|
||||
|
||||
type storageQueuesMetadata struct {
|
||||
QueueName string
|
||||
QueueEndpoint string
|
||||
AccountName string
|
||||
AccountKey string
|
||||
DecodeBase64 bool
|
||||
EncodeBase64 bool
|
||||
PollingInterval time.Duration `mapstructure:"pollingInterval"`
|
||||
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
|
||||
VisibilityTimeout *time.Duration
|
||||
QueueName string
|
||||
QueueEndpoint string
|
||||
AccountName string
|
||||
AccountKey string
|
||||
DecodeBase64 bool
|
||||
EncodeBase64 bool
|
||||
PollingInterval time.Duration `mapstructure:"pollingInterval"`
|
||||
TTL *time.Duration `mapstructure:"ttl" mapstructurealiases:"ttlInSeconds"`
|
||||
VisibilityTimeout *time.Duration
|
||||
InitialVisibilityDelay *time.Duration `mapstructure:"initialVisibilityDelay"`
|
||||
}
|
||||
|
||||
func (m *storageQueuesMetadata) GetQueueURL(azEnvSettings azauth.EnvironmentSettings) string {
|
||||
|
@ -350,7 +359,17 @@ func (a *AzureStorageQueues) Invoke(ctx context.Context, req *bindings.InvokeReq
|
|||
ttlToUse = &ttl
|
||||
}
|
||||
|
||||
err = a.helper.Write(ctx, req.Data, ttlToUse)
|
||||
// Get the initial visibility delay from request metadata, or use the component's metadata
|
||||
initialVisibilityDelayToUse := a.metadata.InitialVisibilityDelay
|
||||
if val, ok := req.Metadata["initialVisibilityDelay"]; ok && val != "" {
|
||||
duration, parseErr := time.ParseDuration(val)
|
||||
if parseErr != nil {
|
||||
return nil, fmt.Errorf("invalid value for initialVisibilityDelay: %w", parseErr)
|
||||
}
|
||||
initialVisibilityDelayToUse = &duration
|
||||
}
|
||||
|
||||
err = a.helper.Write(ctx, req.Data, ttlToUse, initialVisibilityDelayToUse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -45,9 +45,9 @@ func (m *MockHelper) Init(ctx context.Context, metadata bindings.Metadata) (*sto
|
|||
return m.metadata, err
|
||||
}
|
||||
|
||||
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration) error {
|
||||
func (m *MockHelper) Write(ctx context.Context, data []byte, ttl *time.Duration, initialVisibilityDelay *time.Duration) error {
|
||||
m.messages <- data
|
||||
retvals := m.Called(data, ttl)
|
||||
retvals := m.Called(data, ttl, initialVisibilityDelay)
|
||||
return retvals.Error(0)
|
||||
}
|
||||
|
||||
|
@ -89,6 +89,8 @@ func TestWriteQueue(t *testing.T) {
|
|||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in == nil
|
||||
}), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in == nil
|
||||
})).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
@ -111,6 +113,8 @@ func TestWriteWithTTLInQueue(t *testing.T) {
|
|||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in != nil && *in == time.Second
|
||||
}), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in == nil
|
||||
})).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
@ -133,6 +137,8 @@ func TestWriteWithTTLInWrite(t *testing.T) {
|
|||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in != nil && *in == time.Second
|
||||
}), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in == nil
|
||||
})).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
@ -174,7 +180,7 @@ func TestWriteWithTTLInWrite(t *testing.T) {
|
|||
|
||||
func TestReadQueue(t *testing.T) {
|
||||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
||||
|
@ -215,7 +221,7 @@ func TestReadQueue(t *testing.T) {
|
|||
|
||||
func TestReadQueueDecode(t *testing.T) {
|
||||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
@ -286,7 +292,7 @@ func TestReadQueueDecode(t *testing.T) {
|
|||
*/
|
||||
func TestReadQueueNoMessage(t *testing.T) {
|
||||
mm := new(MockHelper)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.AnythingOfType("*time.Duration"), mock.AnythingOfType("*time.Duration")).Return(nil)
|
||||
mm.On("Read", mock.AnythingOfType("*context.cancelCtx"), mock.AnythingOfType("*storagequeues.consumer")).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
@ -322,64 +328,71 @@ func TestParseMetadata(t *testing.T) {
|
|||
properties map[string]string
|
||||
// Account key is parsed in azauth
|
||||
// expectedAccountKey string
|
||||
expectedQueueName string
|
||||
expectedQueueEndpointURL string
|
||||
expectedPollingInterval time.Duration
|
||||
expectedTTL *time.Duration
|
||||
expectedVisibilityTimeout *time.Duration
|
||||
expectedQueueName string
|
||||
expectedQueueEndpointURL string
|
||||
expectedPollingInterval time.Duration
|
||||
expectedTTL *time.Duration
|
||||
expectedVisibilityTimeout *time.Duration
|
||||
expectedInitialVisibilityDelay *time.Duration
|
||||
}{
|
||||
{
|
||||
name: "Account and key",
|
||||
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1"},
|
||||
// expectedAccountKey: "myKey",
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
{
|
||||
name: "Accout, key, and endpoint",
|
||||
properties: map[string]string{"accountKey": "myKey", "queueName": "queue1", "storageAccount": "someAccount", "queueEndpointUrl": "https://foo.example.com:10001"},
|
||||
// expectedAccountKey: "myKey",
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "https://foo.example.com:10001",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "https://foo.example.com:10001",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
{
|
||||
name: "Empty TTL",
|
||||
properties: map[string]string{"storageAccessKey": "myKey", "queue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: ""},
|
||||
// expectedAccountKey: "myKey",
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
{
|
||||
name: "With TTL",
|
||||
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", metadata.TTLMetadataKey: "1"},
|
||||
// expectedAccountKey: "myKey",
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: &oneSecondDuration,
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedQueueName: "queue1",
|
||||
expectedTTL: &oneSecondDuration,
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
{
|
||||
name: "With visibility timeout",
|
||||
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
|
||||
expectedQueueName: "queue1",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
|
||||
name: "With visibility timeout",
|
||||
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "visibilityTimeout": "5s"},
|
||||
expectedQueueName: "queue1",
|
||||
expectedPollingInterval: defaultPollingInterval,
|
||||
expectedVisibilityTimeout: ptr.Of(5 * time.Second),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
{
|
||||
name: "With polling interval",
|
||||
properties: map[string]string{"accessKey": "myKey", "storageAccountQueue": "queue1", "storageAccount": "devstoreaccount1", "pollingInterval": "2s"},
|
||||
// expectedAccountKey: "myKey",
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: 2 * time.Second,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedQueueName: "queue1",
|
||||
expectedQueueEndpointURL: "",
|
||||
expectedPollingInterval: 2 * time.Second,
|
||||
expectedVisibilityTimeout: ptr.Of(defaultVisibilityTimeout),
|
||||
expectedInitialVisibilityDelay: ptr.Of(defaultInitialVisibilityDelay),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -448,3 +461,31 @@ func TestParseMetadataWithInvalidTTL(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteWithInitialVisibilityDelay(t *testing.T) {
|
||||
mm := new(MockHelper)
|
||||
expectedDelay := 5 * time.Second
|
||||
mm.On("Write", mock.AnythingOfType("[]uint8"), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in == nil
|
||||
}), mock.MatchedBy(func(in *time.Duration) bool {
|
||||
return in != nil && *in == expectedDelay
|
||||
})).Return(nil)
|
||||
|
||||
a := AzureStorageQueues{helper: mm, logger: logger.NewLogger("test"), closeCh: make(chan struct{})}
|
||||
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = map[string]string{"storageAccessKey": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "queue": "queue1", "storageAccount": "devstoreaccount1"}
|
||||
|
||||
err := a.Init(t.Context(), m)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte("This is my message"),
|
||||
Metadata: map[string]string{"initialVisibilityDelay": "5s"},
|
||||
}
|
||||
|
||||
_, err = a.Invoke(t.Context(), &r)
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, a.Close())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue