0

how can i update a shared variable between different threading.Thread in python?

lets say that i have 5 threads working down a Queue.Queue(). after the queue is done i want to do an other operation but i want it to happen only once.

is it possible to share and update a variable betweeen the threads. so when Queue.empty() is True this event gets fired but if one of the threads is doing it i dont want the others to do do that too because i would get wrong results.

EDIT
i have a queue which reflects files on the filesystem. the files are uploaded to a site by the threads and while each thread is uploading the file it updates a set() of keywords i got from the files.
when the queue is empty i need to contact the site and tell it to update the keyword counts. right now each thread does this and i get an update for each thread which is bad. i also tried to empty the set but it doesnt work.

    keywordset = set()
    hkeywordset = set()
    def worker():
        while queue:
            if queue.empty():
                if len(keywordset) or len(hkeywordset):
                    # as soon as the queue is empty we send the keywords and hkeywords to the
                    # imageapp so it can start updating 
                    apiurl   = update_cols_url
                    if apiurl[-1] != '/':
                        apiurl = apiurl+'/'
                    try:
                        keywords = []
                        data = dict(keywords=list(keywordset), hkeywords=list(hkeywordset))
                        post = dict(data=simplejson.dumps(data))
                        post = urllib.urlencode(post)
                        urllib2.urlopen(apiurl, post)
                        hkeywordset.clear()
                        keywordset.clear()
                        print 'sent keywords and hkeywords to imageapp...'
                    except Exception, e: print e
            # we get the task form the Queue and process the file based on the action
            task = queue.get()
            print str(task)
            try:
                reindex = task['reindex']
            except:
                reindex = False
            data = updater.process_file(task['filename'], task['action'], task['fnamechange'], reindex)
            # we parse the images keywords and hkeywords and add them to the sets above for later 
            # processing
            try:
                for keyword in data['keywords']:
                    keywordset.add(keyword)
            except: pass
            try:
                for hkw in data['hkeywords']:
                        hkeywordset.add(hkw)
            except:pass
            queue.task_done()


    for i in range(num_worker_threads):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()

    while 1:
        line = raw_input('type \'q\' to stop filewatcher... or \'qq\' to force quit...\n').strip()

this is what i was trying basically. but of course the part of queue.empty() gets exectued as many times as threads i have.

3 Answers 3

0

Why can't you just add the final step to the queue ?

Sign up to request clarification or add additional context in comments.

1 Comment

thats what im trying to with empty() where i process data in a set() and do the final test. the problem is that if i have multiple threads this final step is processed once for each thread. i want it to happen only once.
0

Have another queue where you place this event after first queue is empty.
Or have special thread for this event.

Comments

0

If you are using a queue to run your thread (thread pool) so you are making sure that there will not be a race condition (thread safe) because the queue run your thread in a sequential way, so i think you can share a variable between the thread and you can be sure that there will not be a race condition over this variable.

Edit : Here is something similar about what you want to do hope this can give you answer to your question this time :) :

import Queue
import threading
import ftplib
import os


class SendFileThread(threading.Thread):
     """ Thread that will handle sending files to the FTP server"""

     # Make set of keywords a class variable.
     Keywords = set()

     def __init__(self, queue, conn):

          self.conn = conn   
          self.queue = queue

          threading.Thread.__init__(self)

      def run(self):
          while True:
              # Grabs file from queue.
              file_name = self.queue.get()

              # Send file to FTP server.
              f=open(file_name,'rb')
              self.conn.storbinary('STOR '+os.path.basename(file_name),f)

              # Suppose that this keywords are in the first line.
              # Update the set of keywords.
              SendFileThread.Keywords.update(f.readline().split(" ")))

              # Signals to queue job is done.
              self.queue.task_done()


def main():
     # Files to send.
     files = os.listdir('/tosend')

     queue = Queue.Queue()

     # Connect to the FTP server.
     conn = ftplib.FTP('ftp_uri')   
     conn.login()

     # Create 5 threads that will handle file to send.
     for i in range(5):
         t = SendFileThread(queue, conn)
         t.start()

     # Fill the queue with files to be send.   
     for file in files:
         queue.put(file)

     # Wait until or thread are finish
     queue.join()

     # Send the keywords to the FTP server.
     # I didn't understand well the part update keywords count, 
     # how this count is stored ...
     # Here i will just send the keywords to the FTP server.
     with open("keywords", "w") as keywords_file
         keywords_file.write(";".join(SendFileThread.Keywords))
         conn.storbinary('STOR '+os.path.basename("keywords"),
                          keywords_file)

     conn.close()


if __name__ == '__main__':
     main()

4 Comments

sorry but i really don't understand what you mean.
@aschmid00 : sorry but i really don't understand what you mean it's the same for me ? i meant to help but i think rather that i just make you more confused, i think i will delete this answer unless you give us more detail about what you want to do and maybe some code to understand more the problem , Hope you find your answers :)
yes this helped to understand it a bit more. the difference to mine is that the threads are always alive because im listening to filesystem events and upload a file if it is dropped in the watched folder. an eventwatcher adds the tasks to the queue and the threads work on the tasks.
...but i still didn't find a real solution.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.