"""
Civic Orchestrator — Diagnostic Record Receiver
=================================================
Receives diagnostic record envelopes from Hubzilla addons on this node,
validates the node token, and writes records to JSON storage.

This file began as the cry01 capacity-signal receiver. It now also
receives vs01, dsc01, and scn01 diagnostic records. All four addons share
one NODE_TOKEN — see docs/orchestrator/README.md for the trust boundary
this implies. Each addon has its own route and its own storage rule;
nothing is shared across addons except the token and this process.

Endpoints:
  POST /cry01/signal   — register a capacity signal (unchanged)
  GET  /cry01/signals  — list signals for an association (unchanged)
  POST /vs01/record    — store a Vital Signs record (overwrite-by-Placekey)
  POST /dsc01/record   — store a DSC Categories record (overwrite-by-Placekey)
  POST /scn01/record   — append a Scenario attestation record (append-only)
  GET  /health         — health check

Configuration via environment variables (set in systemd unit or shell):
  NODE_TOKEN          — shared secret, must match each addon's config.json node_token
  SPOOL_FILE          — cry01 signal spool path (unchanged)
                        default: /srv/civic-orchestrator/data/cry01_signals.json
  DIAGNOSTIC_DATA_DIR — root directory for vs01/dsc01/scn01 JSON storage
                        default: /srv/civic-orchestrator/data

Storage layout under DIAGNOSTIC_DATA_DIR:
  vs01/{association_slug}/{participant_id}.json    — one file per participant, overwritten
  dsc01/{association_slug}/{participant_id}.json   — one file per participant, overwritten
  scn01/{association_slug}/records.json            — one array per association, appended
  _failed_writes.jsonl                             — envelopes whose normal write failed

See docs/orchestrator/receiver-spec.md for the full envelope contract.
"""

# Provenance: orchestrator/app/main.py, introduced by commit
# ce1408f90cd8bf573524b44850883df3af173d5a ("Initial push").

import hashlib
import hmac
import json
import logging
import os
import re
import time
from pathlib import Path
from typing import Any

from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

NODE_TOKEN = os.environ.get("NODE_TOKEN", "")
SPOOL_FILE = Path(os.environ.get("SPOOL_FILE", "/srv/civic-orchestrator/data/cry01_signals.json"))
DIAGNOSTIC_DATA_DIR = Path(os.environ.get("DIAGNOSTIC_DATA_DIR", "/srv/civic-orchestrator/data"))

# Association slugs and participant IDs are used directly as path components.
# This pattern is deliberately strict — it only allows characters that are
# safe in a POSIX path and that match the actual shapes of a slug and a
# Placekey. Anything else is rejected before it ever touches the filesystem.
# The pattern ends in \Z rather than $ because $ also matches just before a
# trailing newline, which would let "abc\n" through.
_SAFE_PATH_COMPONENT = re.compile(r"^[A-Za-z0-9_\-]+\Z")

# Header fields every vs01/dsc01/scn01 envelope must carry (non-empty).
_COMMON_HEADER_FIELDS = (
    "addon", "association_slug", "association_channel_id",
    "participant_id", "submitted_at", "standing",
)

# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("civic-orchestrator")

# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------

app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.2.0")

# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def verify_token(token_hash: str) -> bool:
    """Verify the node token hash sent by the Hubzilla addon.

    Returns True only when ``token_hash`` is a string equal to the SHA-256
    hex digest of NODE_TOKEN. Returns False (and logs an error) when
    NODE_TOKEN is unset, so an unconfigured node rejects everything.
    """
    if not NODE_TOKEN:
        log.error("NODE_TOKEN is not set — all requests will be rejected.")
        return False
    # The value comes straight from client JSON and may be any JSON type.
    if not isinstance(token_hash, str):
        return False
    expected = hashlib.sha256(NODE_TOKEN.encode()).hexdigest()
    # Constant-time comparison; encode first because compare_digest rejects
    # non-ASCII str arguments with a TypeError.
    # NOTE(review): the hash itself acts as the bearer credential and is
    # persisted inside each stored "_header" — confirm that is intended.
    return hmac.compare_digest(token_hash.encode(), expected.encode())


def safe_path_component(value: str, field_name: str) -> str:
    """Validate a string is safe to use as a path component.

    Returns ``value`` unchanged when it is a non-empty string made only of
    letters, digits, hyphens, and underscores. Raises HTTPException(400)
    otherwise, including for non-string JSON values.
    """
    if not isinstance(value, str) or not _SAFE_PATH_COMPONENT.fullmatch(value):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid {field_name}: must be non-empty and contain only letters, digits, hyphens, and underscores.",
        )
    return value


def atomic_write_json(path: Path, data: Any) -> None:
    """Write JSON to disk atomically — write to a temp file, then rename over the target.

    Creates the parent directory if needed. On failure the temp file is
    removed (best effort) and the original exception propagates, leaving
    any existing target file untouched.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        tmp.replace(path)
    except Exception:
        # Best-effort cleanup only: the write error is the one that matters,
        # so a failure to remove the temp file must not mask it.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass
        raise


def load_json_or_default(path: Path, default: Any, *, strict: bool = False) -> Any:
    """Load JSON from disk, returning ``default`` if the file does not exist.

    With ``strict=False`` (the default) an unreadable or corrupt file is
    logged and ``default`` is returned. With ``strict=True`` the underlying
    OSError / ValueError is re-raised instead, so a caller that is about to
    overwrite the file can refuse to destroy data it could not read.
    """
    try:
        with path.open("r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return default
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.error("Failed to load %s: %s", path, e)
        if strict:
            raise
        return default


def validate_common_header(header: dict, expected_addon: str) -> list[str]:
    """Return a list of missing/invalid common header fields. Empty list means valid.

    A field counts as missing when it is absent or falsy. A present ``addon``
    that differs from ``expected_addon`` adds one extra entry naming the
    expected value.
    """
    problems = [name for name in _COMMON_HEADER_FIELDS if not header.get(name)]
    addon = header.get("addon")
    # A missing addon is already listed above; only flag a wrong one here.
    if addon and addon != expected_addon:
        problems.append(f"addon (expected '{expected_addon}')")
    return problems


async def _read_envelope(request: Request) -> dict:
    """Parse the request body as a JSON object, raising 400 for anything else."""
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON body.") from None
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="JSON body must be an object.")
    return body


def _envelope_section(body: dict, key: str) -> dict:
    """Return ``body[key]`` (default ``{}``), raising 400 unless it is a JSON object."""
    section = body.get(key, {})
    if not isinstance(section, dict):
        raise HTTPException(status_code=400, detail=f"{key} must be a JSON object.")
    return section


def _require_token(header: dict, what: str) -> None:
    """Raise 403 unless the header carries a valid node token hash.

    ``what`` names the rejected item in the warning log (e.g. "vs01 record").
    """
    if not verify_token(header.get("node_token_hash", "")):
        log.warning("Rejected %s — invalid node token hash.", what)
        raise HTTPException(status_code=403, detail="Invalid node token.")


def _short_id(value: Any) -> str:
    """Abbreviate an identifier for logging; never raises on non-string values."""
    return str(value)[:12] + "..."


def _log_failed_write(addon: str, record: dict) -> bool:
    """Append a record that failed to write to its normal location into a separate failure log.

    This is a last-resort safety net — it must never itself raise.
    Returns True when the record was written to the failure log, False when
    even that failed (the failure is logged at CRITICAL level).
    """
    try:
        failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl"
        failure_log.parent.mkdir(parents=True, exist_ok=True)
        with failure_log.open("a", encoding="utf-8") as f:
            f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n")
    except Exception as e:
        log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e)
        return False
    return True


def _storage_failure(addon: str, record: dict, reason: str) -> HTTPException:
    """Preserve ``record`` in the failure log and build the matching 500 error.

    The response only promises that the submission was kept when the
    failure log write actually succeeded.
    """
    if _log_failed_write(addon, record):
        outcome = "This submission was not lost — contact the operator."
    else:
        outcome = "This submission could not be preserved — resubmit later and contact the operator."
    return HTTPException(status_code=500, detail=f"{reason} {outcome}")

# ---------------------------------------------------------------------------
# cry01 — capacity signals
# ---------------------------------------------------------------------------

def load_spool(*, strict: bool = False) -> list:
    """Load the cry01 signal spool file, returning an empty list if it does not exist.

    With ``strict=False`` an unreadable, corrupt, or non-array spool is
    logged and treated as empty (suitable for read-only callers). With
    ``strict=True`` those states raise OSError / ValueError so a writer does
    not replace a spool it could not read.
    """
    try:
        with SPOOL_FILE.open("r", encoding="utf-8") as f:
            signals = json.load(f)
        if not isinstance(signals, list):
            raise ValueError("spool file is not a JSON array")
    except FileNotFoundError:
        return []
    except (ValueError, OSError) as e:
        log.error("Failed to load spool file: %s", e)
        if strict:
            raise
        return []
    return signals


def save_spool(signals: list) -> None:
    """Write the cry01 signal spool list to disk atomically."""
    atomic_write_json(SPOOL_FILE, signals)


@app.get("/health")
def health() -> dict:
    """Report process health plus the spool location and current signal count."""
    return {
        "status": "ok",
        "spool_file": str(SPOOL_FILE),
        "spool_exists": SPOOL_FILE.exists(),
        "signal_count": len(load_spool()),
        "diagnostic_data_dir": str(DIAGNOSTIC_DATA_DIR),
    }


@app.post("/cry01/signal")
async def receive_signal(request: Request) -> JSONResponse:
    """
    Accept a capacity signal from cry01_spool.php.
    Expected JSON body matches contracts/signal-v1.json.

    Responds 201 on acceptance; 400 for a malformed envelope or wrong addon;
    403 for a bad node token; 422 for missing fields or a bad direction;
    500 when the spool cannot be read or written.
    """
    body = await _read_envelope(request)
    header = _envelope_section(body, "_header")
    _require_token(header, "signal")
    payload = _envelope_section(body, "_payload")

    required_header = ["addon", "association_slug", "participant_xchan", "submitted_at"]
    required_payload = ["g1_pubkey", "capacity_description", "g1_denomination",
                        "duration_days", "category_id", "direction"]

    missing = [f for f in required_header if not header.get(f)]
    missing += [f for f in required_payload if not payload.get(f)]
    if missing:
        raise HTTPException(status_code=422, detail=f"Missing fields: {missing}")

    if header.get("addon") != "cry01":
        raise HTTPException(status_code=400, detail="Unexpected addon identifier.")

    if payload.get("direction") not in ("offer", "seek"):
        raise HTTPException(status_code=422, detail="direction must be 'offer' or 'seek'.")

    entry: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
        "_status": "pending",
    }

    try:
        # Strict load: never replace a spool that exists but could not be read.
        signals = load_spool(strict=True)
        signals.append(entry)
        save_spool(signals)
    except (OSError, ValueError) as e:
        log.error("Failed to store cry01 signal: %s", e)
        raise _storage_failure("cry01", entry, "Storage write failed.") from e

    log.info(
        "Signal received: assoc=%s xchan=%s direction=%s category=%s",
        header.get("association_slug"),
        _short_id(header.get("participant_xchan", "")),
        payload.get("direction"),
        payload.get("category_id"),
    )

    return JSONResponse(status_code=201, content={"status": "accepted"})


@app.get("/cry01/signals")
async def list_signals(
    association_slug: str = "",
    x_node_token_hash: str = Header(default=""),
) -> JSONResponse:
    """List cry01 signals. Operator use only — requires node token.

    When ``association_slug`` is given, only signals whose header carries
    that slug are returned.
    """
    if not verify_token(x_node_token_hash):
        raise HTTPException(status_code=403, detail="Invalid node token.")

    signals = load_spool()

    if association_slug:
        signals = [
            s for s in signals
            if s.get("_header", {}).get("association_slug") == association_slug
        ]

    return JSONResponse(content={"count": len(signals), "signals": signals})

# ---------------------------------------------------------------------------
# vs01 / dsc01 — overwrite-by-Placekey
# Shared implementation: both record types have identical storage rules,
# differing only in the addon name and the path segment under DIAGNOSTIC_DATA_DIR.
# See docs/orchestrator/receiver-spec.md for the contract.
# ---------------------------------------------------------------------------

async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse:
    """Shared handler for vs01 and dsc01 — validates and overwrites the participant's record.

    Responds 200 on success; 400 for a malformed envelope, missing header
    fields, empty payload.fields, or unsafe path components; 403 for a bad
    node token; 500 when the record cannot be written.
    """
    body = await _read_envelope(request)
    header = _envelope_section(body, "_header")
    _require_token(header, f"{addon} record")
    payload = _envelope_section(body, "_payload")

    missing = validate_common_header(header, addon)
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")

    if not payload.get("fields"):
        raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.")

    association_slug = safe_path_component(header["association_slug"], "association_slug")
    participant_id = safe_path_component(header["participant_id"], "participant_id")

    record_path = DIAGNOSTIC_DATA_DIR / addon / association_slug / f"{participant_id}.json"

    record: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
    }

    try:
        atomic_write_json(record_path, record)
    except OSError as e:
        log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e)
        # Log the full envelope to a failure log before returning 500 —
        # a failed write must never silently lose the submission.
        raise _storage_failure(addon, record, "Storage write failed.") from e

    log.info(
        "%s record stored: assoc=%s participant=%s",
        addon, association_slug, _short_id(participant_id),
    )

    return JSONResponse(status_code=200, content={
        "status": "ok",
        "addon": addon,
        "participant_id": participant_id,
        "stored_at": header["submitted_at"],
    })


@app.post("/vs01/record")
async def receive_vs01_record(request: Request) -> JSONResponse:
    """
    Accept a Vital Signs record from vs01_spool.php.
    Overwrites the participant's existing record in full.
    See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json.
    """
    return await _receive_overwrite_record(request, "vs01")


@app.post("/dsc01/record")
async def receive_dsc01_record(request: Request) -> JSONResponse:
    """
    Accept a DSC Categories record from dsc01_spool.php.
    Overwrites the participant's existing record in full.
    See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json.
    """
    return await _receive_overwrite_record(request, "dsc01")

# ---------------------------------------------------------------------------
# scn01 — append-only immutable attestation records
# See docs/orchestrator/receiver-spec.md and contracts/scn01/record-v1.json.
# ---------------------------------------------------------------------------

@app.post("/scn01/record")
async def receive_scn01_record(request: Request) -> JSONResponse:
    """
    Accept a Scenario attestation record from scn01_spool.php.
    Appends to the association's records array. Never overwrites or truncates.
    g1_tx_hash is optional and never required for acceptance.

    Responds 200 with the new record's index on success; 400 for a malformed
    envelope or payload; 403 for a bad node token; 500 when the records file
    is unreadable, corrupt, not an array, or cannot be written.
    """
    body = await _read_envelope(request)
    header = _envelope_section(body, "_header")
    _require_token(header, "scn01 record")
    payload = _envelope_section(body, "_payload")
    credentials = _envelope_section(body, "_credentials")

    missing = validate_common_header(header, "scn01")
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")

    pinned_ids = payload.get("pinned_scenario_ids")
    if not isinstance(pinned_ids, list) or not 1 <= len(pinned_ids) <= 3:
        raise HTTPException(status_code=400, detail="payload.pinned_scenario_ids must be an array of 1 to 3 entries.")

    if not payload.get("narrative"):
        raise HTTPException(status_code=400, detail="payload.narrative is required and must be non-empty.")

    # Prerequisite backstop — the Hubzilla addon enforces this before the form
    # is shown; this is defense-in-depth, not the primary gate.
    if not payload.get("vs_snapshot") or not payload.get("dsc_snapshot"):
        raise HTTPException(
            status_code=400,
            detail="payload.vs_snapshot and payload.dsc_snapshot are both required. "
                   "A Scenario cannot be submitted without an existing VS and DSC record.",
        )

    # g1_tx_hash is optional. Absence never blocks acceptance — the coin is
    # a utility that strengthens evidentiary weight, not a currency that
    # gates the record. See docs/orchestrator/README.md.
    g1_tx_hash = credentials.get("g1_tx_hash")

    association_slug = safe_path_component(header["association_slug"], "association_slug")

    records_path = DIAGNOSTIC_DATA_DIR / "scn01" / association_slug / "records.json"

    record: dict[str, Any] = {
        "_received_at": int(time.time()),
        "_header": header,
        "_payload": payload,
        "_credentials": {"g1_tx_hash": g1_tx_hash},
    }

    # Strict load: a file that exists but cannot be read or parsed must not
    # be treated as empty, or the rewrite below would truncate the history.
    try:
        records = load_json_or_default(records_path, [], strict=True)
    except (OSError, ValueError) as e:
        log.critical("scn01 records file is unreadable or corrupt: %s (%s)", records_path, e)
        raise _storage_failure("scn01", record, "Records storage is in an unexpected state.") from e

    if not isinstance(records, list):
        # The file exists but is not an array — this is a corruption state.
        # Refuse to silently coerce it; fail loudly instead.
        log.critical("scn01 records file is not a JSON array: %s", records_path)
        raise _storage_failure("scn01", record, "Records storage is in an unexpected state.")

    records.append(record)
    try:
        atomic_write_json(records_path, records)
    except OSError as e:
        log.error("Failed to write scn01 record for %s: %s", association_slug, e)
        raise _storage_failure("scn01", record, "Storage write failed.") from e
    record_index = len(records) - 1

    log.info(
        "scn01 record appended: assoc=%s participant=%s index=%d g1_tx_hash=%s",
        association_slug,
        _short_id(header.get("participant_id", "")),
        record_index,
        "present" if g1_tx_hash else "absent",
    )

    return JSONResponse(status_code=200, content={
        "status": "ok",
        "addon": "scn01",
        "participant_id": header["participant_id"],
        "record_index": record_index,
        "g1_tx_hash": g1_tx_hash,
        "stored_at": header["submitted_at"],
    })