This is an old revision of the document!
Table of Contents
Robin Net
SQLite Schema
-- SQLite enforces foreign keys per connection, so every consumer must
-- issue this PRAGMA (the Python layer repeats it in connect()).
PRAGMA foreign_keys = ON;
-- =========================================================
-- node_info
-- One row per RobinNet node instance
-- =========================================================
-- All *_at columns in this schema are text timestamps supplied by the
-- application (ISO-8601 by convention).
CREATE TABLE IF NOT EXISTS node_info (
-- CHECK (id = 1) pins the table to a single identity row.
id INTEGER PRIMARY KEY CHECK (id = 1),
node_uuid TEXT NOT NULL UNIQUE,
node_name TEXT NOT NULL UNIQUE,
operator_name TEXT NOT NULL DEFAULT '',
operator_callsign TEXT NOT NULL DEFAULT '',
location_text TEXT NOT NULL DEFAULT '',
transport_profile TEXT NOT NULL DEFAULT 'lan',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- =========================================================
-- peers
-- Known neighboring RobinNet nodes
-- =========================================================
CREATE TABLE IF NOT EXISTS peers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-- peer_uuid may be unknown until the first successful exchange.
peer_uuid TEXT,
peer_name TEXT NOT NULL,
base_url TEXT NOT NULL,
transport TEXT NOT NULL DEFAULT 'lan',
enabled INTEGER NOT NULL DEFAULT 1 CHECK (enabled IN (0,1)),
last_seen_at TEXT,
last_sync_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
-- base_url is the de-duplication key used by upsert_peer().
UNIQUE(base_url)
);
CREATE INDEX IF NOT EXISTS idx_peers_enabled
ON peers(enabled);
CREATE INDEX IF NOT EXISTS idx_peers_name
ON peers(peer_name);
-- =========================================================
-- messages
-- Core store-and-forward message table
-- =========================================================
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_uuid TEXT NOT NULL UNIQUE,
-- SHA-256 over the canonical content fields (see hash helper below).
msg_hash TEXT NOT NULL,
msg_type TEXT NOT NULL, -- bulletin, direct, status
scope TEXT NOT NULL DEFAULT 'network', -- local, regional, network
origin_node_uuid TEXT NOT NULL,
origin_node_name TEXT NOT NULL,
origin_time TEXT NOT NULL,
author TEXT NOT NULL DEFAULT '',
author_callsign TEXT NOT NULL DEFAULT '',
destination TEXT NOT NULL DEFAULT '', -- freeform for v0.1
title TEXT NOT NULL,
body TEXT NOT NULL,
created_at TEXT NOT NULL,
-- NULL expires_at means the message never expires.
expires_at TEXT,
priority INTEGER NOT NULL DEFAULT 2 CHECK (priority BETWEEN 0 AND 9),
status TEXT NOT NULL DEFAULT 'active', -- active, expired, cancelled
is_local_only INTEGER NOT NULL DEFAULT 0 CHECK (is_local_only IN (0,1)),
is_rf_eligible INTEGER NOT NULL DEFAULT 0 CHECK (is_rf_eligible IN (0,1)),
imported_from_peer_uuid TEXT,
imported_from_peer_name TEXT,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
-- Defensive content checks; application code should validate first.
CHECK (length(trim(title)) > 0),
CHECK (length(msg_uuid) > 0),
CHECK (length(msg_hash) > 0)
);
CREATE INDEX IF NOT EXISTS idx_messages_type
ON messages(msg_type);
CREATE INDEX IF NOT EXISTS idx_messages_scope
ON messages(scope);
CREATE INDEX IF NOT EXISTS idx_messages_created_at
ON messages(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_origin_node_uuid
ON messages(origin_node_uuid);
CREATE INDEX IF NOT EXISTS idx_messages_status
ON messages(status);
CREATE INDEX IF NOT EXISTS idx_messages_expires_at
ON messages(expires_at);
-- Helpful when checking integrity collisions or reimports
CREATE INDEX IF NOT EXISTS idx_messages_hash
ON messages(msg_hash);
-- =========================================================
-- message_trace
-- Per-message trail of where a message was seen/imported
-- Useful for troubleshooting and future RF accountability
-- =========================================================
CREATE TABLE IF NOT EXISTS message_trace (
id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_uuid TEXT NOT NULL,
event_type TEXT NOT NULL, -- created, imported, exported, synced, rejected
event_time TEXT NOT NULL,
local_node_uuid TEXT NOT NULL,
local_node_name TEXT NOT NULL,
peer_uuid TEXT,
peer_name TEXT,
detail TEXT NOT NULL DEFAULT '',
-- Trace rows disappear together with their message.
FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_message_trace_msg_uuid
ON message_trace(msg_uuid);
CREATE INDEX IF NOT EXISTS idx_message_trace_event_time
ON message_trace(event_time DESC);
-- =========================================================
-- sync_state
-- Tracks per-peer sync knowledge so you can avoid repeating work
-- =========================================================
CREATE TABLE IF NOT EXISTS sync_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
peer_id INTEGER NOT NULL,
msg_uuid TEXT NOT NULL,
delivery_state TEXT NOT NULL DEFAULT 'pending', -- pending, sent, failed, skipped
attempt_count INTEGER NOT NULL DEFAULT 0,
first_attempt_at TEXT,
last_attempt_at TEXT,
delivered_at TEXT,
last_error TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (peer_id) REFERENCES peers(id) ON DELETE CASCADE,
FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE,
-- Exactly one state row per (peer, message) pair.
UNIQUE(peer_id, msg_uuid)
);
CREATE INDEX IF NOT EXISTS idx_sync_state_peer_id
ON sync_state(peer_id);
CREATE INDEX IF NOT EXISTS idx_sync_state_msg_uuid
ON sync_state(msg_uuid);
CREATE INDEX IF NOT EXISTS idx_sync_state_delivery_state
ON sync_state(delivery_state);
-- =========================================================
-- local_settings
-- Small key/value table for future expansion
-- =========================================================
-- Generic key/value store (schema_version, sync cursors, admin flags).
CREATE TABLE IF NOT EXISTS local_settings (
setting_key TEXT PRIMARY KEY,
setting_value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
This gives you the basics without boxing you in later.
node_info
Stores the identity of the current RobinNet node.
You only ever want one row here, so I used:
id INTEGER PRIMARY KEY CHECK (id = 1)
That makes it easy to enforce “single node identity per DB.”
peers
Stores neighboring nodes.
Each peer has:
friendly name
base URL
transport type
enabled flag
last seen / last sync timestamps
That’s enough for LAN testing and later expansion.
messages
This is the heart of RobinNet.
Important fields:
msg_uuid → global tracking ID
msg_hash → SHA256 integrity hash
origin_node_uuid / origin_node_name → where it started
origin_time → when it started
destination → freeform for now
imported_from_peer_* → last known import source
first_seen_at / last_seen_at → local traceability
That gives you solid “where did this come from?” tracking.
message_trace
This is the piece you were asking for when you mentioned tracking messages.
Instead of only storing one hash and calling it done, this gives you a real event trail like:
created locally
imported from bravo
exported to charlie
rejected because duplicate
later maybe approved for RF
Example rows might look like:
msg_uuid: abc123 event_type: created local_node_name: alpha detail: Local bulletin created by operator
msg_uuid: abc123 event_type: imported local_node_name: bravo peer_name: alpha detail: Imported during sync
That is going to be much more useful than storing only a single content hash.
sync_state
Tracks sync per peer per message.
That lets you later say:
message pending to peer X
already sent to peer Y
failed to send to peer Z
Without having to guess from logs.
This is very useful once you move beyond a toy prototype.
local_settings
Small utility table for:
schema version
sync cursor
admin flags
defaults
You’ll be glad it’s there later.
Suggested enum values
These are not SQLite enums, just recommended values you should keep consistent in code.
msg_type: bulletin, direct, status
scope: local, regional, network
status: active, expired, cancelled
event_type (message_trace): created, imported, exported, synced, rejected, expired
delivery_state (sync_state): pending, sent, failed, skipped
Recommended canonical hash input
For the msg_hash, hash only the canonical message content, not local metadata.
I’d recommend this exact field order:
msg_type scope origin_node_uuid origin_node_name origin_time author author_callsign destination title body expires_at priority is_local_only is_rf_eligible
That way:
the same content always hashes the same
local import/export events do not alter the hash
tampering is detectable
Suggested Python helper for SHA256
import hashlib
import json
def canonical_message_payload(message: dict) -> str:
    """Serialize the hash-relevant fields of a message to canonical JSON.

    Only content fields participate (never local bookkeeping such as
    first_seen_at), so identical content serializes identically on every
    node. Keys are sorted and separators fixed so the bytes are stable.
    """
    # Required identity fields: absent keys raise KeyError, on purpose.
    payload: dict = {
        key: message[key]
        for key in (
            "msg_type",
            "scope",
            "origin_node_uuid",
            "origin_node_name",
            "origin_time",
            "title",
            "body",
        )
    }
    # Optional fields fall back to the documented defaults.
    for key, default in (
        ("author", ""),
        ("author_callsign", ""),
        ("destination", ""),
        ("expires_at", None),
        ("priority", 2),
    ):
        payload[key] = message.get(key, default)
    # Booleans are normalized to 0/1 so True/False vs. int inputs cannot
    # change the serialized bytes.
    payload["is_local_only"] = int(bool(message.get("is_local_only", False)))
    payload["is_rf_eligible"] = int(bool(message.get("is_rf_eligible", False)))
    return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False)


def compute_message_hash(message: dict) -> str:
    """Return the SHA-256 hex digest of the canonical payload for `message`."""
    canonical = canonical_message_payload(message)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
db.py
# robinnet/db.py
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterable, Iterator
# Idempotent schema DDL executed by init_db() via executescript().
# Must be kept in sync with the standalone schema document above; the
# PRAGMA also appears in connect(), since SQLite applies it per connection.
SCHEMA_SQL = """
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS node_info (
id INTEGER PRIMARY KEY CHECK (id = 1),
node_uuid TEXT NOT NULL UNIQUE,
node_name TEXT NOT NULL UNIQUE,
operator_name TEXT NOT NULL DEFAULT '',
operator_callsign TEXT NOT NULL DEFAULT '',
location_text TEXT NOT NULL DEFAULT '',
transport_profile TEXT NOT NULL DEFAULT 'lan',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS peers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
peer_uuid TEXT,
peer_name TEXT NOT NULL,
base_url TEXT NOT NULL,
transport TEXT NOT NULL DEFAULT 'lan',
enabled INTEGER NOT NULL DEFAULT 1 CHECK (enabled IN (0,1)),
last_seen_at TEXT,
last_sync_at TEXT,
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(base_url)
);
CREATE INDEX IF NOT EXISTS idx_peers_enabled
ON peers(enabled);
CREATE INDEX IF NOT EXISTS idx_peers_name
ON peers(peer_name);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_uuid TEXT NOT NULL UNIQUE,
msg_hash TEXT NOT NULL,
msg_type TEXT NOT NULL,
scope TEXT NOT NULL DEFAULT 'network',
origin_node_uuid TEXT NOT NULL,
origin_node_name TEXT NOT NULL,
origin_time TEXT NOT NULL,
author TEXT NOT NULL DEFAULT '',
author_callsign TEXT NOT NULL DEFAULT '',
destination TEXT NOT NULL DEFAULT '',
title TEXT NOT NULL,
body TEXT NOT NULL,
created_at TEXT NOT NULL,
expires_at TEXT,
priority INTEGER NOT NULL DEFAULT 2 CHECK (priority BETWEEN 0 AND 9),
status TEXT NOT NULL DEFAULT 'active',
is_local_only INTEGER NOT NULL DEFAULT 0 CHECK (is_local_only IN (0,1)),
is_rf_eligible INTEGER NOT NULL DEFAULT 0 CHECK (is_rf_eligible IN (0,1)),
imported_from_peer_uuid TEXT,
imported_from_peer_name TEXT,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
CHECK (length(trim(title)) > 0),
CHECK (length(msg_uuid) > 0),
CHECK (length(msg_hash) > 0)
);
CREATE INDEX IF NOT EXISTS idx_messages_type
ON messages(msg_type);
CREATE INDEX IF NOT EXISTS idx_messages_scope
ON messages(scope);
CREATE INDEX IF NOT EXISTS idx_messages_created_at
ON messages(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_origin_node_uuid
ON messages(origin_node_uuid);
CREATE INDEX IF NOT EXISTS idx_messages_status
ON messages(status);
CREATE INDEX IF NOT EXISTS idx_messages_expires_at
ON messages(expires_at);
CREATE INDEX IF NOT EXISTS idx_messages_hash
ON messages(msg_hash);
CREATE TABLE IF NOT EXISTS message_trace (
id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_uuid TEXT NOT NULL,
event_type TEXT NOT NULL,
event_time TEXT NOT NULL,
local_node_uuid TEXT NOT NULL,
local_node_name TEXT NOT NULL,
peer_uuid TEXT,
peer_name TEXT,
detail TEXT NOT NULL DEFAULT '',
FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_message_trace_msg_uuid
ON message_trace(msg_uuid);
CREATE INDEX IF NOT EXISTS idx_message_trace_event_time
ON message_trace(event_time DESC);
CREATE TABLE IF NOT EXISTS sync_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
peer_id INTEGER NOT NULL,
msg_uuid TEXT NOT NULL,
delivery_state TEXT NOT NULL DEFAULT 'pending',
attempt_count INTEGER NOT NULL DEFAULT 0,
first_attempt_at TEXT,
last_attempt_at TEXT,
delivered_at TEXT,
last_error TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (peer_id) REFERENCES peers(id) ON DELETE CASCADE,
FOREIGN KEY (msg_uuid) REFERENCES messages(msg_uuid) ON DELETE CASCADE,
UNIQUE(peer_id, msg_uuid)
);
CREATE INDEX IF NOT EXISTS idx_sync_state_peer_id
ON sync_state(peer_id);
CREATE INDEX IF NOT EXISTS idx_sync_state_msg_uuid
ON sync_state(msg_uuid);
CREATE INDEX IF NOT EXISTS idx_sync_state_delivery_state
ON sync_state(delivery_state);
CREATE TABLE IF NOT EXISTS local_settings (
setting_key TEXT PRIMARY KEY,
setting_value TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"""
class RobinDB:
def __init__(self, db_path: str | Path):
self.db_path = Path(db_path)
@contextmanager
def connect(self) -> Iterator[sqlite3.Connection]:
    """Yield a sqlite3 connection with foreign keys enabled.

    Commits on clean exit, rolls back on any exception, and always
    closes the connection. Note: every call opens a fresh connection,
    so helper methods calling each other run in separate transactions.

    Fix: a ``@contextmanager`` generator should be annotated as
    returning ``Iterator[...]``, not ``Iterable[...]``.
    """
    # Ensure the parent directory exists before SQLite creates the file.
    self.db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(self.db_path)
    conn.row_factory = sqlite3.Row  # dict-style row access for fetch helpers
    conn.execute("PRAGMA foreign_keys = ON;")  # per-connection in SQLite
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def init_db(self) -> None:
    """Create all tables and indexes (idempotent) and seed schema_version.

    Fix: the previous ``INSERT OR REPLACE`` reset schema_version to '1'
    on every startup, clobbering any value a later migration had bumped.
    ``INSERT OR IGNORE`` seeds the key only when it is missing.
    """
    with self.connect() as conn:
        conn.executescript(SCHEMA_SQL)
        conn.execute(
            """
            INSERT OR IGNORE INTO local_settings (setting_key, setting_value, updated_at)
            VALUES ('schema_version', '1', CURRENT_TIMESTAMP)
            """
        )
# =========================================================
# Generic helpers
# =========================================================
def fetch_one(self, sql: str, params: tuple[Any, ...] = ()) -> dict[str, Any] | None:
with self.connect() as conn:
row = conn.execute(sql, params).fetchone()
return dict(row) if row else None
def fetch_all(self, sql: str, params: tuple[Any, ...] = ()) -> list[dict[str, Any]]:
    """Run a query and return every row as a list of plain dicts."""
    with self.connect() as conn:
        return [dict(row) for row in conn.execute(sql, params)]
def execute(self, sql: str, params: tuple[Any, ...] = ()) -> int:
    """Execute a write statement inside a committed transaction.

    Returns the new rowid for INSERTs. Fix: ``cursor.lastrowid`` is
    None for statements that insert nothing (UPDATE/DELETE), so the old
    ``int(cur.lastrowid)`` raised TypeError; fall back to the affected
    row count in that case.
    """
    with self.connect() as conn:
        cur = conn.execute(sql, params)
        return int(cur.lastrowid) if cur.lastrowid is not None else int(cur.rowcount)
# =========================================================
# local_settings
# =========================================================
def get_setting(self, key: str) -> str | None:
row = self.fetch_one(
"SELECT setting_value FROM local_settings WHERE setting_key = ?",
(key,),
)
return None if row is None else str(row["setting_value"])
def set_setting(self, key: str, value: str, updated_at: str) -> None:
    """Insert or update one local_settings row (SQLite upsert)."""
    upsert_sql = (
        "INSERT INTO local_settings (setting_key, setting_value, updated_at) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT(setting_key) DO UPDATE SET "
        "setting_value = excluded.setting_value, "
        "updated_at = excluded.updated_at"
    )
    with self.connect() as conn:
        conn.execute(upsert_sql, (key, value, updated_at))
# =========================================================
# node_info
# =========================================================
def get_node_info(self) -> dict[str, Any] | None:
return self.fetch_one("SELECT * FROM node_info WHERE id = 1")
def save_node_info(
    self,
    *,
    node_uuid: str,
    node_name: str,
    operator_name: str,
    operator_callsign: str,
    location_text: str,
    transport_profile: str,
    created_at: str,
    updated_at: str,
) -> None:
    """Create or update the singleton node_info row (id = 1).

    Fix: replaces the SELECT-then-INSERT pair with one atomic SQLite
    upsert (``ON CONFLICT(id) DO UPDATE``), removing the read/write
    race. As before, created_at is only written on first insert;
    updates preserve the original value.
    """
    with self.connect() as conn:
        conn.execute(
            """
            INSERT INTO node_info (
                id,
                node_uuid,
                node_name,
                operator_name,
                operator_callsign,
                location_text,
                transport_profile,
                created_at,
                updated_at
            ) VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                node_uuid = excluded.node_uuid,
                node_name = excluded.node_name,
                operator_name = excluded.operator_name,
                operator_callsign = excluded.operator_callsign,
                location_text = excluded.location_text,
                transport_profile = excluded.transport_profile,
                updated_at = excluded.updated_at
            """,
            (
                node_uuid,
                node_name,
                operator_name,
                operator_callsign,
                location_text,
                transport_profile,
                created_at,
                updated_at,
            ),
        )
# =========================================================
# peers
# =========================================================
def add_peer(
self,
*,
peer_name: str,
base_url: str,
transport: str = "lan",
peer_uuid: str | None = None,
enabled: int = 1,
notes: str = "",
created_at: str,
updated_at: str,
) -> int:
with self.connect() as conn:
cur = conn.execute(
"""
INSERT INTO peers (
peer_uuid,
peer_name,
base_url,
transport,
enabled,
notes,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
peer_uuid,
peer_name,
base_url,
transport,
int(bool(enabled)),
notes,
created_at,
updated_at,
),
)
return int(cur.lastrowid)
def upsert_peer(
self,
*,
peer_name: str,
base_url: str,
transport: str = "lan",
peer_uuid: str | None = None,
enabled: int = 1,
notes: str = "",
updated_at: str,
created_at: str | None = None,
) -> int:
with self.connect() as conn:
existing = conn.execute(
"SELECT id FROM peers WHERE base_url = ?",
(base_url,),
).fetchone()
if existing:
peer_id = int(existing["id"])
conn.execute(
"""
UPDATE peers
SET peer_uuid = ?,
peer_name = ?,
transport = ?,
enabled = ?,
notes = ?,
updated_at = ?
WHERE id = ?
""",
(
peer_uuid,
peer_name,
transport,
int(bool(enabled)),
notes,
updated_at,
peer_id,
),
)
return peer_id
if created_at is None:
created_at = updated_at
cur = conn.execute(
"""
INSERT INTO peers (
peer_uuid,
peer_name,
base_url,
transport,
enabled,
notes,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
peer_uuid,
peer_name,
base_url,
transport,
int(bool(enabled)),
notes,
created_at,
updated_at,
),
)
return int(cur.lastrowid)
def get_peer_by_id(self, peer_id: int) -> dict[str, Any] | None:
return self.fetch_one("SELECT * FROM peers WHERE id = ?", (peer_id,))
def get_peer_by_url(self, base_url: str) -> dict[str, Any] | None:
return self.fetch_one("SELECT * FROM peers WHERE base_url = ?", (base_url,))
def list_peers(self, enabled_only: bool = False) -> list[dict[str, Any]]:
if enabled_only:
return self.fetch_all(
"SELECT * FROM peers WHERE enabled = 1 ORDER BY peer_name COLLATE NOCASE"
)
return self.fetch_all("SELECT * FROM peers ORDER BY peer_name COLLATE NOCASE")
def update_peer_seen(
self,
*,
peer_id: int,
last_seen_at: str | None = None,
last_sync_at: str | None = None,
updated_at: str,
) -> None:
with self.connect() as conn:
current = conn.execute(
"SELECT last_seen_at, last_sync_at FROM peers WHERE id = ?",
(peer_id,),
).fetchone()
if not current:
raise ValueError(f"Peer id {peer_id} not found")
new_last_seen = last_seen_at if last_seen_at is not None else current["last_seen_at"]
new_last_sync = last_sync_at if last_sync_at is not None else current["last_sync_at"]
conn.execute(
"""
UPDATE peers
SET last_seen_at = ?,
last_sync_at = ?,
updated_at = ?
WHERE id = ?
""",
(new_last_seen, new_last_sync, updated_at, peer_id),
)
def set_peer_enabled(self, peer_id: int, enabled: bool, updated_at: str) -> None:
    """Flip a peer's enabled flag (stored as 0/1)."""
    flag = 1 if enabled else 0
    with self.connect() as conn:
        conn.execute(
            "UPDATE peers SET enabled = ?, updated_at = ? WHERE id = ?",
            (flag, updated_at, peer_id),
        )

def delete_peer(self, peer_id: int) -> None:
    """Remove a peer row; sync_state rows follow via ON DELETE CASCADE."""
    with self.connect() as conn:
        conn.execute("DELETE FROM peers WHERE id = ?", (peer_id,))
# =========================================================
# messages
# =========================================================
# -- create_message ------------------------------------------------------
# Insert one fully-specified message row and return its integer rowid.
# All timestamps are caller-supplied text (ISO-8601 by convention); the
# is_* flags are normalized to 0/1 before insert. Raises
# sqlite3.IntegrityError when msg_uuid already exists (UNIQUE) or a
# schema CHECK constraint fails (empty title/uuid/hash, priority range).
def create_message(
self,
*,
msg_uuid: str,
msg_hash: str,
msg_type: str,
scope: str,
origin_node_uuid: str,
origin_node_name: str,
origin_time: str,
author: str,
author_callsign: str,
destination: str,
title: str,
body: str,
created_at: str,
expires_at: str | None,
priority: int,
status: str,
is_local_only: int,
is_rf_eligible: int,
imported_from_peer_uuid: str | None,
imported_from_peer_name: str | None,
first_seen_at: str,
last_seen_at: str,
) -> int:
with self.connect() as conn:
cur = conn.execute(
"""
INSERT INTO messages (
msg_uuid,
msg_hash,
msg_type,
scope,
origin_node_uuid,
origin_node_name,
origin_time,
author,
author_callsign,
destination,
title,
body,
created_at,
expires_at,
priority,
status,
is_local_only,
is_rf_eligible,
imported_from_peer_uuid,
imported_from_peer_name,
first_seen_at,
last_seen_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
msg_uuid,
msg_hash,
msg_type,
scope,
origin_node_uuid,
origin_node_name,
origin_time,
author,
author_callsign,
destination,
title,
body,
created_at,
expires_at,
priority,
status,
# Flags coerced to 0/1 regardless of truthy input.
int(bool(is_local_only)),
int(bool(is_rf_eligible)),
imported_from_peer_uuid,
imported_from_peer_name,
first_seen_at,
last_seen_at,
),
)
return int(cur.lastrowid)
def message_exists(self, msg_uuid: str) -> bool:
row = self.fetch_one(
"SELECT 1 AS exists_flag FROM messages WHERE msg_uuid = ?",
(msg_uuid,),
)
return row is not None
def get_message_by_uuid(self, msg_uuid: str) -> dict[str, Any] | None:
return self.fetch_one(
"SELECT * FROM messages WHERE msg_uuid = ?",
(msg_uuid,),
)
def get_message_by_hash(self, msg_hash: str) -> list[dict[str, Any]]:
return self.fetch_all(
"SELECT * FROM messages WHERE msg_hash = ? ORDER BY created_at DESC",
(msg_hash,),
)
def list_messages(
self,
*,
msg_type: str | None = None,
status: str | None = None,
limit: int = 100,
include_expired: bool = True,
) -> list[dict[str, Any]]:
sql = "SELECT * FROM messages WHERE 1=1"
params: list[Any] = []
if msg_type:
sql += " AND msg_type = ?"
params.append(msg_type)
if status:
sql += " AND status = ?"
params.append(status)
if not include_expired:
sql += " AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)"
sql += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
return self.fetch_all(sql, tuple(params))
# -- list_message_summaries ----------------------------------------------
# Lightweight listing for UIs and sync manifests: active, unexpired
# messages WITHOUT the (potentially large) body column. By default
# messages flagged is_local_only are hidden, mirroring what a peer is
# allowed to see; pass include_local_only=True for local displays.
def list_message_summaries(
self,
*,
limit: int = 500,
include_local_only: bool = False,
) -> list[dict[str, Any]]:
sql = """
SELECT
msg_uuid,
msg_hash,
msg_type,
scope,
origin_node_uuid,
origin_node_name,
origin_time,
author,
author_callsign,
destination,
title,
created_at,
expires_at,
priority,
status,
is_local_only,
is_rf_eligible
FROM messages
WHERE status = 'active'
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
"""
params: list[Any] = []
if not include_local_only:
sql += " AND is_local_only = 0"
sql += " ORDER BY created_at DESC LIMIT ?"
params.append(limit)
return self.fetch_all(sql, tuple(params))
def update_message_last_seen(
self,
*,
msg_uuid: str,
last_seen_at: str,
imported_from_peer_uuid: str | None = None,
imported_from_peer_name: str | None = None,
) -> None:
with self.connect() as conn:
conn.execute(
"""
UPDATE messages
SET last_seen_at = ?,
imported_from_peer_uuid = COALESCE(?, imported_from_peer_uuid),
imported_from_peer_name = COALESCE(?, imported_from_peer_name)
WHERE msg_uuid = ?
""",
(
last_seen_at,
imported_from_peer_uuid,
imported_from_peer_name,
msg_uuid,
),
)
def set_message_status(self, msg_uuid: str, status: str, last_seen_at: str) -> None:
    """Set a message's lifecycle status (e.g. active/expired/cancelled)."""
    with self.connect() as conn:
        conn.execute(
            "UPDATE messages SET status = ?, last_seen_at = ? WHERE msg_uuid = ?",
            (status, last_seen_at, msg_uuid),
        )

def delete_message(self, msg_uuid: str) -> None:
    """Hard-delete a message; trace and sync rows cascade via foreign keys."""
    with self.connect() as conn:
        conn.execute("DELETE FROM messages WHERE msg_uuid = ?", (msg_uuid,))
# =========================================================
# import helper
# =========================================================
def import_message_if_new(
self,
*,
message: dict[str, Any],
local_node_uuid: str,
local_node_name: str,
event_time: str,
imported_from_peer_uuid: str | None = None,
imported_from_peer_name: str | None = None,
) -> bool:
existing = self.get_message_by_uuid(message["msg_uuid"])
if existing:
self.update_message_last_seen(
msg_uuid=message["msg_uuid"],
last_seen_at=event_time,
imported_from_peer_uuid=imported_from_peer_uuid,
imported_from_peer_name=imported_from_peer_name,
)
self.record_trace_event(
msg_uuid=message["msg_uuid"],
event_type="rejected",
event_time=event_time,
local_node_uuid=local_node_uuid,
local_node_name=local_node_name,
peer_uuid=imported_from_peer_uuid,
peer_name=imported_from_peer_name,
detail="Duplicate import ignored",
)
return False
self.create_message(
msg_uuid=message["msg_uuid"],
msg_hash=message["msg_hash"],
msg_type=message["msg_type"],
scope=message.get("scope", "network"),
origin_node_uuid=message["origin_node_uuid"],
origin_node_name=message["origin_node_name"],
origin_time=message["origin_time"],
author=message.get("author", ""),
author_callsign=message.get("author_callsign", ""),
destination=message.get("destination", ""),
title=message["title"],
body=message["body"],
created_at=message["created_at"],
expires_at=message.get("expires_at"),
priority=int(message.get("priority", 2)),
status=message.get("status", "active"),
is_local_only=int(bool(message.get("is_local_only", False))),
is_rf_eligible=int(bool(message.get("is_rf_eligible", False))),
imported_from_peer_uuid=imported_from_peer_uuid,
imported_from_peer_name=imported_from_peer_name,
first_seen_at=event_time,
last_seen_at=event_time,
)
self.record_trace_event(
msg_uuid=message["msg_uuid"],
event_type="imported",
event_time=event_time,
local_node_uuid=local_node_uuid,
local_node_name=local_node_name,
peer_uuid=imported_from_peer_uuid,
peer_name=imported_from_peer_name,
detail="Imported from peer",
)
return True
# =========================================================
# message_trace
# =========================================================
def record_trace_event(
self,
*,
msg_uuid: str,
event_type: str,
event_time: str,
local_node_uuid: str,
local_node_name: str,
peer_uuid: str | None = None,
peer_name: str | None = None,
detail: str = "",
) -> int:
with self.connect() as conn:
cur = conn.execute(
"""
INSERT INTO message_trace (
msg_uuid,
event_type,
event_time,
local_node_uuid,
local_node_name,
peer_uuid,
peer_name,
detail
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
msg_uuid,
event_type,
event_time,
local_node_uuid,
local_node_name,
peer_uuid,
peer_name,
detail,
),
)
return int(cur.lastrowid)
def get_message_trace(self, msg_uuid: str) -> list[dict[str, Any]]:
    """Return a message's trace events in stable chronological order.

    The id tiebreaker keeps ordering deterministic when two events
    share the same event_time.
    """
    return self.fetch_all(
        "SELECT * FROM message_trace WHERE msg_uuid = ? "
        "ORDER BY event_time ASC, id ASC",
        (msg_uuid,),
    )
# =========================================================
# sync_state
# =========================================================
def get_sync_state(self, peer_id: int, msg_uuid: str) -> dict[str, Any] | None:
return self.fetch_one(
"""
SELECT *
FROM sync_state
WHERE peer_id = ? AND msg_uuid = ?
""",
(peer_id, msg_uuid),
)
# -- upsert_sync_state ---------------------------------------------------
# Insert or update the delivery-state row for one (peer_id, msg_uuid)
# pair and return its row id. On update: attempt_count and
# first_attempt_at keep their stored values unless the caller supplies
# replacements, and created_at is never modified. On insert: created_at
# defaults to updated_at and attempt_count to 0.
# NOTE(review): last_attempt_at and delivered_at are overwritten with
# the caller's values (including None) on update -- callers must pass
# whatever they want preserved. The mark_sync_* helpers rely on this.
def upsert_sync_state(
self,
*,
peer_id: int,
msg_uuid: str,
delivery_state: str,
updated_at: str,
attempt_count: int | None = None,
first_attempt_at: str | None = None,
last_attempt_at: str | None = None,
delivered_at: str | None = None,
last_error: str = "",
created_at: str | None = None,
) -> int:
with self.connect() as conn:
existing = conn.execute(
"""
SELECT id, attempt_count, first_attempt_at, created_at
FROM sync_state
WHERE peer_id = ? AND msg_uuid = ?
""",
(peer_id, msg_uuid),
).fetchone()
if existing:
row_id = int(existing["id"])
# None from the caller means "keep the stored attempt count".
current_attempt_count = int(existing["attempt_count"] or 0)
final_attempt_count = current_attempt_count if attempt_count is None else attempt_count
# Likewise, preserve the recorded first attempt unless overridden.
existing_first_attempt = existing["first_attempt_at"]
final_first_attempt = first_attempt_at if first_attempt_at is not None else existing_first_attempt
conn.execute(
"""
UPDATE sync_state
SET delivery_state = ?,
attempt_count = ?,
first_attempt_at = ?,
last_attempt_at = ?,
delivered_at = ?,
last_error = ?,
updated_at = ?
WHERE id = ?
""",
(
delivery_state,
final_attempt_count,
final_first_attempt,
last_attempt_at,
delivered_at,
last_error,
updated_at,
row_id,
),
)
return row_id
# Insert path: fill in defaults for a brand-new state row.
if created_at is None:
created_at = updated_at
if attempt_count is None:
attempt_count = 0
cur = conn.execute(
"""
INSERT INTO sync_state (
peer_id,
msg_uuid,
delivery_state,
attempt_count,
first_attempt_at,
last_attempt_at,
delivered_at,
last_error,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
peer_id,
msg_uuid,
delivery_state,
attempt_count,
first_attempt_at,
last_attempt_at,
delivered_at,
last_error,
created_at,
updated_at,
),
)
return int(cur.lastrowid)
def mark_sync_pending(
self,
*,
peer_id: int,
msg_uuid: str,
updated_at: str,
) -> int:
existing = self.get_sync_state(peer_id, msg_uuid)
attempt_count = 0 if not existing else int(existing["attempt_count"])
first_attempt_at = None if not existing else existing["first_attempt_at"]
return self.upsert_sync_state(
peer_id=peer_id,
msg_uuid=msg_uuid,
delivery_state="pending",
attempt_count=attempt_count,
first_attempt_at=first_attempt_at,
last_attempt_at=None,
delivered_at=None,
last_error="",
updated_at=updated_at,
created_at=updated_at if not existing else None,
)
def mark_sync_sent(
self,
*,
peer_id: int,
msg_uuid: str,
attempt_time: str,
) -> int:
existing = self.get_sync_state(peer_id, msg_uuid)
current_attempts = 0 if not existing else int(existing["attempt_count"])
first_attempt = attempt_time if not existing or not existing["first_attempt_at"] else existing["first_attempt_at"]
return self.upsert_sync_state(
peer_id=peer_id,
msg_uuid=msg_uuid,
delivery_state="sent",
attempt_count=current_attempts + 1,
first_attempt_at=first_attempt,
last_attempt_at=attempt_time,
delivered_at=attempt_time,
last_error="",
updated_at=attempt_time,
created_at=attempt_time if not existing else None,
)
def mark_sync_failed(
self,
*,
peer_id: int,
msg_uuid: str,
attempt_time: str,
error_text: str,
) -> int:
existing = self.get_sync_state(peer_id, msg_uuid)
current_attempts = 0 if not existing else int(existing["attempt_count"])
first_attempt = attempt_time if not existing or not existing["first_attempt_at"] else existing["first_attempt_at"]
return self.upsert_sync_state(
peer_id=peer_id,
msg_uuid=msg_uuid,
delivery_state="failed",
attempt_count=current_attempts + 1,
first_attempt_at=first_attempt,
last_attempt_at=attempt_time,
delivered_at=None,
last_error=error_text,
updated_at=attempt_time,
created_at=attempt_time if not existing else None,
)
def list_sync_queue(
self,
*,
peer_id: int | None = None,
delivery_state: str | None = None,
limit: int = 200,
) -> list[dict[str, Any]]:
sql = """
SELECT
s.*,
p.peer_name,
p.base_url,
m.title,
m.msg_type,
m.created_at AS message_created_at
FROM sync_state s
JOIN peers p ON p.id = s.peer_id
JOIN messages m ON m.msg_uuid = s.msg_uuid
WHERE 1=1
"""
params: list[Any] = []
if peer_id is not None:
sql += " AND s.peer_id = ?"
params.append(peer_id)
if delivery_state is not None:
sql += " AND s.delivery_state = ?"
params.append(delivery_state)
sql += " ORDER BY s.updated_at DESC LIMIT ?"
params.append(limit)
return self.fetch_all(sql, tuple(params))
# =========================================================
# maintenance helpers
# =========================================================
def expire_messages(self, now_iso: str) -> int:
    """Flip active messages whose expires_at has passed to 'expired'.

    Returns the number of rows changed. NULL expires_at never expires.
    """
    expire_sql = """
        UPDATE messages
        SET status = 'expired',
            last_seen_at = ?
        WHERE status = 'active'
          AND expires_at IS NOT NULL
          AND expires_at <= ?
    """
    with self.connect() as conn:
        return int(conn.execute(expire_sql, (now_iso, now_iso)).rowcount)
def list_exportable_messages(self, limit: int = 500) -> list[dict[str, Any]]:
    """Active, shareable (not local-only), unexpired messages, newest first."""
    return self.fetch_all(
        "SELECT * FROM messages "
        "WHERE status = 'active' "
        "AND is_local_only = 0 "
        "AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP) "
        "ORDER BY created_at DESC LIMIT ?",
        (limit,),
    )
Example Usage
# Example: initialize a node database and store this node's identity.
from robinnet.db import RobinDB
db = RobinDB("./data/robinnet.db")
# Creates all tables/indexes; safe to call on every startup.
db.init_db()
# node_info is a singleton row (id = 1); save_node_info upserts it.
db.save_node_info(
node_uuid="11111111-1111-1111-1111-111111111111",
node_name="alpha",
operator_name="Rich",
operator_callsign="",
location_text="Albany area",
transport_profile="lan",
created_at="2026-03-11T12:00:00Z",
updated_at="2026-03-11T12:00:00Z",
)
node = db.get_node_info()
print(node)
