Dagster Pipes protocol for R — run R scripts as external processes orchestrated by Dagster.

Overview

dagsterpipes implements the Dagster Pipes protocol in R. It lets R scripts receive execution context from Dagster (asset keys, partitions, extras) and report materializations, asset checks, logs, and custom messages back to the orchestrator. It is the R counterpart to Python’s dagster-pipes and the JavaScript reference implementation.

Installation

Install from GitHub with either remotes or pak:

# install.packages("remotes")
remotes::install_github("joekirincic/dagsterpipes")

# or
# install.packages("pak")
pak::pak("joekirincic/dagsterpipes")

Quick start

On the Python / Dagster side, define an asset that runs an R script via PipesSubprocessClient:

import dagster as dg

@dg.asset(compute_kind="R")
def my_r_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    return pipes_subprocess_client.run(
        command=["Rscript", "my_script.R"],
        context=context,
        extras={"threshold": 0.5, "output_path": "/tmp/results.csv"},
    ).get_materialize_result()

defs = dg.Definitions(
    assets=[my_r_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

On the R side (my_script.R), use with_dagster_pipes() to open a session, do the work, and report back. The session is closed automatically, and any error raised inside the function is forwarded to Dagster:

library(dagsterpipes)

with_dagster_pipes(function(ctx) {
  threshold <- ctx$get_extra("threshold")
  output_path <- ctx$get_extra("output_path")

  # ... do work ...

  ctx$report_asset_materialization(
    metadata = list(
      row_count   = pipes_metadata_value(1000L, "int"),
      output_path = pipes_metadata_value(output_path, "path")
    )
  )

  ctx$log("Processing complete", level = "INFO")
})

Key features

  • Minimal dependencies: only R6 and jsonlite.
  • File-based message transport (newline-delimited JSON), matching Dagster’s default PipesSubprocessClient / PipesTempFileMessageReader.
  • Automatic session lifecycle: with_dagster_pipes() opens and closes the session and forwards errors to Dagster for you, so no manual tryCatch is needed.
  • Graceful degradation: when the Dagster env vars are absent, a no-op NullPipesContext is returned so scripts can be run and tested outside Dagster.
  • R6-based API with active bindings for context fields (asset_key, run_id, partition_key, extras, etc.).

Running standalone

When DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES are not set, with_dagster_pipes() and open_dagster_pipes() fall back to a NullPipesContext. It logs to the R console via message() and silently ignores materialization, check, and custom-message reports, so the same script can be run interactively or from a plain Rscript invocation without any Dagster machinery.
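The fallback described above can be pictured with a small Python sketch. It is not the package's implementation: the NullContext class and the running_under_pipes() helper are hypothetical names, and the rule that both variables must be present and non-empty is an assumption made for illustration.

```python
import os

# Variable names taken from the paragraph above.
PIPES_ENV_VARS = ("DAGSTER_PIPES_CONTEXT", "DAGSTER_PIPES_MESSAGES")


def running_under_pipes(env=None):
    """Return True only when both variables are set and non-empty (assumed rule)."""
    env = os.environ if env is None else env
    return all(env.get(name) for name in PIPES_ENV_VARS)


class NullContext:
    """Hypothetical no-op context: logs to the console, ignores reports."""

    def log(self, message, level="INFO"):
        print(f"[{level}] {message}")

    def report_asset_materialization(self, **kwargs):
        # Silently ignored outside Dagster.
        return None


def open_context(env=None):
    """Return a NullContext when standalone; a real session is out of scope here."""
    if not running_under_pipes(env):
        return NullContext()
    raise NotImplementedError("real Pipes session not sketched here")
```

Calling open_context({}) returns the no-op context, which is why a script written this way can run unchanged with or without the orchestrator.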

Supported message types

Method                        Purpose
opened                        Sent automatically when the session is initialized.
closed                        Sent by ctx$close() to end the session.
log                           ctx$log(message, level); levels: DEBUG, INFO, WARNING, ERROR, CRITICAL.
report_asset_materialization  ctx$report_asset_materialization(metadata, asset_key, data_version).
report_asset_check            ctx$report_asset_check(check_name, passed, asset_key, severity, metadata).
report_custom_message         ctx$report_custom_message(payload).
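To make the transport concrete, here is a rough Python sketch of how such messages might look as newline-delimited JSON in a file. The envelope fields (__dagster_pipes_version, method, params) and the version string "0.1" follow the Python dagster-pipes envelope as commonly described; treat them, along with the helper names, as assumptions rather than a statement of what this package writes.

```python
import json
import os
import tempfile

PIPES_PROTOCOL_VERSION = "0.1"  # assumed protocol version string


def make_message(method, params=None):
    """Build one message envelope (field names are an assumption)."""
    return {
        "__dagster_pipes_version": PIPES_PROTOCOL_VERSION,
        "method": method,
        "params": params,
    }


def append_message(path, message):
    """Append one JSON object per line (newline-delimited JSON)."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(message) + "\n")


def read_messages(path):
    """Parse every non-empty line back into a dict."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def demo_session():
    """Write an opened/log/closed sequence to a temp file and read it back."""
    fd, path = tempfile.mkstemp(suffix=".ndjson")
    os.close(fd)
    try:
        append_message(path, make_message("opened", {}))
        append_message(
            path,
            make_message("log", {"message": "Processing complete", "level": "INFO"}),
        )
        append_message(path, make_message("closed", {}))
        return read_messages(path)
    finally:
        os.remove(path)
```

One line per message keeps the file appendable by the script while the orchestrator reads it, which is the practical appeal of this format.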

Metadata

Metadata values reported to Dagster are typed. Use pipes_metadata_value(raw_value, type) to wrap a value with a type annotation:

pipes_metadata_value(1000L, "int")
pipes_metadata_value("/tmp/out.csv", "path")
pipes_metadata_value("https://example.com", "url")

Supported types: text, url, path, notebook, json, md, float, int, bool, dagster_run, asset, null, table, table_schema, table_column_lineage, timestamp, and __infer__ (the default, which lets Dagster infer the type from the raw value).
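As an illustration of what a typed metadata value might look like once serialized, the following Python sketch wraps a raw value with a type label and rejects unknown types. The metadata_value() helper is hypothetical, and the {"raw_value", "type"} dictionary shape is assumed from the argument names above; the type list is copied from this section.

```python
import json

# Type names copied from the list above.
SUPPORTED_TYPES = {
    "text", "url", "path", "notebook", "json", "md", "float", "int", "bool",
    "dagster_run", "asset", "null", "table", "table_schema",
    "table_column_lineage", "timestamp", "__infer__",
}


def metadata_value(raw_value, type_="__infer__"):
    """Wrap a raw value with a type annotation (assumed wire shape)."""
    if type_ not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported metadata type: {type_!r}")
    return {"raw_value": raw_value, "type": type_}


# Example: metadata for a materialization, serialized as JSON.
metadata = {
    "row_count": metadata_value(1000, "int"),
    "output_path": metadata_value("/tmp/out.csv", "path"),
}
encoded = json.dumps(metadata)
```

Validating the type label before sending catches typos such as "integer" on the script side, instead of leaving them to surface in the orchestrator.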

Code of Conduct

Please note that the dagsterpipes project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.

License

MIT.