grpc-go/interop/soak_tests.go

204 lines
7.4 KiB
Go

/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package interop
import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/peer"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
// DoSoakTest allocates one value per worker goroutine and sums the per-worker
// values once every worker has finished.
type SoakWorkerResults struct {
// iterationsSucceeded counts iterations whose RPC returned a valid response
// within the configured per-iteration latency limit.
iterationsSucceeded int
// Failures counts iterations that either returned an error (RPC failure or
// unexpected response payload) or exceeded the per-iteration latency limit.
Failures int
// Latencies records the latency, in milliseconds, of each successful
// iteration. It is created by the worker on first use when left nil.
Latencies *stats.Histogram
}
// SoakIterationConfig holds the parameters required for a single soak iteration,
// i.e. one large-unary RPC issued by doOneSoakIteration.
type SoakIterationConfig struct {
RequestSize int // The size of the request payload in bytes.
ResponseSize int // The response payload size, in bytes, requested from the server and checked on the reply.
Client testgrpc.TestServiceClient // The gRPC client to make the call.
CallOptions []grpc.CallOption // Call options passed through to the UnaryCall RPC.
}
// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
// RequestSize is the request payload size, in bytes, used for every iteration.
RequestSize int
// ResponseSize is the response payload size, in bytes, requested and
// validated for every iteration.
ResponseSize int
// PerIterationMaxAcceptableLatency is the latency above which an otherwise
// successful iteration is counted as a failure.
PerIterationMaxAcceptableLatency time.Duration
// MinTimeBetweenRPCs is the minimum time between the start of one iteration
// and the start of the next within a single worker.
MinTimeBetweenRPCs time.Duration
// OverallTimeout is the elapsed time, measured from the start of the test,
// after which workers stop starting new iterations.
OverallTimeout time.Duration
// ServerAddr is the server address reported in the log output.
ServerAddr string
// NumWorkers is the number of worker goroutines; each one runs
// Iterations / NumWorkers iterations.
NumWorkers int
// Iterations is the total number of iterations expected across all workers.
Iterations int
// MaxFailures is the number of failed iterations tolerated before the test
// is reported as failed.
MaxFailures int
// ChannelForTest is called once per iteration and returns the connection to
// use together with a cleanup function for it.
ChannelForTest func() (*grpc.ClientConn, func())
}
// doOneSoakIteration performs a single large-unary RPC using config.Client and
// validates the response.
//
// It sends a COMPRESSABLE payload of config.RequestSize bytes, asks for a
// response of config.ResponseSize bytes, and checks that the reply carries a
// COMPRESSABLE payload of exactly that size.
//
// On success it returns the time elapsed since the start of the call
// (request construction included) and a nil error. On failure it returns a
// zero latency and an error; an RPC failure is wrapped with %w so callers can
// still reach the underlying error via errors.Is / errors.As.
func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
	start := time.Now()

	// Build the large-unary request.
	pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(config.ResponseSize),
		Payload:      pl,
	}

	// Perform the gRPC call.
	reply, err := config.Client.UnaryCall(ctx, req, config.CallOptions...)
	if err != nil {
		// Wrap rather than flatten so the original error stays inspectable.
		return 0, fmt.Errorf("/TestService/UnaryCall RPC failed: %w", err)
	}

	// Validate the response payload type and size.
	t := reply.GetPayload().GetType()
	s := len(reply.GetPayload().GetBody())
	if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
		return 0, fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
	}

	// Only a validated response contributes a latency measurement.
	return time.Since(start), nil
}
// executeSoakTestInWorker runs one worker's share of the soak test:
// config.Iterations / config.NumWorkers iterations, executed sequentially.
//
// The worker stops early when ctx is done or when the time elapsed since
// startTime reaches config.OverallTimeout. Consecutive iterations are started
// at least config.MinTimeBetweenRPCs apart. Outcomes are accumulated into
// soakWorkerResults, whose Latencies histogram is created here if it is nil.
//
// config.NumWorkers must be non-zero; the caller is expected to have
// validated it.
func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
	timeoutDuration := config.OverallTimeout
	soakIterationsPerWorker := config.Iterations / config.NumWorkers
	if soakWorkerResults.Latencies == nil {
		soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
			NumBuckets:     20,
			GrowthFactor:   1,
			BaseBucketSize: 1,
			MinValue:       0,
		})
	}
	for i := 0; i < soakIterationsPerWorker; i++ {
		if ctx.Err() != nil {
			return
		}
		if time.Since(startTime) >= timeoutDuration {
			fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
			return
		}
		// Start the pacing timer before the RPC so the minimum spacing is
		// measured from the start of this iteration.
		earliestNextStart := time.After(config.MinTimeBetweenRPCs)
		runSoakIterationInWorker(ctx, config, workerID, i, soakWorkerResults)
		// Wait out the remaining spacing, but do not keep sleeping once the
		// context has been cancelled.
		select {
		case <-earliestNextStart:
		case <-ctx.Done():
			return
		}
	}
}

// runSoakIterationInWorker performs a single soak iteration for a worker and
// records its outcome in soakWorkerResults.
//
// It is a separate function so that the cleanup returned by
// config.ChannelForTest runs at the end of every iteration. Deferring it
// directly inside the worker loop would postpone every cleanup until the
// worker returned, keeping all per-iteration channels alive at once.
func runSoakIterationInWorker(ctx context.Context, config SoakTestConfig, workerID, iteration int, soakWorkerResults *SoakWorkerResults) {
	currentChannel, cleanup := config.ChannelForTest()
	defer cleanup()

	client := testgrpc.NewTestServiceClient(currentChannel)
	// p is filled in by the grpc.Peer call option and is used for logging only.
	var p peer.Peer
	iterationConfig := SoakIterationConfig{
		RequestSize:  config.RequestSize,
		ResponseSize: config.ResponseSize,
		Client:       client,
		CallOptions:  []grpc.CallOption{grpc.Peer(&p)},
	}

	latency, err := doOneSoakIteration(ctx, iterationConfig)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, iteration, 0, p.Addr, config.ServerAddr, err)
		soakWorkerResults.Failures++
		return
	}
	if latency > config.PerIterationMaxAcceptableLatency {
		// The RPC succeeded but was too slow: counted as a failure and not
		// added to the latency histogram.
		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, iteration, latency.Milliseconds(), p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
		soakWorkerResults.Failures++
		return
	}

	// Success: record the latency and log the details of the iteration.
	soakWorkerResults.Latencies.Add(latency.Milliseconds())
	soakWorkerResults.iterationsSucceeded++
	fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, iteration, latency.Milliseconds(), p.Addr, config.ServerAddr)
}
// DoSoakTest runs large unary RPCs in a loop for a configurable number of
// iterations, with configurable failure thresholds.
//
// The work is split across soakConfig.NumWorkers goroutines, each running
// soakConfig.Iterations / soakConfig.NumWorkers iterations. Every iteration
// obtains its connection by calling soakConfig.ChannelForTest. Once all
// workers have finished, the per-worker results are merged and a summary is
// written to stderr.
//
// The test is terminated through logger.Fatalf when:
//   - soakConfig.NumWorkers is not positive,
//   - fewer than soakConfig.Iterations iterations were executed, or
//   - the number of failed iterations exceeds soakConfig.MaxFailures.
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
	// Reject a non-positive worker count up front: zero would panic with a
	// division by zero below, and a negative value would panic in make.
	if soakConfig.NumWorkers <= 0 {
		logger.Fatalf("Soak test requires a positive number of workers, got %d\n", soakConfig.NumWorkers)
	}
	if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
		// Only a warning here; the remainder iterations are never run, so the
		// iteration-count check below reports the shortfall.
		fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n")
	}
	startTime := time.Now()
	var wg sync.WaitGroup
	// One result slot per worker, so workers never write to shared state.
	soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
	for i := 0; i < soakConfig.NumWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
		}(i)
	}
	// Wait for all goroutines to complete.
	wg.Wait()

	// Aggregate the per-worker results.
	totalSuccesses := 0
	totalFailures := 0
	latencies := stats.NewHistogram(stats.HistogramOptions{
		NumBuckets:     20,
		GrowthFactor:   1,
		BaseBucketSize: 1,
		MinValue:       0,
	})
	for _, worker := range soakWorkerResults {
		totalSuccesses += worker.iterationsSucceeded
		totalFailures += worker.Failures
		if worker.Latencies != nil {
			// Add latencies from the worker's Histogram to the main latencies.
			latencies.Merge(worker.Latencies)
		}
	}

	var b bytes.Buffer
	totalIterations := totalSuccesses + totalFailures
	latencies.Print(&b)
	fmt.Fprintf(os.Stderr,
		"(server_uri: %s) soak test successes: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
		soakConfig.ServerAddr, totalSuccesses, soakConfig.Iterations, totalFailures, b.String())
	if totalIterations != soakConfig.Iterations {
		logger.Fatalf("Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
	}
	if totalFailures > soakConfig.MaxFailures {
		logger.Fatalf("Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
	}
	// NOTE(review): this obtains a channel one final time only to run its
	// cleanup; presumably meant to release a shared channel - confirm against
	// the ChannelForTest implementations used by callers.
	if soakConfig.ChannelForTest != nil {
		_, cleanup := soakConfig.ChannelForTest()
		defer cleanup()
	}
}