Initial push
This commit is contained in:
432
orchestrator/app/main.py
Normal file
432
orchestrator/app/main.py
Normal file
@@ -0,0 +1,432 @@
|
|||||||
|
"""
|
||||||
|
Civic Orchestrator — Diagnostic Record Receiver
|
||||||
|
=================================================
|
||||||
|
Receives diagnostic record envelopes from Hubzilla addons on this node,
|
||||||
|
validates the node token, and writes records to JSON storage.
|
||||||
|
|
||||||
|
This file began as the cry01 capacity-signal receiver. It now also
|
||||||
|
receives vs01, dsc01, and scn01 diagnostic records. All four addons share
|
||||||
|
one NODE_TOKEN — see docs/orchestrator/README.md for the trust boundary
|
||||||
|
this implies. Each addon has its own route and its own storage rule;
|
||||||
|
nothing is shared across addons except the token and this process.
|
||||||
|
|
||||||
|
Endpoints:
|
||||||
|
POST /cry01/signal — register a capacity signal (unchanged)
|
||||||
|
GET /cry01/signals — list signals for an association (unchanged)
|
||||||
|
POST /vs01/record — store a Vital Signs record (overwrite-by-Placekey)
|
||||||
|
POST /dsc01/record — store a DSC Categories record (overwrite-by-Placekey)
|
||||||
|
POST /scn01/record — append a Scenario attestation record (append-only)
|
||||||
|
GET /health — health check
|
||||||
|
|
||||||
|
Configuration via environment variables (set in systemd unit or shell):
|
||||||
|
NODE_TOKEN — shared secret, must match each addon's config.json node_token
|
||||||
|
SPOOL_FILE — cry01 signal spool path (unchanged)
|
||||||
|
default: /srv/civic-orchestrator/data/cry01_signals.json
|
||||||
|
DIAGNOSTIC_DATA_DIR — root directory for vs01/dsc01/scn01 JSON storage
|
||||||
|
default: /srv/civic-orchestrator/data
|
||||||
|
|
||||||
|
Storage layout under DIAGNOSTIC_DATA_DIR:
|
||||||
|
vs01/{association_slug}/{participant_id}.json — one file per participant, overwritten
|
||||||
|
dsc01/{association_slug}/{participant_id}.json — one file per participant, overwritten
|
||||||
|
scn01/{association_slug}/records.json — one array per association, appended
|
||||||
|
|
||||||
|
See docs/orchestrator/receiver-spec.md for the full envelope contract.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastapi import FastAPI, Header, HTTPException, Request
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Configuration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
NODE_TOKEN = os.environ.get("NODE_TOKEN", "")
|
||||||
|
SPOOL_FILE = Path(os.environ.get("SPOOL_FILE", "/srv/civic-orchestrator/data/cry01_signals.json"))
|
||||||
|
DIAGNOSTIC_DATA_DIR = Path(os.environ.get("DIAGNOSTIC_DATA_DIR", "/srv/civic-orchestrator/data"))
|
||||||
|
|
||||||
|
# Association slugs and participant IDs are used directly as path components.
|
||||||
|
# This pattern is deliberately strict — it only allows characters that are
|
||||||
|
# safe in a POSIX path and that match the actual shapes of a slug and a
|
||||||
|
# Placekey. Anything else is rejected before it ever touches the filesystem.
|
||||||
|
_SAFE_PATH_COMPONENT = re.compile(r"^[A-Za-z0-9_\-]+$")
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Logging
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format="%(asctime)s %(levelname)s %(message)s",
|
||||||
|
)
|
||||||
|
log = logging.getLogger("civic-orchestrator")
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# App
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.2.0")
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Shared helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def verify_token(token_hash: str) -> bool:
|
||||||
|
"""Verify the node token hash sent by the Hubzilla addon."""
|
||||||
|
if not NODE_TOKEN:
|
||||||
|
log.error("NODE_TOKEN is not set — all requests will be rejected.")
|
||||||
|
return False
|
||||||
|
expected = hashlib.sha256(NODE_TOKEN.encode()).hexdigest()
|
||||||
|
return token_hash == expected
|
||||||
|
|
||||||
|
|
||||||
|
def safe_path_component(value: str, field_name: str) -> str:
|
||||||
|
"""Validate a string is safe to use as a path component. Raises 400 if not."""
|
||||||
|
if not value or not _SAFE_PATH_COMPONENT.match(value):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail=f"Invalid {field_name}: must be non-empty and contain only letters, digits, hyphens, and underscores.",
|
||||||
|
)
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def atomic_write_json(path: Path, data: Any) -> None:
|
||||||
|
"""Write JSON to disk atomically — write to a temp file, then rename over the target."""
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
||||||
|
with tmp.open("w") as f:
|
||||||
|
json.dump(data, f, indent=2)
|
||||||
|
tmp.replace(path)
|
||||||
|
|
||||||
|
|
||||||
|
def load_json_or_default(path: Path, default: Any) -> Any:
|
||||||
|
"""Load JSON from disk, returning a default value if the file does not exist or is corrupt."""
|
||||||
|
if not path.exists():
|
||||||
|
return default
|
||||||
|
try:
|
||||||
|
with path.open("r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
|
log.error("Failed to load %s: %s", path, e)
|
||||||
|
return default
|
||||||
|
|
||||||
|
|
||||||
|
def validate_common_header(header: dict, expected_addon: str) -> list[str]:
|
||||||
|
"""Return a list of missing/invalid common header fields. Empty list means valid."""
|
||||||
|
required = [
|
||||||
|
"addon", "association_slug", "association_channel_id",
|
||||||
|
"participant_id", "submitted_at", "standing",
|
||||||
|
]
|
||||||
|
missing = [f for f in required if not header.get(f)]
|
||||||
|
if header.get("addon") != expected_addon:
|
||||||
|
missing.append(f"addon (expected '{expected_addon}')")
|
||||||
|
return missing
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# cry01 — capacity signals (unchanged from prior version)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def load_spool() -> list:
|
||||||
|
"""Load the cry01 signal spool file, returning an empty list if it does not exist."""
|
||||||
|
if not SPOOL_FILE.exists():
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
with SPOOL_FILE.open("r") as f:
|
||||||
|
return json.load(f)
|
||||||
|
except (json.JSONDecodeError, OSError) as e:
|
||||||
|
log.error("Failed to load spool file: %s", e)
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def save_spool(signals: list) -> None:
|
||||||
|
"""Write the cry01 signal spool list to disk atomically."""
|
||||||
|
atomic_write_json(SPOOL_FILE, signals)
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/health")
|
||||||
|
def health() -> dict:
|
||||||
|
return {
|
||||||
|
"status": "ok",
|
||||||
|
"spool_file": str(SPOOL_FILE),
|
||||||
|
"spool_exists": SPOOL_FILE.exists(),
|
||||||
|
"signal_count": len(load_spool()),
|
||||||
|
"diagnostic_data_dir": str(DIAGNOSTIC_DATA_DIR),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/cry01/signal")
|
||||||
|
async def receive_signal(request: Request) -> JSONResponse:
|
||||||
|
"""
|
||||||
|
Accept a capacity signal from cry01_spool.php.
|
||||||
|
Expected JSON body matches contracts/signal-v1.json.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
body = await request.json()
|
||||||
|
except Exception:
|
||||||
|
raise HTTPException(status_code=400, detail="Invalid JSON body.")
|
||||||
|
|
||||||
|
header = body.get("_header", {})
|
||||||
|
payload = body.get("_payload", {})
|
||||||
|
|
||||||
|
token_hash = header.get("node_token_hash", "")
|
||||||
|
if not verify_token(token_hash):
|
||||||
|
log.warning("Rejected signal — invalid node token hash.")
|
||||||
|
raise HTTPException(status_code=403, detail="Invalid node token.")
|
||||||
|
|
||||||
|
required_header = ["addon", "association_slug", "participant_xchan", "submitted_at"]
|
||||||
|
required_payload = ["g1_pubkey", "capacity_description", "g1_denomination",
|
||||||
|
"duration_days", "category_id", "direction"]
|
||||||
|
|
||||||
|
missing = [f for f in required_header if not header.get(f)]
|
||||||
|
missing += [f for f in required_payload if not payload.get(f)]
|
||||||
|
if missing:
|
||||||
|
raise HTTPException(status_code=422, detail=f"Missing fields: {missing}")
|
||||||
|
|
||||||
|
if header.get("addon") != "cry01":
|
||||||
|
raise HTTPException(status_code=400, detail="Unexpected addon identifier.")
|
||||||
|
|
||||||
|
if payload.get("direction") not in ("offer", "seek"):
|
||||||
|
raise HTTPException(status_code=422, detail="direction must be 'offer' or 'seek'.")
|
||||||
|
|
||||||
|
entry: dict[str, Any] = {
|
||||||
|
"_received_at": int(time.time()),
|
||||||
|
"_header": header,
|
||||||
|
"_payload": payload,
|
||||||
|
"_status": "pending",
|
||||||
|
}
|
||||||
|
|
||||||
|
signals = load_spool()
|
||||||
|
signals.append(entry)
|
||||||
|
save_spool(signals)
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"Signal received: assoc=%s xchan=%s direction=%s category=%s",
|
||||||
|
header.get("association_slug"),
|
||||||
|
header.get("participant_xchan", "")[:12] + "...",
|
||||||
|
payload.get("direction"),
|
||||||
|
payload.get("category_id"),
|
||||||
|
)
|
||||||
|
|
||||||
|
return JSONResponse(status_code=201, content={"status": "accepted"})
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/cry01/signals")
|
||||||
|
async def list_signals(
|
||||||
|
association_slug: str = "",
|
||||||
|
x_node_token_hash: str = Header(default=""),
|
||||||
|
) -> JSONResponse:
|
||||||
|
"""List cry01 signals. Operator use only — requires node token."""
|
||||||
|
if not verify_token(x_node_token_hash):
|
||||||
|
raise HTTPException(status_code=403, detail="Invalid node token.")
|
||||||
|
|
||||||
|
signals = load_spool()
|
||||||
|
|
||||||
|
if association_slug:
|
||||||
|
signals = [
|
||||||
|
s for s in signals
|
||||||
|
if s.get("_header", {}).get("association_slug") == association_slug
|
||||||
|
]
|
||||||
|
|
||||||
|
return JSONResponse(content={"count": len(signals), "signals": signals})
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# vs01 / dsc01 — overwrite-by-Placekey
|
||||||
|
# Shared implementation: both record types have identical storage rules,
|
||||||
|
# differing only in the addon name and the path segment under DIAGNOSTIC_DATA_DIR.
|
||||||
|
# See docs/orchestrator/receiver-spec.md for the contract.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse:
|
||||||
|
"""Shared handler for vs01 and dsc01 — validates and overwrites the participant's record."""
|
||||||
|
try:
|
||||||
|
body = await request.json()
|
||||||
|
except Exception:
|
||||||
|
raise HTTPException(status_code=400, detail="Invalid JSON body.")
|
||||||
|
|
||||||
|
header = body.get("_header", {})
|
||||||
|
payload = body.get("_payload", {})
|
||||||
|
|
||||||
|
token_hash = header.get("node_token_hash", "")
|
||||||
|
if not verify_token(token_hash):
|
||||||
|
log.warning("Rejected %s record — invalid node token hash.", addon)
|
||||||
|
raise HTTPException(status_code=403, detail="Invalid node token.")
|
||||||
|
|
||||||
|
missing = validate_common_header(header, addon)
|
||||||
|
if missing:
|
||||||
|
raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
|
||||||
|
|
||||||
|
fields = payload.get("fields")
|
||||||
|
if not fields:
|
||||||
|
raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.")
|
||||||
|
|
||||||
|
association_slug = safe_path_component(header["association_slug"], "association_slug")
|
||||||
|
participant_id = safe_path_component(header["participant_id"], "participant_id")
|
||||||
|
|
||||||
|
record_path = DIAGNOSTIC_DATA_DIR / addon / association_slug / f"{participant_id}.json"
|
||||||
|
|
||||||
|
record: dict[str, Any] = {
|
||||||
|
"_received_at": int(time.time()),
|
||||||
|
"_header": header,
|
||||||
|
"_payload": payload,
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
atomic_write_json(record_path, record)
|
||||||
|
except OSError as e:
|
||||||
|
log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e)
|
||||||
|
# Log the full envelope to a failure log before returning 500 —
|
||||||
|
# a failed write must never silently lose the submission.
|
||||||
|
_log_failed_write(addon, record)
|
||||||
|
raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.")
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"%s record stored: assoc=%s participant=%s",
|
||||||
|
addon, association_slug, participant_id[:12] + "...",
|
||||||
|
)
|
||||||
|
|
||||||
|
return JSONResponse(status_code=200, content={
|
||||||
|
"status": "ok",
|
||||||
|
"addon": addon,
|
||||||
|
"participant_id": participant_id,
|
||||||
|
"stored_at": header["submitted_at"],
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def _log_failed_write(addon: str, record: dict) -> None:
|
||||||
|
"""Append a record that failed to write to its normal location into a separate failure log.
|
||||||
|
This is a last-resort safety net — it must never itself raise."""
|
||||||
|
try:
|
||||||
|
failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl"
|
||||||
|
failure_log.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with failure_log.open("a") as f:
|
||||||
|
f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n")
|
||||||
|
except Exception as e:
|
||||||
|
log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/vs01/record")
|
||||||
|
async def receive_vs01_record(request: Request) -> JSONResponse:
|
||||||
|
"""
|
||||||
|
Accept a Vital Signs record from vs01_spool.php.
|
||||||
|
Overwrites the participant's existing record in full.
|
||||||
|
See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json.
|
||||||
|
"""
|
||||||
|
return await _receive_overwrite_record(request, "vs01")
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/dsc01/record")
|
||||||
|
async def receive_dsc01_record(request: Request) -> JSONResponse:
|
||||||
|
"""
|
||||||
|
Accept a DSC Categories record from dsc01_spool.php.
|
||||||
|
Overwrites the participant's existing record in full.
|
||||||
|
See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json.
|
||||||
|
"""
|
||||||
|
return await _receive_overwrite_record(request, "dsc01")
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# scn01 — append-only immutable attestation records
|
||||||
|
# See docs/orchestrator/receiver-spec.md and contracts/scn01/record-v1.json.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@app.post("/scn01/record")
|
||||||
|
async def receive_scn01_record(request: Request) -> JSONResponse:
|
||||||
|
"""
|
||||||
|
Accept a Scenario attestation record from scn01_spool.php.
|
||||||
|
Appends to the association's records array. Never overwrites or truncates.
|
||||||
|
g1_tx_hash is optional and never required for acceptance.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
body = await request.json()
|
||||||
|
except Exception:
|
||||||
|
raise HTTPException(status_code=400, detail="Invalid JSON body.")
|
||||||
|
|
||||||
|
header = body.get("_header", {})
|
||||||
|
payload = body.get("_payload", {})
|
||||||
|
credentials = body.get("_credentials", {})
|
||||||
|
|
||||||
|
token_hash = header.get("node_token_hash", "")
|
||||||
|
if not verify_token(token_hash):
|
||||||
|
log.warning("Rejected scn01 record — invalid node token hash.")
|
||||||
|
raise HTTPException(status_code=403, detail="Invalid node token.")
|
||||||
|
|
||||||
|
missing = validate_common_header(header, "scn01")
|
||||||
|
if missing:
|
||||||
|
raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
|
||||||
|
|
||||||
|
pinned_ids = payload.get("pinned_scenario_ids")
|
||||||
|
narrative = payload.get("narrative")
|
||||||
|
vs_snap = payload.get("vs_snapshot")
|
||||||
|
dsc_snap = payload.get("dsc_snapshot")
|
||||||
|
|
||||||
|
if not pinned_ids or not isinstance(pinned_ids, list) or not (1 <= len(pinned_ids) <= 3):
|
||||||
|
raise HTTPException(status_code=400, detail="payload.pinned_scenario_ids must be an array of 1 to 3 entries.")
|
||||||
|
|
||||||
|
if not narrative:
|
||||||
|
raise HTTPException(status_code=400, detail="payload.narrative is required and must be non-empty.")
|
||||||
|
|
||||||
|
# Prerequisite backstop — the Hubzilla addon enforces this before the form
|
||||||
|
# is shown; this is defense-in-depth, not the primary gate.
|
||||||
|
if not vs_snap or not dsc_snap:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="payload.vs_snapshot and payload.dsc_snapshot are both required. "
|
||||||
|
"A Scenario cannot be submitted without an existing VS and DSC record.",
|
||||||
|
)
|
||||||
|
|
||||||
|
# g1_tx_hash is optional. Absence never blocks acceptance — the coin is
|
||||||
|
# a utility that strengthens evidentiary weight, not a currency that
|
||||||
|
# gates the record. See docs/orchestrator/README.md.
|
||||||
|
g1_tx_hash = credentials.get("g1_tx_hash")
|
||||||
|
|
||||||
|
association_slug = safe_path_component(header["association_slug"], "association_slug")
|
||||||
|
|
||||||
|
records_path = DIAGNOSTIC_DATA_DIR / "scn01" / association_slug / "records.json"
|
||||||
|
|
||||||
|
record: dict[str, Any] = {
|
||||||
|
"_received_at": int(time.time()),
|
||||||
|
"_header": header,
|
||||||
|
"_payload": payload,
|
||||||
|
"_credentials": {"g1_tx_hash": g1_tx_hash},
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
records = load_json_or_default(records_path, [])
|
||||||
|
if not isinstance(records, list):
|
||||||
|
# The file exists but is not an array — this is a corruption state.
|
||||||
|
# Refuse to silently coerce it; fail loudly instead.
|
||||||
|
log.critical("scn01 records file is not a JSON array: %s", records_path)
|
||||||
|
_log_failed_write("scn01", record)
|
||||||
|
raise HTTPException(status_code=500, detail="Records storage is in an unexpected state. This submission was not lost — contact the operator.")
|
||||||
|
|
||||||
|
records.append(record)
|
||||||
|
atomic_write_json(records_path, records)
|
||||||
|
record_index = len(records) - 1
|
||||||
|
except OSError as e:
|
||||||
|
log.error("Failed to write scn01 record for %s: %s", association_slug, e)
|
||||||
|
_log_failed_write("scn01", record)
|
||||||
|
raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.")
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"scn01 record appended: assoc=%s participant=%s index=%d g1_tx_hash=%s",
|
||||||
|
association_slug,
|
||||||
|
header.get("participant_id", "")[:12] + "...",
|
||||||
|
record_index,
|
||||||
|
"present" if g1_tx_hash else "absent",
|
||||||
|
)
|
||||||
|
|
||||||
|
return JSONResponse(status_code=200, content={
|
||||||
|
"status": "ok",
|
||||||
|
"addon": "scn01",
|
||||||
|
"participant_id": header["participant_id"],
|
||||||
|
"record_index": record_index,
|
||||||
|
"g1_tx_hash": g1_tx_hash,
|
||||||
|
"stored_at": header["submitted_at"],
|
||||||
|
})
|
||||||
Reference in New Issue
Block a user