diff --git a/src/adobe_api.py b/src/adobe_api.py new file mode 100644 index 0000000..e649af5 --- /dev/null +++ b/src/adobe_api.py @@ -0,0 +1,133 @@ +import os +import requests +from dotenv import load_dotenv, set_key + +load_dotenv() + +SHARD = "eu2" +TOKEN_URL = f"https://api.{SHARD}.adobesign.com/oauth/v2/token" +REDIRECT_URI = "https://localhost:8080/callback" +ENV_FILE = os.path.join(os.path.dirname(__file__), "..", ".env") + + +def _refresh_access_token(): + client_id = os.getenv("ADOBE_CLIENT_ID") + client_secret = os.getenv("ADOBE_CLIENT_SECRET") + refresh_token = os.getenv("ADOBE_REFRESH_TOKEN") + + if not all([client_id, client_secret, refresh_token]): + raise RuntimeError("Missing credentials for token refresh. Run src/auth_adobe.py first.") + + data = { + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": client_id, + "client_secret": client_secret, + "redirect_uri": REDIRECT_URI, + } + resp = requests.post(TOKEN_URL, data=data) + resp.raise_for_status() + new_token = resp.json()["access_token"] + + abs_env = os.path.abspath(ENV_FILE) + set_key(abs_env, "ADOBE_ACCESS_TOKEN", new_token) + os.environ["ADOBE_ACCESS_TOKEN"] = new_token + return new_token + + +def adobe_api_post_multipart(endpoint, files, data=None): + """Upload a file via multipart/form-data (e.g. transient documents).""" + token = os.getenv("ADOBE_ACCESS_TOKEN") + base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + headers = {"Authorization": f"Bearer {token}"} + url = f"{base_url}/{endpoint}" + resp = requests.post(url, headers=headers, files=files, data=data or {}) + if resp.status_code == 401: + token = _refresh_access_token() + headers["Authorization"] = f"Bearer {token}" + resp = requests.post(url, headers=headers, files=files, data=data or {}) + resp.raise_for_status() + return resp.json() + + +def adobe_api_post_json(endpoint, body): + """POST JSON body to an Adobe Sign endpoint.""" + token = os.getenv("ADOBE_ACCESS_TOKEN") + base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + url = f"{base_url}/{endpoint}" + resp = requests.post(url, headers=headers, json=body) + if resp.status_code == 401: + token = _refresh_access_token() + headers["Authorization"] = f"Bearer {token}" + resp = requests.post(url, headers=headers, json=body) + resp.raise_for_status() + return resp.json() + + +def adobe_api_put_json(endpoint, body): + """PUT JSON body to an Adobe Sign endpoint.""" + token = os.getenv("ADOBE_ACCESS_TOKEN") + base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + url = f"{base_url}/{endpoint}" + resp = requests.put(url, headers=headers, json=body) + if resp.status_code == 401: + token = _refresh_access_token() + headers["Authorization"] = f"Bearer {token}" + resp = requests.put(url, headers=headers, json=body) + resp.raise_for_status() + return resp.json() + + +def adobe_api_get_bytes(endpoint): + """Download binary content (e.g. PDF files) from the Adobe Sign API.""" + token = os.getenv("ADOBE_ACCESS_TOKEN") + base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + + headers = {"Authorization": f"Bearer {token}"} + url = f"{base_url}/{endpoint}" + resp = requests.get(url, headers=headers) + + if resp.status_code == 401: + token = _refresh_access_token() + headers["Authorization"] = f"Bearer {token}" + resp = requests.get(url, headers=headers) + + resp.raise_for_status() + return resp.content + + +def adobe_api_get(endpoint, params=None): + token = os.getenv("ADOBE_ACCESS_TOKEN") + base_url = os.getenv("ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + + headers = { + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + + url = f"{base_url}/{endpoint}" + resp = requests.get(url, headers=headers, params=params) + + if resp.status_code == 401: + # Token expired — refresh and retry once + token = _refresh_access_token() + headers["Authorization"] = f"Bearer {token}" + resp = requests.get(url, headers=headers, params=params) + + resp.raise_for_status() + return resp.json() + + +if __name__ == "__main__": + library_docs = adobe_api_get("libraryDocuments") + print("Library Documents:", library_docs) diff --git a/src/auth_adobe.py b/src/auth_adobe.py new file mode 100644 index 0000000..75d7b45 --- /dev/null +++ b/src/auth_adobe.py @@ -0,0 +1,104 @@ +""" +One-time Adobe Sign OAuth setup. + +Run this script once to authorize the app and save tokens to .env: + python src/auth_adobe.py + +Prerequisites: + - Set ADOBE_CLIENT_ID and ADOBE_CLIENT_SECRET in .env (or export them) + - Redirect URI in your Adobe Sign app must be set to: https://localhost + +After authorizing in the browser, the page will fail to load (that's expected). +Copy the full URL from the address bar and paste it when prompted. +""" + +import os +import sys +import webbrowser +from urllib.parse import urlencode, urlparse, parse_qs + +from dotenv import load_dotenv, set_key +import requests + +load_dotenv() + +SHARD = "eu2" +AUTH_URL = f"https://secure.{SHARD}.adobesign.com/public/oauth/v2" +TOKEN_URL = f"https://api.{SHARD}.adobesign.com/oauth/v2/token" +REDIRECT_URI = "https://localhost:8080/callback" +SCOPES = "library_read:self library_write:self user_read:self" +ENV_FILE = os.path.join(os.path.dirname(__file__), "..", ".env") + + +def get_auth_url(client_id): + params = { + "redirect_uri": REDIRECT_URI, + "response_type": "code", + "client_id": client_id, + "scope": SCOPES, + } + return f"{AUTH_URL}?{urlencode(params)}" + + +def extract_code(redirected_url): + parsed = urlparse(redirected_url) + params = parse_qs(parsed.query) + if "code" not in params: + error = params.get("error_description", params.get("error", ["unknown"]))[0] + raise ValueError(f"No code in URL. Error: {error}") + return params["code"][0] + + +def exchange_code_for_tokens(code, client_id, client_secret): + data = { + "grant_type": "authorization_code", + "code": code, + "redirect_uri": REDIRECT_URI, + "client_id": client_id, + "client_secret": client_secret, + } + resp = requests.post(TOKEN_URL, data=data) + resp.raise_for_status() + return resp.json() + + +def save_tokens(tokens): + abs_env = os.path.abspath(ENV_FILE) + set_key(abs_env, "ADOBE_ACCESS_TOKEN", tokens["access_token"]) + if "refresh_token" in tokens: + set_key(abs_env, "ADOBE_REFRESH_TOKEN", tokens["refresh_token"]) + set_key(abs_env, "ADOBE_SIGN_BASE_URL", f"https://api.{SHARD}.adobesign.com/api/rest/v6") + print(f"Tokens saved to {abs_env}") + + +def main(): + client_id = os.getenv("ADOBE_CLIENT_ID") + client_secret = os.getenv("ADOBE_CLIENT_SECRET") + + if not client_id or not client_secret: + print("ERROR: ADOBE_CLIENT_ID and ADOBE_CLIENT_SECRET must be set in .env") + sys.exit(1) + + url = get_auth_url(client_id) + print(f"\nOpening browser for authorization...") + print(f"\nIf the browser doesn't open, go to:\n{url}\n") + webbrowser.open(url) + + print("After authorizing, the browser will land on a page that fails to load.") + print("That's expected — just copy the full URL from the address bar and paste it here.\n") + redirected_url = input("Paste the redirect URL: ").strip() + + try: + code = extract_code(redirected_url) + except ValueError as e: + print(f"ERROR: {e}") + sys.exit(1) + + print("Exchanging code for tokens...") + tokens = exchange_code_for_tokens(code, client_id, client_secret) + save_tokens(tokens) + print("Done. You can now run the migrator.") + + +if __name__ == "__main__": + main()