orca: minor cleanups (#6239)

This commit is contained in:
Doug Fawley 2023-05-01 17:03:11 -07:00 committed by GitHub
parent 21a339ce4a
commit 713bd04130
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 54 deletions

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/orca/internal"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
@ -58,7 +59,6 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) {
desc string
injectMetrics bool
wantProto *v3orcapb.OrcaLoadReport
wantErr error
}{
{
desc: "with custom backend metrics",
@ -73,7 +73,6 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) {
{
desc: "with no custom backend metrics",
injectMetrics: false,
wantErr: orca.ErrLoadReportMissing,
},
}
@ -146,9 +145,9 @@ func (s) TestE2ECallMetricsUnary(t *testing.T) {
t.Fatalf("EmptyCall failed: %v", err)
}
gotProto, err := orca.ToLoadReport(trailer)
if test.wantErr != nil && !errors.Is(err, test.wantErr) {
t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing)
gotProto, err := internal.ToLoadReport(trailer)
if err != nil {
t.Fatalf("When retrieving load report, got error: %v, want: <nil>", err)
}
if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) {
t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto))
@ -165,7 +164,6 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) {
desc string
injectMetrics bool
wantProto *v3orcapb.OrcaLoadReport
wantErr error
}{
{
desc: "with custom backend metrics",
@ -180,7 +178,6 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) {
{
desc: "with no custom backend metrics",
injectMetrics: false,
wantErr: orca.ErrLoadReportMissing,
},
}
@ -288,9 +285,9 @@ func (s) TestE2ECallMetricsStreaming(t *testing.T) {
}
}
gotProto, err := orca.ToLoadReport(stream.Trailer())
if test.wantErr != nil && !errors.Is(err, test.wantErr) {
t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing)
gotProto, err := internal.ToLoadReport(stream.Trailer())
if err != nil {
t.Fatalf("When retrieving load report, got error: %v, want: <nil>", err)
}
if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) {
t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto))

View File

@ -20,7 +20,16 @@
// avoid polluting the godoc of the top-level orca package.
package internal
import ibackoff "google.golang.org/grpc/internal/backoff"
import (
"errors"
"fmt"
ibackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
// AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval
// configured via ServiceOptions, to a minimum of 30s.
@ -32,3 +41,31 @@ var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions)
//
// For testing purposes only.
var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff
// TrailerMetadataKey is the key in which the per-call backend metrics are
// transmitted.
const TrailerMetadataKey = "endpoint-load-metrics-bin"
// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message
// from md and returns the corresponding struct. The load report is expected to
// be stored as the value for key "endpoint-load-metrics-bin".
//
// If no load report was found in the provided metadata, if multiple load
// reports are found, or if the load report found cannot be parsed, an error is
// returned.
//
// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15)
func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
vs := md.Get(TrailerMetadataKey)
if len(vs) == 0 {
return nil, nil
}
if len(vs) != 1 {
return nil, errors.New("multiple orca load reports found in provided metadata")
}
ret := new(v3orcapb.OrcaLoadReport)
if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil {
return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err)
}
return ret, nil
}

View File

@ -29,21 +29,19 @@ package orca
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
igrpc "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancerload"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca/internal"
"google.golang.org/protobuf/proto"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
var (
logger = grpclog.Component("orca-backend-metrics")
joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
joinServerOptions = igrpc.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
)
const trailerMetadataKey = "endpoint-load-metrics-bin"
@ -144,26 +142,6 @@ func (w *wrappedStream) Context() context.Context {
// ErrLoadReportMissing indicates no ORCA load report was found in trailers.
var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata")
// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message
// from md and returns the corresponding struct. The load report is expected to
// be stored as the value for key "endpoint-load-metrics-bin".
//
// If no load report was found in the provided metadata, ErrLoadReportMissing is
// returned.
//
// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15)
func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
vs := md.Get(trailerMetadataKey)
if len(vs) == 0 {
return nil, ErrLoadReportMissing
}
ret := new(v3orcapb.OrcaLoadReport)
if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil {
return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err)
}
return ret, nil
}
// loadParser implements the Parser interface defined in `internal/balancerload`
// package. This interface is used by the client stream to parse load reports
// sent by the server in trailer metadata. The parsed loads are then sent to
@ -174,9 +152,12 @@ func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) {
type loadParser struct{}
func (loadParser) Parse(md metadata.MD) interface{} {
lr, err := ToLoadReport(md)
lr, err := internal.ToLoadReport(md)
if err != nil {
logger.Errorf("Parse(%v) failed: %v", err)
logger.Infof("Parse failed: %v", err)
}
if lr == nil && logger.V(2) {
logger.Infof("Missing ORCA load report data")
}
return lr
}

View File

@ -25,12 +25,18 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/orca/internal"
v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
func TestToLoadReport(t *testing.T) {
goodReport := &v3orcapb.OrcaLoadReport{
CpuUtilization: 1.0,
MemUtilization: 50.0,
RequestCost: map[string]float64{"queryCost": 25.0},
Utilization: map[string]float64{"queueSize": 75.0},
}
tests := []struct {
name string
md metadata.MD
@ -40,7 +46,7 @@ func TestToLoadReport(t *testing.T) {
{
name: "no load report in metadata",
md: metadata.MD{},
wantErr: true,
wantErr: false,
},
{
name: "badly marshaled load report",
@ -49,29 +55,27 @@ func TestToLoadReport(t *testing.T) {
}(),
wantErr: true,
},
{
name: "multiple load reports",
md: func() metadata.MD {
b, _ := proto.Marshal(goodReport)
return metadata.Pairs("endpoint-load-metrics-bin", string(b), "endpoint-load-metrics-bin", string(b))
}(),
wantErr: true,
},
{
name: "good load report",
md: func() metadata.MD {
b, _ := proto.Marshal(&v3orcapb.OrcaLoadReport{
CpuUtilization: 1.0,
MemUtilization: 50.0,
RequestCost: map[string]float64{"queryCost": 25.0},
Utilization: map[string]float64{"queueSize": 75.0},
})
b, _ := proto.Marshal(goodReport)
return metadata.Pairs("endpoint-load-metrics-bin", string(b))
}(),
want: &v3orcapb.OrcaLoadReport{
CpuUtilization: 1.0,
MemUtilization: 50.0,
RequestCost: map[string]float64{"queryCost": 25.0},
Utilization: map[string]float64{"queueSize": 75.0},
},
want: goodReport,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := orca.ToLoadReport(test.md)
got, err := internal.ToLoadReport(test.md)
if (err != nil) != test.wantErr {
t.Fatalf("orca.ToLoadReport(%v) = %v, wantErr: %v", test.md, err, test.wantErr)
}

View File

@ -120,7 +120,7 @@ func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReport
}
dur := req.GetReportInterval().AsDuration()
if dur < s.minReportingInterval {
logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using default: %s", dur, s.minReportingInterval)
logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval)
return s.minReportingInterval
}
return dur