Map-Reduce in Nextflow
4.2 years ago
russhh 5.7k

Hi. I'm trying to solve a simple map-reduce problem using Nextflow.

# imperative pseudocode
# Map
Foreach letter x in (a, b, c): write ${x} to (temporary file) ${x}.txt
# Reduce
Bind together the rows of a.txt, b.txt, c.txt in any order to produce result.txt
# Cleanup
Remove a.txt, b.txt, c.txt and put result.txt into ./results/

I can't find the syntax for reduction in the Nextflow documentation. My (incorrect) attempt at the reduction is below:

// map_reduce.nf
letters = Channel.from('a', 'b', 'c')                                                               

process map_to_file {
    // I'm fairly certain this does what I want: it writes a to a.txt (in whatever random directory
    // Nextflow chooses to run that process in), and similarly for b -> b.txt, c -> c.txt.
    // This 'process' should run three times, once for each letter
    input:                                                                                          
        val(letter) from letters
    output:                                                                                         
        file "${letter}.txt" into letter_files
    script:
        """                                                                                         
        echo "${letter}" > "${letter}.txt"                                                           
        """                                                                                         
}                                                                                                   

process reduce_files {
    // when we're finished, 'results.txt' in nextflow's working dir should be linked
    // to from results/results.txt                                                      
    publishDir "results"

    // But how do I tell this process that it should run once, and combine all files
    // from the channel `letter_files` into a single output file
    input:                                                                                          
        my_files from letter_files.collect()  // guesswork //                                                      

    output:                                                                                         
        file "results.txt"    

    // wanted: "cat <random_path>/a.txt <other_path>/b.txt <another_path>/c.txt > results.txt"
    script:                                                                                         
        """                                                                                         
        cat ${my_files} > "results.txt"                                                              
        """                                                                                         
}

Hopefully this makes sense; I'm in the realm of unknown unknowns with Nextflow at the moment.

nextflow • 3.0k views
4.2 years ago
pditommaso ▴ 230

Multiple text (files) can be reduced to one or more files with collectFile:

// map_reduce.nf
letters = Channel.from('a', 'b', 'c')                                                               

process map_to_file {
    // Writes each letter to its own file: a -> a.txt, b -> b.txt, c -> c.txt
    // (each in whatever work directory Nextflow chooses for that task).
    // This process runs three times, once for each letter
    input:                                                                                          
        val(letter) from letters
    output:                                                                                         
        file "${letter}.txt" into letter_files
    script:
        """                                                                                         
        echo "${letter}" > "${letter}.txt"                                                           
        """                                                                                         
}  

letter_files
    .collectFile(name:file('results/all.txt'))

https://www.nextflow.io/docs/latest/operator.html#collectfile


Thanks, but I found the operators section of the documentation really difficult to follow. Can letter_files.collectFile() be used as the input to a process? Presumably it would provide a single-emission channel that emits a collection of files; can that collection of files be turned into a bash string of filenames? I ask because I have several other fan-in processes, some requiring more complicated steps than just binding the files together.
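
On the first part of that question, a minimal sketch in the same DSL1 syntax used in this thread (newer Nextflow releases default to DSL2 and would need this ported). It assumes that collectFile emits the single merged file once letter_files has completed, so the resulting channel can be consumed like any other file channel. The process name count_lines and the file names all.txt and count.txt are hypothetical, chosen only for illustration.

```groovy
// map_then_collectfile.nf (DSL1 syntax; file and process names are illustrative)
letters = Channel.from('a', 'b', 'c')

process map_to_file {
    input:
        val letter from letters
    output:
        file "${letter}.txt" into letter_files
    script:
        """
        echo "${letter}" > "${letter}.txt"
        """
}

// collectFile merges everything it receives into one file and emits
// that file once the source channel has completed
merged = letter_files.collectFile(name: 'all.txt')

// hypothetical downstream step: runs once, on the merged file
process count_lines {
    input:
        file merged_file from merged
    output:
        file 'count.txt' into line_count
    script:
        """
        wc -l < ${merged_file} > count.txt
        """
}

// print the content of count.txt
line_count.view { it.text }
```

Note that collectFile emits one merged file rather than a collection of files, so it suits plain concatenation; for fan-in steps that need the individual files, collect() is the more natural operator.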

4.2 years ago

This should work. Trying to remove the temporary files at the end makes things a little harder. I wouldn't do that, because you then cannot really resume a broken workflow (server is down, etc.). Just keep everything and remove the 'work' directory at the end.

letters = Channel.from('a', 'b', 'c')         

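process P1 {
    input:
        val L from letters
    output:
        // emit path.txt into two channels: one for concatenation, one for cleanup
        file('path.txt') into (P1out,P2out)
    script:
    """
    echo "${L}" > out.txt

    # record the absolute path of out.txt so later processes can find it
    echo "\${PWD}/out.txt" > path.txt
    """
}

process P2 {
    input:
        // collect() gathers all path.txt files into one list, so P2 runs once
        val L from P1out.collect()
    output:
        file("concat.txt") into final_out
    script:
    """
    # each path.txt holds one path; cat the file that path points to
    cat ${L.join(" ")} | while read F; do cat "\$F" ; done > concat.txt
    """
}

process P3 {
    input:
        // waiting on final_out means cleanup starts only after P2 has finished
        file ignore from final_out
        val L from P2out.collect()
    script:
    """
    cat ${L.join(" ")} | while read F; do rm "\$F" ; done
    """
}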

Without deleting the intermediate files:

letters = Channel.from('a', 'b', 'c')         

process P1 {
    input:
        val L from letters
    output:
        file('out.txt') into P1out
    script:
    """
    echo "${L}" > out.txt
    """
    }                                                     

process P2 {
    input:
        val L from P1out.collect()
    output:
        file("concat.txt") into final_out
    script:
     """
    cat ${L.join(" ")}  > concat.txt
     """
    }
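
A variation on the code above, as a minimal sketch in the same DSL1 syntax: declaring the collected channel as a file input rather than val asks Nextflow to stage the files into the task directory, and the input variable then expands to a space-separated list of file names in the script. Since every P1 task above writes a file called out.txt, staging them together would cause a name clash, so this sketch assumes each file is named after its letter instead. The variable name txts and the publishDir target are illustrative.

```groovy
// DSL1 syntax; a variation on the P1/P2 example, not a drop-in replacement
letters = Channel.from('a', 'b', 'c')

process P1 {
    input:
        val L from letters
    output:
        // unique file names avoid a collision when the files are staged together
        file "${L}.txt" into P1out
    script:
    """
    echo "${L}" > "${L}.txt"
    """
}

process P2 {
    // link concat.txt into ./results once the task has finished
    publishDir 'results'

    input:
        // collect() emits one list of all files, so P2 runs once
        file txts from P1out.collect()
    output:
        file 'concat.txt' into final_out
    script:
    """
    cat ${txts} > concat.txt
    """
}
```

The same pattern should carry over to other fan-in steps: only the command in the script block changes, while the input declaration stays the same.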

Ah, that's brilliant (sorry, removing intermediate files was a misunderstanding on my part).
