Add a host wrapper to improve ReportFatalError msg (#2775)

* Add a host wrapper  to improve ReportFatalError msg

* Adding host wrapper when starting pipeline

* Do not allocate in the loop
This commit is contained in:
Paulo Janotti 2021-03-23 15:09:25 -07:00 committed by GitHub
parent a20fba1488
commit c81a01b7d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 7 deletions

View File

@ -91,7 +91,7 @@ func (exps Exporters) StartAll(ctx context.Context, host component.Host) error {
for _, exp := range exps {
exp.logger.Info("Exporter is starting...")
if err := exp.Start(ctx, host); err != nil {
if err := exp.Start(ctx, newHostWrapper(host, exp.logger)); err != nil {
return err
}
exp.logger.Info("Exporter started.")
@ -299,7 +299,7 @@ func (eb *exportersBuilder) buildExporter(
}
}
eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name()))
eb.logger.Info("Exporter was built.", zap.String("exporter", config.Name()))
return exporter, nil
}

View File

@ -52,7 +52,7 @@ func (exts Extensions) StartAll(ctx context.Context, host component.Host) error
for _, ext := range exts {
ext.logger.Info("Extension is starting...")
if err := ext.Start(ctx, host); err != nil {
if err := ext.Start(ctx, newHostWrapper(host, ext.logger)); err != nil {
return err
}

View File

@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
import (
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
)
// hostWrapper adds behavior on top of the component.Host being passed when starting the built components.
type hostWrapper struct {
component.Host
*zap.Logger
}
func newHostWrapper(host component.Host, logger *zap.Logger) component.Host {
return &hostWrapper{
host,
logger,
}
}
func (hw *hostWrapper) ReportFatalError(err error) {
// The logger from the built component already identifies the component.
hw.Logger.Error("Component fatal error", zap.Error(err))
hw.Host.ReportFatalError(err)
}

View File

@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
import (
"errors"
"testing"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component/componenttest"
)
func Test_newHostWrapper(t *testing.T) {
hw := newHostWrapper(componenttest.NewNopHost(), zap.NewNop())
hw.ReportFatalError(errors.New("test error"))
}

View File

@ -49,12 +49,13 @@ type BuiltPipelines map[*configmodels.Pipeline]*builtPipeline
func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error {
for _, bp := range bps {
bp.logger.Info("Pipeline is starting...")
hostWrapper := newHostWrapper(host, bp.logger)
// Start in reverse order, starting from the back of processors pipeline.
// This is important so that processors that are earlier in the pipeline and
// reference processors that are later in the pipeline do not start sending
// data to later pipelines which are not yet started.
for i := len(bp.processors) - 1; i >= 0; i-- {
if err := bp.processors[i].Start(ctx, host); err != nil {
if err := bp.processors[i].Start(ctx, hostWrapper); err != nil {
return err
}
}
@ -200,7 +201,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
pipelineLogger := pb.logger.With(zap.String("pipeline_name", pipelineCfg.Name),
zap.String("pipeline_datatype", string(pipelineCfg.InputType)))
pipelineLogger.Info("Pipeline is enabled.")
pipelineLogger.Info("Pipeline was built.")
bp := &builtPipeline{
pipelineLogger,

View File

@ -69,7 +69,7 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
for _, rcv := range rcvs {
rcv.logger.Info("Receiver is starting...")
if err := rcv.Start(ctx, host); err != nil {
if err := rcv.Start(ctx, newHostWrapper(host, rcv.logger)); err != nil {
return err
}
rcv.logger.Info("Receiver started.")
@ -227,7 +227,7 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
}
rcv.receiver = createdReceiver
logger.Info("Receiver is enabled.", zap.String("datatype", string(dataType)))
logger.Info("Receiver was built.", zap.String("datatype", string(dataType)))
return nil
}