@wForget (Member) commented on Sep 30, 2025

What changes were proposed in this pull request?

Avoid NoSuchFileException when traversing the Files.walk iterator, to fix a flaky test.
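
The fix wraps the iterator returned by Files.walk so that hasNext retries when the underlying FileTreeIterator reports a file that vanished between listing and visiting. A minimal sketch of the idea (the helper name safeWalk is illustrative, not the exact diff):

import java.io.UncheckedIOException
import java.nio.file.{Files, NoSuchFileException, Path}
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._

def safeWalk(root: Path): Iterator[Path] = {
  val underlying = Files.walk(root).iterator().asScala
  new Iterator[Path] {
    // Retrying hasNext is safe here: FileTreeIterator has already consumed
    // the failed event, so the next call resumes at the following entry.
    @tailrec
    override def hasNext: Boolean =
      try underlying.hasNext
      catch {
        case e: UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] =>
          hasNext
      }
    override def next(): Path = underlying.next()
  }
}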

Why are the changes needed?

I found a flaky test when running Spark SQL test cases with datafusion-comet: https://github.com/apache/datafusion-comet/actions/runs/18110382622/job/51535109725

Error details:

[info] - cleanup complete but invalid output for aborted job *** FAILED *** (430 milliseconds)
[info]   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /__w/datafusion-comet/datafusion-comet/apache-spark/target/tmp/spark-2cf998bb-fd3c-4621-88d8-05e9decad882/output @#output/.part-00009-cb134e84-5d4a-42d2-a342-b5edc52776ce-c000.snappy.parquet.crc
[info]   at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
[info]   at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
[info]   at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
[info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
[info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
[info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
[info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
[info]   at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
[info]   at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
[info]   at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:477)
[info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
[info]   at scala.collection.mutable.Growable.addAll(Growable.scala:61)
[info]   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
[info]   at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
[info]   at scala.collection.immutable.Set$.from(Set.scala:362)
[info]   at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
[info]   at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
[info]   at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:538)
[info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52$adapted(FileStreamSinkSuite.scala:505)
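
For context: Files.walk traverses lazily, and its Javadoc specifies that an IOException encountered while reading entries after the stream has been returned is rethrown from the iterator wrapped in UncheckedIOException, which is exactly what the trace shows. Here a .crc file that was listed is deleted, presumably by the aborted job's concurrent cleanup, before the walker reads its attributes. A minimal illustration of catching and unwrapping that failure (the path is hypothetical):

import java.io.UncheckedIOException
import java.nio.file.{Files, NoSuchFileException, Paths}
import scala.jdk.CollectionConverters._

val iter = Files.walk(Paths.get("/tmp/some-dir")).iterator().asScala
try {
  while (iter.hasNext) println(iter.next())
} catch {
  // A file listed during traversal was deleted before its attributes could
  // be read; Files.walk surfaces the IOException as an unchecked wrapper.
  case e: UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] =>
    println(s"File vanished during walk: ${e.getCause.getMessage}")
}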

Does this PR introduce any user-facing change?

No

How was this patch tested?

I verified the fix locally with the following code. The previous logic (kept commented out below) throws NoSuchFileException, while the changed logic works correctly.

import java.io.{File, UncheckedIOException}
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.jdk.CollectionConverters._

object Test {

  def main(args: Array[String]): Unit = {
    val testDir = new File("/tmp/test_dir")
    testDir.mkdirs()
    val stopped = new AtomicBoolean(false)
    // Background thread that repeatedly creates and deletes files, so a
    // concurrent Files.walk can observe entries that vanish mid-traversal.
    new Thread(() => {
      while (!stopped.get) {
        val newFile1 = new File(testDir, "newFile1.parquet")
        val newFile2 = new File(testDir, "newFile2.parquet")
        val newFile3 = new File(testDir, "newFile3.parquet")
        newFile1.createNewFile()
        newFile2.createNewFile()
        newFile3.createNewFile()
        newFile1.delete()
        newFile2.delete()
        newFile3.delete()
      }
    }).start()
    try {
      var count = 0
      while (count < 100) {
        count += 1
        // The original logic throws NoSuchFileException if a file is deleted
        // during the walk:
        // val outputFileNames = Files.walk(testDir.toPath).iterator().asScala
        //   .filter(_.toString.endsWith(".parquet"))
        //   .map(_.getFileName.toString)
        //   .toSet

        val fileIter = Files.walk(testDir.toPath).iterator().asScala
        val wrappedIter = new Iterator[Path] {
          // Retrying hasNext is safe: the failed entry has already been
          // consumed, so the next call resumes at the following entry.
          @tailrec
          override def hasNext: Boolean = try {
            fileIter.hasNext
          } catch {
            case e: UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] =>
              hasNext
          }
          override def next(): Path = fileIter.next()
        }
        val outputFileNames = wrappedIter
          .filter(_.toString.endsWith(".parquet"))
          .map(_.getFileName.toString)
          .toSet

        println(outputFileNames)
      }
    } finally {
      stopped.set(true)
    }
  }
}
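
For comparison, an alternative that avoids the exception entirely, though it is not the approach taken here, is Files.walkFileTree with a visitor whose visitFileFailed simply continues. The helper listParquetNames below is illustrative, not code from the PR:

import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable

def listParquetNames(root: Path): Set[String] = {
  val names = mutable.Set.empty[String]
  Files.walkFileTree(root, new SimpleFileVisitor[Path] {
    override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
      if (file.toString.endsWith(".parquet")) names += file.getFileName.toString
      FileVisitResult.CONTINUE
    }
    // A file that disappeared mid-walk lands here; skip it instead of failing.
    override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
      FileVisitResult.CONTINUE
  })
  names.toSet
}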

Was this patch authored or co-authored using generative AI tooling?

No
