Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions extension/awscloudwatchlogsprovisionerextension/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awscloudwatchlogsprovisionerextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/awscloudwatchlogsprovisionerextension"

import (
"errors"
"time"

"go.opentelemetry.io/collector/component"
)

// Config for the awscloudwatchlogsprovisioner extension.
//
// The extension reads x-aws-log-group and x-aws-log-stream headers from outgoing
// HTTP requests and lazily creates the corresponding CloudWatch log groups and
// streams. Headers can be set by the otlphttp exporter (static) or by the
// headers_setter extension (dynamic, from client.Metadata).
//
// Alternatively, LogGroup and LogStream can be set directly in this config.
// When set, the extension injects them as request headers, avoiding the need
// to pass them through confighttp.ClientConfig.Headers (which uses configopaque.String
// and gets nil'd during YAML serialization).
type Config struct {
// Region is the AWS region for CloudWatch Logs API calls (required).
Region string `mapstructure:"region"`

// AdditionalAuth is a reference to the inner auth extension (typically sigv4auth)
// that this extension chains with for request signing. Follows the same pattern
// as headers_setter's additional_auth field.
AdditionalAuth *component.ID `mapstructure:"additional_auth"`

// LogGroup is the CloudWatch log group name. When set, the extension injects
// it as the x-aws-log-group header on outgoing requests.
LogGroup string `mapstructure:"log_group"`

// LogStream is the CloudWatch log stream name. When set, the extension injects
// it as the x-aws-log-stream header on outgoing requests.
LogStream string `mapstructure:"log_stream"`

// LogRetention is the log retention in days. When set, the extension injects
// it as the x-aws-log-retention header on outgoing requests.
LogRetention int64 `mapstructure:"log_retention"`

// LogsProvisionTimeout is the HTTP timeout for each CreateLogGroup/CreateLogStream
// API call (including SDK retries). Bounds how long singleflight waiters block.
// Default: 10s.
LogsProvisionTimeout time.Duration `mapstructure:"logs_provision_timeout"`

// LogsProvisionFailureBackoff is the TTL for negative cache entries.
// During this period, the extension won't retry creation for the same (group, stream) pair.
// Default: 30s.
LogsProvisionFailureBackoff time.Duration `mapstructure:"logs_provision_failure_backoff"`
}

func (cfg *Config) Validate() error {
if cfg.Region == "" {
return errors.New("region is required")
}
if cfg.LogsProvisionTimeout <= 0 {
return errors.New("logs_provision_timeout must be positive")
}
if cfg.LogsProvisionFailureBackoff < 0 {
return errors.New("logs_provision_failure_backoff must not be negative")
}
return nil
}
57 changes: 57 additions & 0 deletions extension/awscloudwatchlogsprovisionerextension/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awscloudwatchlogsprovisionerextension

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
)

func TestConfig_Defaults(t *testing.T) {
cfg := createDefaultConfig().(*Config)

assert.Equal(t, 10*time.Second, cfg.LogsProvisionTimeout)
assert.Equal(t, 30*time.Second, cfg.LogsProvisionFailureBackoff)
assert.Nil(t, cfg.AdditionalAuth)
assert.Empty(t, cfg.Region)
}

func TestConfig_Validate_RequiresRegion(t *testing.T) {
cfg := &Config{LogsProvisionTimeout: 10 * time.Second}
assert.Error(t, cfg.Validate(), "should require region")

cfg.Region = "us-east-1"
assert.NoError(t, cfg.Validate())
}

func TestConfig_Validate_RequiresPositiveTimeout(t *testing.T) {
cfg := &Config{Region: "us-east-1", LogsProvisionTimeout: 0}
assert.Error(t, cfg.Validate(), "should reject zero timeout")

cfg.LogsProvisionTimeout = -1 * time.Second
assert.Error(t, cfg.Validate(), "should reject negative timeout")

cfg.LogsProvisionTimeout = 10 * time.Second
assert.NoError(t, cfg.Validate())
}

func TestConfig_WithAdditionalAuth(t *testing.T) {
authID := component.MustNewID("sigv4auth")
cfg := &Config{
Region: "us-east-1",
LogsProvisionTimeout: 10 * time.Second,
AdditionalAuth: &authID,
}

assert.Equal(t, "sigv4auth", cfg.AdditionalAuth.String())
assert.NoError(t, cfg.Validate())
}

func TestFactory_Type(t *testing.T) {
f := NewFactory()
assert.Equal(t, component.MustNewType("awscloudwatchlogsprovisioner"), f.Type())
}
63 changes: 63 additions & 0 deletions extension/awscloudwatchlogsprovisionerextension/cwlogsclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awscloudwatchlogsprovisionerextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/awscloudwatchlogsprovisionerextension"

import (
"context"
"errors"
"net/http"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

type defaultCWLogsClient struct {
svc *cloudwatchlogs.Client
}

func newDefaultCWLogsClient(ctx context.Context, region string, timeout time.Duration) (cwLogsClient, error) {
cfg, err := awsconfig.LoadDefaultConfig(
ctx,
awsconfig.WithRegion(region),
awsconfig.WithHTTPClient(&http.Client{Timeout: timeout}),
)
if err != nil {
return nil, err
}
return &defaultCWLogsClient{svc: cloudwatchlogs.NewFromConfig(cfg)}, nil
}

func (c *defaultCWLogsClient) CreateLogGroup(ctx context.Context, logGroupName string) error {
_, err := c.svc.CreateLogGroup(ctx, &cloudwatchlogs.CreateLogGroupInput{
LogGroupName: aws.String(logGroupName),
})
if err != nil && !isAlreadyExists(err) {
return err
}
return nil
}

func (c *defaultCWLogsClient) CreateLogStream(ctx context.Context, logGroupName, logStreamName string) error {
_, err := c.svc.CreateLogStream(ctx, &cloudwatchlogs.CreateLogStreamInput{
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(logStreamName),
})
if err != nil && !isAlreadyExists(err) {
return err
}
return nil
}

func isAlreadyExists(err error) bool {
var alreadyExists *types.ResourceAlreadyExistsException
return errors.As(err, &alreadyExists)
}

func isNotFound(err error) bool {
var notFound *types.ResourceNotFoundException
return errors.As(err, &notFound)
}
9 changes: 9 additions & 0 deletions extension/awscloudwatchlogsprovisionerextension/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

// Package awscloudwatchlogsprovisionerextension implements extensionauth.HTTPClient
// to dynamically set x-aws-log-group headers and create CloudWatch log groups and
// streams on first encounter.
package awscloudwatchlogsprovisionerextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/awscloudwatchlogsprovisionerextension"
Loading
Loading