daggle

Experimental. daggle is under active development. Breaking changes to the YAML schema, CLI, and API may happen frequently until a stable 1.0 release.

A lightweight DAG scheduler for R. Define multi-step R workflows in YAML, run them with dependency resolution, parallel execution, retries, timeouts, and cron scheduling — all from a single binary.

daggle sits between "cron + Rscript" and heavy workflow engines. No server, no message broker, no changes to your existing R scripts. The only prerequisite is R.

Install

Install script (Linux and macOS):

curl -fsSL https://raw.githubusercontent.com/cynkra/daggle/main/install.sh | sh

Set DAGGLE_INSTALL_DIR to change the install location (default: /usr/local/bin).

Download binary from GitHub Releases — pre-built for Linux and macOS (amd64 and arm64).

From source (requires Go 1.22+):

go install github.com/cynkra/daggle/cmd/daggle@latest

Or clone and build:

git clone https://github.com/cynkra/daggle.git
cd daggle
go build -o daggle ./cmd/daggle/

Quick start

Create a DAG file at ~/.config/daggle/dags/hello.yaml (or in a .daggle/ directory in your project):

name: hello
params:
  - name: who
    default: world
steps:
  - id: greet
    command: echo "Hello, {{ .Params.who }}!"

  - id: analyze
    r_expr: |
      cat(paste("R version:", R.version.string, "\n"))
      cat(paste("2 + 2 =", 2 + 2, "\n"))
    depends: [greet]

  - id: report
    script: scripts/report.R
    args: ["--output", "results.csv"]
    depends: [analyze]

# Validate the DAG
daggle validate hello

# Run it
daggle run hello -p who=David

# Check results
daggle status hello

# List all DAGs
daggle list

DAG definition

Steps are R-centric: every step type runs R under the hood unless noted otherwise (the shell, email, and Docker types are the main exceptions). The following step types are supported:

Type Field What it does
R script script: Runs Rscript --no-save --no-restore <path> [args]
Inline R r_expr: Writes expression to a temp .R file, runs via Rscript
Shell command: Runs via sh -c (escape hatch for non-R tasks)
Quarto quarto: Runs quarto render <path> [args]
Test test: Runs devtools::test() with structured output
Check check: Runs rcmdcheck::rcmdcheck() with error/warning/note counts
Document document: Runs roxygen2::roxygenize()
Lint lint: Runs lintr::lint_package()
Style style: Runs styler::style_pkg()
R Markdown rmd: Renders via rmarkdown::render()
Restore renv renv_restore: Runs renv::restore() to install packages
Coverage coverage: Runs covr::package_coverage() with percentage output
Validate validate: Runs a data validation R script via Rscript
Approval gate approve: Pauses execution until human approves via CLI
Sub-DAG call: Executes another DAG as a sub-step
Pin pin: Publishes data/models via pins::pin_write()
Vetiver vetiver: MLOps model versioning and deployment
Shiny test shinytest: Runs shinytest2::test_app()
Pkgdown pkgdown: Builds package website via pkgdown::build_site()
Install install: Installs packages via pak or install.packages()
Targets targets: Runs targets::tar_make() pipeline
Benchmark benchmark: Runs bench scripts from a directory
Revdepcheck revdepcheck: Runs revdepcheck::revdep_check()
Deploy connect: Deploys to Posit Connect (Shiny, Quarto, Plumber)
Database database: SQL query via R DBI, result written to CSV/TSV/RDS/Parquet/Feather
Email email: Sends email via a named SMTP channel (Go net/smtp, no R required)
Docker docker: Runs a command inside a Docker container (isolation for different R versions or system libs)
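
Two of the less common types in context, as a minimal sketch. The scalar-path form mirrors the package-development examples later in this README; exact field shapes for these types may differ, so treat this as illustrative:

steps:
  - id: render
    quarto: reports/summary.qmd   # hypothetical path; runs quarto render on it

  - id: nightly
    call: etl-nightly             # hypothetical DAG name, executed as a sub-step
    depends: [render]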

Full YAML reference

name: my-pipeline               # optional; defaults to the YAML filename (without extension)
trigger:                        # optional: automated execution triggers
  schedule: "30 6 * * MON-FRI" # cron schedule for daggle serve
workdir: /opt/projects/etl      # optional; defaults to project root (parent of .daggle/)

env:                            # environment variables for all steps
  DB_HOST: "localhost"
  REPORT_DATE: "{{ .Today }}"

params:                         # parameterized DAGs
  - name: department
    default: "sales"

# Lifecycle hooks (R expressions or shell commands)
on_success:
  r_expr: 'logger::log_info("Pipeline complete")'
on_failure:
  command: echo "Pipeline failed!" | mail -s "Alert" team@example.com
on_exit:
  r_expr: 'logger::log_info("Pipeline finished")'

steps:
  - id: extract                 # unique step identifier (required)
    script: etl/extract.R       # R script path (relative to project root)
    args: ["--dept", "{{ .Params.department }}"]
    timeout: 10m                # kill step after this duration
    retry:
      limit: 3                 # retry up to 3 times on failure
    env:                        # step-level env vars
      BATCH_SIZE: "1000"

  - id: validate
    r_expr: |                  # inline R code
      data <- readRDS("data/raw.rds")
      stopifnot(nrow(data) > 0)
      cat("::daggle-output name=row_count::", nrow(data), "\n")
    depends: [extract]          # runs after extract completes
    on_failure:
      r_expr: 'warning("Validation failed")'

  - id: report
    script: etl/report.R
    args: ["--rows", "$DAGGLE_OUTPUT_VALIDATE_ROW_COUNT"]
    depends: [validate]
    workdir: reports             # step-level workdir (relative to project root, or absolute)

  - id: deploy
    command: scp output.csv server:/data/
    depends: [report]

Template variables

DAG files support Go text/template expressions:

Variable Example Description
{{ .Today }} 2026-04-01 Current date (YYYY-MM-DD)
{{ .Now }} 2026-04-01T08:30:00Z Current time (RFC3339)
{{ .Params.name }} sales Parameter value (from defaults or --param overrides)
{{ .Env.KEY }} localhost DAG-level environment variable
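
As a sketch of how these compose (the script path and the department parameter are placeholders; only the variables documented above are used):

env:
  REPORT_DATE: "{{ .Today }}"     # e.g. 2026-04-01 at render time

steps:
  - id: report
    script: etl/report.R          # placeholder script path
    args: ["--date", "{{ .Today }}", "--dept", "{{ .Params.department }}"]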

Inter-step data passing

Steps can pass data to downstream steps using stdout markers:

# In step "extract":
cat("::daggle-output name=row_count::", nrow(data), "\n")
cat("::daggle-output name=output_path::", path, "\n")

Downstream steps receive these as environment variables, namespaced by step ID:

- id: report
  # Available as $DAGGLE_OUTPUT_EXTRACT_ROW_COUNT and $DAGGLE_OUTPUT_EXTRACT_OUTPUT_PATH
  r_expr: |
    rows <- as.integer(Sys.getenv("DAGGLE_OUTPUT_EXTRACT_ROW_COUNT"))
    cat(sprintf("Processing %d rows\n", rows))
  depends: [extract]

Output markers are parsed and stripped from terminal output but kept in log files.

R package development

daggle has first-class support for R package workflows:

name: pkg-check
steps:
  - id: restore
    renv_restore: "."
  - id: document
    document: "."
    depends: [restore]
  - id: lint
    lint: "."
    depends: [document]
  - id: test
    test: "."
    depends: [document]
  - id: coverage
    coverage: "."
    depends: [test]
  - id: check
    check: "."
    depends: [document]

Each step checks for its required R package (devtools, rcmdcheck, roxygen2, lintr, styler, covr, renv) and fails with a clear install instruction if missing. Test results, check errors/warnings/notes, lint issue counts, and coverage percentages are emitted as ::daggle-output:: markers for downstream use.
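
A sketch of consuming one of those outputs, appended to the pkg-check DAG above. Per the DAGGLE_OUTPUT_<STEP>_<NAME> convention, a coverage output named percent would surface as DAGGLE_OUTPUT_COVERAGE_PERCENT; the output name percent is an assumption here, so verify against the ::daggle-output:: markers in your own run logs:

  - id: coverage-gate
    r_expr: |
      # "percent" is an assumed output name; check your run logs
      pct <- as.numeric(Sys.getenv("DAGGLE_OUTPUT_COVERAGE_PERCENT"))
      stopifnot(!is.na(pct))
      cat(sprintf("Coverage: %.1f%%\n", pct))
    depends: [coverage]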

Triggers

DAGs can be triggered automatically via the trigger: block. Run daggle serve to start the trigger daemon. Multiple triggers can coexist — any matching trigger starts a run.

Trigger Field Mechanism
Cron schedule: Standard 5-field cron or @every 5m, @hourly
File watcher watch: fsnotify with debounce
Webhook webhook: HTTP POST with optional HMAC-SHA256
DAG completion on_dag: Fires when another DAG completes/fails
Condition condition: Polls an R expression or shell command
Git git: Polls for new commits by branch

name: etl-pipeline
trigger:
  schedule: "30 6 * * MON-FRI"   # cron
  watch:                           # also trigger on file drops
    path: /data/incoming/
    pattern: "*.csv"
    debounce: 5s
  overlap: skip                    # skip (default) or cancel running instance
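
A chained or conditional trigger might look like the sketch below. Only the top-level field names (on_dag:, condition:) come from the table above; the sub-fields (dag:, command:) are assumptions, not confirmed schema:

name: downstream-report
trigger:
  on_dag:
    dag: etl-pipeline                  # assumed sub-field: fire when that DAG completes
  condition:
    command: test -f /data/ready.flag  # assumed sub-field: polled shell condition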

Scheduler

daggle serve runs a long-lived daemon that manages all triggers. daggle serve --port 8787 also starts the REST API and a read-only status dashboard. The daemon hot-reloads DAGs every 30s (or immediately on SIGHUP), caps concurrency at 4 simultaneous runs, and shuts down gracefully on SIGINT/SIGTERM. Without daggle serve, use daggle run for manual execution.

REST API

When the scheduler is started with --port, it also serves a REST API for programmatic access and a read-only status dashboard. See docs/api.md for the full endpoint reference.

The API is designed to be wrapped — build custom dashboards with daggleR + Shiny, or any HTTP client.

# List DAGs
curl http://localhost:8787/api/v1/dags

# Trigger a run
curl -X POST http://localhost:8787/api/v1/dags/my-pipeline/run \
  -H 'Content-Type: application/json' \
  -d '{"params": {"dept": "sales"}}'

# Check status
curl http://localhost:8787/api/v1/dags/my-pipeline/runs/latest

# Get outputs (flat format, R-friendly)
curl http://localhost:8787/api/v1/dags/my-pipeline/runs/latest/outputs

# Approve a waiting step
curl -X POST http://localhost:8787/api/v1/dags/my-pipeline/runs/abc123/steps/review/approve

All list endpoints return flat JSON arrays for easy conversion to R data frames with jsonlite::fromJSON().

Companion R package

The daggleR package provides in-step helpers and API wrappers for use from R. Install with pak::pak("cynkra/daggleR").

CLI reference

Most-used commands:

daggle run <dag> [-p key=val]... [--dry-run]   Run a DAG now
daggle watch <dag>                             Re-run on file changes (authoring)
daggle validate <dag|path>                     Parse + static checks
daggle lint <dag|path>                         Semantic diagnostics (scripts, secrets, notify channels)
daggle status <dag> [--run-id <id>]            Latest (or specific) run status
daggle why <dag> [run-id]                      Failure diagnosis: status + stderr + dag_hash drift in one view
daggle list [--tag/--team/--owner <x>]         List DAGs with last run status
daggle serve [--port <n>]                      Scheduler daemon (and REST API with --port)

Other commands: stop, doctor, history, stats, plan, logs, diff, monitor, impact, annotate, archive, verify, cancel, clean, approve, reject, init, register, unregister, projects, version.

Run daggle <command> --help for per-command flags, or see the full CLI reference.

Global flags:

--dags-dir <path>    Override DAG definitions directory
--data-dir <path>    Override data/runs directory

Secrets & sensitive environment variables

Environment values can reference external secret sources instead of hardcoding credentials in YAML:

env:
  DB_HOST: "postgres.internal"                      # literal value
  DB_PASS: "${env:DATABASE_PASSWORD}"               # read from process environment
  API_KEY: "${file:/run/secrets/api_key}"            # read file contents (trimmed)
  VAULT_SECRET: "${vault:secret/data/myapp#api_key}" # read from HashiCorp Vault KV v2

References are resolved at DAG start (fail-fast). Unresolved references (missing env var, missing file, Vault error) fail the DAG before any step runs.

Vault authentication uses standard VAULT_ADDR and VAULT_TOKEN environment variables (falls back to ~/.vault-token).

Redaction: Values from ${vault:} and ${file:} sources are automatically redacted (***) in JSONL events and daggle status output. For ${env:} values, mark them explicitly:

env:
  DB_PASS:
    value: "${env:DATABASE_PASSWORD}"
    secret: true

Approval gates

Pause execution until a human reviews and approves:

steps:
  - id: fit-model
    script: models/fit.R
  - id: review
    approve:
      message: "Review model metrics before deploying"
      timeout: 24h
    depends: [fit-model]
  - id: deploy
    connect:
      type: plumber
      path: api/
    depends: [review]

daggle status my-pipeline   # shows "waiting" with approval message
daggle approve my-pipeline  # continue execution
daggle reject my-pipeline   # fail the step

Conditional steps

Skip steps based on conditions:

- id: deploy-prod
  script: deploy.R
  when:
    command: 'test "$DAGGLE_ENV" = production'
  depends: [test]
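
The condition: trigger accepts an R expression or a shell command; whether the step-level when: also accepts an r_expr: form is an assumption, not confirmed schema. If it does, it would presumably mirror the command: shape:

- id: refresh-cache
  script: cache.R                              # hypothetical script
  when:
    r_expr: 'file.exists("data/stale.flag")'   # assumed field; command: is the documented form
  depends: [test]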

Matrix runs

Run the same step across a parameter grid:

- id: fit-model
  script: models/fit.R
  matrix:
    algo: [lm, glm, gam]
    dataset: [train, full]
  args: ["--algo", "{{ .Matrix.algo }}", "--data", "{{ .Matrix.dataset }}"]

This expands into 6 parallel step instances, each with DAGGLE_MATRIX_ALGO and DAGGLE_MATRIX_DATASET environment variables.

How it works

Dependency resolution — Steps declare dependencies via depends:. daggle performs a topological sort and groups steps into tiers. Steps within a tier run in parallel.

Working directory — Steps execute in the project root (parent of .daggle/) by default. Override at DAG level (workdir:) or step level. Step-level workdirs can be relative (resolved against the project root or DAG workdir) or absolute. Precedence: step > DAG > project root.
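
For example, with both levels set (a minimal sketch; per the precedence above, the report step runs in /opt/projects/etl/reports):

workdir: /opt/projects/etl      # DAG-level default for all steps

steps:
  - id: report
    script: etl/report.R
    workdir: reports            # step-level override, relative to the DAG workdir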

Timeouts — Each step can specify a timeout. On expiry, daggle sends SIGTERM to the entire process group, waits 5 seconds, then SIGKILL. No orphaned R processes.

Retries — Steps with retry.limit are re-executed on failure. Supports backoff: linear (default) or backoff: exponential (1s, 2s, 4s, 8s...) with an optional max_delay cap:

retry:
  limit: 5
  backoff: exponential
  max_delay: 60s

Error reporting — When an R step fails, daggle extracts the R error message from stderr and shows it in daggle status, so you don't have to dig through log files.

renv integration — daggle auto-detects renv.lock in the project directory and sets R_LIBS_USER so R steps use the project's renv library automatically. If renv.lock is found but the library directory is missing, daggle warns you to run renv::restore(). To use a custom library path, set R_LIBS_USER in the DAG or step env: — daggle will not override it.
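
To pin a custom library path, which daggle will then leave untouched (grounded in the behavior described above; the path itself is a placeholder):

env:
  R_LIBS_USER: /opt/r-libs/my-project   # placeholder path; daggle will not override it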

Reproducibility — Each run writes a meta.json with the DAG file hash, R version, platform, daggle version, renv detection status, and parameters used. This makes it easy to trace what produced a given result.

Run history — Every run creates a directory under ~/.local/share/daggle/runs/<dag>/<date>/run_<id>/ containing:

  • events.jsonl — Lifecycle events (started, completed, failed, retried)
  • meta.json — Reproducibility metadata (R version, DAG hash, platform)
  • <step-id>.stdout.log / <step-id>.stderr.log — Captured output per step

DAG discovery

daggle looks for DAG files in order: --dags-dir flag, .daggle/ in the current directory, ~/.config/daggle/dags/. Register additional project directories with daggle register <path> so the scheduler picks up their DAGs.
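
A typical registered project might be laid out like this (an illustrative sketch; file names are placeholders):

my-project/
├── .daggle/
│   └── etl.yaml        # discovered when running daggle from my-project/
└── etl/
    ├── extract.R       # referenced from etl.yaml as etl/extract.R
    └── report.R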

File layout

Config in ~/.config/daggle/, data in ~/.local/share/daggle/ (XDG-compliant). Override with DAGGLE_CONFIG_DIR / DAGGLE_DATA_DIR or --dags-dir / --data-dir flags.

Testing

Unit tests (fast, no external dependencies):

go test -race ./...

Integration tests live under internal/integration/ behind the integration build tag and exercise full DAG runs with real Rscript / sh subprocesses, a running API server, and the scheduler. They require R on PATH:

go test -tags=integration -race ./internal/integration/...

CI runs both jobs on every PR; the integration job installs R via r-lib/actions/setup-r.

Editor support

A JSON Schema is provided for DAG YAML files. Add this to the top of your YAML files for autocomplete and validation in VS Code (with the YAML extension):

# yaml-language-server: $schema=https://github.com/cynkra/daggle/raw/main/docs/daggle-schema.json

License

GPL-3.0 — see LICENSE for details.
