Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions terraform/eks/daemon/otel/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ data "external" "clone_helm_chart" {
]
}

# Pre-create the events log group and stream — the OTLP HTTP exporter's
# x-aws-log-group-create header is not yet honored by the backend.
resource "aws_cloudwatch_log_group" "events" {
name = "/aws/containerinsights/${aws_eks_cluster.this.name}/events"
retention_in_days = 1
}

resource "aws_cloudwatch_log_stream" "events" {
name = "events"
log_group_name = aws_cloudwatch_log_group.events.name
}

resource "helm_release" "aws_observability" {
name = "amazon-cloudwatch-observability"
chart = "./helm-charts/charts/amazon-cloudwatch-observability"
Expand All @@ -139,13 +151,14 @@ resource "helm_release" "aws_observability" {
set = [
{ name = "clusterName", value = aws_eks_cluster.this.name },
{ name = "region", value = var.region },
{ name = "otelContainerInsights.enabled", value = "true" },
{ name = "otelContainerInsights.events.enabled", value = "true" }
]

depends_on = [
aws_eks_addon.pod_identity_agent,
null_resource.kubectl,
data.external.clone_helm_chart,
aws_cloudwatch_log_stream.events,
]
}

Expand Down Expand Up @@ -360,8 +373,8 @@ resource "null_resource" "validator" {
echo "Running OTEL standard cluster integration tests"
cd ../../../..

echo "Waiting 3 minutes for metrics to propagate..."
sleep 180
echo "Waiting 5 minutes for metrics and events to propagate..."
sleep 300

go test -tags integration -timeout 1h -v ${var.test_dir} \
-eksClusterName=${aws_eks_cluster.this.name} \
Expand Down
45 changes: 45 additions & 0 deletions test/otel/standard/events_setup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
//go:build integration

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package standard

import (
"context"
"fmt"
"time"

"github.com/aws/amazon-cloudwatch-agent-test/util/otellogs"
)

// Events log group follows the convention:
// /aws/containerinsights/{clusterName}/events
func eventsLogGroup() string {
return fmt.Sprintf("/aws/containerinsights/%s/events", cfg.ClusterName)
}

const pipelineEvents = "events"

var (
eventsLogsClient *otellogs.OtelLogsClient
eventsLogQueryCache *otellogs.LogQueryCache
eventsLogsLookback = 10 * time.Minute
)

// initEventsLogsClient is called from TestMain (setup_test.go) after cfg is populated.
func initEventsLogsClient(ctx context.Context) error {
var err error
logsCfg := otellogs.LogsConfig{
Region: cfg.Region,
ClusterName: cfg.ClusterName,
AccountID: cfg.AccountID,
LookbackWindow: eventsLogsLookback,
}
eventsLogsClient, err = otellogs.NewClient(ctx, logsCfg)
if err != nil {
return fmt.Errorf("creating events logs client: %w", err)
}
eventsLogQueryCache = otellogs.NewLogQueryCache(eventsLogsClient, cfg.ClusterName, eventsLogsLookback)
return nil
}
251 changes: 251 additions & 0 deletions test/otel/standard/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
//go:build integration

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package standard

import (
"context"
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

// ---------------------------------------------------------------------------
// TestEventsLogGroupExists — the events log group has records.
// ---------------------------------------------------------------------------

func TestEventsLogGroupExists(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err, "querying events log group")
require.NotEmpty(t, results, "no event logs found in %s", eventsLogGroup())
}

// ---------------------------------------------------------------------------
// TestEventsResourceAttributes — every event has required resource attributes.
// ---------------------------------------------------------------------------

func TestEventsResourceAttributes(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

requiredAttrs := []string{
"k8s.cluster.name",
"k8s.object.kind",
"k8s.object.name",
"k8s.object.uid",
"cloud.region",
"cloud.account.id",
"cloud.provider",
"cloud.platform",
"cloud.resource_id",
"aws.log.group.names",
}

for _, attr := range requiredAttrs {
t.Run(attr, func(t *testing.T) {
for i, r := range results {
v, ok := r.Resource[attr]
require.True(t, ok, "event[%d] missing resource.attributes.%s (kind=%s, name=%s)",
i, attr, r.Resource["k8s.object.kind"], r.Resource["k8s.object.name"])
require.NotEmpty(t, v, "event[%d] resource.attributes.%s is empty", i, attr)
}
})
}
}

// ---------------------------------------------------------------------------
// TestEventsClusterIdentity — cluster-level attributes are correct.
// ---------------------------------------------------------------------------

func TestEventsClusterIdentity(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

expectedPrefix := fmt.Sprintf("arn:aws:eks:%s:", cfg.Region)
expectedSuffix := ":cluster/" + cfg.ClusterName

for i, r := range results {
require.Equal(t, cfg.ClusterName, r.Resource["k8s.cluster.name"],
"event[%d] k8s.cluster.name mismatch", i)
require.Equal(t, cfg.Region, r.Resource["cloud.region"],
"event[%d] cloud.region mismatch", i)
require.Equal(t, "aws", r.Resource["cloud.provider"],
"event[%d] cloud.provider mismatch", i)
require.Equal(t, "aws_eks", r.Resource["cloud.platform"],
"event[%d] cloud.platform mismatch", i)

rid := r.Resource["cloud.resource_id"]
require.True(t, strings.HasPrefix(rid, expectedPrefix),
"event[%d] cloud.resource_id %q should start with %q", i, rid, expectedPrefix)
require.True(t, strings.HasSuffix(rid, expectedSuffix),
"event[%d] cloud.resource_id %q should end with %q", i, rid, expectedSuffix)
}
}

// ---------------------------------------------------------------------------
// TestEventsScopeAttributes — scope name and attributes are correct.
// ---------------------------------------------------------------------------

func TestEventsScopeAttributes(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

expectedScope := map[string]string{
"cloudwatch.source": "cloudwatch-agent",
"cloudwatch.solution": "k8s-otel-container-insights",
"cloudwatch.pipeline": "events",
}

for key, want := range expectedScope {
t.Run(key, func(t *testing.T) {
for i, r := range results {
got, ok := r.Scope[key]
require.True(t, ok, "event[%d] missing scope.attributes.%s", i, key)
require.Equal(t, want, got, "event[%d] scope.attributes.%s", i, key)
}
})
}
}

// ---------------------------------------------------------------------------
// TestEventsLogAttributes — k8s.event.* attributes and body/severity present.
// ---------------------------------------------------------------------------

func TestEventsLogAttributes(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

requiredAttrs := []string{
"k8s.event.reason",
"k8s.event.uid",
"k8s.event.name",
"k8s.event.start_time",
}

for _, attr := range requiredAttrs {
t.Run(attr, func(t *testing.T) {
for i, r := range results {
v, ok := r.Attributes[attr]
require.True(t, ok, "event[%d] missing attributes.%s", i, attr)
require.NotEmpty(t, v, "event[%d] attributes.%s is empty", i, attr)
}
})
}

t.Run("severityText", func(t *testing.T) {
for i, r := range results {
require.NotEmpty(t, r.SeverityText, "event[%d] missing severityText", i)
}
})

t.Run("body", func(t *testing.T) {
for i, r := range results {
require.NotEmpty(t, r.Body, "event[%d] missing body", i)
}
})
}

// ---------------------------------------------------------------------------
// TestEventsPodEnrichment — Pod events are enriched with k8sattributes,
// nodemetadataenricher, and workload derivation.
// ---------------------------------------------------------------------------

func TestEventsPodEnrichment(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

var podEvents []int
for i, r := range results {
if r.Resource["k8s.object.kind"] == "Pod" {
podEvents = append(podEvents, i)
}
}
if len(podEvents) == 0 {
t.Skip("no Pod events found — cluster may not have generated any yet")
}

// Receiver-sourced attributes — must always be present on Pod events.
t.Run("receiver_attrs", func(t *testing.T) {
for _, idx := range podEvents {
r := results[idx]
for _, key := range []string{"k8s.pod.name", "k8s.namespace.name"} {
v := r.Resource[key]
require.NotEmpty(t, v, "pod event[%d] missing receiver attribute %s (pod=%s)",
idx, key, r.Resource["k8s.object.name"])
}
}
})

// Enrichment-sourced attributes — k8sattributes adds k8s.node.name;
// nodemetadataenricher adds host.* and cloud.availability_zone. May be absent
// if pod was deleted before enrichment ran. Verify majority of pod events
// have these attributes (matches PR #697 logs convention).
t.Run("enrichment_attrs", func(t *testing.T) {
enrichmentAttrs := []string{
"k8s.node.name",
"host.id",
"host.name",
"host.type",
"host.image.id",
"cloud.availability_zone",
}
for _, key := range enrichmentAttrs {
t.Run(key, func(t *testing.T) {
count := 0
for _, idx := range podEvents {
if v, ok := results[idx].Resource[key]; ok && v != "" {
count++
}
}
require.True(t, count > len(podEvents)/2,
"pod event resource.attributes.%s present on only %d/%d pod events (expected majority)",
key, count, len(podEvents))
})
}
})

// At least one Pod event must have workload derivation.
t.Run("workload_derivation", func(t *testing.T) {
var hasWorkload bool
for _, idx := range podEvents {
r := results[idx]
if r.Resource["k8s.workload.name"] != "" {
hasWorkload = true
require.NotEmpty(t, r.Resource["k8s.workload.type"],
"pod event has k8s.workload.name but missing k8s.workload.type")
break
}
}
require.True(t, hasWorkload,
"no Pod events had workload enrichment — expected at least one workload-owned pod event")
})
}

// ---------------------------------------------------------------------------
// TestEventsKindDiversity — events for multiple K8s object kinds are present.
// ---------------------------------------------------------------------------

func TestEventsKindDiversity(t *testing.T) {
results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents)
require.NoError(t, err)
require.NotEmpty(t, results)

kinds := make(map[string]bool)
for _, r := range results {
if k := r.Resource["k8s.object.kind"]; k != "" {
kinds[k] = true
}
}
t.Logf("event kinds seen: %v", kinds)
require.True(t, len(kinds) >= 2,
"expected at least 2 event kinds, got %d: %v", len(kinds), kinds)
}
6 changes: 6 additions & 0 deletions test/otel/standard/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,11 @@ func TestMain(m *testing.M) {
otelmetrics.WithSourceRegistry(registry),
)

// Initialize events logs client for K8s events pipeline tests.
if err := initEventsLogsClient(ctx); err != nil {
fmt.Fprintf(os.Stderr, "Events logs client error: %v\n", err)
os.Exit(1)
}

os.Exit(m.Run())
}
Loading
Loading