diff --git a/Cargo.lock b/Cargo.lock index 9e9ea2c3..68eef159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2182,6 +2182,18 @@ dependencies = [ "http", ] +[[package]] +name = "orion-internal" +version = "0.1.0" +dependencies = [ + "compact_str", + "orion-configuration", + "orion-error", + "tokio", + "tokio-test", + "tracing", +] + [[package]] name = "orion-interner" version = "0.1.0" @@ -2227,6 +2239,7 @@ dependencies = [ "orion-error", "orion-format", "orion-http-header", + "orion-internal", "orion-interner", "orion-metrics", "orion-tracing", @@ -3627,6 +3640,19 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/Cargo.toml b/Cargo.toml index b2011252..bee8d760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "orion-error", "orion-http-header", "orion-interner", + "orion-internal", "orion-format", "orion-lib", "orion-metrics", @@ -35,6 +36,7 @@ orion-error = { path = "orion-error" } orion-format = { path = "orion-format" } orion-http-header = { path = "orion-http-header" } orion-interner = { path = "orion-interner" } +orion-internal = { path = "orion-internal" } orion-lib = { path = "orion-lib" } orion-metrics = { path = "orion-metrics" } orion-tracing = { path = "orion-tracing" } diff --git a/docs/proposal/internal-listener-and-upstream-transport.md b/docs/proposal/internal-listener-and-upstream-transport.md new file mode 100644 index 00000000..677cda4e --- /dev/null +++ b/docs/proposal/internal-listener-and-upstream-transport.md @@ -0,0 +1,370 @@ +--- +title: Internal Listener and Upstream Transport Support +authors: +- "@Eeshu-Yadav" +reviewers: +- "@YaoZengzeng" +- "@dawid-nowak" +approvers: +- "@YaoZengzeng" +- "@dawid-nowak" + +creation-date: 2025-10-03 + +--- + +## Internal Listener and Upstream Transport Support + +### Summary + +This proposal implements internal listener and upstream transport functionality in Orion to enable waypoint proxy capabilities. Internal listeners allow in-process communication without network APIs, while internal upstream transport enables metadata passthrough between proxy hops. This enables ambient mesh deployments with TCP proxy chaining and multi-hop routing. + +### Motivation + +To support ambient service mesh deployments, Orion needs: + +1. **Internal connections**: Accept connections from within the same process via in-memory channels +2. **Name-based routing**: Route to internal listeners by name instead of network addresses +3. **Metadata propagation**: Preserve request context across proxy hops for routing and observability +4. **Performance optimization**: Eliminate network stack overhead for co-located proxy communication + +#### Goals + +- Implement Envoy-compatible internal listener and upstream transport support +- Enable clusters to connect via `server_listener_name` with metadata passthrough +- Provide thread-safe connection handling with proper lifecycle management +- Maintain full compatibility with Envoy xDS configurations +- Ensure zero performance regression for network listeners + + +### Proposal + +The proposal introduces three main components to enable internal listener and upstream transport functionality: + +1. **Internal Connection Factory**: A global, thread-safe registry that manages internal listener registration and connection establishment between clusters and listeners within the same proxy instance. + +2. **Enhanced Internal Listener Runtime**: Extension of the existing listener infrastructure to handle internal connections, process them through filter chains, and manage lifecycle events. + +3. **Internal Upstream Transport**: Implementation of cluster-side functionality to establish connections to internal endpoints and pass metadata through the transport socket layer. + +The implementation follows Envoy's internal listener design while leveraging Rust's type system and async runtime for safety and performance. + + +#### Notes + +**Design Decisions:** +- In-process only communication using Tokio duplex streams +- Global factory pattern with `std::sync::OnceLock` for thread-safe initialization +- Weak references for automatic lifecycle management +- Initial support for host, cluster, and route metadata (request metadata in future) +- Full Envoy configuration compatibility with listener name validation + + +### Design Details + +#### Architecture Overview + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Orion Proxy Process │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ │ +│ │ External │ │ Internal │ │ +│ │ Listener │ │ Listener │ │ +│ │ (Network) │ │ (In-Memory) │ │ +│ └──────┬───────┘ └──────┬───────┘ │ +│ │ │ │ +│ │ TCP Connection │ Register │ +│ ▼ ▼ │ +│ ┌────────────────────────────────────────────────────┐ │ +│ │ Internal Connection Factory │ │ +│ │ ┌──────────────────────────────────────────┐ │ │ +│ │ │ Listener Registry │ │ │ +│ │ │ HashMap │ │ │ +│ │ └──────────────────────────────────────────┘ │ │ +│ └────────────────────────────┬───────────────────────┘ │ +│ │ │ +│ │ Connect │ +│ ▼ │ +│ ┌──────────────┐ ┌─────────────────┐ │ +│ │ Cluster │─────▶│ TCP Proxy │ │ +│ │ (Internal │ │ Filter │ │ +│ │ Endpoint) │ └─────────────────┘ │ +│ └──────────────┘ │ +│ │ │ +│ │ Internal Connection (Duplex Stream) │ +│ ▼ │ +│ ┌──────────────────────────────────────┐ │ +│ │ Internal Upstream Transport │ │ +│ │ - Metadata Passthrough │ │ +│ │ - Host/Cluster/Route Metadata │ │ +│ └──────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +#### Component Details + +##### 1. Internal Connection Factory + +**Location**: `orion-lib/src/transport/internal_connection.rs` + +```rust +pub struct InternalConnectionFactory { + listeners: Arc>>, +} + +pub struct InternalListenerHandle { + pub name: String, + pub connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, +} + +pub struct InternalConnectionPair { + pub upstream: Arc, + pub downstream: Arc, +} + +pub struct InternalStream { + metadata: InternalConnectionMetadata, + stream: tokio::io::DuplexStream, + is_closed: Arc>, +} +``` + +**Key Operations**: `register_listener`, `unregister_listener`, `connect_to_listener`, `is_listener_active`, `list_listeners`, `get_stats` + +##### 2. Enhanced Internal Listener Runtime + +**Location**: `orion-lib/src/listeners/listener.rs` + +```rust +async fn run_internal_listener( + name: &'static str, + filter_chains: HashMap, + mut route_updates_receiver: broadcast::Receiver, + mut secret_updates_receiver: broadcast::Receiver, +) -> Error { + let factory = global_internal_connection_factory(); + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(name.to_string()).await?; + + loop { + tokio::select! { + Some(connection_pair) = connection_receiver.recv() => { + tokio::spawn(handle_internal_connection(connection_pair, filter_chains_clone)); + } + Ok(route_update) = route_updates_receiver.recv() => { + process_route_update(&name, &filter_chains, route_update); + } + Ok(secret_update) = secret_updates_receiver.recv() => { + process_secret_update(&name, &mut filter_chains_clone, secret_update); + } + } + } +} +``` + +##### 3. Internal Cluster Connector + +**Location**: `orion-lib/src/transport/internal_cluster_connector.rs` + +```rust +pub struct InternalClusterConnector { + listener_name: String, + endpoint_id: Option, +} + +impl InternalClusterConnector { + pub async fn connect(&self) -> Result { + let factory = global_internal_connection_factory(); + factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await + } +} + +pub struct InternalChannelConnector { + connector: InternalClusterConnector, + cluster_name: &'static str, +} +``` + +##### 4. Configuration Data Structures + +**Listener Configuration** (`orion-configuration/src/config/listener.rs`): + +```rust +pub enum ListenerAddress { + Socket(SocketAddr), + Internal(InternalListener), +} + +pub struct InternalListener { + pub buffer_size_kb: Option, +} +``` + +**Cluster Configuration** (`orion-configuration/src/config/cluster.rs`): + +```rust +pub enum EndpointAddress { + Socket(SocketAddr), + Internal(InternalEndpointAddress), +} + +pub struct InternalEndpointAddress { + pub server_listener_name: CompactString, + pub endpoint_id: Option, +} + +pub enum TransportSocket { + InternalUpstream(InternalUpstreamTransport), + RawBuffer, +} + +pub struct InternalUpstreamTransport { + pub passthrough_metadata: Vec, + pub transport_socket: Box, +} + +pub struct MetadataValueSource { + pub kind: MetadataKind, + pub name: CompactString, +} + +pub enum MetadataKind { + Host, + Route, + Cluster, +} +``` + +##### 5. Example Configuration + +**Bootstrap Configuration**: + +```yaml +static_resources: + listeners: + # External listener accepting network connections + - name: "listener_0" + address: + socket_address: + address: "0.0.0.0" + port_value: 15001 + filter_chains: + - filters: + - name: "tcp_proxy" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy" + stat_prefix: "ingress_tcp" + cluster: "internal_cluster" + + # Internal listener accepting in-process connections + - name: "waypoint_internal" + address: + envoy_internal_address: + server_listener_name: "waypoint_internal" + filter_chains: + - filters: + - name: "http_connection_manager" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager" + stat_prefix: "waypoint_http" + route_config: + name: "local_route" + virtual_hosts: + - name: "backend" + domains: ["*"] + routes: + - match: { prefix: "/" } + route: { cluster: "backend_service" } + + clusters: + # Cluster routing to internal listener + - name: "internal_cluster" + type: "STATIC" + load_assignment: + cluster_name: "internal_cluster" + endpoints: + - lb_endpoints: + - endpoint: + address: + envoy_internal_address: + server_listener_name: "waypoint_internal" + transport_socket: + name: "internal_upstream" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.transport_sockets.internal_upstream.v3.InternalUpstreamTransport" + passthrough_metadata: + - kind: { host: {} } + name: "envoy.filters.listener.original_dst" + - kind: { cluster: {} } + name: "istio.workload" + transport_socket: + name: "raw_buffer" + typed_config: + "@type": "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer" + + # Backend service cluster + - name: "backend_service" + type: "STATIC" + load_assignment: + cluster_name: "backend_service" + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: "10.0.0.5" + port_value: 8080 +``` + +#### Implementation Phases + +The implementation is divided into four GitHub issues for manageable development and review: + +**Phase 1: Internal Connection Factory** - Connection factory with thread-safe registry and lifecycle management + +**Phase 2: Enhanced Internal Listener Runtime** - Connection acceptance and filter chain integration + +**Phase 3: Cluster Internal Connection Support** - Cluster connectors with load balancing for internal endpoints + +**Phase 4: Internal Upstream Transport & Metadata Passthrough** - Metadata extraction and passthrough implementation + +#### Test Plan + +**Unit Tests**: +- Listener registration/unregistration in connection factory +- Connection establishment between listeners and clusters +- Thread safety and concurrent access +- Error handling for non-existent/inactive listeners + +**Integration Tests**: +- End-to-end flow: External listener → Internal listener → Backend +- Configuration parsing and validation +- Metadata propagation across proxy hops + +--- + +## References + +1. [Envoy Internal Listener Documentation](https://www.envoyproxy.io/docs/envoy/latest/configuration/other_features/internal_listener) +2. [Envoy Internal Upstream Transport Proto](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/transport_sockets/internal_upstream/v3/internal_upstream.proto) +3. [Envoy Metadata Types](https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/metadata/v3/metadata.proto) + + +--- + +## Appendix + +### Glossary + +- **Internal Listener**: A listener that accepts connections from within the proxy process rather than from the network +- **Waypoint Proxy**: A shared proxy in ambient service mesh that handles L7 processing for multiple workloads +- **Internal Upstream Transport**: Transport socket that enables metadata passthrough for internal connections +- **Server Listener Name**: Unique identifier for an internal listener used by clusters to establish connections +- **Metadata Passthrough**: Mechanism to propagate context (host/cluster/route metadata) across proxy hops +- **Duplex Stream**: Bidirectional async I/O stream provided by Tokio for in-memory communication + +### Acknowledgments + +This feature was proposed by @YaoZengzeng in this [issue](https://github.com/kmesh-net/orion/issues/59) and has been reviewed by @dawid-nowak @YaoZengzeng. The design follows Envoy's internal listener specification while adapting to Orion's Rust-based architecture and async runtime. diff --git a/orion-configuration/src/config/cluster.rs b/orion-configuration/src/config/cluster.rs index 56f883d4..6341a3c7 100644 --- a/orion-configuration/src/config/cluster.rs +++ b/orion-configuration/src/config/cluster.rs @@ -155,7 +155,7 @@ impl From for Vec { Address::Internal(internal_addr) => Some(LbEndpoint { address: EndpointAddress::Internal(InternalEndpointAddress { server_listener_name: internal_addr.server_listener_name.into(), - endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + endpoint_id: internal_addr.endpoint_id.map(std::convert::Into::into), }), health_status: HealthStatus::default(), load_balancing_weight: NonZeroU32::MIN, @@ -193,7 +193,7 @@ impl EndpointAddress { pub fn into_addr(self) -> Result { match self { EndpointAddress::Socket(addr) => Ok(addr), - EndpointAddress::Internal(_) => Err("Cannot convert internal address to socket address".to_string()), + EndpointAddress::Internal(_) => Err("Cannot convert internal address to socket address".to_owned()), } } } @@ -785,7 +785,7 @@ mod envoy_conversions { Address::Socket(socket_addr) => Ok(EndpointAddress::Socket(socket_addr)), Address::Internal(internal_addr) => Ok(EndpointAddress::Internal(InternalEndpointAddress { server_listener_name: internal_addr.server_listener_name.into(), - endpoint_id: internal_addr.endpoint_id.map(|id| id.into()), + endpoint_id: internal_addr.endpoint_id.map(std::convert::Into::into), })), Address::Pipe(_, _) => { Err(GenericError::unsupported_variant("Pipe addresses are not supported for endpoints")) diff --git a/orion-configuration/src/config/core.rs b/orion-configuration/src/config/core.rs index 4d1c89a7..559a5981 100644 --- a/orion-configuration/src/config/core.rs +++ b/orion-configuration/src/config/core.rs @@ -305,7 +305,7 @@ pub mod envoy_conversions { impl std::fmt::Display for Address { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Socket(addr) => write!(f, "{}", addr), + Self::Socket(addr) => write!(f, "{addr}"), Self::Internal(internal) => write!(f, "internal:{}", internal.server_listener_name), Self::Pipe(path, _) => f.write_str(path), } diff --git a/orion-configuration/src/config/runtime.rs b/orion-configuration/src/config/runtime.rs index 0442d07a..b9cbda7d 100644 --- a/orion-configuration/src/config/runtime.rs +++ b/orion-configuration/src/config/runtime.rs @@ -144,7 +144,7 @@ fn get_cgroup_v2_cpu_limit() -> crate::Result { } fn parse_cgroup_v2_cpu_max(content: &str) -> crate::Result { - let parts: Vec<&str> = content.trim().split_whitespace().collect(); + let parts: Vec<&str> = content.split_whitespace().collect(); if parts.len() == 2 && parts[0] != "max" { let quota: i64 = parts[0].parse()?; let period: i64 = parts[1].parse()?; @@ -160,8 +160,8 @@ fn parse_cgroup_v2_cpu_max(content: &str) -> crate::Result { fn get_cgroup_v1_cpu_limit() -> crate::Result { let cgroup_path = get_cgroup_v1_cpu_path()?; - let quota_path = format!("{}/cpu.cfs_quota_us", cgroup_path); - let period_path = format!("{}/cpu.cfs_period_us", cgroup_path); + let quota_path = format!("{cgroup_path}/cpu.cfs_quota_us"); + let period_path = format!("{cgroup_path}/cpu.cfs_period_us"); let quota_content = std::fs::read_to_string("a_path)?; let period_content = std::fs::read_to_string(&period_path)?; @@ -193,7 +193,7 @@ fn parse_cgroup_v1_cpu_path(cgroup_content: &str) -> crate::Result { let mut parts = line.split(':'); if let (Some(_), Some(controllers), Some(path)) = (parts.next(), parts.next(), parts.next()) { if controllers.split(',').any(|c| c == "cpu") { - return Some(format!("/sys/fs/cgroup/cpu{}", path)); + return Some(format!("/sys/fs/cgroup/cpu{path}")); } } None @@ -202,7 +202,7 @@ fn parse_cgroup_v1_cpu_path(cgroup_content: &str) -> crate::Result { } if std::path::Path::new("/sys/fs/cgroup/cpu").exists() { - Ok("/sys/fs/cgroup/cpu".to_string()) + Ok("/sys/fs/cgroup/cpu".to_owned()) } else { Err("CPU cgroup path not found".into()) } diff --git a/orion-internal/Cargo.toml b/orion-internal/Cargo.toml new file mode 100644 index 00000000..19bca0ec --- /dev/null +++ b/orion-internal/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "orion-internal" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +description = "Internal connection and internal listener infrastructure for Orion proxy" + +[dependencies] +tokio = { version = "1.42.0", features = ["full"] } +tracing = "0.1.41" +orion-error = { path = "../orion-error" } +orion-configuration = { path = "../orion-configuration" } +compact_str = "0.8.0" + +[dev-dependencies] +tokio-test = "0.4" diff --git a/orion-internal/src/connection.rs b/orion-internal/src/connection.rs new file mode 100644 index 00000000..81e72c37 --- /dev/null +++ b/orion-internal/src/connection.rs @@ -0,0 +1,471 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use std::{ + collections::HashMap, + ops::Deref, + sync::{Arc, Weak}, +}; + +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{mpsc, Mutex, RwLock}, + time::Instant, +}; + +pub use crate::{Error, Result}; + +#[derive(Debug, Clone)] +pub struct InternalConnectionMetadata { + pub listener_name: String, + pub buffer_size_kb: Option, + pub created_at: Instant, + pub endpoint_id: Option, +} + +#[derive(Debug, Clone)] +pub struct InternalListenerHandle { + pub name: String, + pub connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, + buffer_size_kb: Option, +} + +impl InternalListenerHandle { + pub fn new( + name: String, + connection_sender: mpsc::UnboundedSender, + listener_ref: Weak<()>, + buffer_size_kb: Option, + ) -> Self { + Self { name, connection_sender, listener_ref, buffer_size_kb } + } + + pub fn is_alive(&self) -> bool { + self.listener_ref.strong_count() > 0 + } + + pub fn create_connection(&self, endpoint_id: Option) -> Result { + if !self.is_alive() { + return Err(Error::new(format!("Internal listener '{}' is no longer active", self.name))); + } + + let metadata = InternalConnectionMetadata { + listener_name: self.name.clone(), + buffer_size_kb: self.buffer_size_kb, + created_at: Instant::now(), + endpoint_id, + }; + + let (upstream, downstream) = create_internal_connection_pair(metadata); + + let connection_pair = InternalConnectionPair { upstream: upstream.clone(), downstream: downstream.clone() }; + + if self.connection_sender.send(connection_pair.clone()).is_err() { + return Err(Error::new(format!("Failed to send connection to internal listener '{}'", self.name))); + } + + Ok(connection_pair) + } +} + +#[derive(Debug, Clone)] +pub struct InternalConnectionPair { + pub upstream: Arc, + pub downstream: Arc, +} + +#[derive(Debug)] +pub struct InternalStream { + metadata: InternalConnectionMetadata, + stream: Mutex, + is_closed: Arc>, +} + +impl InternalStream { + fn new(metadata: InternalConnectionMetadata, stream: tokio::io::DuplexStream) -> Self { + Self { metadata, stream: Mutex::new(stream), is_closed: Arc::new(RwLock::new(false)) } + } +} + +impl InternalStream { + pub fn metadata(&self) -> &InternalConnectionMetadata { + &self.metadata + } + + pub fn is_active(&self) -> bool { + if let Ok(is_closed) = self.is_closed.try_read() { + !*is_closed + } else { + false + } + } +} + +impl AsyncRead for InternalStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncRead::poll_read(std::pin::Pin::new(&mut *stream), cx, buf), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } +} + +impl AsyncWrite for InternalStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_write(std::pin::Pin::new(&mut *stream), cx, buf), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_flush(std::pin::Pin::new(&mut *stream), cx), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.stream.try_lock() { + Ok(mut stream) => tokio::io::AsyncWrite::poll_shutdown(std::pin::Pin::new(&mut *stream), cx), + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } +} + +#[derive(Debug)] +pub struct InternalConnectionFactory { + listeners: Arc>>, +} + +impl InternalConnectionFactory { + pub fn new() -> Self { + Self { listeners: Arc::new(RwLock::new(HashMap::new())) } + } + + pub async fn register_listener( + &self, + name: String, + buffer_size_kb: Option, + ) -> Result<(InternalListenerHandle, mpsc::UnboundedReceiver, Arc<()>)> { + let (connection_tx, connection_rx) = mpsc::unbounded_channel(); + let listener_ref = Arc::new(()); + let weak_ref = Arc::downgrade(&listener_ref); + + let handle = InternalListenerHandle::new(name.clone(), connection_tx, weak_ref, buffer_size_kb); + + let mut listeners = self.listeners.write().await; + + if listeners.contains_key(&name) { + return Err(Error::new(format!("Internal listener '{name}' is already registered"))); + } + + listeners.insert(name, handle.clone()); + Ok((handle, connection_rx, listener_ref)) + } + + pub async fn unregister_listener(&self, name: &str) -> Result<()> { + let mut listeners = self.listeners.write().await; + + if listeners.remove(name).is_none() { + return Err(Error::new(format!("Internal listener '{name}' was not registered"))); + } + + Ok(()) + } + + pub async fn connect_to_listener( + &self, + name: &str, + endpoint_id: Option, + ) -> Result> { + let listeners = self.listeners.read().await; + let handle = listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{name}' not found")))?; + + let connection_pair = handle.create_connection(endpoint_id)?; + Ok(Box::new(InternalStreamWrapper { inner: connection_pair.upstream })) + } + + pub async fn list_listeners(&self) -> Vec { + let listeners = self.listeners.read().await; + listeners.keys().map(String::clone).collect() + } + + pub async fn is_listener_active(&self, name: &str) -> bool { + let listeners = self.listeners.read().await; + listeners.get(name).is_some_and(InternalListenerHandle::is_alive) + } + + pub async fn get_stats(&self) -> InternalConnectionStats { + let listeners = self.listeners.read().await; + let active_listeners = listeners.len(); + + InternalConnectionStats { active_listeners, total_pooled_connections: 0, max_pooled_connections: 0 } + } +} + +impl Default for InternalConnectionFactory { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Clone)] +pub struct InternalConnectionStats { + pub active_listeners: usize, + pub total_pooled_connections: usize, + pub max_pooled_connections: usize, +} + +pub struct InternalStreamWrapper { + inner: Arc, +} + +impl Deref for InternalStreamWrapper { + type Target = InternalStream; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl AsyncRead for InternalStreamWrapper { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncRead::poll_read(pinned_stream, cx, buf) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } +} + +impl AsyncWrite for InternalStreamWrapper { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_write(pinned_stream, cx, buf) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_flush(pinned_stream, cx) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.inner.stream.try_lock() { + Ok(mut stream) => { + let pinned_stream = std::pin::Pin::new(&mut *stream); + tokio::io::AsyncWrite::poll_shutdown(pinned_stream, cx) + }, + Err(_) => { + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }, + } + } +} + +const DEFAULT_BUFFER_SIZE: usize = 8192; + +fn create_internal_connection_pair(metadata: InternalConnectionMetadata) -> (Arc, Arc) { + let buffer_size = metadata.buffer_size_kb.map(|kb| (kb as usize) * 1024).unwrap_or(DEFAULT_BUFFER_SIZE); + + let (upstream_io, downstream_io) = tokio::io::duplex(buffer_size); + + let upstream = Arc::new(InternalStream::new(metadata.clone(), upstream_io)); + let downstream = Arc::new(InternalStream::new(metadata, downstream_io)); + + (upstream, downstream) +} + +static GLOBAL_FACTORY: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub fn global_internal_connection_factory() -> &'static InternalConnectionFactory { + GLOBAL_FACTORY.get_or_init(InternalConnectionFactory::new) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_factory_creation() { + let factory = InternalConnectionFactory::new(); + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 0); + assert_eq!(stats.total_pooled_connections, 0); + } + + #[tokio::test] + async fn test_listener_registration() { + let factory = InternalConnectionFactory::new(); + + let result = factory.register_listener("test_listener".to_string(), None).await; + assert!(result.is_ok()); + let (_handle, _rx, _listener_ref) = result.unwrap(); + + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 1); + + let result = factory.register_listener("test_listener".to_string(), None).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_listener_unregistration() { + let factory = InternalConnectionFactory::new(); + + let (_handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap(); + let result = factory.unregister_listener("test_listener").await; + assert!(result.is_ok()); + + let stats = factory.get_stats().await; + assert_eq!(stats.active_listeners, 0); + + let result = factory.unregister_listener("non_existent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_connection_to_non_existent_listener() { + let factory = InternalConnectionFactory::new(); + + let result = factory.connect_to_listener("non_existent", None).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_listener_lifecycle() { + let factory = InternalConnectionFactory::new(); + + let (handle, _rx, _listener_ref) = factory.register_listener("test_listener".to_string(), None).await.unwrap(); + + assert!(factory.is_listener_active("test_listener").await); + assert!(handle.is_alive()); + + factory.unregister_listener("test_listener").await.unwrap(); + + assert!(!factory.is_listener_active("test_listener").await); + } + + #[tokio::test] + async fn test_list_listeners() { + let factory = InternalConnectionFactory::new(); + + let listeners = factory.list_listeners().await; + assert!(listeners.is_empty()); + + let (_handle1, _rx1, _listener_ref1) = factory.register_listener("listener1".to_string(), None).await.unwrap(); + let (_handle2, _rx2, _listener_ref2) = factory.register_listener("listener2".to_string(), None).await.unwrap(); + + let listeners = factory.list_listeners().await; + assert_eq!(listeners.len(), 2); + assert!(listeners.contains(&String::from("listener1"))); + assert!(listeners.contains(&String::from("listener2"))); + } + + #[tokio::test] + async fn test_global_factory() { + let factory = global_internal_connection_factory(); + let stats = factory.get_stats().await; + assert_eq!(stats.max_pooled_connections, 0); + } + + #[tokio::test] + async fn test_buffer_size_configuration() { + let metadata_default = InternalConnectionMetadata { + listener_name: "test".to_string(), + buffer_size_kb: None, + created_at: Instant::now(), + endpoint_id: None, + }; + let (upstream, downstream) = create_internal_connection_pair(metadata_default); + assert!(upstream.is_active()); + assert!(downstream.is_active()); + + let metadata_custom = InternalConnectionMetadata { + listener_name: "test".to_string(), + buffer_size_kb: Some(4), + created_at: Instant::now(), + endpoint_id: None, + }; + let (upstream_custom, downstream_custom) = create_internal_connection_pair(metadata_custom); + assert!(upstream_custom.is_active()); + assert!(downstream_custom.is_active()); + } +} diff --git a/orion-internal/src/connector.rs b/orion-internal/src/connector.rs new file mode 100644 index 00000000..fb8116de --- /dev/null +++ b/orion-internal/src/connector.rs @@ -0,0 +1,207 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use crate::{connection::global_internal_connection_factory, Error, InternalStreamWrapper, Result}; + +#[derive(Debug, Clone)] +pub struct InternalClusterConnector { + listener_name: String, + endpoint_id: Option, +} + +impl InternalClusterConnector { + pub fn new(listener_name: String, endpoint_id: Option) -> Self { + Self { listener_name, endpoint_id } + } + + pub fn listener_name(&self) -> &str { + &self.listener_name + } + + pub fn endpoint_id(&self) -> Option<&str> { + self.endpoint_id.as_deref() + } + + pub async fn connect(&self) -> Result> { + let factory = global_internal_connection_factory(); + + if !factory.is_listener_active(&self.listener_name).await { + return Err(Error::new(format!( + "Internal listener '{}' is not active or not registered", + self.listener_name + ))); + } + + factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await + } + + pub async fn is_available(&self) -> bool { + let factory = global_internal_connection_factory(); + factory.is_listener_active(&self.listener_name).await + } +} + +#[derive(Debug, Clone)] +pub struct InternalChannelConnector { + connector: InternalClusterConnector, + cluster_name: &'static str, +} + +impl InternalChannelConnector { + pub fn new(listener_name: String, cluster_name: &'static str, endpoint_id: Option) -> Self { + let connector = InternalClusterConnector::new(listener_name, endpoint_id); + + Self { connector, cluster_name } + } + + pub fn cluster_name(&self) -> &'static str { + self.cluster_name + } + + pub fn listener_name(&self) -> &str { + self.connector.listener_name() + } + + pub async fn connect(&self) -> Result { + let stream = self.connector.connect().await?; + + Ok(InternalChannel { + stream, + cluster_name: self.cluster_name, + listener_name: self.connector.listener_name().to_owned(), + endpoint_id: self.connector.endpoint_id().map(std::borrow::ToOwned::to_owned), + }) + } + + pub async fn is_available(&self) -> bool { + self.connector.is_available().await + } +} + +pub struct InternalChannel { + pub stream: Box, + pub cluster_name: &'static str, + pub listener_name: String, + pub endpoint_id: Option, +} + +impl InternalChannel { + pub fn cluster_name(&self) -> &'static str { + self.cluster_name + } + + pub fn listener_name(&self) -> &str { + &self.listener_name + } + + pub fn endpoint_id(&self) -> Option<&str> { + self.endpoint_id.as_deref() + } +} + +pub mod cluster_helpers { + use super::{global_internal_connection_factory, InternalChannelConnector}; + use crate::InternalConnectionStats; + use orion_configuration::config::cluster::InternalEndpointAddress; + + pub fn create_internal_connector( + internal_addr: &InternalEndpointAddress, + cluster_name: &'static str, + ) -> InternalChannelConnector { + InternalChannelConnector::new( + internal_addr.server_listener_name.to_string(), + cluster_name, + internal_addr.endpoint_id.as_ref().map(std::string::ToString::to_string), + ) + } + + pub async fn is_internal_listener_available(listener_name: &str) -> bool { + let factory = global_internal_connection_factory(); + factory.is_listener_active(listener_name).await + } + + pub async fn get_internal_connection_stats() -> InternalConnectionStats { + let factory = global_internal_connection_factory(); + factory.get_stats().await + } + + pub async fn list_internal_listeners() -> Vec { + let factory = global_internal_connection_factory(); + factory.list_listeners().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_internal_connector_creation() { + let connector = InternalClusterConnector::new(String::from("test_listener"), Some(String::from("endpoint1"))); + assert_eq!(connector.listener_name(), "test_listener"); + assert_eq!(connector.endpoint_id(), Some("endpoint1")); + } + + #[tokio::test] + async fn test_connection_to_non_existent_listener() { + let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); + let result = connector.connect().await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_availability_check() { + let connector = InternalClusterConnector::new(String::from("non_existent_listener"), None); + assert!(!connector.is_available().await); + } + + #[tokio::test] + async fn test_internal_channel_connector() { + let channel_connector = InternalChannelConnector::new( + String::from("test_listener"), + "test_cluster", + Some(String::from("endpoint1")), + ); + + assert_eq!(channel_connector.cluster_name(), "test_cluster"); + assert_eq!(channel_connector.listener_name(), "test_listener"); + assert!(!channel_connector.is_available().await); + } + + #[tokio::test] + async fn test_cluster_helpers() { + use cluster_helpers::*; + use orion_configuration::config::cluster::InternalEndpointAddress; + + let internal_addr = InternalEndpointAddress { + server_listener_name: String::from("test_listener").into(), + endpoint_id: Some(String::from("endpoint1").into()), + }; + + let connector = create_internal_connector(&internal_addr, "test_cluster"); + assert_eq!(connector.cluster_name(), "test_cluster"); + assert_eq!(connector.listener_name(), "test_listener"); + + assert!(!is_internal_listener_available("non_existent").await); + + let stats = get_internal_connection_stats().await; + assert_eq!(stats.active_listeners, 0); + + let listeners = list_internal_listeners().await; + assert!(listeners.is_empty()); + } +} diff --git a/orion-internal/src/filter_state.rs b/orion-internal/src/filter_state.rs new file mode 100644 index 00000000..45df3419 --- /dev/null +++ b/orion-internal/src/filter_state.rs @@ -0,0 +1,49 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use std::net::SocketAddr; + +/// Reserved internal address used as the peer address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. +pub const INTERNAL_PEER_ADDR: SocketAddr = + SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534)); + +/// Reserved internal address used as the local address for in-process (internal) connections. +/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic. +/// This address clearly identifies internal connections in logs and debugging. +pub const INTERNAL_LOCAL_ADDR: SocketAddr = + SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535)); + +/// Check if a socket address is an internal connection address +pub fn is_internal_address(addr: &SocketAddr) -> bool { + *addr == INTERNAL_PEER_ADDR || *addr == INTERNAL_LOCAL_ADDR +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_internal_addresses() { + assert!(is_internal_address(&INTERNAL_PEER_ADDR)); + assert!(is_internal_address(&INTERNAL_LOCAL_ADDR)); + + let normal_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap(); + assert!(!is_internal_address(&normal_addr)); + } +} diff --git a/orion-internal/src/lib.rs b/orion-internal/src/lib.rs new file mode 100644 index 00000000..0950e4ab --- /dev/null +++ b/orion-internal/src/lib.rs @@ -0,0 +1,39 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +//! # orion-internal +//! +//! Internal connection and internal listener infrastructure for Orion proxy. +//! This crate provides the core functionality for in-process communication between +//! different components of the proxy, particularly for waypoint proxy capabilities +//! in ambient service mesh scenarios. + +pub mod connection; +pub mod connector; +pub mod filter_state; + +// Re-export commonly used types +pub use connection::{ + global_internal_connection_factory, InternalConnectionFactory, InternalConnectionMetadata, InternalConnectionPair, + InternalConnectionStats, InternalListenerHandle, InternalStream, InternalStreamWrapper, +}; +pub use connector::{cluster_helpers, InternalChannel, InternalChannelConnector, InternalClusterConnector}; +pub use filter_state::{is_internal_address, INTERNAL_LOCAL_ADDR, INTERNAL_PEER_ADDR}; + +// Re-export error types +pub type Error = orion_error::Error; +pub type Result = ::core::result::Result; diff --git a/orion-internal/tests/integration_tests.rs b/orion-internal/tests/integration_tests.rs new file mode 100644 index 00000000..51401098 --- /dev/null +++ b/orion-internal/tests/integration_tests.rs @@ -0,0 +1,209 @@ +// Copyright 2025 The kmesh Authors +// +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// + +use orion_internal::{cluster_helpers::*, global_internal_connection_factory, InternalChannelConnector}; + +#[tokio::test] +async fn test_complete_internal_connection_flow() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_integration_listener_global"; + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener"); + + assert!(factory.is_listener_active(listener_name).await); + let listeners = factory.list_listeners().await; + assert!(listeners.contains(&listener_name.to_string())); + + let cluster_connector = + InternalChannelConnector::new(listener_name.to_string(), "test_cluster", Some("endpoint1".to_string())); + + assert!(cluster_connector.is_available().await); + + let connection_future = cluster_connector.connect(); + let listener_future = async { connection_receiver.recv().await }; + + let (cluster_connection, listener_connection) = tokio::join!(connection_future, listener_future); + + assert!(cluster_connection.is_ok(), "Cluster connection failed: {:?}", cluster_connection.err()); + assert!(listener_connection.is_some(), "Listener didn't receive connection"); + + let cluster_channel = cluster_connection.unwrap(); + let listener_pair = listener_connection.unwrap(); + + assert_eq!(cluster_channel.cluster_name(), "test_cluster"); + assert_eq!(cluster_channel.listener_name(), listener_name); + assert_eq!(cluster_channel.endpoint_id(), Some("endpoint1")); + + let metadata = listener_pair.downstream.metadata(); + assert_eq!(metadata.listener_name, listener_name); + assert_eq!(metadata.buffer_size_kb, None); + assert_eq!(metadata.endpoint_id, Some("endpoint1".to_string())); + + assert!(listener_pair.upstream.is_active()); + assert!(listener_pair.downstream.is_active()); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); + assert!(!factory.is_listener_active(listener_name).await); +} + +#[tokio::test] +async fn test_connection_pooling() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_pooling_listener_global"; + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string(), None).await.expect("Failed to register listener"); + + let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None); + + let connection_future = connector.connect(); + let listener_future = connection_receiver.recv(); + let (_cluster_conn, listener_pair) = tokio::join!(connection_future, listener_future); + + assert!(listener_pair.is_some()); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners >= 1); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_error_scenarios() { + let factory = global_internal_connection_factory(); + + let connector = InternalChannelConnector::new("non_existent_listener".to_string(), "test_cluster", None); + + assert!(!connector.is_available().await); + let result = connector.connect().await; + assert!(result.is_err()); + + let listener_name = "test_error_listener_global"; + let result1 = factory.register_listener(listener_name.to_string(), None).await; + assert!(result1.is_ok()); + let (_handle1, _rx1, _listener_ref1) = result1.unwrap(); + + let result2 = factory.register_listener(listener_name.to_string(), None).await; + assert!(result2.is_err()); + + let result = factory.unregister_listener("non_existent").await; + assert!(result.is_err()); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_connection_to_unregistered_listener() { + let _factory = global_internal_connection_factory(); + + let connector = InternalChannelConnector::new("non_existent_timeout_listener".to_string(), "test_cluster", None); + + let result = connector.connect().await; + assert!(result.is_err()); + + assert!(!connector.is_available().await); +} + +#[tokio::test] +async fn test_global_factory() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_global_listener"; + let (_handle, _rx, _listener_ref) = factory.register_listener(listener_name.to_string(), None).await.unwrap(); + + assert!(factory.is_listener_active(listener_name).await); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners > 0); + + factory.unregister_listener(listener_name).await.expect("Failed to unregister listener"); +} + +#[tokio::test] +async fn test_statistics_and_monitoring() { + let factory = global_internal_connection_factory(); + + let listener1 = "test_stats_listener1_global"; + + let (_handle1, _rx1, _listener_ref1) = + factory.register_listener(listener1.to_string(), None).await.expect("Failed to register listener1"); + + let stats = factory.get_stats().await; + assert!(stats.active_listeners >= 1); + + let listeners = factory.list_listeners().await; + assert!(listeners.contains(&listener1.to_string())); + + factory.unregister_listener(listener1).await.expect("Failed to unregister listener1"); +} + +#[tokio::test] +async fn test_cluster_helpers() { + use orion_configuration::config::cluster::InternalEndpointAddress; + + let internal_addr = InternalEndpointAddress { + server_listener_name: "test_cluster_helpers_listener".to_string().into(), + endpoint_id: Some("endpoint1".to_string().into()), + }; + + let connector = create_internal_connector(&internal_addr, "test_cluster"); + assert_eq!(connector.cluster_name(), "test_cluster"); + assert_eq!(connector.listener_name(), "test_cluster_helpers_listener"); + + assert!(!is_internal_listener_available("non_existent_listener_xyz").await); + + let stats = get_internal_connection_stats().await; + assert_eq!(stats.max_pooled_connections, 0); + + let factory = global_internal_connection_factory(); + let (_handle, _rx, _listener_ref) = + factory.register_listener("test_cluster_helpers_listener".to_string(), None).await.unwrap(); + + assert!(is_internal_listener_available("test_cluster_helpers_listener").await); + + let listeners = list_internal_listeners().await; + assert!(listeners.contains(&"test_cluster_helpers_listener".to_string())); + + factory.unregister_listener("test_cluster_helpers_listener").await.unwrap(); +} + +#[tokio::test] +async fn test_buffer_size_configuration() { + let factory = global_internal_connection_factory(); + + let listener_name = "test_buffer_size_listener"; + let buffer_size_kb = Some(8); + + let (_handle, mut connection_receiver, _listener_ref) = + factory.register_listener(listener_name.to_string(), buffer_size_kb).await.unwrap(); + + let connector = InternalChannelConnector::new(listener_name.to_string(), "test_cluster", None); + + let connection_future = connector.connect(); + let listener_future = connection_receiver.recv(); + let (_cluster_conn, listener_pair) = tokio::join!(connection_future, listener_future); + + assert!(listener_pair.is_some()); + let connection_pair = listener_pair.unwrap(); + + let metadata = connection_pair.downstream.metadata(); + assert_eq!(metadata.buffer_size_kb, Some(8)); + assert_eq!(metadata.listener_name, listener_name); + + factory.unregister_listener(listener_name).await.unwrap(); +} diff --git a/orion-lib/Cargo.toml b/orion-lib/Cargo.toml index 3319e985..e973481d 100644 --- a/orion-lib/Cargo.toml +++ b/orion-lib/Cargo.toml @@ -37,6 +37,7 @@ orion-error.workspace = true orion-format.workspace = true orion-http-header.workspace = true orion-interner.workspace = true +orion-internal.workspace = true orion-metrics.workspace = true orion-tracing.workspace = true orion-xds.workspace = true diff --git a/orion-lib/src/clusters/load_assignment.rs b/orion-lib/src/clusters/load_assignment.rs index 6d1faf37..2c3c2c82 100644 --- a/orion-lib/src/clusters/load_assignment.rs +++ b/orion-lib/src/clusters/load_assignment.rs @@ -80,7 +80,7 @@ impl EndpointAddressType { if let Ok(socket_addr) = addr_str.parse::() { EndpointAddress::Socket(socket_addr) } else { - panic!("Cannot convert authority back to socket address: {}", addr_str); + panic!("Cannot convert authority back to socket address: {addr_str}"); } }, EndpointAddressType::Internal(internal_addr, _) => EndpointAddress::Internal(internal_addr.clone()), diff --git a/orion-lib/src/lib.rs b/orion-lib/src/lib.rs index 21707b4a..4e04a633 100644 --- a/orion-lib/src/lib.rs +++ b/orion-lib/src/lib.rs @@ -44,7 +44,7 @@ pub use clusters::{ load_assignment::PartialClusterLoadAssignment, ClusterLoadAssignmentBuilder, }; -pub use listeners::listener::ListenerFactory; +pub use listeners::listener::{init_internal_worker_pool, ListenerFactory}; pub use listeners_manager::{ListenerConfigurationChange, ListenersManager, RouteConfigurationChange}; pub use orion_configuration::config::network_filters::http_connection_manager::RouteConfiguration; use orion_configuration::config::{ @@ -56,6 +56,11 @@ use orion_configuration::config::{ pub use secrets::SecretManager; pub(crate) use transport::AsyncStream; +pub use orion_internal::{ + cluster_helpers, global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, + InternalConnectionPair, InternalConnectionStats, InternalListenerHandle, +}; + pub type Error = orion_error::Error; pub type Result = ::core::result::Result; diff --git a/orion-lib/src/listeners/filter_state.rs b/orion-lib/src/listeners/filter_state.rs index 7fe2d442..80f114a4 100644 --- a/orion-lib/src/listeners/filter_state.rs +++ b/orion-lib/src/listeners/filter_state.rs @@ -17,6 +17,7 @@ use compact_str::CompactString; use orion_configuration::config::common::TlvType; +use orion_internal::{is_internal_address, INTERNAL_LOCAL_ADDR, INTERNAL_PEER_ADDR}; use std::{collections::HashMap, net::SocketAddr}; #[derive(Debug, Clone)] @@ -39,6 +40,13 @@ pub enum DownstreamConnectionMetadata { proxy_peer_address: SocketAddr, proxy_local_address: SocketAddr, }, + /// Internal connections from in-process communication (e.g., waypoint proxy) + /// - `listener_name`: identifies which internal listener accepted this connection + /// - `endpoint_id`: optional identifier for the specific endpoint (used for load balancing) + FromInternal { + listener_name: String, + endpoint_id: Option, + }, } impl DownstreamConnectionMetadata { @@ -47,15 +55,26 @@ impl DownstreamConnectionMetadata { Self::FromSocket { peer_address, .. } => *peer_address, Self::FromProxyProtocol { original_peer_address, .. } => *original_peer_address, Self::FromTlv { proxy_peer_address, .. } => *proxy_peer_address, + Self::FromInternal { .. } => INTERNAL_PEER_ADDR, } } + pub fn local_address(&self) -> SocketAddr { match self { Self::FromSocket { local_address, .. } => *local_address, Self::FromProxyProtocol { original_destination_address, .. } => *original_destination_address, Self::FromTlv { original_destination_address, .. } => *original_destination_address, + Self::FromInternal { .. } => INTERNAL_LOCAL_ADDR, } } + + pub fn is_internal(&self) -> bool { + matches!(self, Self::FromInternal { .. }) + } + + pub fn is_internal_address(addr: SocketAddr) -> bool { + is_internal_address(&addr) + } } #[derive(Debug, Clone)] diff --git a/orion-lib/src/listeners/http_connection_manager.rs b/orion-lib/src/listeners/http_connection_manager.rs index 21f3a2c7..09457f8b 100644 --- a/orion-lib/src/listeners/http_connection_manager.rs +++ b/orion-lib/src/listeners/http_connection_manager.rs @@ -1024,7 +1024,7 @@ impl Service> for HttpRequestHandler { add, 1, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name_for_route.to_string())] + &[KeyValue::new("listener", listener_name_for_route.to_owned())] ); if let Some(state) = trans_handler.span_state.as_ref() { @@ -1049,7 +1049,7 @@ impl Service> for HttpRequestHandler { add, nbytes + resp_head_size as u64, trans_handler.thread_id(), - &[KeyValue::new("listener", listener_name_for_response.to_string())] + &[KeyValue::new("listener", listener_name_for_response.to_owned())] ); let is_transaction_complete = if let Some(ctx) = trans_handler.access_log_ctx.as_ref() { diff --git a/orion-lib/src/listeners/http_connection_manager/route.rs b/orion-lib/src/listeners/http_connection_manager/route.rs index 05e7b413..c5103f30 100644 --- a/orion-lib/src/listeners/http_connection_manager/route.rs +++ b/orion-lib/src/listeners/http_connection_manager/route.rs @@ -194,7 +194,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route let err = err.into_inner(); let event_error = EventError::try_infer_from(&err); let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); - let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); + let event_kind = event_error.map_or(EventKind::ViaUpstream, EventKind::Error); debug!( "HttpConnectionManager Error processing response {:?}: {}({})", err, @@ -211,7 +211,7 @@ impl<'a> RequestHandler<(MatchedRequest<'a>, &HttpConnectionManager)> for &Route let err = err.into_inner(); let event_error = EventError::try_infer_from(&err); let flags = event_error.clone().map(ResponseFlags::from).unwrap_or_default(); - let event_kind = event_error.map_or(EventKind::ViaUpstream, |e| EventKind::Error(e)); + let event_kind = event_error.map_or(EventKind::ViaUpstream, EventKind::Error); debug!( "Failed to get an HTTP connection: {:?}: {}({})", err, diff --git a/orion-lib/src/listeners/listener.rs b/orion-lib/src/listeners/listener.rs index de1e37fa..75048110 100644 --- a/orion-lib/src/listeners/listener.rs +++ b/orion-lib/src/listeners/listener.rs @@ -50,10 +50,82 @@ use std::{ }; use tokio::{ net::{TcpListener, TcpSocket}, - sync::broadcast::{self}, + sync::{ + broadcast::{self}, + mpsc, + }, }; use tracing::{debug, info, warn}; +#[derive(Debug)] +struct InternalConnectionWorkerPool { + workers: Vec>, + senders: Vec>, + next_worker: std::sync::atomic::AtomicUsize, +} + +#[derive(Debug)] +struct InternalConnectionTask { + listener_name: String, + connection_pair: orion_internal::InternalConnectionPair, + filter_chains: Arc>, +} + +impl InternalConnectionWorkerPool { + fn new(num_workers: usize) -> Self { + let mut senders: Vec> = Vec::with_capacity(num_workers); + let mut workers = Vec::with_capacity(num_workers); + + for _ in 0..num_workers { + let (sender, mut receiver) = mpsc::unbounded_channel(); + senders.push(sender); + + let worker = tokio::spawn(async move { + while let Some(task) = receiver.recv().await { + if let Err(e) = + handle_internal_connection_static(task.listener_name, task.connection_pair, task.filter_chains) + { + warn!("Error handling internal connection task: {}", e); + } + } + }); + workers.push(worker); + } + + Self { workers, senders, next_worker: std::sync::atomic::AtomicUsize::new(0) } + } + + fn submit_task(&self, task: InternalConnectionTask) -> Result<()> { + let worker_index = self.next_worker.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % self.senders.len(); + self.senders[worker_index].send(task).map_err(|_| Error::new("Worker pool is shut down")) + } + + async fn shutdown(self) { + drop(self.senders); + + for worker in self.workers { + let _ = worker.await; + } + } +} + +static INTERNAL_WORKER_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); + +pub fn init_internal_worker_pool(num_workers: Option) { + let workers = num_workers + .or_else(|| std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::().ok())) + .unwrap_or(4); + let _ = INTERNAL_WORKER_POOL.set(InternalConnectionWorkerPool::new(workers)); +} + +fn get_internal_worker_pool() -> &'static InternalConnectionWorkerPool { + INTERNAL_WORKER_POOL.get_or_init(|| { + let workers = + std::env::var("ORION_INTERNAL_WORKER_POOL_SIZE").ok().and_then(|s| s.parse::().ok()).unwrap_or(4); + InternalConnectionWorkerPool::new(workers) + }) +} + #[derive(Debug, Clone)] struct PartialListener { name: &'static str, @@ -311,7 +383,7 @@ impl Listener { maybe_route_update = route_updates_receiver.recv() => { //todo: add context to the error here once orion-error lands match maybe_route_update { - Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update)}, + Ok(route_update) => {Self::process_route_update(name, &filter_chains, route_update)}, Err(e) => {return e.into();} } }, @@ -320,7 +392,7 @@ impl Listener { Ok(secret_update) => { // todo: possibly expensive clone - may need to rethink this structure let mut filter_chains_clone = filter_chains.as_ref().clone(); - Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); + Self::process_secret_update(name, &mut filter_chains_clone, secret_update); filter_chains = Arc::new(filter_chains_clone); } Err(e) => {return e.into();} @@ -332,38 +404,88 @@ impl Listener { async fn run_internal_listener( name: &'static str, - _internal_config: InternalListenerConfig, + internal_config: InternalListenerConfig, filter_chains: HashMap, _with_tls_inspector: bool, _with_tlv_listener_filter: bool, mut route_updates_receiver: broadcast::Receiver, mut secret_updates_receiver: broadcast::Receiver, ) -> Error { + use orion_internal::global_internal_connection_factory; + use tracing::{debug, error, info, warn}; + let filter_chains = Arc::new(filter_chains); + let factory = global_internal_connection_factory(); + + let (_handle, mut connection_receiver, _listener_ref) = + match factory.register_listener(name.to_owned(), internal_config.buffer_size_kb).await { + Ok(result) => result, + Err(e) => { + error!("Failed to register internal listener '{}': {}", name, e); + return e; + }, + }; + + info!("Internal listener '{}' registered to connection factory", name); - // For now, internal listeners just wait for updates - // The actual connection handling will be implemented when we add the internal connection factory loop { tokio::select! { + maybe_connection = connection_receiver.recv() => { + if let Some(connection_pair) = maybe_connection { + debug!("Internal listener '{}' received new connection", name); + + let filter_chains_clone = filter_chains.clone(); + let listener_name = name.to_owned(); + + // Use worker pool instead of tokio::spawn for better performance + // with large numbers of short connections + let task = InternalConnectionTask { + listener_name, + connection_pair, + filter_chains: filter_chains_clone, + }; + + if let Err(e) = get_internal_worker_pool().submit_task(task) { + warn!("Failed to submit internal connection task: {}", e); + } + } else { + warn!("Internal listener '{}' connection channel closed", name); + break; + } + }, maybe_route_update = route_updates_receiver.recv() => { match maybe_route_update { - Ok(route_update) => {Self::process_route_update(&name, &filter_chains, route_update);} - Err(e) => {return e.into();} + Ok(route_update) => { + Self::process_route_update(name, &filter_chains, route_update); + } + Err(e) => { + error!("Route update error for internal listener '{}': {}", name, e); + return e.into(); + } } }, maybe_secret_update = secret_updates_receiver.recv() => { match maybe_secret_update { Ok(secret_update) => { let mut filter_chains_clone = filter_chains.as_ref().clone(); - Self::process_secret_update(&name, &mut filter_chains_clone, secret_update); - // Note: For internal listeners, we'd need to update the shared state - // This will be implemented when we add the internal connection factory + Self::process_secret_update(name, &mut filter_chains_clone, secret_update); + // TODO: Update the shared filter chains state for active connections + } + Err(e) => { + error!("Secret update error for internal listener '{}': {}", name, e); + return e.into(); } - Err(e) => {return e.into();} } } } } + + if let Err(e) = factory.unregister_listener(name).await { + warn!("Failed to unregister internal listener '{}': {}", name, e); + } + + info!("Internal listener '{}' shutting down", name); + Error::new("Internal listener shutdown") } fn select_filterchain<'a, T>( @@ -472,8 +594,8 @@ impl Listener { let ssl = AtomicBool::new(false); defer! { - with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); - with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name.to_string())]); + with_metric!(listeners::DOWNSTREAM_CX_DESTROY, add, 1, shard_id, &[KeyValue::new("listener", listener_name.to_owned())]); + with_metric!(listeners::DOWNSTREAM_CX_ACTIVE, sub, 1, shard_id, &[KeyValue::new("listener", listener_name.to_owned())]); if ssl.load(Ordering::Relaxed) { with_metric!(http::DOWNSTREAM_CX_SSL_ACTIVE, add, 1, shard_id, &[KeyValue::new("listener", listener_name)]); } @@ -512,7 +634,7 @@ impl Listener { add, 1, shard_id, - &[KeyValue::new("listener", listener_name.to_string())] + &[KeyValue::new("listener", listener_name.to_owned())] ); with_metric!( http::DOWNSTREAM_CX_SSL_ACTIVE, @@ -870,3 +992,42 @@ filter_chains: assert_eq!(Listener::select_filterchain(&m, &metadata, Some("hello.world")).unwrap().copied(), Some(3)); } } + +fn handle_internal_connection_static( + listener_name: String, + connection_pair: orion_internal::InternalConnectionPair, + filter_chains: Arc>, +) -> Result<()> { + use crate::listeners::filter_state::DownstreamConnectionMetadata; + + debug!("Handling new internal connection for listener '{}'", listener_name); + + let downstream_metadata = DownstreamConnectionMetadata::FromInternal { + listener_name: listener_name.clone(), + endpoint_id: connection_pair.downstream.metadata().endpoint_id.clone(), + }; + + let filter_chain = if let Some(fc) = Listener::select_filterchain(&filter_chains, &downstream_metadata, None)? { + fc + } else { + warn!("No matching filter chain found for internal connection"); + return Err(crate::Error::new("No matching filter chain")); + }; + + let downstream_stream = connection_pair.downstream; + + match &filter_chain.handler { + crate::listeners::filterchain::ConnectionHandler::Http(_http_manager) => { + info!("Processing internal connection through HTTP filter chain"); + // TODO: Implement HTTP connection processing + let _ = downstream_stream; + Ok(()) + }, + crate::listeners::filterchain::ConnectionHandler::Tcp(_tcp_proxy) => { + info!("Processing internal connection through TCP filter chain"); + // TODO: Implement TCP connection processing + let _ = downstream_stream; + Ok(()) + }, + } +} diff --git a/orion-lib/src/listeners/listeners_manager.rs b/orion-lib/src/listeners/listeners_manager.rs index 7817f57b..4e692421 100644 --- a/orion-lib/src/listeners/listeners_manager.rs +++ b/orion-lib/src/listeners/listeners_manager.rs @@ -20,9 +20,12 @@ use tokio::sync::{broadcast, mpsc}; use tracing::{info, warn}; use orion_configuration::config::{ - listener::ListenerAddress, network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, + network_filters::http_connection_manager::RouteConfiguration, Listener as ListenerConfig, }; +#[cfg(test)] +use orion_configuration::config::listener::ListenerAddress; + use super::listener::{Listener, ListenerFactory}; use crate::{secrets::TransportSecret, ConfigDump, Result}; #[derive(Debug, Clone)] @@ -116,7 +119,7 @@ impl ListenersManager { warn!("Internal problem when updating a route: {e}"); } }, - _ = ct.cancelled() => { + () = ct.cancelled() => { warn!("Listener manager exiting"); return Ok(()); } @@ -125,7 +128,7 @@ impl ListenersManager { } pub fn start_listener(&mut self, listener: Listener, listener_conf: ListenerConfig) -> Result<()> { - let listener_name = listener.get_name().to_string(); + let listener_name = listener.get_name().to_owned(); if let Some((addr, dev)) = listener.get_socket() { info!("Listener {} at {addr} (device bind:{})", listener_name, dev.is_some()); } else { @@ -145,7 +148,7 @@ impl ListenersManager { let listener_info = ListenerInfo::new(join_handle, listener_conf, version); self.listener_handles.insert(listener_name.clone(), listener_info); - let version_count = self.listener_handles.get_vec(&listener_name).map(|v| v.len()).unwrap_or(0); + let version_count = self.listener_handles.get_vec(&listener_name).map(std::vec::Vec::len).unwrap_or(0); info!("Started version {} of listener {} ({} total active version(s))", version, listener_name, version_count); Ok(()) diff --git a/orion-proxy/src/proxy.rs b/orion-proxy/src/proxy.rs index 39674b06..af517fce 100644 --- a/orion-proxy/src/proxy.rs +++ b/orion-proxy/src/proxy.rs @@ -237,7 +237,7 @@ fn spawn_proxy_runtime_from_thread( _ = start_proxy(configuration_receivers, ct.clone()) => { info!("Proxy Runtime terminated!"); } - _ = ct.cancelled() => { + () = ct.cancelled() => { info!("Shutdown channel closed, shutting down Proxy runtime!"); } } @@ -264,7 +264,7 @@ fn spawn_services_runtime_from_thread( } info!("Services Runtime terminated!"); } - _ = ct.cancelled() => { + () = ct.cancelled() => { info!("Shutdown channel closed, shutting down Services runtime!"); } }