515 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			515 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright The OpenTelemetry Authors
 | |
| // SPDX-License-Identifier: Apache-2.0
 | |
| 
 | |
| package confmap
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"reflect"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"testing"
 | |
| 
 | |
| 	"github.com/stretchr/testify/assert"
 | |
| 	"github.com/stretchr/testify/require"
 | |
| 	"go.uber.org/zap"
 | |
| 	yaml "sigs.k8s.io/yaml/goyaml.v3"
 | |
| 
 | |
| 	"go.opentelemetry.io/collector/featuregate"
 | |
| )
 | |
| 
 | |
| type mockProvider struct {
 | |
| 	scheme    string
 | |
| 	retM      any
 | |
| 	errR      error
 | |
| 	errS      error
 | |
| 	errW      error
 | |
| 	closeFunc func(ctx context.Context) error
 | |
| }
 | |
| 
 | |
| func (m *mockProvider) Retrieve(_ context.Context, _ string, watcher WatcherFunc) (*Retrieved, error) {
 | |
| 	if m.errR != nil {
 | |
| 		return nil, m.errR
 | |
| 	}
 | |
| 	if m.retM == nil {
 | |
| 		return NewRetrieved(nil)
 | |
| 	}
 | |
| 
 | |
| 	watcher(&ChangeEvent{Error: m.errW})
 | |
| 	return NewRetrieved(m.retM, WithRetrievedClose(m.closeFunc))
 | |
| }
 | |
| 
 | |
| func (m *mockProvider) Scheme() string {
 | |
| 	if m.scheme == "" {
 | |
| 		return "mock"
 | |
| 	}
 | |
| 	return m.scheme
 | |
| }
 | |
| 
 | |
| func (m *mockProvider) Shutdown(context.Context) error {
 | |
| 	return m.errS
 | |
| }
 | |
| 
 | |
| func newMockProvider(m *mockProvider) ProviderFactory {
 | |
| 	return NewProviderFactory(func(_ ProviderSettings) Provider {
 | |
| 		return m
 | |
| 	})
 | |
| }
 | |
| 
 | |
| type fakeProvider struct {
 | |
| 	scheme string
 | |
| 	ret    func(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error)
 | |
| 	logger *zap.Logger
 | |
| }
 | |
| 
 | |
| func newFileProvider(tb testing.TB) ProviderFactory {
 | |
| 	return newFakeProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (*Retrieved, error) {
 | |
| 		return NewRetrieved(newConfFromFile(tb, uri[5:]))
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func newFakeProvider(scheme string, ret func(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error)) ProviderFactory {
 | |
| 	return NewProviderFactory(func(ps ProviderSettings) Provider {
 | |
| 		return &fakeProvider{
 | |
| 			scheme: scheme,
 | |
| 			ret:    ret,
 | |
| 			logger: ps.Logger,
 | |
| 		}
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func newObservableFileProvider(tb testing.TB) (ProviderFactory, *fakeProvider) {
 | |
| 	return newObservableProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (*Retrieved, error) {
 | |
| 		return NewRetrieved(newConfFromFile(tb, uri[5:]))
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func newObservableProvider(scheme string, ret func(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error)) (ProviderFactory, *fakeProvider) {
 | |
| 	fp := &fakeProvider{
 | |
| 		scheme: scheme,
 | |
| 		ret:    ret,
 | |
| 	}
 | |
| 	return NewProviderFactory(func(ps ProviderSettings) Provider {
 | |
| 		fp.logger = ps.Logger
 | |
| 		return fp
 | |
| 	}), fp
 | |
| }
 | |
| 
 | |
| func (f *fakeProvider) Retrieve(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error) {
 | |
| 	return f.ret(ctx, uri, watcher)
 | |
| }
 | |
| 
 | |
| func (f *fakeProvider) Scheme() string {
 | |
| 	return f.scheme
 | |
| }
 | |
| 
 | |
| func (f *fakeProvider) Shutdown(context.Context) error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| type mockConverter struct {
 | |
| 	err error
 | |
| }
 | |
| 
 | |
| func (m *mockConverter) Convert(context.Context, *Conf) error {
 | |
| 	return errors.New("converter_err")
 | |
| }
 | |
| 
 | |
| func TestNewResolverInvalidSchemeInURI(t *testing.T) {
 | |
| 	_, err := NewResolver(ResolverSettings{URIs: []string{"s_3:has invalid char"}, ProviderFactories: []ProviderFactory{newMockProvider(&mockProvider{scheme: "s3"})}})
 | |
| 	assert.EqualError(t, err, `invalid uri: "s_3:has invalid char"`)
 | |
| }
 | |
| 
 | |
| func TestNewResolverDuplicateScheme(t *testing.T) {
 | |
| 	_, err := NewResolver(ResolverSettings{URIs: []string{"mock:something"}, ProviderFactories: []ProviderFactory{newMockProvider(&mockProvider{scheme: "mock"}), newMockProvider(&mockProvider{scheme: "mock"})}})
 | |
| 	assert.EqualError(t, err, `duplicate 'confmap.Provider' scheme "mock"`)
 | |
| }
 | |
| 
 | |
| func TestResolverErrors(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		name              string
 | |
| 		locations         []string
 | |
| 		providers         []Provider
 | |
| 		converters        []Converter
 | |
| 		defaultScheme     string
 | |
| 		expectBuildErr    bool
 | |
| 		expectResolveErr  bool
 | |
| 		expectWatchErr    bool
 | |
| 		expectCloseErr    bool
 | |
| 		expectShutdownErr bool
 | |
| 	}{
 | |
| 		{
 | |
| 			name:           "unsupported location scheme",
 | |
| 			locations:      []string{"mock:", "notsupported:"},
 | |
| 			providers:      []Provider{&mockProvider{}},
 | |
| 			expectBuildErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "default scheme not found",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", errR: errors.New("retrieve_err")},
 | |
| 			},
 | |
| 			defaultScheme:  "missing",
 | |
| 			expectBuildErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "retrieve location config error",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", errR: errors.New("retrieve_err")},
 | |
| 			},
 | |
| 			expectResolveErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "retrieve location not convertible to Conf",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", retM: "invalid value"},
 | |
| 			},
 | |
| 			expectResolveErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:             "converter error",
 | |
| 			locations:        []string{"mock:"},
 | |
| 			providers:        []Provider{&mockProvider{}},
 | |
| 			converters:       []Converter{&mockConverter{err: errors.New("converter_err")}},
 | |
| 			expectResolveErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "watch error",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", retM: map[string]any{}, errW: errors.New("watch_err")},
 | |
| 			},
 | |
| 			expectWatchErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "close error",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", retM: map[string]any{}, closeFunc: func(context.Context) error { return errors.New("close_err") }},
 | |
| 			},
 | |
| 			expectCloseErr: true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:      "shutdown error",
 | |
| 			locations: []string{"mock:", "err:"},
 | |
| 			providers: []Provider{
 | |
| 				&mockProvider{},
 | |
| 				&mockProvider{scheme: "err", retM: map[string]any{}, errS: errors.New("close_err")},
 | |
| 			},
 | |
| 			expectShutdownErr: true,
 | |
| 		},
 | |
| 	}
 | |
| 	for _, tt := range tests {
 | |
| 		t.Run(tt.name, func(t *testing.T) {
 | |
| 			mockProviderFuncs := make([]ProviderFactory, len(tt.providers))
 | |
| 			for i, provider := range tt.providers {
 | |
| 				p := provider
 | |
| 				mockProviderFuncs[i] = NewProviderFactory(func(_ ProviderSettings) Provider { return p })
 | |
| 			}
 | |
| 			converterFuncs := make([]ConverterFactory, len(tt.converters))
 | |
| 			for i, converter := range tt.converters {
 | |
| 				c := converter
 | |
| 				converterFuncs[i] = NewConverterFactory(func(_ ConverterSettings) Converter { return c })
 | |
| 			}
 | |
| 			resolver, err := NewResolver(ResolverSettings{URIs: tt.locations, ProviderFactories: mockProviderFuncs, DefaultScheme: tt.defaultScheme, ConverterFactories: converterFuncs})
 | |
| 			if tt.expectBuildErr {
 | |
| 				assert.Error(t, err)
 | |
| 				return
 | |
| 			}
 | |
| 			require.NoError(t, err)
 | |
| 
 | |
| 			_, errN := resolver.Resolve(context.Background())
 | |
| 			if tt.expectResolveErr {
 | |
| 				assert.Error(t, errN)
 | |
| 				return
 | |
| 			}
 | |
| 			require.NoError(t, errN)
 | |
| 
 | |
| 			errW := <-resolver.Watch()
 | |
| 			if tt.expectWatchErr {
 | |
| 				assert.Error(t, errW)
 | |
| 				return
 | |
| 			}
 | |
| 			require.NoError(t, errW)
 | |
| 
 | |
| 			_, errC := resolver.Resolve(context.Background())
 | |
| 			if tt.expectCloseErr {
 | |
| 				assert.Error(t, errC)
 | |
| 				return
 | |
| 			}
 | |
| 			require.NoError(t, errN)
 | |
| 
 | |
| 			errS := resolver.Shutdown(context.Background())
 | |
| 			if tt.expectShutdownErr {
 | |
| 				assert.Error(t, errS)
 | |
| 				return
 | |
| 			}
 | |
| 			assert.NoError(t, errC)
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestBackwardsCompatibilityForFilePath(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		name           string
 | |
| 		location       string
 | |
| 		errMessage     string
 | |
| 		expectBuildErr bool
 | |
| 	}{
 | |
| 		{
 | |
| 			name:       "unix",
 | |
| 			location:   `/test`,
 | |
| 			errMessage: `file:/test`,
 | |
| 		},
 | |
| 		{
 | |
| 			name:       "file_unix",
 | |
| 			location:   `file:/test`,
 | |
| 			errMessage: `file:/test`,
 | |
| 		},
 | |
| 		{
 | |
| 			name:       "windows_C",
 | |
| 			location:   `c:\test`,
 | |
| 			errMessage: `file:c:\test`,
 | |
| 		},
 | |
| 		{
 | |
| 			name:       "windows_z",
 | |
| 			location:   `z:\test`,
 | |
| 			errMessage: `file:z:\test`,
 | |
| 		},
 | |
| 		{
 | |
| 			name:       "file_windows",
 | |
| 			location:   `file:c:\test`,
 | |
| 			errMessage: `file:c:\test`,
 | |
| 		},
 | |
| 		{
 | |
| 			name:           "invalid_scheme",
 | |
| 			location:       `LL:\test`,
 | |
| 			expectBuildErr: true,
 | |
| 		},
 | |
| 	}
 | |
| 	for _, tt := range tests {
 | |
| 		t.Run(tt.name, func(t *testing.T) {
 | |
| 			resolver, err := NewResolver(ResolverSettings{
 | |
| 				URIs: []string{tt.location},
 | |
| 				ProviderFactories: []ProviderFactory{
 | |
| 					newFakeProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (*Retrieved, error) {
 | |
| 						return nil, errors.New(uri)
 | |
| 					}),
 | |
| 				},
 | |
| 				ConverterFactories: nil,
 | |
| 			})
 | |
| 			if tt.expectBuildErr {
 | |
| 				assert.Error(t, err)
 | |
| 				return
 | |
| 			}
 | |
| 			require.NoError(t, err)
 | |
| 			_, err = resolver.Resolve(context.Background())
 | |
| 			assert.ErrorContains(t, err, tt.errMessage, tt.name)
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func TestResolver(t *testing.T) {
 | |
| 	numCalls := atomic.Int32{}
 | |
| 	resolver, err := NewResolver(ResolverSettings{
 | |
| 		URIs: []string{"mock:"},
 | |
| 		ProviderFactories: []ProviderFactory{
 | |
| 			newMockProvider(&mockProvider{retM: map[string]any{}, closeFunc: func(context.Context) error {
 | |
| 				numCalls.Add(1)
 | |
| 				return nil
 | |
| 			}}),
 | |
| 		},
 | |
| 		ConverterFactories: nil,
 | |
| 	})
 | |
| 	require.NoError(t, err)
 | |
| 	_, errN := resolver.Resolve(context.Background())
 | |
| 	require.NoError(t, errN)
 | |
| 	assert.Equal(t, int32(0), numCalls.Load())
 | |
| 
 | |
| 	errW := <-resolver.Watch()
 | |
| 	require.NoError(t, errW)
 | |
| 
 | |
| 	// Repeat Resolve/Watch.
 | |
| 
 | |
| 	_, errN = resolver.Resolve(context.Background())
 | |
| 	require.NoError(t, errN)
 | |
| 	assert.Equal(t, int32(1), numCalls.Load())
 | |
| 
 | |
| 	errW = <-resolver.Watch()
 | |
| 	require.NoError(t, errW)
 | |
| 
 | |
| 	_, errN = resolver.Resolve(context.Background())
 | |
| 	require.NoError(t, errN)
 | |
| 	assert.Equal(t, int32(2), numCalls.Load())
 | |
| 
 | |
| 	errC := resolver.Shutdown(context.Background())
 | |
| 	require.NoError(t, errC)
 | |
| 	assert.Equal(t, int32(3), numCalls.Load())
 | |
| }
 | |
| 
 | |
| func TestResolverNewLinesInOpaqueValue(t *testing.T) {
 | |
| 	_, err := NewResolver(ResolverSettings{
 | |
| 		URIs:               []string{"mock:receivers:\n nop:\n"},
 | |
| 		ProviderFactories:  []ProviderFactory{newMockProvider(&mockProvider{retM: map[string]any{}})},
 | |
| 		ConverterFactories: nil,
 | |
| 	})
 | |
| 	assert.NoError(t, err)
 | |
| }
 | |
| 
 | |
| func TestResolverNoLocations(t *testing.T) {
 | |
| 	_, err := NewResolver(ResolverSettings{
 | |
| 		URIs:               []string{},
 | |
| 		ProviderFactories:  []ProviderFactory{newMockProvider(&mockProvider{})},
 | |
| 		ConverterFactories: nil,
 | |
| 	})
 | |
| 	assert.Error(t, err)
 | |
| }
 | |
| 
 | |
| func TestResolverNoProviders(t *testing.T) {
 | |
| 	_, err := NewResolver(ResolverSettings{
 | |
| 		URIs:               []string{filepath.Join("testdata", "config.yaml")},
 | |
| 		ProviderFactories:  nil,
 | |
| 		ConverterFactories: nil,
 | |
| 	})
 | |
| 	assert.Error(t, err)
 | |
| }
 | |
| 
 | |
| func TestResolverShutdownClosesWatch(t *testing.T) {
 | |
| 	resolver, err := NewResolver(ResolverSettings{
 | |
| 		URIs:               []string{filepath.Join("testdata", "config.yaml")},
 | |
| 		ProviderFactories:  []ProviderFactory{newFileProvider(t)},
 | |
| 		ConverterFactories: nil,
 | |
| 	})
 | |
| 	require.NoError(t, err)
 | |
| 	_, errN := resolver.Resolve(context.Background())
 | |
| 	require.NoError(t, errN)
 | |
| 
 | |
| 	var watcherWG sync.WaitGroup
 | |
| 	watcherWG.Add(1)
 | |
| 	go func() {
 | |
| 		errW, ok := <-resolver.Watch()
 | |
| 		// Channel is closed, no exception
 | |
| 		assert.NoError(t, errW)
 | |
| 		assert.False(t, ok)
 | |
| 		watcherWG.Done()
 | |
| 	}()
 | |
| 
 | |
| 	require.NoError(t, resolver.Shutdown(context.Background()))
 | |
| 	watcherWG.Wait()
 | |
| }
 | |
| 
 | |
| func TestProvidesDefaultLogger(t *testing.T) {
 | |
| 	factory, provider := newObservableFileProvider(t)
 | |
| 	_, err := NewResolver(ResolverSettings{
 | |
| 		URIs:              []string{filepath.Join("testdata", "config.yaml")},
 | |
| 		ProviderFactories: []ProviderFactory{factory},
 | |
| 		ConverterFactories: []ConverterFactory{NewConverterFactory(func(set ConverterSettings) Converter {
 | |
| 			assert.NotNil(t, set.Logger)
 | |
| 			return &mockConverter{}
 | |
| 		})},
 | |
| 	})
 | |
| 	require.NoError(t, err)
 | |
| 	require.NotNil(t, provider.logger)
 | |
| }
 | |
| 
 | |
| func TestResolverDefaultProviderSet(t *testing.T) {
 | |
| 	envProvider := newEnvProvider()
 | |
| 	fileProvider := newFileProvider(t)
 | |
| 
 | |
| 	r, err := NewResolver(ResolverSettings{
 | |
| 		URIs:              []string{"env:"},
 | |
| 		ProviderFactories: []ProviderFactory{fileProvider, envProvider},
 | |
| 		DefaultScheme:     "env",
 | |
| 	})
 | |
| 	require.NoError(t, err)
 | |
| 	assert.NotNil(t, r.defaultScheme)
 | |
| 	_, ok := r.providers["env"]
 | |
| 	assert.True(t, ok)
 | |
| }
 | |
| 
 | |
| type mergeTest struct {
 | |
| 	Name        string           `yaml:"name"`
 | |
| 	AppendPaths []string         `yaml:"append_paths"`
 | |
| 	Configs     []map[string]any `yaml:"configs"`
 | |
| 	Expected    map[string]any   `yaml:"expected"`
 | |
| }
 | |
| 
 | |
| func TestMergeFunctionality(t *testing.T) {
 | |
| 	tests := []struct {
 | |
| 		name         string
 | |
| 		scenarioFile string
 | |
| 		flagEnabled  bool
 | |
| 	}{
 | |
| 		{
 | |
| 			name:         "feature-flag-enabled",
 | |
| 			scenarioFile: "testdata/merge-append-scenarios.yaml",
 | |
| 			flagEnabled:  true,
 | |
| 		},
 | |
| 		{
 | |
| 			name:         "feature-flag-disabled",
 | |
| 			scenarioFile: "testdata/merge-append-scenarios-featuregate-disabled.yaml",
 | |
| 			flagEnabled:  false,
 | |
| 		},
 | |
| 	}
 | |
| 	for _, tt := range tests {
 | |
| 		t.Run(tt.name, func(t *testing.T) {
 | |
| 			if tt.flagEnabled {
 | |
| 				require.NoError(t, featuregate.GlobalRegistry().Set(enableMergeAppendOption.ID(), true))
 | |
| 				defer func() {
 | |
| 					// Restore previous value.
 | |
| 					require.NoError(t, featuregate.GlobalRegistry().Set(enableMergeAppendOption.ID(), false))
 | |
| 				}()
 | |
| 			}
 | |
| 			runScenario(t, tt.scenarioFile)
 | |
| 		})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func runScenario(t *testing.T, path string) {
 | |
| 	yamlData, err := os.ReadFile(filepath.Clean(path))
 | |
| 	require.NoError(t, err)
 | |
| 	var testcases []*mergeTest
 | |
| 	err = yaml.Unmarshal(yamlData, &testcases)
 | |
| 	require.NoError(t, err)
 | |
| 	for _, tt := range testcases {
 | |
| 		t.Run(tt.Name, func(t *testing.T) {
 | |
| 			configFiles := make([]string, 0)
 | |
| 			for _, c := range tt.Configs {
 | |
| 				// store configs into a temp file. This makes it easier for us to test feature gate functionality
 | |
| 				file, err := os.CreateTemp(t.TempDir(), "*.yaml")
 | |
| 				defer func() { require.NoError(t, file.Close()) }()
 | |
| 				require.NoError(t, err)
 | |
| 				b, err := json.Marshal(c)
 | |
| 				require.NoError(t, err)
 | |
| 				n, err := file.Write(b)
 | |
| 				require.NoError(t, err)
 | |
| 				require.Positive(t, n)
 | |
| 				configFiles = append(configFiles, file.Name())
 | |
| 			}
 | |
| 
 | |
| 			resolver, err := NewResolver(ResolverSettings{
 | |
| 				URIs:              configFiles,
 | |
| 				ProviderFactories: []ProviderFactory{newFileProvider(t)},
 | |
| 				DefaultScheme:     "file",
 | |
| 			})
 | |
| 			require.NoError(t, err)
 | |
| 			conf, err := resolver.Resolve(context.Background())
 | |
| 			require.NoError(t, err)
 | |
| 			mergedConf := conf.ToStringMap()
 | |
| 			require.Truef(t, reflect.DeepEqual(mergedConf, tt.Expected), "Exp: %s\nGot: %s", tt.Expected, mergedConf)
 | |
| 		})
 | |
| 	}
 | |
| }
 |