From 41bfdc93b0b0d7c89c6e201f1f0f9cf4f30abf53 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 5 Jun 2025 00:31:07 +0530 Subject: [PATCH 01/29] Added Processes to IONative --- .../fs2/io/process/ProcessesPlatform.scala | 0 .../src/main/scala/fs2/io/ioplatform.scala | 65 ++--- .../fs2/io/process/ProcessesPlatform.scala | 225 ++++++++++++++++++ 3 files changed, 259 insertions(+), 31 deletions(-) rename io/{jvm-native => jvm}/src/main/scala/fs2/io/process/ProcessesPlatform.scala (100%) create mode 100644 io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala similarity index 100% rename from io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala rename to io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 919246af1c..099429c47b 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -60,36 +60,7 @@ private[fs2] trait ioplatform extends iojvmnative { /** Stream of bytes read asynchronously from standard input. */ def stdin[F[_]: Async: LiftIO](bufSize: Int): Stream[F, Byte] = if (LinktimeInfo.isLinux || LinktimeInfo.isMac) - Stream - .resource { - Resource - .eval { - setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] - } - .flatMap { poller => - poller.registerFileDescriptor(STDIN_FILENO, true, false).mapK(LiftIO.liftK) - } - } - .flatMap { handle => - Stream.repeatEval { - handle - .pollReadRec(()) { _ => - IO { - val buf = new Array[Byte](bufSize) - val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toUSize)) - if (readed > 0) - Right(Some(Chunk.array(buf, 0, readed))) - else if (readed == 0) - Right(None) - else - Left(()) - } - } - .to - } - } - .unNoneTerminate - .unchunks + readFd(STDIN_FILENO, bufSize) else readInputStream(Sync[F].blocking(System.in), bufSize, false) @@ -107,7 +78,39 @@ private[fs2] trait ioplatform extends iojvmnative { else writeOutputStream(Sync[F].blocking(System.err), false) - private[this] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => + private[fs2] def readFd[F[_]: Async: LiftIO](fd: Int, bufSize: Int): Stream[F, Byte] = + Stream + .resource { + Resource + .eval { + setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] + } + .flatMap { poller => + poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK) + } + } + .flatMap { handle => + Stream.repeatEval { + handle + .pollReadRec(()) { _ => + IO { + val buf = new Array[Byte](bufSize) + val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toUSize)) + if (readed > 0) + Right(Some(Chunk.array(buf, 0, readed))) + else if (readed == 0) + Right(None) + else + Left(()) + } + } + .to + } + } + .unNoneTerminate + .unchunks + + private[fs2] def writeFd[F[_]: Async: LiftIO](fd: Int): Pipe[F, Byte, Nothing] = in => Stream .resource { Resource diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala new file mode 100644 index 0000000000..8908fa446b --- /dev/null +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -0,0 +1,225 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package process + +import cats.effect.kernel.{Async, Resource} +import cats.syntax.all.* +import fs2.{Stream, Pipe} +import scala.scalanative.unsafe.* +import scala.scalanative.unsigned.* +import scala.scalanative.libc.* +import scala.scalanative.posix.sys.wait.* +import scala.scalanative.posix.errno.* +import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.unistd.* +import scala.scalanative.posix.spawn.* +import scala.scalanative.posix.signal.* +import java.io.IOException +import cats.effect.LiftIO +import cats.effect.IO + +@extern +object SyscallBindings { + def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern +} + +object PidFd { + private val SYS_pidfd_open = 434L + val PIDFD_NONBLOCK = 1 + + def pidfd_open(pid: pid_t, flags: Int): Int = { + val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong) + fd.toInt + } +} + +private[process] trait ProcessesCompanionPlatform { + def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { + + def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { + + def createProcess(): F[NativeProcess] = F.blocking { + Zone { implicit z => + val allArgs = process.command +: process.args + val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) + allArgs.zipWithIndex.foreach { case (arg, i) => + argv(i.toULong) = toCString(arg) + } + argv(allArgs.length.toULong) = null + + val envMap = + if (process.inheritEnv) + sys.env ++ process.extraEnv + else + process.extraEnv + + val envp = stackalloc[CString]((envMap.size + 1).toULong) + envMap.zipWithIndex.foreach { case ((k, v), i) => + envp(i.toULong) = toCString(s"$k=$v") + } + envp(envMap.size.toULong) = null + + val stdinPipe = stackalloc[CInt](2) + val stdoutPipe = stackalloc[CInt](2) + val stderrPipe = stackalloc[CInt](2) + + if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { + F.raiseError(new RuntimeException("Failed to create stdin pipe")) + } + + val fileActions = stackalloc[posix_spawn_file_actions_t]() + posix_spawn_file_actions_init(fileActions) + + posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO) + posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO) + posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO) + + posix_spawn_file_actions_addclose(fileActions, stdinPipe(1)) + posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0)) + posix_spawn_file_actions_addclose(fileActions, stderrPipe(0)) + + val pid = stackalloc[pid_t]() + println("hi") + println("argv(0): " + fromCString(argv(0))) + var i = 0 + while (argv(i) != null) { + println(s" argv($i): " + fromCString(argv(i))) + i += 1 + } + val result = posix_spawn( + pid, + argv(0), + fileActions, + null, + argv, + envp + ) + + posix_spawn_file_actions_destroy(fileActions) + + if (result != 0) { + close(stdinPipe(0)); close(stdinPipe(1)) + close(stdoutPipe(0)); close(stdoutPipe(1)) + close(stderrPipe(0)); close(stderrPipe(1)) + throw new RuntimeException(s"posix_spawn failed: $result") + } + + close(stdinPipe(0)) + close(stdoutPipe(1)) + close(stderrPipe(1)) + + NativeProcess( + pid = !pid, + stdinFd = stdinPipe(1), + stdoutFd = stdoutPipe(0), + stderrFd = stderrPipe(0) + ) + } + } + + def cleanup(proc: NativeProcess): F[Unit] = + F.blocking { + close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd) + } *> + F.delay(kill(proc.pid, SIGKILL)) *> + F.blocking { + val status = stackalloc[CInt]() + val r = waitpid(proc.pid, status, 0) + if (r < 0 && errno.errno != ECHILD) + F.raiseError(new RuntimeException(s"waitpid failed: errno=${errno.errno}")) + () + } + + Resource.make(createProcess())(cleanup).map { nativeProcess => + new UnsealedProcess[F] { + def isAlive: F[Boolean] = F.delay { + kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH + } + + def exitValue: F[Int] = + if (LinktimeInfo.isLinux) { + F.delay(PidFd.pidfd_open(nativeProcess.pid, PidFd.PIDFD_NONBLOCK)).flatMap { pidfd => + if (pidfd >= 0) { + fileDescriptorPoller[F].flatMap { poller => + poller + .registerFileDescriptor(pidfd, true, false) + .use { handle => + handle.pollReadRec(()) { _ => + IO { + Zone { _ => + val statusPtr = stackalloc[CInt]() + val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) + + if (result == nativeProcess.pid) { + val exitCode = WEXITSTATUS(!statusPtr) + Right(exitCode) + } else if (result == 0) { + Left(()) + } else { + if (errno.errno == ECHILD) { + throw new IOException("No such process") + } else { + throw new IOException( + s"waitpid failed with errno: ${errno.errno}" + ) + } + } + } + } + } + } + .to + } + } else { + fallbackExitValue(nativeProcess.pid) + } + } + } else { + fallbackExitValue(nativeProcess.pid) + } + + def stdin: Pipe[F, Byte, Nothing] = writeFd(nativeProcess.stdinFd) + + def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) + + def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) + } + } + } + + private def fallbackExitValue(pid: pid_t): F[Int] = F.delay { + val status = stackalloc[CInt]() + val result = waitpid(pid, status, 0) + if (result < 0) throw new IOException(s"waitpid failed with errno: ${errno.errno}") + WEXITSTATUS(!status) + } + } +} + +case class NativeProcess( + pid: pid_t, + stdinFd: Int, + stdoutFd: Int, + stderrFd: Int +) From 4385bb1517fb259d5adefda35fc1484993bc7489 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 5 Jun 2025 23:16:26 +0530 Subject: [PATCH 02/29] unguard ProcessSuit and add nowarn212 --- .../src/main/scala/fs2/io/ioplatform.scala | 4 +- .../fs2/io/process/ProcessesPlatform.scala | 55 ++++++--- .../scala/fs2/io/process/ProcessSuite.scala | 114 +++++++++--------- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 099429c47b..210fc0822f 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -83,7 +83,7 @@ private[fs2] trait ioplatform extends iojvmnative { .resource { Resource .eval { - setNonBlocking(STDIN_FILENO) *> fileDescriptorPoller[F] + setNonBlocking(fd) *> fileDescriptorPoller[F] } .flatMap { poller => poller.registerFileDescriptor(fd, true, false).mapK(LiftIO.liftK) @@ -95,7 +95,7 @@ private[fs2] trait ioplatform extends iojvmnative { .pollReadRec(()) { _ => IO { val buf = new Array[Byte](bufSize) - val readed = guard(read(STDIN_FILENO, buf.atUnsafe(0), bufSize.toUSize)) + val readed = guard(read(fd, buf.atUnsafe(0), bufSize.toUSize)) if (readed > 0) Right(Some(Chunk.array(buf, 0, readed))) else if (readed == 0) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 8908fa446b..460d3ca6e8 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -36,10 +36,13 @@ import scala.scalanative.posix.unistd.* import scala.scalanative.posix.spawn.* import scala.scalanative.posix.signal.* import java.io.IOException +import scala.concurrent.duration.* import cats.effect.LiftIO import cats.effect.IO +import org.typelevel.scalaccompat.annotation._ @extern +@nowarn212("cat=unused") object SyscallBindings { def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern } @@ -92,22 +95,19 @@ private[process] trait ProcessesCompanionPlatform { posix_spawn_file_actions_init(fileActions) posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO) - posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO) - posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO) - posix_spawn_file_actions_addclose(fileActions, stdinPipe(1)) + posix_spawn_file_actions_addclose(fileActions, stdinPipe(0)) + + posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO) posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0)) + posix_spawn_file_actions_addclose(fileActions, stdoutPipe(1)) + + posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO) posix_spawn_file_actions_addclose(fileActions, stderrPipe(0)) + posix_spawn_file_actions_addclose(fileActions, stderrPipe(1)) val pid = stackalloc[pid_t]() - println("hi") - println("argv(0): " + fromCString(argv(0))) - var i = 0 - while (argv(i) != null) { - println(s" argv($i): " + fromCString(argv(i))) - i += 1 - } - val result = posix_spawn( + val result = posix_spawnp( pid, argv(0), fileActions, @@ -145,7 +145,7 @@ private[process] trait ProcessesCompanionPlatform { F.delay(kill(proc.pid, SIGKILL)) *> F.blocking { val status = stackalloc[CInt]() - val r = waitpid(proc.pid, status, 0) + val r = waitpid(proc.pid, status, WNOHANG) if (r < 0 && errno.errno != ECHILD) F.raiseError(new RuntimeException(s"waitpid failed: errno=${errno.errno}")) () @@ -192,11 +192,11 @@ private[process] trait ProcessesCompanionPlatform { .to } } else { - fallbackExitValue(nativeProcess.pid) + fallbackExitValue(nativeProcess.pid).to } } } else { - fallbackExitValue(nativeProcess.pid) + fallbackExitValue(nativeProcess.pid).to } def stdin: Pipe[F, Byte, Nothing] = writeFd(nativeProcess.stdinFd) @@ -208,12 +208,29 @@ private[process] trait ProcessesCompanionPlatform { } } - private def fallbackExitValue(pid: pid_t): F[Int] = F.delay { - val status = stackalloc[CInt]() - val result = waitpid(pid, status, 0) - if (result < 0) throw new IOException(s"waitpid failed with errno: ${errno.errno}") - WEXITSTATUS(!status) + private def fallbackExitValue(pid: pid_t): IO[Int] = { + def loop: IO[Int] = + IO.blocking { + Zone { _ => + val status = stackalloc[CInt]() + val result = waitpid(pid, status, WNOHANG) + if (result == pid) Some(WEXITSTATUS(!status)) + else if (result == 0) None + else throw new IOException(s"waitpid failed with errno: ${errno.errno}") + } + }.flatMap { + case Some(code) => IO.pure(code) + case None => IO.sleep(100.millis) >> loop + } + + loop.onCancel { + IO.blocking { + kill(pid, SIGKILL) + () + } + } } + } } diff --git a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala index 118aa0edd1..9d1bdb2943 100644 --- a/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala +++ b/io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala @@ -72,20 +72,19 @@ class ProcessSuite extends Fs2Suite { } } - if (!isNative) - test("cat") { - ProcessBuilder("cat").spawn[IO].use { p => - val verySpecialMsg = "FS2 rocks!" - val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin) - val out = p.stdout.through(fs2.text.utf8.decode) - - out - .concurrently(in) - .compile - .string - .assertEquals(verySpecialMsg) - } + test("cat") { + ProcessBuilder("cat").spawn[IO].use { p => + val verySpecialMsg = "FS2 rocks!" + val in = Stream.emit(verySpecialMsg).through(fs2.text.utf8.encode).through(p.stdin) + val out = p.stdout.through(fs2.text.utf8.decode) + + out + .concurrently(in) + .compile + .string + .assertEquals(verySpecialMsg) } + } test("working directory") { Files[IO].tempDirectory.use { wd0 => @@ -125,60 +124,55 @@ class ProcessSuite extends Fs2Suite { } } - if (!isNative) - test("stdin cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use { p => - Stream - // apparently big enough to force `cat` to backpressure - .emit(Chunk.array(new Array[Byte](1024 * 1024))) - .unchunks - .repeat - .covary[IO] - .through(p.stdin) - .compile - .drain - } - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stdin cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use { p => + Stream + // apparently big enough to force `cat` to backpressure + .emit(Chunk.array(new Array[Byte](1024 * 1024))) + .unchunks + .repeat + .covary[IO] + .through(p.stdin) + .compile + .drain + } + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("stdout cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.stdout.compile.drain) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stdout cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.stdout.compile.drain) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("stderr cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.stderr.compile.drain) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("stderr cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.stderr.compile.drain) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("exit value cancelation") { - ProcessBuilder("cat") - .spawn[IO] - .use(_.exitValue.void) - .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang - } + test("exit value cancelation") { + ProcessBuilder("cat") + .spawn[IO] + .use(_.exitValue.void) + .timeoutTo(1.second, IO.unit) // assert that cancelation does not hang + } - if (!isNative) - test("flush") { - ProcessBuilder("cat").spawn[IO].use { p => - val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO]) - .through(fs2.text.utf8.encode) - .through(p.stdin) + test("flush") { + ProcessBuilder("cat").spawn[IO].use { p => + val in = (Stream.emit("all drains lead to the ocean") ++ Stream.never[IO]) + .through(fs2.text.utf8.encode) + .through(p.stdin) - val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean")) + val out = p.stdout.through(fs2.text.utf8.decode).exists(_.contains("ocean")) - out.concurrently(in).compile.drain // will hang if not flushed - } + out.concurrently(in).compile.drain // will hang if not flushed } + } test("close stdin") { ProcessBuilder("dd", "count=1").spawn[IO].use { p => From a67c869f7150bb451d5bea39747a68fa70e5d17d Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 5 Jun 2025 23:33:15 +0530 Subject: [PATCH 03/29] fixed F.raiseError --- .../src/main/scala/fs2/io/process/ProcessesPlatform.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 460d3ca6e8..d472f81dfe 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -88,7 +88,7 @@ private[process] trait ProcessesCompanionPlatform { val stderrPipe = stackalloc[CInt](2) if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { - F.raiseError(new RuntimeException("Failed to create stdin pipe")) + throw new RuntimeException("Failed to create pipes") } val fileActions = stackalloc[posix_spawn_file_actions_t]() @@ -96,15 +96,12 @@ private[process] trait ProcessesCompanionPlatform { posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO) posix_spawn_file_actions_addclose(fileActions, stdinPipe(1)) - posix_spawn_file_actions_addclose(fileActions, stdinPipe(0)) posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO) posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0)) - posix_spawn_file_actions_addclose(fileActions, stdoutPipe(1)) posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO) posix_spawn_file_actions_addclose(fileActions, stderrPipe(0)) - posix_spawn_file_actions_addclose(fileActions, stderrPipe(1)) val pid = stackalloc[pid_t]() val result = posix_spawnp( From 634ecc7bd6031cf6d82992d8d4010693ca3441be Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sun, 8 Jun 2025 14:19:04 +0530 Subject: [PATCH 04/29] removed posix_spawnp and used fork,chdir,execve --- .../src/main/scala/fs2/io/ioplatform.scala | 1 - .../fs2/io/process/ProcessesPlatform.scala | 180 +++++++++++------- 2 files changed, 110 insertions(+), 71 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/ioplatform.scala b/io/native/src/main/scala/fs2/io/ioplatform.scala index 210fc0822f..57473b3cd0 100644 --- a/io/native/src/main/scala/fs2/io/ioplatform.scala +++ b/io/native/src/main/scala/fs2/io/ioplatform.scala @@ -31,7 +31,6 @@ import cats.effect.kernel.Resource import cats.effect.kernel.Sync import cats.syntax.all._ import fs2.io.internal.NativeUtil._ - import java.io.OutputStream import java.nio.charset.Charset import java.nio.charset.StandardCharsets diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index d472f81dfe..16b1b9d5c5 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -33,12 +33,12 @@ import scala.scalanative.posix.sys.wait.* import scala.scalanative.posix.errno.* import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.unistd.* -import scala.scalanative.posix.spawn.* import scala.scalanative.posix.signal.* import java.io.IOException import scala.concurrent.duration.* import cats.effect.LiftIO import cats.effect.IO +import cats.effect.implicits.* import org.typelevel.scalaccompat.annotation._ @extern @@ -57,6 +57,13 @@ object PidFd { } } +final case class NativeProcess( + pid: pid_t, + stdinFd: Int, + stdoutFd: Int, + stderrFd: Int +) + private[process] trait ProcessesCompanionPlatform { def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { @@ -64,25 +71,30 @@ private[process] trait ProcessesCompanionPlatform { def createProcess(): F[NativeProcess] = F.blocking { Zone { implicit z => - val allArgs = process.command +: process.args - val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) - allArgs.zipWithIndex.foreach { case (arg, i) => - argv(i.toULong) = toCString(arg) + def findExecutable(command: String): Option[String] = { + val pathEnv = sys.env.get("PATH").getOrElse("") + val paths = pathEnv.split(":").toList + + paths + .find { dir => + val fullPath = s"$dir/$command" + access(toCString(fullPath), X_OK) == 0 + } + .map(dir => s"$dir/$command") + } - argv(allArgs.length.toULong) = null val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv - else - process.extraEnv - - val envp = stackalloc[CString]((envMap.size + 1).toULong) - envMap.zipWithIndex.foreach { case ((k, v), i) => - envp(i.toULong) = toCString(s"$k=$v") - } - envp(envMap.size.toULong) = null + else process.extraEnv + val executable = + if (process.command.contains("/")) { + process.command + } else { + findExecutable(process.command).getOrElse(process.command) + } val stdinPipe = stackalloc[CInt](2) val stdoutPipe = stackalloc[CInt](2) val stderrPipe = stackalloc[CInt](2) @@ -91,47 +103,62 @@ private[process] trait ProcessesCompanionPlatform { throw new RuntimeException("Failed to create pipes") } - val fileActions = stackalloc[posix_spawn_file_actions_t]() - posix_spawn_file_actions_init(fileActions) - - posix_spawn_file_actions_adddup2(fileActions, stdinPipe(0), STDIN_FILENO) - posix_spawn_file_actions_addclose(fileActions, stdinPipe(1)) - - posix_spawn_file_actions_adddup2(fileActions, stdoutPipe(1), STDOUT_FILENO) - posix_spawn_file_actions_addclose(fileActions, stdoutPipe(0)) - - posix_spawn_file_actions_adddup2(fileActions, stderrPipe(1), STDERR_FILENO) - posix_spawn_file_actions_addclose(fileActions, stderrPipe(0)) - - val pid = stackalloc[pid_t]() - val result = posix_spawnp( - pid, - argv(0), - fileActions, - null, - argv, - envp - ) - - posix_spawn_file_actions_destroy(fileActions) - - if (result != 0) { + val pid = fork() + if (pid < 0) { close(stdinPipe(0)); close(stdinPipe(1)) close(stdoutPipe(0)); close(stdoutPipe(1)) close(stderrPipe(0)); close(stderrPipe(1)) - throw new RuntimeException(s"posix_spawn failed: $result") - } + throw new RuntimeException("fork failed") + } else if (pid == 0) { + close(stdinPipe(1)) + close(stdoutPipe(0)) + close(stderrPipe(0)) + + if ( + dup2(stdinPipe(0), STDIN_FILENO) == -1 || + dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || + dup2(stderrPipe(1), STDERR_FILENO) == -1 + ) { + _exit(1) + } - close(stdinPipe(0)) - close(stdoutPipe(1)) - close(stderrPipe(1)) + close(stdinPipe(0)) + close(stdoutPipe(1)) + close(stderrPipe(1)) - NativeProcess( - pid = !pid, - stdinFd = stdinPipe(1), - stdoutFd = stdoutPipe(0), - stderrFd = stderrPipe(0) - ) + process.workingDirectory.foreach { dir => + if (chdir(toCString(dir.toString)) != 0) { + _exit(1) + } + } + + val allArgs = process.command +: process.args + val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) + allArgs.zipWithIndex.foreach { case (arg, i) => + argv(i.toULong) = toCString(arg) + } + argv(allArgs.length.toULong) = null + + val envp = stackalloc[CString]((envMap.size + 1).toULong) + envMap.zipWithIndex.foreach { case ((k, v), i) => + envp(i.toULong) = toCString(s"$k=$v") + } + envp(envMap.size.toULong) = null + + execve(toCString(executable), argv, envp) + _exit(1) + throw new RuntimeException(s"execve failed") + } else { + close(stdinPipe(0)) + close(stdoutPipe(1)) + close(stderrPipe(1)) + NativeProcess( + pid = pid, + stdinFd = stdinPipe(1), + stdoutFd = stdoutPipe(0), + stderrFd = stderrPipe(0) + ) + } } } @@ -144,7 +171,7 @@ private[process] trait ProcessesCompanionPlatform { val status = stackalloc[CInt]() val r = waitpid(proc.pid, status, WNOHANG) if (r < 0 && errno.errno != ECHILD) - F.raiseError(new RuntimeException(s"waitpid failed: errno=${errno.errno}")) + throw new RuntimeException(s"waitpid failed: errno=${errno.errno}") () } @@ -189,39 +216,59 @@ private[process] trait ProcessesCompanionPlatform { .to } } else { - fallbackExitValue(nativeProcess.pid).to + fallbackExitValue(nativeProcess.pid) } } } else { - fallbackExitValue(nativeProcess.pid).to + fallbackExitValue(nativeProcess.pid) } - def stdin: Pipe[F, Byte, Nothing] = writeFd(nativeProcess.stdinFd) - + def stdin: Pipe[F, Byte, Nothing] = { in => + in + .through(writeFd(nativeProcess.stdinFd)) + .onFinalize { + F.blocking { + close(nativeProcess.stdinFd) + }.void + } + } def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stdoutFd) + }.void + } def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stderrFd) + }.void + } } } } - private def fallbackExitValue(pid: pid_t): IO[Int] = { - def loop: IO[Int] = - IO.blocking { + private def fallbackExitValue(pid: pid_t): F[Int] = { + def loop: F[Int] = + F.delay { Zone { _ => val status = stackalloc[CInt]() val result = waitpid(pid, status, WNOHANG) - if (result == pid) Some(WEXITSTATUS(!status)) - else if (result == 0) None + + if (result == pid) { + println("bye bye process") + Some(WEXITSTATUS(!status)) + } else if (result == 0) None else throw new IOException(s"waitpid failed with errno: ${errno.errno}") } }.flatMap { - case Some(code) => IO.pure(code) - case None => IO.sleep(100.millis) >> loop + case Some(code) => F.pure(code) + case None => F.sleep(100.millis) >> loop } loop.onCancel { - IO.blocking { + F.blocking { kill(pid, SIGKILL) () } @@ -230,10 +277,3 @@ private[process] trait ProcessesCompanionPlatform { } } - -case class NativeProcess( - pid: pid_t, - stdinFd: Int, - stdoutFd: Int, - stderrFd: Int -) From 9af0b2e48555f4220c0e180b18c1a57d28ddbd99 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sun, 8 Jun 2025 14:29:57 +0530 Subject: [PATCH 05/29] converted 2 to UInt --- .../main/scala/fs2/io/process/ProcessesPlatform.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 16b1b9d5c5..36fcc249e6 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,9 +95,9 @@ private[process] trait ProcessesCompanionPlatform { } else { findExecutable(process.command).getOrElse(process.command) } - val stdinPipe = stackalloc[CInt](2) - val stdoutPipe = stackalloc[CInt](2) - val stderrPipe = stackalloc[CInt](2) + val stdinPipe = stackalloc[CInt](2.toUInt) + val stdoutPipe = stackalloc[CInt](2.toUInt) + val stderrPipe = stackalloc[CInt](2.toUInt) if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { throw new RuntimeException("Failed to create pipes") @@ -251,13 +251,12 @@ private[process] trait ProcessesCompanionPlatform { private def fallbackExitValue(pid: pid_t): F[Int] = { def loop: F[Int] = - F.delay { + F.blocking { Zone { _ => val status = stackalloc[CInt]() val result = waitpid(pid, status, WNOHANG) if (result == pid) { - println("bye bye process") Some(WEXITSTATUS(!status)) } else if (result == 0) None else throw new IOException(s"waitpid failed with errno: ${errno.errno}") From 1d904ffcda6f6ef42442df6aa7319de37d1b0c5e Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sun, 8 Jun 2025 14:47:16 +0530 Subject: [PATCH 06/29] removed unused imports --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 36fcc249e6..37a50b492c 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -25,7 +25,6 @@ package process import cats.effect.kernel.{Async, Resource} import cats.syntax.all.* -import fs2.{Stream, Pipe} import scala.scalanative.unsafe.* import scala.scalanative.unsigned.* import scala.scalanative.libc.* From 6a292525ee6e4cfc5cb455c620aa91dda592ed82 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 13 Jun 2025 11:59:23 +0530 Subject: [PATCH 07/29] removed PIDFD_NONBLOCK --- .../src/main/scala/fs2/io/process/ProcessesPlatform.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 37a50b492c..379804569f 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -46,9 +46,8 @@ object SyscallBindings { def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern } -object PidFd { +object pidFd { private val SYS_pidfd_open = 434L - val PIDFD_NONBLOCK = 1 def pidfd_open(pid: pid_t, flags: Int): Int = { val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong) @@ -182,7 +181,7 @@ private[process] trait ProcessesCompanionPlatform { def exitValue: F[Int] = if (LinktimeInfo.isLinux) { - F.delay(PidFd.pidfd_open(nativeProcess.pid, PidFd.PIDFD_NONBLOCK)).flatMap { pidfd => + F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd => if (pidfd >= 0) { fileDescriptorPoller[F].flatMap { poller => poller From 0ef2cc5a290208cc9ea556bec9ec60df4032c225 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 19 Jun 2025 13:09:01 +0530 Subject: [PATCH 08/29] added fallback --- .../fs2/io/process/ProcessesPlatform.scala | 96 +++++ .../fs2/io/process/ProcessesPlatform.scala | 72 +--- .../fs2/io/process/ProcessesPlatform.scala | 345 +++++++++--------- 3 files changed, 271 insertions(+), 242 deletions(-) create mode 100644 io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala new file mode 100644 index 0000000000..f001dbb768 --- /dev/null +++ b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package process + +import cats.effect.kernel.Async +import cats.effect.kernel.Resource +import cats.syntax.all.* +import fs2.io.CollectionCompat.* + +import java.lang + +private[process] trait Processesjvmnative { + def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { + + def spawn(process: ProcessBuilder): Resource[F, Process[F]] = + Resource + .make { + F.blocking { + val builder = new lang.ProcessBuilder((process.command :: process.args).asJava) + + process.workingDirectory.foreach { path => + builder.directory(path.toNioPath.toFile) + } + + val env = builder.environment() + if (!process.inheritEnv) env.clear() + process.extraEnv.foreach { case (k, v) => + env.put(k, v) + } + + builder.start() + } + } { process => + F.delay(process.isAlive()) + .ifM( + evalOnVirtualThreadIfAvailable( + F.blocking { + process.destroy() + process.waitFor() + () + } + ), + F.unit + ) + } + .map { process => + new UnsealedProcess[F] { + def isAlive = F.delay(process.isAlive()) + + def exitValue = isAlive.ifM( + evalOnVirtualThreadIfAvailable(F.interruptible(process.waitFor())), + F.delay(process.exitValue()) + ) + + def stdin = writeOutputStreamCancelable( + F.delay(process.getOutputStream()), + F.blocking(process.destroy()) + ) + + def stdout = readInputStreamCancelable( + F.delay(process.getInputStream()), + F.blocking(process.destroy()), + 8192 + ) + + def stderr = readInputStreamCancelable( + F.delay(process.getErrorStream()), + F.blocking(process.destroy()), + 8192 + ) + + } + } + } +} diff --git a/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 0c9f5452bf..483f27ad2d 100644 --- a/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -23,74 +23,4 @@ package fs2 package io package process -import cats.effect.kernel.Async -import cats.effect.kernel.Resource -import cats.syntax.all.* -import fs2.io.CollectionCompat.* - -import java.lang - -private[process] trait ProcessesCompanionPlatform { - def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { - - def spawn(process: ProcessBuilder): Resource[F, Process[F]] = - Resource - .make { - F.blocking { - val builder = new lang.ProcessBuilder((process.command :: process.args).asJava) - - process.workingDirectory.foreach { path => - builder.directory(path.toNioPath.toFile) - } - - val env = builder.environment() - if (!process.inheritEnv) env.clear() - process.extraEnv.foreach { case (k, v) => - env.put(k, v) - } - - builder.start() - } - } { process => - F.delay(process.isAlive()) - .ifM( - evalOnVirtualThreadIfAvailable( - F.blocking { - process.destroy() - process.waitFor() - () - } - ), - F.unit - ) - } - .map { process => - new UnsealedProcess[F] { - def isAlive = F.delay(process.isAlive()) - - def exitValue = isAlive.ifM( - evalOnVirtualThreadIfAvailable(F.interruptible(process.waitFor())), - F.delay(process.exitValue()) - ) - - def stdin = writeOutputStreamCancelable( - F.delay(process.getOutputStream()), - F.blocking(process.destroy()) - ) - - def stdout = readInputStreamCancelable( - F.delay(process.getInputStream()), - F.blocking(process.destroy()), - 8192 - ) - - def stderr = readInputStreamCancelable( - F.delay(process.getErrorStream()), - F.blocking(process.destroy()), - 8192 - ) - - } - } - } -} +private[process] trait ProcessesCompanionPlatform extends Processesjvmnative diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 379804569f..570cfd90b3 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -62,215 +62,218 @@ final case class NativeProcess( stderrFd: Int ) -private[process] trait ProcessesCompanionPlatform { - def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { +private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { + def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = + if (LinktimeInfo.isMac || LinktimeInfo.isLinux) { + new UnsealedProcesses[F] { - def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { + def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { - def createProcess(): F[NativeProcess] = F.blocking { - Zone { implicit z => - def findExecutable(command: String): Option[String] = { - val pathEnv = sys.env.get("PATH").getOrElse("") - val paths = pathEnv.split(":").toList + def createProcess(): F[NativeProcess] = F.blocking { + Zone { implicit z => + def findExecutable(command: String): Option[String] = { + val pathEnv = sys.env.get("PATH").getOrElse("") + val paths = pathEnv.split(":").toList + + paths + .find { dir => + val fullPath = s"$dir/$command" + access(toCString(fullPath), X_OK) == 0 + } + .map(dir => s"$dir/$command") - paths - .find { dir => - val fullPath = s"$dir/$command" - access(toCString(fullPath), X_OK) == 0 } - .map(dir => s"$dir/$command") - } + val envMap = + if (process.inheritEnv) + sys.env ++ process.extraEnv + else process.extraEnv - val envMap = - if (process.inheritEnv) - sys.env ++ process.extraEnv - else process.extraEnv + val executable = + if (process.command.contains("/")) { + process.command + } else { + findExecutable(process.command).getOrElse(process.command) + } + val stdinPipe = stackalloc[CInt](2.toUInt) + val stdoutPipe = stackalloc[CInt](2.toUInt) + val stderrPipe = stackalloc[CInt](2.toUInt) - val executable = - if (process.command.contains("/")) { - process.command - } else { - findExecutable(process.command).getOrElse(process.command) - } - val stdinPipe = stackalloc[CInt](2.toUInt) - val stdoutPipe = stackalloc[CInt](2.toUInt) - val stderrPipe = stackalloc[CInt](2.toUInt) + if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { + throw new RuntimeException("Failed to create pipes") + } - if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { - throw new RuntimeException("Failed to create pipes") - } + val pid = fork() + if (pid < 0) { + close(stdinPipe(0)); close(stdinPipe(1)) + close(stdoutPipe(0)); close(stdoutPipe(1)) + close(stderrPipe(0)); close(stderrPipe(1)) + throw new RuntimeException("fork failed") + } else if (pid == 0) { + close(stdinPipe(1)) + close(stdoutPipe(0)) + close(stderrPipe(0)) - val pid = fork() - if (pid < 0) { - close(stdinPipe(0)); close(stdinPipe(1)) - close(stdoutPipe(0)); close(stdoutPipe(1)) - close(stderrPipe(0)); close(stderrPipe(1)) - throw new RuntimeException("fork failed") - } else if (pid == 0) { - close(stdinPipe(1)) - close(stdoutPipe(0)) - close(stderrPipe(0)) + if ( + dup2(stdinPipe(0), STDIN_FILENO) == -1 || + dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || + dup2(stderrPipe(1), STDERR_FILENO) == -1 + ) { + _exit(1) + } - if ( - dup2(stdinPipe(0), STDIN_FILENO) == -1 || - dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || - dup2(stderrPipe(1), STDERR_FILENO) == -1 - ) { - _exit(1) - } + close(stdinPipe(0)) + close(stdoutPipe(1)) + close(stderrPipe(1)) - close(stdinPipe(0)) - close(stdoutPipe(1)) - close(stderrPipe(1)) + process.workingDirectory.foreach { dir => + if (chdir(toCString(dir.toString)) != 0) { + _exit(1) + } + } - process.workingDirectory.foreach { dir => - if (chdir(toCString(dir.toString)) != 0) { - _exit(1) - } - } + val allArgs = process.command +: process.args + val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) + allArgs.zipWithIndex.foreach { case (arg, i) => + argv(i.toULong) = toCString(arg) + } + argv(allArgs.length.toULong) = null - val allArgs = process.command +: process.args - val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) - allArgs.zipWithIndex.foreach { case (arg, i) => - argv(i.toULong) = toCString(arg) - } - argv(allArgs.length.toULong) = null + val envp = stackalloc[CString]((envMap.size + 1).toULong) + envMap.zipWithIndex.foreach { case ((k, v), i) => + envp(i.toULong) = toCString(s"$k=$v") + } + envp(envMap.size.toULong) = null - val envp = stackalloc[CString]((envMap.size + 1).toULong) - envMap.zipWithIndex.foreach { case ((k, v), i) => - envp(i.toULong) = toCString(s"$k=$v") + execve(toCString(executable), argv, envp) + _exit(1) + throw new RuntimeException(s"execve failed") + } else { + close(stdinPipe(0)) + close(stdoutPipe(1)) + close(stderrPipe(1)) + NativeProcess( + pid = pid, + stdinFd = stdinPipe(1), + stdoutFd = stdoutPipe(0), + stderrFd = stderrPipe(0) + ) + } } - envp(envMap.size.toULong) = null - - execve(toCString(executable), argv, envp) - _exit(1) - throw new RuntimeException(s"execve failed") - } else { - close(stdinPipe(0)) - close(stdoutPipe(1)) - close(stderrPipe(1)) - NativeProcess( - pid = pid, - stdinFd = stdinPipe(1), - stdoutFd = stdoutPipe(0), - stderrFd = stderrPipe(0) - ) } - } - } - def cleanup(proc: NativeProcess): F[Unit] = - F.blocking { - close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd) - } *> - F.delay(kill(proc.pid, SIGKILL)) *> - F.blocking { - val status = stackalloc[CInt]() - val r = waitpid(proc.pid, status, WNOHANG) - if (r < 0 && errno.errno != ECHILD) - throw new RuntimeException(s"waitpid failed: errno=${errno.errno}") - () - } + def cleanup(proc: NativeProcess): F[Unit] = + F.blocking { + close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd) + } *> + F.delay(kill(proc.pid, SIGKILL)) *> + F.blocking { + val status = stackalloc[CInt]() + val r = waitpid(proc.pid, status, WNOHANG) + if (r < 0 && errno.errno != ECHILD) + throw new RuntimeException(s"waitpid failed: errno=${errno.errno}") + () + } - Resource.make(createProcess())(cleanup).map { nativeProcess => - new UnsealedProcess[F] { - def isAlive: F[Boolean] = F.delay { - kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH - } + Resource.make(createProcess())(cleanup).map { nativeProcess => + new UnsealedProcess[F] { + def isAlive: F[Boolean] = F.delay { + kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH + } - def exitValue: F[Int] = - if (LinktimeInfo.isLinux) { - F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd => - if (pidfd >= 0) { - fileDescriptorPoller[F].flatMap { poller => - poller - .registerFileDescriptor(pidfd, true, false) - .use { handle => - handle.pollReadRec(()) { _ => - IO { - Zone { _ => - val statusPtr = stackalloc[CInt]() - val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) + def exitValue: F[Int] = + if (LinktimeInfo.isLinux) { + F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd => + if (pidfd >= 0) { + fileDescriptorPoller[F].flatMap { poller => + poller + .registerFileDescriptor(pidfd, true, false) + .use { handle => + handle.pollReadRec(()) { _ => + IO { + Zone { _ => + val statusPtr = stackalloc[CInt]() + val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) - if (result == nativeProcess.pid) { - val exitCode = WEXITSTATUS(!statusPtr) - Right(exitCode) - } else if (result == 0) { - Left(()) - } else { - if (errno.errno == ECHILD) { - throw new IOException("No such process") - } else { - throw new IOException( - s"waitpid failed with errno: ${errno.errno}" - ) + if (result == nativeProcess.pid) { + val exitCode = WEXITSTATUS(!statusPtr) + Right(exitCode) + } else if (result == 0) { + Left(()) + } else { + if (errno.errno == ECHILD) { + throw new IOException("No such process") + } else { + throw new IOException( + s"waitpid failed with errno: ${errno.errno}" + ) + } + } } } } } - } + .to } - .to + } else { + fallbackExitValue(nativeProcess.pid) + } } } else { fallbackExitValue(nativeProcess.pid) } - } - } else { - fallbackExitValue(nativeProcess.pid) - } - def stdin: Pipe[F, Byte, Nothing] = { in => - in - .through(writeFd(nativeProcess.stdinFd)) - .onFinalize { - F.blocking { - close(nativeProcess.stdinFd) - }.void + def stdin: Pipe[F, Byte, Nothing] = { in => + in + .through(writeFd(nativeProcess.stdinFd)) + .onFinalize { + F.blocking { + close(nativeProcess.stdinFd) + }.void + } } - } - def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) - .onFinalize { - F.blocking { - close(nativeProcess.stdoutFd) - }.void - } + def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stdoutFd) + }.void + } - def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) - .onFinalize { - F.blocking { - close(nativeProcess.stderrFd) - }.void + def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stderrFd) + }.void + } } + } } - } - } - private def fallbackExitValue(pid: pid_t): F[Int] = { - def loop: F[Int] = - F.blocking { - Zone { _ => - val status = stackalloc[CInt]() - val result = waitpid(pid, status, WNOHANG) + private def fallbackExitValue(pid: pid_t): F[Int] = { + def loop: F[Int] = + F.blocking { + Zone { _ => + val status = stackalloc[CInt]() + val result = waitpid(pid, status, WNOHANG) - if (result == pid) { - Some(WEXITSTATUS(!status)) - } else if (result == 0) None - else throw new IOException(s"waitpid failed with errno: ${errno.errno}") + if (result == pid) { + Some(WEXITSTATUS(!status)) + } else if (result == 0) None + else throw new IOException(s"waitpid failed with errno: ${errno.errno}") + } + }.flatMap { + case Some(code) => F.pure(code) + case None => F.sleep(100.millis) >> loop + } + + loop.onCancel { + F.blocking { + kill(pid, SIGKILL) + () + } } - }.flatMap { - case Some(code) => F.pure(code) - case None => F.sleep(100.millis) >> loop } - loop.onCancel { - F.blocking { - kill(pid, SIGKILL) - () - } } - } - - } + } else super.forAsync[F] } From 526f1dbf160582a3887df413fd7db14510c403dc Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sat, 12 Jul 2025 20:49:54 +0530 Subject: [PATCH 09/29] refactored cleanup and exitvalue --- .../fs2/io/process/ProcessesPlatform.scala | 221 ++++++++---------- 1 file changed, 99 insertions(+), 122 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 570cfd90b3..565af2ad31 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -34,10 +34,8 @@ import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.unistd.* import scala.scalanative.posix.signal.* import java.io.IOException -import scala.concurrent.duration.* import cats.effect.LiftIO import cats.effect.IO -import cats.effect.implicits.* import org.typelevel.scalaccompat.annotation._ @extern @@ -63,38 +61,28 @@ final case class NativeProcess( ) private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { + + private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = { + val pathEnv = sys.env.getOrElse("PATH", "") + pathEnv + .split(':') + .find { dir => + val full = s"$dir/$cmd" + access(toCString(full), X_OK) == 0 + } + .map(dir => s"$dir/$cmd") + } + + @inline private def closeAll(fds: Int*): Unit = + fds.foreach(fd => if (fd >= 0) close(fd)) + def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = if (LinktimeInfo.isMac || LinktimeInfo.isLinux) { new UnsealedProcesses[F] { - def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { def createProcess(): F[NativeProcess] = F.blocking { Zone { implicit z => - def findExecutable(command: String): Option[String] = { - val pathEnv = sys.env.get("PATH").getOrElse("") - val paths = pathEnv.split(":").toList - - paths - .find { dir => - val fullPath = s"$dir/$command" - access(toCString(fullPath), X_OK) == 0 - } - .map(dir => s"$dir/$command") - - } - - val envMap = - if (process.inheritEnv) - sys.env ++ process.extraEnv - else process.extraEnv - - val executable = - if (process.command.contains("/")) { - process.command - } else { - findExecutable(process.command).getOrElse(process.command) - } val stdinPipe = stackalloc[CInt](2.toUInt) val stdoutPipe = stackalloc[CInt](2.toUInt) val stderrPipe = stackalloc[CInt](2.toUInt) @@ -102,83 +90,92 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { throw new RuntimeException("Failed to create pipes") } + val envMap = + if (process.inheritEnv) + sys.env ++ process.extraEnv + else process.extraEnv - val pid = fork() - if (pid < 0) { - close(stdinPipe(0)); close(stdinPipe(1)) - close(stdoutPipe(0)); close(stdoutPipe(1)) - close(stderrPipe(0)); close(stderrPipe(1)) - throw new RuntimeException("fork failed") - } else if (pid == 0) { - close(stdinPipe(1)) - close(stdoutPipe(0)) - close(stderrPipe(0)) + val envp = stackalloc[CString]((envMap.size + 1).toULong) + envMap.zipWithIndex.foreach { case ((k, v), i) => + envp(i.toULong) = toCString(s"$k=$v") + } + envp(envMap.size.toULong) = null - if ( - dup2(stdinPipe(0), STDIN_FILENO) == -1 || - dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || - dup2(stderrPipe(1), STDERR_FILENO) == -1 - ) { - _exit(1) - } + val allArgs = process.command +: process.args + val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) + allArgs.zipWithIndex.foreach { case (arg, i) => + argv(i.toULong) = toCString(arg) + } + argv(allArgs.length.toULong) = null - close(stdinPipe(0)) - close(stdoutPipe(1)) - close(stderrPipe(1)) + val executable = findExecutable(process.command).getOrElse(process.command) - process.workingDirectory.foreach { dir => - if (chdir(toCString(dir.toString)) != 0) { + fork() match { + case -1 => + closeAll( + stdinPipe(0), + stdinPipe(1), + stdoutPipe(0), + stdoutPipe(1), + stderrPipe(0), + stderrPipe(1) + ) + throw new IOException("Unable to fork process") + case 0 => + closeAll(stdinPipe(1), stdoutPipe(0), stderrPipe(0)) + if ( + dup2(stdinPipe(0), STDIN_FILENO) == -1 || + dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || + dup2(stderrPipe(1), STDERR_FILENO) == -1 + ) { _exit(1) + throw new IOException("Unable to redirect file descriptors") } - } + closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1)) - val allArgs = process.command +: process.args - val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) - allArgs.zipWithIndex.foreach { case (arg, i) => - argv(i.toULong) = toCString(arg) - } - argv(allArgs.length.toULong) = null - - val envp = stackalloc[CString]((envMap.size + 1).toULong) - envMap.zipWithIndex.foreach { case ((k, v), i) => - envp(i.toULong) = toCString(s"$k=$v") - } - envp(envMap.size.toULong) = null + process.workingDirectory.foreach { dir => + if ((dir != null) && (dir.toString != ".")) + chdir(toCString(dir.toString)) + } - execve(toCString(executable), argv, envp) - _exit(1) - throw new RuntimeException(s"execve failed") - } else { - close(stdinPipe(0)) - close(stdoutPipe(1)) - close(stderrPipe(1)) - NativeProcess( - pid = pid, - stdinFd = stdinPipe(1), - stdoutFd = stdoutPipe(0), - stderrFd = stderrPipe(0) - ) + execve(toCString(executable), argv, envp) + _exit(127) + throw new IOException(s"Failed to create process for command: ${process.command}") + case pid => + closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1)) + NativeProcess( + pid = pid, + stdinFd = stdinPipe(1), + stdoutFd = stdoutPipe(0), + stderrFd = stderrPipe(0) + ) } } } def cleanup(proc: NativeProcess): F[Unit] = F.blocking { - close(proc.stdinFd); close(proc.stdoutFd); close(proc.stderrFd) - } *> - F.delay(kill(proc.pid, SIGKILL)) *> - F.blocking { + closeAll(proc.stdinFd, proc.stdoutFd, proc.stderrFd) + val alive = { + val res = kill(proc.pid, 0) + res == 0 || errno.errno == EPERM + } + if (alive) { + kill(proc.pid, SIGKILL) + val status = stackalloc[CInt]() + waitpid(proc.pid, status, 0) + () + } else { val status = stackalloc[CInt]() - val r = waitpid(proc.pid, status, WNOHANG) - if (r < 0 && errno.errno != ECHILD) - throw new RuntimeException(s"waitpid failed: errno=${errno.errno}") + waitpid(proc.pid, status, WNOHANG) () } + } Resource.make(createProcess())(cleanup).map { nativeProcess => new UnsealedProcess[F] { def isAlive: F[Boolean] = F.delay { - kill(nativeProcess.pid, 0) == 0 || errno.errno != ESRCH + kill(nativeProcess.pid, 0) == 0 || errno.errno != EPERM } def exitValue: F[Int] = @@ -191,23 +188,21 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { .use { handle => handle.pollReadRec(()) { _ => IO { - Zone { _ => - val statusPtr = stackalloc[CInt]() - val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) + val statusPtr = stackalloc[CInt]() + val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) - if (result == nativeProcess.pid) { - val exitCode = WEXITSTATUS(!statusPtr) - Right(exitCode) - } else if (result == 0) { - Left(()) + if (result == nativeProcess.pid) { + val exitCode = WEXITSTATUS(!statusPtr) + Right(exitCode) + } else if (result == 0) { + Left(()) + } else { + if (errno.errno == ECHILD) { + throw new IOException("No such process") } else { - if (errno.errno == ECHILD) { - throw new IOException("No such process") - } else { - throw new IOException( - s"waitpid failed with errno: ${errno.errno}" - ) - } + throw new IOException( + s"waitpid failed with errno: ${errno.errno}" + ) } } } @@ -249,31 +244,13 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } } - private def fallbackExitValue(pid: pid_t): F[Int] = { - def loop: F[Int] = - F.blocking { - Zone { _ => - val status = stackalloc[CInt]() - val result = waitpid(pid, status, WNOHANG) - - if (result == pid) { - Some(WEXITSTATUS(!status)) - } else if (result == 0) None - else throw new IOException(s"waitpid failed with errno: ${errno.errno}") - } - }.flatMap { - case Some(code) => F.pure(code) - case None => F.sleep(100.millis) >> loop - } - - loop.onCancel { - F.blocking { - kill(pid, SIGKILL) - () - } + private def fallbackExitValue(pid: pid_t): F[Int] = + F.blocking { + val status = stackalloc[CInt]() + val result = waitpid(pid, status, 0) + if (result == pid) WEXITSTATUS(!status) + else throw new IOException(s"waitpid failed with errno: ${errno.errno}") } - } - } } else super.forAsync[F] } From 953a825f91117961c0a4def0222c108f3a8731da Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sat, 12 Jul 2025 21:35:24 +0530 Subject: [PATCH 10/29] return unit --- .../main/scala/fs2/io/process/ProcessesPlatform.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 565af2ad31..2fc54f1191 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -74,7 +74,7 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } @inline private def closeAll(fds: Int*): Unit = - fds.foreach(fd => if (fd >= 0) close(fd)) + fds.foreach { fd => close(fd); () } def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = if (LinktimeInfo.isMac || LinktimeInfo.isLinux) { @@ -134,8 +134,11 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1)) process.workingDirectory.foreach { dir => - if ((dir != null) && (dir.toString != ".")) - chdir(toCString(dir.toString)) + if ((dir != null) && (dir.toString != ".")) { + val ret = chdir(toCString(dir.toString)) + if (ret != 0) + throw new IOException(s"Failed to chdir to ${dir.toString}") + } } execve(toCString(executable), argv, envp) From f341ab1bd680aedd40c1972c9bb1b3c445b00f1a Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sun, 13 Jul 2025 19:27:22 +0530 Subject: [PATCH 11/29] format --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 2fc54f1191..76b22ad970 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -137,7 +137,7 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { if ((dir != null) && (dir.toString != ".")) { val ret = chdir(toCString(dir.toString)) if (ret != 0) - throw new IOException(s"Failed to chdir to ${dir.toString}") + throw new IOException(s"Failed to to ${dir.toString}") } } From 42e726458a9a4807955a9c333afeba228b7fc713 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Sun, 13 Jul 2025 19:27:40 +0530 Subject: [PATCH 12/29] format --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 76b22ad970..2fc54f1191 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -137,7 +137,7 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { if ((dir != null) && (dir.toString != ".")) { val ret = chdir(toCString(dir.toString)) if (ret != 0) - throw new IOException(s"Failed to to ${dir.toString}") + throw new IOException(s"Failed to chdir to ${dir.toString}") } } From f1486b29d07b8f01ec053ae72aba077da711b893 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Wed, 23 Jul 2025 23:16:08 +0530 Subject: [PATCH 13/29] changed fallbackexitval --- .../fs2/io/process/ProcessesPlatform.scala | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 2fc54f1191..311ec24ca8 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -37,6 +37,8 @@ import java.io.IOException import cats.effect.LiftIO import cats.effect.IO import org.typelevel.scalaccompat.annotation._ +import scala.concurrent.duration.* +import cats.effect.implicits.* @extern @nowarn212("cat=unused") @@ -247,13 +249,30 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } } - private def fallbackExitValue(pid: pid_t): F[Int] = - F.blocking { - val status = stackalloc[CInt]() - val result = waitpid(pid, status, 0) - if (result == pid) WEXITSTATUS(!status) - else throw new IOException(s"waitpid failed with errno: ${errno.errno}") + private def fallbackExitValue(pid: pid_t): F[Int] = { + def loop: F[Int] = + F.blocking { + Zone { _ => + val status = stackalloc[CInt]() + val result = waitpid(pid, status, WNOHANG) + + if (result == pid) { + Some(WEXITSTATUS(!status)) + } else if (result == 0) None + else throw new IOException(s"waitpid failed with errno: ${errno.errno}") + } + }.flatMap { + case Some(code) => F.pure(code) + case None => F.sleep(10.millis) >> loop + } + + loop.onCancel { + F.blocking { + kill(pid, SIGKILL) + () + } } + } } } else super.forAsync[F] } From 600671c1394ce96294069c0ce51b0692e05b4f0d Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 19:39:15 +0530 Subject: [PATCH 14/29] used guard_ --- ...cala => ProcessesCompanionJvmNative.scala} | 2 +- .../fs2/io/process/ProcessesPlatform.scala | 2 +- .../fs2/io/process/ProcessesPlatform.scala | 230 ++++++++---------- 3 files changed, 110 insertions(+), 124 deletions(-) rename io/jvm-native/src/main/scala/fs2/io/process/{ProcessesPlatform.scala => ProcessesCompanionJvmNative.scala} (98%) diff --git a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala similarity index 98% rename from io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala rename to io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala index f001dbb768..d7e2da2d55 100644 --- a/io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/process/ProcessesCompanionJvmNative.scala @@ -30,7 +30,7 @@ import fs2.io.CollectionCompat.* import java.lang -private[process] trait Processesjvmnative { +private[process] trait ProcessesCompanionJvmNative { def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] { def spawn(process: ProcessBuilder): Resource[F, Process[F]] = diff --git a/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 483f27ad2d..7bd3f693c1 100644 --- a/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/jvm/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -23,4 +23,4 @@ package fs2 package io package process -private[process] trait ProcessesCompanionPlatform extends Processesjvmnative +private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 311ec24ca8..866fd0e9dd 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -37,32 +37,23 @@ import java.io.IOException import cats.effect.LiftIO import cats.effect.IO import org.typelevel.scalaccompat.annotation._ -import scala.concurrent.duration.* -import cats.effect.implicits.* +import fs2.io.internal.NativeUtil._ @extern @nowarn212("cat=unused") -object SyscallBindings { - def syscall(number: CLong, arg1: CLong, arg2: CLong): CLong = extern +object LibC { + def pidfd_open(pid: CInt, flags: CInt): CInt = extern } -object pidFd { - private val SYS_pidfd_open = 434L - - def pidfd_open(pid: pid_t, flags: Int): Int = { - val fd = SyscallBindings.syscall(SYS_pidfd_open, pid.toLong, flags.toLong) - fd.toInt - } -} - -final case class NativeProcess( +private final case class NativeProcess( pid: pid_t, stdinFd: Int, stdoutFd: Int, - stderrFd: Int + stderrFd: Int, + pidfd: Option[Int] = None ) -private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { +private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmNative { private def findExecutable(cmd: String)(implicit z: Zone): Option[String] = { val pathEnv = sys.env.getOrElse("PATH", "") @@ -76,22 +67,35 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } @inline private def closeAll(fds: Int*): Unit = - fds.foreach { fd => close(fd); () } + fds.foreach(close) + + def pipeResource[F[_]](implicit F: Async[F]): Resource[F, (Int, Int)] = + Resource.make { + F.blocking { + val fds = stackalloc[CInt](2.toUInt) + guard_(pipe(fds)) + (fds(0), fds(1)) + } + } { case (r, w) => F.blocking { close(r); close(w); () } } def forAsync[F[_]: LiftIO](implicit F: Async[F]): Processes[F] = if (LinktimeInfo.isMac || LinktimeInfo.isLinux) { new UnsealedProcesses[F] { def spawn(process: ProcessBuilder): Resource[F, Process[F]] = { - def createProcess(): F[NativeProcess] = F.blocking { - Zone { implicit z => - val stdinPipe = stackalloc[CInt](2.toUInt) - val stdoutPipe = stackalloc[CInt](2.toUInt) - val stderrPipe = stackalloc[CInt](2.toUInt) + val pipesResource: Resource[F, ((Int, Int), (Int, Int), (Int, Int))] = + for { + stdinPipe <- pipeResource[F] + stdoutPipe <- pipeResource[F] + stderrPipe <- pipeResource[F] + } yield (stdinPipe, stdoutPipe, stderrPipe) - if (pipe(stdinPipe) != 0 || pipe(stdoutPipe) != 0 || pipe(stderrPipe) != 0) { - throw new RuntimeException("Failed to create pipes") - } + def createProcess( + stdinPipe: (Int, Int), + stdoutPipe: (Int, Int), + stderrPipe: (Int, Int) + ): F[NativeProcess] = F.blocking { + Zone { implicit z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv @@ -110,49 +114,42 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } argv(allArgs.length.toULong) = null - val executable = findExecutable(process.command).getOrElse(process.command) - - fork() match { - case -1 => - closeAll( - stdinPipe(0), - stdinPipe(1), - stdoutPipe(0), - stdoutPipe(1), - stderrPipe(0), - stderrPipe(1) - ) - throw new IOException("Unable to fork process") + val executable = + if (process.command.startsWith("/")) + process.command + else + findExecutable(process.command).getOrElse(process.command) + val ret = guard(fork()) + ret match { case 0 => - closeAll(stdinPipe(1), stdoutPipe(0), stderrPipe(0)) - if ( - dup2(stdinPipe(0), STDIN_FILENO) == -1 || - dup2(stdoutPipe(1), STDOUT_FILENO) == -1 || - dup2(stderrPipe(1), STDERR_FILENO) == -1 - ) { - _exit(1) - throw new IOException("Unable to redirect file descriptors") - } - closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1)) + closeAll(stdinPipe._2, stdoutPipe._1, stderrPipe._1) + guard_(dup2(stdinPipe._1, STDIN_FILENO)) + guard_(dup2(stdoutPipe._2, STDOUT_FILENO)) + guard_(dup2(stderrPipe._2, STDERR_FILENO)) + closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2) process.workingDirectory.foreach { dir => if ((dir != null) && (dir.toString != ".")) { - val ret = chdir(toCString(dir.toString)) - if (ret != 0) - throw new IOException(s"Failed to chdir to ${dir.toString}") + guard_(chdir(toCString(dir.toString))) } } execve(toCString(executable), argv, envp) _exit(127) - throw new IOException(s"Failed to create process for command: ${process.command}") + throw new AssertionError("unreachable") case pid => - closeAll(stdinPipe(0), stdoutPipe(1), stderrPipe(1)) + closeAll(stdinPipe._1, stdoutPipe._2, stderrPipe._2) + val pidfd = + if (LinktimeInfo.isLinux) { + val fd = LibC.pidfd_open(pid, 0) + if (fd >= 0) Some(fd) else None + } else None NativeProcess( pid = pid, - stdinFd = stdinPipe(1), - stdoutFd = stdoutPipe(0), - stderrFd = stderrPipe(0) + stdinFd = stdinPipe._2, + stdoutFd = stdoutPipe._1, + stderrFd = stderrPipe._1, + pidfd ) } } @@ -177,101 +174,90 @@ private[process] trait ProcessesCompanionPlatform extends Processesjvmnative { } } - Resource.make(createProcess())(cleanup).map { nativeProcess => - new UnsealedProcess[F] { - def isAlive: F[Boolean] = F.delay { - kill(nativeProcess.pid, 0) == 0 || errno.errno != EPERM + pipesResource.flatMap { case (stdinPipe, stdoutPipe, stderrPipe) => + Resource + .make(createProcess(stdinPipe, stdoutPipe, stderrPipe))(cleanup) + .flatMap { nativeProcess => + nativeProcess.pidfd match { + case Some(pidfd) => + for { + poller <- Resource.eval(fileDescriptorPoller[F]) + handle <- poller.registerFileDescriptor(pidfd, true, false).mapK(LiftIO.liftK) + } yield (nativeProcess, Some(handle)) + case None => + Resource.pure((nativeProcess, None)) + } } + .map { case (nativeProcess, pollHandleOpt) => + new UnsealedProcess[F] { + def isAlive: F[Boolean] = F.delay { + kill(nativeProcess.pid, 0) == 0 || errno.errno == EPERM + } - def exitValue: F[Int] = - if (LinktimeInfo.isLinux) { - F.delay(pidFd.pidfd_open(nativeProcess.pid, 0)).flatMap { pidfd => - if (pidfd >= 0) { - fileDescriptorPoller[F].flatMap { poller => - poller - .registerFileDescriptor(pidfd, true, false) - .use { handle => - handle.pollReadRec(()) { _ => + def exitValue: F[Int] = + if (LinktimeInfo.isLinux) { + (nativeProcess.pidfd, pollHandleOpt) match { + case (Some(_), Some(handle)) => + handle + .pollReadRec(()) { _ => IO { val statusPtr = stackalloc[CInt]() val result = waitpid(nativeProcess.pid, statusPtr, WNOHANG) - if (result == nativeProcess.pid) { val exitCode = WEXITSTATUS(!statusPtr) Right(exitCode) } else if (result == 0) { Left(()) } else { - if (errno.errno == ECHILD) { + if (errno.errno == ECHILD) throw new IOException("No such process") - } else { + else throw new IOException( s"waitpid failed with errno: ${errno.errno}" ) - } } } } - } - .to + .to + case _ => + fallbackExitValue(nativeProcess.pid) } } else { fallbackExitValue(nativeProcess.pid) } - } - } else { - fallbackExitValue(nativeProcess.pid) - } - def stdin: Pipe[F, Byte, Nothing] = { in => - in - .through(writeFd(nativeProcess.stdinFd)) - .onFinalize { - F.blocking { - close(nativeProcess.stdinFd) - }.void + def stdin: Pipe[F, Byte, Nothing] = { in => + in + .through(writeFd(nativeProcess.stdinFd)) + .onFinalize { + F.blocking { + close(nativeProcess.stdinFd) + }.void + } } - } - def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) - .onFinalize { - F.blocking { - close(nativeProcess.stdoutFd) - }.void - } - def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) - .onFinalize { - F.blocking { - close(nativeProcess.stderrFd) - }.void + def stdout: Stream[F, Byte] = readFd(nativeProcess.stdoutFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stdoutFd) + }.void + } + + def stderr: Stream[F, Byte] = readFd(nativeProcess.stderrFd, 8192) + .onFinalize { + F.blocking { + close(nativeProcess.stderrFd) + }.void + } } - } + } } } - private def fallbackExitValue(pid: pid_t): F[Int] = { - def loop: F[Int] = - F.blocking { - Zone { _ => - val status = stackalloc[CInt]() - val result = waitpid(pid, status, WNOHANG) - - if (result == pid) { - Some(WEXITSTATUS(!status)) - } else if (result == 0) None - else throw new IOException(s"waitpid failed with errno: ${errno.errno}") - } - }.flatMap { - case Some(code) => F.pure(code) - case None => F.sleep(10.millis) >> loop - } - - loop.onCancel { - F.blocking { - kill(pid, SIGKILL) - () - } - } + private def fallbackExitValue(pid: pid_t): F[Int] = F.blocking { + val status = stackalloc[CInt]() + guard_(waitpid(pid, status, 0)) + WEXITSTATUS(!status) } } } else super.forAsync[F] From 7149372ae285bf6cd23638520cbb455e9f1e3092 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 20:25:45 +0530 Subject: [PATCH 15/29] change toULong to toUSize --- .../scala/fs2/io/process/ProcessesPlatform.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 866fd0e9dd..366407b657 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -29,7 +29,7 @@ import scala.scalanative.unsafe.* import scala.scalanative.unsigned.* import scala.scalanative.libc.* import scala.scalanative.posix.sys.wait.* -import scala.scalanative.posix.errno.* +import scala.scalanative.posix.errno.{EPERM, ECHILD} import scala.scalanative.meta.LinktimeInfo import scala.scalanative.posix.unistd.* import scala.scalanative.posix.signal.* @@ -101,18 +101,18 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN sys.env ++ process.extraEnv else process.extraEnv - val envp = stackalloc[CString]((envMap.size + 1).toULong) + val envp = stackalloc[CString]((envMap.size + 1).toUSize) envMap.zipWithIndex.foreach { case ((k, v), i) => - envp(i.toULong) = toCString(s"$k=$v") + envp(i.toUSize) = toCString(s"$k=$v") } - envp(envMap.size.toULong) = null + envp(envMap.size.toUSize) = null val allArgs = process.command +: process.args - val argv = stackalloc[CString](allArgs.length.toULong + 1.toULong) + val argv = stackalloc[CString](allArgs.length.toUSize + 1.toUSize) allArgs.zipWithIndex.foreach { case (arg, i) => - argv(i.toULong) = toCString(arg) + argv(i.toUSize) = toCString(arg) } - argv(allArgs.length.toULong) = null + argv(allArgs.length.toUSize) = null val executable = if (process.command.startsWith("/")) From b53f5d957f98f8177ffe54b268e48e69b3b3fe3f Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 20:41:25 +0530 Subject: [PATCH 16/29] added type --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 366407b657..7754f1d1dc 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone { implicit z => + Zone { implicit z: Zone => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From a9b6ffd2c5e327034245411cb4f7d75f8b84cc2a Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 22:45:13 +0530 Subject: [PATCH 17/29] added type --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 7754f1d1dc..8ac0b0faf6 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone { implicit z: Zone => + Zone[NativeProcess] { implicit z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From 881c96d264019f3b6b57d1f7a6864e89cf75886e Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 23:30:44 +0530 Subject: [PATCH 18/29] added type --- .../src/main/scala/fs2/io/process/ProcessesPlatform.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 8ac0b0faf6..e62253b42f 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone[NativeProcess] { implicit z => + val nativeProcess: NativeProcess = Zone { implicit z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv @@ -153,6 +153,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN ) } } + nativeProcess } def cleanup(proc: NativeProcess): F[Unit] = From b007fbd05f890753aed02b834c966068fae0de3c Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Thu, 7 Aug 2025 23:38:31 +0530 Subject: [PATCH 19/29] added explicit type --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index e62253b42f..715abd6951 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -152,7 +152,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN pidfd ) } - } + }: NativeProcess nativeProcess } From fad19f10b20e22fa7c0b951ab05cdc3725cbfde6 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 12:27:34 +0530 Subject: [PATCH 20/29] refactor createprocess --- .../src/main/scala/fs2/io/process/ProcessesPlatform.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 715abd6951..2b709df88b 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - val nativeProcess: NativeProcess = Zone { implicit z => + Zone.apply[NativeProcess] { implicit z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv @@ -152,8 +152,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN pidfd ) } - }: NativeProcess - nativeProcess + } } def cleanup(proc: NativeProcess): F[Unit] = From eaebb468cccb1507917b58f26f73f9a8bbe90500 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 13:11:22 +0530 Subject: [PATCH 21/29] remove implicit z --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 2b709df88b..22c123e297 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone.apply[NativeProcess] { implicit z => + Zone { val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From 4f6cf84f325d4b0af67c9a1378ea55818a4edcc3 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 13:29:41 +0530 Subject: [PATCH 22/29] use Zone.acquire --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 22c123e297..0d3bf8a18e 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone { + Zone.acquire { val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From 8cba5c71f99afde4d8f874f089fffb887ebcb783 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 15:09:39 +0530 Subject: [PATCH 23/29] remove implicit --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 0d3bf8a18e..43c3551aae 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone.acquire { + Zone.acquire { z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From 21b69b4f3ee61a07e6005ed13ec39c3ef8e6148b Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 15:15:34 +0530 Subject: [PATCH 24/29] added implicit --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 43c3551aae..529ddd4d67 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -95,7 +95,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN stdoutPipe: (Int, Int), stderrPipe: (Int, Int) ): F[NativeProcess] = F.blocking { - Zone.acquire { z => + Zone.acquire { implicit z => val envMap = if (process.inheritEnv) sys.env ++ process.extraEnv From 40b2edfa39126a1bdc81e72052209ceee0ebe30f Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Fri, 8 Aug 2025 15:42:42 +0530 Subject: [PATCH 25/29] changed fallback --- .../fs2/io/process/ProcessesPlatform.scala | 33 ++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 529ddd4d67..7426d9a29a 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -38,6 +38,8 @@ import cats.effect.LiftIO import cats.effect.IO import org.typelevel.scalaccompat.annotation._ import fs2.io.internal.NativeUtil._ +import scala.concurrent.duration.* +import cats.effect.implicits.* @extern @nowarn212("cat=unused") @@ -254,11 +256,34 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN } } - private def fallbackExitValue(pid: pid_t): F[Int] = F.blocking { - val status = stackalloc[CInt]() - guard_(waitpid(pid, status, 0)) - WEXITSTATUS(!status) + private def fallbackExitValue(pid: pid_t): F[Int] = { + def check: F[Option[Int]] = F.blocking { + val status = stackalloc[CInt]() + val result = waitpid(pid, status, WNOHANG) + if (result == pid) Some(WEXITSTATUS(!status)) + else if (result == 0) None + else if (errno.errno == ECHILD) + throw new IOException("No such process") + else + throw new IOException(s"waitpid failed with errno: ${errno.errno}") + } + + def loop: F[Int] = + check.flatMap { + case Some(code) => F.pure(code) + case None => F.sleep(50.millis) >> loop + } + + loop.onCancel { + F.blocking { + kill(pid, SIGKILL) + val status = stackalloc[CInt]() + waitpid(pid, status, WNOHANG) + () + }.void + } } + } } else super.forAsync[F] } From 818c423054ebedff0c66b9a67869438484ec6086 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Mon, 11 Aug 2025 17:21:08 +0530 Subject: [PATCH 26/29] Added Kqueue for macOS --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 7426d9a29a..bf91e4f284 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -225,7 +225,7 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN fallbackExitValue(nativeProcess.pid) } } else { - fallbackExitValue(nativeProcess.pid) + fileDescriptorPoller.awaitevent(nativeProcess.pid, -5, 0x0005, 0x80000000).to } def stdin: Pipe[F, Byte, Nothing] = { in => From 43517d9814408dea755321636cd898ec07e74e40 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Mon, 11 Aug 2025 20:56:16 +0530 Subject: [PATCH 27/29] Added Kqueue for macOS --- .../src/main/scala/fs2/io/process/ProcessesPlatform.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index bf91e4f284..7b5b67b519 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -39,6 +39,7 @@ import cats.effect.IO import org.typelevel.scalaccompat.annotation._ import fs2.io.internal.NativeUtil._ import scala.concurrent.duration.* +import cats.effect.unsafe.KqueueSystem.Kqueue import cats.effect.implicits.* @extern @@ -225,7 +226,11 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN fallbackExitValue(nativeProcess.pid) } } else { - fileDescriptorPoller.awaitevent(nativeProcess.pid, -5, 0x0005, 0x80000000).to + fileDescriptorPoller[F] match { + case kq: Kqueue => + kq.awaitEvent(nativeProcess.pid, -5, 0x0005, 0x80000000).to.map(_.toInt) + case _ => fallbackExitValue(nativeProcess.pid) + } } def stdin: Pipe[F, Byte, Nothing] = { in => From b03ac7eec5eff0de2e18d443b2eedb3d1310d3cf Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Mon, 11 Aug 2025 21:04:29 +0530 Subject: [PATCH 28/29] blocking fallback --- .../fs2/io/process/ProcessesPlatform.scala | 30 +++---------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index 7b5b67b519..c544f2c95c 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -261,32 +261,10 @@ private[process] trait ProcessesCompanionPlatform extends ProcessesCompanionJvmN } } - private def fallbackExitValue(pid: pid_t): F[Int] = { - def check: F[Option[Int]] = F.blocking { - val status = stackalloc[CInt]() - val result = waitpid(pid, status, WNOHANG) - if (result == pid) Some(WEXITSTATUS(!status)) - else if (result == 0) None - else if (errno.errno == ECHILD) - throw new IOException("No such process") - else - throw new IOException(s"waitpid failed with errno: ${errno.errno}") - } - - def loop: F[Int] = - check.flatMap { - case Some(code) => F.pure(code) - case None => F.sleep(50.millis) >> loop - } - - loop.onCancel { - F.blocking { - kill(pid, SIGKILL) - val status = stackalloc[CInt]() - waitpid(pid, status, WNOHANG) - () - }.void - } + private def fallbackExitValue(pid: pid_t): F[Int] = F.delay { + val status = stackalloc[CInt]() + guard_(waitpid(pid, status, 0)) + WEXITSTATUS(!status) } } From f3ddaebdabc0d3ec89672694326ef107fbb53696 Mon Sep 17 00:00:00 2001 From: rahulrangers Date: Mon, 11 Aug 2025 21:06:48 +0530 Subject: [PATCH 29/29] remove unused imports --- io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala index c544f2c95c..cced898d4c 100644 --- a/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala +++ b/io/native/src/main/scala/fs2/io/process/ProcessesPlatform.scala @@ -38,9 +38,7 @@ import cats.effect.LiftIO import cats.effect.IO import org.typelevel.scalaccompat.annotation._ import fs2.io.internal.NativeUtil._ -import scala.concurrent.duration.* import cats.effect.unsafe.KqueueSystem.Kqueue -import cats.effect.implicits.* @extern @nowarn212("cat=unused")