Updated client connection creation pattern

Signed-off-by: wei-chenglai <qazwsx0939059006@gmail.com>
This commit is contained in:
wei-chenglai 2025-01-08 00:56:53 -05:00
parent 56194cb968
commit 36a6a8f06c
1 changed files with 23 additions and 13 deletions

View File

@ -25,6 +25,7 @@ import (
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
grpccredentials "google.golang.org/grpc/credentials" grpccredentials "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -102,11 +103,7 @@ func (s *ServerConfig) NewServer() (*grpc.Server, error) {
// DialWithTimeOut will attempt to create a client connection based on the given targets, one at a time, until a client connection is successfully established. // DialWithTimeOut will attempt to create a client connection based on the given targets, one at a time, until a client connection is successfully established.
func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*grpc.ClientConn, error) { func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*grpc.ClientConn, error) {
opts := []grpc.DialOption{ var opts []grpc.DialOption
// grpc.WithBlock is deprecated. TODO: Perhaps need to reconsider the approach in a future PR
//nolint:staticcheck
grpc.WithBlock(),
}
var cred grpccredentials.TransportCredentials var cred grpccredentials.TransportCredentials
if c.ServerAuthCAFile == "" && !c.InsecureSkipServerVerify { if c.ServerAuthCAFile == "" && !c.InsecureSkipServerVerify {
@ -136,9 +133,7 @@ func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*
} }
cred = grpccredentials.NewTLS(config) cred = grpccredentials.NewTLS(config)
} }
opts = append(opts, grpc.WithTransportCredentials(cred)) opts = append(opts, grpc.WithTransportCredentials(cred))
var cc *grpc.ClientConn var cc *grpc.ClientConn
var err error var err error
var allErrs []error var allErrs []error
@ -147,7 +142,7 @@ func (c *ClientConfig) DialWithTimeOut(paths []string, timeout time.Duration) (*
if err == nil { if err == nil {
return cc, nil return cc, nil
} }
allErrs = append(allErrs, err) allErrs = append(allErrs, fmt.Errorf("dial %s error: %v", path, err))
} }
return nil, utilerrors.NewAggregate(allErrs) return nil, utilerrors.NewAggregate(allErrs)
@ -157,12 +152,27 @@ func createGRPCConnection(path string, timeout time.Duration, opts ...grpc.DialO
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
// grpc.DialContext is deprecated. TODO: Perhaps need to reconsider the approach in a future PR cc, err := grpc.NewClient(path, opts...)
//nolint:staticcheck
cc, err := grpc.DialContext(ctx, path, opts...)
if err != nil { if err != nil {
return nil, fmt.Errorf("dial %s error: %v", path, err) return nil, err
} }
defer func() {
if err != nil {
cc.Close()
}
}()
return cc, nil // A blocking dial blocks until the clientConn is ready.
for {
state := cc.GetState()
if state == connectivity.Idle {
cc.Connect()
}
if state == connectivity.Ready {
return cc, nil
}
if !cc.WaitForStateChange(ctx, state) {
return nil, fmt.Errorf("timeout waiting for connection to %s, state is %s", path, state)
}
}
} }