Guide
This guide walks through building observable Python pipelines with Graphbook Beta, from basic usage to advanced features like MCP integration and custom logging backends.
Decorating Functions with @gb.fn()
The @gb.fn() decorator is the core primitive. It registers a function as a node in the pipeline DAG. Edges are inferred from data flow: when a node’s return value is passed as an argument to another node, an edge is created from the producer to the consumer.
```python
import graphbook.beta as gb

@gb.fn()
def load_data():
    """Load raw data."""
    return [1, 2, 3]

@gb.fn()
def transform(data):
    """Transform data."""
    return [x * 2 for x in data]

@gb.fn()
def run():
    records = load_data()        # edge: run → load_data (no data dep)
    result = transform(records)  # edge: load_data → transform (data flow)
    return result
```
When a called node receives no node-produced arguments, an edge is created from the calling parent node instead.
The decorator can be used in several forms:

```python
@gb.fn                        # bare (no parentheses)
@gb.fn()                      # empty parentheses
@gb.fn(depends_on=[setup])    # with explicit dependencies
```
Explicit Dependencies with depends_on
Some dependencies cannot be detected automatically — shared mutable state, class attributes, closures, or global variables. Use depends_on to declare these explicitly:
```python
@gb.fn()
def setup():
    """Initialize shared resources."""
    ...

@gb.fn(depends_on=[setup])
def process():
    """Uses resources initialized by setup."""
    ...
```
depends_on accepts a list of decorated functions or node ID strings. Explicit dependencies are added alongside any auto-detected data-flow edges.
Note
Graphbook tracks data flow via argument passing (id()-based return value tracking). Dependencies through shared mutable state, class attributes, closures, or global variables are not automatically detected. Use depends_on for these cases.
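That id()-based tracking can be sketched with a minimal stand-in decorator. This is illustrative only — `fn`, `_produced_by`, and `edges` here are hypothetical names, not Graphbook's internals:

```python
import functools

# Maps id(return_value) -> name of the node that produced it.
_produced_by = {}
# Set of (producer, consumer) edges inferred from data flow.
edges = set()

def fn(func):
    """Sketch of data-flow edge inference via id() tracking."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Any argument whose id() matches a tracked return value
        # creates an edge from its producer to this node.
        for arg in list(args) + list(kwargs.values()):
            producer = _produced_by.get(id(arg))
            if producer is not None:
                edges.add((producer, func.__name__))
        result = func(*args, **kwargs)
        _produced_by[id(result)] = func.__name__
        return result
    return wrapper

@fn
def load_data():
    return [1, 2, 3]

@fn
def transform(data):
    return [x * 2 for x in data]

transform(load_data())
# edges now contains ("load_data", "transform")
```

This is also why shared-state dependencies go undetected: nothing is passed as an argument, so there is no return-value id to match.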
Logging
Graphbook provides several logging functions, all scoped to the currently executing node.
Text Logs
gb.log(message) logs a plain text message:
```python
@gb.fn()
def train(model, data):
    gb.log("Starting training...")
    for epoch in range(10):
        loss = train_epoch(model, data)
        gb.log(f"Epoch {epoch}: loss={loss:.4f}")
```
Scalar Metrics
gb.log_metric(name, value, step=None) logs a scalar metric. Steps are auto-incremented if not provided:
```python
@gb.fn()
def train(model, data):
    for epoch in range(100):
        loss = train_epoch(model, data)
        accuracy = evaluate(model, data)
        gb.log_metric("loss", loss)
        gb.log_metric("accuracy", accuracy)
```
Images
gb.log_image(name, image, step=None) accepts PIL images, NumPy arrays, or PyTorch tensors:
```python
@gb.fn()
def augment(image):
    result = apply_transforms(image)
    gb.log_image("augmented", result)
    return result
```
Audio
gb.log_audio(name, audio, sr=16000) logs audio data as NumPy arrays:
```python
@gb.fn()
def synthesize(text):
    waveform = tts_model(text)
    gb.log_audio("speech", waveform, sr=22050)
    return waveform
```
Rich Text
gb.log_text(name, text) logs Markdown or formatted text:
```python
@gb.fn()
def summarize(stats):
    gb.log_text("report", f"""## Results
- **Accuracy**: {stats['acc']:.2%}
- **Loss**: {stats['loss']:.4f}
""")
```
Configuration
gb.log_cfg(cfg) logs configuration for the current node. Values are displayed in the Info tab:
```python
@gb.fn()
def train(lr=0.001, epochs=50):
    gb.log_cfg({"lr": lr, "epochs": epochs})
    ...
```
Multiple log_cfg() calls within the same node merge their dictionaries.
Object Inspection
gb.inspect(obj, name) logs metadata about an object without logging the raw data. This is useful for debugging tensor shapes, dtypes, and value ranges:
```python
@gb.fn()
def forward(model, batch):
    output = model(batch)
    gb.inspect(output, "model_output")
    # Captures: shape, dtype, device, requires_grad, min, max, mean
    return output
```
For PyTorch tensors, it captures shape, dtype, device, requires_grad, min, max, mean. For NumPy arrays, it captures shape, dtype, min, max, mean. For pandas DataFrames, it captures columns and dtypes. For any object with __len__, it captures the length.
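A duck-typed capture along these lines might look like the following sketch. `inspect_meta` is a hypothetical stand-in; the real gb.inspect additionally handles tensors, arrays, and DataFrames as described above:

```python
def inspect_meta(obj):
    """Sketch: record metadata about an object without storing its data."""
    meta = {"type": type(obj).__name__}
    # Array-likes (NumPy arrays, tensors) expose shape and dtype.
    if hasattr(obj, "shape"):
        meta["shape"] = tuple(obj.shape)
    if hasattr(obj, "dtype"):
        meta["dtype"] = str(obj.dtype)
    # Fall back to length for any sized container.
    if hasattr(obj, "__len__"):
        meta["len"] = len(obj)
    return meta

inspect_meta([1, 2, 3])  # -> {"type": "list", "len": 3}
```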
Progress Tracking
gb.track(iterable, name, total) wraps an iterable for tqdm-like progress tracking. The terminal dashboard renders a live progress bar:
```python
@gb.fn()
def process(items):
    results = []
    for item in gb.track(items, name="processing"):
        results.append(transform(item))
    return results
```
If the iterable has a __len__, the total is auto-detected. Otherwise you can pass total explicitly:
```python
for batch in gb.track(dataloader, name="training", total=len(dataloader)):
    ...
```
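The __len__ auto-detection can be sketched with a stand-in generator. This is illustrative only; a real implementation would redraw a live progress bar rather than print:

```python
def track(iterable, name, total=None):
    """Sketch of a tqdm-like wrapper: auto-detect total via __len__."""
    if total is None and hasattr(iterable, "__len__"):
        total = len(iterable)
    for i, item in enumerate(iterable, start=1):
        # A real implementation would render a progress bar here.
        print(f"{name}: {i}/{total if total is not None else '?'}")
        yield item

results = [x * 2 for x in track([1, 2, 3], name="demo")]
# results == [2, 4, 6]; prints "demo: 1/3", "demo: 2/3", "demo: 3/3"
```

A bare generator has no __len__, so without an explicit total the display falls back to an indeterminate count.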
Workflow Description
gb.md(description) sets a Markdown description for the overall workflow. This is visible in MCP tools and the terminal dashboard:
```python
gb.md("""
# Image Classification Pipeline
Loads images from disk, runs inference with a pretrained ResNet,
and exports predictions to a JSON file.
""")
```
Calling gb.md() multiple times appends to the description.
Human-in-the-Loop
gb.ask(question, options, timeout) pauses the pipeline and prompts the user for input. In server mode, the question is sent via MCP. In local mode, it falls back to a Rich terminal prompt:
```python
@gb.fn()
def review(predictions):
    answer = gb.ask(
        "Model accuracy is 73%. Continue training?",
        options=["yes", "no", "retrain"]
    )
    if answer == "no":
        return predictions
    elif answer == "retrain":
        return retrain(predictions)
    ...
```
Execution Modes
Local Mode (Default)
When you run a script directly (python my_pipeline.py), Graphbook operates in local mode. A Rich terminal display shows the DAG, node execution counts, progress bars, and logs. No daemon is required.
Server Mode
When the daemon is running, events are streamed to it via HTTP. This is activated automatically when using graphbook-beta run, which sets the GRAPHBOOK_MODE, GRAPHBOOK_SERVER_PORT, and GRAPHBOOK_RUN_ID environment variables.
You can also trigger server mode manually:
```python
import graphbook.beta as gb

gb.init(mode="server", port=2048)
```
Auto Mode
The default mode is auto. On initialization, Graphbook checks for a running daemon — if found, it uses server mode; otherwise it falls back to local mode. This is what happens when you call gb.init() without arguments, or when the @fn decorator triggers auto-initialization from environment variables.
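The daemon check could be sketched as a TCP probe against the daemon's port. `detect_mode` is a hypothetical helper; Graphbook's actual detection logic may differ:

```python
import socket

def detect_mode(port=2048, timeout=0.2):
    """Sketch: use server mode if a daemon answers on the port, else local."""
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=timeout):
            return "server"
    except OSError:
        # Connection refused or timed out: no daemon reachable.
        return "local"
```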
Custom Logging Backends
Implement the LoggingBackend protocol to route events to external systems (TensorBoard, MLflow, W&B, etc.):
```python
from graphbook.beta import LoggingBackend

class TensorBoardBackend:
    def __init__(self, log_dir="runs"):
        from torch.utils.tensorboard import SummaryWriter
        self.writer = SummaryWriter(log_dir)

    def on_log(self, node, message, timestamp):
        pass  # TensorBoard doesn't have a text log concept

    def on_metric(self, node, name, value, step):
        self.writer.add_scalar(f"{node}/{name}", value, step)

    def on_image(self, node, name, image_bytes, step):
        pass

    def on_audio(self, node, name, audio_bytes, sr):
        pass

    def on_node_start(self, node, params):
        pass

    def on_node_end(self, node, duration):
        pass

    def flush(self):
        self.writer.flush()

    def close(self):
        self.writer.close()

gb.init(backends=[TensorBoardBackend()])
```
Multiple backends can be active simultaneously.
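Fan-out to several backends amounts to forwarding each event to every registered backend. `MemoryBackend` and `Dispatcher` below are illustrative stand-ins, not Graphbook classes:

```python
class MemoryBackend:
    """Toy backend that records metric events in a list."""
    def __init__(self):
        self.events = []

    def on_metric(self, node, name, value, step):
        self.events.append((node, name, value, step))

class Dispatcher:
    """Sketch: broadcast each event to every registered backend."""
    def __init__(self, backends):
        self.backends = backends

    def on_metric(self, node, name, value, step):
        for backend in self.backends:
            backend.on_metric(node, name, value, step)

a, b = MemoryBackend(), MemoryBackend()
Dispatcher([a, b]).on_metric("train", "loss", 0.5, 0)
# Both a.events and b.events now contain ("train", "loss", 0.5, 0).
```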
MCP Integration for AI Agents
Graphbook Beta includes 15 MCP tools that allow AI agents (like Claude) to run, monitor, and debug pipelines. To set up MCP:
Start the daemon:

```shell
$ graphbook-beta serve -d
```

Get the MCP config:

```shell
$ graphbook-beta mcp
```

Add the printed config to your Claude Desktop or Claude Code MCP configuration.
The MCP tools provide observation (get_graph, get_node_status, get_logs, get_metrics, get_errors, get_description, inspect_object) and action (run_pipeline, stop_pipeline, restart_pipeline, get_run_status, get_run_history, get_source_code, write_source_code, ask_user) capabilities. An AI agent can use these to autonomously run experiments, diagnose failures, patch code, and iterate.
Complete Example: Data Processing Pipeline
```python
import numpy as np
import graphbook.beta as gb

gb.md("# Data Processing Pipeline\nGenerate, normalize, filter, and analyze data.")

@gb.fn()
def generate(num_samples: int = 200, noise: float = 0.1, seed: int = 42):
    """Generate synthetic signal data."""
    gb.log_cfg({"num_samples": num_samples, "noise": noise, "seed": seed})
    np.random.seed(seed)
    t = np.linspace(0, 4 * np.pi, num_samples)
    signal = np.sin(t) + noise * np.random.randn(num_samples)
    gb.log(f"Generated {num_samples} samples")
    gb.inspect(signal, "raw_signal")
    return signal

@gb.fn()
def normalize(data, method: str = "standard", clip_min: float = -3.0, clip_max: float = 3.0):
    """Normalize and clip the signal."""
    gb.log_cfg({"method": method, "clip_min": clip_min, "clip_max": clip_max})
    if method == "standard":
        data = (data - data.mean()) / (data.std() + 1e-8)
    data = np.clip(data, clip_min, clip_max)
    gb.log(f"Normalized with method={method}, clipped to [{clip_min}, {clip_max}]")
    gb.inspect(data, "normalized")
    return data

@gb.fn()
def analyze(data):
    """Compute statistics on the processed data."""
    stats = {"mean": float(data.mean()), "std": float(data.std()), "n": len(data)}
    gb.log(f"Stats: mean={stats['mean']:.4f}, std={stats['std']:.4f}")
    gb.log_metric("mean", stats["mean"])
    gb.log_metric("std", stats["std"])
    return stats

@gb.fn()
def run():
    """Main entry point."""
    data = generate()
    normed = normalize(data)
    return analyze(normed)

if __name__ == "__main__":
    result = run()
    print(result)
```