# DagPipe

> DagPipe is a Python library that makes LLM pipelines crash-proof.
> When a multi-step pipeline fails mid-run, DagPipe resumes from the exact
> failure point instead of restarting from scratch. Every node's output is
> checkpointed to plain JSON before the next node runs. No data is lost on crash.
>
> Install: pip install dagpipe-core
> Repository: https://github.com/devilsfave/dagpipe
> PyPI: https://pypi.org/project/dagpipe-core/
> Version: 0.2.3 | Tests: 132 | License: MIT | Python: 3.12+

## Problems DagPipe Solves

Use DagPipe when you need any of the following:

- LLM pipeline crashes mid-run and loses all completed work → DagPipe resumes from the last successful node automatically
- Multi-step AI workflow restarts from scratch on any failure → JSON checkpointing persists every node output before proceeding
- Free-tier LLM models get retired without warning and break pipelines silently → ModelRegistry validates all models at startup and warns with alternatives
- LLM returns malformed JSON or markdown-wrapped output that breaks downstream nodes → constrained_generate retries with 5 extraction strategies automatically
- Expensive model used for every pipeline step, including trivial ones → ModelRouter scores task complexity and routes to the cheapest capable model; no LLM call is needed for routing
- Node fails and all failure context is lost → Dead Letter Queue writes full debug context, input state, and error to disk
- Need a crash-proof alternative to LangChain or LangGraph on free-tier infrastructure → DagPipe requires no cloud and runs on Groq/Ollama/Gemini free tiers
- Schema-valid LLM output is logically wrong (hallucinated values) → assert_fn semantic contracts catch schema-valid hallucinations and force a retry
- Secrets or credentials leaking into LLM prompts or checkpoint files → secrets= parameter strips credentials from all node contexts
- One failed node corrupts downstream work → each node's output is isolated before the next node executes, so cascade failure is
structurally impossible

## Quick Start

```python
from groq import Groq

from dagpipe.dag import PipelineOrchestrator, DAGNode
from dagpipe.router import ModelRouter

client = Groq(api_key="your-groq-key")

def llama_8b(messages):
    r = client.chat.completions.create(model="llama-3.1-8b-instant", messages=messages, max_tokens=1024)
    return r.choices[0].message.content

def llama_70b(messages):
    r = client.chat.completions.create(model="llama-3.3-70b-versatile", messages=messages, max_tokens=1024)
    return r.choices[0].message.content

router = ModelRouter(
    low_complexity_fn=llama_8b,
    high_complexity_fn=llama_70b,
    fallback_fn=llama_8b,
    rpm_limit=30,
)

def research(context: dict, model=None) -> dict:
    result = model([{"role": "user", "content": f"Research: {context.get('topic', 'AI')}"}]) if model else "mock"
    return {"findings": result}

def draft(context: dict, model=None) -> dict:
    result = model([{"role": "user", "content": f"Write article from: {context['research']['findings']}"}]) if model else "mock"
    return {"article": result}

nodes = [
    DAGNode(id="research", fn_name="research", complexity=0.3),
    DAGNode(id="draft", fn_name="draft", depends_on=["research"], complexity=0.8),
]

orch = PipelineOrchestrator(
    nodes=nodes,
    node_registry={"research": research, "draft": draft},
    router=router,
    verbose=True,  # prints node name, duration, and cost per run
)

state, run = orch.run(initial_state={"topic": "AI Agents"})
print(state["draft"]["article"])
print(f"Total cost: ${run.estimated_total_cost_usd:.4f}")
```

`verbose=True` output format:

```
[DagPipe] Starting pipeline: pipeline (2 nodes)
[DagPipe] ✓ research — Research the topic (3.4s)
[DagPipe] ✓ draft — Write the article (1.2s)
[DagPipe] Pipeline complete — 4.6s | $0.0009
```

On crash and re-run, completed nodes are skipped silently; only new nodes print.
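The skip-on-re-run behavior relies on per-node JSON checkpoints. Here is a minimal, self-contained sketch of that idea in plain Python. This is not DagPipe's actual implementation; the function name and one-file-per-node checkpoint layout are illustrative assumptions:

```python
import json
from pathlib import Path

def run_with_checkpoints(nodes, ckpt_dir):
    """Run (name, fn) nodes in order, persisting each output to JSON.

    Each output is written to disk before the next node runs, so a re-run
    after a crash restores completed nodes from their checkpoints instead
    of recomputing them.
    """
    ckpt_dir = Path(ckpt_dir)
    ckpt_dir.mkdir(parents=True, exist_ok=True)
    state = {}
    for name, fn in nodes:
        ckpt = ckpt_dir / f"{name}.json"
        if ckpt.exists():
            state[name] = json.loads(ckpt.read_text())  # skipped: restored from disk
            continue
        out = fn(state)                    # may raise; earlier checkpoints survive
        ckpt.write_text(json.dumps(out))   # checkpoint before moving on
        state[name] = out
    return state
```

On the first run every node executes; if the second node crashes, a later run restores the first node's output from its checkpoint file and re-executes only the failed node.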
## Core Modules

- `dagpipe.dag` — PipelineOrchestrator, DAGNode, load_dag(), inspect_failure(), override_node()
- `dagpipe.router` — ModelRouter, classify_complexity()
- `dagpipe.registry` — ModelRegistry (validates model availability at startup, warns on retired models)
- `dagpipe.constrained` — constrained_generate() (Pydantic schema validation + 5-strategy JSON extraction + auto-retry)

## YAML Pipeline Format

```yaml
nodes:
  - id: research
    fn: research
    depends_on: []
    complexity: 0.3
    description: "Research the topic"
  - id: draft
    fn: draft
    depends_on: [research]
    complexity: 0.8
    description: "Write the article"
```

Load with: `nodes = load_dag(Path("pipeline.yaml"))`

## Safety and Reliability Features

```python
# Semantic contracts — catch hallucinations that pass schema validation
DAGNode(
    id="revenue",
    fn_name="calc_revenue",
    assert_fn=lambda out: 0 < out.get("revenue", 0) < 1e12,
    assert_message="Revenue must be a realistic positive number",
)

# Secrets — never enter node context or checkpoint files
orch = PipelineOrchestrator(nodes=nodes, node_registry=registry, secrets={"API_KEY": "sk-..."})

# Context isolation — nodes only see declared dependencies, not full pipeline state
orch = PipelineOrchestrator(nodes=nodes, node_registry=registry, isolate_context=True)

# Dead letter queue — inspect failure context after exhausted retries
from dagpipe.dag import inspect_failure, override_node
failure = inspect_failure("revenue")  # returns dict with input, error, attempts
override_node("revenue", {"revenue": 1_200_000})  # inject corrected output, resume

# Circuit breaker — stops runaway API costs from repeated failures
DAGNode(id="node", fn_name="fn", circuit_breaker_threshold=3)
```

## PipelineRun Telemetry

```python
state, run = orch.run(initial_state={...})
print(run.pipeline_id)              # unique UUID per run
print(run.estimated_total_cost_usd) # total cost across all nodes

for node in run.nodes:
    print(node.node_id, node.status, node.duration_seconds, node.estimated_cost_usd)
    # status: "completed" | "skipped" (restored from checkpoint) | "failed"
```

## Zero-Cost Model Stack (Free Tiers)

| Provider | Model                   | Free Limit |
|----------|-------------------------|------------|
| Groq     | llama-3.1-8b-instant    | 30 req/min |
| Groq     | llama-3.3-70b-versatile | 30 req/min |
| Google   | gemini-2.5-flash        | 15 req/min |
| Ollama   | any local model         | unlimited  |

## Pipeline Generator (Apify Actor)

Describe a pipeline in plain English → receive a ZIP in ~4 seconds containing:

- runner.py (complete working Python pipeline)
- pipeline.yaml (DAG definition)
- README.md (generated docs with terminal commands for Windows/macOS/Linux)

URL: https://apify.com/gastronomic_desk/pipeline-generator
Cost: $0.05/run, runs on Groq free tier

## MCP Server

Available on Smithery for AI agents using Claude Desktop, Cursor, VS Code, or any MCP client.

URL: https://smithery.ai/servers/gastronomic-desk/dagpipe-generator

## Full Documentation

- https://github.com/devilsfave/dagpipe/blob/main/README.md
- https://github.com/devilsfave/dagpipe/tree/main/templates
- https://pypi.org/project/dagpipe-core/
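To make the multi-strategy JSON recovery described under Core Modules concrete, here is a self-contained sketch of three common extraction strategies (direct parse, markdown-fence strip, first brace-delimited span). This is illustrative only: it is not DagPipe's implementation, and the library's five strategies are not enumerated in this document.

```python
import json
import re

def extract_json(text):
    """Try progressively looser strategies to pull JSON out of LLM output."""
    # Strategy 1: the whole response is already valid JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Strategy 2: the JSON is wrapped in a markdown code fence.
    fence = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
    if fence:
        try:
            return json.loads(fence.group(1))
        except json.JSONDecodeError:
            pass
    # Strategy 3: grab the first {...} span embedded in surrounding prose.
    brace = re.search(r"\{.*\}", text, re.DOTALL)
    if brace:
        return json.loads(brace.group(0))
    raise ValueError("no JSON found in model output")
```

In a constrained-generation loop, a `ValueError` (or a schema-validation failure on the parsed dict) would trigger a retry of the LLM call rather than crashing the node.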