630 lines
25 KiB
Python
630 lines
25 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""ARRFLIX headless smoke-test v2.

Why v2 exists (see docs/26 INC4 audit):
v1 had three coverage gaps that let two regressions ship:
  - Logged in only as `guest` (non-admin restricted) → admin-only sections
    like the "More from Season N" carousel never rendered, so the black
    band behind that carousel was invisible to the test.
  - Never clicked Play → never observed the <video> element in a real
    playback state, so AV1+Opus episodes silently rendering black went
    undetected.
  - Probed only a hardcoded selector list → any element painting an
    opaque background outside that list (e.g. a new section wrapper)
    was never reported.

v2 closes those gaps:
  1. Multi-user runs: executes the full probe as BOTH admin and non-admin
     in the same invocation, writes per-user JSON + screenshots, and
     reports a DOM-section diff (sections present for one user but not
     the other → admin-only-visible content).
  2. Click Play: locates the play button, clicks it, waits 10 s, captures
     <video> element state (currentTime, paused, error, readyState, dims),
     plus a video-area screenshot and any new console / network errors.
  3. Multiple-item coverage: walks an item list (default: HEVC movie + AV1
     TV episode + H.264 TV episode if available) and runs the full
     detail-page + play probe for each.
  4. Section-bg sweep: at scroll-bottom, walks every visible element and
     reports any with a non-transparent backgroundColor whose bounding rect
     overlaps where the pinned backdrop should be visible. Output goes
     into probe.json under "regressions" with an allowlist filter.
  5. Golden-screenshot diff: if a known-good screenshot exists at
     OUT/golden/<key>.png, the run computes a Pillow pixel diff and writes
     <key>-diff.png + a numeric mismatch ratio.
  6. Structured JSON: probe.json now has top-level shape
       {url, runs:[{role, user, is_admin,
                    items:[{item_id, kind, probe, play, regressions, ...}]}],
        ...}
     so downstream tooling (CI / agents) can parse without grepping.

Usage:
  bin/headless-test-v2.py [URL] [OUT_DIR]

  URL defaults to https://dev.arrflix.s8n.ru.
  OUT_DIR defaults to /tmp/arrflix-headless-v2.

User credentials are determined automatically from URL:
  arrflix.s8n.ru      → admin=s8n      / guest=guest
  dev.arrflix.s8n.ru  → admin=s8n-dev  / guest=guest-mirror

Override via env vars:
  ADMIN_USER, ADMIN_PASS, GUEST_USER, GUEST_PASS
  ITEMS=id1,id2,id3   # override default item list

Default items (chosen for codec coverage):
  - HEVC movie:    7aa5add2c2d8575eda5280b9b9072071 (The Dark Knight)
  - AV1 episode:   auto-pick first Mike Nolan Show episode
  - H.264 episode: auto-pick first non-AV1 episode if available

Exit codes:
  0  all runs succeeded, no playback errors, no regression bg elements
  1  setup / login failure
  2  one or more runs reported playback failure or unallowlisted bg regression
"""
|
||
|
|
import sys, json, time, os, asyncio, urllib.request, urllib.error, ssl
|
||
|
|
from pathlib import Path
|
||
|
|
from playwright.async_api import async_playwright
|
||
|
|
|
||
|
|
# Pillow is optional: PIL_OK records whether the golden-screenshot diff
# can run (it needs Image and ImageChops).
try:
    from PIL import Image, ImageChops
except ImportError:
    PIL_OK = False
else:
    PIL_OK = True
|
||
|
|
|
||
|
|
# Positional CLI arguments: [1] base URL of the server, [2] output directory.
_cli_args = sys.argv[1:]
URL = _cli_args[0] if len(_cli_args) > 0 else "https://dev.arrflix.s8n.ru"
OUT = _cli_args[1] if len(_cli_args) > 1 else "/tmp/arrflix-headless-v2"

# Output tree is created eagerly at import time: OUT itself, then OUT/golden
# (where known-good reference screenshots are looked up).
for _needed_dir in (OUT, os.path.join(OUT, "golden")):
    os.makedirs(_needed_dir, exist_ok=True)

# Default credentials by env (URL → admin/guest).
# NOTE(review): these fallbacks are hardcoded secrets committed to the
# script; prefer supplying ADMIN_PASS / GUEST_PASS through the environment.
_targets_dev_mirror = "dev.arrflix.s8n.ru" in URL
DEFAULT_ADMIN = (
    ("s8n-dev", "2001dude") if _targets_dev_mirror else ("s8n", "2001dude")
)
DEFAULT_GUEST = (
    ("guest-mirror", "dev-test-guest") if _targets_dev_mirror else ("guest", "123")
)

# Environment variables win over the URL-derived defaults.
ADMIN_USER = os.getenv("ADMIN_USER", DEFAULT_ADMIN[0])
ADMIN_PASS = os.getenv("ADMIN_PASS", DEFAULT_ADMIN[1])
GUEST_USER = os.getenv("GUEST_USER", DEFAULT_GUEST[0])
GUEST_PASS = os.getenv("GUEST_PASS", DEFAULT_GUEST[1])
|
||
|
|
|
||
|
|
# Default items: the HEVC movie has a known fixed id; TV episodes are
# auto-picked per user at run time. ITEMS (comma-separated ids) replaces
# the whole default list when set.
ITEMS_OVERRIDE = os.getenv("ITEMS", "").strip()
DEFAULT_HEVC_MOVIE = "7aa5add2c2d8575eda5280b9b9072071"  # Dark Knight
# Case-insensitive substring of the series name used for the AV1 lookup.
MNS_NEEDLE = "mike nolan"

# Identity reported to the server in the MediaBrowser Authorization header.
DEVICE, CLIENT, VERSION = "headless-test-v2", "HeadlessV2", "2.0"
DEVICE_ID = "headless-test-v2-2026-05-09"

# Selectors known to legitimately paint a solid background over the backdrop
# area. A swept element that is NOT covered by this list is a regression
# candidate. Update intentionally as the design changes.
# NOTE(review): PROBE_SELECTORS spells the player class ".htmlvideoplayer"
# (lower-case) while this set uses ".htmlVideoPlayer" — confirm which casing
# the DOM actually uses.
BG_ALLOWLIST = {
    # OSD / video player overlays — fine to be opaque
    ".htmlVideoPlayer",
    ".videoPlayerContainer",
    ".osdContent",
    ".upNextDialog",
    ".dialogContainer",
    ".dialog",
    # Modal / dialog scrim layers
    ".dialogBackdrop",
    ".paperList",
    # Top app drawer (intentionally opaque)
    ".skinHeader",
    ".headerTop",
}
|
||
|
|
|
||
|
|
# ---------- HTTP helpers (raw API) ----------
|
||
|
|
|
||
|
|
def auth_header(token=None):
    """Build the ``MediaBrowser`` Authorization header value.

    The header always carries the client, device, device id and version
    constants of this script; a ``Token="..."`` field is appended only when
    *token* is truthy.
    """
    fields = [
        f'Client="{CLIENT}"',
        f'Device="{DEVICE}"',
        f'DeviceId="{DEVICE_ID}"',
        f'Version="{VERSION}"',
    ]
    if token:
        fields.append(f'Token="{token}"')
    return "MediaBrowser " + ", ".join(fields)
|
||
|
|
|
||
|
|
|
||
|
|
def _req(path, method="GET", body=None, token=None):
    """Send one JSON request to ``URL + path`` and return the decoded reply.

    Args:
        path: Server-relative path (including any query string).
        method: HTTP verb.
        body: Optional object serialised as the JSON request body.
        token: Optional access token forwarded through ``auth_header``.

    Returns:
        The parsed JSON response, or ``{}`` when the response body is empty.

    Raises:
        Whatever ``urllib.request.urlopen`` raises (e.g. ``HTTPError`` on a
        4xx/5xx status) and ``json`` decoding errors; nothing is caught here.
    """
    payload = None if body is None else json.dumps(body).encode()
    headers = {
        "Authorization": auth_header(token),
        "Content-Type": "application/json",
    }
    request = urllib.request.Request(
        f"{URL}{path}", data=payload, headers=headers, method=method)
    # TLS certificate verification is switched off for these API calls
    # (the browser context is likewise created with ignore_https_errors).
    tls = ssl._create_unverified_context()
    with urllib.request.urlopen(request, context=tls, timeout=15) as response:
        raw = response.read()
    if not raw:
        return {}
    return json.loads(raw)
|
||
|
|
|
||
|
|
|
||
|
|
def login(user, password):
    """Authenticate by username/password and return the server's JSON reply.

    The reply is used by callers for ``AccessToken``, ``User`` and
    ``ServerId``.
    """
    credentials = {"Username": user, "Pw": password}
    return _req("/Users/AuthenticateByName", method="POST", body=credentials)
|
||
|
|
|
||
|
|
|
||
|
|
def find_av1_episode(token, user_id):
    """Pick the first episode of the first series whose name contains MNS_NEEDLE.

    Returns:
        ``(episode_id, "<series name> - <episode name>")``, or
        ``(None, None)`` when no series matches or it has no episodes.
    """
    listing = _req(
        f"/Users/{user_id}/Items?Recursive=true&IncludeItemTypes=Series&Limit=200",
        token=token)
    # First series (in server order) whose lower-cased name contains the needle.
    show = next(
        (candidate for candidate in listing.get("Items", [])
         if MNS_NEEDLE in candidate.get("Name", "").lower()),
        None,
    )
    if not show:
        return None, None

    episodes = _req(
        f"/Shows/{show['Id']}/Episodes?UserId={user_id}&Limit=1",
        token=token).get("Items")
    if not episodes:
        return None, None
    first = episodes[0]
    return first["Id"], f"{show['Name']} - {first.get('Name','?')}"
|
||
|
|
|
||
|
|
|
||
|
|
def find_h264_episode(token, user_id, exclude_series_id=None):
    """Pick the first episode of any series other than the AV1 one.

    A series is skipped when its id equals *exclude_series_id* or its name
    contains MNS_NEEDLE. No codec is inspected here: the first remaining
    series that has at least one episode wins.

    Returns:
        ``(episode_id, "<series name> - <episode name>")`` or ``(None, None)``.
    """
    listing = _req(
        f"/Users/{user_id}/Items?Recursive=true&IncludeItemTypes=Series&Limit=50",
        token=token)
    for show in listing.get("Items", []):
        is_excluded = bool(exclude_series_id) and show.get("Id") == exclude_series_id
        if is_excluded or MNS_NEEDLE in show.get("Name", "").lower():
            continue
        episodes = _req(
            f"/Shows/{show['Id']}/Episodes?UserId={user_id}&Limit=1",
            token=token).get("Items")
        if episodes:
            first = episodes[0]
            return first["Id"], f"{show['Name']} - {first.get('Name','?')}"
    return None, None
|
||
|
|
|
||
|
|
|
||
|
|
def resolve_items(token, user_id):
    """Return the items to probe as a list of ``(item_id, label, kind)``.

    With the ITEMS override set, the comma-separated ids are used verbatim
    (blank entries dropped). ``kind`` doubles as the per-item key for
    screenshot file names, golden lookups and the admin/guest pairing, so it
    must be unique within a run: a single override keeps the historical kind
    ``"override"``, while several overrides get ``"override-<n>"`` (same
    index as the label) so they no longer overwrite each other.

    Without an override the list is the fixed HEVC movie, plus an AV1
    episode and an H.264 episode when they can be auto-picked.
    """
    if ITEMS_OVERRIDE:
        picked = [(n, raw.strip())
                  for n, raw in enumerate(ITEMS_OVERRIDE.split(","))
                  if raw.strip()]
        unique_kinds = len(picked) > 1
        return [(item_id, f"override-{n}",
                 f"override-{n}" if unique_kinds else "override")
                for n, item_id in picked]

    # HEVC movie (fixed id)
    out = [(DEFAULT_HEVC_MOVIE, "Dark Knight (HEVC movie)", "hevc-movie")]

    # AV1 episode (auto)
    av1_id, av1_label = find_av1_episode(token, user_id)
    series_id_excl = None
    if av1_id:
        out.append((av1_id, f"{av1_label} (AV1 ep)", "av1-episode"))
        # Best effort: learn the AV1 episode's series so the H.264 pick comes
        # from a different show. A failure here is not fatal because
        # find_h264_episode also skips series by name.
        try:
            ep = _req(f"/Users/{user_id}/Items/{av1_id}", token=token)
            series_id_excl = ep.get("SeriesId")
        except Exception as e:
            print(f"[!] could not resolve series of AV1 episode {av1_id}: {e}")

    # H.264 episode (auto, different series from AV1)
    h264_id, h264_label = find_h264_episode(token, user_id, series_id_excl)
    if h264_id:
        out.append((h264_id, f"{h264_label} (H.264 ep)", "h264-episode"))
    return out
|
||
|
|
|
||
|
|
|
||
|
|
# ---------- Playwright probe ----------
|
||
|
|
|
||
|
|
# CSS selectors whose first match is inspected by probe_dom(); order is
# preserved in the resulting report.
PROBE_SELECTORS = [
    ".itemBackdrop",
    ".detailBackdrop",
    ".backdropContainer",
    ".backgroundContainer",
    ".layout-desktop",
    "body",
    "#reactRoot",
    ".itemDetailPage",
    "video",
    ".htmlvideoplayer",
    ".btnPlay",
    ".detailPagePrimaryContainer",
    ".detailSection",
    ".detailVerticalSection",
    ".itemsContainer",
    ".padded-bottom-page",
    ".mainAnimatedPages",
    ".pageContainer",
    ".cardScalable",
    ".scrollSlider",
    ".sectionTitleContainer",
    ".detailPageContent",
    ".detailPageWrapperContainer",
    # admin-only carousel
    ".moreFromSeason",
    ".moreFromSeasonContainer",
]
|
||
|
|
|
||
|
|
|
||
|
|
async def probe_dom(page):
    """Snapshot the computed style of every PROBE_SELECTORS entry in *page*.

    Returns a dict keyed by selector: ``'<absent>'`` when nothing matches,
    otherwise the match count plus display/opacity/visibility/background/
    z-index/rect of the FIRST match. Extra ``__``-prefixed keys carry the
    document title, play-button text, body classes, current URL and the list
    of non-empty section-title texts (used for the per-user diff).
    """
    script = """(SEL) => {
        const result = {};
        for (const s of SEL) {
            const els = document.querySelectorAll(s);
            if (!els.length) { result[s] = '<absent>'; continue; }
            const el = els[0];
            const cs = getComputedStyle(el);
            result[s] = {
                count: els.length,
                display: cs.display,
                opacity: cs.opacity,
                visibility: cs.visibility,
                background: cs.backgroundColor,
                backgroundImage: cs.backgroundImage.slice(0, 80),
                zIndex: cs.zIndex,
                rect: el.getBoundingClientRect().toJSON(),
            };
        }
        result.__title = document.title;
        const playBtn = document.querySelector('.btnPlay, [data-action="play"]');
        result.__playBtnText = playBtn
            ? (playBtn.innerText || playBtn.textContent || '').trim() : null;
        result.__bodyClasses = document.body.className;
        result.__url = location.href;
        // List of all section-title texts so we can diff per-user.
        result.__sectionTitles = Array.from(
            document.querySelectorAll('.sectionTitleContainer, h2, .sectionHeader')
        ).map(e => (e.innerText || e.textContent || '').trim()).filter(Boolean);
        return result;
    }"""
    # The selector list is handed to the page function as its single argument.
    snapshot = await page.evaluate(script, PROBE_SELECTORS)
    return snapshot
|
||
|
|
|
||
|
|
|
||
|
|
async def sweep_backgrounds(page):
    """List every sizeable, rendered element that paints its own background.

    An element is reported when it is not ``display: none`` /
    ``visibility: hidden``, its computed background colour has alpha above
    0.05, its bounding rect is at least 50x50 px, and it is neither
    ``<body>`` nor ``<html>``. No position filtering happens here; the
    criterion is intentionally generous and callers narrow the list via the
    allowlist.

    Each entry carries ``sig`` (tag#id.classes, capped at 200 chars), ``tag``,
    ``id``, ``classes``, ``background``, ``rect`` and ``zIndex``.
    """
    script = r"""() => {
        const isOpaque = (c) => {
            if (!c || c === 'rgba(0, 0, 0, 0)' || c === 'transparent') return false;
            const m = c.match(/rgba?\(\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*([\d.]+))?\)/);
            if (!m) return true;
            const a = m[4] !== undefined ? parseFloat(m[4]) : 1.0;
            return a > 0.05;
        };
        const out = [];
        const all = document.querySelectorAll('*');
        for (const el of all) {
            const cs = getComputedStyle(el);
            if (cs.display === 'none' || cs.visibility === 'hidden') continue;
            if (!isOpaque(cs.backgroundColor)) continue;
            const r = el.getBoundingClientRect();
            if (r.width < 50 || r.height < 50) continue;
            // Skip if bg is already same as body (chained inheritance, no diff)
            if (el === document.body || el === document.documentElement) continue;
            // Build a signature so consumers can match against allowlist
            const cls = (el.className && typeof el.className === 'string')
                ? '.' + el.className.trim().split(/\s+/).join('.')
                : '';
            const sig = el.tagName.toLowerCase() + (el.id ? '#' + el.id : '') + cls;
            out.push({
                sig: sig.slice(0, 200),
                tag: el.tagName.toLowerCase(),
                id: el.id || null,
                classes: (typeof el.className === 'string') ? el.className : '',
                background: cs.backgroundColor,
                rect: { x: r.x, y: r.y, w: r.width, h: r.height },
                zIndex: cs.zIndex,
            });
        }
        return out;
    }"""
    painted = await page.evaluate(script)
    return painted
|
||
|
|
|
||
|
|
|
||
|
|
def _rgb_channel_sum(color):
    """Return R+G+B of an ``rgb()``/``rgba()`` string, or None if unparseable."""
    text = (color or "").strip()
    if not text.startswith(("rgb(", "rgba(")) or not text.endswith(")"):
        return None
    inner = text[text.index("(") + 1:-1]
    # Accept both "r, g, b[, a]" and the space/slash form "r g b [/ a]".
    parts = inner.replace(",", " ").replace("/", " ").split()
    if len(parts) < 3:
        return None
    try:
        return sum(int(p) for p in parts[:3])
    except ValueError:
        return None


def _is_allowlisted(el, allowlist):
    """True when the element's tag, ``#id`` or one of its ``.class`` tokens is allowlisted."""
    classes = el.get("classes")
    if not isinstance(classes, str):
        # Entry without a class list: fall back to the legacy substring
        # match against the signature.
        sig = el.get("sig", "")
        return any(allow in sig for allow in allowlist)
    tokens = {"." + name for name in classes.split()}
    if el.get("tag"):
        tokens.add(el["tag"])
    if el.get("id"):
        tokens.add("#" + el["id"])
    return not tokens.isdisjoint(allowlist)


def filter_regressions(bg_elements, viewport_w, viewport_h, allowlist=None):
    """Apply allowlist + heuristics to swept elements → regression list.

    A bg element is flagged iff:
      - It is NOT allowlisted. Matching is on exact tokens (tag, ``#id``,
        ``.class``), so ``.dialog`` no longer suppresses an unrelated
        ``.dialogue`` element the way substring matching did.
      - Its background parses as rgb()/rgba() and is "very dark"
        (R+G+B <= 90). Most legit overlays are clearly tinted; near-black
        is the failure mode we want. Unparseable colours are skipped.
      - Its rect is not entirely left or right of the viewport.

    Args:
        bg_elements: Entries as produced by ``sweep_backgrounds``.
        viewport_w: Viewport width in CSS px.
        viewport_h: Viewport height in CSS px. Currently unused — no
            vertical filtering is applied; kept for interface stability.
        allowlist: Optional collection of allowlisted tokens; defaults to
            the module-level ``BG_ALLOWLIST``.

    Returns:
        The flagged entries, in input order.
    """
    allowed = set(BG_ALLOWLIST if allowlist is None else allowlist)
    regressions = []
    for el in bg_elements:
        if _is_allowlisted(el, allowed):
            continue
        darkness = _rgb_channel_sum(el.get("background"))
        if darkness is None or darkness > 90:
            continue
        rect = el["rect"]
        if rect["x"] + rect["w"] < 0 or rect["x"] > viewport_w:
            continue
        regressions.append(el)
    return regressions
|
||
|
|
|
||
|
|
|
||
|
|
async def click_play_and_observe(page):
    """Start playback, wait 10 s, and report what the <video> element shows.

    Play is triggered by clicking the first candidate button that has a
    non-zero-width bounding box; if none could be clicked, the ``p`` key is
    pressed instead. If even the key press raises, the state is returned
    immediately without a ``video`` entry.

    Returns:
        Dict with ``clicked``, ``selector_used``, ``error`` and, after the
        wait, ``video`` (``{present: false}`` or the element's playback
        fields) and ``pre_marker`` (page ``Date.now()`` taken before the
        click).
    """
    started_at = await page.evaluate("() => Date.now()")
    state = {"clicked": False, "selector_used": None, "error": None}

    # Canonical button selectors, tried in priority order.
    candidates = (
        ".btnPlay",
        '[data-action="play"]',
        'button[is="emby-button"][data-action="play"]',
    )
    for sel in candidates:
        try:
            btn = await page.query_selector(sel)
            if not btn:
                continue
            box = await btn.bounding_box()
            if not box or box["width"] <= 0:
                continue
            await btn.click(timeout=5000)
        except Exception as e:
            # Remember the failure and move on to the next selector.
            state["error"] = f"{sel}: {e}"
            continue
        state.update(clicked=True, selector_used=sel)
        break

    if not state["clicked"]:
        # Fallback: keyboard 'p' which Jellyfin web binds to play
        try:
            await page.keyboard.press("p")
        except Exception as e:
            state["error"] = (state.get("error") or "") + f"; kbd:{e}"
            return state
        state.update(clicked=True, selector_used="kbd:p")

    # Give the player time to load and start before sampling it.
    await asyncio.sleep(10)

    video_script = """() => {
        const v = document.querySelector('video');
        if (!v) return { present: false };
        const rect = v.getBoundingClientRect();
        return {
            present: true,
            src: (v.src || '').slice(0, 200),
            currentTime: v.currentTime,
            paused: v.paused,
            ended: v.ended,
            readyState: v.readyState,
            networkState: v.networkState,
            error: v.error ? { code: v.error.code, message: v.error.message } : null,
            videoWidth: v.videoWidth,
            videoHeight: v.videoHeight,
            duration: v.duration,
            rect: { x: rect.x, y: rect.y, w: rect.width, h: rect.height },
            buffered_ranges: v.buffered.length,
        };
    }"""
    state["video"] = await page.evaluate(video_script)
    state["pre_marker"] = started_at
    return state
|
||
|
|
|
||
|
|
|
||
|
|
async def _form_login(page, user, password):
    """Log *user* in through the web UI's login form.

    Failures while locating or filling the form are printed, not raised;
    only the initial navigation can raise.
    """
    await page.goto(f"{URL}/web/", wait_until="networkidle", timeout=30000)
    await asyncio.sleep(3)
    try:
        await page.wait_for_selector("input", timeout=20000)
        inputs = await page.evaluate(
            "() => Array.from(document.querySelectorAll('input')).map(i => "
            "({id:i.id, name:i.name, type:i.type, placeholder:i.placeholder}))")
        user_sel = pass_sel = None
        for i in inputs:
            fid, fname, ftype = i.get("id", ""), i.get("name", ""), i.get("type", "")
            if not user_sel and (ftype == "text" or "user" in (fid+fname).lower()
                                 or "name" in (fid+fname).lower()):
                user_sel = f"#{fid}" if fid else f'input[name="{fname}"]'
            if not pass_sel and ftype == "password":
                pass_sel = f"#{fid}" if fid else f'input[name="{fname}"]'
        if user_sel and pass_sel:
            await page.fill(user_sel, user)
            await page.fill(pass_sel, password)
            await page.keyboard.press("Enter")
            await page.wait_for_load_state("networkidle", timeout=20000)
            await asyncio.sleep(2)
            print(f"[+] form login OK as {user}")
        else:
            # NOTE(review): nothing injects the API token into the browser
            # context, so after this branch pages load unauthenticated.
            print("[!] login fields not found — continuing with API token")
    except Exception as e:
        print(f"[!] form login failed: {e}")


def _diff_against_golden(key, shots):
    """Compare screenshots with their golden references, if any exist.

    Args:
        key: ``<user>-<kind>`` prefix shared by the screenshot file names.
        shots: Iterable of ``(png_path, shot_name)``.

    Returns:
        One dict per shot that has a golden file: either
        ``{shot, bbox, ratio, diff_path}`` or ``{shot, error}``. Empty when
        Pillow is unavailable.
    """
    diffs = []
    if not PIL_OK:
        return diffs
    for shot_path, shot_name in shots:
        golden = os.path.join(OUT, "golden", f"{key}-{shot_name}.png")
        if not os.path.exists(golden):
            continue
        try:
            with Image.open(shot_path) as current, Image.open(golden) as reference:
                a = current.convert("RGB")
                b = reference.convert("RGB")
            if a.size != b.size:
                diffs.append({"shot": shot_name, "error": "size mismatch"})
                continue
            diff_img = ImageChops.difference(a, b)
            bbox = diff_img.getbbox()
            diff_path = os.path.join(OUT, f"{key}-{shot_name}-diff.png")
            diff_img.save(diff_path)
            # Numeric mismatch ratio. For an RGB image histogram() is three
            # concatenated 256-bin histograms (R, G, B); bin 0 of EACH band
            # counts the samples that are identical. Counting only indices
            # 1..255 would look at the red band alone.
            hist = diff_img.histogram()
            identical = sum(hist[0::256])
            total = a.size[0] * a.size[1] * 3
            ratio = (sum(hist) - identical) / total if total else 0
            diffs.append({"shot": shot_name, "bbox": bbox, "ratio": ratio,
                          "diff_path": diff_path})
        except Exception as e:
            diffs.append({"shot": shot_name, "error": str(e)})
    return diffs


async def run_one(p, user, password, role, run_idx, console_messages, network_failures):
    """Execute the full probe sequence for one user. Returns dict for JSON.

    Steps: API login, item resolution, browser launch + form login, then per
    item: DOM probe, top/mid/bottom screenshots, background sweep at
    scroll-bottom, Play click + observation, and golden-screenshot diff.

    Args:
        p: Playwright instance from ``async_playwright()``.
        user, password: Credentials used for both the API and the form login.
        role: Label ("admin" / "guest") stamped on results and log entries.
        run_idx: Ordinal used only in the progress banner.
        console_messages, network_failures: Shared lists that page event
            handlers append to.

    Returns:
        ``{role, user, is_admin, items:[...]}``; when no item could be
        resolved, ``items`` is empty and ``error`` is ``"no items"``.

    Raises:
        Login / navigation errors propagate; the browser is closed first.
    """
    print(f"\n=== Run {run_idx}: {role} ({user}) ===")
    auth = login(user, password)
    token = auth["AccessToken"]
    user_id = auth["User"]["Id"]
    server_id = auth["ServerId"]
    is_admin = auth["User"].get("Policy", {}).get("IsAdministrator", False)
    print(f"[+] Auth OK uid={user_id} admin={is_admin}")

    items = resolve_items(token, user_id)
    if not items:
        print("[!] No items resolvable — aborting run")
        return {"role": role, "user": user, "is_admin": is_admin, "items": [],
                "error": "no items"}

    print(f"[+] Items: {[(i[1], i[2]) for i in items]}")

    runs = []
    # File-name-safe form of the user name; invariant across items.
    safe_user = user.replace("@", "_").replace("/", "_")
    browser = await p.chromium.launch(
        headless=True,
        args=["--no-sandbox", "--disable-dev-shm-usage",
              "--autoplay-policy=no-user-gesture-required"])
    # try/finally so a timeout or probe error cannot leak the browser process.
    try:
        ctx = await browser.new_context(
            viewport={"width": 1600, "height": 900},
            ignore_https_errors=True)
        page = await ctx.new_page()

        page.on("console", lambda m: console_messages.append(
            {"role": role, "user": user, "type": m.type, "text": m.text}))
        page.on("requestfailed", lambda r: network_failures.append(
            {"role": role, "user": user, "method": r.method, "url": r.url,
             "failure": str(r.failure)}))
        page.on("response", lambda r: None if r.status < 400 else
                network_failures.append({"role": role, "user": user, "status": r.status,
                                         "url": r.url}))

        # --- form login (mirrors v1) ---
        await _form_login(page, user, password)

        for item_id, label, kind in items:
            target = f"{URL}/web/#/details?id={item_id}&serverId={server_id}"
            print(f"\n[*] {role}/{kind}: {label} → {target}")
            await page.goto(target, wait_until="networkidle", timeout=30000)
            await asyncio.sleep(4)

            probe = await probe_dom(page)
            viewport = page.viewport_size
            vw, vh = viewport["width"], viewport["height"]

            # Top + scrolled screenshots
            key = f"{safe_user}-{kind}"
            top_png = os.path.join(OUT, f"{key}-top.png")
            await page.screenshot(path=top_png, full_page=False)
            await page.evaluate("() => window.scrollTo(0, document.body.scrollHeight * 0.5)")
            await asyncio.sleep(1)
            mid_png = os.path.join(OUT, f"{key}-mid.png")
            await page.screenshot(path=mid_png, full_page=False)
            await page.evaluate("() => window.scrollTo(0, document.body.scrollHeight)")
            await asyncio.sleep(1)
            bot_png = os.path.join(OUT, f"{key}-bot.png")
            await page.screenshot(path=bot_png, full_page=False)

            # Background sweep at scroll-bottom (where INC4-style bands manifest)
            bg_elements = await sweep_backgrounds(page)
            regressions = filter_regressions(bg_elements, vw, vh)
            print(f"[*] bg elements: {len(bg_elements)} regressions: {len(regressions)}")

            # Click Play and observe
            await page.evaluate("() => window.scrollTo(0, 0)")
            await asyncio.sleep(1)
            play_state = await click_play_and_observe(page)
            play_png = os.path.join(OUT, f"{key}-play.png")
            await page.screenshot(path=play_png, full_page=False)

            # Diff vs golden
            diffs = _diff_against_golden(
                key, [(top_png, "top"), (mid_png, "mid"), (bot_png, "bot"),
                      (play_png, "play")])

            runs.append({
                "item_id": item_id, "label": label, "kind": kind,
                "screenshots": {"top": top_png, "mid": mid_png, "bot": bot_png,
                                "play": play_png},
                "probe": probe,
                "play": play_state,
                "bg_count": len(bg_elements),
                "regressions": regressions,
                "diffs_vs_golden": diffs,
            })
    finally:
        await browser.close()

    return {"role": role, "user": user, "is_admin": is_admin,
            "items": runs}
|
||
|
|
|
||
|
|
|
||
|
|
def section_title_diff(admin_run, guest_run):
    """Compare section titles between the admin and guest run, per item kind.

    Only kinds probed in BOTH runs are compared. For each such kind whose
    title sets differ, the result holds
    ``{"kind", "only_admin": [...], "only_guest": [...]}`` with both lists
    sorted; kinds with identical titles are omitted.
    """
    admin_by_kind = {entry["kind"]: entry for entry in admin_run.get("items", [])}
    guest_by_kind = {entry["kind"]: entry for entry in guest_run.get("items", [])}

    def titles_of(entry):
        # Missing probe / title list counts as "no titles".
        return set(entry.get("probe", {}).get("__sectionTitles", []))

    report = []
    for kind, admin_entry in admin_by_kind.items():
        if kind not in guest_by_kind:
            continue
        admin_titles = titles_of(admin_entry)
        guest_titles = titles_of(guest_by_kind[kind])
        extra_admin = sorted(admin_titles.difference(guest_titles))
        extra_guest = sorted(guest_titles.difference(admin_titles))
        if not (extra_admin or extra_guest):
            continue
        report.append({"kind": kind, "only_admin": extra_admin,
                       "only_guest": extra_guest})
    return report
|
||
|
|
|
||
|
|
|
||
|
|
def grade(result):
    """Decide pass/fail. Returns ``(exit_code, issues)``.

    ``exit_code`` is 2 when at least one issue was recorded, else 0.
    Issues are recorded for:
      - a run that reported an ``error`` or probed no items at all (such a
        run exercised nothing and must not count as a pass);
      - per item, the first matching playback problem: <video> absent,
        media error, readyState < 2, or paused at t=0;
      - per item, any background regressions.
    """
    issues = []
    for run in result["runs"]:
        user = run.get("user", "?")
        items = run.get("items", [])

        # Run-level failure: nothing below would flag it because there are
        # no items to iterate over.
        if run.get("error"):
            issues.append(f"{user}: run failed: {run['error']}")
        elif not items:
            issues.append(f"{user}: no items probed")

        for item in items:
            where = f"{user}/{item['kind']}"
            # "play" has no "video" entry when Play could not be triggered.
            v = (item.get("play") or {}).get("video") or {}
            if not v.get("present"):
                issues.append(f"{where}: <video> absent")
            elif v.get("error"):
                issues.append(f"{where}: video error "
                              f"code={v['error'].get('code')}")
            elif v.get("readyState", 0) < 2:
                issues.append(f"{where}: video readyState="
                              f"{v.get('readyState')} (no current data)")
            elif v.get("paused") and v.get("currentTime", 0) == 0:
                issues.append(f"{where}: video paused at t=0")
            if item.get("regressions"):
                issues.append(f"{where}: "
                              f"{len(item['regressions'])} bg regression(s)")
    return (2 if issues else 0, issues)
|
||
|
|
|
||
|
|
|
||
|
|
async def main():
    """Run the probe as admin then guest, write probe.json, and exit.

    The process exits through ``sys.exit`` with the code computed by
    ``grade`` (0 = clean, 2 = issues found).
    """
    console_messages, network_failures = [], []
    banner = (
        f"[+] Target: {URL}",
        f"[+] OUT: {OUT}",
        f"[+] Admin: {ADMIN_USER}",
        f"[+] Guest: {GUEST_USER}",
    )
    for line in banner:
        print(line)

    # Admin first, guest second; both runs share the log collectors.
    accounts = (
        (ADMIN_USER, ADMIN_PASS, "admin"),
        (GUEST_USER, GUEST_PASS, "guest"),
    )
    per_user = []
    async with async_playwright() as p:
        for run_idx, (user, password, role) in enumerate(accounts, start=1):
            per_user.append(await run_one(p, user, password, role, run_idx,
                                          console_messages, network_failures))
    admin_run, guest_run = per_user

    section_diff = section_title_diff(admin_run, guest_run)

    result = {
        "url": URL,
        "timestamp": int(time.time()),
        "runs": [admin_run, guest_run],
        "section_title_diff": section_diff,
        # Only the most recent 200 entries of each log are persisted.
        "console": console_messages[-200:],
        "network_failures": network_failures[-200:],
    }

    code, issues = grade(result)
    result["issues"] = issues
    result["exit_code"] = code

    report_path = os.path.join(OUT, "probe.json")
    with open(report_path, "w") as f:
        json.dump(result, f, indent=2, default=str)

    print("\n=== SUMMARY ===")
    print(f"console: {len(console_messages)} network failures: {len(network_failures)}")
    print(f"section diffs: {len(section_diff)}")
    # Only the admin-only side of each diff is echoed to the console.
    for d in section_diff:
        if d["only_admin"]:
            print(f" admin-only ({d['kind']}): {d['only_admin']}")
    if not issues:
        print("no issues detected")
    else:
        print(f"ISSUES ({len(issues)}):")
        for problem in issues:
            print(f" - {problem}")
    print(f"probe.json: {os.path.join(OUT, 'probe.json')}")
    sys.exit(code)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # main() ends with sys.exit(code); SystemExit is not an Exception
    # subclass, so it passes through both handlers below untouched.
    try:
        asyncio.run(main())
    except urllib.error.HTTPError as http_err:
        # An HTTP error status from the raw API helpers → exit code 1.
        print(f"[!] HTTP error during login: {http_err}")
        raise SystemExit(1)
    except Exception as err:
        # Anything else: log a one-liner, then let the traceback surface.
        print(f"[!] fatal: {err}")
        raise
|