
How would I go about creating a queue to run tasks in the background in Python?

I have tried asyncio.Queue(), but whenever I use Queue.put(task), it immediately starts the task.

It is for an application which receives an unknown number of entries (filenames) from a database at a specified time interval. What I want to accomplish with this background queue is that the Python application keeps running and keeps returning new filenames. Every time the application finds new filenames, it should handle them by creating a task, which would contain (method(variables)). These tasks should all be thrown into an ever-expanding queue which runs the tasks on its own. Here's the code.

import asyncio
import datetime
import time as t

import mysql.connector
from mysql.connector import errorcode

class DatabaseHandler:
    def __init__(self):
        try:
            self.cnx = mysql.connector.connect(user='root', password='', host='127.0.0.1', database='mydb')
            self.cnx.autocommit = True
            self.q = asyncio.Queue()
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)
        self.get_new_entries(30.0)

    def get_new_entries(self, delay):
        start_time = t.time()
        while True:
            current_time = datetime.datetime.now() - datetime.timedelta(seconds=delay)
            current_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
            data = current_time
            print(current_time)
            self.select_latest_entries(data)
            print("###################")
            t.sleep(delay - ((t.time() - start_time) % delay))

    def select_latest_entries(self, input_data):
        query = """SELECT FILE_NAME FROM `added_files` WHERE CREATION_TIME > %s"""
        cursor = self.cnx.cursor()
        cursor.execute(query, (input_data,))
        for file_name in cursor.fetchall():
            file_name_string = ''.join(file_name)
            self.q.put(self.handle_new_file_names(file_name_string))
        cursor.close()

    def handle_new_file_names(self, filename):
        create_new_npy_files(filename)  # defined elsewhere, time-consuming
        self.update_entry(filename)

    def update_entry(self, filename):
        print(filename)
        query = """UPDATE `added_files` SET NPY_CREATED_AT=NOW(), DELETED=1 WHERE FILE_NAME=%s"""
        update_cursor = self.cnx.cursor()
        update_cursor.execute(query, (filename,))
        self.cnx.commit()
        update_cursor.close()

As I said, this will instantly run the task.

create_new_npy_files is a pretty time-consuming method in a static class.

  • asyncio runs in one thread. It is well suited to tasks with a lot of I/O and not so well suited for CPU-intensive tasks. But you still have the option to use a queue and just spawn off one or more workers (which are fed by the queue) as threads. Commented Mar 28, 2018 at 14:06
  • How would I go about doing that? This whole async and threading stuff is still very new to me. Commented Mar 28, 2018 at 14:08
  • Just as you would run any other thread. There are some examples. Commented Mar 28, 2018 at 14:12
  • Can't seem to understand how that would work for me, honestly. I get that I should use workers, but how can I assign a worker to an asyncio.Queue? Commented Mar 28, 2018 at 14:25
  • Why not just put the file names on the queue? Then you have a pool of asyncio tasks or threads (depending on which you decide to use) that read from the queue. Commented Mar 28, 2018 at 14:55

2 Answers


There are two problems with this expression:

self.q.put(self.handle_new_file_names(file_name_string))

First, it actually calls the handle_new_file_names method and enqueues its return value (which is None, since the method returns nothing). This is not specific to asyncio.Queue; it is how function calls work in Python (and most mainstream languages). The above is equivalent to:

_tmp = self.handle_new_file_names(file_name_string)
self.q.put(_tmp)

The second problem is that asyncio.Queue operations like get and put are coroutines, so you must await them.

If you want to enqueue a callable, you can use a lambda:

await self.q.put(lambda: self.handle_new_file_names(file_name_string))

But since the consumer of the queue is under your control, you can simply enqueue the file names, as suggested by @dirn:

await self.q.put(file_name_string)

The consumer of the queue would use await self.q.get() to read the file names and call self.handle_new_file_names() on each.
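
For illustration, here is a minimal sketch of that producer/consumer pattern (the consume function, the pool size, and the use of print as a stand-in handler are assumptions, not part of the original code):

import asyncio

async def consume(q, handle):
    # Pull file names off the queue and process them one at a time.
    while True:
        file_name = await q.get()
        handle(file_name)
        q.task_done()

async def main():
    q = asyncio.Queue()
    # A small pool of workers, all reading from the same queue.
    workers = [asyncio.ensure_future(consume(q, print)) for _ in range(3)]
    for name in ('a.npy', 'b.npy', 'c.npy'):
        await q.put(name)
    await q.join()  # block until every enqueued item has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.get_event_loop().run_until_complete(main())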

If you plan to use asyncio, consider reading a tutorial that covers the basics, and switching to an asyncio-compliant database connector, so that the database queries play along with the asyncio event loop.
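
For example, with a driver such as aiomysql (the driver choice and the connection parameters below are assumptions), the query from the question could be awaited instead of blocking the event loop:

import aiomysql

async def fetch_file_names(loop, since):
    # Connect and query without blocking the rest of the event loop.
    conn = await aiomysql.connect(host='127.0.0.1', user='root',
                                  password='', db='mydb', loop=loop)
    cur = await conn.cursor()
    await cur.execute(
        "SELECT FILE_NAME FROM `added_files` WHERE CREATION_TIME > %s", (since,))
    rows = await cur.fetchall()
    await cur.close()
    conn.close()
    return [row[0] for row in rows]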


For people who see this in the future: the answer I marked as accepted explains how to solve the problem. Below is the code I used to create what I wanted, that is, tasks that run in the background. Here you go.

from multiprocessing import Queue
import threading

class ThisClass:
    def __init__(self):
        self.q = Queue()
        self.worker = threading.Thread(target=self._consume_queue)
        self.worker.start()
        self.run()

The queue created is not a queue of tasks, but of the values you want to handle.

def run(self):
    for i in range(100):
        self.q.put(i)

Then there is _consume_queue(), which consumes items from the queue as they come in:

def _consume_queue(self):
    while True:
        number = self.q.get()
        # the logic you want to use per number.

Note that self.q.get() blocks until a new entry arrives, so the worker simply waits while the queue is empty.

The (simplified) code above works for me; I hope it will also work for others.
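
Put together, a self-contained version of the pattern might look like this (the daemon flag, the None sentinel for shutdown, and calling run() from outside __init__ are additions for illustration, not part of the original answer):

from multiprocessing import Queue
import threading

class ThisClass:
    def __init__(self):
        self.q = Queue()
        # daemon=True so the worker cannot keep the process alive by itself
        self.worker = threading.Thread(target=self._consume_queue, daemon=True)
        self.worker.start()

    def run(self):
        for i in range(100):
            self.q.put(i)
        self.q.put(None)  # sentinel: tell the worker to stop

    def _consume_queue(self):
        while True:
            number = self.q.get()  # blocks while the queue is empty
            if number is None:
                break
            print(number)  # stand-in for the real per-item logic

if __name__ == '__main__':
    c = ThisClass()
    c.run()
    c.worker.join()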

Comments

This looks incorrect, but fixable. q.put will have no effect unless actually awaited; use q.put_nowait for a non-coroutine entry point. Second, asyncio.Queue is not thread-safe, and it is a serious programming error to invoke any of its methods from any thread except the one that runs the event loop. Even if that approach appears to work now, it will fail in even subtly different circumstances. To safely enqueue something from a different thread, use asyncio.run_coroutine_threadsafe(q.put(...), loop).
I didn't create an asyncio.Queue, I created a multiprocessing.Queue. Wouldn't that make a difference?
In that case you don't need an asyncio event loop (or indeed any part of asyncio) at all - you can just use Thread(target=self._consume_queue).
You are right, just tested that out. I will edit it in.
This solution is indeed much simpler. The question mentioned asyncio and was tagged python-asyncio, which led me to believe that you wanted to solve the problem with asyncio. asyncio is somewhat specific in that it requires a bit of learning and basing the application on the async execution model. If you have no interest in using asyncio, please mention that in the question (or don't tag it with python-asyncio) to avoid confusion.
