"""Drive a visible Chromium instance through the x.ai sign-up flow.

Each round opens the sign-up page, fills the e-mail / verification-code /
profile forms through injected JavaScript, waits for the ``sso`` cookie and
appends its value to a text file.  The browser is restarted between rounds.
"""

import argparse
import os
import secrets
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

from DrissionPage import Chromium, ChromiumOptions
from DrissionPage.errors import PageDisconnectedError

from openai_register import get_email_and_token, get_oai_code


# Upper bounds (seconds) for the blocking operations guarded by helper threads.
BROWSER_START_TIMEOUT_SECONDS = 15
BROWSER_QUIT_TIMEOUT_SECONDS = 5
ROUND_TIMEOUT_SECONDS = 120

# Set by ``main`` when the current round must stop (timeout / Ctrl-C).  Every
# polling loop below checks it so the worker thread really terminates instead
# of running on until its own, much longer, deadlines expire.
_round_cancelled = threading.Event()


class RoundCancelledError(Exception):
    """Raised inside the worker thread when the current round was cancelled."""


def _raise_if_cancelled():
    """Abort the calling step if ``main`` has cancelled the current round."""
    if _round_cancelled.is_set():
        raise RoundCancelledError("本轮注册已被取消")


def ensure_stable_python_runtime():
    """Re-exec the script under Python 3.12 / 3.13 when running on 3.14+.

    Only the per-user Windows install locations under ``%LOCALAPPDATA%`` are
    probed.  The ``DPE_REEXEC_DONE`` environment variable prevents a re-exec
    loop.  Does nothing when no candidate interpreter exists.
    """
    # Prefer the more stable 3.12 / 3.13 to avoid sporadic Mail.tm TLS /
    # compatibility problems seen on 3.14.
    if sys.version_info < (3, 14) or os.environ.get("DPE_REEXEC_DONE") == "1":
        return

    local_app_data = os.environ.get("LOCALAPPDATA", "")
    candidates = [
        os.path.join(local_app_data, "Programs", "Python", "Python312", "python.exe"),
        os.path.join(local_app_data, "Programs", "Python", "Python313", "python.exe"),
    ]

    current_python = os.path.normcase(os.path.abspath(sys.executable))
    for candidate in candidates:
        if not os.path.isfile(candidate):
            continue
        if os.path.normcase(os.path.abspath(candidate)) == current_python:
            return

        print(f"[*] 检测到 Python {sys.version.split()[0]},自动切换到更稳定的解释器: {candidate}")
        env = os.environ.copy()
        env["DPE_REEXEC_DONE"] = "1"
        os.execve(candidate, [candidate, os.path.abspath(__file__), *sys.argv[1:]], env)


def warn_runtime_compatibility():
    """Print a hint when still running on Python 3.14+."""
    # Keeps low-level TLS incompatibilities from being mistaken for script bugs.
    if sys.version_info >= (3, 14):
        print("[提示] 当前 Python 为 3.14+;若出现 Mail.tm TLS 异常,建议改用 Python 3.12 或 3.13。")


ensure_stable_python_runtime()
warn_runtime_compatibility()

co = ChromiumOptions()
co.auto_port()
co.set_timeouts(base=1)

# Load the extension that patches MouseEvent.screenX / screenY.
EXTENSION_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "turnstilePatch"))
co.add_extension(EXTENSION_PATH)

# The browser stays in visible (non-headless) mode so the flow can be watched.
browser = None
page = None

SIGNUP_URL = "https://accounts.x.ai/sign-up?redirect=grok-com"
DEFAULT_SSO_FILE = os.path.join(os.path.dirname(__file__), "sso.txt")


def start_browser():
    """Launch a fresh browser and bind the globals ``browser`` and ``page``.

    The launch runs in a helper thread so a hung start cannot block forever.

    Returns:
        The ``(browser, page)`` pair that was stored in the module globals.

    Raises:
        Exception: if the launch fails or does not finish within
            ``BROWSER_START_TIMEOUT_SECONDS``.
    """
    # Every round starts from a brand-new browser to limit page / cookie
    # pollution caused by long-lived instances.
    global browser, page
    browser = None
    page = None

    outcome = {}
    abandoned = threading.Event()
    handoff = threading.Lock()

    def _create():
        try:
            instance = Chromium(co)
        except Exception as error:
            with handoff:
                outcome["error"] = error
            return
        with handoff:
            keep = not abandoned.is_set()
            if keep:
                outcome["browser"] = instance
        if not keep:
            # The caller already gave up waiting; close the late instance
            # instead of leaking it or clobbering a newer global browser.
            try:
                instance.quit()
            except Exception:
                pass

    starter = threading.Thread(target=_create, daemon=True)
    starter.start()
    starter.join(timeout=BROWSER_START_TIMEOUT_SECONDS)

    with handoff:
        instance = outcome.get("browser")
        error = outcome.get("error")
        if instance is None and error is None:
            abandoned.set()

    if error is not None:
        # Surface the real launch failure instead of reporting it as a timeout.
        raise Exception(f"启动浏览器失败: {error}") from error
    if instance is None:
        raise Exception(f"启动浏览器超时(>{BROWSER_START_TIMEOUT_SECONDS}秒)")

    browser = instance
    tabs = browser.get_tabs()
    page = tabs[-1] if tabs else browser.new_tab()
    return browser, page


def stop_browser():
    """Close the whole browser instance so the next round can relaunch it."""
    global browser, page
    instance = browser
    browser = None
    page = None
    if instance is None:
        return

    def _safe_quit():
        # Uses the captured instance rather than the global, which has
        # already been reset above.
        try:
            instance.quit()
        except Exception:
            pass

    quitter = threading.Thread(target=_safe_quit, daemon=True)
    quitter.start()
    # Wait a bounded time only, so a stuck quit cannot hang the script.
    quitter.join(timeout=BROWSER_QUIT_TIMEOUT_SECONDS)


def restart_browser():
    """Stop the current browser and start a new one."""
    # The browser is deliberately not reused across rounds.
    stop_browser()
    time.sleep(1.5)
    start_browser()


def refresh_active_page():
    """Re-bind the global ``page`` to the browser's last tab and return it.

    Starts a browser when none is running; restarts it when the tab lookup
    fails.
    """
    # After the verification code is confirmed the page navigates and the old
    # page handle may be disconnected, so always fetch the active tab again.
    global browser, page
    if browser is None:
        start_browser()
    try:
        tabs = browser.get_tabs()
        if tabs:
            page = tabs[-1]
        else:
            page = browser.new_tab()
    except Exception:
        restart_browser()
    return page


def open_signup_page():
    """Open the sign-up page and switch to the "sign up with e-mail" flow."""
    global page
    refresh_active_page()
    try:
        page.get(SIGNUP_URL)
    except Exception:
        refresh_active_page()
        page = browser.new_tab(SIGNUP_URL)
    click_email_signup_button()


def close_current_page():
    """Backward-compatible alias: restarts the whole browser."""
    restart_browser()


def has_profile_form():
    """Return True when given-name, family-name and password inputs all exist."""
    # The final sign-up page is considered reached as soon as those inputs
    # are present in the DOM.
    refresh_active_page()
    try:
        return bool(page.run_js(
            """
            const givenInput = document.querySelector('input[data-testid="givenName"], input[name="givenName"], input[autocomplete="given-name"]');
            const familyInput = document.querySelector('input[data-testid="familyName"], input[name="familyName"], input[autocomplete="family-name"]');
            const passwordInput = document.querySelector('input[data-testid="password"], input[name="password"], input[type="password"]');
            return !!(givenInput && familyInput && passwordInput);
            """
        ))
    except Exception:
        return False


def click_email_signup_button(timeout=10):
    """Click the "sign up with e-mail" button once it shows up.

    Args:
        timeout: seconds to keep polling for the button.

    Returns:
        True after the button was clicked.

    Raises:
        Exception: if the button is not found before the deadline.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        _raise_if_cancelled()
        clicked = page.run_js(r"""
            const candidates = Array.from(document.querySelectorAll('button, a, [role="button"]'));
            const target = candidates.find((node) => {
                const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
                return text.includes('使用邮箱注册');
            });

            if (!target) {
                return false;
            }

            target.click();
            return true;
        """)

        if clicked:
            return True

        time.sleep(0.5)

    raise Exception('未找到“使用邮箱注册”按钮')


def fill_email_and_submit(timeout=15):
    """Obtain a mailbox, type it into the e-mail field and submit the form.

    Args:
        timeout: seconds to keep polling for the input and submit button.

    Returns:
        ``(email, dev_token)`` so the verification-code step can reuse them.

    Raises:
        Exception: if no mailbox could be obtained, or the input / button is
            not usable before the deadline.
    """
    # Reuses the mailbox helper from openai_register.
    email, dev_token = get_email_and_token()
    if not email or not dev_token:
        raise Exception("获取邮箱失败")

    deadline = time.time() + timeout
    while time.time() < deadline:
        _raise_if_cancelled()
        filled = page.run_js(
            """
            const email = arguments[0];

            function isVisible(node) {
                if (!node) {
                    return false;
                }
                const style = window.getComputedStyle(node);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = node.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            const input = Array.from(document.querySelectorAll('input[data-testid="email"], input[name="email"], input[type="email"], input[autocomplete="email"]')).find((node) => {
                return isVisible(node) && !node.disabled && !node.readOnly;
            }) || null;

            if (!input) {
                return 'not-ready';
            }

            input.focus();
            input.click();

            // 不能只写 `input.value = xxx`,否则 React / 受控表单可能没有同步内部状态。
            const valueSetter = Object.getOwnPropertyDescriptor(HTMLInputElement.prototype, 'value')?.set;
            const tracker = input._valueTracker;
            if (tracker) {
                tracker.setValue('');
            }
            if (valueSetter) {
                valueSetter.call(input, email);
            } else {
                input.value = email;
            }

            input.dispatchEvent(new InputEvent('beforeinput', {
                bubbles: true,
                data: email,
                inputType: 'insertText',
            }));
            input.dispatchEvent(new InputEvent('input', {
                bubbles: true,
                data: email,
                inputType: 'insertText',
            }));
            input.dispatchEvent(new Event('change', { bubbles: true }));

            if ((input.value || '').trim() !== email || !input.checkValidity()) {
                return false;
            }

            input.blur();
            return 'filled';
            """,
            email,
        )

        if filled == 'not-ready':
            time.sleep(0.5)
            continue

        if filled != 'filled':
            print(f"[Debug] 邮箱输入框已出现,但写入失败: {filled}")
            time.sleep(0.5)
            continue

        # Give the form a moment to react before looking for the button.
        time.sleep(0.8)
        clicked = page.run_js(
            r"""
            function isVisible(node) {
                if (!node) {
                    return false;
                }
                const style = window.getComputedStyle(node);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = node.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            const input = Array.from(document.querySelectorAll('input[data-testid="email"], input[name="email"], input[type="email"], input[autocomplete="email"]')).find((node) => {
                return isVisible(node) && !node.disabled && !node.readOnly;
            }) || null;

            if (!input || !input.checkValidity() || !(input.value || '').trim()) {
                return false;
            }

            const buttons = Array.from(document.querySelectorAll('button[type="submit"], button')).filter((node) => {
                return isVisible(node) && !node.disabled && node.getAttribute('aria-disabled') !== 'true';
            });
            const submitButton = buttons.find((node) => {
                const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
                return text === '注册' || text.includes('注册');
            });

            if (!submitButton || submitButton.disabled) {
                return false;
            }

            submitButton.click();
            return true;
            """
        )

        if clicked:
            print(f"[*] 已填写邮箱并点击注册: {email}")
            return email, dev_token

        time.sleep(0.5)

    raise Exception("未找到邮箱输入框或注册按钮")


def fill_code_and_submit(email, dev_token, timeout=180):
    """Fetch the e-mailed verification code, type it in and confirm.

    Args:
        email: mailbox address returned by ``fill_email_and_submit``.
        dev_token: mailbox token returned by ``fill_email_and_submit``.
        timeout: seconds to keep polling the page after the code arrived.

    Returns:
        The verification code that was entered.

    Raises:
        Exception: if no code was received, or the code input / confirm
            button is not usable before the deadline.
    """
    # Reuses the code-polling helper from openai_register.
    code = get_oai_code(dev_token, email)
    if not code:
        raise Exception("获取验证码失败")

    deadline = time.time() + timeout
    while time.time() < deadline:
        _raise_if_cancelled()
        try:
            filled = page.run_js(
                """
                const code = String(arguments[0] || '').trim();

                function isVisible(node) {
                    if (!node) {
                        return false;
                    }
                    const style = window.getComputedStyle(node);
                    if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                        return false;
                    }
                    const rect = node.getBoundingClientRect();
                    return rect.width > 0 && rect.height > 0;
                }

                function setNativeValue(input, value) {
                    const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
                    const tracker = input._valueTracker;
                    if (tracker) {
                        tracker.setValue('');
                    }
                    if (nativeInputValueSetter) {
                        nativeInputValueSetter.call(input, '');
                        nativeInputValueSetter.call(input, value);
                    } else {
                        input.value = '';
                        input.value = value;
                    }
                }

                function dispatchInputEvents(input, value) {
                    input.dispatchEvent(new InputEvent('beforeinput', {
                        bubbles: true,
                        cancelable: true,
                        data: value,
                        inputType: 'insertText',
                    }));
                    input.dispatchEvent(new InputEvent('input', {
                        bubbles: true,
                        cancelable: true,
                        data: value,
                        inputType: 'insertText',
                    }));
                    input.dispatchEvent(new Event('change', { bubbles: true }));
                }

                const input = Array.from(document.querySelectorAll('input[data-input-otp="true"], input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"], input[inputmode="text"]')).find((node) => {
                    return isVisible(node) && !node.disabled && !node.readOnly && Number(node.maxLength || code.length || 6) > 1;
                }) || null;

                const otpBoxes = Array.from(document.querySelectorAll('input')).filter((node) => {
                    if (!isVisible(node) || node.disabled || node.readOnly) {
                        return false;
                    }
                    const maxLength = Number(node.maxLength || 0);
                    const autocomplete = String(node.autocomplete || '').toLowerCase();
                    return maxLength === 1 || autocomplete === 'one-time-code';
                });

                if (!input && otpBoxes.length < code.length) {
                    return 'not-ready';
                }

                if (input) {
                    input.focus();
                    input.click();
                    setNativeValue(input, code);
                    dispatchInputEvents(input, code);

                    const normalizedValue = String(input.value || '').trim();
                    const expectedLength = Number(input.maxLength || code.length || 6);
                    const slots = Array.from(document.querySelectorAll('[data-input-otp-slot="true"]'));
                    const filledSlots = slots.filter((slot) => (slot.textContent || '').trim()).length;

                    if (normalizedValue !== code) {
                        // 聚合输入框写入失败,尝试回退到分格输入方式
                        if (otpBoxes.length >= code.length) {
                            const orderedBoxes = otpBoxes.slice(0, code.length);
                            for (let i = 0; i < orderedBoxes.length; i += 1) {
                                const box = orderedBoxes[i];
                                const char = code[i] || '';
                                box.focus();
                                box.click();
                                setNativeValue(box, char);
                                dispatchInputEvents(box, char);
                                box.dispatchEvent(new KeyboardEvent('keydown', { bubbles: true, key: char }));
                                box.dispatchEvent(new KeyboardEvent('keyup', { bubbles: true, key: char }));
                                box.blur();
                            }
                            const merged = orderedBoxes.map((node) => String(node.value || '').trim()).join('');
                            return merged === code ? 'filled' : 'box-mismatch';
                        }
                        return 'aggregate-mismatch';
                    }

                    if (expectedLength > 0 && normalizedValue.length !== expectedLength) {
                        return 'aggregate-length-mismatch';
                    }

                    if (slots.length && filledSlots && filledSlots !== normalizedValue.length) {
                        return 'aggregate-slot-mismatch';
                    }

                    input.blur();
                    return 'filled';
                }

                const orderedBoxes = otpBoxes.slice(0, code.length);
                for (let i = 0; i < orderedBoxes.length; i += 1) {
                    const box = orderedBoxes[i];
                    const char = code[i] || '';
                    box.focus();
                    box.click();
                    setNativeValue(box, char);
                    dispatchInputEvents(box, char);
                    box.dispatchEvent(new KeyboardEvent('keydown', { bubbles: true, key: char }));
                    box.dispatchEvent(new KeyboardEvent('keyup', { bubbles: true, key: char }));
                    box.blur();
                }

                const merged = orderedBoxes.map((node) => String(node.value || '').trim()).join('');
                return merged === code ? 'filled' : 'box-mismatch';
                """,
                code,
            )
        except PageDisconnectedError:
            # If the page navigates right after the confirmation click, the
            # old handle disconnects; switch to the new page and re-check.
            refresh_active_page()
            if has_profile_form():
                print("[*] 验证码提交后已跳转到最终注册页。")
                return code
            time.sleep(1)
            continue

        if filled == 'not-ready':
            if has_profile_form():
                print("[*] 已直接进入最终注册页,跳过验证码按钮确认。")
                return code
            time.sleep(0.5)
            continue

        if filled != 'filled':
            print(f"[Debug] 验证码输入框已出现,但写入失败: {filled}")
            time.sleep(0.5)
            continue

        time.sleep(1.2)
        try:
            clicked = page.run_js(
                r"""
                function isVisible(node) {
                    if (!node) {
                        return false;
                    }
                    const style = window.getComputedStyle(node);
                    if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                        return false;
                    }
                    const rect = node.getBoundingClientRect();
                    return rect.width > 0 && rect.height > 0;
                }

                const aggregateInput = Array.from(document.querySelectorAll('input[data-input-otp="true"], input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"], input[inputmode="text"]')).find((node) => {
                    return isVisible(node) && !node.disabled && !node.readOnly && Number(node.maxLength || 0) > 1;
                }) || null;

                let value = '';
                if (aggregateInput) {
                    value = String(aggregateInput.value || '').trim();
                    const expectedLength = Number(aggregateInput.maxLength || value.length || 6);
                    if (!value || (expectedLength > 0 && value.length !== expectedLength)) {
                        return false;
                    }

                    const slots = Array.from(document.querySelectorAll('[data-input-otp-slot="true"]'));
                    if (slots.length) {
                        const filledSlots = slots.filter((slot) => (slot.textContent || '').trim()).length;
                        if (filledSlots && filledSlots !== value.length) {
                            return false;
                        }
                    }
                } else {
                    const otpBoxes = Array.from(document.querySelectorAll('input')).filter((node) => {
                        if (!isVisible(node) || node.disabled || node.readOnly) {
                            return false;
                        }
                        const maxLength = Number(node.maxLength || 0);
                        const autocomplete = String(node.autocomplete || '').toLowerCase();
                        return maxLength === 1 || autocomplete === 'one-time-code';
                    });
                    value = otpBoxes.map((node) => String(node.value || '').trim()).join('');
                    if (!value || value.length < 6) {
                        return false;
                    }
                }

                const buttons = Array.from(document.querySelectorAll('button[type="submit"], button')).filter((node) => {
                    return isVisible(node) && !node.disabled && node.getAttribute('aria-disabled') !== 'true';
                });
                const confirmButton = buttons.find((node) => {
                    const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
                    return text === '确认邮箱' || text.includes('确认邮箱') || text === '继续' || text.includes('继续') || text === '下一步' || text.includes('下一步');
                });

                if (!confirmButton) {
                    return 'no-button';
                }

                confirmButton.focus();
                confirmButton.click();
                return 'clicked';
                """
            )
        except PageDisconnectedError:
            refresh_active_page()
            if has_profile_form():
                print("[*] 确认邮箱后页面跳转成功,已进入最终注册页。")
                return code
            clicked = 'disconnected'

        if clicked == 'clicked':
            print(f"[*] 已填写验证码并点击确认邮箱: {code}")
            time.sleep(2)
            refresh_active_page()
            if has_profile_form():
                print("[*] 验证码确认完成,最终注册页已就绪。")
                return code

        if clicked == 'no-button':
            current_url = page.url
            if 'sign-up' in current_url or 'signup' in current_url:
                print(f"[*] 已填写验证码,页面已自动跳转到下一步: {current_url}")
                return code

        if clicked == 'disconnected':
            time.sleep(1)
            continue

        time.sleep(0.5)

    # Best-effort DOM summary for diagnosis; a failure here must not mask the
    # exception raised below.
    try:
        debug_snapshot = page.run_js(
            r"""
            function isVisible(node) {
                if (!node) {
                    return false;
                }
                const style = window.getComputedStyle(node);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = node.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            const inputs = Array.from(document.querySelectorAll('input')).filter(isVisible).map((node) => ({
                type: node.type || '',
                name: node.name || '',
                testid: node.getAttribute('data-testid') || '',
                autocomplete: node.autocomplete || '',
                maxLength: Number(node.maxLength || 0),
                value: String(node.value || ''),
            }));

            const buttons = Array.from(document.querySelectorAll('button')).filter(isVisible).map((node) => ({
                text: String(node.innerText || node.textContent || '').replace(/\s+/g, ' ').trim(),
                disabled: !!node.disabled,
                ariaDisabled: node.getAttribute('aria-disabled') || '',
            }));

            return { url: location.href, inputs, buttons };
            """
        )
    except Exception as error:
        debug_snapshot = f"<unavailable: {error}>"
    print(f"[Debug] 验证码页 DOM 摘要: {debug_snapshot}")
    raise Exception("未找到验证码输入框或确认邮箱按钮")


def getTurnstileToken():
    """Poll the page's Turnstile widget until it yields a response token.

    Returns:
        The non-empty value of ``turnstile.getResponse()``.

    Raises:
        Exception: if no token is available after 15 attempts.
    """
    # Existing Turnstile handling, triggered only when the final sign-up page
    # asks for it.
    try:
        page.run_js("try { turnstile.reset() } catch(e) { }")
    except Exception:
        pass

    turnstileResponse = None

    for _ in range(0, 15):
        _raise_if_cancelled()
        try:
            turnstileResponse = page.run_js("try { return turnstile.getResponse() } catch(e) { return null }")
            if turnstileResponse:
                return turnstileResponse

            # First try clicking the Turnstile container / iframe that is
            # visible in the main document.
            page.run_js("""
                const box = document.querySelector('.cf-turnstile, .turnstile, [data-sitekey]');
                if (box) {
                    box.scrollIntoView({ behavior: 'smooth', block: 'center' });
                    const rect = box.getBoundingClientRect();
                    const x = rect.left + rect.width / 2;
                    const y = rect.top + rect.height / 2;
                    box.dispatchEvent(new MouseEvent('click', { bubbles: true, clientX: x, clientY: y }));
                }
            """)

            challengeSolution = page.ele("@name=cf-turnstile-response", timeout=3)
            challengeWrapper = challengeSolution.parent()
            challengeIframe = challengeWrapper.shadow_root.ele("tag:iframe", timeout=3)

            challengeIframe.run_js("""
                window.dtp = 1
                function getRandomInt(min, max) {
                    return Math.floor(Math.random() * (max - min + 1)) + min;
                }

                // 旧方案在 4K 屏下不稳定,这里给出更自然的屏幕坐标。
                let screenX = getRandomInt(800, 1200);
                let screenY = getRandomInt(400, 600);

                Object.defineProperty(MouseEvent.prototype, 'screenX', { value: screenX });
                Object.defineProperty(MouseEvent.prototype, 'screenY', { value: screenY });
            """)

            challengeIframeBody = challengeIframe.ele("tag:body", timeout=3).shadow_root
            challengeButton = challengeIframeBody.ele("tag:input", timeout=3)
            challengeButton.click()
        except Exception:
            # Best effort: the widget may not be rendered yet; retry next
            # iteration.  ``Exception`` (not a bare except) keeps Ctrl-C and
            # SystemExit working.
            pass
        time.sleep(1)

    raise Exception("failed to solve turnstile")


def build_profile():
    """Return ``(given_name, family_name, password)`` for the profile form.

    The names are fixed; the password is random and always contains upper
    case, lower case, digit and special characters.
    """
    given_name = "Fan"
    family_name = "YuXi"
    password = "N" + secrets.token_hex(4) + "!a7#" + secrets.token_urlsafe(6)
    return given_name, family_name, password


def fill_profile_and_submit(timeout=120):
    """Fill name and password on the final sign-up page and submit it.

    Args:
        timeout: seconds to keep retrying the fill / submit cycle.

    Returns:
        Dict with the keys ``given_name``, ``family_name`` and ``password``.

    Raises:
        Exception: if the form or submit button is not usable before the
            deadline, if the page shows an error after submitting, or if
            the Turnstile step fails.
    """
    # Targets only inputs that are visible and writable, to avoid hidden
    # nodes or React-controlled shadow copies.
    given_name, family_name, password = build_profile()
    deadline = time.time() + timeout
    turnstile_token = ""

    while time.time() < deadline:
        _raise_if_cancelled()
        filled = page.run_js(
            """
            const givenName = arguments[0];
            const familyName = arguments[1];
            const password = arguments[2];

            function isVisible(node) {
                if (!node) {
                    return false;
                }
                const style = window.getComputedStyle(node);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = node.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            function pickInput(selector) {
                return Array.from(document.querySelectorAll(selector)).find((node) => {
                    return isVisible(node) && !node.disabled && !node.readOnly;
                }) || null;
            }

            function setInputValue(input, value) {
                if (!input) {
                    return false;
                }
                input.focus();
                input.click();

                const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
                const tracker = input._valueTracker;
                if (tracker) {
                    tracker.setValue('');
                }
                if (nativeSetter) {
                    nativeSetter.call(input, '');
                    nativeSetter.call(input, value);
                } else {
                    input.value = '';
                    input.value = value;
                }

                input.dispatchEvent(new InputEvent('beforeinput', {
                    bubbles: true,
                    cancelable: true,
                    data: value,
                    inputType: 'insertText',
                }));
                input.dispatchEvent(new InputEvent('input', {
                    bubbles: true,
                    cancelable: true,
                    data: value,
                    inputType: 'insertText',
                }));
                input.dispatchEvent(new Event('change', { bubbles: true }));
                input.dispatchEvent(new Event('blur', { bubbles: true }));

                return String(input.value || '') === String(value || '');
            }

            const givenInput = pickInput('input[data-testid="givenName"], input[name="givenName"], input[autocomplete="given-name"]');
            const familyInput = pickInput('input[data-testid="familyName"], input[name="familyName"], input[autocomplete="family-name"]');
            const passwordInput = pickInput('input[data-testid="password"], input[name="password"], input[type="password"]');

            if (!givenInput || !familyInput || !passwordInput) {
                return 'not-ready';
            }

            const givenOk = setInputValue(givenInput, givenName);
            const familyOk = setInputValue(familyInput, familyName);
            const passwordOk = setInputValue(passwordInput, password);

            if (!givenOk || !familyOk || !passwordOk) {
                return 'filled-failed';
            }

            return [
                String(givenInput.value || '').trim() === String(givenName || '').trim(),
                String(familyInput.value || '').trim() === String(familyName || '').trim(),
                String(passwordInput.value || '') === String(password || ''),
            ].every(Boolean) ? 'filled' : 'verify-failed';
            """,
            given_name,
            family_name,
            password,
        )

        if filled == 'not-ready':
            time.sleep(0.5)
            continue

        if filled != 'filled':
            print(f"[Debug] 最终注册页输入框已出现,但姓名/密码写入失败: {filled}")
            time.sleep(0.5)
            continue

        # Second, independent read-back of the three fields.
        values_ok = page.run_js(
            """
            const expectedGiven = arguments[0];
            const expectedFamily = arguments[1];
            const expectedPassword = arguments[2];

            function isVisible(node) {
                if (!node) {
                    return false;
                }
                const style = window.getComputedStyle(node);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = node.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            function pickInput(selector) {
                return Array.from(document.querySelectorAll(selector)).find((node) => {
                    return isVisible(node) && !node.disabled && !node.readOnly;
                }) || null;
            }

            const givenInput = pickInput('input[data-testid="givenName"], input[name="givenName"], input[autocomplete="given-name"]');
            const familyInput = pickInput('input[data-testid="familyName"], input[name="familyName"], input[autocomplete="family-name"]');
            const passwordInput = pickInput('input[data-testid="password"], input[name="password"], input[type="password"]');

            if (!givenInput || !familyInput || !passwordInput) {
                return false;
            }

            return String(givenInput.value || '').trim() === String(expectedGiven || '').trim()
                && String(familyInput.value || '').trim() === String(expectedFamily || '').trim()
                && String(passwordInput.value || '') === String(expectedPassword || '');
            """,
            given_name,
            family_name,
            password,
        )

        if not values_ok:
            print("[Debug] 最终注册页字段值校验失败,继续重试填写。")
            time.sleep(0.5)
            continue

        turnstile_state = page.run_js(
            """
            const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
            if (!challengeInput) {
                return 'not-found';
            }
            const value = String(challengeInput.value || '').trim();
            return value ? 'ready' : 'pending';
            """
        )

        if turnstile_state == "pending" and not turnstile_token:
            print("[*] 检测到最终注册页存在 Turnstile,开始使用现有真人化点击逻辑。")
            turnstile_token = getTurnstileToken()
            if turnstile_token:
                synced = page.run_js(
                    """
                    const token = arguments[0];
                    const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
                    if (!challengeInput) {
                        return false;
                    }
                    const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
                    if (nativeSetter) {
                        nativeSetter.call(challengeInput, token);
                    } else {
                        challengeInput.value = token;
                    }
                    challengeInput.dispatchEvent(new Event('input', { bubbles: true }));
                    challengeInput.dispatchEvent(new Event('change', { bubbles: true }));
                    return String(challengeInput.value || '').trim() === String(token || '').trim();
                    """,
                    turnstile_token,
                )
                if synced:
                    print("[*] Turnstile 响应已同步到最终注册表单。")

        time.sleep(1.2)

        try:
            submit_button = page.ele('tag:button@@text()=完成注册', timeout=3)
        except Exception:
            submit_button = None

        if not submit_button:
            clicked = page.run_js(
                r"""
                const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
                if (challengeInput && !String(challengeInput.value || '').trim()) {
                    return false;
                }
                const buttons = Array.from(document.querySelectorAll('button[type="submit"], button'));
                const submitButton = buttons.find((node) => {
                    const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
                    return text === '完成注册' || text.includes('完成注册');
                });
                if (!submitButton || submitButton.disabled || submitButton.getAttribute('aria-disabled') === 'true') {
                    return false;
                }
                submitButton.focus();
                submitButton.click();
                return true;
                """
            )
        else:
            challenge_value = page.run_js(
                """
                const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
                return challengeInput ? String(challengeInput.value || '').trim() : 'not-found';
                """
            )
            if challenge_value not in ('not-found', ''):
                submit_button.click()
                clicked = True
            else:
                # The Turnstile value may be empty while the button is
                # already enabled; fall back to clicking through JS.
                clicked = page.run_js(
                    r"""
                    const buttons = Array.from(document.querySelectorAll('button[type="submit"], button'));
                    const submitButton = buttons.find((node) => {
                        const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
                        return text === '完成注册' || text.includes('完成注册');
                    });
                    if (!submitButton || submitButton.disabled || submitButton.getAttribute('aria-disabled') === 'true') {
                        return false;
                    }
                    submitButton.focus();
                    submitButton.click();
                    return true;
                    """
                )
                if clicked:
                    print("[*] Turnstile 值为空,但按钮已启用,已强制点击完成注册。")

        if clicked:
            # NOTE(review): this writes the generated password to stdout in
            # clear text; it is kept because it is the only place the
            # password is recorded.
            print(f"[*] 已填写注册资料并点击完成注册: {given_name} {family_name} / {password}")
            # Wait up to 15 seconds for the page to leave the sign-up URL.
            for _ in range(30):
                _raise_if_cancelled()
                time.sleep(0.5)
                try:
                    current_url = page.url
                    if 'sign-up' not in current_url and 'signup' not in current_url:
                        print(f"[*] 页面已跳转,当前URL: {current_url}")
                        break
                except Exception:
                    pass
            else:
                # No navigation happened: look for an error message.
                try:
                    error_text = page.run_js(r"""
                        const errorNode = document.querySelector('[role="alert"], .error, [data-testid="error-message"], .text-red-500, .text-error');
                        return errorNode ? (errorNode.innerText || errorNode.textContent || '').trim().substring(0, 200) : '';
                    """)
                except Exception:
                    error_text = ""
                if error_text:
                    raise Exception(f"注册提交失败,页面提示错误: {error_text}")
                print("[*] 警告: 点击完成注册后页面未跳转,将继续尝试获取 sso cookie。")
            return {
                "given_name": given_name,
                "family_name": family_name,
                "password": password,
            }

        time.sleep(0.5)

    raise Exception("未找到最终注册表单或完成注册按钮")


def extract_visible_numbers(timeout=60):
    """Collect numbers found in visible page text and print them.

    Args:
        timeout: seconds to keep polling until at least one number is found.

    Returns:
        Up to 30 ``{"value": ..., "text": ...}`` entries as returned by the
        injected script.

    Raises:
        Exception: if nothing was found before the deadline.
    """
    # Reads ordinary visible text only; it does not touch cookies.
    deadline = time.time() + timeout
    while time.time() < deadline:
        _raise_if_cancelled()
        result = page.run_js(
            r"""
            function isVisible(el) {
                if (!el) {
                    return false;
                }
                const style = window.getComputedStyle(el);
                if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
                    return false;
                }
                const rect = el.getBoundingClientRect();
                return rect.width > 0 && rect.height > 0;
            }

            const selector = [
                'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                'div', 'span', 'p', 'strong', 'b', 'small',
                '[data-testid]', '[class]', '[role="heading"]'
            ].join(',');

            const seen = new Set();
            const matches = [];
            for (const node of document.querySelectorAll(selector)) {
                if (!isVisible(node)) {
                    continue;
                }
                const text = String(node.innerText || node.textContent || '').replace(/\s+/g, ' ').trim();
                if (!text) {
                    continue;
                }
                const found = text.match(/\d+(?:\.\d+)?/g);
                if (!found) {
                    continue;
                }
                for (const value of found) {
                    const key = `${value}@@${text}`;
                    if (seen.has(key)) {
                        continue;
                    }
                    seen.add(key);
                    matches.push({ value, text });
                }
            }

            return matches.slice(0, 30);
            """
        )

        if result:
            print("[*] 页面可见数字文本提取结果:")
            for item in result:
                try:
                    print(f" - 数字: {item['value']} | 上下文: {item['text']}")
                except Exception:
                    pass
            return result

        time.sleep(1)

    raise Exception("登录后未提取到可见数字文本")


def wait_for_sso_cookie(timeout=120):
    """Poll the browser cookies until a non-empty cookie named ``sso`` appears.

    Args:
        timeout: seconds to keep polling.

    Returns:
        The value of the ``sso`` cookie.

    Raises:
        Exception: if the cookie does not appear before the deadline; the
            message lists the cookie names seen so far.
    """
    # Must run after registration finished; only the exact name "sso" counts.
    deadline = time.time() + timeout
    last_seen_names = set()
    last_report = 0

    while time.time() < deadline:
        # Checked outside the try block so the broad handler below cannot
        # swallow the cancellation.
        _raise_if_cancelled()
        try:
            refresh_active_page()
            if page is None:
                time.sleep(1)
                continue

            cookies = page.cookies(all_domains=True, all_info=True) or []
            for item in cookies:
                if isinstance(item, dict):
                    name = str(item.get("name", "")).strip()
                    value = str(item.get("value", "")).strip()
                else:
                    name = str(getattr(item, "name", "")).strip()
                    value = str(getattr(item, "value", "")).strip()

                if name:
                    last_seen_names.add(name)

                if name == "sso" and value:
                    print("[*] 注册完成后已获取到 sso cookie。")
                    return value

        except PageDisconnectedError:
            refresh_active_page()
        except Exception:
            pass

        # Report progress every 10 seconds so the script does not look hung.
        if time.time() - last_report >= 10:
            try:
                current_url = page.url if page else "unknown"
            except Exception:
                current_url = "unknown"
            print(f"[*] 等待 sso cookie 中... 当前URL: {current_url} 已见cookie: {sorted(last_seen_names)}")
            last_report = time.time()

        time.sleep(1)

    raise Exception(f"注册完成后未获取到 sso cookie,当前已见 cookie: {sorted(last_seen_names)}")


def append_sso_to_txt(sso_value, output_path=DEFAULT_SSO_FILE):
    """Append one ``sso`` value as its own line to ``output_path``.

    Raises:
        Exception: if ``sso_value`` is empty after stripping whitespace.
    """
    normalized = str(sso_value or "").strip()
    if not normalized:
        raise Exception("待写入的 sso 为空")

    with open(output_path, "a", encoding="utf-8") as file:
        file.write(normalized + "\n")

    print(f"[*] 已追加写入 sso 到文件: {output_path}")


def run_single_registration(output_path=DEFAULT_SSO_FILE, extract_numbers=False):
    """Run one full round: sign-up page -> register -> sso cookie -> txt file.

    Args:
        output_path: file the ``sso`` value is appended to.
        extract_numbers: also run ``extract_visible_numbers`` afterwards.

    Returns:
        Dict with ``email``, ``sso`` and the profile fields of this round.
    """
    _raise_if_cancelled()
    open_signup_page()
    email, dev_token = fill_email_and_submit()
    _raise_if_cancelled()
    fill_code_and_submit(email, dev_token)
    _raise_if_cancelled()
    profile = fill_profile_and_submit()
    sso_value = wait_for_sso_cookie()
    append_sso_to_txt(sso_value, output_path)

    if extract_numbers:
        extract_visible_numbers()

    result = {
        "email": email,
        "sso": sso_value,
        **profile,
    }

    print(f"[*] 本轮注册完成,邮箱: {email}")
    return result


def main():
    """Command-line entry point: run registration rounds in a loop.

    ``--count 0`` (the default) loops until interrupted.  Each round runs in
    a worker thread bounded by ``ROUND_TIMEOUT_SECONDS``; the browser is
    restarted between rounds and closed on exit.
    """
    parser = argparse.ArgumentParser(description="xAI 自动注册并采集 sso")
    parser.add_argument("--count", type=int, default=0, help="执行轮数,0 表示无限循环")
    parser.add_argument("--output", default=DEFAULT_SSO_FILE, help="sso 输出 txt 路径")
    parser.add_argument("--extract-numbers", action="store_true", help="注册完成后额外提取页面数字文本")
    args = parser.parse_args()

    current_round = 0

    try:
        start_browser()

        while True:
            if args.count > 0 and current_round >= args.count:
                break

            current_round += 1
            print(f"\n[*] 开始第 {current_round} 轮注册")

            _round_cancelled.clear()
            interrupted = False

            try:
                with ThreadPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(run_single_registration, args.output, extract_numbers=args.extract_numbers)
                    try:
                        future.result(timeout=ROUND_TIMEOUT_SECONDS)
                    except FutureTimeoutError:
                        # Leaving the ``with`` block waits for the worker, so
                        # tell it to stop first; otherwise the "timeout"
                        # would only take effect once the worker finished by
                        # itself.
                        _round_cancelled.set()
                        print(f"[Error] 第 {current_round} 轮注册超时(>{ROUND_TIMEOUT_SECONDS}秒),强制终止并进入下一轮。")
                    except KeyboardInterrupt:
                        _round_cancelled.set()
                        raise
            except KeyboardInterrupt:
                _round_cancelled.set()
                print("\n[Info] 收到中断信号,停止后续轮次。")
                interrupted = True
            except Exception as error:
                print(f"[Error] 第 {current_round} 轮失败: {error}")

            has_next_round = args.count == 0 or current_round < args.count
            if interrupted or not has_next_round:
                # The outer ``finally`` closes the browser; relaunching it
                # here would only start an instance to close it again.
                break

            try:
                restart_browser()
            except Exception as error:
                # A failed restart must not end a multi-round run; the next
                # round launches the browser itself when none is running.
                print(f"[Error] 重启浏览器失败,将在下一轮重试: {error}")

            time.sleep(2)

    finally:
        stop_browser()


if __name__ == "__main__":
    main()