Nextflow output from different processes mixed up as input in later processes
1
0
Entering edit mode
17 months ago
Eliveri ▴ 350

I wrote a Nextflow workflow DSL2 with the output of some processes used in multiple processes later in the workflow.

The last process is to print out a summary of the read counts after trimming and after the last step. I ran the pipeline for 2 pairs of fastq samples. Unfortunately, I found that the final results were mixed up between the 2 samples pairs I ran. So the trim_ch input was from one sample pair and the concat_ch input was from another sample pair in one process run.

Now I am worried that the output of processes/channels are getting mixed up unbeknownst to me. The into and from have been removed from DSL2, so I am not sure if I am missing something for branching processes.

The code (some of it is shortened/omitted as less relevant to question).

//trimmomatic read trimming
process TRIM {

    tag "trim ${pair_id}"   

    publishDir "${params.outdir}/$pair_id/trim_results"

    input:
    tuple val(pair_id), path(reads) 

    output:
    tuple val(pair_id), path("trimmed_${pair_id}_R{1,2}_{paired,unpaired}.fastq.gz")

    script:
    """
    """
}


//bwa alignment
process BWA_PF {

    tag "align-pf ${pair_id}f"

    publishDir "${params.outdir}/$pair_id/bwa_pf_results"

    input:
    tuple val(pair_id), path(reads)
    path index

    output:
    tuple val(pair_id), path("${pair_id}_pf_mapped.{bam,bam.bai}")

    script:
    """
    """
}

//filter mapped pf reads
process FILTER_PF {

    tag "filter ${pair_id}"

    publishDir "${params.outdir}/$pair_id/filter_results"

    input:
    tuple val(pair_id), path(reads)

    output:
    tuple val(pair_id), 
    path("${pair_id}_pf_R{1,2}.fastq.gz")

    script:
    """
    """
}

//filter mapped non pf reads
process FILTER_NON_PF {

    tag "filter ${pair_id}"

    publishDir "${params.outdir}/$pair_id/filter_results"

    input:
    tuple val(pair_id), path(reads)

    output:
    tuple val(pair_id), 
    path("${pair_id}_non_pf_R{1,2}.fastq.gz")

    script:
    """
    """
}

//bwa alignment
process BWA_PF_HUMAN {

    tag "align-pf_human ${pair_id}"

    publishDir "${params.outdir}/$pair_id/bwa_pf_human_results"

    input:
    tuple val(pair_id), path(reads)
    path index

    output:
    tuple val(pair_id), path("${pair_id}_pf_human_mapped.{bam,bam.bai}")

    script:
    """
    """
}

//bwa alignment
process PROGRAM{

    tag "program ${pair_id}"

    publishDir "${params.outdir}/$pair_id/program_results"

    input:
    tuple val(pair_id), path(reads)
    path chroms

    output:
    tuple val(pair_id), 
    path("${pair_id}}")

    script:
    """
    """
}

//concatenate pf and non_human reads
process CONCAT{

    tag "concat ${pair_id}"

    publishDir "${params.outdir}/$pair_id"

    input:
    tuple val(pair_id), path(program_reads)
    tuple val(pair_id), path(pf_reads)

    output:
    tuple val(pair_id), path("${pair_id}_non_human_R{1,2}.fastq.gz")

    script:
    """
    """
}

//summary
process SUMMARY{

    tag "summary ${pair_id}"

    publishDir "${params.outdir}/$pair_id"

    input:
    tuple val(pair_id), path(trim_reads)
    tuple val(pair_id), path(non_human_reads)

    output:
    file("summary_${pair_id}.csv")

    script:
    """
    """
}

workflow {
    Channel
        .fromFilePairs(params.reads, checkIfExists: true)
        .set {read_pairs_ch}

    // trim reads
    trim_ch = TRIM(read_pairs_ch)

    // fastqc report (optional)
    // fastqc_ch = FASTQC(trim_ch)

    // map to pf genome
    bwa_pf_ch = BWA_PF(trim_ch, params.pf_index)

    // filter mapped reads
    filter_pf_ch = FILTER_PF(bwa_pf_ch)
    filter_non_pf_ch = FILTER_NON_PF(bwa_pf_ch)

    // map to pf and human genome
    bwa_pf_human_ch = BWA_PF_HUMAN(filter_non_pf_ch, params.pf_human_index)

    // filter out human chromosome reads using eludicator
    program_human_ch = PROGRAM(bwa_pf_human_ch, params.chroms)

    // concatenate non human reads
    concat_ch = CONCAT(program_human_ch,filter_pf_ch)

    // summarize
    summary_ch = SUMMARY(trim_ch,concat_ch)
}
DSL2 process Nextflow • 1.4k views
ADD COMMENT
3
Entering edit mode
17 months ago

I'm not sure I understand, but to group all your reads with the same ID for concat . I would do something like

// filter mapped reads
filter_a_ch = FILTER_a(bwa_a_ch)
filter_non_a_ch = FILTER_NON_a(bwa_a_ch)

input_for_concat = filter_a_ch.combine(filter_non_a_ch).
   filter{T->T[0].equals(T[2])}. // same ID
   map{T->[T[0],T[1],T[3]]} // ID , FQ_A_CH , FQ_NON_A_CH

on a side note:

you don't need publishDir "${params.outdir}/$pair_id" for each process you don't need #!/usr/bin/env bash as bash is the default

if you know you're working with paired data, you'd better use

tuple val(pair_id),  path("${pair_id}_non_R1.fastq.gz"), path("${pair_id}_non_R2.fastq.gz")
ADD COMMENT
1
Entering edit mode

Thank you so much!

if you know you're working with paired data, you'd better use

tuple val(pair_id), path("${pair_id}_non_R1.fastq.gz"), path("${pair_id}_non_R2.fastq.gz")

May I ask if it is absolutely necessary to separate the paired file paths? Or is it more a matter of best practice?

I haven't done so with my workflow and so far there wasn't any issues, should I be worried?

ADD REPLY
1
Entering edit mode

Thank you. I also found that .join could be use if there is a common index.

ADD REPLY

Login before adding your answer.

Traffic: 1983 users visited in the last hour
Help About
FAQ
Access RSS
API
Stats

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.

Powered by the version 2.3.6