"""
web/routers/migrate.py
----------------------
Migration trigger and history endpoints.

POST /api/migrate              — run the pipeline for one or more Adobe template IDs
POST /api/migrate/batch        — batch migration with async progress tracking
GET  /api/migrate/batch/{id}   — poll batch job status
GET  /api/migrate/history      — return past migration records
"""

import asyncio
import json
import logging
import os
import sys
import tempfile
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional, Set, Tuple

import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from web.config import settings
from web.docusign_context import DocusignContextError, current_account
from web.session import get_session

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))

logger = logging.getLogger(__name__)

router = APIRouter()

_HISTORY_FILE = os.path.join(
    os.path.dirname(__file__), "..", "..", "migration-output", ".history.json"
)

# In-memory batch job store (keyed by job_id).
# NOTE(review): entries are never evicted, so this grows for the lifetime of
# the process — consider pruning completed jobs.
_batch_jobs: Dict[str, dict] = {}

# Strong references to in-flight batch tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected before
# it finishes.
_background_tasks: Set[asyncio.Task] = set()


class MigrationOptions(BaseModel):
    """Per-request switches controlling how far the pipeline goes."""

    # Stop after compose; nothing is uploaded to DocuSign.
    dry_run: bool = False
    # Update a same-named DocuSign template instead of skipping it.
    overwrite_if_exists: bool = False
    # When False, "documentBase64" is stripped from every document before upload.
    include_documents: bool = True


class MigrateRequest(BaseModel):
    """Request body shared by the single and batch migration endpoints."""

    # Primary API (blueprint-aligned)
    source_template_ids: Optional[List[str]] = None
    target_folder: Optional[str] = None
    options: MigrationOptions = MigrationOptions()
    # Legacy field kept for backward compatibility
    adobe_template_ids: Optional[List[str]] = None

    def resolved_ids(self) -> List[str]:
        """Return the template IDs to migrate, preferring the primary field."""
        return self.source_template_ids or self.adobe_template_ids or []


def _load_history() -> list:
    """Return the persisted migration records, or an empty list if none exist."""
    try:
        with open(_HISTORY_FILE, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []


def _save_history(records: list) -> None:
    """Persist *records* to the history file, replacing it atomically.

    The data is written to a sibling temporary file first and then swapped in
    with ``os.replace``, so a failed dump or a concurrent reader never sees a
    truncated history file.
    """
    os.makedirs(os.path.dirname(_HISTORY_FILE), exist_ok=True)
    tmp_path = f"{_HISTORY_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)
    os.replace(tmp_path, _HISTORY_FILE)


def _session_scope(session: dict) -> str:
    """Return the identifier used to scope records to a session."""
    return session.get("_session_id") or "legacy"


def _scope_record(record: dict, session_scope: str) -> dict:
    """Return a shallow copy of *record* tagged with its owning session."""
    scoped = dict(record)
    scoped["owner_session_id"] = session_scope
    return scoped


def _filter_history_for_session(records: list, session_scope: str) -> list:
    """Return only the records owned by *session_scope*.

    Records without an ``owner_session_id`` are treated as owned by "legacy".
    """
    return [
        record
        for record in records
        if record.get("owner_session_id", "legacy") == session_scope
    ]


def _load_compose():
    """Dynamically load compose_template from src/."""
    import importlib.util

    spec = importlib.util.spec_from_file_location(
        "compose_docusign_template",
        os.path.join(
            os.path.dirname(__file__), "..", "..", "src", "compose_docusign_template.py"
        ),
    )
    compose_mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(compose_mod)
    return compose_mod.compose_template


def _safe_pdf_name(raw_name: object) -> str:
    """Turn a remote-supplied document name into a safe local ``.pdf`` file name.

    Directory components are dropped so the file always lands inside the
    download directory, and missing/empty names fall back to "document.pdf".
    """
    name = os.path.basename(str(raw_name or "").replace("\\", "/"))
    if name in ("", ".", ".."):
        name = "document.pdf"
    if not name.endswith(".pdf"):
        name += ".pdf"
    return name


async def _download_adobe_template(template_id: str, access_token: str, output_dir: str) -> bool:
    """Download Adobe Sign template files into output_dir.

    Writes ``metadata.json``, ``form_fields.json`` and ``documents.json`` and,
    when the first listed document can be fetched, its PDF.

    Returns:
        True on success, False when the metadata request is not successful.

    Raises:
        httpx.HTTPError: on transport-level failures.
        ValueError: when a successful response does not contain valid JSON.
        OSError: when the files cannot be written.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    doc_base = f"{settings.adobe_sign_base_url}/libraryDocuments/{template_id}"

    async with httpx.AsyncClient() as client:
        meta_resp = await client.get(doc_base, headers=headers)
        if not meta_resp.is_success:
            return False
        metadata = meta_resp.json()

        # Form fields and the document list are optional: fall back to empty
        # payloads when those requests are not successful.
        fields_resp = await client.get(f"{doc_base}/formFields", headers=headers)
        form_fields = fields_resp.json() if fields_resp.is_success else {"fields": []}

        docs_resp = await client.get(f"{doc_base}/documents", headers=headers)
        documents = docs_resp.json() if docs_resp.is_success else {"documents": []}

        doc_list = documents.get("documents") or []
        first_doc = doc_list[0] if doc_list else {}
        pdf_bytes = b""
        doc_id = first_doc.get("id")
        if doc_id:  # without an id there is nothing meaningful to request
            pdf_resp = await client.get(f"{doc_base}/documents/{doc_id}", headers=headers)
            if pdf_resp.is_success:
                pdf_bytes = pdf_resp.content

    os.makedirs(output_dir, exist_ok=True)
    json_outputs = (
        ("metadata.json", metadata),
        ("form_fields.json", form_fields),
        ("documents.json", documents),
    )
    for filename, payload in json_outputs:
        with open(os.path.join(output_dir, filename), "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)

    if pdf_bytes:
        pdf_path = os.path.join(output_dir, _safe_pdf_name(first_doc.get("name")))
        with open(pdf_path, "wb") as f:
            f.write(pdf_bytes)

    return True


def _run_validation(download_dir: str) -> dict:
    """Run validation service on downloaded template, return summary.

    Validation is best-effort: any error while importing or running the
    validation services is reported as a warning, never as a blocker.
    """
    try:
        from src.services.mapping_service import adobe_folder_to_normalized
        from src.services.validation_service import validate_template

        norm, _ = adobe_folder_to_normalized(download_dir)
        result = validate_template(norm)
        return {
            "blockers": result.blockers,
            "warnings": result.warnings,
            "has_blockers": result.has_blockers(),
        }
    except Exception as exc:
        return {
            "blockers": [],
            "warnings": [f"Validation skipped: {exc}"],
            "has_blockers": False,
        }


def _result_record(
    timestamp: str,
    adobe_id: str,
    dry_run: bool,
    *,
    status: str,
    template_name: Optional[str] = None,
    docusign_template_id: Optional[str] = None,
    action: Optional[str] = None,
    error: Optional[str] = None,
    warnings: Optional[list] = None,
    blockers: Optional[list] = None,
    field_issues: Optional[list] = None,
) -> dict:
    """Build one migration result record with the full, fixed set of keys."""
    return {
        "timestamp": timestamp,
        "adobe_template_id": adobe_id,
        "adobe_template_name": template_name,
        "docusign_template_id": docusign_template_id,
        "action": action,
        "status": status,
        "error": error,
        "warnings": list(warnings) if warnings else [],
        "blockers": list(blockers) if blockers else [],
        "field_issues": list(field_issues) if field_issues else [],
        "dry_run": dry_run,
    }


async def _upsert_docusign_template(
    template_json: dict,
    template_name: str,
    access_token: str,
    account_id: str,
    base_url: str,
    overwrite_if_exists: bool,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Create or update the DocuSign template named *template_name*.

    Returns:
        ``(action, template_id, error)`` where *action* is one of
        "skipped", "updated", "created" or "failed". *error* is only set
        for "failed".

    Raises:
        httpx.HTTPError: on transport-level failures.
        ValueError: when a successful response does not contain valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    list_url = f"{base_url}/v2.1/accounts/{account_id}/templates"

    async with httpx.AsyncClient() as client:
        # Duplicate detection: the search is by text, so keep exact name
        # matches only and pick the most recently modified one.
        list_resp = await client.get(
            list_url, headers=headers, params={"search_text": template_name, "count": 100}
        )
        existing_id = None
        if list_resp.is_success:
            payload = list_resp.json()
            candidates = payload.get("envelopeTemplates") or payload.get("templates") or []
            exact = [
                t for t in candidates
                if t.get("name") == template_name and t.get("templateId")
            ]
            if exact:
                newest = max(exact, key=lambda t: t.get("lastModified") or "")
                existing_id = newest["templateId"]

        # Skip if already exists and overwrite is disabled
        if existing_id and not overwrite_if_exists:
            return "skipped", existing_id, None

        if existing_id:
            up_resp = await client.put(
                f"{list_url}/{existing_id}", headers=headers, json=template_json
            )
            action = "updated"
        else:
            up_resp = await client.post(list_url, headers=headers, json=template_json)
            action = "created"

        if not up_resp.is_success:
            error = f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}"
            return "failed", None, error

        template_id = existing_id or up_resp.json().get("templateId")
        return action, template_id, None


async def _migrate_pipeline(
    adobe_id: str,
    adobe_access_token: str,
    docusign_access_token: str,
    docusign_account_id: str,
    docusign_base_url: str,
    options: MigrationOptions,
) -> dict:
    """Download, validate, compose and upload one template; return its record."""
    timestamp = datetime.now(timezone.utc).isoformat()

    def record(**fields) -> dict:
        return _result_record(timestamp, adobe_id, options.dry_run, **fields)

    with tempfile.TemporaryDirectory() as tmpdir:
        download_dir = os.path.join(tmpdir, "download")

        # 1. Download
        download_error = "Adobe Sign download failed"
        try:
            ok = await _download_adobe_template(adobe_id, adobe_access_token, download_dir)
        except (httpx.HTTPError, ValueError, OSError) as exc:
            ok = False
            download_error = f"Adobe Sign download failed: {exc}"
        if not ok:
            return record(status="failed", error=download_error)

        with open(os.path.join(download_dir, "metadata.json"), encoding="utf-8") as f:
            metadata = json.load(f)
        template_name = metadata.get("name") or adobe_id

        # 2. Validate
        validation = _run_validation(download_dir)
        warnings = validation["warnings"]
        if validation["has_blockers"]:
            return record(
                template_name=template_name,
                action="blocked",
                status="blocked",
                error=f"Validation blockers: {'; '.join(validation['blockers'])}",
                warnings=warnings,
                blockers=validation["blockers"],
            )

        # 3. Compose
        composed_file = os.path.join(tmpdir, "docusign-template.json")
        compose_issues: list = []
        try:
            compose_fn = _load_compose()
            compose_result = compose_fn(download_dir, composed_file)
            # compose_template returns (template, warnings, issues)
            if isinstance(compose_result, tuple) and len(compose_result) >= 3:
                compose_issues = compose_result[2] or []
        except Exception as exc:
            return record(
                template_name=template_name,
                status="failed",
                error=f"Compose failed: {exc}",
                warnings=warnings,
            )

        if not os.path.exists(composed_file):
            return record(
                template_name=template_name,
                status="failed",
                error="Compose produced no output file",
                warnings=warnings,
            )

        # 4. Dry run — stop here, do not upload
        if options.dry_run:
            return record(
                template_name=template_name,
                action="dry_run",
                status="dry_run",
                warnings=warnings,
                field_issues=compose_issues,
            )

        # 5. Upload (upsert) to DocuSign
        with open(composed_file, encoding="utf-8") as f:
            template_json = json.load(f)

        if not options.include_documents:
            for doc in template_json.get("documents", []):
                doc.pop("documentBase64", None)

        try:
            action, template_id, upload_error = await _upsert_docusign_template(
                template_json,
                template_name,
                docusign_access_token,
                docusign_account_id,
                docusign_base_url,
                options.overwrite_if_exists,
            )
        except (httpx.HTTPError, ValueError) as exc:
            # Keep the "upload failed" wording: the batch runner keys its
            # retry on that substring.
            action, template_id = "failed", None
            upload_error = f"DocuSign upload failed: {exc}"

        if action == "failed":
            return record(
                template_name=template_name,
                status="failed",
                error=upload_error,
                warnings=warnings,
                field_issues=compose_issues,
            )

        if action == "skipped":
            skip_note = "Skipped: template already exists (overwrite_if_exists=false)"
            return record(
                template_name=template_name,
                docusign_template_id=template_id,
                action="skipped",
                status="skipped",
                warnings=list(warnings) + [skip_note],
                field_issues=compose_issues,
            )

        return record(
            template_name=template_name,
            docusign_template_id=template_id,
            action=action,
            status="success",
            warnings=warnings,
            field_issues=compose_issues,
        )


async def _migrate_one(
    adobe_id: str,
    adobe_access_token: str,
    docusign_access_token: str,
    docusign_account_id: str,
    docusign_base_url: str,
    options: MigrationOptions,
) -> dict:
    """Run the full pipeline for one Adobe template. Returns a result record.

    This is the per-template error boundary: an unexpected exception is logged
    and reported as a "failed" record instead of propagating, so one bad
    template cannot abort the other templates of the same request or batch.
    """
    try:
        return await _migrate_pipeline(
            adobe_id,
            adobe_access_token,
            docusign_access_token,
            docusign_account_id,
            docusign_base_url,
            options,
        )
    except Exception as exc:
        logger.exception("Unexpected error while migrating template %s", adobe_id)
        return _result_record(
            datetime.now(timezone.utc).isoformat(),
            adobe_id,
            options.dry_run,
            status="failed",
            error=f"Migration failed unexpectedly: {exc}",
        )


def _resolve_migration_context(
    body: MigrateRequest, request: Request
) -> Tuple[Optional[dict], Optional[JSONResponse]]:
    """Validate the session and request shared by both migration endpoints.

    Returns:
        ``(context, None)`` when the request can proceed, where *context* holds
        the template ids, session scope, tokens and DocuSign account details;
        otherwise ``(None, error_response)``.
    """
    session = get_session(request)
    if not session.get("adobe_access_token"):
        return None, JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
    if not session.get("docusign_access_token"):
        return None, JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)

    try:
        account = current_account(session)
    except DocusignContextError as e:
        return None, JSONResponse({"error": str(e), "code": e.code}, status_code=e.status_code)

    ids = body.resolved_ids()
    if not ids:
        return None, JSONResponse({"error": "no template IDs provided"}, status_code=400)

    context = {
        "ids": ids,
        "session_scope": _session_scope(session),
        "adobe_token": session["adobe_access_token"],
        "ds_token": session["docusign_access_token"],
        "ds_account_id": account["account_id"],
        "ds_base_url": account["base_url"],
    }
    return context, None


@router.post("")
async def run_migration(body: MigrateRequest, request: Request):
    """Migrate one or more Adobe Sign templates to DocuSign."""
    ctx, error_response = _resolve_migration_context(body, request)
    if error_response is not None:
        return error_response

    # All templates of the request are migrated concurrently.
    results = await asyncio.gather(
        *(
            _migrate_one(
                aid,
                ctx["adobe_token"],
                ctx["ds_token"],
                ctx["ds_account_id"],
                ctx["ds_base_url"],
                body.options,
            )
            for aid in ctx["ids"]
        )
    )

    scoped_results = [_scope_record(result, ctx["session_scope"]) for result in results]
    history = _load_history()
    history.extend(scoped_results)
    _save_history(history)

    return {"results": list(scoped_results)}


@router.get("/history")
def migration_history(request: Request):
    """Return the past migration records owned by the caller's session."""
    session_scope = _session_scope(get_session(request))
    return {"history": _filter_history_for_session(_load_history(), session_scope)}


# ---------------------------------------------------------------------------
# Batch migration
# ---------------------------------------------------------------------------

async def _run_batch_job(
    job_id: str,
    owner_session_id: str,
    ids: List[str],
    adobe_token: str,
    ds_token: str,
    ds_account_id: str,
    ds_base_url: str,
    options: MigrationOptions,
) -> None:
    """Background coroutine that processes a batch job and updates _batch_jobs.

    Templates are migrated one at a time so the job's progress can be polled.
    A failure to persist the results is recorded on the job's "error" key; the
    job still reaches "completed" with its summary instead of staying in
    "running" forever.
    """
    job = _batch_jobs[job_id]
    job["status"] = "running"
    total = len(ids)
    results: List[dict] = []
    job["results"] = results  # same list object: appends are visible to pollers

    for index, adobe_id in enumerate(ids):
        job["progress"] = {"completed": index, "total": total, "current_id": adobe_id}
        result = await _migrate_one(
            adobe_id, adobe_token, ds_token, ds_account_id, ds_base_url, options
        )

        # Retry once on transient failures (network errors, not validation blockers)
        if result["status"] == "failed" and "upload failed" in (result.get("error") or ""):
            result = await _migrate_one(
                adobe_id, adobe_token, ds_token, ds_account_id, ds_base_url, options
            )
            if result["status"] != "failed":
                result["retried"] = True

        results.append(result)

    # Persist to history
    try:
        history = _load_history()
        history.extend(_scope_record(result, owner_session_id) for result in results)
        _save_history(history)
    except (OSError, ValueError, TypeError) as exc:
        logger.exception("Could not persist history for batch job %s", job_id)
        job["error"] = f"Results were not saved to history: {exc}"

    def count(*statuses: str) -> int:
        return sum(1 for r in results if r["status"] in statuses)

    job["status"] = "completed"
    job["progress"] = {"completed": total, "total": total, "current_id": None}
    job["summary"] = {
        "total": total,
        "success": count("success"),
        "failed": count("failed", "blocked"),
        "skipped": count("skipped"),
        "dry_run": count("dry_run"),
    }


@router.post("/batch")
async def run_batch_migration(body: MigrateRequest, request: Request):
    """
    Start an async batch migration job. Returns a job_id immediately.
    Poll GET /api/migrate/batch/{job_id} for status.
    """
    ctx, error_response = _resolve_migration_context(body, request)
    if error_response is not None:
        return error_response

    ids = ctx["ids"]
    job_id = str(uuid.uuid4())
    _batch_jobs[job_id] = {
        "job_id": job_id,
        "owner_session_id": ctx["session_scope"],
        "status": "queued",
        "total": len(ids),
        "results": [],
        "progress": {"completed": 0, "total": len(ids), "current_id": None},
        "summary": None,
        "error": None,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    task = asyncio.create_task(
        _run_batch_job(
            job_id,
            ctx["session_scope"],
            ids,
            ctx["adobe_token"],
            ctx["ds_token"],
            ctx["ds_account_id"],
            ctx["ds_base_url"],
            body.options,
        )
    )
    # Hold a reference until the task finishes so it cannot be collected early.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return {"job_id": job_id, "total": len(ids), "status": "queued"}


@router.get("/batch/{job_id}")
def get_batch_status(job_id: str, request: Request):
    """Poll the status of a batch migration job.

    A job owned by a different session is reported as not found, exactly like
    a job id that does not exist.
    """
    job = _batch_jobs.get(job_id)
    if not job:
        return JSONResponse({"error": "batch job not found"}, status_code=404)

    session_scope = _session_scope(get_session(request))
    if job.get("owner_session_id") != session_scope:
        return JSONResponse({"error": "batch job not found"}, status_code=404)

    return job