
As a newbie to programming, I'm still exploring the concepts of multiprocessing and multithreading.

I've written a small script that reads a file, copies files into multiple temporary folders, and performs the following actions on each folder:

  1. Build a label.
  2. Generate a package.
  3. Push it to Nexus.

There are ~500 folders, and they are processed sequentially. How can I use multiprocessing here so that, say, 100 folders are processed in parallel at a time (or even more)? Also, is it possible to keep track of those processes and fail the build if even one sub-process fails?

I read multiple articles on multiprocessing, but couldn't wrap my head around it :(

Any guidance would be of great help to me, thanks.

folder1
   -- war file
   -- metadata

folder 2
   -- war file
   -- metadata
....
....

folder 500
   -- war file
   -- metadata

Code snippet

import re, shutil, os
from pathlib import Path

target = "/home/work"
file_path = target + "/file.txt"

dict = {}
count = 1

def commands_to_run_on_each_folder(filetype, tmp_folder):
    target_folder = tmp_folder+'/tmp'+str(count)

    os.system(<1st command to build the label>)
    os.system(<2nd command to build the package>)
    <multiple file manipulations, where `filetype` is used and get the required file with right extension>
    <curl command to upload it to the Nexus>

#Read the text file and assemble it in a dictionary.
with open(file_path, 'r') as f:
    lines = f.read().splitlines()
    for i, line in enumerate(lines):
        match = re.match(r".*.war", line)
        if match:
            j = i-1 if i > 1 else 0
            for k in range(j, i):
                dict[match.string] = lines[k]
#Iterate the dictionary and copy the folder to the temporary folders.
for key, value in dict.items():
    os.mkdir(target+'/tmp'+str(count))
    shutil.copy(key, target+'/tmp'+str(count))
    shutil.copy(value, target+'/tmp'+str(count))
    commands_to_run_on_each_folder("war", target)
    count += 1

OS: Ubuntu 18.04, Memory: 22 GB (container)

  • you might want to check out ProcessPoolExecutor. I like the concurrent.futures library because it's pretty high level, and generally straightforward enough to use. Commented Sep 21, 2021 at 14:06
  • @rv.kvetch: thanks, I checked those concepts but couldn't convert them into code. Could you kindly help me with pseudocode? I can build on it from there. Commented Sep 21, 2021 at 14:10
  • Sure, I could help out with pseudocode, but first I'd suggest you actually try out the example in the docs. I feel like it should be straightforward once you actually try it out. docs.python.org/3/library/… Commented Sep 21, 2021 at 14:13
  • great, let me check that. Commented Sep 21, 2021 at 14:14
  • dict is a built-in name, please do not use it as a variable name. Commented Sep 21, 2021 at 14:22

2 Answers


Using concurrent.futures is easy. I have modified your script as follows:

#!/usr/bin/env python3
import itertools
import concurrent.futures
import logging
import pathlib
import re
import shutil


logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)s:%(processName)s:%(message)s"
)


def worker(path1, path2, src, target, logger):
    logger.debug("Create dir %s", target)
    target.mkdir(exist_ok=True)

    logger.debug("Copy files")
    shutil.copy(src / path1, target / path1)
    shutil.copy(src / path2, target / path2)

    logger.debug("Additional commands to run on %s", target)
    # TODO: Add actions here
    # commands_to_run_on_each_folder(...)


def main():
    #Read the text file and assemble it in a dictionary.
    tasks = {}
    with open("file.txt", 'r') as f:
        lines = f.read().splitlines()
        for i, line in enumerate(lines):
            match = re.match(r".*.war", line)
            if match:
                j = i-1 if i > 1 else 0
                for k in range(j, i):
                    tasks[match.string] = lines[k]

    logger = logging.getLogger()
    # src: The directory where this script is
    src = pathlib.Path(__file__).parent
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for taskid, (path2, path1) in enumerate(tasks.items(), 1):
            target = pathlib.Path(f"/tmp/dir{taskid}")

            # Calls `worker` function with parameters path1, path2, ...
            # concurrently
            executor.submit(worker, path1, path2, src, target, logger)


if __name__ == "__main__":
    main()

Here is a sample output:

DEBUG:ForkProcess-1:Create dir /tmp/dir1
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir1
DEBUG:ForkProcess-1:Create dir /tmp/dir2
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir2
DEBUG:ForkProcess-1:Create dir /tmp/dir3
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir3
DEBUG:ForkProcess-1:Create dir /tmp/dir4
DEBUG:ForkProcess-1:Copy files
DEBUG:ForkProcess-1:Additional commands to run on /tmp/dir4

Notes

  • I use logging instead of print because logging works better in a multi-process environment.
  • To turn off logging, change the level to logging.WARN.
  • I use pathlib because it is more convenient than os.path.
  • Note: the submit call does not wait. Even if the worker function takes a long time to run, submit returns right away.
  • Using the with construct, the executor waits for all the concurrent tasks to finish before exiting. This is what you want. (A sketch of collecting the returned futures to detect failures follows below.)
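Not part of the original answer, but since the question also asks to fail the build when any sub-process fails: a minimal sketch, reusing the names from the script above, is to keep the Future objects that submit returns and check their results.

# Sketch only: collect the futures so a failure in any worker
# aborts the whole run (result() re-raises the worker's exception).
futures = []
with concurrent.futures.ProcessPoolExecutor(max_workers=100) as executor:
    for taskid, (path2, path1) in enumerate(tasks.items(), 1):
        target = pathlib.Path(f"/tmp/dir{taskid}")
        futures.append(executor.submit(worker, path1, path2, src, target, logger))

for future in concurrent.futures.as_completed(futures):
    future.result()  # raises if that worker failed, giving a non-zero exit

Here max_workers=100 matches the "100 folders at a time" from the question; if you omit it, the default is the number of CPU cores.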

3 Comments

I added the function in the TODO area and, while executing the script, it throws the error below. I tried to debug the issue, unfortunately with no luck. TypeError: can't pickle _thread.RLock objects Traceback (most recent call last): File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed obj = _ForkingPickler.dumps(obj) File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle _thread.RLock objects
When working in a multi-process environment, the parameters you pass between processes are pickled (using the pickle library). If a parameter cannot be pickled, you get the above error. I suggest asking that as a separate question and putting a link to this comment. Alternatively, you can extend your existing question with more details. (A minimal sketch of how to avoid passing unpicklable objects is shown after these comments.)
Got it, thank you !!
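To make the pickling constraint concrete (this sketch is not part of the original answer): anything passed to executor.submit must be picklable, so a common fix is to create objects that hold locks, such as loggers with handlers, inside the worker instead of passing them in.

import concurrent.futures
import logging

def worker(target):
    # Create the logger inside the worker process, so nothing
    # unpicklable has to cross the process boundary.
    logger = logging.getLogger(__name__)
    logger.debug("Processing %s", target)
    # ... build / package / upload steps for this folder ...

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.submit(worker, "/tmp/dir1")  # only a plain string is pickled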

This is not a good target for multiprocessing, but it is a good target for GNU parallel.

Your builds happen in the background: Python is just calling system commands. You can certainly make multiple background os.system calls in parallel from Python, but this script would be much better off running under a find | parallel paradigm.

What I would do is rewrite the script to process only one folder (a rough sketch of such a script follows the command below). Then I would do:

find /path/to/root/folder -type d | parallel --bar -I{} python3 script.py {}

Since you are on Ubuntu, find is already available and parallel can be installed from the standard repositories. Note that this is bash, to run in the shell, not Python.
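A rough sketch of what that single-folder script.py could look like (the commands shown are placeholders you would replace with your real label/package/upload steps):

#!/usr/bin/env python3
# Hypothetical per-folder script: parallel passes one directory path
# as the first command-line argument.
import subprocess
import sys

def process_folder(folder):
    # Placeholders for the real build / package / upload commands.
    subprocess.run(["echo", "build label in", folder], check=True)
    subprocess.run(["echo", "build package in", folder], check=True)
    subprocess.run(["echo", "upload to Nexus from", folder], check=True)

if __name__ == "__main__":
    process_folder(sys.argv[1])

Because each step uses check=True, any failing command makes script.py exit non-zero; parallel reports failed jobs, and its --halt option can stop the whole run on the first failure.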

Reasoning against doing this in python

  • don't re-invent the wheel.
  • easily customisable: you can change the number of processes by adding --jobs N
  • your code just calls other processes: you're using Python as a scripting language, much like bash (which is fine), so it makes more sense to think of it as a build script for each folder
  • you get a progress bar and other goodies for free!

On the other hand, if you do want to do this in Python, it is possible.

Note that current wisdom recommends using subprocess over os.system.
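For instance, a minimal sketch (the build command shown is a placeholder, not from the question): subprocess.run can raise on a non-zero exit status and set the working directory, which also ties in with failing the build when a step fails.

import subprocess

# check=True raises CalledProcessError on a non-zero exit status,
# so a failing build command is not silently ignored.
subprocess.run(
    ["mvn", "package"],      # placeholder for the real build command
    cwd="/home/work/tmp1",   # run inside the folder being processed
    check=True,
)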

7 Comments

Thanks a lot for your inputs and the explanation. Sure, I will explore it :)
If I wanted to do it in Python, for learning's sake, which part of my code should I change to accommodate the multiprocessing? That would be of great help here.
Reduce your code to a single function which takes the directory as an input. Then generate a list of directories (hint: look up glob) and map your function over the directory list (see the sketch after these comments). As was said above, we could write this for you, but you'll learn better if you go and try the demo examples in the threading/multiprocessing docs, and then work out how to substitute your function for their demo function.
Note that to do this you need to make a function handling a single directory anyway, so you can try both approaches.
Actually I tried and failed, hence I reached out to you experts :(
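As a rough sketch of the comment above (the folder pattern and function body are assumptions, not from the original post), mapping one function over a list of directories could look like this:

import concurrent.futures
import glob

def process_folder(folder):
    # Build the label, generate the package and upload to Nexus
    # for this single folder.
    ...

if __name__ == "__main__":
    folders = glob.glob("/home/work/tmp*")  # the directories to process
    with concurrent.futures.ProcessPoolExecutor(max_workers=100) as executor:
        # Consuming the map() results re-raises any exception from a worker,
        # so one failed folder fails the whole run.
        list(executor.map(process_folder, folders))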
