2 changes: 2 additions & 0 deletions docs/hybrid-agent-beats-receivers.md
@@ -36,6 +36,8 @@ https://github.com/elastic/kibana/issues/233186 is implemented. Before that chan
overrides API can be used to add `_runtime_experimental: "otel"` to the `agent.monitoring` section of the policy.
See https://support.elastic.dev/knowledge/view/06b69893 for details on the policy overrides API.

For the Elastic Agent container images, the `AGENT_MONITORING_RUNTIME_EXPERIMENTAL` environment variable can be set to either `process` or `otel` to override the default runtime used for agent monitoring.

Executing the `elastic-agent diagnostics` command in this mode will now produce an `otel-final.yml` file showing the generated
collector configuration used to run the Beat receivers.

4 changes: 4 additions & 0 deletions internal/pkg/agent/cmd/container.go
@@ -147,6 +147,10 @@ be used when the same credentials will be used across all the possible actions a
KIBANA_CA - path to certificate authority to use when communicating with Kibana [$ELASTICSEARCH_CA]
ELASTIC_AGENT_TAGS - user provided tags for the agent [linux,staging]

* Beats Receivers
The following experimental environment variable controls whether Beats receivers are used for the monitoring components.

AGENT_MONITORING_RUNTIME_EXPERIMENTAL - Set to either "process" or "otel" to use the corresponding runtime for the monitoring components.

* Elastic-Agent event logging
If EVENTS_TO_STDERR is set to true, log entries containing event data or whole raw events will be logged to stderr alongside
10 changes: 9 additions & 1 deletion internal/pkg/core/monitoring/config/config.go
@@ -5,6 +5,7 @@
package config

import (
"os"
"strings"
"time"

@@ -111,6 +112,13 @@ type BufferConfig struct {

// DefaultConfig creates a config with pre-set default values.
func DefaultConfig() *MonitoringConfig {
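// The experimental AGENT_MONITORING_RUNTIME_EXPERIMENTAL environment variable
// can override the runtime manager used for the monitoring components; any
// other value (or an unset variable) keeps the default.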
monRuntimeManager := DefaultRuntimeManager
monRuntimeEnv := os.Getenv("AGENT_MONITORING_RUNTIME_EXPERIMENTAL")
switch monRuntimeEnv {
case ProcessRuntimeManager, OtelRuntimeManager:
monRuntimeManager = monRuntimeEnv
}

return &MonitoringConfig{
Enabled: true,
MonitorLogs: true,
@@ -125,7 +133,7 @@ func DefaultConfig() *MonitoringConfig {
Namespace: defaultNamespace,
APM: defaultAPMConfig(),
Diagnostics: defaultDiagnostics(),
RuntimeManager: DefaultRuntimeManager,
RuntimeManager: monRuntimeManager,
}
}
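
For illustration, here is a minimal sketch of how this override behaves. It is not part of the diff, it is only buildable from inside the elastic-agent module (the package is internal), and it only uses the `DefaultConfig`, `RuntimeManager`, `OtelRuntimeManager`, and `DefaultRuntimeManager` identifiers that appear in the changes above:

```go
package main

import (
	"fmt"
	"os"

	monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
)

func main() {
	// A recognized value ("process" or "otel") selects that runtime manager.
	os.Setenv("AGENT_MONITORING_RUNTIME_EXPERIMENTAL", string(monitoringCfg.OtelRuntimeManager))
	fmt.Println(monitoringCfg.DefaultConfig().RuntimeManager)

	// Any other value is ignored and DefaultRuntimeManager is kept.
	os.Setenv("AGENT_MONITORING_RUNTIME_EXPERIMENTAL", "invalid")
	fmt.Println(monitoringCfg.DefaultConfig().RuntimeManager)
}
```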

312 changes: 312 additions & 0 deletions testing/integration/ess/container_cmd_test.go
@@ -27,6 +27,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/kibana"
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/core/process"
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
@@ -444,6 +446,246 @@ func createMockESOutput(t *testing.T, info *define.Info, percentDuplicate, perce
return mockesURL, outputResp.Item.ID
}

// TestContainerCMDAgentMonitoringRuntimeExperimental tests that
// AGENT_MONITORING_RUNTIME_EXPERIMENTAL selects the runtime used to run the
// agent.monitoring components from the local configuration, and that unset or
// invalid values fall back to the default runtime.
func TestContainerCMDAgentMonitoringRuntimeExperimental(t *testing.T) {
define.Require(t, define.Requirements{
Stack: &define.Stack{},
Local: false,
Sudo: true,
OS: []define.OS{
{Type: define.Linux},
},
Group: "container",
})

testCases := []struct {
name string
agentMonitoringRuntimeEnv string
expectedRuntimeName string
}{
{
name: "var set to otel",
agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager,
expectedRuntimeName: string(monitoringCfg.OtelRuntimeManager),
},
{
name: "var set to process",
agentMonitoringRuntimeEnv: monitoringCfg.ProcessRuntimeManager,
expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager),
},
{
name: "var set to invalid value",
agentMonitoringRuntimeEnv: "invalid",
expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager),
},
{
name: "var not set",
agentMonitoringRuntimeEnv: "",
expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
defer cancel()

agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = agentFixture.Prepare(ctx)
require.NoError(t, err)

mockesURL := integration.StartMockES(t, 0, 0, 0, 0)

// Create a local agent config file with monitoring enabled
agentConfig := createSimpleAgentMonitoringConfig(t, agentFixture.WorkDir(), mockesURL)

env := []string{
"STATE_PATH=" + agentFixture.WorkDir(),
}

// Set environment variable if specified
if tc.agentMonitoringRuntimeEnv != "" {
env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv)
}

cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container", "-c", agentConfig}, env)
t.Logf(">> running binary with: %v", cmd.Args)
if err := cmd.Start(); err != nil {
t.Fatalf("error running container cmd: %s", err)
}

require.EventuallyWithT(t, func(ct *assert.CollectT) {
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
require.NoError(ct, err)
},
2*time.Minute, time.Second,
"Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s",
err, agentOutput,
)

// Verify that components are using the expected runtime
require.EventuallyWithTf(t, func(ct *assert.CollectT) {
status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env)))
require.NoErrorf(ct, err, "error getting agent status")

expectedComponentCount := 4 // process runtime
if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) {
expectedComponentCount = 5
}

require.Len(ct, status.Components, expectedComponentCount, "unexpected number of components in agent status")

for _, comp := range status.Components {
var compRuntime string
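// Infer the runtime from the binary name each component reports:
// "beats-receiver" indicates the otel runtime, "beat-v2-client" the
// process runtime.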
switch comp.VersionInfo.Name {
case "beats-receiver":
compRuntime = string(component.OtelRuntimeManager)
case "beat-v2-client":
compRuntime = string(component.ProcessRuntimeManager)
}
t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime)
switch comp.ID {
case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring":
// Monitoring components should use the expected runtime
assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "expected correct runtime name for monitoring component %s with id %s", comp.Name, comp.ID)
default:
// Non-monitoring components should use the default runtime
assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID)
}
}
}, 1*time.Minute, 1*time.Second,
"components did not use expected runtime",
)
})
}
}

// TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy tests that when
// AGENT_MONITORING_RUNTIME_EXPERIMENTAL is set, the agent.monitoring settings
// from the Fleet policy take precedence over the environment variable.
func TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy(t *testing.T) {
info := define.Require(t, define.Requirements{
Stack: &define.Stack{},
Local: false,
Sudo: true,
OS: []define.OS{
{Type: define.Linux},
},
Group: "container",
})

testCases := []struct {
name string
agentMonitoringRuntimeEnv string
expectedRuntimeName string
}{
{
name: "var set to otel",
agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager,
expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager), // set by policy
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute)
defer cancel()

agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
require.NoError(t, err)

err = agentFixture.Prepare(ctx)
require.NoError(t, err)

fleetURL, err := fleettools.DefaultURL(ctx, info.KibanaClient)
if err != nil {
t.Fatalf("could not get Fleet URL: %s", err)
}

policyName := fmt.Sprintf("test-beats-receivers-monitoring-%s-%s", tc.name, uuid.Must(uuid.NewV4()).String())
policyID, enrollmentToken := createPolicy(
t,
ctx,
agentFixture,
info,
policyName,
"")

addLogIntegration(t, info, policyID, "/tmp/beats-receivers-test.log")
integration.GenerateLogFile(t, "/tmp/beats-receivers-test.log", time.Second/2, 50)

// set monitoring runtime to process via policy
setAgentMonitoringRuntime(t, info, policyID, policyName, monitoringCfg.ProcessRuntimeManager)

env := []string{
"FLEET_ENROLL=1",
"FLEET_URL=" + fleetURL,
"FLEET_ENROLLMENT_TOKEN=" + enrollmentToken,
"STATE_PATH=" + agentFixture.WorkDir(),
}

// Set environment variable if specified
if tc.agentMonitoringRuntimeEnv != "" {
env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv)
}

cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container"}, env)
t.Logf(">> running binary with: %v", cmd.Args)
if err := cmd.Start(); err != nil {
t.Fatalf("error running container cmd: %s", err)
}

require.EventuallyWithT(t, func(ct *assert.CollectT) {
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
require.NoError(ct, err)
},
2*time.Minute, time.Second,
"Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s",
err, agentOutput,
)

// Verify that components are using the expected runtime
require.EventuallyWithTf(t, func(ct *assert.CollectT) {
status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env)))
require.NoErrorf(ct, err, "error getting agent status")

expectedComponentCount := 4 // process runtime
if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) {
expectedComponentCount = 5
}

require.Len(ct, status.Components, expectedComponentCount, "unexpected number of components in agent status")

for _, comp := range status.Components {
var compRuntime string
switch comp.VersionInfo.Name {
case "beats-receiver":
compRuntime = string(component.OtelRuntimeManager)
case "beat-v2-client":
compRuntime = string(component.ProcessRuntimeManager)
}
t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime)
switch comp.ID {
case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring":
// Monitoring components should use the expected runtime
assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "unexpected runtime name for monitoring component %s with id %s", comp.Name, comp.ID)
default:
// Non-monitoring components should use the default runtime
assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID)
}
}
}, 1*time.Minute, 1*time.Second,
"components did not use expected runtime",
)
})
}
}

func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath string) {
agentPolicyBuilder := strings.Builder{}
tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(integration.PolicyJSON)
@@ -495,3 +737,73 @@ func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath st
t.FailNow()
}
}

// createSimpleAgentMonitoringConfig creates a simple agent configuration file with monitoring enabled
func createSimpleAgentMonitoringConfig(t *testing.T, workDir string, esAddr string) string {
configTemplate := `
outputs:
default:
type: elasticsearch
hosts:
- %s

agent:
logging:
level: debug
monitoring:
enabled: true
metrics: true

inputs:
- id: system-metrics
type: system/metrics
use_output: default
streams:
- metricsets:
- cpu
data_stream.dataset: system.cpu
- metricsets:
- memory
data_stream.dataset: system.memory
`

config := fmt.Sprintf(configTemplate, esAddr)
configPath := filepath.Join(workDir, "elastic-agent.yml")
err := os.WriteFile(configPath, []byte(config), 0644)
if err != nil {
t.Fatalf("failed to write agent config file: %s", err)
}

return configPath
}

func setAgentMonitoringRuntime(t *testing.T, info *define.Info, policyID string, policyName string, runtime string) {
reqBody := fmt.Sprintf(`
{
"name": "%s",
"namespace": "default",
"overrides": {
"agent": {
"monitoring": {
"_runtime_experimental": "%s"
}
}
}
}
`, policyName, runtime)

status, result, err := info.KibanaClient.Request(
http.MethodPut,
fmt.Sprintf("/api/fleet/agent_policies/%s", policyID),
nil,
nil,
bytes.NewBufferString(reqBody))
if err != nil {
t.Fatalf("could not execute request to update policy: %s", err)
}
if status != http.StatusOK {
t.Fatalf("updating policy failed. Status code %d, response:\n%s", status, string(result))
}

t.Logf("Successfully set monitoring to process runtime for policy %s", policyID)
}