adobe-to-docusign-migrator/web/audit.py

122 lines
3.6 KiB
Python

from __future__ import annotations
import json
import os
from collections import deque
from datetime import datetime, timezone
from typing import Any
from fastapi import Request
from web.config import settings
def _audit_path() -> str:
return settings.audit_log_file
def _ensure_parent_dir() -> None:
parent = os.path.dirname(_audit_path())
if parent:
os.makedirs(parent, exist_ok=True)
def _client_ip(request: Request) -> str | None:
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
def _session_identity(session: dict[str, Any]) -> dict[str, Any]:
return {
"session_id": session.get("_session_id"),
"adobe_user_name": session.get("adobe_user_name"),
"adobe_user_email": session.get("adobe_user_email"),
"adobe_account_name": session.get("adobe_account_name"),
"adobe_account_id": session.get("adobe_account_id"),
"docusign_user_name": session.get("docusign_user_name"),
"docusign_user_email": session.get("docusign_user_email"),
"docusign_account_name": session.get("docusign_selected_account_name"),
"docusign_account_id": session.get("docusign_selected_account_id"),
}
def request_context(request: Request) -> dict[str, Any]:
return {
"path": str(request.url.path),
"method": request.method,
"ip": _client_ip(request),
"user_agent": request.headers.get("user-agent"),
}
def build_event_with_context(
context: dict[str, Any],
session: dict[str, Any],
action: str,
details: dict[str, Any] | None = None,
) -> dict[str, Any]:
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"action": action,
"path": context.get("path"),
"method": context.get("method"),
"ip": context.get("ip"),
"user_agent": context.get("user_agent"),
**_session_identity(session),
}
if details:
event["details"] = details
return event
def build_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]:
return build_event_with_context(request_context(request), session, action, details)
def log_event(request: Request, session: dict[str, Any], action: str, details: dict[str, Any] | None = None) -> dict[str, Any]:
event = build_event(request, session, action, details)
_ensure_parent_dir()
with open(_audit_path(), "a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=True) + "\n")
return event
def log_context_event(
context: dict[str, Any],
session: dict[str, Any],
action: str,
details: dict[str, Any] | None = None,
) -> dict[str, Any]:
event = build_event_with_context(context, session, action, details)
_ensure_parent_dir()
with open(_audit_path(), "a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=True) + "\n")
return event
def recent_events(limit: int = 100) -> list[dict[str, Any]]:
path = _audit_path()
if not os.path.exists(path):
return []
lines: deque[str] = deque(maxlen=max(1, min(limit, 500)))
with open(path, encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
lines.append(line)
events: list[dict[str, Any]] = []
for line in reversed(lines):
try:
item = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(item, dict):
events.append(item)
return events