feat: support redis sentinal (#1910)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-12-09 17:25:20 +08:00
parent b2704d2e85
commit f120c8778b
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
10 changed files with 68 additions and 51 deletions

View File

@ -35,11 +35,12 @@ import (
) )
type Config struct { type Config struct {
Addrs []string Addrs []string
Username string MasterName string
Password string Username string
BrokerDB int Password string
BackendDB int BrokerDB int
BackendDB int
} }
type Job struct { type Job struct {
@ -53,19 +54,21 @@ func New(cfg *Config, queue Queue) (*Job, error) {
machineryv1log.Set(&MachineryLogger{}) machineryv1log.Set(&MachineryLogger{})
if err := ping(&redis.UniversalOptions{ if err := ping(&redis.UniversalOptions{
Addrs: cfg.Addrs, Addrs: cfg.Addrs,
Username: cfg.Username, MasterName: cfg.MasterName,
Password: cfg.Password, Username: cfg.Username,
DB: cfg.BrokerDB, Password: cfg.Password,
DB: cfg.BrokerDB,
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
if err := ping(&redis.UniversalOptions{ if err := ping(&redis.UniversalOptions{
Addrs: cfg.Addrs, Addrs: cfg.Addrs,
Username: cfg.Username, MasterName: cfg.MasterName,
Password: cfg.Password, Username: cfg.Username,
DB: cfg.BackendDB, Password: cfg.Password,
DB: cfg.BackendDB,
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
@ -76,6 +79,7 @@ func New(cfg *Config, queue Queue) (*Job, error) {
ResultBackend: fmt.Sprintf("redis://%s:%s@%s/%d", cfg.Username, cfg.Password, strings.Join(cfg.Addrs, ","), cfg.BackendDB), ResultBackend: fmt.Sprintf("redis://%s:%s@%s/%d", cfg.Username, cfg.Password, strings.Join(cfg.Addrs, ","), cfg.BackendDB),
ResultsExpireIn: DefaultResultsExpireIn, ResultsExpireIn: DefaultResultsExpireIn,
Redis: &machineryv1config.RedisConfig{ Redis: &machineryv1config.RedisConfig{
MasterName: cfg.MasterName,
MaxIdle: DefaultRedisMaxIdle, MaxIdle: DefaultRedisMaxIdle,
IdleTimeout: DefaultRedisIdleTimeout, IdleTimeout: DefaultRedisIdleTimeout,
ReadTimeout: DefaultRedisReadTimeout, ReadTimeout: DefaultRedisReadTimeout,

View File

@ -167,22 +167,25 @@ type RedisConfig struct {
// DEPRECATED: Please use the `addrs` field instead. // DEPRECATED: Please use the `addrs` field instead.
Port int `yaml:"port" mapstructure:"port"` Port int `yaml:"port" mapstructure:"port"`
// Server addresses. // Addrs is server addresses.
Addrs []string `yaml:"addrs" mapstructure:"addrs"` Addrs []string `yaml:"addrs" mapstructure:"addrs"`
// Server username. // MasterName is the sentinel master name.
MasterName string `yaml:"masterName" mapstructure:"masterName"`
// Username is server username.
Username string `yaml:"username" mapstructure:"username"` Username string `yaml:"username" mapstructure:"username"`
// Server password. // Password is server password.
Password string `yaml:"password" mapstructure:"password"` Password string `yaml:"password" mapstructure:"password"`
// Server cache DB name. // DB is server cache DB name.
DB int `yaml:"db" mapstructure:"db"` DB int `yaml:"db" mapstructure:"db"`
// Server broker DB name. // BrokerDB is server broker DB name.
BrokerDB int `yaml:"brokerDB" mapstructure:"brokerDB"` BrokerDB int `yaml:"brokerDB" mapstructure:"brokerDB"`
// Server backend DB name. // BackendDB is server backend DB name.
BackendDB int `yaml:"backendDB" mapstructure:"backendDB"` BackendDB int `yaml:"backendDB" mapstructure:"backendDB"`
} }

View File

@ -77,13 +77,14 @@ func TestManagerConfig_Load(t *testing.T) {
Migrate: true, Migrate: true,
}, },
Redis: RedisConfig{ Redis: RedisConfig{
Host: "bar", Host: "bar",
Password: "bar", Password: "bar",
Addrs: []string{"foo", "bar"}, Addrs: []string{"foo", "bar"},
Port: 6379, MasterName: "baz",
DB: 0, Port: 6379,
BrokerDB: 1, DB: 0,
BackendDB: 2, BrokerDB: 1,
BackendDB: 2,
}, },
}, },
Cache: CacheConfig{ Cache: CacheConfig{

View File

@ -39,6 +39,7 @@ database:
migrate: true migrate: true
redis: redis:
addrs: [foo, bar] addrs: [foo, bar]
masterName: "baz"
password: bar password: bar
host: bar host: bar
port: 6379 port: 6379

View File

@ -27,10 +27,11 @@ import (
func NewRedis(cfg *config.RedisConfig) (redis.UniversalClient, error) { func NewRedis(cfg *config.RedisConfig) (redis.UniversalClient, error) {
redis.SetLogger(&redisLogger{}) redis.SetLogger(&redisLogger{})
client := redis.NewUniversalClient(&redis.UniversalOptions{ client := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: cfg.Addrs, Addrs: cfg.Addrs,
DB: cfg.DB, MasterName: cfg.MasterName,
Username: cfg.Username, DB: cfg.DB,
Password: cfg.Password, Username: cfg.Username,
Password: cfg.Password,
}) })
if err := client.Ping(context.Background()).Err(); err != nil { if err := client.Ping(context.Background()).Err(); err != nil {

View File

@ -28,11 +28,12 @@ type Job struct {
func New(cfg *config.Config) (*Job, error) { func New(cfg *config.Config) (*Job, error) {
j, err := internaljob.New(&internaljob.Config{ j, err := internaljob.New(&internaljob.Config{
Addrs: cfg.Database.Redis.Addrs, Addrs: cfg.Database.Redis.Addrs,
Username: cfg.Database.Redis.Username, MasterName: cfg.Database.Redis.MasterName,
Password: cfg.Database.Redis.Password, Username: cfg.Database.Redis.Username,
BrokerDB: cfg.Database.Redis.BrokerDB, Password: cfg.Database.Redis.Password,
BackendDB: cfg.Database.Redis.BackendDB, BrokerDB: cfg.Database.Redis.BrokerDB,
BackendDB: cfg.Database.Redis.BackendDB,
}, internaljob.GlobalQueue) }, internaljob.GlobalQueue)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -235,19 +235,22 @@ type RedisConfig struct {
// DEPRECATED: Please use the `addrs` field instead. // DEPRECATED: Please use the `addrs` field instead.
Port int `yaml:"port" mapstructure:"port"` Port int `yaml:"port" mapstructure:"port"`
// Server addresses. // Addrs is server addresses.
Addrs []string `yaml:"addrs" mapstructure:"addrs"` Addrs []string `yaml:"addrs" mapstructure:"addrs"`
// Server username. // MasterName is the sentinel master name.
MasterName string `yaml:"masterName" mapstructure:"masterName"`
// Username is server username.
Username string `yaml:"username" mapstructure:"username"` Username string `yaml:"username" mapstructure:"username"`
// Server password. // Password is server password.
Password string `yaml:"password" mapstructure:"password"` Password string `yaml:"password" mapstructure:"password"`
// Broker database name. // BrokerDB is broker database name.
BrokerDB int `yaml:"brokerDB" mapstructure:"brokerDB"` BrokerDB int `yaml:"brokerDB" mapstructure:"brokerDB"`
// Backend database name. // BackendDB is backend database name.
BackendDB int `yaml:"backendDB" mapstructure:"backendDB"` BackendDB int `yaml:"backendDB" mapstructure:"backendDB"`
} }

View File

@ -85,12 +85,13 @@ func TestConfig_Load(t *testing.T) {
SchedulerWorkerNum: 1, SchedulerWorkerNum: 1,
LocalWorkerNum: 5, LocalWorkerNum: 5,
Redis: RedisConfig{ Redis: RedisConfig{
Addrs: []string{"foo", "bar"}, Addrs: []string{"foo", "bar"},
Host: "127.0.0.1", MasterName: "baz",
Port: 6379, Host: "127.0.0.1",
Password: "foo", Port: 6379,
BrokerDB: 1, Password: "foo",
BackendDB: 2, BrokerDB: 1,
BackendDB: 2,
}, },
}, },
Storage: StorageConfig{ Storage: StorageConfig{

View File

@ -51,6 +51,7 @@ job:
localWorkerNum: 5 localWorkerNum: 5
redis: redis:
addrs: ["foo", "bar"] addrs: ["foo", "bar"]
masterName: "baz"
host: 127.0.0.1 host: 127.0.0.1
port: 6379 port: 6379
password: foo password: foo

View File

@ -57,11 +57,12 @@ type job struct {
func New(cfg *config.Config, resource resource.Resource) (Job, error) { func New(cfg *config.Config, resource resource.Resource) (Job, error) {
redisConfig := &internaljob.Config{ redisConfig := &internaljob.Config{
Addrs: cfg.Job.Redis.Addrs, Addrs: cfg.Job.Redis.Addrs,
Username: cfg.Job.Redis.Username, MasterName: cfg.Job.Redis.MasterName,
Password: cfg.Job.Redis.Password, Username: cfg.Job.Redis.Username,
BrokerDB: cfg.Job.Redis.BrokerDB, Password: cfg.Job.Redis.Password,
BackendDB: cfg.Job.Redis.BackendDB, BrokerDB: cfg.Job.Redis.BrokerDB,
BackendDB: cfg.Job.Redis.BackendDB,
} }
globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue) globalJob, err := internaljob.New(redisConfig, internaljob.GlobalQueue)