Bump github.com/uber/jaeger-client-go

Bumps [github.com/uber/jaeger-client-go](https://github.com/uber/jaeger-client-go) from 2.24.0+incompatible to 2.25.0+incompatible.
- [Release notes](https://github.com/uber/jaeger-client-go/releases)
- [Changelog](https://github.com/jaegertracing/jaeger-client-go/blob/master/CHANGELOG.md)
- [Commits](https://github.com/uber/jaeger-client-go/compare/v2.24.0...v2.25.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
Signed-off-by: Daniel J Walsh <dwalsh@redhat.com>
This commit is contained in:
dependabot-preview[bot] 2020-07-14 17:02:43 +00:00 committed by Daniel J Walsh
parent d83077b16c
commit 4113d2c298
No known key found for this signature in database
GPG Key ID: A2DF901DABE2C028
14 changed files with 464 additions and 67 deletions

2
go.mod
View File

@ -53,7 +53,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.6.1
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2
github.com/uber/jaeger-client-go v2.24.0+incompatible
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/varlink/go v0.0.0-20190502142041-0f1d566d194b
github.com/vishvananda/netlink v1.1.0

5
go.sum
View File

@ -430,6 +430,7 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@ -445,8 +446,8 @@ github.com/tchap/go-patricia v2.3.0+incompatible h1:GkY4dP3cEfEASBPPkWd+AmjYxhmD
github.com/tchap/go-patricia v2.3.0+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/u-root/u-root v6.0.0+incompatible/go.mod h1:RYkpo8pTHrNjW08opNd/U6p/RJE7K0D8fXO0d47+3YY=
github.com/uber/jaeger-client-go v2.24.0+incompatible h1:CGchgJcHsDd2jWnaL4XngByMrXoGHh3n8oCqAKx0uMo=
github.com/uber/jaeger-client-go v2.24.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
github.com/uber/jaeger-client-go v2.25.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=

View File

@ -1,6 +1,19 @@
Changes by Version
==================
2.25.0 (2020-07-13)
-------------------
## Breaking changes
- [feat] Periodically re-resolve UDP server address, with opt-out (#520) -- Trevor Foster
The re-resolving of UDP address is now enabled by default, to make the client more robust in Kubernetes deployments.
The old resolve-once behavior can be restored by setting DisableAttemptReconnecting=true in the Configuration struct,
or via JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED=true environment variable.
## Bug fixes
- Do not add invalid context to references (#521) -- Yuri Shkuro
2.24.0 (2020-06-14)
-------------------
- Mention FromEnv() in the README, docs, and examples (#518) -- Martin Lercher

View File

@ -142,10 +142,19 @@
version = "v0.0.5"
[[projects]]
digest = "1:0496f0e99014b7fd0a560c539f51d0882731137b85494142f47e550e4657176a"
digest = "1:ac83cf90d08b63ad5f7e020ef480d319ae890c208f8524622a2f3136e2686b02"
name = "github.com/stretchr/objx"
packages = ["."]
pruneopts = "UT"
revision = "477a77ecc69700c7cdeb1fa9e129548e1c1c393c"
version = "v0.1.1"
[[projects]]
digest = "1:d88ba57c4e8f5db6ce9ab6605a89f4542ee751b576884ba5271c2ba3d4b6f2d2"
name = "github.com/stretchr/testify"
packages = [
"assert",
"mock",
"require",
"suite",
]
@ -153,6 +162,42 @@
revision = "221dbe5ed46703ee255b1da0dec05086f5035f62"
version = "v1.4.0"
[[projects]]
digest = "1:5b98956718573850caf7e0fd00b571a6657c4ef1f345ddf0c96b43ce355fe862"
name = "github.com/uber/jaeger-client-go"
packages = [
".",
"config",
"crossdock/client",
"crossdock/common",
"crossdock/endtoend",
"crossdock/log",
"crossdock/server",
"crossdock/thrift/tracetest",
"internal/baggage",
"internal/baggage/remote",
"internal/reporterstats",
"internal/spanlog",
"internal/throttler",
"internal/throttler/remote",
"log",
"log/zap/mock_opentracing",
"rpcmetrics",
"testutils",
"thrift",
"thrift-gen/agent",
"thrift-gen/baggage",
"thrift-gen/jaeger",
"thrift-gen/sampling",
"thrift-gen/zipkincore",
"transport",
"transport/zipkin",
"utils",
]
pruneopts = "UT"
revision = "66c008c3d6ad856cac92a0af53186efbffa8e6a5"
version = "v2.24.0"
[[projects]]
digest = "1:0ec60ffd594af00ba1660bc746aa0e443d27dd4003dee55f9d08a0b4ff5431a3"
name = "github.com/uber/jaeger-lib"
@ -314,8 +359,36 @@
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/mock",
"github.com/stretchr/testify/require",
"github.com/stretchr/testify/suite",
"github.com/uber/jaeger-client-go",
"github.com/uber/jaeger-client-go/config",
"github.com/uber/jaeger-client-go/crossdock/client",
"github.com/uber/jaeger-client-go/crossdock/common",
"github.com/uber/jaeger-client-go/crossdock/endtoend",
"github.com/uber/jaeger-client-go/crossdock/log",
"github.com/uber/jaeger-client-go/crossdock/server",
"github.com/uber/jaeger-client-go/crossdock/thrift/tracetest",
"github.com/uber/jaeger-client-go/internal/baggage",
"github.com/uber/jaeger-client-go/internal/baggage/remote",
"github.com/uber/jaeger-client-go/internal/reporterstats",
"github.com/uber/jaeger-client-go/internal/spanlog",
"github.com/uber/jaeger-client-go/internal/throttler",
"github.com/uber/jaeger-client-go/internal/throttler/remote",
"github.com/uber/jaeger-client-go/log",
"github.com/uber/jaeger-client-go/log/zap/mock_opentracing",
"github.com/uber/jaeger-client-go/rpcmetrics",
"github.com/uber/jaeger-client-go/testutils",
"github.com/uber/jaeger-client-go/thrift",
"github.com/uber/jaeger-client-go/thrift-gen/agent",
"github.com/uber/jaeger-client-go/thrift-gen/baggage",
"github.com/uber/jaeger-client-go/thrift-gen/jaeger",
"github.com/uber/jaeger-client-go/thrift-gen/sampling",
"github.com/uber/jaeger-client-go/thrift-gen/zipkincore",
"github.com/uber/jaeger-client-go/transport",
"github.com/uber/jaeger-client-go/transport/zipkin",
"github.com/uber/jaeger-client-go/utils",
"github.com/uber/jaeger-lib/metrics",
"github.com/uber/jaeger-lib/metrics/metricstest",
"github.com/uber/jaeger-lib/metrics/prometheus",

View File

@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll
JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`).
JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`).
JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`).
JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`).
JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`).
JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/.
JAEGER_SAMPLER_PARAM | The sampler parameter (number).
JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler.

View File

@ -22,6 +22,7 @@ import (
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/utils"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/internal/baggage/remote"
@ -124,6 +125,17 @@ type ReporterConfig struct {
// Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT
LocalAgentHostPort string `yaml:"localAgentHostPort"`
// DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves
// the agent's hostname and reconnects if there was a change. This option only
// applies if LocalAgentHostPort is specified.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED
DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"`
// AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname
// in order to detect address changes. This option only applies if DisableAttemptReconnecting is false.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL
AttemptReconnectInterval time.Duration
// CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL.
// Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT
CollectorEndpoint string `yaml:"collectorEndpoint"`
@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter(
metrics *jaeger.Metrics,
logger jaeger.Logger,
) (jaeger.Reporter, error) {
sender, err := rc.newTransport()
sender, err := rc.newTransport(logger)
if err != nil {
return nil, err
}
@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter(
return reporter, err
}
func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) {
switch {
case rc.CollectorEndpoint != "":
httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
}
return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: rc.LocalAgentHostPort,
Logger: logger,
DisableAttemptReconnecting: rc.DisableAttemptReconnecting,
AttemptReconnectInterval: rc.AttemptReconnectInterval,
},
})
}
}

View File

@ -24,30 +24,31 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/uber/jaeger-client-go"
)
const (
// environment variable names
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED"
envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
)
// FromEnv uses environment variables to set the tracer's Configuration
@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) {
if useEnv || rc.LocalAgentHostPort == "" {
rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port)
}
if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" {
if value, err := strconv.ParseBool(e); err == nil {
rc.DisableAttemptReconnecting = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e)
}
}
if !rc.DisableAttemptReconnecting {
if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" {
if value, err := time.ParseDuration(e); err == nil {
rc.AttemptReconnectInterval = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e)
}
}
}
}
return rc, nil

View File

@ -22,7 +22,7 @@ import (
const (
// JaegerClientVersion is the version of the client library reported as Span tag.
JaegerClientVersion = "Go-2.24.0"
JaegerClientVersion = "Go-2.25.0"
// JaegerClientVersionTagKey is the name of the tag used to report client version.
JaegerClientVersionTagKey = "jaeger.version"

View File

@ -212,10 +212,14 @@ func (c SpanContext) SetFirehose() {
}
func (c SpanContext) String() string {
if c.traceID.High == 0 {
return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
var flags int32
if c.samplingState != nil {
flags = c.samplingState.stateFlags.Load()
}
return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load())
if c.traceID.High == 0 {
return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags)
}
return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), flags)
}
// ContextFromString reconstructs the Context encoded in a string

View File

@ -216,10 +216,10 @@ func (t *Tracer) startSpanWithOptions(
options.StartTime = t.timeNow()
}
// Predicate whether the given span context is a valid reference
// which may be used as parent / debug ID / baggage items source
isValidReference := func(ctx SpanContext) bool {
return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
// Predicate whether the given span context is an empty reference
// or may be used as parent / debug ID / baggage items source
isEmptyReference := func(ctx SpanContext) bool {
return !ctx.IsValid() && !ctx.isDebugIDContainerOnly() && len(ctx.baggage) == 0
}
var references []Reference
@ -235,7 +235,7 @@ func (t *Tracer) startSpanWithOptions(
reflect.ValueOf(ref.ReferencedContext)))
continue
}
if !isValidReference(ctxRef) {
if isEmptyReference(ctxRef) {
continue
}
@ -245,14 +245,17 @@ func (t *Tracer) startSpanWithOptions(
continue
}
references = append(references, Reference{Type: ref.Type, Context: ctxRef})
if ctxRef.IsValid() {
// we don't want empty context that contains only debug-id or baggage
references = append(references, Reference{Type: ref.Type, Context: ctxRef})
}
if !hasParent {
parent = ctxRef
hasParent = ref.Type == opentracing.ChildOfRef
}
}
if !hasParent && isValidReference(parent) {
if !hasParent && !isEmptyReference(parent) {
// If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
// the FollowFromRef as the parent
hasParent = true

View File

@ -19,6 +19,7 @@ import (
"fmt"
"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
@ -57,35 +58,57 @@ type udpSender struct {
failedToEmitSpans int64
}
// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
// be passed to NewUDPTransportWithParams.
type UDPTransportParams struct {
utils.AgentClientUDPParams
}
// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
if len(params.HostPort) == 0 {
params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
}
if maxPacketSize == 0 {
maxPacketSize = utils.UDPPacketMaxLength
if params.Logger == nil {
params.Logger = log.StdLogger
}
if params.MaxPacketSize == 0 {
params.MaxPacketSize = utils.UDPPacketMaxLength
}
protocolFactory := thrift.NewTCompactProtocolFactory()
// Each span is first written to thriftBuffer to determine its size in bytes.
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)
client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams)
if err != nil {
return nil, err
}
return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}
// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return NewUDPTransportWithParams(UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
},
})
}
// SetReporterStats implements reporterstats.Receiver.
func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
s.reporterStats = rs

View File

@ -0,0 +1,189 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/uber/jaeger-client-go/log"
)
// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is
// different than the current conn then the new address is dialed and the conn is swapped.
type reconnectingUDPConn struct {
hostPort string
resolveFunc resolveFunc
dialFunc dialFunc
logger log.Logger
bufferBytes int64
connMtx sync.RWMutex
conn *net.UDPConn
destAddr *net.UDPAddr
closeChan chan struct{}
}
type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error)
type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error)
// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is
// different than the current conn then the new address is dialed and the conn is swapped.
func newReconnectingUDPConn(hostPort string, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger log.Logger) (*reconnectingUDPConn, error) {
conn := &reconnectingUDPConn{
hostPort: hostPort,
resolveFunc: resolveFunc,
dialFunc: dialFunc,
logger: logger,
closeChan: make(chan struct{}),
}
if err := conn.attemptResolveAndDial(); err != nil {
logger.Error(fmt.Sprintf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout))
}
go conn.reconnectLoop(resolveTimeout)
return conn, nil
}
func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) {
ticker := time.NewTicker(resolveTimeout)
defer ticker.Stop()
for {
select {
case <-c.closeChan:
return
case <-ticker.C:
if err := c.attemptResolveAndDial(); err != nil {
c.logger.Error(err.Error())
}
}
}
}
func (c *reconnectingUDPConn) attemptResolveAndDial() error {
newAddr, err := c.resolveFunc("udp", c.hostPort)
if err != nil {
return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err)
}
c.connMtx.RLock()
curAddr := c.destAddr
c.connMtx.RUnlock()
// dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn
if curAddr != nil && newAddr.String() == curAddr.String() {
return nil
}
if err := c.attemptDialNewAddr(newAddr); err != nil {
return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err)
}
return nil
}
func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error {
connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr)
if err != nil {
return err
}
if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 {
if err = connUDP.SetWriteBuffer(bufferBytes); err != nil {
return err
}
}
c.connMtx.Lock()
c.destAddr = newAddr
// store prev to close later
prevConn := c.conn
c.conn = connUDP
c.connMtx.Unlock()
if prevConn != nil {
return prevConn.Close()
}
return nil
}
// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning
func (c *reconnectingUDPConn) Write(b []byte) (int, error) {
var bytesWritten int
var err error
c.connMtx.RLock()
if c.conn == nil {
// if connection is not initialized indicate this with err in order to hook into retry logic
err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved")
} else {
bytesWritten, err = c.conn.Write(b)
}
c.connMtx.RUnlock()
if err == nil {
return bytesWritten, nil
}
// attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again
if reconnErr := c.attemptResolveAndDial(); reconnErr == nil {
c.connMtx.RLock()
defer c.connMtx.RUnlock()
return c.conn.Write(b)
}
// return original error if reconn fails
return bytesWritten, err
}
// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation
func (c *reconnectingUDPConn) Close() error {
close(c.closeChan)
// acquire rw lock before closing conn to ensure calls to Write drain
c.connMtx.Lock()
defer c.connMtx.Unlock()
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held
// and SetWriteBuffer is called store bufferBytes to be set for new conns
func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error {
var err error
c.connMtx.RLock()
if c.conn != nil {
err = c.conn.SetWriteBuffer(bytes)
}
c.connMtx.RUnlock()
if err == nil {
atomic.StoreInt64(&c.bufferBytes, int64(bytes))
}
return err
}

View File

@ -19,7 +19,9 @@ import (
"fmt"
"io"
"net"
"time"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
"github.com/uber/jaeger-client-go/thrift-gen/agent"
@ -35,41 +37,90 @@ type AgentClientUDP struct {
agent.Agent
io.Closer
connUDP *net.UDPConn
connUDP udpConn
client *agent.AgentClient
maxPacketSize int // max size of datagram in bytes
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
}
// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
if maxPacketSize == 0 {
maxPacketSize = UDPPacketMaxLength
type udpConn interface {
Write([]byte) (int, error)
SetWriteBuffer(int) error
Close() error
}
// AgentClientUDPParams allows specifying options for initializing an AgentClientUDP. An instance of this struct should
// be passed to NewAgentClientUDPWithParams.
type AgentClientUDPParams struct {
HostPort string
MaxPacketSize int
Logger log.Logger
DisableAttemptReconnecting bool
AttemptReconnectInterval time.Duration
}
// NewAgentClientUDPWithParams creates a client that sends spans to Jaeger Agent over UDP.
func NewAgentClientUDPWithParams(params AgentClientUDPParams) (*AgentClientUDP, error) {
// validate hostport
if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
return nil, err
}
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
if params.MaxPacketSize == 0 {
params.MaxPacketSize = UDPPacketMaxLength
}
if params.Logger == nil {
params.Logger = log.StdLogger
}
if !params.DisableAttemptReconnecting && params.AttemptReconnectInterval == 0 {
params.AttemptReconnectInterval = time.Second * 30
}
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
protocolFactory := thrift.NewTCompactProtocolFactory()
client := agent.NewAgentClientFactory(thriftBuffer, protocolFactory)
destAddr, err := net.ResolveUDPAddr("udp", hostPort)
if err != nil {
var connUDP udpConn
var err error
if params.DisableAttemptReconnecting {
destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
if err != nil {
return nil, err
}
connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
if err != nil {
return nil, err
}
} else {
// host is hostname, setup resolver loop in case host record changes during operation
connUDP, err = newReconnectingUDPConn(params.HostPort, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
if err != nil {
return nil, err
}
}
if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
return nil, err
}
connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr)
if err != nil {
return nil, err
}
if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil {
return nil, err
}
clientUDP := &AgentClientUDP{
return &AgentClientUDP{
connUDP: connUDP,
client: client,
maxPacketSize: maxPacketSize,
thriftBuffer: thriftBuffer}
return clientUDP, nil
maxPacketSize: params.MaxPacketSize,
thriftBuffer: thriftBuffer,
}, nil
}
// NewAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
func NewAgentClientUDP(hostPort string, maxPacketSize int) (*AgentClientUDP, error) {
return NewAgentClientUDPWithParams(AgentClientUDPParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
})
}
// EmitZipkinBatch implements EmitZipkinBatch() of Agent interface

2
vendor/modules.txt vendored
View File

@ -500,7 +500,7 @@ github.com/stretchr/testify/require
github.com/syndtr/gocapability/capability
# github.com/tchap/go-patricia v2.3.0+incompatible
github.com/tchap/go-patricia/patricia
# github.com/uber/jaeger-client-go v2.24.0+incompatible
# github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-client-go
github.com/uber/jaeger-client-go/config
github.com/uber/jaeger-client-go/internal/baggage