1

Consider I have a reactive, push-style API such as Files.walkFileTree() from NIO.2. Now, I want to transform this API to produce a lazy sequence of items (Sequence<T>) instead.

This is fully possible with a background thread and a SynchronousQueue, e.g.:

typealias Callback<T> = (T) -> Unit

fun <T : Any> sequenceOf(callbackInvoker: (Callback<T>) -> Unit): Sequence<T> =
    sequence {
        val queue = SynchronousQueue<Optional<T>>()

        val t = thread {
            callbackInvoker { item ->
                queue.put(Optional.of(item))
            }

            queue.put(Optional.empty())
        }

        try {
            do {
                val item = queue.take()
                if (item.isPresent) {
                    yield(item.get())
                }
            } while (item.isPresent)
        } finally {
            t.join()
        }
    }

The above code then could be invoked as

    sequenceOf { onFile: (Path) -> Unit ->
        Files.walkFileTree(Path(""), object : SimpleFileVisitor<Path>() {
            override fun visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult {
                onFile(file)

                return FileVisitResult.CONTINUE
            }
        })
    }.forEach(::println)

Is the above possible without using any background thread? What is the idiomatic way to implement the above in Kotlin (probably with SequenceScope or, alternatively, with coroutines and suspending functions)?

4
  • @Sweeper, this implementation is as lazy as possible, since SynchronousQueue has no capacity, so the data producer effectively pauses inside Queue.put() until the previous item is consumed. The above also applies to Channel-based implementation with capacity = RENDEZVOUS. Commented May 13 at 9:54
  • 2
    See discuss.kotlinlang.org/t/… You cannot do this with just one thread. Commented May 13 at 9:56
  • 1
    Instead of Sequence, how about getting a Flow instead? Commented May 13 at 10:01
  • @Sweeper, exactly as in the discussion at kotlinlang.org you've referenced, my application is strictly synchronous and single-threaded, and I'd rather leave it as such. The actual callbackInvoker is non-reentrant native code, which will result in a SIGSEGV if accidentally invoked from more than a single thread. Commented May 13 at 10:05

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.