246 lines
8.9 KiB
Go
246 lines
8.9 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package confmap // import "go.opentelemetry.io/collector/confmap"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"go.uber.org/zap"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
// ProviderSettings are the settings to initialize a Provider.
|
|
type ProviderSettings struct {
|
|
// Logger is a zap.Logger that will be passed to Providers.
|
|
// Providers should be able to rely on the Logger being non-nil;
|
|
// when instantiating a Provider with a ProviderFactory,
|
|
// nil Logger references should be replaced with a no-op Logger.
|
|
Logger *zap.Logger
|
|
}
|
|
|
|
// ProviderFactory defines a factory that can be used to instantiate
|
|
// new instances of a Provider.
|
|
type ProviderFactory = moduleFactory[Provider, ProviderSettings]
|
|
|
|
// CreateProviderFunc is a function that creates a Provider instance.
|
|
type CreateProviderFunc = createConfmapFunc[Provider, ProviderSettings]
|
|
|
|
// NewProviderFactory can be used to create a ProviderFactory.
|
|
func NewProviderFactory(f CreateProviderFunc) ProviderFactory {
|
|
return newConfmapModuleFactory(f)
|
|
}
|
|
|
|
// Provider is an interface that helps to retrieve a config map and watch for any
|
|
// changes to the config map. Implementations may load the config from a file,
|
|
// a database or any other source.
|
|
//
|
|
// The typical usage is the following:
|
|
//
|
|
// r, err := provider.Retrieve("file:/path/to/config")
|
|
// // Use r.Map; wait for watcher to be called.
|
|
// r.Close()
|
|
// r, err = provider.Retrieve("file:/path/to/config")
|
|
// // Use r.Map; wait for watcher to be called.
|
|
// r.Close()
|
|
// // repeat retrieve/wait/close cycle until it is time to shut down the Collector process.
|
|
// // ...
|
|
// provider.Shutdown()
|
|
type Provider interface {
|
|
// Retrieve goes to the configuration source and retrieves the selected data which
|
|
// contains the value to be injected in the configuration and the corresponding watcher that
|
|
// will be used to monitor for updates of the retrieved value.
|
|
//
|
|
// `uri` must follow the "<scheme>:<opaque_data>" format. This format is compatible
|
|
// with the URI definition (see https://datatracker.ietf.org/doc/html/rfc3986). The "<scheme>"
|
|
// must be always included in the `uri`. The "<scheme>" supported by any provider:
|
|
// - MUST consist of a sequence of characters beginning with a letter and followed by any
|
|
// combination of letters, digits, plus ("+"), period ("."), or hyphen ("-").
|
|
// See https://datatracker.ietf.org/doc/html/rfc3986#section-3.1.
|
|
// - MUST be at least 2 characters long to avoid conflicting with a driver-letter identifier as specified
|
|
// in https://tools.ietf.org/id/draft-kerwin-file-scheme-07.html#syntax.
|
|
// - For testing, all implementation MUST check that confmaptest.ValidateProviderScheme returns no error.
|
|
//
|
|
// `watcher` callback is called when the config changes. watcher may be called from
|
|
// a different go routine. After watcher is called Retrieved.Get should be called
|
|
// to get the new config. See description of Retrieved for more details.
|
|
// watcher may be nil, which indicates that the caller is not interested in
|
|
// knowing about the changes.
|
|
//
|
|
// If ctx is cancelled should return immediately with an error.
|
|
// Should never be called concurrently with itself or with Shutdown.
|
|
Retrieve(ctx context.Context, uri string, watcher WatcherFunc) (*Retrieved, error)
|
|
|
|
// Scheme returns the location scheme used by Retrieve.
|
|
Scheme() string
|
|
|
|
// Shutdown signals that the configuration for which this Provider was used to
|
|
// retrieve values is no longer in use and the Provider should close and release
|
|
// any resources that it may have created.
|
|
//
|
|
// This method must be called when the Collector service ends, either in case of
|
|
// success or error. Retrieve cannot be called after Shutdown.
|
|
//
|
|
// Should never be called concurrently with itself or with Retrieve.
|
|
// If ctx is cancelled should return immediately with an error.
|
|
Shutdown(ctx context.Context) error
|
|
}
|
|
|
|
type WatcherFunc func(*ChangeEvent)
|
|
|
|
// ChangeEvent describes the particular change event that happened with the config.
|
|
type ChangeEvent struct {
|
|
// Error is nil if the config is changed and needs to be re-fetched.
|
|
// Any non-nil error indicates that there was a problem with watching the config changes.
|
|
Error error
|
|
}
|
|
|
|
// Retrieved holds the result of a call to the Retrieve method of a Provider object.
|
|
type Retrieved struct {
|
|
rawConf any
|
|
closeFunc CloseFunc
|
|
|
|
stringRepresentation string
|
|
isSetString bool
|
|
}
|
|
|
|
type retrievedSettings struct {
|
|
stringRepresentation string
|
|
isSetString bool
|
|
closeFunc CloseFunc
|
|
}
|
|
|
|
// RetrievedOption options to customize Retrieved values.
|
|
type RetrievedOption interface {
|
|
apply(*retrievedSettings)
|
|
}
|
|
|
|
type retrievedOptionFunc func(*retrievedSettings)
|
|
|
|
func (of retrievedOptionFunc) apply(e *retrievedSettings) {
|
|
of(e)
|
|
}
|
|
|
|
// WithRetrievedClose overrides the default Retrieved.Close function.
|
|
// The default Retrieved.Close function does nothing and always returns nil.
|
|
func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption {
|
|
return retrievedOptionFunc(func(settings *retrievedSettings) {
|
|
settings.closeFunc = closeFunc
|
|
})
|
|
}
|
|
|
|
func withStringRepresentation(stringRepresentation string) RetrievedOption {
|
|
return retrievedOptionFunc(func(settings *retrievedSettings) {
|
|
settings.stringRepresentation = stringRepresentation
|
|
settings.isSetString = true
|
|
})
|
|
}
|
|
|
|
// NewRetrievedFromYAML returns a new Retrieved instance that contains the deserialized data from the yaml bytes.
|
|
// * yamlBytes the yaml bytes that will be deserialized.
|
|
// * opts specifies options associated with this Retrieved value, such as CloseFunc.
|
|
func NewRetrievedFromYAML(yamlBytes []byte, opts ...RetrievedOption) (*Retrieved, error) {
|
|
var rawConf any
|
|
if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil {
|
|
// If the string is not valid YAML, we try to use it verbatim as a string.
|
|
strRep := string(yamlBytes)
|
|
return NewRetrieved(strRep, append(opts, withStringRepresentation(strRep))...)
|
|
}
|
|
|
|
switch rawConf.(type) {
|
|
case string:
|
|
val := string(yamlBytes)
|
|
return NewRetrieved(val, append(opts, withStringRepresentation(val))...)
|
|
default:
|
|
opts = append(opts, withStringRepresentation(string(yamlBytes)))
|
|
}
|
|
|
|
return NewRetrieved(rawConf, opts...)
|
|
}
|
|
|
|
// NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config.
|
|
// The rawConf can be one of the following types:
|
|
// - Primitives: int, int32, int64, float32, float64, bool, string;
|
|
// - []any;
|
|
// - map[string]any;
|
|
func NewRetrieved(rawConf any, opts ...RetrievedOption) (*Retrieved, error) {
|
|
if err := checkRawConfType(rawConf); err != nil {
|
|
return nil, err
|
|
}
|
|
set := retrievedSettings{}
|
|
for _, opt := range opts {
|
|
opt.apply(&set)
|
|
}
|
|
return &Retrieved{
|
|
rawConf: rawConf,
|
|
closeFunc: set.closeFunc,
|
|
stringRepresentation: set.stringRepresentation,
|
|
isSetString: set.isSetString,
|
|
}, nil
|
|
}
|
|
|
|
// AsConf returns the retrieved configuration parsed as a Conf.
|
|
func (r *Retrieved) AsConf() (*Conf, error) {
|
|
if r.rawConf == nil {
|
|
return New(), nil
|
|
}
|
|
val, ok := r.rawConf.(map[string]any)
|
|
if !ok {
|
|
return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf)
|
|
}
|
|
return NewFromStringMap(val), nil
|
|
}
|
|
|
|
// AsRaw returns the retrieved configuration parsed as an any which can be one of the following types:
|
|
// - Primitives: int, int32, int64, float32, float64, bool, string;
|
|
// - []any - every member follows the same rules as the given any;
|
|
// - map[string]any - every value follows the same rules as the given any;
|
|
func (r *Retrieved) AsRaw() (any, error) {
|
|
return r.rawConf, nil
|
|
}
|
|
|
|
// AsString returns the retrieved configuration as a string.
|
|
// If the retrieved configuration is not convertible to a string unambiguously, an error is returned.
|
|
// If the retrieved configuration is a string, the string is returned.
|
|
// This method is used to resolve ${} references in inline position.
|
|
func (r *Retrieved) AsString() (string, error) {
|
|
if !r.isSetString {
|
|
if str, ok := r.rawConf.(string); ok {
|
|
return str, nil
|
|
}
|
|
return "", fmt.Errorf("retrieved value does not have unambiguous string representation: %v", r.rawConf)
|
|
}
|
|
return r.stringRepresentation, nil
|
|
}
|
|
|
|
// Close and release any watchers that Provider.Retrieve may have created.
|
|
//
|
|
// Should block until all resources are closed, and guarantee that `onChange` is not
|
|
// going to be called after it returns except when `ctx` is cancelled.
|
|
//
|
|
// Should never be called concurrently with itself.
|
|
func (r *Retrieved) Close(ctx context.Context) error {
|
|
if r.closeFunc == nil {
|
|
return nil
|
|
}
|
|
return r.closeFunc(ctx)
|
|
}
|
|
|
|
// CloseFunc a function equivalent to Retrieved.Close.
|
|
type CloseFunc func(context.Context) error
|
|
|
|
func checkRawConfType(rawConf any) error {
|
|
if rawConf == nil {
|
|
return nil
|
|
}
|
|
switch rawConf.(type) {
|
|
case int, int32, int64, float32, float64, bool, string, []any, map[string]any:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf(
|
|
"unsupported type=%T for retrieved config,"+
|
|
" ensure that values are wrapped in quotes", rawConf)
|
|
}
|
|
}
|