# sunnymh-manga-dl/download.py
# Snapshot: 2026-04-11 16:55:13 +08:00 — 687 lines, 22 KiB, Python
"""
Manga downloader for m.happymh.com (educational purposes only).
Launches real Chrome via subprocess (not Playwright), then connects via
Chrome DevTools Protocol. Images are downloaded directly via HTTP.
Usage:
python download.py --setup # open Chrome, solve CF manually, exit
python download.py # download manga from manga.json
"""
import json
import re
import sys
import time
import socket
import subprocess
from pathlib import Path
from urllib.parse import urlparse
from playwright.sync_api import sync_playwright
BASE_URL = "https://m.happymh.com"
# Desktop Chrome UA string matching the browser launched via CHROME_PATH.
# NOTE(review): not referenced anywhere in this file — presumably kept for
# future direct-HTTP requests; confirm before removing.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/143.0.0.0 Safari/537.36"
)
ROOT_DIR = Path(__file__).parent              # directory containing this script
CONTENT_DIR = ROOT_DIR / "manga-content"      # downloaded chapters land here
MANGA_JSON = ROOT_DIR / "manga.json"          # input: JSON array of manga URLs
BROWSER_DATA = ROOT_DIR / ".browser-data"     # persistent Chrome profile (keeps CF cookies)
CDP_PORT = 9333                               # Chrome DevTools Protocol debug port
REQUEST_DELAY = 1.5                           # seconds to pause between chapters
# macOS-specific binary location; other platforms need this changed.
CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
def is_port_open(port):
    """Return True if a TCP connection to localhost:port succeeds."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success instead of raising.
        result = probe.connect_ex(("localhost", port))
    finally:
        probe.close()
    return result == 0
def launch_chrome(start_url=None):
    """Launch real Chrome with the CDP debugging port enabled.

    Returns the subprocess.Popen handle for the new Chrome, or None when
    a Chrome instance is already listening on CDP_PORT. Exits the whole
    process when the binary is missing or the port never opens.
    """
    if is_port_open(CDP_PORT):
        print(f"Chrome already on port {CDP_PORT}")
        return None
    if not Path(CHROME_PATH).exists():
        print(f"Chrome not found at: {CHROME_PATH}")
        sys.exit(1)
    cmd = [
        CHROME_PATH,
        f"--remote-debugging-port={CDP_PORT}",
        f"--user-data-dir={BROWSER_DATA}",
        "--no-first-run",
        "--no-default-browser-check",
    ]
    if start_url:
        cmd.append(start_url)
    proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # Poll for up to ~15s until the CDP port becomes reachable.
    for _ in range(30):
        if is_port_open(CDP_PORT):
            time.sleep(1)  # give Chrome a moment to finish CDP setup
            return proc
        time.sleep(0.5)
    # Fix: don't leak a half-started Chrome when the port never opens —
    # previously we exited and left the child process running.
    proc.terminate()
    print("Chrome failed to start")
    sys.exit(1)
def wait_for_cloudflare(page, timeout=120):
    """Poll the page once a second until the Cloudflare check clears.

    The user solves any CAPTCHA manually in the visible browser window.
    Returns True once the real site is detected, False after `timeout`
    seconds.
    """
    for elapsed in range(timeout):
        try:
            current_title = page.title()
        except Exception:
            # Page may be mid-navigation; retry on the next tick.
            time.sleep(1)
            continue
        challenge_active = (
            "Just a moment" in current_title or "challenge" in page.url
        )
        if challenge_active:
            if elapsed == 0:
                print(" CF challenge — solve in browser...")
            elif elapsed % 15 == 0:
                print(f" Still waiting for CF... ({elapsed}s)")
            time.sleep(1)
            continue
        if current_title:
            if "嗨皮漫画" in current_title:
                return True
            if "happymh" in page.url:
                return True
        time.sleep(1)
    print(" CF timed out.")
    return False
def fetch_chapters_via_api(page, slug):
    """Get the full chapter list via the chapterByPage API with pagination.

    The fetch runs inside the page context so it reuses the browser's
    Cloudflare clearance cookies. Returns a list of
    {"id": str, "chapterName": str} dicts, or None when the API fails
    (the caller then falls back to DOM scraping).
    """
    # The JS paginates up to 30 pages, stopping early once `total` chapters
    # are collected, a page comes back empty, or a request errors out.
    # Each request is aborted after 10s via an AbortController.
    result = page.evaluate("""
async (slug) => {
const all = [];
let total = 0;
for (let p = 1; p <= 30; p++) {
const url = `/v2.0/apis/manga/chapterByPage?code=${slug}&lang=cn&order=asc&page=${p}&_t=${Date.now()}`;
try {
const ctrl = new AbortController();
setTimeout(() => ctrl.abort(), 10000);
const r = await fetch(url, { signal: ctrl.signal });
if (!r.ok) { if (p === 1) return { error: r.status }; break; }
const json = await r.json();
if (!json.data) break;
total = json.data.total || total;
// Find chapter array in response
let items = null;
for (const val of Object.values(json.data)) {
if (Array.isArray(val) && val.length > 0) {
items = val;
break;
}
}
if (!items || items.length === 0) break;
for (const ch of items) {
all.push({
id: String(ch.id || ''),
chapterName: ch.chapterName || ch.name || '',
});
}
if (total && all.length >= total) break;
} catch (e) {
if (p === 1) return { error: e.message };
break;
}
}
return { chapters: all, total };
}
""", slug)
    if result and result.get("chapters") and len(result["chapters"]) > 0:
        chapters = result["chapters"]
        total = result.get("total", len(chapters))
        print(f" API: {len(chapters)}/{total} chapters")
        return chapters
    # A first-page failure surfaces as {"error": ...}; anything else simply
    # yields no chapters and falls through to None.
    if result and result.get("error"):
        print(f" API error: {result['error']}")
    return None
def fetch_chapters_from_dom(page):
    """Scrape all chapters from the MUI Drawer chapter list.

    Fallback path when the chapterByPage API fails: opens the drawer,
    clicks "load more" until every chapter link is present, then scrapes
    {"id", "chapterName"} pairs from the drawer's anchors. Returns None
    when no chapters could be found.
    """
    # Bail out early if the page shows no reader links at all.
    try:
        page.wait_for_selector("a[href*='/mangaread/']", timeout=15000)
        page.wait_for_timeout(1000)
    except Exception:
        print(" No chapter links found")
        return None
    # Step 1: Open the chapter list drawer. The button label varies, so try
    # several known texts/classes and stop at the first visible match.
    for selector in [
        "text=展开全部", "text=查看全部", "text=全部章节",
        "text=展开更多", "text=更多",
        "[class*='expand']", "[class*='more']",
    ]:
        try:
            btn = page.query_selector(selector)
            if btn and btn.is_visible():
                btn.click()
                print(" Opening chapter drawer...")
                page.wait_for_timeout(2000)
                break
        except Exception:
            continue
    # Step 2: Wait for the drawer; if it never appears, fall back to the
    # chapter links sitting directly on the page.
    try:
        page.wait_for_selector(".MuiDrawer-paper", timeout=5000)
    except Exception:
        print(" Drawer not found, using page chapters")
    # Step 3: Click the sort toggle to get ascending order (oldest first).
    try:
        sort_btn = page.query_selector("text=点我改变排序")
        if sort_btn and sort_btn.is_visible():
            sort_btn.click()
            print(" Sorting ascending...")
            page.wait_for_timeout(2000)
    except Exception:
        pass
    # Step 4: Click "load more" until all chapters are loaded.
    # Expected total is parsed from a drawer header like "共177个章节".
    total = page.evaluate("""
() => {
const spans = document.querySelectorAll('.MuiDrawer-paper span');
for (const s of spans) {
const m = s.textContent.match(/共(\\d+)个章节/);
if (m) return parseInt(m[1]);
}
return 0;
}
""")
    if total:
        print(f" Total chapters: {total}")
    # Capped at 50 rounds so a missing/stuck button cannot loop forever.
    for round_num in range(50):
        count = page.evaluate(
            "document.querySelectorAll('.MuiDrawer-paper a[href*=\"/mangaread/\"]').length"
        )
        if total and count >= total:
            break
        print(f" Loading... {count}/{total or '?'}", end="\r")
        # Find and click the "load more" element — search fresh each time,
        # since the list node is re-rendered after every load.
        clicked = page.evaluate("""
() => {
const walker = document.createTreeWalker(
document.querySelector('.MuiDrawer-paper') || document.body,
NodeFilter.SHOW_TEXT
);
while (walker.nextNode()) {
if (walker.currentNode.textContent.includes('加载更多')) {
let el = walker.currentNode.parentElement;
while (el && el.tagName !== 'LI') el = el.parentElement;
if (el) { el.click(); return true; }
walker.currentNode.parentElement.click();
return true;
}
}
return false;
}
""")
        if not clicked:
            break
        page.wait_for_timeout(1000)
    count = page.evaluate(
        "document.querySelectorAll('.MuiDrawer-paper a[href*=\"/mangaread/\"]').length"
    )
    print(f" Loaded {count} chapters" + " " * 20)
    # Step 5: Scrape chapters from the drawer, deduplicated by chapter id;
    # the generic "start reading" link is skipped.
    chapters = page.evaluate("""
() => {
const drawer = document.querySelector('.MuiDrawer-paper');
const container = drawer || document;
const links = container.querySelectorAll('a[href*="/mangaread/"]');
const chapters = [];
const seen = new Set();
links.forEach(a => {
const href = a.getAttribute('href');
const match = href.match(/\\/mangaread\\/[^/]+\\/(\\d+)/);
if (match && !seen.has(match[1])) {
seen.add(match[1]);
const name = a.textContent.trim();
if (name && name !== '开始阅读') {
chapters.push({ id: match[1], chapterName: name });
}
}
});
return chapters;
}
""")
    # Step 6: Close the drawer so the page is usable afterwards.
    try:
        page.keyboard.press("Escape")
        page.wait_for_timeout(500)
    except Exception:
        pass
    return chapters if chapters else None
def fetch_metadata(page):
    """Extract manga metadata and the cover image URL from the loaded page.

    Returns a dict that always carries "mg-url" and, when found in the
    HTML, "mg-title", "mg-author", "mg-genres", "mg-description" and
    "mg-cover".
    """
    html = page.content()
    info = {"mg-url": page.url}

    title_match = re.search(r'<h2 class="mg-title">(.*?)</h2>', html)
    if title_match is not None:
        info["mg-title"] = title_match.group(1).strip()

    author_match = re.search(r'<p class="mg-sub-title"><a[^>]*>(.*?)</a>', html)
    if author_match is not None:
        info["mg-author"] = author_match.group(1).strip()

    # Genres: every <a> inside the first mg-cate paragraph.
    cate_blocks = re.findall(r'<p class="mg-cate">.*?</p>', html, re.DOTALL)
    if cate_blocks:
        info["mg-genres"] = re.findall(r'<a[^>]*>(.*?)</a>', cate_blocks[0])

    desc_match = re.search(r'<div class="mg-desc">.*?<p[^>]*>(.*?)</p>', html, re.DOTALL)
    if desc_match is not None:
        info["mg-description"] = desc_match.group(1).strip()

    # Cover: prefer the og:image meta tag, then a few likely <img> selectors.
    cover_url = page.evaluate("""
() => {
// Try og:image meta tag
const og = document.querySelector('meta[property="og:image"]');
if (og) return og.content;
// Try common cover selectors
const selectors = ['img.mg-cover', '.mg-cover img', '.cover img', 'img[src*="mcover"]'];
for (const sel of selectors) {
const img = document.querySelector(sel);
if (img && img.src) return img.src;
}
return null;
}
""")
    if cover_url:
        info["mg-cover"] = cover_url
    return info
def get_chapter_images(page, slug, chapter_id):
    """Navigate to the reader page and intercept the image-list API response.

    Returns a list of {"url": str, "no_referrer": bool} dicts (possibly
    empty). Falls back to scraping <img> tags from the DOM when the
    /apis/manga/reading response is never captured.
    """
    captured_images = []
    # Shared state mutated by the response callback below.
    api_info = {"found": False, "error": None}

    def on_response(response):
        # Only the reading API carries the page scan list.
        if "/apis/manga/reading" not in response.url:
            return
        api_info["found"] = True
        if response.status != 200:
            api_info["error"] = f"status {response.status}"
            return
        try:
            data = response.json()
            scans = data.get("data", {}).get("scans", [])
            # "scans" is sometimes JSON-encoded as a string.
            if isinstance(scans, str):
                scans = json.loads(scans)
            for scan in scans:
                if isinstance(scan, dict) and "url" in scan:
                    captured_images.append({
                        "url": scan["url"],
                        # NOTE(review): "r" != 0 appears to mean the CDN
                        # rejects a referrer header — confirm against site.
                        "no_referrer": scan.get("r", 0) != 0,
                    })
        except Exception as e:
            api_info["error"] = str(e)

    page.on("response", on_response)
    reader_url = f"{BASE_URL}/mangaread/{slug}/{chapter_id}"
    print(" Loading reader...")
    # Navigate via a JS location assignment instead of page.goto — keeps the
    # same browsing context without waiting on a load event.
    try:
        page.evaluate(f"window.location.href = '{reader_url}'")
    except Exception:
        pass
    time.sleep(2)
    # Neutralize window.close so site scripts cannot close our tab.
    try:
        page.evaluate("window.close = () => {}")
    except Exception:
        pass
    print(" Waiting for page...")
    if not wait_for_cloudflare(page, timeout=90):
        page.remove_listener("response", on_response)
        return []
    print(" Waiting for API...")
    # Poll up to 20s for the response callback to fill captured_images.
    deadline = time.time() + 20
    while time.time() < deadline:
        if captured_images:
            break
        try:
            page.wait_for_timeout(500)
        except Exception:
            break
    page.remove_listener("response", on_response)
    if not api_info["found"]:
        print(" API not intercepted")
    elif api_info["error"]:
        print(f" API: {api_info['error']}")
    # DOM fallback: collect non-cover, non-Cloudflare, non-svg image URLs.
    if not captured_images:
        try:
            page.wait_for_timeout(3000)
            dom_images = page.evaluate("""
() => {
const imgs = document.querySelectorAll('img[src*="http"]');
const urls = [];
const seen = new Set();
imgs.forEach(img => {
const src = img.src || '';
if (src && !seen.has(src) && !src.includes('/mcover/')
&& !src.includes('cloudflare') && !src.includes('.svg')) {
seen.add(src);
urls.push(src);
}
});
return urls;
}
""")
            if dom_images:
                print(f" DOM: {len(dom_images)} images")
                for u in dom_images:
                    captured_images.append({"url": u, "no_referrer": False})
        except Exception as e:
            print(f" DOM failed: {e}")
    return captured_images
def download_image(page, img, save_path):
    """Download one image through the browser's network stack (raw CDP bytes).

    Returns True on success or when the file already exists; False on any
    failure. The first failure reason of the run is printed once.
    """
    if save_path.exists():
        return True
    target_url = img["url"]
    policy = "no-referrer" if img.get("no_referrer") else "origin"
    try:
        # Trigger an in-page fetch and capture the matching network response.
        with page.expect_response(lambda r: target_url in r.url, timeout=15000) as waiter:
            page.evaluate(
                "([u, r]) => fetch(u, { referrerPolicy: r })",
                [target_url, policy],
            )
        resp = waiter.value
        if resp.status == 200:
            payload = resp.body()  # raw bytes from network layer
            # Anything <= 100 bytes is treated as a bogus/error body.
            if payload and len(payload) > 100:
                save_path.parent.mkdir(parents=True, exist_ok=True)
                save_path.write_bytes(payload)
                return True
    except Exception as exc:
        # Log only the first error of the run to keep output readable.
        if not hasattr(download_image, "_err_logged"):
            download_image._err_logged = True
            print(f"\n First error: {exc}")
    return False
def get_existing_chapters(manga_dir):
    """Return names of chapter sub-directories that already hold at least
    one downloaded .jpg page."""
    if not manga_dir.exists():
        return set()
    return {
        entry.name
        for entry in manga_dir.iterdir()
        if entry.is_dir() and any(entry.glob("*.jpg"))
    }
def download_manga(page, manga_url):
    """Download all chapters of one manga using a single browser page.

    Writes into CONTENT_DIR/<slug>/: detail.json (merged metadata),
    cover.jpg, and one folder per chapter containing N.jpg pages.
    """
    # Slug is the last path segment of the manga URL.
    slug = urlparse(manga_url).path.strip("/").split("/")[-1]
    manga_dir = CONTENT_DIR / slug
    print(f"\n{'='*60}")
    print(f"Manga: {slug}")
    print(f"{'='*60}")
    # Intercept all cover images from page load traffic so we can save one
    # later without issuing an extra request.
    cover_responses = {}

    def on_manga_response(response):
        if "/mcover/" in response.url and response.status == 200:
            try:
                cover_responses[response.url] = response.body()
            except Exception:
                pass

    page.on("response", on_manga_response)
    print("Loading manga page...")
    # wait_until="commit" returns as soon as navigation starts; the CF
    # polling below handles the rest of the load.
    try:
        page.goto(f"{BASE_URL}/manga/{slug}", wait_until="commit", timeout=60000)
    except Exception:
        pass
    if not wait_for_cloudflare(page):
        page.remove_listener("response", on_manga_response)
        return
    print("Fetching chapters via API...")
    chapters = fetch_chapters_via_api(page, slug)
    if not chapters:
        print(" API failed, trying DOM...")
        chapters = fetch_chapters_from_dom(page)
    if not chapters:
        print("No chapters found.")
        return
    print(f"Found {len(chapters)} chapters")
    metadata = fetch_metadata(page)
    manga_dir.mkdir(parents=True, exist_ok=True)
    detail_path = manga_dir / "detail.json"
    # Merge new metadata over any previously saved detail.json.
    if metadata:
        existing_meta = {}
        if detail_path.exists():
            try:
                existing_meta = json.loads(detail_path.read_text(encoding="utf-8"))
            except json.JSONDecodeError:
                pass
        existing_meta.update(metadata)
        detail_path.write_text(
            json.dumps(existing_meta, ensure_ascii=False, indent=4),
            encoding="utf-8",
        )
    # Save cover image — match the intercepted response to the <img> the
    # page actually displays.
    page.remove_listener("response", on_manga_response)
    cover_path = manga_dir / "cover.jpg"
    if not cover_path.exists():
        # Find the actual cover URL from the first mip-fill-content img.
        cover_url = page.evaluate("""
() => {
const img = document.querySelector('img.mip-fill-content[src*="mcover"]');
return img ? img.src : null;
}
""")
        cover_body = None
        if cover_url:
            # Exact match first.
            cover_body = cover_responses.get(cover_url)
            # Substring match fallback (ignores query strings either way).
            if not cover_body:
                for url, data in cover_responses.items():
                    if cover_url.split("?")[0] in url or url.split("?")[0] in cover_url:
                        cover_body = data
                        break
        if cover_body and len(cover_body) > 100:
            cover_path.write_bytes(cover_body)
            print(f"Cover saved ({len(cover_body)} bytes)")
        else:
            print(f"Cover not found (captured {len(cover_responses)} mcover images, target: {cover_url})")
    existing_chapters = get_existing_chapters(manga_dir)
    # Chapters are already in ascending order (API asc / drawer DOM order).
    chapters_sorted = chapters
    for i, chapter in enumerate(chapters_sorted, 1):
        ch_id = chapter["id"]
        ch_name = chapter["chapterName"]
        folder_name = f"{i} {ch_name}"
        # Skip if this chapter was already downloaded. NOTE(review):
        # substring matching on the name could mis-skip chapters whose
        # names contain one another — verify against real chapter lists.
        already = any(ch_name in name for name in existing_chapters)
        if already:
            print(f" [{i}/{len(chapters_sorted)}] {ch_name} — skip")
            continue
        print(f" [{i}/{len(chapters_sorted)}] {ch_name} (id={ch_id})")
        images = get_chapter_images(page, slug, ch_id)
        if not images:
            print(f" No images")
            continue
        print(f" {len(images)} pages")
        chapter_dir = manga_dir / folder_name
        chapter_dir.mkdir(parents=True, exist_ok=True)
        # Download images via browser network stack (raw bytes, no base64).
        ok = 0
        failed = []
        for pn, img in enumerate(images, 1):
            save_path = chapter_dir / f"{pn}.jpg"
            if download_image(page, img, save_path):
                ok += 1
                print(f" {pn}/{len(images)}", end="\r")
            else:
                failed.append((pn, img))
            time.sleep(0.1)
        # Retry failed images once, with a slightly longer pause.
        if failed:
            time.sleep(1)
            for pn, img in failed:
                save_path = chapter_dir / f"{pn}.jpg"
                if download_image(page, img, save_path):
                    ok += 1
                else:
                    print(f" {pn}/{len(images)} FAIL")
                time.sleep(0.3)
        print(f" {ok}/{len(images)} downloaded" + " " * 20)
        # Remove the folder if nothing landed (rmdir only works when empty).
        if ok == 0:
            try:
                chapter_dir.rmdir()
            except Exception:
                pass
        time.sleep(REQUEST_DELAY)
    print(f"\nDone: {slug}")
def setup_mode():
    """Open a visible Chrome so the user can solve Cloudflare by hand."""
    print("=== SETUP ===")
    print("Chrome will open. Do this:")
    print(" 1. Go to m.happymh.com — solve Cloudflare")
    print(" 2. Open a manga page — solve CF if prompted")
    print(" 3. Open a chapter reader — solve CF if prompted")
    print(" 4. Press ENTER here when done\n")
    chrome_proc = launch_chrome(BASE_URL)
    input(">>> Press ENTER when Cloudflare is solved... ")
    # Verify the clearance cookie actually landed in the profile.
    try:
        with sync_playwright() as pw:
            browser = pw.chromium.connect_over_cdp(f"http://localhost:{CDP_PORT}")
            default_ctx = browser.contexts[0]
            has_clearance = any(
                cookie["name"] == "cf_clearance" for cookie in default_ctx.cookies()
            )
            if has_clearance:
                print("cf_clearance found!")
            else:
                print("Warning: cf_clearance not found")
            browser.close()
    except Exception as e:
        print(f"Could not verify: {e}")
    if chrome_proc:
        chrome_proc.terminate()
    print("Done. Now run: python download.py")
def main():
    """CLI entry point: --setup opens Chrome for manual CF solving,
    otherwise every URL listed in manga.json is downloaded in turn."""
    if "--setup" in sys.argv:
        setup_mode()
        return
    if not MANGA_JSON.exists():
        print(f"Error: {MANGA_JSON} not found")
        sys.exit(1)
    manga_urls = json.loads(MANGA_JSON.read_text(encoding="utf-8"))
    if not isinstance(manga_urls, list) or not manga_urls:
        print("Error: manga.json should be a JSON array of URLs")
        sys.exit(1)
    print(f"Found {len(manga_urls)} manga(s)")
    print("Launching Chrome...\n")
    chrome_proc = launch_chrome()
    try:
        with sync_playwright() as playwright:
            browser = playwright.chromium.connect_over_cdp(f"http://localhost:{CDP_PORT}")
            context = browser.contexts[0]
            # Reuse the first open tab when one exists.
            if context.pages:
                page = context.pages[0]
            else:
                page = context.new_page()
            for manga_url in manga_urls:
                # One failing manga must not abort the rest of the queue.
                try:
                    download_manga(page, manga_url)
                except Exception as exc:
                    print(f"\nError: {manga_url}: {exc}")
                    import traceback
                    traceback.print_exc()
            browser.close()
    finally:
        if chrome_proc:
            chrome_proc.terminate()
    print("\nAll done!")
# Script entry point.
if __name__ == "__main__":
    main()