mirror of https://github.com/grpc/grpc-go.git
				
				
				
			Merge branch 'master' of https://github.com/grpc/grpc-go
This commit is contained in:
		
						commit
						7af9aab473
					
				| 
						 | 
				
			
			@ -0,0 +1,298 @@
 | 
			
		|||
/*
 | 
			
		||||
 *
 | 
			
		||||
 * Copyright 2016, Google Inc.
 | 
			
		||||
 * All rights reserved.
 | 
			
		||||
 *
 | 
			
		||||
 * Redistribution and use in source and binary forms, with or without
 | 
			
		||||
 * modification, are permitted provided that the following conditions are
 | 
			
		||||
 * met:
 | 
			
		||||
 *
 | 
			
		||||
 *     * Redistributions of source code must retain the above copyright
 | 
			
		||||
 * notice, this list of conditions and the following disclaimer.
 | 
			
		||||
 *     * Redistributions in binary form must reproduce the above
 | 
			
		||||
 * copyright notice, this list of conditions and the following disclaimer
 | 
			
		||||
 * in the documentation and/or other materials provided with the
 | 
			
		||||
 * distribution.
 | 
			
		||||
 *     * Neither the name of Google Inc. nor the names of its
 | 
			
		||||
 * contributors may be used to endorse or promote products derived from
 | 
			
		||||
 * this software without specific prior written permission.
 | 
			
		||||
 *
 | 
			
		||||
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 | 
			
		||||
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 | 
			
		||||
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 | 
			
		||||
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 | 
			
		||||
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
			
		||||
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 | 
			
		||||
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 | 
			
		||||
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 | 
			
		||||
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 | 
			
		||||
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
			
		||||
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
// client starts an interop client to do stress test and a metrics server to report qps.
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"net"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/codes"
 | 
			
		||||
	"google.golang.org/grpc/grpclog"
 | 
			
		||||
	"google.golang.org/grpc/interop"
 | 
			
		||||
	testpb "google.golang.org/grpc/interop/grpc_testing"
 | 
			
		||||
	metricspb "google.golang.org/grpc/stress/grpc_testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	serverAddresses      = flag.String("server_addresses", "localhost:8080", "a list of server addresses")
 | 
			
		||||
	testCases            = flag.String("test_cases", "", "a list of test cases along with the relative weights")
 | 
			
		||||
	testDurationSecs     = flag.Int("test_duration_secs", -1, "test duration in seconds")
 | 
			
		||||
	numChannelsPerServer = flag.Int("num_channels_per_server", 1, "Number of channels (i.e connections) to each server")
 | 
			
		||||
	numStubsPerChannel   = flag.Int("num_stubs_per_channel", 1, "Number of client stubs per each connection to server")
 | 
			
		||||
	metricsPort          = flag.Int("metrics_port", 8081, "The port at which the stress client exposes QPS metrics")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// testCaseWithWeight contains the test case type and its weight.
 | 
			
		||||
type testCaseWithWeight struct {
 | 
			
		||||
	name   string
 | 
			
		||||
	weight int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// parseTestCases converts test case string to a list of struct testCaseWithWeight.
 | 
			
		||||
func parseTestCases(testCaseString string) []testCaseWithWeight {
 | 
			
		||||
	testCaseStrings := strings.Split(testCaseString, ",")
 | 
			
		||||
	testCases := make([]testCaseWithWeight, len(testCaseStrings))
 | 
			
		||||
	for i, str := range testCaseStrings {
 | 
			
		||||
		testCase := strings.Split(str, ":")
 | 
			
		||||
		if len(testCase) != 2 {
 | 
			
		||||
			panic(fmt.Sprintf("invalid test case with weight: %s", str))
 | 
			
		||||
		}
 | 
			
		||||
		// Check if test case is supported.
 | 
			
		||||
		switch testCase[0] {
 | 
			
		||||
		case
 | 
			
		||||
			"empty_unary",
 | 
			
		||||
			"large_unary",
 | 
			
		||||
			"client_streaming",
 | 
			
		||||
			"server_streaming",
 | 
			
		||||
			"empty_stream":
 | 
			
		||||
		default:
 | 
			
		||||
			panic(fmt.Sprintf("unknown test type: %s", testCase[0]))
 | 
			
		||||
		}
 | 
			
		||||
		testCases[i].name = testCase[0]
 | 
			
		||||
		w, err := strconv.Atoi(testCase[1])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(fmt.Sprintf("%v", err))
 | 
			
		||||
		}
 | 
			
		||||
		testCases[i].weight = w
 | 
			
		||||
	}
 | 
			
		||||
	return testCases
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// weightedRandomTestSelector defines a weighted random selector for test case types.
 | 
			
		||||
type weightedRandomTestSelector struct {
 | 
			
		||||
	tests       []testCaseWithWeight
 | 
			
		||||
	totalWeight int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newWeightedRandomTestSelector constructs a weightedRandomTestSelector with the given list of testCaseWithWeight.
 | 
			
		||||
func newWeightedRandomTestSelector(tests []testCaseWithWeight) *weightedRandomTestSelector {
 | 
			
		||||
	var totalWeight int
 | 
			
		||||
	for _, t := range tests {
 | 
			
		||||
		totalWeight += t.weight
 | 
			
		||||
	}
 | 
			
		||||
	rand.Seed(time.Now().UnixNano())
 | 
			
		||||
	return &weightedRandomTestSelector{tests, totalWeight}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (selector weightedRandomTestSelector) getNextTest() string {
 | 
			
		||||
	random := rand.Intn(selector.totalWeight)
 | 
			
		||||
	var weightSofar int
 | 
			
		||||
	for _, test := range selector.tests {
 | 
			
		||||
		weightSofar += test.weight
 | 
			
		||||
		if random < weightSofar {
 | 
			
		||||
			return test.name
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	panic("no test case selected by weightedRandomTestSelector")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// gauge stores the qps of one interop client (one stub).
 | 
			
		||||
type gauge struct {
 | 
			
		||||
	mutex sync.RWMutex
 | 
			
		||||
	val   int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *gauge) set(v int64) {
 | 
			
		||||
	g.mutex.Lock()
 | 
			
		||||
	defer g.mutex.Unlock()
 | 
			
		||||
	g.val = v
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *gauge) get() int64 {
 | 
			
		||||
	g.mutex.RLock()
 | 
			
		||||
	defer g.mutex.RUnlock()
 | 
			
		||||
	return g.val
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// server implements metrics server functions.
 | 
			
		||||
type server struct {
 | 
			
		||||
	mutex sync.RWMutex
 | 
			
		||||
	// gauges is a map from /stress_test/server_<n>/channel_<n>/stub_<n>/qps to its qps gauge.
 | 
			
		||||
	gauges map[string]*gauge
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newMetricsServer returns a new metrics server.
 | 
			
		||||
func newMetricsServer() *server {
 | 
			
		||||
	return &server{gauges: make(map[string]*gauge)}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetAllGauges returns all gauges.
 | 
			
		||||
func (s *server) GetAllGauges(in *metricspb.EmptyMessage, stream metricspb.MetricsService_GetAllGaugesServer) error {
 | 
			
		||||
	s.mutex.RLock()
 | 
			
		||||
	defer s.mutex.RUnlock()
 | 
			
		||||
 | 
			
		||||
	for name, gauge := range s.gauges {
 | 
			
		||||
		if err := stream.Send(&metricspb.GaugeResponse{Name: name, Value: &metricspb.GaugeResponse_LongValue{gauge.get()}}); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetGauge returns the gauge for the given name.
 | 
			
		||||
func (s *server) GetGauge(ctx context.Context, in *metricspb.GaugeRequest) (*metricspb.GaugeResponse, error) {
 | 
			
		||||
	s.mutex.RLock()
 | 
			
		||||
	defer s.mutex.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if g, ok := s.gauges[in.Name]; ok {
 | 
			
		||||
		return &metricspb.GaugeResponse{Name: in.Name, Value: &metricspb.GaugeResponse_LongValue{g.get()}}, nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil, grpc.Errorf(codes.InvalidArgument, "gauge with name %s not found", in.Name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// createGauge creates a guage using the given name in metrics server.
 | 
			
		||||
func (s *server) createGauge(name string) *gauge {
 | 
			
		||||
	s.mutex.Lock()
 | 
			
		||||
	defer s.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
	if _, ok := s.gauges[name]; ok {
 | 
			
		||||
		// gauge already exists.
 | 
			
		||||
		panic(fmt.Sprintf("gauge %s already exists", name))
 | 
			
		||||
	}
 | 
			
		||||
	var g gauge
 | 
			
		||||
	s.gauges[name] = &g
 | 
			
		||||
	return &g
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func startServer(server *server, port int) {
 | 
			
		||||
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(port))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		grpclog.Fatalf("failed to listen: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	s := grpc.NewServer()
 | 
			
		||||
	metricspb.RegisterMetricsServiceServer(s, server)
 | 
			
		||||
	s.Serve(lis)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// performRPCs uses weightedRandomTestSelector to select test case and runs the tests.
 | 
			
		||||
func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTestSelector, stop <-chan bool) {
 | 
			
		||||
	client := testpb.NewTestServiceClient(conn)
 | 
			
		||||
	var numCalls int64
 | 
			
		||||
	startTime := time.Now()
 | 
			
		||||
	for {
 | 
			
		||||
		done := make(chan bool, 1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			test := selector.getNextTest()
 | 
			
		||||
			switch test {
 | 
			
		||||
			case "empty_unary":
 | 
			
		||||
				interop.DoEmptyUnaryCall(client)
 | 
			
		||||
			case "large_unary":
 | 
			
		||||
				interop.DoLargeUnaryCall(client)
 | 
			
		||||
			case "client_streaming":
 | 
			
		||||
				interop.DoClientStreaming(client)
 | 
			
		||||
			case "server_streaming":
 | 
			
		||||
				interop.DoServerStreaming(client)
 | 
			
		||||
			case "empty_stream":
 | 
			
		||||
				interop.DoEmptyStream(client)
 | 
			
		||||
			}
 | 
			
		||||
			done <- true
 | 
			
		||||
		}()
 | 
			
		||||
		select {
 | 
			
		||||
		case <-stop:
 | 
			
		||||
			return
 | 
			
		||||
		case <-done:
 | 
			
		||||
			numCalls++
 | 
			
		||||
			gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func logParameterInfo(addresses []string, tests []testCaseWithWeight) {
 | 
			
		||||
	grpclog.Printf("server_addresses: %s", *serverAddresses)
 | 
			
		||||
	grpclog.Printf("test_cases: %s", *testCases)
 | 
			
		||||
	grpclog.Printf("test_duration-secs: %d", *testDurationSecs)
 | 
			
		||||
	grpclog.Printf("num_channels_per_server: %d", *numChannelsPerServer)
 | 
			
		||||
	grpclog.Printf("num_stubs_per_channel: %d", *numStubsPerChannel)
 | 
			
		||||
	grpclog.Printf("metrics_port: %d", *metricsPort)
 | 
			
		||||
 | 
			
		||||
	grpclog.Println("addresses:")
 | 
			
		||||
	for i, addr := range addresses {
 | 
			
		||||
		grpclog.Printf("%d. %s\n", i+1, addr)
 | 
			
		||||
	}
 | 
			
		||||
	grpclog.Println("tests:")
 | 
			
		||||
	for i, test := range tests {
 | 
			
		||||
		grpclog.Printf("%d. %v\n", i+1, test)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
	addresses := strings.Split(*serverAddresses, ",")
 | 
			
		||||
	tests := parseTestCases(*testCases)
 | 
			
		||||
	logParameterInfo(addresses, tests)
 | 
			
		||||
	testSelector := newWeightedRandomTestSelector(tests)
 | 
			
		||||
	metricsServer := newMetricsServer()
 | 
			
		||||
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	wg.Add(len(addresses) * *numChannelsPerServer * *numStubsPerChannel)
 | 
			
		||||
	stop := make(chan bool)
 | 
			
		||||
 | 
			
		||||
	for serverIndex, address := range addresses {
 | 
			
		||||
		for connIndex := 0; connIndex < *numChannelsPerServer; connIndex++ {
 | 
			
		||||
			conn, err := grpc.Dial(address, grpc.WithInsecure())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				grpclog.Fatalf("Fail to dial: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			defer conn.Close()
 | 
			
		||||
			for clientIndex := 0; clientIndex < *numStubsPerChannel; clientIndex++ {
 | 
			
		||||
				name := fmt.Sprintf("/stress_test/server_%d/channel_%d/stub_%d/qps", serverIndex+1, connIndex+1, clientIndex+1)
 | 
			
		||||
				go func() {
 | 
			
		||||
					defer wg.Done()
 | 
			
		||||
					g := metricsServer.createGauge(name)
 | 
			
		||||
					performRPCs(g, conn, testSelector, stop)
 | 
			
		||||
				}()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	go startServer(metricsServer, *metricsPort)
 | 
			
		||||
	if *testDurationSecs > 0 {
 | 
			
		||||
		time.Sleep(time.Duration(*testDurationSecs) * time.Second)
 | 
			
		||||
		close(stop)
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	grpclog.Printf(" ===== ALL DONE ===== ")
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,350 @@
 | 
			
		|||
// Code generated by protoc-gen-go.
 | 
			
		||||
// source: metrics.proto
 | 
			
		||||
// DO NOT EDIT!
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
Package grpc_testing is a generated protocol buffer package.
 | 
			
		||||
 | 
			
		||||
It is generated from these files:
 | 
			
		||||
	metrics.proto
 | 
			
		||||
 | 
			
		||||
It has these top-level messages:
 | 
			
		||||
	GaugeResponse
 | 
			
		||||
	GaugeRequest
 | 
			
		||||
	EmptyMessage
 | 
			
		||||
*/
 | 
			
		||||
package grpc_testing
 | 
			
		||||
 | 
			
		||||
import proto "github.com/golang/protobuf/proto"
 | 
			
		||||
import fmt "fmt"
 | 
			
		||||
import math "math"
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	context "golang.org/x/net/context"
 | 
			
		||||
	grpc "google.golang.org/grpc"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Reference imports to suppress errors if they are not otherwise used.
 | 
			
		||||
var _ = proto.Marshal
 | 
			
		||||
var _ = fmt.Errorf
 | 
			
		||||
var _ = math.Inf
 | 
			
		||||
 | 
			
		||||
// This is a compile-time assertion to ensure that this generated file
 | 
			
		||||
// is compatible with the proto package it is being compiled against.
 | 
			
		||||
const _ = proto.ProtoPackageIsVersion1
 | 
			
		||||
 | 
			
		||||
// Reponse message containing the gauge name and value
 | 
			
		||||
type GaugeResponse struct {
 | 
			
		||||
	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
 | 
			
		||||
	// Types that are valid to be assigned to Value:
 | 
			
		||||
	//	*GaugeResponse_LongValue
 | 
			
		||||
	//	*GaugeResponse_DoubleValue
 | 
			
		||||
	//	*GaugeResponse_StringValue
 | 
			
		||||
	Value isGaugeResponse_Value `protobuf_oneof:"value"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeResponse) Reset()                    { *m = GaugeResponse{} }
 | 
			
		||||
func (m *GaugeResponse) String() string            { return proto.CompactTextString(m) }
 | 
			
		||||
func (*GaugeResponse) ProtoMessage()               {}
 | 
			
		||||
func (*GaugeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
 | 
			
		||||
 | 
			
		||||
type isGaugeResponse_Value interface {
 | 
			
		||||
	isGaugeResponse_Value()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type GaugeResponse_LongValue struct {
 | 
			
		||||
	LongValue int64 `protobuf:"varint,2,opt,name=long_value,json=longValue,oneof"`
 | 
			
		||||
}
 | 
			
		||||
type GaugeResponse_DoubleValue struct {
 | 
			
		||||
	DoubleValue float64 `protobuf:"fixed64,3,opt,name=double_value,json=doubleValue,oneof"`
 | 
			
		||||
}
 | 
			
		||||
type GaugeResponse_StringValue struct {
 | 
			
		||||
	StringValue string `protobuf:"bytes,4,opt,name=string_value,json=stringValue,oneof"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (*GaugeResponse_LongValue) isGaugeResponse_Value()   {}
 | 
			
		||||
func (*GaugeResponse_DoubleValue) isGaugeResponse_Value() {}
 | 
			
		||||
func (*GaugeResponse_StringValue) isGaugeResponse_Value() {}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeResponse) GetValue() isGaugeResponse_Value {
 | 
			
		||||
	if m != nil {
 | 
			
		||||
		return m.Value
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeResponse) GetLongValue() int64 {
 | 
			
		||||
	if x, ok := m.GetValue().(*GaugeResponse_LongValue); ok {
 | 
			
		||||
		return x.LongValue
 | 
			
		||||
	}
 | 
			
		||||
	return 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeResponse) GetDoubleValue() float64 {
 | 
			
		||||
	if x, ok := m.GetValue().(*GaugeResponse_DoubleValue); ok {
 | 
			
		||||
		return x.DoubleValue
 | 
			
		||||
	}
 | 
			
		||||
	return 0
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeResponse) GetStringValue() string {
 | 
			
		||||
	if x, ok := m.GetValue().(*GaugeResponse_StringValue); ok {
 | 
			
		||||
		return x.StringValue
 | 
			
		||||
	}
 | 
			
		||||
	return ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// XXX_OneofFuncs is for the internal use of the proto package.
 | 
			
		||||
func (*GaugeResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
 | 
			
		||||
	return _GaugeResponse_OneofMarshaler, _GaugeResponse_OneofUnmarshaler, _GaugeResponse_OneofSizer, []interface{}{
 | 
			
		||||
		(*GaugeResponse_LongValue)(nil),
 | 
			
		||||
		(*GaugeResponse_DoubleValue)(nil),
 | 
			
		||||
		(*GaugeResponse_StringValue)(nil),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func _GaugeResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
 | 
			
		||||
	m := msg.(*GaugeResponse)
 | 
			
		||||
	// value
 | 
			
		||||
	switch x := m.Value.(type) {
 | 
			
		||||
	case *GaugeResponse_LongValue:
 | 
			
		||||
		b.EncodeVarint(2<<3 | proto.WireVarint)
 | 
			
		||||
		b.EncodeVarint(uint64(x.LongValue))
 | 
			
		||||
	case *GaugeResponse_DoubleValue:
 | 
			
		||||
		b.EncodeVarint(3<<3 | proto.WireFixed64)
 | 
			
		||||
		b.EncodeFixed64(math.Float64bits(x.DoubleValue))
 | 
			
		||||
	case *GaugeResponse_StringValue:
 | 
			
		||||
		b.EncodeVarint(4<<3 | proto.WireBytes)
 | 
			
		||||
		b.EncodeStringBytes(x.StringValue)
 | 
			
		||||
	case nil:
 | 
			
		||||
	default:
 | 
			
		||||
		return fmt.Errorf("GaugeResponse.Value has unexpected type %T", x)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func _GaugeResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
 | 
			
		||||
	m := msg.(*GaugeResponse)
 | 
			
		||||
	switch tag {
 | 
			
		||||
	case 2: // value.long_value
 | 
			
		||||
		if wire != proto.WireVarint {
 | 
			
		||||
			return true, proto.ErrInternalBadWireType
 | 
			
		||||
		}
 | 
			
		||||
		x, err := b.DecodeVarint()
 | 
			
		||||
		m.Value = &GaugeResponse_LongValue{int64(x)}
 | 
			
		||||
		return true, err
 | 
			
		||||
	case 3: // value.double_value
 | 
			
		||||
		if wire != proto.WireFixed64 {
 | 
			
		||||
			return true, proto.ErrInternalBadWireType
 | 
			
		||||
		}
 | 
			
		||||
		x, err := b.DecodeFixed64()
 | 
			
		||||
		m.Value = &GaugeResponse_DoubleValue{math.Float64frombits(x)}
 | 
			
		||||
		return true, err
 | 
			
		||||
	case 4: // value.string_value
 | 
			
		||||
		if wire != proto.WireBytes {
 | 
			
		||||
			return true, proto.ErrInternalBadWireType
 | 
			
		||||
		}
 | 
			
		||||
		x, err := b.DecodeStringBytes()
 | 
			
		||||
		m.Value = &GaugeResponse_StringValue{x}
 | 
			
		||||
		return true, err
 | 
			
		||||
	default:
 | 
			
		||||
		return false, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func _GaugeResponse_OneofSizer(msg proto.Message) (n int) {
 | 
			
		||||
	m := msg.(*GaugeResponse)
 | 
			
		||||
	// value
 | 
			
		||||
	switch x := m.Value.(type) {
 | 
			
		||||
	case *GaugeResponse_LongValue:
 | 
			
		||||
		n += proto.SizeVarint(2<<3 | proto.WireVarint)
 | 
			
		||||
		n += proto.SizeVarint(uint64(x.LongValue))
 | 
			
		||||
	case *GaugeResponse_DoubleValue:
 | 
			
		||||
		n += proto.SizeVarint(3<<3 | proto.WireFixed64)
 | 
			
		||||
		n += 8
 | 
			
		||||
	case *GaugeResponse_StringValue:
 | 
			
		||||
		n += proto.SizeVarint(4<<3 | proto.WireBytes)
 | 
			
		||||
		n += proto.SizeVarint(uint64(len(x.StringValue)))
 | 
			
		||||
		n += len(x.StringValue)
 | 
			
		||||
	case nil:
 | 
			
		||||
	default:
 | 
			
		||||
		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
 | 
			
		||||
	}
 | 
			
		||||
	return n
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Request message containing the gauge name
 | 
			
		||||
type GaugeRequest struct {
 | 
			
		||||
	Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *GaugeRequest) Reset()                    { *m = GaugeRequest{} }
 | 
			
		||||
func (m *GaugeRequest) String() string            { return proto.CompactTextString(m) }
 | 
			
		||||
func (*GaugeRequest) ProtoMessage()               {}
 | 
			
		||||
func (*GaugeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
 | 
			
		||||
 | 
			
		||||
type EmptyMessage struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *EmptyMessage) Reset()                    { *m = EmptyMessage{} }
 | 
			
		||||
func (m *EmptyMessage) String() string            { return proto.CompactTextString(m) }
 | 
			
		||||
func (*EmptyMessage) ProtoMessage()               {}
 | 
			
		||||
func (*EmptyMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	proto.RegisterType((*GaugeResponse)(nil), "grpc.testing.GaugeResponse")
 | 
			
		||||
	proto.RegisterType((*GaugeRequest)(nil), "grpc.testing.GaugeRequest")
 | 
			
		||||
	proto.RegisterType((*EmptyMessage)(nil), "grpc.testing.EmptyMessage")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Reference imports to suppress errors if they are not otherwise used.
 | 
			
		||||
var _ context.Context
 | 
			
		||||
var _ grpc.ClientConn
 | 
			
		||||
 | 
			
		||||
// This is a compile-time assertion to ensure that this generated file
 | 
			
		||||
// is compatible with the grpc package it is being compiled against.
 | 
			
		||||
const _ = grpc.SupportPackageIsVersion1
 | 
			
		||||
 | 
			
		||||
// Client API for MetricsService service
 | 
			
		||||
 | 
			
		||||
type MetricsServiceClient interface {
 | 
			
		||||
	// Returns the values of all the gauges that are currently being maintained by
 | 
			
		||||
	// the service
 | 
			
		||||
	GetAllGauges(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (MetricsService_GetAllGaugesClient, error)
 | 
			
		||||
	// Returns the value of one gauge
 | 
			
		||||
	GetGauge(ctx context.Context, in *GaugeRequest, opts ...grpc.CallOption) (*GaugeResponse, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type metricsServiceClient struct {
 | 
			
		||||
	cc *grpc.ClientConn
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewMetricsServiceClient(cc *grpc.ClientConn) MetricsServiceClient {
 | 
			
		||||
	return &metricsServiceClient{cc}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *metricsServiceClient) GetAllGauges(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (MetricsService_GetAllGaugesClient, error) {
 | 
			
		||||
	stream, err := grpc.NewClientStream(ctx, &_MetricsService_serviceDesc.Streams[0], c.cc, "/grpc.testing.MetricsService/GetAllGauges", opts...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	x := &metricsServiceGetAllGaugesClient{stream}
 | 
			
		||||
	if err := x.ClientStream.SendMsg(in); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if err := x.ClientStream.CloseSend(); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return x, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MetricsService_GetAllGaugesClient interface {
 | 
			
		||||
	Recv() (*GaugeResponse, error)
 | 
			
		||||
	grpc.ClientStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type metricsServiceGetAllGaugesClient struct {
 | 
			
		||||
	grpc.ClientStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (x *metricsServiceGetAllGaugesClient) Recv() (*GaugeResponse, error) {
 | 
			
		||||
	m := new(GaugeResponse)
 | 
			
		||||
	if err := x.ClientStream.RecvMsg(m); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return m, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *metricsServiceClient) GetGauge(ctx context.Context, in *GaugeRequest, opts ...grpc.CallOption) (*GaugeResponse, error) {
 | 
			
		||||
	out := new(GaugeResponse)
 | 
			
		||||
	err := grpc.Invoke(ctx, "/grpc.testing.MetricsService/GetGauge", in, out, c.cc, opts...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return out, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Server API for MetricsService service
 | 
			
		||||
 | 
			
		||||
type MetricsServiceServer interface {
 | 
			
		||||
	// Returns the values of all the gauges that are currently being maintained by
 | 
			
		||||
	// the service
 | 
			
		||||
	GetAllGauges(*EmptyMessage, MetricsService_GetAllGaugesServer) error
 | 
			
		||||
	// Returns the value of one gauge
 | 
			
		||||
	GetGauge(context.Context, *GaugeRequest) (*GaugeResponse, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RegisterMetricsServiceServer(s *grpc.Server, srv MetricsServiceServer) {
 | 
			
		||||
	s.RegisterService(&_MetricsService_serviceDesc, srv)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func _MetricsService_GetAllGauges_Handler(srv interface{}, stream grpc.ServerStream) error {
 | 
			
		||||
	m := new(EmptyMessage)
 | 
			
		||||
	if err := stream.RecvMsg(m); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	return srv.(MetricsServiceServer).GetAllGauges(m, &metricsServiceGetAllGaugesServer{stream})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MetricsService_GetAllGaugesServer interface {
 | 
			
		||||
	Send(*GaugeResponse) error
 | 
			
		||||
	grpc.ServerStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type metricsServiceGetAllGaugesServer struct {
 | 
			
		||||
	grpc.ServerStream
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (x *metricsServiceGetAllGaugesServer) Send(m *GaugeResponse) error {
 | 
			
		||||
	return x.ServerStream.SendMsg(m)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func _MetricsService_GetGauge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 | 
			
		||||
	in := new(GaugeRequest)
 | 
			
		||||
	if err := dec(in); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	out, err := srv.(MetricsServiceServer).GetGauge(ctx, in)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return out, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _MetricsService_serviceDesc = grpc.ServiceDesc{
 | 
			
		||||
	ServiceName: "grpc.testing.MetricsService",
 | 
			
		||||
	HandlerType: (*MetricsServiceServer)(nil),
 | 
			
		||||
	Methods: []grpc.MethodDesc{
 | 
			
		||||
		{
 | 
			
		||||
			MethodName: "GetGauge",
 | 
			
		||||
			Handler:    _MetricsService_GetGauge_Handler,
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
	Streams: []grpc.StreamDesc{
 | 
			
		||||
		{
 | 
			
		||||
			StreamName:    "GetAllGauges",
 | 
			
		||||
			Handler:       _MetricsService_GetAllGauges_Handler,
 | 
			
		||||
			ServerStreams: true,
 | 
			
		||||
		},
 | 
			
		||||
	},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var fileDescriptor0 = []byte{
 | 
			
		||||
	// 253 bytes of a gzipped FileDescriptorProto
 | 
			
		||||
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x4d, 0x2d, 0x29,
 | 
			
		||||
	0xca, 0x4c, 0x2e, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6,
 | 
			
		||||
	0x2b, 0x49, 0x2d, 0x2e, 0xc9, 0xcc, 0x4b, 0x57, 0x9a, 0xce, 0xc8, 0xc5, 0xeb, 0x9e, 0x58, 0x9a,
 | 
			
		||||
	0x9e, 0x1a, 0x94, 0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x2a, 0x24, 0xc4, 0xc5, 0x92, 0x97, 0x98,
 | 
			
		||||
	0x9b, 0x2a, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x66, 0x0b, 0xc9, 0x73, 0x71, 0xe5, 0xe4,
 | 
			
		||||
	0xe7, 0xa5, 0xc7, 0x97, 0x25, 0xe6, 0x94, 0xa6, 0x4a, 0x30, 0x01, 0x65, 0x98, 0x3d, 0x18, 0x82,
 | 
			
		||||
	0x38, 0x41, 0x62, 0x61, 0x20, 0x21, 0x21, 0x65, 0x2e, 0x9e, 0x94, 0xfc, 0xd2, 0xa4, 0x9c, 0x54,
 | 
			
		||||
	0xa8, 0x12, 0x66, 0xa0, 0x12, 0x46, 0xa0, 0x12, 0x6e, 0x88, 0x28, 0x5c, 0x51, 0x31, 0xd0, 0x25,
 | 
			
		||||
	0x70, 0x73, 0x58, 0x40, 0x36, 0x80, 0x14, 0x41, 0x44, 0xc1, 0x8a, 0x9c, 0xd8, 0xb9, 0x58, 0xc1,
 | 
			
		||||
	0xb2, 0x4a, 0x4a, 0x5c, 0x3c, 0x50, 0x87, 0x15, 0x96, 0x02, 0x1d, 0x8b, 0xcd, 0x5d, 0x4a, 0x7c,
 | 
			
		||||
	0x5c, 0x3c, 0xae, 0xb9, 0x05, 0x25, 0x95, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x46, 0x0b,
 | 
			
		||||
	0x18, 0xb9, 0xf8, 0x7c, 0x21, 0xbe, 0x0d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0xf2, 0x04,
 | 
			
		||||
	0x1a, 0x93, 0x5a, 0xe2, 0x98, 0x93, 0x03, 0x36, 0xac, 0x58, 0x48, 0x4a, 0x0f, 0xd9, 0xff, 0x7a,
 | 
			
		||||
	0xc8, 0xda, 0xa5, 0xa4, 0x51, 0xe5, 0x50, 0xc2, 0xc5, 0x80, 0x51, 0xc8, 0x99, 0x8b, 0x03, 0x68,
 | 
			
		||||
	0x14, 0x58, 0x14, 0xdd, 0x18, 0x64, 0x97, 0xe2, 0x35, 0x26, 0x89, 0x0d, 0x1c, 0x0b, 0xc6, 0x80,
 | 
			
		||||
	0x00, 0x00, 0x00, 0xff, 0xff, 0x5e, 0x7d, 0xb2, 0xc9, 0x96, 0x01, 0x00, 0x00,
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,64 @@
 | 
			
		|||
// Copyright 2015-2016, Google Inc.
 | 
			
		||||
// All rights reserved.
 | 
			
		||||
//
 | 
			
		||||
// Redistribution and use in source and binary forms, with or without
 | 
			
		||||
// modification, are permitted provided that the following conditions are
 | 
			
		||||
// met:
 | 
			
		||||
//
 | 
			
		||||
//     * Redistributions of source code must retain the above copyright
 | 
			
		||||
// notice, this list of conditions and the following disclaimer.
 | 
			
		||||
//     * Redistributions in binary form must reproduce the above
 | 
			
		||||
// copyright notice, this list of conditions and the following disclaimer
 | 
			
		||||
// in the documentation and/or other materials provided with the
 | 
			
		||||
// distribution.
 | 
			
		||||
//     * Neither the name of Google Inc. nor the names of its
 | 
			
		||||
// contributors may be used to endorse or promote products derived from
 | 
			
		||||
// this software without specific prior written permission.
 | 
			
		||||
//
 | 
			
		||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 | 
			
		||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 | 
			
		||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 | 
			
		||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 | 
			
		||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
			
		||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 | 
			
		||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 | 
			
		||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 | 
			
		||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 | 
			
		||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
			
		||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
 | 
			
		||||
// Contains the definitions for a metrics service and the type of metrics
 | 
			
		||||
// exposed by the service.
 | 
			
		||||
//
 | 
			
		||||
// Currently, 'Gauge' (i.e a metric that represents the measured value of
 | 
			
		||||
// something at an instant of time) is the only metric type supported by the
 | 
			
		||||
// service.
 | 
			
		||||
syntax = "proto3";
 | 
			
		||||
 | 
			
		||||
package grpc.testing;
 | 
			
		||||
 | 
			
		||||
// Reponse message containing the gauge name and value
 | 
			
		||||
message GaugeResponse {
 | 
			
		||||
  string name = 1;
 | 
			
		||||
  oneof value {
 | 
			
		||||
    int64 long_value = 2;
 | 
			
		||||
    double double_value = 3;
 | 
			
		||||
    string string_value = 4;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Request message containing the gauge name
 | 
			
		||||
message GaugeRequest {
 | 
			
		||||
  string name = 1;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
message EmptyMessage {}
 | 
			
		||||
 | 
			
		||||
service MetricsService {
 | 
			
		||||
  // Returns the values of all the gauges that are currently being maintained by
 | 
			
		||||
  // the service
 | 
			
		||||
  rpc GetAllGauges(EmptyMessage) returns (stream GaugeResponse);
 | 
			
		||||
 | 
			
		||||
  // Returns the value of one gauge
 | 
			
		||||
  rpc GetGauge(GaugeRequest) returns (GaugeResponse);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,97 @@
 | 
			
		|||
/*
 | 
			
		||||
 *
 | 
			
		||||
 * Copyright 2016, Google Inc.
 | 
			
		||||
 * All rights reserved.
 | 
			
		||||
 *
 | 
			
		||||
 * Redistribution and use in source and binary forms, with or without
 | 
			
		||||
 * modification, are permitted provided that the following conditions are
 | 
			
		||||
 * met:
 | 
			
		||||
 *
 | 
			
		||||
 *     * Redistributions of source code must retain the above copyright
 | 
			
		||||
 * notice, this list of conditions and the following disclaimer.
 | 
			
		||||
 *     * Redistributions in binary form must reproduce the above
 | 
			
		||||
 * copyright notice, this list of conditions and the following disclaimer
 | 
			
		||||
 * in the documentation and/or other materials provided with the
 | 
			
		||||
 * distribution.
 | 
			
		||||
 *     * Neither the name of Google Inc. nor the names of its
 | 
			
		||||
 * contributors may be used to endorse or promote products derived from
 | 
			
		||||
 * this software without specific prior written permission.
 | 
			
		||||
 *
 | 
			
		||||
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 | 
			
		||||
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 | 
			
		||||
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 | 
			
		||||
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 | 
			
		||||
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
			
		||||
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 | 
			
		||||
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 | 
			
		||||
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 | 
			
		||||
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 | 
			
		||||
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
			
		||||
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"flag"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
 | 
			
		||||
	"golang.org/x/net/context"
 | 
			
		||||
	"google.golang.org/grpc"
 | 
			
		||||
	"google.golang.org/grpc/grpclog"
 | 
			
		||||
	metricspb "google.golang.org/grpc/stress/grpc_testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	metricsServerAddress = flag.String("metrics_server_address", "", "The metrics server addresses in the fomrat <hostname>:<port>")
 | 
			
		||||
	totalOnly            = flag.Bool("total_only", false, "If true, this prints only the total value of all gauges")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func printMetrics(client metricspb.MetricsServiceClient, totalOnly bool) {
 | 
			
		||||
	stream, err := client.GetAllGauges(context.Background(), &metricspb.EmptyMessage{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		grpclog.Fatalf("failed to call GetAllGuages: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var (
 | 
			
		||||
		overallQPS int64
 | 
			
		||||
		rpcStatus  error
 | 
			
		||||
	)
 | 
			
		||||
	for {
 | 
			
		||||
		gaugeResponse, err := stream.Recv()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			rpcStatus = err
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if _, ok := gaugeResponse.GetValue().(*metricspb.GaugeResponse_LongValue); !ok {
 | 
			
		||||
			panic(fmt.Sprintf("gauge %s is not a long value", gaugeResponse.Name))
 | 
			
		||||
		}
 | 
			
		||||
		v := gaugeResponse.GetLongValue()
 | 
			
		||||
		if !totalOnly {
 | 
			
		||||
			grpclog.Printf("%s: %d", gaugeResponse.Name, v)
 | 
			
		||||
		}
 | 
			
		||||
		overallQPS += v
 | 
			
		||||
	}
 | 
			
		||||
	if rpcStatus != io.EOF {
 | 
			
		||||
		grpclog.Fatalf("failed to finish server streaming: %v", rpcStatus)
 | 
			
		||||
	}
 | 
			
		||||
	grpclog.Printf("overall qps: %d", overallQPS)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
	if *metricsServerAddress == "" {
 | 
			
		||||
		grpclog.Fatalf("Metrics server address is empty.")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	conn, err := grpc.Dial(*metricsServerAddress, grpc.WithInsecure())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		grpclog.Fatalf("cannot connect to metrics server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer conn.Close()
 | 
			
		||||
 | 
			
		||||
	c := metricspb.NewMetricsServiceClient(conn)
 | 
			
		||||
	printMetrics(c, *totalOnly)
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
		Reference in New Issue