Hi. I'm trying to solve a simple map-reduce problem using nextflow.
# imperative pseudocode
# Map
Foreach letter x in (a, b, c): write ${x} to (temporary file) ${x}.txt
# Reduce
Bind together the rows of a.txt, b.txt, c.txt in any order to produce result.txt
# Cleanup
Remove a.txt, b.txt, c.txt and put result.txt into ./results/
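To make the pseudocode concrete, here is roughly what I would write in plain bash. This is only a sketch of the intended behaviour, not Nextflow code; the `workdir` variable and the use of `mktemp -d` for the temporary files are my own assumptions for illustration.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical scratch directory standing in for wherever the temporary files live
workdir=$(mktemp -d)

# Map: write each letter to its own temporary file
for x in a b c; do
    echo "$x" > "$workdir/$x.txt"
done

# Reduce: bind the rows of the per-letter files into a single file
cat "$workdir/a.txt" "$workdir/b.txt" "$workdir/c.txt" > "$workdir/result.txt"

# Cleanup: keep only the result, under ./results/
mkdir -p results
mv "$workdir/result.txt" results/
rm -r "$workdir"
```

What I'm after is the Nextflow equivalent of the `cat` step, where the three inputs come from three separate process runs.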
I can't find the syntax for reduction in the Nextflow documentation.
My (incorrect) attempt at the reduction is below:
// map_reduce.nf
letters = Channel.from('a', 'b', 'c')
process map_to_file {
    // I'm fairly certain this does what I want: it writes a to a.txt (in whatever random directory
    // nextflow chooses to run that process in), and similarly for b -> b.txt, c -> c.txt.
    // This 'process' should run three times, once for each letter.
    input:
    val(letter) from letters
    output:
    file "${letter}.txt" into letter_files
    script:
    """
    echo "${letter}" > "${letter}.txt"
    """
}
process reduce_files {
    // When we're finished, 'results.txt' in nextflow's working dir should be linked
    // to from results/results.txt
    publishDir "results"
    // But how do I tell this process that it should run once, and combine all files
    // from the channel `letter_files` into a single output file?
    input:
    my_files from letter_files.collect() // guesswork //
    output:
    file "results.txt"
    // wanted: "cat <random_path>/a.txt <other_path>/b.txt <another_path>/c.txt > results.txt"
    script:
    """
    cat ${my_files} > "results.txt"
    """
}
Hopefully this makes sense; I'm in the realm of unknown unknowns with Nextflow at the moment.
Thanks, but I found the operators section of the documentation really difficult to follow. Can `letter_files.collectFile()` be used as the input to a process? Presumably it would provide a single-emission channel that emits a collection of files. Can that collection of files be turned into a bash string of filenames? The reason I ask is that I have several other fan-in processes, some requiring more complicated steps than just binding the files together.