Fix issue about disk attack cannot work well in chaos-mesh:physical machine chaos (#236)

* add default value of PayloadProcessNum&FillByFAllocate

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* recover unit-test

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* recover unit-test

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* Enable fill | write in dir.

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* Enable fill | write in dir.

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* Enable fill | write in dir.

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix lint

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix log

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* ignore some unhandled errors & fix unexported returned value in exported function

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix deprecated function

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* complete async disk attack in server side

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* better input args type in command pool

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* complete test for command pool

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* add license

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* add license

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix comment

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* add output channel to pools

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* manually test & fix disk attack

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix ut in disk attack

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

* fix Boilerplate header

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>

---------

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>
Co-authored-by: Cwen Yin <cwenyin0@gmail.com>
This commit is contained in:
Andrewmatilde 2023-02-08 17:01:30 +08:00 committed by GitHub
parent e7715f72b9
commit 19a157239e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 540 additions and 127 deletions

8
go.mod
View File

@ -1,6 +1,7 @@
module github.com/chaos-mesh/chaosd
require (
github.com/Jeffail/tunny v0.1.4
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/chaos-mesh/chaos-mesh v0.9.1-0.20220812140450-4bc7ef589c13
@ -24,11 +25,13 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/robfig/cron/v3 v3.0.1
github.com/samber/lo v1.37.0
github.com/samber/mo v1.8.0
github.com/segmentio/kafka-go v0.4.31
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2
github.com/stretchr/testify v1.8.1
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe
github.com/swaggo/gin-swagger v1.5.0
github.com/swaggo/swag v1.8.3
@ -135,9 +138,10 @@ require (
go.uber.org/dig v1.14.1 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect

18
go.sum
View File

@ -70,6 +70,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/Jeffail/tunny v0.1.4 h1:chtpdz+nUtaYQeCKlNBg6GycFF/kGVHOr6A3cmzTJXs=
github.com/Jeffail/tunny v0.1.4/go.mod h1:P8xAx4XQl0xsuhjX1DtfaMDCSuavzdb2rwbd0lk+fvo=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
@ -947,6 +949,10 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/samber/lo v1.37.0 h1:XjVcB8g6tgUp8rsPsJ2CvhClfImrpL04YpQHXeHPhRw=
github.com/samber/lo v1.37.0/go.mod h1:9vaz2O4o8oOnK23pd2TrXufcbdbJIa3b6cstBWKpopA=
github.com/samber/mo v1.8.0 h1:vYjHTfg14JF9tD2NLhpoUsRi9bjyRoYwa4+do0nvbVw=
github.com/samber/mo v1.8.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
@ -1005,6 +1011,8 @@ github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@ -1013,8 +1021,11 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe h1:K8pHPVoTgxFJt1lXuIzzOX7zZhZFldJQK/CgKx9BFIc=
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe/go.mod h1:lKJPbtWzJ9JhsTN1k1gZgleJWY/cqq0psdoMmaThG3w=
@ -1169,6 +1180,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@ -1281,8 +1294,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -16,7 +16,6 @@ package core
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
@ -90,6 +89,16 @@ func NewDiskOption() *DiskOption {
}
}
func NewDiskOptionForServer() *DiskOption {
return &DiskOption{
CommonAttackConfig: CommonAttackConfig{
Kind: DiskServerAttack,
},
PayloadProcessNum: 1,
FillByFallocate: true,
}
}
func (opt *DiskOption) PreProcess() (*DiskAttackConfig, error) {
if err := opt.CommonAttackConfig.Validate(); err != nil {
return nil, err
@ -191,7 +200,7 @@ func initPath(opt *DiskOption) (string, error) {
// check if Path of file is valid when Path is not empty
if os.IsNotExist(err) {
var b []byte
if err := ioutil.WriteFile(opt.Path, b, 0600); err != nil {
if err := os.WriteFile(opt.Path, b, 0600); err != nil {
return "", err
}
if err := os.Remove(opt.Path); err != nil {

View File

@ -1,4 +1,4 @@
// Copyright 2021 Chaos Mesh Authors.
// Copyright 2023 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (

View File

@ -35,6 +35,7 @@ const (
NetworkAttack = "network"
StressAttack = "stress"
DiskAttack = "disk"
DiskServerAttack = "disk-server"
ClockAttack = "clock"
HostAttack = "host"
JVMAttack = "jvm"
@ -109,6 +110,8 @@ func GetAttackByKind(kind string) *AttackConfig {
attackConfig = &StressCommand{}
case DiskAttack:
attackConfig = &DiskAttackConfig{}
case DiskServerAttack:
attackConfig = &DiskAttackConfig{}
case JVMAttack:
attackConfig = &JVMCommand{}
case ClockAttack:

View File

@ -141,8 +141,8 @@ type JVMStressSpec struct {
// only when SQL match the Database, Table and SQLType, chaosd will inject fault
// for example:
//
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
// SQL is "select * from test.t1",
// only when ((Database == "test" || Database == "") && (Table == "t1" || Table == "") && (SQLType == "select" || SQLType == "")) is true, chaosd will inject fault
type JVMMySQLSpec struct {
// the version of mysql-connector-java, only support 5.X.X(set to 5) and 8.X.X(set to 8) now
MySQLConnectorVersion string `json:"mysql-connector-version,omitempty"`

View File

@ -14,17 +14,15 @@
package chaosd
import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/pingcap/log"
"go.uber.org/zap"
"github.com/chaos-mesh/chaosd/pkg/server/utils"
pkgUtils "github.com/chaos-mesh/chaosd/pkg/utils"
"github.com/chaos-mesh/chaosd/pkg/core"
)
@ -33,88 +31,71 @@ type diskAttack struct{}
var DiskAttack AttackType = diskAttack{}
func (disk diskAttack) Attack(options core.AttackConfig, env Environment) error {
func handleDiskAttackOutput(output []byte, err error, c chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
c <- err
}
log.Info(string(output))
}
func (diskAttack) Attack(options core.AttackConfig, env Environment) error {
err := ApplyDiskAttack(options, env)
if err != nil {
return err
}
return nil
}
func handleOutputChannelError(c chan interface{}) error {
close(c)
var multiErrs error
for i := range c {
if err, ok := i.(error); ok {
multiErrs = multierror.Append(multiErrs, err)
}
}
if multiErrs != nil {
return multiErrs
}
return nil
}
func ApplyDiskAttack(options core.AttackConfig, env Environment) error {
var attackConf *core.DiskAttackConfig
var ok bool
if attackConf, ok = options.(*core.DiskAttackConfig); !ok {
return fmt.Errorf("AttackConfig -> *DiskAttackConfig meet error")
}
poolSize := getPoolSize(attackConf)
outputChan := make(chan interface{}, poolSize+1)
if attackConf.Action == core.DiskFillAction {
if attackConf.FAllocateOption != nil {
cmd := core.FAllocateCommand.Unmarshal(*attackConf.FAllocateOption)
output, err := cmd.CombinedOutput()
if err != nil {
log.Error(string(output), zap.Error(err))
return err
}
log.Info(string(output))
return nil
}
for _, DdOption := range *attackConf.DdOptions {
cmd := core.DdCommand.Unmarshal(DdOption)
output, err := cmd.CombinedOutput()
if err != nil {
log.Error(string(output), zap.Error(err))
return err
}
log.Info(string(output))
}
return nil
cmdPool := pkgUtils.NewCommandPools(context.Background(), nil, poolSize)
env.Chaos.CmdPools[env.AttackUid] = cmdPool
fillDisk(attackConf, cmdPool, NewOutputHandler(handleDiskAttackOutput, outputChan))
cmdPool.Wait()
cmdPool.Close()
return handleOutputChannelError(outputChan)
}
if attackConf.DdOptions != nil {
duration, _ := options.ScheduleDuration()
var deadline <-chan time.Time
if duration != nil {
deadline = time.After(*duration)
var cmdPool *pkgUtils.CommandPools
deadline := getDeadline(options)
if deadline != nil {
cmdPool = pkgUtils.NewCommandPools(context.Background(), deadline, poolSize)
}
cmdPool = pkgUtils.NewCommandPools(context.Background(), nil, poolSize)
env.Chaos.CmdPools[env.AttackUid] = cmdPool
if len(*attackConf.DdOptions) == 0 {
return nil
}
rest := (*attackConf.DdOptions)[len(*attackConf.DdOptions)-1]
*attackConf.DdOptions = (*attackConf.DdOptions)[:len(*attackConf.DdOptions)-1]
cmd := core.DdCommand.Unmarshal(rest)
err := utils.ExecWithDeadline(deadline, cmd)
if err != nil {
return err
}
var wg sync.WaitGroup
var mu sync.Mutex
var errs error
wg.Add(len(*attackConf.DdOptions))
for _, ddOpt := range *attackConf.DdOptions {
cmd := core.DdCommand.Unmarshal(ddOpt)
go func(cmd *exec.Cmd) {
defer wg.Done()
err := utils.ExecWithDeadline(deadline, cmd)
if err != nil {
log.Error(cmd.String(), zap.Error(err))
mu.Lock()
defer mu.Unlock()
errs = multierror.Append(errs, err)
return
}
}(cmd)
}
wg.Wait()
if errs != nil {
return errs
}
applyPayload(attackConf, cmdPool, NewOutputHandler(handleDiskAttackOutput, outputChan))
cmdPool.Wait()
cmdPool.Close()
return handleOutputChannelError(outputChan)
}
return nil
}
func (diskAttack) Recover(exp core.Experiment, _ Environment) error {
func (diskAttack) Recover(exp core.Experiment, env Environment) error {
attackConfig, err := exp.GetRequestCommand()
if err != nil {
return err
@ -127,5 +108,10 @@ func (diskAttack) Recover(exp core.Experiment, _ Environment) error {
log.Warn(fmt.Sprintf("recover disk: remove %s failed", config.Path), zap.Error(err))
}
}
if cmdPool, ok := env.Chaos.CmdPools[exp.Uid]; ok {
cmdPool.Close()
}
return nil
}

View File

@ -0,0 +1,172 @@
// Copyright 2021 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package chaosd
import (
"context"
"fmt"
"os"
"time"
"github.com/pingcap/log"
"go.uber.org/zap"
pkgUtils "github.com/chaos-mesh/chaosd/pkg/utils"
"github.com/chaos-mesh/chaosd/pkg/core"
)
type diskServerAttack struct{}
var DiskServerAttack AttackType = diskServerAttack{}
func handleDiskServerOutput(output []byte, err error, _ chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
}
log.Info(string(output))
}
func (diskServerAttack) Attack(options core.AttackConfig, env Environment) error {
err := ApplyDiskServerAttack(options, env)
if err != nil {
return err
}
return nil
}
type OutputHandler struct {
StdoutHandler func([]byte, error, chan interface{})
OutputChan chan interface{}
}
func NewOutputHandler(
handler func([]byte, error, chan interface{}),
outputChan chan interface{}) *OutputHandler {
return &OutputHandler{
StdoutHandler: handler,
OutputChan: outputChan,
}
}
func getPoolSize(attackConf *core.DiskAttackConfig) int {
poolSize := 1
if attackConf.DdOptions != nil && len(*attackConf.DdOptions) > 0 {
poolSize = len(*attackConf.DdOptions)
}
return poolSize
}
func fillDisk(
attackConf *core.DiskAttackConfig,
cmdPool *pkgUtils.CommandPools,
outputHandler *OutputHandler) {
if attackConf.FAllocateOption != nil {
name, args := core.FAllocateCommand.GetCmdArgs(*attackConf.FAllocateOption)
runner := pkgUtils.NewCommandRunner(name, args).
WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan)
cmdPool.Start(runner)
return
}
for _, DdOption := range *attackConf.DdOptions {
name, args := core.DdCommand.GetCmdArgs(DdOption)
runner := pkgUtils.NewCommandRunner(name, args).
WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan)
cmdPool.Start(runner)
}
return
}
func getDeadline(options core.AttackConfig) *time.Time {
duration, _ := options.ScheduleDuration()
if duration != nil {
deadline := time.Now().Add(*duration)
return &deadline
}
return nil
}
func applyPayload(
attackConf *core.DiskAttackConfig,
cmdPool *pkgUtils.CommandPools,
outputHandler *OutputHandler) {
if len(*attackConf.DdOptions) == 0 {
return
}
rest := (*attackConf.DdOptions)[len(*attackConf.DdOptions)-1]
*attackConf.DdOptions = (*attackConf.DdOptions)[:len(*attackConf.DdOptions)-1]
name, args := core.DdCommand.GetCmdArgs(rest)
runner := pkgUtils.NewCommandRunner(name, args).
WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan)
cmdPool.Start(runner)
for _, ddOpt := range *attackConf.DdOptions {
name, args := core.DdCommand.GetCmdArgs(ddOpt)
runner := pkgUtils.NewCommandRunner(name, args).
WithOutputHandler(outputHandler.StdoutHandler, outputHandler.OutputChan)
cmdPool.Start(runner)
}
}
func ApplyDiskServerAttack(options core.AttackConfig, env Environment) error {
var attackConf *core.DiskAttackConfig
var ok bool
if attackConf, ok = options.(*core.DiskAttackConfig); !ok {
return fmt.Errorf("AttackConfig -> *DiskAttackConfig meet error")
}
poolSize := getPoolSize(attackConf)
if attackConf.Action == core.DiskFillAction {
cmdPool := pkgUtils.NewCommandPools(context.Background(), nil, poolSize)
env.Chaos.CmdPools[env.AttackUid] = cmdPool
fillDisk(attackConf, cmdPool, NewOutputHandler(handleDiskServerOutput, nil))
return nil
}
if attackConf.DdOptions != nil {
var cmdPool *pkgUtils.CommandPools
deadline := getDeadline(options)
if deadline != nil {
cmdPool = pkgUtils.NewCommandPools(context.Background(), deadline, poolSize)
}
cmdPool = pkgUtils.NewCommandPools(context.Background(), nil, poolSize)
env.Chaos.CmdPools[env.AttackUid] = cmdPool
applyPayload(attackConf, cmdPool, NewOutputHandler(handleDiskServerOutput, nil))
}
return nil
}
func (diskServerAttack) Recover(exp core.Experiment, env Environment) error {
attackConfig, err := exp.GetRequestCommand()
if err != nil {
return err
}
config := *attackConfig.(*core.DiskAttackConfig)
switch config.Action {
case core.DiskFillAction, core.DiskWritePayloadAction:
err = os.Remove(config.Path)
if err != nil {
log.Warn(fmt.Sprintf("recover disk: remove %s failed", config.Path), zap.Error(err))
}
}
if cmdPool, ok := env.Chaos.CmdPools[exp.Uid]; ok {
log.Info(fmt.Sprintf("stop disk attack,read: %s", config.Path))
cmdPool.Close()
}
return nil
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Chaos Mesh Authors.
// Copyright 2023 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package chaosd
import (
@ -19,6 +20,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/chaos-mesh/chaosd/pkg/core"
"github.com/chaos-mesh/chaosd/pkg/utils"
)
func Test_diskAttack_Attack(t *testing.T) {
@ -30,9 +32,15 @@ func Test_diskAttack_Attack(t *testing.T) {
Path: "./a",
PayloadProcessNum: 1,
}
env := Environment{
AttackUid: "a",
Chaos: &Server{
CmdPools: make(map[string]*utils.CommandPools),
},
}
conf, err := opt.PreProcess()
assert.NoError(t, err)
err = DiskAttack.Attack(conf, Environment{})
err = DiskAttack.Attack(conf, env)
assert.NoError(t, err)
f, err := os.Open("./a")
@ -47,7 +55,7 @@ func Test_diskAttack_Attack(t *testing.T) {
opt.PayloadProcessNum = 4
wConf, err := opt.PreProcess()
assert.NoError(t, err)
err = DiskAttack.Attack(wConf, Environment{})
err = DiskAttack.Attack(wConf, env)
assert.NoError(t, err)
f, err = os.Open("./a")

View File

@ -61,6 +61,8 @@ func (s *Server) RecoverAttack(uid string) error {
attackType = StressAttack
case core.DiskAttack:
attackType = DiskAttack
case core.DiskServerAttack:
attackType = DiskServerAttack
case core.JVMAttack:
attackType = JVMAttack
case core.ClockAttack:

View File

@ -19,6 +19,7 @@ import (
"github.com/chaos-mesh/chaosd/pkg/config"
"github.com/chaos-mesh/chaosd/pkg/core"
"github.com/chaos-mesh/chaosd/pkg/scheduler"
"github.com/chaos-mesh/chaosd/pkg/utils"
)
type Server struct {
@ -30,6 +31,8 @@ type Server struct {
tcRule core.TCRuleStore
conf *config.Config
svr *chaosdaemon.DaemonServer
CmdPools map[string]*utils.CommandPools
}
func NewServer(
@ -51,5 +54,6 @@ func NewServer(
iptablesRule: iptables,
tcRule: tc,
svr: svr,
CmdPools: make(map[string]*utils.CommandPools),
}
}

View File

@ -16,8 +16,8 @@ package httpserver
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/gin-gonic/gin"
@ -37,7 +37,7 @@ var (
errMissingClientCert = utils.ErrAuth.New("Sorry, but you need to provide a client certificate to continue")
)
func (s *httpServer) serverMode() string {
func (s *HttpServer) serverMode() string {
if len(s.conf.SSLCertFile) > 0 {
if len(s.conf.SSLClientCAFile) > 0 {
return MTLSServer
@ -47,7 +47,7 @@ func (s *httpServer) serverMode() string {
return HTTPServer
}
func (s *httpServer) startHttpsServer() (err error) {
func (s *HttpServer) startHttpsServer() (err error) {
mode := s.serverMode()
if mode == HTTPServer {
return nil
@ -60,7 +60,7 @@ func (s *httpServer) startHttpsServer() (err error) {
if mode == MTLSServer {
log.Info("starting HTTPS server with Client Auth", zap.String("address", httpsServerAddr))
caCert, ioErr := ioutil.ReadFile(s.conf.SSLClientCAFile)
caCert, ioErr := os.ReadFile(s.conf.SSLClientCAFile)
if ioErr != nil {
err = ioErr
return

View File

@ -22,7 +22,7 @@ import (
"github.com/chaos-mesh/chaosd/pkg/core"
)
func (s *httpServer) listExperiments(c *gin.Context) {
func (s *HttpServer) listExperiments(c *gin.Context) {
mode, ok := c.GetQuery("launch_mode")
var chaosList []*core.Experiment
var err error
@ -32,17 +32,17 @@ func (s *httpServer) listExperiments(c *gin.Context) {
chaosList, err = s.exp.List(context.Background())
}
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, chaosList)
}
func (s *httpServer) listExperimentRuns(c *gin.Context) {
func (s *HttpServer) listExperimentRuns(c *gin.Context) {
uid := c.Param("uid")
runsList, err := s.chaos.ExpRun.ListByExperimentUID(context.Background(), uid)
if err != nil {
c.AbortWithError(http.StatusInternalServerError, err)
_ = c.AbortWithError(http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, runsList)

View File

@ -30,7 +30,7 @@ import (
"github.com/chaos-mesh/chaosd/pkg/swaggerserver"
)
type httpServer struct {
type HttpServer struct {
conf *config.Config
chaos *chaosd.Server
exp core.ExperimentStore
@ -40,15 +40,15 @@ func NewServer(
conf *config.Config,
chaos *chaosd.Server,
exp core.ExperimentStore,
) *httpServer {
return &httpServer{
) *HttpServer {
return &HttpServer{
conf: conf,
chaos: chaos,
exp: exp,
}
}
func Register(s *httpServer, scheduler scheduler.Scheduler) {
func Register(s *HttpServer, scheduler scheduler.Scheduler) {
if s.conf.Platform != config.LocalPlatform {
return
}
@ -66,7 +66,7 @@ func Register(s *httpServer, scheduler scheduler.Scheduler) {
scheduler.Start()
}
func (s *httpServer) startHttpServer() error {
func (s *HttpServer) startHttpServer() error {
httpServerAddr := s.conf.Address()
log.Info("starting HTTP server", zap.String("address", httpServerAddr))
e := gin.Default()
@ -78,7 +78,7 @@ func (s *httpServer) startHttpServer() error {
return e.Run(httpServerAddr)
}
func (s *httpServer) handler(engine *gin.Engine) {
func (s *HttpServer) handler(engine *gin.Engine) {
api := engine.Group("/api")
{
api.GET("/swagger/*any", swaggerserver.Handler())
@ -107,7 +107,7 @@ func (s *httpServer) handler(engine *gin.Engine) {
}
}
func (s *httpServer) systemHandler(engine *gin.Engine) {
func (s *HttpServer) systemHandler(engine *gin.Engine) {
api := engine.Group("/api")
system := api.Group("/system")
{
@ -125,10 +125,10 @@ func (s *httpServer) systemHandler(engine *gin.Engine) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/process [post]
func (s *httpServer) createProcessAttack(c *gin.Context) {
func (s *HttpServer) createProcessAttack(c *gin.Context) {
attack := core.NewProcessCommand()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -157,10 +157,10 @@ func (s *httpServer) createProcessAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/network [post]
func (s *httpServer) createNetworkAttack(c *gin.Context) {
func (s *HttpServer) createNetworkAttack(c *gin.Context) {
attack := core.NewNetworkCommand()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -189,10 +189,10 @@ func (s *httpServer) createNetworkAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/stress [post]
func (s *httpServer) createStressAttack(c *gin.Context) {
func (s *HttpServer) createStressAttack(c *gin.Context) {
attack := core.NewStressCommand()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -221,10 +221,10 @@ func (s *httpServer) createStressAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/disk [post]
func (s *httpServer) createDiskAttack(c *gin.Context) {
options := core.NewDiskOption()
func (s *HttpServer) createDiskAttack(c *gin.Context) {
options := core.NewDiskOptionForServer()
if err := c.ShouldBindJSON(options); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -236,7 +236,7 @@ func (s *httpServer) createDiskAttack(c *gin.Context) {
return
}
uid, err := s.chaos.ExecuteAttack(chaosd.DiskAttack, attackConfig, core.ServerMode)
uid, err := s.chaos.ExecuteAttack(chaosd.DiskServerAttack, attackConfig, core.ServerMode)
if err != nil {
handleError(c, err)
return
@ -254,10 +254,10 @@ func (s *httpServer) createDiskAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/clock [post]
func (s *httpServer) createClockAttack(c *gin.Context) {
func (s *HttpServer) createClockAttack(c *gin.Context) {
options := core.NewClockOption()
if err := c.ShouldBindJSON(options); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -286,10 +286,10 @@ func (s *httpServer) createClockAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/http [post]
func (s *httpServer) createHTTPAttack(c *gin.Context) {
func (s *HttpServer) createHTTPAttack(c *gin.Context) {
attack := core.NewHTTPAttackOption()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -318,10 +318,10 @@ func (s *httpServer) createHTTPAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/jvm [post]
func (s *httpServer) createJVMAttack(c *gin.Context) {
func (s *HttpServer) createJVMAttack(c *gin.Context) {
options := core.NewJVMCommand()
if err := c.ShouldBindJSON(options); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -350,10 +350,10 @@ func (s *httpServer) createJVMAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/kafka [post]
func (s *httpServer) createKafkaAttack(c *gin.Context) {
func (s *HttpServer) createKafkaAttack(c *gin.Context) {
options := core.NewKafkaCommand()
if err := c.ShouldBindJSON(options); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -382,10 +382,10 @@ func (s *httpServer) createKafkaAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/vm [post]
func (s *httpServer) createVMAttack(c *gin.Context) {
func (s *HttpServer) createVMAttack(c *gin.Context) {
options := core.NewVMOption()
if err := c.ShouldBindJSON(options); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -414,10 +414,10 @@ func (s *httpServer) createVMAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/redis [post]
func (s *httpServer) createRedisAttack(c *gin.Context) {
func (s *HttpServer) createRedisAttack(c *gin.Context) {
attack := core.NewRedisCommand()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -446,10 +446,10 @@ func (s *httpServer) createRedisAttack(c *gin.Context) {
// @Failure 400 {object} utils.APIError
// @Failure 500 {object} utils.APIError
// @Router /api/attack/user_defined [post]
func (s *httpServer) createUserDefinedAttack(c *gin.Context) {
func (s *HttpServer) createUserDefinedAttack(c *gin.Context) {
attack := core.NewUserDefinedOption()
if err := c.ShouldBindJSON(attack); err != nil {
c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
_ = c.AbortWithError(http.StatusBadRequest, utils.ErrInternalServer.WrapWithNoMessage(err))
return
}
@ -477,7 +477,7 @@ func (s *httpServer) createUserDefinedAttack(c *gin.Context) {
// @Success 200 {object} utils.Response
// @Failure 500 {object} utils.APIError
// @Router /api/attack/{uid} [delete]
func (s *httpServer) recoverAttack(c *gin.Context) {
func (s *HttpServer) recoverAttack(c *gin.Context) {
uid := c.Param("uid")
err := s.chaos.RecoverAttack(uid)
if err != nil {

View File

@ -26,10 +26,10 @@ type healthInfo struct {
Message string `json:"message"`
}
func (s *httpServer) healthcheck(c *gin.Context) {
func (s *HttpServer) healthcheck(c *gin.Context) {
c.JSON(http.StatusOK, healthInfo{Status: 0})
}
func (s *httpServer) version(c *gin.Context) {
func (s *HttpServer) version(c *gin.Context) {
c.JSON(http.StatusOK, version.Get())
}

View File

@ -14,6 +14,7 @@
package utils
import (
"context"
"fmt"
"os/exec"
"reflect"
@ -24,6 +25,16 @@ type Command struct {
}
func (c Command) Unmarshal(val interface{}) *exec.Cmd {
name, args := c.GetCmdArgs(val)
return exec.Command(name, args...)
}
func (c Command) UnmarshalWithCtx(ctx context.Context, val interface{}) *exec.Cmd {
name, args := c.GetCmdArgs(val)
return exec.CommandContext(ctx, name, args...)
}
func (c Command) GetCmdArgs(val interface{}) (string, []string) {
v := reflect.ValueOf(val)
var options []string
@ -39,5 +50,5 @@ func (c Command) Unmarshal(val interface{}) *exec.Cmd {
options = append(options, fmt.Sprintf("%s=%v", tag, v.Field(i).String()))
}
}
return exec.Command(c.Name, options...) //
return c.Name, options
}

View File

@ -1,4 +1,4 @@
// Copyright 2021 Chaos Mesh Authors.
// Copyright 2023 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -10,6 +10,7 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (

117
pkg/utils/pool.go Normal file
View File

@ -0,0 +1,117 @@
// Copyright 2023 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"context"
"fmt"
"os/exec"
"sync"
"time"
"github.com/Jeffail/tunny"
"github.com/samber/lo"
"github.com/samber/mo"
)
// CommandPools is a group of commands runner
type CommandPools struct {
cancel context.CancelFunc
pools *tunny.Pool
wg sync.WaitGroup
}
// NewCommandPools returns a new CommandPools
func NewCommandPools(ctx context.Context, deadline *time.Time, size int) *CommandPools {
var ctx2 context.Context
var cancel context.CancelFunc
if deadline != nil {
ctx2, cancel = context.WithDeadline(ctx, *deadline)
} else {
ctx2, cancel = context.WithCancel(ctx)
}
return &CommandPools{
cancel: cancel,
pools: tunny.NewFunc(size, func(payload interface{}) interface{} {
cmdPayload, ok := payload.(lo.Tuple2[string, []string])
if !ok {
return mo.Err[[]byte](fmt.Errorf("payload is not CommandPayload"))
}
name, args := cmdPayload.Unpack()
cmd := exec.CommandContext(ctx2, name, args...)
output, err := cmd.CombinedOutput()
if err != nil {
return mo.Err[[]byte](fmt.Errorf("%s: %s", err, string(output)))
}
return mo.Ok[[]byte](output)
}),
}
}
type CommandRunner struct {
Name string
Args []string
outputHandler func([]byte, error, chan interface{})
outputChanel chan interface{}
}
func NewCommandRunner(name string, args []string) *CommandRunner {
return &CommandRunner{
Name: name,
Args: args,
outputHandler: func(bytes []byte, err error, c chan interface{}) {},
outputChanel: nil,
}
}
func (r *CommandRunner) WithOutputHandler(
handler func([]byte, error, chan interface{}),
outputChanel chan interface{},
) *CommandRunner {
r.outputHandler = handler
r.outputChanel = outputChanel
return r
}
func (p *CommandPools) Process(name string, args []string) ([]byte, error) {
result, ok := p.pools.Process(lo.Tuple2[string, []string]{
A: name,
B: args,
}).(mo.Result[[]byte])
if !ok {
return nil, fmt.Errorf("payload is not Result[[]byte]")
}
return result.Get()
}
// Start command async.
func (p *CommandPools) Start(runner *CommandRunner) {
p.wg.Add(1)
go func() {
output, err := p.Process(runner.Name, runner.Args)
runner.outputHandler(output, err, runner.outputChanel)
p.wg.Done()
}()
}
func (p *CommandPools) Wait() {
p.wg.Wait()
}
func (p *CommandPools) Close() {
p.cancel()
p.Wait()
p.pools.Close()
}

81
pkg/utils/pool_test.go Normal file
View File

@ -0,0 +1,81 @@
// Copyright 2023 Chaos Mesh Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"context"
"math"
"testing"
"time"
"github.com/pingcap/log"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestCommandPools_Cancel(t *testing.T) {
now := time.Now()
cmdPools := NewCommandPools(context.Background(), nil, 1)
var gErr []error
runner := NewCommandRunner("sleep", []string{"10s"}).
WithOutputHandler(func(output []byte, err error, _ chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Close()
assert.Less(t, time.Since(now).Seconds(), 10.0)
assert.Equal(t, 1, len(gErr))
}
func TestCommandPools_Deadline(t *testing.T) {
now := time.Now()
deadline := time.Now().Add(time.Millisecond * 50)
cmdPools := NewCommandPools(context.Background(), &deadline, 1)
var gErr []error
runner := NewCommandRunner("sleep", []string{"10s"}).
WithOutputHandler(func(output []byte, err error, _ chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Wait()
assert.Less(t, math.Abs(float64(time.Since(now).Milliseconds()-50)), 10.0)
assert.Equal(t, 1, len(gErr))
}
func TestCommandPools_Normal(t *testing.T) {
now := time.Now()
cmdPools := NewCommandPools(context.Background(), nil, 1)
var gErr []error
runner := NewCommandRunner("sleep", []string{"1s"}).
WithOutputHandler(func(output []byte, err error, _ chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Wait()
assert.Less(t, time.Since(now).Seconds(), 2.0)
assert.Equal(t, 0, len(gErr))
}