175 lines
7.2 KiB
Python
175 lines
7.2 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""ARRFLIX headless smoke-test. Logs in via API, navigates to a detail page,
|
||
|
|
captures screenshot + console errors + network failures + computed-style for
|
||
|
|
backdrop. Pass dev or prod URL as argv[1]."""
|
||
|
|
import asyncio
import json
import os
import ssl
import sys
import time
import urllib.error
import urllib.request

from playwright.async_api import async_playwright
|
||
|
|
|
||
|
|
# Positional command-line arguments, in order: URL, USER, PASS, ITEM, OUT.
# Any argument not supplied falls back to the default at the same position.
# ITEM defaults to None, in which case main() auto-picks the first Series.
_ARG_DEFAULTS = (
    "https://dev.arrflix.s8n.ru",
    "guest-mirror",
    "dev-test-guest",
    None,
    "/tmp/arrflix-headless",
)
_cli_args = tuple(sys.argv[1:1 + len(_ARG_DEFAULTS)])
URL, USER, PASS, ITEM, OUT = _cli_args + _ARG_DEFAULTS[len(_cli_args):]

# Directory that receives the screenshot and probe.json.
os.makedirs(OUT, exist_ok=True)

# Client identity fields embedded in the Authorization header by auth_header().
DEVICE = "headless-test"
DEVICE_ID = "headless-test-2026-05-09"
CLIENT = "Headless"
VERSION = "1.0"
|
||
|
|
|
||
|
|
def auth_header(token=None):
    """Build the ``MediaBrowser ...`` value used for the Authorization header.

    The value always carries the module-level CLIENT, DEVICE, DEVICE_ID and
    VERSION fields; a ``Token`` field is appended only when ``token`` is
    truthy.

    Args:
        token: Optional access token to include.

    Returns:
        The header value as a string.
    """
    fields = [
        f'Client="{CLIENT}"',
        f'Device="{DEVICE}"',
        f'DeviceId="{DEVICE_ID}"',
        f'Version="{VERSION}"',
    ]
    if token:
        fields.append(f'Token="{token}"')
    return "MediaBrowser " + ", ".join(fields)
|
||
|
|
|
||
|
|
def api_post(path, body, token=None, timeout=30):
    """POST ``body`` as JSON to ``URL + path`` and return the decoded reply.

    Args:
        path: Server-relative path, appended verbatim to the module-level URL.
        body: JSON-serialisable request payload.
        token: Optional access token, forwarded to ``auth_header``.
        timeout: Socket timeout in seconds. Without one, a stalled server
            would block the smoke test indefinitely.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        urllib.error.URLError: On connection failure (``HTTPError`` for HTTP
            error statuses).
        TimeoutError: If the server does not answer within ``timeout``.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    request = urllib.request.Request(
        f"{URL}{path}",
        data=json.dumps(body).encode(),
        headers={
            "Authorization": auth_header(token),
            "Content-Type": "application/json",
        },
        method="POST",
    )
    # Certificate verification is deliberately disabled, as in the original
    # (presumably for self-signed dev certificates). Built from the public
    # ssl API; check_hostname must be cleared before verify_mode is relaxed.
    tls = ssl.create_default_context()
    tls.check_hostname = False
    tls.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(request, timeout=timeout, context=tls) as resp:
        return json.loads(resp.read())
|
||
|
|
|
||
|
|
def api_get(path, token=None, timeout=30):
    """GET ``URL + path`` and return the decoded JSON reply.

    Args:
        path: Server-relative path (including any query string), appended
            verbatim to the module-level URL.
        token: Optional access token, forwarded to ``auth_header``.
        timeout: Socket timeout in seconds. Without one, a stalled server
            would block the smoke test indefinitely.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        urllib.error.URLError: On connection failure (``HTTPError`` for HTTP
            error statuses).
        TimeoutError: If the server does not answer within ``timeout``.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    request = urllib.request.Request(
        f"{URL}{path}",
        headers={"Authorization": auth_header(token)},
    )
    # Certificate verification is deliberately disabled, as in the original
    # (presumably for self-signed dev certificates). Built from the public
    # ssl API; check_hostname must be cleared before verify_mode is relaxed.
    tls = ssl.create_default_context()
    tls.check_hostname = False
    tls.verify_mode = ssl.CERT_NONE
    with urllib.request.urlopen(request, timeout=timeout, context=tls) as resp:
        return json.loads(resp.read())
|
||
|
|
|
||
|
|
def login():
    """Authenticate USER/PASS via ``/Users/AuthenticateByName``.

    Returns:
        A ``(access_token, user_id, server_id)`` tuple taken from the
        ``AccessToken``, ``User.Id`` and ``ServerId`` fields of the reply.

    Raises:
        KeyError: If the reply lacks one of those fields.
    """
    credentials = {"Username": USER, "Pw": PASS}
    reply = api_post("/Users/AuthenticateByName", credentials)
    access_token = reply["AccessToken"]
    user_id = reply["User"]["Id"]
    server_id = reply["ServerId"]
    return access_token, user_id, server_id
|
||
|
|
|
||
|
|
def _screenshot_filename(url):
    """Return a flat, filesystem-safe screenshot file name derived from ``url``."""
    # Drop any scheme (not just https) and a trailing slash, then map every
    # character other than letters, digits and '-' to '_'. For a plain
    # "https://host.name" URL this yields the same name as before; for URLs
    # with "http://", a port or a path it no longer leaks '/' or ':' into
    # the file name.
    remainder = url.split("://", 1)[-1].rstrip("/")
    safe = "".join(ch if ch.isalnum() or ch == "-" else "_" for ch in remainder)
    return f"{safe}-detail.png"


async def main():
    """Run the smoke test end to end and write artifacts into OUT.

    Steps:
      1. Authenticate through the API to obtain token, user id and server id.
      2. Use ITEM, or auto-pick the first Series the API returns.
      3. Launch headless Chromium, record console messages and network
         failures, and attempt a best-effort sign-in through the login form.
      4. Open the item detail page (or the web root if no item is known),
         probe computed styles of key elements, take a screenshot, and dump
         everything to ``probe.json``.
    """
    token, user_id, server_id = login()
    print(f"[+] Authenticated as {USER} ({user_id})")

    item_id = ITEM
    if not item_id:
        items = api_get(
            f"/Users/{user_id}/Items?Recursive=true&IncludeItemTypes=Series&Limit=5",
            token)
        if items.get("Items"):
            item_id = items["Items"][0]["Id"]
            print(f"[+] Auto-picked Series: {items['Items'][0]['Name']} ({item_id})")
        else:
            print("[!] No series found, falling back to root")

    console_messages = []
    network_failures = []

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"])
        try:
            ctx = await browser.new_context(
                viewport={"width": 1600, "height": 900},
                ignore_https_errors=True)
            page = await ctx.new_page()

            # Collect diagnostics for the whole session: every console
            # message, every failed request, and every HTTP response >= 400.
            page.on("console", lambda m: console_messages.append(
                f"[{m.type}] {m.text}"))
            page.on("requestfailed", lambda r: network_failures.append(
                f"{r.method} {r.url} :: {r.failure}"))
            page.on("response", lambda r: None if r.status < 400 else
                    network_failures.append(f"HTTP {r.status} {r.url}"))

            # Auth via login form
            await page.goto(f"{URL}/web/", wait_until="networkidle", timeout=30000)
            await asyncio.sleep(3)
            # Wait for any input rendered by SPA. The form login is
            # best-effort: any failure is reported and the run continues.
            try:
                await page.wait_for_selector("input", timeout=20000)
                inputs = await page.evaluate(
                    "() => Array.from(document.querySelectorAll('input')).map(i => ({id:i.id, name:i.name, type:i.type, placeholder:i.placeholder}))")
                print(f"[*] inputs: {inputs}")
                # Find username input by heuristic: the first text input, or
                # one whose id/name mentions "user" or "name"; the password
                # field is the first input of type "password".
                user_sel = None
                pass_sel = None
                for i in inputs:
                    fid, fname, ftype = i.get('id', ''), i.get('name', ''), i.get('type', '')
                    if not user_sel and (ftype == 'text' or 'user' in (fid+fname).lower() or 'name' in (fid+fname).lower()):
                        user_sel = f'#{fid}' if fid else f'input[name="{fname}"]'
                    if not pass_sel and ftype == 'password':
                        pass_sel = f'#{fid}' if fid else f'input[name="{fname}"]'
                print(f"[*] user_sel={user_sel} pass_sel={pass_sel}")
                if user_sel and pass_sel:
                    await page.fill(user_sel, USER)
                    await page.fill(pass_sel, PASS)
                    await page.keyboard.press("Enter")
                    await page.wait_for_load_state("networkidle", timeout=20000)
                    await asyncio.sleep(2)
                    # NOTE: printed once the form was submitted; the result
                    # of the sign-in itself is not verified here.
                    print("[+] logged in via form")
                else:
                    print("[!] could not locate login fields")
            except Exception as e:
                print(f"[!] form login failed: {e}")

            # Navigate to detail page
            target = (f"{URL}/web/#/details?id={item_id}&serverId={server_id}"
                      if item_id else f"{URL}/web/")
            print(f"[*] navigating: {target}")
            await page.goto(target, wait_until="networkidle", timeout=30000)
            await asyncio.sleep(4)  # let SPA paint backdrop

            # Probe key DOM elements: for each selector record either
            # '<absent>' or a snapshot of its computed style and bounding box.
            probe = await page.evaluate("""() => {
                const result = {};
                const sel = ['.itemBackdrop', '.detailBackdrop', '.backdropContainer',
                             '.backgroundContainer', '.layout-desktop',
                             'body', '#reactRoot', '.itemDetailPage',
                             'video', '.htmlvideoplayer', '.btnPlay', '.detailPagePrimaryContainer'];
                for (const s of sel) {
                    const el = document.querySelector(s);
                    if (!el) { result[s] = '<absent>'; continue; }
                    const cs = getComputedStyle(el);
                    result[s] = {
                        display: cs.display,
                        opacity: cs.opacity,
                        visibility: cs.visibility,
                        background: cs.backgroundColor,
                        backgroundImage: cs.backgroundImage.slice(0, 80),
                        zIndex: cs.zIndex,
                        rect: el.getBoundingClientRect().toJSON(),
                    };
                }
                result['__title'] = document.title;
                const playBtn = document.querySelector('.btnPlay, [data-action="play"]');
                result['__playBtnText'] = playBtn ? (playBtn.innerText || playBtn.textContent || '').trim() : null;
                result['__bodyClasses'] = document.body.className;
                result['__url'] = location.href;
                return result;
            }""")

            screenshot = os.path.join(OUT, _screenshot_filename(URL))
            await page.screenshot(path=screenshot, full_page=False)
            print(f"[+] screenshot: {screenshot}")

            probe_path = os.path.join(OUT, "probe.json")
            with open(probe_path, "w", encoding="utf-8") as f:
                json.dump({
                    "url": URL,
                    "user": USER,
                    "item": item_id,
                    "probe": probe,
                    # Only the most recent 50 entries of each log are kept.
                    "console": console_messages[-50:],
                    "network_failures": network_failures[-50:],
                }, f, indent=2)
            print(f"[+] probe.json: {probe_path}")
            print(f"[+] console msgs: {len(console_messages)}")
            print(f"[+] network failures: {len(network_failures)}")
        finally:
            # Close the browser even when navigation or probing raised.
            await browser.close()
|
||
|
|
|
||
|
|
# Run the smoke test only when executed as a script, so importing this module
# (e.g. to reuse the API helpers) does not launch a browser session.
if __name__ == "__main__":
    asyncio.run(main())
|