#!/usr/bin/env python3
"""
Bulk downloader for Avalanche Association files listed in an Excel sheet.

- Reads the "resource>cform" sheet (default) from the Excel file.
- Builds each file's URL from a base like
  https://cdn.ymaws.com/www.avalancheassociation.ca
  plus the value in the "File Path" column.
- Saves each file as the "Original File Name" inside a folder named
  "<Member ID> - <Member First Name>, <Member Last Name>".
- Skips rows with missing essentials and logs errors instead of crashing.
- Retries flaky network requests automatically.
"""
import argparse
import logging
import os
import sys
import unicodedata
from pathlib import Path
from typing import Optional, Tuple

import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ---------------------------
# Helpers
# ---------------------------
def setup_logger(verbosity: int = 1):
    if verbosity > 1:
        level = logging.DEBUG
    elif verbosity == 1:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)-8s | %(message)s",
        datefmt="%H:%M:%S",
    )
def normalize(s: str) -> str:
    """Normalize unicode for filenames."""
    return unicodedata.normalize("NFKD", s)
def safe_filename(name: str) -> str:
    """Make a safe filename (no weird characters, trimmed)."""
    s = normalize(name).strip()
    # Replace path separators and characters forbidden on common OSes
    bad = '<>:"/\\|?*'
    for ch in bad:
        s = s.replace(ch, "_")
    # Drop control characters
    s = "".join(c for c in s if ord(c) >= 32)
    # Collapse runs of underscores
    while "__" in s:
        s = s.replace("__", "_")
    return s
def safe_folder(name: str) -> str:
    return safe_filename(name)
def build_session(timeout: int = 20, total_retries: int = 5, backoff: float = 0.5) -> requests.Session:
    session = requests.Session()
    retry = Retry(
        total=total_retries,
        read=total_retries,
        connect=total_retries,
        status=total_retries,
        backoff_factor=backoff,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
    )
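    # With backoff_factor=0.5, urllib3 sleeps roughly 0.5s, 1s, 2s, ... between
    # successive retries (the exact schedule varies slightly by urllib3 version).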
    adapter = HTTPAdapter(max_retries=retry, pool_connections=10, pool_maxsize=20)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": "AA-Excel-Downloader/1.0 (+python requests)"
    })
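    # requests has no session-wide timeout setting, so we stash one on the
    # session object; fetch_file() passes it explicitly on each get().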
    session.request_timeout = timeout
    return session
def fetch_file(session: requests.Session, url: str, dest: Path, overwrite: bool = False) -> Tuple[bool, Optional[str]]:
    """Download a single file to dest. Returns (ok, error_message)."""
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        if dest.exists() and not overwrite:
            logging.debug(f"Exists, skipping: {dest}")
            return True, None
        with session.get(url, stream=True, timeout=session.request_timeout) as r:
            if r.status_code != 200:
                return False, f"HTTP {r.status_code}"
            # Determine size if provided
            total = int(r.headers.get("Content-Length", 0))
            chunk = 1024 * 64
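            # Stream to disk in 64 KiB chunks so large files never sit fully in memory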
            done = 0
            with open(dest, "wb") as f:
                for part in r.iter_content(chunk_size=chunk):
                    if part:
                        f.write(part)
                        done += len(part)
            # Crude size check when the server provided a Content-Length
            if total and done < total:
                dest.unlink(missing_ok=True)  # don't leave a truncated file behind
                return False, f"Incomplete download: {done}/{total} bytes"
            return True, None
    except requests.RequestException as e:
        return False, f"Request failed: {e}"
    except OSError as e:
        return False, f"Filesystem error: {e}"
# ---------------------------
# Core
# ---------------------------
def main():
    parser = argparse.ArgumentParser(description="Download files listed in an Excel (resource>cform) sheet.")
    parser.add_argument("excel_path", help="Path to the Excel file (e.g., Mapping.xlsx)")
    parser.add_argument("--sheet", default="resource>cform", help="Sheet name to read (default: resource>cform)")
    parser.add_argument("--base-url", default="https://cdn.ymaws.com/www.avalancheassociation.ca",
                        help="Base URL to prepend to 'File Path'")
    parser.add_argument("--output-dir", default="downloads", help="Directory to put member folders in")
    parser.add_argument("--limit", type=int, default=None, help="Optionally limit number of rows processed (for testing)")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite files if they already exist")
    parser.add_argument("-v", "--verbose", action="count", default=1, help="Increase log verbosity (-v, -vv)")
    args = parser.parse_args()
    setup_logger(args.verbose)
    excel_path = Path(args.excel_path)
    if not excel_path.exists():
        logging.error(f"Excel not found: {excel_path}")
        sys.exit(1)
    # Load once
    try:
        df = pd.read_excel(excel_path, sheet_name=args.sheet, dtype=str)
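        # dtype=str keeps member IDs like "00123" as text instead of floats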
    except ValueError as e:
        logging.error(f"Unable to read sheet '{args.sheet}': {e}")
        sys.exit(1)
    # Validate that the required columns are present (accessed by exact name)
    required_cols = ["File Path", "Original File Name", "Member ID", "Member First Name", "Member Last Name"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        logging.error(f"Missing required columns: {missing}")
        sys.exit(1)
    # Drop rows missing any of the essentials
    df = df.dropna(subset=required_cols)
    if args.limit:
        df = df.head(args.limit)
    base = args.base_url.rstrip("/")
    out_root = Path(args.output_dir)
    session = build_session()
    total = len(df)
    logging.info(f"Rows to process: {total}")
    successes = 0
    failures = 0
    seen_dests = set()  # destinations already claimed during this run
    for idx, row in df.iterrows():
        file_path = str(row["File Path"]).strip()
        orig_name = str(row["Original File Name"]).strip()
        member_id = str(row["Member ID"]).strip()
        first = str(row["Member First Name"]).strip()
        last = str(row["Member Last Name"]).strip()
        if not file_path or not orig_name or not member_id:
            logging.debug(f"Skipping row {idx}: missing essentials")
            continue
        # Build URL safely
        url = f"{base}/{file_path.lstrip('/')}"
        # Build destination folder and filename
        folder_name = f"{member_id} - {first}, {last}"
        dest_dir = out_root / safe_folder(folder_name)
        # Ensure filename is safe; keep extension if present in original
        safe_name = safe_filename(orig_name)
        if not os.path.splitext(safe_name)[1]:
            # If original name has no extension, try to infer from file_path
            ext = os.path.splitext(file_path)[1]
            if ext:
                safe_name = safe_name + ext
        dest = dest_dir / safe_name
        # A collision with another row in this run gets a " (n)" suffix; files
        # left over from a previous run are instead skipped inside fetch_file(),
        # so reruns stay idempotent rather than re-downloading with new suffixes.
        if dest in seen_dests:
            stem, ext = os.path.splitext(dest.name)
            n = 1
            while (dest_dir / f"{stem} ({n}){ext}").exists() or (dest_dir / f"{stem} ({n}){ext}") in seen_dests:
                n += 1
            dest = dest_dir / f"{stem} ({n}){ext}"
        seen_dests.add(dest)
        ok, err = fetch_file(session, url, dest, overwrite=args.overwrite)
        if ok:
            successes += 1
            logging.info(f"✓ Saved: {dest}")
        else:
            failures += 1
            logging.warning(f"✗ Failed ({err}) :: {url} -> {dest}")
    logging.info(f"Done. Success: {successes} | Failures: {failures} | Output: {out_root.resolve()}")
    # Exit code: 0 when everything succeeded, 2 when any download failed
    sys.exit(0 if failures == 0 else 2)
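
# The exit status is scriptable, e.g.:
#   python3 customForms.py Mapping.xlsx || echo "some downloads failed"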

if __name__ == "__main__":
    main()