mirror of https://github.com/grpc/grpc-go.git
				
				
				
			
		
			
				
	
	
		
			370 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			370 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
/*
 | 
						|
 *
 | 
						|
 * Copyright 2014 gRPC authors.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 *
 | 
						|
 */
 | 
						|
 | 
						|
//go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
 | 
						|
 | 
						|
/*
 | 
						|
Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
 | 
						|
*/
 | 
						|
package benchmark
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"golang.org/x/net/context"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	testpb "google.golang.org/grpc/benchmark/grpc_testing"
 | 
						|
	"google.golang.org/grpc/benchmark/latency"
 | 
						|
	"google.golang.org/grpc/benchmark/stats"
 | 
						|
	"google.golang.org/grpc/grpclog"
 | 
						|
)
 | 
						|
 | 
						|
// AddOne add 1 to the features slice
 | 
						|
func AddOne(features []int, featuresMaxPosition []int) {
 | 
						|
	for i := len(features) - 1; i >= 0; i-- {
 | 
						|
		features[i] = (features[i] + 1)
 | 
						|
		if features[i]/featuresMaxPosition[i] == 0 {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		features[i] = features[i] % featuresMaxPosition[i]
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Allows reuse of the same testpb.Payload object.
 | 
						|
func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
 | 
						|
	if size < 0 {
 | 
						|
		grpclog.Fatalf("Requested a response with invalid length %d", size)
 | 
						|
	}
 | 
						|
	body := make([]byte, size)
 | 
						|
	switch t {
 | 
						|
	case testpb.PayloadType_COMPRESSABLE:
 | 
						|
	case testpb.PayloadType_UNCOMPRESSABLE:
 | 
						|
		grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
 | 
						|
	default:
 | 
						|
		grpclog.Fatalf("Unsupported payload type: %d", t)
 | 
						|
	}
 | 
						|
	p.Type = t
 | 
						|
	p.Body = body
 | 
						|
}
 | 
						|
 | 
						|
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
 | 
						|
	p := new(testpb.Payload)
 | 
						|
	setPayload(p, t, size)
 | 
						|
	return p
 | 
						|
}
 | 
						|
 | 
						|
type testServer struct {
 | 
						|
}
 | 
						|
 | 
						|
func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 | 
						|
	return &testpb.SimpleResponse{
 | 
						|
		Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
 | 
						|
	response := &testpb.SimpleResponse{
 | 
						|
		Payload: new(testpb.Payload),
 | 
						|
	}
 | 
						|
	in := new(testpb.SimpleRequest)
 | 
						|
	for {
 | 
						|
		// use ServerStream directly to reuse the same testpb.SimpleRequest object
 | 
						|
		err := stream.(grpc.ServerStream).RecvMsg(in)
 | 
						|
		if err == io.EOF {
 | 
						|
			// read done.
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
 | 
						|
		if err := stream.Send(response); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// byteBufServer is a gRPC server that sends and receives byte buffer.
 | 
						|
// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
 | 
						|
type byteBufServer struct {
 | 
						|
	respSize int32
 | 
						|
}
 | 
						|
 | 
						|
// UnaryCall is an empty function and is not used for benchmark.
 | 
						|
// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
 | 
						|
func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
 | 
						|
	return &testpb.SimpleResponse{}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
 | 
						|
	for {
 | 
						|
		var in []byte
 | 
						|
		err := stream.(grpc.ServerStream).RecvMsg(&in)
 | 
						|
		if err == io.EOF {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		out := make([]byte, s.respSize)
 | 
						|
		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ServerInfo contains the information to create a gRPC benchmark server.
 | 
						|
type ServerInfo struct {
 | 
						|
	// Type is the type of the server.
 | 
						|
	// It should be "protobuf" or "bytebuf".
 | 
						|
	Type string
 | 
						|
 | 
						|
	// Metadata is an optional configuration.
 | 
						|
	// For "protobuf", it's ignored.
 | 
						|
	// For "bytebuf", it should be an int representing response size.
 | 
						|
	Metadata interface{}
 | 
						|
 | 
						|
	// Listener is the network listener for the server to use
 | 
						|
	Listener net.Listener
 | 
						|
}
 | 
						|
 | 
						|
// StartServer starts a gRPC server serving a benchmark service according to info.
 | 
						|
// It returns a function to stop the server.
 | 
						|
func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
 | 
						|
	opts = append(opts, grpc.WriteBufferSize(128*1024))
 | 
						|
	opts = append(opts, grpc.ReadBufferSize(128*1024))
 | 
						|
	s := grpc.NewServer(opts...)
 | 
						|
	switch info.Type {
 | 
						|
	case "protobuf":
 | 
						|
		testpb.RegisterBenchmarkServiceServer(s, &testServer{})
 | 
						|
	case "bytebuf":
 | 
						|
		respSize, ok := info.Metadata.(int32)
 | 
						|
		if !ok {
 | 
						|
			grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
 | 
						|
		}
 | 
						|
		testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
 | 
						|
	default:
 | 
						|
		grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
 | 
						|
	}
 | 
						|
	go s.Serve(info.Listener)
 | 
						|
	return func() {
 | 
						|
		s.Stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
 | 
						|
func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
 | 
						|
	pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
 | 
						|
	req := &testpb.SimpleRequest{
 | 
						|
		ResponseType: pl.Type,
 | 
						|
		ResponseSize: int32(respSize),
 | 
						|
		Payload:      pl,
 | 
						|
	}
 | 
						|
	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
 | 
						|
		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
 | 
						|
func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
 | 
						|
	pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
 | 
						|
	req := &testpb.SimpleRequest{
 | 
						|
		ResponseType: pl.Type,
 | 
						|
		ResponseSize: int32(respSize),
 | 
						|
		Payload:      pl,
 | 
						|
	}
 | 
						|
	if err := stream.Send(req); err != nil {
 | 
						|
		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
 | 
						|
	}
 | 
						|
	if _, err := stream.Recv(); err != nil {
 | 
						|
		// EOF is a valid error here.
 | 
						|
		if err == io.EOF {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
 | 
						|
func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
 | 
						|
	out := make([]byte, reqSize)
 | 
						|
	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
 | 
						|
		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
 | 
						|
	}
 | 
						|
	var in []byte
 | 
						|
	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
 | 
						|
		// EOF is a valid error here.
 | 
						|
		if err == io.EOF {
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// NewClientConn creates a gRPC client connection to addr.
 | 
						|
func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
 | 
						|
	return NewClientConnWithContext(context.Background(), addr, opts...)
 | 
						|
}
 | 
						|
 | 
						|
// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
 | 
						|
func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
 | 
						|
	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
 | 
						|
	opts = append(opts, grpc.WithReadBufferSize(128*1024))
 | 
						|
	conn, err := grpc.DialContext(ctx, addr, opts...)
 | 
						|
	if err != nil {
 | 
						|
		grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
 | 
						|
	}
 | 
						|
	return conn
 | 
						|
}
 | 
						|
 | 
						|
func runUnary(b *testing.B, benchFeatures stats.Features) {
 | 
						|
	s := stats.AddStats(b, 38)
 | 
						|
	nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
 | 
						|
	lis, err := net.Listen("tcp", "localhost:0")
 | 
						|
	if err != nil {
 | 
						|
		grpclog.Fatalf("Failed to listen: %v", err)
 | 
						|
	}
 | 
						|
	target := lis.Addr().String()
 | 
						|
	lis = nw.Listener(lis)
 | 
						|
	stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
 | 
						|
	defer stopper()
 | 
						|
	conn := NewClientConn(
 | 
						|
		target, grpc.WithInsecure(),
 | 
						|
		grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
 | 
						|
			return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
 | 
						|
		}),
 | 
						|
	)
 | 
						|
	tc := testpb.NewBenchmarkServiceClient(conn)
 | 
						|
 | 
						|
	// Warm up connection.
 | 
						|
	for i := 0; i < 10; i++ {
 | 
						|
		unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
 | 
						|
	}
 | 
						|
	ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
 | 
						|
	var (
 | 
						|
		mu sync.Mutex
 | 
						|
		wg sync.WaitGroup
 | 
						|
	)
 | 
						|
	wg.Add(benchFeatures.MaxConcurrentCalls)
 | 
						|
 | 
						|
	// Distribute the b.N calls over maxConcurrentCalls workers.
 | 
						|
	for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
 | 
						|
		go func() {
 | 
						|
			for range ch {
 | 
						|
				start := time.Now()
 | 
						|
				unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
 | 
						|
				elapse := time.Since(start)
 | 
						|
				mu.Lock()
 | 
						|
				s.Add(elapse)
 | 
						|
				mu.Unlock()
 | 
						|
			}
 | 
						|
			wg.Done()
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	b.ResetTimer()
 | 
						|
	for i := 0; i < b.N; i++ {
 | 
						|
		ch <- i
 | 
						|
	}
 | 
						|
	close(ch)
 | 
						|
	wg.Wait()
 | 
						|
	b.StopTimer()
 | 
						|
	conn.Close()
 | 
						|
}
 | 
						|
 | 
						|
func runStream(b *testing.B, benchFeatures stats.Features) {
 | 
						|
	s := stats.AddStats(b, 38)
 | 
						|
	nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
 | 
						|
	lis, err := net.Listen("tcp", "localhost:0")
 | 
						|
	if err != nil {
 | 
						|
		grpclog.Fatalf("Failed to listen: %v", err)
 | 
						|
	}
 | 
						|
	target := lis.Addr().String()
 | 
						|
	lis = nw.Listener(lis)
 | 
						|
	stopper := StartServer(ServerInfo{Type: "protobuf", Listener: lis}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
 | 
						|
	defer stopper()
 | 
						|
	conn := NewClientConn(
 | 
						|
		target, grpc.WithInsecure(),
 | 
						|
		grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
 | 
						|
			return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
 | 
						|
		}),
 | 
						|
	)
 | 
						|
	tc := testpb.NewBenchmarkServiceClient(conn)
 | 
						|
 | 
						|
	// Warm up connection.
 | 
						|
	stream, err := tc.StreamingCall(context.Background())
 | 
						|
	if err != nil {
 | 
						|
		b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
 | 
						|
	}
 | 
						|
	for i := 0; i < 10; i++ {
 | 
						|
		streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
 | 
						|
	}
 | 
						|
 | 
						|
	ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
 | 
						|
	var (
 | 
						|
		mu sync.Mutex
 | 
						|
		wg sync.WaitGroup
 | 
						|
	)
 | 
						|
	wg.Add(benchFeatures.MaxConcurrentCalls)
 | 
						|
 | 
						|
	// Distribute the b.N calls over maxConcurrentCalls workers.
 | 
						|
	for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
 | 
						|
		stream, err := tc.StreamingCall(context.Background())
 | 
						|
		if err != nil {
 | 
						|
			b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
 | 
						|
		}
 | 
						|
		go func() {
 | 
						|
			for range ch {
 | 
						|
				start := time.Now()
 | 
						|
				streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
 | 
						|
				elapse := time.Since(start)
 | 
						|
				mu.Lock()
 | 
						|
				s.Add(elapse)
 | 
						|
				mu.Unlock()
 | 
						|
			}
 | 
						|
			wg.Done()
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	b.ResetTimer()
 | 
						|
	for i := 0; i < b.N; i++ {
 | 
						|
		ch <- struct{}{}
 | 
						|
	}
 | 
						|
	close(ch)
 | 
						|
	wg.Wait()
 | 
						|
	b.StopTimer()
 | 
						|
	conn.Close()
 | 
						|
}
 | 
						|
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
 | 
						|
	if err := DoUnaryCall(client, reqSize, respSize); err != nil {
 | 
						|
		grpclog.Fatalf("DoUnaryCall failed: %v", err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
 | 
						|
	if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
 | 
						|
		grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
 | 
						|
	}
 | 
						|
}
 |