Amiga-Z Wiki

“Modern tools for old-school communities.”

User Tools

Site Tools


start:old_notes

Robin Net

SQLite Schema

-- Turn on foreign-key enforcement so the ON DELETE CASCADE clauses declared
-- further down take effect. db.py's connect() issues this same pragma on every
-- connection it opens.
PRAGMA foreign_keys = ON;

-- =========================================================
-- node_info
-- One row per RobinNet node instance
-- Holds the identity of the node that owns this database file.
-- =========================================================
CREATE TABLE IF NOT EXISTS node_info (
    -- The CHECK pins the only legal primary key to 1, so a second row cannot be inserted.
    id INTEGER PRIMARY KEY CHECK (id = 1),

    node_uuid TEXT NOT NULL UNIQUE,
    node_name TEXT NOT NULL UNIQUE,

    -- Operator details are optional; empty string (not NULL) means "not provided".
    operator_name TEXT NOT NULL DEFAULT '',
    operator_callsign TEXT NOT NULL DEFAULT '',
    location_text TEXT NOT NULL DEFAULT '',

    transport_profile TEXT NOT NULL DEFAULT 'lan',

    -- Timestamps are caller-supplied TEXT; no format is enforced by the schema.
    -- NOTE(review): presumably ISO 8601 UTC — confirm against the callers.
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

-- =========================================================
-- peers
-- Known neighboring RobinNet nodes
-- A peer is identified by its base_url (UNIQUE); peer_uuid is optional.
-- =========================================================
CREATE TABLE IF NOT EXISTS peers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- Nullable: a peer can be registered before its UUID is known.
    peer_uuid TEXT,
    peer_name TEXT NOT NULL,
    base_url TEXT NOT NULL,
    transport TEXT NOT NULL DEFAULT 'lan',

    -- Boolean flag stored as 0/1.
    enabled INTEGER NOT NULL DEFAULT 1 CHECK (enabled IN (0,1)),

    -- NULL until the peer has been seen / synced at least once.
    last_seen_at TEXT,
    last_sync_at TEXT,

    notes TEXT NOT NULL DEFAULT '',

    -- Caller-supplied TEXT timestamps; format is not enforced here.
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,

    -- db.py's upsert_peer looks peers up by base_url, relying on this uniqueness.
    UNIQUE(base_url)
);

CREATE INDEX IF NOT EXISTS idx_peers_enabled
    ON peers(enabled);

-- list_peers in db.py orders by peer_name.
CREATE INDEX IF NOT EXISTS idx_peers_name
    ON peers(peer_name);

-- =========================================================
-- messages
-- Core store-and-forward message table
-- msg_uuid is the network-wide identity of a message (UNIQUE);
-- msg_hash is indexed but deliberately not unique.
-- =========================================================
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    msg_uuid TEXT NOT NULL UNIQUE,
    msg_hash TEXT NOT NULL,

    -- msg_type / scope / status are free TEXT; the listed values are conventions
    -- kept consistent in application code, not enforced by CHECK constraints.
    msg_type TEXT NOT NULL,         -- bulletin, direct, status
    scope TEXT NOT NULL DEFAULT 'network', -- local, regional, network

    origin_node_uuid TEXT NOT NULL,
    origin_node_name TEXT NOT NULL,
    origin_time TEXT NOT NULL,

    author TEXT NOT NULL DEFAULT '',
    author_callsign TEXT NOT NULL DEFAULT '',

    destination TEXT NOT NULL DEFAULT '',   -- freeform for v0.1
    title TEXT NOT NULL,
    body TEXT NOT NULL,

    created_at TEXT NOT NULL,
    -- NULL means the message never expires (db.py treats NULL as "not expired").
    expires_at TEXT,

    priority INTEGER NOT NULL DEFAULT 2 CHECK (priority BETWEEN 0 AND 9),

    status TEXT NOT NULL DEFAULT 'active',  -- active, expired, cancelled

    -- Boolean flags stored as 0/1.
    is_local_only INTEGER NOT NULL DEFAULT 0 CHECK (is_local_only IN (0,1)),
    is_rf_eligible INTEGER NOT NULL DEFAULT 0 CHECK (is_rf_eligible IN (0,1)),

    -- Last known import source; NULL when no peer was recorded for the message.
    imported_from_peer_uuid TEXT,
    imported_from_peer_name TEXT,

    -- Local bookkeeping: when this node first / most recently saw the message.
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,

    -- Reject blank titles (whitespace-only counts as blank) and empty identifiers.
    CHECK (length(trim(title)) > 0),
    CHECK (length(msg_uuid) > 0),
    CHECK (length(msg_hash) > 0)
);

CREATE INDEX IF NOT EXISTS idx_messages_type
    ON messages(msg_type);

CREATE INDEX IF NOT EXISTS idx_messages_scope
    ON messages(scope);

-- Listing queries in db.py order by created_at DESC.
CREATE INDEX IF NOT EXISTS idx_messages_created_at
    ON messages(created_at DESC);

CREATE INDEX IF NOT EXISTS idx_messages_origin_node_uuid
    ON messages(origin_node_uuid);

CREATE INDEX IF NOT EXISTS idx_messages_status
    ON messages(status);

CREATE INDEX IF NOT EXISTS idx_messages_expires_at
    ON messages(expires_at);

-- Helpful when checking integrity collisions or reimports
CREATE INDEX IF NOT EXISTS idx_messages_hash
    ON messages(msg_hash);

-- =========================================================
-- message_trace
-- Per-message trail of where a message was seen/imported
-- Useful for troubleshooting and future RF accountability
-- Append-only event log: one row per event, many rows per message.
-- =========================================================
CREATE TABLE IF NOT EXISTS message_trace (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    msg_uuid TEXT NOT NULL,
    event_type TEXT NOT NULL,   -- created, imported, exported, synced, rejected, expired

    event_time TEXT NOT NULL,

    -- The node that recorded the event (i.e. the owner of this database).
    local_node_uuid TEXT NOT NULL,
    local_node_name TEXT NOT NULL,

    -- The other party to the event, when there is one; NULL otherwise.
    peer_uuid TEXT,
    peer_name TEXT,

    detail TEXT NOT NULL DEFAULT '',

    -- Trace rows are removed together with their message.
    FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_message_trace_msg_uuid
    ON message_trace(msg_uuid);

CREATE INDEX IF NOT EXISTS idx_message_trace_event_time
    ON message_trace(event_time DESC);

-- =========================================================
-- sync_state
-- Tracks per-peer sync knowledge so you can avoid repeating work
-- At most one row per (peer, message) pair.
-- =========================================================
CREATE TABLE IF NOT EXISTS sync_state (
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    peer_id INTEGER NOT NULL,
    msg_uuid TEXT NOT NULL,

    delivery_state TEXT NOT NULL DEFAULT 'pending', -- pending, sent, failed, skipped
    attempt_count INTEGER NOT NULL DEFAULT 0,

    -- All NULL until the corresponding event has happened.
    first_attempt_at TEXT,
    last_attempt_at TEXT,
    delivered_at TEXT,

    -- Empty string means "no error recorded".
    last_error TEXT NOT NULL DEFAULT '',

    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,

    -- Deleting either the peer or the message removes the sync row as well.
    FOREIGN KEY (peer_id) REFERENCES peers(id) ON DELETE CASCADE,
    FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE,

    UNIQUE(peer_id, msg_uuid)
);

-- NOTE(review): UNIQUE(peer_id, msg_uuid) above already has peer_id as its leading
-- column, so this index may be redundant — confirm with EXPLAIN QUERY PLAN before
-- dropping it.
CREATE INDEX IF NOT EXISTS idx_sync_state_peer_id
    ON sync_state(peer_id);

CREATE INDEX IF NOT EXISTS idx_sync_state_msg_uuid
    ON sync_state(msg_uuid);

CREATE INDEX IF NOT EXISTS idx_sync_state_delivery_state
    ON sync_state(delivery_state);

-- =========================================================
-- local_settings
-- Small key/value table for future expansion
-- db.py's init_db stores the 'schema_version' key here.
-- =========================================================
CREATE TABLE IF NOT EXISTS local_settings (
    setting_key TEXT PRIMARY KEY,
    -- Values are always stored as TEXT; callers convert as needed.
    setting_value TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

This gives you the basics without boxing you in later.

node_info

Stores the identity of the current RobinNet node.

You only ever want one row here, so I used:

id INTEGER PRIMARY KEY CHECK (id = 1)

That makes it easy to enforce “single node identity per DB.”

peers

Stores neighboring nodes.

Each peer has:

friendly name

base URL

transport type

enabled flag

last seen / last sync timestamps

That’s enough for LAN testing and later expansion.

messages

This is the heart of RobinNet.

Important fields:

msg_uuid → global tracking ID

msg_hash → SHA256 integrity hash

origin_node_uuid / origin_node_name → where it started

origin_time → when it started

destination → freeform for now

imported_from_peer_* → last known import source

first_seen_at / last_seen_at → local traceability

That gives you solid “where did this come from?” tracking.

message_trace

This is the piece you were asking for when you mentioned tracking messages.

Instead of only storing one hash and calling it done, this gives you a real event trail like:

created locally

imported from bravo

exported to charlie

rejected because duplicate

later maybe approved for RF

Example rows might look like:

Row 1 (recorded on node alpha):

msg_uuid: abc123
event_type: created
local_node_name: alpha
detail: Local bulletin created by operator

Row 2 (recorded on node bravo):

msg_uuid: abc123
event_type: imported
local_node_name: bravo
peer_name: alpha
detail: Imported during sync

That is going to be much more useful than storing only a hash.

sync_state

Tracks sync per peer per message.

That lets you later say:

message pending to peer X

already sent to peer Y

failed to send to peer Z

Without having to guess from logs.

This is very useful once you move beyond a toy prototype.

local_settings

Small utility table for:

schema version

sync cursor

admin flags

defaults

You’ll be glad it’s there later.

Suggested enum values

These are not SQLite enums, just recommended values you should keep consistent in code.

msg_type: bulletin, direct, status

scope: local, regional, network

status: active, expired, cancelled

event_type (in message_trace): created, imported, exported, synced, rejected, expired

delivery_state (in sync_state): pending, sent, failed, skipped

Recommended canonical hash input

For the msg_hash, hash only the canonical message content, not local metadata.

I’d recommend this exact field order:

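msg_type, scope, origin_node_uuid, origin_node_name, origin_time, author, author_callsign, destination, title, body, expires_at, priority, is_local_only, is_rf_eligible

(The Python helper below serializes these fields as JSON with sorted keys, so the resulting hash depends on the set of fields and their values rather than on the order in which they are listed here.)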

That way:

the same content always hashes the same

local import/export events do not alter the hash

tampering is detectable

Suggested Python helper for SHA256

import hashlib
import json


def canonical_message_payload(message: dict) -> str:
    """Serialize the hash-relevant fields of ``message`` as canonical JSON.

    Only content fields are included; local bookkeeping (import source,
    first/last seen timestamps, status) is left out so that it cannot
    change the hash.

    Args:
        message: Mapping holding at least ``msg_type``, ``scope``,
            ``origin_node_uuid``, ``origin_node_name``, ``origin_time``,
            ``title`` and ``body``. The remaining fields are optional and
            fall back to the defaults used below.

    Returns:
        A compact JSON string with keys sorted alphabetically and
        non-ASCII characters left unescaped.

    Raises:
        KeyError: If a required field is missing.
        ValueError, TypeError: If ``priority`` cannot be converted to int.
    """
    payload = {
        "msg_type": message["msg_type"],
        "scope": message["scope"],
        "origin_node_uuid": message["origin_node_uuid"],
        "origin_node_name": message["origin_node_name"],
        "origin_time": message["origin_time"],
        "author": message.get("author", ""),
        "author_callsign": message.get("author_callsign", ""),
        "destination": message.get("destination", ""),
        "title": message["title"],
        "body": message["body"],
        "expires_at": message.get("expires_at"),
        # Normalize to int, like the flags below: the database stores priority
        # as an INTEGER, so "2" and 2 must produce the same hash.
        "priority": int(message.get("priority", 2)),
        "is_local_only": int(bool(message.get("is_local_only", False))),
        "is_rf_eligible": int(bool(message.get("is_rf_eligible", False))),
    }
    # sort_keys + fixed separators make the output independent of dict ordering
    # and of json's default whitespace.
    return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def compute_message_hash(message: dict) -> str:
    """Return the hex SHA-256 digest of the message's canonical payload.

    The payload string produced by ``canonical_message_payload`` is encoded
    as UTF-8 before hashing.
    """
    digest = hashlib.sha256()
    digest.update(canonical_message_payload(message).encode("utf-8"))
    return digest.hexdigest()

db.py

# robinnet/db.py
from __future__ import annotations

import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterable


# Full database schema as a single SQL script. RobinDB.init_db runs it with
# executescript(). Every CREATE statement uses IF NOT EXISTS, so the script can
# be executed again against an existing database.
SCHEMA_SQL = """
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS node_info (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    node_uuid TEXT NOT NULL UNIQUE,
    node_name TEXT NOT NULL UNIQUE,
    operator_name TEXT NOT NULL DEFAULT '',
    operator_callsign TEXT NOT NULL DEFAULT '',
    location_text TEXT NOT NULL DEFAULT '',
    transport_profile TEXT NOT NULL DEFAULT 'lan',
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS peers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    peer_uuid TEXT,
    peer_name TEXT NOT NULL,
    base_url TEXT NOT NULL,
    transport TEXT NOT NULL DEFAULT 'lan',
    enabled INTEGER NOT NULL DEFAULT 1 CHECK (enabled IN (0,1)),
    last_seen_at TEXT,
    last_sync_at TEXT,
    notes TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    UNIQUE(base_url)
);

CREATE INDEX IF NOT EXISTS idx_peers_enabled
    ON peers(enabled);

CREATE INDEX IF NOT EXISTS idx_peers_name
    ON peers(peer_name);

CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    msg_uuid TEXT NOT NULL UNIQUE,
    msg_hash TEXT NOT NULL,
    msg_type TEXT NOT NULL,
    scope TEXT NOT NULL DEFAULT 'network',
    origin_node_uuid TEXT NOT NULL,
    origin_node_name TEXT NOT NULL,
    origin_time TEXT NOT NULL,
    author TEXT NOT NULL DEFAULT '',
    author_callsign TEXT NOT NULL DEFAULT '',
    destination TEXT NOT NULL DEFAULT '',
    title TEXT NOT NULL,
    body TEXT NOT NULL,
    created_at TEXT NOT NULL,
    expires_at TEXT,
    priority INTEGER NOT NULL DEFAULT 2 CHECK (priority BETWEEN 0 AND 9),
    status TEXT NOT NULL DEFAULT 'active',
    is_local_only INTEGER NOT NULL DEFAULT 0 CHECK (is_local_only IN (0,1)),
    is_rf_eligible INTEGER NOT NULL DEFAULT 0 CHECK (is_rf_eligible IN (0,1)),
    imported_from_peer_uuid TEXT,
    imported_from_peer_name TEXT,
    first_seen_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL,
    CHECK (length(trim(title)) > 0),
    CHECK (length(msg_uuid) > 0),
    CHECK (length(msg_hash) > 0)
);

CREATE INDEX IF NOT EXISTS idx_messages_type
    ON messages(msg_type);

CREATE INDEX IF NOT EXISTS idx_messages_scope
    ON messages(scope);

CREATE INDEX IF NOT EXISTS idx_messages_created_at
    ON messages(created_at DESC);

CREATE INDEX IF NOT EXISTS idx_messages_origin_node_uuid
    ON messages(origin_node_uuid);

CREATE INDEX IF NOT EXISTS idx_messages_status
    ON messages(status);

CREATE INDEX IF NOT EXISTS idx_messages_expires_at
    ON messages(expires_at);

CREATE INDEX IF NOT EXISTS idx_messages_hash
    ON messages(msg_hash);

CREATE TABLE IF NOT EXISTS message_trace (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    msg_uuid TEXT NOT NULL,
    event_type TEXT NOT NULL,
    event_time TEXT NOT NULL,
    local_node_uuid TEXT NOT NULL,
    local_node_name TEXT NOT NULL,
    peer_uuid TEXT,
    peer_name TEXT,
    detail TEXT NOT NULL DEFAULT '',
    FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_message_trace_msg_uuid
    ON message_trace(msg_uuid);

CREATE INDEX IF NOT EXISTS idx_message_trace_event_time
    ON message_trace(event_time DESC);

CREATE TABLE IF NOT EXISTS sync_state (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    peer_id INTEGER NOT NULL,
    msg_uuid TEXT NOT NULL,
    delivery_state TEXT NOT NULL DEFAULT 'pending',
    attempt_count INTEGER NOT NULL DEFAULT 0,
    first_attempt_at TEXT,
    last_attempt_at TEXT,
    delivered_at TEXT,
    last_error TEXT NOT NULL DEFAULT '',
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    FOREIGN KEY (peer_id) REFERENCES peers(id) ON DELETE CASCADE,
    FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE,
    UNIQUE(peer_id, msg_uuid)
);

CREATE INDEX IF NOT EXISTS idx_sync_state_peer_id
    ON sync_state(peer_id);

CREATE INDEX IF NOT EXISTS idx_sync_state_msg_uuid
    ON sync_state(msg_uuid);

CREATE INDEX IF NOT EXISTS idx_sync_state_delivery_state
    ON sync_state(delivery_state);

CREATE TABLE IF NOT EXISTS local_settings (
    setting_key TEXT PRIMARY KEY,
    setting_value TEXT NOT NULL,
    updated_at TEXT NOT NULL
);
"""


class RobinDB:
    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)

    @contextmanager
    def connect(self) -> Iterable[sqlite3.Connection]:
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON;")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def init_db(self) -> None:
        """Create any missing tables/indexes and stamp the initial schema version.

        Safe to call repeatedly: the schema script only uses
        ``CREATE ... IF NOT EXISTS``, and the ``schema_version`` setting is
        written only when it is absent, so a version stored by an earlier
        run (or bumped by a later migration) is never reset to '1'.
        """
        with self.connect() as conn:
            conn.executescript(SCHEMA_SQL)
            # OR IGNORE (rather than OR REPLACE) keeps an existing
            # schema_version row untouched on repeated initialisation.
            conn.execute(
                """
                INSERT OR IGNORE INTO local_settings (setting_key, setting_value, updated_at)
                VALUES ('schema_version', '1', CURRENT_TIMESTAMP)
                """
            )

    # =========================================================
    # Generic helpers
    # =========================================================
    def fetch_one(self, sql: str, params: tuple[Any, ...] = ()) -> dict[str, Any] | None:
        with self.connect() as conn:
            row = conn.execute(sql, params).fetchone()
            return dict(row) if row else None

    def fetch_all(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
        with self.connect() as conn:
            rows = conn.execute(sql, params).fetchall()
            return [dict(r) for r in rows]

    def execute(self, sql: str, params: tuple[Any, ...] = ()) -> int:
        with self.connect() as conn:
            cur = conn.execute(sql, params)
            return int(cur.lastrowid)

    # =========================================================
    # local_settings
    # =========================================================
    def get_setting(self, key: str) -> str | None:
        row = self.fetch_one(
            "SELECT setting_value FROM local_settings WHERE setting_key = ?",
            (key,),
        )
        return None if row is None else str(row["setting_value"])

    def set_setting(self, key: str, value: str, updated_at: str) -> None:
        with self.connect() as conn:
            conn.execute(
                """
                INSERT INTO local_settings (setting_key, setting_value, updated_at)
                VALUES (?, ?, ?)
                ON CONFLICT(setting_key) DO UPDATE SET
                    setting_value = excluded.setting_value,
                    updated_at = excluded.updated_at
                """,
                (key, value, updated_at),
            )

    # =========================================================
    # node_info
    # =========================================================
    def get_node_info(self) -> dict[str, Any] | None:
        return self.fetch_one("SELECT * FROM node_info WHERE id = 1")

    def save_node_info(
        self,
        *,
        node_uuid: str,
        node_name: str,
        operator_name: str,
        operator_callsign: str,
        location_text: str,
        transport_profile: str,
        created_at: str,
        updated_at: str,
    ) -> None:
        with self.connect() as conn:
            existing = conn.execute("SELECT id FROM node_info WHERE id = 1").fetchone()
            if existing:
                conn.execute(
                    """
                    UPDATE node_info
                    SET node_uuid = ?,
                        node_name = ?,
                        operator_name = ?,
                        operator_callsign = ?,
                        location_text = ?,
                        transport_profile = ?,
                        updated_at = ?
                    WHERE id = 1
                    """,
                    (
                        node_uuid,
                        node_name,
                        operator_name,
                        operator_callsign,
                        location_text,
                        transport_profile,
                        updated_at,
                    ),
                )
            else:
                conn.execute(
                    """
                    INSERT INTO node_info (
                        id,
                        node_uuid,
                        node_name,
                        operator_name,
                        operator_callsign,
                        location_text,
                        transport_profile,
                        created_at,
                        updated_at
                    ) VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        node_uuid,
                        node_name,
                        operator_name,
                        operator_callsign,
                        location_text,
                        transport_profile,
                        created_at,
                        updated_at,
                    ),
                )

    # =========================================================
    # peers
    # =========================================================
    def add_peer(
        self,
        *,
        peer_name: str,
        base_url: str,
        transport: str = "lan",
        peer_uuid: str | None = None,
        enabled: int = 1,
        notes: str = "",
        created_at: str,
        updated_at: str,
    ) -> int:
        with self.connect() as conn:
            cur = conn.execute(
                """
                INSERT INTO peers (
                    peer_uuid,
                    peer_name,
                    base_url,
                    transport,
                    enabled,
                    notes,
                    created_at,
                    updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    peer_uuid,
                    peer_name,
                    base_url,
                    transport,
                    int(bool(enabled)),
                    notes,
                    created_at,
                    updated_at,
                ),
            )
            return int(cur.lastrowid)

    def upsert_peer(
        self,
        *,
        peer_name: str,
        base_url: str,
        transport: str = "lan",
        peer_uuid: str | None = None,
        enabled: int = 1,
        notes: str = "",
        updated_at: str,
        created_at: str | None = None,
    ) -> int:
        with self.connect() as conn:
            existing = conn.execute(
                "SELECT id FROM peers WHERE base_url = ?",
                (base_url,),
            ).fetchone()

            if existing:
                peer_id = int(existing["id"])
                conn.execute(
                    """
                    UPDATE peers
                    SET peer_uuid = ?,
                        peer_name = ?,
                        transport = ?,
                        enabled = ?,
                        notes = ?,
                        updated_at = ?
                    WHERE id = ?
                    """,
                    (
                        peer_uuid,
                        peer_name,
                        transport,
                        int(bool(enabled)),
                        notes,
                        updated_at,
                        peer_id,
                    ),
                )
                return peer_id

            if created_at is None:
                created_at = updated_at

            cur = conn.execute(
                """
                INSERT INTO peers (
                    peer_uuid,
                    peer_name,
                    base_url,
                    transport,
                    enabled,
                    notes,
                    created_at,
                    updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    peer_uuid,
                    peer_name,
                    base_url,
                    transport,
                    int(bool(enabled)),
                    notes,
                    created_at,
                    updated_at,
                ),
            )
            return int(cur.lastrowid)

    def get_peer_by_id(self, peer_id: int) -> dict[str, Any] | None:
        return self.fetch_one("SELECT * FROM peers WHERE id = ?", (peer_id,))

    def get_peer_by_url(self, base_url: str) -> dict[str, Any] | None:
        return self.fetch_one("SELECT * FROM peers WHERE base_url = ?", (base_url,))

    def list_peers(self, enabled_only: bool = False) -> list[dict[str, Any]]:
        if enabled_only:
            return self.fetch_all(
                "SELECT * FROM peers WHERE enabled = 1 ORDER BY peer_name COLLATE NOCASE"
            )
        return self.fetch_all("SELECT * FROM peers ORDER BY peer_name COLLATE NOCASE")

    def update_peer_seen(
        self,
        *,
        peer_id: int,
        last_seen_at: str | None = None,
        last_sync_at: str | None = None,
        updated_at: str,
    ) -> None:
        with self.connect() as conn:
            current = conn.execute(
                "SELECT last_seen_at, last_sync_at FROM peers WHERE id = ?",
                (peer_id,),
            ).fetchone()
            if not current:
                raise ValueError(f"Peer id {peer_id} not found")

            new_last_seen = last_seen_at if last_seen_at is not None else current["last_seen_at"]
            new_last_sync = last_sync_at if last_sync_at is not None else current["last_sync_at"]

            conn.execute(
                """
                UPDATE peers
                SET last_seen_at = ?,
                    last_sync_at = ?,
                    updated_at = ?
                WHERE id = ?
                """,
                (new_last_seen, new_last_sync, updated_at, peer_id),
            )

    def set_peer_enabled(self, peer_id: int, enabled: bool, updated_at: str) -> None:
        with self.connect() as conn:
            conn.execute(
                """
                UPDATE peers
                SET enabled = ?,
                    updated_at = ?
                WHERE id = ?
                """,
                (1 if enabled else 0, updated_at, peer_id),
            )

    def delete_peer(self, peer_id: int) -> None:
        with self.connect() as conn:
            conn.execute("DELETE FROM peers WHERE id = ?", (peer_id,))

    # =========================================================
    # messages
    # =========================================================
    def create_message(
        self,
        *,
        msg_uuid: str,
        msg_hash: str,
        msg_type: str,
        scope: str,
        origin_node_uuid: str,
        origin_node_name: str,
        origin_time: str,
        author: str,
        author_callsign: str,
        destination: str,
        title: str,
        body: str,
        created_at: str,
        expires_at: str | None,
        priority: int,
        status: str,
        is_local_only: int,
        is_rf_eligible: int,
        imported_from_peer_uuid: str | None,
        imported_from_peer_name: str | None,
        first_seen_at: str,
        last_seen_at: str,
    ) -> int:
        with self.connect() as conn:
            cur = conn.execute(
                """
                INSERT INTO messages (
                    msg_uuid,
                    msg_hash,
                    msg_type,
                    scope,
                    origin_node_uuid,
                    origin_node_name,
                    origin_time,
                    author,
                    author_callsign,
                    destination,
                    title,
                    body,
                    created_at,
                    expires_at,
                    priority,
                    status,
                    is_local_only,
                    is_rf_eligible,
                    imported_from_peer_uuid,
                    imported_from_peer_name,
                    first_seen_at,
                    last_seen_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    msg_uuid,
                    msg_hash,
                    msg_type,
                    scope,
                    origin_node_uuid,
                    origin_node_name,
                    origin_time,
                    author,
                    author_callsign,
                    destination,
                    title,
                    body,
                    created_at,
                    expires_at,
                    priority,
                    status,
                    int(bool(is_local_only)),
                    int(bool(is_rf_eligible)),
                    imported_from_peer_uuid,
                    imported_from_peer_name,
                    first_seen_at,
                    last_seen_at,
                ),
            )
            return int(cur.lastrowid)

    def message_exists(self, msg_uuid: str) -> bool:
        row = self.fetch_one(
            "SELECT 1 AS exists_flag FROM messages WHERE msg_uuid = ?",
            (msg_uuid,),
        )
        return row is not None

    def get_message_by_uuid(self, msg_uuid: str) -> dict[str, Any] | None:
        return self.fetch_one(
            "SELECT * FROM messages WHERE msg_uuid = ?",
            (msg_uuid,),
        )

    def get_message_by_hash(self, msg_hash: str) -> list[dict[str, Any]]:
        return self.fetch_all(
            "SELECT * FROM messages WHERE msg_hash = ? ORDER BY created_at DESC",
            (msg_hash,),
        )

    def list_messages(
        self,
        *,
        msg_type: str | None = None,
        status: str | None = None,
        limit: int = 100,
        include_expired: bool = True,
    ) -> list[dict[str, Any]]:
        sql = "SELECT * FROM messages WHERE 1=1"
        params: list[Any] = []

        if msg_type:
            sql += " AND msg_type = ?"
            params.append(msg_type)

        if status:
            sql += " AND status = ?"
            params.append(status)

        if not include_expired:
            sql += " AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)"

        sql += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)

        return self.fetch_all(sql, tuple(params))

    def list_message_summaries(
        self,
        *,
        limit: int = 500,
        include_local_only: bool = False,
    ) -> list[dict[str, Any]]:
        sql = """
            SELECT
                msg_uuid,
                msg_hash,
                msg_type,
                scope,
                origin_node_uuid,
                origin_node_name,
                origin_time,
                author,
                author_callsign,
                destination,
                title,
                created_at,
                expires_at,
                priority,
                status,
                is_local_only,
                is_rf_eligible
            FROM messages
            WHERE status = 'active'
              AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
        """
        params: list[Any] = []

        if not include_local_only:
            sql += " AND is_local_only = 0"

        sql += " ORDER BY created_at DESC LIMIT ?"
        params.append(limit)

        return self.fetch_all(sql, tuple(params))

    def update_message_last_seen(
        self,
        *,
        msg_uuid: str,
        last_seen_at: str,
        imported_from_peer_uuid: str | None = None,
        imported_from_peer_name: str | None = None,
    ) -> None:
        with self.connect() as conn:
            conn.execute(
                """
                UPDATE messages
                SET last_seen_at = ?,
                    imported_from_peer_uuid = COALESCE(?, imported_from_peer_uuid),
                    imported_from_peer_name = COALESCE(?, imported_from_peer_name)
                WHERE msg_uuid = ?
                """,
                (
                    last_seen_at,
                    imported_from_peer_uuid,
                    imported_from_peer_name,
                    msg_uuid,
                ),
            )

    def set_message_status(self, msg_uuid: str, status: str, last_seen_at: str) -> None:
        with self.connect() as conn:
            conn.execute(
                """
                UPDATE messages
                SET status = ?,
                    last_seen_at = ?
                WHERE msg_uuid = ?
                """,
                (status, last_seen_at, msg_uuid),
            )

    def delete_message(self, msg_uuid: str) -> None:
        with self.connect() as conn:
            conn.execute("DELETE FROM messages WHERE msg_uuid = ?", (msg_uuid,))

    # =========================================================
    # import helper
    # =========================================================
    def import_message_if_new(
        self,
        *,
        message: dict[str, Any],
        local_node_uuid: str,
        local_node_name: str,
        event_time: str,
        imported_from_peer_uuid: str | None = None,
        imported_from_peer_name: str | None = None,
    ) -> bool:
        existing = self.get_message_by_uuid(message["msg_uuid"])
        if existing:
            self.update_message_last_seen(
                msg_uuid=message["msg_uuid"],
                last_seen_at=event_time,
                imported_from_peer_uuid=imported_from_peer_uuid,
                imported_from_peer_name=imported_from_peer_name,
            )
            self.record_trace_event(
                msg_uuid=message["msg_uuid"],
                event_type="rejected",
                event_time=event_time,
                local_node_uuid=local_node_uuid,
                local_node_name=local_node_name,
                peer_uuid=imported_from_peer_uuid,
                peer_name=imported_from_peer_name,
                detail="Duplicate import ignored",
            )
            return False

        self.create_message(
            msg_uuid=message["msg_uuid"],
            msg_hash=message["msg_hash"],
            msg_type=message["msg_type"],
            scope=message.get("scope", "network"),
            origin_node_uuid=message["origin_node_uuid"],
            origin_node_name=message["origin_node_name"],
            origin_time=message["origin_time"],
            author=message.get("author", ""),
            author_callsign=message.get("author_callsign", ""),
            destination=message.get("destination", ""),
            title=message["title"],
            body=message["body"],
            created_at=message["created_at"],
            expires_at=message.get("expires_at"),
            priority=int(message.get("priority", 2)),
            status=message.get("status", "active"),
            is_local_only=int(bool(message.get("is_local_only", False))),
            is_rf_eligible=int(bool(message.get("is_rf_eligible", False))),
            imported_from_peer_uuid=imported_from_peer_uuid,
            imported_from_peer_name=imported_from_peer_name,
            first_seen_at=event_time,
            last_seen_at=event_time,
        )

        self.record_trace_event(
            msg_uuid=message["msg_uuid"],
            event_type="imported",
            event_time=event_time,
            local_node_uuid=local_node_uuid,
            local_node_name=local_node_name,
            peer_uuid=imported_from_peer_uuid,
            peer_name=imported_from_peer_name,
            detail="Imported from peer",
        )
        return True

    # =========================================================
    # message_trace
    # =========================================================
    def record_trace_event(
        self,
        *,
        msg_uuid: str,
        event_type: str,
        event_time: str,
        local_node_uuid: str,
        local_node_name: str,
        peer_uuid: str | None = None,
        peer_name: str | None = None,
        detail: str = "",
    ) -> int:
        with self.connect() as conn:
            cur = conn.execute(
                """
                INSERT INTO message_trace (
                    msg_uuid,
                    event_type,
                    event_time,
                    local_node_uuid,
                    local_node_name,
                    peer_uuid,
                    peer_name,
                    detail
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    msg_uuid,
                    event_type,
                    event_time,
                    local_node_uuid,
                    local_node_name,
                    peer_uuid,
                    peer_name,
                    detail,
                ),
            )
            return int(cur.lastrowid)

    def get_message_trace(self, msg_uuid: str) -> list[dict[str, Any]]:
        """Return all trace rows of one message, ordered by event time, then id."""
        trace_query = """
            SELECT *
            FROM message_trace
            WHERE msg_uuid = ?
            ORDER BY event_time ASC, id ASC
            """
        return self.fetch_all(trace_query, (msg_uuid,))

    # =========================================================
    # sync_state
    # =========================================================
    def get_sync_state(self, peer_id: int, msg_uuid: str) -> dict[str, Any] | None:
        return self.fetch_one(
            """
            SELECT *
            FROM sync_state
            WHERE peer_id = ? AND msg_uuid = ?
            """,
            (peer_id, msg_uuid),
        )

    def upsert_sync_state(
        self,
        *,
        peer_id: int,
        msg_uuid: str,
        delivery_state: str,
        updated_at: str,
        attempt_count: int | None = None,
        first_attempt_at: str | None = None,
        last_attempt_at: str | None = None,
        delivered_at: str | None = None,
        last_error: str = "",
        created_at: str | None = None,
    ) -> int:
        with self.connect() as conn:
            existing = conn.execute(
                """
                SELECT id, attempt_count, first_attempt_at, created_at
                FROM sync_state
                WHERE peer_id = ? AND msg_uuid = ?
                """,
                (peer_id, msg_uuid),
            ).fetchone()

            if existing:
                row_id = int(existing["id"])
                current_attempt_count = int(existing["attempt_count"] or 0)
                final_attempt_count = current_attempt_count if attempt_count is None else attempt_count

                existing_first_attempt = existing["first_attempt_at"]
                final_first_attempt = first_attempt_at if first_attempt_at is not None else existing_first_attempt

                conn.execute(
                    """
                    UPDATE sync_state
                    SET delivery_state = ?,
                        attempt_count = ?,
                        first_attempt_at = ?,
                        last_attempt_at = ?,
                        delivered_at = ?,
                        last_error = ?,
                        updated_at = ?
                    WHERE id = ?
                    """,
                    (
                        delivery_state,
                        final_attempt_count,
                        final_first_attempt,
                        last_attempt_at,
                        delivered_at,
                        last_error,
                        updated_at,
                        row_id,
                    ),
                )
                return row_id

            if created_at is None:
                created_at = updated_at

            if attempt_count is None:
                attempt_count = 0

            cur = conn.execute(
                """
                INSERT INTO sync_state (
                    peer_id,
                    msg_uuid,
                    delivery_state,
                    attempt_count,
                    first_attempt_at,
                    last_attempt_at,
                    delivered_at,
                    last_error,
                    created_at,
                    updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    peer_id,
                    msg_uuid,
                    delivery_state,
                    attempt_count,
                    first_attempt_at,
                    last_attempt_at,
                    delivered_at,
                    last_error,
                    created_at,
                    updated_at,
                ),
            )
            return int(cur.lastrowid)

    def mark_sync_pending(
        self,
        *,
        peer_id: int,
        msg_uuid: str,
        updated_at: str,
    ) -> int:
        """Put the sync state of one peer/message pair into "pending".

        The stored attempt count and first-attempt time are carried over when
        a row already exists. ``last_attempt_at``, ``delivered_at`` and
        ``last_error`` are reset. A new row starts with zero attempts and
        ``created_at`` equal to ``updated_at``.

        Args:
            peer_id: Peer the message is queued for.
            msg_uuid: Message being queued.
            updated_at: Timestamp written to ``updated_at``.

        Returns:
            The row id returned by ``upsert_sync_state``.
        """
        existing = self.get_sync_state(peer_id, msg_uuid)
        # A NULL stored count is read as zero, the same way upsert_sync_state
        # reads it, instead of failing on int(None).
        attempt_count = int(existing["attempt_count"] or 0) if existing else 0
        first_attempt_at = existing["first_attempt_at"] if existing else None

        return self.upsert_sync_state(
            peer_id=peer_id,
            msg_uuid=msg_uuid,
            delivery_state="pending",
            attempt_count=attempt_count,
            first_attempt_at=first_attempt_at,
            last_attempt_at=None,
            delivered_at=None,
            last_error="",
            updated_at=updated_at,
            created_at=updated_at if not existing else None,
        )

    def mark_sync_sent(
        self,
        *,
        peer_id: int,
        msg_uuid: str,
        attempt_time: str,
    ) -> int:
        """Record a successful send attempt for one peer/message pair.

        The attempt count goes up by one, ``last_attempt_at``,
        ``delivered_at`` and ``updated_at`` are set to ``attempt_time``,
        ``last_error`` is cleared and the state becomes "sent".
        ``first_attempt_at`` keeps its stored value when there is one and is
        otherwise set to ``attempt_time``.

        Args:
            peer_id: Peer the message was sent to.
            msg_uuid: Message that was sent.
            attempt_time: Timestamp of this attempt.

        Returns:
            The row id returned by ``upsert_sync_state``.
        """
        existing = self.get_sync_state(peer_id, msg_uuid)
        # A NULL stored count is read as zero, the same way upsert_sync_state
        # reads it, instead of failing on int(None).
        previous_attempts = int(existing["attempt_count"] or 0) if existing else 0
        # Keep the earliest recorded attempt; fall back to this one.
        first_attempt = (existing["first_attempt_at"] if existing else None) or attempt_time

        return self.upsert_sync_state(
            peer_id=peer_id,
            msg_uuid=msg_uuid,
            delivery_state="sent",
            attempt_count=previous_attempts + 1,
            first_attempt_at=first_attempt,
            last_attempt_at=attempt_time,
            delivered_at=attempt_time,
            last_error="",
            updated_at=attempt_time,
            created_at=attempt_time if not existing else None,
        )

    def mark_sync_failed(
        self,
        *,
        peer_id: int,
        msg_uuid: str,
        attempt_time: str,
        error_text: str,
    ) -> int:
        """Record a failed send attempt for one peer/message pair.

        The attempt count goes up by one, ``last_attempt_at`` and
        ``updated_at`` are set to ``attempt_time``, ``delivered_at`` is
        cleared, ``last_error`` stores ``error_text`` and the state becomes
        "failed". ``first_attempt_at`` keeps its stored value when there is
        one and is otherwise set to ``attempt_time``.

        Args:
            peer_id: Peer the send was attempted to.
            msg_uuid: Message that failed to send.
            attempt_time: Timestamp of this attempt.
            error_text: Error description stored in ``last_error``.

        Returns:
            The row id returned by ``upsert_sync_state``.
        """
        existing = self.get_sync_state(peer_id, msg_uuid)
        # A NULL stored count is read as zero, the same way upsert_sync_state
        # reads it, instead of failing on int(None).
        previous_attempts = int(existing["attempt_count"] or 0) if existing else 0
        # Keep the earliest recorded attempt; fall back to this one.
        first_attempt = (existing["first_attempt_at"] if existing else None) or attempt_time

        return self.upsert_sync_state(
            peer_id=peer_id,
            msg_uuid=msg_uuid,
            delivery_state="failed",
            attempt_count=previous_attempts + 1,
            first_attempt_at=first_attempt,
            last_attempt_at=attempt_time,
            delivered_at=None,
            last_error=error_text,
            updated_at=attempt_time,
            created_at=attempt_time if not existing else None,
        )

    def list_sync_queue(
        self,
        *,
        peer_id: int | None = None,
        delivery_state: str | None = None,
        limit: int = 200,
    ) -> list[dict[str, Any]]:
        sql = """
            SELECT
                s.*,
                p.peer_name,
                p.base_url,
                m.title,
                m.msg_type,
                m.created_at AS message_created_at
            FROM sync_state s
            JOIN peers p ON p.id = s.peer_id
            JOIN messages m ON m.msg_uuid = s.msg_uuid
            WHERE 1=1
        """
        params: list[Any] = []

        if peer_id is not None:
            sql += " AND s.peer_id = ?"
            params.append(peer_id)

        if delivery_state is not None:
            sql += " AND s.delivery_state = ?"
            params.append(delivery_state)

        sql += " ORDER BY s.updated_at DESC LIMIT ?"
        params.append(limit)

        return self.fetch_all(sql, tuple(params))

    # =========================================================
    # maintenance helpers
    # =========================================================
    def expire_messages(self, now_iso: str) -> int:
        """Flip active messages whose expiry time has been reached to 'expired'.

        A message qualifies when its status is 'active', it has an
        ``expires_at`` value, and that value is less than or equal to
        ``now_iso``. Matching rows also get ``last_seen_at`` set to
        ``now_iso``.

        Args:
            now_iso: Reference time; used as both the new ``last_seen_at``
                and the expiry cut-off.

        Returns:
            The ``rowcount`` reported by the update cursor, as an int.
        """
        # First value feeds SET last_seen_at, second the expires_at comparison.
        bindings = (now_iso, now_iso)
        with self.connect() as db:
            result = db.execute(
                """
                UPDATE messages
                SET status = 'expired',
                    last_seen_at = ?
                WHERE status = 'active'
                  AND expires_at IS NOT NULL
                  AND expires_at <= ?
                """,
                bindings,
            )
            return int(result.rowcount)

    def list_exportable_messages(self, limit: int = 500) -> list[dict[str, Any]]:
        return self.fetch_all(
            """
            SELECT *
            FROM messages
            WHERE status = 'active'
              AND is_local_only = 0
              AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
            ORDER BY created_at DESC
            LIMIT ?
            """,
            (limit,),
        )

Example Usage

from robinnet.db import RobinDB

# Point RobinDB at the database file and run its initialisation step.
db = RobinDB("./data/robinnet.db")
db.init_db()

# The node row is saved with identical creation and update timestamps.
saved_at = "2026-03-11T12:00:00Z"
node_fields = {
    "node_uuid": "11111111-1111-1111-1111-111111111111",
    "node_name": "alpha",
    "operator_name": "Rich",
    "operator_callsign": "",
    "location_text": "Albany area",
    "transport_profile": "lan",
    "created_at": saved_at,
    "updated_at": saved_at,
}
db.save_node_info(**node_fields)

# Read the node row back and show it.
node = db.get_node_info()
print(node)
start/old_notes.txt · Last modified: by freedomotter

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki