Question: Map-Reduce in Nextflow
9 months ago by
russhh5.5k
UK, U. Glasgow
russhh5.5k wrote:

Hi. I'm trying to solve a simple map-reduce problem using nextflow.

# imperative pseudocode
# Map
Foreach letter x in (a, b, c): write ${x} to (temporary file) ${x}.txt
# Reduce
Bind together the rows of a.txt, b.txt, c.txt in any order to produce result.txt
# Cleanup
Remove a.txt, b.txt, c.txt and put result.txt into ./results/

I can't find the syntax for reduction in the Nextflow documentation.

My (incorrect) reduction code is the following:

// map_reduce.nf
letters = Channel.from('a', 'b', 'c')                                                               

process map_to_file {
    // I'm fairly certain this does what I want: it writes a to a.txt (in whatever random directory
    // nextflow chooses to run that process in), and similarly for b -> b.txt, c -> c.txt.
    // This 'process' should run three times, once for each letter.
    input:                                                                                          
        val(letter) from letters
    output:                                                                                         
        file "${letter}.txt" into letter_files
    script:
        """                                                                                         
        echo "${letter}" > "${letter}.txt"                                                           
        """                                                                                         
}                                                                                                   

process reduce_files {
    // when we're finished, 'results.txt' in nextflow's working dir should be linked
    // to from results/results.txt                                                      
    publishDir "results"

    // But how do I tell this process that it should run once, and combine all files
    // from the channel `letter_files` into a single output file
    input:                                                                                          
        my_files from letter_files.collect()  // guesswork //                                                      

    output:                                                                                         
        file "results.txt"    

    // wanted: "cat <random_path>/a.txt <other_path>/b.txt <another_path>/c.txt > results.txt"
    script:                                                                                         
        """                                                                                         
        cat ${my_files} > "results.txt"                                                              
        """                                                                                         
}

Hopefully this makes sense; I'm in the realm of unknown-unknowns with Nextflow at the moment.

9 months ago by
pditommaso210
Spain, Barcelona
pditommaso210 wrote:

Multiple text (files) can be reduced to one or more files with collectFile:

// map_reduce.nf
letters = Channel.from('a', 'b', 'c')

process map_to_file {
    // I'm fairly certain this does what I want: it writes a to a.txt (in whatever random directory
    // nextflow chooses to run that process in), and similarly for b -> b.txt, c -> c.txt.
    // This 'process' should run three times, once for each letter.
    input:                                                                                          
        val(letter) from letters
    output:                                                                                         
        file "${letter}.txt" into letter_files
    script:
        """                                                                                         
        echo "${letter}" > "${letter}.txt"                                                           
        """                                                                                         
}  

letter_files
    .collectFile(name:file('results/all.txt'))

https://www.nextflow.io/docs/latest/operator.html#collectfile
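
A minimal sketch of the same operator applied directly to values rather than to files, assuming the `name`, `storeDir` and `newLine` options described on the documentation page linked above. The file name `all.txt` and the closure parameter `merged` are illustrative choices, not taken from the question. It shows that collectFile returns a channel that emits the merged file, so the result can be inspected or passed on:

```nextflow
// collect_file_sketch.nf (illustrative only)
Channel
    .from('a', 'b', 'c')
    // append every value to results/all.txt, one value per line
    .collectFile(name: 'all.txt', storeDir: 'results', newLine: true)
    // the resulting channel emits the merged file once all values have arrived
    .subscribe { merged -> println "Merged file: ${merged}" }
```

Because the map step here only echoes a letter into a file, the intermediate process can be skipped entirely in this variant; keep the process when the per-item work is more than a one-liner.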


Thanks, but I found the operators section of the documentation really difficult to follow. Can letter_files.collectFile() be used as the input to a process? Presumably it would provide a single-emission channel that emits a collection of files; can that collection be turned into a bash string of filenames? I ask because I have several other fan-in processes, some requiring more complicated steps than just binding the files together.

Reply written 9 months ago by russhh
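
On the follow-up question about fan-in processes, here is a minimal sketch, assuming the `from`/`into` syntax used throughout this thread. The `collect()` operator gathers everything a channel emits into a single list, so a process reading from it runs once. Declaring that input with `file` stages the files into the task directory, and interpolating the variable in the script yields their names separated by spaces. Note that `collectFile` differs: it emits the already merged file or files rather than a collection of the inputs.

```nextflow
// map_reduce.nf (sketch)
letters = Channel.from('a', 'b', 'c')

process map_to_file {
    input:
        val letter from letters
    output:
        file "${letter}.txt" into letter_files
    script:
        """
        echo "${letter}" > "${letter}.txt"
        """
}

process reduce_files {
    publishDir "results"

    input:
        // collect() turns the three emissions into one list, so this task runs once
        file my_files from letter_files.collect()
    output:
        file "results.txt"
    script:
        // a list of staged files interpolates as space-separated file names
        """
        cat ${my_files} > results.txt
        """
}
```

The files arrive in the order the map tasks finish, which is not necessarily a, b, c. Since the script body is ordinary bash, `cat` can be replaced by any command that accepts several file names, which covers fan-in steps more involved than concatenation.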
9 months ago by
France/Nantes/Institut du Thorax - INSERM UMR1087
Pierre Lindenbaum131k wrote:

This should work. Trying to remove the temporary files at the end makes things a little harder. I wouldn't do that, because you then cannot really resume a broken workflow (server is down, etc.). Just keep everything and remove the 'work' directory at the end.

letters = Channel.from('a', 'b', 'c')

// Map: write each letter to out.txt and record that file's absolute path in path.txt
process P1 {
    input:
        val L from letters
    output:
        file('path.txt') into (P1out, P2out)
    script:
    """
    echo "${L}" > out.txt
    echo "\${PWD}/out.txt" > path.txt
    """
}

// Reduce: runs once; each path.txt names one out.txt to concatenate
process P2 {
    input:
        val L from P1out.collect()
    output:
        file("concat.txt") into final_out
    script:
    """
    cat ${L.join(" ")} | while read F; do cat "\$F"; done > concat.txt
    """
}

// Cleanup: waits for concat.txt, then deletes every out.txt listed in the path files
process P3 {
    input:
        file ignore from final_out
        val L from P2out.collect()
    script:
    """
    cat ${L.join(" ")} | while read F; do rm "\$F"; done
    """
}

Without deleting the intermediate files:

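letters = Channel.from('a', 'b', 'c')

// Map: one task per letter, each writing its own out.txt
process P1 {
    input:
        val L from letters
    output:
        file('out.txt') into P1out
    script:
    """
    echo "${L}" > out.txt
    """
}

// Reduce: runs once; val passes the collected paths as-is,
// so join(" ") yields the absolute paths of all out.txt files
process P2 {
    input:
        val L from P1out.collect()
    output:
        file("concat.txt") into final_out
    script:
    """
    cat ${L.join(" ")} > concat.txt
    """
}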
Reply written 9 months ago by Pierre Lindenbaum
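
A possible variant of the reply above, as a sketch rather than a tested replacement: every P1 task writes a file called out.txt, so declaring the collected input with `file` under the original names would likely produce a name clash in the task directory. The Nextflow documentation describes name patterns for staged input files, where a `*` in the pattern is replaced by a running index. The pattern `out_*.txt` below is a hypothetical choice for illustration.

```nextflow
letters = Channel.from('a', 'b', 'c')

process P1 {
    input:
        val L from letters
    output:
        file('out.txt') into P1out
    script:
    """
    echo "${L}" > out.txt
    """
}

process P2 {
    input:
        // hypothetical name pattern: the inputs are staged as out_1.txt, out_2.txt, ...
        file 'out_*.txt' from P1out.collect()
    output:
        file("concat.txt") into final_out
    script:
    """
    cat out_*.txt > concat.txt
    """
}
```

Compared with the `val` version, the files are staged into the task directory, so the script refers to local names instead of absolute paths into other work directories.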

Ah, that's brilliant (sorry, removing intermediate files was a misunderstanding on my part).

Reply written 9 months ago by russhh