From 3f92a95f8434cbd99e93ff8dcfd77cf329b522b7 Mon Sep 17 00:00:00 2001 From: Louise Allen Date: Tue, 19 May 2026 13:40:02 +0000 Subject: [PATCH] Add OTEL K8s events integration tests Port K8s events integration tests into the agent test framework for the OTEL standard cluster. Tests validate that K8s events ingested via the k8s_events receiver flow through the cluster-scraper logs pipeline, land in CloudWatch Logs, and carry the expected resource/scope/log attributes set by the pipeline processors (k8sattributes, nodemetadataenricher, transform). Components: - util/otellogs/: CW Logs Insights client with query cache and OTLP JSON parser, mirroring the util/otelmetrics structure - test/otel/standard/events_test.go: 7 integration tests covering log group existence, resource attributes, cluster identity, scope attributes, log attributes, pod enrichment, and event kind diversity - test/otel/standard/events_setup_test.go: log group helper, pipeline constant, and logs client init invoked from TestMain - terraform/eks/daemon/otel/: pre-create the events log group and stream (OTLP x-aws-log-group-create header is not honored by the backend), enable otelContainerInsights.events.enabled in helm, bump test-runner sleep from 3 to 5 minutes for log propagation Validated locally against an existing standard cluster: 7/7 PASS. --- terraform/eks/daemon/otel/main.tf | 19 +- test/otel/standard/events_setup_test.go | 45 +++++ test/otel/standard/events_test.go | 251 ++++++++++++++++++++++++ test/otel/standard/setup_test.go | 6 + util/otellogs/client.go | 109 ++++++++++ util/otellogs/models.go | 123 ++++++++++++ util/otellogs/query_cache.go | 134 +++++++++++++ 7 files changed, 684 insertions(+), 3 deletions(-) create mode 100644 test/otel/standard/events_setup_test.go create mode 100644 test/otel/standard/events_test.go create mode 100644 util/otellogs/client.go create mode 100644 util/otellogs/models.go create mode 100644 util/otellogs/query_cache.go diff --git a/terraform/eks/daemon/otel/main.tf b/terraform/eks/daemon/otel/main.tf index 22b3d5de2..c68441ad9 100644 --- a/terraform/eks/daemon/otel/main.tf +++ b/terraform/eks/daemon/otel/main.tf @@ -130,6 +130,18 @@ data "external" "clone_helm_chart" { ] } +# Pre-create the events log group and stream — the OTLP HTTP exporter's +# x-aws-log-group-create header is not yet honored by the backend. +resource "aws_cloudwatch_log_group" "events" { + name = "/aws/containerinsights/${aws_eks_cluster.this.name}/events" + retention_in_days = 1 +} + +resource "aws_cloudwatch_log_stream" "events" { + name = "events" + log_group_name = aws_cloudwatch_log_group.events.name +} + resource "helm_release" "aws_observability" { name = "amazon-cloudwatch-observability" chart = "./helm-charts/charts/amazon-cloudwatch-observability" @@ -139,13 +151,14 @@ resource "helm_release" "aws_observability" { set = [ { name = "clusterName", value = aws_eks_cluster.this.name }, { name = "region", value = var.region }, - { name = "otelContainerInsights.enabled", value = "true" }, + { name = "otelContainerInsights.events.enabled", value = "true" } ] depends_on = [ aws_eks_addon.pod_identity_agent, null_resource.kubectl, data.external.clone_helm_chart, + aws_cloudwatch_log_stream.events, ] } @@ -360,8 +373,8 @@ resource "null_resource" "validator" { echo "Running OTEL standard cluster integration tests" cd ../../../.. - echo "Waiting 3 minutes for metrics to propagate..." - sleep 180 + echo "Waiting 5 minutes for metrics and events to propagate..." + sleep 300 go test -tags integration -timeout 1h -v ${var.test_dir} \ -eksClusterName=${aws_eks_cluster.this.name} \ diff --git a/test/otel/standard/events_setup_test.go b/test/otel/standard/events_setup_test.go new file mode 100644 index 000000000..0a61458af --- /dev/null +++ b/test/otel/standard/events_setup_test.go @@ -0,0 +1,45 @@ +//go:build integration + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package standard + +import ( + "context" + "fmt" + "time" + + "github.com/aws/amazon-cloudwatch-agent-test/util/otellogs" +) + +// Events log group follows the convention: +// /aws/containerinsights/{clusterName}/events +func eventsLogGroup() string { + return fmt.Sprintf("/aws/containerinsights/%s/events", cfg.ClusterName) +} + +const pipelineEvents = "events" + +var ( + eventsLogsClient *otellogs.OtelLogsClient + eventsLogQueryCache *otellogs.LogQueryCache + eventsLogsLookback = 10 * time.Minute +) + +// initEventsLogsClient is called from TestMain (setup_test.go) after cfg is populated. +func initEventsLogsClient(ctx context.Context) error { + var err error + logsCfg := otellogs.LogsConfig{ + Region: cfg.Region, + ClusterName: cfg.ClusterName, + AccountID: cfg.AccountID, + LookbackWindow: eventsLogsLookback, + } + eventsLogsClient, err = otellogs.NewClient(ctx, logsCfg) + if err != nil { + return fmt.Errorf("creating events logs client: %w", err) + } + eventsLogQueryCache = otellogs.NewLogQueryCache(eventsLogsClient, cfg.ClusterName, eventsLogsLookback) + return nil +} diff --git a/test/otel/standard/events_test.go b/test/otel/standard/events_test.go new file mode 100644 index 000000000..8bd354b23 --- /dev/null +++ b/test/otel/standard/events_test.go @@ -0,0 +1,251 @@ +//go:build integration + +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package standard + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// TestEventsLogGroupExists — the events log group has records. +// --------------------------------------------------------------------------- + +func TestEventsLogGroupExists(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err, "querying events log group") + require.NotEmpty(t, results, "no event logs found in %s", eventsLogGroup()) +} + +// --------------------------------------------------------------------------- +// TestEventsResourceAttributes — every event has required resource attributes. +// --------------------------------------------------------------------------- + +func TestEventsResourceAttributes(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + requiredAttrs := []string{ + "k8s.cluster.name", + "k8s.object.kind", + "k8s.object.name", + "k8s.object.uid", + "cloud.region", + "cloud.account.id", + "cloud.provider", + "cloud.platform", + "cloud.resource_id", + "aws.log.group.names", + } + + for _, attr := range requiredAttrs { + t.Run(attr, func(t *testing.T) { + for i, r := range results { + v, ok := r.Resource[attr] + require.True(t, ok, "event[%d] missing resource.attributes.%s (kind=%s, name=%s)", + i, attr, r.Resource["k8s.object.kind"], r.Resource["k8s.object.name"]) + require.NotEmpty(t, v, "event[%d] resource.attributes.%s is empty", i, attr) + } + }) + } +} + +// --------------------------------------------------------------------------- +// TestEventsClusterIdentity — cluster-level attributes are correct. +// --------------------------------------------------------------------------- + +func TestEventsClusterIdentity(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + expectedPrefix := fmt.Sprintf("arn:aws:eks:%s:", cfg.Region) + expectedSuffix := ":cluster/" + cfg.ClusterName + + for i, r := range results { + require.Equal(t, cfg.ClusterName, r.Resource["k8s.cluster.name"], + "event[%d] k8s.cluster.name mismatch", i) + require.Equal(t, cfg.Region, r.Resource["cloud.region"], + "event[%d] cloud.region mismatch", i) + require.Equal(t, "aws", r.Resource["cloud.provider"], + "event[%d] cloud.provider mismatch", i) + require.Equal(t, "aws_eks", r.Resource["cloud.platform"], + "event[%d] cloud.platform mismatch", i) + + rid := r.Resource["cloud.resource_id"] + require.True(t, strings.HasPrefix(rid, expectedPrefix), + "event[%d] cloud.resource_id %q should start with %q", i, rid, expectedPrefix) + require.True(t, strings.HasSuffix(rid, expectedSuffix), + "event[%d] cloud.resource_id %q should end with %q", i, rid, expectedSuffix) + } +} + +// --------------------------------------------------------------------------- +// TestEventsScopeAttributes — scope name and attributes are correct. +// --------------------------------------------------------------------------- + +func TestEventsScopeAttributes(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + expectedScope := map[string]string{ + "cloudwatch.source": "cloudwatch-agent", + "cloudwatch.solution": "k8s-otel-container-insights", + "cloudwatch.pipeline": "events", + } + + for key, want := range expectedScope { + t.Run(key, func(t *testing.T) { + for i, r := range results { + got, ok := r.Scope[key] + require.True(t, ok, "event[%d] missing scope.attributes.%s", i, key) + require.Equal(t, want, got, "event[%d] scope.attributes.%s", i, key) + } + }) + } +} + +// --------------------------------------------------------------------------- +// TestEventsLogAttributes — k8s.event.* attributes and body/severity present. +// --------------------------------------------------------------------------- + +func TestEventsLogAttributes(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + requiredAttrs := []string{ + "k8s.event.reason", + "k8s.event.uid", + "k8s.event.name", + "k8s.event.start_time", + } + + for _, attr := range requiredAttrs { + t.Run(attr, func(t *testing.T) { + for i, r := range results { + v, ok := r.Attributes[attr] + require.True(t, ok, "event[%d] missing attributes.%s", i, attr) + require.NotEmpty(t, v, "event[%d] attributes.%s is empty", i, attr) + } + }) + } + + t.Run("severityText", func(t *testing.T) { + for i, r := range results { + require.NotEmpty(t, r.SeverityText, "event[%d] missing severityText", i) + } + }) + + t.Run("body", func(t *testing.T) { + for i, r := range results { + require.NotEmpty(t, r.Body, "event[%d] missing body", i) + } + }) +} + +// --------------------------------------------------------------------------- +// TestEventsPodEnrichment — Pod events are enriched with k8sattributes, +// nodemetadataenricher, and workload derivation. +// --------------------------------------------------------------------------- + +func TestEventsPodEnrichment(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + var podEvents []int + for i, r := range results { + if r.Resource["k8s.object.kind"] == "Pod" { + podEvents = append(podEvents, i) + } + } + if len(podEvents) == 0 { + t.Skip("no Pod events found — cluster may not have generated any yet") + } + + // Receiver-sourced attributes — must always be present on Pod events. + t.Run("receiver_attrs", func(t *testing.T) { + for _, idx := range podEvents { + r := results[idx] + for _, key := range []string{"k8s.pod.name", "k8s.namespace.name"} { + v := r.Resource[key] + require.NotEmpty(t, v, "pod event[%d] missing receiver attribute %s (pod=%s)", + idx, key, r.Resource["k8s.object.name"]) + } + } + }) + + // Enrichment-sourced attributes — k8sattributes adds k8s.node.name; + // nodemetadataenricher adds host.* and cloud.availability_zone. May be absent + // if pod was deleted before enrichment ran. Verify majority of pod events + // have these attributes (matches PR #697 logs convention). + t.Run("enrichment_attrs", func(t *testing.T) { + enrichmentAttrs := []string{ + "k8s.node.name", + "host.id", + "host.name", + "host.type", + "host.image.id", + "cloud.availability_zone", + } + for _, key := range enrichmentAttrs { + t.Run(key, func(t *testing.T) { + count := 0 + for _, idx := range podEvents { + if v, ok := results[idx].Resource[key]; ok && v != "" { + count++ + } + } + require.True(t, count > len(podEvents)/2, + "pod event resource.attributes.%s present on only %d/%d pod events (expected majority)", + key, count, len(podEvents)) + }) + } + }) + + // At least one Pod event must have workload derivation. + t.Run("workload_derivation", func(t *testing.T) { + var hasWorkload bool + for _, idx := range podEvents { + r := results[idx] + if r.Resource["k8s.workload.name"] != "" { + hasWorkload = true + require.NotEmpty(t, r.Resource["k8s.workload.type"], + "pod event has k8s.workload.name but missing k8s.workload.type") + break + } + } + require.True(t, hasWorkload, + "no Pod events had workload enrichment — expected at least one workload-owned pod event") + }) +} + +// --------------------------------------------------------------------------- +// TestEventsKindDiversity — events for multiple K8s object kinds are present. +// --------------------------------------------------------------------------- + +func TestEventsKindDiversity(t *testing.T) { + results, err := eventsLogQueryCache.Get(context.Background(), eventsLogGroup(), pipelineEvents) + require.NoError(t, err) + require.NotEmpty(t, results) + + kinds := make(map[string]bool) + for _, r := range results { + if k := r.Resource["k8s.object.kind"]; k != "" { + kinds[k] = true + } + } + t.Logf("event kinds seen: %v", kinds) + require.True(t, len(kinds) >= 2, + "expected at least 2 event kinds, got %d: %v", len(kinds), kinds) +} diff --git a/test/otel/standard/setup_test.go b/test/otel/standard/setup_test.go index f597be166..8dcfc8577 100644 --- a/test/otel/standard/setup_test.go +++ b/test/otel/standard/setup_test.go @@ -102,5 +102,11 @@ func TestMain(m *testing.M) { otelmetrics.WithSourceRegistry(registry), ) + // Initialize events logs client for K8s events pipeline tests. + if err := initEventsLogsClient(ctx); err != nil { + fmt.Fprintf(os.Stderr, "Events logs client error: %v\n", err) + os.Exit(1) + } + os.Exit(m.Run()) } diff --git a/util/otellogs/client.go b/util/otellogs/client.go new file mode 100644 index 000000000..da2be3e92 --- /dev/null +++ b/util/otellogs/client.go @@ -0,0 +1,109 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package otellogs + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" +) + +// LogsConfig holds configuration for the OTEL logs integration test client. +type LogsConfig struct { + Region string + ClusterName string + AccountID string + LookbackWindow time.Duration +} + +// OtelLogsClient queries CloudWatch Logs Insights for OTLP-ingested logs. +type OtelLogsClient struct { + cwl *cloudwatchlogs.Client + region string + pollInterval time.Duration + maxPollRetries int +} + +// NewClient creates an OtelLogsClient. +func NewClient(ctx context.Context, cfg LogsConfig) (*OtelLogsClient, error) { + awsCfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(cfg.Region)) + if err != nil { + return nil, fmt.Errorf("loading AWS config: %w", err) + } + return &OtelLogsClient{ + cwl: cloudwatchlogs.NewFromConfig(awsCfg), + region: cfg.Region, + pollInterval: 2 * time.Second, + maxPollRetries: 30, + }, nil +} + +// Query executes a Logs Insights query against the given log group and returns +// parsed LogResult entries. The query should select fields needed for validation. +// timeRange specifies how far back to look (from now). +func (c *OtelLogsClient) Query(ctx context.Context, logGroup string, query string, timeRange time.Duration) ([]LogResult, error) { + rows, err := c.QueryRaw(ctx, logGroup, query, timeRange) + if err != nil { + return nil, err + } + results := make([]LogResult, 0, len(rows)) + for _, row := range rows { + lr := LogResult{ + Resource: make(map[string]string), + Scope: make(map[string]string), + Attributes: make(map[string]string), + } + for _, field := range row { + if field.Field == nil || field.Value == nil { + continue + } + lr.SetField(*field.Field, *field.Value) + } + results = append(results, lr) + } + return results, nil +} + +// QueryRaw executes a Logs Insights query and returns the raw result fields. +func (c *OtelLogsClient) QueryRaw(ctx context.Context, logGroup string, query string, timeRange time.Duration) ([][]types.ResultField, error) { + end := time.Now() + start := end.Add(-timeRange) + + slog.Debug("starting logs query", "logGroup", logGroup, "query", query) + + output, err := c.cwl.StartQuery(ctx, &cloudwatchlogs.StartQueryInput{ + LogGroupName: aws.String(logGroup), + StartTime: aws.Int64(start.Unix()), + EndTime: aws.Int64(end.Unix()), + QueryString: aws.String(query), + Limit: aws.Int32(1000), + }) + if err != nil { + return nil, fmt.Errorf("StartQuery: %w", err) + } + + for attempt := 0; attempt < c.maxPollRetries; attempt++ { + resp, err := c.cwl.GetQueryResults(ctx, &cloudwatchlogs.GetQueryResultsInput{ + QueryId: output.QueryId, + }) + if err != nil { + return nil, fmt.Errorf("GetQueryResults: %w", err) + } + switch resp.Status { + case types.QueryStatusComplete: + slog.Debug("logs query returned", "rows", len(resp.Results)) + return resp.Results, nil + case types.QueryStatusFailed, types.QueryStatusCancelled, types.QueryStatusTimeout: + return nil, fmt.Errorf("query ended with status: %s", resp.Status) + } + time.Sleep(c.pollInterval) + } + return nil, fmt.Errorf("query did not complete after %d attempts", c.maxPollRetries) +} diff --git a/util/otellogs/models.go b/util/otellogs/models.go new file mode 100644 index 000000000..a358d5e60 --- /dev/null +++ b/util/otellogs/models.go @@ -0,0 +1,123 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package otellogs + +import ( + "encoding/json" + "fmt" + "strings" +) + +// LogResult represents a single OTLP log record parsed from CW Logs. +type LogResult struct { + Resource map[string]string + Scope map[string]string + Attributes map[string]string + Body string + SeverityText string + SeverityNumber string + TimeUnixNano string + ObservedTimeUnixNano string + TraceID string + SpanID string +} + +// SetField routes a Logs Insights result field into the appropriate LogResult +// field based on its dot-path prefix. +func (lr *LogResult) SetField(field, value string) { + switch { + case field == "body": + lr.Body = value + case field == "severityText": + lr.SeverityText = value + case field == "severityNumber": + lr.SeverityNumber = value + case field == "timeUnixNano": + lr.TimeUnixNano = value + case field == "observedTimeUnixNano": + lr.ObservedTimeUnixNano = value + case field == "traceId": + lr.TraceID = value + case field == "spanId": + lr.SpanID = value + case strings.HasPrefix(field, "resource.attributes."): + lr.Resource[strings.TrimPrefix(field, "resource.attributes.")] = value + case strings.HasPrefix(field, "scope.attributes."): + lr.Scope[strings.TrimPrefix(field, "scope.attributes.")] = value + case strings.HasPrefix(field, "attributes."): + lr.Attributes[strings.TrimPrefix(field, "attributes.")] = value + } +} + +// HasResource returns true if the resource attribute key exists and is non-empty. +func (lr *LogResult) HasResource(key string) bool { + v, ok := lr.Resource[key] + return ok && v != "" +} + +// HasScope returns true if the scope attribute key exists and is non-empty. +func (lr *LogResult) HasScope(key string) bool { + v, ok := lr.Scope[key] + return ok && v != "" +} + +// otlpLogJSON mirrors the JSON structure of an OTLP log record as stored in +// CloudWatch Logs when ingested via the OTLP endpoint. +type otlpLogJSON struct { + Resource struct { + Attributes map[string]any `json:"attributes"` + } `json:"resource"` + Scope struct { + Attributes map[string]any `json:"attributes"` + } `json:"scope"` + Body string `json:"body"` + Attributes map[string]any `json:"attributes"` + SeverityText string `json:"severityText"` + SeverityNumber json.Number `json:"severityNumber"` + TimeUnixNano json.Number `json:"timeUnixNano"` + ObservedTimeUnixNano json.Number `json:"observedTimeUnixNano"` + TraceID string `json:"traceId"` + SpanID string `json:"spanId"` +} + +// ParseOTLPLogJSON parses a raw OTLP log JSON string into a LogResult. +func ParseOTLPLogJSON(raw string) (LogResult, error) { + var j otlpLogJSON + if err := json.Unmarshal([]byte(raw), &j); err != nil { + return LogResult{}, fmt.Errorf("parsing OTLP log JSON: %w", err) + } + lr := LogResult{ + Resource: toStringMap(j.Resource.Attributes), + Scope: toStringMap(j.Scope.Attributes), + Attributes: toStringMap(j.Attributes), + Body: j.Body, + SeverityText: j.SeverityText, + SeverityNumber: j.SeverityNumber.String(), + TimeUnixNano: j.TimeUnixNano.String(), + ObservedTimeUnixNano: j.ObservedTimeUnixNano.String(), + TraceID: j.TraceID, + SpanID: j.SpanID, + } + if lr.Resource == nil { + lr.Resource = make(map[string]string) + } + if lr.Scope == nil { + lr.Scope = make(map[string]string) + } + if lr.Attributes == nil { + lr.Attributes = make(map[string]string) + } + return lr, nil +} + +func toStringMap(m map[string]any) map[string]string { + if m == nil { + return make(map[string]string) + } + out := make(map[string]string, len(m)) + for k, v := range m { + out[k] = fmt.Sprintf("%v", v) + } + return out +} diff --git a/util/otellogs/query_cache.go b/util/otellogs/query_cache.go new file mode 100644 index 000000000..e6fd1e07b --- /dev/null +++ b/util/otellogs/query_cache.go @@ -0,0 +1,134 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package otellogs + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" +) + +// LogQueryCache provides session-scoped caching of Logs Insights queries. +type LogQueryCache struct { + mu sync.RWMutex + cache map[string]cacheEntry + inflight map[string]chan struct{} + client *OtelLogsClient + cluster string + lookback time.Duration +} + +type cacheEntry struct { + results []LogResult + err error +} + +// NewLogQueryCache creates a cache backed by the given client. +func NewLogQueryCache(client *OtelLogsClient, clusterName string, lookback time.Duration) *LogQueryCache { + if lookback == 0 { + lookback = 10 * time.Minute + } + return &LogQueryCache{ + cache: make(map[string]cacheEntry), + inflight: make(map[string]chan struct{}), + client: client, + cluster: clusterName, + lookback: lookback, + } +} + +// Get returns cached log results for a given log group filtered by pipeline. +func (c *LogQueryCache) Get(ctx context.Context, logGroup, pipeline string) ([]LogResult, error) { + key := logGroup + "|" + pipeline + + c.mu.RLock() + if entry, ok := c.cache[key]; ok { + c.mu.RUnlock() + return entry.results, entry.err + } + if ch, ok := c.inflight[key]; ok { + c.mu.RUnlock() + <-ch + c.mu.RLock() + entry := c.cache[key] + c.mu.RUnlock() + return entry.results, entry.err + } + c.mu.RUnlock() + + c.mu.Lock() + if entry, ok := c.cache[key]; ok { + c.mu.Unlock() + return entry.results, entry.err + } + if ch, ok := c.inflight[key]; ok { + c.mu.Unlock() + <-ch + c.mu.RLock() + entry := c.cache[key] + c.mu.RUnlock() + return entry.results, entry.err + } + ch := make(chan struct{}) + c.inflight[key] = ch + c.mu.Unlock() + + entry := c.fetch(ctx, logGroup, pipeline) + + c.mu.Lock() + c.cache[key] = entry + delete(c.inflight, key) + c.mu.Unlock() + close(ch) + + return entry.results, entry.err +} + +func (c *LogQueryCache) fetch(ctx context.Context, logGroup, pipeline string) cacheEntry { + // Use @message substring matching because OTLP log JSON has dots in key + // names (e.g. "k8s.cluster.name") which Insights cannot filter structurally. + query := fmt.Sprintf( + `fields @message`+ + ` | filter @message like %q`+ + ` | filter @message like %q`+ + ` | limit 200`, + c.cluster, pipeline, + ) + rows, err := c.client.QueryRaw(ctx, logGroup, query, c.lookback) + if err != nil { + return cacheEntry{err: err} + } + results := parseMessageRows(rows) + return cacheEntry{results: results} +} + +func parseMessageRows(rows [][]types.ResultField) []LogResult { + var results []LogResult + for _, row := range rows { + msg := extractField(row, "@message") + if msg == "" { + continue + } + lr, err := ParseOTLPLogJSON(msg) + if err != nil { + continue + } + results = append(results, lr) + } + return results +} + +func extractField(row []types.ResultField, name string) string { + for _, f := range row { + if f.Field != nil && *f.Field == name { + if f.Value != nil { + return *f.Value + } + } + } + return "" +}