Merge pull request #207 from yangzhouhan/master

modified test proto and add benchmark test for streaming rpc
This commit is contained in:
Qi Zhao 2015-06-03 10:50:48 -07:00
commit d6f8134fd2
5 changed files with 735 additions and 708 deletions

View File

@ -40,13 +40,11 @@ import (
"io" "io"
"math" "math"
"net" "net"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
testpb "google.golang.org/grpc/interop/grpc_testing"
) )
func newPayload(t testpb.PayloadType, size int) *testpb.Payload { func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
@ -62,7 +60,7 @@ func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
grpclog.Fatalf("Unsupported payload type: %d", t) grpclog.Fatalf("Unsupported payload type: %d", t)
} }
return &testpb.Payload{ return &testpb.Payload{
Type: t.Enum(), Type: t,
Body: body, Body: body,
} }
} }
@ -70,49 +68,13 @@ func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
type testServer struct { type testServer struct {
} }
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return new(testpb.Empty), nil
}
func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{ return &testpb.SimpleResponse{
Payload: newPayload(in.GetResponseType(), int(in.GetResponseSize())), Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
}, nil }, nil
} }
func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error { func (s *testServer) StreamingCall(stream testpb.TestService_StreamingCallServer) error {
cs := args.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(args.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
return nil
}
func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
var sum int
for {
in, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{
AggregatedPayloadSize: proto.Int32(int32(sum)),
})
}
if err != nil {
return err
}
p := in.GetPayload().GetBody()
sum += len(p)
}
}
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
for { for {
in, err := stream.Recv() in, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
@ -122,47 +84,12 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ
if err != nil { if err != nil {
return err return err
} }
cs := in.GetResponseParameters() if err := stream.Send(&testpb.SimpleResponse{
for _, c := range cs { Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
if us := c.GetIntervalUs(); us > 0 { }); err != nil {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(in.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
}
}
func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
var msgBuf []*testpb.StreamingOutputCallRequest
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
break
}
if err != nil {
return err return err
} }
msgBuf = append(msgBuf, in)
} }
for _, m := range msgBuf {
cs := m.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(m.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
}
return nil
} }
// StartServer starts a gRPC server serving a benchmark service. It returns its // StartServer starts a gRPC server serving a benchmark service. It returns its
@ -184,8 +111,8 @@ func StartServer() (string, func()) {
func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) { func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseType: pl.Type,
ResponseSize: proto.Int32(int32(respSize)), ResponseSize: int32(respSize),
Payload: pl, Payload: pl,
} }
if _, err := tc.UnaryCall(context.Background(), req); err != nil { if _, err := tc.UnaryCall(context.Background(), req); err != nil {
@ -193,6 +120,22 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
} }
} }
// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
if _, err := stream.Recv(); err != nil {
grpclog.Fatal("%v.StreamingCall(_) = _, %v", tc, err)
}
}
// NewClientConn creates a gRPC client connection to addr. // NewClientConn creates a gRPC client connection to addr.
func NewClientConn(addr string) *grpc.ClientConn { func NewClientConn(addr string) *grpc.ClientConn {
conn, err := grpc.Dial(addr) conn, err := grpc.Dial(addr)

View File

@ -6,11 +6,13 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/net/context"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/benchmark/stats"
testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/grpclog"
) )
func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceClient)) { func runUnary(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38) s := stats.AddStats(b, 38)
b.StopTimer() b.StopTimer()
target, stopper := StartServer() target, stopper := StartServer()
@ -20,9 +22,8 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
// Warm up connection. // Warm up connection.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
caller(tc) unaryCaller(tc)
} }
ch := make(chan int, maxConcurrentCalls*4) ch := make(chan int, maxConcurrentCalls*4)
var ( var (
mu sync.Mutex mu sync.Mutex
@ -35,7 +36,7 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
go func() { go func() {
for _ = range ch { for _ = range ch {
start := time.Now() start := time.Now()
caller(tc) unaryCaller(tc)
elapse := time.Since(start) elapse := time.Since(start)
mu.Lock() mu.Lock()
s.Add(elapse) s.Add(elapse)
@ -54,24 +55,89 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
conn.Close() conn.Close()
} }
func smallCaller(client testpb.TestServiceClient) { func runStream(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer()
defer stopper()
conn := NewClientConn(target)
tc := testpb.NewTestServiceClient(conn)
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
// Warm up connection.
for i := 0; i < 10; i++ {
streamCaller(tc, stream)
}
ch := make(chan int, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
go func() {
for _ = range ch {
start := time.Now()
streamCaller(tc, stream)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func unaryCaller(client testpb.TestServiceClient) {
DoUnaryCall(client, 1, 1) DoUnaryCall(client, 1, 1)
} }
func BenchmarkClientSmallc1(b *testing.B) { func streamCaller(client testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient) {
run(b, 1, smallCaller) DoStreamingRoundTrip(client, stream, 1, 1)
} }
func BenchmarkClientSmallc8(b *testing.B) { func BenchmarkClientStreamc1(b *testing.B) {
run(b, 8, smallCaller) runStream(b, 1)
} }
func BenchmarkClientSmallc64(b *testing.B) { func BenchmarkClientStreamc8(b *testing.B) {
run(b, 64, smallCaller) runStream(b, 8)
} }
func BenchmarkClientSmallc512(b *testing.B) { func BenchmarkClientStreamc64(b *testing.B) {
run(b, 512, smallCaller) runStream(b, 64)
}
func BenchmarkClientStreamc512(b *testing.B) {
runStream(b, 512)
}
func BenchmarkClientUnaryc1(b *testing.B) {
runUnary(b, 1)
}
func BenchmarkClientUnaryc8(b *testing.B) {
runUnary(b, 8)
}
func BenchmarkClientUnaryc64(b *testing.B) {
runUnary(b, 64)
}
func BenchmarkClientUnaryc512(b *testing.B) {
runUnary(b, 512)
} }
func TestMain(m *testing.M) { func TestMain(m *testing.M) {

View File

@ -9,29 +9,95 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark" "google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
testpb "google.golang.org/grpc/interop/grpc_testing"
) )
var ( var (
server = flag.String("server", "", "The server address") server = flag.String("server", "", "The server address")
maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs") maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client") duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
rpcType = flag.Int("rpc_type", 0,
`Configure different client rpc type. Valid options are:
0 : unary call;
1 : streaming call.`)
) )
func caller(client testpb.TestServiceClient) { func unaryCaller(client testpb.TestServiceClient) {
benchmark.DoUnaryCall(client, 1, 1) benchmark.DoUnaryCall(client, 1, 1)
} }
func closeLoop() { func streamCaller(client testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient) {
s := stats.NewStats(256) benchmark.DoStreamingRoundTrip(client, stream, 1, 1)
conn := benchmark.NewClientConn(*server) }
tc := testpb.NewTestServiceClient(conn)
// Warm up connection. func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.TestServiceClient) {
s = stats.NewStats(256)
conn = benchmark.NewClientConn(*server)
tc = testpb.NewTestServiceClient(conn)
return s, conn, tc
}
func closeLoopUnary() {
s, conn, tc := buildConnection()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
caller(tc) unaryCaller(tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for _ = range ch {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func closeLoopStream() {
s, conn, tc := buildConnection()
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
for i := 0; i < 100; i++ {
streamCaller(tc, stream)
} }
ch := make(chan int, *maxConcurrentRPCs*4) ch := make(chan int, *maxConcurrentRPCs*4)
var ( var (
@ -44,7 +110,7 @@ func closeLoop() {
go func() { go func() {
for _ = range ch { for _ = range ch {
start := time.Now() start := time.Now()
caller(tc) streamCaller(tc, stream)
elapse := time.Since(start) elapse := time.Since(start)
mu.Lock() mu.Lock()
s.Add(elapse) s.Add(elapse)
@ -72,9 +138,9 @@ func closeLoop() {
conn.Close() conn.Close()
grpclog.Println(s.String()) grpclog.Println(s.String())
} }
func main() { func main() {
flag.Parse() flag.Parse()
go func() { go func() {
lis, err := net.Listen("tcp", ":0") lis, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
@ -85,5 +151,10 @@ func main() {
grpclog.Fatalf("Failed to serve: %v", err) grpclog.Fatalf("Failed to serve: %v", err)
} }
}() }()
closeLoop() switch *rpcType {
case 0:
closeLoopUnary()
case 1:
closeLoopStream()
}
} }

File diff suppressed because it is too large Load Diff

View File

@ -1,140 +1,148 @@
// An integration test service that covers all the method signature permutations // An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses. // of unary/streaming requests/responses.
syntax = "proto2"; syntax = "proto3";
package grpc.testing; package grpc.testing;
message Empty {}
// The type of payload that should be returned.
enum PayloadType { enum PayloadType {
// Compressable text format. // Compressable text format.
COMPRESSABLE = 0; COMPRESSABLE = 0;
// Uncompressable binary format. // Uncompressable binary format.
UNCOMPRESSABLE = 1; UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum. // Randomly chosen from all other formats defined in this enum.
RANDOM = 2; RANDOM = 2;
}
message StatsRequest {
// run number
optional int32 test_num = 1;
}
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
} }
// A block of data, to simply increase gRPC message size.
message Payload { message Payload {
// The type of data in body. // The type of data in body.
optional PayloadType type = 1; PayloadType type = 1;
// Primary contents of payload. // Primary contents of payload.
optional bytes body = 2; bytes body = 2;
}
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
enum ClientType {
SYNCHRONOUS_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 0;
ASYNC_SERVER = 1;
}
enum RpcType {
UNARY = 0;
STREAMING = 1;
}
message ClientConfig {
repeated string server_targets = 1;
ClientType client_type = 2;
bool enable_ssl = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
}
// Request current stats
message Mark {}
message ClientArgs {
oneof argtype {
ClientConfig setup = 1;
Mark mark = 2;
}
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 3;
double time_user = 4;
double time_system = 5;
}
message ClientStatus {
ClientStats stats = 1;
}
message ServerConfig {
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
}
message ServerArgs {
oneof argtype {
ServerConfig setup = 1;
Mark mark = 2;
}
}
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
} }
// Unary request.
message SimpleRequest { message SimpleRequest {
// Desired payload type in the response from the server. // Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats. // If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1; PayloadType response_type = 1;
// Desired payload size in the response from the server. // Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression. // If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2; int32 response_size = 2;
// Optional input payload sent along with the request. // Optional input payload sent along with the request.
optional Payload payload = 3; Payload payload = 3;
// Whether SimpleResponse should include username.
optional bool fill_username = 4;
// Whether SimpleResponse should include OAuth scope.
optional bool fill_oauth_scope = 5;
} }
// Unary response, as configured by the request.
message SimpleResponse { message SimpleResponse {
// Payload to increase message size. Payload payload = 1;
optional Payload payload = 1;
// The user the request came from, for verifying authentication was
// successful when the client expected it.
optional string username = 2;
// OAuth scope.
optional string oauth_scope = 3;
} }
// Client-streaming request.
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
optional Payload payload = 1;
// Not expecting any payload from the response.
}
// Client-streaming response.
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
optional int32 aggregated_payload_size = 1;
}
// Configuration for a particular response.
message ResponseParameters {
// Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
optional int32 interval_us = 2;
}
// Server-streaming request.
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
optional PayloadType response_type = 1;
// Configuration for each expected response message.
repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
// Server-streaming response, as configured by the request and parameters.
message StreamingOutputCallResponse {
// Payload to increase response size.
optional Payload payload = 1;
}
// A simple service to test the various types of RPCs and experiment with
// performance with various types of payload.
service TestService { service TestService {
// One empty request followed by one empty response. // One request followed by one response.
rpc EmptyCall(Empty) returns (Empty); // The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response. // One request followed by one response.
// The server returns the client payload as-is. // The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse); rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
// One request followed by a sequence of responses (streamed download).
// The server returns the payload with client desired type and sizes. service Worker {
rpc StreamingOutputCall(StreamingOutputCallRequest) // Start test with specified workload
returns (stream StreamingOutputCallResponse); rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
// Start test with specified workload
// A sequence of requests followed by one response (streamed upload). rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
// The server returns the aggregated size of client payload as the result.
rpc StreamingInputCall(stream StreamingInputCallRequest)
returns (StreamingInputCallResponse);
// A sequence of requests with each request served by the server immediately.
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by a sequence of responses.
// The server buffers all the client requests and then serves them in order. A
// stream of responses are returned to the client when the server starts with
// first request.
rpc HalfDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
} }