
I want to investigate multiprocessing. I have a 'tar' archive, let's say with 1000 files (in reality there are many more), and each file has 1000 rows. I need to read each file and each line of each file, and save info about every file in a 'result' variable (a dictionary). I have the following code, and for some unknown reason it stops after 8 iterations:

class DataProc():
    ...

    def data_proc(self):
        ...
        result = {}
        read_mode = 'r'
        self.tar = tarfile.open(file_path, read_mode)
        for file in self.tar:
            q = Queue()
            p = Process(target=process_tar,
                        args=(file, q))
            p.start()
            tmp_result = q.get()
            for key, item in tmp_result.items():
                '''
                do some logic and save data to result
                '''
                pass
            p.join()
        return result

    def process_tar(self, file, q):
        output = {}
        extr_file = self.tar.extractfile(file)
        content = extr_file.readlines()
        '''
        do some data processing with file content
        save result to output
        '''
        q.put(output)

dp = DataProc() 
result = dp.data_proc()

'for file in self.tar' makes only 8 iterations. What am I doing wrong?

1 Answer

I see a few issues in the posted code.

The main process opens files but does not close them. If you have 1K files, you will run out of file descriptors. Instead, pass the file path to the child process and let it open the file itself.
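A sketch of that fix applied to the question's code (the names `read_member` and `tar_path` are mine, and the "processing" is a placeholder that just counts lines):

```python
import tarfile
from multiprocessing import Process, Queue

def read_member(tar_path, member_name, q):
    # The child opens the archive itself, so the parent never
    # holds the extracted file objects open.
    with tarfile.open(tar_path, 'r') as tar:
        content = tar.extractfile(member_name).readlines()
    q.put({member_name: len(content)})  # placeholder processing
```

The parent then passes `(tar_path, member.name, q)` to `Process` instead of the member object and a reference to `self`.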

Moreover, you will spawn 1K processes, which is hard for a normal computer to handle. You are managing the processes yourself; using a Pool would remove a lot of that complexity and simplify your code.

How large is the output produced by the child processes? If it is too large, that might be one of the reasons it gets stuck.

Lastly, mixing OOP and multiprocessing is a fairly error-prone practice (in other words: do not pass self to the child processes).

Something like this would cut most of the unnecessary complexity (assuming Python 2).

from multiprocessing import Pool

files_path = [ ... list of paths of the archive files ... ]

def process_archive(file_path):
    with open(file_path) as archive_file:
        ... process the archive ...

pool = Pool()

for result in pool.map(process_archive, files_path):
    ... enjoy the results ...

1 Comment

output is not large. Thanks for the advice. I will try to implement it.
