use subtests for the benchmark_test and add it into the Makefile (#1278)

* use subtests for the benchmark_test and add it into the Makefile

* benchmark: keep the original benchmark_test as version 16. use subtests benchmark as 17
This commit is contained in:
ZhouyihaiDing 2017-06-12 14:52:33 -07:00 committed by Menghan Li
parent 84158ac547
commit ba30de56b8
6 changed files with 259 additions and 207 deletions

View File

@ -9,6 +9,9 @@ updatedeps:
testdeps:
go get -d -v -t google.golang.org/grpc/...
benchdeps: testdeps
go get -d -v golang.org/x/perf/cmd/benchstat
updatetestdeps:
go get -d -v -t -u -f google.golang.org/grpc/...
@ -32,6 +35,9 @@ test: testdeps
testrace: testdeps
go test -v -race -cpu 1,4 google.golang.org/grpc/...
benchmark: benchdeps
go test google.golang.org/grpc/benchmark/... -benchmem -bench=. | tee /tmp/tmp.result && benchstat /tmp/tmp.result && rm /tmp/tmp.result
clean:
go clean -i google.golang.org/grpc/...
@ -49,4 +55,6 @@ coverage: testdeps
test \
testrace \
clean \
coverage
coverage \
benchdeps \
benchmark

View File

@ -25,10 +25,14 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
)
@ -217,3 +221,109 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
}
return conn
}
func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1)))
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)
// Warm up connection.
for i := 0; i < 10; i++ {
unaryCaller(tc, reqSize, respSize)
}
ch := make(chan int, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
go func() {
for range ch {
start := time.Now()
unaryCaller(tc, reqSize, respSize)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1)))
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)
// Warm up connection.
stream, err := tc.StreamingCall(context.Background())
if err != nil {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
for i := 0; i < 10; i++ {
streamCaller(stream, reqSize, respSize)
}
ch := make(chan struct{}, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
go func() {
for range ch {
start := time.Now()
streamCaller(stream, reqSize, respSize)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
if err := DoUnaryCall(client, reqSize, respSize); err != nil {
grpclog.Fatalf("DoUnaryCall failed: %v", err)
}
}
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
}

View File

@ -0,0 +1,93 @@
// +build go1.6,!go1.7
package benchmark
import (
"os"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
)
func BenchmarkClientStreamc1(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 1, 1, 1)
}
func BenchmarkClientStreamc8(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 8, 1, 1)
}
func BenchmarkClientStreamc64(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 64, 1, 1)
}
func BenchmarkClientStreamc512(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 512, 1, 1)
}
func BenchmarkClientUnaryc1(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 1, 1, 1)
}
func BenchmarkClientUnaryc8(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 8, 1, 1)
}
func BenchmarkClientUnaryc64(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 64, 1, 1)
}
func BenchmarkClientUnaryc512(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 512, 1, 1)
}
func BenchmarkClientStreamNoTracec1(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 1, 1, 1)
}
func BenchmarkClientStreamNoTracec8(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 8, 1, 1)
}
func BenchmarkClientStreamNoTracec64(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 64, 1, 1)
}
func BenchmarkClientStreamNoTracec512(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 512, 1, 1)
}
func BenchmarkClientUnaryNoTracec1(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 1, 1, 1)
}
func BenchmarkClientUnaryNoTracec8(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 8, 1, 1)
}
func BenchmarkClientUnaryNoTracec64(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 64, 1, 1)
}
func BenchmarkClientUnaryNoTracec512(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 512, 1, 1)
}
func TestMain(m *testing.M) {
os.Exit(stats.RunTestMain(m))
}

View File

@ -0,0 +1,44 @@
// +build go1.7
package benchmark
import (
"fmt"
"os"
"testing"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
)
func BenchmarkClient(b *testing.B) {
maxConcurrentCalls := []int{1, 8, 64, 512}
reqSizeBytes := []int{1, 1024}
reqspSizeBytes := []int{1, 1024}
for _, enableTracing := range []bool{true, false} {
grpc.EnableTracing = enableTracing
tracing := "Tracing"
if !enableTracing {
tracing = "noTrace"
}
for _, maxC := range maxConcurrentCalls {
for _, reqS := range reqSizeBytes {
for _, respS := range reqspSizeBytes {
b.Run(fmt.Sprintf("Unary-%s-maxConcurrentCalls_"+
"%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) {
runUnary(b, maxC, reqS, respS)
})
b.Run(fmt.Sprintf("Stream-%s-maxConcurrentCalls_"+
"%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) {
runStream(b, maxC, reqS, respS)
})
}
}
}
}
}
func TestMain(m *testing.M) {
os.Exit(stats.RunTestMain(m))
}

View File

@ -1,202 +0,0 @@
package benchmark
import (
"os"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
)
func runUnary(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1)))
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)
// Warm up connection.
for i := 0; i < 10; i++ {
unaryCaller(tc)
}
ch := make(chan int, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
go func() {
for range ch {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func runStream(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1)))
defer stopper()
conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn)
// Warm up connection.
stream, err := tc.StreamingCall(context.Background())
if err != nil {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
for i := 0; i < 10; i++ {
streamCaller(stream)
}
ch := make(chan struct{}, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
go func() {
for range ch {
start := time.Now()
streamCaller(stream)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func unaryCaller(client testpb.BenchmarkServiceClient) {
if err := DoUnaryCall(client, 1, 1); err != nil {
grpclog.Fatalf("DoUnaryCall failed: %v", err)
}
}
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) {
if err := DoStreamingRoundTrip(stream, 1, 1); err != nil {
grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
}
func BenchmarkClientStreamc1(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 1)
}
func BenchmarkClientStreamc8(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 8)
}
func BenchmarkClientStreamc64(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 64)
}
func BenchmarkClientStreamc512(b *testing.B) {
grpc.EnableTracing = true
runStream(b, 512)
}
func BenchmarkClientUnaryc1(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 1)
}
func BenchmarkClientUnaryc8(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 8)
}
func BenchmarkClientUnaryc64(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 64)
}
func BenchmarkClientUnaryc512(b *testing.B) {
grpc.EnableTracing = true
runUnary(b, 512)
}
func BenchmarkClientStreamNoTracec1(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 1)
}
func BenchmarkClientStreamNoTracec8(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 8)
}
func BenchmarkClientStreamNoTracec64(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 64)
}
func BenchmarkClientStreamNoTracec512(b *testing.B) {
grpc.EnableTracing = false
runStream(b, 512)
}
func BenchmarkClientUnaryNoTracec1(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 1)
}
func BenchmarkClientUnaryNoTracec8(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 8)
}
func BenchmarkClientUnaryNoTracec64(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 64)
}
func BenchmarkClientUnaryNoTracec512(b *testing.B) {
grpc.EnableTracing = false
runUnary(b, 512)
}
func TestMain(m *testing.M) {
os.Exit(stats.RunTestMain(m))
}

View File

@ -50,7 +50,7 @@ func AddStatsWithName(b *testing.B, name string, numBuckets int) *Stats {
}
p := strings.Split(runtime.FuncForPC(pc).Name(), ".")
benchName = p[len(p)-1]
if strings.HasPrefix(benchName, "Benchmark") {
if strings.HasPrefix(benchName, "run") {
break
}
}
@ -148,9 +148,8 @@ func splitLines(data []byte, eof bool) (advance int, token []byte, err error) {
func injectStatsIfFinished(line string) {
injectCond.L.Lock()
defer injectCond.L.Unlock()
// We assume that the benchmark results start with the benchmark name.
if curB == nil || !strings.HasPrefix(line, curBenchName) {
// We assume that the benchmark results start with "Benchmark".
if curB == nil || !strings.HasPrefix(line, "Benchmark") {
return
}