Skip to content

Commit 6156e26

Browse files
committed
Add connect timeout
1 parent bd21746 commit 6156e26

File tree

14 files changed

+89
-4
lines changed

14 files changed

+89
-4
lines changed

common/client-core/src/cli_helpers/client_add_gateway.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ where
140140
available_gateways,
141141
#[cfg(unix)]
142142
connection_fd_callback: None,
143+
connect_timeout: None,
143144
};
144145

145146
let init_details =

common/client-core/src/cli_helpers/client_init.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ where
188188
available_gateways,
189189
#[cfg(unix)]
190190
connection_fd_callback: None,
191+
connect_timeout: None,
191192
};
192193

193194
let init_details =

common/client-core/src/client/base_client/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use std::fmt::Debug;
6565
use std::os::raw::c_int as RawFd;
6666
use std::path::Path;
6767
use std::sync::Arc;
68+
use std::time::Duration;
6869
use time::OffsetDateTime;
6970
use tokio::sync::mpsc::Sender;
7071
use url::Url;
@@ -230,6 +231,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
230231

231232
#[cfg(unix)]
232233
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
234+
connect_timeout: Option<Duration>,
233235

234236
derivation_material: Option<DerivationMaterial>,
235237
}
@@ -258,6 +260,7 @@ where
258260
setup_method: GatewaySetup::MustLoad { gateway_id: None },
259261
#[cfg(unix)]
260262
connection_fd_callback: None,
263+
connect_timeout: None,
261264
derivation_material: None,
262265
}
263266
}
@@ -533,6 +536,7 @@ where
533536
packet_router: PacketRouter,
534537
stats_reporter: ClientStatsSender,
535538
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
539+
connect_timeout: Option<Duration>,
536540
shutdown_tracker: &ShutdownTracker,
537541
) -> Result<GatewayClient<C, S::CredentialStore>, ClientCoreError>
538542
where
@@ -577,6 +581,7 @@ where
577581
stats_reporter,
578582
#[cfg(unix)]
579583
connection_fd_callback,
584+
connect_timeout,
580585
shutdown_tracker.clone_shutdown_token(),
581586
)
582587
};
@@ -640,6 +645,7 @@ where
640645
packet_router: PacketRouter,
641646
stats_reporter: ClientStatsSender,
642647
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
648+
connect_timeout: Option<Duration>,
643649
shutdown_tracker: &ShutdownTracker,
644650
) -> Result<Box<dyn GatewayTransceiver + Send>, ClientCoreError>
645651
where
@@ -672,6 +678,7 @@ where
672678
stats_reporter,
673679
#[cfg(unix)]
674680
connection_fd_callback,
681+
connect_timeout,
675682
shutdown_tracker,
676683
)
677684
.await?;
@@ -1074,6 +1081,7 @@ where
10741081
stats_reporter.clone(),
10751082
#[cfg(unix)]
10761083
self.connection_fd_callback,
1084+
self.connect_timeout,
10771085
&shutdown_tracker.child_tracker(),
10781086
)
10791087
.await?;

common/client-core/src/init/helpers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,15 @@ pub(super) async fn register_with_gateway(
382382
gateway_listener: Url,
383383
our_identity: Arc<ed25519::KeyPair>,
384384
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
385+
connect_timeout: Option<Duration>,
385386
) -> Result<RegistrationResult, ClientCoreError> {
386387
let mut gateway_client = GatewayClient::new_init(
387388
gateway_listener,
388389
gateway_id,
389390
our_identity.clone(),
390391
#[cfg(unix)]
391392
connection_fd_callback,
393+
connect_timeout,
392394
);
393395

394396
gateway_client.establish_connection().await.map_err(|err| {

common/client-core/src/init/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use nym_topology::node::RoutingNode;
2323
use rand::rngs::OsRng;
2424
use rand::{CryptoRng, RngCore};
2525
use serde::Serialize;
26+
use std::time::Duration;
2627
#[cfg(unix)]
2728
use std::{os::fd::RawFd, sync::Arc};
2829

@@ -56,6 +57,7 @@ async fn setup_new_gateway<K, D>(
5657
selection_specification: GatewaySelectionSpecification,
5758
available_gateways: Vec<RoutingNode>,
5859
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
60+
connect_timeout: Option<Duration>,
5961
) -> Result<InitialisationResult, ClientCoreError>
6062
where
6163
K: KeyStore,
@@ -117,6 +119,7 @@ where
117119
our_identity,
118120
#[cfg(unix)]
119121
connection_fd_callback,
122+
connect_timeout,
120123
)
121124
.await?;
122125
(
@@ -213,6 +216,7 @@ where
213216
available_gateways,
214217
#[cfg(unix)]
215218
connection_fd_callback,
219+
connect_timeout,
216220
} => {
217221
tracing::debug!("GatewaySetup::New with spec: {specification:?}");
218222
setup_new_gateway(
@@ -222,6 +226,7 @@ where
222226
available_gateways,
223227
#[cfg(unix)]
224228
connection_fd_callback,
229+
connect_timeout,
225230
)
226231
.await
227232
}

common/client-core/src/init/types.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::fmt::{Debug, Display};
2121
#[cfg(unix)]
2222
use std::os::fd::RawFd;
2323
use std::sync::Arc;
24+
use std::time::Duration;
2425
use time::OffsetDateTime;
2526
use url::Url;
2627

@@ -214,6 +215,9 @@ pub enum GatewaySetup {
214215
/// Callback useful for allowing initial connection to gateway
215216
#[cfg(unix)]
216217
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
218+
219+
/// Timeout for establishing connection
220+
connect_timeout: Option<Duration>,
217221
},
218222

219223
ReuseConnection {
@@ -239,6 +243,7 @@ impl Debug for GatewaySetup {
239243
available_gateways,
240244
#[cfg(unix)]
241245
connection_fd_callback: _,
246+
connect_timeout: _,
242247
} => f
243248
.debug_struct("GatewaySetup::New")
244249
.field("specification", specification)
@@ -280,6 +285,7 @@ impl GatewaySetup {
280285
available_gateways: vec![],
281286
#[cfg(unix)]
282287
connection_fd_callback: None,
288+
connect_timeout: None,
283289
}
284290
}
285291

common/client-libs/gateway-client/src/client/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use url::Url;
3838

3939
#[cfg(unix)]
4040
use std::os::fd::RawFd;
41+
use std::time::Duration;
4142
#[cfg(not(target_arch = "wasm32"))]
4243
use tokio::time::sleep;
4344

@@ -104,10 +105,13 @@ pub struct GatewayClient<C, St = EphemeralCredentialStorage> {
104105
// currently unused (but populated)
105106
negotiated_protocol: Option<u8>,
106107

107-
// Callback on the fd as soon as the connection has been established
108+
/// Callback on the fd as soon as the connection has been established
108109
#[cfg(unix)]
109110
connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
110111

112+
/// Maximum duration to wait for a connection to be established when set
113+
connect_timeout: Option<Duration>,
114+
111115
/// Listen to shutdown messages and send notifications back to the task manager
112116
shutdown_token: ShutdownToken,
113117
}
@@ -124,6 +128,7 @@ impl<C, St> GatewayClient<C, St> {
124128
bandwidth_controller: Option<BandwidthController<C, St>>,
125129
stats_reporter: ClientStatsSender,
126130
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
131+
connect_timeout: Option<Duration>,
127132
shutdown_token: ShutdownToken,
128133
) -> Self {
129134
GatewayClient {
@@ -141,6 +146,7 @@ impl<C, St> GatewayClient<C, St> {
141146
negotiated_protocol: None,
142147
#[cfg(unix)]
143148
connection_fd_callback,
149+
connect_timeout,
144150
shutdown_token,
145151
}
146152
}
@@ -208,6 +214,7 @@ impl<C, St> GatewayClient<C, St> {
208214
&self.gateway_address,
209215
#[cfg(unix)]
210216
self.connection_fd_callback.clone(),
217+
self.connect_timeout,
211218
)
212219
.await?;
213220

@@ -1132,6 +1139,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11321139
gateway_identity: ed25519::PublicKey,
11331140
local_identity: Arc<ed25519::KeyPair>,
11341141
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
1142+
connect_timeout: Option<Duration>,
11351143
) -> Self {
11361144
log::trace!("Initialising gateway client");
11371145
use futures::channel::mpsc;
@@ -1158,6 +1166,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11581166
negotiated_protocol: None,
11591167
#[cfg(unix)]
11601168
connection_fd_callback,
1169+
connect_timeout,
11611170
shutdown_token,
11621171
}
11631172
}
@@ -1190,6 +1199,7 @@ impl GatewayClient<InitOnly, EphemeralCredentialStorage> {
11901199
negotiated_protocol: self.negotiated_protocol,
11911200
#[cfg(unix)]
11921201
connection_fd_callback: self.connection_fd_callback,
1202+
connect_timeout: self.connect_timeout,
11931203
shutdown_token,
11941204
}
11951205
}

common/client-libs/gateway-client/src/client/websockets.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::error::GatewayClientError;
22

33
use nym_http_api_client::HickoryDnsResolver;
4+
use std::time::Duration;
45
#[cfg(unix)]
56
use std::{
67
os::fd::{AsRawFd, RawFd},
@@ -17,6 +18,7 @@ use std::net::SocketAddr;
1718
pub(crate) async fn connect_async(
1819
endpoint: &str,
1920
#[cfg(unix)] connection_fd_callback: Option<Arc<dyn Fn(RawFd) + Send + Sync>>,
21+
connect_timeout: Option<Duration>,
2022
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response), GatewayClientError> {
2123
use tokio::net::TcpSocket;
2224

@@ -64,7 +66,22 @@ pub(crate) async fn connect_async(
6466
callback.as_ref()(socket.as_raw_fd());
6567
}
6668

67-
match socket.connect(sock_addr).await {
69+
let connect_res = if let Some(connect_timeout) = connect_timeout {
70+
match tokio::time::timeout(connect_timeout, socket.connect(sock_addr)).await {
71+
Ok(res) => res,
72+
Err(_elapsed) => {
73+
stream = Err(GatewayClientError::NetworkConnectionTimeout {
74+
address: endpoint.to_owned(),
75+
timeout: connect_timeout,
76+
});
77+
continue;
78+
}
79+
}
80+
} else {
81+
socket.connect(sock_addr).await
82+
};
83+
84+
match connect_res {
6885
Ok(s) => {
6986
stream = Ok(s);
7087
break;

common/client-libs/gateway-client/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use nym_gateway_requests::registration::handshake::error::HandshakeError;
55
use nym_gateway_requests::{GatewayRequestsError, SimpleGatewayRequestsError};
66
use std::io;
7+
use std::time::Duration;
78
use thiserror::Error;
89
use tungstenite::Error as WsError;
910

@@ -46,6 +47,9 @@ pub enum GatewayClientError {
4647
source: Box<WsError>,
4748
},
4849

50+
#[error("timeout when establishing connection: {address}, timeout: {timeout:?}")]
51+
NetworkConnectionTimeout { address: String, timeout: Duration },
52+
4953
#[error("no socket address for endpoint: {address}")]
5054
NoEndpointForConnection { address: String },
5155

common/wasm/client-core/src/helpers.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use nym_topology::{EpochRewardedSet, NymTopology, RoutingNode};
2323
use nym_validator_client::client::IdentityKey;
2424
use nym_validator_client::{nym_api::NymApiClientExt, UserAgent};
2525
use rand::thread_rng;
26+
use std::time::Duration;
2627
use url::Url;
2728
use wasm_bindgen::prelude::wasm_bindgen;
2829
use wasm_bindgen_futures::future_to_promise;
@@ -127,6 +128,7 @@ pub async fn setup_gateway_wasm(
127128
force_tls: bool,
128129
chosen_gateway: Option<IdentityKey>,
129130
gateways: Vec<RoutingNode>,
131+
connect_timeout: Option<Duration>,
130132
) -> Result<InitialisationResult, WasmCoreError> {
131133
// TODO: so much optimization and extra features could be added here, but that's for the future
132134

@@ -144,6 +146,7 @@ pub async fn setup_gateway_wasm(
144146
GatewaySetup::New {
145147
specification: selection_spec,
146148
available_gateways: gateways,
149+
connect_timeout,
147150
}
148151
};
149152

@@ -159,6 +162,7 @@ pub async fn setup_gateway_from_api(
159162
nym_apis: &[Url],
160163
minimum_performance: u8,
161164
ignore_epoch_roles: bool,
165+
connect_timeout: Option<Duration>,
162166
) -> Result<InitialisationResult, WasmCoreError> {
163167
let gateways = gateways_for_init(
164168
nym_apis,
@@ -168,7 +172,14 @@ pub async fn setup_gateway_from_api(
168172
None,
169173
)
170174
.await?;
171-
setup_gateway_wasm(client_store, force_tls, chosen_gateway, gateways).await
175+
setup_gateway_wasm(
176+
client_store,
177+
force_tls,
178+
chosen_gateway,
179+
gateways,
180+
connect_timeout,
181+
)
182+
.await
172183
}
173184

174185
pub async fn current_gateways_wasm(
@@ -192,9 +203,17 @@ pub async fn setup_from_topology(
192203
force_tls: bool,
193204
topology: &NymTopology,
194205
client_store: &ClientStorage,
206+
connect_timeout: Option<Duration>,
195207
) -> Result<InitialisationResult, WasmCoreError> {
196208
let gateways = topology.entry_capable_nodes().cloned().collect::<Vec<_>>();
197-
setup_gateway_wasm(client_store, force_tls, explicit_gateway, gateways).await
209+
setup_gateway_wasm(
210+
client_store,
211+
force_tls,
212+
explicit_gateway,
213+
gateways,
214+
connect_timeout,
215+
)
216+
.await
198217
}
199218

200219
pub async fn generate_new_client_keys(store: &ClientStorage) -> Result<(), WasmCoreError> {
@@ -213,6 +232,7 @@ pub async fn add_gateway(
213232
min_performance: u8,
214233
ignore_epoch_roles: bool,
215234
storage: &ClientStorage,
235+
connect_timeout: Option<Duration>,
216236
) -> Result<(), WasmCoreError> {
217237
let selection_spec = GatewaySelectionSpecification::new(
218238
preferred_gateway.clone(),
@@ -267,6 +287,7 @@ pub async fn add_gateway(
267287
let gateway_setup = GatewaySetup::New {
268288
specification: selection_spec,
269289
available_gateways,
290+
connect_timeout,
270291
};
271292

272293
let init_details = setup_gateway(gateway_setup, storage, storage).await?;

0 commit comments

Comments
 (0)