[chore][configgrpc] Fix receiver variable names (#13628)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Changes receiver variable names to be abbreviations for the current struct names.
This commit is contained in:
parent
cc556e7df5
commit
ef18508a09
|
|
@ -217,10 +217,10 @@ func NewDefaultServerConfig() ServerConfig {
|
|||
}
|
||||
}
|
||||
|
||||
func (gcs *ClientConfig) Validate() error {
|
||||
if gcs.BalancerName != "" {
|
||||
if balancer.Get(gcs.BalancerName) == nil {
|
||||
return fmt.Errorf("invalid balancer_name: %s", gcs.BalancerName)
|
||||
func (cc *ClientConfig) Validate() error {
|
||||
if cc.BalancerName != "" {
|
||||
if balancer.Get(cc.BalancerName) == nil {
|
||||
return fmt.Errorf("invalid balancer_name: %s", cc.BalancerName)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -228,23 +228,23 @@ func (gcs *ClientConfig) Validate() error {
|
|||
}
|
||||
|
||||
// sanitizedEndpoint strips the prefix of either http:// or https:// from configgrpc.ClientConfig.Endpoint.
|
||||
func (gcs *ClientConfig) sanitizedEndpoint() string {
|
||||
func (cc *ClientConfig) sanitizedEndpoint() string {
|
||||
switch {
|
||||
case gcs.isSchemeHTTP():
|
||||
return strings.TrimPrefix(gcs.Endpoint, "http://")
|
||||
case gcs.isSchemeHTTPS():
|
||||
return strings.TrimPrefix(gcs.Endpoint, "https://")
|
||||
case cc.isSchemeHTTP():
|
||||
return strings.TrimPrefix(cc.Endpoint, "http://")
|
||||
case cc.isSchemeHTTPS():
|
||||
return strings.TrimPrefix(cc.Endpoint, "https://")
|
||||
default:
|
||||
return gcs.Endpoint
|
||||
return cc.Endpoint
|
||||
}
|
||||
}
|
||||
|
||||
func (gcs *ClientConfig) isSchemeHTTP() bool {
|
||||
return strings.HasPrefix(gcs.Endpoint, "http://")
|
||||
func (cc *ClientConfig) isSchemeHTTP() bool {
|
||||
return strings.HasPrefix(cc.Endpoint, "http://")
|
||||
}
|
||||
|
||||
func (gcs *ClientConfig) isSchemeHTTPS() bool {
|
||||
return strings.HasPrefix(gcs.Endpoint, "https://")
|
||||
func (cc *ClientConfig) isSchemeHTTPS() bool {
|
||||
return strings.HasPrefix(cc.Endpoint, "https://")
|
||||
}
|
||||
|
||||
// ToClientConnOption is a sealed interface wrapping options for [ClientConfig.ToClientConn].
|
||||
|
|
@ -266,24 +266,24 @@ func (grpcDialOptionWrapper) isToClientConnOption() {}
|
|||
// a non-blocking dial (the function won't wait for connections to be
|
||||
// established, and connecting happens in the background). To make it a blocking
|
||||
// dial, use the WithGrpcDialOption(grpc.WithBlock()) option.
|
||||
func (gcs *ClientConfig) ToClientConn(
|
||||
func (cc *ClientConfig) ToClientConn(
|
||||
ctx context.Context,
|
||||
host component.Host,
|
||||
settings component.TelemetrySettings,
|
||||
extraOpts ...ToClientConnOption,
|
||||
) (*grpc.ClientConn, error) {
|
||||
grpcOpts, err := gcs.getGrpcDialOptions(ctx, host, settings, extraOpts)
|
||||
grpcOpts, err := cc.getGrpcDialOptions(ctx, host, settings, extraOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//nolint:staticcheck // SA1019 see https://github.com/open-telemetry/opentelemetry-collector/pull/11575
|
||||
return grpc.DialContext(ctx, gcs.sanitizedEndpoint(), grpcOpts...)
|
||||
return grpc.DialContext(ctx, cc.sanitizedEndpoint(), grpcOpts...)
|
||||
}
|
||||
|
||||
func (gcs *ClientConfig) addHeadersIfAbsent(ctx context.Context) context.Context {
|
||||
kv := make([]string, 0, 2*len(gcs.Headers))
|
||||
func (cc *ClientConfig) addHeadersIfAbsent(ctx context.Context) context.Context {
|
||||
kv := make([]string, 0, 2*len(cc.Headers))
|
||||
existingMd, _ := metadata.FromOutgoingContext(ctx)
|
||||
for k, v := range gcs.Headers {
|
||||
for k, v := range cc.Headers {
|
||||
if len(existingMd.Get(k)) == 0 {
|
||||
kv = append(kv, k, string(v))
|
||||
}
|
||||
|
|
@ -291,43 +291,43 @@ func (gcs *ClientConfig) addHeadersIfAbsent(ctx context.Context) context.Context
|
|||
return metadata.AppendToOutgoingContext(ctx, kv...)
|
||||
}
|
||||
|
||||
func (gcs *ClientConfig) getGrpcDialOptions(
|
||||
func (cc *ClientConfig) getGrpcDialOptions(
|
||||
ctx context.Context,
|
||||
host component.Host,
|
||||
settings component.TelemetrySettings,
|
||||
extraOpts []ToClientConnOption,
|
||||
) ([]grpc.DialOption, error) {
|
||||
var opts []grpc.DialOption
|
||||
if gcs.Compression.IsCompressed() {
|
||||
cp, err := getGRPCCompressionName(gcs.Compression)
|
||||
if cc.Compression.IsCompressed() {
|
||||
cp, err := getGRPCCompressionName(cc.Compression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(cp)))
|
||||
}
|
||||
|
||||
tlsCfg, err := gcs.TLS.LoadTLSConfig(ctx)
|
||||
tlsCfg, err := cc.TLS.LoadTLSConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cred := insecure.NewCredentials()
|
||||
if tlsCfg != nil {
|
||||
cred = credentials.NewTLS(tlsCfg)
|
||||
} else if gcs.isSchemeHTTPS() {
|
||||
} else if cc.isSchemeHTTPS() {
|
||||
cred = credentials.NewTLS(&tls.Config{})
|
||||
}
|
||||
opts = append(opts, grpc.WithTransportCredentials(cred))
|
||||
|
||||
if gcs.ReadBufferSize > 0 {
|
||||
opts = append(opts, grpc.WithReadBufferSize(gcs.ReadBufferSize))
|
||||
if cc.ReadBufferSize > 0 {
|
||||
opts = append(opts, grpc.WithReadBufferSize(cc.ReadBufferSize))
|
||||
}
|
||||
|
||||
if gcs.WriteBufferSize > 0 {
|
||||
opts = append(opts, grpc.WithWriteBufferSize(gcs.WriteBufferSize))
|
||||
if cc.WriteBufferSize > 0 {
|
||||
opts = append(opts, grpc.WithWriteBufferSize(cc.WriteBufferSize))
|
||||
}
|
||||
|
||||
if gcs.Keepalive.HasValue() {
|
||||
keepaliveConfig := gcs.Keepalive.Get()
|
||||
if cc.Keepalive.HasValue() {
|
||||
keepaliveConfig := cc.Keepalive.Get()
|
||||
keepAliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: keepaliveConfig.Time,
|
||||
Timeout: keepaliveConfig.Timeout,
|
||||
|
|
@ -336,12 +336,12 @@ func (gcs *ClientConfig) getGrpcDialOptions(
|
|||
opts = append(opts, keepAliveOption)
|
||||
}
|
||||
|
||||
if gcs.Auth.HasValue() {
|
||||
if cc.Auth.HasValue() {
|
||||
if host.GetExtensions() == nil {
|
||||
return nil, errors.New("no extensions configuration available")
|
||||
}
|
||||
|
||||
grpcAuthenticator, cerr := gcs.Auth.Get().GetGRPCClientAuthenticator(ctx, host.GetExtensions())
|
||||
grpcAuthenticator, cerr := cc.Auth.Get().GetGRPCClientAuthenticator(ctx, host.GetExtensions())
|
||||
if cerr != nil {
|
||||
return nil, cerr
|
||||
}
|
||||
|
|
@ -353,12 +353,12 @@ func (gcs *ClientConfig) getGrpcDialOptions(
|
|||
opts = append(opts, grpc.WithPerRPCCredentials(perRPCCredentials))
|
||||
}
|
||||
|
||||
if gcs.BalancerName != "" {
|
||||
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":%q}`, gcs.BalancerName)))
|
||||
if cc.BalancerName != "" {
|
||||
opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":%q}`, cc.BalancerName)))
|
||||
}
|
||||
|
||||
if gcs.Authority != "" {
|
||||
opts = append(opts, grpc.WithAuthority(gcs.Authority))
|
||||
if cc.Authority != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cc.Authority))
|
||||
}
|
||||
|
||||
otelOpts := []otelgrpc.Option{
|
||||
|
|
@ -370,19 +370,19 @@ func (gcs *ClientConfig) getGrpcDialOptions(
|
|||
// Enable OpenTelemetry observability plugin.
|
||||
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...)))
|
||||
|
||||
if len(gcs.Headers) > 0 {
|
||||
if len(cc.Headers) > 0 {
|
||||
opts = append(opts,
|
||||
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return invoker(gcs.addHeadersIfAbsent(ctx), method, req, reply, cc, opts...)
|
||||
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, gcc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
return invoker(cc.addHeadersIfAbsent(ctx), method, req, reply, gcc, opts...)
|
||||
}),
|
||||
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
return streamer(gcs.addHeadersIfAbsent(ctx), desc, cc, method, opts...)
|
||||
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, gcc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
|
||||
return streamer(cc.addHeadersIfAbsent(ctx), desc, gcc, method, opts...)
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
|
||||
for _, middleware := range gcs.Middlewares {
|
||||
for _, middleware := range cc.Middlewares {
|
||||
middlewareOptions, err := middleware.GetGRPCClientOptions(ctx, host.GetExtensions())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get gRPC client options from middleware: %w", err)
|
||||
|
|
@ -399,17 +399,17 @@ func (gcs *ClientConfig) getGrpcDialOptions(
|
|||
return opts, nil
|
||||
}
|
||||
|
||||
func (gss *ServerConfig) Validate() error {
|
||||
if gss.MaxRecvMsgSizeMiB*1024*1024 < 0 {
|
||||
return fmt.Errorf("invalid max_recv_msg_size_mib value, must be between 1 and %d: %d", math.MaxInt/1024/1024, gss.MaxRecvMsgSizeMiB)
|
||||
func (sc *ServerConfig) Validate() error {
|
||||
if sc.MaxRecvMsgSizeMiB*1024*1024 < 0 {
|
||||
return fmt.Errorf("invalid max_recv_msg_size_mib value, must be between 1 and %d: %d", math.MaxInt/1024/1024, sc.MaxRecvMsgSizeMiB)
|
||||
}
|
||||
|
||||
if gss.ReadBufferSize < 0 {
|
||||
return fmt.Errorf("invalid read_buffer_size value: %d", gss.ReadBufferSize)
|
||||
if sc.ReadBufferSize < 0 {
|
||||
return fmt.Errorf("invalid read_buffer_size value: %d", sc.ReadBufferSize)
|
||||
}
|
||||
|
||||
if gss.WriteBufferSize < 0 {
|
||||
return fmt.Errorf("invalid write_buffer_size value: %d", gss.WriteBufferSize)
|
||||
if sc.WriteBufferSize < 0 {
|
||||
return fmt.Errorf("invalid write_buffer_size value: %d", sc.WriteBufferSize)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -431,20 +431,20 @@ func WithGrpcServerOption(opt grpc.ServerOption) ToServerOption {
|
|||
func (grpcServerOptionWrapper) isToServerOption() {}
|
||||
|
||||
// ToServer returns a [grpc.Server] for the configuration.
|
||||
func (gss *ServerConfig) ToServer(
|
||||
func (sc *ServerConfig) ToServer(
|
||||
ctx context.Context,
|
||||
host component.Host,
|
||||
settings component.TelemetrySettings,
|
||||
extraOpts ...ToServerOption,
|
||||
) (*grpc.Server, error) {
|
||||
grpcOpts, err := gss.getGrpcServerOptions(ctx, host, settings, extraOpts)
|
||||
grpcOpts, err := sc.getGrpcServerOptions(ctx, host, settings, extraOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return grpc.NewServer(grpcOpts...), nil
|
||||
}
|
||||
|
||||
func (gss *ServerConfig) getGrpcServerOptions(
|
||||
func (sc *ServerConfig) getGrpcServerOptions(
|
||||
ctx context.Context,
|
||||
host component.Host,
|
||||
settings component.TelemetrySettings,
|
||||
|
|
@ -452,36 +452,36 @@ func (gss *ServerConfig) getGrpcServerOptions(
|
|||
) ([]grpc.ServerOption, error) {
|
||||
var opts []grpc.ServerOption
|
||||
|
||||
if gss.TLS.HasValue() {
|
||||
tlsCfg, err := gss.TLS.Get().LoadTLSConfig(ctx)
|
||||
if sc.TLS.HasValue() {
|
||||
tlsCfg, err := sc.TLS.Get().LoadTLSConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts = append(opts, grpc.Creds(credentials.NewTLS(tlsCfg)))
|
||||
}
|
||||
|
||||
if gss.MaxRecvMsgSizeMiB > 0 && gss.MaxRecvMsgSizeMiB*1024*1024 > 0 {
|
||||
opts = append(opts, grpc.MaxRecvMsgSize(gss.MaxRecvMsgSizeMiB*1024*1024))
|
||||
if sc.MaxRecvMsgSizeMiB > 0 && sc.MaxRecvMsgSizeMiB*1024*1024 > 0 {
|
||||
opts = append(opts, grpc.MaxRecvMsgSize(sc.MaxRecvMsgSizeMiB*1024*1024))
|
||||
}
|
||||
|
||||
if gss.MaxConcurrentStreams > 0 {
|
||||
opts = append(opts, grpc.MaxConcurrentStreams(gss.MaxConcurrentStreams))
|
||||
if sc.MaxConcurrentStreams > 0 {
|
||||
opts = append(opts, grpc.MaxConcurrentStreams(sc.MaxConcurrentStreams))
|
||||
}
|
||||
|
||||
if gss.ReadBufferSize > 0 {
|
||||
opts = append(opts, grpc.ReadBufferSize(gss.ReadBufferSize))
|
||||
if sc.ReadBufferSize > 0 {
|
||||
opts = append(opts, grpc.ReadBufferSize(sc.ReadBufferSize))
|
||||
}
|
||||
|
||||
if gss.WriteBufferSize > 0 {
|
||||
opts = append(opts, grpc.WriteBufferSize(gss.WriteBufferSize))
|
||||
if sc.WriteBufferSize > 0 {
|
||||
opts = append(opts, grpc.WriteBufferSize(sc.WriteBufferSize))
|
||||
}
|
||||
|
||||
// The default values referenced in the GRPC docs are set within the server, so this code doesn't need
|
||||
// to apply them over zero/nil values before passing these as grpc.ServerOptions.
|
||||
// The following shows the server code for applying default grpc.ServerOptions.
|
||||
// https://github.com/grpc/grpc-go/blob/120728e1f775e40a2a764341939b78d666b08260/internal/transport/http2_server.go#L184-L200
|
||||
if gss.Keepalive.HasValue() {
|
||||
keepaliveConfig := gss.Keepalive.Get()
|
||||
if sc.Keepalive.HasValue() {
|
||||
keepaliveConfig := sc.Keepalive.Get()
|
||||
if keepaliveConfig.ServerParameters.HasValue() {
|
||||
svrParams := keepaliveConfig.ServerParameters.Get()
|
||||
opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{
|
||||
|
|
@ -508,8 +508,8 @@ func (gss *ServerConfig) getGrpcServerOptions(
|
|||
var uInterceptors []grpc.UnaryServerInterceptor
|
||||
var sInterceptors []grpc.StreamServerInterceptor
|
||||
|
||||
if gss.Auth.HasValue() {
|
||||
authenticator, err := gss.Auth.Get().GetServerAuthenticator(ctx, host.GetExtensions())
|
||||
if sc.Auth.HasValue() {
|
||||
authenticator, err := sc.Auth.Get().GetServerAuthenticator(ctx, host.GetExtensions())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -526,13 +526,13 @@ func (gss *ServerConfig) getGrpcServerOptions(
|
|||
|
||||
// Enable OpenTelemetry observability plugin.
|
||||
|
||||
uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata))
|
||||
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata)) //nolint:contextcheck // context already handled
|
||||
uInterceptors = append(uInterceptors, enhanceWithClientInformation(sc.IncludeMetadata))
|
||||
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(sc.IncludeMetadata)) //nolint:contextcheck // context already handled
|
||||
|
||||
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))
|
||||
|
||||
// Apply middleware options. Note: OpenTelemetry could be registered as an extension.
|
||||
for _, middleware := range gss.Middlewares {
|
||||
for _, middleware := range sc.Middlewares {
|
||||
middlewareOptions, err := middleware.GetGRPCServerOptions(ctx, host.GetExtensions())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get gRPC server options from middleware: %w", err)
|
||||
|
|
|
|||
|
|
@ -114,12 +114,12 @@ var (
|
|||
)
|
||||
|
||||
func TestDefaultGrpcClientSettings(t *testing.T) {
|
||||
gcs := &ClientConfig{
|
||||
cc := &ClientConfig{
|
||||
TLS: configtls.ClientConfig{
|
||||
Insecure: true,
|
||||
},
|
||||
}
|
||||
opts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToClientConnOption{})
|
||||
opts, err := cc.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToClientConnOption{})
|
||||
require.NoError(t, err)
|
||||
/* Expecting 2 DialOptions:
|
||||
* - WithTransportCredentials (TLS)
|
||||
|
|
@ -129,13 +129,13 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGrpcClientExtraOption(t *testing.T) {
|
||||
gcs := &ClientConfig{
|
||||
cc := &ClientConfig{
|
||||
TLS: configtls.ClientConfig{
|
||||
Insecure: true,
|
||||
},
|
||||
}
|
||||
extraOpt := grpc.WithUserAgent("test-agent")
|
||||
opts, err := gcs.getGrpcDialOptions(
|
||||
opts, err := cc.getGrpcDialOptions(
|
||||
context.Background(),
|
||||
componenttest.NewNopHost(),
|
||||
componenttest.NewNopTelemetrySettings(),
|
||||
|
|
@ -549,13 +549,13 @@ func TestGRPCClientSettingsError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUseSecure(t *testing.T) {
|
||||
gcs := &ClientConfig{
|
||||
cc := &ClientConfig{
|
||||
Headers: nil,
|
||||
Endpoint: "",
|
||||
Compression: "",
|
||||
TLS: configtls.ClientConfig{},
|
||||
}
|
||||
dialOpts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToClientConnOption{})
|
||||
dialOpts, err := cc.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToClientConnOption{})
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, dialOpts, 2)
|
||||
}
|
||||
|
|
@ -1180,13 +1180,13 @@ func (gts *grpcTraceServer) startTestServerWithHostError(_ *testing.T, gss Serve
|
|||
}
|
||||
|
||||
// sendTestRequest issues a ptraceotlp export request and captures metadata.
|
||||
func sendTestRequest(t *testing.T, gcs ClientConfig) (ptraceotlp.ExportResponse, error) {
|
||||
return sendTestRequestWithHost(t, gcs, componenttest.NewNopHost())
|
||||
func sendTestRequest(t *testing.T, cc ClientConfig) (ptraceotlp.ExportResponse, error) {
|
||||
return sendTestRequestWithHost(t, cc, componenttest.NewNopHost())
|
||||
}
|
||||
|
||||
// sendTestRequestWithHost is similar to sendTestRequest but allows specifying the host
|
||||
func sendTestRequestWithHost(t *testing.T, gcs ClientConfig, host component.Host) (ptraceotlp.ExportResponse, error) {
|
||||
grpcClientConn, errClient := gcs.ToClientConn(context.Background(), host, componenttest.NewNopTelemetrySettings())
|
||||
func sendTestRequestWithHost(t *testing.T, cc ClientConfig, host component.Host) (ptraceotlp.ExportResponse, error) {
|
||||
grpcClientConn, errClient := cc.ToClientConn(context.Background(), host, componenttest.NewNopTelemetrySettings())
|
||||
require.NoError(t, errClient)
|
||||
defer func() { assert.NoError(t, grpcClientConn.Close()) }()
|
||||
c := ptraceotlp.NewGRPCClient(grpcClientConn)
|
||||
|
|
|
|||
Loading…
Reference in New Issue