"""
web/routers/templates.py
------------------------
Template listing endpoints for Adobe Sign and DocuSign.
Computes per-template migration status for the side-by-side UI.
"""

import asyncio
import glob
import re
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse

from web.config import settings
from web.docusign_context import DocusignContextError, current_account
from web.session import get_session

router = APIRouter()

# Page size requested from both vendor APIs. Only the first page is fetched,
# so accounts with more templates than this are truncated.
_PAGE_SIZE = 100

# First fractional-seconds group of an ISO-8601 timestamp.
_FRACTION_RE = re.compile(r"\.(\d+)")


def _require_adobe(session: dict) -> Optional[JSONResponse]:
    """Return a 401 response if the session has no Adobe Sign token, else None."""
    if session.get("adobe_access_token"):
        return None
    return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)


def _require_docusign(session: dict) -> Optional[JSONResponse]:
    """
    Return an error response if the session cannot call DocuSign, else None.

    A missing access token yields a 401. A DocusignContextError raised by
    current_account() is converted into a response carrying the exception's
    own message, code and status code.
    """
    if not session.get("docusign_access_token"):
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
    try:
        current_account(session)
    except DocusignContextError as e:
        return JSONResponse({"error": str(e), "code": e.code}, status_code=e.status_code)
    return None


async def _fetch_json(
    client: httpx.AsyncClient,
    url: str,
    token: str,
    params: dict,
) -> tuple[Optional[dict], Optional[str]]:
    """
    GET *url* with a bearer token and return ``(payload, error_detail)``.

    Exactly one element of the pair is None. On a non-2xx response the detail
    is the upstream body text; transport failures and non-JSON / non-object
    bodies are reported the same way instead of escaping as unhandled
    exceptions.
    """
    try:
        resp = await client.get(
            url,
            headers={"Authorization": f"Bearer {token}"},
            params=params,
        )
    except httpx.HTTPError as exc:
        return None, f"request failed: {exc}"
    if not resp.is_success:
        return None, resp.text
    try:
        payload = resp.json()
    except ValueError:
        return None, "response body was not valid JSON"
    if not isinstance(payload, dict):
        return None, "response body was not a JSON object"
    return payload, None


async def _fetch_adobe_library(
    client: httpx.AsyncClient,
    session: dict,
) -> tuple[Optional[dict], Optional[str]]:
    """Fetch the first page of Adobe Sign library documents."""
    return await _fetch_json(
        client,
        f"{settings.adobe_sign_base_url}/libraryDocuments",
        session["adobe_access_token"],
        {"pageSize": _PAGE_SIZE},
    )


async def _fetch_docusign_templates(
    client: httpx.AsyncClient,
    session: dict,
    account: dict,
) -> tuple[Optional[dict], Optional[str]]:
    """Fetch the first page of DocuSign templates for *account*."""
    return await _fetch_json(
        client,
        f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates",
        session["docusign_access_token"],
        {"count": _PAGE_SIZE},
    )


def _adobe_documents(payload: dict) -> list:
    """Extract the library document list, treating a missing/null list as empty."""
    return payload.get("libraryDocumentList") or []


def _docusign_templates(payload: dict) -> list:
    """Extract the template list from either key the payload may use."""
    return payload.get("envelopeTemplates") or payload.get("templates") or []


@router.get("/adobe")
async def list_adobe_templates(request: Request):
    """
    List Adobe Sign library documents (templates) for the current user.

    Returns ``{"templates": [...]}`` with id, name, modifiedDate and
    sharingMode per entry, a 401 when not authenticated, or a 502 with the
    upstream detail when the Adobe Sign call fails. Only the first page of
    results is requested.
    """
    session = get_session(request)
    err = _require_adobe(session)
    if err:
        return err

    async with httpx.AsyncClient() as client:
        data, detail = await _fetch_adobe_library(client, session)
    if data is None:
        return JSONResponse({"error": "Adobe Sign API error", "detail": detail}, status_code=502)

    templates = [
        {
            "id": t.get("id"),
            "name": t.get("name"),
            "modifiedDate": t.get("modifiedDate"),
            "sharingMode": t.get("sharingMode"),
        }
        for t in _adobe_documents(data)
    ]
    return {"templates": templates}


@router.get("/docusign")
async def list_docusign_templates(request: Request):
    """
    List DocuSign templates for the account.

    Returns ``{"templates": [...]}`` with id, name and lastModified per entry,
    an auth error response when the session cannot call DocuSign, or a 502
    with the upstream detail when the DocuSign call fails. Only the first page
    of results is requested.
    """
    session = get_session(request)
    err = _require_docusign(session)
    if err:
        return err
    account = current_account(session)

    async with httpx.AsyncClient() as client:
        data, detail = await _fetch_docusign_templates(client, session, account)
    if data is None:
        return JSONResponse({"error": "DocuSign API error", "detail": detail}, status_code=502)

    templates = [
        {
            "id": t.get("templateId"),
            "name": t.get("name"),
            "lastModified": t.get("lastModified"),
        }
        for t in _docusign_templates(data)
    ]
    return {"templates": templates}


def _parse_timestamp(value: object) -> Optional[datetime]:
    """
    Parse an ISO-8601 timestamp string into an aware datetime, or None.

    Normalises a trailing ``Z`` and pads/truncates fractional seconds to six
    digits so that ``datetime.fromisoformat`` accepts the value on Python
    versions older than 3.11. Naive values are assumed to be UTC.
    """
    if not isinstance(value, str) or not value.strip():
        return None
    text = value.strip()
    if text[-1] in "Zz":
        text = text[:-1] + "+00:00"
    text = _FRACTION_RE.sub(
        lambda m: "." + m.group(1)[:6].ljust(6, "0"),
        text,
        count=1,
    )
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _is_newer(candidate: object, reference: object) -> bool:
    """
    Return True when *candidate* is a strictly later timestamp than *reference*.

    Compares parsed datetimes when both values parse. The two vendor APIs may
    format timestamps with different fractional-second precision, which makes
    raw string comparison unreliable. Falls back to string comparison (with
    null treated as empty) when either value cannot be parsed.
    """
    left = _parse_timestamp(candidate)
    right = _parse_timestamp(reference)
    if left is not None and right is not None:
        return left > right
    return str(candidate or "") > str(reference or "")


def _latest_docusign_by_name(ds_templates: list) -> dict[str, dict]:
    """Map each template name to its most recently modified DocuSign template."""
    by_name: dict[str, dict] = {}
    for t in ds_templates:
        name = t.get("name", "")
        existing = by_name.get(name)
        if not existing or _is_newer(t.get("lastModified"), existing.get("lastModified")):
            by_name[name] = t
    return by_name


def _migration_status(adobe_modified: object, ds_match: Optional[dict]) -> str:
    """Classify one Adobe template against its same-named DocuSign template."""
    if not ds_match:
        return "not_migrated"
    # needs_update if Adobe was modified after the DS template
    if _is_newer(adobe_modified, ds_match.get("lastModified")):
        return "needs_update"
    return "migrated"


@router.get("/status")
async def template_status(request: Request):
    """
    Merged view: each Adobe template tagged with migration status.

    Status values:
      not_migrated  — no DocuSign template with the same name
      migrated      — at least one exact name match in DocuSign
      needs_update  — name match exists but Adobe template is newer

    Each entry also carries the blockers, warnings, field issues and analysis
    status reported by _get_template_analysis() for that template.
    """
    session = get_session(request)
    err = _require_adobe(session) or _require_docusign(session)
    if err:
        return err
    account = current_account(session)

    # Fetch both lists concurrently
    async with httpx.AsyncClient() as client:
        (adobe_data, _), (ds_data, _) = await asyncio.gather(
            _fetch_adobe_library(client, session),
            _fetch_docusign_templates(client, session, account),
        )

    if adobe_data is None:
        return JSONResponse({"error": "Adobe Sign API error"}, status_code=502)
    if ds_data is None:
        return JSONResponse({"error": "DocuSign API error"}, status_code=502)

    # Build a name → most-recently-modified DocuSign template lookup
    ds_by_name = _latest_docusign_by_name(_docusign_templates(ds_data))

    results = []
    for t in _adobe_documents(adobe_data):
        name = t.get("name", "")
        adobe_modified = t.get("modifiedDate", "")
        ds_match = ds_by_name.get(name)

        # NOTE(review): this analysis is synchronous and runs on the event
        # loop for every template; consider asyncio.to_thread once the
        # thread-safety of the underlying services has been confirmed.
        analysis = _get_template_analysis(t.get("id", ""), name)

        results.append({
            "adobe_id": t.get("id"),
            "name": name,
            "adobe_modified": adobe_modified,
            "docusign_id": ds_match.get("templateId") if ds_match else None,
            "docusign_modified": ds_match.get("lastModified") if ds_match else None,
            "status": _migration_status(adobe_modified, ds_match),
            "blockers": analysis["blockers"],
            "warnings": analysis["warnings"],
            "field_issues": analysis["field_issues"],
            "analysis_status": analysis["status"],
        })

    return {"templates": results}


def _get_template_analysis(template_id: str, template_name: str) -> dict:
    """
    Return validation and composition issues for a downloaded template.

    Validation blockers/warnings answer "can this migrate at all?"
    Field issues answer "what mapping caveats would migration introduce?"

    If the template has not been downloaded yet, there is no local field data
    to analyze and the result keeps status "not_downloaded". Any failure,
    including a failed import of the analysis services, is reported as a
    warning with status "error" rather than raised; a failure limited to the
    field-mapping step only adds a warning and still yields "analyzed".
    """
    analysis = {
        "blockers": [],
        "warnings": [],
        "field_issues": [],
        "status": "not_downloaded",
    }

    try:
        # Imported lazily so an import failure degrades to an "error" analysis
        # instead of breaking the whole router module.
        from src.services.mapping_service import adobe_folder_to_normalized
        from src.services.validation_service import validate_template
        from src.compose_docusign_template import compose_template

        template_dir = _find_downloaded_template(template_id, template_name)
        if not template_dir:
            return analysis

        normalized, _ = adobe_folder_to_normalized(str(template_dir), include_documents=False)
        result = validate_template(normalized)
        analysis["blockers"] = result.blockers
        analysis["warnings"] = result.warnings

        try:
            # Compose into a throwaway directory: only the reported field
            # issues are needed, not the generated template file.
            with tempfile.TemporaryDirectory() as tmpdir:
                output_path = Path(tmpdir) / "docusign-template.json"
                _, _compose_warnings, field_issues = compose_template(
                    str(template_dir), str(output_path)
                )
            analysis["field_issues"] = field_issues
        except Exception as exc:
            analysis["warnings"] = _dedupe([
                *analysis["warnings"],
                f"Field mapping analysis unavailable: {exc}",
            ])

        analysis["status"] = "analyzed"
        return analysis
    except Exception as exc:
        analysis["warnings"] = [f"Template analysis unavailable: {exc}"]
        analysis["status"] = "error"
        return analysis


def _find_downloaded_template(template_id: str, template_name: str) -> Path | None:
    """
    Locate the download directory for a template, or return None.

    Looks first for an entry whose name ends with ``__<template_id>``; if
    there is none, falls back to entries whose name starts with the template
    name (path separators replaced by underscores). IDs and names are
    glob-escaped so characters such as ``[``, ``*`` and ``?`` match literally,
    and empty values are skipped so an empty name cannot match every
    directory. Candidates are sorted so the choice is deterministic.
    """
    downloads_dir = Path(settings.downloads_dir)

    candidates: list[Path] = []
    if template_id:
        candidates = sorted(downloads_dir.glob(f"*__{glob.escape(template_id)}"))
    if not candidates and template_name:
        safe = template_name.replace("/", "_").replace("\\", "_")
        candidates = sorted(downloads_dir.glob(f"{glob.escape(safe)}*"))

    return next((c for c in candidates if c.is_dir()), None)


def _dedupe(items: list[str]) -> list[str]:
    """Return *items* without duplicates, keeping first-occurrence order."""
    return list(dict.fromkeys(items))