Merge branch 'main' into fix-json-tag

This commit is contained in:
FingerLeader 2023-01-31 19:20:58 +08:00 committed by GitHub
commit 4740601e79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 77 additions and 66 deletions

View File

@ -50,6 +50,7 @@ jobs:
elif [[ "$job" == "unit-test" ]]; then
make unit-test
elif [[ "$job" == "integration-test" ]]; then
curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install -y stress-ng
make integration-test

View File

@ -10,7 +10,7 @@ GO := $(GOENV) go
CGO := $(CGOENV) go
GOTEST := TEST_USE_EXISTING_CLUSTER=false NO_PROXY="${NO_PROXY},testhost" go test
SHELL := /usr/bin/env bash
BYTEMAN_DIR := byteman-chaos-mesh-download-v4.0.18-0.12
BYTEMAN_DIR := byteman-chaos-mesh-download-v4.0.20-0.12
# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
ifeq (,$(shell go env GOBIN))
@ -90,10 +90,15 @@ ifeq (,$(wildcard bin/tools/byteman))
mv ${BYTEMAN_DIR} ./bin/tools/byteman
endif
ifeq (,$(wildcard bin/tools/memStress))
curl -fsSL -o memStress_v0.3-x86_64-linux-gnu.tar.gz https://github.com/chaos-mesh/memStress/releases/download/v0.3/memStress_v0.3-${ARCH}-linux-gnu.tar.gz
tar zxvf memStress_v0.3-x86_64-linux-gnu.tar.gz
curl -fsSL -o memStress_v0.3-${ARCH}-linux-gnu.tar.gz https://github.com/chaos-mesh/memStress/releases/download/v0.3/memStress_v0.3-${ARCH}-linux-gnu.tar.gz
tar zxvf memStress_v0.3-${ARCH}-linux-gnu.tar.gz
mv memStress ./bin/tools/memStress
endif
ifeq (,$(wildcard bin/tools/tproxy))
curl -fsSL -o tproxy-${ARCH}.tar.gz https://github.com/chaos-mesh/chaos-tproxy/releases/download/v0.5.4/tproxy-${ARCH}.tar.gz
tar zxvf tproxy-${ARCH}.tar.gz
mv tproxy ./bin/tools/tproxy
endif
swagger_spec:
ifeq ($(SWAGGER),1)

View File

@ -114,7 +114,7 @@ func NewDiskFillCommand(dep fx.Option, options *core.DiskOption) *cobra.Command
Short: "fill disk",
Run: func(*cobra.Command, []string) {
options.Action = core.DiskFillAction
utils.FxNewAppWithoutLog(dep, fx.Invoke(processDiskAttack), fx.NopLogger).Run()
utils.FxNewAppWithoutLog(dep, fx.Invoke(processDiskAttack)).Run()
},
}
@ -128,7 +128,7 @@ func NewDiskFillCommand(dep fx.Option, options *core.DiskOption) *cobra.Command
"If path not provided, a temp file will be generated and deleted immediately after data filled in or allocated")
cmd.Flags().StringVarP(&options.Percent, "percent", "c", "",
"'percent' how many percent data of disk will fill in the file path")
cmd.Flags().BoolVarP(&options.FillByFAllocate, "fallocate", "f", true, "fill disk by fallocate instead of dd")
cmd.Flags().BoolVarP(&options.FillByFallocate, "fallocate", "f", true, "fill disk by fallocate instead of dd")
return cmd
}

View File

@ -103,6 +103,10 @@ func NewKafkaIOCommand(dep fx.Option, options *core.KafkaCommand) *cobra.Command
},
}
cmd.Flags().StringVarP(&options.Host, "host", "H", "localhost", "the host of kafka server")
cmd.Flags().Uint16VarP(&options.Port, "port", "P", 9092, "the port of kafka server")
cmd.Flags().StringVarP(&options.Username, "username", "u", "", "the username of kafka client")
cmd.Flags().StringVarP(&options.Password, "password", "p", "", "the password of kafka client")
cmd.Flags().StringVarP(&options.ConfigFile, "config", "c", "/etc/kafka/server.properties", "the path of server config")
cmd.Flags().BoolVarP(&options.NonReadable, "non-readable", "r", false, "make kafka cluster non-readable")
cmd.Flags().BoolVarP(&options.NonWritable, "non-writable", "w", false, "make kafka cluster non-writable")

View File

@ -184,7 +184,7 @@ func NetworkPartitionCommand(dep fx.Option, options *core.NetworkCommand) *cobra
Run: func(*cobra.Command, []string) {
options.Action = core.NetworkPartitionAction
options.CompleteDefaults()
fx.New(dep, fx.Invoke(commonNetworkAttackFunc)).Run()
utils.FxNewAppWithoutLog(dep, fx.Invoke(commonNetworkAttackFunc)).Run()
},
}

View File

@ -77,7 +77,7 @@ type DiskOption struct {
Percent string `json:"percent,omitempty"`
PayloadProcessNum uint8 `json:"payload-process-num,omitempty"`
FillByFAllocate bool `json:"fill-by-fallocate,omitempty"`
FillByFallocate bool `json:"fallocate,omitempty"`
}
func NewDiskOption() *DiskOption {
@ -86,7 +86,7 @@ func NewDiskOption() *DiskOption {
Kind: DiskAttack,
},
PayloadProcessNum: 1,
FillByFAllocate: true,
FillByFallocate: true,
}
}
@ -105,7 +105,7 @@ func (opt *DiskOption) PreProcess() (*DiskAttackConfig, error) {
return nil, err
}
if opt.Action == DiskFillAction && opt.FillByFAllocate && byteSize != 0 {
if opt.Action == DiskFillAction && opt.FillByFallocate && byteSize != 0 {
return &DiskAttackConfig{
CommonAttackConfig: opt.CommonAttackConfig,
DdOptions: nil,

View File

@ -21,6 +21,7 @@ import (
"time"
"github.com/chaos-mesh/chaos-mesh/api/v1alpha1"
"github.com/chaos-mesh/chaos-mesh/controllers/podnetworkchaos/netutils"
"github.com/chaos-mesh/chaos-mesh/pkg/chaosdaemon/pb"
"github.com/chaos-mesh/chaos-mesh/pkg/netem"
"github.com/pingcap/errors"
@ -510,10 +511,6 @@ func (n *NetworkCommand) NeedApplyIPSet() bool {
return false
}
func (n *NetworkCommand) NeedApplyIptables() bool {
return true
}
func (n *NetworkCommand) NeedApplyTC() bool {
switch n.Action {
case NetworkDelayAction, NetworkLossAction, NetworkCorruptAction, NetworkDuplicateAction, NetworkBandwidthAction:
@ -523,20 +520,20 @@ func (n *NetworkCommand) NeedApplyTC() bool {
}
}
func (n *NetworkCommand) AdditionalChain(ipset string) ([]*pb.Chain, error) {
func (n *NetworkCommand) AdditionalChain(ipset string, uid string) ([]*pb.Chain, error) {
chains := make([]*pb.Chain, 0, 2)
var toChains, fromChains []*pb.Chain
var err error
if n.Direction == "to" || n.Direction == "both" {
toChains, err = n.getAdditionalChain(ipset, "to")
toChains, err = n.getAdditionalChain(ipset, "to", uid)
if err != nil {
return nil, err
}
}
if n.Direction == "from" || n.Direction == "both" {
fromChains, err = n.getAdditionalChain(ipset, "from")
fromChains, err = n.getAdditionalChain(ipset, "from", uid)
if err != nil {
return nil, err
}
@ -548,7 +545,7 @@ func (n *NetworkCommand) AdditionalChain(ipset string) ([]*pb.Chain, error) {
return chains, nil
}
func (n *NetworkCommand) getAdditionalChain(ipset, direction string) ([]*pb.Chain, error) {
func (n *NetworkCommand) getAdditionalChain(ipset, direction string, uid string) ([]*pb.Chain, error) {
var directionStr string
var directionChain pb.Chain_Direction
if direction == "to" {
@ -562,9 +559,11 @@ func (n *NetworkCommand) getAdditionalChain(ipset, direction string) ([]*pb.Chai
}
chains := make([]*pb.Chain, 0, 2)
// The `targetLength`s in `netutils.CompressName()` are different because of
// the need to distinguish between the different chains.
if len(n.AcceptTCPFlags) > 0 {
chains = append(chains, &pb.Chain{
Name: fmt.Sprintf("%s/0", directionStr),
Name: fmt.Sprintf("%s/%s", directionStr, netutils.CompressName(uid, 19, "")),
Ipsets: []string{ipset},
Direction: directionChain,
Protocol: n.IPProtocol,
@ -575,7 +574,7 @@ func (n *NetworkCommand) getAdditionalChain(ipset, direction string) ([]*pb.Chai
if n.Action == NetworkPartitionAction {
chains = append(chains, &pb.Chain{
Name: fmt.Sprintf("%s/1", directionStr),
Name: fmt.Sprintf("%s/%s", directionStr, netutils.CompressName(uid, 20, "")),
Ipsets: []string{ipset},
Direction: directionChain,
Protocol: n.IPProtocol,
@ -598,7 +597,7 @@ func (n *NetworkCommand) NeedApplyDNSServer() bool {
}
func (n *NetworkCommand) NeedAdditionalChains() bool {
if n.Action != NetworkPartitionAction || (n.Action == NetworkDelayAction && len(n.AcceptTCPFlags) != 0) {
if n.Action == NetworkPartitionAction || (n.Action == NetworkDelayAction && len(n.AcceptTCPFlags) != 0) {
return true
}
return false

View File

@ -35,7 +35,7 @@ func TestPatitionChain(t *testing.T) {
},
chains: []*pb.Chain{
{
Name: "OUTPUT/1",
Name: "OUTPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_OUTPUT,
Protocol: "tcp",
@ -53,7 +53,7 @@ func TestPatitionChain(t *testing.T) {
},
chains: []*pb.Chain{
{
Name: "INPUT/1",
Name: "INPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_INPUT,
Protocol: "tcp",
@ -71,14 +71,14 @@ func TestPatitionChain(t *testing.T) {
},
chains: []*pb.Chain{
{
Name: "OUTPUT/1",
Name: "OUTPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_OUTPUT,
Protocol: "tcp",
Target: "DROP",
},
{
Name: "INPUT/1",
Name: "INPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_INPUT,
Protocol: "tcp",
@ -97,7 +97,7 @@ func TestPatitionChain(t *testing.T) {
},
chains: []*pb.Chain{
{
Name: "OUTPUT/0",
Name: "OUTPUT/3c552_e0172bc4fd04_",
Ipsets: []string{"test"},
Direction: pb.Chain_OUTPUT,
Protocol: "tcp",
@ -105,14 +105,14 @@ func TestPatitionChain(t *testing.T) {
Target: "ACCEPT",
},
{
Name: "OUTPUT/1",
Name: "OUTPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_OUTPUT,
Protocol: "tcp",
Target: "DROP",
},
{
Name: "INPUT/0",
Name: "INPUT/3c552_e0172bc4fd04_",
Ipsets: []string{"test"},
Direction: pb.Chain_INPUT,
Protocol: "tcp",
@ -120,7 +120,7 @@ func TestPatitionChain(t *testing.T) {
Target: "ACCEPT",
},
{
Name: "INPUT/1",
Name: "INPUT/3c552_e0172bc4fd046_",
Ipsets: []string{"test"},
Direction: pb.Chain_INPUT,
Protocol: "tcp",
@ -130,7 +130,7 @@ func TestPatitionChain(t *testing.T) {
},
}
for _, tc := range testCases {
chains, err := tc.cmd.AdditionalChain("test")
chains, err := tc.cmd.AdditionalChain("test", "3c5528e1-4c32-4f80-983c-913ad7e860e2")
if err != nil {
t.Errorf("failed to partition chain: %v", err)
}

View File

@ -74,16 +74,13 @@ func (networkAttack) Attack(options core.AttackConfig, env Environment) (err err
}
}
if attack.NeedApplyIptables() {
if err = env.Chaos.applyIptables(attack, ipsetName, env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if err = env.Chaos.applyIptables(attack, ipsetName, env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if attack.NeedApplyTC() {
if err = env.Chaos.applyTC(attack, ipsetName, env.AttackUid); err != nil {
return perrors.WithStack(err)
}
// Because some tcs add filter iptables which will not be stored in the DB, we must re-apply these tcs to add the iptables.
if err = env.Chaos.applyTC(attack, ipsetName, env.AttackUid); err != nil {
return perrors.WithStack(err)
}
case core.NetworkNICDownAction:
@ -140,9 +137,11 @@ func (s *Server) applyIptables(attack *core.NetworkCommand, ipset, uid string) e
return perrors.WithStack(err)
}
chains := core.IptablesRuleList(iptables).ToChains()
var newChains []*pb.Chain
// Presently, only partition and delay with `accept-tcp-flags` need to add additional chains
if attack.NeedAdditionalChains() {
newChains, err := attack.AdditionalChain(ipset)
newChains, err = attack.AdditionalChain(ipset, uid)
if err != nil {
return perrors.WithStack(err)
}
@ -156,15 +155,17 @@ func (s *Server) applyIptables(attack *core.NetworkCommand, ipset, uid string) e
return perrors.WithStack(err)
}
// TODO: cwen0
//if err := s.iptablesRule.Set(context.Background(), &core.IptablesRule{
// Name: newChain.Name,
// IPSets: strings.Join(newChain.Ipsets, ","),
// Direction: pb.Chain_Direction_name[int32(newChain.Direction)],
// Experiment: uid,
//}); err != nil {
// return perrors.WithStack(err)
//}
for _, newChain := range newChains {
if err := s.iptablesRule.Set(context.Background(), &core.IptablesRule{
Name: newChain.Name,
IPSets: strings.Join(newChain.Ipsets, ","),
Direction: pb.Chain_Direction_name[int32(newChain.Direction)],
Protocol: newChain.Protocol,
Experiment: uid,
}); err != nil {
return perrors.WithStack(err)
}
}
return nil
}
@ -180,17 +181,24 @@ func (s *Server) applyTC(attack *core.NetworkCommand, ipset string, uid string)
return perrors.WithStack(err)
}
newTC, err := attack.ToTC(ipset)
if err != nil {
return perrors.WithStack(err)
}
var newTC *pb.Tc
if attack.NeedApplyTC() {
newTC, err = attack.ToTC(ipset)
if err != nil {
return perrors.WithStack(err)
}
tcs = append(tcs, newTC)
tcs = append(tcs, newTC)
}
if _, err := s.svr.SetTcs(context.Background(), &pb.TcsRequest{Tcs: tcs, EnterNS: false}); err != nil {
return perrors.WithStack(err)
}
if !attack.NeedApplyTC() {
return nil
}
tc := &core.TcParameter{
Device: attack.Device,
}
@ -380,22 +388,16 @@ func (networkAttack) Recover(exp core.Experiment, env Environment) error {
case core.NetworkPortOccupiedAction:
return env.Chaos.recoverPortOccupied(attack, env.AttackUid)
case core.NetworkDelayAction, core.NetworkLossAction, core.NetworkCorruptAction, core.NetworkDuplicateAction, core.NetworkPartitionAction, core.NetworkBandwidthAction:
if attack.NeedApplyIPSet() {
if err := env.Chaos.recoverIPSet(env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if err := env.Chaos.recoverIPSet(env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if attack.NeedApplyIptables() {
if err := env.Chaos.recoverIptables(env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if err := env.Chaos.recoverIptables(env.AttackUid); err != nil {
return perrors.WithStack(err)
}
if attack.NeedApplyTC() {
if err := env.Chaos.recoverTC(env.AttackUid, attack.Device); err != nil {
return perrors.WithStack(err)
}
if err := env.Chaos.recoverTC(env.AttackUid, attack.Device); err != nil {
return perrors.WithStack(err)
}
case core.NetworkNICDownAction:
return env.Chaos.recoverNICDown(attack)

View File

@ -22,11 +22,11 @@ bin_path=../../../bin
echo "download byteman example"
if [[ ! (-e byteman-example) ]]; then
git clone https://github.com/WangXiangUSTC/byteman-example.git
git clone https://github.com/chaos-mesh/byteman-example.git
fi
echo "download byteman && set environment variable"
byteman_dir="byteman-chaos-mesh-download-v4.0.18-0.12"
byteman_dir="byteman-chaos-mesh-download-v4.0.20-0.12"
if [[ ! (-e ${byteman_dir}.tar.gz) ]]; then
curl -fsSL -o ${byteman_dir}.tar.gz https://mirrors.chaos-mesh.org/${byteman_dir}.tar.gz
tar zxvf ${byteman_dir}.tar.gz