BLAST parallelisation best practices
2
0
Entering edit mode
10 months ago

Hi All,

I'm trying to speed up a BLASTP call as part of a bigger RBH workflow to detect orthologs, and I'm in the process of testing different approaches with a 100K sequence database and a 1107 sequence query (real database will be 350K, queries will differ in size). My function splits the query into separate files and processes them separately using python multiprocessing (Process or Pool), and I'm also looking at combining that with BLASTP's -num_threads parameter to increase speed further. I'm very knew to parallelisation in general (both threading and multiprocessing) but am keen to know more!

I posted these questions together as they all relate to the same code, are generally continuing on from each other and I can accept multiple answers (unlike stack overflow), but please let me know if you'd suggest posting them separately. I'd be grateful for any answers, doesn't have to cover every point in one go :D

Question 1 - I'm running local BLASTP and was wondering about the details for the num_threads parameter. Am I right in thinking that (as the name suggests), it spreads the workload across multiple threads across a single CPU, so is kind of analogous to Pythons threading module (as opposed to the multiprocessing module, which spreads tasks across separate CPUs)? I've heard BLAST only goes above 1 thread when it 'needs too', but I'm not clear on what this actually means - what determines if it needs more threads? Does it depend on the input query size? Are threads split at a specific step in the BLASTP program?

Question 2 - To check I have the right ideas conceptually, if the above is correct, would I be correct to say that BLAST itself is I/O bound (hence the threading), which makes sense as its processing thousands of sequences in the query etc so lots of input? But if you want to call BLAST in a workflow script (e.g. using Python's subprocess module), then the call is CPU bound if you set num_threads to a high number, as it spreads the work across multiple threads in a single CPU, which takes a lot of the CPU power? Or does the fact that blastP is not taking full advantage of the threading mean that the CPU is not actually getting fully utilised, so a call will still be input/output bound independent of num_threads? If that's correct, then maybe I could use threading to speed separately process the split queries rather than multiprocessing...

Question 3 - Are there any suggestions for how to get the best core and thread parameters for general use across different machines without relying on individual benchmarking (I want it to work on other peoples machines with as little tuning and optimisation as possible). Is it just cores = as many cores as you have (ie multiprocessing.cpu_count()) and threads = cores + 1 (defined by the BLASTP parameter num_threads)? Would this still be true on machines with more/less cores?

Question 4 - for benchmarking, how do external programs affect multiprocessing speed - would scrolling the web with 100 tabs open impact multiprocessing speed by increasing the work done by one of the CPUs, taking away resources from one of the processes running my script? If the answer is yes, whats the best way to benchmark this kind of thing? I'm including this question to give context on my benchmarking questions below (i.e. the numbers I am throwing around may be crap). I tried to include graphs of the numbers but they wont copy in, however I found a post explaining how to add pics so if they are helpful I can add them in.

Question 5 - Perhaps a more general question, I'm only splitting my query into 4 processes so would have thought multiprocessing.Process would be better (vs multiprocessing.Pool, which seems the preferred choice if you have lots of processes). But this isn't the case in my benchmarks, for multiprocessing using blastP_paralellised_process and blastP_paralellised_pool - any idea why? Timewise the process to pool 'seconds' ratio hovers around 1 with no obvious pattern for all num_threads (1-9) and core (1-5) combinations.

Question 6 - why does increasing the numbers of cores used to process number of cores * split BLASTP queries not result in obvious speed improvements? I would expect this with cores set >4, as my pc is a 4-core machine, but there seems to be little difference between processing 1/4 query files across 4 cores vs processing 1/2 query files across 2 cores. Is my assumption for Question 2 incorrect? There is a little bit of slowdown for running on a single core and a dramatic increase for 1 core with 2 and 1 threads (1618 seconds and 2297 seconds), but for 2-5 cores with 1-9 threads the time for each blastP run is around 1000 seconds with some small random fluctuations (eg 4 cores 1 thread is 1323 seconds, but the other multicore single thread runs are normal timewise relative to the baseline of the other values).

I've copied my code below. I've not included functions like split_fasta etc, as both they and BLASTP seem to be working (in the sense that im getting xml results files that I havent started parsing yet but look ok when i open in notepad) and I don't want to add 100 lines of unnecessary code and comments. Also, theyre used in the same way for both blastP_paralellised_process and blastP_paralellised_pool, so I don't think they are causing the time differences. Please let me know if including these would help though!

def blastP_paralellised_process(evalue_user, query_in_path, blastp_exe_path, results_out_path, db_user, num_cores, thread_num):
#function to split fasta query in 1 txt file per core
filenames_query_in_split=fasta_split(query_in_path, num_cores)
#function to construct result names for blastp parameter 'out'
filenames_results_out_split=build_split_filename(results_out_path, num_cores)
#copy a makeblastdb database given as iinput.  generate one database per core.
#Change name of file to include 'copy' and keep original database directory for quality control.
delim=db_user.rindex('\\')
db_name=db_user[delim::]
db_base=db_user[:delim]
databases=copy_dir(db_base, num_cores)#1 db per process or get lock
#split blastp params across processes.
processes=[]
for file_in_real, file_out_name, database in zip(filenames_query_in_split, filenames_results_out_split, databases):
#'blastP_subprocess' is a blast specific subprocess call that sets the environment to have
#env={'BLASTDB_LMDB_MAP_SIZE':'1000000'} and has some diagnostic error management.
blastP_process=Process(target=blastP_subprocess,
args=(evalue_user,
file_in_real,
blastp_exe_path,
file_out_name,
database+db_name,
blastP_process.start()
processes.append(blastP_process)
#let processes all finish
for blastP_process in processes:
blastP_process.join()

def blastP_paralellised_pool(evalue_user, query_in_path, blastp_exe_path, results_out_path, db_user, num_cores, thread_num):
####as above####
filenames_query_in_split=fasta_split(query_in_path, num_cores)
filenames_results_out_split=build_split_filename(results_out_path, num_cores)
delim=db_user.rindex('\\')
db_name=db_user[delim::]
db_base=db_user[:delim]
databases=copy_dir(db_base, num_cores)
################
#build params for blast
params_new=list(zip(
[evalue_user]*num_cores,
filenames_query_in_split,
[blastp_exe_path]*num_cores,
filenames_results_out_split,
[database+db_name for database in databases],
#feed each param to a worker in pool
with Pool(num_cores) as pool:
blastP_process=pool.starmap(blastP_subprocess, params_new)

if __name__ == '__main__':
#make blast db
makeblastdb_exe_path=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Executables\NCBI\blast-2.10.1+\bin\makeblastdb.exe'
input_fasta_path=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Precomputed_files\fasta_sequences_SMCOG_efetch_only.txt'
db_outpath=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_queries\DEMgenome_old\database\smcog_db'
db_type_str='prot'
start_time = time.time()
makeblastdb_subprocess(makeblastdb_exe_path, input_fasta_path, db_type_str, db_outpath)
print("--- makeblastdb %s seconds ---" % (time.time() - start_time))

#get blast settings
evalue_user= 0.001
query_user=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_queries\DEMgenome_old\genome_1_vicky_3.txt'
blastp_exe_path=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Executables\NCBI\blast-2.10.1+\bin\blastp.exe'
out_path=r'C:\Users\u03132tk\.spyder-py3\ModuleMapper\Backend\Intermediate_files\BLASTP_results\blastresults_genome_1_vicky_3.xml'#zml?
num_cores=os.cpu_count()

#benchmarking
for num_cores in range(1,6)[::-1]:
print()
start_time = time.time()
blastP_paralellised_process(evalue_user, query_user, blastp_exe_path, out_path, db_outpath, num_cores, num_threads)
end_time=time.time()
start_time = time.time()
blastP_paralellised_pool(evalue_user, query_user, blastp_exe_path, out_path, db_outpath, num_cores, num_threads)
end_time=time.time()
print()

multiprocessing python 3.7 num_threads BLASTP • 615 views
2
Entering edit mode
10 months ago
Mensur Dlakic ★ 15k

First, beware that I am no expert on running these kinds of jobs from python scripts.

In my experience, it is best to run this as a single job with maximum available num_threads. If you have a single -query group of sequences and a single -db database, I think the program will load the database into memory once, and keep it there for all subsequent sequences. Any other solution that splits your sequences into multiple jobs will have to deal with loading the database multiple times, and I think that is likely to be the slowest part of this process even if you have a solid-state drive and really fast memory.

That said, I don't think you need to worry too much with a database that is 350K sequences and ~1000 queries. That will probably be done in couple of hours on any modern computer. In other words, you may spend more time thinking (and writing) about it than what the actual run will take.

0
Entering edit mode

I think you're right, I managed to find some more discussions where this was discussed as an issue for multiprocessing - I might try splitting the database rather than the query which could ameliorate the issue (and seems to be a general approach used).

I agree it's not a big deal for this specific example, but it's all going to end up as part of a bigger, generic application so I would like to scale it up to bigger queries/databases as much as possible (although something like diamond might be more useful for that). I'm also treating it as a learning exercise for python multiprocessing, I'm not very good at it but it seems a useful library to know :P

Thanks again, I completely disregarded the impacts of data sharing between processes!

2
Entering edit mode
10 months ago
xonq ▴ 40

Q1: Generally, I think you are correct in your interpretation between threading and multiprocessing. For BLAST, num_threads also has to do with spreading the workload of aligning across the threads you've allotted and query sequences you've provided, e.g. if you're querying nr, more threads will increase the throughput simply by expanding how many sequences can be aligned at once. Nevertheless, threading and multiprocessing are more nuanced than that and don't scale linearly sometimes when you think they should, so there is typically an asymptote of performance gains when increasing threads... beyond the asymptote you will likely decrease throughput.

Q2: I'm not sure I interpret what you're saying, but let's say you that subprocessed a bunch of BLAST searches with one thread alotted per search; the amount of concurrent BLAST queries you are running would scale with subprocessing, and the speed of each of those query should scale to a point with the threading. However, you may be starting to unnecessarily convolute things here - why not just compile your queries into a single fasta and search with one BLAST search that uses all available threads instead of relying on yourself to implement the subprocessing optimally? BLAST is a significantly tested program - probably one of, if not the best for all bioinformatic software - I trust their implementation of multithreading over my own - anything well written at the C level will surely be faster than Python. The only exception I can think of here is if you are reaching the peak threading improvement for BLAST, then it may make sense to start multiprocessing the workflow with each subprocess allocating the optimal threads.

Q3: For this you will have to turn to the literature or user forums to find actual graphs on how BLAST scales with multithreading.

Q4: If you are spreading your workflow across all your CPUs and maximizing CPU usage within the workflow then of course you are going to be cutting into performance once you start performing other computer processes. It is best practice, in my experience, to leave 1 to 2 CPU cores free during these analyses if you aren't using a HPC so that you don't interfere with system processes and potentially crash your system. With respect to many tabs, you may run into RAM issues before CPU problems.

Q5: No clue, I simply use pool so that I can scale in whichever way I choose. Most scripts I've reviewed do the same.

Q6: Again, if at all possible, leave the threading to the program that's been deployed around the globe and used for a couple decades haha. Multithreading in BLAST is tailored toward the operation, multiprocessing by you is opening a blackbox of different things that can affect performance. Unless you're at the asymptote of performance increase v threads, then prefer BLAST's implementation of multithreading over multiprocessing in my opinion.

0
Entering edit mode

Hi Xonq,

Thanks for your reply, managed to cover every question! I agree its probably not absolutely necessary here, but I'm quite keen to get more familiar with Python's parallelisation libraries so thought it would be a good opportunity :P Can see how it goes, definitely given some food for thought!