""" web/routers/templates.py ------------------------ Template listing endpoints for Adobe Sign and DocuSign. Computes per-template migration status for the side-by-side UI. """ from datetime import datetime, timezone from pathlib import Path from typing import Optional import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from web.config import settings from web.docusign_context import DocusignContextError, current_account from web.session import get_session router = APIRouter() def _require_adobe(session: dict) -> Optional[JSONResponse]: if not session.get("adobe_access_token"): return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401) return None def _require_docusign(session: dict) -> Optional[JSONResponse]: if not session.get("docusign_access_token"): return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401) try: current_account(session) except DocusignContextError as e: return JSONResponse({"error": str(e), "code": e.code}, status_code=e.status_code) return None @router.get("/adobe") async def list_adobe_templates(request: Request): """List all Adobe Sign library documents (templates) for the current user.""" session = get_session(request) err = _require_adobe(session) if err: return err async with httpx.AsyncClient() as client: resp = await client.get( f"{settings.adobe_sign_base_url}/libraryDocuments", headers={"Authorization": f"Bearer {session['adobe_access_token']}"}, params={"pageSize": 100}, ) if not resp.is_success: return JSONResponse({"error": "Adobe Sign API error", "detail": resp.text}, status_code=502) data = resp.json() templates = [ { "id": t.get("id"), "name": t.get("name"), "modifiedDate": t.get("modifiedDate"), "sharingMode": t.get("sharingMode"), } for t in data.get("libraryDocumentList", []) ] return {"templates": templates} @router.get("/docusign") async def list_docusign_templates(request: Request): """List all DocuSign templates for the account.""" session = get_session(request) err = _require_docusign(session) if err: return err account = current_account(session) async with httpx.AsyncClient() as client: resp = await client.get( f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates", headers={"Authorization": f"Bearer {session['docusign_access_token']}"}, params={"count": 100}, ) if not resp.is_success: return JSONResponse({"error": "DocuSign API error", "detail": resp.text}, status_code=502) data = resp.json() raw = data.get("envelopeTemplates") or data.get("templates") or [] templates = [ { "id": t.get("templateId"), "name": t.get("name"), "lastModified": t.get("lastModified"), } for t in raw ] return {"templates": templates} @router.get("/status") async def template_status(request: Request): """ Merged view: each Adobe template tagged with migration status. Status values: not_migrated — no DocuSign template with the same name migrated — at least one exact name match in DocuSign needs_update — name match exists but Adobe template is newer """ session = get_session(request) err = _require_adobe(session) or _require_docusign(session) if err: return err account = current_account(session) # Fetch both lists concurrently async with httpx.AsyncClient() as client: adobe_resp, ds_resp = await asyncio.gather( client.get( f"{settings.adobe_sign_base_url}/libraryDocuments", headers={"Authorization": f"Bearer {session['adobe_access_token']}"}, params={"pageSize": 100}, ), client.get( f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates", headers={"Authorization": f"Bearer {session['docusign_access_token']}"}, params={"count": 100}, ), ) if not adobe_resp.is_success: return JSONResponse({"error": "Adobe Sign API error"}, status_code=502) if not ds_resp.is_success: return JSONResponse({"error": "DocuSign API error"}, status_code=502) adobe_templates = adobe_resp.json().get("libraryDocumentList", []) ds_raw = ds_resp.json().get("envelopeTemplates") or ds_resp.json().get("templates") or [] # Build a name → most-recently-modified DocuSign template lookup ds_by_name: dict[str, dict] = {} for t in ds_raw: name = t.get("name", "") existing = ds_by_name.get(name) if not existing or t.get("lastModified", "") > existing.get("lastModified", ""): ds_by_name[name] = t results = [] for t in adobe_templates: name = t.get("name", "") adobe_modified = t.get("modifiedDate", "") ds_match = ds_by_name.get(name) if not ds_match: status = "not_migrated" else: ds_modified = ds_match.get("lastModified", "") # needs_update if Adobe was modified after the DS template status = "needs_update" if adobe_modified > ds_modified else "migrated" blockers, warnings = _get_validation(t.get("id", ""), name) results.append({ "adobe_id": t.get("id"), "name": name, "adobe_modified": adobe_modified, "docusign_id": ds_match.get("templateId") if ds_match else None, "docusign_modified": ds_match.get("lastModified") if ds_match else None, "status": status, "blockers": blockers, "warnings": warnings, }) return {"templates": results} def _get_validation(template_id: str, template_name: str) -> tuple[list, list]: """Return (blockers, warnings) if the template has been downloaded; else ([], []).""" try: from src.services.mapping_service import adobe_folder_to_normalized from src.services.validation_service import validate_template downloads_dir = Path(settings.downloads_dir) if hasattr(settings, "downloads_dir") else Path("downloads") # Match folder by name__id or name pattern candidates = list(downloads_dir.glob(f"*__{template_id}")) if not candidates: # Try matching by sanitised name prefix safe = template_name.replace("/", "_").replace("\\", "_") candidates = list(downloads_dir.glob(f"{safe}*")) if not candidates or not candidates[0].is_dir(): return [], [] normalized = adobe_folder_to_normalized(str(candidates[0])) result = validate_template(normalized) return result.blockers, result.warnings except Exception: return [], [] # asyncio needed for gather — import at top of module import asyncio