Suppose I have a reactive, push-style API such as Files.walkFileTree() from NIO.2. I want to adapt this API so that it produces a lazy sequence of items (Sequence<T>) instead.
This is entirely possible with a background thread and a SynchronousQueue, e.g.:
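    import java.util.Optional
    import java.util.concurrent.SynchronousQueue
    import kotlin.concurrent.thread

    typealias Callback<T> = (T) -> Unit

    fun <T : Any> sequenceOf(callbackInvoker: (Callback<T>) -> Unit): Sequence<T> =
        sequence {
            val queue = SynchronousQueue<Optional<T>>()

            // Producer: runs the push-style API and hands each item over to the consumer.
            val t = thread {
                callbackInvoker { item ->
                    queue.put(Optional.of(item))
                }
                // An empty Optional marks the end of the stream.
                queue.put(Optional.empty())
            }

            // Consumer: yields items until the end-of-stream marker arrives.
            try {
                do {
                    val item = queue.take()
                    if (item.isPresent) {
                        yield(item.get())
                    }
                } while (item.isPresent)
            } finally {
                t.join()
            }
        }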
The above code could then be invoked as:
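    import java.nio.file.FileVisitResult
    import java.nio.file.Files
    import java.nio.file.Path
    import java.nio.file.SimpleFileVisitor
    import java.nio.file.attribute.BasicFileAttributes
    import kotlin.io.path.Path

    sequenceOf { onFile: (Path) -> Unit ->
        Files.walkFileTree(Path(""), object : SimpleFileVisitor<Path>() {
            override fun visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult {
                onFile(file)
                return FileVisitResult.CONTINUE
            }
        })
    }.forEach(::println)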
Is the above possible without using any background thread? What is the idiomatic way to implement the above in Kotlin (probably with SequenceScope or, alternatively, with coroutines and suspending functions)?
SynchronousQueue has no capacity, so the data producer effectively pauses inside Queue.put() until the previous item is consumed. The above also applies to a Channel-based implementation with capacity = RENDEZVOUS.
Sequence, how about getting a Flow instead?
callbackInvoker is non-reentrant native code, which will result in a SIGSEGV if accidentally invoked from more than a single thread.
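To make the note about SynchronousQueue having no capacity concrete, here is a minimal sketch of the hand-off behaviour. The helper name producedAfterFirstTake, the item count, and the 200 ms pause are illustrative assumptions and not part of the code above; the sketch only shows that the producer cannot run ahead of the consumer.

```kotlin
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread

// Hypothetical helper: reports how many items the producer has *started*
// producing after the consumer has taken exactly one item.
fun producedAfterFirstTake(): Int {
    val queue = SynchronousQueue<Int>()
    val produced = AtomicInteger(0)

    val producer = thread {
        for (i in 1..5) {
            produced.incrementAndGet()
            queue.put(i) // blocks until the consumer calls take()
        }
    }

    queue.take()       // receive the first item
    Thread.sleep(200)  // give the producer a chance to run ahead, if it could
    val startedSoFar = produced.get()

    repeat(4) { queue.take() } // drain the rest so the producer can finish
    producer.join()
    return startedSoFar
}

fun main() {
    val queue = SynchronousQueue<Int>()
    // With no consumer waiting, a non-blocking offer() is rejected:
    // the queue has no slot in which to store the item.
    println(queue.offer(0))            // false
    println(queue.remainingCapacity()) // 0

    // Never more than 2: one item handed off, at most one more blocked in put().
    println(producedAfterFirstTake())
}
```

The producer is therefore paused for most of its lifetime, which is what makes the thread-based version behave lazily from the consumer's point of view.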