Merge branch 'master' into rethinkdb-transactions

This commit is contained in:
Mukundan Sundararajan 2023-04-03 21:20:18 +05:30 committed by GitHub
commit 8a3a5e9ebc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 243 additions and 42 deletions

135
pubsub/kafka/metadata.yaml Normal file
View File

@ -0,0 +1,135 @@
# yaml-language-server: $schema=../../component-metadata-schema.json
schemaVersion: v1
type: pubsub
name: kafka
version: v1
status: stable
title: "Apache Kafka"
urls:
- title: Reference
url: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/
metadata:
- name: brokers
required: true
description: "A comma-separated list of Kafka brokers"
example: "localhost:9092,dapr-kafka.myapp.svc.cluster.local:9093"
type: string
- name: consumerGroup
required: false
description: "A kafka consumer group to listen on. Each record published to a topic is delivered to one consumer within each consumer group subscribed to the topic"
example: "group1"
type: string
- name: clientID
required: false
description: "A user-provided string sent with every request to the Kafka brokers for logging, debugging, and auditing purposes. Defaults to \"sarama\""
example: "my-dapr-app"
type: string
- name: authRequired
required: false
description: "Deprecated Enable SASL authentication with the Kafka brokers"
example: "false"
type: bool
- name: authType
required: true
description: |
Configure or disable authentication. Supported values: "none", "password", "mtls", or "oidc"
example: "none"
type: string
- name: saslUsername
required: false
description: |
The SASL username used for authentication. Only required if authType is set to "password"
example: "myuser"
type: string
- name: saslPassword
required: false
description: |
The SASL password used for authentication. Can be secretKeyRef to use a secret reference. Only required if authType is set to "password"
example: "mypassword"
type: string
- name: saslMechanism
required: false
description: |
The SASL Authentication Mechanism you wish to use. Only required if authType is set to "password". Defaults to "PLAINTEXT"
example: "SHA-512"
type: string
- name: initialOffset
required: false
description: |
The initial offset to use if no offset was previously committed. Should be "newest" or "oldest". Defaults to "newest"
example: "oldest"
type: string
- name: maxMessageBytes
required: false
description: "The maximum size in bytes allowed for a single Kafka message. Defaults to 1024"
example: "2048"
type: number
- name: consumeRetryInterval
required: false
description: |
The interval between retries when attempting to consume topics. Treats numbers without suffix as milliseconds. Defaults to "100ms"
example: "200ms"
type: string
- name: consumeRetryEnabled
required: false
description: |
Disables consumer retry by setting this to "false"
example: "true"
type: bool
- name: version
required: false
description: |
Kafka cluster version. Defaults to "2.0.0.0"
example: "0.10.2.0"
type: string
- name: caCert
required: false
description: "Certificate authority certificate, required for using TLS. Can be secretKeyRef to use a secret reference"
example: "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
type: string
- name: clientCert
required: false
description: "Client certificate, required for authType mtls. Can be secretKeyRef to use a secret reference"
example: "-----BEGIN CERTIFICATE-----\n<base64-encoded DER>\n-----END CERTIFICATE-----"
type: string
- name: clientKey
required: false
description: "Client key, required for authType mtls. Can be secretKeyRef to use a secret reference"
example: "-----BEGIN RSA PRIVATE KEY-----\n<base64-encoded DER>\n-----END RSA PRIVATE KEY-----"
type: string
- name: skipVerify
required: false
description: |
Skip TLS verification, this is not recommended for use in production. Defaults to "false"
example: "true"
type: bool
- name: disableTls
required: false
description: |
Disable TLS for transport security. This is not recommended for use in production. Defaults to "false", which implies TLS is enabled
example: "true"
type: bool
- name: oidcTokenEndpoint
required: false
description: "Full URL to an OAuth2 identity provider access token endpoint. Required when authType is set to oidc"
example: "https://identity.example.com/v1/token"
type: string
- name: oidcClientID
required: false
description: "The OAuth2 client ID that has been provisioned in the identity provider. Required when authType is set to oidc"
example: "dapr-kafka"
type: string
- name: oidcClientSecret
required: false
description: "The OAuth2 client secret that has been provisioned in the identity provider. Required when authType is set to oidc"
example: "KeFg23!"
type: string
- name: oidcScopes
required: false
description: |
Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when authType is set to oidc. Defaults to "openid"
example: "openid,kafka-prod"
type: string

View File

@ -11,10 +11,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package mongodb is an implementation of StateStore interface to perform operations on store
package mongodb
// mongodb package is an implementation of StateStore interface to perform operations on store
import (
"context"
"encoding/json"
@ -22,6 +21,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/google/uuid"
@ -75,13 +75,15 @@ const (
// MongoDB is a state store implementation for MongoDB.
type MongoDB struct {
state.DefaultBulkStore
client *mongo.Client
collection *mongo.Collection
operationTimeout time.Duration
metadata mongoDBMetadata
features []state.Feature
logger logger.Logger
features []state.Feature
logger logger.Logger
isReplicaSet bool
}
type mongoDBMetadata struct {
@ -117,40 +119,39 @@ func NewMongoDB(logger logger.Logger) state.Store {
}
// Init establishes connection to the store based on the metadata.
func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error {
meta, err := getMongoDBMetaData(metadata)
func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) (err error) {
m.metadata, err = getMongoDBMetaData(metadata)
if err != nil {
return err
}
m.operationTimeout = meta.OperationTimeout
m.operationTimeout = m.metadata.OperationTimeout
client, err := getMongoDBClient(ctx, meta)
client, err := m.getMongoDBClient(ctx)
if err != nil {
return fmt.Errorf("error in creating mongodb client: %s", err)
}
if err = client.Ping(ctx, nil); err != nil {
return fmt.Errorf("error in connecting to mongodb, host: %s error: %s", meta.Host, err)
return fmt.Errorf("error in connecting to mongodb, host: %s error: %s", m.metadata.Host, err)
}
m.client = client
// get the write concern
wc, err := getWriteConcernObject(meta.Writeconcern)
wc, err := getWriteConcernObject(m.metadata.Writeconcern)
if err != nil {
return fmt.Errorf("error in getting write concern object: %s", err)
}
// get the read concern
rc, err := getReadConcernObject(meta.Readconcern)
rc, err := getReadConcernObject(m.metadata.Readconcern)
if err != nil {
return fmt.Errorf("error in getting read concern object: %s", err)
}
m.metadata = *meta
opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc)
m.collection = m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts)
m.collection = m.client.Database(m.metadata.DatabaseName).Collection(m.metadata.CollectionName, opts)
// Set expireAfterSeconds index on ttl field with a value of 0 to delete
// values immediately when the TTL value is reached.
@ -164,6 +165,10 @@ func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error {
return fmt.Errorf("error in creating ttl index: %s", err)
}
if !m.isReplicaSet {
m.logger.Info("Connected to MongoDB without a replica set. Transactions are not available, and the component cannot be used as actor state store.")
}
return nil
}
@ -184,7 +189,7 @@ func (m *MongoDB) Set(ctx context.Context, req *state.SetRequest) error {
func (m *MongoDB) Ping(ctx context.Context) error {
if err := m.client.Ping(ctx, nil); err != nil {
return fmt.Errorf("mongoDB store: error connecting to mongoDB at %s: %s", m.metadata.Host, err)
return fmt.Errorf("error connecting to mongoDB at %s: %s", m.metadata.Host, err)
}
return nil
@ -356,21 +361,65 @@ func (m *MongoDB) deleteInternal(ctx context.Context, req *state.DeleteRequest)
return nil
}
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
func (m *MongoDB) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
sess, err := m.client.StartSession()
txnOpts := options.Transaction().SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
defer sess.EndSession(ctx)
if err != nil {
return fmt.Errorf("error in starting the transaction: %s", err)
// BulkSet performs a bulk save operation.
// We need to implement a custom BulkSet/BulkDelete because with MongoDB transactions are not always available (only when connecting to a replica set), and when they're not, we need to fall back to performing operations in sequence.
func (m *MongoDB) BulkSet(ctx context.Context, req []state.SetRequest) error {
// Use transactions if we can
if m.isReplicaSet {
return m.Multi(ctx, &state.TransactionalStateRequest{
Operations: state.ToTransactionalStateOperationSlice(state.Upsert, req),
})
}
// Fallback to executing all operations in sequence
for i := range req {
err := m.Set(ctx, &req[i])
if err != nil {
return err
}
}
return nil
}
// BulkDelete performs a bulk delete operation.
// We need to implement a custom BulkSet/BulkDelete because with MongoDB transactions are not always available (only when connecting to a replica set), and when they're not, we need to fall back to performing operations in sequence.
func (m *MongoDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
// Use transactions if we can
if m.isReplicaSet {
return m.Multi(ctx, &state.TransactionalStateRequest{
Operations: state.ToTransactionalStateOperationSlice(state.Delete, req),
})
}
// Fallback to executing all operations in sequence
for i := range req {
err := m.Delete(ctx, &req[i])
if err != nil {
return err
}
}
return nil
}
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
func (m *MongoDB) Multi(ctx context.Context, request *state.TransactionalStateRequest) error {
if !m.isReplicaSet {
return errors.New("using transactions with MongoDB requires connecting to a replica set")
}
sess, err := m.client.StartSession()
if err != nil {
return fmt.Errorf("error starting the transaction: %w", err)
}
defer sess.EndSession(ctx)
txnOpts := options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
sess.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) {
err = m.doTransaction(sessCtx, request.Operations)
return nil, err
}, txnOpts)
@ -390,8 +439,7 @@ func (m *MongoDB) doTransaction(sessCtx mongo.SessionContext, operations []state
if err != nil {
sessCtx.AbortTransaction(sessCtx)
return fmt.Errorf("error during transaction, aborting the transaction: %s", err)
return fmt.Errorf("error during transaction, aborting the transaction: %w", err)
}
}
@ -416,7 +464,7 @@ func (m *MongoDB) Query(ctx context.Context, req *state.QueryRequest) (*state.Qu
}, nil
}
func getMongoConnectionString(metadata *mongoDBMetadata) string {
func (metadata *mongoDBMetadata) getMongoConnectionString() string {
if metadata.ConnectionString != "" {
return metadata.ConnectionString
}
@ -436,14 +484,15 @@ func getMongoConnectionString(metadata *mongoDBMetadata) string {
return fmt.Sprintf(connectionURIFormat, metadata.Host, metadata.DatabaseName, metadata.Params)
}
func getMongoDBClient(ctx context.Context, metadata *mongoDBMetadata) (*mongo.Client, error) {
uri := getMongoConnectionString(metadata)
func (m *MongoDB) getMongoDBClient(ctx context.Context) (*mongo.Client, error) {
uri := m.metadata.getMongoConnectionString()
// Set client options
clientOptions := options.Client().ApplyURI(uri)
m.isReplicaSet = clientOptions.ReplicaSet != nil
// Connect to MongoDB
ctx, cancel := context.WithTimeout(ctx, metadata.OperationTimeout)
ctx, cancel := context.WithTimeout(ctx, m.metadata.OperationTimeout)
defer cancel()
daprUserAgent := "dapr-" + logger.DaprVersion
@ -461,7 +510,7 @@ func getMongoDBClient(ctx context.Context, metadata *mongoDBMetadata) (*mongo.Cl
return client, nil
}
func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) {
func getMongoDBMetaData(meta state.Metadata) (mongoDBMetadata, error) {
m := mongoDBMetadata{
DatabaseName: defaultDatabaseName,
CollectionName: defaultCollectionName,
@ -470,16 +519,21 @@ func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) {
decodeErr := metadata.DecodeMetadata(meta.Properties, &m)
if decodeErr != nil {
return nil, decodeErr
return m, decodeErr
}
if m.ConnectionString == "" {
if len(m.Host) == 0 && len(m.Server) == 0 {
return nil, errors.New("must set 'host' or 'server' fields in metadata")
return m, errors.New("must set 'host' or 'server' fields in metadata")
}
if len(m.Host) != 0 && len(m.Server) != 0 {
return nil, errors.New("'host' or 'server' fields are mutually exclusive")
return m, errors.New("'host' or 'server' fields are mutually exclusive")
}
// Ensure that params, if set, start with ?
if m.Params != "" && !strings.HasPrefix(m.Params, "?") {
m.Params = "?" + m.Params
}
}
@ -487,11 +541,11 @@ func getMongoDBMetaData(meta state.Metadata) (*mongoDBMetadata, error) {
if val, ok := meta.Properties[operationTimeout]; ok && val != "" {
m.OperationTimeout, err = time.ParseDuration(val)
if err != nil {
return nil, errors.New("incorrect operationTimeout field from metadata")
return m, errors.New("incorrect operationTimeout field from metadata")
}
}
return &m, nil
return m, nil
}
func getWriteConcernObject(cn string) (*writeconcern.WriteConcern, error) {

View File

@ -87,7 +87,7 @@ func TestGetMongoDBMetadata(t *testing.T) {
metadata, err := getMongoDBMetaData(m)
assert.Nil(t, err)
uri := getMongoConnectionString(metadata)
uri := metadata.getMongoConnectionString()
expected := "mongodb://username:password@127.0.0.2/TestDB"
assert.Equal(t, expected, uri)
@ -106,7 +106,7 @@ func TestGetMongoDBMetadata(t *testing.T) {
metadata, err := getMongoDBMetaData(m)
assert.Nil(t, err)
uri := getMongoConnectionString(metadata)
uri := metadata.getMongoConnectionString()
expected := "mongodb://localhost:27017/TestDB"
assert.Equal(t, expected, uri)
@ -128,7 +128,7 @@ func TestGetMongoDBMetadata(t *testing.T) {
metadata, err := getMongoDBMetaData(m)
assert.Nil(t, err)
uri := getMongoConnectionString(metadata)
uri := metadata.getMongoConnectionString()
expected := "mongodb://username:password@127.0.0.2/TestDB?ssl=true"
assert.Equal(t, expected, uri)
@ -148,7 +148,7 @@ func TestGetMongoDBMetadata(t *testing.T) {
metadata, err := getMongoDBMetaData(m)
assert.Nil(t, err)
uri := getMongoConnectionString(metadata)
uri := metadata.getMongoConnectionString()
expected := "mongodb+srv://server.example.com/?ssl=true"
assert.Equal(t, expected, uri)
@ -202,7 +202,7 @@ func TestGetMongoDBMetadata(t *testing.T) {
metadata, err := getMongoDBMetaData(m)
assert.Nil(t, err)
uri := getMongoConnectionString(metadata)
uri := metadata.getMongoConnectionString()
expected := "mongodb://localhost:99999/UnchanedDB"
assert.Equal(t, expected, uri)

View File

@ -100,3 +100,15 @@ func (b *DefaultBulkStore) BulkDelete(ctx context.Context, req []DeleteRequest)
type Querier interface {
Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}
// ToTransactionalStateOperationSlice converts []SetRequest and []DeleteRequest to []TransactionalStateOperation.
func ToTransactionalStateOperationSlice[T SetRequest | DeleteRequest](operation OperationType, req []T) []TransactionalStateOperation {
ops := make([]TransactionalStateOperation, len(req))
for i, r := range req {
ops[i] = TransactionalStateOperation{
Operation: operation,
Request: r,
}
}
return ops
}