Files
2026-06-21 05:31:53 -04:00

510 lines
21 KiB
Python

"""
Civic Orchestrator — Diagnostic Record Receiver
=================================================
Receives diagnostic record envelopes from Hubzilla addons on this node,
validates the node token, and writes records to JSON storage.
This file began as the cry01 capacity-signal receiver. It now also
receives vs01, dsc01, and scn01 diagnostic records. All four addons share
one NODE_TOKEN — see docs/orchestrator/README.md for the trust boundary
this implies. Each addon has its own route and its own storage rule;
nothing is shared across addons except the token, this process, and the
last-resort failure log (_failed_writes.jsonl under DIAGNOSTIC_DATA_DIR).
Endpoints:
POST /cry01/signal — register a capacity signal (unchanged)
GET /cry01/signals — list signals for an association (unchanged)
POST /vs01/record — append a Vital Signs record (append-only)
POST /dsc01/record — store a DSC Categories record (overwrite-by-Placekey)
POST /scn01/record — append a Scenario attestation record (append-only)
GET /health — health check
Configuration via environment variables (set in systemd unit or shell):
NODE_TOKEN — shared secret, must match each addon's config.json node_token
SPOOL_FILE — cry01 signal spool path (unchanged)
default: /srv/civic-orchestrator/data/cry01_signals.json
DIAGNOSTIC_DATA_DIR — root directory for vs01/dsc01/scn01 JSON storage
default: /srv/civic-orchestrator/data
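Storage layout under DIAGNOSTIC_DATA_DIR:
vs01/{association_slug}/{participant_id}.json — one array per participant, appended
dsc01/{association_slug}/{participant_id}.json — one file per participant, overwritten
scn01/{association_slug}/records.json — one array per association, appended
_failed_writes.jsonl — one JSON line per record that could not be written to
its normal location (last-resort safety net, shared by all addons)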
vs01 is append-only, not overwrite-by-Placekey, as of this version. Reason:
the homeowner perspective is raw, mutable, participant-submitted input — not
the certified record. A sequence of submissions over time, including sparse
or hostile ones (e.g. a new occupant overwriting a detailed record with
"I don't know"), is itself part of the diagnostic picture and must not be
silently destroyed. The Stage 2 JSON-to-MariaDB parser is responsible for
deriving occupancy epochs and any other interpretation from this raw
sequence — the orchestrator does not infer or collapse history. dsc01
remains on the overwrite handler for now; the same question applies to it
and will be addressed in its own pass.
See docs/orchestrator/receiver-spec.md for the full envelope contract.
"""
import hashlib
import hmac
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any

from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
NODE_TOKEN = os.environ.get("NODE_TOKEN", "")
SPOOL_FILE = Path(os.environ.get("SPOOL_FILE", "/srv/civic-orchestrator/data/cry01_signals.json"))
DIAGNOSTIC_DATA_DIR = Path(os.environ.get("DIAGNOSTIC_DATA_DIR", "/srv/civic-orchestrator/data"))
# Association slugs and participant IDs are used directly as path components.
# This pattern is deliberately strict — it only allows characters that are
# safe in a POSIX path and that match the actual shapes of a slug and a
# Placekey. Anything else is rejected before it ever touches the filesystem.
# The pattern is anchored with \Z rather than $: with re.match, `$` also
# matches just before a trailing "\n", which would let "slug\n" through and
# produce a filename containing a newline.
# NOTE(review): a full Placekey is written what@where (e.g. "227-222@5vg-7gq-tvz");
# "@" is not accepted here. Confirm that participant_id never carries the raw
# Placekey form, or extend the character class deliberately.
_SAFE_PATH_COMPONENT = re.compile(r"^[A-Za-z0-9_\-]+\Z")
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Root logging configuration for the whole process: INFO and above, one line
# per message prefixed with a timestamp and level name.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Named logger used by every helper and route handler in this module.
log = logging.getLogger(name="civic-orchestrator")
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
# The FastAPI application that every route below registers on.
app = FastAPI(
    version="0.3.0",
    title="Civic Orchestrator — Diagnostic Record Receiver",
)
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def verify_token(token_hash: str) -> bool:
    """Verify the node token hash sent by the Hubzilla addon.

    The request must carry the lowercase hex SHA-256 digest of NODE_TOKEN.

    Args:
        token_hash: Digest taken from the request envelope or header. This is
            untrusted input, so non-string values (e.g. a JSON number or null)
            are rejected rather than compared.

    Returns:
        True only when NODE_TOKEN is configured and *token_hash* equals its
        digest; False otherwise (including when NODE_TOKEN is unset, which is
        logged as an error).
    """
    if not NODE_TOKEN:
        log.error("NODE_TOKEN is not set — all requests will be rejected.")
        return False
    # hmac.compare_digest raises TypeError for non-str input and for str
    # containing non-ASCII characters; neither can be a valid hex digest.
    if not isinstance(token_hash, str) or not token_hash.isascii():
        return False
    expected = hashlib.sha256(NODE_TOKEN.encode()).hexdigest()
    # Constant-time comparison: a plain == returns as soon as a character
    # differs, letting response timing reveal how much of the digest matched.
    return hmac.compare_digest(token_hash, expected)
def safe_path_component(value: str, field_name: str) -> str:
    """Validate a string is safe to use as a single path component.

    Args:
        value: Candidate component, taken from an untrusted envelope header.
        field_name: Header field name, used only in the error message.

    Returns:
        *value*, unchanged.

    Raises:
        HTTPException: 400 if *value* is not a non-empty string made up only of
            ASCII letters, digits, hyphens, and underscores.
    """
    # Header values come from JSON and may be numbers, lists, etc.; passing a
    # non-string to the regex would raise TypeError and surface as a 500.
    # fullmatch (rather than match) ensures the entire string is checked, so a
    # trailing "\n" cannot slip past a `$` anchor in the pattern.
    if not isinstance(value, str) or not value or not _SAFE_PATH_COMPONENT.fullmatch(value):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid {field_name}: must be non-empty and contain only letters, digits, hyphens, and underscores.",
        )
    return value
def atomic_write_json(path: Path, data: Any) -> None:
    """Write JSON to disk atomically — write to a temp file, then rename over the target.

    The JSON is written to ``<path>.tmp`` in the same directory, flushed and
    fsynced, and then renamed over *path*. Readers see either the previous file
    or the complete new one. Because of the fsync, the rename never exposes
    data that has not reached the disk. Parent directories are created as needed.

    Raises:
        OSError: if creating the directory, writing the temp file, or the
            rename fails.
        TypeError / ValueError: if *data* is not JSON-serializable.
        On any failure the temp file is removed and *path* is left untouched.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            # Force the bytes to stable storage before the rename becomes
            # visible. Without this, a power loss can leave *path* empty or
            # truncated even though the rename itself survived.
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(path)
    except BaseException:
        # Don't leave a half-written temp file behind for the next writer or reader.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise
def load_json_or_default(path: Path, default: Any) -> Any:
    """Load JSON from disk, returning a default value if the file does not exist or is corrupt.

    A corrupt file (invalid JSON or invalid UTF-8) is first renamed aside to
    ``<name>.corrupt-<time_ns>`` and an error is logged, and only then is
    *default* returned. Callers read, append, and write the result back to
    *path*. Returning *default* while leaving the corrupt file in place would
    let that write silently destroy the file's previous contents.

    Args:
        path: JSON file to read.
        default: Value returned when the file is absent or was moved aside.

    Returns:
        The parsed JSON, or *default*.

    Raises:
        OSError: if the file exists but cannot be read, or a corrupt file
            cannot be moved aside. Returning *default* in those cases would
            invite an overwrite of data that was never loaded.
    """
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return default
    except ValueError as e:  # json.JSONDecodeError and UnicodeDecodeError
        quarantine = path.with_name(f"{path.name}.corrupt-{time.time_ns()}")
        log.error("Failed to load %s: %s — moving it aside to %s", path, e, quarantine)
        path.replace(quarantine)
        return default
def validate_common_header(header: dict, expected_addon: str) -> list[str]:
    """Return a list of missing/invalid common header fields. Empty list means valid.

    A field counts as missing when it is absent or falsy. The ``addon`` field
    must also equal *expected_addon*.

    Args:
        header: The envelope's ``_header`` object.
        expected_addon: Addon identifier accepted by the calling route
            (e.g. "vs01", "dsc01", "scn01").

    Returns:
        Field names (or descriptions) in a stable order, each reported at most once.
    """
    required = [
        "addon", "association_slug", "association_channel_id",
        "participant_id", "submitted_at", "standing",
    ]
    missing = [f for f in required if not header.get(f)]
    # An absent addon is already listed as "addon"; report a mismatch only
    # when a value was actually supplied, so the field is not listed twice.
    if "addon" not in missing and header.get("addon") != expected_addon:
        missing.append(f"addon (expected '{expected_addon}')")
    return missing
def _log_failed_write(addon: str, record: dict) -> None:
    """Last-resort fallback for a record whose normal write failed.

    Appends one JSON line ``{"addon", "record", "logged_at"}`` to
    DIAGNOSTIC_DATA_DIR/_failed_writes.jsonl. Any exception raised while doing
    so is caught and reported at CRITICAL level instead of being propagated,
    because the caller is already in the middle of handling a failure.
    """
    try:
        sink_path = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl"
        sink_path.parent.mkdir(parents=True, exist_ok=True)
        with sink_path.open("a") as sink:
            line = json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())})
            sink.write(f"{line}\n")
    except Exception as exc:
        log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, exc)
# ---------------------------------------------------------------------------
# cry01 — capacity signals (unchanged from prior version)
# ---------------------------------------------------------------------------
def load_spool() -> list:
    """Return the parsed cry01 spool, or [] when the file is absent or cannot be parsed.

    Read and decode errors are logged, not raised.

    NOTE(review): a corrupt spool yields [] here, and receive_signal then
    appends to that empty list and saves it over the corrupt file, so earlier
    signals are not preserved in that case.
    """
    if SPOOL_FILE.exists():
        try:
            with SPOOL_FILE.open("r") as handle:
                return json.load(handle)
        except (json.JSONDecodeError, OSError) as err:
            log.error("Failed to load spool file: %s", err)
    return []
def save_spool(signals: list) -> None:
    """Persist the full cry01 spool list, replacing SPOOL_FILE via atomic_write_json."""
    spool_path = SPOOL_FILE
    atomic_write_json(spool_path, signals)
@app.get("/health")
def health() -> dict:
    """Health check: report configured storage paths and the current cry01 spool size."""
    spool_present = SPOOL_FILE.exists()
    signals = load_spool()
    return {
        "status": "ok",
        "spool_file": str(SPOOL_FILE),
        "spool_exists": spool_present,
        "signal_count": len(signals),
        "diagnostic_data_dir": str(DIAGNOSTIC_DATA_DIR),
    }
@app.post("/cry01/signal")
async def receive_signal(request: Request) -> JSONResponse:
    """
    Accept a capacity signal from cry01_spool.php.
    Expected JSON body matches contracts/signal-v1.json.

    The envelope is ``{"_header": {...}, "_payload": {...}}``. On success, the
    signal is appended to the spool with ``_status: "pending"`` and the
    handler responds 201.

    Error responses:
        400 — body is not a JSON object, _header/_payload are not objects,
              or the addon identifier is not "cry01".
        403 — node token hash does not match.
        422 — required fields missing, or direction is not "offer"/"seek".
        500 — the spool could not be written. The signal is first copied to
              the failed-writes log.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from None
    # Valid JSON is not necessarily an object: a top-level array or scalar, or
    # a non-object _header/_payload, would otherwise crash on .get() below and
    # surface as an unhandled 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")
    header = body.get("_header", {})
    payload = body.get("_payload", {})
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="_header and _payload must be JSON objects.")
    token_hash = header.get("node_token_hash", "")
    if not verify_token(token_hash):
        log.warning("Rejected signal — invalid node token hash.")
        raise HTTPException(status_code=403, detail="Invalid node token.")
    required_header = ["addon", "association_slug", "participant_xchan", "submitted_at"]
    required_payload = ["g1_pubkey", "capacity_description", "g1_denomination",
                        "duration_days", "category_id", "direction"]
    missing = [f for f in required_header if not header.get(f)]
    missing += [f for f in required_payload if not payload.get(f)]
    if missing:
        raise HTTPException(status_code=422, detail=f"Missing fields: {missing}")
    if header.get("addon") != "cry01":
        raise HTTPException(status_code=400, detail="Unexpected addon identifier.")
    if payload.get("direction") not in ("offer", "seek"):
        raise HTTPException(status_code=422, detail="direction must be 'offer' or 'seek'.")
    entry: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
        "_status": "pending",
    }
    signals = load_spool()
    signals.append(entry)
    # Match the vs01/dsc01/scn01 handlers: a storage failure must not lose the
    # submission silently or surface as a bare 500.
    try:
        save_spool(signals)
    except OSError as e:
        log.error("Failed to write cry01 signal spool: %s", e)
        _log_failed_write("cry01", entry)
        raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") from None
    # participant_xchan is only checked for truthiness, so it may not be a
    # string. str() keeps this log line from raising after the signal has
    # already been stored; a 500 here would prompt a retry and a duplicate.
    log.info(
        "Signal received: assoc=%s xchan=%s direction=%s category=%s",
        header.get("association_slug"),
        str(header.get("participant_xchan", ""))[:12] + "...",
        payload.get("direction"),
        payload.get("category_id"),
    )
    return JSONResponse(status_code=201, content={"status": "accepted"})
@app.get("/cry01/signals")
async def list_signals(
    association_slug: str = "",
    x_node_token_hash: str = Header(default=""),
) -> JSONResponse:
    """
    Operator-only listing of the cry01 spool, optionally narrowed to one association.

    The node token hash is read from the ``x-node-token-hash`` request header
    (FastAPI derives the header name from the parameter). Responds 403 when it
    does not match.
    """
    if verify_token(x_node_token_hash):
        entries = load_spool()
        if association_slug:
            entries = [
                entry
                for entry in entries
                if entry.get("_header", {}).get("association_slug") == association_slug
            ]
        return JSONResponse(content={"count": len(entries), "signals": entries})
    raise HTTPException(status_code=403, detail="Invalid node token.")
# ---------------------------------------------------------------------------
# vs01 — append-only
# The homeowner perspective is raw, mutable, participant-submitted input,
# not the certified record. Every submission is preserved — nothing is
# ever overwritten or destroyed by the orchestrator. See module docstring
# above for the reasoning. Modeled on scn01's append pattern, but without
# the snapshot prerequisite (that requirement is specific to Scenarios).
# ---------------------------------------------------------------------------
@app.post("/vs01/record")
async def receive_vs01_record(request: Request) -> JSONResponse:
    """
    Accept a Vital Signs record from vs01_spool.php.
    Appends to the participant's records array. Never overwrites or truncates.
    See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json.

    Storage: DIAGNOSTIC_DATA_DIR/vs01/{association_slug}/{participant_id}.json,
    a JSON array. Each accepted envelope is appended as
    ``{"_received_at", "_header", "_payload"}``.

    Responses:
        200 — appended; the body includes the new record's ``record_index``.
        400 — malformed envelope, missing/invalid header fields, empty
              payload.fields, or an unsafe association_slug/participant_id.
        403 — node token hash does not match.
        500 — storage error or non-array records file. The record is first
              copied to the failed-writes log.

    NOTE(review): ``_header`` is stored verbatim, including ``node_token_hash``,
    which is the value verify_token accepts as the credential. Confirm against
    the receiver spec whether it should be stripped before storage.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from None
    # Valid JSON is not necessarily an object: a top-level array or scalar, or
    # a non-object _header/_payload, would otherwise crash on .get() below and
    # surface as an unhandled 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")
    header = body.get("_header", {})
    payload = body.get("_payload", {})
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="_header and _payload must be JSON objects.")
    token_hash = header.get("node_token_hash", "")
    if not verify_token(token_hash):
        log.warning("Rejected vs01 record — invalid node token hash.")
        raise HTTPException(status_code=403, detail="Invalid node token.")
    missing = validate_common_header(header, "vs01")
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
    fields = payload.get("fields")
    if not fields:
        raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.")
    association_slug = safe_path_component(header["association_slug"], "association_slug")
    participant_id = safe_path_component(header["participant_id"], "participant_id")
    records_path = DIAGNOSTIC_DATA_DIR / "vs01" / association_slug / f"{participant_id}.json"
    record: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
    }
    try:
        records = load_json_or_default(records_path, [])
        if not isinstance(records, list):
            # The file exists but is not an array — this is a corruption state,
            # most likely a leftover file from the prior overwrite-by-Placekey
            # behavior (a single object, not an array). Refuse to silently
            # coerce it; fail loudly instead.
            log.critical("vs01 records file is not a JSON array: %s", records_path)
            _log_failed_write("vs01", record)
            raise HTTPException(status_code=500, detail="Records storage is in an unexpected state. This submission was not lost — contact the operator.")
        records.append(record)
        atomic_write_json(records_path, records)
        record_index = len(records) - 1
    except OSError as e:
        log.error("Failed to write vs01 record for %s/%s: %s", association_slug, participant_id, e)
        _log_failed_write("vs01", record)
        raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") from None
    log.info(
        "vs01 record appended: assoc=%s participant=%s index=%d",
        association_slug, participant_id[:12] + "...", record_index,
    )
    return JSONResponse(status_code=200, content={
        "status": "ok",
        "addon": "vs01",
        "participant_id": participant_id,
        "record_index": record_index,
        "stored_at": header["submitted_at"],
    })
# ---------------------------------------------------------------------------
# dsc01 — overwrite-by-Placekey (unchanged)
# See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json.
# ---------------------------------------------------------------------------
async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse:
    """Handler for dsc01 — validates and overwrites the participant's record.

    The record is written to DIAGNOSTIC_DATA_DIR/{addon}/{association_slug}/{participant_id}.json
    as ``{"_received_at", "_header", "_payload"}``, replacing any previous
    file in full. No prior version is kept.

    Args:
        request: Incoming request whose body is the JSON envelope.
        addon: Addon identifier. It must equal ``_header.addon`` and also names
            the storage subdirectory.

    Responses:
        200 — stored; the body echoes participant_id and submitted_at.
        400 — malformed envelope, missing/invalid header fields, empty
              payload.fields, or an unsafe association_slug/participant_id.
        403 — node token hash does not match.
        500 — storage write failed. The record is first copied to the
              failed-writes log.

    NOTE(review): ``_header`` is stored verbatim, including ``node_token_hash``;
    confirm against the receiver spec whether it should be stripped first.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from None
    # Valid JSON is not necessarily an object: a top-level array or scalar, or
    # a non-object _header/_payload, would otherwise crash on .get() below and
    # surface as an unhandled 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")
    header = body.get("_header", {})
    payload = body.get("_payload", {})
    if not isinstance(header, dict) or not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="_header and _payload must be JSON objects.")
    token_hash = header.get("node_token_hash", "")
    if not verify_token(token_hash):
        log.warning("Rejected %s record — invalid node token hash.", addon)
        raise HTTPException(status_code=403, detail="Invalid node token.")
    missing = validate_common_header(header, addon)
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
    fields = payload.get("fields")
    if not fields:
        raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.")
    association_slug = safe_path_component(header["association_slug"], "association_slug")
    participant_id = safe_path_component(header["participant_id"], "participant_id")
    record_path = DIAGNOSTIC_DATA_DIR / addon / association_slug / f"{participant_id}.json"
    record: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
    }
    try:
        atomic_write_json(record_path, record)
    except OSError as e:
        log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e)
        _log_failed_write(addon, record)
        raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") from None
    log.info(
        "%s record stored: assoc=%s participant=%s",
        addon, association_slug, participant_id[:12] + "...",
    )
    return JSONResponse(status_code=200, content={
        "status": "ok",
        "addon": addon,
        "participant_id": participant_id,
        "stored_at": header["submitted_at"],
    })
@app.post("/dsc01/record")
async def receive_dsc01_record(request: Request) -> JSONResponse:
    """
    Route for DSC Categories records posted by dsc01_spool.php.
    The participant's stored record is replaced wholesale; validation and
    storage are delegated to _receive_overwrite_record.
    See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json.
    """
    addon_id = "dsc01"
    response = await _receive_overwrite_record(request, addon_id)
    return response
# ---------------------------------------------------------------------------
# scn01 — append-only immutable attestation records
# See docs/orchestrator/receiver-spec.md and contracts/scn01/record-v1.json.
# ---------------------------------------------------------------------------
@app.post("/scn01/record")
async def receive_scn01_record(request: Request) -> JSONResponse:
    """
    Accept a Scenario attestation record from scn01_spool.php.
    Appends to the association's records array. Never overwrites or truncates.
    g1_tx_hash is optional and never required for acceptance.

    Storage: DIAGNOSTIC_DATA_DIR/scn01/{association_slug}/records.json, a JSON
    array. Each accepted envelope is appended as
    ``{"_received_at", "_header", "_payload", "_credentials": {"g1_tx_hash"}}``.

    Responses:
        200 — appended; the body includes record_index and the g1_tx_hash (or null).
        400 — malformed envelope, missing/invalid header fields, bad
              pinned_scenario_ids, empty narrative, missing VS/DSC snapshots,
              or an unsafe association_slug.
        403 — node token hash does not match.
        500 — storage error or non-array records file. The record is first
              copied to the failed-writes log.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from None
    # Valid JSON is not necessarily an object: a top-level array or scalar, or
    # non-object envelope parts, would otherwise crash on .get() below and
    # surface as an unhandled 500.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")
    header = body.get("_header", {})
    payload = body.get("_payload", {})
    # _credentials is optional; treat an explicit JSON null the same as an
    # absent key instead of crashing on None.get().
    credentials = body.get("_credentials") or {}
    if not isinstance(header, dict) or not isinstance(payload, dict) or not isinstance(credentials, dict):
        raise HTTPException(
            status_code=400,
            detail="_header and _payload must be JSON objects; _credentials, if present, must be an object or null.",
        )
    token_hash = header.get("node_token_hash", "")
    if not verify_token(token_hash):
        log.warning("Rejected scn01 record — invalid node token hash.")
        raise HTTPException(status_code=403, detail="Invalid node token.")
    missing = validate_common_header(header, "scn01")
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
    pinned_ids = payload.get("pinned_scenario_ids")
    narrative = payload.get("narrative")
    vs_snap = payload.get("vs_snapshot")
    dsc_snap = payload.get("dsc_snapshot")
    if not pinned_ids or not isinstance(pinned_ids, list) or not (1 <= len(pinned_ids) <= 3):
        raise HTTPException(status_code=400, detail="payload.pinned_scenario_ids must be an array of 1 to 3 entries.")
    if not narrative:
        raise HTTPException(status_code=400, detail="payload.narrative is required and must be non-empty.")
    # Prerequisite backstop — the Hubzilla addon enforces this before the form
    # is shown; this is defense-in-depth, not the primary gate.
    if not vs_snap or not dsc_snap:
        raise HTTPException(
            status_code=400,
            detail="payload.vs_snapshot and payload.dsc_snapshot are both required. "
                   "A Scenario cannot be submitted without an existing VS and DSC record.",
        )
    # g1_tx_hash is optional. Absence never blocks acceptance — the coin is
    # a utility that strengthens evidentiary weight, not a currency that
    # gates the record. See docs/orchestrator/README.md.
    g1_tx_hash = credentials.get("g1_tx_hash")
    association_slug = safe_path_component(header["association_slug"], "association_slug")
    records_path = DIAGNOSTIC_DATA_DIR / "scn01" / association_slug / "records.json"
    record: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
        "_credentials": {"g1_tx_hash": g1_tx_hash},
    }
    try:
        records = load_json_or_default(records_path, [])
        if not isinstance(records, list):
            # The file exists but is not an array — this is a corruption state.
            # Refuse to silently coerce it; fail loudly instead.
            log.critical("scn01 records file is not a JSON array: %s", records_path)
            _log_failed_write("scn01", record)
            raise HTTPException(status_code=500, detail="Records storage is in an unexpected state. This submission was not lost — contact the operator.")
        records.append(record)
        atomic_write_json(records_path, records)
        record_index = len(records) - 1
    except OSError as e:
        log.error("Failed to write scn01 record for %s: %s", association_slug, e)
        _log_failed_write("scn01", record)
        raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") from None
    # participant_id is not path-validated for scn01 (only checked for
    # truthiness), so it may not be a string. str() keeps this log line from
    # raising after the record has already been appended; a 500 here would
    # prompt a retry and a duplicate entry in the append-only store.
    log.info(
        "scn01 record appended: assoc=%s participant=%s index=%d g1_tx_hash=%s",
        association_slug,
        str(header.get("participant_id", ""))[:12] + "...",
        record_index,
        "present" if g1_tx_hash else "absent",
    )
    return JSONResponse(status_code=200, content={
        "status": "ok",
        "addon": "scn01",
        "participant_id": header["participant_id"],
        "record_index": record_index,
        "g1_tx_hash": g1_tx_hash,
        "stored_at": header["submitted_at"],
    })