kafka attack: support more auth mechanism

Signed-off-by: cwen0 <cwenyin0@gmail.com>
This commit is contained in:
cwen0 2023-01-31 21:17:39 +08:00
parent 3a6efb7db2
commit e837c8a068
7 changed files with 65 additions and 15 deletions

View File

@ -67,6 +67,7 @@ func NewKafkaFillCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comma
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 4*1024, "the size of each message")
cmd.Flags().Uint64VarP(&options.MaxBytes, "max-bytes", "m", 1<<34, "the max bytes to fill")
cmd.Flags().StringVarP(&options.ReloadCommand, "reload-cmd", "r", "", "the command to reload kafka config")
@ -87,6 +88,7 @@ func NewKafkaFloodCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Comm
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
cmd.Flags().UintVarP(&options.MessageSize, "size", "s", 1024, "the size of each message")
cmd.Flags().UintVarP(&options.Threads, "threads", "t", 100, "the numbers of worker threads")
return cmd
@ -107,6 +109,7 @@ func NewKafkaIOCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Command
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
cmd.Flags().StringVarP(&options.AuthMechanism, "auth-mechanism", "a", "sasl/plan", "the authentication mechanism of kafka, supported value: sasl/plain, sasl/scram-sha-256, sasl/scram-sha-512")
cmd.Flags().StringVarP(&options.ConfigFile, "config", "c", "/etc/kafka/server.properties", "the path of server config")
cmd.Flags().BoolVarP(&options.NonReadable, "non-readable", "r", false, "make kafka cluster non-readable")
cmd.Flags().BoolVarP(&options.NonWritable, "non-writable", "w", false, "make kafka cluster non-writable")

View File

@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -140,8 +140,9 @@ type JVMStressSpec struct {
// JVMMySQLSpec is the specification of MySQL fault injection in JVM
// only when SQL match the Database, Table and SQLType, chaosd will inject fault
// for examle:
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
//
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
type JVMMySQLSpec struct {
// the version of mysql-connector-java, only support 5.X.X(set to 5) and 8.X.X(set to 8) now
MySQLConnectorVersion string `json:"mysql-connector-version,omitempty"`

View File

@ -24,9 +24,18 @@ type KafkaAttackAction string
const (
// Kafka actions
KafkaFillAction = "fill"
KafkaFloodAction = "flood"
KafkaIOAction = "io"
KafkaFillAction KafkaAttackAction = "fill"
KafkaFloodAction = "flood"
KafkaIOAction = "io"
)
type KafkaAuthMechanism string
const (
SaslPlain KafkaAuthMechanism = "sasl/plain"
SaslScream256 = "sasl/scram-sha-256"
SaslScram512 = "sasl/scram-sha-512"
AuthMechanismEmpty = ""
)
var _ AttackConfig = &KafkaCommand{}
@ -39,12 +48,13 @@ type KafkaCommand struct {
Topic string `json:"topic,omitempty"`
// options for fill and flood attack
Host string `json:"host,omitempty"`
Port uint16 `json:"port,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
MessageSize uint `json:"messageSize,omitempty"`
MaxBytes uint64 `json:"maxBytes,omitempty"`
Host string `json:"host,omitempty"`
Port uint16 `json:"port,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
AuthMechanism string `json:"authMechanism,omitempty"`
MessageSize uint `json:"messageSize,omitempty"`
MaxBytes uint64 `json:"maxBytes,omitempty"`
// options for fill attack
ReloadCommand string `json:"reloadCommand,omitempty"`
@ -69,6 +79,10 @@ func (c *KafkaCommand) Validate() error {
return errors.New("topic is required")
}
if err := c.validateAuthMechanism(); err != nil {
return err
}
switch c.Action {
case KafkaFillAction:
return c.validateFillAction()
@ -81,6 +95,25 @@ func (c *KafkaCommand) Validate() error {
}
}
func (c *KafkaCommand) validateAuthMechanism() error {
if c.Username != "" && c.AuthMechanism == "" {
return errors.New("auth mechanism is required")
}
switch KafkaAuthMechanism(c.AuthMechanism) {
case SaslPlain:
fallthrough
case SaslScram512:
fallthrough
case SaslScream256:
fallthrough
case AuthMechanismEmpty:
return nil
default:
return errors.Errorf("invalid auth mechanism: %s", c.AuthMechanism)
}
}
func (c *KafkaCommand) validateDSNAndMessageSize() error {
if c.Host == "" {
return errors.New("host is required")

View File

@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,

View File

@ -32,6 +32,7 @@ import (
"github.com/pingcap/log"
perr "github.com/pkg/errors"
client "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
"go.uber.org/zap"
@ -77,7 +78,19 @@ func newDialer(attack *core.KafkaCommand) (dialer *client.Dialer, err error) {
DualStack: true,
}
if attack.Username != "" {
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password)
switch core.KafkaAuthMechanism(attack.AuthMechanism) {
case core.SaslPlain:
dialer.SASLMechanism = plain.Mechanism{
Username: attack.Username,
Password: attack.Password,
}
case core.SaslScream256:
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA256, attack.Username, attack.Password)
case core.SaslScram512:
dialer.SASLMechanism, err = scram.Mechanism(scram.SHA512, attack.Username, attack.Password)
default:
return nil, errors.Errorf("invalid auth mechanism: %s", attack.AuthMechanism)
}
if err != nil {
return nil, perr.Wrap(err, "create scram mechanism")
}

View File

@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,