/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package interop

import (
	"bytes"
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/benchmark/stats"
	"google.golang.org/grpc/peer"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
type SoakWorkerResults struct {
	iterationsSucceeded int              // Number of iterations that completed successfully.
	Failures            int              // Number of iterations that failed.
	Latencies           *stats.Histogram // Per-iteration latencies, in milliseconds.
}

// SoakIterationConfig holds the parameters required for a single soak iteration.
type SoakIterationConfig struct {
	RequestSize  int                        // The size of the request payload in bytes.
	ResponseSize int                        // The expected size of the response payload in bytes.
	Client       testgrpc.TestServiceClient // The gRPC client to make the call.
	CallOptions  []grpc.CallOption          // Call options for the RPC.
}

// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
	RequestSize                      int           // The size of each request payload in bytes.
	ResponseSize                     int           // The expected size of each response payload in bytes.
	PerIterationMaxAcceptableLatency time.Duration // Latency above which an iteration counts as a failure.
	MinTimeBetweenRPCs               time.Duration // Minimum delay between the starts of consecutive RPCs.
	OverallTimeout                   time.Duration // Deadline for the test as a whole; workers stop once it elapses.
	ServerAddr                       string        // The server address, used in log messages.
	NumWorkers                       int           // Number of concurrent workers; must evenly divide Iterations.
	Iterations                       int           // Total number of iterations to run, across all workers.
	MaxFailures                      int           // Maximum number of failed iterations to tolerate.
	// ChannelForTest returns the channel to use for the next iteration, plus a
	// cleanup function to release it.
	ChannelForTest func() (*grpc.ClientConn, func())
}
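
// doOneSoakIteration performs a single large-unary RPC and returns its
// end-to-end latency, or an error if the RPC failed or the response payload
// did not have the expected type and size.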
func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
	start := time.Now()
	// Do a large-unary RPC.
	// Create the request payload.
	pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(config.ResponseSize),
		Payload:      pl,
	}
	// Perform the gRPC call.
	var reply *testpb.SimpleResponse
	reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...)
	if err != nil {
		err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
		return 0, err
	}
	// Validate the response.
	t := reply.GetPayload().GetType()
	s := len(reply.GetPayload().GetBody())
	if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
		err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
		return 0, err
	}
	// Calculate latency and return the result.
	latency = time.Since(start)
	return latency, nil
}
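
// executeSoakTestInWorker runs this worker's share of the soak iterations
// (config.Iterations / config.NumWorkers), pacing them so that consecutive
// RPCs start at least config.MinTimeBetweenRPCs apart and recording the
// outcome of each iteration in soakWorkerResults.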
func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
	timeoutDuration := config.OverallTimeout
	soakIterationsPerWorker := config.Iterations / config.NumWorkers
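	// Lazily create the worker's latency histogram. With GrowthFactor 1 and
	// BaseBucketSize 1, each of the 20 buckets covers twice the range of the
	// previous one.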
	if soakWorkerResults.Latencies == nil {
		soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
			NumBuckets:     20,
			GrowthFactor:   1,
			BaseBucketSize: 1,
			MinValue:       0,
		})
	}

	for i := 0; i < soakIterationsPerWorker; i++ {
		if ctx.Err() != nil {
			return
		}
		if time.Since(startTime) >= timeoutDuration {
			fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
			return
		}
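		// Pace the test: the next iteration may not begin until
		// MinTimeBetweenRPCs has elapsed, so this channel is received from at
		// the end of each iteration.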
		earliestNextStart := time.After(config.MinTimeBetweenRPCs)
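		// Note: these cleanups are deferred, so per-iteration channels are
		// not released until the worker returns.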
		currentChannel, cleanup := config.ChannelForTest()
		defer cleanup()
		client := testgrpc.NewTestServiceClient(currentChannel)
		var p peer.Peer
		iterationConfig := SoakIterationConfig{
			RequestSize:  config.RequestSize,
			ResponseSize: config.ResponseSize,
			Client:       client,
			CallOptions:  []grpc.CallOption{grpc.Peer(&p)},
		}
		latency, err := doOneSoakIteration(ctx, iterationConfig)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		if latency > config.PerIterationMaxAcceptableLatency {
			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
			soakWorkerResults.Failures++
			<-earliestNextStart
			continue
		}
		// Success: log the details of the iteration.
		soakWorkerResults.Latencies.Add(latency.Milliseconds())
		soakWorkerResults.iterationsSucceeded++
		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr)
		<-earliestNextStart
	}
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of
// iterations, with configurable failure thresholds. Each iteration obtains
// its channel from soakConfig.ChannelForTest, which either reuses a shared
// channel or creates a fresh one, depending on how the soak test was
// configured.
// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
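//
// A minimal usage sketch, illustrative only: the address, sizes, and
// durations below are assumptions rather than values from this file, and it
// assumes imports of context, log, time, and
// google.golang.org/grpc/credentials/insecure:
//
//	conn, err := grpc.NewClient("localhost:10000", grpc.WithTransportCredentials(insecure.NewCredentials()))
//	if err != nil {
//		log.Fatalf("grpc.NewClient failed: %v", err)
//	}
//	defer conn.Close()
//	soakConfig := SoakTestConfig{
//		RequestSize:                      1024,
//		ResponseSize:                     1024,
//		PerIterationMaxAcceptableLatency: time.Second,
//		MinTimeBetweenRPCs:               10 * time.Millisecond,
//		OverallTimeout:                   10 * time.Minute,
//		ServerAddr:                       "localhost:10000",
//		NumWorkers:                       2,
//		Iterations:                       100,
//		MaxFailures:                      0,
//		// Share one channel across all iterations; cleanup is then a no-op.
//		ChannelForTest: func() (*grpc.ClientConn, func()) { return conn, func() {} },
//	}
//	ctx, cancel := context.WithTimeout(context.Background(), soakConfig.OverallTimeout)
//	defer cancel()
//	DoSoakTest(ctx, soakConfig)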
func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
	if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
		// Warn, but do not abort: the remainder iterations are simply not run.
		fmt.Fprintf(os.Stderr, "soak iterations must be evenly divisible by the number of workers\n")
	}
	startTime := time.Now()
	var wg sync.WaitGroup
	soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
	for i := 0; i < soakConfig.NumWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
		}(i)
	}
	// Wait for all goroutines to complete.
	wg.Wait()

	// Aggregate results across all workers.
	totalSuccesses := 0
	totalFailures := 0
	latencies := stats.NewHistogram(stats.HistogramOptions{
		NumBuckets:     20,
		GrowthFactor:   1,
		BaseBucketSize: 1,
		MinValue:       0,
	})
	for _, worker := range soakWorkerResults {
		totalSuccesses += worker.iterationsSucceeded
		totalFailures += worker.Failures
		if worker.Latencies != nil {
			// Merge the worker's latency histogram into the aggregate histogram.
			latencies.Merge(worker.Latencies)
		}
	}
	var b bytes.Buffer
	totalIterations := totalSuccesses + totalFailures
	latencies.Print(&b)
	fmt.Fprintf(os.Stderr,
		"(server_uri: %s) soak test successes: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
		soakConfig.ServerAddr, totalSuccesses, soakConfig.Iterations, totalFailures, b.String())

	if totalIterations != soakConfig.Iterations {
		logger.Fatalf("Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
	}

	if totalFailures > soakConfig.MaxFailures {
		logger.Fatalf("Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
	}
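	// Obtain the channel one final time and defer its cleanup, so that a
	// shared channel, if one is in use, is released before returning.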
	if soakConfig.ChannelForTest != nil {
		_, cleanup := soakConfig.ChannelForTest()
		defer cleanup()
	}
}