Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions plugins/processors/nodemetadataenricher/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewFactory() processor.Factory {
TypeStr,
createDefaultConfig,
processor.WithMetrics(createMetricsProcessor, stability),
processor.WithLogs(createLogsProcessor, stability),
)
}

Expand Down Expand Up @@ -56,3 +57,26 @@ func createMetricsProcessor(
processorhelper.WithCapabilities(processorCapabilities),
)
}

func createLogsProcessor(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing unit tests for this function and maybe this file as well

ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
_, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("configuration parsing error")
}

logsProcessor := newNodeMetadataEnricherProcessor(set.Logger)

return processorhelper.NewLogs(
ctx,
set,
cfg,
nextConsumer,
logsProcessor.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
)
}
92 changes: 92 additions & 0 deletions plugins/processors/nodemetadataenricher/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package nodemetadataenricher

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

// dummyConfig is an alternate component.Config used to exercise the
// configuration-type assertion error path in createMetricsProcessor and
// createLogsProcessor. It does NOT embed the processor's *Config type, so the
// `cfg.(*Config)` assertion in the factory functions should fail.
type dummyConfig struct{}

func TestNewFactory(t *testing.T) {
factory := NewFactory()

require.NotNil(t, factory)
assert.Equal(t, TypeStr, factory.Type())

// Both metric and log signal stability levels should be set.
assert.Equal(t, stability, factory.MetricsStability())
assert.Equal(t, stability, factory.LogsStability())
}

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig()

require.NotNil(t, cfg)
_, ok := cfg.(*Config)
assert.True(t, ok, "createDefaultConfig should return *Config")
}

func TestCreateMetricsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

p, err := factory.CreateMetrics(
context.Background(),
processortest.NewNopSettings(factory.Type()),
cfg,
consumertest.NewNop(),
)
require.NoError(t, err)
require.NotNil(t, p)
assert.True(t, p.Capabilities().MutatesData, "processor should declare it mutates data")
}

func TestCreateMetricsProcessor_BadConfigType(t *testing.T) {
p, err := createMetricsProcessor(
context.Background(),
processortest.NewNopSettings(component.MustNewType("nodemetadataenricher")),
&dummyConfig{},
consumertest.NewNop(),
)
assert.Error(t, err, "createMetricsProcessor should reject a config of the wrong type")
assert.Nil(t, p)
}

func TestCreateLogsProcessor(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

p, err := factory.CreateLogs(
context.Background(),
processortest.NewNopSettings(factory.Type()),
cfg,
consumertest.NewNop(),
)
require.NoError(t, err)
require.NotNil(t, p)
assert.True(t, p.Capabilities().MutatesData, "processor should declare it mutates data")
}

func TestCreateLogsProcessor_BadConfigType(t *testing.T) {
p, err := createLogsProcessor(
context.Background(),
processortest.NewNopSettings(component.MustNewType("nodemetadataenricher")),
&dummyConfig{},
consumertest.NewNop(),
)
assert.Error(t, err, "createLogsProcessor should reject a config of the wrong type")
assert.Nil(t, p)
}
82 changes: 57 additions & 25 deletions plugins/processors/nodemetadataenricher/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"sync/atomic"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

Expand Down Expand Up @@ -37,38 +39,68 @@ func newNodeMetadataEnricherProcessor(logger *zap.Logger) *nodeMetadataEnricherP
return p
}

func (p *nodeMetadataEnricherProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
// resolveCache returns the node metadata cache, lazily initializing the
// processor's reference if the cache extension was not ready at construction
// time. Returns nil if the cache is still unavailable.
func (p *nodeMetadataEnricherProcessor) resolveCache() *nodemetadatacache.NodeMetadataCache {
cache := p.cache.Load()
if cache != nil {
return cache
}
cache = nodemetadatacache.GetNodeMetadataCache()
if cache != nil {
p.logger.Debug("Lazily initialized node metadata cache reference")
p.cache.Store(cache)
}
return cache
}

// enrichResource applies node metadata attributes to a single resource if it
// has a non-empty k8s.node.name attribute and a corresponding entry exists in
// the cache. Resources without a node name or with a cache miss are left
// untouched.
func (p *nodeMetadataEnricherProcessor) enrichResource(resource pcommon.Resource, cache *nodemetadatacache.NodeMetadataCache) {
nodeNameVal, exists := resource.Attributes().Get(attrNodeName)
if !exists || nodeNameVal.Str() == "" {
return
}

metadata := cache.Get(nodeNameVal.Str())
if metadata == nil {
return
}

resource.Attributes().PutStr(attrHostID, metadata.HostID)
resource.Attributes().PutStr(attrHostName, metadata.HostName)
resource.Attributes().PutStr(attrHostType, metadata.HostType)
resource.Attributes().PutStr(attrHostImageID, metadata.HostImageID)
resource.Attributes().PutStr(attrAvailabilityZone, metadata.AvailabilityZone)
}

func (p *nodeMetadataEnricherProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
cache := p.resolveCache()
if cache == nil {
// Extension may not have been ready at creation time — retry.
cache = nodemetadatacache.GetNodeMetadataCache()
if cache != nil {
p.logger.Debug("Lazily initialized node metadata cache reference")
p.cache.Store(cache)
} else {
return md, nil
}
return md, nil
}

rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
resource := rms.At(i).Resource()
nodeNameVal, exists := resource.Attributes().Get(attrNodeName)
if !exists || nodeNameVal.Str() == "" {
continue
}

metadata := cache.Get(nodeNameVal.Str())
if metadata == nil {
continue
}

resource.Attributes().PutStr(attrHostID, metadata.HostID)
resource.Attributes().PutStr(attrHostName, metadata.HostName)
resource.Attributes().PutStr(attrHostType, metadata.HostType)
resource.Attributes().PutStr(attrHostImageID, metadata.HostImageID)
resource.Attributes().PutStr(attrAvailabilityZone, metadata.AvailabilityZone)
p.enrichResource(rms.At(i).Resource(), cache)
}

return md, nil
}

func (p *nodeMetadataEnricherProcessor) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
Comment thread
sky333999 marked this conversation as resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this is an almost duplicate of processMetrics, can we extract the common functionality into a util function and use it inside both processing functions ?

cache := p.resolveCache()
if cache == nil {
return ld, nil
}

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
p.enrichResource(rls.At(i).Resource(), cache)
}

return ld, nil
}
Loading
Loading