Skip to content

Commit e6a0119

Browse files
authored
fix: graceful shutdown of ws conns (#1473)
1 parent caa3407 commit e6a0119

File tree

4 files changed

+38
-39
lines changed

4 files changed

+38
-39
lines changed

crates/core/src/client_events.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,16 +185,14 @@ where
185185
node_controller.send(NodeEvent::Disconnect { cause: None }).await.ok();
186186
anyhow::bail!("shutdown event");
187187
}
188+
Err(error) if matches!(error.kind(), ErrorKind::TransportProtocolDisconnect) => {
189+
return Err(anyhow::anyhow!(error));
190+
}
188191
Err(error) => {
189192
tracing::debug!(%error, "client error");
190193
continue;
191194
}
192195
};
193-
// fixme: only allow in certain modes (e.g. while testing)
194-
if let ClientRequest::Disconnect { cause } = &*req.request {
195-
node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok();
196-
anyhow::bail!("shutdown event");
197-
}
198196
let cli_id = req.client_id;
199197
let res = process_open_request(req, op_manager.clone()).await;
200198
results.push(async move {
@@ -207,6 +205,10 @@ where
207205
}
208206
}
209207
Ok(None) => (cli_id, Ok(None)),
208+
Err(Error::Disconnected) => {
209+
tracing::debug!("client disconnected");
210+
(cli_id, Err(ClientError::from(ErrorKind::Disconnect)))
211+
}
210212
Err(err) => (cli_id, Err(ErrorKind::OperationError { cause: format!("{err}").into() }.into())),
211213
}
212214
});
@@ -544,6 +546,9 @@ async fn process_open_request(
544546

545547
return Ok(Some(Either::Right(callback_rx.unwrap())));
546548
}
549+
ClientRequest::Close => {
550+
return Err(Error::Disconnected);
551+
}
547552
_ => {
548553
tracing::error!("Op not supported");
549554
}

crates/core/src/client_events/combinator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ async fn client_fn(
171171
}
172172
}
173173
}
174-
tracing::error!("Client shut down");
174+
tracing::error!("Peer client interface shut down");
175175
}
176176

177177
#[cfg(test)]

crates/core/tests/operations.rs

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -201,22 +201,19 @@ async fn test_put_contract() -> TestResult {
201201
make_put(&mut client_api_a, wrapped_state.clone(), contract.clone()).await?;
202202

203203
// Wait for put response
204-
loop {
205-
let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
206-
match resp {
207-
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
208-
assert_eq!(key, contract_key);
209-
break;
210-
}
211-
Ok(Ok(other)) => {
212-
tracing::warn!("unexpected response while waiting for put: {:?}", other);
213-
}
214-
Ok(Err(e)) => {
215-
bail!("Error receiving put response: {}", e);
216-
}
217-
Err(_) => {
218-
bail!("Timeout waiting for put response");
219-
}
204+
let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
205+
match resp {
206+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
207+
assert_eq!(key, contract_key);
208+
}
209+
Ok(Ok(other)) => {
210+
tracing::warn!("unexpected response while waiting for put: {:?}", other);
211+
}
212+
Ok(Err(e)) => {
213+
bail!("Error receiving put response: {}", e);
214+
}
215+
Err(_) => {
216+
bail!("Timeout waiting for put response");
220217
}
221218
}
222219

@@ -362,22 +359,19 @@ async fn test_update_contract() -> TestResult {
362359
make_put(&mut client_api_a, wrapped_state.clone(), contract.clone()).await?;
363360

364361
// Wait for put response
365-
loop {
366-
let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
367-
match resp {
368-
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
369-
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
370-
break;
371-
}
372-
Ok(Ok(other)) => {
373-
tracing::warn!("unexpected response while waiting for put: {:?}", other);
374-
}
375-
Ok(Err(e)) => {
376-
bail!("Error receiving put response: {}", e);
377-
}
378-
Err(_) => {
379-
bail!("Timeout waiting for put response");
380-
}
362+
let resp = tokio::time::timeout(Duration::from_secs(30), client_api_a.recv()).await;
363+
match resp {
364+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
365+
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
366+
}
367+
Ok(Ok(other)) => {
368+
tracing::warn!("unexpected response while waiting for put: {:?}", other);
369+
}
370+
Ok(Err(e)) => {
371+
bail!("Error receiving put response: {}", e);
372+
}
373+
Err(_) => {
374+
bail!("Timeout waiting for put response");
381375
}
382376
}
383377

0 commit comments

Comments
 (0)