Skip to content

Commit e942545

Browse files
committed
feat: implement internal connection factory for in-process communication
- InternalConnectionWorkerPool with efficient task distribution - Special address range for internal connections (127.255.255.x) - Enhanced connection metadata with listener_name and endpoint_id - Performance optimization replacing tokio::spawn overhead Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent 7574a84 commit e942545

File tree

3 files changed

+492
-52
lines changed

3 files changed

+492
-52
lines changed
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
---
2+
title: Internal Listener and Upstream Transport Support
3+
authors:
4+
- "@Eeshu-Yadav"
5+
reviewers:
6+
- "@YaoZengzeng"
7+
- "@dawid-nowak"
8+
approvers:
9+
- "@YaoZengzeng"
10+
- "@dawid-nowak"
11+
12+
creation-date: 2025-10-03
13+
14+
---
15+
16+
## Internal Listener and Upstream Transport Support
17+
18+
### Summary
19+
20+
This proposal implements internal listener and upstream transport functionality in Orion to enable waypoint proxy capabilities. Internal listeners allow in-process communication without network APIs, while internal upstream transport enables metadata passthrough between proxy hops. This enables ambient mesh deployments with TCP proxy chaining and multi-hop routing.
21+
22+
### Motivation
23+
24+
To support ambient service mesh deployments, Orion needs:
25+
26+
1. **Internal connections**: Accept connections from within the same process via in-memory channels
27+
2. **Name-based routing**: Route to internal listeners by name instead of network addresses
28+
3. **Metadata propagation**: Preserve request context across proxy hops for routing and observability
29+
4. **Performance optimization**: Eliminate network stack overhead for co-located proxy communication
30+
31+
#### Goals
32+
33+
- Implement Envoy-compatible internal listener and upstream transport support
34+
- Enable clusters to connect via `server_listener_name` with metadata passthrough
35+
- Provide thread-safe connection handling with proper lifecycle management
36+
- Maintain full compatibility with Envoy xDS configurations
37+
- Ensure zero performance regression for network listeners
38+
39+
40+
### Proposal
41+
42+
The proposal introduces three main components to enable internal listener and upstream transport functionality:
43+
44+
1. **Internal Connection Factory**: A global, thread-safe registry that manages internal listener registration and connection establishment between clusters and listeners within the same proxy instance.
45+
46+
2. **Enhanced Internal Listener Runtime**: Extension of the existing listener infrastructure to handle internal connections, process them through filter chains, and manage lifecycle events.
47+
48+
3. **Internal Upstream Transport**: Implementation of cluster-side functionality to establish connections to internal endpoints and pass metadata through the transport socket layer.
49+
50+
The implementation follows Envoy's internal listener design while leveraging Rust's type system and async runtime for safety and performance.
51+
52+
53+
#### Notes
54+
55+
**Design Decisions:**
56+
- In-process only communication using Tokio duplex streams
57+
- Global factory pattern with `std::sync::OnceLock` for thread-safe initialization
58+
- Weak references for automatic lifecycle management
59+
- Initial support for host, cluster, and route metadata (request metadata in future)
60+
- Full Envoy configuration compatibility with listener name validation
61+
62+
63+
### Design Details
64+
65+
#### Architecture Overview
66+
67+
```
68+
┌─────────────────────────────────────────────────────────────────┐
69+
│ Orion Proxy Process │
70+
│ │
71+
│ ┌──────────────┐ ┌──────────────┐ │
72+
│ │ External │ │ Internal │ │
73+
│ │ Listener │ │ Listener │ │
74+
│ │ (Network) │ │ (In-Memory) │ │
75+
│ └──────┬───────┘ └──────┬───────┘ │
76+
│ │ │ │
77+
│ │ TCP Connection │ Register │
78+
│ ▼ ▼ │
79+
│ ┌────────────────────────────────────────────────────┐ │
80+
│ │ Internal Connection Factory │ │
81+
│ │ ┌──────────────────────────────────────────┐ │ │
82+
│ │ │ Listener Registry │ │ │
83+
│ │ │ HashMap<String, ListenerHandle> │ │ │
84+
│ │ └──────────────────────────────────────────┘ │ │
85+
│ └────────────────────────────┬───────────────────────┘ │
86+
│ │ │
87+
│ │ Connect │
88+
│ ▼ │
89+
│ ┌──────────────┐ ┌─────────────────┐ │
90+
│ │ Cluster │─────▶│ TCP Proxy │ │
91+
│ │ (Internal │ │ Filter │ │
92+
│ │ Endpoint) │ └─────────────────┘ │
93+
│ └──────────────┘ │
94+
│ │ │
95+
│ │ Internal Connection (Duplex Stream) │
96+
│ ▼ │
97+
│ ┌──────────────────────────────────────┐ │
98+
│ │ Internal Upstream Transport │ │
99+
│ │ - Metadata Passthrough │ │
100+
│ │ - Host/Cluster/Route Metadata │ │
101+
│ └──────────────────────────────────────┘ │
102+
└─────────────────────────────────────────────────────────────────┘
103+
```
104+
105+
#### Component Details
106+
107+
##### 1. Internal Connection Factory
108+
109+
**Location**: `orion-lib/src/transport/internal_connection.rs`
110+
111+
```rust
112+
pub struct InternalConnectionFactory {
113+
listeners: Arc<RwLock<HashMap<String, InternalListenerHandle>>>,
114+
}
115+
116+
pub struct InternalListenerHandle {
117+
pub name: String,
118+
pub connection_sender: mpsc::UnboundedSender<InternalConnectionPair>,
119+
listener_ref: Weak<()>,
120+
}
121+
122+
pub struct InternalConnectionPair {
123+
pub upstream: Arc<InternalStream>,
124+
pub downstream: Arc<InternalStream>,
125+
}
126+
127+
pub struct InternalStream {
128+
metadata: InternalConnectionMetadata,
129+
stream: tokio::io::DuplexStream,
130+
is_closed: Arc<RwLock<bool>>,
131+
}
132+
```
133+
134+
**Key Operations**: `register_listener`, `unregister_listener`, `connect_to_listener`, `is_listener_active`, `list_listeners`, `get_stats`
135+
136+
##### 2. Enhanced Internal Listener Runtime
137+
138+
**Location**: `orion-lib/src/listeners/listener.rs`
139+
140+
```rust
141+
async fn run_internal_listener(
142+
name: &'static str,
143+
filter_chains: HashMap<FilterChainMatch, FilterchainType>,
144+
mut route_updates_receiver: broadcast::Receiver<RouteConfigurationChange>,
145+
mut secret_updates_receiver: broadcast::Receiver<TlsContextChange>,
146+
) -> Error {
147+
let factory = global_internal_connection_factory();
148+
let (_handle, mut connection_receiver, _listener_ref) =
149+
factory.register_listener(name.to_string()).await?;
150+
151+
loop {
152+
tokio::select! {
153+
Some(connection_pair) = connection_receiver.recv() => {
154+
tokio::spawn(handle_internal_connection(connection_pair, filter_chains_clone));
155+
}
156+
Ok(route_update) = route_updates_receiver.recv() => {
157+
process_route_update(&name, &filter_chains, route_update);
158+
}
159+
Ok(secret_update) = secret_updates_receiver.recv() => {
160+
process_secret_update(&name, &mut filter_chains_clone, secret_update);
161+
}
162+
}
163+
}
164+
}
165+
```
166+
167+
##### 3. Internal Cluster Connector
168+
169+
**Location**: `orion-lib/src/transport/internal_cluster_connector.rs`
170+
171+
```rust
172+
pub struct InternalClusterConnector {
173+
listener_name: String,
174+
endpoint_id: Option<String>,
175+
}
176+
177+
impl InternalClusterConnector {
178+
pub async fn connect(&self) -> Result<AsyncStream> {
179+
let factory = global_internal_connection_factory();
180+
factory.connect_to_listener(&self.listener_name, self.endpoint_id.clone()).await
181+
}
182+
}
183+
184+
pub struct InternalChannelConnector {
185+
connector: InternalClusterConnector,
186+
cluster_name: &'static str,
187+
}
188+
```
189+
190+
##### 4. Configuration Data Structures
191+
192+
**Listener Configuration** (`orion-configuration/src/config/listener.rs`):
193+
194+
```rust
195+
pub enum ListenerAddress {
196+
Socket(SocketAddr),
197+
Internal(InternalListener),
198+
}
199+
200+
pub struct InternalListener {
201+
pub buffer_size_kb: Option<u32>,
202+
}
203+
```
204+
205+
**Cluster Configuration** (`orion-configuration/src/config/cluster.rs`):
206+
207+
```rust
208+
pub enum EndpointAddress {
209+
Socket(SocketAddr),
210+
Internal(InternalEndpointAddress),
211+
}
212+
213+
pub struct InternalEndpointAddress {
214+
pub server_listener_name: CompactString,
215+
pub endpoint_id: Option<CompactString>,
216+
}
217+
218+
pub enum TransportSocket {
219+
InternalUpstream(InternalUpstreamTransport),
220+
RawBuffer,
221+
}
222+
223+
pub struct InternalUpstreamTransport {
224+
pub passthrough_metadata: Vec<MetadataValueSource>,
225+
pub transport_socket: Box<TransportSocket>,
226+
}
227+
228+
pub struct MetadataValueSource {
229+
pub kind: MetadataKind,
230+
pub name: CompactString,
231+
}
232+
233+
pub enum MetadataKind {
234+
Host,
235+
Route,
236+
Cluster,
237+
}
238+
```
239+
240+
##### 5. Example Configuration
241+
242+
**Bootstrap Configuration**:
243+
244+
```yaml
245+
static_resources:
246+
listeners:
247+
# External listener accepting network connections
248+
- name: "listener_0"
249+
address:
250+
socket_address:
251+
address: "0.0.0.0"
252+
port_value: 15001
253+
filter_chains:
254+
- filters:
255+
- name: "tcp_proxy"
256+
typed_config:
257+
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy"
258+
stat_prefix: "ingress_tcp"
259+
cluster: "internal_cluster"
260+
261+
# Internal listener accepting in-process connections
262+
- name: "waypoint_internal"
263+
address:
264+
envoy_internal_address:
265+
server_listener_name: "waypoint_internal"
266+
filter_chains:
267+
- filters:
268+
- name: "http_connection_manager"
269+
typed_config:
270+
"@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"
271+
stat_prefix: "waypoint_http"
272+
route_config:
273+
name: "local_route"
274+
virtual_hosts:
275+
- name: "backend"
276+
domains: ["*"]
277+
routes:
278+
- match: { prefix: "/" }
279+
route: { cluster: "backend_service" }
280+
281+
clusters:
282+
# Cluster routing to internal listener
283+
- name: "internal_cluster"
284+
type: "STATIC"
285+
load_assignment:
286+
cluster_name: "internal_cluster"
287+
endpoints:
288+
- lb_endpoints:
289+
- endpoint:
290+
address:
291+
envoy_internal_address:
292+
server_listener_name: "waypoint_internal"
293+
transport_socket:
294+
name: "internal_upstream"
295+
typed_config:
296+
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.internal_upstream.v3.InternalUpstreamTransport"
297+
passthrough_metadata:
298+
- kind: { host: {} }
299+
name: "envoy.filters.listener.original_dst"
300+
- kind: { cluster: {} }
301+
name: "istio.workload"
302+
transport_socket:
303+
name: "raw_buffer"
304+
typed_config:
305+
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.raw_buffer.v3.RawBuffer"
306+
307+
# Backend service cluster
308+
- name: "backend_service"
309+
type: "STATIC"
310+
load_assignment:
311+
cluster_name: "backend_service"
312+
endpoints:
313+
- lb_endpoints:
314+
- endpoint:
315+
address:
316+
socket_address:
317+
address: "10.0.0.5"
318+
port_value: 8080
319+
```
320+
321+
#### Implementation Phases
322+
323+
The implementation is divided into four GitHub issues for manageable development and review:
324+
325+
**Phase 1: Internal Connection Factory** - Connection factory with thread-safe registry and lifecycle management
326+
327+
**Phase 2: Enhanced Internal Listener Runtime** - Connection acceptance and filter chain integration
328+
329+
**Phase 3: Cluster Internal Connection Support** - Cluster connectors with load balancing for internal endpoints
330+
331+
**Phase 4: Internal Upstream Transport & Metadata Passthrough** - Metadata extraction and passthrough implementation
332+
333+
#### Test Plan
334+
335+
**Unit Tests**:
336+
- Listener registration/unregistration in connection factory
337+
- Connection establishment between listeners and clusters
338+
- Thread safety and concurrent access
339+
- Error handling for non-existent/inactive listeners
340+
341+
**Integration Tests**:
342+
- End-to-end flow: External listener → Internal listener → Backend
343+
- Configuration parsing and validation
344+
- Metadata propagation across proxy hops
345+
346+
---
347+
348+
## References
349+
350+
1. [Envoy Internal Listener Documentation](https://www.envoyproxy.io/docs/envoy/latest/configuration/other_features/internal_listener)
351+
2. [Envoy Internal Upstream Transport Proto](https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/transport_sockets/internal_upstream/v3/internal_upstream.proto)
352+
3. [Envoy Metadata Types](https://www.envoyproxy.io/docs/envoy/latest/api-v3/type/metadata/v3/metadata.proto)
353+
354+
355+
---
356+
357+
## Appendix
358+
359+
### Glossary
360+
361+
- **Internal Listener**: A listener that accepts connections from within the proxy process rather than from the network
362+
- **Waypoint Proxy**: A shared proxy in ambient service mesh that handles L7 processing for multiple workloads
363+
- **Internal Upstream Transport**: Transport socket that enables metadata passthrough for internal connections
364+
- **Server Listener Name**: Unique identifier for an internal listener used by clusters to establish connections
365+
- **Metadata Passthrough**: Mechanism to propagate context (host/cluster/route metadata) across proxy hops
366+
- **Duplex Stream**: Bidirectional async I/O stream provided by Tokio for in-memory communication
367+
368+
### Acknowledgments
369+
370+
This feature was proposed by @YaoZengzeng in this [issue](https://github.com/kmesh-net/orion/issues/59) and has been reviewed by @dawid-nowak @YaoZengzeng. The design follows Envoy's internal listener specification while adapting to Orion's Rust-based architecture and async runtime.

0 commit comments

Comments
 (0)