/* * * Copyright 2014 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package interop import ( "bytes" "context" "fmt" "os" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/peer" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" ) // SoakWorkerResults stores the aggregated results for a specific worker during the soak test. type SoakWorkerResults struct { IterationsDone int Failures int Latencies *stats.Histogram } // SoakIterationConfig holds the parameters required for a single soak iteration. type SoakIterationConfig struct { RequestSize int // The size of the request payload in bytes. ResponseSize int // The expected size of the response payload in bytes. Client testgrpc.TestServiceClient // The gRPC client to make the call. CallOptions []grpc.CallOption // Call options for the RPC. } // SoakTestConfig holds the configuration for the entire soak test. type SoakTestConfig struct { RequestSize int ResponseSize int PerIterationMaxAcceptableLatency time.Duration MinTimeBetweenRPCs time.Duration OverallTimeout time.Duration ServerAddr string NumWorkers int Iterations int MaxFailures int ChannelForTest func() (*grpc.ClientConn, func()) } func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) { start := time.Now() // Do a large-unary RPC. // Create the request payload. pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize) req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseSize: int32(config.ResponseSize), Payload: pl, } // Perform the GRPC call. var reply *testpb.SimpleResponse reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...) if err != nil { err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err) return 0, err } // Validate response. t := reply.GetPayload().GetType() s := len(reply.GetPayload().GetBody()) if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize { err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize) return 0, err } // Calculate latency and return result. latency = time.Since(start) return latency, nil } func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) { timeoutDuration := config.OverallTimeout soakIterationsPerWorker := config.Iterations / config.NumWorkers for i := 0; i < soakIterationsPerWorker; i++ { if ctx.Err() != nil { return } if time.Since(startTime) >= timeoutDuration { fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout) return } earliestNextStart := time.After(config.MinTimeBetweenRPCs) currentChannel, cleanup := config.ChannelForTest() defer cleanup() client := testgrpc.NewTestServiceClient(currentChannel) var p peer.Peer iterationConfig := SoakIterationConfig{ RequestSize: config.RequestSize, ResponseSize: config.ResponseSize, Client: client, CallOptions: []grpc.CallOption{grpc.Peer(&p)}, } latency, err := doOneSoakIteration(ctx, iterationConfig) if err != nil { fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err) soakWorkerResults.Failures++ <-earliestNextStart continue } if latency > config.PerIterationMaxAcceptableLatency { fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency, p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds()) soakWorkerResults.Failures++ <-earliestNextStart continue } // Success: log the details of the iteration. soakWorkerResults.Latencies.Add(latency.Milliseconds()) soakWorkerResults.IterationsDone++ fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency, p.Addr, config.ServerAddr) <-earliestNextStart } } // DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. // If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new // stub that is created with the provided server address and dial options. // TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method. func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) { if soakConfig.Iterations%soakConfig.NumWorkers != 0 { fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n") } startTime := time.Now() var wg sync.WaitGroup soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers) for i := 0; i < soakConfig.NumWorkers; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID]) }(i) } // Wait for all goroutines to complete. wg.Wait() // Handle results. totalIterations := 0 totalFailures := 0 latencies := stats.NewHistogram(stats.HistogramOptions{ NumBuckets: 20, GrowthFactor: 1, BaseBucketSize: 1, MinValue: 0, }) for _, worker := range soakWorkerResults { totalIterations += worker.IterationsDone totalFailures += worker.Failures if worker.Latencies != nil { // Add latencies from the worker's Histogram to the main latencies. latencies.Merge(worker.Latencies) } } var b bytes.Buffer latencies.Print(&b) fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n", soakConfig.ServerAddr, totalIterations, soakConfig.Iterations, totalFailures, b.String()) if totalIterations != soakConfig.Iterations { fmt.Fprintf(os.Stderr, "Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations) } if totalFailures > soakConfig.MaxFailures { fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures) } if soakConfig.ChannelForTest != nil { _, cleanup := soakConfig.ChannelForTest() defer cleanup() } }