mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			204 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			204 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Go
		
	
	
	
| /*
 | |
| *
 | |
| * Copyright 2014 gRPC authors.
 | |
| *
 | |
| * Licensed under the Apache License, Version 2.0 (the "License");
 | |
| * you may not use this file except in compliance with the License.
 | |
| * You may obtain a copy of the License at
 | |
| *
 | |
| *     http://www.apache.org/licenses/LICENSE-2.0
 | |
| *
 | |
| * Unless required by applicable law or agreed to in writing, software
 | |
| * distributed under the License is distributed on an "AS IS" BASIS,
 | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| * See the License for the specific language governing permissions and
 | |
| * limitations under the License.
 | |
| *
 | |
|  */
 | |
| 
 | |
| package interop
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/benchmark/stats"
 | |
| 	"google.golang.org/grpc/peer"
 | |
| 
 | |
| 	testgrpc "google.golang.org/grpc/interop/grpc_testing"
 | |
| 	testpb "google.golang.org/grpc/interop/grpc_testing"
 | |
| )
 | |
| 
 | |
| // SoakWorkerResults stores the aggregated results for a specific worker during the soak test.
 | |
| type SoakWorkerResults struct {
 | |
| 	iterationsSucceeded int
 | |
| 	Failures            int
 | |
| 	Latencies           *stats.Histogram
 | |
| }
 | |
| 
 | |
| // SoakIterationConfig holds the parameters required for a single soak iteration.
 | |
| type SoakIterationConfig struct {
 | |
| 	RequestSize  int                        // The size of the request payload in bytes.
 | |
| 	ResponseSize int                        // The expected size of the response payload in bytes.
 | |
| 	Client       testgrpc.TestServiceClient // The gRPC client to make the call.
 | |
| 	CallOptions  []grpc.CallOption          // Call options for the RPC.
 | |
| }
 | |
| 
 | |
| // SoakTestConfig holds the configuration for the entire soak test.
 | |
| type SoakTestConfig struct {
 | |
| 	RequestSize                      int
 | |
| 	ResponseSize                     int
 | |
| 	PerIterationMaxAcceptableLatency time.Duration
 | |
| 	MinTimeBetweenRPCs               time.Duration
 | |
| 	OverallTimeout                   time.Duration
 | |
| 	ServerAddr                       string
 | |
| 	NumWorkers                       int
 | |
| 	Iterations                       int
 | |
| 	MaxFailures                      int
 | |
| 	ChannelForTest                   func() (*grpc.ClientConn, func())
 | |
| }
 | |
| 
 | |
| func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) {
 | |
| 	start := time.Now()
 | |
| 	// Do a large-unary RPC.
 | |
| 	// Create the request payload.
 | |
| 	pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize)
 | |
| 	req := &testpb.SimpleRequest{
 | |
| 		ResponseType: testpb.PayloadType_COMPRESSABLE,
 | |
| 		ResponseSize: int32(config.ResponseSize),
 | |
| 		Payload:      pl,
 | |
| 	}
 | |
| 	// Perform the GRPC call.
 | |
| 	var reply *testpb.SimpleResponse
 | |
| 	reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...)
 | |
| 	if err != nil {
 | |
| 		err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	// Validate response.
 | |
| 	t := reply.GetPayload().GetType()
 | |
| 	s := len(reply.GetPayload().GetBody())
 | |
| 	if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize {
 | |
| 		err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize)
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	// Calculate latency and return result.
 | |
| 	latency = time.Since(start)
 | |
| 	return latency, nil
 | |
| }
 | |
| 
 | |
| func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) {
 | |
| 	timeoutDuration := config.OverallTimeout
 | |
| 	soakIterationsPerWorker := config.Iterations / config.NumWorkers
 | |
| 	if soakWorkerResults.Latencies == nil {
 | |
| 		soakWorkerResults.Latencies = stats.NewHistogram(stats.HistogramOptions{
 | |
| 			NumBuckets:     20,
 | |
| 			GrowthFactor:   1,
 | |
| 			BaseBucketSize: 1,
 | |
| 			MinValue:       0,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	for i := 0; i < soakIterationsPerWorker; i++ {
 | |
| 		if ctx.Err() != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		if time.Since(startTime) >= timeoutDuration {
 | |
| 			fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout)
 | |
| 			return
 | |
| 		}
 | |
| 		earliestNextStart := time.After(config.MinTimeBetweenRPCs)
 | |
| 		currentChannel, cleanup := config.ChannelForTest()
 | |
| 		defer cleanup()
 | |
| 		client := testgrpc.NewTestServiceClient(currentChannel)
 | |
| 		var p peer.Peer
 | |
| 		iterationConfig := SoakIterationConfig{
 | |
| 			RequestSize:  config.RequestSize,
 | |
| 			ResponseSize: config.ResponseSize,
 | |
| 			Client:       client,
 | |
| 			CallOptions:  []grpc.CallOption{grpc.Peer(&p)},
 | |
| 		}
 | |
| 		latency, err := doOneSoakIteration(ctx, iterationConfig)
 | |
| 		if err != nil {
 | |
| 			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err)
 | |
| 			soakWorkerResults.Failures++
 | |
| 			<-earliestNextStart
 | |
| 			continue
 | |
| 		}
 | |
| 		if latency > config.PerIterationMaxAcceptableLatency {
 | |
| 			fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds())
 | |
| 			soakWorkerResults.Failures++
 | |
| 			<-earliestNextStart
 | |
| 			continue
 | |
| 		}
 | |
| 		// Success: log the details of the iteration.
 | |
| 		soakWorkerResults.Latencies.Add(latency.Milliseconds())
 | |
| 		soakWorkerResults.iterationsSucceeded++
 | |
| 		fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency.Milliseconds(), p.Addr, config.ServerAddr)
 | |
| 		<-earliestNextStart
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
 | |
| // If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
 | |
| // stub that is created with the provided server address and dial options.
 | |
| // TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method.
 | |
| func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) {
 | |
| 	if soakConfig.Iterations%soakConfig.NumWorkers != 0 {
 | |
| 		fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n")
 | |
| 	}
 | |
| 	startTime := time.Now()
 | |
| 	var wg sync.WaitGroup
 | |
| 	soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers)
 | |
| 	for i := 0; i < soakConfig.NumWorkers; i++ {
 | |
| 		wg.Add(1)
 | |
| 		go func(workerID int) {
 | |
| 			defer wg.Done()
 | |
| 			executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID])
 | |
| 		}(i)
 | |
| 	}
 | |
| 	// Wait for all goroutines to complete.
 | |
| 	wg.Wait()
 | |
| 
 | |
| 	// Handle results.
 | |
| 	totalSuccesses := 0
 | |
| 	totalFailures := 0
 | |
| 	latencies := stats.NewHistogram(stats.HistogramOptions{
 | |
| 		NumBuckets:     20,
 | |
| 		GrowthFactor:   1,
 | |
| 		BaseBucketSize: 1,
 | |
| 		MinValue:       0,
 | |
| 	})
 | |
| 	for _, worker := range soakWorkerResults {
 | |
| 		totalSuccesses += worker.iterationsSucceeded
 | |
| 		totalFailures += worker.Failures
 | |
| 		if worker.Latencies != nil {
 | |
| 			// Add latencies from the worker's Histogram to the main latencies.
 | |
| 			latencies.Merge(worker.Latencies)
 | |
| 		}
 | |
| 	}
 | |
| 	var b bytes.Buffer
 | |
| 	totalIterations := totalSuccesses + totalFailures
 | |
| 	latencies.Print(&b)
 | |
| 	fmt.Fprintf(os.Stderr,
 | |
| 		"(server_uri: %s) soak test successes: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n",
 | |
| 		soakConfig.ServerAddr, totalSuccesses, soakConfig.Iterations, totalFailures, b.String())
 | |
| 
 | |
| 	if totalIterations != soakConfig.Iterations {
 | |
| 		logger.Fatalf("Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations)
 | |
| 	}
 | |
| 
 | |
| 	if totalFailures > soakConfig.MaxFailures {
 | |
| 		logger.Fatalf("Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures)
 | |
| 	}
 | |
| 	if soakConfig.ChannelForTest != nil {
 | |
| 		_, cleanup := soakConfig.ChannelForTest()
 | |
| 		defer cleanup()
 | |
| 	}
 | |
| }
 |