GCP Pub/Sub Message Ordering Configuration (#1099)

* Add configuration for enabling message ordering

* Simplify passing the metadata

* Update field order to satisfy alignment linter

Co-authored-by: Luke Kennedy <luke.kennedy@optiisolutions.com>
This commit is contained in:
Luke Kennedy 2021-08-31 01:54:46 +10:00 committed by GitHub
parent 6591b0de68
commit 8fd1ee57a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 2 deletions

View File

@ -3,7 +3,6 @@ package pubsub
// GCPPubSubMetaData pubsub metadata
type metadata struct {
consumerID string
DisableEntityManagement bool
Type string
IdentityProjectID string
ProjectID string
@ -15,4 +14,6 @@ type metadata struct {
TokenURI string
AuthProviderCertURL string
ClientCertURL string
DisableEntityManagement bool
EnableMessageOrdering bool
}

View File

@ -29,6 +29,7 @@ const (
metadataClientX509CertURLKey = "clientX509CertUrl"
metadataPrivateKeyKey = "privateKey"
metadataDisableEntityManagementKey = "disableEntityManagement"
metadataEnableMessageOrderingKey = "enableMessageOrdering"
)
// GCPPubSub type
@ -123,6 +124,12 @@ func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
}
}
if val, found := pubSubMetadata.Properties[metadataEnableMessageOrderingKey]; found && val != "" {
if boolVal, err := strconv.ParseBool(val); err == nil {
result.EnableMessageOrdering = boolVal
}
}
return &result, nil
}
@ -277,7 +284,7 @@ func (g *GCPPubSub) ensureSubscription(subscription string, topic string) error
exists, subErr := entity.Exists(context.Background())
if !exists {
_, subErr = g.client.CreateSubscription(context.Background(), managedSubscription,
gcppubsub.SubscriptionConfig{Topic: g.getTopic(topic)})
gcppubsub.SubscriptionConfig{Topic: g.getTopic(topic), EnableMessageOrdering: g.metadata.EnableMessageOrdering})
}
return subErr

View File

@ -22,6 +22,7 @@ func TestInit(t *testing.T) {
"identityProjectId": "project1",
"tokenUri": "https://token",
"type": "serviceaccount",
"enableMessageOrdering": "true",
}
b, err := createMetadata(m)
assert.Nil(t, err)
@ -36,6 +37,7 @@ func TestInit(t *testing.T) {
assert.Equal(t, "project1", b.IdentityProjectID)
assert.Equal(t, "https://token", b.TokenURI)
assert.Equal(t, "serviceaccount", b.Type)
assert.Equal(t, true, b.EnableMessageOrdering)
})
t.Run("metadata is correct with implicit creds", func(t *testing.T) {