adobe-to-docusign-migrator/web/audit.py

135 lines
4.1 KiB
Python

from __future__ import annotations
import json
import os
from collections import deque
from datetime import datetime, timezone
from typing import Any
from fastapi import Request
from web.config import settings
from web.session import ensure_session_id
def _audit_path() -> str:
    """Return the audit log location configured as ``settings.audit_log_file``."""
    log_file = settings.audit_log_file
    return log_file
def _ensure_parent_dir() -> None:
    """Create the directory that holds the audit log if it is missing.

    Nothing is created when the configured path has no directory component
    (a bare file name).
    """
    directory, _ = os.path.split(_audit_path())
    if not directory:
        return
    os.makedirs(directory, exist_ok=True)
def _client_ip(request: Request) -> str | None:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
def _session_identity(session: dict[str, Any]) -> dict[str, Any]:
return {
"session_id": session.get("_session_id"),
"adobe_user_name": session.get("adobe_user_name"),
"adobe_user_email": session.get("adobe_user_email"),
"adobe_account_name": session.get("adobe_account_name"),
"adobe_account_id": session.get("adobe_account_id"),
"docusign_user_name": session.get("docusign_user_name"),
"docusign_user_email": session.get("docusign_user_email"),
"docusign_account_name": session.get("docusign_selected_account_name"),
"docusign_account_id": session.get("docusign_selected_account_id"),
}
def is_admin_session(session: dict[str, Any]) -> bool:
    """Report whether either signed-in email belongs to ``settings.admin_emails``.

    The DocuSign and Adobe user emails from *session* are trimmed and
    lower-cased before the comparison; missing or blank values are ignored.

    NOTE(review): the configured addresses are compared as-is, so they are
    presumably stored as a set of lower-cased strings -- confirm in web.config.
    """
    candidates: set[str] = set()
    for key in ("docusign_user_email", "adobe_user_email"):
        normalized = (session.get(key) or "").strip().lower()
        if normalized:
            candidates.add(normalized)
    return bool(candidates & settings.admin_emails)
def request_context(request: Request) -> dict[str, Any]:
    """Snapshot the request attributes that are recorded with audit events.

    Returns:
        A dict with ``path``, ``method``, ``ip`` and ``user_agent`` keys.
    """
    snapshot: dict[str, Any] = {}
    snapshot["path"] = str(request.url.path)
    snapshot["method"] = request.method
    snapshot["ip"] = _client_ip(request)
    snapshot["user_agent"] = request.headers.get("user-agent")
    return snapshot
def build_event_with_context(
    context: dict[str, Any],
    session: dict[str, Any],
    action: str,
    details: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble an audit event from a request-context dict and a session.

    Args:
        context: Mapping read for ``path``, ``method``, ``ip`` and
            ``user_agent``; absent keys are recorded as ``None``.
        session: Session mapping the identity fields are taken from.
        action: Name of the audited action.
        details: Optional extra payload; attached under ``details`` only
            when it is non-empty.

    Returns:
        The event dict, stamped with the current UTC time in ISO 8601 form.
    """
    record: dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "action": action,
    }
    for field in ("path", "method", "ip", "user_agent"):
        record[field] = context.get(field)
    record.update(_session_identity(session))
    if details:
        record["details"] = details
    return record
def build_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]:
    """Assemble an audit event for *request* without writing it anywhere.

    The request is first reduced to its context dict, which is then combined
    with the session identity, *action* and optional *details*.
    """
    context = request_context(request)
    return build_event_with_context(context, session, action, details)
def log_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]:
    """Record an audit event for a live request and return it.

    ``ensure_session_id`` runs first, before the event is built from the
    session (presumably so that ``_session_id`` is populated -- confirm in
    web.session). Building the event and appending it to the audit log are
    delegated to ``log_context_event``, so both logging entry points share a
    single write path.

    Args:
        request: The request being audited.
        session: Session mapping; passed to ``ensure_session_id``.
        action: Name of the audited action.
        details: Optional extra payload stored with the event.

    Returns:
        The event dict that was appended to the audit log.
    """
    ensure_session_id(session)
    return log_context_event(request_context(request), session, action, details)
def log_context_event(
    context: dict[str, Any],
    session: dict[str, Any],
    action: str,
    details: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Record an audit event from an already-captured request context.

    The event is serialized as one ASCII-only JSON object per line and
    appended to the audit log, whose parent directory is created on demand.
    Unlike ``log_event``, this function does not call ``ensure_session_id``.

    Returns:
        The event dict that was appended to the audit log.
    """
    record = build_event_with_context(context, session, action, details)
    _ensure_parent_dir()
    with open(_audit_path(), mode="a", encoding="utf-8") as sink:
        serialized = json.dumps(record, ensure_ascii=True)
        sink.write(f"{serialized}\n")
    return record
def recent_events(limit: int = 100, *, session_id: str | None = None, include_all: bool = False) -> list[dict[str, Any]]:
    """Return the most recent audit events, newest first.

    The session filter and JSON validation are applied while scanning the
    log, and only then is the result cut down to the newest *limit* entries.
    A session-scoped query therefore returns up to *limit* events of that
    session even when other sessions wrote many events afterwards, and
    corrupt or non-object lines never use up result slots. The price is that
    every non-blank line is JSON-decoded, not just the tail.

    NOTE(review): when ``include_all`` is false and ``session_id`` is empty
    or ``None``, no filter is applied and events of every session are
    returned. That matches the previous behaviour; confirm callers always
    pass a session id for non-admin views.

    Args:
        limit: Maximum number of events to return; clamped to 1..500.
        session_id: When given (and ``include_all`` is false), only events
            whose ``session_id`` equals this value are returned.
        include_all: Disable the session filter.

    Returns:
        A list of event dicts ordered newest first; empty when the audit log
        does not exist.
    """
    path = _audit_path()
    if not os.path.exists(path):
        return []

    capacity = max(1, min(limit, 500))
    # The filter applies only when a session id is given and include_all is off.
    wanted = session_id if (session_id and not include_all) else None

    # Bounded window over the *matching* events, so memory stays O(capacity).
    newest: deque[dict[str, Any]] = deque(maxlen=capacity)
    with open(path, encoding="utf-8") as f:
        for raw in f:
            text = raw.strip()
            if not text:
                continue
            try:
                item = json.loads(text)
            except json.JSONDecodeError:
                # Best effort: a torn or corrupt line must not hide the rest.
                continue
            if not isinstance(item, dict):
                continue
            if wanted is not None and item.get("session_id") != wanted:
                continue
            newest.append(item)

    newest.reverse()
    return list(newest)