components-contrib/common/component/redis/settings.go

166 lines
5.7 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package redis
import (
"crypto/tls"
"fmt"
"strconv"
"time"
"github.com/dapr/kit/config"
)
// Settings holds the configuration options of the Redis component.
// It is populated by Decode, which hands the struct to config.Decode; the
// mapstructure tag on each field names the metadata key it is read from.
// Fields tagged mdonly apply to a single component kind (state or pubsub).
// Fields of the package's Duration type are presumably decoded through
// Duration.DecodeString (bare integers are milliseconds) — confirm against
// config.Decode.
type Settings struct {
// The Redis host.
Host string `mapstructure:"redisHost"`
// The Redis password.
Password string `mapstructure:"redisPassword"`
// The Redis username.
Username string `mapstructure:"redisUsername"`
// Database to be selected after connecting to the server.
DB int `mapstructure:"redisDB"`
// The Redis deployment type: node or cluster.
RedisType string `mapstructure:"redisType"`
// Maximum number of retries before giving up.
// A value of -1 (not 0) disables retries.
// Default is 3 retries.
RedisMaxRetries int `mapstructure:"redisMaxRetries"`
// Minimum backoff between each retry.
// Default is 8 milliseconds; -1 disables backoff.
RedisMinRetryInterval Duration `mapstructure:"redisMinRetryInterval"`
// Maximum backoff between each retry.
// Default is 512 milliseconds; -1 disables backoff.
RedisMaxRetryInterval Duration `mapstructure:"redisMaxRetryInterval"`
// Dial timeout for establishing new connections.
DialTimeout Duration `mapstructure:"dialTimeout"`
// Timeout for socket reads. If reached, commands will fail
// with a timeout instead of blocking. Use value -1 for no timeout and 0 for default.
ReadTimeout Duration `mapstructure:"readTimeout"`
// Timeout for socket writes. If reached, commands will fail
// (presumably with a timeout instead of blocking, as for ReadTimeout — confirm).
WriteTimeout Duration `mapstructure:"writeTimeout"`
// Maximum number of socket connections.
PoolSize int `mapstructure:"poolSize"`
// Minimum number of idle connections, which is useful when establishing
// a new connection is slow.
MinIdleConns int `mapstructure:"minIdleConns"`
// Connection age at which the client retires (closes) the connection.
// Default is to not close aged connections.
MaxConnAge Duration `mapstructure:"maxConnAge"`
// Amount of time the client waits for a connection if all connections
// are busy before returning an error.
// Default is ReadTimeout + 1 second.
PoolTimeout Duration `mapstructure:"poolTimeout"`
// Amount of time after which the client closes idle connections.
// Should be less than the server's timeout.
// Default is 5 minutes. -1 disables the idle timeout check.
IdleTimeout Duration `mapstructure:"idleTimeout"`
// Frequency of idle checks made by the idle connections reaper.
// Default is 1 minute. -1 disables the idle connections reaper,
// but idle connections are still discarded by the client
// if IdleTimeout is set.
IdleCheckFrequency Duration `mapstructure:"idleCheckFrequency"`
// The Sentinel master name.
SentinelMasterName string `mapstructure:"sentinelMasterName"`
// Use Redis Sentinel for automatic failover.
Failover bool `mapstructure:"failover"`
// A flag that enables TLS.
// NOTE(review): the previous comment stated this works "by setting
// InsecureSkipVerify to true"; that is not visible in this file — confirm
// against the code that builds the TLS config.
EnableTLS bool `mapstructure:"enableTLS"`
// Client certificate and key. SetCertificate passes both values to
// tls.X509KeyPair and does nothing unless both are non-empty.
ClientCert string `mapstructure:"clientCert"`
ClientKey string `mapstructure:"clientKey"`
// == state only properties ==
// NOTE(review): presumably the default time-to-live, in seconds, applied to
// state entries, with nil meaning "not configured" — confirm against the state store.
TTLInSeconds *int `mapstructure:"ttlInSeconds" mdonly:"state"`
// NOTE(review): presumably the query index definitions used by the state
// store; the expected format is not visible here — confirm.
QueryIndexes string `mapstructure:"queryIndexes" mdonly:"state"`
// == pubsub only properties ==
// The consumer identifier.
ConsumerID string `mapstructure:"consumerID" mdonly:"pubsub"`
// The interval between checks for pending messages to redeliver (0 disables redelivery).
// The mapstructure tag is "-", so this field is not filled by key name during
// decoding; presumably it is set by the pubsub component itself — confirm.
RedeliverInterval time.Duration `mapstructure:"-" mdonly:"pubsub"`
// The amount of time a message must be pending before attempting to redeliver it (0 disables redelivery).
ProcessingTimeout time.Duration `mapstructure:"processingTimeout" mdonly:"pubsub"`
// The size of the message queue for processing.
QueueDepth uint `mapstructure:"queueDepth" mdonly:"pubsub"`
// The number of concurrent workers that are processing messages.
Concurrency uint `mapstructure:"concurrency" mdonly:"pubsub"`
// The maximum length of the stream.
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`
// The TTL of stream entries. GetMinID derives the minimum stream ID from it;
// a zero value makes GetMinID return an empty string (no trimming).
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`
// EntraID / AzureAD authentication based on the shared code, which essentially uses the DefaultAzureCredential
// from the official Azure Identity SDK for Go. The metadata key useAzureAD is declared as an alias.
UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"`
}
// Decode fills s from in by delegating to config.Decode.
// It returns nil on success; on failure the underlying error is wrapped
// (with %w) so callers can still inspect it with errors.Is / errors.As.
func (s *Settings) Decode(in interface{}) error {
	decodeErr := config.Decode(in, s)
	if decodeErr == nil {
		return nil
	}
	return fmt.Errorf("decode failed. %w", decodeErr)
}
// SetCertificate builds a TLS certificate from ClientCert and ClientKey and
// hands a pointer to it to fn.
//
// When either ClientCert or ClientKey is empty, fn is not called and nil is
// returned. If tls.X509KeyPair rejects the pair, its error is returned
// unchanged and fn is not called.
func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
	certData, keyData := s.ClientCert, s.ClientKey
	// Both halves of the key pair are required; otherwise there is nothing to load.
	if len(certData) == 0 || len(keyData) == 0 {
		return nil
	}

	keyPair, err := tls.X509KeyPair([]byte(certData), []byte(keyData))
	if err != nil {
		return err
	}

	fn(&keyPair)
	return nil
}
// GetMinID returns the stream ID that corresponds to the instant StreamTTL
// before now, formatted as "<unix-milliseconds>-1".
// When StreamTTL is zero it returns an empty string, meaning no trimming.
func (s *Settings) GetMinID(now time.Time) string {
	ttl := s.StreamTTL
	if ttl == 0 {
		// No TTL configured: signal "no trimming" with an empty ID.
		return ""
	}

	// Entries older than this cutoff are past their TTL.
	cutoff := now.Add(-ttl)
	return fmt.Sprintf("%d-1", cutoff.UnixMilli())
}
// Duration is a time.Duration with a custom string decoding (see DecodeString).
type Duration time.Duration

// DecodeString parses value into r.
//
//   - A non-negative integer is interpreted as a number of milliseconds.
//   - A negative integer is stored as-is, without scaling to milliseconds,
//     so a value such as -1 stays -1.
//   - Anything else is parsed with time.ParseDuration (e.g. "5s", "1m30s").
//
// If value is neither an integer nor a valid duration string, the
// time.ParseDuration error is returned and r is left unmodified.
func (r *Duration) DecodeString(value string) error {
	n, atoiErr := strconv.Atoi(value)
	if atoiErr != nil {
		// Not a plain integer: fall back to Go duration syntax.
		parsed, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		*r = Duration(parsed)
		return nil
	}

	switch {
	case n < 0:
		// Negative integers are kept raw rather than scaled.
		*r = Duration(n)
	default:
		*r = Duration(time.Duration(n) * time.Millisecond)
	}
	return nil
}