267 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			267 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright The OpenTelemetry Authors
 | |
| // SPDX-License-Identifier: Apache-2.0
 | |
| 
 | |
| package confmap // import "go.opentelemetry.io/collector/confmap"
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.uber.org/zap"
 | |
| 	yaml "go.yaml.in/yaml/v3"
 | |
| )
 | |
| 
 | |
| // ProviderSettings are the settings to initialize a Provider.
 | |
| type ProviderSettings struct {
 | |
| 	// Logger is a zap.Logger that will be passed to Providers.
 | |
| 	// Providers should be able to rely on the Logger being non-nil;
 | |
| 	// when instantiating a Provider with a ProviderFactory,
 | |
| 	// nil Logger references should be replaced with a no-op Logger.
 | |
| 	Logger *zap.Logger
 | |
| 
 | |
| 	// prevent unkeyed literal initialization
 | |
| 	_ struct{}
 | |
| }
 | |
| 
 | |
| // ProviderFactory defines a factory that can be used to instantiate
 | |
| // new instances of a Provider.
 | |
| type ProviderFactory = moduleFactory[Provider, ProviderSettings]
 | |
| 
 | |
| // CreateProviderFunc is a function that creates a Provider instance.
 | |
| type CreateProviderFunc = createConfmapFunc[Provider, ProviderSettings]
 | |
| 
 | |
| // NewProviderFactory can be used to create a ProviderFactory.
 | |
| func NewProviderFactory(f CreateProviderFunc) ProviderFactory {
 | |
| 	return newConfmapModuleFactory(f)
 | |
| }
 | |
| 
 | |
| // Provider is an interface that helps to retrieve a config map and watch for any
 | |
| // changes to the config map. Implementations may load the config from a file,
 | |
| // a database or any other source.
 | |
| //
 | |
| // The typical usage is the following:
 | |
| //
 | |
| //	r, err := provider.Retrieve("file:/path/to/config")
 | |
| //	// Use r.Map; wait for watcher to be called.
 | |
| //	r.Close()
 | |
| //	r, err = provider.Retrieve("file:/path/to/config")
 | |
| //	// Use r.Map; wait for watcher to be called.
 | |
| //	r.Close()
 | |
| //	// repeat retrieve/wait/close cycle until it is time to shut down the Collector process.
 | |
| //	// ...
 | |
| //	provider.Shutdown()
 | |
| type Provider interface {
 | |
| 	// Retrieve goes to the configuration source and retrieves the selected data which
 | |
| 	// contains the value to be injected in the configuration and the corresponding watcher that
 | |
| 	// will be used to monitor for updates of the retrieved value.
 | |
| 	//
 | |
| 	// `uri` must follow the "<scheme>:<opaque_data>" format. This format is compatible
 | |
| 	// with the URI definition (see https://datatracker.ietf.org/doc/html/rfc3986). The "<scheme>"
 | |
| 	// must be always included in the `uri`. The "<scheme>" supported by any provider:
 | |
| 	//   - MUST consist of a sequence of characters beginning with a letter and followed by any
 | |
| 	//     combination of letters, digits, plus ("+"), period ("."), or hyphen ("-").
 | |
| 	//     See https://datatracker.ietf.org/doc/html/rfc3986#section-3.1.
 | |
| 	//   - MUST be at least 2 characters long to avoid conflicting with a driver-letter identifier as specified
 | |
| 	//     in https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax.
 | |
| 	//   - For testing, all implementation MUST check that confmaptest.ValidateProviderScheme returns no error.
 | |
| 	//
 | |
| 	// `watcher` callback is called when the config changes. watcher may be called from
 | |
| 	// a different go routine. After watcher is called, Provider.Retrieve should be called
 | |
| 	// to get the new config. See description of Retrieved for more details.
 | |
| 	// watcher may be nil, which indicates that the caller is not interested in
 | |
| 	// knowing about the changes.
 | |
| 	//
 | |
| 	// If ctx is cancelled should return immediately with an error.
 | |
| 	// Should never be called concurrently with itself or with Shutdown.
 | |
| 	Retrieve(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error)
 | |
| 
 | |
| 	// Scheme returns the location scheme used by Retrieve.
 | |
| 	Scheme() string
 | |
| 
 | |
| 	// Shutdown signals that the configuration for which this Provider was used to
 | |
| 	// retrieve values is no longer in use and the Provider should close and release
 | |
| 	// any resources that it may have created.
 | |
| 	//
 | |
| 	// This method must be called when the Collector service ends, either in case of
 | |
| 	// success or error. Retrieve cannot be called after Shutdown.
 | |
| 	//
 | |
| 	// Should never be called concurrently with itself or with Retrieve.
 | |
| 	// If ctx is cancelled should return immediately with an error.
 | |
| 	Shutdown(ctx context.Context) error
 | |
| }
 | |
| 
 | |
| type WatcherFunc func(*ChangeEvent)
 | |
| 
 | |
// ChangeEvent describes a particular change event that happened with the config.
type ChangeEvent struct {
	// Error is nil when the config has changed and needs to be re-fetched.
	// A non-nil Error reports a problem with watching the config for changes.
	Error error

	// Blank field that prevents unkeyed literal initialization.
	_ struct{}
}
 | |
| 
 | |
| // Retrieved holds the result of a call to the Retrieve method of a Provider object.
 | |
| type Retrieved struct {
 | |
| 	rawConf   any
 | |
| 	errorHint error
 | |
| 	closeFunc CloseFunc
 | |
| 
 | |
| 	stringRepresentation string
 | |
| 	isSetString          bool
 | |
| }
 | |
| 
 | |
| type retrievedSettings struct {
 | |
| 	errorHint            error
 | |
| 	stringRepresentation string
 | |
| 	isSetString          bool
 | |
| 	closeFunc            CloseFunc
 | |
| }
 | |
| 
 | |
| // RetrievedOption options to customize Retrieved values.
 | |
| type RetrievedOption interface {
 | |
| 	apply(*retrievedSettings)
 | |
| }
 | |
| 
 | |
| type retrievedOptionFunc func(*retrievedSettings)
 | |
| 
 | |
| func (of retrievedOptionFunc) apply(e *retrievedSettings) {
 | |
| 	of(e)
 | |
| }
 | |
| 
 | |
| // WithRetrievedClose overrides the default Retrieved.Close function.
 | |
| // The default Retrieved.Close function does nothing and always returns nil.
 | |
| func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption {
 | |
| 	return retrievedOptionFunc(func(settings *retrievedSettings) {
 | |
| 		settings.closeFunc = closeFunc
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func withStringRepresentation(stringRepresentation string) RetrievedOption {
 | |
| 	return retrievedOptionFunc(func(settings *retrievedSettings) {
 | |
| 		settings.stringRepresentation = stringRepresentation
 | |
| 		settings.isSetString = true
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func withErrorHint(errorHint error) RetrievedOption {
 | |
| 	return retrievedOptionFunc(func(settings *retrievedSettings) {
 | |
| 		settings.errorHint = errorHint
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // NewRetrievedFromYAML returns a new Retrieved instance that contains the deserialized data from the yaml bytes.
 | |
| // * yamlBytes the yaml bytes that will be deserialized.
 | |
| // * opts specifies options associated with this Retrieved value, such as CloseFunc.
 | |
| func NewRetrievedFromYAML(yamlBytes []byte, opts ...RetrievedOption) (*Retrieved, error) {
 | |
| 	var rawConf any
 | |
| 	if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil {
 | |
| 		// If the string is not valid YAML, we try to use it verbatim as a string.
 | |
| 		strRep := string(yamlBytes)
 | |
| 		return NewRetrieved(strRep, append(opts,
 | |
| 			withStringRepresentation(strRep),
 | |
| 			withErrorHint(fmt.Errorf("assuming string type since contents are not valid YAML: %w", err)),
 | |
| 		)...)
 | |
| 	}
 | |
| 
 | |
| 	switch rawConf.(type) {
 | |
| 	case string:
 | |
| 		val := string(yamlBytes)
 | |
| 		return NewRetrieved(val, append(opts, withStringRepresentation(val))...)
 | |
| 	default:
 | |
| 		opts = append(opts, withStringRepresentation(string(yamlBytes)))
 | |
| 	}
 | |
| 
 | |
| 	return NewRetrieved(rawConf, opts...)
 | |
| }
 | |
| 
 | |
| // NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config.
 | |
| // The rawConf can be one of the following types:
 | |
| //   - Primitives: int, int32, int64, float32, float64, bool, string;
 | |
| //   - []any;
 | |
| //   - map[string]any;
 | |
| func NewRetrieved(rawConf any, opts ...RetrievedOption) (*Retrieved, error) {
 | |
| 	if err := checkRawConfType(rawConf); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	set := retrievedSettings{}
 | |
| 	for _, opt := range opts {
 | |
| 		opt.apply(&set)
 | |
| 	}
 | |
| 	return &Retrieved{
 | |
| 		rawConf:              rawConf,
 | |
| 		errorHint:            set.errorHint,
 | |
| 		closeFunc:            set.closeFunc,
 | |
| 		stringRepresentation: set.stringRepresentation,
 | |
| 		isSetString:          set.isSetString,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| // AsConf returns the retrieved configuration parsed as a Conf.
 | |
| func (r *Retrieved) AsConf() (*Conf, error) {
 | |
| 	if r.rawConf == nil {
 | |
| 		return New(), nil
 | |
| 	}
 | |
| 	val, ok := r.rawConf.(map[string]any)
 | |
| 	if !ok {
 | |
| 		if r.errorHint != nil {
 | |
| 			return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf: %w", r.rawConf, r.errorHint)
 | |
| 		}
 | |
| 		return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf)
 | |
| 	}
 | |
| 	return NewFromStringMap(val), nil
 | |
| }
 | |
| 
 | |
| // AsRaw returns the retrieved configuration parsed as an any which can be one of the following types:
 | |
| //   - Primitives: int, int32, int64, float32, float64, bool, string;
 | |
| //   - []any - every member follows the same rules as the given any;
 | |
| //   - map[string]any - every value follows the same rules as the given any;
 | |
| func (r *Retrieved) AsRaw() (any, error) {
 | |
| 	return r.rawConf, nil
 | |
| }
 | |
| 
 | |
| // AsString returns the retrieved configuration as a string.
 | |
| // If the retrieved configuration is not convertible to a string unambiguously, an error is returned.
 | |
| // If the retrieved configuration is a string, the string is returned.
 | |
| // This method is used to resolve ${} references in inline position.
 | |
| func (r *Retrieved) AsString() (string, error) {
 | |
| 	if !r.isSetString {
 | |
| 		if str, ok := r.rawConf.(string); ok {
 | |
| 			return str, nil
 | |
| 		}
 | |
| 		return "", fmt.Errorf("retrieved value does not have unambiguous string representation: %v", r.rawConf)
 | |
| 	}
 | |
| 	return r.stringRepresentation, nil
 | |
| }
 | |
| 
 | |
| // Close and release any watchers that Provider.Retrieve may have created.
 | |
| //
 | |
| // Should block until all resources are closed, and guarantee that `onChange` is not
 | |
| // going to be called after it returns except when `ctx` is cancelled.
 | |
| //
 | |
| // Should never be called concurrently with itself.
 | |
| func (r *Retrieved) Close(ctx context.Context) error {
 | |
| 	if r.closeFunc == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	return r.closeFunc(ctx)
 | |
| }
 | |
| 
 | |
// CloseFunc is a function equivalent to Retrieved.Close.
type CloseFunc func(ctx context.Context) error
 | |
| 
 | |
// checkRawConfType reports whether rawConf has a type that may be stored in a
// Retrieved. A nil interface value and the types int, int32, int64, float32,
// float64, bool, string, []any, map[string]any and time.Time are accepted;
// every other type yields an error naming the offending type.
func checkRawConfType(rawConf any) error {
	switch rawConf.(type) {
	case nil, int, int32, int64, float32, float64, bool, string, []any, map[string]any, time.Time:
		return nil
	}
	return fmt.Errorf(
		"unsupported type=%T for retrieved config,"+
			" ensure that values are wrapped in quotes", rawConf)
}
 |