Skip to content

Node Development Guide

A "node" is a self-describing sensor or actuator process in bubbaloop. Nodes are the core building blocks of the platform.

Where Nodes Live

Every node is its own Git repository. Nodes are never embedded in the main bubbaloop repo.

Location What Who
your-username/my-sensor (GitHub) Your node source code You — any contributor
bubbaloop-nodes-official Official maintained nodes (rtsp-camera, openmeteo, etc.) Kornia team
~/.bubbaloop/nodes/ (local) Cloned/registered node source on your machine bubbaloop node add

Why separate repos?

  • Nodes have their own release cadence — ship updates without waiting for bubbaloop releases
  • Each node declares its own dependencies (Cargo.toml) — no workspace coupling
  • Contributors fork/clone a single small repo, not the entire platform
  • The marketplace discovers nodes by GitHub repo — each repo = one installable node

What about crates/bubbaloop-nodes/? That directory in the main repo contains only stale build artifacts from an earlier era when nodes were embedded. It is gitignored and will be removed. Never put node source code there.

Development workflow:

# 1. Create a new node (anywhere on disk)
mkdir ~/my-nodes && cd ~/my-nodes
bubbaloop node init my-sensor --type rust -d "My sensor"
cd my-sensor

# 2. Develop locally
# Edit src/main.rs — implement Node trait
pixi run build
pixi run main   # test against local zenohd

# 3. Register with local daemon (three required steps)
bubbaloop node add . -n my-sensor -c config.yaml
bubbaloop node install my-sensor   # writes systemd unit
bubbaloop node start   my-sensor   # starts the service

# 4. Publish to GitHub
git init && git add -A && git commit -m "Initial commit"
gh repo create my-sensor --public --push --source .

# 5. Others install your node
bubbaloop node add your-username/my-sensor --build --install

How bubbaloop node install Works

When a user runs bubbaloop node install rtsp-camera, the CLI:

  1. Checks if the node is already registered with the daemon
  2. Searches marketplace registries (nodes.yaml files on GitHub)
  3. Tries to download a precompiled binary (fast path — binary + checksum from GitHub Releases)
  4. Falls back to git clone + pixi run build (slow path — full source build)
  5. Saves everything to ~/.bubbaloop/nodes/<repo>/<subdir>/
  6. Registers the node path in ~/.bubbaloop/nodes.json
  7. Creates a systemd user service

On-disk result:

~/.bubbaloop/nodes/
├── bubbaloop-nodes-official/     # Cloned repo (or symlink for dev)
│   ├── rtsp-camera/
│   │   ├── node.yaml             # Manifest (read by daemon)
│   │   └── target/release/       # Built binary
│   ├── openmeteo/
│   └── system-telemetry/

For local development, you can symlink your working copy instead of having the CLI clone a separate copy:

ln -s ~/my-nodes/my-sensor ~/.bubbaloop/nodes/my-sensor
bubbaloop node add ~/.bubbaloop/nodes/my-sensor

What is a node?

A node is an autonomous process that: - Connects to the Zenoh data plane - Publishes sensor data (protobuf-encoded) - Serves a manifest describing its capabilities - Responds to commands from AI agents via MCP - Reports health via periodic heartbeats - Manages its own lifecycle (start, stop, restart)

Nodes run as systemd user services managed by the bubbaloop daemon. They can run on any machine — the daemon scopes all topics by machine_id for multi-machine deployments.

Recommended: Use the Node SDK to create Rust nodes with ~50 lines of code. The manual approach below is for advanced use cases or Python nodes.

Anatomy of a node

Required Components

  1. node.yaml — Marketplace metadata (name, version, type, description, author, build, command, capabilities, publishes, requires)
  2. config.yaml — Runtime instance parameters (publish_topic, rate_hz, node-specific fields)
  3. protos/ — Protobuf schema definitions for messages
  4. build system — pixi.toml with build/run tasks
  5. health heartbeat — Periodic publish to bubbaloop/global/{machine_id}/{instance_name}/health
  6. schema queryable — Serves FileDescriptorSet at {prefix}/schema
  7. signal handling — Graceful shutdown on SIGINT/SIGTERM

Zenoh Topics

Every node operates within a topic hierarchy using two key spaces:

bubbaloop/global/{machine_id}/{node_name}/health      → "ok" (always global)
bubbaloop/global/{machine_id}/{node_name}/schema      → FileDescriptorSet bytes (always global)

bubbaloop/global/{machine_id}/{publish_topic}         → network-visible data
bubbaloop/local/{machine_id}/{publish_topic}          → SHM-only data (same machine)

Environment variables: - BUBBALOOP_MACHINE_ID (default: hostname) — Machine identifier

Topic naming rules: - Only specify the suffix in config.yaml: publish_topic: my-node/data - Validate against ^[a-zA-Z0-9/_\-\.]+$ — reject anything else - Reserved tokens: health, daemon, camera, fleet, coordination, _global

Manifest Format

The manifest is a JSON document served via Zenoh queryable that describes the node's capabilities:

{
  "name": "network-monitor",
  "version": "0.2.0",
  "type": "python",
  "description": "Network connectivity monitor",
  "capabilities": ["sensor"],
  "publishes": [
    {
      "suffix": "network-monitor/status",
      "schema_type": "bubbaloop.network_monitor.NetworkStatus",
      "rate_hz": 0.1,
      "description": "Network health status"
    }
  ],
  "commands": [
    {
      "name": "ping_host",
      "description": "Ping a specific host",
      "parameters": {"host": "string", "count": "integer"}
    }
  ],
  "requires": {
    "hardware": ["network"]
  }
}

Schema Contract (Protobuf nodes)

Every node that publishes protobuf messages MUST serve its FileDescriptorSet via a Zenoh queryable. This enables runtime schema discovery for dashboards, AI agents, and cross-node type checking.

Requirements:

  1. Declare schema queryable at {node-name}/schema (relative to topic prefix):

    // Rust: NEVER use .complete(true) — blocks wildcard discovery
    let schema_queryable = session
        .declare_queryable(format!("{}/schema", topic_prefix))
        .await?;
    
    // Python: NEVER use complete=True
    queryable = session.declare_queryable(f"{topic_prefix}/schema")
    

  2. Serve FileDescriptorSet bytes (not JSON):

    // Rust: include compiled descriptor
    pub const DESCRIPTOR: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/descriptor.bin"));
    query.reply(query.key_expr().clone(), DESCRIPTOR).await?;
    
    // Python: reply with query.key_expr property (NOT method)
    with open("descriptor.bin", "rb") as f:
        descriptor_bytes = f.read()
    query.reply(query.key_expr, descriptor_bytes)  # key_expr is a PROPERTY
    

  3. Compile descriptor.bin via build.rs (Rust) or protoc (Python):

    // build.rs
    prost_build::Config::new()
        .file_descriptor_set_path(out_dir.join("descriptor.bin"))
        .compile_protos(&["protos/my_node.proto", "protos/header.proto"], &["protos/"])?;
    

  4. Include all .proto files the node uses (including header.proto from bubbaloop-schemas):

  5. Copy header.proto into node's protos/ directory
  6. Reference it in your message definitions: import "header.proto";

Why this matters:

  • Dashboard auto-discovers all schemas via wildcard query bubbaloop/**/schema
  • AI agents can introspect message types without reading source code
  • Cross-node type safety: verify sender/receiver compatibility at runtime
  • Version detection: dashboard can warn about schema mismatches

Common mistakes (caught by ./scripts/validate.sh):

  • Using .complete(true) in Rust queryables
  • Using complete=True in Python queryables
  • Using query.key_expr() as method in Python (it's a property)
  • Serving JSON instead of raw FileDescriptorSet bytes
  • Missing header.proto in FileDescriptorSet

Creating a Rust node

Step 1: Scaffold

Use the bubbaloop CLI to generate boilerplate:

bubbaloop node init my-sensor --template sensor --output ./my-sensor
cd my-sensor

This creates:

my-sensor/
  Cargo.toml          # Dependencies: zenoh, prost, bubbaloop-schemas
  build.rs            # Proto compilation
  config.yaml         # Runtime instance params
  node.yaml           # Marketplace metadata
  pixi.toml           # Build/run tasks
  protos/
    header.proto      # Shared Header contract
    my_sensor.proto   # Node-specific messages
  src/
    main.rs           # Entry point
    proto.rs          # include! generated types

Step 2: Define your proto

Edit protos/my_sensor.proto:

syntax = "proto3";

package bubbaloop.my_sensor.v1;

import "header.proto";

message SensorReading {
    bubbaloop.header.v1.Header header = 1;

    double temperature = 2;
    double humidity = 3;
    double pressure = 4;
    uint32 sequence = 5;
}

Step 3: Implement the main loop

Example from openmeteo node:

// src/main.rs
use anyhow::Result;
use argh::FromArgs;
use prost::Message;
use std::sync::Arc;
use tokio::time::{interval, Duration};

mod proto {
    include!(concat!(env!("OUT_DIR"), "/bubbaloop.my_sensor.v1.rs"));
}

/// FileDescriptorSet for this node's protobuf schemas
const DESCRIPTOR: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/descriptor.bin"));

#[derive(FromArgs)]
/// My sensor data publisher for Zenoh
struct Args {
    /// path to the configuration file
    #[argh(option, short = 'c')]
    config: Option<String>,

    /// zenoh router endpoint to connect to
    /// Default: tcp/127.0.0.1:7447 (local zenohd router)
    #[argh(option, short = 'e', default = "String::from(\"tcp/127.0.0.1:7447\")")]
    endpoint: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize logging
    let env = env_logger::Env::default().default_filter_or("info");
    env_logger::init_from_env(env);

    let args: Args = argh::from_env();

    // Load config (from YAML, see Step 4)
    let config = load_config(&args.config)?;

    // Create shutdown channel
    let shutdown_tx = tokio::sync::watch::Sender::new(());

    // Set up Ctrl+C handler
    {
        let shutdown_tx = shutdown_tx.clone();
        ctrlc::set_handler(move || {
            log::info!("Received Ctrl+C, shutting down gracefully...");
            let _ = shutdown_tx.send(());
        })
        .expect("Error setting Ctrl+C handler");
    }

    // Read machine ID env var
    let machine_id = std::env::var("BUBBALOOP_MACHINE_ID")
        .unwrap_or_else(|_| {
            hostname::get()
                .map(|h| h.to_string_lossy().to_string())
                .unwrap_or_else(|_| "unknown".to_string())
        });
    log::info!("Machine ID: {}", machine_id);

    // Initialize Zenoh session in client mode
    let endpoint = std::env::var("ZENOH_ENDPOINT").unwrap_or(args.endpoint);
    log::info!("Connecting to Zenoh at: {}", endpoint);
    let mut zenoh_config = zenoh::Config::default();
    zenoh_config.insert_json5("mode", r#""client""#).unwrap();
    zenoh_config
        .insert_json5("connect/endpoints", &format!(r#"["{}"]"#, endpoint))
        .unwrap();
    let zenoh_session = Arc::new(zenoh::open(zenoh_config).await?);

    // Declare schema queryable (NO .complete(true)!)
    let schema_key = format!(
        "bubbaloop/global/{}/my-sensor/schema",
        machine_id
    );
    let _schema_queryable = zenoh_session
        .declare_queryable(&schema_key)
        .callback({
            let descriptor = DESCRIPTOR.to_vec();
            move |query| {
                let descriptor_clone = descriptor.clone();
                tokio::spawn(async move {
                    let _ = query.reply(&query.key_expr().clone(), descriptor_clone.as_slice()).await;
                });
            }
        })
        .await?;
    log::info!("Schema queryable: {}", schema_key);

    // Declare data publisher
    let data_topic = format!(
        "bubbaloop/global/{}/{}",
        machine_id, config.publish_topic
    );
    let publisher = zenoh_session.declare_publisher(&data_topic).await?;
    log::info!("Publishing to: {}", data_topic);

    // Health heartbeat publisher
    let health_topic = format!(
        "bubbaloop/global/{}/my-sensor/health",
        machine_id
    );
    let health_pub = zenoh_session.declare_publisher(&health_topic).await?;

    // Spawn health heartbeat task (5s interval)
    let health_task = tokio::spawn({
        let shutdown_rx = shutdown_tx.subscribe();
        async move {
            let mut ticker = interval(Duration::from_secs(5));
            let mut shutdown_rx = shutdown_rx;
            loop {
                tokio::select! {
                    biased;
                    _ = shutdown_rx.changed() => break,
                    _ = ticker.tick() => {
                        let _ = health_pub.put(b"alive").await;
                    }
                }
            }
        }
    });

    // Main sensing loop
    let mut shutdown_rx = shutdown_tx.subscribe();
    let mut tick = interval(Duration::from_secs_f64(1.0 / config.rate_hz));
    let mut seq: u32 = 0;

    loop {
        tokio::select! {
            biased;
            _ = shutdown_rx.changed() => break,
            _ = tick.tick() => {
                let now_ns = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)?
                    .as_nanos() as u64;

                let reading = proto::SensorReading {
                    header: Some(bubbaloop_schemas::header::v1::Header {
                        acq_time: now_ns,
                        pub_time: now_ns,
                        sequence: seq,
                        frame_id: "my-sensor".to_string(),
                        machine_id: machine_id.clone(),
                    }),
                    temperature: read_temperature(),
                    humidity: read_humidity(),
                    pressure: read_pressure(),
                    sequence: seq,
                };

                publisher.put(reading.encode_to_vec()).await.ok();
                seq = seq.wrapping_add(1);
            }
        }
    }

    // Clean shutdown
    health_task.abort();
    log::info!("my-sensor node shut down, exiting");
    Ok(())
}

fn read_temperature() -> f64 { 22.5 } // Replace with real sensor
fn read_humidity() -> f64 { 45.0 }
fn read_pressure() -> f64 { 1013.25 }

Step 4: Configure build.rs

// build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let protos_dir = std::path::Path::new("protos");
    if !protos_dir.exists() {
        return Ok(());
    }

    let proto_files: Vec<_> = std::fs::read_dir(protos_dir)?
        .filter_map(|e| e.ok())
        .map(|e| e.path())
        .filter(|p| p.extension().is_some_and(|ext| ext == "proto"))
        .collect();

    if proto_files.is_empty() {
        return Ok(());
    }

    let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR")?);

    let proto_strs: Vec<_> = proto_files.iter().filter_map(|p| p.to_str()).collect();
    prost_build::Config::new()
        .extern_path(".bubbaloop.header.v1", "::bubbaloop_schemas::header::v1")
        .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
        .file_descriptor_set_path(out_dir.join("descriptor.bin"))
        .compile_protos(&proto_strs, &["protos/"])?;

    for f in &proto_files {
        println!("cargo:rerun-if-changed={}", f.display());
    }
    println!("cargo:rerun-if-changed=protos");
    Ok(())
}

Step 5: Add Cargo.toml dependencies

[package]
name = "my-sensor"
version = "0.1.0"
edition = "2021"
description = "My sensor data publisher for Zenoh"

[dependencies]
anyhow = "1.0"
argh = "0.1"
ctrlc = "3.4"
env_logger = "0.11"
log = "0.4"
tokio = { version = "1", features = ["full"] }
zenoh = "1.7"
prost = "0.14"
prost-types = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
hostname = "0.4"
bubbaloop-schemas = { git = "https://github.com/kornia/bubbaloop.git", branch = "main" }

[build-dependencies]
prost-build = "0.14"

Step 6: Test locally

# Build
pixi run build

# Run (connects to local zenohd)
pixi run main -c config.yaml

# In another terminal: verify health heartbeat
z_sub -k "bubbaloop/global/*/my-sensor/health"

# Verify data publishing
z_sub -k "bubbaloop/global/*/my-sensor/*"

Step 7: Install and run via daemon

# Register with daemon
bubbaloop node add .

# Start as systemd service
bubbaloop node start my-sensor

# View logs
bubbaloop node logs my-sensor -f

# Check status
bubbaloop node list

# Stop
bubbaloop node stop my-sensor

Creating a Python node

Python nodes follow the same contract but use eclipse-zenoh and protobuf for serialization.

Step 1: Scaffold

bubbaloop node init my-sensor --template python --output ./my-sensor
cd my-sensor

This creates:

my-sensor/
  main.py             # Entry point
  build_proto.py      # Proto compilation script
  config.yaml         # Runtime instance params
  node.yaml           # Marketplace metadata
  pixi.toml           # Build/run tasks
  protos/
    header.proto      # Shared Header contract
    my_sensor.proto   # Node-specific messages

Step 2: Define your proto

Same as Rust (Step 2 above).

Step 3: Implement the main loop

Example from network-monitor node:

#!/usr/bin/env python3
"""my-sensor node - Temperature/humidity sensor publisher"""

import argparse
import json
import logging
import signal
import socket
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import yaml
import zenoh

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Import generated protobuf modules
# Run `python build_proto.py` first to generate these
try:
    import header_pb2
    import my_sensor_pb2
except ImportError:
    logger.error(
        "Protobuf modules not found. Run 'python build_proto.py' first."
    )
    sys.exit(1)


class MySensorNode:
    """Temperature/humidity sensor publisher."""

    def __init__(self, config_path: Path, endpoint: str | None = None):
        # Load configuration
        if config_path.exists():
            with open(config_path) as f:
                self.config = yaml.safe_load(f)
        else:
            logger.warning(f"Config file not found: {config_path}, using defaults")
            self.config = {
                "publish_topic": "my-sensor/data",
                "rate_hz": 1.0,
            }

        # Resolve machine_id from env var
        import os
        self.machine_id = os.environ.get(
            "BUBBALOOP_MACHINE_ID", socket.gethostname()
        )

        # Setup zenoh
        zenoh_config = zenoh.Config()
        if endpoint:
            zenoh_config.insert_json5("connect/endpoints", json.dumps([endpoint]))

        self.session = zenoh.open(zenoh_config)
        logger.info("Connected to zenoh")

        # Build topic: bubbaloop/global/{machine_id}/{publish_topic}
        topic_suffix = self.config["publish_topic"]
        self.full_topic = f"bubbaloop/global/{self.machine_id}/{topic_suffix}"

        # Setup publishers
        self.publisher = self.session.declare_publisher(self.full_topic)
        logger.info(f"Publishing to: {self.full_topic}")

        self.health_publisher = self.session.declare_publisher(
            f"bubbaloop/global/{self.machine_id}/my-sensor/health"
        )

        # Declare schema queryable (NO complete=True!)
        descriptor_path = Path(__file__).parent / "descriptor.bin"
        if descriptor_path.exists():
            self.descriptor_bytes = descriptor_path.read_bytes()
            schema_key = f"bubbaloop/global/{self.machine_id}/my-sensor/schema"
            self.schema_queryable = self.session.declare_queryable(
                schema_key,
                lambda query: query.reply(query.key_expr, self.descriptor_bytes),
            )
            logger.info(f"Schema queryable: {schema_key}")
        else:
            self.descriptor_bytes = None
            self.schema_queryable = None
            logger.warning("descriptor.bin not found, schema queryable not available")

        self.hostname = socket.gethostname()
        self.running = True
        self.sequence = 0

    def process(self) -> bytes:
        """Read sensor and return serialized SensorReading."""
        now_ns = int(datetime.now(timezone.utc).timestamp() * 1e9)

        reading = my_sensor_pb2.SensorReading()
        reading.header.CopyFrom(
            header_pb2.Header(
                acq_time=now_ns,
                pub_time=now_ns,
                sequence=self.sequence,
                frame_id="my-sensor",
                machine_id=self.machine_id,
            )
        )
        reading.temperature = read_temperature()
        reading.humidity = read_humidity()
        reading.pressure = read_pressure()
        reading.sequence = self.sequence

        return reading.SerializeToString()

    def run(self):
        """Run the node main loop."""
        interval = 1.0 / self.config.get("rate_hz", 1.0)
        logger.info(
            f"my-sensor node started (rate: {self.config.get('rate_hz', 1.0)} Hz)"
        )

        while self.running:
            output = self.process()
            self.publisher.put(output)

            # Health heartbeat
            self.health_publisher.put(b"alive")

            if self.sequence % 10 == 0:
                logger.debug(f"Published reading seq={self.sequence}")

            self.sequence += 1
            time.sleep(interval)

        logger.info("my-sensor node stopped")

    def stop(self):
        """Stop the node."""
        self.running = False

    def close(self):
        """Clean up resources."""
        self.publisher.undeclare()
        self.health_publisher.undeclare()
        if self.schema_queryable is not None:
            self.schema_queryable.undeclare()
        self.session.close()


def read_temperature() -> float:
    return 22.5  # Replace with real sensor


def read_humidity() -> float:
    return 45.0


def read_pressure() -> float:
    return 1013.25


def main():
    parser = argparse.ArgumentParser(
        description="Temperature/humidity sensor publisher"
    )
    parser.add_argument(
        "-c",
        "--config",
        type=Path,
        default=Path("config.yaml"),
        help="Path to configuration file",
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        type=str,
        default="tcp/127.0.0.1:7447",
        help="Zenoh endpoint to connect to (default: tcp/127.0.0.1:7447)",
    )
    args = parser.parse_args()

    node = MySensorNode(args.config, args.endpoint)

    # Setup signal handlers
    def signal_handler(signum, frame):
        logger.info("Shutdown signal received")
        node.stop()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        node.run()
    finally:
        node.close()


if __name__ == "__main__":
    main()

Step 4: Build proto compiler script

# build_proto.py
#!/usr/bin/env python3
"""Compile protobuf schemas for my-sensor node."""

import os
import subprocess
import sys
from pathlib import Path

def main():
    # Directories
    script_dir = Path(__file__).parent
    protos_dir = script_dir / "protos"

    if not protos_dir.exists():
        print("No protos/ directory found, skipping compilation")
        return 0

    # Find all .proto files
    proto_files = list(protos_dir.glob("*.proto"))
    if not proto_files:
        print("No .proto files found in protos/")
        return 0

    print(f"Compiling {len(proto_files)} proto files...")

    # Run protoc
    cmd = [
        "protoc",
        f"--proto_path={protos_dir}",
        f"--python_out={script_dir}",
        f"--descriptor_set_out={script_dir / 'descriptor.bin'}",
        "--include_imports",
        "--include_source_info",
    ]
    cmd.extend(str(f) for f in proto_files)

    try:
        subprocess.run(cmd, check=True)
        print("Proto compilation successful")
        return 0
    except subprocess.CalledProcessError as e:
        print(f"Proto compilation failed: {e}", file=sys.stderr)
        return 1
    except FileNotFoundError:
        print("protoc not found. Install: pip install grpcio-tools", file=sys.stderr)
        return 1

if __name__ == "__main__":
    sys.exit(main())

Step 5: Test locally

# Compile protos
python build_proto.py

# Run
python main.py -c config.yaml

# In another terminal: verify
z_sub -k "bubbaloop/global/*/my-sensor/health"
z_sub -k "bubbaloop/global/*/my-sensor/*"

Step 6: Install and run via daemon

Same as Rust (Step 7 above).

node.yaml Format

The node.yaml file is the marketplace metadata that describes your node:

name: my-sensor
version: "0.1.0"
type: rust  # or "python"
description: Temperature/humidity sensor publisher
author: Your Name
build: pixi run build  # Rust: compiles binary; Python: compiles protos
command: ./target/release/my_sensor_node  # Rust binary path
# OR for Python:
# command: pixi run main  # Runs main.py via pixi

# Skill capabilities
capabilities:
  - sensor

# Topics this node publishes
publishes:
  - suffix: my-sensor/data
    description: "Temperature and humidity readings"
    schema_type: "bubbaloop.my_sensor.v1.SensorReading"
    rate_hz: 1.0

# Hardware/software requirements
requires:
  hardware:
    - network

Required fields (validated by bubbaloop node validate): - name — 1-64 chars, [a-zA-Z0-9_-], matches directory name - version — Semantic version (e.g., "0.1.0") - type"rust" or "python"

Recommended fields: - description — Human-readable description - author — Your name or team - build — Command to build the node - command — Command to run the node (see below)

command values:

Value When to use Example
(omit) Python node with main.py — default
./target/release/<name> Rust binary (default path) ./target/release/my-node
pixi run python main.py Python script via pixi pixi run python main.py
pixi run python <script> Python non-default entrypoint via pixi pixi run python sensor.py
pixi run python -m pkg.module Python package module via pixi pixi run python -m smartpower.nodes.runner config.yaml
python3 <script> Python without pixi python3 main.py
/abs/path/to/binary Absolute path to any executable /usr/local/bin/my-tool

The daemon resolves relative paths to the node directory and absolute tool paths:

  • pixi~/.pixi/bin/pixi
  • cargo~/.cargo/bin/cargo

is_built detection logic for command:

The daemon checks whether the node's main artifact exists on disk before allowing it to start. The detection order for multi-token commands is:

  1. Rust nodes: Checks for binary in target/release/<name> or target/debug/<name>. Fallback: first non-flag, non-absolute token in command (for non-standard cross-compilation paths).
  2. No command (Python): Defaults to main.py.
  3. Command with -m module.path: Converts dots to slashes and checks module/path.py. Example: pixi run python -m smartpower.nodes.runner → checks smartpower/nodes/runner.py.
  4. Command with *.py token: Checks for that script file. Example: pixi run python sensor.py → finds sensor.py.
  5. Anything else (external binary, pixi task, etc.): Assumes built (optimistic fallback).

Optional fields: - capabilities — List of skill types: sensor, actuator, processor, gateway - publishes — List of topics with suffix, description, schema_type, rate_hz - requires — Hardware/software dependencies (hardware: network, camera, gpio, etc.)

Best Practices

1. Always use Zenoh client mode (not peer)

Peer mode does not route through the zenohd router, breaking topic discovery.

// Rust
let mut zenoh_config = zenoh::Config::default();
zenoh_config.insert_json5("mode", r#""client""#).unwrap();
# Python
zenoh_config = zenoh.Config()
# Default is client mode, no explicit config needed

2. Never use .complete(true) on queryables

This breaks wildcard queries like bubbaloop/**/schema that the dashboard uses for discovery.

// ❌ WRONG
let queryable = session
    .declare_queryable("my-sensor/schema")
    .complete(true)  // BREAKS DISCOVERY
    .await?;

// ✅ CORRECT
let queryable = session
    .declare_queryable("my-sensor/schema")
    .await?;
# ❌ WRONG
queryable = session.declare_queryable("my-sensor/schema", complete=True)

# ✅ CORRECT
queryable = session.declare_queryable("my-sensor/schema")

3. Include header.proto in your FileDescriptorSet

The Header message is the standard metadata envelope for all node messages:

message Header {
    uint64 acq_time = 1;    // Nanoseconds since Unix epoch (acquisition time)
    uint64 pub_time = 2;    // Nanoseconds since Unix epoch (publish time)
    uint32 sequence = 3;    // Monotonic sequence number
    string frame_id = 4;    // Frame/sensor identifier
    string machine_id = 5;  // Machine identifier
}

Every node message should include a Header as the first field:

message SensorReading {
    bubbaloop.header.v1.Header header = 1;
    double temperature = 2;
    // ... other fields
}

4. Keep config in YAML, loaded at startup

Never hardcode configuration. Use config.yaml for all runtime parameters:

# config.yaml
publish_topic: my-sensor/data
rate_hz: 1.0
sensor_port: /dev/ttyUSB0
calibration_offset: 2.5

Load at startup and validate all fields (bounds checking, required fields, format validation).

5. Graceful shutdown on SIGTERM

The daemon sends SIGTERM when stopping a node. Always handle it gracefully:

// Rust: tokio::sync::watch channel pattern
let shutdown_tx = tokio::sync::watch::Sender::new(());
ctrlc::set_handler(move || {
    let _ = shutdown_tx.send(());
})?;

// In main loop:
tokio::select! {
    _ = shutdown_rx.changed() => break,
    // ... other branches
}
# Python: signal handlers
import signal

def signal_handler(signum, frame):
    node.stop()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

6. Publish health heartbeats every 5 seconds

The daemon marks a node unhealthy if no heartbeat arrives for 30 seconds. Publish every 5 seconds for safety margin:

// Rust: spawn background task
let health_task = tokio::spawn({
    let shutdown_rx = shutdown_tx.subscribe();
    async move {
        let mut ticker = interval(Duration::from_secs(5));
        loop {
            tokio::select! {
                _ = shutdown_rx.changed() => break,
                _ = ticker.tick() => {
                    let _ = health_pub.put(b"alive").await;
                }
            }
        }
    }
});
# Python: inline in main loop
while self.running:
    # ... do work
    self.health_publisher.put(b"alive")
    time.sleep(interval)

7. Validate all config inputs

Always validate configuration fields to prevent runtime errors:

// Rust example
if !(0.01..=1000.0).contains(&config.rate_hz) {
    anyhow::bail!("rate_hz {} out of range (0.01-1000.0)", config.rate_hz);
}

if !TOPIC_RE.is_match(&config.publish_topic) {
    anyhow::bail!("publish_topic contains invalid characters");
}
# Python example
import re
TOPIC_RE = re.compile(r"^[a-zA-Z0-9/_\-\.]+$")

topic = self.config.get("publish_topic", "")
if not TOPIC_RE.match(topic):
    raise ValueError(
        f"publish_topic '{topic}' contains invalid characters "
        f"(must match [a-zA-Z0-9/_\\-\\.]+)"
    )

rate_hz = self.config.get("rate_hz", 1.0)
if not (0.01 <= rate_hz <= 1000.0):
    raise ValueError(f"rate_hz {rate_hz} out of range (0.01-1000.0)")

8. Security checklist

  • Validate topic names: ^[a-zA-Z0-9/_\-\.]+$
  • Enforce bounds checking on numeric config values
  • Never bind to 0.0.0.0 — use localhost only
  • Never store secrets in config.yaml — use environment variables
  • Validate external endpoints (URL format, TLS certificates, timeout enforcement)

Two SDKs are available — Rust (bubbaloop-node) and Python (bubbaloop-sdk) — both with the same API. They reduce node boilerplate from ~300 lines to ~50 lines and provide:

  • Automatic Zenoh session creation (client mode, enforced)
  • Automatic health heartbeat publishing (5s interval)
  • Automatic schema queryable registration
  • Automatic config file loading (YAML deserialization)
  • Automatic signal handling (SIGTERM/SIGINT)
  • Automatic machine_id resolution
  • Automatic logging initialization

With the SDK, a complete node will look like this:

use bubbaloop_node::{Node, NodeContext};
use anyhow::Result;

#[derive(Debug, Clone, serde::Deserialize)]
pub struct Config {
    pub publish_topic: String,
    pub rate_hz: f64,
}

pub struct MySensor {
    publisher: bubbaloop_node::ProtoPublisher<proto::SensorReading>,
    rate_hz: f64,
}

#[async_trait::async_trait]
impl Node for MySensor {
    type Config = Config;

    fn name() -> &'static str { "my-sensor" }
    fn descriptor() -> &'static [u8] { DESCRIPTOR }

    async fn init(ctx: &NodeContext, config: &Config) -> Result<Self> {
        let publisher = ctx.publisher_proto::<proto::SensorReading>(&config.publish_topic).await?;
        Ok(Self { publisher, rate_hz: config.rate_hz })
    }

    async fn run(self, ctx: NodeContext) -> Result<()> {
        let mut shutdown_rx = ctx.shutdown_rx.clone();
        let mut tick = interval(Duration::from_secs_f64(1.0 / self.rate_hz));
        let mut seq: u32 = 0;

        loop {
            tokio::select! {
                biased;
                _ = shutdown_rx.changed() => break,
                _ = tick.tick() => {
                    let reading = proto::SensorReading {
                        // ... populate fields
                    };
                    self.publisher.put(&reading).await.ok();
                    seq = seq.wrapping_add(1);
                }
            }
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    bubbaloop_node::run_node::<MySensor>().await
}

SDK Quick Start

# 1. Scaffold a new node
bubbaloop node init my-sensor --type rust -d "My custom sensor"

# 2. Edit src/main.rs — implement init() and run()
#    The SDK handles everything else automatically

# 3. Build and register
pixi run build
bubbaloop node add .
bubbaloop node start my-sensor

What the SDK Handles Automatically

Component Lines saved What it does
Zenoh session ~15 lines Client mode, endpoint resolution, scouting disabled
Health heartbeat ~15 lines 5s interval publish to health topic
Schema queryable ~12 lines Serves FileDescriptorSet, no .complete(true)
Config loading ~15 lines YAML deserialization with clear errors
Signal handling ~8 lines SIGINT/SIGTERM via watch channel
CLI arguments ~15 lines -c config.yaml -e endpoint
Key space + machine_id ~6 lines global/local key spaces + BUBBALOOP_MACHINE_ID
Total saved ~86 lines Per node, automatically correct

NodeContext API Reference

The NodeContext struct is provided to your node by the SDK runtime. It has these public fields and methods:

Public fields:

Field Type Description
session Arc<zenoh::Session> Zenoh session (client mode)
machine_id String Machine identifier (from BUBBALOOP_MACHINE_ID or hostname)
instance_name String Per-instance name (from config name field, or node type name)
shutdown_rx watch::Receiver<()> Shutdown signal — select on this in your run() loop

Topic builders:

Method Returns Description
ctx.topic(suffix) String bubbaloop/global/{machine_id}/{suffix} — network-visible
ctx.local_topic(suffix) String bubbaloop/local/{machine_id}/{suffix} — SHM-only

Publishers:

Method Returns Description
ctx.publisher_proto::<T>(suffix) ProtoPublisher<T> Protobuf with APPLICATION_PROTOBUF encoding + type name
ctx.publisher_json(suffix) JsonPublisher JSON with APPLICATION_JSON encoding
ctx.publisher_raw(suffix, local) RawPublisher Raw ZBytes; local=true for SHM with CongestionControl::Block
ctx.publisher_raw_proto::<T>(suffix) RawPublisher Raw SHM with protobuf encoding header (always local)

Subscribers:

Method Returns Description
ctx.subscriber::<T>(suffix) TypedSubscriber<T> Auto-decode protobuf, 256-slot FIFO
ctx.subscriber_raw(suffix, local) RawSubscriber Raw ZBytes, 4-slot FIFO (older frames dropped)

Utility functions (module-level, not on NodeContext):

Function Description
discover_nodes(&session, timeout) Find all nodes via health heartbeats; returns Vec<NodeInfo>
get_sample(session, key_expr, timeout) Single-shot pull without maintaining a subscription
ProtoDecoder::new(session) Dynamic protobuf decode via SchemaRegistry (queries bubbaloop/**/schema)

Re-exported crates: bubbaloop_node re-exports zenoh, prost, tokio, anyhow, log, serde_json, async_trait — no need to add these as direct dependencies.

Cargo.toml for Rust SDK nodes

[dependencies]
bubbaloop-node = { git = "https://github.com/kornia/bubbaloop.git", branch = "main" }

[build-dependencies]
bubbaloop-node-build = { git = "https://github.com/kornia/bubbaloop.git", branch = "main" }

[workspace]

bubbaloop-node-build automatically embeds header.proto, maps the header namespace, and writes descriptor.bin — no need to list bubbaloop-schemas, prost, or prost-build separately.

pixi.toml for Python SDK nodes

[pypi-dependencies]
bubbaloop-sdk = { git = "https://github.com/kornia/bubbaloop.git", branch = "main", subdirectory = "python-sdk" }

JSON field naming: Python nodes publish snake_case — the dashboard applies snakeToCamel() automatically on decode. Both protobuf and JSON paths normalize to camelCase for React components.

Complete Node Checklist

Before submitting a new node, verify ALL items:

Structure

  • [ ] node.yaml exists with: name, version, type, description, author, build, command
  • [ ] node.yaml has capabilities field (sensor/actuator/processor/gateway)
  • [ ] node.yaml has publishes field with topic suffix, description, schema_type, rate
  • [ ] node.yaml has requires field for hardware/software dependencies
  • [ ] config.yaml exists with: publish_topic, rate_hz, and node-specific fields
  • [ ] pixi.toml exists with: build and run tasks
  • [ ] protos/ directory with header.proto and node-specific .proto files

Communication

  • [ ] Publishes data via Zenoh topic: bubbaloop/{global|local}/{machine_id}/{suffix}
  • [ ] config.yaml specifies topic suffix only — SDK prepends key space + machine_id
  • [ ] Uses protobuf serialization for all data messages
  • [ ] Publishes health heartbeat to bubbaloop/global/{machine_id}/{instance_name}/health (vanilla zenoh, not protobuf)
  • [ ] Heartbeat interval <= 10 seconds (recommended: 5 seconds)
  • [ ] Declares schema queryable at {prefix}/schema (serves FileDescriptorSet bytes)
  • [ ] Schema queryable does NOT use .complete(true) (Rust) or complete=True (Python)

Security

  • [ ] Topic names validated: ^[a-zA-Z0-9/_\-\.]+$
  • [ ] Config numeric values have bounds checking
  • [ ] External endpoints validated (URL format, TLS)
  • [ ] No binding to 0.0.0.0
  • [ ] No multicast or gossip scouting enabled
  • [ ] No secrets in config.yaml
  • [ ] Handles SIGINT/SIGTERM gracefully

Code

  • [ ] Rust: uses vanilla zenoh with prost for data pub/sub
  • [ ] Python: uses eclipse-zenoh + protobuf, compiles protos via build_proto.py
  • [ ] Accepts CLI flags: -c config.yaml -e tcp/localhost:7447
  • [ ] Uses Header message pattern (acq_time, pub_time, sequence, frame_id, machine_id)
  • [ ] Reads BUBBALOOP_MACHINE_ID env var (default: hostname); uses global/local key spaces (SDK handles automatically)

Testing

  • [ ] Rust: config validation has unit tests (#[cfg(test)] module)
  • [ ] Rust: cargo test passes
  • [ ] Python: config loading/validation has tests
  • [ ] Manual integration test: verify health heartbeat with z_sub
  • [ ] Manual integration test: verify data publishing with z_sub
  • [ ] End-to-end test: register with daemon, verify bubbaloop node list shows HEALTHY

Reference nodes

Rust examples: - openmeteo/ — Weather data publisher (HTTP API, auto-discovery, complex config) - rtsp-camera/ — RTSP video stream publisher (hardware access, compression, unit tests)

Python examples: - network-monitor/ — Network connectivity checks (HTTP, DNS, ping)

Best practices reference: - rtsp-camera/ is the compliance reference — only node with full validation tests - openmeteo/ demonstrates config defaults and graceful degradation - network-monitor/ demonstrates Python patterns (protobuf compilation, signal handling)

Getting Help

  • Architecture overview: ARCHITECTURE.md (Node Contract section)
  • Official nodes repo: https://github.com/kornia/bubbaloop-nodes-official
  • Node SDK source: crates/bubbaloop-node/
  • Bubbaloop CLI reference: bubbaloop node --help
  • Zenoh documentation: https://zenoh.io/docs/

See Also