Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
65e60ec
wip
pront Aug 4, 2025
8621a68
WIP2
pront Aug 4, 2025
83c585b
wip3
pront Aug 4, 2025
d948740
ran cargo fmt
pront Aug 4, 2025
de4ec21
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 7, 2025
2e03966
refactoring otel source
pront Aug 8, 2025
f250f8b
wip
pront Aug 8, 2025
696c91d
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 11, 2025
64eecdf
ran cargo fmt
pront Aug 11, 2025
f976a74
commit the otel proto desc
pront Aug 11, 2025
98fc30d
refactoring
pront Aug 11, 2025
35140c6
fix re-build mess
pront Aug 12, 2025
4c11c87
unrelated web-playground build fix
pront Aug 12, 2025
0592b73
avoid duplication in tests
pront Aug 12, 2025
200bb79
dbg tests
pront Aug 12, 2025
a763a80
dbg tests
pront Aug 12, 2025
8dc3674
ran cargo fmt
pront Aug 18, 2025
de17445
chore(dev): cargo vdev build licenses
pront Aug 18, 2025
8ca6122
Merge remote-tracking branch 'origin/master' into pront/otel-investig…
pront Aug 18, 2025
c4366c7
wip - cleanup
pront Aug 18, 2025
4e7f5d9
ran cargo fmt
pront Aug 19, 2025
4c9b143
chore(deps): cargo update -p vrl
pront Aug 19, 2025
e6dab61
chore(dev): cargo vdev build licenses
pront Aug 19, 2025
c0ac421
e2e - wip
pront Aug 19, 2025
60807e5
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 20, 2025
1f5fa7d
cleanup
pront Aug 20, 2025
e047de6
ran cargo fmt
pront Aug 20, 2025
7dd4968
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 21, 2025
32f2188
set default
pront Aug 21, 2025
f11e27f
wip
pront Aug 22, 2025
5066d92
vdev cruft
pront Aug 22, 2025
6d4a14f
ran cargo fmt
pront Aug 22, 2025
2bc4240
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 26, 2025
128f2ac
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 27, 2025
43f3f9d
cleanup
pront Aug 27, 2025
a619059
fix new config
pront Aug 27, 2025
1c48361
use JSON names - wip
pront Aug 27, 2025
ca4334a
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 27, 2025
425186f
ran cargo fmt
pront Aug 28, 2025
6d62658
chore(dev): cargo vdev build licenses
pront Aug 28, 2025
2b6f1c8
update VRL
pront Aug 28, 2025
a830fdf
ran cargo fmt
pront Aug 28, 2025
4ec6f8a
chore(dev): cargo vdev build licenses
pront Aug 28, 2025
de3ac9a
handle metrics too
pront Aug 28, 2025
9101088
ran cargo fmt
pront Aug 28, 2025
b537cd3
vrl update fixes
pront Aug 28, 2025
1e09da5
Merge remote-tracking branch 'origin' into pront/otel-investigation-logs
pront Aug 28, 2025
db3b324
ran cargo fmt
pront Aug 28, 2025
860808f
wip
pront Sep 2, 2025
bcea371
chore(dev): cargo vdev build licenses
pront Sep 2, 2025
af06922
fix typo and add first version of the changelog
pront Sep 2, 2025
d4f723f
ran cargo fmt
pront Sep 2, 2025
43a2bcd
fix typo and add first version of the changelog
pront Sep 2, 2025
afc48ae
add author
pront Sep 2, 2025
31ce84d
Merge branch 'master' into pront/otel-investigation-logs
pront Sep 2, 2025
2ac056a
Apply suggestions from code review
pront Sep 2, 2025
40a76ee
tweaks to changelog
pront Sep 2, 2025
55eec57
gen comp docs
pront Sep 2, 2025
5480f39
Merge branch 'master' into pront/otel-investigation-logs
pront Sep 2, 2025
0980161
fix changelog typo
pront Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions changelog.d/otlp_decoding.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The `opentelemetry` source now supports a new decoding mode which can be enabled by setting `use_otlp_decoding` to `true`. In this mode,
all events preserve the [OTLP](https://opentelemetry.io/docs/specs/otel/protocol/) format. These events can be forwarded directly to
the `opentelemetry` sink without modifications.

**Note:** The OTLP metric format and the Vector metric format differ, so the `opentelemetry` source emits OTLP formatted metrics as Vector log
events. These events cannot be used with existing metrics transforms. However, they can be ingested by the OTEL collectors as metrics.

authors: pront
7 changes: 7 additions & 0 deletions lib/codecs/src/decoding/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ pub trait Deserializer: DynClone + Send + Sync {
/// frame can potentially hold multiple events, e.g. when parsing a JSON
/// array. However, we optimize the most common case of emitting one event
/// by not requiring heap allocations for it.
///
/// **Note**: The type of the produced events depends on the implementation.
fn parse(
&self,
bytes: Bytes,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>>;

/// Parses trace events from bytes.
fn parse_traces(&self, _bytes: Bytes) -> vector_common::Result<SmallVec<[Event; 1]>> {
unimplemented!()
}
}

dyn_clone::clone_trait_object!(Deserializer);
Expand Down
62 changes: 46 additions & 16 deletions lib/codecs/src/decoding/format/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ use derivative::Derivative;
use prost_reflect::{DynamicMessage, MessageDescriptor};
use smallvec::{SmallVec, smallvec};
use vector_config::configurable_component;
use vector_core::event::LogEvent;
use vector_core::event::{LogEvent, TraceEvent};
use vector_core::{
config::{DataType, LogNamespace, log_schema},
event::Event,
schema,
};
use vrl::protobuf::{
descriptor::get_message_descriptor,
parse::{Options, proto_to_value},
};
use vrl::value::Kind;
use vrl::protobuf::descriptor::{get_message_descriptor, get_message_descriptor_from_bytes};
use vrl::protobuf::parse::{Options, proto_to_value};
use vrl::value::{Kind, Value};

use super::Deserializer;

Expand Down Expand Up @@ -86,30 +84,56 @@ pub struct ProtobufDeserializerOptions {
#[derive(Debug, Clone)]
pub struct ProtobufDeserializer {
message_descriptor: MessageDescriptor,
options: Options,
}

impl ProtobufDeserializer {
/// Creates a new `ProtobufDeserializer`.
pub fn new(message_descriptor: MessageDescriptor) -> Self {
Self { message_descriptor }
Self {
message_descriptor,
options: Default::default(),
}
}

/// Creates a new deserializer instance using the descriptor bytes directly.
pub fn new_from_bytes(
desc_bytes: &[u8],
message_type: &str,
options: Options,
) -> vector_common::Result<Self> {
let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
Ok(Self {
message_descriptor,
options,
})
}
}

fn extract_vrl_value(
bytes: Bytes,
message_descriptor: &MessageDescriptor,
options: &Options,
) -> vector_common::Result<Value> {
let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes)
.map_err(|error| format!("Error parsing protobuf: {error:?}"))?;

Ok(proto_to_value(
&prost_reflect::Value::Message(dynamic_message),
None,
options,
)?)
}

impl Deserializer for ProtobufDeserializer {
fn parse(
&self,
bytes: Bytes,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes)
.map_err(|error| format!("Error parsing protobuf: {error:?}"))?;

let proto_vrl = proto_to_value(
&prost_reflect::Value::Message(dynamic_message),
None,
&Options::default(),
)?;
let mut event = Event::Log(LogEvent::from(proto_vrl));
let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
let mut event = Event::Log(LogEvent::from(vrl_value));

let event = match log_namespace {
LogNamespace::Vector => event,
LogNamespace::Legacy => {
Expand All @@ -126,6 +150,12 @@ impl Deserializer for ProtobufDeserializer {

Ok(smallvec![event])
}

fn parse_traces(&self, bytes: Bytes) -> vector_common::Result<SmallVec<[Event; 1]>> {
let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
let trace_event = Event::Trace(TraceEvent::from(vrl_value));
Ok(smallvec![trace_event])
}
}

impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
Expand Down
28 changes: 21 additions & 7 deletions lib/opentelemetry-proto/build.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use glob::glob;
use std::{env, io::Result, path::PathBuf};
use std::fs::{read_to_string, write};
use std::path::Path;
use std::{io::Result, path::PathBuf};

fn main() -> Result<()> {
let proto_root = PathBuf::from("src/proto/opentelemetry-proto");
Expand All @@ -10,12 +12,7 @@ fn main() -> Result<()> {
.filter_map(|result| result.ok())
.collect();

// Set up re-run triggers
for proto in &proto_paths {
println!("cargo:rerun-if-changed={}", proto.display());
}

let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
let out_dir = PathBuf::from(std::env::var("OUT_DIR").unwrap());
let descriptor_path = out_dir.join("opentelemetry-proto.desc");

tonic_build::configure()
Expand All @@ -24,5 +21,22 @@ fn main() -> Result<()> {
.file_descriptor_set_path(&descriptor_path)
.compile(&proto_paths, &[include_path])?;

write_static_descriptor_reference(&descriptor_path, &out_dir)?;

Ok(())
}

fn write_static_descriptor_reference(descriptor_path: &Path, out_dir: &Path) -> Result<()> {
let include_line = format!(
"pub static DESCRIPTOR_BYTES: &[u8] = include_bytes!(r\"{}\");\n",
descriptor_path.display()
);

let include_file = out_dir.join("opentelemetry-proto.rs");
let existing = read_to_string(&include_file).ok();
if existing.as_deref() != Some(&include_line) {
write(&include_file, include_line)?;
}

Ok(())
}
3 changes: 3 additions & 0 deletions lib/opentelemetry-proto/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ pub mod resource {
tonic::include_proto!("opentelemetry.proto.resource.v1");
}
}

/// The raw descriptor bytes for all the above.
include!(concat!(env!("OUT_DIR"), "/opentelemetry-proto.rs"));
7 changes: 7 additions & 0 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ impl TraceEvent {
}
}

impl From<Value> for TraceEvent {
fn from(value: Value) -> Self {
let log_event = LogEvent::from(value);
Self(log_event)
}
}

impl From<LogEvent> for TraceEvent {
fn from(log: LogEvent) -> Self {
Self(log)
Expand Down
6 changes: 3 additions & 3 deletions scripts/e2e/opentelemetry-logs/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: opentelemetry-vector-e2e
services:
otel-collector-source:
container_name: otel-collector-source
image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION:-latest}
image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION}
init: true
volumes:
- type: bind
Expand Down Expand Up @@ -33,7 +33,7 @@ services:

otel-collector-sink:
container_name: otel-collector-sink
image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION:-latest}
image: otel/opentelemetry-collector-contrib:${CONFIG_COLLECTOR_VERSION}
init: true
volumes:
- type: bind
Expand All @@ -57,7 +57,7 @@ services:
init: true
volumes:
- type: bind
source: ../../../tests/data/e2e/opentelemetry/logs/vector.yaml
source: ../../../tests/data/e2e/opentelemetry/logs/${CONFIG_VECTOR_CONFIG}
target: /etc/vector/vector.yaml
read_only: true
- type: bind
Expand Down
3 changes: 2 additions & 1 deletion scripts/e2e/opentelemetry-logs/test.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
features:
- e2e-tests-opentelemetry
- e2e-tests-opentelemetry

test: "e2e"

Expand All @@ -14,6 +14,7 @@ runner:
matrix:
# Determines which `otel/opentelemetry-collector-contrib` version to use
collector_version: [ 'latest' ]
vector_config: [ 'vector_default.yaml', 'vector_otlp.yaml' ]

# Only trigger this integration test if relevant OTEL source/sink files change
paths:
Expand Down
Loading
Loading