add metrics and traces for egress dials

Kubernetes-commit: fbb1fb8902c06cbcce47a025ce22fe260b27a697
This commit is contained in:
Chao Xu 2020-02-25 14:23:24 -08:00 committed by Kubernetes Publisher
parent b214a49983
commit 079efffdb4
2 changed files with 140 additions and 8 deletions

View File

@ -22,16 +22,20 @@ import (
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"fmt" "fmt"
"google.golang.org/grpc"
"io/ioutil" "io/ioutil"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
"k8s.io/klog"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
"strings" "strings"
"time"
"google.golang.org/grpc"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apiserver/pkg/apis/apiserver"
egressmetrics "k8s.io/apiserver/pkg/server/egressselector/metrics"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
) )
var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext var directDialer utilnet.DialFunc = http.DefaultTransport.(*http.Transport).DialContext
@ -152,7 +156,10 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
certPool = nil certPool = nil
} }
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
trace := utiltrace.New("Proxy via HTTP Connect over TCP", utiltrace.Field{Key: "address", Value: addr})
defer trace.LogIfLong(500 * time.Millisecond)
klog.V(4).Infof("Sending request to %q.", addr) klog.V(4).Infof("Sending request to %q.", addr)
start := time.Now()
proxyConn, err := tls.Dial("tcp", proxyAddress, proxyConn, err := tls.Dial("tcp", proxyAddress,
&tls.Config{ &tls.Config{
Certificates: []tls.Certificate{clientCerts}, Certificates: []tls.Certificate{clientCerts},
@ -160,27 +167,46 @@ func createConnectTCPDialer(tcpTransport *apiserver.TCPTransport) (utilnet.DialF
}, },
) )
if err != nil { if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageDial)
return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err) return nil, fmt.Errorf("dialing proxy %q failed: %v", proxyAddress, err)
} }
return tunnelHTTPConnect(proxyConn, proxyAddress, addr) ret, err := tunnelHTTPConnect(proxyConn, proxyAddress, addr)
if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP, egressmetrics.StageProxy)
return nil, err
}
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportTCP)
return ret, nil
} }
return contextDialer, nil return contextDialer, nil
} }
func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) { func createConnectUDSDialer(udsConfig *apiserver.UDSTransport) (utilnet.DialFunc, error) {
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
trace := utiltrace.New("Proxy via HTTP Connect over UDS", utiltrace.Field{Key: "address", Value: addr})
defer trace.LogIfLong(500 * time.Millisecond)
start := time.Now()
proxyConn, err := net.Dial("unix", udsConfig.UDSName) proxyConn, err := net.Dial("unix", udsConfig.UDSName)
if err != nil { if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageDial)
return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err) return nil, fmt.Errorf("dialing proxy %q failed: %v", udsConfig.UDSName, err)
} }
return tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr) ret, err := tunnelHTTPConnect(proxyConn, udsConfig.UDSName, addr)
if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS, egressmetrics.StageProxy)
return nil, err
}
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolHTTPConnect, egressmetrics.TransportUDS)
return ret, nil
} }
return contextDialer, nil return contextDialer, nil
} }
func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) { func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) { contextDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
trace := utiltrace.New("Proxy via GRPC over UDS", utiltrace.Field{Key: "address", Value: addr})
defer trace.LogIfLong(500 * time.Millisecond)
start := time.Now()
dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { dialOption := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
c, err := net.Dial("unix", udsName) c, err := net.Dial("unix", udsName)
if err != nil { if err != nil {
@ -191,13 +217,16 @@ func createGRPCUDSDialer(udsName string) (utilnet.DialFunc, error) {
tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure()) tunnel, err := client.CreateGrpcTunnel(udsName, dialOption, grpc.WithInsecure())
if err != nil { if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageDial)
return nil, err return nil, err
} }
proxyConn, err := tunnel.Dial("tcp", addr) proxyConn, err := tunnel.Dial("tcp", addr)
if err != nil { if err != nil {
egressmetrics.Metrics.ObserveDialFailure(egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS, egressmetrics.StageProxy)
return nil, err return nil, err
} }
egressmetrics.Metrics.ObserveDialLatency(time.Since(start), egressmetrics.ProtocolGRPC, egressmetrics.TransportUDS)
return proxyConn, nil return proxyConn, nil
} }
return contextDialer, nil return contextDialer, nil

View File

@ -0,0 +1,103 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"time"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
namespace = "apiserver"
subsystem = "egress_dialer"
// ProtocolHTTPConnect means that the proxy protocol is http-connect.
ProtocolHTTPConnect = "http_connect"
// ProtocolHTTPGRPC means that the proxy protocol is the GRPC protocol.
ProtocolGRPC = "grpc"
// TransportTCP means that the transport is TCP.
TransportTCP = "tcp"
// TransportUDS means that the transport is UDS.
TransportUDS = "uds"
// StageTransport indicates that the dial failed at dialing to the proxy server.
StageDial = "dial"
// StageProtocol indicates that the dial failed at requesting the proxy server to proxy.
StageProxy = "proxy"
)
var (
// Use buckets ranging from 5 ms to 12.5 seconds.
latencyBuckets = []float64{0.005, 0.025, 0.1, 0.5, 2.5, 12.5}
latencySummaryMaxAge = 5 * time.Hour
// Metrics provides access to all dial metrics.
Metrics = newDialMetrics()
)
// DialMetrics instruments dials to proxy server with prometheus metrics.
type DialMetrics struct {
latencies *metrics.HistogramVec
failures *metrics.CounterVec
}
// newDialMetrics create a new DialMetrics, configured with default metric names.
func newDialMetrics() *DialMetrics {
latencies := metrics.NewHistogramVec(
&metrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dial_duration_seconds",
Help: "Dial latency histogram in seconds, labeled by the protocol (http-connect or grpc), transport (tcp or uds)",
Buckets: latencyBuckets,
StabilityLevel: metrics.ALPHA,
},
[]string{"protocol", "transport"},
)
failures := metrics.NewCounterVec(
&metrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dial_failure_count",
Help: "Dial failure count, labeled by the protocol (http-connect or grpc), transport (tcp or uds), and stage (dial or proxy). The stage indicates at which stage the dial failed",
StabilityLevel: metrics.ALPHA,
},
[]string{"protocol", "transport", "stage"},
)
legacyregistry.MustRegister(latencies)
legacyregistry.MustRegister(failures)
return &DialMetrics{latencies: latencies, failures: failures}
}
// Reset resets the metrics.
func (m *DialMetrics) Reset() {
m.latencies.Reset()
m.failures.Reset()
}
// ObserveDialLatency records the latency of a dial, labeled by protocol, transport.
func (m *DialMetrics) ObserveDialLatency(elapsed time.Duration, protocol, transport string) {
m.latencies.WithLabelValues(protocol, transport).Observe(elapsed.Seconds())
}
// ObserverDialFailure records a failed dial, labeled by protocol, transport, and the stage the dial failed at.
func (m *DialMetrics) ObserveDialFailure(protocol, transport, stage string) {
m.failures.WithLabelValues(protocol, transport, stage).Inc()
}