Add reconnecting udp connection type to jaeger exporter (#1063)
* port reconnecting udp client from jaeger-client-go * Fix precommit issues * Fix license check * Add initial value for max packet size * Fix for atomic usage on 386 arch * Modify reconnecting option to an affirmative * Add changelog entry * Dont hold rlock for writes Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
parent
e3abf31b0b
commit
a304e8280d
|
|
@ -8,6 +8,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- Add reconnecting udp connection type to Jaeger exporter.
|
||||
This change adds a new optional implementation of the udp conn interface used to detect changes to an agent's host dns record.
|
||||
It then adopts the new destination address to ensure the exporter doesn't get stuck. This change was ported from jaegertracing/jaeger-client-go#520. (#1063)
|
||||
|
||||
## [0.11.0] - 2020-08-24
|
||||
|
||||
### Added
|
||||
|
|
|
|||
|
|
@ -115,6 +115,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
|
|
|
|||
|
|
@ -17,7 +17,9 @@ package jaeger
|
|||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
|
||||
|
|
@ -32,41 +34,76 @@ type agentClientUDP struct {
|
|||
gen.Agent
|
||||
io.Closer
|
||||
|
||||
connUDP *net.UDPConn
|
||||
connUDP udpConn
|
||||
client *gen.AgentClient
|
||||
maxPacketSize int // max size of datagram in bytes
|
||||
thriftBuffer *thrift.TMemoryBuffer // buffer used to calculate byte size of a span
|
||||
}
|
||||
|
||||
type udpConn interface {
|
||||
Write([]byte) (int, error)
|
||||
SetWriteBuffer(int) error
|
||||
Close() error
|
||||
}
|
||||
|
||||
type agentClientUDPParams struct {
|
||||
HostPort string
|
||||
MaxPacketSize int
|
||||
Logger *log.Logger
|
||||
AttemptReconnecting bool
|
||||
AttemptReconnectInterval time.Duration
|
||||
}
|
||||
|
||||
// newAgentClientUDP creates a client that sends spans to Jaeger Agent over UDP.
|
||||
func newAgentClientUDP(hostPort string, maxPacketSize int) (*agentClientUDP, error) {
|
||||
if maxPacketSize == 0 {
|
||||
maxPacketSize = udpPacketMaxLength
|
||||
func newAgentClientUDP(params agentClientUDPParams) (*agentClientUDP, error) {
|
||||
// validate hostport
|
||||
if _, _, err := net.SplitHostPort(params.HostPort); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
|
||||
if params.MaxPacketSize <= 0 {
|
||||
params.MaxPacketSize = udpPacketMaxLength
|
||||
}
|
||||
|
||||
if params.AttemptReconnecting && params.AttemptReconnectInterval <= 0 {
|
||||
params.AttemptReconnectInterval = time.Second * 30
|
||||
}
|
||||
|
||||
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
|
||||
protocolFactory := thrift.NewTCompactProtocolFactory()
|
||||
client := gen.NewAgentClientFactory(thriftBuffer, protocolFactory)
|
||||
|
||||
destAddr, err := net.ResolveUDPAddr("udp", hostPort)
|
||||
if err != nil {
|
||||
var connUDP udpConn
|
||||
var err error
|
||||
|
||||
if params.AttemptReconnecting {
|
||||
// host is hostname, setup resolver loop in case host record changes during operation
|
||||
connUDP, err = newReconnectingUDPConn(params.HostPort, params.MaxPacketSize, params.AttemptReconnectInterval, net.ResolveUDPAddr, net.DialUDP, params.Logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
destAddr, err := net.ResolveUDPAddr("udp", params.HostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
connUDP, err = net.DialUDP(destAddr.Network(), nil, destAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := connUDP.SetWriteBuffer(params.MaxPacketSize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
connUDP, err := net.DialUDP(destAddr.Network(), nil, destAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := connUDP.SetWriteBuffer(maxPacketSize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
clientUDP := &agentClientUDP{
|
||||
return &agentClientUDP{
|
||||
connUDP: connUDP,
|
||||
client: client,
|
||||
maxPacketSize: maxPacketSize,
|
||||
thriftBuffer: thriftBuffer}
|
||||
return clientUDP, nil
|
||||
maxPacketSize: params.MaxPacketSize,
|
||||
thriftBuffer: thriftBuffer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// EmitBatch implements EmitBatch() of Agent interface
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package jaeger
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewAgentClientUDPWithParamsBadHostport(t *testing.T) {
|
||||
hostPort := "blahblah"
|
||||
|
||||
agentClient, err := newAgentClientUDP(agentClientUDPParams{
|
||||
HostPort: hostPort,
|
||||
})
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, agentClient)
|
||||
}
|
||||
|
||||
func TestNewAgentClientUDPWithParams(t *testing.T) {
|
||||
mockServer, err := newUDPListener()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
agentClient, err := newAgentClientUDP(agentClientUDPParams{
|
||||
HostPort: mockServer.LocalAddr().String(),
|
||||
MaxPacketSize: 25000,
|
||||
AttemptReconnecting: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, agentClient)
|
||||
assert.Equal(t, 25000, agentClient.maxPacketSize)
|
||||
|
||||
if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) {
|
||||
assert.Equal(t, (*log.Logger)(nil), agentClient.connUDP.(*reconnectingUDPConn).logger)
|
||||
}
|
||||
|
||||
assert.NoError(t, agentClient.Close())
|
||||
}
|
||||
|
||||
func TestNewAgentClientUDPWithParamsDefaults(t *testing.T) {
|
||||
mockServer, err := newUDPListener()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
agentClient, err := newAgentClientUDP(agentClientUDPParams{
|
||||
HostPort: mockServer.LocalAddr().String(),
|
||||
AttemptReconnecting: true,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, agentClient)
|
||||
assert.Equal(t, udpPacketMaxLength, agentClient.maxPacketSize)
|
||||
|
||||
if assert.IsType(t, &reconnectingUDPConn{}, agentClient.connUDP) {
|
||||
assert.Equal(t, (*log.Logger)(nil), agentClient.connUDP.(*reconnectingUDPConn).logger)
|
||||
}
|
||||
|
||||
assert.NoError(t, agentClient.Close())
|
||||
}
|
||||
|
||||
func TestNewAgentClientUDPWithParamsReconnectingDisabled(t *testing.T) {
|
||||
mockServer, err := newUDPListener()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
agentClient, err := newAgentClientUDP(agentClientUDPParams{
|
||||
HostPort: mockServer.LocalAddr().String(),
|
||||
Logger: nil,
|
||||
AttemptReconnecting: false,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, agentClient)
|
||||
assert.Equal(t, udpPacketMaxLength, agentClient.maxPacketSize)
|
||||
|
||||
assert.IsType(t, &net.UDPConn{}, agentClient.connUDP)
|
||||
|
||||
assert.NoError(t, agentClient.Close())
|
||||
}
|
||||
|
|
@ -115,6 +115,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
|||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,202 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package jaeger
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// reconnectingUDPConn is an implementation of udpConn that resolves hostPort every resolveTimeout, if the resolved address is
|
||||
// different than the current conn then the new address is dialed and the conn is swapped.
|
||||
type reconnectingUDPConn struct {
|
||||
// `sync/atomic` expects the first word in an allocated struct to be 64-bit
|
||||
// aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
|
||||
bufferBytes int64
|
||||
hostPort string
|
||||
resolveFunc resolveFunc
|
||||
dialFunc dialFunc
|
||||
logger *log.Logger
|
||||
|
||||
connMtx sync.RWMutex
|
||||
conn *net.UDPConn
|
||||
destAddr *net.UDPAddr
|
||||
closeChan chan struct{}
|
||||
}
|
||||
|
||||
type resolveFunc func(network string, hostPort string) (*net.UDPAddr, error)
|
||||
type dialFunc func(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error)
|
||||
|
||||
// newReconnectingUDPConn returns a new udpConn that resolves hostPort every resolveTimeout, if the resolved address is
|
||||
// different than the current conn then the new address is dialed and the conn is swapped.
|
||||
func newReconnectingUDPConn(hostPort string, bufferBytes int, resolveTimeout time.Duration, resolveFunc resolveFunc, dialFunc dialFunc, logger *log.Logger) (*reconnectingUDPConn, error) {
|
||||
conn := &reconnectingUDPConn{
|
||||
hostPort: hostPort,
|
||||
resolveFunc: resolveFunc,
|
||||
dialFunc: dialFunc,
|
||||
logger: logger,
|
||||
closeChan: make(chan struct{}),
|
||||
bufferBytes: int64(bufferBytes),
|
||||
}
|
||||
|
||||
if err := conn.attemptResolveAndDial(); err != nil {
|
||||
conn.logf("failed resolving destination address on connection startup, with err: %q. retrying in %s", err.Error(), resolveTimeout)
|
||||
}
|
||||
|
||||
go conn.reconnectLoop(resolveTimeout)
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (c *reconnectingUDPConn) logf(format string, args ...interface{}) {
|
||||
if c.logger != nil {
|
||||
c.logger.Printf(format, args...)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *reconnectingUDPConn) reconnectLoop(resolveTimeout time.Duration) {
|
||||
ticker := time.NewTicker(resolveTimeout)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.closeChan:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := c.attemptResolveAndDial(); err != nil {
|
||||
c.logf("%s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *reconnectingUDPConn) attemptResolveAndDial() error {
|
||||
newAddr, err := c.resolveFunc("udp", c.hostPort)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to resolve new addr for host %q, with err: %w", c.hostPort, err)
|
||||
}
|
||||
|
||||
c.connMtx.RLock()
|
||||
curAddr := c.destAddr
|
||||
c.connMtx.RUnlock()
|
||||
|
||||
// dont attempt dial if an addr was successfully dialed previously and, resolved addr is the same as current conn
|
||||
if curAddr != nil && newAddr.String() == curAddr.String() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := c.attemptDialNewAddr(newAddr); err != nil {
|
||||
return fmt.Errorf("failed to dial newly resolved addr '%s', with err: %w", newAddr, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *reconnectingUDPConn) attemptDialNewAddr(newAddr *net.UDPAddr) error {
|
||||
connUDP, err := c.dialFunc(newAddr.Network(), nil, newAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if bufferBytes := int(atomic.LoadInt64(&c.bufferBytes)); bufferBytes != 0 {
|
||||
if err = connUDP.SetWriteBuffer(bufferBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.connMtx.Lock()
|
||||
c.destAddr = newAddr
|
||||
// store prev to close later
|
||||
prevConn := c.conn
|
||||
c.conn = connUDP
|
||||
c.connMtx.Unlock()
|
||||
|
||||
if prevConn != nil {
|
||||
return prevConn.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write calls net.udpConn.Write, if it fails an attempt is made to connect to a new addr, if that succeeds the write is retried before returning
|
||||
func (c *reconnectingUDPConn) Write(b []byte) (int, error) {
|
||||
var bytesWritten int
|
||||
var err error
|
||||
|
||||
c.connMtx.RLock()
|
||||
conn := c.conn
|
||||
c.connMtx.RUnlock()
|
||||
|
||||
if conn == nil {
|
||||
// if connection is not initialized indicate this with err in order to hook into retry logic
|
||||
err = fmt.Errorf("UDP connection not yet initialized, an address has not been resolved")
|
||||
} else {
|
||||
bytesWritten, err = conn.Write(b)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
return bytesWritten, nil
|
||||
}
|
||||
|
||||
// attempt to resolve and dial new address in case that's the problem, if resolve and dial succeeds, try write again
|
||||
if reconnErr := c.attemptResolveAndDial(); reconnErr == nil {
|
||||
c.connMtx.RLock()
|
||||
conn := c.conn
|
||||
c.connMtx.RUnlock()
|
||||
|
||||
return conn.Write(b)
|
||||
}
|
||||
|
||||
// return original error if reconn fails
|
||||
return bytesWritten, err
|
||||
}
|
||||
|
||||
// Close stops the reconnectLoop, then closes the connection via net.udpConn 's implementation
|
||||
func (c *reconnectingUDPConn) Close() error {
|
||||
close(c.closeChan)
|
||||
|
||||
// acquire rw lock before closing conn to ensure calls to Write drain
|
||||
c.connMtx.Lock()
|
||||
defer c.connMtx.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWriteBuffer defers to the net.udpConn SetWriteBuffer implementation wrapped with a RLock. if no conn is currently held
|
||||
// and SetWriteBuffer is called store bufferBytes to be set for new conns
|
||||
func (c *reconnectingUDPConn) SetWriteBuffer(bytes int) error {
|
||||
var err error
|
||||
|
||||
c.connMtx.RLock()
|
||||
conn := c.conn
|
||||
c.connMtx.RUnlock()
|
||||
|
||||
if conn != nil {
|
||||
err = c.conn.SetWriteBuffer(bytes)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
atomic.StoreInt64(&c.bufferBytes, int64(bytes))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
@ -0,0 +1,462 @@
|
|||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package jaeger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockResolver struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockResolver) ResolveUDPAddr(network string, hostPort string) (*net.UDPAddr, error) {
|
||||
args := m.Called(network, hostPort)
|
||||
|
||||
a0 := args.Get(0)
|
||||
if a0 == nil {
|
||||
return (*net.UDPAddr)(nil), args.Error(1)
|
||||
}
|
||||
return a0.(*net.UDPAddr), args.Error(1)
|
||||
}
|
||||
|
||||
type mockDialer struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *mockDialer) DialUDP(network string, laddr, raddr *net.UDPAddr) (*net.UDPConn, error) {
|
||||
args := m.Called(network, laddr, raddr)
|
||||
|
||||
a0 := args.Get(0)
|
||||
if a0 == nil {
|
||||
return (*net.UDPConn)(nil), args.Error(1)
|
||||
}
|
||||
|
||||
return a0.(*net.UDPConn), args.Error(1)
|
||||
}
|
||||
|
||||
func newUDPListener() (net.PacketConn, error) {
|
||||
return net.ListenPacket("udp", "127.0.0.1:0")
|
||||
}
|
||||
|
||||
func newUDPConn() (net.PacketConn, *net.UDPConn, error) {
|
||||
mockServer, err := newUDPListener()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
addr, err := net.ResolveUDPAddr("udp", mockServer.LocalAddr().String())
|
||||
if err != nil {
|
||||
mockServer.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
conn, err := net.DialUDP("udp", nil, addr)
|
||||
if err != nil {
|
||||
mockServer.Close()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return mockServer, conn, nil
|
||||
}
|
||||
|
||||
func assertSockBufferSize(t *testing.T, expectedBytes int, conn *net.UDPConn) bool {
|
||||
fd, _ := conn.File()
|
||||
bufferBytes, _ := syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_SNDBUF)
|
||||
|
||||
// The linux kernel doubles SO_SNDBUF value (to allow space for bookkeeping overhead) when it is set using setsockopt(2), and this doubled value is returned by getsockopt(2)
|
||||
// https://linux.die.net/man/7/socket
|
||||
if runtime.GOOS == "linux" {
|
||||
return assert.GreaterOrEqual(t, expectedBytes*2, bufferBytes)
|
||||
}
|
||||
|
||||
return assert.Equal(t, expectedBytes, bufferBytes)
|
||||
}
|
||||
|
||||
func assertConnWritable(t *testing.T, conn udpConn, serverConn net.PacketConn) {
|
||||
expectedString := "yo this is a test"
|
||||
_, err := conn.Write([]byte(expectedString))
|
||||
require.NoError(t, err)
|
||||
|
||||
var buf = make([]byte, len(expectedString))
|
||||
err = serverConn.SetReadDeadline(time.Now().Add(time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = serverConn.ReadFrom(buf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []byte(expectedString), buf)
|
||||
}
|
||||
|
||||
func waitForCallWithTimeout(call *mock.Call) bool {
|
||||
called := make(chan struct{})
|
||||
call.Run(func(args mock.Arguments) {
|
||||
close(called)
|
||||
})
|
||||
|
||||
var wasCalled bool
|
||||
// wait at most 100 milliseconds for the second call of ResolveUDPAddr that is supposed to fail
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
|
||||
select {
|
||||
case <-called:
|
||||
wasCalled = true
|
||||
case <-ctx.Done():
|
||||
fmt.Println("timed out")
|
||||
}
|
||||
cancel()
|
||||
|
||||
return wasCalled
|
||||
}
|
||||
|
||||
func waitForConnCondition(conn *reconnectingUDPConn, condition func(conn *reconnectingUDPConn) bool) bool {
|
||||
var conditionVal bool
|
||||
for i := 0; i < 10; i++ {
|
||||
conn.connMtx.RLock()
|
||||
conditionVal = condition(conn)
|
||||
conn.connMtx.RUnlock()
|
||||
if conditionVal || i >= 9 {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
|
||||
return conditionVal
|
||||
}
|
||||
|
||||
func newMockUDPAddr(t *testing.T, port int) *net.UDPAddr {
|
||||
var buf = make([]byte, 4)
|
||||
// random is not seeded to ensure tests are deterministic (also doesnt matter if ip is valid)
|
||||
_, err := rand.Read(buf)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &net.UDPAddr{
|
||||
IP: net.IPv4(buf[0], buf[1], buf[2], buf[3]),
|
||||
Port: port,
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewResolvedUDPConn(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr := newMockUDPAddr(t, 34322)
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr, nil).
|
||||
Once()
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr).
|
||||
Return(clientConn, nil).
|
||||
Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnWrites(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr := newMockUDPAddr(t, 34322)
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr, nil).
|
||||
Once()
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr).
|
||||
Return(clientConn, nil).
|
||||
Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Hour, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
assertConnWritable(t, conn, mockServer)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnEventuallyDials(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr := newMockUDPAddr(t, 34322)
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(nil, fmt.Errorf("failed to resolve")).Once().
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr, nil)
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialCall := dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr).
|
||||
Return(clientConn, nil).Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
err = conn.SetWriteBuffer(udpPacketMaxLength)
|
||||
assert.NoError(t, err)
|
||||
|
||||
wasCalled := waitForCallWithTimeout(dialCall)
|
||||
assert.True(t, wasCalled)
|
||||
|
||||
connEstablished := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool {
|
||||
return conn.conn != nil
|
||||
})
|
||||
|
||||
assert.True(t, connEstablished)
|
||||
|
||||
assertConnWritable(t, conn, mockServer)
|
||||
assertSockBufferSize(t, udpPacketMaxLength, clientConn)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnNoSwapIfFail(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr := newMockUDPAddr(t, 34322)
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr, nil).Once()
|
||||
|
||||
failCall := resolver.On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(nil, fmt.Errorf("resolve failed"))
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr).
|
||||
Return(clientConn, nil).Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
wasCalled := waitForCallWithTimeout(failCall)
|
||||
|
||||
assert.True(t, wasCalled)
|
||||
|
||||
assertConnWritable(t, conn, mockServer)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnWriteRetry(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr := newMockUDPAddr(t, 34322)
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(nil, fmt.Errorf("failed to resolve")).Once().
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr, nil).Once()
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr).
|
||||
Return(clientConn, nil).Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
err = conn.SetWriteBuffer(udpPacketMaxLength)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assertConnWritable(t, conn, mockServer)
|
||||
assertSockBufferSize(t, udpPacketMaxLength, clientConn)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnWriteRetryFails(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(nil, fmt.Errorf("failed to resolve")).Twice()
|
||||
|
||||
dialer := mockDialer{}
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
err = conn.SetWriteBuffer(udpPacketMaxLength)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = conn.Write([]byte("yo this is a test"))
|
||||
|
||||
assert.Error(t, err)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestResolvedUDPConnChanges(t *testing.T) {
|
||||
hostPort := "blahblah:34322"
|
||||
|
||||
mockServer, clientConn, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer.Close()
|
||||
|
||||
mockUDPAddr1 := newMockUDPAddr(t, 34322)
|
||||
|
||||
mockServer2, clientConn2, err := newUDPConn()
|
||||
require.NoError(t, err)
|
||||
defer mockServer2.Close()
|
||||
|
||||
mockUDPAddr2 := newMockUDPAddr(t, 34322)
|
||||
|
||||
// ensure address doesn't duplicate mockUDPAddr1
|
||||
for i := 0; i < 10 && mockUDPAddr2.IP.Equal(mockUDPAddr1.IP); i++ {
|
||||
mockUDPAddr2 = newMockUDPAddr(t, 34322)
|
||||
}
|
||||
|
||||
// this is really unlikely to ever fail the test, but its here as a safeguard
|
||||
require.False(t, mockUDPAddr2.IP.Equal(mockUDPAddr1.IP))
|
||||
|
||||
resolver := mockResolver{}
|
||||
resolver.
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr1, nil).Once().
|
||||
On("ResolveUDPAddr", "udp", hostPort).
|
||||
Return(mockUDPAddr2, nil)
|
||||
|
||||
dialer := mockDialer{}
|
||||
dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr1).
|
||||
Return(clientConn, nil).Once()
|
||||
|
||||
secondDial := dialer.
|
||||
On("DialUDP", "udp", (*net.UDPAddr)(nil), mockUDPAddr2).
|
||||
Return(clientConn2, nil).Once()
|
||||
|
||||
conn, err := newReconnectingUDPConn(hostPort, udpPacketMaxLength, time.Millisecond*10, resolver.ResolveUDPAddr, dialer.DialUDP, nil)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, conn)
|
||||
|
||||
err = conn.SetWriteBuffer(udpPacketMaxLength)
|
||||
assert.NoError(t, err)
|
||||
|
||||
wasCalled := waitForCallWithTimeout(secondDial)
|
||||
assert.True(t, wasCalled)
|
||||
|
||||
connSwapped := waitForConnCondition(conn, func(conn *reconnectingUDPConn) bool {
|
||||
return conn.conn == clientConn2
|
||||
})
|
||||
|
||||
assert.True(t, connSwapped)
|
||||
|
||||
assertConnWritable(t, conn, mockServer2)
|
||||
assertSockBufferSize(t, udpPacketMaxLength, clientConn2)
|
||||
|
||||
err = conn.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// assert the prev connection was closed
|
||||
assert.Error(t, clientConn.Close())
|
||||
|
||||
// assert the actual connection was closed
|
||||
assert.Error(t, clientConn2.Close())
|
||||
|
||||
resolver.AssertExpectations(t)
|
||||
dialer.AssertExpectations(t)
|
||||
}
|
||||
|
|
@ -20,7 +20,9 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/apache/thrift/lib/go/thrift"
|
||||
|
||||
|
|
@ -36,13 +38,24 @@ type EndpointOption func() (batchUploader, error)
|
|||
|
||||
// WithAgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
|
||||
// For example, localhost:6831.
|
||||
func WithAgentEndpoint(agentEndpoint string) EndpointOption {
|
||||
func WithAgentEndpoint(agentEndpoint string, options ...AgentEndpointOption) EndpointOption {
|
||||
return func() (batchUploader, error) {
|
||||
if agentEndpoint == "" {
|
||||
return nil, errors.New("agentEndpoint must not be empty")
|
||||
}
|
||||
|
||||
client, err := newAgentClientUDP(agentEndpoint, udpPacketMaxLength)
|
||||
o := &AgentEndpointOptions{
|
||||
agentClientUDPParams{
|
||||
HostPort: agentEndpoint,
|
||||
AttemptReconnecting: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, opt := range options {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
client, err := newAgentClientUDP(o.agentClientUDPParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -51,6 +64,33 @@ func WithAgentEndpoint(agentEndpoint string) EndpointOption {
|
|||
}
|
||||
}
|
||||
|
||||
type AgentEndpointOption func(o *AgentEndpointOptions)
|
||||
|
||||
type AgentEndpointOptions struct {
|
||||
agentClientUDPParams
|
||||
}
|
||||
|
||||
// WithLogger sets a logger to be used by agent client.
|
||||
func WithLogger(logger *log.Logger) AgentEndpointOption {
|
||||
return func(o *AgentEndpointOptions) {
|
||||
o.Logger = logger
|
||||
}
|
||||
}
|
||||
|
||||
// WithDisableAttemptReconnecting sets option to disable reconnecting udp client.
|
||||
func WithDisableAttemptReconnecting() AgentEndpointOption {
|
||||
return func(o *AgentEndpointOptions) {
|
||||
o.AttemptReconnecting = false
|
||||
}
|
||||
}
|
||||
|
||||
// WithAttemptReconnectingInterval sets the interval between attempts to re resolve agent endpoint.
|
||||
func WithAttemptReconnectingInterval(interval time.Duration) AgentEndpointOption {
|
||||
return func(o *AgentEndpointOptions) {
|
||||
o.AttemptReconnectInterval = interval
|
||||
}
|
||||
}
|
||||
|
||||
// WithCollectorEndpoint defines the full url to the Jaeger HTTP Thrift collector.
|
||||
// For example, http://localhost:14268/api/traces
|
||||
func WithCollectorEndpoint(collectorEndpoint string, options ...CollectorEndpointOption) EndpointOption {
|
||||
|
|
|
|||
Loading…
Reference in New Issue