diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml new file mode 100644 index 000000000..7955be4d5 --- /dev/null +++ b/pubsub/kafka/metadata.yaml @@ -0,0 +1,135 @@ +# yaml-language-server: $schema=../../component-metadata-schema.json +schemaVersion: v1 +type: pubsub +name: kafka +version: v1 +status: stable +title: "Apache Kafka" +urls: + - title: Reference + url: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/ +metadata: + - name: brokers + required: true + description: "A comma-separated list of Kafka brokers" + example: "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093" + type: string + - name: consumerGroup + required: false + description: "A Kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic" + example: "group1" + type: string + - name: clientID + required: false + description: "A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to \"sarama\"" + example: "my-dapr-app" + type: string + - name: authRequired + required: false + description: "Deprecated. Enable SASL authentication with the Kafka brokers" + example: "false" + type: bool + - name: authType + required: true + description: | + Configure or disable authentication. Supported values: "none", "password", "mtls", or "oidc" + example: "none" + type: string + - name: saslUsername + required: false + description: | + The SASL username used for authentication. Only required if authType is set to "password" + example: "myuser" + type: string + - name: saslPassword + required: false + description: | + The SASL password used for authentication. Can be secretKeyRef to use a secret reference. 
Only required if authType is set to "password" + example: "mypassword" + type: string + - name: saslMechanism + required: false + description: | + The SASL Authentication Mechanism you wish to use. Only required if authType is set to "password". Defaults to "PLAINTEXT" + example: "SHA-512" + type: string + - name: initialOffset + required: false + description: | + The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest" + example: "oldest" + type: string + - name: maxMessageBytes + required: false + description: "The maximum size in bytes allowed for a single Kafka message. Defaults to 1024" + example: "2048" + type: number + - name: consumeRetryInterval + required: false + description: | + The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to "100ms" + example: "200ms" + type: string + - name: consumeRetryEnabled + required: false + description: | + Set this to "false" to disable consumer retry + example: "true" + type: bool + - name: version + required: false + description: | + Kafka cluster version. Defaults to "2.0.0.0" + example: "0.10.2.0" + type: string + - name: caCert + required: false + description: "Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference" + example: "-----BEGIN CERTIFICATE-----\n\n-----END CERTIFICATE-----" + type: string + - name: clientCert + required: false + description: "Client certificate, required for authType mtls. Can be secretKeyRef to use a secret reference" + example: "-----BEGIN CERTIFICATE-----\n\n-----END CERTIFICATE-----" + type: string + - name: clientKey + required: false + description: "Client key, required for authType mtls. 
Can be secretKeyRef to use a secret reference" + example: "-----BEGIN RSA PRIVATE KEY-----\n\n-----END RSA PRIVATE KEY-----" + type: string + - name: skipVerify + required: false + description: | + Skip TLS verification, this is not recommended for use in production. Defaults to "false" + example: "true" + type: bool + - name: disableTls + required: false + description: | + Disable TLS for transport security. This is not recommended for use in production. Defaults to "false", which implies TLS is enabled + example: "true" + type: bool + - name: oidcTokenEndpoint + required: false + description: "Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc" + example: "https://identity.example.com/v1/token" + type: string + - name: oidcClientID + required: false + description: "The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc" + example: "dapr-kafka" + type: string + - name: oidcClientSecret + required: false + description: "The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc" + example: "KeFg23!" + type: string + - name: oidcScopes + required: false + description: | + Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid" + example: "openid,kafka-prod" + type: string + + + diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 060ab78c8..046abddb2 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -11,10 +11,9 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ +// Package mongodb is an implementation of StateStore interface to perform operations on store package mongodb -// mongodb package is an implementation of StateStore interface to perform operations on store - import ( "context" "encoding/json" @@ -22,6 +21,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "time" "github.com/google/uuid" @@ -75,13 +75,15 @@ const ( // MongoDB is a state store implementation for MongoDB. type MongoDB struct { state.DefaultBulkStore + client *mongo.Client collection *mongo.Collection operationTimeout time.Duration metadata mongoDBMetadata - features []state.Feature - logger logger.Logger + features []state.Feature + logger logger.Logger + isReplicaSet bool } type mongoDBMetadata struct { @@ -117,40 +119,39 @@ func NewMongoDB(logger logger.Logger) state.Store { } // Init establishes connection to the store based on the metadata. -func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error { - meta, err := getMongoDBMetaData(metadata) +func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) (err error) { + m.metadata, err = getMongoDBMetaData(metadata) if err != nil { return err } - m.operationTimeout = meta.OperationTimeout + m.operationTimeout = m.metadata.OperationTimeout - client, err := getMongoDBClient(ctx, meta) + client, err := m.getMongoDBClient(ctx) if err != nil { return fmt.Errorf("error in creating mongodb client: %s", err) } if err = client.Ping(ctx, nil); err != nil { - return fmt.Errorf("error in connecting to mongodb, host: %s error: %s", meta.Host, err) + return fmt.Errorf("error in connecting to mongodb, host: %s error: %s", m.metadata.Host, err) } m.client = client // get the write concern - wc, err := getWriteConcernObject(meta.Writeconcern) + wc, err := getWriteConcernObject(m.metadata.Writeconcern) if err != nil { return fmt.Errorf("error in getting write concern object: %s", err) } // get the read concern - rc, err := getReadConcernObject(meta.Readconcern) + rc, err := 
getReadConcernObject(m.metadata.Readconcern) if err != nil { return fmt.Errorf("error in getting read concern object: %s", err) } - m.metadata = *meta opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc) - m.collection = m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts) + m.collection = m.client.Database(m.metadata.DatabaseName).Collection(m.metadata.CollectionName, opts) // Set expireAfterSeconds index on ttl field with a value of 0 to delete // values immediately when the TTL value is reached. @@ -164,6 +165,10 @@ func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error { return fmt.Errorf("error in creating ttl index: %s", err) } + if !m.isReplicaSet { + m.logger.Info("Connected to MongoDB without a replica set. Transactions are not available, and the component cannot be used as actor state store.") + } + return nil } @@ -184,7 +189,7 @@ func (m *MongoDB) Set(ctx context.Context, req *state.SetRequest) error { func (m *MongoDB) Ping(ctx context.Context) error { if err := m.client.Ping(ctx, nil); err != nil { - return fmt.Errorf("mongoDB store: error connecting to mongoDB at %s: %s", m.metadata.Host, err) + return fmt.Errorf("error connecting to mongoDB at %s: %s", m.metadata.Host, err) } return nil @@ -356,21 +361,65 @@ func (m *MongoDB) deleteInternal(ctx context.Context, req *state.DeleteRequest) return nil } -// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail. -func (m *MongoDB) Multi(ctx context.Context, request *state.TransactionalStateRequest) error { - sess, err := m.client.StartSession() - txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()). - SetWriteConcern(writeconcern.New(writeconcern.WMajority())) - - defer sess.EndSession(ctx) - - if err != nil { - return fmt.Errorf("error in starting the transaction: %s", err) +// BulkSet performs a bulk save operation. 
+// We need to implement a custom BulkSet/BulkDelete because, with MongoDB, transactions are not always available (only when connecting to a replica set), and when they're not, we need to fall back to performing operations in sequence. +func (m *MongoDB) BulkSet(ctx context.Context, req []state.SetRequest) error { + // Use transactions if we can + if m.isReplicaSet { + return m.Multi(ctx, &state.TransactionalStateRequest{ + Operations: state.ToTransactionalStateOperationSlice(state.Upsert, req), + }) } + // Fall back to executing all operations in sequence + for i := range req { + err := m.Set(ctx, &req[i]) + if err != nil { + return err + } + } + + return nil +} + +// BulkDelete performs a bulk delete operation. +// We need to implement a custom BulkSet/BulkDelete because, with MongoDB, transactions are not always available (only when connecting to a replica set), and when they're not, we need to fall back to performing operations in sequence. +func (m *MongoDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) error { + // Use transactions if we can + if m.isReplicaSet { + return m.Multi(ctx, &state.TransactionalStateRequest{ + Operations: state.ToTransactionalStateOperationSlice(state.Delete, req), + }) + } + + // Fall back to executing all operations in sequence + for i := range req { + err := m.Delete(ctx, &req[i]) + if err != nil { + return err + } + } + + return nil +} + +// Multi performs a transactional operation. It succeeds only if all operations succeed, and fails if one or more operations fail. +func (m *MongoDB) Multi(ctx context.Context, request *state.TransactionalStateRequest) error { + if !m.isReplicaSet { + return errors.New("using transactions with MongoDB requires connecting to a replica set") + } + + sess, err := m.client.StartSession() + if err != nil { + return fmt.Errorf("error starting the transaction: %w", err) + } + defer sess.EndSession(ctx) + + txnOpts := options.Transaction(). + SetReadConcern(readconcern.Snapshot()). 
+ SetWriteConcern(writeconcern.New(writeconcern.WMajority())) sess.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) { err = m.doTransaction(sessCtx, request.Operations) - return nil, err }, txnOpts) @@ -390,8 +439,7 @@ func (m *MongoDB) doTransaction(sessCtx mongo.SessionContext, operations []state if err != nil { sessCtx.AbortTransaction(sessCtx) - - return fmt.Errorf("error during transaction, aborting the transaction: %s", err) + return fmt.Errorf("error during transaction, aborting the transaction: %w", err) } } @@ -416,7 +464,7 @@ func (m *MongoDB) Query(ctx context.Context, req *state.QueryRequest) (*state.Qu }, nil } -func getMongoConnectionString(metadata *mongoDBMetadata) string { +func (metadata *mongoDBMetadata) getMongoConnectionString() string { if metadata.ConnectionString != "" { return metadata.ConnectionString } @@ -436,14 +484,15 @@ func getMongoConnectionString(metadata *mongoDBMetadata) string { return fmt.Sprintf(connectionURIFormat, metadata.Host, metadata.DatabaseName, metadata.Params) } -func getMongoDBClient(ctx context.Context, metadata *mongoDBMetadata) (*mongo.Client, error) { - uri := getMongoConnectionString(metadata) +func (m *MongoDB) getMongoDBClient(ctx context.Context) (*mongo.Client, error) { + uri := m.metadata.getMongoConnectionString() // Set client options clientOptions := options.Client().ApplyURI(uri) + m.isReplicaSet = clientOptions.ReplicaSet != nil // Connect to MongoDB - ctx, cancel := context.WithTimeout(ctx, metadata.OperationTimeout) + ctx, cancel := context.WithTimeout(ctx, m.metadata.OperationTimeout) defer cancel() daprUserAgent := "dapr-" + logger.DaprVersion @@ -461,7 +510,7 @@ func getMongoDBClient(ctx context.Context, metadata *mongoDBMetadata) (*mongo.Cl return client, nil } -func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) { +func getMongoDBMetaData(meta state.Metadata) (mongoDBMetadata, error) { m := mongoDBMetadata{ DatabaseName: defaultDatabaseName, 
CollectionName: defaultCollectionName, @@ -470,16 +519,21 @@ func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) { decodeErr := metadata.DecodeMetadata(meta.Properties, &m) if decodeErr != nil { - return nil, decodeErr + return m, decodeErr } if m.ConnectionString == "" { if len(m.Host) == 0 && len(m.Server) == 0 { - return nil, errors.New("must set 'host' or 'server' fields in metadata") + return m, errors.New("must set 'host' or 'server' fields in metadata") } if len(m.Host) != 0 && len(m.Server) != 0 { - return nil, errors.New("'host' or 'server' fields are mutually exclusive") + return m, errors.New("'host' or 'server' fields are mutually exclusive") + } + + // Ensure that params, if set, start with ? + if m.Params != "" && !strings.HasPrefix(m.Params, "?") { + m.Params = "?" + m.Params } } @@ -487,11 +541,11 @@ func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) { if val, ok := meta.Properties[operationTimeout]; ok && val != "" { m.OperationTimeout, err = time.ParseDuration(val) if err != nil { - return nil, errors.New("incorrect operationTimeout field from metadata") + return m, errors.New("incorrect operationTimeout field from metadata") } } - return &m, nil + return m, nil } func getWriteConcernObject(cn string) (*writeconcern.WriteConcern, error) { diff --git a/state/mongodb/mongodb_test.go b/state/mongodb/mongodb_test.go index 103265760..c926f3f39 100644 --- a/state/mongodb/mongodb_test.go +++ b/state/mongodb/mongodb_test.go @@ -87,7 +87,7 @@ func TestGetMongoDBMetadata(t *testing.T) { metadata, err := getMongoDBMetaData(m) assert.Nil(t, err) - uri := getMongoConnectionString(metadata) + uri := metadata.getMongoConnectionString() expected := "mongodb://username:password@127.0.0.2/TestDB" assert.Equal(t, expected, uri) @@ -106,7 +106,7 @@ func TestGetMongoDBMetadata(t *testing.T) { metadata, err := getMongoDBMetaData(m) assert.Nil(t, err) - uri := getMongoConnectionString(metadata) + uri := 
metadata.getMongoConnectionString() expected := "mongodb://localhost:27017/TestDB" assert.Equal(t, expected, uri) @@ -128,7 +128,7 @@ func TestGetMongoDBMetadata(t *testing.T) { metadata, err := getMongoDBMetaData(m) assert.Nil(t, err) - uri := getMongoConnectionString(metadata) + uri := metadata.getMongoConnectionString() expected := "mongodb://username:password@127.0.0.2/TestDB?ssl=true" assert.Equal(t, expected, uri) @@ -148,7 +148,7 @@ func TestGetMongoDBMetadata(t *testing.T) { metadata, err := getMongoDBMetaData(m) assert.Nil(t, err) - uri := getMongoConnectionString(metadata) + uri := metadata.getMongoConnectionString() expected := "mongodb+srv://server.example.com/?ssl=true" assert.Equal(t, expected, uri) @@ -202,7 +202,7 @@ func TestGetMongoDBMetadata(t *testing.T) { metadata, err := getMongoDBMetaData(m) assert.Nil(t, err) - uri := getMongoConnectionString(metadata) + uri := metadata.getMongoConnectionString() expected := "mongodb://localhost:99999/UnchanedDB" assert.Equal(t, expected, uri) diff --git a/state/store.go b/state/store.go index 38228b64a..f02b6172a 100644 --- a/state/store.go +++ b/state/store.go @@ -100,3 +100,15 @@ func (b *DefaultBulkStore) BulkDelete(ctx context.Context, req []DeleteRequest) type Querier interface { Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error) } + +// ToTransactionalStateOperationSlice converts []SetRequest and []DeleteRequest to []TransactionalStateOperation. +func ToTransactionalStateOperationSlice[T SetRequest | DeleteRequest](operation OperationType, req []T) []TransactionalStateOperation { + ops := make([]TransactionalStateOperation, len(req)) + for i, r := range req { + ops[i] = TransactionalStateOperation{ + Operation: operation, + Request: r, + } + } + return ops +}