Multiprocessing in Python with subprocess to align several files with coding DNA seqences
3
2
Entering edit mode
5.7 years ago
ta2007 ▴ 30

Hi! I know this question is very Python heavy and I have already posted it on Stackoverflow but since it's a bioinformatics problem, I wanted to ask the biologists for suggestions. I have a Python3 script that uses subprocess.call to run MACSE to align CDSs in about 2,300 input files in a directory and there are two output files for each CDS file. I have these two outputs (nucleotide and aa alignment files) going into two different directories. As smart as MACSE is, it is slow and the script can take more than a month to finish aligning all 2300 CDS files. I would like to learn how to multiprocess my script so several files can be processed at the same time using multiple cores. None of the input and output files should interact with each other so each core will be doing it's own independent task. I have been reading on the multiprocess library in Python but it might be too advanced for me to understand. Below is the script if anyone has suggestions. Thanks so much!

The Script:

import os
import subprocess
import argparse

parser = argparse.ArgumentParser(description="Script aligns CDS files.")
parser.add_argument('--root', default="~/testing_macse/", help="PATH to input dir.")
parser.add_argument('--align_NT_dir', default="~/testing_macse/NT_aligned/", help="PATH to the output directory for NT aligned CDS orthogroup files.")
parser.add_argument('--align_AA_dir', default="~/testing_macse/AA_aligned/", help="PATH to the output directory for AA aligned CDS orthogroup files.")
args = parser.parse_args()

def runMACSE(input_file, NT_output_file, AA_output_file):
MACSE_command = "java -jar ~/bin/MACSE/macse_v1.01b.jar "
MACSE_command += "-prog alignSequences "
MACSE_command += "-seq {0} -out_NT {1} -out_AA {2}".format(input_file, NT_output_file, AA_output_file)
# print(MACSE_command)
subprocess.call(MACSE_command, shell=True)

Orig_file_dir = args.root
NT_align_file_dir = args.align_NT_dir
AA_align_file_dir = args.align_AA_dir

try:
os.makedirs(NT_align_file_dir)
os.makedirs(AA_align_file_dir)
except FileExistsError as e:
print(e)

for currentFile in os.listdir(args.root):
if currentFile.endswith(".fa"):
runMACSE(args.root + currentFile, args.align_NT_dir + currentFile[:-3]+"_NT_aligned.fa", args.align_AA_dir +   currentFile[:-3]+"_AA_aligned.fa")
alignment Multiprocessing Python3 • 5.4k views
0
Entering edit mode

You might consider a workflow system like snakemake for this type of thing, particularly if you have access to a compute cluster.

1
Entering edit mode
5.7 years ago
ta2007 ▴ 30

Hi Sean and John. Thanks for your input. Here is what I got with some more help from a friend along with your suggestions. Just wanted to share what we came up with. Thanks for your help!

0
Entering edit mode
5.7 years ago

Just replace your last loop with a call to multiprocessing map(). We need to do a little setup, though. Note that I have not tested this, so you may have to tinker a bit.

# map requires just one input argument, so
# wrap your original function to take a tuple
# and expand it.
def wrapRunMACSE(args):
return(runMACSE(*args))

from multiprocessing import Pool

# Put the arguments into a list of tuples
# Each tuple will be handed to the wrapper.
args = [(args.root + currentFile, args.align_NT_dir + currentFile[:-3]+"_NT_aligned.fa", args.align_AA_dir +   currentFile[:-3]+"_AA_aligned.fa")]

p = Pool(4) #specify the number of cores you want to use

# and run all the files, in parallel.
p.map(wrapRunMACSE,args)


References:

0
Entering edit mode
5.7 years ago
John 13k

Using multiprocessing and map is definitely one good way to go. Personally I don't like multiprocessing because i never seem to have much luck with it, but having said that I always use python 2.7 and it appears from Sean's links that map and multiprocessing are better now in 3.x

Here's how I would do it in 2.7 (and its 3 compatible), but it's certainly not any clearer than Sean's way.

import os
import subprocess
import argparse
from __future__ import print_function

## Define user arguments.
parser = argparse.ArgumentParser(description="Script aligns CDS files.")
parser.add_argument('-i', '--input', default="~/testing_macse/", help="PATH to input dir.")
parser.add_argument('-c', '--cpu', default=1, type=int, help="Number of processes to use")
parser.add_argument('--align_NT_dir', default="~/testing_macse/NT_aligned/", help="PATH to the output directory for NT aligned CDS orthogroup files.")
parser.add_argument('--align_AA_dir', default="~/testing_macse/AA_aligned/", help="PATH to the output directory for AA aligned CDS orthogroup files.")
parser.add_argument('-d', '--debug', help="Turn debug mode on.")
args = parser.parse_args()

## Make output directories if they dont already exist.
try:
os.makedirs(NT_align_file_dir)
os.makedirs(AA_align_file_dir)
except FileExistsError as e:
if args.debug: print(e)

## Create a list of input files (full path)
inputs = []
for filePath in os.listdir(args.root):
fullFilePath = os.join(args.root,filePath)
if fullFilePath.endswith(".fa"):
inputs.append(fullFilePath)
else:
print('Skipping: ' + fullFilePath)

## Define a function that waits until the number of subprocess
## drops to (or below) a given value before firing another one off.
def wait(processes):
while len(subprocesses) > int(processes):
time.sleep(5) # poll subprocesses every 5 seconds
remove = []   # list of processes completed
for key,status in subprocesses.items():
if status.poll() != None: remove.append(key) # If poll returns something other
for key in remove: del subprocesses[key]         # than None, the process has finished.

## Main loop where we fire off subprocesses, but always keeping under X processes running at a time.
subprocesses = {}
for index,fullFilePath in enumerate(inputs):
wait(args.cpu -1) # wait here until we have atleast 1 less process than desired by --cpu.
command = ' '.join(['java -jar',args.MACSE,'-prog alignSequences -seq',fullFilePath,'-out_NT',args.align_NT_dir,'-out_AA',args.align_AA_dir])
if args.debug: print(command)
subprocess.call(command,shell=True)
print('Processing ' + str(index+1) + ' out of ' + str(len(inputs)))

# Wait for all subprocesses to end.
wait(0)
print('All done! :)')


Edit: I do wish code wouldn't wrap on Biostars... :)