diff --git a/cmd/attack/kafka.go b/cmd/attack/kafka.go index b8ddc25..d1efcbd 100644 --- a/cmd/attack/kafka.go +++ b/cmd/attack/kafka.go @@ -67,6 +67,7 @@ func NewKafkaFillCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comma cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server") cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client") cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client") + cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512") cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 4*1024, "the size of each message") cmd.Flags().Uint64VarP(&options.MaxBytes, "max-bytes", "m", 1<<34, "the max bytes to fill") cmd.Flags().StringVarP(&options.ReloadCommand, "reload-cmd", "r", "", "the command to reload kafka config") @@ -87,6 +88,7 @@ func NewKafkaFloodCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comm cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server") cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client") cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client") + cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512") cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 1024, "the size of each message") cmd.Flags().UintVarP(&options.Threads, "threads", "t", 100, "the numbers of worker threads") return cmd @@ -107,6 +109,7 @@ func NewKafkaIOCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Command cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server") cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client") cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client") + cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512") cmd.Flags().StringVarP(&options.ConfigFile, "config", "c", "/etc/kafka/server.properties", "the path of server config") cmd.Flags().BoolVarP(&options.NonReadable, "non-readable", "r", false, "make kafka cluster non-readable") cmd.Flags().BoolVarP(&options.NonWritable, "non-writable", "w", false, "make kafka cluster non-writable") diff --git a/pkg/core/disk_test.go b/pkg/core/disk_test.go index a57e42e..cc6624f 100644 --- a/pkg/core/disk_test.go +++ b/pkg/core/disk_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/core/jvm.go b/pkg/core/jvm.go index 3d10925..17505e1 100644 --- a/pkg/core/jvm.go +++ b/pkg/core/jvm.go @@ -140,8 +140,9 @@ type JVMStressSpec struct { // JVMMySQLSpec is the specification of MySQL fault injection in JVM // only when SQL match the Database, Table and SQLType, chaosd will inject fault // for examle: -// SQL is "select * from test.t1", -// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault +// +// SQL is "select * from test.t1", +// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault type JVMMySQLSpec struct { // the version of mysql-connector-java, only support 5.X.X(set to 5) and 8.X.X(set to 8) now MySQLConnectorVersion string `json:"mysql-connector-version,omitempty"` diff --git a/pkg/core/kafka.go b/pkg/core/kafka.go index 1eaaba1..2254e87 100644 --- a/pkg/core/kafka.go +++ b/pkg/core/kafka.go @@ -24,9 +24,18 @@ type KafkaAttackAction string const ( // Kafka actions - KafkaFillAction = "fill" - KafkaFloodAction = "flood" - KafkaIOAction = "io" + KafkaFillAction KafkaAttackAction = "fill" + KafkaFloodAction = "flood" + KafkaIOAction = "io" +) + +type KafkaAuthMechanism string + +const ( + SaslPlain KafkaAuthMechanism = "sasl/plain" + SaslScream256 = "sasl/scram-sha-256" + SaslScram512 = "sasl/scram-sha-512" + AuthMechanismEmpty = "" ) var _ AttackConfig = &KafkaCommand{} @@ -39,12 +48,13 @@ type KafkaCommand struct { Topic string `json:"topic,omitempty"` // options for fill and flood attack - Host string `json:"host,omitempty"` - Port uint16 `json:"port,omitempty"` - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` - MessageSize uint `json:"messageSize,omitempty"` - MaxBytes uint64 `json:"maxBytes,omitempty"` + Host string `json:"host,omitempty"` + Port uint16 `json:"port,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + AuthMechanism string `json:"authMechanism,omitempty"` + MessageSize uint `json:"messageSize,omitempty"` + MaxBytes uint64 `json:"maxBytes,omitempty"` // options for fill attack ReloadCommand string `json:"reloadCommand,omitempty"` @@ -69,6 +79,10 @@ func (c *KafkaCommand) Validate() error { return errors.New("topic is required") } + if err := c.validateAuthMechanism(); err != nil { + return err + } + switch c.Action { case KafkaFillAction: return c.validateFillAction() @@ -81,6 +95,25 @@ func (c *KafkaCommand) Validate() error { } } +func (c *KafkaCommand) validateAuthMechanism() error { + if c.Username != "" && c.AuthMechanism == "" { + return errors.New("auth mechanism is required") + } + + switch KafkaAuthMechanism(c.AuthMechanism) { + case SaslPlain: + fallthrough + case SaslScram512: + fallthrough + case SaslScream256: + fallthrough + case AuthMechanismEmpty: + return nil + default: + return errors.Errorf("invalid auth mechanism: %s", c.AuthMechanism) + } +} + func (c *KafkaCommand) validateDSNAndMessageSize() error { if c.Host == "" { return errors.New("host is required") diff --git a/pkg/server/chaosd/disk_test.go b/pkg/server/chaosd/disk_test.go index 7aa407d..d0c36bd 100644 --- a/pkg/server/chaosd/disk_test.go +++ b/pkg/server/chaosd/disk_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/server/chaosd/kafka.go b/pkg/server/chaosd/kafka.go index 21ff790..db2bfaf 100644 --- a/pkg/server/chaosd/kafka.go +++ b/pkg/server/chaosd/kafka.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/log" perr "github.com/pkg/errors" client "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" "go.uber.org/zap" @@ -77,7 +78,19 @@ func newDialer(attack *core.KafkaCommand) (dialer *client.Dialer, err error) { DualStack: true, } if attack.Username != "" { - dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password) + switch core.KafkaAuthMechanism(attack.AuthMechanism) { + case core.SaslPlain: + dialer.SASLMechanism = plain.Mechanism{ + Username: attack.Username, + Password: attack.Password, + } + case core.SaslScream256: + dialer.SASLMechanism, err = scram.Mechanism(scram.SHA256, attack.Username, attack.Password) + case core.SaslScram512: + dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password) + default: + return nil, errors.Errorf("invalid auth mechanism: %s", attack.AuthMechanism) + } if err != nil { return nil, perr.Wrap(err, "create scram mechanism") } diff --git a/pkg/utils/command_test.go b/pkg/utils/command_test.go index ae9d388..743f0e7 100644 --- a/pkg/utils/command_test.go +++ b/pkg/utils/command_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS,