modified test proto and add benchmark test for streaming rpc

This commit is contained in:
yangzhouhan 2015-06-01 14:58:25 -07:00
parent 4674062dc7
commit 7b4cd4d7d3
5 changed files with 705 additions and 688 deletions

View File

@ -40,13 +40,13 @@ import (
"io"
"math"
"net"
"time"
// "time"
"github.com/golang/protobuf/proto"
// "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/grpclog"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
@ -62,7 +62,7 @@ func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
grpclog.Fatalf("Unsupported payload type: %d", t)
}
return &testpb.Payload{
Type: t.Enum(),
Type: t,
Body: body,
}
}
@ -70,49 +70,13 @@ func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
type testServer struct {
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return new(testpb.Empty), nil
}
func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{
Payload: newPayload(in.GetResponseType(), int(in.GetResponseSize())),
Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
}, nil
}
func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
cs := args.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(args.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
return nil
}
func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
var sum int
for {
in, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&testpb.StreamingInputCallResponse{
AggregatedPayloadSize: proto.Int32(int32(sum)),
})
}
if err != nil {
return err
}
p := in.GetPayload().GetBody()
sum += len(p)
}
}
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
func (s *testServer) StreamingCall(stream testpb.TestService_StreamingCallServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
@ -122,47 +86,12 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ
if err != nil {
return err
}
cs := in.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(in.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
}
}
func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
var msgBuf []*testpb.StreamingOutputCallRequest
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
break
}
if err != nil {
if err := stream.Send(&testpb.SimpleResponse{
Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
}); err != nil {
return err
}
msgBuf = append(msgBuf, in)
}
for _, m := range msgBuf {
cs := m.GetResponseParameters()
for _, c := range cs {
if us := c.GetIntervalUs(); us > 0 {
time.Sleep(time.Duration(us) * time.Microsecond)
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: newPayload(m.GetResponseType(), int(c.GetSize())),
}); err != nil {
return err
}
}
}
return nil
}
// StartServer starts a gRPC server serving a benchmark service. It returns its
@ -184,8 +113,8 @@ func StartServer() (string, func()) {
func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(respSize)),
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
if _, err := tc.UnaryCall(context.Background(), req); err != nil {
@ -193,6 +122,27 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
}
}
//DoStreamingcall performs a streaming RPC with given stub and request and response size.client side
func DoStreamingCall(tc testpb.TestServiceClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("TestService/Streaming call rpc failred: ", err)
}
if err := stream.Send(req); err != nil {
grpclog.Fatalf("/TestService/Streaming call send failed: ", err)
}
_, err = stream.Recv()
if err != nil {
grpclog.Fatal("/TestService/streamingCall receive failed: ", err)
}
}
// NewClientConn creates a gRPC client connection to addr.
func NewClientConn(addr string) *grpc.ClientConn {
conn, err := grpc.Dial(addr)

View File

@ -6,8 +6,8 @@ import (
"testing"
"time"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceClient)) {
@ -54,10 +54,76 @@ func run(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceCli
conn.Close()
}
func runStream(b *testing.B, maxConcurrentCalls int, caller func(testpb.TestServiceClient)) {
s := stats.AddStats(b, 38)
b.StopTimer()
target, stopper := StartServer()
defer stopper()
conn := NewClientConn(target)
tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
for i := 0; i < 10; i++ {
caller(tc)
}
ch := make(chan int, maxConcurrentCalls*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(maxConcurrentCalls)
// Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ {
go func() {
for _ = range ch {
start := time.Now()
caller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
b.StartTimer()
for i := 0; i < b.N; i++ {
ch <- i
}
b.StopTimer()
close(ch)
wg.Wait()
conn.Close()
}
func smallCaller(client testpb.TestServiceClient) {
DoUnaryCall(client, 1, 1)
}
func streamCaller(client testpb.TestServiceClient) {
//func streamCaller(client testpb.TestServiceClient){
DoStreamingCall(client, 1, 1)
//DoStreamingCall(client,1,1)
}
func BenchmarkClientStreamc1(b *testing.B) {
runStream(b, 1, streamCaller)
}
func BenchmarkClientStreamc8(b *testing.B) {
runStream(b, 8, streamCaller)
}
func BenchmarkClientStreamc64(b *testing.B) {
runStream(b, 64, streamCaller)
}
func BenchmarkClientStreamc512(b *testing.B) {
runStream(b, 512, streamCaller)
}
func BenchmarkClientSmallc1(b *testing.B) {
run(b, 1, smallCaller)
}

View File

@ -10,9 +10,9 @@ import (
"time"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog"
testpb "google.golang.org/grpc/interop/grpc_testing"
)
var (
@ -21,17 +21,21 @@ var (
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
)
func caller(client testpb.TestServiceClient) {
func unaryCaller(client testpb.TestServiceClient) {
benchmark.DoUnaryCall(client, 1, 1)
}
func closeLoop() {
func streamCaller(client testpb.TestServiceClient) {
benchmark.DoStreamingCall(client, 1, 1)
}
func closeLoopStream() {
s := stats.NewStats(256)
conn := benchmark.NewClientConn(*server)
tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
for i := 0; i < 100; i++ {
caller(tc)
streamCaller(tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
@ -44,7 +48,55 @@ func closeLoop() {
go func() {
for _ = range ch {
start := time.Now()
caller(tc)
streamCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
// Stop the client when time is up.
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}
close(ch)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func closeLoop() {
s := stats.NewStats(256)
conn := benchmark.NewClientConn(*server)
tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
for i := 0; i < 100; i++ {
unaryCaller(tc)
}
ch := make(chan int, *maxConcurrentRPCs*4)
var (
mu sync.Mutex
wg sync.WaitGroup
)
wg.Add(*maxConcurrentRPCs)
// Distribute RPCs over maxConcurrentCalls workers.
for i := 0; i < *maxConcurrentRPCs; i++ {
go func() {
for _ = range ch {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)

File diff suppressed because it is too large Load Diff

View File

@ -1,140 +1,148 @@
// An integration test service that covers all the method signature permutations
// of unary/streaming requests/responses.
syntax = "proto2";
syntax = "proto3";
package grpc.testing;
message Empty {}
// The type of payload that should be returned.
enum PayloadType {
// Compressable text format.
COMPRESSABLE = 0;
// Compressable text format.
COMPRESSABLE = 0;
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Uncompressable binary format.
UNCOMPRESSABLE = 1;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
// Randomly chosen from all other formats defined in this enum.
RANDOM = 2;
}
message StatsRequest {
// run number
optional int32 test_num = 1;
}
message ServerStats {
// wall clock time
double time_elapsed = 1;
// user time used by the server process and threads
double time_user = 2;
// server time used by the server process and all threads
double time_system = 3;
}
// A block of data, to simply increase gRPC message size.
message Payload {
// The type of data in body.
optional PayloadType type = 1;
// Primary contents of payload.
optional bytes body = 2;
// The type of data in body.
PayloadType type = 1;
// Primary contents of payload.
bytes body = 2;
}
message HistogramData {
repeated uint32 bucket = 1;
double min_seen = 2;
double max_seen = 3;
double sum = 4;
double sum_of_squares = 5;
double count = 6;
}
enum ClientType {
SYNCHRONOUS_CLIENT = 0;
ASYNC_CLIENT = 1;
}
enum ServerType {
SYNCHRONOUS_SERVER = 0;
ASYNC_SERVER = 1;
}
enum RpcType {
UNARY = 0;
STREAMING = 1;
}
message ClientConfig {
repeated string server_targets = 1;
ClientType client_type = 2;
bool enable_ssl = 3;
int32 outstanding_rpcs_per_channel = 4;
int32 client_channels = 5;
int32 payload_size = 6;
// only for async client:
int32 async_client_threads = 7;
RpcType rpc_type = 8;
}
// Request current stats
message Mark {}
message ClientArgs {
oneof argtype {
ClientConfig setup = 1;
Mark mark = 2;
}
}
message ClientStats {
HistogramData latencies = 1;
double time_elapsed = 3;
double time_user = 4;
double time_system = 5;
}
message ClientStatus {
ClientStats stats = 1;
}
message ServerConfig {
ServerType server_type = 1;
int32 threads = 2;
bool enable_ssl = 3;
}
message ServerArgs {
oneof argtype {
ServerConfig setup = 1;
Mark mark = 2;
}
}
message ServerStatus {
ServerStats stats = 1;
int32 port = 2;
}
// Unary request.
message SimpleRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
optional PayloadType response_type = 1;
// Desired payload type in the response from the server.
// If response_type is RANDOM, server randomly chooses one from other formats.
PayloadType response_type = 1;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 response_size = 2;
// Desired payload size in the response from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
int32 response_size = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
// Whether SimpleResponse should include username.
optional bool fill_username = 4;
// Whether SimpleResponse should include OAuth scope.
optional bool fill_oauth_scope = 5;
// Optional input payload sent along with the request.
Payload payload = 3;
}
// Unary response, as configured by the request.
message SimpleResponse {
// Payload to increase message size.
optional Payload payload = 1;
// The user the request came from, for verifying authentication was
// successful when the client expected it.
optional string username = 2;
// OAuth scope.
optional string oauth_scope = 3;
Payload payload = 1;
}
// Client-streaming request.
message StreamingInputCallRequest {
// Optional input payload sent along with the request.
optional Payload payload = 1;
// Not expecting any payload from the response.
}
// Client-streaming response.
message StreamingInputCallResponse {
// Aggregated size of payloads received from the client.
optional int32 aggregated_payload_size = 1;
}
// Configuration for a particular response.
message ResponseParameters {
// Desired payload sizes in responses from the server.
// If response_type is COMPRESSABLE, this denotes the size before compression.
optional int32 size = 1;
// Desired interval between consecutive responses in the response stream in
// microseconds.
optional int32 interval_us = 2;
}
// Server-streaming request.
message StreamingOutputCallRequest {
// Desired payload type in the response from the server.
// If response_type is RANDOM, the payload from each response in the stream
// might be of different types. This is to simulate a mixed type of payload
// stream.
optional PayloadType response_type = 1;
// Configuration for each expected response message.
repeated ResponseParameters response_parameters = 2;
// Optional input payload sent along with the request.
optional Payload payload = 3;
}
// Server-streaming response, as configured by the request and parameters.
message StreamingOutputCallResponse {
// Payload to increase response size.
optional Payload payload = 1;
}
// A simple service to test the various types of RPCs and experiment with
// performance with various types of payload.
service TestService {
// One empty request followed by one empty response.
rpc EmptyCall(Empty) returns (Empty);
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
// One request followed by a sequence of responses (streamed download).
// The server returns the payload with client desired type and sizes.
rpc StreamingOutputCall(StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by one response (streamed upload).
// The server returns the aggregated size of client payload as the result.
rpc StreamingInputCall(stream StreamingInputCallRequest)
returns (StreamingInputCallResponse);
// A sequence of requests with each request served by the server immediately.
// As one request could lead to multiple responses, this interface
// demonstrates the idea of full duplexing.
rpc FullDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// A sequence of requests followed by a sequence of responses.
// The server buffers all the client requests and then serves them in order. A
// stream of responses are returned to the client when the server starts with
// first request.
rpc HalfDuplexCall(stream StreamingOutputCallRequest)
returns (stream StreamingOutputCallResponse);
// One request followed by one response.
// The server returns the client payload as-is.
rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse);
}
service Worker {
// Start test with specified workload
rpc RunTest(stream ClientArgs) returns (stream ClientStatus);
// Start test with specified workload
rpc RunServer(stream ServerArgs) returns (stream ServerStatus);
}