Skip to content

Conversation

bentsherman
Copy link
Member

@bentsherman bentsherman commented Dec 3, 2023

This PR is an exploration intended to add support for static types for process inputs and outputs.

TODO:

  • refactor runtime classes to be independent of DSL
  • separate process inputs/outputs from env/file/stdin/stdout declarations in the runtime
  • move process input channel merge logic to CombineManyOp
  • refactor TaskProcessor to accept a single merged input channel
  • move task output collect logic to Task*Collector classes
  • move helper classes for TaskConfig into generic LazyHelpers module for lazy evaluation
  • add static type syntax to DSL
  • add type validation to task processor
  • make sure resume works
  • static types for workflow takes/emits ?
  • static typed methods for process directives ?
  • nullable paths (Nullable input/output paths #4293) ?
  • unit tests

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
… annotation inputs

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Copy link

netlify bot commented Dec 3, 2023

Deploy Preview for nextflow-docs-staging canceled.

Name Link
🔨 Latest commit 1cd6fce
🔍 Latest deploy log https://app.netlify.com/sites/nextflow-docs-staging/deploys/660a9cdb7f6bbf00098b125d

@bentsherman

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman

This comment was marked as outdated.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman
Copy link
Member Author

bentsherman commented Dec 9, 2023

Now that I have developed the builder classes for process inputs and outputs and refactored the TaskProcessor accordingly, I think it is possible to bring static types to the DSL.

The key insight is to decouple the staging and unstaging of files/envs/stdin/stdout from the actual inputs and outputs declaration. I have been able to greatly simplify the runtime code by doing this, but a bonus is that it allows you to use arbitrary types.

In it's raw form, it would look like this:

process FOO {
    input:
    take 'sample_id'
    take 'files'

    env('SAMPLE_ID') { sample_id }
    path { files }

    output:
    env 'SAMPLE_ID'
    path '$file1', 'file.txt', arity: '1'

    emit { sample_id }
    emit { stdout }
    emit { [env('SAMPLE_ID'), path('$file1')] }
    emit { new Sample(sample_id, path('$file1') }
}

This is a bit verbose, but the output envs and files need to be declared immediately so that Nextflow can unstage them in the task wrapper script (whereas the emits aren't evaluated until after the task is completed). But, as you can see, it allows you to take and emit whatever types you want. You could imagine the take method having a type option and then verifying the type at runtime.

I think we can recover the existing DSL syntax on top of this API with some AST transforms and/or wrapper methods, but I still need to try this. So something in the current DSL:

process FOO {
    input:
    val my_val
    env SAMPLE_ID
    path 'file1.txt'
    tuple val(sample_id), path('file2.txt')

    output:
    val my_val
    env SAMPLE_ID
    path 'file1.txt'
    tuple val(sample_id), path('file2.txt')
}

Should be automatically translated to:

process FOO {
    input {
    take 'my_val' // $in0
    take '$in1'
    take '$in2'
    take '$in3'

    env('SAMPLE_ID') { $in1 }
    path('file1.txt') { $in2 }
    var('sample_id') { $in3[0] }
    path('file2.txt') { $in3[1] }
    }

    output {
    env('SAMPLE_ID')
    path('$file0', 'file1.txt')
    path('$file1', 'file2.txt')

    emit { my_val }
    emit { env('SAMPLE_ID') }
    emit { path('$file0') }
    emit { [sample_id, path('$file1')] }
    }
}

Another option might be to evaluate the emits before task execution and generate the outputs ahead of time, since the input vars are already defined, but calling env() / path() / stdout() would return a wrapper object that is bound to the final output after the task is complete. Then you at least don't have to define every env/path output twice. This is basically what the tuple output does, and it works fine because it constructs the tuple directly, whereas with static types the user defines the emitted object.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman
Copy link
Member Author

Putting those speculations aside, I have refactored the existing DSL to use the new builders, establishing a clear boundary between the DSL and runtime. I have not added any new features to the DSL, but this PR lays the groundwork for future enhancements.

If we want to support static types in the DSL, I think there is a range of options:

  1. The take / emit syntax shown above is the most explicit and verbose for the user, but the implementation is simple and supports arbitrary objects

  2. We could go wild and add some kind of pattern matching syntax (see Rust, OCaml, and Python >3.10 for examples). Likely the most difficult to implement, but would be the most concise for the user and also support arbitrary objects

  3. Maybe we don't need to support arbitrary objects. Maybe it would be enough to support flat lists with tuple and flat records with record, maybe flat maps with map. If so, it would just be a minor extension of the current syntax.

Note that if we add an alternative interface like the annotations, (3) is the obvious choice because users can fall back to the more verbose programmatic syntax if they need to do something that the DSL doesn't support.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman bentsherman changed the title Annotation API Separate DSL from task processor Dec 11, 2023
@bentsherman
Copy link
Member Author

bentsherman commented Dec 11, 2023

I have renamed this PR to reflect it's primary purpose. It is basically ready, and it works with nf-core/rnaseq without any changes. I may add a few more small changes and obviously need to update the tests, but I'd like to reach consensus on this PR first.

To help facilitate the review of this PR, here is a summary of the essential and non-essential (but still merge-worthy) changes:

Essential

  • Refactor runtime classes to be independent of the DSL
    • lays the groundwork for writing Nextflow pipelines in Python
  • Separate process inputs/outputs from env/file/stdin/stdout declarations in the runtime (i.e. DSL is unchanged)
    • lays the groundwork for supporting static types in the DSL

Non-essential

  • Move process input channel merge logic to CombineManyOp
  • Refactor TaskProcessor to accept a single merged input channel
  • Move task output collect logic to Task*Collector classes
  • Move helper classes for TaskConfig into generic LazyHelpers module for lazy binding

While I did rebuild many new classes from scratch, many of them ended up aligning nicely with existing classes, here is a rough mapping:

  • ProcessConfig -> much logic moved to ProcessBuilder, ProcessConfigBuilder, ProcessDsl
  • InputsList / OutputsList -> ProcessInputs / ProcessOutputs
  • BaseInParam / BaseOutParam -> ProcessInput / ProcessOutput
  • FileInParam / FileOutParam -> ProcessFileInput / ProcessFileOutput
  • TaskProcessor -> some logic moved to CombineManyOp and Task*Collector classes
  • TaskConfig -> some logic moved to LazyHelper

I am happy to carve out pieces of this PR and merge them separately if that would make things easier, it was just easier to do it all at once in order to validate the overall approach.

Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
@bentsherman
Copy link
Member Author

Current proposed syntax for static types:

  • inputs are just method parameters
    • use directives to stage env, files (stage name), stdin
    • input paths are automatically detected and staged, including nested
  • outputs are just variable declarations with assignment
  • use AST xform (eventually custom parser) to translate DSL syntax to runtime API calls
  • new syntax supports record types
  • previous syntax can be kept as a shorthand, ease the migration
// shorthand for @ValueObject class Sample { ... }
// should replace @ValueObject in custom parser
record Sample {
  String id
  List<Path> files
}

process FOO {
  // stageAs only needed if staging as different name
  env('SAMPLE_ID') { my_tuple[0] }
  stageAs('file1.txt') { my_file }
  stdin { my_stdin }
  stageAs('file2.txt') { my_tuple[1] }

  input:
  // additional behavior provided by directives
  // can support named args, default value in the future
  int my_val
  Path my_file
  String my_stdin
  List my_tuple // can be accessed in body via my_tuple[0], etc
  Sample my_record // custom record type!

  // previous syntax equivalent
  // doesn't require extra directives for env, stdin, files
  // can't be used for record types though
  val my_val
  path 'file1.txt'
  stdin /* my_stdin */
  tuple env(SAMPLE_ID), path('file2.txt')

  output:
  // r-value will be wrapped in closure by AST xform
  // r-value can be anything! even a function defined elsewhere!
  // closure delegate provides env(), stdout(), path() to unstage from task environment
  int my_val // must be assigned in body if no assignment here
  Path my_file = path('file1.txt') // maybe could be called file() like the script function?
  String my_stdout = stdout()
  List my_tuple = tuple( env('SAMPLE_ID'), path('file2.txt') )
  Sample my_record = new Sample( env('SAMPLE_ID'), path('file2.txt') )

  // previous syntax equivalent
  // can't be used for record types though
  val my_val
  path 'file1.txt'
  stdout /* my_stdout */
  tuple env(SAMPLE_ID), path('file2.txt')

  script:
  // ...
}

@bentsherman
Copy link
Member Author

Side note regarding maps. This PR will enable you to use maps instead of tuples or record types, but it's not as convenient. Because Nextflow doesn't know which map values are files, it can't automatically stage files like with tuples and records, so you'd have to use the stageAs directive to declare any file inputs:

process foo {
  stageAs { sample.files }

  input:
  Map sample // [ id: String, files: List<Path> ] (but Nextflow doesn't know this)

  script:
  // ...
}

IMO it's much better to use records anyway because of the explicit typing, and you could still have a meta-map in the record if you need to have arbitrary key-value pairs.

@stevekm
Copy link
Contributor

stevekm commented May 24, 2024

this PR looks really cool but I had some questions

is the "record" type something new to this PR? Or is this something that we can already use? Not entirely clear which aspects described here are new from this PR vs. illustrating currently available methods

This PR will enable you to use maps instead of tuples or record types, but it's not as convenient. Because Nextflow doesn't know which map values are files

[ id: String, files: List<Path> ] (but Nextflow doesn't know this)

naive question but can Nextflow just iterate through the map values and detect objects of type Path ( [ "some_file": file("foo.txt")] ) and stage them appropriately? Is it using a different method to detect Path attributes of a record object for staging?

@bentsherman
Copy link
Member Author

Record types are sort of already supported:

@ValueObject
class Sample {
  Map meta
  List<Path> reads
}

But Nextflow doesn't know how to stage files from a record type. You have to use tuples for this so that you can say exactly where the files are in the tuple using the path qualifier.

Right now, this feature will most likely be folded into DSL3, which we are still discussing but will focus on better static type checking.

And in one of my experiments with a potential DSL3 syntax (see nf-core/fetchngs#309), I found that oftentimes you don't even need record types in the process definition. In that proposal, you call a process within an operator with individual values, rather than with channels, which gives you more control over how to pass arguments. Take this example:

ftp_samples |> map { meta ->
    def fastq = [ file(meta.fastq_1), file(meta.fastq_2) ]
    SRA_FASTQ_FTP ( meta, fastq, sra_fastq_ftp_args )
}

I don't really need to pass a record type here when I could just pass the individual values directly. I might still pass around records at the workflow level, just to keep related things bundled together, but when calling a specific process, I could just pass in the values that the process needs. So I think this syntax, if it is accepted by the team, will reduce the need for tuples and records and such in the process definition.

@hanslovp-gne
Copy link

Thank you for exploring static types for inputs/outputs. This is very exciting and something that I am very excited about!

I have a question regarding static typing of tuples: We are currently using tuple (instead of the each) to facilitate parameter sweeps with channels, e.g. something like this:

input:
tuple path(embedding), val(numComponents), val(classColumn)

where the process would be called with a channel combined from 3 channels:

combinedChannel = embeddingChannel.combine(numComponentsChannel).combine(classColumnChannel)

With the new proposed syntax, would we be able to add type annotations to each element of a tuple? I see

List my_tuple

above, but I don't know if there is a way to specify the exact types of each element in my_tuple? Or would we need to define record with the appropriate members instead, and map the combinedChannel into a channel of such record?

@bentsherman
Copy link
Member Author

Hi @hanslovp-gne , the proposed syntax has evolved a bit since this PR, but I think you would be able to do parameter sweeps in a nice way

The process inputs would be define like:

input:
embedding: Path
num_components: Integer
class_column: String
...

Then you would do some kind of cross / combine as before to build up a channel of maps, where each map contains the above inputs as keys:

ch_inputs = ch_embeddings
  .cross(ch_num_components)
  .cross(ch_class_column)
  .map { embedding, num_components, class_column ->
    [ embedding: embedding, num_components: num_components, class_column: class_column ]
  }

EVALUATE( ch_inputs )

I was originally skeptical about using maps, but using them this way I think we can make it work nicely. Basically, treat the process inputs as one big map with a different type for each key

@hanslovp-gne
Copy link

Thank you @bentsherman, that looks awesome!

@bentsherman bentsherman mentioned this pull request Aug 27, 2025
3 tasks
@bentsherman
Copy link
Member Author

Closing in favor of #6368

@bentsherman bentsherman deleted the ben-programmatic-api branch August 27, 2025 17:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Using custom objects with paths

3 participants