Test
This commit is contained in:
212
customForms.py
Normal file
212
customForms.py
Normal file
@@ -0,0 +1,212 @@
|
||||
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Bulk downloader for Avalanche Association files listed in an Excel sheet.
|
||||
|
||||
- Reads the "resource>cform" sheet (default) from the Excel file.
|
||||
- Builds each file's URL from a base like:
|
||||
https://cdn.ymaws.com/www.avalancheassociation.ca
|
||||
+ the value in the "File Path" column.
|
||||
- Saves each file as the "Original File Name" inside a folder named:
|
||||
"<Member ID> - <Member First Name>, <Member Last Name>"
|
||||
- Skips rows with missing essentials and logs errors instead of crashing.
|
||||
- Retries flaky network requests automatically.
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import math
|
||||
import logging
|
||||
import unicodedata
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import pandas as pd
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
# ---------------------------
|
||||
# Helpers
|
||||
# ---------------------------
|
||||
def setup_logger(verbosity:int=1):
    """Configure the root logger via ``logging.basicConfig``.

    A verbosity of exactly 1 selects INFO, anything above 1 selects DEBUG,
    and anything below 1 selects WARNING.
    """
    if verbosity == 1:
        chosen_level = logging.INFO
    elif verbosity > 1:
        chosen_level = logging.DEBUG
    else:
        chosen_level = logging.WARNING

    log_options = {
        "level": chosen_level,
        "format": "%(asctime)s | %(levelname)-8s | %(message)s",
        "datefmt": "%H:%M:%S",
    }
    logging.basicConfig(**log_options)
|
||||
|
||||
def normalize(s: str) -> str:
    """Return ``s`` converted to Unicode normalization form NFKD."""
    decomposed = unicodedata.normalize("NFKD", s)
    return decomposed
|
||||
|
||||
def safe_filename(name: str) -> str:
    """Turn ``name`` into a string usable as a single file or folder name.

    Steps, in order: NFKD-normalize and trim surrounding whitespace, replace
    each of ``< > : " / \\ | ? *`` with ``_``, drop control characters
    (code points below 32), then squeeze runs of underscores down to one.
    Spaces inside the name are left untouched.
    """
    cleaned = normalize(name).strip()

    # One translation pass: forbidden characters become "_", control
    # characters map to None and are therefore removed.
    table = {ord(ch): "_" for ch in '<>:"/\\|?*'}
    table.update((code, None) for code in range(32))
    cleaned = cleaned.translate(table)

    # Squeeze repeated underscores
    while "__" in cleaned:
        cleaned = cleaned.replace("__", "_")
    return cleaned
|
||||
|
||||
def safe_folder(name: str) -> str:
    """Sanitize a folder name; folders follow the same rules as file names."""
    sanitized = safe_filename(name)
    return sanitized
|
||||
|
||||
def build_session(timeout: int = 20, total_retries: int = 5, backoff: float = 0.5) -> requests.Session:
    """Create a ``requests`` session that retries GET requests.

    Args:
        timeout: Stored on the returned session as ``request_timeout`` for
            callers to pass along with each request.
        total_retries: Retry budget, applied to the total as well as to the
            read, connect and status counters.
        backoff: Backoff factor handed to the retry policy.

    Returns:
        A session with the retrying adapter mounted for both ``http://`` and
        ``https://`` and a custom ``User-Agent`` header.
    """
    retry_policy = Retry(
        total=total_retries,
        read=total_retries,
        connect=total_retries,
        status=total_retries,
        backoff_factor=backoff,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset(["GET"]),
        raise_on_status=False,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy, pool_connections=10, pool_maxsize=20)

    http = requests.Session()
    for scheme in ("http://", "https://"):
        http.mount(scheme, retrying_adapter)

    user_agent = {"User-Agent": "AA-Excel-Downloader/1.0 (+python requests)"}
    http.headers.update(user_agent)

    # Not a requests feature: a plain attribute that callers read back.
    http.request_timeout = timeout
    return http
|
||||
|
||||
def fetch_file(session: "requests.Session", url: str, dest: Path, overwrite: bool = False) -> Tuple[bool, Optional[str]]:
    """Download ``url`` to ``dest`` and report the outcome as ``(ok, error_message)``.

    The response body is streamed into a sibling ``<name>.part`` file that is
    renamed onto ``dest`` only after the transfer finished, so a failed or
    truncated download never leaves a file at ``dest``.

    Args:
        session: Session used for the GET request. Its ``request_timeout``
            attribute is used as the timeout when present, otherwise 20 seconds.
        url: Address of the file to download.
        dest: Target path; missing parent directories are created.
        overwrite: When false and ``dest`` already exists, nothing is
            downloaded and the call counts as a success.

    Returns:
        ``(True, None)`` if the file was saved or skipped as already present;
        ``(False, message)`` for a non-200 status, a body shorter than the
        announced ``Content-Length``, a request error or a filesystem error.
    """
    part_path = dest.with_name(dest.name + ".part")
    try:
        dest.parent.mkdir(parents=True, exist_ok=True)
        if dest.exists() and not overwrite:
            logging.debug(f"Exists, skipping: {dest}")
            return True, None

        timeout = getattr(session, "request_timeout", 20)
        with session.get(url, stream=True, timeout=timeout) as r:
            if r.status_code != 200:
                return False, f"HTTP {r.status_code}"

            # Expected size if the server announced one; a malformed header
            # just disables the size check instead of aborting the run.
            try:
                total = int(r.headers.get("Content-Length", 0))
            except (TypeError, ValueError):
                total = 0

            done = 0
            try:
                with open(part_path, "wb") as f:
                    for part in r.iter_content(chunk_size=1024 * 64):
                        if part:
                            f.write(part)
                            done += len(part)

                # Crude size check: only a body shorter than announced is an error.
                if total and done < total:
                    return False, f"Incomplete download: {done}/{total} bytes"

                # Publish the finished file (replaces an existing dest when overwriting).
                part_path.replace(dest)
            finally:
                # Still present only if the transfer failed or was cut short.
                if part_path.exists():
                    try:
                        part_path.unlink()
                    except OSError:
                        logging.debug(f"Could not remove partial file: {part_path}")
        return True, None
    except requests.RequestException as e:
        return False, f"Request failed: {e}"
    except OSError as e:
        return False, f"Filesystem error: {e}"
|
||||
|
||||
# ---------------------------
|
||||
# Core
|
||||
# ---------------------------
|
||||
def main():
    """Command-line entry point: download every file listed in the Excel sheet.

    Reads the sheet, drops rows lacking any required column, and saves each
    file as ``<output-dir>/<Member ID> - <First>, <Last>/<Original File Name>``.

    Destination names are de-duplicated against the names already assigned
    during this run, not against the disk. Two rows that map to the same name
    therefore get ``name`` and ``name (1)`` on every run, while a file left by
    a previous run is skipped (or replaced with ``--overwrite``) instead of
    being downloaded again under a new name.

    Exits with status 0 when nothing failed, 1 when the input cannot be used
    (file, sheet or columns missing), and 2 when at least one download failed.
    """
    parser = argparse.ArgumentParser(description="Download files listed in an Excel (resource>cform) sheet.")
    parser.add_argument("excel_path", help="Path to the Excel file (e.g., Mapping.xlsx)")
    parser.add_argument("--sheet", default="resource>cform", help="Sheet name to read (default: resource>cform)")
    parser.add_argument("--base-url", default="https://cdn.ymaws.com/www.avalancheassociation.ca",
                        help="Base URL to prepend to 'File Path'")
    parser.add_argument("--output-dir", default="downloads", help="Directory to put member folders in")
    parser.add_argument("--limit", type=int, default=None, help="Optionally limit number of rows processed (for testing)")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite files if they already exist")
    parser.add_argument("-v", "--verbose", action="count", default=1, help="Increase log verbosity (-v, -vv)")

    args = parser.parse_args()
    setup_logger(args.verbose)

    excel_path = Path(args.excel_path)
    if not excel_path.exists():
        logging.error(f"Excel not found: {excel_path}")
        sys.exit(1)

    # Load once; dtype=str keeps IDs such as "00123" from being parsed as numbers
    try:
        df = pd.read_excel(excel_path, sheet_name=args.sheet, dtype=str)
    except ValueError as e:
        logging.error(f"Unable to read sheet '{args.sheet}': {e}")
        sys.exit(1)

    # Columns are accessed by the exact names present in the file
    required_cols = ["File Path", "Original File Name", "Member ID", "Member First Name", "Member Last Name"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        logging.error(f"Missing required columns: {missing}")
        sys.exit(1)

    # Drop rows with an empty cell in any required column, and say so
    rows_in_sheet = len(df)
    df = df.dropna(subset=required_cols)
    dropped = rows_in_sheet - len(df)
    if dropped:
        logging.warning(f"Skipping {dropped} row(s) with empty required columns")

    # "is not None" so that an explicit --limit 0 really processes zero rows
    if args.limit is not None:
        df = df.head(args.limit)

    base = args.base_url.rstrip("/")
    out_root = Path(args.output_dir)

    session = build_session()

    total = len(df)
    logging.info(f"Rows to process: {total}")

    successes = 0
    failures = 0
    claimed = set()  # destinations already assigned to an earlier row of this run

    for idx, row in df.iterrows():
        file_path = str(row["File Path"]).strip()
        orig_name = str(row["Original File Name"]).strip()
        member_id = str(row["Member ID"]).strip()
        first = str(row["Member First Name"]).strip()
        last = str(row["Member Last Name"]).strip()

        # Cells holding only whitespace survive dropna; catch them here
        if not file_path or not orig_name or not member_id:
            logging.debug(f"Skipping row {idx}: missing essentials")
            continue

        # Build URL safely
        url = f"{base}/{file_path.lstrip('/')}"

        # Build destination folder and filename
        folder_name = f"{member_id} - {first}, {last}"
        dest_dir = out_root / safe_folder(folder_name)

        # Ensure filename is safe; keep extension if present in original
        safe_name = safe_filename(orig_name)
        if not os.path.splitext(safe_name)[1]:
            # If original name has no extension, try to infer from file_path
            ext = os.path.splitext(file_path)[1]
            if ext:
                safe_name = safe_name + ext

        dest = dest_dir / safe_name

        # Two rows of this run mapping to the same name: add a counter so the
        # later one does not clobber (or get skipped in favour of) the earlier.
        if dest in claimed:
            stem, ext = os.path.splitext(safe_name)
            n = 1
            while (dest_dir / f"{stem} ({n}){ext}") in claimed:
                n += 1
            dest = dest_dir / f"{stem} ({n}){ext}"
        claimed.add(dest)

        already_there = dest.exists()
        ok, err = fetch_file(session, url, dest, overwrite=args.overwrite)
        if ok:
            successes += 1
            if already_there and not args.overwrite:
                logging.info(f"= Already present: {dest}")
            else:
                logging.info(f"✓ Saved: {dest}")
        else:
            failures += 1
            logging.warning(f"✗ Failed ({err}) :: {url} -> {dest}")
            # A failed transfer must not leave a new file behind: the next run
            # would take it for a finished download and skip it.
            if not already_there and dest.is_file():
                try:
                    dest.unlink()
                except OSError as cleanup_err:
                    logging.warning(f"Could not remove incomplete file {dest}: {cleanup_err}")

    logging.info(f"Done. Success: {successes} | Failures: {failures} | Output: {out_root.resolve()}")
    # Exit code indicates if anything failed
    sys.exit(0 if failures == 0 else 2)
|
||||
|
||||
# Run the downloader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user