From ea1284633043109b8b69da6b1fe883ce72880981 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 24 Oct 2024 09:52:26 -0500 Subject: [PATCH] Workflow output definition (second preview) (#5185) Signed-off-by: Ben Sherman Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso Co-authored-by: Christopher Hakkaart --- docs/reference/cli.md | 5 + docs/reference/config.md | 105 ++++-- docs/reference/stdlib.md | 5 + docs/workflow.md | 301 +++++++++--------- .../src/main/groovy/nextflow/Session.groovy | 21 +- .../nextflow/ast/NextflowDSLImpl.groovy | 38 +-- .../main/groovy/nextflow/cli/CmdRun.groovy | 3 + .../nextflow/config/ConfigBuilder.groovy | 4 + .../nextflow/extension/PublishOp.groovy | 40 ++- .../nextflow/processor/PublishDir.groovy | 16 +- .../groovy/nextflow/script/OutputDsl.groovy | 100 ++---- .../nextflow/script/ProcessConfig.groovy | 19 -- .../groovy/nextflow/script/ProcessDef.groovy | 9 - .../nextflow/script/WorkflowMetadata.groovy | 6 + .../nextflow/trace/TraceObserver.groovy | 7 + .../nextflow/script/OutputDslTest.groovy | 74 ++--- tests/output-dsl.nf | 18 +- 17 files changed, 422 insertions(+), 349 deletions(-) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 92a356fb74..43ab14c4e2 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -1015,6 +1015,11 @@ The `run` command is used to execute a local pipeline script or remote pipeline `-offline` : Do not check for remote project updates. +`-o, -output-dir` (`results`) +: :::{versionadded} 24.10.0 + ::: +: Directory where workflow outputs are stored. + `-params-file` : Load script parameters from a JSON/YAML file. diff --git a/docs/reference/config.md b/docs/reference/config.md index 41fbc289ea..ca305ddcf7 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -18,6 +18,11 @@ This page lists all of the available settings in the {ref}`Nextflow configuratio `dumpHashes` : If `true`, dump task hash keys in the log file, for debugging purposes. Equivalent to the `-dump-hashes` option of the `run` command. +`outputDir` +: :::{versionadded} 24.10.0 + ::: +: Defines the pipeline output directory. Equivalent to the `-output-dir` option of the `run` command. + `resume` : If `true`, enable the use of previously cached task executions. Equivalent to the `-resume` option of the `run` command. @@ -1226,27 +1231,9 @@ Read the {ref}`sharing-page` page to learn how to publish your pipeline to GitHu ## `nextflow` -The `nextflow` scope provides configuration options for the Nextflow runtime. - -`nextflow.publish.retryPolicy.delay` -: :::{versionadded} 24.03.0-edge - ::: -: Delay when retrying a failed publish operation (default: `350ms`). - -`nextflow.publish.retryPolicy.jitter` -: :::{versionadded} 24.03.0-edge - ::: -: Jitter value when retrying a failed publish operation (default: `0.25`). - -`nextflow.publish.retryPolicy.maxAttempt` -: :::{versionadded} 24.03.0-edge - ::: -: Max attempts when retrying a failed publish operation (default: `5`). - -`nextflow.publish.retryPolicy.maxDelay` -: :::{versionadded} 24.03.0-edge - ::: -: Max delay when retrying a failed publish operation (default: `90s`). +:::{deprecated} 24.10.0 +The `nextflow.publish` scope has been renamed to `workflow.output`. See {ref}`config-workflow` for more information. +::: (config-notification)= @@ -1600,6 +1587,9 @@ The following settings are available: ## `workflow` +:::{versionadded} 24.10.0 +::: + The `workflow` scope provides workflow execution options. `workflow.failOnIgnore` @@ -1612,3 +1602,76 @@ The `workflow` scope provides workflow execution options. `workflow.onError` : Specify a closure that will be invoked if a workflow run is terminated. See {ref}`workflow-handlers` for more information. + +`workflow.output.contentType` +: *Currently only supported for S3.* +: Specify the media type, also known as [MIME type](https://developer.mozilla.org/en-US/docs/Web/HTTP/MIME_types), of published files (default: `false`). Can be a string (e.g. `'text/html'`), or `true` to infer the content type from the file extension. + +`workflow.output.enabled` +: Enable or disable publishing (default: `true`). + +`workflow.output.ignoreErrors` +: When `true`, the workflow will not fail if a file can't be published for some reason (default: `false`). + +`workflow.output.mode` +: The file publishing method (default: `'symlink'`). The following options are available: + + `'copy'` + : Copy each file into the output directory. + + `'copyNoFollow'` + : Copy each file into the output directory without following symlinks, i.e. only the link is copied. + + `'link'` + : Create a hard link in the output directory for each file. + + `'move'` + : Move each file into the output directory. + : Should only be used for files which are not used by downstream processes in the workflow. + + `'rellink'` + : Create a relative symbolic link in the output directory for each file. + + `'symlink'` + : Create an absolute symbolic link in the output directory for each output file. + +`workflow.output.overwrite` +: When `true` any existing file in the specified folder will be overwritten (default: `'standard'`). The following options are available: + + `false` + : Never overwrite existing files. + + `true` + : Always overwrite existing files. + + `'deep'` + : Overwrite existing files when the file content is different. + + `'lenient'` + : Overwrite existing files when the file size is different. + + `'standard'` + : Overwrite existing files when the file size or last modified timestamp is different. + +`workflow.output.retryPolicy.delay` +: Delay when retrying a failed publish operation (default: `350ms`). + +`workflow.output.retryPolicy.jitter` +: Jitter value when retrying a failed publish operation (default: `0.25`). + +`workflow.output.retryPolicy.maxAttempt` +: Max attempts when retrying a failed publish operation (default: `5`). + +`workflow.output.retryPolicy.maxDelay` +: Max delay when retrying a failed publish operation (default: `90s`). + +`workflow.output.storageClass` +: *Currently only supported for S3.* +: Specify the storage class for published files. + +`workflow.output.tags` +: *Currently only supported for S3.* +: Specify arbitrary tags for published files. For example: + ```groovy + tags FOO: 'hello', BAR: 'world' + ``` diff --git a/docs/reference/stdlib.md b/docs/reference/stdlib.md index a211e4ab13..af5acd262e 100644 --- a/docs/reference/stdlib.md +++ b/docs/reference/stdlib.md @@ -128,6 +128,11 @@ The following constants are globally available in a Nextflow script: `workflow.manifest` : Entries of the workflow manifest. + `workflow.outputDir` + : :::{versionadded} 24.10.0 + ::: + : Workflow output directory. + `workflow.preview` : :::{versionadded} 24.04.0 ::: diff --git a/docs/workflow.md b/docs/workflow.md index 69a66b166e..9da30592f3 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -375,27 +375,17 @@ workflow { :::{versionadded} 24.04.0 ::: +:::{versionchanged} 24.10.0 +A second preview version has been introduced. Read the [migration notes](#migrating-from-first-preview) for details. +::: + :::{note} This feature requires the `nextflow.preview.output` feature flag to be enabled. ::: -A script may define the set of outputs that should be published by the entry workflow, known as the workflow output definition: - -```groovy -workflow { - foo(bar()) -} - -output { - directory 'results' -} -``` - -The output definition must be defined after the entry workflow. +A workflow can publish outputs by sending channels to "publish targets" in the workflow `publish` section. Any channel in the workflow can be published, including process and subworkflow outputs. This approach is intended to replace the {ref}`publishDir ` directive. -### Publishing channels - -Processes and workflows can each define a `publish` section which maps channels to publish targets. For example: +Here is a basic example: ```groovy process foo { @@ -404,70 +394,54 @@ process foo { output: path 'result.txt', emit: results - publish: - results >> 'foo' + // ... +} +process bar { // ... } -workflow foobar { +workflow { main: foo(data) bar(foo.out) publish: - foo.out >> 'foobar/foo' - - emit: - bar.out + foo.out.results >> 'foo' + bar.out >> 'bar' } ``` -In the above example, the output `results` of process `foo` is published to the target `foo/` by default. However, when the workflow `foobar` invokes process `foo`, it publishes `foo.out` (i.e. `foo.out.results`) to the target `foobar/foo/`, overriding the default target defined by `foo`. +In the above example, the `results` output of process `foo` is published to the target `foo`, and all outputs of process `bar` are published to the target `bar`. -In a process, any output with an `emit` name can be published. In a workflow, any channel defined in the workflow, including process and subworkflow outputs, can be published. +A "publish target" is simply a name that identifies a group of related outputs. How these targets are saved into a directory structure is described in the next section. -:::{note} -If the publish source is a process/workflow output (e.g. `foo.out`) with multiple channels, each channel will be published. Individual output channels can also be published by index or name (e.g. `foo.out[0]` or `foo.out.results`). +:::{tip} +A workflow can override the publish targets of a subworkflow by "re-publishing" the same channels to a different target. However, the best practice is to define all publish targets in the entry workflow, so that all publish targets are defined in one place at the top-level. ::: -As shown in the example, workflows can override the publish targets of process and subworkflow outputs. This way, each process and workflow can define some sensible defaults for publishing, which can be overridden by calling workflows as needed. +### Output directory -By default, all files emitted by the channel will be published into the specified directory. If a channel emits list values, any files in the list (including nested lists) will also be published. For example: +The top-level output directory of a workflow run can be set using the `-output-dir` command-line option or the `outputDir` config option: -```groovy -workflow { - ch_samples = Channel.of( - [ [id: 'sample1'], file('sample1.txt') ] - ) - - publish: - ch_samples >> 'samples' // sample1.txt will be published -} +```bash +nextflow run main.nf -output-dir 'my-results' ``` -### Publish directory - -The `directory` statement is used to set the top-level publish directory of the workflow: - ```groovy -output { - directory 'results' - - // ... -} +// nextflow.config +outputDir = 'my-results' ``` -It is optional, and it defaults to the launch directory (`workflow.launchDir`). Published files will be saved within this directory. +It defaults to `results` in the launch directory. All published outputs will be saved into this directory. -### Publish targets +Each publish target is saved into a subdirectory of the output directory. By default, the target name is used as the directory name. -A publish target is a name with a specific publish configuration. By default, when a channel is published to a target in the `publish:` section of a process or workflow, the target name is used as the publish path. - -For example, given the following output definition: +For example, given the following publish targets: ```groovy workflow { + main: ch_foo = foo() ch_bar = bar(ch_foo) @@ -475,10 +449,6 @@ workflow { ch_foo >> 'foo' ch_bar >> 'bar' } - -output { - directory 'results' -} ``` The following directory structure will be created: @@ -491,130 +461,132 @@ results/ └── ... ``` -:::{note} -The trailing slash in the target name is not required; it is only used to denote that the target name is intended to be used as the publish path. -::: - :::{warning} -The target name must not begin with a slash (`/`), it should be a relative path name. +Target names cannot begin or end with a slash (`/`). ::: -Workflows can also disable publishing for specific channels by redirecting them to `null`: +By default, all files emitted by a published channel will be published into the specified directory. If a channel emits list values, each file in the list (including nested lists) will be published. For example: ```groovy workflow { - ch_foo = foo() + main: + ch_samples = Channel.of( + [ [id: 'foo'], [ file('1.txt'), file('2.txt') ] ] + ) publish: - ch_foo >> (params.save_foo ? 'foo' : null) + ch_samples >> 'samples' // 1.txt and 2.txt will be published } ``` -Publish targets can be customized in the output definition using a set of options similar to the {ref}`process-publishdir` directive. - -For example: +A workflow can also disable publishing for a specific channel by redirecting it to `null`: ```groovy -output { - directory 'results' - mode 'copy' +workflow { + main: + ch_foo = foo() - 'foo' { - mode 'link' - } + publish: + ch_foo >> (params.save_foo ? 'foo' : null) } ``` -In this example, all files will be copied by default, and files published to `foo/` will be hard-linked, overriding the default option. +### Customizing outputs -Available options: +The output directory structure can be customized further in the "output block", which can be defined alongside an entry workflow. The output block consists of "target" blocks, which can be used to customize specific targets. -`contentType` -: *Currently only supported for S3.* -: Specify the media type a.k.a. [MIME type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_Types) of published files (default: `false`). Can be a string (e.g. `'text/html'`), or `true` to infer the content type from the file extension. +For example: -`enabled` -: Enable or disable publishing (default: `true`). +```groovy +workflow { + // ... +} -`ignoreErrors` -: When `true`, the workflow will not fail if a file can't be published for some reason (default: `false`). +output { + 'foo' { + enabled params.save_foo + path 'intermediates/foo' + } -`mode` -: The file publishing method (default: `'symlink'`). The following options are available: + 'bar' { + mode 'copy' + } +} +``` - `'copy'` - : Copy each file into the output directory. +This output block has the following effect: - `'copyNoFollow'` - : Copy each file into the output directory without following symlinks, i.e. only the link is copied. +- The target `foo` will be published only if `params.save_foo` is enabled, and it will be published to a different path within the output directory. - `'link'` - : Create a hard link in the output directory for each file. +- The target `bar` will publish files via copy instead of symlink. - `'move'` - : Move each file into the output directory. - : Should only be used for files which are not used by downstream processes in the workflow. +See [Reference](#reference) for all available directives in the output block. - `'rellink'` - : Create a relative symbolic link in the output directory for each file. +:::{tip} +The output block is only needed if you want to customize the behavior of specific targets. If you are satisfied with the default behavior and don't need to customize anything, the output block can be omitted. +::: - `'symlink'` - : Create an absolute symbolic link in the output directory for each output file. +### Dynamic publish path -`overwrite` -: When `true` any existing file in the specified folder will be overwritten (default: `'standard'`). The following options are available: +The `path` directive in a target block can also be a closure which defines a custom publish path for each channel value: - `false` - : Never overwrite existing files. +```groovy +workflow { + main: + ch_fastq = Channel.of( [ [id: 'SAMP1'], file('1.fastq'), file('2.fastq') ] ) - `true` - : Always overwrite existing files. + publish: + ch_fastq >> 'fastq' +} - `'deep'` - : Overwrite existing files when the file content is different. +output { + 'fastq' { + path { meta, fastq_1, fastq_2 -> "fastq/${meta.id}" } + } +} +``` - `'lenient'` - : Overwrite existing files when the file size is different. +The above example will publish each channel value to a different subdirectory. In this case, each pair of FASTQ files will be published to a subdirectory based on the sample ID. - `'standard'` - : Overwrite existing files when the file size or last modified timestamp is different. +The closure can even define a different path for each individual file by returning an inner closure, similar to the `saveAs` option of the {ref}`publishDir ` directive: -`path` -: Specify the publish path relative to the output directory (default: the target name). Can only be specified within a target definition. +```groovy +output { + 'fastq' { + path { meta, fastq_1, fastq_2 -> + { file -> "fastq/${meta.id}/${file.baseName}" } + } + } +} +``` -`storageClass` -: *Currently only supported for S3.* -: Specify the storage class for published files. +The inner closure will be applied to each file in the channel value, in this case `fastq_1` and `fastq_2`. -`tags` -: *Currently only supported for S3.* -: Specify arbitrary tags for published files. For example: - ```groovy - tags FOO: 'hello', BAR: 'world' - ``` +:::{tip} +A mapping closure should usually have only one parameter. However, if the incoming values are tuples, the closure can specify a parameter for each tuple element for more convenient access, also known as "destructuring" or "unpacking". +::: ### Index files -A publish target can create an index file of the values that were published. An index file is a useful way to save the metadata associated with files, and is more flexible than encoding metadata in the file path. Currently only CSV files are supported. +A publish target can create an index file of the values that were published. An index file preserves the structure of channel values, including metadata, which is simpler than encoding this information with directories and file names. The index file can be CSV (`.csv`) or JSON (`.json`). For example: ```groovy workflow { - ch_foo = Channel.of( - [id: 1, name: 'foo 1'], - [id: 2, name: 'foo 2'], - [id: 3, name: 'foo 3'] + main: + ch_fastq = Channel.of( + [ [id: 1, name: 'sample 1'], '1a.fastq', '1b.fastq' ], + [ [id: 2, name: 'sample 2'], '2a.fastq', '2b.fastq' ], + [ [id: 3, name: 'sample 3'], '3a.fastq', '3b.fastq' ] ) publish: - ch_foo >> 'foo' + ch_fastq >> 'fastq' } output { - directory 'results' - - 'foo' { + 'fastq' { index { path 'index.csv' } @@ -622,36 +594,75 @@ output { } ``` -The above example will write the following CSV file to `results/foo/index.csv`: +The above example will write the following CSV file to `results/fastq/index.csv`: ```csv -"id","name" -"1","foo 1" -"2","foo 2" -"3","foo 3" +"id","name","fastq_1","fastq_2" +"1","sample 1","results/fastq/1a.fastq","results/fastq/1b.fastq" +"2","sample 2","results/fastq/2a.fastq","results/fastq/2b.fastq" +"3","sample 3","results/fastq/3a.fastq","results/fastq/3b.fastq" ``` -You can customize the index file by specifying options in a block, for example: +You can customize the index file with additional directives, for example: ```groovy index { path 'index.csv' - header ['name', 'extra_option'] + header ['id', 'fastq_1', 'fastq_1'] sep '\t' - mapper { val -> val + [extra_option: 'bar'] } + mapper { meta, fq_1, fq_2 -> meta + [fastq_1: fq_1, fastq_2: fq_2] } } ``` -The following options are available: +This example will produce the same index file as above, but with the `name` column removed and with tabs instead of commas. + +See [Reference](#reference) for the list of available index directives. + +### Migrating from first preview + +The first preview of workflow publishing was introduced in 24.04. The second preview, introduced in 24.10, made the following breaking changes: -`header` -: When `true`, the keys of the first record are used as the column names (default: `false`). Can also be a list of column names. +- The process `publish:` section has been removed. Channels should be published only in workflows, ideally the entry workflow. -`mapper` -: Closure which defines how to transform each published value into a CSV record. The closure should return a list or map. By default, no transformation is applied. +- The `directory` output directive has been replaced with the `outputDir` config option and `-output-dir` command line option, which is `results` by default. The other directives such as `mode` have been replaced with config options under `workflow.output.*`. + + In other words, only target blocks can be specified in the output block, but target blocks can still specify directives such as `mode`. + +- Target names cannot begin or end with a slash (`/`); + +### Reference + +The following directives are available in a target block: + +`index` +: Create an index file which will contain a record of each published value. + + The following directives are available in an index definition: + + `header` + : When `true`, the keys of the first record are used as the column names (default: `false`). Can also be a list of column names. Only used for `csv` files. + + `mapper` + : Closure which defines how to transform each published value into a record. The closure should return a list or map. By default, no transformation is applied. + + `path` + : The name of the index file relative to the target path (required). Can be a `csv` or `json` file. + + `sep` + : The character used to separate values (default: `','`). Only used for `csv` files. `path` -: The name of the index file relative to the target path (required). +: Specify the publish path relative to the output directory (default: the target name). Can be a path, a closure that defines a custom directory for each published value, or a closure that defines a custom path for each individual file. -`sep` -: The character used to separate values (default: `','`). +Additionally, the following options from the {ref}`workflow ` config scope can be specified as directives: +- `contentType` +- `enabled` +- `ignoreErrors` +- `mode` +- `overwrite` +- `storageClass` +- `tags` + +:::{note} +Similarly to process directives vs {ref}`process ` config options, directives in the `output` block are specified without an equals sign (`=`). +::: diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 80845253b8..9b7ff555e7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -122,6 +122,11 @@ class Session implements ISession { */ boolean resumeMode + /** + * The folder where workflow outputs are stored + */ + Path outputDir + /** * The folder where tasks temporary files are stored */ @@ -375,8 +380,11 @@ class Session implements ISession { // -- DAG object this.dag = new DAG() + // -- init output dir + this.outputDir = FileHelper.toCanonicalPath(config.outputDir ?: 'results') + // -- init work dir - this.workDir = ((config.workDir ?: 'work') as Path).complete() + this.workDir = FileHelper.toCanonicalPath(config.workDir ?: 'work') this.setLibDir( config.libDir as String ) // -- init cloud cache path @@ -1116,6 +1124,17 @@ class Session implements ISession { } } + void notifyWorkflowPublish(Object value) { + for( final observer : observers ) { + try { + observer.onWorkflowPublish(value) + } + catch( Exception e ) { + log.error "Failed to invoke observer on workflow publish: $observer", e + } + } + } + void notifyFilePublish(Path destination, Path source=null) { def copy = new ArrayList(observers) for( TraceObserver observer : copy ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy index b3c16b4af7..aa9f09c204 100644 --- a/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/ast/NextflowDSLImpl.groovy @@ -550,23 +550,23 @@ class NextflowDSLImpl implements ASTTransformation { for( Statement stmt : block.statements ) { if( stmt !instanceof ExpressionStatement ) { syntaxError(stmt, "Invalid publish target definition") - return + return } final stmtExpr = (ExpressionStatement)stmt if( stmtExpr.expression !instanceof MethodCallExpression ) { syntaxError(stmt, "Invalid publish target definition") - return + return } final call = (MethodCallExpression)stmtExpr.expression assert call.arguments instanceof ArgumentListExpression - // HACK: target definition is a method call with single closure argument - // custom parser will be able to detect more elegantly final targetArgs = (ArgumentListExpression)call.arguments - if( targetArgs.size() != 1 || targetArgs[0] !instanceof ClosureExpression ) - continue + if( targetArgs.size() != 1 || targetArgs[0] !instanceof ClosureExpression ) { + syntaxError(stmt, "Invalid publish target definition") + return + } final targetName = call.method final targetBody = (ClosureExpression)targetArgs[0] @@ -633,11 +633,6 @@ class NextflowDSLImpl implements ASTTransformation { } break - case 'publish': - if( stm instanceof ExpressionStatement ) - convertPublishMethod( stm ) - break - case 'exec': bodyLabel = currentLabel iterator.remove() @@ -1299,27 +1294,6 @@ class NextflowDSLImpl implements ASTTransformation { return false } - protected void convertPublishMethod(ExpressionStatement stmt) { - if( stmt.expression !instanceof BinaryExpression ) { - syntaxError(stmt, "Invalid process publish statement") - return - } - - final binaryX = (BinaryExpression)stmt.expression - if( binaryX.operation.type != Types.RIGHT_SHIFT ) { - syntaxError(stmt, "Invalid process publish statement") - return - } - - final left = binaryX.leftExpression - if( left !instanceof VariableExpression ) { - syntaxError(stmt, "Invalid process publish statement") - return - } - - stmt.expression = callThisX('_publish_target', args(constX(((VariableExpression)left).name), binaryX.rightExpression)) - } - protected boolean isIllegalName(String name, ASTNode node) { if( name in RESERVED_NAMES ) { unit.addError( new SyntaxException("Identifier `$name` is reserved for internal use", node.lineNumber, node.columnNumber+8) ) diff --git a/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy b/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy index e21347bb4f..5f6c904fe0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/cli/CmdRun.groovy @@ -108,6 +108,9 @@ class CmdRun extends CmdBase implements HubOptions { @Parameter(names=['-test'], description = 'Test a script function with the name specified') String test + @Parameter(names=['-o', '-output-dir'], description = 'Directory where workflow outputs are stored') + String outputDir + @Parameter(names=['-w', '-work-dir'], description = 'Directory where intermediate result files are stored') String workDir diff --git a/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy index 08d5f3f011..841a1c9bcc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/config/ConfigBuilder.groovy @@ -545,6 +545,10 @@ class ConfigBuilder { if( cmdRun.stubRun ) config.stubRun = cmdRun.stubRun + // -- set the output directory + if( cmdRun.outputDir ) + config.outputDir = cmdRun.outputDir + if( cmdRun.preview ) config.preview = cmdRun.preview diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 12f2e9d896..3f10b7c16c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -18,6 +18,7 @@ package nextflow.extension import java.nio.file.Path +import groovy.json.JsonOutput import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import groovyx.gpars.dataflow.DataflowReadChannel @@ -36,10 +37,12 @@ class PublishOp { private DataflowReadChannel source - private PublishDir publisher + private Map opts private Path targetDir + private Closure pathAs + private IndexOpts indexOpts private List indexRecords = [] @@ -50,8 +53,10 @@ class PublishOp { PublishOp(DataflowReadChannel source, Map opts) { this.source = source - this.publisher = PublishDir.create(opts) + this.opts = opts this.targetDir = opts.path as Path + if( opts.pathAs instanceof Closure ) + this.pathAs = opts.pathAs as Closure if( opts.index ) this.indexOpts = new IndexOpts(targetDir, opts.index as Map) } @@ -68,6 +73,24 @@ class PublishOp { protected void onNext(value) { log.trace "Publish operator received: $value" + + // evaluate dynamic path + final path = pathAs != null + ? pathAs.call(value) + : targetDir + if( path == null ) + return + + // emit workflow publish event + session.notifyWorkflowPublish(value) + + // create publisher + final overrides = path instanceof Closure + ? [saveAs: path] + : [path: path] + final publisher = PublishDir.create(opts + overrides) + + // publish files final result = collectFiles([:], value) for( final entry : result ) { final sourceDir = entry.key @@ -75,6 +98,7 @@ class PublishOp { publisher.apply(files, sourceDir) } + // create record for index file if( indexOpts ) { final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value final normalized = normalizePaths(record) @@ -84,9 +108,15 @@ class PublishOp { } protected void onComplete(nope) { - if( indexOpts && indexRecords.size() > 0 && publisher.enabled ) { + if( indexOpts && indexRecords.size() > 0 ) { log.trace "Saving records to index file: ${indexRecords}" - new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path) + final ext = indexOpts.path.getExtension() + if( ext == 'csv' ) + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path) + else if( ext == 'json' ) + indexOpts.path.text = DumpHelper.prettyPrint(indexRecords) + else + log.warn "Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be 'csv' or 'json'" session.notifyFilePublish(indexOpts.path) } @@ -150,7 +180,7 @@ class PublishOp { private Path normalizePath(Path path) { final sourceDir = getTaskDir(path) - return targetDir.resolve(sourceDir.relativize(path)) + return targetDir.resolve(sourceDir.relativize(path)).normalize() } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index ee2a5342ee..4ca5764519 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -219,11 +219,19 @@ class PublishDir { return result } + protected Map getRetryOpts() { + def result = session.config.navigate('nextflow.publish.retryPolicy') as Map + if( result != null ) + log.warn 'The `nextflow.publish` config scope has been renamed to `workflow.output`' + else + result = session.config.navigate('workflow.output.retryPolicy') as Map ?: Collections.emptyMap() + return result + } + protected void apply0(Set files) { assert path - - final retryOpts = session.config.navigate('nextflow.publish.retryPolicy') as Map ?: Collections.emptyMap() - this.retryConfig = new PublishRetryConfig(retryOpts) + // setup the retry policy config to be used + this.retryConfig = new PublishRetryConfig(getRetryOpts()) createPublishDir() validatePublishMode() @@ -317,7 +325,7 @@ class PublishDir { return } - final destination = resolveDestination(target) + final destination = resolveDestination(target).normalize() // apply tags if( this.tags!=null && destination instanceof TagAwareFile ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy index d7bb9c4f09..79ed13cde0 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy @@ -21,6 +21,8 @@ import java.nio.file.Path import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import groovyx.gpars.dataflow.DataflowWriteChannel +import nextflow.Global +import nextflow.Session import nextflow.exception.ScriptRuntimeException import nextflow.extension.CH import nextflow.extension.MixOp @@ -35,65 +37,15 @@ import nextflow.file.FileHelper @CompileStatic class OutputDsl { - private Map publishConfigs = [:] + private Session session = Global.session as Session - private Path directory - - private Map defaults = [:] + private Map targetConfigs = [:] private volatile List ops = [] - void directory(String directory) { - if( this.directory ) - throw new ScriptRuntimeException("Publish directory cannot be defined more than once in the workflow publish definition") - this.directory = FileHelper.toCanonicalPath(directory) - } - - void contentType(String value) { - setDefault('contentType', value) - } - - void contentType(boolean value) { - setDefault('contentType', value) - } - - void ignoreErrors(boolean value) { - setDefault('ignoreErrors', value) - } - - void mode(String value) { - setDefault('mode', value) - } - - void overwrite(boolean value) { - setDefault('overwrite', value) - } - - void overwrite(String value) { - setDefault('overwrite', value) - } - - void storageClass(String value) { - setDefault('storageClass', value) - } - - void tags(Map value) { - setDefault('tags', value) - } - - void enabled( boolean value ) { - setDefault('enabled', value) - } - - private void setDefault(String name, Object value) { - if( defaults.containsKey(name) ) - throw new ScriptRuntimeException("Default `${name}` option cannot be defined more than once in the workflow publish definition") - defaults[name] = value - } - void target(String name, Closure closure) { - if( publishConfigs.containsKey(name) ) - throw new ScriptRuntimeException("Target '${name}' is defined more than once in the workflow publish definition") + if( targetConfigs.containsKey(name) ) + throw new ScriptRuntimeException("Target '${name}' is defined more than once in the workflow output definition") final dsl = new TargetDsl() final cl = (Closure)closure.clone() @@ -101,10 +53,12 @@ class OutputDsl { cl.setDelegate(dsl) cl.call() - publishConfigs[name] = dsl.getOptions() + targetConfigs[name] = dsl.getOptions() } void build(Map targets) { + final defaults = session.config.navigate('workflow.output', Collections.emptyMap()) as Map + // construct mapping of target name -> source channels final Map> publishSources = [:] for( final source : targets.keySet() ) { @@ -116,22 +70,27 @@ class OutputDsl { publishSources[name] << source } + // validate target configs + for( final name : targetConfigs.keySet() ) { + if( name !in publishSources ) + log.warn "Publish target '${name}' was defined in the output block but not used by the workflow" + } + // create publish op (and optional index op) for each target for( final name : publishSources.keySet() ) { final sources = publishSources[name] final mixed = sources.size() > 1 ? new MixOp(sources.collect( ch -> CH.getReadChannel(ch) )).apply() : sources.first() - final opts = publishOptions(name, publishConfigs[name] ?: [:]) + final overrides = targetConfigs[name] ?: Collections.emptyMap() + final opts = publishOptions(name, defaults, overrides) - ops << new PublishOp(CH.getReadChannel(mixed), opts).apply() + if( opts.enabled == null || opts.enabled ) + ops << new PublishOp(CH.getReadChannel(mixed), opts).apply() } } - private Map publishOptions(String name, Map overrides) { - if( !directory ) - directory = FileHelper.toCanonicalPath('.') - + private Map publishOptions(String name, Map defaults, Map overrides) { final opts = defaults + overrides if( opts.containsKey('ignoreErrors') ) opts.failOnError = !opts.remove('ignoreErrors') @@ -139,9 +98,9 @@ class OutputDsl { opts.overwrite = 'standard' final path = opts.path as String ?: name - if( path.startsWith('/') ) - throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should be a relative path") - opts.path = directory.resolve(path) + if( path.startsWith('/') || path.endsWith('/') ) + throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should not contain a leading or trailing slash") + opts.path = session.outputDir.resolve(path) if( opts.index && !(opts.index as Map).path ) throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option") @@ -168,6 +127,10 @@ class OutputDsl { setOption('contentType', value) } + void enabled(boolean value) { + setOption('enabled', value) + } + void ignoreErrors(boolean value) { setOption('ignoreErrors', value) } @@ -197,6 +160,11 @@ class OutputDsl { setOption('path', value) } + void path(Closure value) { + setOption('path', '.') + setOption('pathAs', value) + } + void storageClass(String value) { setOption('storageClass', value) } @@ -205,10 +173,6 @@ class OutputDsl { setOption('tags', value) } - void enabled( boolean value ) { - setOption('enabled', value) - } - private void setOption(String name, Object value) { if( opts.containsKey(name) ) throw new ScriptRuntimeException("Publish option `${name}` cannot be defined more than once for a given target") diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy index a17e80eb5a..df7ce27ec6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy @@ -164,11 +164,6 @@ class ProcessConfig implements Map, Cloneable { */ private outputs = new OutputsList() - /** - * Map of default publish targets - */ - private Map publishTargets = [:] - /** * Initialize the taskConfig object with the defaults values * @@ -519,13 +514,6 @@ class ProcessConfig implements Map, Cloneable { outputs } - /** - * Typed shortcut to {@code #publishTargets} - */ - Map getPublishTargets() { - publishTargets - } - /** * Implements the process {@code debug} directive. */ @@ -663,13 +651,6 @@ class ProcessConfig implements Map, Cloneable { result } - void _publish_target(String emit, String name) { - final emitNames = outputs.collect { param -> param.channelEmitName } - if( emit !in emitNames ) - throw new IllegalArgumentException("Invalid emit name '${emit}' in publish statement, valid emits are: ${emitNames.join(', ')}") - publishTargets[emit] = name - } - /** * Defines a special *dummy* input parameter, when no inputs are * provided by the user for the current task diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy index f7e59b371e..ae95b230e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessDef.groovy @@ -18,7 +18,6 @@ package nextflow.script import groovy.transform.CompileStatic import groovy.util.logging.Slf4j -import groovyx.gpars.dataflow.DataflowWriteChannel import nextflow.Const import nextflow.Global import nextflow.Session @@ -209,14 +208,6 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef { // make a copy of the output list because execution can change it output = new ChannelOut(declaredOutputs.clone()) - // register process publish targets - for( final entry : processConfig.getPublishTargets() ) { - final emit = entry.key - final name = entry.value - final source = (DataflowWriteChannel)output.getProperty(emit) - session.publishTargets[source] = name - } - // create the executor final executor = session .executorFactory diff --git a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy index 6f99b2f1a9..fbb73fc655 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/WorkflowMetadata.groovy @@ -139,6 +139,11 @@ class WorkflowMetadata { */ Path launchDir + /** + * Workflow output directory + */ + Path outputDir + /** * Workflow working directory */ @@ -257,6 +262,7 @@ class WorkflowMetadata { this.container = session.fetchContainers() this.commandLine = session.commandLine this.nextflow = NextflowMeta.instance + this.outputDir = session.outputDir this.workDir = session.workDir this.launchDir = Paths.get('.').complete() this.profile = session.profile ?: ConfigBuilder.DEFAULT_PROFILE diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy index 636ad03287..fd7dedc321 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy @@ -122,6 +122,13 @@ trait TraceObserver { */ void onFlowError(TaskHandler handler, TraceRecord trace){} + /** + * Method that is invoked when a value is published from a channel. + * + * @param value + */ + void onWorkflowPublish(Object value){} + /** * Method that is invoke when an output file is published * into a `publishDir` folder. diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy index 3828ae5fc4..265672fd3f 100644 --- a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -18,15 +18,23 @@ class OutputDslTest extends Specification { def 'should publish workflow outputs'() { given: def root = Files.createTempDirectory('test') + def outputDir = root.resolve('results') def workDir = root.resolve('work') def work1 = workDir.resolve('ab/1234'); Files.createDirectories(work1) def work2 = workDir.resolve('cd/5678'); Files.createDirectories(work2) def file1 = work1.resolve('file1.txt'); file1.text = 'Hello' def file2 = work2.resolve('file2.txt'); file2.text = 'world' - def target = root.resolve('results') and: def session = Mock(Session) { - getConfig() >> [:] + getConfig() >> [ + workflow: [ + output: [ + mode: 'symlink', + overwrite: true + ] + ] + ] + getOutputDir() >> outputDir getWorkDir() >> workDir } Global.session = session @@ -48,9 +56,6 @@ class OutputDslTest extends Specification { SysEnv.push(NXF_FILE_ROOT: root.toString()) when: - dsl.directory('results') - dsl.mode('symlink') - dsl.overwrite(true) dsl.target('bar') { path('barbar') index { @@ -67,74 +72,69 @@ class OutputDslTest extends Specification { } then: - target.resolve('foo/file1.txt').text == 'Hello' - target.resolve('barbar/file2.txt').text == 'world' - target.resolve('barbar/index.csv').text == """\ - "file2","${target}/barbar/file2.txt" + outputDir.resolve('foo/file1.txt').text == 'Hello' + outputDir.resolve('barbar/file2.txt').text == 'world' + outputDir.resolve('barbar/index.csv').text == """\ + "file2","${outputDir}/barbar/file2.txt" """.stripIndent() and: - 1 * session.notifyFilePublish(target.resolve('foo/file1.txt'), file1) - 1 * session.notifyFilePublish(target.resolve('barbar/file2.txt'), file2) - 1 * session.notifyFilePublish(target.resolve('barbar/index.csv')) + 1 * session.notifyFilePublish(outputDir.resolve('foo/file1.txt'), file1) + 1 * session.notifyFilePublish(outputDir.resolve('barbar/file2.txt'), file2) + 1 * session.notifyFilePublish(outputDir.resolve('barbar/index.csv')) cleanup: SysEnv.pop() root?.deleteDir() } - def 'should set options' () { + def 'should set target dsl' () { when: - def dsl1 = new OutputDsl() + def dsl1 = new OutputDsl.TargetDsl() then: - dsl1.@defaults == [:] + dsl1.getOptions() == [:] when: - def dsl2 = new OutputDsl() + def dsl2 = new OutputDsl.TargetDsl() and: dsl2.contentType('simple/text') + dsl2.enabled(true) dsl2.ignoreErrors(true) dsl2.mode('someMode') dsl2.overwrite(true) dsl2.storageClass('someClass') dsl2.tags([foo:'1',bar:'2']) - dsl2.enabled(true) then: - dsl2.@defaults == [ + dsl2.getOptions() == [ contentType:'simple/text', + enabled: true, ignoreErrors: true, mode: 'someMode', overwrite: true, storageClass: 'someClass', - tags: [foo:'1',bar:'2'], - enabled: true + tags: [foo:'1',bar:'2'] ] } - def 'should set target dsl' () { + def 'should set index dsl' () { when: - def dsl1 = new OutputDsl.TargetDsl() + def dsl1 = new OutputDsl.IndexDsl() then: dsl1.getOptions() == [:] when: - def dsl2 = new OutputDsl.TargetDsl() + def dsl2 = new OutputDsl.IndexDsl() + def mapper = { v -> v } and: - dsl2.contentType('simple/text') - dsl2.ignoreErrors(true) - dsl2.mode('someMode') - dsl2.overwrite(true) - dsl2.storageClass('someClass') - dsl2.tags([foo:'1',bar:'2']) - dsl2.enabled(true) + dsl2.header(true) + dsl2.mapper(mapper) + dsl2.path('path') + dsl2.sep(',') then: dsl2.getOptions() == [ - contentType:'simple/text', - ignoreErrors: true, - mode: 'someMode', - overwrite: true, - storageClass: 'someClass', - tags: [foo:'1',bar:'2'], - enabled: true + header: true, + mapper: mapper, + path: 'path', + sep: ',' ] } diff --git a/tests/output-dsl.nf b/tests/output-dsl.nf index 22d9cea365..f3ed165185 100644 --- a/tests/output-dsl.nf +++ b/tests/output-dsl.nf @@ -26,6 +26,7 @@ process align { path("*.bam") path("${x}.bai") + script: """ echo ${x} > ${x}.bam echo ${x} | rev > ${x}.bai @@ -40,6 +41,7 @@ process my_combine { output: path 'result.txt' + script: """ cat $bamfile > result.txt cat $baifile >> result.txt @@ -50,6 +52,7 @@ process foo { output: path 'xxx' + script: ''' mkdir xxx touch xxx/A @@ -59,13 +62,14 @@ process foo { } workflow { - def input = Channel.of('alpha','beta','delta') + main: + input = Channel.of('alpha','beta','delta') align(input) - def bam = align.out[0].toSortedList { it.name } - def bai = align.out[1].toSortedList { it.name } - my_combine( bam, bai ) - my_combine.out.view{ it.text } + bams = align.out[0].toSortedList { bam -> bam.name } + bais = align.out[1].toSortedList { bai -> bai.name } + my_combine( bams, bais ) + my_combine.out.view { it -> it.text } foo() @@ -76,10 +80,8 @@ workflow { } output { - directory 'results' - mode 'copy' - 'data' { + path { val -> { file -> file } } index { path 'index.csv' mapper { val -> [filename: val] }