Nextflow set tuple issue
18 months ago
JoeDoasi ▴ 10

Hello,

I recently started using Nextflow and I always get this error:

WARN: Input tuple does not match input set cardinality declared by process markDuplicates -- offending value:

The code is as follows:

Channel
    .fromPath(params.samples)
    .splitCsv()
    .map { row ->
        def sampleID = row[0]
        def tag = row[1]

    }
    .set { allSamples_ch }

process align {

    publishDir "${params.out}/aligned_reads", mode: 'copy'

    input:
    set val(sampleID), val(tag), file(read1), file(read2) from allSamples_ch

    output:
    set val(sampleID), val(tag), file("${sampleID}_${tag}.bam") into reads_ch

    script:
    readGroup = "@RG\\tID:${sampleID}_${tag}\\tLB:${sampleID}_${tag}\\tPL:${params.pl}\\tSM:${sampleID}_${tag}"
    """
    $BWA mem -M -t 24 \
        -R \"${readGroup}\" \
        $ref \
        ${read1} \
        ${read2} | \
        $SAMTOOLS sort -@ 24 -o ${sampleID}_${tag}.bam -
    """
}

process markDuplicates {

    publishDir "${params.out}/dedup_sorted", mode: 'copy'

    input:
    set val(sampleID), val(tag), file(aligned_reads) from reads_ch.collect()

    output:
    set val(sampleID), val(tag), file("${sampleID}_sorted_dedup_${tag}.bam") into bam_for_variant_calling, sorted_dedup_ch_for_metrics, bam_for_indexing
    set val(sampleID), val(tag), file("${sampleID}_dedup_metrics_${tag}.txt") into dedup_qc_ch

    script:
    """
    $PICARD MarkDuplicates \
        I=${sampleID}_${tag}.bam \
        O=${sampleID}_sorted_dedup_${tag}.bam \
        M=${sampleID}_dedup_metrics_${tag}.txt \
        TMP_DIR=${params.tmpdir}/${workflow.runName}/${sampleID}
    """
}


The align process was successful, but I couldn't fix the markDuplicates step.

I would really appreciate your help, folks!

Regards

Joe

My idea was to make the next process wait until the running process finishes.

Initially I didn't use .collect().

So I have 6 processes to apply to trio samples (normal, abnormal, abnormal) from a number of cases:

align

markduplicates

index

stats

mutect

filterCalls


So at the Mutect stage, I need all three samples of a trio to be ready. I couldn't figure out how to do that.

You will probably need to rework the channel of duplicate-marked BAM files with .map{} (and other operators) so that the samples of each trio share a common grouping key before you pass them to Mutect.
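As a rough sketch of that regrouping, assuming (hypothetically) that each sample ID has the form `<caseID>_<member>` so the case can be recovered from it; the sample IDs, tags and file names below are made up for illustration, and in the real pipeline the channel would be the output of markDuplicates:

```groovy
// Hypothetical trio; in practice this would be e.g. bam_for_variant_calling
Channel
    .from(
        ['case1_N',  'normal',   'case1_N.bam'],
        ['case1_T1', 'abnormal', 'case1_T1.bam'],
        ['case1_T2', 'abnormal', 'case1_T2.bam']
    )
    .map { sampleID, tag, bam ->
        // Assumed naming convention: everything before the first '_' is the case ID
        def caseID = sampleID.tokenize('_')[0]
        [caseID, sampleID, tag, bam]
    }
    // Group on the first element (caseID); size: 3 lets a group be emitted
    // as soon as all three members of the trio have arrived
    .groupTuple(size: 3)
    .view()
```

Each emitted item should then hold one case ID followed by lists of the three sample IDs, tags and BAM files, which a Mutect process can take as a single input tuple. If the case ID is available as its own column in the sample sheet, carrying it through the tuples is more robust than parsing it out of the sample ID.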

Seems to be working now.

Hopefully no issues down the road!

Thanks

18 months ago
Barry Digby ★ 1.0k

Hi Joe,

When you ask MarkDuplicates to .collect() the inputs you lose the inherent structure of the reads_ch tuple.

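reads_ch has the structure [sample_id, tag, bam], but when you run .collect() it becomes [sample_id, tag, bam, sample_id, tag, bam, ..., sample_id, tag, bam]: a single flat list containing all sample_ids, tags and BAM files. That list no longer matches the three-element set declared as the process input, hence the cardinality warning.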
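A small standalone script can make the difference visible. This is only a sketch with made-up sample IDs, tags and file names; it uses `Channel.from`, which fits the DSL1 syntax used in the question:

```groovy
// Without collect(): one 3-element tuple is emitted per sample
Channel
    .from(['S1', 'normal', 'S1_normal.bam'], ['S2', 'abnormal', 'S2_abnormal.bam'])
    .view { "per-sample: $it" }

// With collect(): everything is gathered into one flat list, emitted once
Channel
    .from(['S1', 'normal', 'S1_normal.bam'], ['S2', 'abnormal', 'S2_abnormal.bam'])
    .collect()
    .view { "collected: $it" }
```

The first channel should print one line per sample, the second a single line holding all six elements, which is what markDuplicates receives in place of a (sampleID, tag, bam) set.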

My advice would be to omit the .collect() operator, as it is not appropriate for Picard MarkDuplicates, which works one sample at a time.

Once you get rid of .collect(), Nextflow will run MarkDuplicates in parallel for each sample in reads_ch, so don't worry about trying to speed it up. Each task starts only after align has emitted that sample's tuple, so there is no need to force the process to wait.
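For reference, a trimmed-down sketch of the process with the plain channel as input. It assumes `$PICARD` and `params.out` are defined as in your script, and it keeps only one output channel and the core Picard arguments for brevity:

```groovy
process markDuplicates {

    publishDir "${params.out}/dedup_sorted", mode: 'copy'

    input:
    // One (sampleID, tag, bam) tuple per task; no collect()
    set val(sampleID), val(tag), file(aligned_reads) from reads_ch

    output:
    set val(sampleID), val(tag), file("${sampleID}_sorted_dedup_${tag}.bam") into bam_for_variant_calling

    script:
    """
    $PICARD MarkDuplicates \
        I=${aligned_reads} \
        O=${sampleID}_sorted_dedup_${tag}.bam \
        M=${sampleID}_dedup_metrics_${tag}.txt
    """
}
```

Using `${aligned_reads}` for `I=` is a small design choice: it refers to whatever file was actually staged into the task, so the command keeps working if the naming of the align output changes.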

P.S. Are you sure your aligned BAMs are correct? I don't see the reference genome and BWA indices staged as inputs for the alignment process (or did you omit them for the sake of the post?).
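If they are not staged yet, one common pattern looks roughly like the sketch below. It assumes a hypothetical `params.ref` pointing at the FASTA, with the BWA index files sitting next to it under the same prefix, and it leaves out the read group option for brevity:

```groovy
// Assumption: params.ref is the FASTA path; index files share its prefix
ref_ch   = Channel.value(file(params.ref))
// collect() is appropriate here: every task needs the full set of index files
index_ch = Channel.fromPath("${params.ref}.*").collect()

process align {

    input:
    set val(sampleID), val(tag), file(read1), file(read2) from allSamples_ch
    file(ref) from ref_ch
    file(index) from index_ch

    output:
    set val(sampleID), val(tag), file("${sampleID}_${tag}.bam") into reads_ch

    script:
    """
    $BWA mem -M -t 24 ${ref} ${read1} ${read2} | \
        $SAMTOOLS sort -@ 24 -o ${sampleID}_${tag}.bam -
    """
}
```

Both reference channels are value channels, so they can be read by every align task without being used up.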

