Skip to content

Commit b6abbbf

Browse files
committed
feat: Extract internal connection infrastructure to orion-internal crate
Moved all internal connection related code (factories, connectors, streams, tests) from orion-lib to dedicated orion-internal crate. Signed-off-by: Eeshu-Yadav <eeshuyadav123@gmail.com>
1 parent d32ae90 commit b6abbbf

File tree

13 files changed

+166
-40
lines changed

13 files changed

+166
-40
lines changed

Cargo.lock

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"orion-error",
77
"orion-http-header",
88
"orion-interner",
9+
"orion-internal",
910
"orion-format",
1011
"orion-lib",
1112
"orion-metrics",
@@ -35,6 +36,7 @@ orion-error = { path = "orion-error" }
3536
orion-format = { path = "orion-format" }
3637
orion-http-header = { path = "orion-http-header" }
3738
orion-interner = { path = "orion-interner" }
39+
orion-internal = { path = "orion-internal" }
3840
orion-lib = { path = "orion-lib" }
3941
orion-metrics = { path = "orion-metrics" }
4042
orion-tracing = { path = "orion-tracing" }

orion-internal/Cargo.toml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "orion-internal"
3+
version = "0.1.0"
4+
edition = "2021"
5+
license = "Apache-2.0"
6+
description = "Internal connection and internal listener infrastructure for Orion proxy"
7+
8+
[dependencies]
9+
tokio = { version = "1.42.0", features = ["full"] }
10+
tracing = "0.1.41"
11+
orion-error = { path = "../orion-error" }
12+
orion-configuration = { path = "../orion-configuration" }
13+
compact_str = "0.8.0"
14+
15+
[dev-dependencies]
16+
tokio-test = "0.4"

orion-lib/src/transport/internal_connection.rs renamed to orion-internal/src/connection.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ use tokio::{
2727
time::Instant,
2828
};
2929

30-
use super::AsyncStream;
31-
use crate::{Error, Result};
30+
pub use crate::{Error, Result};
3231

3332
#[derive(Debug, Clone)]
3433
pub struct InternalConnectionMetadata {
@@ -213,7 +212,11 @@ impl InternalConnectionFactory {
213212
Ok(())
214213
}
215214

216-
pub async fn connect_to_listener(&self, name: &str, endpoint_id: Option<String>) -> Result<AsyncStream> {
215+
pub async fn connect_to_listener(
216+
&self,
217+
name: &str,
218+
endpoint_id: Option<String>,
219+
) -> Result<Box<InternalStreamWrapper>> {
217220
let listeners = self.listeners.read().await;
218221
let handle = listeners.get(name).ok_or_else(|| Error::new(format!("Internal listener '{name}' not found")))?;
219222

orion-lib/src/transport/internal_cluster_connector.rs renamed to orion-internal/src/connector.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
//
1616
//
1717

18-
use super::{global_internal_connection_factory, AsyncStream};
19-
use crate::{Error, Result};
18+
use crate::{connection::global_internal_connection_factory, Error, InternalStreamWrapper, Result};
2019

2120
#[derive(Debug, Clone)]
2221
pub struct InternalClusterConnector {
@@ -37,7 +36,7 @@ impl InternalClusterConnector {
3736
self.endpoint_id.as_deref()
3837
}
3938

40-
pub async fn connect(&self) -> Result<AsyncStream> {
39+
pub async fn connect(&self) -> Result<Box<InternalStreamWrapper>> {
4140
let factory = global_internal_connection_factory();
4241

4342
if !factory.is_listener_active(&self.listener_name).await {
@@ -94,7 +93,7 @@ impl InternalChannelConnector {
9493
}
9594

9695
pub struct InternalChannel {
97-
pub stream: AsyncStream,
96+
pub stream: Box<InternalStreamWrapper>,
9897
pub cluster_name: &'static str,
9998
pub listener_name: String,
10099
pub endpoint_id: Option<String>,
@@ -116,6 +115,7 @@ impl InternalChannel {
116115

117116
pub mod cluster_helpers {
118117
use super::{global_internal_connection_factory, InternalChannelConnector};
118+
use crate::InternalConnectionStats;
119119
use orion_configuration::config::cluster::InternalEndpointAddress;
120120

121121
pub fn create_internal_connector(
@@ -134,7 +134,7 @@ pub mod cluster_helpers {
134134
factory.is_listener_active(listener_name).await
135135
}
136136

137-
pub async fn get_internal_connection_stats() -> crate::transport::InternalConnectionStats {
137+
pub async fn get_internal_connection_stats() -> InternalConnectionStats {
138138
let factory = global_internal_connection_factory();
139139
factory.get_stats().await
140140
}

orion-internal/src/filter_state.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2025 The kmesh Authors
2+
//
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
//
17+
18+
use std::net::SocketAddr;
19+
20+
/// Reserved internal address used as the peer address for in-process (internal) connections.
21+
/// Chosen from the loopback range (127.255.255.254:65534) to avoid conflicts with real network traffic.
22+
/// This address clearly identifies internal connections in logs and debugging.
23+
pub const INTERNAL_PEER_ADDR: SocketAddr =
24+
SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 254), 65534));
25+
26+
/// Reserved internal address used as the local address for in-process (internal) connections.
27+
/// Chosen from the loopback range (127.255.255.255:65535) to avoid conflicts with real network traffic.
28+
/// This address clearly identifies internal connections in logs and debugging.
29+
pub const INTERNAL_LOCAL_ADDR: SocketAddr =
30+
SocketAddr::V4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::new(127, 255, 255, 255), 65535));
31+
32+
/// Check if a socket address is an internal connection address
33+
pub fn is_internal_address(addr: &SocketAddr) -> bool {
34+
*addr == INTERNAL_PEER_ADDR || *addr == INTERNAL_LOCAL_ADDR
35+
}
36+
37+
#[cfg(test)]
38+
mod tests {
39+
use super::*;
40+
41+
#[test]
42+
fn test_internal_addresses() {
43+
assert!(is_internal_address(&INTERNAL_PEER_ADDR));
44+
assert!(is_internal_address(&INTERNAL_LOCAL_ADDR));
45+
46+
let normal_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
47+
assert!(!is_internal_address(&normal_addr));
48+
}
49+
}

orion-internal/src/lib.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright 2025 The kmesh Authors
2+
//
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
//
17+
18+
//! # orion-internal
19+
//!
20+
//! Internal connection and internal listener infrastructure for Orion proxy.
21+
//! This crate provides the core functionality for in-process communication between
22+
//! different components of the proxy, particularly for waypoint proxy capabilities
23+
//! in ambient service mesh scenarios.
24+
25+
pub mod connection;
26+
pub mod connector;
27+
pub mod filter_state;
28+
29+
// Re-export commonly used types
30+
pub use connection::{
31+
global_internal_connection_factory, InternalConnectionFactory, InternalConnectionMetadata, InternalConnectionPair,
32+
InternalConnectionStats, InternalListenerHandle, InternalStream, InternalStreamWrapper,
33+
};
34+
pub use connector::{cluster_helpers, InternalChannel, InternalChannelConnector, InternalClusterConnector};
35+
pub use filter_state::{is_internal_address, INTERNAL_LOCAL_ADDR, INTERNAL_PEER_ADDR};
36+
37+
// Re-export error types
38+
pub type Error = orion_error::Error;
39+
pub type Result<T> = ::core::result::Result<T, Error>;

orion-lib/tests/internal_connection_integration.rs renamed to orion-internal/tests/integration_tests.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
//
1616
//
1717

18-
use orion_lib::{global_internal_connection_factory, InternalChannelConnector};
18+
use orion_internal::{cluster_helpers::*, global_internal_connection_factory, InternalChannelConnector};
1919

2020
#[tokio::test]
2121
async fn test_complete_internal_connection_flow() {
@@ -155,22 +155,29 @@ async fn test_statistics_and_monitoring() {
155155
#[tokio::test]
156156
async fn test_cluster_helpers() {
157157
use orion_configuration::config::cluster::InternalEndpointAddress;
158-
use orion_lib::cluster_helpers::*;
159158

160159
let internal_addr = InternalEndpointAddress {
161-
server_listener_name: "test_listener".to_string().into(),
160+
server_listener_name: "test_cluster_helpers_listener".to_string().into(),
162161
endpoint_id: Some("endpoint1".to_string().into()),
163162
};
164163

165164
let connector = create_internal_connector(&internal_addr, "test_cluster");
166165
assert_eq!(connector.cluster_name(), "test_cluster");
167-
assert_eq!(connector.listener_name(), "test_listener");
166+
assert_eq!(connector.listener_name(), "test_cluster_helpers_listener");
168167

169-
assert!(!is_internal_listener_available("non_existent").await);
168+
assert!(!is_internal_listener_available("non_existent_listener_xyz").await);
170169

171170
let stats = get_internal_connection_stats().await;
172171
assert_eq!(stats.max_pooled_connections, 0);
173172

173+
let factory = global_internal_connection_factory();
174+
let (_handle, _rx, _listener_ref) =
175+
factory.register_listener("test_cluster_helpers_listener".to_string()).await.unwrap();
176+
177+
assert!(is_internal_listener_available("test_cluster_helpers_listener").await);
178+
174179
let listeners = list_internal_listeners().await;
175-
assert!(listeners.is_empty());
180+
assert!(listeners.contains(&"test_cluster_helpers_listener".to_string()));
181+
182+
factory.unregister_listener("test_cluster_helpers_listener").await.unwrap();
176183
}

orion-lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ orion-error.workspace = true
3737
orion-format.workspace = true
3838
orion-http-header.workspace = true
3939
orion-interner.workspace = true
40+
orion-internal.workspace = true
4041
orion-metrics.workspace = true
4142
orion-tracing.workspace = true
4243
orion-xds.workspace = true

orion-lib/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,11 @@ use orion_configuration::config::{
5454
Bootstrap, Cluster, Listener as ListenerConfig,
5555
};
5656
pub use secrets::SecretManager;
57-
pub use transport::internal_cluster_connector::cluster_helpers;
5857
pub(crate) use transport::AsyncStream;
59-
pub use transport::{
60-
global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory, InternalConnectionPair,
61-
InternalConnectionStats, InternalListenerHandle,
58+
59+
pub use orion_internal::{
60+
cluster_helpers, global_internal_connection_factory, InternalChannelConnector, InternalConnectionFactory,
61+
InternalConnectionPair, InternalConnectionStats, InternalListenerHandle,
6262
};
6363

6464
pub type Error = orion_error::Error;

0 commit comments

Comments
 (0)