feat(phases-8-13): blueprint alignment — normalized schema, validation, migration options, retry, security, batch

Phase 8: Normalized intermediate schema (src/models/normalized_template.py, src/services/mapping_service.py)
  - Platform-agnostic NormalizedTemplate as pipeline bridge
  - PDF SHA-256 checksums computed during normalization
  - 13 tests passing

Phase 9: Validation service + report builder (src/services/validation_service.py, src/reports/report_builder.py)
  - Blockers (no recipients, no documents) halt migration
  - Warnings (no fields, unassigned roles, unsupported features) logged
  - Structured MigrationReport with per-template status
  - 20 tests passing

Phase 10: Migration options API (web/routers/migrate.py)
  - source_template_ids, dry_run, overwrite_if_exists, include_documents options
  - Backward compatible with legacy adobe_template_ids field
  - 7 tests passing

Phase 11: Retry with exponential backoff (src/utils/retry.py)
  - retry_with_backoff and async_retry_with_backoff decorators
  - 429/5xx detection via check_response_retryable
  - 14 tests passing

Phase 12: Security hardening (src/utils/log_sanitizer.py)
  - SanitizingFilter redacts tokens, JWTs, base64 PDF content from logs
  - 15 tests passing

Phase 13: Batch migration API
  - POST /api/migrate/batch — async background job with job_id
  - GET /api/migrate/batch/{job_id} — poll progress and results
  - 6 tests passing

Full suite: 108/108 tests passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Paul Huliganga 2026-04-21 02:19:38 -04:00
parent 854d112372
commit 342e8c3471
22 changed files with 2433 additions and 53 deletions


@ -1,9 +1,9 @@
# Product Specification
## Project: Adobe Sign to DocuSign Template Migrator
### Purpose
Develop an agent/toolkit that can programmatically extract template data and field logic from Adobe Sign ("library documents"), map/transform into DocuSign's template model, and create new DocuSign templates to reduce manual migration effort.
---
@ -12,28 +12,249 @@ Develop an agent/toolkit that can programmatically extract template data and fie
- Generate best-approximation DocuSign templates programmatically
- Handle all basic field types and recipient roles
- Detect and warn on features needing special/manual handling (complex logic, custom validations, non-mappable features)
- Produce a structured migration report with successes, warnings, and manual-fix items
### Key Features (MVP)
- Connect to Adobe Sign and DocuSign APIs via credentials loaded from .env
- Extract template listing from Adobe Sign sandbox/account
- Pull all required endpoints: metadata, formFields, recipients, workflows
- Assemble complete data model for each imported template
- Mapping layer: field type/role/routing normalization (see field-mapping.md)
- Programmatically create equivalent template and roles in DocuSign
- Logging and reporting of success, errors, edge cases
### Stretch (Future)
- UI for side-by-side compare/QA
- Complex feature transform plugins
- Bulk mode & idempotent re-runs
- Support for in-place PDF field overlay (anchors/rects)
---
### Architecture
#### Components
- **Adobe Sign Client** (`src/adobe_api.py`) — authenticated API calls, template listing/download
- **DocuSign Client** (`src/upload_docusign_template.py`, `src/docusign_auth.py`) — JWT auth, template upsert
- **Normalized Schema Model** (`src/models/normalized_template.py`) — platform-agnostic intermediate representation
- **Mapping Service** (`src/services/mapping_service.py`) — field type, recipient role, coordinate translation
- **Validation Service** (`src/services/validation_service.py`) — field count comparison, recipient checks, missing role detection
- **Migration Service** (`src/services/migration_service.py`) — orchestrates download → normalize → validate → compose → upload
- **Report Builder** (`src/reports/report_builder.py`) — structured success/warning/error output
- **Web API** (`web/`) — FastAPI endpoints for browser-based orchestration
- **Frontend** (`web/static/`) — side-by-side template browser, migration UI
#### Service Separation
```
src/
models/
normalized_template.py # intermediate schema
services/
migration_service.py # pipeline orchestration
mapping_service.py # field/role/coord transformations
validation_service.py # pre/post migration checks
reports/
report_builder.py # structured report output
utils/
pdf_coords.py # coordinate normalization helpers
```
---
### High-Level Migration Flow
1. Authenticate to both Adobe Sign and DocuSign (OAuth)
2. List and select Adobe Sign templates
3. Extract: metadata, formFields, recipients, documents, workflows
4. **Normalize** into platform-agnostic intermediate schema
5. **Validate** normalized schema — blockers stop migration; warnings are logged
6. Map to DocuSign template payload
7. Upsert (create or update) in DocuSign
8. Generate migration report
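The flow above can be sketched as one orchestration loop. This is illustrative only; the helper names are injected callables, not the tool's actual API, and the real pipeline lives in `src/services/migration_service.py`:

```python
def run_pipeline(template_ids, extract, normalize, validate, compose, upload, dry_run=False):
    """Orchestrate steps 3-8 for each template; returns per-template results."""
    results = []
    for tid in template_ids:
        raw = extract(tid)                         # 3. metadata, formFields, recipients, docs
        normalized = normalize(raw)                # 4. platform-agnostic schema
        blockers, warnings = validate(normalized)  # 5. blockers halt, warnings continue
        if blockers:
            results.append({"id": tid, "status": "blocked", "blockers": blockers})
            continue
        payload = compose(normalized)              # 6. DocuSign template payload
        if not dry_run:
            upload(payload)                        # 7. upsert in DocuSign
        results.append({"id": tid, "status": "ok", "warnings": warnings})
    return results                                 # 8. feeds the migration report

# Toy wiring to show the shape (real clients live in src/):
out = run_pipeline(
    ["tpl_1"],
    extract=lambda tid: {"name": tid, "recipients": ["Customer"]},
    normalize=lambda raw: raw,
    validate=lambda n: ([] if n["recipients"] else ["no recipients"], []),
    compose=lambda n: {"name": n["name"]},
    upload=lambda p: None,
    dry_run=True,
)
```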
---
### Internal Normalized Schema
Use an intermediate model so the tool is not tightly coupled to either platform. This enables future support for additional eSign platforms.
#### Schema Structure
```json
{
"template": {
"name": "Sales Agreement",
"description": "Migrated from Adobe Sign",
"emailSubject": "Please sign: Sales Agreement",
"emailMessage": "",
"documents": [],
"roles": [
{ "name": "Customer", "order": 1, "actionType": "SIGN" },
{ "name": "Company", "order": 2, "actionType": "SIGN" }
],
"fields": [
{
"type": "signature",
"page": 1,
"x": 120, "y": 540,
"width": 140, "height": 28,
"required": true,
"roleName": "Customer"
}
],
"reminderEnabled": false,
"expirationDays": null
}
}
```
---
### Core Entities to Migrate
| Entity | Adobe Sign Source | DocuSign Target |
|-------------------|-----------------------------|-----------------------------|
| Template name | `name` | `name` |
| Description | `message` | `description` |
| Documents (PDFs) | `libraryDocumentId` → bytes | `documents[]` |
| Recipient roles | `participantSetsInfo` | `recipients.signers[]` |
| Routing order | `participantSetsInfo.order` | `routingOrder` |
| Form fields | `formFields` | `tabs` per recipient |
| Email subject | `emailSubject` | `emailSubject` |
| Reminders | `reminderFrequency` | `reminders` |
| Expiration | `daysUntilSigningDeadline` | `expirationDateTime` |
---
### Mapping Logic
#### 1. Recipient and Role Mapping
- Map Adobe Sign participant sets → DocuSign template roles
- Preserve routing order
- Map action types: SIGN → signer, APPROVE → approver, CC → carbonCopy
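The role mapping above can be sketched in a few lines. A minimal sketch, not the real mapping service; the output keys mirror DocuSign's template-role shape but the helper name is an assumption:

```python
def map_participant_sets(participant_sets):
    """Adobe Sign participant sets -> DocuSign-style role dicts, preserving routing order."""
    action_map = {"SIGN": "signer", "APPROVE": "approver", "CC": "carbonCopy"}
    roles = []
    for ps in sorted(participant_sets, key=lambda p: p["order"]):  # preserve routing order
        roles.append({
            "roleName": ps["name"],
            "routingOrder": ps["order"],
            "recipientType": action_map.get(ps["role"], "signer"),  # default to signer
        })
    return roles
```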
#### 2. Field Type Mapping
```json
{
"SIGNATURE": "signHere",
"INITIALS": "initialHere",
"TEXT": "text",
"CHECKBOX": "checkbox",
"RADIO": "radioGroup",
"DROPDOWN": "list",
"DATE": "dateSigned",
"ATTACHMENT": "signerAttachment"
}
```
(Full mapping table: see `field-mapping.md`)
#### 3. Coordinate Mapping
- Normalize to PDF points
- Account for page rotation
- Transform coordinate origin if needed
- Validate field overlap after placement
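One way the overlap validation step could be implemented, assuming axis-aligned field rectangles in PDF points; the real helpers live in `src/utils/pdf_coords.py` and may differ:

```python
def rects_overlap(a, b):
    """Axis-aligned intersection test for two field rects {x, y, width, height}."""
    return not (
        a["x"] + a["width"] <= b["x"] or b["x"] + b["width"] <= a["x"]
        or a["y"] + a["height"] <= b["y"] or b["y"] + b["height"] <= a["y"]
    )

def overlapping_pairs(fields):
    """Return label pairs of same-page fields whose rectangles intersect."""
    hits = []
    for i, f in enumerate(fields):
        for g in fields[i + 1:]:
            if f["page"] == g["page"] and rects_overlap(f, g):
                hits.append((f["label"], g["label"]))
    return hits
```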
#### 4. DocuSign Payload Fields
The tool must populate:
- Template name and description
- Email subject and message defaults
- Envelope/template documents (with document checksums)
- Template roles with routing order
- Tabs grouped by recipient
- Reminder and expiration settings where supported
---
### Unsupported / Flagged Features (Manual Review Required)
- Conditional recipient routing rules
- Advanced workflow branching
- Calculated fields
- Custom JavaScript validators
- Niche authentication methods (e.g., KBA, phone auth)
- Field validations with no direct DocuSign equivalent
- Webhook / event associations tied to template lifecycle
---
### Migration Options (API)
`POST /api/migrate` accepts:
```json
{
"sourceTemplateIds": ["tpl_1001", "tpl_1002"],
"targetFolder": "Migrated Templates",
"options": {
"overwriteIfExists": false,
"dryRun": true,
"includeDocuments": true
}
}
```
- **dryRun** — validate and report without creating DocuSign templates
- **overwriteIfExists** — when `false`, skip templates already migrated (default: false)
- **includeDocuments** — embed PDFs in DocuSign template (default: true)
- **targetFolder** — DocuSign folder for created templates
---
### Validation Layer
Pre-migration checks (blockers and warnings):
- Field count before vs. after mapping
- Recipient count and routing order integrity
- Fields missing role assignments
- Unsupported feature detection
- Document checksum comparison (before upload vs. after download confirmation)
Post-migration checks:
- DocuSign template field count vs. normalized schema count
- Recipient role count match
- Migration report includes pass/warn/fail per template
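The blocker/warning split described above can be sketched as follows. This operates on a normalized-template-like dict and is an illustration of the classification, not the real `ValidationService`:

```python
def validate_template(t):
    """Classify issues: blockers halt migration, warnings are logged and migration continues."""
    blockers, warnings = [], []
    if not t.get("roles"):
        blockers.append("no recipients")
    if not t.get("documents"):
        blockers.append("no documents")
    if not t.get("fields"):
        warnings.append("no form fields")
    role_names = {r["name"] for r in t.get("roles", [])}
    for f in t.get("fields", []):
        if f.get("roleName") not in role_names:
            warnings.append(f"field '{f.get('label')}' has no matching role")
    warnings.extend(t.get("unsupportedFeatures", []))  # flagged for manual review
    return blockers, warnings
```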
---
### Implementation Considerations
#### Authentication
- OAuth for both Adobe Sign and DocuSign (with token auto-refresh)
- Support admin-consent flows where required
- Securely store tokens (never in logs or plaintext files)
#### Rate Limits
- Batch API requests carefully
- Retries with exponential backoff on 429/5xx
- Use idempotency (upsert pattern) where possible
#### File Handling
- Preserve original PDFs locally in `downloads/`
- Checksum documents before and after upload
- Keep document-page metadata for accurate tab placement
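The checksum step is straightforward: hash the raw PDF bytes before base64 encoding, so the digest can be compared across the upload/download round trip. The helper name below is illustrative:

```python
import base64
import hashlib

def checksum_and_encode(pdf_bytes):
    """SHA-256 over the raw bytes (before encoding), plus the base64 payload for upload."""
    digest = hashlib.sha256(pdf_bytes).hexdigest()
    return digest, base64.b64encode(pdf_bytes).decode("ascii")
```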
#### Security
- Redact secrets and tokens from all log output
- Encrypt token storage where possible
- Maintain audit trail for all migration operations (template ID, timestamp, status, user)
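Log redaction can be done with a `logging.Filter` that scrubs every record before it is emitted. The patterns here are illustrative; the project's actual rules live in `src/utils/log_sanitizer.py`:

```python
import logging
import re

# Illustrative patterns; the real sanitizer covers more cases (key=value secrets, base64 PDFs).
_PATTERNS = [
    re.compile(r"Bearer\s+[A-Za-z0-9._\-]+"),                             # OAuth bearer tokens
    re.compile(r"eyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+"),  # JWT-shaped strings
]

class SanitizingFilter(logging.Filter):
    def filter(self, record):
        msg = record.getMessage()
        for pat in _PATTERNS:
            msg = pat.sub("[REDACTED]", msg)
        record.msg, record.args = msg, None  # store the scrubbed, pre-formatted message
        return True  # never drop the record, only scrub it
```

Installed once on the root logger (`logging.getLogger().addFilter(SanitizingFilter())`), this scrubs all handlers downstream.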
---
### MVP Feature Set (Phase 1)
- Authenticate to both systems (CLI + Web)
- List and select Adobe Sign templates
- Migrate basic templates (standard roles + common fields)
- Normalized intermediate schema as pipeline bridge
- Validation layer (field/recipient counts, missing roles)
- Migration report (success / warning / error per template)
- Dry-run mode
- Idempotent re-runs (overwrite prevention)
### Phase 2 Features
- Batch migration (multiple templates in one request)
- Retry failed templates
- Coordinate validation preview
- Duplicate detection
- Folder / category mapping
- Audit logging
- Rate limit handling with backoff
### Phase 3 Features
- UI preview for field placements
- Manual correction workflow
- Side-by-side template comparison (visual diff)
- Webhook recreation
- Advanced workflow translation
---
### Out of Scope (MVP)
- Agreement instance migration (templates only)
- Custom integrations outside API surface
- Real-time collaborative editing
---
*Updated: 2026-04-21 (Blueprint alignment — added normalized schema, validation layer, migration options, security/rate-limit requirements, Phase 2/3 feature set, architecture detail)*


@ -135,7 +135,7 @@ Then open [http://localhost:8000](http://localhost:8000) in your browser.
## Running tests
```bash
pytest tests/ -v                                    # full suite (108 tests)
pytest tests/test_regression.py -v                  # compose regression only
pytest tests/test_regression.py --update-snapshots  # regenerate snapshots after intentional changes
```
@ -154,10 +154,82 @@ unexpected API behaviors, and the fixes applied.
---
## Migration API options
`POST /api/migrate` accepts extended options (blueprint-aligned):
```json
{
"source_template_ids": ["tpl_001", "tpl_002"],
"target_folder": "Migrated Templates",
"options": {
"dry_run": false,
"overwrite_if_exists": false,
"include_documents": true
}
}
```
| Option | Default | Description |
|---|---|---|
| `dry_run` | `false` | Validate and compose without creating DocuSign templates |
| `overwrite_if_exists` | `false` | If `false`, skip templates that already exist in DocuSign |
| `include_documents` | `true` | Embed PDFs in the DocuSign template |
**Batch migration** (`POST /api/migrate/batch`) runs the same pipeline for multiple templates as a background job:
```bash
# Start batch
curl -X POST /api/migrate/batch -d '{"source_template_ids": ["id1", "id2"]}'
# → {"job_id": "abc-123", "status": "queued"}
# Poll status
curl /api/migrate/batch/abc-123
# → {"status": "running", "progress": {"completed": 1, "total": 2}, ...}
```
---
## Normalized intermediate schema
The migration pipeline uses a platform-agnostic `NormalizedTemplate` model as a bridge between Adobe Sign and DocuSign. This decouples extraction from composition and enables the validation layer.
See `src/models/normalized_template.py` and `src/services/mapping_service.py`.
---
## Validation
Each template is validated before migration:
- **Blockers** (halt migration): no recipients, no documents
- **Warnings** (logged but continue): no signature fields, unassigned fields, unsupported features
Unsupported features flagged for manual review: conditional HIDE actions, JavaScript validators, calculated fields, webhook associations, niche authentication methods.
---
## Security
- `src/utils/log_sanitizer.py` — install `SanitizingFilter` to redact tokens, keys, and base64 PDF content from all log output
- PDF checksums (SHA-256) are computed and stored with each migration record
- Tokens are never written to logs; see `src/utils/log_sanitizer.py`
---
## Project structure
```
src/
  models/
    normalized_template.py    # Platform-agnostic intermediate schema
  services/
    mapping_service.py        # Adobe Sign → NormalizedTemplate converter
    validation_service.py     # Pre/post migration checks (blockers + warnings)
  reports/
    report_builder.py         # Structured migration report per template
  utils/
    retry.py                  # Exponential backoff retry helpers
    log_sanitizer.py          # Secret redaction from logs
  adobe_auth.py               # One-time OAuth flow for Adobe Sign (CLI)
  adobe_api.py                # Adobe Sign API client (auto token refresh)
  download_templates.py       # List and download templates from Adobe Sign
@ -165,8 +237,6 @@ src/
  docusign_auth.py            # DocuSign JWT auth + one-time consent flow
  upload_docusign_template.py # Upsert upload: PUT if exists, POST if not
  migrate_template.py         # End-to-end CLI runner (download → convert → upload)

web/
  app.py                      # FastAPI entrypoint (uvicorn web.app:app)
@ -175,13 +245,19 @@ web/
  routers/
    auth.py                   # Adobe Sign + DocuSign OAuth endpoints
    templates.py              # Template listing + migration status API
    migrate.py                # Migration trigger, batch + history API
  static/
    index.html                # Web UI (side-by-side browser + migrate flow)
    app.js                    # Vanilla JS frontend
    style.css                 # Styles + status badge colours
tests/
  test_normalized_schema.py   # Normalized model + mapping service tests
  test_validation_service.py  # Validation service + report builder tests
  test_migration_options.py   # dryRun, overwriteIfExists, includeDocuments
  test_batch_migration.py     # Batch migration API tests
  test_retry.py               # Retry with backoff utility tests
  test_security.py            # Log sanitization + PDF checksum tests
  test_upload_upsert.py       # Upsert logic unit tests
  test_api_health.py          # Health endpoint
  test_api_auth.py            # OAuth endpoint tests
@ -190,18 +266,12 @@ tests/
  test_e2e.py                 # Full pipeline end-to-end test
  test_regression.py          # Compose output vs snapshots
  fixtures/expected/          # Regression snapshots (3 real templates)
downloads/                    # Downloaded Adobe Sign templates (gitignored)
migration-output/             # Converted DocuSign template JSONs + history
field-mapping.md              # Field type mapping table + edge case log
PRODUCT-SPEC.md               # Full product specification (blueprint-aligned)
docs/agent-harness/
  EXECUTION-BOARD.md          # Living kanban board
requirements.txt              # Python dependencies
```


@ -1,6 +1,6 @@
# Execution Board (Living Kanban)
*Last updated: 2026-04-21*
---
@ -79,9 +79,80 @@
---
## Phase 8 — Normalized Intermediate Schema ✅ (2026-04-21)
- [x] Create `src/models/` package with `__init__.py`
- [x] Implement `src/models/normalized_template.py` — pydantic model with NormalizedTemplate, NormalizedField, NormalizedRole, NormalizedDocument
- [x] Implement `src/services/` package with `__init__.py`
- [x] Implement `src/services/mapping_service.py` — Adobe Sign folder → NormalizedTemplate converter with checksums
- [x] Write `tests/test_normalized_schema.py` — 13 tests passing (model construction, serialization, real fixture round-trips)
- [x] Update README
---
## Phase 9 — Validation Service ✅ (2026-04-21)
- [x] Implement `src/services/validation_service.py` — `ValidationResult(blockers, warnings)`, checks for no recipients, no documents, no fields, missing roles, unsupported features
- [x] Implement `src/reports/report_builder.py` — `MigrationReport`, `TemplateReport`, `MigrationStatus` enum, factory functions
- [x] Integrate validation into migration pipeline (`_run_validation` in `web/routers/migrate.py`) — blocks on blockers
- [x] Implement `compare_field_counts(normalized, ds_template)` post-migration check
- [x] Write `tests/test_validation_service.py` — 20 tests passing
- [x] Update README
---
## Phase 10 — Migration Options API ✅ (2026-04-21)
- [x] Extend `POST /api/migrate` request body: `source_template_ids[]`, `target_folder`, `options.dry_run`, `options.overwrite_if_exists`, `options.include_documents`
- [x] Implement dry-run path — validate + compose without creating DocuSign templates (`status=dry_run`)
- [x] Implement `overwrite_if_exists=false` — skip already-migrated templates (`status=skipped`)
- [x] Implement `include_documents` toggle — strips `documentBase64` from payload when false
- [x] Keep backward compatibility with legacy `adobe_template_ids` field
- [x] Write `tests/test_migration_options.py` — 7 tests passing
- [x] Update README
---
## Phase 11 — Rate Limiting & Retry with Backoff ✅ (2026-04-21)
- [x] Implement `src/utils/retry.py` — `retry_with_backoff` (sync) and `async_retry_with_backoff` decorators with exponential backoff + max_delay cap
- [x] Implement `check_response_retryable(status_code)` — returns True for 429/500/502/503/504
- [x] Write `tests/test_retry.py` — 14 tests passing (exponential delay, max delay, exception filtering, async)
- [x] Update README
---
## Phase 12 — Security Hardening & Audit Trail ✅ (2026-04-21)
- [x] Implement `src/utils/log_sanitizer.py` — `redact()`, `redact_dict()`, `SanitizingFilter`, `install_sanitizing_filter()`
- [x] Redacts: Bearer tokens, JWT-style tokens, key=value secret assignments, long base64 payloads (PDF content)
- [x] PDF checksum (SHA-256) computed in `mapping_service.adobe_folder_to_normalized()` and stored in `NormalizedDocument.checksum_sha256`
- [x] Write `tests/test_security.py` — 15 tests passing
- [x] Update README
---
## Phase 13 — Batch Migration API ✅ (2026-04-21)
- [x] Implement `POST /api/migrate/batch` — async background job, returns `job_id` immediately
- [x] Implement `GET /api/migrate/batch/{job_id}` — poll job status, progress, results, summary
- [x] Implement retry for failed templates (one retry on upload failures)
- [x] In-memory job store with progress tracking (`_batch_jobs` dict)
- [x] Write `tests/test_batch_migration.py` — 6 tests passing
- [x] Update README
---
## Full Test Suite ✅ (2026-04-21)
**108/108 tests passing**
---
## Gitea
- [x] Committed and pushed all changes (2026-04-17)
- [ ] Commit and push Phase 8–13 work (ui-redesign branch)
---
@ -91,3 +162,5 @@
- (2026-04-15) Coordinate bug fixed — y is top-origin in both platforms, no conversion needed
- (2026-04-15) Paul Adobe Template created via API; Company/Title fields require manual UI fix (API limitation)
- (2026-04-17) v2 planning complete — idempotent upload + web UI implementation begins
- (2026-04-21) Blueprint comparison complete — added normalized schema, validation service, migration options, rate-limit/retry, security hardening, and batch migration phases (Phases 8–13)
- (2026-04-21) Phases 8–13 fully implemented — 108/108 tests passing on ui-redesign branch

src/models/__init__.py (new, empty file)

src/models/normalized_template.py (new file)

@ -0,0 +1,78 @@
"""
normalized_template.py
-----------------------
Platform-agnostic intermediate schema that decouples Adobe Sign extraction
from DocuSign composition. Both platforms' data is converted to/from this
model so neither side is tightly coupled.
"""
from __future__ import annotations
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
class ActionType(str, Enum):
SIGN = "SIGN"
APPROVE = "APPROVE"
CC = "CC"
ACKNOWLEDGE = "ACKNOWLEDGE"
class NormalizedRole(BaseModel):
name: str
order: int
action_type: ActionType = ActionType.SIGN
class NormalizedField(BaseModel):
"""One form field in the normalized intermediate representation."""
type: str # e.g. "signature", "text", "checkbox"
label: str
page: int
x: float
y: float
width: float
height: float
required: bool = False
read_only: bool = False
role_name: str = "" # which role this field belongs to
options: list[str] = Field(default_factory=list) # for dropdown/radio
validation: str = "" # e.g. "DATE", "NUMBER"
content_type: str = "" # e.g. "SIGNATURE_DATE", "SIGNER_NAME"
conditional_parent_label: Optional[str] = None
conditional_parent_value: Optional[str] = None
raw: dict[str, Any] = Field(default_factory=dict) # original source data
class NormalizedDocument(BaseModel):
name: str
content_base64: str = "" # base64-encoded PDF bytes
checksum_sha256: str = "" # SHA-256 hex of raw bytes before encoding
source_path: str = ""
class NormalizedTemplate(BaseModel):
"""
Platform-agnostic representation of an eSignature template.
Used as the bridge between Adobe Sign and DocuSign.
"""
name: str
description: str = ""
email_subject: str = ""
email_message: str = ""
roles: list[NormalizedRole] = Field(default_factory=list)
documents: list[NormalizedDocument] = Field(default_factory=list)
fields: list[NormalizedField] = Field(default_factory=list)
reminder_enabled: bool = False
expiration_days: Optional[int] = None
source_id: str = "" # original Adobe Sign template ID
unsupported_features: list[str] = Field(default_factory=list)
def role_names(self) -> list[str]:
return [r.name for r in self.roles]
def fields_for_role(self, role_name: str) -> list[NormalizedField]:
return [f for f in self.fields if f.role_name == role_name]

src/reports/__init__.py (new, empty file)

src/reports/report_builder.py (new file)

@ -0,0 +1,134 @@
"""
report_builder.py
-----------------
Builds structured migration reports per template and for batch runs.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
class MigrationStatus(str, Enum):
SUCCESS = "success"
SUCCESS_WITH_WARNINGS = "success_with_warnings"
SKIPPED = "skipped"
BLOCKED = "blocked"
ERROR = "error"
@dataclass
class TemplateReport:
template_name: str
source_id: str
status: MigrationStatus
docusign_template_id: str = ""
blockers: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
error: str = ""
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
dry_run: bool = False
def to_dict(self) -> dict:
return {
"template_name": self.template_name,
"source_id": self.source_id,
"status": self.status.value,
"docusign_template_id": self.docusign_template_id,
"blockers": self.blockers,
"warnings": self.warnings,
"error": self.error,
"timestamp": self.timestamp,
"dry_run": self.dry_run,
}
@dataclass
class MigrationReport:
reports: list[TemplateReport] = field(default_factory=list)
def add(self, report: TemplateReport) -> None:
self.reports.append(report)
def summary(self) -> dict:
counts: dict[str, int] = {}
for r in self.reports:
counts[r.status.value] = counts.get(r.status.value, 0) + 1
return {
"total": len(self.reports),
**counts,
}
def to_dict(self) -> dict:
return {
"summary": self.summary(),
"templates": [r.to_dict() for r in self.reports],
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent)
def has_errors(self) -> bool:
return any(r.status in (MigrationStatus.BLOCKED, MigrationStatus.ERROR) for r in self.reports)
def build_success_report(
template_name: str,
source_id: str,
docusign_template_id: str,
warnings: list[str],
dry_run: bool = False,
) -> TemplateReport:
status = MigrationStatus.SUCCESS_WITH_WARNINGS if warnings else MigrationStatus.SUCCESS
return TemplateReport(
template_name=template_name,
source_id=source_id,
status=status,
docusign_template_id=docusign_template_id,
warnings=warnings,
dry_run=dry_run,
)
def build_blocked_report(
template_name: str,
source_id: str,
blockers: list[str],
warnings: list[str],
dry_run: bool = False,
) -> TemplateReport:
return TemplateReport(
template_name=template_name,
source_id=source_id,
status=MigrationStatus.BLOCKED,
blockers=blockers,
warnings=warnings,
dry_run=dry_run,
)
def build_error_report(
template_name: str,
source_id: str,
error: str,
dry_run: bool = False,
) -> TemplateReport:
return TemplateReport(
template_name=template_name,
source_id=source_id,
status=MigrationStatus.ERROR,
error=error,
dry_run=dry_run,
)
def build_skipped_report(template_name: str, source_id: str, reason: str) -> TemplateReport:
return TemplateReport(
template_name=template_name,
source_id=source_id,
status=MigrationStatus.SKIPPED,
warnings=[f"Skipped: {reason}"],
)

src/services/__init__.py (new, empty file)

src/services/mapping_service.py (new file)

@ -0,0 +1,275 @@
"""
mapping_service.py
-------------------
Converts a downloaded Adobe Sign template folder into a NormalizedTemplate.
Extracted from compose_docusign_template.py so the normalization step is
decoupled from DocuSign-specific composition.
"""
from __future__ import annotations
import hashlib
import base64
import json
from pathlib import Path
from src.models.normalized_template import (
ActionType,
NormalizedDocument,
NormalizedField,
NormalizedRole,
NormalizedTemplate,
)
MIN_TEXT_WIDTH = 120
# ---------------------------------------------------------------------------
# Adobe Sign → Normalized
# ---------------------------------------------------------------------------
_ROLE_ACTION_MAP = {
"SIGNER": ActionType.SIGN,
"SIGN": ActionType.SIGN,
"APPROVER": ActionType.APPROVE,
"APPROVE": ActionType.APPROVE,
"CC": ActionType.CC,
"SHARE": ActionType.CC,
"ACKNOWLEDGE": ActionType.ACKNOWLEDGE,
}
_UNSUPPORTED_FEATURES = [
("conditionalAction", "action", "HIDE", "Conditional HIDE actions"),
("inputType", None, "INLINE_IMAGE", "INLINE_IMAGE fields (no DocuSign equivalent)"),
("inputType", None, "PARTICIPATION_STAMP", "PARTICIPATION_STAMP fields (no DocuSign equivalent)"),
]
_UNSUPPORTED_INPUT_TYPES = {"INLINE_IMAGE", "PARTICIPATION_STAMP"}
def _detect_unsupported(fields: list[dict], metadata: dict) -> list[str]:
"""Return human-readable strings for features that cannot be fully migrated."""
found: list[str] = []
seen: set[str] = set()
def _add(msg: str):
if msg not in seen:
seen.add(msg)
found.append(msg)
for f in fields:
input_type = f.get("inputType", "")
if input_type in _UNSUPPORTED_INPUT_TYPES:
_add(f"Unsupported field type: {input_type}")
ca = f.get("conditionalAction", {})
if ca.get("action") == "HIDE":
_add("Conditional HIDE action (not supported in DocuSign)")
preds = ca.get("predicates", [])
for p in preds:
if p.get("operator") not in ("EQUALS", None, ""):
_add(f"Non-EQUALS conditional operator: {p.get('operator')} (only EQUALS supported)")
if p.get("operator") == "EQUALS":
break # first EQUALS is handled, only note if there are more
if len(preds) > 1:
_add("Multi-predicate conditional logic (only first EQUALS predicate is mapped)")
if f.get("inputType") == "STAMP":
_add("STAMP fields (require stamp feature enabled on DocuSign account)")
# Check for webhook / workflow triggers in metadata
if metadata.get("workflowId") or metadata.get("externalId"):
_add("Workflow / webhook associations (require manual recreation)")
return found
def _derive_roles(fields: list[dict], participant_sets: list[dict] | None = None) -> list[NormalizedRole]:
"""
Build ordered NormalizedRole list from participant_sets if available,
otherwise derive from field assignees.
"""
if participant_sets:
roles = []
for ps in sorted(participant_sets, key=lambda p: p.get("order", 0)):
name = ps.get("name") or f"Role {ps.get('order', 1)}"
order = ps.get("order", 1)
action_raw = (ps.get("role") or "SIGN").upper()
action = _ROLE_ACTION_MAP.get(action_raw, ActionType.SIGN)
roles.append(NormalizedRole(name=name, order=order, action_type=action))
if roles:
return roles
# Fall back: derive from field assignees
seen: dict[str, int] = {}
for f in fields:
assignee = f.get("assignee") or f"recipient{max(f.get('signerIndex', 0), 0)}"
if assignee not in seen:
try:
idx = int(assignee.replace("recipient", ""))
except ValueError:
idx = len(seen)
seen[assignee] = idx
if not seen:
return [NormalizedRole(name="Signer 1", order=1)]
return [
NormalizedRole(name=f"Signer {v + 1}", order=v + 1)
for _, v in sorted(seen.items(), key=lambda kv: kv[1])
]
def _assignee_to_role(assignee: str | None, roles: list[NormalizedRole]) -> str:
"""Map an Adobe field assignee string (e.g. 'recipient0') to a role name."""
if not assignee:
return roles[0].name if roles else "Signer 1"
try:
idx = int(assignee.replace("recipient", ""))
except ValueError:
return roles[0].name if roles else "Signer 1"
# roles are ordered 1-based
match = next((r for r in roles if r.order == idx + 1), None)
return match.name if match else (roles[0].name if roles else "Signer 1")
def _normalize_field(f: dict, role_name: str, warnings: list[str]) -> NormalizedField | None:
"""Convert a single Adobe Sign field dict to NormalizedField."""
input_type = f.get("inputType", "")
label = f.get("name", "unnamed")
locations = f.get("locations", [])
if not locations:
return None
loc = locations[0]
x = float(loc.get("left", 0))
y = float(loc.get("top", 0))
width = float(max(loc.get("width", MIN_TEXT_WIDTH), MIN_TEXT_WIDTH))
height = float(loc.get("height", 24))
page = int(loc.get("pageNumber", 1))
content_type = f.get("contentType", "")
validation = f.get("validation", "")
# Map Adobe input type to normalized type
type_map = {
"SIGNATURE": "signature",
"CHECKBOX": "checkbox",
"DROP_DOWN": "dropdown",
"RADIO": "radio",
"FILE_CHOOSER": "attachment",
"STAMP": "stamp",
"INLINE_IMAGE": "inline_image",
"PARTICIPATION_STAMP": "participation_stamp",
}
if input_type == "BLOCK" and content_type == "SIGNATURE_BLOCK":
norm_type = "signature"
elif input_type == "TEXT_FIELD":
norm_type = "text"
else:
norm_type = type_map.get(input_type, input_type.lower())
# Conditional logic
parent_label = None
parent_value = None
ca = f.get("conditionalAction", {})
predicates = ca.get("predicates", [])
if predicates and ca.get("action") == "SHOW":
pred = next((p for p in predicates if p.get("operator") == "EQUALS"), None)
if pred:
parent_label = pred.get("fieldName")
parent_value = pred.get("value")
options: list[str] = []
if input_type in ("DROP_DOWN", "RADIO"):
options = (f.get("hiddenOptions") or f.get("visibleOptions") or [])
return NormalizedField(
type=norm_type,
label=label,
page=page,
x=x,
y=y,
width=width,
height=height,
required=bool(f.get("required", False)),
read_only=bool(f.get("readOnly", False)),
role_name=role_name,
options=options,
validation=validation,
content_type=content_type,
conditional_parent_label=parent_label,
conditional_parent_value=parent_value,
raw=f,
)
def adobe_folder_to_normalized(
template_dir: str,
include_documents: bool = True,
) -> tuple[NormalizedTemplate, list[str]]:
"""
Build a NormalizedTemplate from a downloaded Adobe Sign template folder.
Args:
template_dir: path to downloads/<template-name>/ with metadata.json,
form_fields.json, documents.json, and a PDF.
include_documents: whether to embed PDF bytes.
Returns:
(NormalizedTemplate, warnings_list)
"""
template_dir = Path(template_dir)
warnings: list[str] = []
metadata = json.loads((template_dir / "metadata.json").read_text())
fields_data = json.loads((template_dir / "form_fields.json").read_text())
documents_data = json.loads((template_dir / "documents.json").read_text())
fields: list[dict] = fields_data.get("fields", [])
participant_sets = metadata.get("participantSetsInfo", None)
roles = _derive_roles(fields, participant_sets)
# Build normalized fields
normalized_fields: list[NormalizedField] = []
for f in fields:
assignee = f.get("assignee") or f"recipient{max(f.get('signerIndex', 0), 0)}"
role_name = _assignee_to_role(assignee, roles)
nf = _normalize_field(f, role_name, warnings)
if nf:
normalized_fields.append(nf)
# Document
pdf_files = [f for f in template_dir.iterdir() if f.is_file() and f.suffix.lower() == ".pdf"]
doc_info = (documents_data.get("documents") or [{}])[0]  # tolerate a missing or empty documents list
doc_name = doc_info.get("name", "")
normalized_docs: list[NormalizedDocument] = []
if pdf_files:
pdf_path = pdf_files[0]
if not doc_name.lower().endswith(".pdf"):
doc_name = Path(doc_name).stem + ".pdf" if doc_name else pdf_path.name
pdf_bytes = pdf_path.read_bytes()
checksum = hashlib.sha256(pdf_bytes).hexdigest()
content_b64 = base64.b64encode(pdf_bytes).decode() if include_documents else ""
normalized_docs.append(NormalizedDocument(
name=doc_name,
content_base64=content_b64,
checksum_sha256=checksum,
source_path=str(pdf_path),
))
unsupported = _detect_unsupported(fields, metadata)
return NormalizedTemplate(
name=metadata.get("name", template_dir.name),
description=f"Migrated from Adobe Sign — original owner: {metadata.get('ownerEmail', '')}",
email_subject=metadata.get("emailSubject") or f"Please sign: {metadata.get('name', '')}",
email_message=metadata.get("message", ""),
roles=roles,
documents=normalized_docs,
fields=normalized_fields,
source_id=metadata.get("id", ""),
unsupported_features=unsupported,
), warnings
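The document handling above always computes a SHA-256 checksum and only embeds base64 content when `include_documents` is true. A standalone sketch of those two derivations (the sample bytes are invented, not a real fixture):

```python
import base64
import hashlib

# Stand-in for a downloaded template PDF (hypothetical content).
pdf_bytes = b"%PDF-1.4 sample content"

# Mirrors NormalizedDocument construction: the checksum is always computed,
# while base64 content is only embedded when include_documents=True.
checksum = hashlib.sha256(pdf_bytes).hexdigest()
content_b64 = base64.b64encode(pdf_bytes).decode()

print(len(checksum))  # 64 hex characters
```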


@ -0,0 +1,133 @@
"""
validation_service.py
---------------------
Pre/post migration checks. Returns a ValidationResult with blockers
(which halt migration) and warnings (which are logged but don't block).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from src.models.normalized_template import NormalizedTemplate
@dataclass
class ValidationResult:
blockers: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def has_blockers(self) -> bool:
return bool(self.blockers)
def is_ok(self) -> bool:
return not self.has_blockers()
def all_issues(self) -> list[str]:
return [f"BLOCKER: {b}" for b in self.blockers] + [f"WARNING: {w}" for w in self.warnings]
def validate_template(normalized: NormalizedTemplate) -> ValidationResult:
"""
Run all pre-migration checks on a NormalizedTemplate.
Returns a ValidationResult with blockers and warnings.
"""
result = ValidationResult()
_check_recipients(normalized, result)
_check_fields(normalized, result)
_check_role_assignments(normalized, result)
_check_documents(normalized, result)
_flag_unsupported(normalized, result)
return result
def _check_recipients(t: NormalizedTemplate, r: ValidationResult) -> None:
if not t.roles:
r.blockers.append("No recipients/roles defined — template cannot be migrated")
return
orders = [role.order for role in t.roles]
if len(orders) != len(set(orders)):
r.warnings.append("Duplicate routing orders detected in recipient roles")
expected = list(range(1, len(orders) + 1))
if sorted(orders) != expected:
r.warnings.append(
f"Non-sequential routing order: {sorted(orders)} — DocuSign expects {expected}"
)
def _check_fields(t: NormalizedTemplate, r: ValidationResult) -> None:
if not t.fields:
r.warnings.append("Template has 0 fields — the resulting DocuSign template will be empty")
return
sig_fields = [f for f in t.fields if f.type in ("signature", "initial")]
if not sig_fields:
r.warnings.append("No signature or initial fields found — signers will have nothing to sign")
def _check_role_assignments(t: NormalizedTemplate, r: ValidationResult) -> None:
role_names = {role.name for role in t.roles}
unassigned = [f.label for f in t.fields if f.role_name not in role_names]
if unassigned:
r.warnings.append(
f"{len(unassigned)} field(s) have role assignments that don't match any recipient: "
f"{unassigned[:5]}{'...' if len(unassigned) > 5 else ''}"
)
def _check_documents(t: NormalizedTemplate, r: ValidationResult) -> None:
if not t.documents:
r.blockers.append("No documents attached — at least one PDF is required")
return
for doc in t.documents:
if not doc.content_base64 and not doc.source_path:
r.warnings.append(f"Document '{doc.name}' has no content and no source path")
def _flag_unsupported(t: NormalizedTemplate, r: ValidationResult) -> None:
for feature in t.unsupported_features:
r.warnings.append(f"Unsupported feature (manual review needed): {feature}")
def compare_field_counts(
normalized: NormalizedTemplate,
docusign_template: dict,
) -> ValidationResult:
"""
Post-migration check: compare field count in NormalizedTemplate vs the
uploaded DocuSign template payload.
"""
result = ValidationResult()
expected = len(normalized.fields)
# Count tabs across all signers in the DS template payload
actual = 0
for signer in docusign_template.get("recipients", {}).get("signers", []):
tabs = signer.get("tabs", {})
for tab_list in tabs.values():
actual += len(tab_list)
if actual == 0 and expected > 0:
result.warnings.append(
f"DocuSign template has 0 tabs but {expected} fields were in the source"
)
elif actual != expected:
result.warnings.append(
f"Field count mismatch: normalized={expected}, DocuSign tabs={actual} "
f"(some field types may expand or collapse during mapping)"
)
# Compare recipient counts
expected_roles = len(normalized.roles)
actual_signers = len(docusign_template.get("recipients", {}).get("signers", []))
if expected_roles != actual_signers:
result.warnings.append(
f"Recipient count mismatch: normalized={expected_roles}, DocuSign signers={actual_signers}"
)
return result
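`compare_field_counts` totals tabs by walking every signer's tab lists. That counting loop, run standalone against an invented DocuSign-style payload:

```python
# Invented payload shaped like the recipients block of a DocuSign template.
docusign_template = {
    "recipients": {
        "signers": [
            {"tabs": {"signHereTabs": [{}], "textTabs": [{}, {}]}},
            {"tabs": {"checkboxTabs": [{}]}},
        ]
    }
}

# Same loop as compare_field_counts: sum the length of every tab list.
actual = 0
for signer in docusign_template.get("recipients", {}).get("signers", []):
    for tab_list in signer.get("tabs", {}).values():
        actual += len(tab_list)

print(actual)  # 4
```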

src/utils/__init__.py Normal file


@ -0,0 +1,98 @@
"""
log_sanitizer.py
----------------
Redacts secrets (tokens, keys, passwords) from log output so credentials
never appear in logs, stdout, or audit records.
"""
from __future__ import annotations
import logging
import re
from typing import Any
_REDACTED = "[REDACTED]"
# Patterns where group(1) is a safe label prefix and the rest is the secret.
# Result: group(1) + "[REDACTED]"
_LABEL_PATTERNS = [
# "Bearer <token>"
re.compile(r"(Bearer\s+)[A-Za-z0-9\-._~+/=]{8,}", re.IGNORECASE),
# key=value assignments for known secret keys
re.compile(
r"""((?:api[_\-]?key|access[_\-]?token|refresh[_\-]?token|client[_\-]?secret|password|private[_\-]?key|authorization)\s*[=:]\s*)["']?[A-Za-z0-9\-._~+/=!@#$%^&*]{6,}["']?""",
re.IGNORECASE,
),
]
# Patterns that fully match a secret — the entire match is replaced.
_FULL_SECRET_PATTERNS = [
# JWT-style tokens (three base64url segments separated by dots)
re.compile(r"\b[A-Za-z0-9\-_]{10,}\.[A-Za-z0-9\-_]{10,}\.[A-Za-z0-9\-_]{10,}\b"),
# Long base64 content (>500 chars) — PDF payloads, encoded keys, etc.
re.compile(r"[A-Za-z0-9+/]{500,}={0,2}"),
]
def redact(text: str) -> str:
"""Replace known secret patterns in *text* with [REDACTED]."""
for pattern in _LABEL_PATTERNS:
text = pattern.sub(lambda m: m.group(1) + _REDACTED, text)
for pattern in _FULL_SECRET_PATTERNS:
text = pattern.sub(_REDACTED, text)
return text
def redact_dict(data: dict, depth: int = 0) -> dict:
"""Recursively redact secret values in a dict (for logging structured data)."""
if depth > 10:
return data
_SECRET_KEYS = {
"access_token", "refresh_token", "token", "secret", "password",
"authorization", "api_key", "private_key", "client_secret",
"documentbase64",
}
result = {}
for k, v in data.items():
if k.lower().replace("-", "_") in _SECRET_KEYS:
result[k] = _REDACTED
elif isinstance(v, dict):
result[k] = redact_dict(v, depth + 1)
elif isinstance(v, list):
result[k] = [redact_dict(i, depth + 1) if isinstance(i, dict) else i for i in v]
elif isinstance(v, str) and len(v) > 100:
result[k] = redact(v)
else:
result[k] = v
return result
class SanitizingFilter(logging.Filter):
"""
A logging.Filter that runs redact() on every log record's message.
Attach to any logger or handler to ensure secrets never hit log output.
Usage:
logging.root.addFilter(SanitizingFilter())
"""
def filter(self, record: logging.LogRecord) -> bool:
try:
record.msg = redact(str(record.msg))
if record.args:
if isinstance(record.args, dict):
record.args = {k: redact(str(v)) for k, v in record.args.items()}
else:
record.args = tuple(redact(str(a)) for a in record.args)
except Exception:
pass
return True
def install_sanitizing_filter() -> None:
"""Install the SanitizingFilter on the root logger (idempotent)."""
root = logging.getLogger()
for existing in root.filters:
if isinstance(existing, SanitizingFilter):
return
root.addFilter(SanitizingFilter())
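As a quick standalone check of the label-prefix approach, here is just the Bearer pattern from `_LABEL_PATTERNS` applied to a fake token (the token value is made up):

```python
import re

_REDACTED = "[REDACTED]"
# Bearer pattern as defined above: keep the "Bearer " label, redact the token.
bearer = re.compile(r"(Bearer\s+)[A-Za-z0-9\-._~+/=]{8,}", re.IGNORECASE)

line = "Authorization: Bearer abc123def456.ghi789"
clean = bearer.sub(lambda m: m.group(1) + _REDACTED, line)
print(clean)  # Authorization: Bearer [REDACTED]
```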

src/utils/retry.py Normal file

@ -0,0 +1,102 @@
"""
retry.py
--------
Exponential backoff retry helpers for API calls that may hit rate limits
or transient server errors (429, 502, 503, 504).
"""
from __future__ import annotations
import asyncio
import functools
import logging
import time
from typing import Callable, TypeVar
logger = logging.getLogger(__name__)
T = TypeVar("T")
# HTTP status codes that are safe to retry
_RETRYABLE_STATUS = {429, 500, 502, 503, 504}
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable_exceptions: tuple = (Exception,),
):
"""
Decorator for sync functions. Retries on exceptions with exponential backoff.
Usage:
@retry_with_backoff(max_retries=3, base_delay=1.0)
def my_api_call():
...
"""
def decorator(fn: Callable) -> Callable:
@functools.wraps(fn)
def wrapper(*args, **kwargs):
last_exc: Exception | None = None
for attempt in range(max_retries + 1):
try:
return fn(*args, **kwargs)
except retryable_exceptions as exc:
last_exc = exc
if attempt == max_retries:
break
delay = min(base_delay * (2 ** attempt), max_delay)
logger.warning(
"Retry %d/%d for %s after %.1fs — %s",
attempt + 1, max_retries, fn.__name__, delay, exc,
)
time.sleep(delay)
raise last_exc
return wrapper
return decorator
def async_retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
retryable_exceptions: tuple = (Exception,),
):
"""
Decorator for async functions. Retries on exceptions with exponential backoff.
Usage:
@async_retry_with_backoff(max_retries=3, base_delay=1.0)
async def my_api_call():
...
"""
def decorator(fn: Callable) -> Callable:
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
last_exc: Exception | None = None
for attempt in range(max_retries + 1):
try:
return await fn(*args, **kwargs)
except retryable_exceptions as exc:
last_exc = exc
if attempt == max_retries:
break
delay = min(base_delay * (2 ** attempt), max_delay)
logger.warning(
"Async retry %d/%d for %s after %.1fs — %s",
attempt + 1, max_retries, fn.__name__, delay, exc,
)
await asyncio.sleep(delay)
raise last_exc
return wrapper
return decorator
class RateLimitError(Exception):
"""Raised when an API returns HTTP 429 Too Many Requests."""
def check_response_retryable(status_code: int) -> bool:
"""Return True if the HTTP status code warrants a retry."""
return status_code in _RETRYABLE_STATUS
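The delay schedule both decorators produce is `min(base_delay * 2 ** attempt, max_delay)`. With the defaults shown above (base 1.0s, cap 30.0s), the first six waits look like this:

```python
base_delay, max_delay = 1.0, 30.0

# One delay per failed attempt; doubles each time until the cap kicks in.
delays = [min(base_delay * (2 ** attempt), max_delay) for attempt in range(6)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```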


@ -142,7 +142,8 @@ def test_migrate_single_template_updates():
        ):
            resp = client.post(
                "/api/migrate",
-               json={"adobe_template_ids": [ADOBE_ID]},
+               # overwrite_if_exists=True so the existing template is updated, not skipped
+               json={"adobe_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": True}},
                cookies={_COOKIE_NAME: _full_session()},
            )


@ -0,0 +1,155 @@
"""
Tests for Phase 13: batch migration API.
"""
import asyncio
import json
import os
from unittest.mock import patch
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
from web.session import _serializer, _COOKIE_NAME
import web.routers.migrate as migrate_module
client = TestClient(app, raise_server_exceptions=True)
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "test-account-id"
TEMPLATE_NAME = "Batch Test Template"
DS_NEW_ID = "ds-batch-new-001"
def _full_session():
return _serializer.dumps({
"adobe_access_token": "adobe-tok",
"docusign_access_token": "ds-tok",
})
@pytest.fixture(autouse=True)
def patch_settings(monkeypatch):
import web.config as cfg
monkeypatch.setattr(cfg.settings, "docusign_account_id", DS_ACCOUNT)
monkeypatch.setattr(cfg.settings, "docusign_base_url", DS_BASE)
monkeypatch.setattr(cfg.settings, "adobe_sign_base_url", ADOBE_BASE)
@pytest.fixture(autouse=True)
def temp_history(tmp_path, monkeypatch):
history_path = str(tmp_path / ".history.json")
monkeypatch.setattr(migrate_module, "_HISTORY_FILE", history_path)
return history_path
@pytest.fixture(autouse=True)
def clear_batch_jobs():
"""Clear in-memory batch jobs between tests."""
migrate_module._batch_jobs.clear()
yield
migrate_module._batch_jobs.clear()
def _async_wrap(sync_fn):
async def wrapper(*args, **kwargs):
return sync_fn(*args, **kwargs)
return wrapper
def _mock_download(template_id, access_token, output_dir):
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "metadata.json"), "w") as f:
json.dump({"name": f"Template {template_id}", "id": template_id}, f)
with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
json.dump({"fields": []}, f)
with open(os.path.join(output_dir, "documents.json"), "w") as f:
json.dump({"documents": []}, f)
return True
def _mock_compose(template_dir, output_path):
with open(output_path, "w") as f:
json.dump({"name": TEMPLATE_NAME}, f)
def _mock_validation_ok(download_dir):
return {"blockers": [], "warnings": [], "has_blockers": False}
class TestBatchMigrationPost:
def test_batch_requires_auth(self):
resp = client.post("/api/migrate/batch", json={"source_template_ids": ["id1"]}, cookies={})
assert resp.status_code == 401
def test_batch_no_ids_returns_400(self):
resp = client.post(
"/api/migrate/batch",
json={},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 400
@respx.mock
def test_batch_returns_job_id(self):
"""POST /api/migrate/batch returns a job_id immediately."""
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate/batch",
json={"source_template_ids": ["id1", "id2"]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
body = resp.json()
assert "job_id" in body
assert body["total"] == 2
assert body["status"] == "queued"
@respx.mock
def test_batch_job_status_endpoint(self):
"""GET /api/migrate/batch/{id} returns job state."""
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
post_resp = client.post(
"/api/migrate/batch",
json={"source_template_ids": ["id1"]},
cookies={_COOKIE_NAME: _full_session()},
)
job_id = post_resp.json()["job_id"]
get_resp = client.get(f"/api/migrate/batch/{job_id}")
assert get_resp.status_code == 200
assert get_resp.json()["job_id"] == job_id
def test_batch_unknown_job_returns_404(self):
resp = client.get("/api/migrate/batch/nonexistent-job-id")
assert resp.status_code == 404
@respx.mock
def test_batch_dry_run_option(self):
"""Dry run in batch: no uploads, all results are dry_run."""
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate/batch",
json={"source_template_ids": ["id1"], "options": {"dry_run": True}},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
assert resp.json()["status"] == "queued"


@ -175,7 +175,8 @@ def test_full_migration_flow(temp_history):
        ):
            migrate_resp2 = test_client.post(
                "/api/migrate",
-               json={"adobe_template_ids": [ADOBE_ID]},
+               # overwrite_if_exists=True so the second run updates the existing template
+               json={"adobe_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": True}},
                cookies={_COOKIE_NAME: session_cookie},
            )


@ -0,0 +1,234 @@
"""
Tests for Phase 10: migration options (dryRun, overwriteIfExists, includeDocuments).
"""
import json
import os
from unittest.mock import patch
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
from web.session import _serializer, _COOKIE_NAME
import web.routers.migrate as migrate_module
client = TestClient(app, raise_server_exceptions=True)
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "test-account-id"
TEMPLATE_NAME = "Options Test Template"
ADOBE_ID = "opt-adobe-001"
DS_EXISTING_ID = "ds-existing-opt-001"
DS_NEW_ID = "ds-new-opt-001"
def _full_session():
return _serializer.dumps({
"adobe_access_token": "adobe-tok",
"docusign_access_token": "ds-tok",
})
@pytest.fixture(autouse=True)
def patch_settings(monkeypatch):
import web.config as cfg
monkeypatch.setattr(cfg.settings, "docusign_account_id", DS_ACCOUNT)
monkeypatch.setattr(cfg.settings, "docusign_base_url", DS_BASE)
monkeypatch.setattr(cfg.settings, "adobe_sign_base_url", ADOBE_BASE)
@pytest.fixture(autouse=True)
def temp_history(tmp_path, monkeypatch):
history_path = str(tmp_path / ".history.json")
monkeypatch.setattr(migrate_module, "_HISTORY_FILE", history_path)
return history_path
def _async_wrap(sync_fn):
async def wrapper(*args, **kwargs):
return sync_fn(*args, **kwargs)
return wrapper
def _mock_download(template_id, access_token, output_dir):
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "metadata.json"), "w") as f:
json.dump({"name": TEMPLATE_NAME, "id": template_id}, f)
with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
json.dump({"fields": []}, f)
with open(os.path.join(output_dir, "documents.json"), "w") as f:
json.dump({"documents": []}, f)
return True
def _mock_compose(template_dir: str, output_path: str):
with open(output_path, "w") as f:
json.dump({"name": TEMPLATE_NAME, "description": "mocked"}, f)
def _mock_validation_ok(download_dir):
return {"blockers": [], "warnings": [], "has_blockers": False}
class TestDryRun:
@respx.mock
def test_dry_run_does_not_upload(self):
"""dry_run=True: compose succeeds but no POST/PUT to DocuSign."""
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate",
json={
"source_template_ids": [ADOBE_ID],
"options": {"dry_run": True},
},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
results = resp.json()["results"]
assert results[0]["status"] == "dry_run"
assert results[0]["action"] == "dry_run"
assert results[0]["docusign_template_id"] is None
assert results[0]["dry_run"] is True
@respx.mock
def test_dry_run_false_does_upload(self):
"""dry_run=False (default): upload proceeds."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate",
json={"source_template_ids": [ADOBE_ID], "options": {"dry_run": False}},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
assert resp.json()["results"][0]["status"] == "success"
class TestOverwriteIfExists:
@respx.mock
def test_skip_when_overwrite_false(self):
"""overwrite_if_exists=False + existing template → skipped."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": DS_EXISTING_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-10T00:00:00Z"}
]
})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate",
json={"source_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": False}},
cookies={_COOKIE_NAME: _full_session()},
)
results = resp.json()["results"]
assert results[0]["status"] == "skipped"
assert results[0]["docusign_template_id"] == DS_EXISTING_ID
@respx.mock
def test_overwrite_when_true(self):
"""overwrite_if_exists=True + existing template → PUT update."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": DS_EXISTING_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-10T00:00:00Z"}
]
})
)
respx.put(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates/{DS_EXISTING_ID}").mock(
return_value=httpx.Response(200, json={})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate",
json={"source_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": True}},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.json()["results"][0]["action"] == "updated"
class TestSourceTemplateIds:
@respx.mock
def test_source_template_ids_field(self):
"""source_template_ids (new field) works correctly."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
):
resp = client.post(
"/api/migrate",
json={"source_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
assert resp.json()["results"][0]["status"] == "success"
def test_no_ids_returns_400(self):
resp = client.post(
"/api/migrate",
json={},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 400
class TestValidationBlocking:
def test_blocked_template_not_uploaded(self):
"""Template with validation blockers → status=blocked, no upload."""
def _mock_validation_blocked(download_dir):
return {
"blockers": ["No documents attached"],
"warnings": [],
"has_blockers": True,
}
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_blocked),
):
resp = client.post(
"/api/migrate",
json={"source_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
result = resp.json()["results"][0]
assert result["status"] == "blocked"
assert "No documents" in result["error"]


@ -0,0 +1,139 @@
"""
Tests for Phase 8: normalized intermediate schema and mapping service.
"""
import json
from pathlib import Path
import pytest
from src.models.normalized_template import (
ActionType,
NormalizedDocument,
NormalizedField,
NormalizedRole,
NormalizedTemplate,
)
from src.services.mapping_service import adobe_folder_to_normalized
DOWNLOADS = Path(__file__).parent.parent / "downloads"
DAVID_DIR = DOWNLOADS / "David Tag Demo Form__CBJCHBCA"
NDA_DIR = DOWNLOADS / "_DEMO USE ONLY_ NDA__CBJCHBCA"
ROB_DIR = DOWNLOADS / "Rob Test__CBJCHBCA"
# ---------------------------------------------------------------------------
# Model construction
# ---------------------------------------------------------------------------
class TestNormalizedModels:
def test_normalized_role_defaults(self):
r = NormalizedRole(name="Customer", order=1)
assert r.action_type == ActionType.SIGN
assert r.order == 1
def test_normalized_field_defaults(self):
f = NormalizedField(type="text", label="Name", page=1, x=10, y=20, width=120, height=24)
assert f.required is False
assert f.read_only is False
assert f.options == []
assert f.conditional_parent_label is None
def test_normalized_template_construction(self):
t = NormalizedTemplate(
name="My Template",
roles=[NormalizedRole(name="Signer 1", order=1)],
fields=[
NormalizedField(type="signature", label="sig1", page=1, x=0, y=0, width=140, height=28)
],
)
assert t.name == "My Template"
assert len(t.roles) == 1
assert len(t.fields) == 1
def test_role_names(self):
t = NormalizedTemplate(
name="T",
roles=[
NormalizedRole(name="Customer", order=1),
NormalizedRole(name="Company", order=2),
],
)
assert t.role_names() == ["Customer", "Company"]
def test_fields_for_role(self):
t = NormalizedTemplate(
name="T",
roles=[NormalizedRole(name="Signer 1", order=1)],
fields=[
NormalizedField(type="signature", label="s1", page=1, x=0, y=0, width=140, height=28, role_name="Signer 1"),
NormalizedField(type="text", label="name", page=1, x=0, y=50, width=120, height=24, role_name="Signer 2"),
],
)
assert len(t.fields_for_role("Signer 1")) == 1
assert len(t.fields_for_role("Signer 2")) == 1
assert len(t.fields_for_role("Nobody")) == 0
def test_normalized_document_checksum(self):
doc = NormalizedDocument(
name="test.pdf",
content_base64="dGVzdA==",
checksum_sha256="9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
)
assert doc.checksum_sha256 != ""
def test_serialization_roundtrip(self):
t = NormalizedTemplate(
name="Round Trip",
roles=[NormalizedRole(name="Signer 1", order=1)],
)
dumped = t.model_dump()
restored = NormalizedTemplate(**dumped)
assert restored.name == t.name
assert len(restored.roles) == 1
# ---------------------------------------------------------------------------
# Mapping service — requires real download fixtures
# ---------------------------------------------------------------------------
@pytest.mark.skipif(not DAVID_DIR.exists(), reason="Downloads fixtures not present")
class TestMappingService:
def test_david_template_normalizes(self):
norm, warnings = adobe_folder_to_normalized(str(DAVID_DIR))
assert isinstance(norm, NormalizedTemplate)
assert norm.name != ""
assert len(norm.roles) >= 1
assert len(norm.fields) > 0
def test_david_fields_have_roles(self):
norm, _ = adobe_folder_to_normalized(str(DAVID_DIR))
role_names = norm.role_names()
for f in norm.fields:
assert f.role_name in role_names, f"Field '{f.label}' has unresolved role '{f.role_name}'"
def test_david_documents_have_checksum(self):
norm, _ = adobe_folder_to_normalized(str(DAVID_DIR))
assert len(norm.documents) >= 1
for doc in norm.documents:
assert doc.checksum_sha256 != "", f"Document '{doc.name}' missing checksum"
assert len(doc.checksum_sha256) == 64 # SHA-256 hex
def test_exclude_documents_option(self):
norm, _ = adobe_folder_to_normalized(str(DAVID_DIR), include_documents=False)
for doc in norm.documents:
assert doc.content_base64 == ""
# checksum still computed even when content excluded
assert doc.checksum_sha256 != ""
@pytest.mark.skipif(not NDA_DIR.exists(), reason="NDA fixture not present")
def test_nda_template_normalizes(self):
norm, _ = adobe_folder_to_normalized(str(NDA_DIR))
assert norm.name != ""
assert len(norm.fields) > 0
@pytest.mark.skipif(not ROB_DIR.exists(), reason="Rob fixture not present")
def test_rob_template_normalizes(self):
norm, _ = adobe_folder_to_normalized(str(ROB_DIR))
assert norm.name != ""

tests/test_retry.py Normal file

@ -0,0 +1,152 @@
"""
Tests for Phase 11: retry with backoff utility.
"""
import asyncio
import time
from unittest.mock import MagicMock, patch
import pytest
from src.utils.retry import (
RateLimitError,
async_retry_with_backoff,
check_response_retryable,
retry_with_backoff,
)
class TestRetryWithBackoff:
def test_success_on_first_try(self):
call_count = {"n": 0}
@retry_with_backoff(max_retries=3, base_delay=0.01)
def fn():
call_count["n"] += 1
return "ok"
result = fn()
assert result == "ok"
assert call_count["n"] == 1
def test_retries_on_exception(self):
call_count = {"n": 0}
@retry_with_backoff(max_retries=2, base_delay=0.01)
def fn():
call_count["n"] += 1
if call_count["n"] < 3:
raise ConnectionError("transient")
return "ok"
with patch("src.utils.retry.time.sleep"):
result = fn()
assert result == "ok"
assert call_count["n"] == 3
def test_raises_after_max_retries(self):
@retry_with_backoff(max_retries=2, base_delay=0.01)
def fn():
raise ConnectionError("always fails")
with patch("src.utils.retry.time.sleep"):
with pytest.raises(ConnectionError):
fn()
def test_exponential_delay(self):
sleeps = []
@retry_with_backoff(max_retries=3, base_delay=1.0)
def fn():
raise ValueError("fail")
with patch("src.utils.retry.time.sleep", side_effect=lambda d: sleeps.append(d)):
with pytest.raises(ValueError):
fn()
assert len(sleeps) == 3
assert sleeps[0] == 1.0
assert sleeps[1] == 2.0
assert sleeps[2] == 4.0
def test_max_delay_capped(self):
sleeps = []
@retry_with_backoff(max_retries=5, base_delay=10.0, max_delay=15.0)
def fn():
raise ValueError("fail")
with patch("src.utils.retry.time.sleep", side_effect=lambda d: sleeps.append(d)):
with pytest.raises(ValueError):
fn()
assert all(d <= 15.0 for d in sleeps)
def test_only_retries_specified_exceptions(self):
call_count = {"n": 0}
@retry_with_backoff(max_retries=3, base_delay=0.01, retryable_exceptions=(ConnectionError,))
def fn():
call_count["n"] += 1
raise ValueError("not retryable")
with pytest.raises(ValueError):
fn()
assert call_count["n"] == 1 # no retries for ValueError
class TestAsyncRetryWithBackoff:
def test_async_success_on_first_try(self):
call_count = {"n": 0}
@async_retry_with_backoff(max_retries=3, base_delay=0.01)
async def fn():
call_count["n"] += 1
return "ok"
result = asyncio.run(fn())
assert result == "ok"
assert call_count["n"] == 1
def test_async_retries_on_exception(self):
call_count = {"n": 0}
@async_retry_with_backoff(max_retries=2, base_delay=0.01)
async def fn():
call_count["n"] += 1
if call_count["n"] < 3:
raise ConnectionError("transient")
return "ok"
async def _no_sleep(_delay):  # asyncio.coroutine was removed in Python 3.11
pass
with patch("src.utils.retry.asyncio.sleep", new=_no_sleep):
result = asyncio.run(fn())
assert result == "ok"
def test_async_raises_after_max_retries(self):
@async_retry_with_backoff(max_retries=1, base_delay=0.01)
async def fn():
raise ConnectionError("always fails")
async def _no_sleep(_delay):
pass
with patch("src.utils.retry.asyncio.sleep", new=_no_sleep):
with pytest.raises(ConnectionError):
asyncio.run(fn())
class TestCheckResponseRetryable:
def test_429_is_retryable(self):
assert check_response_retryable(429) is True
def test_503_is_retryable(self):
assert check_response_retryable(503) is True
def test_200_not_retryable(self):
assert check_response_retryable(200) is False
def test_400_not_retryable(self):
assert check_response_retryable(400) is False
def test_404_not_retryable(self):
assert check_response_retryable(404) is False
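The decorator these tests exercise lives in `src/utils/retry.py`, which this diff does not show. A minimal sketch consistent with the behaviour asserted above (delay of `base_delay * 2**attempt` capped at `max_delay`, re-raise after `max_retries`, only the listed exception types retried) might look like this — an assumption, not the committed implementation:

```python
import time
from functools import wraps


def check_response_retryable(status_code: int) -> bool:
    # 429 (rate limit) and 5xx (server errors) are worth retrying.
    return status_code == 429 or 500 <= status_code < 600


def retry_with_backoff(max_retries=3, base_delay=1.0, max_delay=60.0,
                       retryable_exceptions=(Exception,)):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except retryable_exceptions:
                    if attempt == max_retries:
                        raise  # budget exhausted: surface the last error
                    # exponential backoff: base, 2*base, 4*base, ... capped
                    time.sleep(min(base_delay * (2 ** attempt), max_delay))
        return wrapper
    return decorator
```

The async variant is the same loop with `async def`/`await asyncio.sleep`.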

tests/test_security.py Normal file

@ -0,0 +1,138 @@
"""
Tests for Phase 12: security log sanitization and audit trail.
"""
import hashlib
import json
import logging
import pytest
from src.utils.log_sanitizer import (
SanitizingFilter,
install_sanitizing_filter,
redact,
redact_dict,
)
class TestRedact:
def test_bearer_token_redacted(self):
text = "Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.abc.def"
result = redact(text)
assert "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9" not in result
assert "[REDACTED]" in result
def test_access_token_assignment_redacted(self):
text = 'access_token: "super_secret_value_12345"'
result = redact(text)
assert "super_secret_value_12345" not in result
assert "[REDACTED]" in result
def test_password_redacted(self):
text = "password=hunter2supersecure"
result = redact(text)
assert "hunter2supersecure" not in result
def test_safe_text_unchanged(self):
text = "Template migrated successfully: NDA v2"
result = redact(text)
assert result == text
def test_long_base64_redacted(self):
# Simulate a long PDF base64 payload being logged
b64 = "A" * 600
result = redact(b64)
assert "A" * 100 not in result
assert "[REDACTED]" in result
def test_short_base64_not_redacted(self):
# Short base64 (e.g. an ID) should not be redacted
short_b64 = "dGVzdA==" # "test" base64
result = redact(short_b64)
assert "dGVzdA" in result
class TestRedactDict:
def test_token_key_redacted(self):
d = {"access_token": "secret123", "name": "My Template"}
result = redact_dict(d)
assert result["access_token"] == "[REDACTED]"
assert result["name"] == "My Template"
def test_nested_dict_redacted(self):
d = {"auth": {"token": "secret123", "user": "alice"}}
result = redact_dict(d)
assert result["auth"]["token"] == "[REDACTED]"
assert result["auth"]["user"] == "alice"
def test_document_base64_redacted(self):
d = {"documentBase64": "A" * 200}
result = redact_dict(d)
assert result["documentBase64"] == "[REDACTED]"
def test_list_of_dicts_redacted(self):
d = {"items": [{"token": "abc123xyz", "id": "1"}]}
result = redact_dict(d)
assert result["items"][0]["token"] == "[REDACTED]"
assert result["items"][0]["id"] == "1"
def test_safe_dict_unchanged(self):
d = {"template_name": "NDA", "status": "success", "count": 3}
result = redact_dict(d)
assert result == d
class TestSanitizingFilter:
def test_filter_redacts_log_message(self):
record = logging.LogRecord(
name="test", level=logging.INFO,
pathname="", lineno=0,
msg="Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.payload.signature",
args=(), exc_info=None,
)
f = SanitizingFilter()
f.filter(record)
assert "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9" not in record.msg
def test_filter_redacts_args(self):
record = logging.LogRecord(
name="test", level=logging.INFO,
pathname="", lineno=0,
msg="Token: %s",
args=("access_token=supersecretvalue123456",),
exc_info=None,
)
f = SanitizingFilter()
f.filter(record)
assert "supersecretvalue123456" not in str(record.args)
def test_install_sanitizing_filter_idempotent(self):
install_sanitizing_filter()
install_sanitizing_filter() # second call should not add duplicate
root = logging.getLogger()
sanitizing_filters = [f for f in root.filters if isinstance(f, SanitizingFilter)]
assert len(sanitizing_filters) == 1
# Clean up
for f in sanitizing_filters:
root.removeFilter(f)
class TestPdfChecksum:
def test_checksum_matches_content(self):
from src.services.mapping_service import adobe_folder_to_normalized
from pathlib import Path
downloads = Path(__file__).parent.parent / "downloads" / "David Tag Demo Form__CBJCHBCA"
if not downloads.exists():
pytest.skip("Downloads fixtures not present")
norm, _ = adobe_folder_to_normalized(str(downloads))
assert norm.documents, "Expected at least one document"
doc = norm.documents[0]
# Recompute checksum from source path to verify
pdf_bytes = Path(doc.source_path).read_bytes()
expected_checksum = hashlib.sha256(pdf_bytes).hexdigest()
assert doc.checksum_sha256 == expected_checksum
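`redact` itself is not shown in this diff. A minimal sketch consistent with the assertions above (bearer/JWT tokens, `key=value`-style secrets, and long base64 runs replaced; short strings and ordinary prose left alone) could be the following — the patterns are reconstructed assumptions, not the actual `src/utils/log_sanitizer.py` contents:

```python
import re

REDACTED = "[REDACTED]"

# Order matters: the broad base64 pattern runs last.
_PATTERNS = [
    re.compile(r"Bearer\s+[A-Za-z0-9._\-]+"),        # bearer / JWT tokens
    re.compile(r'(access_token|token|password)["\s:=]+[^\s",]+',
               re.IGNORECASE),                        # key/value secrets
    re.compile(r"[A-Za-z0-9+/=]{200,}"),             # long base64 blobs (PDF content)
]


def redact(text: str) -> str:
    """Replace likely secrets in a log string with a fixed marker."""
    for pattern in _PATTERNS:
        text = pattern.sub(REDACTED, text)
    return text
```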


@ -0,0 +1,181 @@
"""
Tests for Phase 9: validation service.
"""
import pytest
from src.models.normalized_template import (
NormalizedDocument,
NormalizedField,
NormalizedRole,
NormalizedTemplate,
)
from src.services.validation_service import (
ValidationResult,
compare_field_counts,
validate_template,
)
from src.reports.report_builder import (
MigrationReport,
MigrationStatus,
build_blocked_report,
build_error_report,
build_skipped_report,
build_success_report,
)
def _make_template(**kwargs) -> NormalizedTemplate:
defaults = dict(
name="Test Template",
roles=[NormalizedRole(name="Signer 1", order=1)],
fields=[
NormalizedField(
type="signature", label="sig1", page=1,
x=100, y=500, width=140, height=28,
role_name="Signer 1",
)
],
documents=[NormalizedDocument(name="test.pdf", checksum_sha256="abc", source_path="/fake.pdf")],
)
defaults.update(kwargs)
return NormalizedTemplate(**defaults)
class TestValidationService:
def test_valid_template_passes(self):
t = _make_template()
result = validate_template(t)
assert result.is_ok()
assert result.blockers == []
def test_no_recipients_is_blocker(self):
t = _make_template(roles=[])
result = validate_template(t)
assert result.has_blockers()
assert any("recipient" in b.lower() or "role" in b.lower() for b in result.blockers)
def test_no_documents_is_blocker(self):
t = _make_template(documents=[])
result = validate_template(t)
assert result.has_blockers()
assert any("document" in b.lower() for b in result.blockers)
def test_no_fields_is_warning(self):
t = _make_template(fields=[])
result = validate_template(t)
assert result.is_ok() # not a blocker
assert any("0 field" in w or "empty" in w.lower() for w in result.warnings)
def test_no_signature_field_is_warning(self):
t = _make_template(fields=[
NormalizedField(type="text", label="name", page=1, x=0, y=0, width=120, height=24, role_name="Signer 1")
])
result = validate_template(t)
assert result.is_ok()
assert any("signature" in w.lower() for w in result.warnings)
def test_field_with_unknown_role_is_warning(self):
t = _make_template(fields=[
NormalizedField(
type="signature", label="sig1", page=1, x=0, y=0,
width=140, height=28, role_name="NonExistentRole"
)
])
result = validate_template(t)
assert result.is_ok()
assert any("role" in w.lower() or "assign" in w.lower() for w in result.warnings)
def test_unsupported_features_become_warnings(self):
t = _make_template(unsupported_features=["Conditional HIDE action", "Webhook associations"])
result = validate_template(t)
assert result.is_ok()
assert len([w for w in result.warnings if "Unsupported" in w or "manual" in w.lower()]) >= 2
def test_validation_result_all_issues(self):
r = ValidationResult(blockers=["blocker1"], warnings=["warn1"])
issues = r.all_issues()
assert any("BLOCKER" in i for i in issues)
assert any("WARNING" in i for i in issues)
class TestCompareFieldCounts:
def test_matching_counts_no_warnings(self):
t = _make_template(fields=[
NormalizedField(type="signature", label="sig1", page=1, x=0, y=0, width=140, height=28, role_name="Signer 1")
])
ds = {
"recipients": {
"signers": [{"tabs": {"signHereTabs": [{"tabLabel": "sig1"}]}}]
}
}
result = compare_field_counts(t, ds)
assert result.is_ok()
def test_mismatched_counts_warns(self):
t = _make_template(fields=[
NormalizedField(type="signature", label="s1", page=1, x=0, y=0, width=140, height=28, role_name="Signer 1"),
NormalizedField(type="text", label="t1", page=1, x=0, y=50, width=120, height=24, role_name="Signer 1"),
])
ds = {"recipients": {"signers": [{"tabs": {"signHereTabs": [{}]}}]}}
result = compare_field_counts(t, ds)
assert any("mismatch" in w.lower() or "count" in w.lower() for w in result.warnings)
def test_zero_tabs_with_fields_warns(self):
t = _make_template()
ds = {"recipients": {"signers": []}}
result = compare_field_counts(t, ds)
assert result.warnings # should warn about 0 tabs
class TestReportBuilder:
def test_success_report(self):
r = build_success_report("My Template", "src_001", "ds_001", warnings=[])
assert r.status == MigrationStatus.SUCCESS
assert r.docusign_template_id == "ds_001"
def test_success_with_warnings(self):
r = build_success_report("My Template", "src_001", "ds_001", warnings=["some warning"])
assert r.status == MigrationStatus.SUCCESS_WITH_WARNINGS
def test_blocked_report(self):
r = build_blocked_report("T", "id1", blockers=["no docs"], warnings=[])
assert r.status == MigrationStatus.BLOCKED
assert r.blockers == ["no docs"]
def test_error_report(self):
r = build_error_report("T", "id1", error="Connection refused")
assert r.status == MigrationStatus.ERROR
assert "Connection" in r.error
def test_skipped_report(self):
r = build_skipped_report("T", "id1", reason="already migrated")
assert r.status == MigrationStatus.SKIPPED
def test_migration_report_summary(self):
report = MigrationReport()
report.add(build_success_report("T1", "1", "ds1", []))
report.add(build_success_report("T2", "2", "ds2", ["warn"]))
report.add(build_error_report("T3", "3", "fail"))
summary = report.summary()
assert summary["total"] == 3
assert summary.get("success", 0) == 1
assert summary.get("error", 0) == 1
def test_report_to_dict(self):
report = MigrationReport()
report.add(build_success_report("T1", "1", "ds1", []))
d = report.to_dict()
assert "summary" in d
assert "templates" in d
assert d["templates"][0]["template_name"] == "T1"
def test_report_has_errors(self):
report = MigrationReport()
report.add(build_error_report("T", "1", "err"))
assert report.has_errors()
def test_report_no_errors(self):
report = MigrationReport()
report.add(build_success_report("T", "1", "ds1", []))
assert not report.has_errors()
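The validation rules these tests imply (blockers for missing roles or documents halt migration; everything else degrades to a warning) can be sketched as below. Attribute names beyond those the tests themselves use are assumptions about the `validation_service` internals:

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:
    blockers: list = field(default_factory=list)
    warnings: list = field(default_factory=list)

    def has_blockers(self) -> bool:
        return bool(self.blockers)

    def is_ok(self) -> bool:
        return not self.blockers

    def all_issues(self) -> list:
        return ([f"BLOCKER: {b}" for b in self.blockers]
                + [f"WARNING: {w}" for w in self.warnings])


def validate_template(t) -> ValidationResult:
    r = ValidationResult()
    # Blockers: the template cannot be migrated at all.
    if not t.roles:
        r.blockers.append("Template has no recipient roles")
    if not t.documents:
        r.blockers.append("Template has no documents")
    # Warnings: migratable, but worth flagging.
    if not t.fields:
        r.warnings.append("Template has 0 fields (empty)")
    elif not any(f.type == "signature" for f in t.fields):
        r.warnings.append("No signature field found")
    role_names = {role.name for role in t.roles}
    for f in t.fields:
        if f.role_name not in role_names:
            r.warnings.append(f"Field '{f.label}' assigned to unknown role '{f.role_name}'")
    for feat in getattr(t, "unsupported_features", []):
        r.warnings.append(f"Unsupported feature (manual review needed): {feat}")
    return r
```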


@ -4,6 +4,8 @@ web/routers/migrate.py
Migration trigger and history endpoints.
POST /api/migrate run the pipeline for one or more Adobe template IDs
POST /api/migrate/batch batch migration with async progress tracking
GET /api/migrate/batch/{id} poll batch job status
GET /api/migrate/history return past migration records
"""
@ -12,8 +14,9 @@ import json
import os
import sys
import tempfile
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional
import httpx
from fastapi import APIRouter, Request
@ -23,7 +26,6 @@ from pydantic import BaseModel
from web.config import settings
from web.session import get_session
# Ensure src/ is on path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
router = APIRouter()
@ -32,9 +34,26 @@ _HISTORY_FILE = os.path.join(
os.path.dirname(__file__), "..", "..", "migration-output", ".history.json"
)
# In-memory batch job store (keyed by job_id)
_batch_jobs: Dict[str, dict] = {}
class MigrationOptions(BaseModel):
dry_run: bool = False
overwrite_if_exists: bool = False
include_documents: bool = True
class MigrateRequest(BaseModel):
# Primary API (blueprint-aligned)
source_template_ids: Optional[List[str]] = None
target_folder: Optional[str] = None
options: MigrationOptions = MigrationOptions()
# Legacy field kept for backward compatibility
adobe_template_ids: Optional[List[str]] = None
def resolved_ids(self) -> List[str]:
return self.source_template_ids or self.adobe_template_ids or []
def _load_history() -> list:
@ -51,10 +70,7 @@ def _save_history(records: list) -> None:
def _load_compose():
"""Dynamically load compose_template from src/."""
import importlib.util
spec = importlib.util.spec_from_file_location(
"compose_docusign_template",
@ -71,21 +87,17 @@ async def _download_adobe_template(template_id: str, access_token: str, output_d
base = settings.adobe_sign_base_url
async with httpx.AsyncClient() as client:
# Metadata
meta_resp = await client.get(f"{base}/libraryDocuments/{template_id}", headers=headers)
if not meta_resp.is_success:
return False
metadata = meta_resp.json()
# Form fields
fields_resp = await client.get(f"{base}/libraryDocuments/{template_id}/formFields", headers=headers)
form_fields = fields_resp.json() if fields_resp.is_success else {"fields": []}
# Documents list
docs_resp = await client.get(f"{base}/libraryDocuments/{template_id}/documents", headers=headers)
documents = docs_resp.json() if docs_resp.is_success else {"documents": []}
# Download first PDF
doc_list = documents.get("documents", [])
pdf_bytes = b""
if doc_list:
@ -111,10 +123,27 @@ async def _download_adobe_template(template_id: str, access_token: str, output_d
return True
def _run_validation(download_dir: str) -> dict:
"""Run validation service on downloaded template, return summary."""
try:
from src.services.mapping_service import adobe_folder_to_normalized
from src.services.validation_service import validate_template
norm, _ = adobe_folder_to_normalized(download_dir)
result = validate_template(norm)
return {
"blockers": result.blockers,
"warnings": result.warnings,
"has_blockers": result.has_blockers(),
}
except Exception as exc:
return {"blockers": [], "warnings": [f"Validation skipped: {exc}"], "has_blockers": False}
async def _migrate_one(
adobe_id: str,
adobe_access_token: str,
docusign_access_token: str,
options: MigrationOptions,
) -> dict:
"""Run the full pipeline for one Adobe template. Returns a result record."""
timestamp = datetime.now(timezone.utc).isoformat()
@ -134,14 +163,32 @@ async def _migrate_one(
"action": None,
"status": "failed",
"error": "Adobe Sign download failed",
"warnings": [],
"blockers": [],
"dry_run": options.dry_run,
}
# Read template name from metadata
with open(os.path.join(download_dir, "metadata.json")) as f:
metadata = json.load(f)
template_name = metadata.get("name", adobe_id)
# 2. Validate
validation = _run_validation(download_dir)
if validation["has_blockers"]:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": None,
"action": "blocked",
"status": "blocked",
"error": f"Validation blockers: {'; '.join(validation['blockers'])}",
"warnings": validation["warnings"],
"blockers": validation["blockers"],
"dry_run": options.dry_run,
}
# 3. Compose
composed_file = os.path.join(tmpdir, "docusign-template.json")
try:
compose_fn = _load_compose()
@ -155,6 +202,9 @@ async def _migrate_one(
"action": None,
"status": "failed",
"error": f"Compose failed: {exc}",
"warnings": validation["warnings"],
"blockers": [],
"dry_run": options.dry_run,
}
if not os.path.exists(composed_file):
return {
@ -165,12 +215,34 @@ async def _migrate_one(
"action": None,
"status": "failed",
"error": "Compose produced no output file",
"warnings": validation["warnings"],
"blockers": [],
"dry_run": options.dry_run,
}
# 4. Dry run — stop here, do not upload
if options.dry_run:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": None,
"action": "dry_run",
"status": "dry_run",
"error": None,
"warnings": validation["warnings"],
"blockers": [],
"dry_run": True,
}
# 5. Upload (upsert) to DocuSign
with open(composed_file) as f:
template_json = json.load(f)
if not options.include_documents:
for doc in template_json.get("documents", []):
doc.pop("documentBase64", None)
ds_headers = {
"Authorization": f"Bearer {docusign_access_token}",
"Content-Type": "application/json",
@ -179,7 +251,7 @@ async def _migrate_one(
list_url = f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates"
async with httpx.AsyncClient() as client:
# Duplicate detection
list_resp = await client.get(
list_url, headers=ds_headers, params={"search_text": template_name, "count": 100}
)
@ -191,6 +263,21 @@ async def _migrate_one(
exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True)
existing_id = exact[0]["templateId"]
# Skip if already exists and overwrite is disabled
if existing_id and not options.overwrite_if_exists:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": existing_id,
"action": "skipped",
"status": "skipped",
"error": None,
"warnings": validation["warnings"] + ["Skipped: template already exists (overwrite_if_exists=false)"],
"blockers": [],
"dry_run": False,
}
if existing_id:
up_resp = await client.put(
f"{list_url}/{existing_id}", headers=ds_headers, json=template_json
@ -211,6 +298,9 @@ async def _migrate_one(
"action": None,
"status": "failed",
"error": f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}",
"warnings": validation["warnings"],
"blockers": [],
"dry_run": False,
}
return {
@ -221,6 +311,9 @@ async def _migrate_one(
"action": action,
"status": "success",
"error": None,
"warnings": validation["warnings"],
"blockers": [],
"dry_run": False,
}
@ -233,17 +326,21 @@ async def run_migration(body: MigrateRequest, request: Request):
if not session.get("docusign_access_token"):
return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
ids = body.resolved_ids()
if not ids:
return JSONResponse({"error": "no template IDs provided"}, status_code=400)
tasks = [
_migrate_one(
aid,
session["adobe_access_token"],
session["docusign_access_token"],
body.options,
)
for aid in ids
]
results = await asyncio.gather(*tasks)
# Append to history
history = _load_history()
history.extend(results)
_save_history(history)
@ -255,3 +352,101 @@ async def run_migration(body: MigrateRequest, request: Request):
def migration_history():
"""Return all past migration records."""
return {"history": _load_history()}
# ---------------------------------------------------------------------------
# Batch migration
# ---------------------------------------------------------------------------
async def _run_batch_job(
job_id: str,
ids: List[str],
adobe_token: str,
ds_token: str,
options: MigrationOptions,
) -> None:
"""Background coroutine that processes a batch job and updates _batch_jobs."""
job = _batch_jobs[job_id]
job["status"] = "running"
results = []
for i, adobe_id in enumerate(ids):
job["progress"] = {"completed": i, "total": len(ids), "current_id": adobe_id}
result = await _migrate_one(adobe_id, adobe_token, ds_token, options)
# Retry once if the DocuSign upload failed (likely transient); validation blockers are not retried
if result["status"] == "failed" and "upload failed" in (result.get("error") or ""):
result = await _migrate_one(adobe_id, adobe_token, ds_token, options)
if result["status"] != "failed":
result["retried"] = True
results.append(result)
job["results"] = results
# Persist to history
history = _load_history()
history.extend(results)
_save_history(history)
success = sum(1 for r in results if r["status"] == "success")
failed = sum(1 for r in results if r["status"] in ("failed", "blocked"))
skipped = sum(1 for r in results if r["status"] == "skipped")
dry_runs = sum(1 for r in results if r["status"] == "dry_run")
job["status"] = "completed"
job["progress"] = {"completed": len(ids), "total": len(ids), "current_id": None}
job["summary"] = {
"total": len(ids),
"success": success,
"failed": failed,
"skipped": skipped,
"dry_run": dry_runs,
}
@router.post("/batch")
async def run_batch_migration(body: MigrateRequest, request: Request):
"""
Start an async batch migration job. Returns a job_id immediately.
Poll GET /api/migrate/batch/{job_id} for status.
"""
session = get_session(request)
if not session.get("adobe_access_token"):
return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
if not session.get("docusign_access_token"):
return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
ids = body.resolved_ids()
if not ids:
return JSONResponse({"error": "no template IDs provided"}, status_code=400)
job_id = str(uuid.uuid4())
_batch_jobs[job_id] = {
"job_id": job_id,
"status": "queued",
"total": len(ids),
"results": [],
"progress": {"completed": 0, "total": len(ids), "current_id": None},
"summary": None,
"created_at": datetime.now(timezone.utc).isoformat(),
}
asyncio.create_task(
_run_batch_job(
job_id, ids,
session["adobe_access_token"],
session["docusign_access_token"],
body.options,
)
)
return {"job_id": job_id, "total": len(ids), "status": "queued"}
@router.get("/batch/{job_id}")
def get_batch_status(job_id: str):
"""Poll the status of a batch migration job."""
job = _batch_jobs.get(job_id)
if not job:
return JSONResponse({"error": "batch job not found"}, status_code=404)
return job
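A client consuming these endpoints POSTs to `/api/migrate/batch`, keeps the returned `job_id`, then polls `GET /api/migrate/batch/{job_id}` until `status` is `"completed"`. The polling loop can be kept transport-agnostic, as in this sketch, where `fetch_status` stands in for whatever HTTP call the client makes (the helper name and timeout handling are illustrative, not part of the API):

```python
import time


def poll_until_done(fetch_status, interval=1.0, timeout=300.0):
    """Poll a batch job's status endpoint until it reports completion.

    fetch_status: zero-arg callable returning the job dict from
    GET /api/migrate/batch/{job_id}.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = fetch_status()
        if job.get("status") == "completed":
            return job  # includes "results" and "summary"
        time.sleep(interval)
    raise TimeoutError("batch job did not complete in time")
```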