adobe-to-docusign-migrator/web/routers/templates.py

204 lines
7.0 KiB
Python

"""
web/routers/templates.py
------------------------
Template listing endpoints for Adobe Sign and DocuSign.
Computes per-template migration status for the side-by-side UI.
"""
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from web.config import settings
from web.docusign_context import DocusignContextError, current_account
from web.session import get_session
router = APIRouter()
def _require_adobe(session: dict) -> Optional[JSONResponse]:
    """Check that the session carries an Adobe Sign access token.

    Returns:
        ``None`` when ``session["adobe_access_token"]`` is present and truthy,
        otherwise a 401 ``JSONResponse`` the endpoint can return as-is.
    """
    if session.get("adobe_access_token"):
        return None
    return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
def _require_docusign(session: dict) -> Optional[JSONResponse]:
    """Check that the session can talk to DocuSign.

    Two conditions are verified, in order: a truthy
    ``session["docusign_access_token"]``, and that ``current_account(session)``
    does not raise ``DocusignContextError``.

    Returns:
        ``None`` when both checks pass. Otherwise a ``JSONResponse``: 401 for a
        missing token, or the status code and ``code`` carried by the
        ``DocusignContextError`` when the account lookup fails.
    """
    has_token = bool(session.get("docusign_access_token"))
    if not has_token:
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)

    try:
        # Called only for its validation side: the returned account is discarded.
        current_account(session)
    except DocusignContextError as ctx_err:
        payload = {"error": str(ctx_err), "code": ctx_err.code}
        return JSONResponse(payload, status_code=ctx_err.status_code)

    return None
@router.get("/adobe")
async def list_adobe_templates(request: Request):
    """List Adobe Sign library documents (templates) for the current user.

    Issues a single ``GET {adobe_sign_base_url}/libraryDocuments`` request with
    ``pageSize=100``; no follow-up requests are made, so only what that one
    response contains is returned.

    Returns:
        ``{"templates": [...]}`` where each entry has ``id``, ``name``,
        ``modifiedDate`` and ``sharingMode`` (``None`` for any field the API
        omitted); a 401 response when the session has no Adobe token; or a 502
        response with the upstream body in ``detail`` when Adobe Sign answers
        with a non-success status.
    """
    session = get_session(request)
    auth_error = _require_adobe(session)
    if auth_error:
        return auth_error

    url = f"{settings.adobe_sign_base_url}/libraryDocuments"
    auth_headers = {"Authorization": f"Bearer {session['adobe_access_token']}"}
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, headers=auth_headers, params={"pageSize": 100})

    if not resp.is_success:
        return JSONResponse({"error": "Adobe Sign API error", "detail": resp.text}, status_code=502)

    # Only these fields are forwarded to the UI, in this order.
    exposed_fields = ("id", "name", "modifiedDate", "sharingMode")
    documents = resp.json().get("libraryDocumentList", [])
    templates = []
    for doc in documents:
        templates.append({field: doc.get(field) for field in exposed_fields})
    return {"templates": templates}
@router.get("/docusign")
async def list_docusign_templates(request: Request):
    """List DocuSign templates for the session's current account.

    Issues a single ``GET {base_url}/v2.1/accounts/{account_id}/templates``
    request with ``count=100``; no follow-up requests are made, so only what
    that one response contains is returned.

    Returns:
        ``{"templates": [...]}`` where each entry has ``id`` (the DocuSign
        ``templateId``), ``name`` and ``lastModified``; the error response from
        ``_require_docusign`` when the session is not usable; or a 502 response
        with the upstream body in ``detail`` when DocuSign answers with a
        non-success status.
    """
    session = get_session(request)
    auth_error = _require_docusign(session)
    if auth_error:
        return auth_error

    account = current_account(session)
    url = f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates"
    auth_headers = {"Authorization": f"Bearer {session['docusign_access_token']}"}
    async with httpx.AsyncClient() as client:
        resp = await client.get(url, headers=auth_headers, params={"count": 100})

    if not resp.is_success:
        return JSONResponse({"error": "DocuSign API error", "detail": resp.text}, status_code=502)

    payload = resp.json()
    # The list is read from "envelopeTemplates" first, then "templates";
    # an absent or empty value under both keys yields an empty list.
    raw = payload.get("envelopeTemplates") or payload.get("templates") or []

    templates = []
    for entry in raw:
        templates.append(
            {
                "id": entry.get("templateId"),
                "name": entry.get("name"),
                "lastModified": entry.get("lastModified"),
            }
        )
    return {"templates": templates}
def _parse_timestamp(value: object) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into a timezone-aware UTC datetime.

    Returns ``None`` when *value* is not a non-empty string or cannot be parsed.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if not text:
        return None

    # Before Python 3.11, datetime.fromisoformat() rejects a trailing "Z" and
    # fractional seconds that are not exactly 3 or 6 digits, so normalise both.
    if text[-1] in "Zz":
        text = f"{text[:-1]}+00:00"
    head, dot, tail = text.partition(".")
    if dot:
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction, remainder = tail[:digit_count], tail[digit_count:]
        text = f"{head}.{fraction[:6].ljust(6, '0')}{remainder}"

    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Assumption: a timestamp without an offset is UTC — TODO confirm.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _is_newer(candidate: object, reference: object) -> bool:
    """Return True when *candidate* is strictly later than *reference*.

    Both values are compared as datetimes when both parse. Otherwise the
    comparison falls back to plain string ordering (with ``None`` treated as
    ``""``), which keeps the previous outcome for non-ISO values and never
    raises on a missing timestamp.
    """
    candidate_dt = _parse_timestamp(candidate)
    reference_dt = _parse_timestamp(reference)
    if candidate_dt is not None and reference_dt is not None:
        return candidate_dt > reference_dt
    return str(candidate or "") > str(reference or "")


@router.get("/status")
async def template_status(request: Request):
    """
    Merged view: each Adobe template tagged with migration status.

    Status values:
        not_migrated — no DocuSign template with the same name
        migrated     — at least one exact name match in DocuSign
        needs_update — name match exists but Adobe template is newer

    "Newer" is decided chronologically (timestamps are parsed, not compared
    as raw strings). Templates without a name are never matched by name.

    Each listing is fetched with a single request (``pageSize=100`` for Adobe
    Sign, ``count=100`` for DocuSign), so only what those two responses
    contain is considered.

    Returns:
        ``{"templates": [...]}`` with one entry per Adobe template carrying
        ``adobe_id``, ``name``, ``adobe_modified``, ``docusign_id``,
        ``docusign_modified``, ``status``, ``blockers`` and ``warnings``;
        an authentication error response; or a 502 response when either
        upstream API answers with a non-success status.
    """
    session = get_session(request)
    err = _require_adobe(session) or _require_docusign(session)
    if err:
        return err
    account = current_account(session)

    # Fetch both lists concurrently.
    async with httpx.AsyncClient() as client:
        adobe_resp, ds_resp = await asyncio.gather(
            client.get(
                f"{settings.adobe_sign_base_url}/libraryDocuments",
                headers={"Authorization": f"Bearer {session['adobe_access_token']}"},
                params={"pageSize": 100},
            ),
            client.get(
                f"{account['base_url']}/v2.1/accounts/{account['account_id']}/templates",
                headers={"Authorization": f"Bearer {session['docusign_access_token']}"},
                params={"count": 100},
            ),
        )

    if not adobe_resp.is_success:
        return JSONResponse({"error": "Adobe Sign API error"}, status_code=502)
    if not ds_resp.is_success:
        return JSONResponse({"error": "DocuSign API error"}, status_code=502)

    # `or []` also covers an explicit JSON null under the key.
    adobe_templates = adobe_resp.json().get("libraryDocumentList") or []
    ds_payload = ds_resp.json()  # parse the body once, not once per key lookup
    ds_raw = ds_payload.get("envelopeTemplates") or ds_payload.get("templates") or []

    # Build a name → most-recently-modified DocuSign template lookup.
    ds_by_name: dict[str, dict] = {}
    for ds_template in ds_raw:
        ds_name = ds_template.get("name")
        if not ds_name:
            # A nameless template cannot be an "exact name match" for anything.
            continue
        current = ds_by_name.get(ds_name)
        if current is None or _is_newer(
            ds_template.get("lastModified"), current.get("lastModified")
        ):
            ds_by_name[ds_name] = ds_template

    results = []
    for t in adobe_templates:
        name = t.get("name") or ""
        adobe_modified = t.get("modifiedDate", "")
        ds_match = ds_by_name.get(name) if name else None

        if ds_match is None:
            status = "not_migrated"
        elif _is_newer(adobe_modified, ds_match.get("lastModified")):
            # Adobe was modified after the DocuSign template.
            status = "needs_update"
        else:
            status = "migrated"

        # NOTE(review): _get_validation runs synchronously (it globs the
        # downloads directory) inside this async handler, once per template.
        blockers, warnings = _get_validation(t.get("id", ""), name)
        results.append({
            "adobe_id": t.get("id"),
            "name": name,
            "adobe_modified": adobe_modified,
            "docusign_id": ds_match.get("templateId") if ds_match else None,
            "docusign_modified": ds_match.get("lastModified") if ds_match else None,
            "status": status,
            "blockers": blockers,
            "warnings": warnings,
        })
    return {"templates": results}
def _get_validation(template_id: str, template_name: str) -> tuple[list, list]:
"""Return (blockers, warnings) if the template has been downloaded; else ([], [])."""
try:
from src.services.mapping_service import adobe_folder_to_normalized
from src.services.validation_service import validate_template
downloads_dir = Path(settings.downloads_dir) if hasattr(settings, "downloads_dir") else Path("downloads")
# Match folder by name__id or name pattern
candidates = list(downloads_dir.glob(f"*__{template_id}"))
if not candidates:
# Try matching by sanitised name prefix
safe = template_name.replace("/", "_").replace("\\", "_")
candidates = list(downloads_dir.glob(f"{safe}*"))
if not candidates or not candidates[0].is_dir():
return [], []
normalized = adobe_folder_to_normalized(str(candidates[0]))
result = validate_template(normalized)
return result.blockers, result.warnings
except Exception:
return [], []
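# asyncio is used by template_status (asyncio.gather). NOTE: this import belongs
# with the other imports at the top of the module; it works down here only
# because the name is looked up when the endpoint runs, after the module has
# finished loading.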
import asyncio