I have a repository that returns Flow<Status<T>>, where Status is either Status.Loading or Status.Data. I would like the Flow to emit Status.Loading if the repository has not returned any value within X milliseconds.

In pseudo code it would look something like this:

repository.fetchData()
  .emitDataOrLoading()
  .collect {
     when (it) {
       is Status.Loading -> // show loading indicator in case no data after X milliseconds
       is Status.Data -> // We have received the data.
     } 
  }

So I am looking for a way to implement .emitDataOrLoading().

  • What is fetchData()? A Flow that emits values every time a repository value changes? Or just a suspend function that does a one-time fetch? If it's a Flow that continually monitors the database, do you only want to show the Loading state if it's the first value that takes a long time? Commented May 8, 2023 at 14:53
  • I guess you could use the debounce operator when emitting from the repository. Commented May 8, 2023 at 15:04
  • @Tenfour04 Good question! In this case, it would mostly be a one-time fetch from an HTTP REST endpoint. For example, it would return a page's data, and we want to show the loading indicator while waiting for the page to load. Commented May 8, 2023 at 15:05
  • @VivekGupta Can you elaborate? Commented May 8, 2023 at 15:06
  • The problem with debouncing is that when the fetched value does arrive, it will be delayed downstream by the debounce interval. Commented May 8, 2023 at 15:21

2 Answers

This can be created using the channelFlow builder. Maybe there is a simpler way with operators but I couldn't come up with one.

If fetchData() is a one-time fetching suspend function, you could do:

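fun <R, T : R, L : R> lazyFlowOfWithLoadStateIfDelayed(
    cutoffDelay: Long,
    loadState: L,
    fetchAction: suspend () -> T
): Flow<R> = channelFlow {
    // Timer coroutine: sends loadState only if the fetch is still running after cutoffDelay ms.
    val sendLoadingJob = launch {
        delay(cutoffDelay)
        send(loadState)
    }
    val value = fetchAction()
    // Cancel the timer and wait for it to finish, so loadState can never arrive after the value.
    sendLoadingJob.cancelAndJoin()
    send(value)
}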

Usage:

lazyFlowOfWithLoadStateIfDelayed(1000L, Status.Loading) { Status.Data(repository.fetchData()) }
    .collect { status ->
        when (status) {
            is Status.Loading -> { /* show the loading indicator: no data after 1000 ms */ }
            is Status.Data -> { /* the data has arrived */ }
        }
    }

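Edit: a simpler way to build the above using debounce, based on Vivek Gupta's comments and answer. The load state is dropped if the fetched value arrives within cutoffDelay, and is emitted once cutoffDelay has passed otherwise. The fetched value itself is not delayed, because debounce emits the latest pending value immediately when the upstream flow completes.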

fun <R, T: R, L: R> lazyFlowOfWithLoadStateIfDelayed(cutoffDelay: Long, loadState: L, fetchAction: suspend ()->T): Flow<R> = flow {
    emit(loadState)
    emit(fetchAction())
}.debounce(cutoffDelay)
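
To check that timing behaviour, here is a minimal, self-contained sketch. It assumes a Status class like the one in the question. The name loadingIfSlow, the 200 ms cutoff and the simulated fetch delays are made up for illustration, and debounce currently needs an opt-in to FlowPreview.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Assumed shape of the question's Status type; toString is overridden only for readable output.
sealed class Status<out T> {
    object Loading : Status<Nothing>() { override fun toString() = "Loading" }
    data class Data<T>(val data: T) : Status<T>()
}

// Hypothetical Status-specific variant of the debounce-based builder above.
@OptIn(FlowPreview::class)
fun <T> loadingIfSlow(cutoffDelay: Long, fetchAction: suspend () -> T): Flow<Status<T>> =
    flow<Status<T>> {
        emit(Status.Loading)             // held back by debounce for cutoffDelay ms
        emit(Status.Data(fetchAction())) // replaces Loading if it arrives first
    }.debounce(cutoffDelay)              // the last value is flushed when the flow completes

fun main(): Unit = runBlocking {
    // Fetch finishes before the cutoff: Loading is dropped.
    val fast = loadingIfSlow(cutoffDelay = 200L) { delay(50); "page" }.toList()
    println(fast) // prints [Data(data=page)]

    // Fetch finishes after the cutoff: Loading is emitted first, then the data right away.
    val slow = loadingIfSlow(cutoffDelay = 200L) { delay(500); "page" }.toList()
    println(slow) // prints [Loading, Data(data=page)]
}
```

The delays are real time here, so the margins are kept generous. In a test you would probably prefer virtual time.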

If fetchData() is a flow, for example one that monitors a database indefinitely, you could build a flow operator that emits a load state only if the first item is slow to arrive:

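fun <R, T : R, L : R> Flow<T>.emitLoadStateIfDelayed(cutoffDelay: Long, loadState: L): Flow<R> = channelFlow {
    // Timer coroutine: sends loadState only if nothing has been collected after cutoffDelay ms.
    val sendLoadingJob = launch {
        delay(cutoffDelay)
        send(loadState)
    }
    collect {
        // Cancelling is a no-op once the job has finished, so this is cheap on later items.
        sendLoadingJob.cancelAndJoin()
        send(it)
    }
}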
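
As a rough sketch of how this could map onto the .emitDataOrLoading() call from the question, the operator can be specialised to Status. Everything here is an assumption for illustration: the Status declaration, the cutoffDelay parameter, and the slowSource flow that stands in for repository.fetchData().

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Assumed shape of the question's Status type; toString is overridden only for readable output.
sealed class Status<out T> {
    object Loading : Status<Nothing>() { override fun toString() = "Loading" }
    data class Data<T>(val data: T) : Status<T>()
}

// Hypothetical operator named after the question's pseudocode.
fun <T> Flow<Status<T>>.emitDataOrLoading(cutoffDelay: Long): Flow<Status<T>> = channelFlow {
    // Sends Loading only if the upstream has produced nothing after cutoffDelay ms.
    val sendLoadingJob = launch {
        delay(cutoffDelay)
        send(Status.Loading)
    }
    this@emitDataOrLoading.collect { status ->
        // Stop the timer before forwarding, so Loading can never follow a value.
        sendLoadingJob.cancelAndJoin()
        send(status)
    }
}

fun main(): Unit = runBlocking {
    // Stand-in for repository.fetchData(): the first value takes 500 ms to arrive.
    val slowSource = flow<Status<String>> {
        delay(500)
        emit(Status.Data("page 1"))
        delay(500)
        emit(Status.Data("page 2"))
    }
    println(slowSource.emitDataOrLoading(cutoffDelay = 200L).toList())
    // prints [Loading, Data(data=page 1), Data(data=page 2)]
}
```

Only the wait for the first value can produce Loading. The 500 ms gap before the second value does not, because the timer job has already been cancelled.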

8 Comments

Thanks a lot! These solutions make complete sense to me. I have completely missed channelFlow which I think is why I couldn't come up with a solution. I need to read up on it. Cheers!
I don't see why you need channelFlow. The sends you write have a clear happens-before relationship.
@GeorgeLeung What alternative are you suggesting? We don't know if the fetched value comes before or after the sent loading state.
The use of a channel makes concurrent emissions safe. But in your code the sends aren't concurrent, so I thought they could be replaced with a simple flow and emit. I tested that, though, and you are right: calling emit from another coroutine is prohibited.
Yes, the FlowCollector of a regular flow builder is not thread-safe so it prohibits emission from other coroutines. To make the delay concurrent, we need another coroutine, so channelFlow becomes necessary.

Assuming you have a sealed class:

sealed class Status<out T> {
    object Loading : Status<Nothing>()
    data class Data<T>(val data: T) : Status<T>()
}

And your fetchData() is:

@Suppress("UNCHECKED_CAST")
suspend fun <T> fetchData(): T {
    delay(Random.nextLong(1000, 5000)) // simulate network delay
    return "data" as T // unchecked cast: only safe when T is String
}

And a repository method that returns a flow:

fun <T> getData(): Flow<Status<T>> = flow {
    emit(Status.Loading)
    val data = fetchData<T>()
    emit(Status.Data(data))
}

You can then create a method that adds the loading timeout using debounce. It does not delay the successful response. debounce holds back the initial Status.Loading until delayMs has elapsed with no newer value, and drops it if Status.Data arrives first. Because the getData flow completes right after emitting Status.Data, debounce emits that last value immediately rather than waiting another delayMs.

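fun <T> getDataWithLoadingDelay(delayMs: Long): Flow<Status<T>> = getData<T>()
    .debounce(delayMs)
    // Note: this map passes every value through unchanged, since anything that is not Data is already Loading.
    .map { status ->
        if (status is Status.Data) {
            status
        } else {
            Status.Loading
        }
    }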

If the data has not arrived within delayMs, the flow emits Status.Loading to indicate that the fetch is still in progress.

1 Comment

Oh right, because debounce is smart enough to emit the last value immediately if the source is finite and reaches the end. That is probably a cleaner way to build the first version in my answer.
