From cce656a60b2e2c1a9705ca217a14b4bea0b04b95 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 8 Apr 2024 17:26:25 +0800 Subject: [PATCH] feat: remove otel in client (#3169) Signed-off-by: Gaius --- pkg/rpc/cdnsystem/client/client.go | 3 - pkg/rpc/dfdaemon/client/client_v1.go | 2 - pkg/rpc/dfdaemon/client/client_v2.go | 2 - pkg/rpc/dfdaemon/server/server.go | 2 - pkg/rpc/health/client/client.go | 2 - pkg/rpc/inference/client/client_v1.go | 2 - pkg/rpc/manager/client/client_v1.go | 2 - pkg/rpc/manager/client/client_v2.go | 2 - pkg/rpc/scheduler/client/client_v1.go | 1 - pkg/rpc/scheduler/client/client_v2.go | 1 - pkg/rpc/security/client/client_v1.go | 2 - pkg/rpc/trainer/client/client_v1.go | 2 - scheduler/resource/resource.go | 3 +- scheduler/scheduler.go | 5 +- test/e2e/constants.go | 3 +- test/e2e/network_topology_test.go | 156 -------------------------- 16 files changed, 6 insertions(+), 184 deletions(-) delete mode 100644 test/e2e/network_topology_test.go diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index fb4374330..d9553b947 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -59,7 +58,6 @@ func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.Di netAddr.Addr, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor, @@ -97,7 +95,6 @@ func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts .. resolver.SeedPeerVirtualTarget, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, diff --git a/pkg/rpc/dfdaemon/client/client_v1.go b/pkg/rpc/dfdaemon/client/client_v1.go index abad91310..d4514e165 100644 --- a/pkg/rpc/dfdaemon/client/client_v1.go +++ b/pkg/rpc/dfdaemon/client/client_v1.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/emptypb" @@ -49,7 +48,6 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, grpc_prometheus.UnaryClientInterceptor, diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index f280ca9a4..b955a2115 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -25,7 +25,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -50,7 +49,6 @@ func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grp resolver.SeedPeerVirtualTarget, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, diff --git a/pkg/rpc/dfdaemon/server/server.go b/pkg/rpc/dfdaemon/server/server.go index e093e187d..c50bde638 100644 --- a/pkg/rpc/dfdaemon/server/server.go +++ b/pkg/rpc/dfdaemon/server/server.go @@ -25,7 +25,6 @@ import ( grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" @@ -63,7 +62,6 @@ func New(svr dfdaemonv1.DaemonServer, healthServer healthpb.HealthServer, opts . limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) grpcServer := grpc.NewServer(append([]grpc.ServerOption{ - grpc.StatsHandler(otelgrpc.NewServerHandler()), grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: DefaultMaxConnectionIdle, MaxConnectionAge: DefaultMaxConnectionAge, diff --git a/pkg/rpc/health/client/client.go b/pkg/rpc/health/client/client.go index 7efd74e48..48318e8eb 100644 --- a/pkg/rpc/health/client/client.go +++ b/pkg/rpc/health/client/client.go @@ -26,7 +26,6 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -45,7 +44,6 @@ func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Cli target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/inference/client/client_v1.go b/pkg/rpc/inference/client/client_v1.go index 054d63686..7aabfc6da 100644 --- a/pkg/rpc/inference/client/client_v1.go +++ b/pkg/rpc/inference/client/client_v1.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" inference "d7y.io/api/v2/pkg/apis/inference" @@ -53,7 +52,6 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/manager/client/client_v1.go b/pkg/rpc/manager/client/client_v1.go index d8588b6ac..48a91df2a 100644 --- a/pkg/rpc/manager/client/client_v1.go +++ b/pkg/rpc/manager/client/client_v1.go @@ -27,7 +27,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -47,7 +46,6 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/manager/client/client_v2.go b/pkg/rpc/manager/client/client_v2.go index e4686ba1b..fa01a0c2b 100644 --- a/pkg/rpc/manager/client/client_v2.go +++ b/pkg/rpc/manager/client/client_v2.go @@ -27,7 +27,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -47,7 +46,6 @@ func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/scheduler/client/client_v1.go b/pkg/rpc/scheduler/client/client_v1.go index c547837d5..ee211aff0 100644 --- a/pkg/rpc/scheduler/client/client_v1.go +++ b/pkg/rpc/scheduler/client/client_v1.go @@ -53,7 +53,6 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt resolver.SchedulerVirtualTarget, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.ConvertErrorUnaryClientInterceptor, diff --git a/pkg/rpc/scheduler/client/client_v2.go b/pkg/rpc/scheduler/client/client_v2.go index 4b6db2c45..a79258b4b 100644 --- a/pkg/rpc/scheduler/client/client_v2.go +++ b/pkg/rpc/scheduler/client/client_v2.go @@ -52,7 +52,6 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt resolver.SchedulerVirtualTarget, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, diff --git a/pkg/rpc/security/client/client_v1.go b/pkg/rpc/security/client/client_v1.go index 7e1bf90f1..d43f94d90 100644 --- a/pkg/rpc/security/client/client_v1.go +++ b/pkg/rpc/security/client/client_v1.go @@ -27,7 +27,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" securityv1 "d7y.io/api/v2/pkg/apis/security/v1" @@ -56,7 +55,6 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/trainer/client/client_v1.go b/pkg/rpc/trainer/client/client_v1.go index dc5e17ee2..ca8a0b41c 100644 --- a/pkg/rpc/trainer/client/client_v1.go +++ b/pkg/rpc/trainer/client/client_v1.go @@ -26,7 +26,6 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" trainerv1 "d7y.io/api/v2/pkg/apis/trainer/v1" @@ -50,7 +49,6 @@ func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V target, append([]grpc.DialOption{ grpc.WithIdleTimeout(0), - grpc.WithStatsHandler(otelgrpc.NewClientHandler()), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( grpc_prometheus.UnaryClientInterceptor, grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/scheduler/resource/resource.go b/scheduler/resource/resource.go index a772be995..ff6f9fcd2 100644 --- a/scheduler/resource/resource.go +++ b/scheduler/resource/resource.go @@ -19,6 +19,7 @@ package resource import ( + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -108,7 +109,7 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, opti // Initialize seed peer interface. if cfg.SeedPeer.Enable { - dialOptions := []grpc.DialOption{} + dialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler())} if resource.transportCredentials != nil { dialOptions = append(dialOptions, grpc.WithTransportCredentials(resource.transportCredentials)) } else { diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 1a2c66379..c9a95473c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -28,6 +28,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/johanbrandhorst/certify" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -123,7 +124,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err s.storage = storage // Initialize dial options of manager grpc client. - managerDialOptions := []grpc.DialOption{} + managerDialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler())} if cfg.Security.AutoIssueCert { clientTransportCredentials, err := rpc.NewClientCredentials(cfg.Security.TLSPolicy, nil, []byte(cfg.Security.CACert)) if err != nil { @@ -144,7 +145,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err // Initialize dial options of trainer grpc client. if cfg.Trainer.Enable { - trainerDialOptions := []grpc.DialOption{} + trainerDialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler())} if cfg.Security.AutoIssueCert { clientTransportCredentials, err := rpc.NewClientCredentials(cfg.Security.TLSPolicy, nil, []byte(cfg.Security.CACert)) if err != nil { diff --git a/test/e2e/constants.go b/test/e2e/constants.go index 48e3a36e8..d6e2f9596 100644 --- a/test/e2e/constants.go +++ b/test/e2e/constants.go @@ -24,8 +24,7 @@ const ( ) const ( - dfdaemonCompatibilityTestMode = "dfdaemon" - schedulerCompatibilityTestMode = "scheduler" + dfdaemonCompatibilityTestMode = "dfdaemon" ) const ( diff --git a/test/e2e/network_topology_test.go b/test/e2e/network_topology_test.go deleted file mode 100644 index 8187867cb..000000000 --- a/test/e2e/network_topology_test.go +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright 2024 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package e2e - -import ( - "context" - "fmt" - "os" - "strconv" - "strings" - "time" - - . "github.com/onsi/ginkgo/v2" //nolint - . "github.com/onsi/gomega" //nolint - - "d7y.io/dragonfly/v2/test/e2e/util" -) - -var _ = Describe("Evaluator with networkTopology", func() { - Context("networkTopology", func() { - It("check networkTopology in redis", Label("networkTopology"), func() { - mode := os.Getenv("DRAGONFLY_COMPATIBILITY_E2E_TEST_MODE") - if mode == schedulerCompatibilityTestMode { - fmt.Println("networkTopology is disable, skip") - return - } - Expect(waitForProbedInNetworkTopology()).Should(BeTrue()) - - if waitForProbedInNetworkTopology() == true { - time.Sleep(2 * time.Minute) - Expect(checkNetworkTopologyUpdated()).Should(BeTrue()) - } - }) - }) -}) - -// getRedisExec get redis pod. -func getRedisExec() *util.PodExec { - out, err := util.KubeCtlCommand("-n", dragonflyNamespace, "get", "pod", "-l", "app.kubernetes.io/name=redis", - "-o", "jsonpath='{range .items[0]}{.metadata.name}{end}'").CombinedOutput() - podName := strings.Trim(string(out), "'") - Expect(err).NotTo(HaveOccurred()) - fmt.Println(podName) - Expect(strings.HasPrefix(podName, "dragonfly-redis")).Should(BeTrue()) - return util.NewPodExec(dragonflyNamespace, podName, "redis") -} - -func waitForProbedInNetworkTopology() bool { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - redisPod := getRedisExec() - - for { - select { - case <-ctx.Done(): - return false - case <-ticker.C: - out, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "dbsize").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - key, err := strconv.Atoi(strings.Split(string(out), "\n")[1]) - if key == 0 || err != nil { - continue - } - - out, err = redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "KEYS", "scheduler:network-topology:*").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - networkTopologyKey := strings.Split(string(out), "\n")[1] - if networkTopologyKey == "" || err != nil { - continue - } - networkTopologyOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "HGETALL", networkTopologyKey).CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - if networkTopologyOut == nil || err != nil { - continue - } - - out, err = redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "KEYS", "scheduler:probes:*").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - probesKey := strings.Split(string(out), "\n")[1] - if probesKey == "" || err != nil { - continue - } - probesOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "LRANGE", probesKey, "0", "-1").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - if probesOut == nil || err != nil { - continue - } - - out, err = redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "KEYS", "scheduler:probed-count:*").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - probedCountKey := strings.Split(string(out), "\n")[1] - if probedCountKey == "" || err != nil { - continue - } - probedCountOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "GET", probedCountKey).CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - if probedCountOut == nil || err != nil { - continue - } - - return true - } - } -} - -func checkNetworkTopologyUpdated() bool { - redisPod := getRedisExec() - - var networkTopologyKey string - out, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "KEYS", "scheduler:network-topology:*").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - for i := 1; i <= 3; i++ { - networkTopologyKey = strings.Split(string(out), "\n")[i] - updatedAtOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "HGET", networkTopologyKey, "updatedAt").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - createdAtOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "HGET", networkTopologyKey, "createdAt").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - if strings.Split(string(updatedAtOut), "\n")[1] == strings.Split(string(createdAtOut), "\n")[1] { - return false - } - } - - var probedCountKey string - out, err = redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "KEYS", "scheduler:probed-count:*").CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - for i := 1; i <= 3; i++ { - probedCountKey = strings.Split(string(out), "\n")[i] - probedCountOut, err := redisPod.Command("redis-cli", "-a", "dragonfly", "-n", "3", "GET", probedCountKey).CombinedOutput() - Expect(err).NotTo(HaveOccurred()) - probedCount, err := strconv.Atoi(strings.Split(string(probedCountOut), "\n")[1]) - Expect(err).NotTo(HaveOccurred()) - if probedCount <= 1 && probedCount >= 50 { - return false - } - } - - return true -}