Skip to content

Commit 8bf7f9a

Browse files
Merge pull request #67 from Dridus/rmm/windows-io-hang
work around hang starting database on windows
2 parents 2783485 + 0df2e49 commit 8bf7f9a

File tree

1 file changed

+94
-19
lines changed

1 file changed

+94
-19
lines changed

postgresql_commands/src/traits.rs

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,28 +160,103 @@ impl CommandExecutor for std::process::Command {
160160
impl AsyncCommandExecutor for tokio::process::Command {
161161
/// Execute the command and return the stdout and stderr
162162
async fn execute(&mut self, timeout: Option<Duration>) -> Result<(String, String)> {
163+
#[tracing::instrument(level = "debug", skip(reader))]
164+
async fn read_process(
165+
mut reader: Option<impl tokio::io::AsyncRead + Unpin>,
166+
mut exit_anyway_broadcast_receiver: tokio::sync::broadcast::Receiver<()>,
167+
) -> Result<String> {
168+
let Some(reader) = reader.as_mut() else {
169+
return Ok(String::new());
170+
};
171+
let mut vec = Vec::new();
172+
loop {
173+
use tokio::io::AsyncReadExt as _;
174+
tokio::select! {
175+
n = reader.read_buf(&mut vec) => {
176+
if n? == 0 {
177+
return Ok(String::from_utf8_lossy(&vec).into_owned());
178+
}
179+
},
180+
_ = exit_anyway_broadcast_receiver.recv() => {
181+
return Ok(String::from_utf8_lossy(&vec).into_owned());
182+
},
183+
}
184+
}
185+
}
186+
163187
debug!("Executing command: {}", self.to_command_string());
164-
let output = match timeout {
165-
Some(duration) => tokio::time::timeout(duration, self.output()).await?,
166-
None => self.output().await,
167-
}?;
168188

169-
let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
170-
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
171-
debug!(
172-
"Result: {}\nstdout: {}\nstderr: {}",
173-
output
174-
.status
175-
.code()
176-
.map_or("None".to_string(), |c| c.to_string()),
177-
stdout,
178-
stderr
179-
);
189+
let res_fut = async {
190+
let mut child = self
191+
.stdout(std::process::Stdio::piped())
192+
.stderr(std::process::Stdio::piped())
193+
.spawn()?;
194+
195+
let stdout = child.stdout.take();
196+
let stderr = child.stderr.take();
197+
198+
// on windows, pg_ctl start will appear to hang if you try to read out all of stdout
199+
// and stderr. so, on windows do a horrible hack and forcibly end reading of stdout
200+
// and stderr 50ms after the process exits. on not-windows, this early exit mechanism
201+
// is set up but never sent to, resulting in the same behavior as `read_to_end`.
202+
203+
let (exit_anyway_broadcast_sender, exit_anyway_broadcast_receiver_stdout) =
204+
tokio::sync::broadcast::channel(1);
205+
let exit_anyway_broadcast_receiver_stderr = exit_anyway_broadcast_sender.subscribe();
206+
let stdout = tokio::spawn(async {
207+
read_process(stdout, exit_anyway_broadcast_receiver_stdout).await
208+
});
209+
let stderr = tokio::spawn(async {
210+
read_process(stderr, exit_anyway_broadcast_receiver_stderr).await
211+
});
212+
let exit_status = child.wait().await;
213+
#[cfg(target_os = "windows")]
214+
{
215+
tokio::time::sleep(Duration::from_millis(50)).await;
216+
let _ = exit_anyway_broadcast_sender.send(());
217+
}
218+
let (stdout, stderr) = tokio::join!(stdout, stderr);
219+
std::mem::drop(exit_anyway_broadcast_sender);
220+
221+
let exit_status = exit_status?;
222+
fn debug_render(
223+
which: &'static str,
224+
res: &std::result::Result<Result<String>, tokio::task::JoinError>,
225+
) -> String {
226+
match res {
227+
Ok(Ok(s)) => s.into(),
228+
Ok(Err(io_err)) => format!("<failed to read {}: {:?}>", which, io_err),
229+
Err(join_err) => format!("<failed to read {}: {:?}>", which, join_err),
230+
}
231+
}
232+
debug!(
233+
"Result: {}\nstdout: {}\nstderr: {}",
234+
exit_status
235+
.code()
236+
.map_or("None".to_string(), |c| c.to_string()),
237+
debug_render("stdout", &stdout),
238+
debug_render("stderr", &stderr)
239+
);
240+
241+
fn unwrap2_or_empty_string<E, E2>(
242+
r: std::result::Result<std::result::Result<String, E>, E2>,
243+
) -> String {
244+
r.map_or_else(|_| String::new(), |r| r.unwrap_or_else(|_| String::new()))
245+
}
180246

181-
if output.status.success() {
182-
Ok((stdout, stderr))
183-
} else {
184-
Err(Error::CommandError { stdout, stderr })
247+
let stdout = unwrap2_or_empty_string(stdout);
248+
let stderr = unwrap2_or_empty_string(stderr);
249+
250+
if exit_status.success() {
251+
Ok((stdout, stderr))
252+
} else {
253+
Err(Error::CommandError { stdout, stderr })
254+
}
255+
};
256+
257+
match timeout {
258+
Some(duration) => tokio::time::timeout(duration, res_fut).await?,
259+
None => res_fut.await,
185260
}
186261
}
187262
}

0 commit comments

Comments
 (0)