From 84b14f163006ae10583e6873d310dd7a439c4f1a Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 12 Nov 2025 12:33:39 -0300 Subject: [PATCH] otel: add env variable to set agent.monitoring runtime (#11018) * otel: add env variables to enable beats receivers in container * remove ENABLE_BEATS_RECEIVERS, add AGENT_MONITORING_RUNTIME_EXPERIMENTAL * remove policy update from the test * changing test to use local config, read env when fetching default config * add tests for policy override * update test description * update docs (cherry picked from commit d4ec6116d09ba94e8ab853edd63fae8900df3233) --- docs/hybrid-agent-beats-receivers.md | 2 + internal/pkg/agent/cmd/container.go | 4 + internal/pkg/core/monitoring/config/config.go | 10 +- testing/integration/ess/container_cmd_test.go | 312 ++++++++++++++++++ 4 files changed, 327 insertions(+), 1 deletion(-) diff --git a/docs/hybrid-agent-beats-receivers.md b/docs/hybrid-agent-beats-receivers.md index 96259da800c..28569807f87 100644 --- a/docs/hybrid-agent-beats-receivers.md +++ b/docs/hybrid-agent-beats-receivers.md @@ -36,6 +36,8 @@ https://github.com/elastic/kibana/issues/233186 is implemented. Before that chan overrides API can be used to add `_runtime_experimental: "otel"` to the `agent.monitoring` section of the policy. See https://support.elastic.dev/knowledge/view/06b69893 for details on the policy overrides API. +For the Elastic Agent container images, the `AGENT_MONITORING_RUNTIME_EXPERIMENTAL` environment variable can be set to either `process` or `otel` to override the default runtime used for agent monitoring. + Executing the `elastic-agent diagnostics` command in this mode will now produce an `otel-final.yml` file showing the generated collector configuration used to run the Beat receivers. diff --git a/internal/pkg/agent/cmd/container.go b/internal/pkg/agent/cmd/container.go index 6d7e8db2085..a2a030371f5 100644 --- a/internal/pkg/agent/cmd/container.go +++ b/internal/pkg/agent/cmd/container.go @@ -147,6 +147,10 @@ be used when the same credentials will be used across all the possible actions a KIBANA_CA - path to certificate authority to use with communicate with Kibana [$ELASTICSEARCH_CA] ELASTIC_AGENT_TAGS - user provided tags for the agent [linux,staging] +* Beats Receivers + The following experimental environment variables can be set to enable using Beats Receivers. + + AGENT_MONITORING_RUNTIME_EXPERIMENTAL - Set to either "process" or "otel" to use the respective runtime for the monitoring components. * Elastic-Agent event logging If EVENTS_TO_STDERR is set to true log entries containing event data or whole raw events will be logged to stderr alongside diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go index cb0e19987d1..62193d17e87 100644 --- a/internal/pkg/core/monitoring/config/config.go +++ b/internal/pkg/core/monitoring/config/config.go @@ -5,6 +5,7 @@ package config import ( + "os" "strings" "time" @@ -111,6 +112,13 @@ type BufferConfig struct { // DefaultConfig creates a config with pre-set default values. func DefaultConfig() *MonitoringConfig { + monRuntimeManager := DefaultRuntimeManager + monRuntimeEnv := os.Getenv("AGENT_MONITORING_RUNTIME_EXPERIMENTAL") + switch monRuntimeEnv { + case ProcessRuntimeManager, OtelRuntimeManager: + monRuntimeManager = monRuntimeEnv + } + return &MonitoringConfig{ Enabled: true, MonitorLogs: true, @@ -125,7 +133,7 @@ func DefaultConfig() *MonitoringConfig { Namespace: defaultNamespace, APM: defaultAPMConfig(), Diagnostics: defaultDiagnostics(), - RuntimeManager: DefaultRuntimeManager, + RuntimeManager: monRuntimeManager, } } diff --git a/testing/integration/ess/container_cmd_test.go b/testing/integration/ess/container_cmd_test.go index 283dbf6788a..c90347d71fc 100644 --- a/testing/integration/ess/container_cmd_test.go +++ b/testing/integration/ess/container_cmd_test.go @@ -27,6 +27,8 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/kibana" + monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config" + "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/core/process" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" @@ -444,6 +446,246 @@ func createMockESOutput(t *testing.T, info *define.Info, percentDuplicate, perce return mockesURL, outputResp.Item.ID } +// TestContainerCMDAgentMonitoringRuntimeExperimental tests that when +// AGENT_MONITORING_RUNTIME_EXPERIMENTAL is set, Elastic Agent uses the +// respective runtime to run the agent.monitoring components from the +// local configuration. +func TestContainerCMDAgentMonitoringRuntimeExperimental(t *testing.T) { + define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + OS: []define.OS{ + {Type: define.Linux}, + }, + Group: "container", + }) + + testCases := []struct { + name string + agentMonitoringRuntimeEnv string + expectedRuntimeName string + }{ + { + name: "var set to otel", + agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager, + expectedRuntimeName: string(monitoringCfg.OtelRuntimeManager), + }, + { + name: "var set to process", + agentMonitoringRuntimeEnv: monitoringCfg.ProcessRuntimeManager, + expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager), + }, + { + name: "var set to invalid value", + agentMonitoringRuntimeEnv: "invalid", + expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager), + }, + { + name: "var not set", + agentMonitoringRuntimeEnv: "", + expectedRuntimeName: string(monitoringCfg.DefaultRuntimeManager), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + defer cancel() + + agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + err = agentFixture.Prepare(ctx) + require.NoError(t, err) + + mockesURL := integration.StartMockES(t, 0, 0, 0, 0) + + // Create a local agent config file with monitoring enabled + agentConfig := createSimpleAgentMonitoringConfig(t, agentFixture.WorkDir(), mockesURL) + + env := []string{ + "STATE_PATH=" + agentFixture.WorkDir(), + } + + // Set environment variable if specified + if tc.agentMonitoringRuntimeEnv != "" { + env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv) + } + + cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container", "-c", agentConfig}, env) + t.Logf(">> running binary with: %v", cmd.Args) + if err := cmd.Start(); err != nil { + t.Fatalf("error running container cmd: %s", err) + } + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env))) + require.NoError(ct, err) + }, + 2*time.Minute, time.Second, + "Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s", + err, agentOutput, + ) + + // Verify that components are using the expected runtime + require.EventuallyWithTf(t, func(ct *assert.CollectT) { + status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env))) + require.NoErrorf(t, err, "error getting agent status") + + expectedComponentCount := 4 // process runtime + if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) { + expectedComponentCount = 5 + } + + require.Len(ct, status.Components, expectedComponentCount, "expected right number of components in agent status") + + for _, comp := range status.Components { + var compRuntime string + switch comp.VersionInfo.Name { + case "beats-receiver": + compRuntime = string(component.OtelRuntimeManager) + case "beat-v2-client": + compRuntime = string(component.ProcessRuntimeManager) + } + t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime) + switch comp.ID { + case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring": + // Monitoring components should use the expected runtime + assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "expected correct runtime name for monitoring component %s with id %s", comp.Name, comp.ID) + default: + // Non-monitoring components should use the default runtime + assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID) + } + } + }, 1*time.Minute, 1*time.Second, + "components did not use expected runtime", + ) + }) + } +} + +// TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy tests that when +// AGENT_MONITORING_RUNTIME_EXPERIMENTAL is set, the agent.monitoring +// from the fleet policy takes precedence over the environment variable. +func TestContainerCMDAgentMonitoringRuntimeExperimentalPolicy(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + OS: []define.OS{ + {Type: define.Linux}, + }, + Group: "container", + }) + + testCases := []struct { + name string + agentMonitoringRuntimeEnv string + expectedRuntimeName string + }{ + { + name: "var set to otel", + agentMonitoringRuntimeEnv: monitoringCfg.OtelRuntimeManager, + expectedRuntimeName: string(monitoringCfg.ProcessRuntimeManager), // set by policy + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) + defer cancel() + + agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + err = agentFixture.Prepare(ctx) + require.NoError(t, err) + + fleetURL, err := fleettools.DefaultURL(ctx, info.KibanaClient) + if err != nil { + t.Fatalf("could not get Fleet URL: %s", err) + } + + policyName := fmt.Sprintf("test-beats-receivers-monitoring-%s-%s", tc.name, uuid.Must(uuid.NewV4()).String()) + policyID, enrollmentToken := createPolicy( + t, + ctx, + agentFixture, + info, + policyName, + "") + + addLogIntegration(t, info, policyID, "/tmp/beats-receivers-test.log") + integration.GenerateLogFile(t, "/tmp/beats-receivers-test.log", time.Second/2, 50) + + // set monitoring runtime to process via policy + setAgentMonitoringRuntime(t, info, policyID, policyName, monitoringCfg.ProcessRuntimeManager) + + env := []string{ + "FLEET_ENROLL=1", + "FLEET_URL=" + fleetURL, + "FLEET_ENROLLMENT_TOKEN=" + enrollmentToken, + "STATE_PATH=" + agentFixture.WorkDir(), + } + + // Set environment variable if specified + if tc.agentMonitoringRuntimeEnv != "" { + env = append(env, "AGENT_MONITORING_RUNTIME_EXPERIMENTAL="+tc.agentMonitoringRuntimeEnv) + } + + cmd, agentOutput := prepareAgentCMD(t, ctx, agentFixture, []string{"container"}, env) + t.Logf(">> running binary with: %v", cmd.Args) + if err := cmd.Start(); err != nil { + t.Fatalf("error running container cmd: %s", err) + } + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env))) + require.NoError(ct, err) + }, + 2*time.Minute, time.Second, + "Elastic-Agent did not report healthy. Agent status error: \"%v\", Agent logs\n%s", + err, agentOutput, + ) + + // Verify that components are using the expected runtime + require.EventuallyWithTf(t, func(ct *assert.CollectT) { + status, err := agentFixture.ExecStatus(ctx, atesting.WithCmdOptions(withEnv(env))) + require.NoErrorf(t, err, "error getting agent status") + + expectedComponentCount := 4 // process runtime + if tc.expectedRuntimeName == string(monitoringCfg.OtelRuntimeManager) { + expectedComponentCount = 5 + } + + require.Len(ct, status.Components, expectedComponentCount, "expected right number of components in agent status") + + for _, comp := range status.Components { + var compRuntime string + switch comp.VersionInfo.Name { + case "beats-receiver": + compRuntime = string(component.OtelRuntimeManager) + case "beat-v2-client": + compRuntime = string(component.ProcessRuntimeManager) + } + t.Logf("Component ID: %s, version info: %s, runtime: %s", comp.ID, comp.VersionInfo.Name, compRuntime) + switch comp.ID { + case "beat/metrics-monitoring", "filestream-monitoring", "http/metrics-monitoring", "prometheus/metrics-monitoring": + // Monitoring components should use the expected runtime + assert.Equalf(t, tc.expectedRuntimeName, compRuntime, "unexpected runtime name for monitoring component %s with id %s", comp.Name, comp.ID) + default: + // Non-monitoring components should use the default runtime + assert.Equalf(t, string(component.DefaultRuntimeManager), compRuntime, "expected default runtime for non-monitoring component %s with id %s", comp.Name, comp.ID) + } + } + }, 1*time.Minute, 1*time.Second, + "components did not use expected runtime", + ) + }) + } +} + func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath string) { agentPolicyBuilder := strings.Builder{} tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(integration.PolicyJSON) @@ -495,3 +737,73 @@ func addLogIntegration(t *testing.T, info *define.Info, policyID, logFilePath st t.FailNow() } } + +// createSimpleAgentMonitoringConfig creates a simple agent configuration file with monitoring enabled +func createSimpleAgentMonitoringConfig(t *testing.T, workDir string, esAddr string) string { + configTemplate := ` +outputs: + default: + type: elasticsearch + hosts: + - %s + +agent: + logging: + level: debug + monitoring: + enabled: true + metrics: true + +inputs: + - id: system-metrics + type: system/metrics + use_output: default + streams: + - metricsets: + - cpu + data_stream.dataset: system.cpu + - metricsets: + - memory + data_stream.dataset: system.memory +` + + config := fmt.Sprintf(configTemplate, esAddr) + configPath := filepath.Join(workDir, "elastic-agent.yml") + err := os.WriteFile(configPath, []byte(config), 0644) + if err != nil { + t.Fatalf("failed to write agent config file: %s", err) + } + + return configPath +} + +func setAgentMonitoringRuntime(t *testing.T, info *define.Info, policyID string, policyName string, runtime string) { + reqBody := fmt.Sprintf(` +{ + "name": "%s", + "namespace": "default", + "overrides": { + "agent": { + "monitoring": { + "_runtime_experimental": "%s" + } + } + } +} +`, policyName, runtime) + + status, result, err := info.KibanaClient.Request( + http.MethodPut, + fmt.Sprintf("/api/fleet/agent_policies/%s", policyID), + nil, + nil, + bytes.NewBufferString(reqBody)) + if err != nil { + t.Fatalf("could not execute request to update policy: %s", err) + } + if status != http.StatusOK { + t.Fatalf("updating policy failed. Status code %d, response:\n%s", status, string(result)) + } + + t.Logf("Successfully set monitoring to process runtime for policy %s", policyID) +}