Main concepts
Operators work on Channels
Directives work on Processes
Workflows chain Channels
2025-10-30
Frédéric Jarlier, Julien Roméjon, Philippe Hupé, Laurent Jourdren, Quentin Duvert
Introduction
Channels
Operators
Process
Configuration & Profile
Error Management & Debugging
Metrics
In your terminal:
$ module load nextflow
$ echo "println \"Hello\"" > hello.nf
$ nextflow run hello.nf
N E X T F L O W ~ version 22.10.7
Launching `hello.nf` [trusting_bohr] DSL2 - revision: da3698fdd8
Hello
Note: The Nextflow version may differ.
Receiving a message is synchronous (blocking);
Sending a message is asynchronous (non-blocking, non-deterministic);
Execution order in Nextflow depends on channel availability;
Advantages:
Drawbacks:
// Value channels
Channel.value('toto')
Channel.value([1,2,3])
// Queue channels
Channel.of(1,2,3)
Channel.of([1,2,3])
Channel.fromPath('/input/*.gz')
🗎 example1.nf
// Queue Channels
ch1 = Channel.of( 1, 3, 5, 7, 9 )
ch2 = Channel.of( [1, 3, 5, 7, 9] )
ch3 = Channel.of( [1, 2], [5, 6], [7, 9] )
Channel.fromList([ 'a', 'b', 'c', 'd'] )
.view()
files = Channel.fromPath( 'data/*.fa' )
moreFiles = Channel.fromPath( 'data/**/*.fa' )
pairFiles = Channel.fromPath( 'data/file_{1,2}' )
Note: ** is used for directory recursion.
Operators manipulate channels;
Operators return queue channels or value channels;
With operators you can:
filter channels (filter, first, unique…)
transform channels (map, groupTuple, collect, flatten…)
split channels (splitCsv, splitFasta…)
fork channels (multiMap, branch…)
compute statistics (count, min, max…)
inspect channels (view, dump…)
🔗 https://www.nextflow.io/docs/latest/reference/operator.html
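As an illustration, several of these operators can be chained on a queue channel; a minimal sketch with inline values, so no input files are needed:

```nextflow
// Chain a few common operators on a queue channel
Channel.of(1, 2, 3, 4, 5, 6)
    .filter { it % 2 == 0 }   // keep even numbers: 2, 4, 6
    .map { it * 10 }          // transform each value: 20, 40, 60
    .collect()                // gather all items into a single list
    .view()                   // prints [20, 40, 60]
```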
Closures are lambda (anonymous) functions;
Closures are blocks of code that can be passed as arguments to an operator;
Closures are written between curly braces: { put the code here };
By default there is only one implicit argument: it (Warning: only the it variable is accepted);
With multiple arguments, the map operator applies the lambda function to each element emitted by the source channel.
🗎 example2.nf
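A minimal sketch of a closure with multiple arguments: when a channel emits tuples, map destructures each tuple into the closure parameters (sample names are illustrative):

```nextflow
// Each emitted tuple is destructured into the closure arguments
Channel.of( ['sampleA', 1], ['sampleB', 2] )
    .map { name, count -> "$name has $count reads" }
    .view()
```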
Warning: when you need to merge two or more channels, use the collect or join operators.
process < name > {
[directive]
input:
< process inputs >
output:
< process outputs >, emit: name
[script]:
< user script to be executed>
}
🔗 https://www.nextflow.io/docs/latest/reference/syntax.html#process
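The merging caveat above can be sketched with the join operator, which matches items from two channels by their shared key (channel contents are illustrative):

```nextflow
reads   = Channel.of( ['sampleA', 'a.fastq'], ['sampleB', 'b.fastq'] )
indexes = Channel.of( ['sampleA', 'a.idx'],   ['sampleB', 'b.idx'] )

// join pairs items sharing the same first element (the key)
reads.join(indexes)
     .view()   // e.g. [sampleA, a.fastq, a.idx]
```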
process doOtherThing {
input: val MAX
script:
"""
DB="path_to_data_base"
blast -db \${DB} -query query.fa -outfmt 6 > blast_result
cat blast_result | head -n ${MAX} | cut -f2 > top_hits
blastdbcmd -db \${DB} -entry_batch top_hits > sequences
"""
}
\${DB} or \$DB is a Bash variable
${MAX} or $MAX is a Nextflow variable
The input types can be: val, path, tuple, and (less used) env, stdin, each [input collection];
The output types can be: val, path, tuple, and (less used) env, stdout, eval;
A tuple variable gathers multiple values in a single cell of a channel;
If inputs have different sizes, the first exhausted channel stops the process.
🔗 https://www.nextflow.io/docs/latest/process.html#inputs
🔗
https://www.nextflow.io/docs/latest/process.html#outputs
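A hypothetical process combining these types: a tuple input and a named tuple output (the process and file names are illustrative):

```nextflow
process countLines {
    input:
    tuple val(sampleId), path(reads)

    output:
    tuple val(sampleId), path("${sampleId}.count"), emit: counts

    script:
    """
    wc -l < ${reads} > ${sampleId}.count
    """
}
```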
process align {
cpus 1
memory "200MB"
// executor "local"
executor "slurm"
// queue "diag"
beforeScript "source fun.sh"
label "exemple_dir1"
debug true
//tag appears in the .nextflow.log
tag "my_job"
input: [...]
output: [...]
script: [...]
}
🔗 https://www.nextflow.io/docs/latest/process.html#directives
The publishDir directive organizes your results and output files;
Don't use files published by publishDir as inputs of a process;
If there are no options in the publishDir directive, all the outputs of the process will be published;
The mode: 'link' option allows using hard links instead of copying files.
process foo {
publishDir './data/chunks' , mode: 'copy', overwrite: false
output:
path 'chunk_*'
script:
"""
printf 'Hola' | split -b 1 - chunk_
"""
}
🗎 example3.nf
If a shebang (#!) is not used at the beginning of the script code, Bash will be the default interpreter.
🗎 example4.nf
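For instance, adding a shebang lets the script block run under another interpreter; a sketch assuming Python 3 is available on the execution host:

```nextflow
process pyHello {
    debug true  // print the process stdout to the terminal

    script:
    """
    #!/usr/bin/env python3
    print("Hello from Python")
    """
}
```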
Conditional scripts use if/else statements; switch/case statements are deprecated and not allowed in the strict syntax.
mode = 'tcoffee'
process align {
input:
path sequences
script:
if( mode == 'tcoffee' )
"""
t_coffee -in $sequences > out_file
"""
else if( mode == 'mafft' )
"""
mafft --anysymbol --parttree --quiet $sequences > out_file
"""
else
error "Invalid alignment mode: ${mode}"
}
🗎 example5.nf
Nextflow allows prototyping a workflow without real data;
You just need to add a stub: section to your process and run with the -stub-run option:
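A minimal sketch of a stub: section (tool and file names are illustrative); with -stub-run the stub block is executed instead of the script block:

```nextflow
process align {
    input:
    path reads

    output:
    path 'out.bam'

    script:
    """
    bwa mem index.fa ${reads} > out.bam
    """

    stub:
    """
    touch out.bam  # create an empty placeholder result
    """
}
```

$ nextflow run main.nf -stub-run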
workflow is a function specialized for manipulating processes, channels and operators;
workflow can be anonymous (main) or named (sub-workflow);
take: and emit: sections are optional in named workflows;
The publish: section in a workflow is an ongoing development (planned in version 25.10);
include { pythonProcess1; pythonProcess2 } from './modules/pythonProcess/mains.nf'
workflow {
pythonProcess1()
pythonProcess2()
}
include { task as task_alpha } from './some/module' // allows importing the same process multiple times
🗎 example4.nf
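A named sub-workflow with take: and emit: sections might look like this sketch (the align and sort processes are assumed to be defined elsewhere):

```nextflow
workflow ALIGN_AND_SORT {
    take:
    reads            // input channel received from the caller

    main:
    align(reads)
    sort(align.out)

    emit:
    bam = sort.out   // named output channel
}

workflow {
    reads = Channel.fromPath('data/*.fastq')
    ALIGN_AND_SORT(reads)
    ALIGN_AND_SORT.out.bam.view()
}
```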
When launched, Nextflow looks for configuration files in multiple locations (from low to high priority):
configuration files selected with the -c <config-file> option;
a parameters file (-params-file option);
command-line parameters (--something value).
🔗 https://nextflow.io/docs/latest/reference/config.html
main.nf
process process1 {
label "someLabel"
script:
def go = params.flag ?: false
// ...
"""
bash script.sh # using $myvar
"""
}
$ nextflow run main.nf --flag true
Constants (Global Namespaces)
launchDir: where you execute the pipeline
projectDir: where the main script is
🔗 https://nextflow.io/docs/latest/reference/stdlib-namespaces.html
Configuration that can be chosen at execution time;
Configuration files can contain the definition of profiles;
Select configuration profile with the option
-profile of Nextflow command line;
The standard profile is selected when no profile is given;
Multiple profiles can be selected:
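A sketch of profile definitions in nextflow.config and how to select them (profile names, executor, and queue are illustrative):

```nextflow
// nextflow.config
profiles {
    standard {                          // used when no -profile is given
        process.executor = 'local'
    }
    cluster {
        process.executor = 'slurm'
        process.queue    = 'batch'
    }
}
```

$ nextflow run main.nf -profile cluster
Several profiles can be combined with a comma-separated list, e.g. -profile cluster,debug.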
process example {
errorStrategy "terminate" // kill all submitted jobs (default)
errorStrategy "finish" // let spawned job finish
errorStrategy "retry" // requeue the job
errorStrategy "ignore" // go to the next process
//or with closure
errorStrategy { sleep(Math.pow(2, task.attempt) * 30 as long); return 'retry' }
maxRetries 5 // when errorStrategy = retry, single process
maxErrors 2 // total number of errors (all process instances)
validExitStatus 0,1 // Great power == Great responsibility
memory {2.GB * task.attempt}
script: [...]
}
task.attempt is only available in a process scope;
Use Channel.view() to display the content of a channel;
Channel.dump(tag: 'foo') will be silent unless you execute the pipeline with the following command to see channels whose dump tag is foo:
$ nextflow run <pipeline> -dump-channels foo
-resume
work
`-- a3
`-- 5efc6e8b5c1779f668fee0a17a07e2
|-- .command.begin
|-- .command.err
|-- .command.log
|-- .command.out
|-- .command.run
|-- .command.sh
|-- .command.trace
|-- .exitcode
|-- D1601.bam
|-- D1601.bam.bai
`-- 231116_A00514_1601_AHNYYJDSX7 -> /mnt/beegfs/DATA/231116_A00514_1601_AHNYYJDSX7
$ nextflow run <pipeline> -with-report [file name]
$ nextflow run <pipeline> -with-timeline
$ nextflow run <pipeline> -with-trace
In DSL2, since version 22.03.0-edge:
No more from, into and set in channel declarations;
Automatic fork;
Process and channel composition with | and &;
Nextflow modules for reusable processes and sub-workflows;
New workflow scope (take:, main:, emit:).
You can use the VSCode Nextflow plugin to help with the transition;
Strict syntax is a subset of DSL2;
Available in version 25.02;
Set environment variable:
=> Add a dependency with a dedicated channel:
How to avoid race conditions?
Using the def keyword makes the variable local to the enclosing scope;
Omitting the def keyword makes the variable global to the entire script.
// local variable
Channel.of(1,2,3) | map { it -> def X=it; X+=2 } | view { "ch1 = $it" }
// global variable
Channel.of(1,2,3) | map { it -> X=it; X*=2 } | view { "ch2 = $it" }
🗎 example6.nf
Use runtime and Nextflow global namespaces
workflow.onComplete {
println "Pipeline completed at: $workflow.complete"
println "Execution status: ${ workflow.success ? 'OK' : 'failed' }"
}
workflow.onError {
println "Error: Pipeline execution stopped with the following message: ${workflow.errorMessage}"
}
🔗 https://nextflow.io/docs/latest/reference/stdlib-namespaces.html
-resume: On successful execution, the result of a process is stored in the work directory and reused by the caching mechanism;
Resuming helps you detect reproducibility and channel errors;
Delete work files with the -cleanup option;
You can’t use both -resume and -cleanup
options.
Example with Geniac or nf-core:
DSL2 project layout:
assets/
bin/
conf/
docs/
recipes/
test/
nf-modules/common/process
nf-modules/common/subworkflow
nf-modules/local/process
nf-modules/local/subworkflow
main.nf
nextflow.config
publishDir and other options are set in a config file (modules.config).
process bwaMem {
tag "${prefix}"
label 'bwa'
label 'medCpu'
input:
tuple val(prefix), path(reads), path(index)
output:
tuple val(prefix), path("*.sam"), emit: sam
script:
def args = params.args ?: '' // params.args is defined in the nextflow.config file
"""
localIndex=`find -L ./ -name "*.amb" | sed 's/.amb//'`
bwa mem $params.bwaOpts $args -t $task.cpus \${localIndex} $reads
"""
}
withName:'bwaMem' {
publishDir = [
[
path: { "${params.outDir}/preprocessing/${prefix}/sam/mapping/" },
mode: 'copy',
pattern: "*.sam",
]
]
}