Support configurable MaxOutstanding* and NumGoroutines settings for GCP PubSub component (#3442)

Signed-off-by: Nathan Lowry <nathandl@gmail.com>
This commit is contained in:
Nathan Lowry 2024-06-21 16:02:29 -04:00 committed by GitHub
parent 0375e200b6
commit 864baaad3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 179 additions and 17 deletions

View File

@ -64,7 +64,7 @@ All contributions come through pull requests. To submit a proposed change, we re
#### Use work-in-progress PRs for early feedback
A good way to communicate before investing too much time is to create a "Work-in-progress" PR and share it with your reviewers. The standard way of doing this is to add a "[WIP]" prefix in your PR's title and assign the **do-not-merge** label. This will let people looking at your PR know that it is not well baked yet.
A good way to communicate before investing too much time is to create a "Work-in-progress" PR and share it with your reviewers. The standard way of doing this is to open your PR as a draft, add a "[WIP]" prefix in your PR's title, and assign the **do-not-merge** label. This will let people looking at your PR know that it is not well baked yet.
### Developer Certificate of Origin: Signing your work

View File

@ -37,4 +37,7 @@ type metadata struct {
OrderingKey string `mapstructure:"orderingKey"`
DeadLetterTopic string `mapstructure:"deadLetterTopic"`
MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"`
MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"`
MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"`
MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"`
}

View File

@ -60,3 +60,18 @@ metadata:
type: number
default: '5'
example: '5'
- name: maxOutstandingMessages
description: |
Maximum number of messages a GCP streaming-pull connection is allowed to have outstanding
type: number
example: '1000'
- name: maxOutstandingBytes
description: |
Maximum number of bytes a GCP streaming-pull connection is allowed to have outstanding
type: number
example: '1e9'
- name: maxConcurrentConnections
description: |
Max number of concurrent streaming-pull connections to maintain
type: number
example: '10'

View File

@ -319,6 +319,21 @@ func (g *GCPPubSub) handleSubscriptionMessages(parentCtx context.Context, topic
readReconnectAttemptsRemaining := func() int { return len(reconnAttempts) }
// Apply configured limits for MaxOutstandingMessages, MaxOutstandingBytes, and MaxConcurrentConnections
// NOTE: negative MaxOutstandingMessages and MaxOutstaningBytes values are allowed and indicate
// in the GCP pubsub library that no limit should be applied. Zero values result in the package
// default being used: 1000 messages and 1e9 (1G) bytes respectively.
if g.metadata.MaxOutstandingMessages != 0 {
sub.ReceiveSettings.MaxOutstandingMessages = g.metadata.MaxOutstandingMessages
}
if g.metadata.MaxOutstandingBytes != 0 {
sub.ReceiveSettings.MaxOutstandingBytes = g.metadata.MaxOutstandingBytes
}
// NOTE: For MaxConcurrentConnections, negative values are not allowed so only override if the value is greater than 0
if g.metadata.MaxConcurrentConnections > 0 {
sub.ReceiveSettings.NumGoroutines = g.metadata.MaxConcurrentConnections
}
// Periodically refill the reconnect attempts channel to avoid
// exhausting all the refill attempts due to intermittent issues
// occurring over a longer period of time.

View File

@ -128,4 +128,133 @@ func TestInit(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "connectionRecoveryInSec")
})
t.Run("valid optional maxOutstandingMessages", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingMessages": "50",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 50, md.MaxOutstandingMessages, "MaxOutstandingMessages should match the provided configuration")
})
t.Run("missing optional maxOutstandingMessages", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 0, md.MaxOutstandingMessages)
})
t.Run("valid negative optional maxOutstandingMessages", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingMessages": "-1",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, -1, md.MaxOutstandingMessages, "MaxOutstandingMessages should match the provided configuration")
})
t.Run("invalid optional maxOutstandingMessages", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingMessages": "foobar",
}
_, err := createMetadata(m)
require.Error(t, err)
require.ErrorContains(t, err, "maxOutstandingMessages")
})
t.Run("valid optional maxOutstandingBytes", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingBytes": "1000000000",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 1000000000, md.MaxOutstandingBytes, "MaxOutstandingBytes should match the provided configuration")
})
t.Run("missing optional maxOutstandingBytes", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 0, md.MaxOutstandingBytes)
})
t.Run("valid negative optional maxOutstandingBytes", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingBytes": "-1",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, -1, md.MaxOutstandingBytes, "MaxOutstandingBytes should match the provided configuration")
})
t.Run("invalid optional maxOutstandingBytes", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxOutstandingBytes": "foobar",
}
_, err := createMetadata(m)
require.Error(t, err)
require.ErrorContains(t, err, "maxOutstandingBytes")
})
t.Run("valid optional maxConcurrentConnections", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxConcurrentConnections": "2",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 2, md.MaxConcurrentConnections, "MaxConcurrentConnections should match the provided configuration")
})
t.Run("missing optional maxConcurrentConnections", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
}
md, err := createMetadata(m)
require.NoError(t, err)
assert.Equal(t, 0, md.MaxConcurrentConnections)
})
t.Run("invalid optional maxConcurrentConnections", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"projectId": "test-project",
"maxConcurrentConnections": "foobar",
}
_, err := createMetadata(m)
require.Error(t, err)
require.ErrorContains(t, err, "maxConcurrentConnections")
})
}