""" web/session.py -------------- Session helpers backed by a signed session-id cookie plus server-side JSON files. This keeps OAuth refresh tokens off the client and allows multiple testers to use their own browser sessions concurrently against the same deployment. Backward compatibility: - Older signed-cookie payloads that stored the full session dict are still read. - Any write upgrades the browser to the server-side session-store format. """ from __future__ import annotations import json import os import secrets from typing import Any from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from fastapi import Request, Response from web.config import settings _serializer = URLSafeTimedSerializer(settings.session_secret_key) _COOKIE_NAME = "migrator_session" _MAX_AGE = 3600 # 1 hour _SESSION_ID_KEY = "_session_id" def _session_store_dir() -> str: return settings.session_store_dir def _session_path(session_id: str) -> str: return os.path.join(_session_store_dir(), f"{session_id}.json") def _ensure_store_dir() -> None: os.makedirs(_session_store_dir(), exist_ok=True) def _new_session_id() -> str: return secrets.token_urlsafe(24) def _load_server_session(session_id: str) -> dict: path = _session_path(session_id) if not os.path.exists(path): return {} try: with open(path) as f: data = json.load(f) except (OSError, json.JSONDecodeError): return {} return data if isinstance(data, dict) else {} def _write_server_session(session_id: str, data: dict) -> None: _ensure_store_dir() path = _session_path(session_id) tmp_path = f"{path}.tmp" with open(tmp_path, "w") as f: json.dump(data, f) os.replace(tmp_path, path) def _delete_server_session(session_id: str | None) -> None: if not session_id: return path = _session_path(session_id) try: os.remove(path) except FileNotFoundError: pass def _attach_session_id(data: dict, session_id: str) -> dict: payload = dict(data) payload[_SESSION_ID_KEY] = session_id return payload def _decode_cookie(request: Request) -> dict | str | None: raw = request.cookies.get(_COOKIE_NAME) if not raw: return None try: return _serializer.loads(raw, max_age=_MAX_AGE) except (BadSignature, SignatureExpired): return None def get_session(request: Request) -> dict: """Return session data for the current request.""" decoded = _decode_cookie(request) if decoded is None: return {} if isinstance(decoded, dict): # Legacy cookie format from earlier app versions. return decoded if isinstance(decoded, str): return _attach_session_id(_load_server_session(decoded), decoded) return {} def get_session_id(request: Request) -> str | None: session = get_session(request) return session.get(_SESSION_ID_KEY) def create_test_session(data: dict, session_id: str | None = None) -> str: """ Test helper: create a server-side session and return a valid cookie value. """ sid = session_id or _new_session_id() payload = dict(data) payload.pop(_SESSION_ID_KEY, None) _write_server_session(sid, payload) return _serializer.dumps(sid) def save_session(response: Response, data: dict, session_id: str | None = None) -> str: """Persist session data server-side and set the signed session-id cookie.""" sid = session_id or data.get(_SESSION_ID_KEY) or _new_session_id() payload = dict(data) payload.pop(_SESSION_ID_KEY, None) _write_server_session(sid, payload) response.set_cookie( _COOKIE_NAME, _serializer.dumps(sid), max_age=_MAX_AGE, httponly=True, samesite="lax", ) return sid def clear_session(response: Response, request: Request | None = None) -> None: """Delete the browser session cookie and remove server-side session data when possible.""" session_id = None if request is not None: session_id = get_session_id(request) _delete_server_session(session_id) response.delete_cookie(_COOKIE_NAME) def session_public_view(session: dict[str, Any]) -> dict[str, Any]: """ Return the subset of session data that is useful for UI/debug responses. """ return { "session_id": session.get(_SESSION_ID_KEY), "adobe": bool(session.get("adobe_access_token")), "docusign": bool(session.get("docusign_access_token")), "adobe_auth_mode": session.get("adobe_auth_mode", "disconnected"), "adobe_user_name": session.get("adobe_user_name"), "adobe_user_email": session.get("adobe_user_email"), "adobe_account_name": session.get("adobe_account_name"), "adobe_account_id": session.get("adobe_account_id"), "docusign_auth_mode": session.get("docusign_auth_mode", "disconnected"), "docusign_user_name": session.get("docusign_user_name"), "docusign_user_email": session.get("docusign_user_email"), "docusign_selected_account_id": session.get("docusign_selected_account_id"), "docusign_selected_account_name": session.get("docusign_selected_account_name"), "docusign_accounts_count": session.get("docusign_accounts_count", 0), }