Nextflow working group

2025-10-30

Frédéric Jarlier, Julien Roméjon, Philippe Hupé, Laurent Jourdren, Quentin Duvert

Table of contents

  • Introduction

  • Channels

  • Operators

  • Process

  • Configuration & Profile

  • Error Management & Debugging

  • Metrics

Introduction

  • Nextflow is a dataflow programming language that helps build complex workflows;


  • The idea is to chain multiple tasks, like piping commands in a *nix system;


  • Nextflow runs on Java and extensively uses Groovy;


  • The elementary bricks of Nextflow are processes, channels, operators and workflows (since DSL2).

Test environment

  • Copy the following command lines without the $ in your terminal:
$ module load nextflow
$ echo "println \"Hello\"" > hello.nf
$ nextflow run hello.nf
N E X T F L O W  ~  version 22.10.7
Launching `hello.nf` [trusting_bohr] DSL2 - revision: da3698fdd8
Hello

Note: The Nextflow version may differ.

Main concepts

[Diagram: Channel → Operator → Process → Directive → Workflow]
  • Operators work on Channels

  • Directives work on Processes

  • Workflows chain Channels

Channels

  • Receiving a message is synchronous (blocking);

  • Sending a message is asynchronous (non-blocking, non-deterministic);

  • Execution order in Nextflow depends on channel availability;

  • Advantages:

    • masking communication latency with computation.
  • Drawbacks:

    • difficult to debug;
    • difficult to develop.

Channels

  • Channels are of 2 types:
    • Queue channels:
      • FIFO queues whose items are consumed once; a process waits (blocks) on all of its input queue channels (DataFlow channels).
    • Value channels:
      • hold a single value that can be consumed repeatedly without blocking the other input queue channels (DataFlow values); see the sketch below.
  • Channel factories:
    // Value channels

    Channel.value('toto')
    Channel.value([1,2,3])


    // Queue channels

    Channel.of(1,2,3)
    Channel.of([1,2,3])
    Channel.fromPath("/input/*.gz")
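
A minimal sketch (hypothetical names) of the difference when feeding a process: the value channel is re-read by every task, while the queue channel supplies one item per task.

process echoPair {
    input:
    val ref       // from a value channel: reused by every task
    val sample    // from a queue channel: one item per task

    output:
    stdout

    script:
    """
    echo "$sample vs $ref"
    """
}

workflow {
    echoPair( Channel.value('genome.fa'), Channel.of('s1', 's2', 's3') )
    echoPair.out.view()   // three tasks: each sample paired with the same reference
}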


🗎 example1.nf

    // Queue Channels

    ch1 = Channel.of( 1, 3, 5, 7, 9 )
    ch2 = Channel.of( [1, 3, 5, 7, 9] )
    ch3 = Channel.of( [1, 2], [5, 6], [7, 9] )

    Channel.fromList([ 'a', 'b', 'c', 'd'] )
           .view()


    files       = Channel.fromPath( 'data/*.fa' )
    moreFiles   = Channel.fromPath( 'data/**/*.fa' )
    pairFiles   = Channel.fromPath( 'data/file_{1,2}' )

** for directory recursion

Operators

  • Operators manipulate channels;

  • Operators return queue channels or value channels;

  • With operators you can:

    • filter (e.g. filter, first, unique…)
    • combine (e.g. map, groupTuple, collect, flatten…)
    • process text (e.g. splitCsv, splitFasta…)
    • fork (e.g. multiMap, branch…)
    • aggregate (count, min, max…)
    • debug (view, dump…)

🔗 https://www.nextflow.io/docs/latest/reference/operator.html

Operators

  • collect
Channel.of( 1, 2, 3, 4 )
    .collect()
    .view()
  • groupTuple

Channel.of( [1, 'A'], [1, 'B'], [2, 'C'], [1, 'C'], [2, 'A'] )
       .groupTuple()
       .view()
  • join
left  = Channel.of( ['X', 1], ['Y', 2], ['Z', 3], ['P', 7] )
right = Channel.of( ['Z', 6], ['Y', 5], ['X', 4] )

left.join(right).view()


🗎 example2.nf

Operators

  • splitCsv/splitFasta
Channel.of( '10,20,30\n70,80,90' )
       .splitCsv()
       .view()
Channel.fromPath('misc/sample.fa')
       .splitFasta( by: 10)
       .view()
  • min/max/sum
Channel.of( 8, 6, 2, 5)
       .min()
       .view() 
Channel.of( 8, 6, 2, 5)
       .sum()
       .view() 


🗎 example2.nf

Operator - Closure

  • Closures are lambdas (anonymous functions);

  • Closures are blocks of code that can be passed as an argument to an operator;

  • Closures are written between curly brackets: { put the code here };

  • By default a closure has a single implicit argument named it (warning: only the name it is accepted for the implicit argument);

  • The map operator applies the closure to each item emitted by the source channel; closures may also declare named parameters (see the sketch below).

Channel.of( 1, 2, 3, 4, 5 )
       .map { a -> a * a }
       .view()
Channel.of( 1, 2, 3, 4, 5 )
       .map { it * it }
       .view()
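
Naming the closure parameters instead of using the implicit it often reads better, especially for tuple items; a short sketch:

Channel.of( ['s1', 1], ['s2', 2] )
       .map { id, n -> [id, n * 2] }   // the tuple is destructured into id and n
       .view()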

🗎 example2.nf

Operators - Example: Merge


    // Define two channels of different sizes
    ch_small = Channel.fromList([1, 2, 3])
    ch_large = Channel.fromList(['A', 'B', 'C', 'D'])

    // Merge the channels together
    ch_small
        .merge(ch_large)
        .view { pair -> "Merged pair: $pair" }


Merged pair: [1, A]
Merged pair: [2, B]
Merged pair: [3, C]

Warning: when you need to merge two or more channels:

  • Verify that the channels have the same size;
  • If you are not sure of their sizes, use the collect or join operators instead.

Processes

  • Skeleton
process < name > {

    [directive]

    input:
    < process inputs >

    output:
    < process outputs >, emit: name

    script:
    < user script to be executed >

}

🔗 https://www.nextflow.io/docs/latest/reference/syntax.html#process

  • Only one script section per process definition;
  • The script is executed by a Bash interpreter by default;
  • The script runs in the host environment.

process doOtherThing {

    input:
    val MAX

    script:
    """
    DB="path_to_data_base"

    blast -db \${DB} -query query.fa -outfmt 6 > blast_result
    cat blast_result | head -n ${MAX} | cut -f2 > top_hits
    blastdbcmd -db \${DB} -entry_batch top_hits > sequences
    """
}

\${DB} or \$DB is a Bash variable
${MAX} or $MAX is a Nextflow variable

Processes

  • Processes contain the commands to execute;
  • the input types can be:

    • val
    • path
    • tuple

      (less used)

    • env
    • stdin
    • each [input collection]
  • the output types can be:

    • val
    • path
    • tuple

      (less used)

    • env
    • stdout
    • eval
  • A tuple input gathers multiple values in a single channel item;

  • If the input channels have different sizes, the process stops as soon as the first one is exhausted.


🔗 https://www.nextflow.io/docs/latest/process.html#inputs
🔗 https://www.nextflow.io/docs/latest/process.html#outputs
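
A minimal sketch (hypothetical process and file names) combining tuple input and output:

process compress {
    input:
    tuple val(sample), path(reads)    // one channel item carries both values

    output:
    tuple val(sample), path("${sample}.fq.gz"), emit: gz

    script:
    """
    gzip -c $reads > ${sample}.fq.gz
    """
}

workflow {
    // assuming s1.fq exists in the launch directory
    compress( Channel.of( ['s1', file('s1.fq')] ) )
}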

Process - directives

  • Directives are optional and are defined at the beginning of a process;
  • They adjust the context and configuration (e.g. required CPUs, memory, labels, tags, queues…).
process align {

        cpus 1
        memory "200MB"
        // executor "local"
        executor "slurm"
        // queue "diag"
        beforeScript "source fun.sh"
        label "exemple_dir1"
        debug true
        //tag appears in the .nextflow.log
        tag "my_job" 

        input: [...]
        output: [...]
        script: [...]
}

🔗 https://www.nextflow.io/docs/latest/process.html#directives

Process - publishDir directive

  • The publishDir directive organizes your results and output files;

  • Don’t use a published directory as the input of a process;

  • If the publishDir directive is given no option, all the outputs of the process are published;

  • The mode: 'link' option allows using hard links instead of copying files.


process foo {
    publishDir './data/chunks', mode: 'copy', overwrite: false

    output:
    path 'chunk_*'

    script:
    """
    printf 'Hola' | split -b 1 - chunk_
    """
}


🗎 example3.nf

Process - Interpreter

  • If the script does not start with a shebang (#!), Bash is used as the default interpreter.
process perlTask {

    script: 
    """
    #!/usr/bin/perl
    print 'Hi there!' . '\n';
    """
}
process bashTask {

    script: 
    """
    echo "Hello from $SHELL!";
    """
}
process pythonTask {

    script:
    """
    #!/usr/bin/env python3
    X = 'Hello'
    Y = 'world'
    print("%s - %s", X,Y)
    """
}
process bashTask2 {

    script: 
    """
    #!/bin/bash
    echo "Hello from $SHELL!";
    """
}

🗎 example4.nf

Process - Conditional execution

  • Use if/else statements;
  • The switch/case statements are deprecated and not allowed in the strict syntax.
mode = 'tcoffee'

process align {
    input:
    path sequences
    script:
    if( mode == 'tcoffee' )
        """
        t_coffee -in $sequences > out_file
        """
    else if( mode == 'mafft' )
        """
        mafft --anysymbol --parttree --quiet $sequences > out_file
        """
    else
        error "Invalid alignment mode: ${mode}"
}


🗎 example5.nf

Process - Dry run

  • Nextflow allows prototyping a workflow without real data;

  • You just need to add a stub: section to your process:

process INDEX {
  input:
    path transcriptome
  output:
    path 'index'
  script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """
  stub:
    """
    mkdir index
    touch index/seq.bin
    touch index/info.json
    touch index/refseq.bin
    """
}
  • Execute the pipeline with option -stub-run:
$ nextflow run -stub-run main.nf

Workflow

  • workflow is a function specialized in manipulating processes, channels and operators;
  • a workflow can be anonymous (main) or named (sub-workflow);
  • the take: and emit: sections are optional in named workflows;
workflow my_work {
    take: //optional
    main:
    emit: //optional   
}
  • main workflow
workflow  {
    main:
    publish: //strict-syntax (nextflow >= 25.10)
}
  • An anonymous workflow is mandatory to execute a pipeline with Nextflow;
  • The publish: section of a workflow is an ongoing development (planned for version 25.10);
  • For now, publish your data with the publishDir directive of the process.

Workflow - Subworkflow

  • Named workflows are subworkflows:
workflow subw { println "hello" }
workflow { subw() }
  • Emit a Channel from a subworkflow:
workflow subw {
    main: message = "hello"
    emit: message
}
  • How to pass output:
workflow subw {
    main: message = "hello"
    emit: message
}

workflow { 
    main:
        subw() //call sub-workflow
        subw.out.message.view() // view the emit output message sub-workflow
}

Workflow - Include

  • Module inclusion:
include { pythonProcess } from './modules/pythonProcess/main.nf'
workflow {
    pythonProcess()
}
  • Multiple inclusion:
include { pythonProcess1; pythonProcess2  } from './modules/pythonProcess/mains.nf'
workflow {
    pythonProcess1()
    pythonProcess2()
}
  • Module aliases:
include { task as task_alpha } from './some/module' // allows importing the same process multiple times

🗎 example4.nf

Configuration

  • When launched, Nextflow looks for configuration files in multiple locations (from low to high priority):

    1. Parameters defined in pipeline scripts (e.g. main.nf)
    2. The config file $HOME/.nextflow/config
    3. The config file nextflow.config in the project directory
    4. The config file nextflow.config in the launch directory
    5. Config file specified using the -c <config-file> option
    6. Parameters specified in a params file (-params-file option)
    7. Parameters specified on the command line (--something value)


  • Params passed on the command line override the params defined in nextflow.config.
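
A minimal sketch of this precedence (hypothetical param greeting):

// main.nf (lowest priority)
params.greeting = 'from script'
println "greeting = ${params.greeting}"

// nextflow.config in the launch directory (higher priority)
params.greeting = 'from config'

$ nextflow run main.nf --greeting 'from CLI'   # the command line wins
greeting = from CLI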

🔗 https://nextflow.io/docs/latest/reference/config.html

Configuration

  • The nextflow.config contains:
    • general configuration
    • profile configurations
includeConfig 'conf/base.config'

// Profiles
profiles {
  cluster {
    includeConfig 'conf/cluster.config'
  }
}
  • specific configurations can be in other files
    • base.config
    • cluster.config
    • process.config
process {
  executor = 'slurm'
  queue = params.queue ?: null
}

Configuration - Scope

  • Configuration is related to a scope

main.nf

process process1 {
    label "someLabel"
    script:   
        def go = params.flag ?: false
        // ...
        """
        bash script.sh # using $myvar
        """
    }
process process2 {
    label "someLabel"
    script:
        // ...
}
  • nextflow.config
env.myvar = "${baseDir}/somePath/boum.txt"
params.flag = true
  • process.config
process {
        executor = "pbs"
        withLabel: "someLabel" {
                cpus = 1
                memory = "200MB"
        }
}
  • 3 scopes here: env, params, process

nextflow run main.nf --flag true

  • Constants (Global Namespaces)

    • launchDir: where you execute the pipeline
    • projectDir: where the main script is

🔗 https://nextflow.io/docs/latest/reference/stdlib-namespaces.html

Configuration - Profile

  • A configuration that can be chosen at execution time;

  • Configuration files can contain profile definitions;

  • Select a configuration profile with the -profile option of the Nextflow command line;

  • The standard profile is selected when no profile is given;

  • Multiple profiles can be selected:

$ nextflow run my_script.nf -profile standard,cluster
profiles {
    standard {
        process.executor = 'local'
    }
    cluster {
        process.executor = 'slurm'
        process.queue = 'long'
        process.memory = '10GB'
    }
}

Error Management

  • Nextflow allows defining several error management strategies:
process example {

        errorStrategy "terminate" // kill all submitted jobs (default)
        errorStrategy "finish" // let spawned job finish
        errorStrategy "retry" // requeue the job
        errorStrategy "ignore" // go to the next process
        //or with closure
        errorStrategy { sleep(Math.pow(2, task.attempt) * 30 as long); return 'retry' }

        maxRetries 5 // when errorStrategy = retry, single process
        maxErrors 2 // total number of errors (all process instances)
        validExitStatus 0,1 // Great power == Great responsibility

        memory {2.GB * task.attempt}

script: [...]
}
  • task.attempt is only available in a process scope;
  • Retry should not be a strategy!
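
A common pattern (a sketch, not from the slides; exit codes depend on your scheduler): retry only on likely resource kills while escalating memory.

process align {
        // 137: SIGKILL (often out-of-memory); adjust the list to your scheduler
        errorStrategy { task.exitStatus in [137, 140] ? 'retry' : 'terminate' }
        maxRetries 3
        memory { 2.GB * task.attempt }

        script:
        """
        your command here
        """
}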

Debugging

  • Use Channel.view() to display the content of a channel;
  • Use Channel.dump(tag: 'foo'), which stays silent unless you execute the pipeline with the following command to see the channels dumped with tag foo:
$ nextflow run mypipe.nf -dump-channels foo
  • Nextflow creates one work/{hash}/{hash} folder per task;
  • Inputs are symlinks (don’t create symlinks in a process and don’t modify inputs);
  • Outputs are physical files;
  • When a process goes wrong, edit .command.sh, then resume the pipeline with the -resume option.
work
`-- a3
    `-- 5efc6e8b5c1779f668fee0a17a07e2
        |-- .command.begin
        |-- .command.err
        |-- .command.log
        |-- .command.out
        |-- .command.run
        |-- .command.sh
        |-- .command.trace
        |-- .exitcode
        |-- D1601.bam
        |-- D1601.bam.bai
        `-- 231116_A00514_1601_AHNYYJDSX7 -> /mnt/beegfs/DATA/231116_A00514_1601_AHNYYJDSX7
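
To inspect a failure, you can also replay the task by hand in its work directory (using the hash shown above):

$ cd work/a3/5efc6e8b5c1779f668fee0a17a07e2
$ bash .command.run    # re-executes the task exactly as Nextflow launched it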

Metrics

$ nextflow run <pipeline> -with-report [file name]
$ nextflow run <pipeline> -with-timeline
$ nextflow run <pipeline> -with-trace

Metrics

$ nextflow run main.nf -with-dag flowchart.png

The dot command from Graphviz is required to produce this graph in PNG format; it is not bundled with Nextflow and must be installed separately.

Nextflow - Container - 1

  • Nextflow supports Docker/Singularity/Apptainer;
  • Nextflow will launch apptainer exec;
$ nextflow run main.nf -with-apptainer [apptainer image file]   # not recommended
  • With a single container, use nextflow.config:
process.container = '/path/to/apptainer.img'
apptainer.enabled = true
apptainer.autoMounts = true
apptainer.runOptions = '--containall'

Nextflow - Container - 2

  • Good practice: 1 container per tool; 1 tool per process:

process FastQC {
    label 'fastQC'
    ...
}
  • With multiple containers use the process.config:
process {
    withLabel:fastQC {
        container = 'fastqc.sif'
    }
    withLabel:multiQC {
        container = 'multiqc.sif'
    }
}
apptainer {
    autoMounts = true
    enabled = true
    runOptions = '--containall'
}

Nextflow - Conda

  • Enable Conda in nextflow.config file
conda.enabled = true
  • Nextflow automatically creates and activates the Conda environment(s);
  • The dependencies are specified by each process with the conda directive;
  • You can use labels, as with containers, to point to YAML files (see the sketch at the end of this slide).
process hello {
  conda 'bwa samtools multiqc'
  script:
...
}
  • Or use a Conda YAML file (recommended):
# bwa-env.yaml
name: bwa-env
channels:
  - bioconda
dependencies:
  - bwa=0.7.17

# py-env.yaml
name: py-env
channels:
  - conda-forge
  - bioconda
dependencies:
  - pip
  - pip:
    - numpy
process hello {
  conda '/some_path/my-env.yaml' // avoid putting the Conda environment
                                 // in your home directory
  script:
 ...
}
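
Per-label Conda settings can live in a config file, mirroring the container approach; a sketch (hypothetical label and path):

process {
    withLabel: 'bwa' {
        conda = "${projectDir}/recipes/conda/bwa-env.yaml"
    }
}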

DSL2

  • DSL2 is the default syntax since version 22.03.0-edge;

  • No more from, into and set in channel declarations;

  • Automatic channel forking;

  • Process and channel composition with | and & (see the sketch below);

  • Nextflow modules for reusable processes and subworkflows;

  • New workflow scope (take:, main:, emit:).

include { foo } from './some/library'
include { bar } from './other/library'

workflow flow {
    take:
    data
    main:
    foo(data)
    bar()
    emit:
    bar = bar.out
}
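
A minimal sketch of the | and & composition (hypothetical processes):

process sayFoo {
    input: val x
    output: stdout
    script:
    """
    echo foo $x
    """
}

process sayBar {
    input: val x
    output: stdout
    script:
    """
    echo bar $x
    """
}

workflow {
    Channel.of(1, 2) | (sayFoo & sayBar)   // & fans the same channel out to both processes
    sayFoo.out | view                      // | pipes a channel into an operator or process
    sayBar.out | view
}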

Nextflow - Strict syntax

  • You can use the VSCode Nextflow plugin to help with the transition;

  • Strict syntax is a subset of DSL2;

  • Available since version 25.02;

  • Set environment variable:

NXF_SYNTAX_PARSER=v2
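
For example (assuming a Nextflow version that supports the v2 parser):

$ NXF_SYNTAX_PARSER=v2 nextflow run main.nf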
  • The goal of the strict syntax is to facilitate debugging;
  • It essentially affects Groovy calls and the way results are published;
  • References:

🔗 https://www.nextflow.io/docs/latest/strict-syntax.html

Good Practices

  • How to force the execution order of two unrelated processes?

=> Add a dependency with a dedicated channel:

process foo {
        output:
        val(true), emit: doneCh

        script:
        """
        your command here
        """
}

process bar {
        input:
        val(flag)

        script:
        ...
}
  • This pattern also solves reproducibility issues.

workflow {
    foo()
    bar(foo.out.doneCh)
}

Good Practices

  • How to avoid race condition?
// code with a race condition (X is shared between the closures: it is global)
Channel.of(1,2,3) | map { it -> X=it; X+=2 } | view { "ch1 = $it" }
Channel.of(1,2,3) | map { it -> X=it; X*=2 } | view { "ch2 = $it" }

🗎 example6.nf

Good Practices

  • How to avoid race condition?

    • Using the def keyword makes the variable local to the enclosing scope;

    • Omitting the def keyword makes the variable global to the entire script.

// local variable
Channel.of(1,2,3) | map { it -> def X=it; X+=2 } | view { "ch1 = $it" }
// global variable
Channel.of(1,2,3) | map { it -> X=it; X*=2 } | view { "ch2 = $it" }

🗎 example6.nf

Good Practices

How to avoid non-deterministic process inputs? In the example below, merge pairs items by arrival order, but process outputs may complete in any order, so the pairing is not guaranteed; prefer a key-based join (next slide).

workflow {
    ch_foo = Channel.of (['1','1.foo'],['2','2.foo'])
    ch_bar = Channel.of (['1','1.bar'],['2','2.bar'])
    test(ch_foo).merge(ch_bar).view()
}
process test {
    input: val x
    output: val x
    """
    echo $x
    """
}

🗎 example7.nf

Good Practices

  • Join will use the key:
workflow {
    ch_foo = Channel.of( ['1', '1.foo'], ['2', '2.foo'] )
    ch_bar = Channel.of( ['2', '2.bar'], ['1', '1.bar'] )
    ch_foo.join(ch_bar)
}
  • join == SQL inner join, without duplicate keys;
  • combine( by: ) == SQL join (duplicate keys produce all combinations).
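
A sketch contrasting the two on the data above:

workflow {
    ch_foo = Channel.of( ['1', '1.foo'], ['2', '2.foo'] )
    ch_bar = Channel.of( ['2', '2.bar'], ['1', '1.bar'] )

    ch_foo.join(ch_bar).view()             // [1, 1.foo, 1.bar] and [2, 2.foo, 2.bar]
    ch_foo.combine(ch_bar, by: 0).view()   // same pairs here; duplicate keys would
                                           // yield every combination
}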

Good Practices

Use runtime and Nextflow global namespaces

  • Nextflow namespaces:
nextflow.build
nextflow.version
  • Workflow namespaces:
workflow.commandLine
workflow.complete
workflow.homeDir
workflow.launchDir
workflow.onComplete {
    println "Pipeline completed at: $workflow.complete"
    println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}
workflow.onError {
    println "Error: Pipeline execution stopped with the following message: ${workflow.errorMessage}"
}

🔗 https://nextflow.io/docs/latest/reference/stdlib-namespaces.html

Good Practices

  • Resuming a pipeline with -resume is a good test;

  • On successful execution, the result of a process is stored in the work directory and reused by the caching mechanism;

  • Resuming helps you detect reproducibility and channel errors;

  • Delete work files with the -cleanup option;

  • You can’t use the -resume and -cleanup options together.
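
For example:

$ nextflow run main.nf           # first run
$ nextflow run main.nf -resume   # cached tasks are skipped; only changed ones rerun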

Good Practices

  • Modularity in process, workflow, configuration;

Example: the Geniac or nf-core layout:

assets/
bin/
conf/
docs/
recipes/
test/
nf-modules/common/process
nf-modules/common/subworkflow
nf-modules/local/process
nf-modules/local/subworkflow
main.nf
nextflow.config
  • nextflow.config: the default config file, which loads all the others;
  • base.config: the default config, no need to change it;
  • cluster.config: defines which cluster/scheduler to use;
  • genome.config: defines all annotation files (from /data/annotation/pipelines);
  • process.config: defines the RAM/CPUs for each label; can be customized for specific processes;
  • singularity.config: defines the location of the containers;
  • conda.config: defines the location of the Conda YAML recipes.

Good Practices

  • Simplify: 1 process = 1 task;
  • Move publishDir and tool options into a config file (modules.config).
process bwaMem{
  tag "${prefix}"
  label 'bwa'
  label 'medCpu'

  input:
  tuple val(prefix), path(reads), path(index)
  output:
  tuple val(prefix), path("*.sam"), emit: sam

  script:
  def args = params.args ?: '' // params.args is defined in the nextflow.config file
  """
  localIndex=`find -L ./ -name "*.amb" | sed 's/.amb//'`
  bwa mem $params.bwaOpts $args -t $task.cpus \${localIndex} $reads
  """
}

Good Practices

  • modules.config:
withName:'bwaMem' {
    publishDir = [
      [
        path: { "${params.outDir}/preprocessing/${prefix}/sam/mapping/" },
        mode: 'copy',
        pattern: "*.sam",
      ]
    ]
  }
  • process.config:
  withLabel: lowMemory {
    memory = { 2.GB * task.attempt }
  }
  withLabel: medMemory {
    memory = { 4.GB * task.attempt }
  }

Good practice - Portability

  • Use and re-use sub-workflows:
include { bwaMem } from './../process/bwa'
include { samtoolsSort } from './../process/samtools'

workflow mapping {
        take:
            reads
            index
        main:
            bwaMem( reads, index )
            samtoolsSort( bwaMem.out.sam )
        emit:
            bam = samtoolsSort.out.bam
}
include { mapping } from './../subworkflow/mapping'
include { fastqc } from './../process/fastqc'

workflow {

        main:
            chReads         = ...
            chBwaIndex      = ...

            fastqc( chReads )
            mapping( chReads, chBwaIndex.collect() )
}

Good practice - sub-workflow or process

  • You can’t call the same process or sub-workflow multiple times under the same name; use include aliases instead:

include { mySubWorkflow as stepA } from './subworkflow.nf'
include { mySubWorkflow as stepB } from './subworkflow.nf'

workflow {
    stepA(Channel.of(1, 2, 3))
    Channel.of(10, 20, 30) | stepB

    stepA.out.view { "Step A output: $it" }
    stepB.out.view { "Step B output: $it" }
}

Thank you for your attention!