add output channel to pools

Signed-off-by: andrewmatilde <davis6813585853062@outlook.com>
This commit is contained in:
andrewmatilde 2023-02-08 14:42:40 +08:00
parent 890ae6dbf7
commit cc151c6dc0
2 changed files with 56 additions and 25 deletions

View File

@ -59,6 +59,32 @@ func NewCommandPools(ctx context.Context, deadline *time.Time, size int) *Comman
}
}
type CommandRunner struct {
Name string
Args []string
outputHandler func([]byte, error, *chan interface{})
outputChanel *chan interface{}
}
func NewCommandRunner(name string, args []string) *CommandRunner {
return &CommandRunner{
Name: name,
Args: args,
outputHandler: func(bytes []byte, err error, c *chan interface{}) {},
outputChanel: nil,
}
}
func (r *CommandRunner) WithOutputHandler(
handler func([]byte, error, *chan interface{}),
outputChanel *chan interface{},
) *CommandRunner {
r.outputHandler = handler
r.outputChanel = outputChanel
return r
}
func (p *CommandPools) Process(name string, args []string) ([]byte, error) {
result, ok := p.pools.Process(lo.Tuple2[string, []string]{
A: name,
@ -71,12 +97,11 @@ func (p *CommandPools) Process(name string, args []string) ([]byte, error) {
}
// Start command async.
// TODO: give an input channel for output value when needed
func (p *CommandPools) Start(name string, args []string, outputHandler func([]byte, error)) {
func (p *CommandPools) Start(runner *CommandRunner) {
p.wg.Add(1)
go func() {
output, err := p.Process(name, args)
outputHandler(output, err)
output, err := p.Process(runner.Name, runner.Args)
runner.outputHandler(output, err, runner.outputChanel)
p.wg.Done()
}()
}

View File

@ -27,13 +27,15 @@ func TestCommandPools_Cancel(t *testing.T) {
now := time.Now()
cmdPools := NewCommandPools(context.Background(), nil, 1)
var gErr []error
cmdPools.Start("sleep", []string{"10s"}, func(output []byte, err error) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
})
runner := NewCommandRunner("sleep", []string{"10s"}).
WithOutputHandler(func(output []byte, err error, _ *chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Close()
assert.Less(t, time.Since(now).Seconds(), 10.0)
assert.Equal(t, 1, len(gErr))
@ -44,13 +46,15 @@ func TestCommandPools_Deadline(t *testing.T) {
deadline := time.Now().Add(time.Millisecond * 50)
cmdPools := NewCommandPools(context.Background(), &deadline, 1)
var gErr []error
cmdPools.Start("sleep", []string{"10s"}, func(output []byte, err error) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
})
runner := NewCommandRunner("sleep", []string{"10s"}).
WithOutputHandler(func(output []byte, err error, _ *chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Wait()
assert.Less(t, math.Abs(float64(time.Since(now).Milliseconds()-50)), 10.0)
assert.Equal(t, 1, len(gErr))
@ -61,13 +65,15 @@ func TestCommandPools_Normal(t *testing.T) {
now := time.Now()
cmdPools := NewCommandPools(context.Background(), nil, 1)
var gErr []error
cmdPools.Start("sleep", []string{"1s"}, func(output []byte, err error) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
})
runner := NewCommandRunner("sleep", []string{"1s"}).
WithOutputHandler(func(output []byte, err error, _ *chan interface{}) {
if err != nil {
log.Error(string(output), zap.Error(err))
gErr = append(gErr, err)
}
log.Info(string(output))
}, nil)
cmdPools.Start(runner)
cmdPools.Wait()
assert.Less(t, time.Since(now).Seconds(), 2.0)
assert.Equal(t, 0, len(gErr))