Question: Multiprocessing in Python with subprocess to align several files with coding DNA sequences
ta2007 (United States) wrote, 22 months ago:

Hi! I know this question is very Python-heavy and I have already posted it on Stack Overflow, but since it's a bioinformatics problem, I wanted to ask the biologists for suggestions. I have a Python 3 script that uses subprocess.call to run MACSE to align the CDSs in about 2,300 input files in a directory; each CDS file produces two output files (a nucleotide and an amino-acid alignment), which go into two different directories. As smart as MACSE is, it is slow, and the script can take more than a month to finish aligning all 2,300 CDS files. I would like to learn how to multiprocess my script so several files can be processed at the same time on multiple cores. None of the input and output files interact with each other, so each core would be doing its own independent task. I have been reading about the multiprocessing library in Python, but it might be too advanced for me to understand. Below is the script, if anyone has suggestions. Thanks so much!

The Script:

import os
import subprocess
import argparse


parser = argparse.ArgumentParser(description="Script aligns CDS files.")
parser.add_argument('--root', default="~/testing_macse/", help="PATH to input dir.")
parser.add_argument('--align_NT_dir', default="~/testing_macse/NT_aligned/", help="PATH to the output directory for NT aligned CDS orthogroup files.")
parser.add_argument('--align_AA_dir', default="~/testing_macse/AA_aligned/", help="PATH to the output directory for AA aligned CDS orthogroup files.")
args = parser.parse_args()


def runMACSE(input_file, NT_output_file, AA_output_file):
    MACSE_command = "java -jar ~/bin/MACSE/macse_v1.01b.jar "
    MACSE_command += "-prog alignSequences "
    MACSE_command += "-seq {0} -out_NT {1} -out_AA {2}".format(input_file, NT_output_file, AA_output_file)
    # print(MACSE_command)
    subprocess.call(MACSE_command, shell=True)

# Expand "~" so os.makedirs and os.listdir see real paths
# (unlike the shell, they do not expand tildes themselves).
Orig_file_dir = os.path.expanduser(args.root)
NT_align_file_dir = os.path.expanduser(args.align_NT_dir)
AA_align_file_dir = os.path.expanduser(args.align_AA_dir)

try:
    os.makedirs(NT_align_file_dir)
    os.makedirs(AA_align_file_dir)
except FileExistsError as e:
    print(e)

for currentFile in os.listdir(Orig_file_dir):
    if currentFile.endswith(".fa"):
        runMACSE(os.path.join(Orig_file_dir, currentFile),
                 os.path.join(NT_align_file_dir, currentFile[:-3] + "_NT_aligned.fa"),
                 os.path.join(AA_align_file_dir, currentFile[:-3] + "_AA_aligned.fa"))

You might consider a workflow system like snakemake for this type of thing, particularly if you have access to a compute cluster.

— Sean Davis, 22 months ago
ta2007 wrote, 21 months ago:

Hi Sean and John. Thanks for your input. Here is what I got with some more help from a friend along with your suggestions. Just wanted to share what we came up with. Thanks for your help!
Sean Davis (National Institutes of Health, Bethesda, MD) wrote, 22 months ago:

Just replace your last loop with a call to multiprocessing's map(). We need to do a little setup, though. Note that I have not tested this, so you may have to tinker a bit.

from multiprocessing import Pool

# map() passes exactly one argument to the worker, so
# wrap your original function to take a tuple
# and expand it.
def wrapRunMACSE(arg_tuple):
    return runMACSE(*arg_tuple)

# Put the arguments into a list of tuples, one per input file.
# Each tuple will be handed to the wrapper.
# (Named arg_list so it doesn't shadow the argparse `args`.)
arg_list = [(args.root + currentFile,
             args.align_NT_dir + currentFile[:-3] + "_NT_aligned.fa",
             args.align_AA_dir + currentFile[:-3] + "_AA_aligned.fa")
            for currentFile in os.listdir(args.root)
            if currentFile.endswith(".fa")]

p = Pool(4)  # specify the number of cores you want to use

# and run all the files, in parallel.
p.map(wrapRunMACSE, arg_list)

References:

http://stackoverflow.com/questions/5442910/python-multiprocessing-pool-map-for-multiple-arguments

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

John (Germany) wrote, 22 months ago:

Using multiprocessing and map is definitely one good way to go. Personally I don't like multiprocessing because I never seem to have much luck with it, but having said that, I always use Python 2.7, and it appears from Sean's links that map and multiprocessing are better now in 3.x.

Here's how I would do it in 2.7 (and it's Python 3 compatible), but it's certainly not any clearer than Sean's way.

from __future__ import print_function  # must come before any other statement
import os
import subprocess
import argparse
import time

## Define user arguments.
parser = argparse.ArgumentParser(description="Script aligns CDS files.")
parser.add_argument('-i', '--input', default="~/testing_macse/", help="PATH to input dir.")
parser.add_argument('-c', '--cpu', default=1, type=int, help="Number of processes to use")
parser.add_argument('--MACSE', default="~/bin/MACSE/macse_v1.01b.jar", help="PATH to MACSE.")
parser.add_argument('--align_NT_dir', default="~/testing_macse/NT_aligned/", help="PATH to the output directory for NT aligned CDS orthogroup files.")
parser.add_argument('--align_AA_dir', default="~/testing_macse/AA_aligned/", help="PATH to the output directory for AA aligned CDS orthogroup files.")
parser.add_argument('-d', '--debug', action='store_true', help="Turn debug mode on.")
args = parser.parse_args()

## Expand "~" and make the output directories if they don't already exist.
input_dir = os.path.expanduser(args.input)
NT_align_file_dir = os.path.expanduser(args.align_NT_dir)
AA_align_file_dir = os.path.expanduser(args.align_AA_dir)
for directory in (NT_align_file_dir, AA_align_file_dir):
    try:
        os.makedirs(directory)
    except OSError as e:  # FileExistsError is Python 3 only
        if args.debug: print(e)

## Create a list of input files (full path)
inputs = []
for filePath in os.listdir(input_dir):
    fullFilePath = os.path.join(input_dir, filePath)
    if fullFilePath.endswith(".fa"):
        inputs.append(fullFilePath)
    else:
        print('Skipping: ' + fullFilePath)

## Define a function that waits until the number of subprocesses
## drops to (or below) a given value before firing another one off.
def wait(processes):
    while len(subprocesses) > int(processes):
        time.sleep(5) # poll subprocesses every 5 seconds
        remove = []   # list of processes completed
        for key, status in list(subprocesses.items()):
            if status.poll() is not None: remove.append(key) # If poll returns something other
        for key in remove: del subprocesses[key]             # than None, the process has finished.

## Main loop where we fire off subprocesses, but always keeping under X processes running at a time.
subprocesses = {}
for index, fullFilePath in enumerate(inputs):
    wait(args.cpu - 1) # wait here until we have at least 1 less process than desired by --cpu.
    fileName = os.path.basename(fullFilePath)[:-3]
    command = ' '.join(['java -jar', args.MACSE, '-prog alignSequences -seq', fullFilePath,
                        '-out_NT', os.path.join(NT_align_file_dir, fileName + '_NT_aligned.fa'),
                        '-out_AA', os.path.join(AA_align_file_dir, fileName + '_AA_aligned.fa')])
    if args.debug: print(command)
    # Popen does not block (unlike subprocess.call), which is what
    # lets several MACSE jobs run at the same time.
    subprocesses[index] = subprocess.Popen(command, shell=True)
    print('Processing ' + str(index+1) + ' out of ' + str(len(inputs)))

# Wait for all subprocesses to end.
wait(0)
print('All done! :)')

Edit: I do wish code wouldn't wrap on Biostars... :)
