mirror of https://github.com/linkerd/linkerd2.git
353 lines
8.9 KiB
Go
353 lines
8.9 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/runconduit/conduit/controller/api/proxy"
|
|
common "github.com/runconduit/conduit/controller/gen/common"
|
|
pb "github.com/runconduit/conduit/controller/gen/proxy/telemetry"
|
|
"github.com/runconduit/conduit/controller/k8s"
|
|
"github.com/runconduit/conduit/controller/util"
|
|
log "github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc/codes"
|
|
"k8s.io/api/core/v1"
|
|
// Load all the auth plugins for the cloud providers.
|
|
_ "k8s.io/client-go/plugin/pkg/client/auth"
|
|
)
|
|
|
|
/* A simple script for posting simulated telemetry data to the proxy api */
|
|
|
|
var (
|
|
grpcResponseCodes = []codes.Code{
|
|
codes.OK,
|
|
codes.PermissionDenied,
|
|
codes.Unavailable,
|
|
}
|
|
|
|
httpResponseCodes = []int{
|
|
http.StatusContinue,
|
|
http.StatusSwitchingProtocols,
|
|
http.StatusProcessing,
|
|
http.StatusOK,
|
|
http.StatusCreated,
|
|
http.StatusAccepted,
|
|
http.StatusNonAuthoritativeInfo,
|
|
http.StatusNoContent,
|
|
http.StatusResetContent,
|
|
http.StatusPartialContent,
|
|
http.StatusMultiStatus,
|
|
http.StatusAlreadyReported,
|
|
http.StatusIMUsed,
|
|
http.StatusMultipleChoices,
|
|
http.StatusMovedPermanently,
|
|
http.StatusFound,
|
|
http.StatusSeeOther,
|
|
http.StatusNotModified,
|
|
http.StatusUseProxy,
|
|
http.StatusTemporaryRedirect,
|
|
http.StatusPermanentRedirect,
|
|
http.StatusBadRequest,
|
|
http.StatusUnauthorized,
|
|
http.StatusPaymentRequired,
|
|
http.StatusForbidden,
|
|
http.StatusNotFound,
|
|
http.StatusMethodNotAllowed,
|
|
http.StatusNotAcceptable,
|
|
http.StatusProxyAuthRequired,
|
|
http.StatusRequestTimeout,
|
|
http.StatusConflict,
|
|
http.StatusGone,
|
|
http.StatusLengthRequired,
|
|
http.StatusPreconditionFailed,
|
|
http.StatusRequestEntityTooLarge,
|
|
http.StatusRequestURITooLong,
|
|
http.StatusUnsupportedMediaType,
|
|
http.StatusRequestedRangeNotSatisfiable,
|
|
http.StatusExpectationFailed,
|
|
http.StatusTeapot,
|
|
http.StatusUnprocessableEntity,
|
|
http.StatusLocked,
|
|
http.StatusFailedDependency,
|
|
http.StatusUpgradeRequired,
|
|
http.StatusPreconditionRequired,
|
|
http.StatusTooManyRequests,
|
|
http.StatusRequestHeaderFieldsTooLarge,
|
|
http.StatusUnavailableForLegalReasons,
|
|
http.StatusInternalServerError,
|
|
http.StatusNotImplemented,
|
|
http.StatusBadGateway,
|
|
http.StatusServiceUnavailable,
|
|
http.StatusGatewayTimeout,
|
|
http.StatusHTTPVersionNotSupported,
|
|
http.StatusVariantAlsoNegotiates,
|
|
http.StatusInsufficientStorage,
|
|
http.StatusLoopDetected,
|
|
http.StatusNotExtended,
|
|
http.StatusNetworkAuthenticationRequired,
|
|
}
|
|
|
|
ports = []uint32{3333, 6262}
|
|
|
|
// latencyBucketBounds holds the maximum value (inclusive, in tenths of a
|
|
// millisecond) that may be counted in a given histogram bucket.
|
|
|
|
// These values are one order of magnitude greater than the controller's
|
|
// Prometheus buckets, because the proxy will reports latencies in tenths
|
|
// of a millisecond rather than whole milliseconds.
|
|
latencyBucketBounds = [26]uint32{
|
|
// prometheus.LinearBuckets(1, 1, 5),
|
|
10, 20, 30, 40, 50,
|
|
// prometheus.LinearBuckets(10, 10, 5),
|
|
100, 200, 300, 400, 50,
|
|
// prometheus.LinearBuckets(100, 100, 5),
|
|
1000, 2000, 3000, 4000, 5000,
|
|
// prometheus.LinearBuckets(1000, 1000, 5),
|
|
10000, 20000, 30000, 40000, 5000,
|
|
// prometheus.LinearBuckets(10000, 10000, 5),
|
|
100000, 200000, 300000, 400000, 500000,
|
|
// Prometheus implicitly creates a max bucket for everything that
|
|
// falls outside of the highest-valued bucket, but we need to
|
|
// create it explicitly.
|
|
math.MaxUint32,
|
|
}
|
|
)
|
|
|
|
func randomPort() uint32 {
|
|
return ports[rand.Intn(len(ports))]
|
|
}
|
|
|
|
func randomCount() uint32 {
|
|
return uint32(rand.Int31n(100) + 1)
|
|
}
|
|
|
|
func randomLatencies(count uint32) []uint32 {
|
|
latencies := make([]uint32, len(latencyBucketBounds))
|
|
for i := uint32(0); i < count; i++ {
|
|
|
|
// Randomly select a bucket to increment.
|
|
bucket := uint32(rand.Int31n(int32(len(latencies))))
|
|
latencies[bucket]++
|
|
}
|
|
return latencies
|
|
}
|
|
|
|
func randomGrpcEos(count uint32) (eos []*pb.EosScope) {
|
|
grpcResponseCodes := make(map[uint32]uint32)
|
|
for i := uint32(0); i < count; i++ {
|
|
grpcResponseCodes[randomGrpcResponseCode()] += 1
|
|
}
|
|
for code, streamCount := range grpcResponseCodes {
|
|
eos = append(eos, &pb.EosScope{
|
|
Ctx: &pb.EosCtx{End: &pb.EosCtx_GrpcStatusCode{GrpcStatusCode: code}},
|
|
Streams: streamCount,
|
|
})
|
|
}
|
|
return
|
|
}
|
|
|
|
func randomH2Eos(count uint32) (eos []*pb.EosScope) {
|
|
for i := uint32(0); i < count; i++ {
|
|
eos = append(eos, &pb.EosScope{
|
|
Ctx: &pb.EosCtx{End: &pb.EosCtx_Other{Other: true}},
|
|
Streams: uint32(rand.Int31()),
|
|
})
|
|
}
|
|
return
|
|
}
|
|
|
|
func randomGrpcResponseCode() uint32 {
|
|
return uint32(grpcResponseCodes[rand.Intn(len(grpcResponseCodes))])
|
|
}
|
|
|
|
func randomHttpResponseCode() uint32 {
|
|
return uint32(httpResponseCodes[rand.Intn(len(httpResponseCodes))])
|
|
}
|
|
|
|
func stringToIp(str string) *common.IPAddress {
|
|
octets := make([]uint8, 0)
|
|
for _, num := range strings.Split(str, ".") {
|
|
oct, _ := strconv.Atoi(num)
|
|
octets = append(octets, uint8(oct))
|
|
}
|
|
return util.IPV4(octets[0], octets[1], octets[2], octets[3])
|
|
}
|
|
|
|
func podIndexFunc(obj interface{}) ([]string, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func randomPod(pods []*v1.Pod, prvPodIp *common.IPAddress) *common.IPAddress {
|
|
var podIp *common.IPAddress
|
|
for {
|
|
if podIp != nil {
|
|
break
|
|
}
|
|
|
|
randomPod := pods[rand.Intn(len(pods))]
|
|
if strings.HasPrefix(randomPod.GetNamespace(), "kube-") {
|
|
continue // skip pods in the kube-* namespaces
|
|
}
|
|
podIp = stringToIp(randomPod.Status.PodIP)
|
|
if prvPodIp != nil && podIp.GetIpv4() == prvPodIp.GetIpv4() {
|
|
podIp = nil
|
|
}
|
|
}
|
|
return podIp
|
|
}
|
|
|
|
func main() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
|
|
addr := flag.String("addr", ":8086", "address of proxy api")
|
|
requestCount := flag.Int("requests", 0, "number of api requests to make (default: infinite)")
|
|
sleep := flag.Duration("sleep", time.Second, "time to sleep between requests")
|
|
maxPods := flag.Int("max-pods", 0, "total number of pods to simulate (default unlimited)")
|
|
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config - required")
|
|
flag.Parse()
|
|
|
|
if len(flag.Args()) > 0 {
|
|
log.Fatal("Unable to parse command line arguments")
|
|
return
|
|
}
|
|
|
|
client, conn, err := proxy.NewTelemetryClient(*addr)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
defer conn.Close()
|
|
|
|
clientSet, err := k8s.NewClientSet(*kubeConfigPath)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
pods, err := k8s.NewPodIndex(clientSet, podIndexFunc)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
err = pods.Run()
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
podList, err := pods.List()
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
allPods := make([]*v1.Pod, 0)
|
|
for _, pod := range podList {
|
|
if pod.Status.PodIP != "" && (*maxPods == 0 || len(allPods) < *maxPods) {
|
|
allPods = append(allPods, pod)
|
|
}
|
|
}
|
|
|
|
for i := 0; (*requestCount == 0) || (i < *requestCount); i++ {
|
|
count := randomCount()
|
|
sourceIp := randomPod(allPods, nil)
|
|
targetIp := randomPod(allPods, sourceIp)
|
|
|
|
req := &pb.ReportRequest{
|
|
Process: &pb.Process{
|
|
ScheduledInstance: "hello-1mfa0",
|
|
ScheduledNamespace: "people",
|
|
},
|
|
ClientTransports: []*pb.ClientTransport{
|
|
// TCP
|
|
&pb.ClientTransport{
|
|
TargetAddr: &common.TcpAddress{
|
|
Ip: targetIp,
|
|
Port: randomPort(),
|
|
},
|
|
Connects: count,
|
|
Disconnects: []*pb.TransportSummary{
|
|
&pb.TransportSummary{
|
|
DurationMs: uint64(randomCount()),
|
|
BytesSent: uint64(randomCount()),
|
|
},
|
|
},
|
|
Protocol: common.Protocol_TCP,
|
|
},
|
|
},
|
|
ServerTransports: []*pb.ServerTransport{
|
|
// TCP
|
|
&pb.ServerTransport{
|
|
SourceIp: sourceIp,
|
|
Connects: count,
|
|
Disconnects: []*pb.TransportSummary{
|
|
&pb.TransportSummary{
|
|
DurationMs: uint64(randomCount()),
|
|
BytesSent: uint64(randomCount()),
|
|
},
|
|
},
|
|
Protocol: common.Protocol_TCP,
|
|
},
|
|
},
|
|
Proxy: pb.ReportRequest_INBOUND,
|
|
Requests: []*pb.RequestScope{
|
|
|
|
// gRPC
|
|
&pb.RequestScope{
|
|
Ctx: &pb.RequestCtx{
|
|
SourceIp: sourceIp,
|
|
TargetAddr: &common.TcpAddress{
|
|
Ip: targetIp,
|
|
Port: randomPort(),
|
|
},
|
|
Authority: "world.greeting:7778",
|
|
},
|
|
Count: count,
|
|
Responses: []*pb.ResponseScope{
|
|
&pb.ResponseScope{
|
|
Ctx: &pb.ResponseCtx{
|
|
HttpStatusCode: http.StatusOK,
|
|
},
|
|
ResponseLatencyCounts: randomLatencies(count),
|
|
Ends: randomGrpcEos(count),
|
|
},
|
|
},
|
|
},
|
|
|
|
// HTTP/2
|
|
&pb.RequestScope{
|
|
Ctx: &pb.RequestCtx{
|
|
SourceIp: sourceIp,
|
|
TargetAddr: &common.TcpAddress{
|
|
Ip: targetIp,
|
|
Port: randomPort(),
|
|
},
|
|
Authority: "world.greeting:7778",
|
|
},
|
|
Count: count,
|
|
Responses: []*pb.ResponseScope{
|
|
&pb.ResponseScope{
|
|
Ctx: &pb.ResponseCtx{
|
|
HttpStatusCode: randomHttpResponseCode(),
|
|
},
|
|
ResponseLatencyCounts: randomLatencies(count),
|
|
Ends: randomH2Eos(count),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
|
|
HistogramBucketBoundsTenthMs: latencyBucketBounds[:],
|
|
}
|
|
|
|
_, err = client.Report(context.Background(), req)
|
|
if err != nil {
|
|
log.Fatal(err.Error())
|
|
}
|
|
|
|
time.Sleep(*sleep)
|
|
}
|
|
}
|