""" web/routers/templates.py ------------------------ Template listing endpoints for Adobe Sign and DocuSign. Computes per-template migration status for the side-by-side UI. """ import asyncio import json import os from datetime import datetime, timezone from pathlib import Path import tempfile from typing import Optional import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from web.config import settings from web.docusign_context import DocusignContextError, current_account from web.session import get_session router = APIRouter() _HISTORY_FILE = os.path.abspath(os.path.join( os.path.dirname(__file__), "..", "..", "migration-output", ".history.json" )) def _require_adobe(session: dict) -> Optional[JSONResponse]: if not session.get("adobe_access_token"): return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401) return None def _require_docusign(session: dict) -> Optional[JSONResponse]: if not session.get("docusign_access_token"): return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401) try: current_account(session) except DocusignContextError as e: return JSONResponse({"error": str(e), "code": e.code}, status_code=e.status_code) return None @router.get("/adobe") async def list_adobe_templates(request: Request): """List all Adobe Sign library documents (templates) for the current user.""" session = get_session(request) err = _require_adobe(session) if err: return err async with httpx.AsyncClient() as client: resp = await client.get( f"{settings.adobe_sign_base_url}/libraryDocuments", headers={"Authorization": f"Bearer {session['adobe_access_token']}"}, params={"pageSize": 100}, ) if not resp.is_success: return JSONResponse({"error": "Adobe Sign API error", "detail": resp.text}, status_code=502) data = resp.json() templates = [ { "id": t.get("id"), "name": t.get("name"), "modifiedDate": t.get("modifiedDate"), "sharingMode": t.get("sharingMode"), } for t in data.get("libraryDocumentList", []) ] return {"templates": templates} @router.get("/docusign") async def list_docusign_templates(request: Request): """List all DocuSign templates for the account.""" session = get_session(request) err = _require_docusign(session) if err: return err account = current_account(session) async with httpx.AsyncClient() as client: resp = await client.get( f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates", headers={"Authorization": f"Bearer {session['docusign_access_token']}"}, params={"count": 100}, ) if not resp.is_success: return JSONResponse({"error": "DocuSign API error", "detail": resp.text}, status_code=502) data = resp.json() raw = data.get("envelopeTemplates") or data.get("templates") or [] templates = [ { "id": t.get("templateId"), "name": t.get("name"), "lastModified": t.get("lastModified"), } for t in raw ] return {"templates": templates} @router.get("/status") async def template_status(request: Request): """ Merged view: each Adobe template tagged with migration status. Status values: not_migrated — no DocuSign template with the same name migrated — at least one exact name match in DocuSign needs_update — name match exists but Adobe template is newer """ session = get_session(request) err = _require_adobe(session) or _require_docusign(session) if err: return err account = current_account(session) # Fetch both lists concurrently async with httpx.AsyncClient() as client: adobe_resp, ds_resp = await asyncio.gather( client.get( f"{settings.adobe_sign_base_url}/libraryDocuments", headers={"Authorization": f"Bearer {session['adobe_access_token']}"}, params={"pageSize": 100}, ), client.get( f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates", headers={"Authorization": f"Bearer {session['docusign_access_token']}"}, params={"count": 100}, ), ) if not adobe_resp.is_success: return JSONResponse({"error": "Adobe Sign API error"}, status_code=502) if not ds_resp.is_success: return JSONResponse({"error": "DocuSign API error"}, status_code=502) adobe_templates = adobe_resp.json().get("libraryDocumentList", []) ds_raw = ds_resp.json().get("envelopeTemplates") or ds_resp.json().get("templates") or [] # Build a name → most-recently-modified DocuSign template lookup ds_by_name: dict[str, dict] = {} for t in ds_raw: name = t.get("name", "") existing = ds_by_name.get(name) if not existing or t.get("lastModified", "") > existing.get("lastModified", ""): ds_by_name[name] = t results = [] for t in adobe_templates: name = t.get("name", "") adobe_modified = t.get("modifiedDate", "") ds_match = ds_by_name.get(name) if not ds_match: status = "not_migrated" else: ds_modified = ds_match.get("lastModified", "") # needs_update if Adobe was modified after the DS template status = "needs_update" if adobe_modified > ds_modified else "migrated" analysis = _get_template_analysis(t.get("id", ""), name) if not _has_analysis_issues(analysis): history_analysis = _get_history_analysis( t.get("id", ""), name, session.get("_session_id") or "legacy", ) analysis = _merge_analysis(analysis, history_analysis) results.append({ "adobe_id": t.get("id"), "name": name, "adobe_modified": adobe_modified, "docusign_id": ds_match.get("templateId") if ds_match else None, "docusign_modified": ds_match.get("lastModified") if ds_match else None, "status": status, "blockers": analysis["blockers"], "warnings": analysis["warnings"], "field_issues": analysis["field_issues"], "analysis_status": analysis["status"], }) return {"templates": results} def _get_template_analysis(template_id: str, template_name: str) -> dict: """ Return validation and composition issues for a downloaded template. Validation blockers/warnings answer "can this migrate at all?" Field issues answer "what mapping caveats would migration introduce?" If the template has not been downloaded yet, there is no local field data to analyze. """ analysis = { "blockers": [], "warnings": [], "field_issues": [], "status": "not_downloaded", } try: from src.services.mapping_service import adobe_folder_to_normalized from src.services.validation_service import validate_template from src.compose_docusign_template import compose_template template_dir = _find_downloaded_template(template_id, template_name) if not template_dir: return analysis normalized, _ = adobe_folder_to_normalized(str(template_dir), include_documents=False) result = validate_template(normalized) analysis["blockers"] = result.blockers analysis["warnings"] = result.warnings try: with tempfile.TemporaryDirectory() as tmpdir: output_path = Path(tmpdir) / "docusign-template.json" _, _compose_warnings, field_issues = compose_template(str(template_dir), str(output_path)) analysis["field_issues"] = field_issues except Exception as exc: analysis["warnings"] = _dedupe([ *analysis["warnings"], f"Field mapping analysis unavailable: {exc}", ]) analysis["status"] = "analyzed" return analysis except Exception as exc: analysis["warnings"] = [f"Template analysis unavailable: {exc}"] analysis["status"] = "error" return analysis def _find_downloaded_template(template_id: str, template_name: str) -> Path | None: downloads_dir = Path(settings.downloads_dir) candidates = list(downloads_dir.glob(f"*__{template_id}")) if not candidates: safe = template_name.replace("/", "_").replace("\\", "_") candidates = list(downloads_dir.glob(f"{safe}*")) return next((c for c in candidates if c.is_dir()), None) def _get_history_analysis(template_id: str, template_name: str, session_scope: str) -> dict: """ Return the latest issue details captured during migration for this template. The production web migration flow downloads Adobe template data to a temp directory, so the Templates page may not have persistent local downloads to re-analyze. Migration history is the source of truth for issues discovered during an actual migration attempt. """ analysis = { "blockers": [], "warnings": [], "field_issues": [], "status": "not_found", } matching_records = [ record for record in _load_history() if record.get("owner_session_id", "legacy") == session_scope and ( record.get("adobe_template_id") == template_id or record.get("adobe_template_name") == template_name ) ] if not matching_records: return analysis matching_records.sort(key=lambda record: record.get("timestamp", ""), reverse=True) for record in matching_records: blockers = record.get("blockers") or [] warnings = _template_warnings(record.get("warnings") or []) field_issues = record.get("field_issues") or [] if blockers or warnings or field_issues: analysis["blockers"] = blockers analysis["warnings"] = warnings analysis["field_issues"] = field_issues analysis["status"] = "history" return analysis analysis["status"] = "history_clean" return analysis def _load_history() -> list: if not os.path.exists(_HISTORY_FILE): return [] try: with open(_HISTORY_FILE, encoding="utf-8") as f: return json.load(f) except Exception: return [] def _template_warnings(warnings: list[str]) -> list[str]: """Remove operational migration messages that should not make a template look risky.""" return [ warning for warning in warnings if not str(warning).startswith("Skipped: template already exists") ] def _has_analysis_issues(analysis: dict) -> bool: return bool(analysis["blockers"] or analysis["warnings"] or analysis["field_issues"]) def _merge_analysis(primary: dict, fallback: dict) -> dict: if fallback["status"] in ("not_found", "history_clean"): return primary merged = { "blockers": _dedupe([*primary["blockers"], *fallback["blockers"]]), "warnings": _dedupe([*primary["warnings"], *fallback["warnings"]]), "field_issues": _dedupe_field_issues([*primary["field_issues"], *fallback["field_issues"]]), "status": fallback["status"] if primary["status"] == "not_downloaded" else primary["status"], } return merged def _dedupe_field_issues(items: list[dict]) -> list[dict]: seen = set() result = [] for item in items: key = ( item.get("code"), item.get("field_name"), item.get("message"), ) if key in seen: continue seen.add(key) result.append(item) return result def _dedupe(items: list[str]) -> list[str]: seen = set() result = [] for item in items: if item in seen: continue seen.add(item) result.append(item) return result