            from queue import Queue
            import threading
            import time

            queue1 = Queue()


            # this function should finish its work and restart with fresh queue1
            def association_helper():
                # this program should get the whole data from the queue, add into the list and print it. again it starts with
                # remaining items in queue (the items which was inserting when this function printings the value)
                lock = threading.Lock()
                lock.acquire()
                items = []
                print("Start..")
                while True:
                    if queue1.qsize()>0:
                        print("Line no 13:", queue1.qsize())
                        SizeofQueue1 = queue1.qsize()
                        for i in range(SizeofQueue1):
                            items.append(queue1.get())
                        queue1.task_done()
                        print("Line no 19:", len(items))
                        print(items)
                        print("Line no 25: done")
                        time.sleep(0.1)
                lock.release()


            i = 0


            def main():
                global i
                # continuous data coming and adding in queue
                while True:
                    queue1.put([i])
                    i += 1


            if __name__ == '__main__':
                # main thread will always run (adding the items in the queue)
                f_thread = threading.Thread(target=association_helper)
                f_thread.daemon = True
                f_thread.start()
                main()


    output:

    Start... 
    Line no 13: 1415 
    Line no 19: 3794 
    Line no 25: done
    Line no 13: 40591 
    Line no 19: 41856 
    Line no 25: done 
    Line no 13: 78526


As per my expectations, the values printed at "Line no 13" and "Line no 19" should be the same. Also, after "Line no 25", it should print Start.. (because association_helper should finish its execution and run again).

Why does the association_helper function run only once? Why does it not finish its work and restart with the fresh remaining items in the queue?

Motivation:

  1. queue1 is continuously receiving new items in the main thread.
  2. When the size of queue1 is greater than 0, association_helper should extract all the data from queue1 and process it.
  3. Meanwhile, adding items to the queue should continue.
  4. Once association_helper finishes its execution, it should start fresh with the new items in the queue.

1 Answer


Let's start at the end:

As per expectations, the line no 13 and line no 19 should be the same.

Because you get from the queue in one thread and put into it from another without using any lock, you cannot expect that nothing is added to the queue between two lines of the thread function. That is what you are seeing: the code calls queue1.qsize() twice, once for the "Line no 13" print and once on the next line to set SizeofQueue1, and the main thread adds items between the two calls, so the two values differ.
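To make that interleaving concrete, here is a minimal sketch that forces a put to land between two qsize() calls. The two Event objects and the names size_printed and size_used are assumptions added only to make the timing deterministic; in the original code the same thing happens by chance.

```python
import threading
from queue import Queue

q = Queue()
q.put("a")

# Hypothetical helpers: these events only pin down the order of operations.
first_read_done = threading.Event()
put_done = threading.Event()


def producer():
    first_read_done.wait()  # wait until the consumer has read the size once
    q.put("b")              # this put lands between the two qsize() calls
    put_done.set()


t = threading.Thread(target=producer)
t.start()

size_printed = q.qsize()    # plays the role of the "Line no 13" print
first_read_done.set()
put_done.wait()             # let the producer run before the second read
size_used = q.qsize()       # plays the role of SizeofQueue1
t.join()

print(size_printed, size_used)  # 1 2
```

The two reads disagree even though each individual qsize() call is safe on its own; the problem is only that the code reads the size twice.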

Also, after line no 25, it should print Start.. (because association_helper should finish its execution and run again)

You call print("Start..") before entering the while True loop. Therefore you will not see that print again unless you call the function again.


The following are explanations and examples of how to resolve the races in the threaded put/get queue:

Declare the lock as a global variable.

lock1 = threading.Lock()

Using this lock, let's ensure that the queue size that is printed and the resulting len(items) have the same value.

# in association_helper (the consumer thread)
with lock1:
    print("Line no 13:", queue1.qsize())
    SizeofQueue1 = queue1.qsize()

# and in main (the producer thread)
with lock1:
    queue1.put([i])

This results in the same size in both places, as expected:

Line no 13: 9
Line no 19: 9
[[1], [2], [3], [4], [5], [6], [7], [8], [9]]
Line no 25: done

Regarding the print("Start.."), you can just move it into the while loop so that it is printed on every iteration.

while True:
    print("Start..")
    if queue1.qsize()>0:
        # The rest of the code

Finally, if you want the items list to contain only the items from the current iteration, you need to clear it. If you do not clear the list between iterations, it keeps the items from every earlier iteration, so the difference between the queue size and len(items) will just get bigger and bigger.

list.clear() Remove all items from the list. Equivalent to del a[:].

And you will end up with:

while True:
    print("Start..")
    items.clear()
    if queue1.qsize()>0:
        # The rest of the code
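
Putting the pieces above together, a complete version might look like the following sketch. It is not the original program: the producer is bounded (TOTAL items), and the stop event and the batches list are hypothetical additions so that the example terminates and the sizes can be checked afterwards.

```python
import threading
import time
from queue import Queue

queue1 = Queue()
lock1 = threading.Lock()   # one lock, shared by both threads
stop = threading.Event()   # hypothetical: lets the sketch terminate
batches = []               # hypothetical: (size read, len(items)) per batch
TOTAL = 1000               # assumption: bounded instead of "while True"


def association_helper():
    items = []
    # Keep going until the producer is done and the queue is drained.
    while not stop.is_set() or not queue1.empty():
        items.clear()                  # only keep the current batch
        with lock1:                    # no put can happen in here
            size_of_queue1 = queue1.qsize()
        for _ in range(size_of_queue1):
            items.append(queue1.get())  # never blocks: single consumer
        if items:
            batches.append((size_of_queue1, len(items)))
        time.sleep(0.001)


def main():
    for i in range(TOTAL):
        with lock1:
            queue1.put([i])
    stop.set()


consumer = threading.Thread(target=association_helper)
consumer.start()
main()
consumer.join()

print("batches:", len(batches))
print("sizes match:", all(size == taken for size, taken in batches))
```

The consumer is joined instead of being a daemon thread here, purely so that the script waits for the last batch before exiting.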

4 Comments

Hi @Amiram: Thank you, it helps a lot. I have two questions: 1. How does the same "lock1 = threading.Lock()" behave in both threads at the same time? 2. How and when is the lock released in one thread and acquired in the other if we do not intentionally add any time interval between lock and unlock? Thank you.
Those are nice questions. There is no space to elaborate in the comments, but in short: 1. Because this is one process, both threads have access to the process memory, so both of them can use the same lock; that is why Lock exists in the first place, to synchronize between different threads. 2. When one thread acquires the lock, the second one cannot enter the code that is "protected" by that lock until the first one releases it. In your example, between the put and the i += 1, thread 1 does not hold the lock, which gives thread 2 the opportunity to acquire it.
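
As a small illustration of both points, here is a hedged sketch in which two threads share one module-level lock. The counter and worker names are hypothetical and not part of the original code.

```python
import threading

lock1 = threading.Lock()  # lives in process memory, visible to every thread
counter = 0


def worker(n):
    global counter
    for _ in range(n):
        with lock1:       # acquire; blocks while the other thread holds it
            counter += 1  # the "protected" code
        # leaving the with block releases the lock, so the other thread
        # gets a chance to acquire it here


threads = [threading.Thread(target=worker, args=(10000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)         # 20000
print(lock1.locked())  # False
```

No explicit time interval is needed: the with statement releases the lock every time its block ends, whether normally or through an exception.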
With one lock shared by two different threads, this is difficult in a real-time application. Is there any way to get the same output with no lock / one lock? Sometimes one thread does not release the lock and we are unable to see the expected output. Can we use some kind of timer or Event so that we can expect the same output? Thank you.
Referring to the example above: you get the size of the queue just once, keep it, and work with that value for the rest of the current loop iteration. In this case you will not need locks, events, or timers. For example: current_q_size = queue1.qsize(), and then use current_q_size anywhere else in the current loop iteration where you need the size.
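
A minimal sketch of that lock-free idea, assuming a single consumer thread (with several consumers, get() could block because another thread may take the items first). The helper name drain_snapshot is hypothetical.

```python
from queue import Queue


def drain_snapshot(q):
    """Read the size once, then take exactly that many items."""
    current_q_size = q.qsize()  # the single snapshot for this iteration
    items = [q.get() for _ in range(current_q_size)]
    return current_q_size, items


queue1 = Queue()
for i in range(5):
    queue1.put([i])

size, items = drain_snapshot(queue1)
queue1.put([5])  # arrives "late"; it is picked up by the next call
next_size, next_items = drain_snapshot(queue1)

print(size, len(items))       # 5 5
print(next_size, next_items)  # 1 [[5]]
```

Items that arrive while a batch is being processed simply stay in the queue until the next iteration, which is the behaviour the question asks for.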
