Nextflow: split a FASTA file into 5 parts to enable parallel processing
4 months ago
neng ▴ 50
workflow {
    SplitFasta()
    //
    split_pair_ch = SplitFasta.out.split_files
        | map { file -> tuple(file.baseName, file) }
        | view()
    // parallel execution
    InterProScan(split_pair_ch)
    MergeResults(InterProScan.out.ipr_results.collect().sort())
}

I have a Nextflow workflow where I split a FASTA file into 5 parts to enable parallel processing. The relevant part of the workflow is shown above. Currently, the split_pair_ch channel emits a single item that looks like this:

[[a, b, c, d, e], [a1, b1, c1, d1]]

But what I want is to restructure the channel to look like this:

[[a, a1], [b, b1], [c, c1], [d, d1]]

The goal is to enable InterProScan to process corresponding split files in parallel. Is there a built-in or efficient way in Nextflow to achieve this kind of channel transformation?

Thanks in advance for your help!

process SplitFasta {
    tag {"split fasta with seqkit"}
    publishDir (
        path: "${params.outdir}",
        mode: 'copy'
    )

    input:

    output:
        tuple val(path.baseName), path("split/${baseName}.fasta"), emit: split_files

    shell:
    '''
    # create split dir
    mkdir -p split
    # seqkit
    seqkit split -p !{params.chunks} \
    -O split \
    --force \
    !{params.outdir}/test_transfeat_pep.fasta
    '''

}

process InterProScan {

    tag {"InterProScan chunk ${baseName}"}
    publishDir (
        path: "${params.outdir}",
        mode: 'copy'
    )

    input:
        tuple val(baseName), path(chunk_fasta)

    output:
        path "${baseName}.tsv", emit: ipr_results

    shell:
    '''
    $interproscan \
    -i !{chunk_fasta} \
    -cpu 16 \
    -f tsv \
    -b !{params.outdir}/08_Interproscan/!{baseName} \
    -appl Pfam,Gene3D,SMART,CDD,PANTHER \
    -dp \
    -goterms 
    '''

}
OK, I'm trying to achieve this in a shell script first and then apply it in Nextflow. I've tried the flatMap, transpose, and each methods in Nextflow, but none of them seem to work.


What I expected:

[part_001, /path/to/split/part_001.fasta]
[part_002, /path/to/split/part_002.fasta]
...

Actual data stream:

[[part_001, part_002, part_003], [file1.fasta, file2.fasta, file3.fasta]]

No clue about Nextflow, but do make sure that the split files are of equal size; otherwise you might gain little from the splitting, as one large file can hold back the finishing of the whole run (speaking as an InterProScan expert user ;) ).


I don't understand what a "pair" is here. You should post a snippet of SplitFasta().


Agree - for this kind of simple stuff I wouldn't use a tuple, but just a simple path to the split files.


I don't understand how your SplitFasta works. The output should collect the files under "split". It should be something like:

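process SplitFasta {
    input:
        path(in_fasta)
    output:
        // base name of the input FASTA, plus every file seqkit wrote under split/
        tuple val(in_fasta.baseName), path("split/*"), emit: split_files
    shell:
    '''
    # create split dir
    mkdir -p split
    # split with seqkit; in a shell: block, Nextflow variables use !{...}
    seqkit split -p !{params.chunks} \
    -O split \
    --force \
    !{in_fasta}
    '''
}

Grab the output file(s) using path("split/*"), i.e. a glob on the directory that seqkit writes to.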

You should never use the publishDir as a temporary location.

Declare the FASTA in the 'input:' block instead of hard-coding "!{params.outdir}/test_transfeat_pep.fasta" in the command.
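
To illustrate the point about inputs, here is a minimal sketch of passing a FASTA file into a process through a channel rather than reading it from the publish directory. The parameter name params.fasta and the process name ShowInput are hypothetical and only serve the illustration; the sketch assumes the file exists in the launch directory.

```nextflow
// Hypothetical parameter; point it at your own FASTA file
params.fasta = 'test_transfeat_pep.fasta'

// Hypothetical process that only reports the staged file name
process ShowInput {
    input:
        path(in_fasta)

    output:
        stdout

    script:
    """
    echo "staged as: ${in_fasta}"
    """
}

workflow {
    // Build a channel from the parameter; fail early if the file is missing
    fasta_ch = Channel.fromPath(params.fasta, checkIfExists: true)

    ShowInput(fasta_ch)
    ShowInput.out.view()
}
```

Because the file arrives through a declared input, Nextflow stages it into the task's work directory and the process no longer depends on what happens to be in the output directory.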


Yeah, that's a good solution, and I've tried it. The only problem is that the SplitFasta() process has to finish before split_chunks is created; otherwise it reports an error. I don't know if there is a way to make sure the split_chunks step starts only after the SplitFasta() process has finished.

SplitFasta()
split_files = SplitFasta.out.collect()
// using the output split FASTA files directly
split_chunks = Channel.fromPath("${params.outdir}/split/*.fasta")
    .map { file ->
        def base = file.baseName
        [base, file]
    }
split_chunks.view()

It may be useful to also indicate how you are running this code? Are you supplying a single fasta file name or five pieces?

[[a, a1], [b, b1], [c, c1], [d, d1]]

Are these paired elements different representations e.g. FASTA and metadata?

4 months ago
neng ▴ 50

OK, I finally found the solution and would like to share it with you:

workflow {
    // Step 7: split
    SplitFasta(params.fasta)

    // Format data stream
    split_chunks = SplitFasta.out[0]
        .flatMap { sample, files ->
            files.collect { f ->
                tuple(f.baseName, f)
            }
        }

    split_chunks.view()

    // Parallel processing
    InterProScan(split_chunks)

    // Merge results
    MergeResults(InterProScan.out.collect())
}
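
For anyone who wants to try the reshaping step without running seqkit, here is a minimal sketch with made-up values. The sample name and file paths are placeholders standing in for the SplitFasta output, not real results. It also shows the transpose operator applied to the two-parallel-lists shape from the question, which, as far as I understand the operator, pairs up elements by position.

```nextflow
workflow {
    // Placeholder for SplitFasta.out[0]: one item = [sample name, list of split files]
    split_out = Channel.of(
        ['test_transfeat_pep', [
            file('split/part_001.fasta'),
            file('split/part_002.fasta'),
            file('split/part_003.fasta')
        ]]
    )

    // flatMap emits each element of the returned list as its own channel item,
    // so one [sample, files] item becomes one [baseName, file] tuple per chunk
    split_out
        .flatMap { sample, files ->
            files.collect { f -> tuple(f.baseName, f) }
        }
        .view { pair -> "flatMap: ${pair}" }

    // Placeholder for the shape shown in the question: two parallel lists in one item
    paired_lists = Channel.of(
        [['part_001', 'part_002', 'part_003'],
         ['file1.fasta', 'file2.fasta', 'file3.fasta']]
    )

    // transpose zips the lists element by element into [name, file] items
    paired_lists
        .transpose()
        .view { pair -> "transpose: ${pair}" }
}
```

Either way the downstream process receives one item per chunk, which is what lets Nextflow launch one InterProScan task per split file.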