diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 0a1c2b844..1d620fa8e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,9 +2,9 @@ Thank you for your interest in Dapr! -This project welcomes contributions and suggestions. Most contributions require you to signoff on your commits via -the Developer Certificate of Origin (DCO). When you submit a pull request, a DCO-bot will automatically determine -whether you need to provide signoff for your commit. Please follow the instructions provided by DCO-bot, as pull +This project welcomes contributions and suggestions. Most contributions require you to signoff on your commits via +the Developer Certificate of Origin (DCO). When you submit a pull request, a DCO-bot will automatically determine +whether you need to provide signoff for your commit. Please follow the instructions provided by DCO-bot, as pull requests cannot be merged until the author(s) have provided signoff to fulfill the DCO requirement. You may find more information on the DCO requirements [below](#developer-certificate-of-origin-signing-your-work). @@ -64,7 +64,7 @@ All contributions come through pull requests. To submit a proposed change, we re #### Use work-in-progress PRs for early feedback -A good way to communicate before investing too much time is to create a "Work-in-progress" PR and share it with your reviewers. The standard way of doing this is to add a "[WIP]" prefix in your PR's title and assign the **do-not-merge** label. This will let people looking at your PR know that it is not well baked yet. +A good way to communicate before investing too much time is to create a "Work-in-progress" PR and share it with your reviewers. The standard way of doing this is to open your PR as a draft, add a "[WIP]" prefix in your PR's title, and assign the **do-not-merge** label. This will let people looking at your PR know that it is not well baked yet. ### Developer Certificate of Origin: Signing your work diff --git a/pubsub/gcp/pubsub/metadata.go b/pubsub/gcp/pubsub/metadata.go index 7c88af06e..168d99096 100644 --- a/pubsub/gcp/pubsub/metadata.go +++ b/pubsub/gcp/pubsub/metadata.go @@ -29,12 +29,15 @@ type metadata struct { AuthProviderCertURL string `mapstructure:"authProviderX509CertUrl" mdignore:"true"` ClientCertURL string `mapstructure:"clientX509CertUrl" mdignore:"true"` - DisableEntityManagement bool `mapstructure:"disableEntityManagement"` - EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"` - MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"` - ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"` - ConnectionEndpoint string `mapstructure:"endpoint"` - OrderingKey string `mapstructure:"orderingKey"` - DeadLetterTopic string `mapstructure:"deadLetterTopic"` - MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"` + DisableEntityManagement bool `mapstructure:"disableEntityManagement"` + EnableMessageOrdering bool `mapstructure:"enableMessageOrdering"` + MaxReconnectionAttempts int `mapstructure:"maxReconnectionAttempts"` + ConnectionRecoveryInSec int `mapstructure:"connectionRecoveryInSec"` + ConnectionEndpoint string `mapstructure:"endpoint"` + OrderingKey string `mapstructure:"orderingKey"` + DeadLetterTopic string `mapstructure:"deadLetterTopic"` + MaxDeliveryAttempts int `mapstructure:"maxDeliveryAttempts"` + MaxOutstandingMessages int `mapstructure:"maxOutstandingMessages"` + MaxOutstandingBytes int `mapstructure:"maxOutstandingBytes"` + MaxConcurrentConnections int `mapstructure:"maxConcurrentConnections"` } diff --git a/pubsub/gcp/pubsub/metadata.yaml b/pubsub/gcp/pubsub/metadata.yaml index 0fb211fc3..4adffd2d3 100644 --- a/pubsub/gcp/pubsub/metadata.yaml +++ b/pubsub/gcp/pubsub/metadata.yaml @@ -13,32 +13,32 @@ builtinAuthenticationProfiles: metadata: - name: enableMessageOrdering description: | - When set to "true", subscribed messages will be received in order, + When set to "true", subscribed messages will be received in order, depending on publishing and permissions configuration. type: bool default: 'false' example: '"true", "false"' - name: orderingKey description: | - The key provided in the request. It's used when "enableMessageOrdering" + The key provided in the request. It's used when "enableMessageOrdering" is set to true to order messages based on such key. type: string example: '"my-orderingkey"' - name: disableEntityManagement description: | - When set to true, topics and subscriptions do not get created automatically. + When set to true, topics and subscriptions do not get created automatically. type: bool default: 'false' example: '"true", "false"' - name: maxReconnectionAttempts description: | - Defines the maximum number of reconnect attempts. + Defines the maximum number of reconnect attempts. type: number default: '30' example: '30' - name: connectionRecoveryInSec description: | - Time in seconds to wait between connection recovery attempts. + Time in seconds to wait between connection recovery attempts. type: number default: '2' example: '2' @@ -60,3 +60,18 @@ metadata: type: number default: '5' example: '5' + - name: maxOutstandingMessages + description: | + Maximum number of messages a GCP streaming-pull connection is allowed to have outstanding + type: number + example: '1000' + - name: maxOutstandingBytes + description: | + Maximum number of bytes a GCP streaming-pull connection is allowed to have outstanding + type: number + example: '1e9' + - name: maxConcurrentConnections + description: | + Max number of concurrent streaming-pull connections to maintain + type: number + example: '10' diff --git a/pubsub/gcp/pubsub/pubsub.go b/pubsub/gcp/pubsub/pubsub.go index 38f65db5a..562545db3 100644 --- a/pubsub/gcp/pubsub/pubsub.go +++ b/pubsub/gcp/pubsub/pubsub.go @@ -319,6 +319,21 @@ func (g *GCPPubSub) handleSubscriptionMessages(parentCtx context.Context, topic readReconnectAttemptsRemaining := func() int { return len(reconnAttempts) } + // Apply configured limits for MaxOutstandingMessages, MaxOutstandingBytes, and MaxConcurrentConnections + // NOTE: negative MaxOutstandingMessages and MaxOutstaningBytes values are allowed and indicate + // in the GCP pubsub library that no limit should be applied. Zero values result in the package + // default being used: 1000 messages and 1e9 (1G) bytes respectively. + if g.metadata.MaxOutstandingMessages != 0 { + sub.ReceiveSettings.MaxOutstandingMessages = g.metadata.MaxOutstandingMessages + } + if g.metadata.MaxOutstandingBytes != 0 { + sub.ReceiveSettings.MaxOutstandingBytes = g.metadata.MaxOutstandingBytes + } + // NOTE: For MaxConcurrentConnections, negative values are not allowed so only override if the value is greater than 0 + if g.metadata.MaxConcurrentConnections > 0 { + sub.ReceiveSettings.NumGoroutines = g.metadata.MaxConcurrentConnections + } + // Periodically refill the reconnect attempts channel to avoid // exhausting all the refill attempts due to intermittent issues // occurring over a longer period of time. diff --git a/pubsub/gcp/pubsub/pubsub_test.go b/pubsub/gcp/pubsub/pubsub_test.go index 99b699f8b..0ddfaaad7 100644 --- a/pubsub/gcp/pubsub/pubsub_test.go +++ b/pubsub/gcp/pubsub/pubsub_test.go @@ -128,4 +128,133 @@ func TestInit(t *testing.T) { require.Error(t, err) require.ErrorContains(t, err, "connectionRecoveryInSec") }) + + t.Run("valid optional maxOutstandingMessages", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingMessages": "50", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 50, md.MaxOutstandingMessages, "MaxOutstandingMessages should match the provided configuration") + }) + + t.Run("missing optional maxOutstandingMessages", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 0, md.MaxOutstandingMessages) + }) + + t.Run("valid negative optional maxOutstandingMessages", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingMessages": "-1", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, -1, md.MaxOutstandingMessages, "MaxOutstandingMessages should match the provided configuration") + }) + + t.Run("invalid optional maxOutstandingMessages", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingMessages": "foobar", + } + + _, err := createMetadata(m) + require.Error(t, err) + require.ErrorContains(t, err, "maxOutstandingMessages") + }) + + t.Run("valid optional maxOutstandingBytes", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingBytes": "1000000000", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 1000000000, md.MaxOutstandingBytes, "MaxOutstandingBytes should match the provided configuration") + }) + + t.Run("missing optional maxOutstandingBytes", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 0, md.MaxOutstandingBytes) + }) + + t.Run("valid negative optional maxOutstandingBytes", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingBytes": "-1", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, -1, md.MaxOutstandingBytes, "MaxOutstandingBytes should match the provided configuration") + }) + + t.Run("invalid optional maxOutstandingBytes", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxOutstandingBytes": "foobar", + } + + _, err := createMetadata(m) + require.Error(t, err) + require.ErrorContains(t, err, "maxOutstandingBytes") + }) + + t.Run("valid optional maxConcurrentConnections", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxConcurrentConnections": "2", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 2, md.MaxConcurrentConnections, "MaxConcurrentConnections should match the provided configuration") + }) + + t.Run("missing optional maxConcurrentConnections", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + } + + md, err := createMetadata(m) + require.NoError(t, err) + assert.Equal(t, 0, md.MaxConcurrentConnections) + }) + + t.Run("invalid optional maxConcurrentConnections", func(t *testing.T) { + m := pubsub.Metadata{} + m.Properties = map[string]string{ + "projectId": "test-project", + "maxConcurrentConnections": "foobar", + } + + _, err := createMetadata(m) + require.Error(t, err) + require.ErrorContains(t, err, "maxConcurrentConnections") + }) }