from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
from kernite import JsonlDecisionSink, evaluate_execute_controlled
POLICY_DIR = Path(__file__).resolve().parents[2] / ".kernite"
BUNDLE = json.loads((POLICY_DIR / "policy-bundle.generated.json").read_text(encoding="utf-8"))
MAPPING = json.loads((POLICY_DIR / "policy-map.generated.json").read_text(encoding="utf-8"))
POLICIES = {policy["policy_key"]: policy for policy in BUNDLE.get("policies", [])}
OPERATIONS = {
(item["method"].upper(), item["path"]): item
for item in MAPPING.get("operations", [])
}
SINK = JsonlDecisionSink(POLICY_DIR / "decision-events.jsonl")
def evaluate_route(
*,
method: str,
path: str,
principal_id: str,
payload: dict[str, Any],
) -> dict[str, Any]:
operation = OPERATIONS.get((method.upper(), path))
if operation is None:
return {
"allow_write": True,
"mode": "skip",
"decision_raw": "skipped",
"decision_effective": "approved",
"governance": None,
"sink_status": "skipped",
}
policy_key = operation.get("policy_key")
selected_policies = [POLICIES[policy_key]] if policy_key in POLICIES else []
request_body = {
"workspace_id": "workspace-demo",
"principal": {"type": "token", "id": principal_id},
"object_type": operation["object_type"],
"operation": operation["operation"],
"payload": payload,
"policy_context": {
"governed": bool(operation.get("governed", False)),
"selected_policies": selected_policies,
"governed_scopes": BUNDLE.get("governed_scopes", []),
"policy_selection_reason_code": "policy_selected_workspace_default",
},
}
return evaluate_execute_controlled(
request_body,
mode=os.getenv("KERNITE_MODE", "observe"),
sink=SINK,
sink_failure_policy="fail_open",
)