""" web/routers/migrate.py ---------------------- Migration trigger and history endpoints. POST /api/migrate — run the pipeline for one or more Adobe template IDs GET /api/migrate/history — return past migration records """ import asyncio import json import os import sys import tempfile from datetime import datetime, timezone from typing import List, Optional import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from pydantic import BaseModel from web.config import settings from web.session import get_session # Ensure src/ is on path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) router = APIRouter() _HISTORY_FILE = os.path.join( os.path.dirname(__file__), "..", "..", "migration-output", ".history.json" ) class MigrateRequest(BaseModel): adobe_template_ids: List[str] def _load_history() -> list: if not os.path.exists(_HISTORY_FILE): return [] with open(_HISTORY_FILE) as f: return json.load(f) def _save_history(records: list) -> None: os.makedirs(os.path.dirname(_HISTORY_FILE), exist_ok=True) with open(_HISTORY_FILE, "w") as f: json.dump(records, f, indent=2) def _load_compose(): """ Dynamically load and return the compose_template function from src/. Isolated in its own function so tests can patch it without touching the file system. """ import importlib.util spec = importlib.util.spec_from_file_location( "compose_docusign_template", os.path.join(os.path.dirname(__file__), "..", "..", "src", "compose_docusign_template.py"), ) compose_mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(compose_mod) return compose_mod.compose_template async def _download_adobe_template(template_id: str, access_token: str, output_dir: str) -> bool: """Download Adobe Sign template files into output_dir. Returns True on success.""" headers = {"Authorization": f"Bearer {access_token}"} base = settings.adobe_sign_base_url async with httpx.AsyncClient() as client: # Metadata meta_resp = await client.get(f"{base}/libraryDocuments/{template_id}", headers=headers) if not meta_resp.is_success: return False metadata = meta_resp.json() # Form fields fields_resp = await client.get(f"{base}/libraryDocuments/{template_id}/formFields", headers=headers) form_fields = fields_resp.json() if fields_resp.is_success else {"fields": []} # Documents list docs_resp = await client.get(f"{base}/libraryDocuments/{template_id}/documents", headers=headers) documents = docs_resp.json() if docs_resp.is_success else {"documents": []} # Download first PDF doc_list = documents.get("documents", []) pdf_bytes = b"" if doc_list: doc_id = doc_list[0].get("id") pdf_resp = await client.get(f"{base}/libraryDocuments/{template_id}/documents/{doc_id}", headers=headers) if pdf_resp.is_success: pdf_bytes = pdf_resp.content os.makedirs(output_dir, exist_ok=True) with open(os.path.join(output_dir, "metadata.json"), "w") as f: json.dump(metadata, f, indent=2) with open(os.path.join(output_dir, "form_fields.json"), "w") as f: json.dump(form_fields, f, indent=2) with open(os.path.join(output_dir, "documents.json"), "w") as f: json.dump(documents, f, indent=2) if pdf_bytes: pdf_name = doc_list[0].get("name", "document.pdf") if doc_list else "document.pdf" if not pdf_name.endswith(".pdf"): pdf_name += ".pdf" with open(os.path.join(output_dir, pdf_name), "wb") as f: f.write(pdf_bytes) return True async def _migrate_one( adobe_id: str, adobe_access_token: str, docusign_access_token: str, ) -> dict: """Run the full pipeline for one Adobe template. Returns a result record.""" timestamp = datetime.now(timezone.utc).isoformat() with tempfile.TemporaryDirectory() as tmpdir: download_dir = os.path.join(tmpdir, "download") output_dir = os.path.join(tmpdir, "output") # 1. Download ok = await _download_adobe_template(adobe_id, adobe_access_token, download_dir) if not ok: return { "timestamp": timestamp, "adobe_template_id": adobe_id, "adobe_template_name": None, "docusign_template_id": None, "action": None, "status": "failed", "error": "Adobe Sign download failed", } # Read template name from metadata with open(os.path.join(download_dir, "metadata.json")) as f: metadata = json.load(f) template_name = metadata.get("name", adobe_id) # 2. Compose DocuSign template JSON composed_file = os.path.join(tmpdir, "docusign-template.json") try: compose_fn = _load_compose() compose_fn(download_dir, composed_file) except Exception as exc: return { "timestamp": timestamp, "adobe_template_id": adobe_id, "adobe_template_name": template_name, "docusign_template_id": None, "action": None, "status": "failed", "error": f"Compose failed: {exc}", } if not os.path.exists(composed_file): return { "timestamp": timestamp, "adobe_template_id": adobe_id, "adobe_template_name": template_name, "docusign_template_id": None, "action": None, "status": "failed", "error": "Compose produced no output file", } # 3. Upload (upsert) to DocuSign using web session token with open(composed_file) as f: template_json = json.load(f) ds_headers = { "Authorization": f"Bearer {docusign_access_token}", "Content-Type": "application/json", "Accept": "application/json", } list_url = f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates" async with httpx.AsyncClient() as client: # Find existing list_resp = await client.get( list_url, headers=ds_headers, params={"search_text": template_name, "count": 100} ) existing_id = None if list_resp.is_success: raw = list_resp.json().get("envelopeTemplates") or list_resp.json().get("templates") or [] exact = [t for t in raw if t.get("name") == template_name] if exact: exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True) existing_id = exact[0]["templateId"] if existing_id: up_resp = await client.put( f"{list_url}/{existing_id}", headers=ds_headers, json=template_json ) action = "updated" template_id = existing_id else: up_resp = await client.post(list_url, headers=ds_headers, json=template_json) action = "created" template_id = up_resp.json().get("templateId") if up_resp.is_success else None if not up_resp.is_success: return { "timestamp": timestamp, "adobe_template_id": adobe_id, "adobe_template_name": template_name, "docusign_template_id": None, "action": None, "status": "failed", "error": f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}", } return { "timestamp": timestamp, "adobe_template_id": adobe_id, "adobe_template_name": template_name, "docusign_template_id": template_id, "action": action, "status": "success", "error": None, } @router.post("") async def run_migration(body: MigrateRequest, request: Request): """Migrate one or more Adobe Sign templates to DocuSign.""" session = get_session(request) if not session.get("adobe_access_token"): return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401) if not session.get("docusign_access_token"): return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401) tasks = [ _migrate_one( aid, session["adobe_access_token"], session["docusign_access_token"], ) for aid in body.adobe_template_ids ] results = await asyncio.gather(*tasks) # Append to history history = _load_history() history.extend(results) _save_history(history) return {"results": list(results)} @router.get("/history") def migration_history(): """Return all past migration records.""" return {"history": _load_history()}