diff --git a/processor/attributesprocessor/attributes.go b/processor/attributesprocessor/attributes.go index befb10d701..4f03f300dc 100644 --- a/processor/attributesprocessor/attributes.go +++ b/processor/attributesprocessor/attributes.go @@ -126,6 +126,11 @@ func (a *attributesProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: true} } +// Shutdown is invoked during service shutdown. +func (a *attributesProcessor) Shutdown() error { + return nil +} + func insertAttribute(action attributeAction, attributesMap map[string]*tracepb.AttributeValue) { // Insert is only performed when the target key does not already exist // in the attribute map. diff --git a/processor/nodebatcherprocessor/node_batcher.go b/processor/nodebatcherprocessor/node_batcher.go index e05ae0e695..f37f519259 100644 --- a/processor/nodebatcherprocessor/node_batcher.go +++ b/processor/nodebatcherprocessor/node_batcher.go @@ -113,6 +113,12 @@ func (b *batcher) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } +// Shutdown is invoked during service shutdown. +func (b *batcher) Shutdown() error { + // TODO: flush accumulated data. + return nil +} + func (b *batcher) genBucketID(node *commonpb.Node, resource *resourcepb.Resource, spanFormat string) string { h := sha256.New() if node != nil { diff --git a/processor/probabilisticsamplerprocessor/probabilisticsampler.go b/processor/probabilisticsamplerprocessor/probabilisticsampler.go index 01c6a3cb7e..6bac3a3390 100644 --- a/processor/probabilisticsamplerprocessor/probabilisticsampler.go +++ b/processor/probabilisticsamplerprocessor/probabilisticsampler.go @@ -86,6 +86,11 @@ func (tsp *tracesamplerprocessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } +// Shutdown is invoked during service shutdown. +func (tsp *tracesamplerprocessor) Shutdown() error { + return nil +} + // hash is a murmur3 hash function, see http://en.wikipedia.org/wiki/MurmurHash. func hash(key []byte, seed uint32) (hash uint32) { const ( diff --git a/processor/processor.go b/processor/processor.go index c6594a76a7..304537d985 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -19,16 +19,27 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" ) +// Processor defines the common functions that must be implemented by TraceProcessor +// and MetricsProcessor. +type Processor interface { + // GetCapabilities must return the capabilities of the processor. + GetCapabilities() Capabilities + + // Shutdown is invoked during service shutdown. Typically used to flush the data + // that may be accumulated in the processor. + Shutdown() error +} + // TraceProcessor composes TraceConsumer with some additional processor-specific functions. type TraceProcessor interface { consumer.TraceConsumer - GetCapabilities() Capabilities + Processor } // MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. type MetricsProcessor interface { consumer.MetricsConsumer - GetCapabilities() Capabilities + Processor } // Capabilities describes the capabilities of TraceProcessor or MetricsProcessor. diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index e72fa57971..073fe0f371 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -45,6 +45,11 @@ func (np *nopProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } +// Shutdown is invoked during service shutdown. +func (np *nopProcessor) Shutdown() error { + return nil +} + // NewNopTraceProcessor creates an TraceProcessor that just pass the received data to the nextTraceProcessor. func NewNopTraceProcessor(nextTraceProcessor consumer.TraceConsumer) consumer.TraceConsumer { return &nopProcessor{nextTraceProcessor: nextTraceProcessor} diff --git a/processor/queuedprocessor/queued_processor.go b/processor/queuedprocessor/queued_processor.go index 44dcec0e91..42bd2380c0 100644 --- a/processor/queuedprocessor/queued_processor.go +++ b/processor/queuedprocessor/queued_processor.go @@ -135,6 +135,12 @@ func (sp *queuedSpanProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } +// Shutdown is invoked during service shutdown. +func (sp *queuedSpanProcessor) Shutdown() error { + // TODO: flush the queue. + return nil +} + func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { startTime := time.Now() err := sp.sender.ConsumeTraceData(item.ctx, item.td) diff --git a/processor/spanprocessor/span.go b/processor/spanprocessor/span.go index c6c58d445d..fcd7a6ee2d 100644 --- a/processor/spanprocessor/span.go +++ b/processor/spanprocessor/span.go @@ -61,6 +61,11 @@ func (sp *spanProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: true} } +// Shutdown is invoked during service shutdown. +func (sp *spanProcessor) Shutdown() error { + return nil +} + func (sp *spanProcessor) nameSpan(span *tracepb.Span) { // Note: There was a separate proposal for creating the string. // With benchmarking, strings.Builder is faster than the proposal. diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 93589fd6fb..083a7813c4 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -315,6 +315,11 @@ func (tsp *tailSamplingSpanProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } +// Shutdown is invoked during service shutdown. +func (tsp *tailSamplingSpanProcessor) Shutdown() error { + return nil +} + func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime time.Time) { var trace *sampling.TraceData if d, ok := tsp.idToTrace.Load(traceID); ok { diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index d9f9139edf..4a1f122ba6 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -331,3 +331,8 @@ func (p *mockSpanProcessor) ConsumeTraceData(ctx context.Context, td consumerdat func (p *mockSpanProcessor) GetCapabilities() processor.Capabilities { return processor.Capabilities{MutatesConsumedData: false} } + +// Shutdown is invoked during service shutdown. +func (p *mockSpanProcessor) Shutdown() error { + return nil +} diff --git a/service/service.go b/service/service.go index 78d9ba0dec..80983067c6 100644 --- a/service/service.go +++ b/service/service.go @@ -284,7 +284,8 @@ func (app *Application) shutdownPipelines() { app.logger.Info("Stopping receivers...") app.builtReceivers.StopAll() - // TODO: shutdown processors + // TODO: shutdown processors by calling Shutdown() for each processor in the + // order they are arranged in the pipeline. app.logger.Info("Shutting down exporters...") app.exporters.ShutdownAll()