diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index 05aef43a4..dca06ee76 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -42,7 +42,6 @@ package main import ( "context" "encoding/gob" - "errors" "flag" "fmt" "io" @@ -53,7 +52,6 @@ import ( "reflect" "runtime" "runtime/pprof" - "strconv" "strings" "sync" "sync/atomic" @@ -62,6 +60,7 @@ import ( "google.golang.org/grpc" bm "google.golang.org/grpc/benchmark" + "google.golang.org/grpc/benchmark/flags" testpb "google.golang.org/grpc/benchmark/grpc_testing" "google.golang.org/grpc/benchmark/latency" "google.golang.org/grpc/benchmark/stats" @@ -70,70 +69,119 @@ import ( "google.golang.org/grpc/test/bufconn" ) -const ( - modeOn = "on" - modeOff = "off" - modeBoth = "both" - - // compression modes - modeAll = "all" - modeGzip = "gzip" - modeNop = "nop" +var ( + workloads = flags.StringWithAllowedValues("workloads", workloadsAll, + fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads) + traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, + fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, + fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, + fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) + compressorMode = flags.StringWithAllowedValues("compression", compModeOff, + fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes) + networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone, + "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes) + readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list") + readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list") + readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list") + maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks") + readReqSizeBytes = flags.IntSlice("reqSizeBytes", defaultReqSizeBytes, "Request size in bytes - may be a comma-separated list") + readRespSizeBytes = flags.IntSlice("respSizeBytes", defaultRespSizeBytes, "Response size in bytes - may be a comma-separated list") + benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark") + memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.") + memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+ + "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+ + "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.") + cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided") + benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file") + useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") ) -var allCompressionModes = []string{modeOff, modeGzip, modeNop, modeAll} -var allTraceModes = []string{modeOn, modeOff, modeBoth} -var allPreloaderModes = []string{modeOn, modeOff, modeBoth} - const ( workloadsUnary = "unary" workloadsStreaming = "streaming" workloadsUnconstrained = "unconstrained" workloadsAll = "all" + // Compression modes. + compModeOff = "off" + compModeGzip = "gzip" + compModeNop = "nop" + compModeAll = "all" + // Toggle modes. + toggleModeOff = "off" + toggleModeOn = "on" + toggleModeBoth = "both" + // Network modes. + networkModeNone = "none" + networkModeLocal = "Local" + networkModeLAN = "LAN" + networkModeWAN = "WAN" + networkLongHaul = "Longhaul" + + numStatsBuckets = 10 ) -var allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll} - var ( - runMode = []bool{true, true, true} // {runUnary, runStream, runUnconstrained} - // When set the latency to 0 (no delay), the result is slower than the real result with no delay - // because latency simulation section has extra operations - ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. - kbps = []int{0, 10240} // if non-positive, infinite - mtu = []int{0} // if non-positive, infinite - maxConcurrentCalls = []int{1, 8, 64, 512} - reqSizeBytes = []int{1, 1024, 1024 * 1024} - respSizeBytes = []int{1, 1024, 1024 * 1024} - enableTrace []bool - benchtime time.Duration - memProfile, cpuProfile string - memProfileRate int - modeCompressor []string - enablePreloader []bool - enableChannelz []bool - networkMode string - benchmarkResultFile string - networks = map[string]latency.Network{ - "Local": latency.Local, - "LAN": latency.LAN, - "WAN": latency.WAN, - "Longhaul": latency.Longhaul, + allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll} + allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll} + allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth} + allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul} + defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. + defaultReadKbps = []int{0, 10240} // if non-positive, infinite + defaultReadMTU = []int{0} // if non-positive, infinite + defaultMaxConcurrentCalls = []int{1, 8, 64, 512} + defaultReqSizeBytes = []int{1, 1024, 1024 * 1024} + defaultRespSizeBytes = []int{1, 1024, 1024 * 1024} + networks = map[string]latency.Network{ + networkModeLocal: latency.Local, + networkModeLAN: latency.LAN, + networkModeWAN: latency.WAN, + networkLongHaul: latency.Longhaul, } ) -func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { +// runModes indicates the workloads to run. This is initialized with a call to +// `runModesFromWorkloads`, passing the workloads flag set by the user. +type runModes struct { + unary, streaming, unconstrained bool +} + +// runModesFromWorkloads determines the runModes based on the value of +// workloads flag set by the user. +func runModesFromWorkloads(workload string) runModes { + r := runModes{} + switch workload { + case workloadsUnary: + r.unary = true + case workloadsStreaming: + r.streaming = true + case workloadsUnconstrained: + r.unconstrained = true + case workloadsAll: + r.unary = true + r.streaming = true + r.unconstrained = true + default: + log.Fatalf("Unknown workloads setting: %v (want one of: %v)", + workloads, strings.Join(allWorkloads, ", ")) + } + return r +} + +func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { caller, cleanup := makeFuncUnary(benchFeatures) defer cleanup() - return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) + return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) } -func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { +func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { caller, cleanup := makeFuncStream(benchFeatures) defer cleanup() - return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchtime, s) + return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) } -func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchtime time.Duration) (uint64, uint64) { +func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchTime time.Duration) (uint64, uint64) { var sender, recver func(int) var cleanup func() if benchFeatures.EnablePreloader { @@ -157,7 +205,7 @@ func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benc atomic.StoreUint64(&responseCount, 0) }() - bmEnd := time.Now().Add(benchtime + warmuptime) + bmEnd := time.Now().Add(benchTime + warmuptime) for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { go func(pos int) { for { @@ -190,7 +238,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} opts := []grpc.DialOption{} sopts := []grpc.ServerOption{} - if benchFeatures.ModeCompressor == "nop" { + if benchFeatures.ModeCompressor == compModeNop { sopts = append(sopts, grpc.RPCCompressor(nopCompressor{}), grpc.RPCDecompressor(nopDecompressor{}), @@ -200,7 +248,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu grpc.WithDecompressor(nopDecompressor{}), ) } - if benchFeatures.ModeCompressor == "gzip" { + if benchFeatures.ModeCompressor == compModeGzip { sopts = append(sopts, grpc.RPCCompressor(grpc.NewGZIPCompressor()), grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), @@ -214,7 +262,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu opts = append(opts, grpc.WithInsecure()) var lis net.Listener - if *useBufconn { + if benchFeatures.UseBufConn { bcLis := bufconn.Listen(256 * 1024) lis = bcLis opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { @@ -328,7 +376,7 @@ func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, r } } -func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchtime time.Duration, s *stats.Stats) uint64 { +func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { // Warm up connection. for i := 0; i < 10; i++ { caller(0) @@ -340,7 +388,7 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), b wg sync.WaitGroup ) wg.Add(benchFeatures.MaxConcurrentCalls) - bmEnd := time.Now().Add(benchtime) + bmEnd := time.Now().Add(benchTime) var count uint64 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { go func(pos int) { @@ -365,178 +413,258 @@ func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), b return count } -var useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") +// benchOpts represents all configurable options available while running this +// benchmark. This is built from the values passed as flags. +type benchOpts struct { + rModes runModes + benchTime time.Duration + memProfileRate int + memProfile string + cpuProfile string + networkMode string + benchmarkResultFile string + useBufconn bool + features *featureOpts +} -// Initiate main function to get settings of features. -func init() { - var ( - workloads, traceMode, compressorMode, readLatency, channelzOn string - preloaderMode string - readKbps, readMtu, readMaxConcurrentCalls intSliceType - readReqSizeBytes, readRespSizeBytes intSliceType - ) - flag.StringVar(&workloads, "workloads", workloadsAll, - fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", "))) - flag.StringVar(&traceMode, "trace", modeOff, - fmt.Sprintf("Trace mode - One of: %v", strings.Join(allTraceModes, ", "))) - flag.StringVar(&readLatency, "latency", "", "Simulated one-way network latency - may be a comma-separated list") - flag.StringVar(&channelzOn, "channelz", modeOff, "whether channelz should be turned on") - flag.DurationVar(&benchtime, "benchtime", time.Second, "Configures the amount of time to run each benchmark") - flag.Var(&readKbps, "kbps", "Simulated network throughput (in kbps) - may be a comma-separated list") - flag.Var(&readMtu, "mtu", "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list") - flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "Number of concurrent RPCs during benchmarks") - flag.Var(&readReqSizeBytes, "reqSizeBytes", "Request size in bytes - may be a comma-separated list") - flag.Var(&readRespSizeBytes, "respSizeBytes", "Response size in bytes - may be a comma-separated list") - flag.StringVar(&memProfile, "memProfile", "", "Enables memory profiling output to the filename provided.") - flag.IntVar(&memProfileRate, "memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+ - "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+ - "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.") - flag.StringVar(&cpuProfile, "cpuProfile", "", "Enables CPU profiling output to the filename provided") - flag.StringVar(&compressorMode, "compression", modeOff, - fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompressionModes, ", "))) - flag.StringVar(&preloaderMode, "preloader", modeOff, - fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allPreloaderModes, ", "))) - flag.StringVar(&benchmarkResultFile, "resultFile", "", "Save the benchmark result into a binary file") - flag.StringVar(&networkMode, "networkMode", "", "Network mode includes LAN, WAN, Local and Longhaul") +// featureOpts represents options which can have multiple values. The user +// usually provides a comma-separated list of options for each of these +// features through command line flags. We generate all possible combinations +// for the provided values and run the benchmarks for each combination. +type featureOpts struct { + enableTrace []bool // Feature index 0 + readLatencies []time.Duration // Feature index 1 + readKbps []int // Feature index 2 + readMTU []int // Feature index 3 + maxConcurrentCalls []int // Feature index 4 + reqSizeBytes []int // Feature index 5 + respSizeBytes []int // Feature index 6 + compModes []string // Feature index 7 + enableChannelz []bool // Feature index 8 + enablePreloader []bool // Feature index 9 +} + +// featureIndex is an enum for the different features that could be configured +// by the user through command line flags. +type featureIndex int + +const ( + enableTraceIndex featureIndex = iota + readLatenciesIndex + readKbpsIndex + readMTUIndex + maxConcurrentCallsIndex + reqSizeBytesIndex + respSizeBytesIndex + compModesIndex + enableChannelzIndex + enablePreloaderIndex + + // This is a place holder to indicate the total number of feature indices we + // have. Any new feature indices should be added above this. + maxFeatureIndex +) + +// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each +// element of the slice (indexed by 'featuresIndex' enum) contains the number +// of features to be exercised by the benchmark code. +// For example: Index 0 of the returned slice contains the number of values for +// enableTrace feature, while index 1 contains the number of value of +// readLatencies feature and so on. +func makeFeaturesNum(b *benchOpts) []int { + featuresNum := make([]int, maxFeatureIndex) + for i := 0; i < len(featuresNum); i++ { + switch featureIndex(i) { + case enableTraceIndex: + featuresNum[i] = len(b.features.enableTrace) + case readLatenciesIndex: + featuresNum[i] = len(b.features.readLatencies) + case readKbpsIndex: + featuresNum[i] = len(b.features.readKbps) + case readMTUIndex: + featuresNum[i] = len(b.features.readMTU) + case maxConcurrentCallsIndex: + featuresNum[i] = len(b.features.maxConcurrentCalls) + case reqSizeBytesIndex: + featuresNum[i] = len(b.features.reqSizeBytes) + case respSizeBytesIndex: + featuresNum[i] = len(b.features.respSizeBytes) + case compModesIndex: + featuresNum[i] = len(b.features.compModes) + case enableChannelzIndex: + featuresNum[i] = len(b.features.enableChannelz) + case enablePreloaderIndex: + featuresNum[i] = len(b.features.enablePreloader) + default: + log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, maxFeatureIndex) + } + } + return featuresNum +} + +// sharedFeatures returns a bool slice which acts as a bitmask. Each item in +// the slice represents a feature, indexed by 'featureIndex' enum. The bit is +// set to 1 if the corresponding feature does not have multiple value, so is +// shared amongst all benchmarks. +func sharedFeatures(featuresNum []int) []bool { + result := make([]bool, len(featuresNum)) + for i, num := range featuresNum { + if num <= 1 { + result[i] = true + } + } + return result +} + +// generateFeatures generates all combinations of the provided feature options. +// While all the feature options are stored in the benchOpts struct, the input +// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing +// the number of values for each feature. +// For example, let's say the user sets -workloads=all and +// -maxConcurrentCalls=1,100, this would end up with the following +// combinations: +// [workloads: unary, maxConcurrentCalls=1] +// [workloads: unary, maxConcurrentCalls=1] +// [workloads: streaming, maxConcurrentCalls=100] +// [workloads: streaming, maxConcurrentCalls=100] +// [workloads: unconstrained, maxConcurrentCalls=1] +// [workloads: unconstrained, maxConcurrentCalls=100] +func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { + // curPos and initialPos are two slices where each value acts as an index + // into the appropriate feature slice maintained in benchOpts.features. This + // loop generates all possible combinations of features by changing one value + // at a time, and once curPos becomes equal to intialPos, we have explored + // all options. + var result []stats.Features + var curPos []int + initialPos := make([]int, maxFeatureIndex) + for !reflect.DeepEqual(initialPos, curPos) { + if curPos == nil { + curPos = make([]int, maxFeatureIndex) + } + result = append(result, stats.Features{ + // These features stay the same for each iteration. + NetworkMode: b.networkMode, + UseBufConn: b.useBufconn, + // These features can potentially change for each iteration. + EnableTrace: b.features.enableTrace[curPos[enableTraceIndex]], + Latency: b.features.readLatencies[curPos[readLatenciesIndex]], + Kbps: b.features.readKbps[curPos[readKbpsIndex]], + Mtu: b.features.readMTU[curPos[readMTUIndex]], + MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[maxConcurrentCallsIndex]], + ReqSizeBytes: b.features.reqSizeBytes[curPos[reqSizeBytesIndex]], + RespSizeBytes: b.features.respSizeBytes[curPos[respSizeBytesIndex]], + ModeCompressor: b.features.compModes[curPos[compModesIndex]], + EnableChannelz: b.features.enableChannelz[curPos[enableChannelzIndex]], + EnablePreloader: b.features.enablePreloader[curPos[enablePreloaderIndex]], + }) + addOne(curPos, featuresNum) + } + return result +} + +// addOne mutates the input slice 'features' by changing one feature, thus +// arriving at the next combination of feature values. 'featuresMaxPosition' +// provides the numbers of allowed values for each feature, indexed by +// 'featureIndex' enum. +func addOne(features []int, featuresMaxPosition []int) { + for i := len(features) - 1; i >= 0; i-- { + features[i] = (features[i] + 1) + if features[i]/featuresMaxPosition[i] == 0 { + break + } + features[i] = features[i] % featuresMaxPosition[i] + } +} + +// processFlags reads the command line flags and builds benchOpts. Specifying +// invalid values for certain flags will cause flag.Parse() to fail, and the +// program to terminate. +// This *SHOULD* be the only place where the flags are accessed. All other +// parts of the benchmark code should rely on the returned benchOpts. +func processFlags() *benchOpts { flag.Parse() if flag.NArg() != 0 { log.Fatal("Error: unparsed arguments: ", flag.Args()) } - switch workloads { - case workloadsUnary: - runMode[0] = true - runMode[1] = false - runMode[2] = false - case workloadsStreaming: - runMode[0] = false - runMode[1] = true - runMode[2] = false - case workloadsUnconstrained: - runMode[0] = false - runMode[1] = false - runMode[2] = true - case workloadsAll: - runMode[0] = true - runMode[1] = true - runMode[2] = true - default: - log.Fatalf("Unknown workloads setting: %v (want one of: %v)", - workloads, strings.Join(allWorkloads, ", ")) + + opts := &benchOpts{ + rModes: runModesFromWorkloads(*workloads), + benchTime: *benchTime, + memProfileRate: *memProfileRate, + memProfile: *memProfile, + cpuProfile: *cpuProfile, + networkMode: *networkMode, + benchmarkResultFile: *benchmarkResultFile, + useBufconn: *useBufconn, + features: &featureOpts{ + enableTrace: setToggleMode(*traceMode), + readLatencies: append([]time.Duration(nil), *readLatency...), + readKbps: append([]int(nil), *readKbps...), + readMTU: append([]int(nil), *readMTU...), + maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...), + reqSizeBytes: append([]int(nil), *readReqSizeBytes...), + respSizeBytes: append([]int(nil), *readRespSizeBytes...), + compModes: setCompressorMode(*compressorMode), + enableChannelz: setToggleMode(*channelzOn), + enablePreloader: setToggleMode(*preloaderMode), + }, } - modeCompressor = setModeCompressor(compressorMode) - enablePreloader = setMode(preloaderMode) - enableTrace = setMode(traceMode) - enableChannelz = setMode(channelzOn) - // Time input formats as (time + unit). - readTimeFromInput(<c, readLatency) - readIntFromIntSlice(&kbps, readKbps) - readIntFromIntSlice(&mtu, readMtu) - readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls) - readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes) - readIntFromIntSlice(&respSizeBytes, readRespSizeBytes) + // Re-write latency, kpbs and mtu if network mode is set. - if network, ok := networks[networkMode]; ok { - ltc = []time.Duration{network.Latency} - kbps = []int{network.Kbps} - mtu = []int{network.MTU} + if network, ok := networks[opts.networkMode]; ok { + opts.features.readLatencies = []time.Duration{network.Latency} + opts.features.readKbps = []int{network.Kbps} + opts.features.readMTU = []int{network.MTU} } + return opts } -func setMode(name string) []bool { - switch name { - case modeOn: +func setToggleMode(val string) []bool { + switch val { + case toggleModeOn: return []bool{true} - case modeOff: + case toggleModeOff: return []bool{false} - case modeBoth: + case toggleModeBoth: return []bool{false, true} default: - log.Fatalf("Unknown %s setting: %v (want one of: %v)", - name, name, strings.Join(allTraceModes, ", ")) + // This should never happen because a wrong value passed to this flag would + // be caught during flag.Parse(). return []bool{} } } -func setModeCompressor(name string) []string { - switch name { - case modeNop: - return []string{"nop"} - case modeGzip: - return []string{"gzip"} - case modeAll: - return []string{"off", "nop", "gzip"} - case modeOff: - return []string{"off"} +func setCompressorMode(val string) []string { + switch val { + case compModeNop, compModeGzip, compModeOff: + return []string{val} + case compModeAll: + return []string{compModeNop, compModeGzip, compModeOff} default: - log.Fatalf("Unknown %s setting: %v (want one of: %v)", - name, name, strings.Join(allCompressionModes, ", ")) + // This should never happen because a wrong value passed to this flag would + // be caught during flag.Parse(). return []string{} } } -type intSliceType []int - -func (intSlice *intSliceType) String() string { - return fmt.Sprintf("%v", *intSlice) -} - -func (intSlice *intSliceType) Set(value string) error { - if len(*intSlice) > 0 { - return errors.New("interval flag already set") - } - for _, num := range strings.Split(value, ",") { - next, err := strconv.Atoi(num) - if err != nil { - return err - } - *intSlice = append(*intSlice, next) - } - return nil -} - -func readIntFromIntSlice(values *[]int, replace intSliceType) { - // If not set replace in the flag, just return to run the default settings. - if len(replace) == 0 { - return - } - *values = replace -} - -func readTimeFromInput(values *[]time.Duration, replace string) { - if strings.Compare(replace, "") != 0 { - *values = []time.Duration{} - for _, ltc := range strings.Split(replace, ",") { - duration, err := time.ParseDuration(ltc) - if err != nil { - log.Fatal(err.Error()) - } - *values = append(*values, duration) - } - } -} - -func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int) { - requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchtime.Seconds() - responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchtime.Seconds() +func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int, benchTime time.Duration) { + requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchTime.Seconds() + responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchTime.Seconds() fmt.Printf("Number of requests: %v\tRequest throughput: %v bit/s\n", requestCount, requestThroughput) fmt.Printf("Number of responses: %v\tResponse throughput: %v bit/s\n", responseCount, responseThroughput) fmt.Println() } func main() { - before() - featuresPos := make([]int, 10) - // 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize - featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu), - len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(modeCompressor), len(enableChannelz), len(enablePreloader)} - initalPos := make([]int, len(featuresPos)) - s := stats.NewStats(10) + opts := processFlags() + before(opts) + s := stats.NewStats(numStatsBuckets) s.SortLatency() var memStats runtime.MemStats var results testing.BenchmarkResult var startAllocs, startBytes uint64 var startTime time.Time - start := true var startTimer = func() { runtime.ReadMemStats(&memStats) startAllocs = memStats.Mallocs @@ -545,74 +673,59 @@ func main() { } var stopTimer = func(count uint64) { runtime.ReadMemStats(&memStats) - results = testing.BenchmarkResult{N: int(count), T: time.Since(startTime), - Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes} - } - sharedPos := make([]bool, len(featuresPos)) - for i := 0; i < len(featuresPos); i++ { - if featuresNum[i] <= 1 { - sharedPos[i] = true + results = testing.BenchmarkResult{ + N: int(count), + T: time.Since(startTime), + Bytes: 0, + MemAllocs: memStats.Mallocs - startAllocs, + MemBytes: memStats.TotalAlloc - startBytes, } } // Run benchmarks resultSlice := []stats.BenchResults{} - for !reflect.DeepEqual(featuresPos, initalPos) || start { - start = false - benchFeature := stats.Features{ - NetworkMode: networkMode, - EnableTrace: enableTrace[featuresPos[0]], - Latency: ltc[featuresPos[1]], - Kbps: kbps[featuresPos[2]], - Mtu: mtu[featuresPos[3]], - MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]], - ReqSizeBytes: reqSizeBytes[featuresPos[5]], - RespSizeBytes: respSizeBytes[featuresPos[6]], - ModeCompressor: modeCompressor[featuresPos[7]], - EnableChannelz: enableChannelz[featuresPos[8]], - EnablePreloader: enablePreloader[featuresPos[9]], - } - - grpc.EnableTracing = enableTrace[featuresPos[0]] - if enableChannelz[featuresPos[8]] { + featuresNum := makeFeaturesNum(opts) + sharedPos := sharedFeatures(featuresNum) + for _, benchFeature := range opts.generateFeatures(featuresNum) { + grpc.EnableTracing = benchFeature.EnableTrace + if benchFeature.EnableChannelz { channelz.TurnOn() } - if runMode[0] { - count := unaryBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) + if opts.rModes.unary { + count := unaryBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) s.SetBenchmarkResult("Unary", benchFeature, results.N, results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) fmt.Println(s.BenchString()) fmt.Println(s.String()) - printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes) + printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime) resultSlice = append(resultSlice, s.GetBenchmarkResults()) s.Clear() } - if runMode[1] { - count := streamBenchmark(startTimer, stopTimer, benchFeature, benchtime, s) + if opts.rModes.streaming { + count := streamBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) s.SetBenchmarkResult("Stream", benchFeature, results.N, results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) fmt.Println(s.BenchString()) fmt.Println(s.String()) - printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes) + printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime) resultSlice = append(resultSlice, s.GetBenchmarkResults()) s.Clear() } - if runMode[2] { - requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, benchtime) + if opts.rModes.unconstrained { + requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, opts.benchTime) fmt.Printf("Unconstrained Stream-%v\n", benchFeature) - printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes) + printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes, opts.benchTime) } - bm.AddOne(featuresPos, featuresNum) } - after(resultSlice) + after(opts, resultSlice) } -func before() { - if memProfile != "" { - runtime.MemProfileRate = memProfileRate +func before(opts *benchOpts) { + if opts.memProfile != "" { + runtime.MemProfileRate = opts.memProfileRate } - if cpuProfile != "" { - f, err := os.Create(cpuProfile) + if opts.cpuProfile != "" { + f, err := os.Create(opts.cpuProfile) if err != nil { fmt.Fprintf(os.Stderr, "testing: %s\n", err) return @@ -625,27 +738,27 @@ func before() { } } -func after(data []stats.BenchResults) { - if cpuProfile != "" { +func after(opts *benchOpts, data []stats.BenchResults) { + if opts.cpuProfile != "" { pprof.StopCPUProfile() // flushes profile to disk } - if memProfile != "" { - f, err := os.Create(memProfile) + if opts.memProfile != "" { + f, err := os.Create(opts.memProfile) if err != nil { fmt.Fprintf(os.Stderr, "testing: %s\n", err) os.Exit(2) } runtime.GC() // materialize all statistics if err = pprof.WriteHeapProfile(f); err != nil { - fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", memProfile, err) + fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err) os.Exit(2) } f.Close() } - if benchmarkResultFile != "" { - f, err := os.Create(benchmarkResultFile) + if opts.benchmarkResultFile != "" { + f, err := os.Create(opts.benchmarkResultFile) if err != nil { - log.Fatalf("testing: can't write benchmark result %s: %s\n", benchmarkResultFile, err) + log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err) } dataEncoder := gob.NewEncoder(f) dataEncoder.Encode(data) @@ -667,10 +780,10 @@ func (nopCompressor) Do(w io.Writer, p []byte) error { return nil } -func (nopCompressor) Type() string { return "nop" } +func (nopCompressor) Type() string { return compModeNop } // nopDecompressor is a decompressor that just copies data. type nopDecompressor struct{} func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) } -func (nopDecompressor) Type() string { return "nop" } +func (nopDecompressor) Type() string { return compModeNop } diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 511fc4fba..b5e983721 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -29,30 +29,14 @@ import ( "io" "log" "net" - "sync" - "testing" - "time" "google.golang.org/grpc" testpb "google.golang.org/grpc/benchmark/grpc_testing" - "google.golang.org/grpc/benchmark/latency" - "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/status" ) -// AddOne add 1 to the features slice -func AddOne(features []int, featuresMaxPosition []int) { - for i := len(features) - 1; i >= 0; i-- { - features[i] = (features[i] + 1) - if features[i]/featuresMaxPosition[i] == 0 { - break - } - features[i] = features[i] % featuresMaxPosition[i] - } -} - // Allows reuse of the same testpb.Payload object. func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { if size < 0 { @@ -306,131 +290,3 @@ func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.Dia } return conn } - -func runUnary(b *testing.B, benchFeatures stats.Features) { - s := stats.AddStats(b, 38) - nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - grpclog.Fatalf("Failed to listen: %v", err) - } - target := lis.Addr().String() - lis = nw.Listener(lis) - stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) - defer stopper() - conn := NewClientConn( - target, grpc.WithInsecure(), - grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { - return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address) - }), - ) - tc := testpb.NewBenchmarkServiceClient(conn) - - // Warm up connection. - for i := 0; i < 10; i++ { - unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - } - ch := make(chan int, benchFeatures.MaxConcurrentCalls*4) - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - wg.Add(benchFeatures.MaxConcurrentCalls) - - // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { - go func() { - for range ch { - start := time.Now() - unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - elapse := time.Since(start) - mu.Lock() - s.Add(elapse) - mu.Unlock() - } - wg.Done() - }() - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - ch <- i - } - close(ch) - wg.Wait() - b.StopTimer() - conn.Close() -} - -func runStream(b *testing.B, benchFeatures stats.Features) { - s := stats.AddStats(b, 38) - nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - grpclog.Fatalf("Failed to listen: %v", err) - } - target := lis.Addr().String() - lis = nw.Listener(lis) - stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) - defer stopper() - conn := NewClientConn( - target, grpc.WithInsecure(), - grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { - return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", address) - }), - ) - tc := testpb.NewBenchmarkServiceClient(conn) - - // Warm up connection. - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - for i := 0; i < 10; i++ { - streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - } - - ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4) - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - wg.Add(benchFeatures.MaxConcurrentCalls) - - // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - go func() { - for range ch { - start := time.Now() - streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) - elapse := time.Since(start) - mu.Lock() - s.Add(elapse) - mu.Unlock() - } - wg.Done() - }() - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - ch <- struct{}{} - } - close(ch) - wg.Wait() - b.StopTimer() - conn.Close() -} -func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { - if err := DoUnaryCall(client, reqSize, respSize); err != nil { - grpclog.Fatalf("DoUnaryCall failed: %v", err) - } -} - -func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { - if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { - grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) - } -} diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go deleted file mode 100644 index dd4984a26..000000000 --- a/benchmark/benchmark_test.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package benchmark - -import ( - "fmt" - "os" - "reflect" - "testing" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/benchmark/stats" -) - -func BenchmarkClient(b *testing.B) { - enableTrace := []bool{true, false} // run both enable and disable by default - // When set the latency to 0 (no delay), the result is slower than the real result with no delay - // because latency simulation section has extra operations - latency := []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. - kbps := []int{0, 10240} // if non-positive, infinite - mtu := []int{0} // if non-positive, infinite - maxConcurrentCalls := []int{1, 8, 64, 512} - reqSizeBytes := []int{1, 1024 * 1024} - respSizeBytes := []int{1, 1024 * 1024} - featuresCurPos := make([]int, 7) - - // 0:enableTracing 1:md 2:ltc 3:kbps 4:mtu 5:maxC 6:connCount 7:reqSize 8:respSize - featuresMaxPosition := []int{len(enableTrace), len(latency), len(kbps), len(mtu), len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes)} - initalPos := make([]int, len(featuresCurPos)) - - // run benchmarks - start := true - for !reflect.DeepEqual(featuresCurPos, initalPos) || start { - start = false - tracing := "Trace" - if !enableTrace[featuresCurPos[0]] { - tracing = "noTrace" - } - - benchFeature := stats.Features{ - EnableTrace: enableTrace[featuresCurPos[0]], - Latency: latency[featuresCurPos[1]], - Kbps: kbps[featuresCurPos[2]], - Mtu: mtu[featuresCurPos[3]], - MaxConcurrentCalls: maxConcurrentCalls[featuresCurPos[4]], - ReqSizeBytes: reqSizeBytes[featuresCurPos[5]], - RespSizeBytes: respSizeBytes[featuresCurPos[6]], - } - - grpc.EnableTracing = enableTrace[featuresCurPos[0]] - b.Run(fmt.Sprintf("Unary-%s-%s", - tracing, benchFeature.String()), func(b *testing.B) { - runUnary(b, benchFeature) - }) - - b.Run(fmt.Sprintf("Stream-%s-%s", - tracing, benchFeature.String()), func(b *testing.B) { - runStream(b, benchFeature) - }) - AddOne(featuresCurPos, featuresMaxPosition) - } -} - -func TestMain(m *testing.M) { - os.Exit(stats.RunTestMain(m)) -} diff --git a/benchmark/flags/flags.go b/benchmark/flags/flags.go new file mode 100644 index 000000000..9354b7367 --- /dev/null +++ b/benchmark/flags/flags.go @@ -0,0 +1,140 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* +Package flags provide convenience types and routines to accept specific types +of flag values on the command line. +*/ +package flags + +import ( + "bytes" + "flag" + "fmt" + "strconv" + "strings" + "time" +) + +// stringFlagWithAllowedValues represents a string flag which can only take a +// predefined set of values. +type stringFlagWithAllowedValues struct { + val string + allowed []string +} + +// StringWithAllowedValues returns a flag variable of type +// stringFlagWithAllowedValues configured with the provided parameters. +// 'allowed` is the set of values that this flag can be set to. +func StringWithAllowedValues(name, defaultVal, usage string, allowed []string) *string { + as := &stringFlagWithAllowedValues{defaultVal, allowed} + flag.CommandLine.Var(as, name, usage) + return &as.val +} + +// String implements the flag.Value interface. +func (as *stringFlagWithAllowedValues) String() string { + return as.val +} + +// Set implements the flag.Value interface. +func (as *stringFlagWithAllowedValues) Set(val string) error { + for _, a := range as.allowed { + if a == val { + as.val = val + return nil + } + } + return fmt.Errorf("want one of: %v", strings.Join(as.allowed, ", ")) +} + +type durationSliceValue []time.Duration + +// DurationSlice returns a flag representing a slice of time.Duration objects. +func DurationSlice(name string, defaultVal []time.Duration, usage string) *[]time.Duration { + ds := make([]time.Duration, len(defaultVal)) + copy(ds, defaultVal) + dsv := (*durationSliceValue)(&ds) + flag.CommandLine.Var(dsv, name, usage) + return &ds +} + +// Set implements the flag.Value interface. +func (dsv *durationSliceValue) Set(s string) error { + ds := strings.Split(s, ",") + var dd []time.Duration + for _, n := range ds { + d, err := time.ParseDuration(n) + if err != nil { + return err + } + dd = append(dd, d) + } + *dsv = durationSliceValue(dd) + return nil +} + +// String implements the flag.Value interface. +func (dsv *durationSliceValue) String() string { + var b bytes.Buffer + for i, d := range *dsv { + if i > 0 { + b.WriteRune(',') + } + b.WriteString(d.String()) + } + return b.String() +} + +type intSliceValue []int + +// IntSlice returns a flag representing a slice of ints. +func IntSlice(name string, defaultVal []int, usage string) *[]int { + is := make([]int, len(defaultVal)) + copy(is, defaultVal) + isv := (*intSliceValue)(&is) + flag.CommandLine.Var(isv, name, usage) + return &is +} + +// Set implements the flag.Value interface. +func (isv *intSliceValue) Set(s string) error { + is := strings.Split(s, ",") + var ret []int + for _, n := range is { + i, err := strconv.Atoi(n) + if err != nil { + return err + } + ret = append(ret, i) + } + *isv = intSliceValue(ret) + return nil +} + +// String implements the flag.Value interface. +func (isv *intSliceValue) String() string { + var b bytes.Buffer + for i, n := range *isv { + if i > 0 { + b.WriteRune(',') + } + b.WriteString(strconv.Itoa(n)) + } + return b.String() +} diff --git a/benchmark/flags/flags_test.go b/benchmark/flags/flags_test.go new file mode 100644 index 000000000..e1ac7c54e --- /dev/null +++ b/benchmark/flags/flags_test.go @@ -0,0 +1,113 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package flags + +import ( + "flag" + "reflect" + "testing" + "time" +) + +func TestStringWithAllowedValues(t *testing.T) { + const defaultVal = "default" + tests := []struct { + args string + allowed []string + wantVal string + wantErr bool + }{ + {"-workloads=all", []string{"unary", "streaming", "all"}, "all", false}, + {"-workloads=disallowed", []string{"unary", "streaming", "all"}, defaultVal, true}, + } + + for _, test := range tests { + flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError) + var w = StringWithAllowedValues("workloads", defaultVal, "usage", test.allowed) + err := flag.CommandLine.Parse([]string{test.args}) + switch { + case !test.wantErr && err != nil: + t.Errorf("failed to parse command line args {%v}: %v", test.args, err) + case test.wantErr && err == nil: + t.Errorf("flag.Parse(%v) = nil, want non-nil error", test.args) + default: + if *w != test.wantVal { + t.Errorf("flag value is %v, want %v", *w, test.wantVal) + } + } + } +} + +func TestDurationSlice(t *testing.T) { + defaultVal := []time.Duration{time.Second, time.Nanosecond} + tests := []struct { + args string + wantVal []time.Duration + wantErr bool + }{ + {"-latencies=1s", []time.Duration{time.Second}, false}, + {"-latencies=1s,2s,3s", []time.Duration{time.Second, 2 * time.Second, 3 * time.Second}, false}, + {"-latencies=bad", defaultVal, true}, + } + + for _, test := range tests { + flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError) + var w = DurationSlice("latencies", defaultVal, "usage") + err := flag.CommandLine.Parse([]string{test.args}) + switch { + case !test.wantErr && err != nil: + t.Errorf("failed to parse command line args {%v}: %v", test.args, err) + case test.wantErr && err == nil: + t.Errorf("flag.Parse(%v) = nil, want non-nil error", test.args) + default: + if !reflect.DeepEqual(*w, test.wantVal) { + t.Errorf("flag value is %v, want %v", *w, test.wantVal) + } + } + } +} + +func TestIntSlice(t *testing.T) { + defaultVal := []int{1, 1024} + tests := []struct { + args string + wantVal []int + wantErr bool + }{ + {"-kbps=1", []int{1}, false}, + {"-kbps=1,2,3", []int{1, 2, 3}, false}, + {"-kbps=20e4", defaultVal, true}, + } + + for _, test := range tests { + flag.CommandLine = flag.NewFlagSet("test", flag.ContinueOnError) + var w = IntSlice("kbps", defaultVal, "usage") + err := flag.CommandLine.Parse([]string{test.args}) + switch { + case !test.wantErr && err != nil: + t.Errorf("failed to parse command line args {%v}: %v", test.args, err) + case test.wantErr && err == nil: + t.Errorf("flag.Parse(%v) = nil, want non-nil error", test.args) + default: + if !reflect.DeepEqual(*w, test.wantVal) { + t.Errorf("flag value is %v, want %v", *w, test.wantVal) + } + } + } +} diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index 64a17cf3a..0968f9625 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -32,6 +32,7 @@ import ( // Features contains most fields for a benchmark type Features struct { NetworkMode string + UseBufConn bool EnableTrace bool Latency time.Duration Kbps int