Skip to content

Commit 600671c

Browse files
committed
used guard_
1 parent f1486b2 commit 600671c

File tree

3 files changed

+110
-124
lines changed

3 files changed

+110
-124
lines changed

io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala renamed to io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import fs2.io.CollectionCompat.*
3030

3131
import java.lang
3232

33-
private[process] trait Processesjvmnative {
33+
private[process] trait ProcessesCompanionJvmNative {
3434
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
3535

3636
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =

io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,4 @@ package fs2
2323
package io
2424
package process
2525

26-
private[process] trait ProcessesCompanionPlatform extends Processesjvmnative
26+
private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative

io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala

Lines changed: 108 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -37,32 +37,23 @@ import java.io.IOException
3737
import cats.effect.LiftIO
3838
import cats.effect.IO
3939
import org.typelevel.scalaccompat.annotation._
40-
import scala.concurrent.duration.*
41-
import cats.effect.implicits.*
40+
import fs2.io.internal.NativeUtil._
4241

4342
@extern
4443
@nowarn212("cat=unused")
45-
object SyscallBindings {
46-
def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern
44+
object LibC {
45+
def pidfd_open(pid: CInt, flags: CInt): CInt = extern
4746
}
4847

49-
object pidFd {
50-
private val SYS_pidfd_open = 434L
51-
52-
def pidfd_open(pid: pid_t, flags: Int): Int = {
53-
val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong)
54-
fd.toInt
55-
}
56-
}
57-
58-
final case class NativeProcess(
48+
private final case class NativeProcess(
5949
pid: pid_t,
6050
stdinFd: Int,
6151
stdoutFd: Int,
62-
stderrFd: Int
52+
stderrFd: Int,
53+
pidfd: Option[Int] = None
6354
)
6455

65-
private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
56+
private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative {
6657

6758
private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = {
6859
val pathEnv = sys.env.getOrElse("PATH", "")
@@ -76,22 +67,35 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
7667
}
7768

7869
@inline private def closeAll(fds: Int*): Unit =
79-
fds.foreach { fd => close(fd); () }
70+
fds.foreach(close)
71+
72+
def pipeResource[F[_]](implicit F: Async[F]): Resource[F, (Int, Int)] =
73+
Resource.make {
74+
F.blocking {
75+
val fds = stackalloc[CInt](2.toUInt)
76+
guard_(pipe(fds))
77+
(fds(0), fds(1))
78+
}
79+
} { case (r, w) => F.blocking { close(r); close(w); () } }
8080

8181
def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] =
8282
if (LinktimeInfo.isMac || LinktimeInfo.isLinux) {
8383
new UnsealedProcesses[F] {
8484
def spawn(process: ProcessBuilder): Resource[F, Process[F]] = {
8585

86-
def createProcess(): F[NativeProcess] = F.blocking {
87-
Zone { implicit z =>
88-
val stdinPipe = stackalloc[CInt](2.toUInt)
89-
val stdoutPipe = stackalloc[CInt](2.toUInt)
90-
val stderrPipe = stackalloc[CInt](2.toUInt)
86+
val pipesResource: Resource[F, ((Int, Int), (Int, Int), (Int, Int))] =
87+
for {
88+
stdinPipe <- pipeResource[F]
89+
stdoutPipe <- pipeResource[F]
90+
stderrPipe <- pipeResource[F]
91+
} yield (stdinPipe, stdoutPipe, stderrPipe)
9192

92-
if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) {
93-
throw new RuntimeException("Failed to create pipes")
94-
}
93+
def createProcess(
94+
stdinPipe: (Int, Int),
95+
stdoutPipe: (Int, Int),
96+
stderrPipe: (Int, Int)
97+
): F[NativeProcess] = F.blocking {
98+
Zone { implicit z =>
9599
val envMap =
96100
if (process.inheritEnv)
97101
sys.env ++ process.extraEnv
@@ -110,49 +114,42 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
110114
}
111115
argv(allArgs.length.toULong) = null
112116

113-
val executable = findExecutable(process.command).getOrElse(process.command)
114-
115-
fork() match {
116-
case -1 =>
117-
closeAll(
118-
stdinPipe(0),
119-
stdinPipe(1),
120-
stdoutPipe(0),
121-
stdoutPipe(1),
122-
stderrPipe(0),
123-
stderrPipe(1)
124-
)
125-
throw new IOException("Unable to fork process")
117+
val executable =
118+
if (process.command.startsWith("/"))
119+
process.command
120+
else
121+
findExecutable(process.command).getOrElse(process.command)
122+
val ret = guard(fork())
123+
ret match {
126124
case 0 =>
127-
closeAll(stdinPipe(1), stdoutPipe(0), stderrPipe(0))
128-
if (
129-
dup2(stdinPipe(0), STDIN_FILENO) == -1 ||
130-
dup2(stdoutPipe(1), STDOUT_FILENO) == -1 ||
131-
dup2(stderrPipe(1), STDERR_FILENO) == -1
132-
) {
133-
_exit(1)
134-
throw new IOException("Unable to redirect file descriptors")
135-
}
136-
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
125+
closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1)
126+
guard_(dup2(stdinPipe._1, STDIN_FILENO))
127+
guard_(dup2(stdoutPipe._2, STDOUT_FILENO))
128+
guard_(dup2(stderrPipe._2, STDERR_FILENO))
129+
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
137130

138131
process.workingDirectory.foreach { dir =>
139132
if ((dir != null) && (dir.toString != ".")) {
140-
val ret = chdir(toCString(dir.toString))
141-
if (ret != 0)
142-
throw new IOException(s"Failed to chdir to ${dir.toString}")
133+
guard_(chdir(toCString(dir.toString)))
143134
}
144135
}
145136

146137
execve(toCString(executable), argv, envp)
147138
_exit(127)
148-
throw new IOException(s"Failed to create process for command: ${process.command}")
139+
throw new AssertionError("unreachable")
149140
case pid =>
150-
closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1))
141+
closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2)
142+
val pidfd =
143+
if (LinktimeInfo.isLinux) {
144+
val fd = LibC.pidfd_open(pid, 0)
145+
if (fd >= 0) Some(fd) else None
146+
} else None
151147
NativeProcess(
152148
pid = pid,
153-
stdinFd = stdinPipe(1),
154-
stdoutFd = stdoutPipe(0),
155-
stderrFd = stderrPipe(0)
149+
stdinFd = stdinPipe._2,
150+
stdoutFd = stdoutPipe._1,
151+
stderrFd = stderrPipe._1,
152+
pidfd
156153
)
157154
}
158155
}
@@ -177,101 +174,90 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative {
177174
}
178175
}
179176

180-
Resource.make(createProcess())(cleanup).map { nativeProcess =>
181-
new UnsealedProcess[F] {
182-
def isAlive: F[Boolean] = F.delay {
183-
kill(nativeProcess.pid, 0) == 0 || errno.errno != EPERM
177+
pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) =>
178+
Resource
179+
.make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup)
180+
.flatMap { nativeProcess =>
181+
nativeProcess.pidfd match {
182+
case Some(pidfd) =>
183+
for {
184+
poller <- Resource.eval(fileDescriptorPoller[F])
185+
handle <- poller.registerFileDescriptor(pidfd, true, false).mapK(LiftIO.liftK)
186+
} yield (nativeProcess, Some(handle))
187+
case None =>
188+
Resource.pure((nativeProcess, None))
189+
}
184190
}
191+
.map { case (nativeProcess, pollHandleOpt) =>
192+
new UnsealedProcess[F] {
193+
def isAlive: F[Boolean] = F.delay {
194+
kill(nativeProcess.pid, 0) == 0 || errno.errno == EPERM
195+
}
185196

186-
def exitValue: F[Int] =
187-
if (LinktimeInfo.isLinux) {
188-
F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd =>
189-
if (pidfd >= 0) {
190-
fileDescriptorPoller[F].flatMap { poller =>
191-
poller
192-
.registerFileDescriptor(pidfd, true, false)
193-
.use { handle =>
194-
handle.pollReadRec(()) { _ =>
197+
def exitValue: F[Int] =
198+
if (LinktimeInfo.isLinux) {
199+
(nativeProcess.pidfd, pollHandleOpt) match {
200+
case (Some(_), Some(handle)) =>
201+
handle
202+
.pollReadRec(()) { _ =>
195203
IO {
196204
val statusPtr = stackalloc[CInt]()
197205
val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG)
198-
199206
if (result == nativeProcess.pid) {
200207
val exitCode = WEXITSTATUS(!statusPtr)
201208
Right(exitCode)
202209
} else if (result == 0) {
203210
Left(())
204211
} else {
205-
if (errno.errno == ECHILD) {
212+
if (errno.errno == ECHILD)
206213
throw new IOException("No such process")
207-
} else {
214+
else
208215
throw new IOException(
209216
s"waitpid failed with errno: ${errno.errno}"
210217
)
211-
}
212218
}
213219
}
214220
}
215-
}
216-
.to
221+
.to
222+
case _ =>
223+
fallbackExitValue(nativeProcess.pid)
217224
}
218225
} else {
219226
fallbackExitValue(nativeProcess.pid)
220227
}
221-
}
222-
} else {
223-
fallbackExitValue(nativeProcess.pid)
224-
}
225228

226-
def stdin: Pipe[F, Byte, Nothing] = { in =>
227-
in
228-
.through(writeFd(nativeProcess.stdinFd))
229-
.onFinalize {
230-
F.blocking {
231-
close(nativeProcess.stdinFd)
232-
}.void
229+
def stdin: Pipe[F, Byte, Nothing] = { in =>
230+
in
231+
.through(writeFd(nativeProcess.stdinFd))
232+
.onFinalize {
233+
F.blocking {
234+
close(nativeProcess.stdinFd)
235+
}.void
236+
}
233237
}
234-
}
235-
def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
236-
.onFinalize {
237-
F.blocking {
238-
close(nativeProcess.stdoutFd)
239-
}.void
240-
}
241238

242-
def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
243-
.onFinalize {
244-
F.blocking {
245-
close(nativeProcess.stderrFd)
246-
}.void
239+
def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192)
240+
.onFinalize {
241+
F.blocking {
242+
close(nativeProcess.stdoutFd)
243+
}.void
244+
}
245+
246+
def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192)
247+
.onFinalize {
248+
F.blocking {
249+
close(nativeProcess.stderrFd)
250+
}.void
251+
}
247252
}
248-
}
253+
}
249254
}
250255
}
251256

252-
private def fallbackExitValue(pid: pid_t): F[Int] = {
253-
def loop: F[Int] =
254-
F.blocking {
255-
Zone { _ =>
256-
val status = stackalloc[CInt]()
257-
val result = waitpid(pid, status, WNOHANG)
258-
259-
if (result == pid) {
260-
Some(WEXITSTATUS(!status))
261-
} else if (result == 0) None
262-
else throw new IOException(s"waitpid failed with errno: ${errno.errno}")
263-
}
264-
}.flatMap {
265-
case Some(code) => F.pure(code)
266-
case None => F.sleep(10.millis) >> loop
267-
}
268-
269-
loop.onCancel {
270-
F.blocking {
271-
kill(pid, SIGKILL)
272-
()
273-
}
274-
}
257+
private def fallbackExitValue(pid: pid_t): F[Int] = F.blocking {
258+
val status = stackalloc[CInt]()
259+
guard_(waitpid(pid, status, 0))
260+
WEXITSTATUS(!status)
275261
}
276262
}
277263
} else super.forAsync[F]

0 commit comments

Comments
 (0)