feat: Adobe Sign OAuth client and API wrapper

auth_adobe.py — one-time browser Auth Code Grant flow; saves access and
refresh tokens to .env. Targets the EU2 shard.

adobe_api.py — thin API client with auto token refresh on 401. Supports
GET, POST (JSON and multipart), PUT, and binary download.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Paul Huliganga 2026-04-15 19:44:43 -04:00
parent a1601009dc
commit 343955241d
2 changed files with 237 additions and 0 deletions

133
src/adobe_api.py Normal file
View File

@ -0,0 +1,133 @@
import os
import requests
from dotenv import load_dotenv, set_key
load_dotenv()
SHARD = "eu2"
TOKEN_URL = f"https://api.{SHARD}.adobesign.com/oauth/v2/token"
REDIRECT_URI = "https://localhost:8080/callback"
ENV_FILE = os.path.join(os.path.dirname(__file__), "..", ".env")
def _refresh_access_token():
client_id = os.getenv("ADOBE_CLIENT_ID")
client_secret = os.getenv("ADOBE_CLIENT_SECRET")
refresh_token = os.getenv("ADOBE_REFRESH_TOKEN")
if not all([client_id, client_secret, refresh_token]):
raise RuntimeError("Missing credentials for token refresh. Run src/auth_adobe.py first.")
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
"client_secret": client_secret,
"redirect_uri": REDIRECT_URI,
}
resp = requests.post(TOKEN_URL, data=data)
resp.raise_for_status()
new_token = resp.json()["access_token"]
abs_env = os.path.abspath(ENV_FILE)
set_key(abs_env, "ADOBE_ACCESS_TOKEN", new_token)
os.environ["ADOBE_ACCESS_TOKEN"] = new_token
return new_token
def adobe_api_post_multipart(endpoint, files, data=None):
"""Upload a file via multipart/form-data (e.g. transient documents)."""
token = os.getenv("ADOBE_ACCESS_TOKEN")
base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
headers = {"Authorization": f"Bearer {token}"}
url = f"{base_url}/{endpoint}"
resp = requests.post(url, headers=headers, files=files, data=data or {})
if resp.status_code == 401:
token = _refresh_access_token()
headers["Authorization"] = f"Bearer {token}"
resp = requests.post(url, headers=headers, files=files, data=data or {})
resp.raise_for_status()
return resp.json()
def adobe_api_post_json(endpoint, body):
"""POST JSON body to an Adobe Sign endpoint."""
token = os.getenv("ADOBE_ACCESS_TOKEN")
base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
url = f"{base_url}/{endpoint}"
resp = requests.post(url, headers=headers, json=body)
if resp.status_code == 401:
token = _refresh_access_token()
headers["Authorization"] = f"Bearer {token}"
resp = requests.post(url, headers=headers, json=body)
resp.raise_for_status()
return resp.json()
def adobe_api_put_json(endpoint, body):
"""PUT JSON body to an Adobe Sign endpoint."""
token = os.getenv("ADOBE_ACCESS_TOKEN")
base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
url = f"{base_url}/{endpoint}"
resp = requests.put(url, headers=headers, json=body)
if resp.status_code == 401:
token = _refresh_access_token()
headers["Authorization"] = f"Bearer {token}"
resp = requests.put(url, headers=headers, json=body)
resp.raise_for_status()
return resp.json()
def adobe_api_get_bytes(endpoint):
"""Download binary content (e.g. PDF files) from the Adobe Sign API."""
token = os.getenv("ADOBE_ACCESS_TOKEN")
base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
headers = {"Authorization": f"Bearer {token}"}
url = f"{base_url}/{endpoint}"
resp = requests.get(url, headers=headers)
if resp.status_code == 401:
token = _refresh_access_token()
headers["Authorization"] = f"Bearer {token}"
resp = requests.get(url, headers=headers)
resp.raise_for_status()
return resp.content
def adobe_api_get(endpoint, params=None):
token = os.getenv("ADOBE_ACCESS_TOKEN")
base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json",
}
url = f"{base_url}/{endpoint}"
resp = requests.get(url, headers=headers, params=params)
if resp.status_code == 401:
# Token expired — refresh and retry once
token = _refresh_access_token()
headers["Authorization"] = f"Bearer {token}"
resp = requests.get(url, headers=headers, params=params)
resp.raise_for_status()
return resp.json()
if __name__ == "__main__":
library_docs = adobe_api_get("libraryDocuments")
print("Library Documents:", library_docs)

104
src/auth_adobe.py Normal file
View File

@ -0,0 +1,104 @@
"""
One-time Adobe Sign OAuth setup.
Run this script once to authorize the app and save tokens to .env:
python src/auth_adobe.py
Prerequisites:
- Set ADOBE_CLIENT_ID and ADOBE_CLIENT_SECRET in .env (or export them)
- Redirect URI in your Adobe Sign app must be set to: https://localhost
After authorizing in the browser, the page will fail to load (that's expected).
Copy the full URL from the address bar and paste it when prompted.
"""
import os
import sys
import webbrowser
from urllib.parse import urlencode, urlparse, parse_qs
from dotenv import load_dotenv, set_key
import requests
load_dotenv()
SHARD = "eu2"
AUTH_URL = f"https://secure.{SHARD}.adobesign.com/public/oauth/v2"
TOKEN_URL = f"https://api.{SHARD}.adobesign.com/oauth/v2/token"
REDIRECT_URI = "https://localhost:8080/callback"
SCOPES = "library_read:self library_write:self user_read:self"
ENV_FILE = os.path.join(os.path.dirname(__file__), "..", ".env")
def get_auth_url(client_id):
params = {
"redirect_uri": REDIRECT_URI,
"response_type": "code",
"client_id": client_id,
"scope": SCOPES,
}
return f"{AUTH_URL}?{urlencode(params)}"
def extract_code(redirected_url):
parsed = urlparse(redirected_url)
params = parse_qs(parsed.query)
if "code" not in params:
error = params.get("error_description", params.get("error", ["unknown"]))[0]
raise ValueError(f"No code in URL. Error: {error}")
return params["code"][0]
def exchange_code_for_tokens(code, client_id, client_secret):
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": client_id,
"client_secret": client_secret,
}
resp = requests.post(TOKEN_URL, data=data)
resp.raise_for_status()
return resp.json()
def save_tokens(tokens):
abs_env = os.path.abspath(ENV_FILE)
set_key(abs_env, "ADOBE_ACCESS_TOKEN", tokens["access_token"])
if "refresh_token" in tokens:
set_key(abs_env, "ADOBE_REFRESH_TOKEN", tokens["refresh_token"])
set_key(abs_env, "ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6")
print(f"Tokens saved to {abs_env}")
def main():
client_id = os.getenv("ADOBE_CLIENT_ID")
client_secret = os.getenv("ADOBE_CLIENT_SECRET")
if not client_id or not client_secret:
print("ERROR: ADOBE_CLIENT_ID and ADOBE_CLIENT_SECRET must be set in .env")
sys.exit(1)
url = get_auth_url(client_id)
print(f"\nOpening browser for authorization...")
print(f"\nIf the browser doesn't open, go to:\n{url}\n")
webbrowser.open(url)
print("After authorizing, the browser will land on a page that fails to load.")
print("That's expected — just copy the full URL from the address bar and paste it here.\n")
redirected_url = input("Paste the redirect URL: ").strip()
try:
code = extract_code(redirected_url)
except ValueError as e:
print(f"ERROR: {e}")
sys.exit(1)
print("Exchanging code for tokens...")
tokens = exchange_code_for_tokens(code, client_id, client_secret)
save_tokens(tokens)
print("Done. You can now run the migrator.")
if __name__ == "__main__":
main()