Given a function f, what's the best way to code a progress bar for f? I.e., have a real-time progress bar that updates during the execution of f. Note that I can't change f (it's a function from another library), so there's no way to insert a pbar.update call in f (hence this is a post regarding progress bars for non-loop functions). Other SO posts have addressed this problem under the condition that you can change the code in f, but I can't find/think of a solution when I don't have access to the contents of f.

Would I have to use threading or multiprocessing to achieve something like this?

Something like:

@progress_bar
def func_wrapper(*args, **kwargs):
    return f(*args, **kwargs)

or:

start_progress_bar()
f()

Any help is appreciated!

UPDATE: I've taken the code provided in @Acorn's answer and rewritten it in decorator form.

import concurrent.futures
import functools
import time

from tqdm import tqdm

def progress_bar(expected_time, increments=10):

    def _progress_bar(func):

        def timed_progress_bar(future, expected_time, increments=10):
            """
            Display progress bar for expected_time seconds.
            Complete early if future completes.
            Wait for future if it doesn't complete in expected_time.
            """
            interval = expected_time / increments
            with tqdm(total=increments) as pbar:
                for i in range(increments - 1):
                    if future.done():
                        # finish the progress bar
                        # not sure if there's a cleaner way to do this?
                        pbar.update(increments - i)
                        return
                    else:
                        time.sleep(interval)
                        pbar.update()
                # if the future still hasn't completed, wait for it.
                future.result()
                pbar.update()

        @functools.wraps(func)
        def _func(*args, **kwargs):
            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                future = pool.submit(func, *args, **kwargs)
                timed_progress_bar(future, expected_time, increments)

            return future.result()

        return _func

    return _progress_bar


if __name__ == "__main__":
    @progress_bar(expected_time=11)
    def test_func():
        time.sleep(10)
        return "result"

    print(test_func())  # prints "result"

  • Do you want your progress bar to update each time f is called, or each time a loop iteration occurs within f? The former is what is possible with your "Something like" snippets, whereas the latter would require f to support a callback or, as you mentioned, access to the contents of f. Commented Nov 23, 2019 at 23:45
  • You could probably achieve the latter with some kind of monkeypatching. Commented Nov 23, 2019 at 23:46
  • This would really depend on what "f" is. The function call starts executing and, after some amount of time, may return. It is not guaranteed that the idea of a "progress bar" even makes sense for a given function, so your progress bar has to be designed specifically around how that function works. There won't be a "generic" solution to this problem. Commented Nov 23, 2019 at 23:58

1 Answer

If the function doesn't give you a hook for acting after each unit of work, e.g. by exposing a generator interface or a callback of some sort, then the only option is to use a modified version of the function or to do some kind of monkeypatching.

The solution would be specific to the code in question.


Update:

If you don't mind the progress bar being driven by a time estimate rather than reflecting the actual progress, you could do something like this:

import concurrent.futures
import time

from tqdm import tqdm


def timed_future_progress_bar(future, expected_time, increments=10):
    """
    Display progress bar for expected_time seconds.
    Complete early if future completes.
    Wait for future if it doesn't complete in expected_time.
    """
    interval = expected_time / increments
    with tqdm(total=increments) as pbar:
        for i in range(increments - 1):
            if future.done():
                # finish the progress bar
                # not sure if there's a cleaner way to do this?
                pbar.update(increments - i)
                return
            else:
                time.sleep(interval)
                pbar.update()
        # if the future still hasn't completed, wait for it.
        future.result()
        pbar.update()


def blocking_job():
    time.sleep(2)
    return 'result'


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(blocking_job)
        timed_future_progress_bar(future, 5)
    print(f'Work done: {future.result()}')

main()

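This should behave sensibly whether the job takes more or less time than expected. If the job finishes early, the bar jumps to 100% the next time it checks the future. If the job runs longer than expected, the bar holds at one increment short of full (90% with the default increments=10) until the job completes.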
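
As a rough illustration of that behaviour, the sketch below models how many ticks the bar would show at a given elapsed time. displayed_ticks is a hypothetical helper, not part of tqdm or of the code above, and it is a simplification: it ignores that completion is only noticed at the next polling interval.

```python
def displayed_ticks(elapsed, expected_time, increments=10, finished=False):
    """Approximate number of ticks shown after `elapsed` seconds (simplified model)."""
    if finished:
        return increments  # bar is filled once the future is seen as done
    interval = expected_time / increments
    # One tick per elapsed interval, capped one short of full while still running.
    return min(int(elapsed // interval), increments - 1)


early = displayed_ticks(2.0, expected_time=5)                # job still running
overdue = displayed_ticks(7.0, expected_time=5)              # past the estimate
complete = displayed_ticks(7.5, expected_time=5, finished=True)
```

With expected_time=5 and the default 10 increments, the bar advances one tick every 0.5 seconds and stops at 9 of 10 until the job is done.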

5 Comments

Thanks for responding. What if I knew how much time f would take? Would there be any way to run a progress bar for a set amount of time parallel to the execution of f?
@RyanPark, I've updated my answer with an asyncio-based solution that runs the blocking f in a thread.
Simplified my solution to avoid using asyncio unnecessarily.
Thanks for the help. I've never used concurrent before; would you mind explaining the line "with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:" for future reference? Why is max_workers set to 1? Sorry if it seems obvious to you, I just don't have any knowledge of concurrency at all.
max_workers is the number of threads that can run simultaneously. Seeing as only a single job is being run, that can be 1. The ThreadPoolExecutor basically just allows you to give it a function, and it makes sure it gets run on a separate thread, and returns the result to you, using the convenient futures interface.
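
A minimal sketch of that futures interface on its own, using only the standard library. slow_square is a made-up placeholder for the blocking function:

```python
import concurrent.futures
import time


def slow_square(x):
    # Placeholder for a blocking call we cannot modify.
    time.sleep(0.2)
    return x * x


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    # submit() returns immediately with a Future; the job runs on the worker thread.
    future = pool.submit(slow_square, 7)
    # The main thread is free here, e.g. to poll future.done() and update a bar.
    value = future.result()  # blocks until the worker has finished
    done_after = future.done()
```

Leaving the with block also waits for any outstanding jobs, which is why calling future.result() again afterwards returns straight away.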
