from __future__ import annotations import json import os from collections import deque from datetime import datetime, timezone from typing import Any from fastapi import Request from web.config import settings from web.session import ensure_session_id def _audit_path() -> str: return settings.audit_log_file def _ensure_parent_dir() -> None: parent = os.path.dirname(_audit_path()) if parent: os.makedirs(parent, exist_ok=True) def _client_ip(request: Request) -> str | None: forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() if request.client: return request.client.host return None def _session_identity(session: dict[str, Any]) -> dict[str, Any]: return { "session_id": session.get("_session_id"), "adobe_user_name": session.get("adobe_user_name"), "adobe_user_email": session.get("adobe_user_email"), "adobe_account_name": session.get("adobe_account_name"), "adobe_account_id": session.get("adobe_account_id"), "docusign_user_name": session.get("docusign_user_name"), "docusign_user_email": session.get("docusign_user_email"), "docusign_account_name": session.get("docusign_selected_account_name"), "docusign_account_id": session.get("docusign_selected_account_id"), } def is_admin_session(session: dict[str, Any]) -> bool: emails = { (session.get("docusign_user_email") or "").strip().lower(), (session.get("adobe_user_email") or "").strip().lower(), } emails.discard("") return bool(emails & settings.admin_emails) def request_context(request: Request) -> dict[str, Any]: return { "path": str(request.url.path), "method": request.method, "ip": _client_ip(request), "user_agent": request.headers.get("user-agent"), } def build_event_with_context( context: dict[str, Any], session: dict[str, Any], action: str, details: dict[str, Any] | None = None, ) -> dict[str, Any]: event = { "timestamp": datetime.now(timezone.utc).isoformat(), "action": action, "path": context.get("path"), "method": context.get("method"), "ip": context.get("ip"), "user_agent": context.get("user_agent"), **_session_identity(session), } if details: event["details"] = details return event def build_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]: return build_event_with_context(request_context(request), session, action, details) def log_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]: ensure_session_id(session) event = build_event(request, session, action, details) _ensure_parent_dir() with open(_audit_path(), "a", encoding="utf-8") as f: f.write(json.dumps(event, ensure_ascii=True) + "\n") return event def log_context_event( context: dict[str, Any], session: dict[str, Any], action: str, details: dict[str, Any] | None = None, ) -> dict[str, Any]: event = build_event_with_context(context, session, action, details) _ensure_parent_dir() with open(_audit_path(), "a", encoding="utf-8") as f: f.write(json.dumps(event, ensure_ascii=True) + "\n") return event def recent_events(limit: int = 100, *, session_id: str | None = None, include_all: bool = False) -> list[dict[str, Any]]: path = _audit_path() if not os.path.exists(path): return [] lines: deque[str] = deque(maxlen=max(1, min(limit, 500))) with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if line: lines.append(line) events: list[dict[str, Any]] = [] for line in reversed(lines): try: item = json.loads(line) except json.JSONDecodeError: continue if isinstance(item, dict): if not include_all and session_id and item.get("session_id") != session_id: continue events.append(item) return events