162 lines
4.7 KiB
Python
162 lines
4.7 KiB
Python
"""
|
|
web/session.py
|
|
--------------
|
|
Session helpers backed by a signed session-id cookie plus server-side JSON files.
|
|
|
|
This keeps OAuth refresh tokens off the client and allows multiple testers to use
|
|
their own browser sessions concurrently against the same deployment.
|
|
|
|
Backward compatibility:
|
|
- Older signed-cookie payloads that stored the full session dict are still read.
|
|
- Any write upgrades the browser to the server-side session-store format.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import secrets
|
|
from typing import Any
|
|
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
from fastapi import Request, Response
|
|
|
|
from web.config import settings
|
|
|
|
_serializer = URLSafeTimedSerializer(settings.session_secret_key)
|
|
_COOKIE_NAME = "migrator_session"
|
|
_MAX_AGE = 3600 # 1 hour
|
|
_SESSION_ID_KEY = "_session_id"
|
|
|
|
|
|
def _session_store_dir() -> str:
|
|
return settings.session_store_dir
|
|
|
|
|
|
def _session_path(session_id: str) -> str:
|
|
return os.path.join(_session_store_dir(), f"{session_id}.json")
|
|
|
|
|
|
def _ensure_store_dir() -> None:
|
|
os.makedirs(_session_store_dir(), exist_ok=True)
|
|
|
|
|
|
def _new_session_id() -> str:
|
|
return secrets.token_urlsafe(24)
|
|
|
|
|
|
def _load_server_session(session_id: str) -> dict:
|
|
path = _session_path(session_id)
|
|
if not os.path.exists(path):
|
|
return {}
|
|
try:
|
|
with open(path) as f:
|
|
data = json.load(f)
|
|
except (OSError, json.JSONDecodeError):
|
|
return {}
|
|
return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def _write_server_session(session_id: str, data: dict) -> None:
|
|
_ensure_store_dir()
|
|
path = _session_path(session_id)
|
|
tmp_path = f"{path}.tmp"
|
|
with open(tmp_path, "w") as f:
|
|
json.dump(data, f)
|
|
os.replace(tmp_path, path)
|
|
|
|
|
|
def _delete_server_session(session_id: str | None) -> None:
|
|
if not session_id:
|
|
return
|
|
path = _session_path(session_id)
|
|
try:
|
|
os.remove(path)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
|
|
def _attach_session_id(data: dict, session_id: str) -> dict:
|
|
payload = dict(data)
|
|
payload[_SESSION_ID_KEY] = session_id
|
|
return payload
|
|
|
|
|
|
def _decode_cookie(request: Request) -> dict | str | None:
|
|
raw = request.cookies.get(_COOKIE_NAME)
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return _serializer.loads(raw, max_age=_MAX_AGE)
|
|
except (BadSignature, SignatureExpired):
|
|
return None
|
|
|
|
|
|
def get_session(request: Request) -> dict:
|
|
"""Return session data for the current request."""
|
|
decoded = _decode_cookie(request)
|
|
if decoded is None:
|
|
return {}
|
|
if isinstance(decoded, dict):
|
|
# Legacy cookie format from earlier app versions.
|
|
return decoded
|
|
if isinstance(decoded, str):
|
|
return _attach_session_id(_load_server_session(decoded), decoded)
|
|
return {}
|
|
|
|
|
|
def get_session_id(request: Request) -> str | None:
|
|
session = get_session(request)
|
|
return session.get(_SESSION_ID_KEY)
|
|
|
|
|
|
def create_test_session(data: dict, session_id: str | None = None) -> str:
|
|
"""
|
|
Test helper: create a server-side session and return a valid cookie value.
|
|
"""
|
|
sid = session_id or _new_session_id()
|
|
payload = dict(data)
|
|
payload.pop(_SESSION_ID_KEY, None)
|
|
_write_server_session(sid, payload)
|
|
return _serializer.dumps(sid)
|
|
|
|
|
|
def save_session(response: Response, data: dict, session_id: str | None = None) -> str:
|
|
"""Persist session data server-side and set the signed session-id cookie."""
|
|
sid = session_id or data.get(_SESSION_ID_KEY) or _new_session_id()
|
|
payload = dict(data)
|
|
payload.pop(_SESSION_ID_KEY, None)
|
|
_write_server_session(sid, payload)
|
|
response.set_cookie(
|
|
_COOKIE_NAME,
|
|
_serializer.dumps(sid),
|
|
max_age=_MAX_AGE,
|
|
httponly=True,
|
|
samesite="lax",
|
|
)
|
|
return sid
|
|
|
|
|
|
def clear_session(response: Response, request: Request | None = None) -> None:
|
|
"""Delete the browser session cookie and remove server-side session data when possible."""
|
|
session_id = None
|
|
if request is not None:
|
|
session_id = get_session_id(request)
|
|
_delete_server_session(session_id)
|
|
response.delete_cookie(_COOKIE_NAME)
|
|
|
|
|
|
def session_public_view(session: dict[str, Any]) -> dict[str, Any]:
|
|
"""
|
|
Return the subset of session data that is useful for UI/debug responses.
|
|
"""
|
|
return {
|
|
"session_id": session.get(_SESSION_ID_KEY),
|
|
"adobe": bool(session.get("adobe_access_token")),
|
|
"docusign": bool(session.get("docusign_access_token")),
|
|
"adobe_auth_mode": session.get("adobe_auth_mode", "disconnected"),
|
|
"docusign_auth_mode": session.get("docusign_auth_mode", "disconnected"),
|
|
"docusign_user_name": session.get("docusign_user_name"),
|
|
"docusign_user_email": session.get("docusign_user_email"),
|
|
}
|