2302e74209
# - DrissionPage + Chrome browser automation
# - Temporary email via mail.tm API
# - Verification code auto-extraction
# - Turnstile CAPTCHA bypass via extension
# - SSO cookie extraction and persistence
1140 lines
38 KiB
Python
1140 lines
38 KiB
Python
from DrissionPage import Chromium, ChromiumOptions
|
|
from DrissionPage.errors import PageDisconnectedError
|
|
import argparse
|
|
import time
|
|
import os
|
|
import secrets
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
|
|
import threading
|
|
|
|
from openai_register import get_email_and_token, get_oai_code
|
|
|
|
|
|
def ensure_stable_python_runtime():
    """Re-exec the script under a locally installed Python 3.12 / 3.13 when running on 3.14+.

    Does nothing on interpreters older than 3.14, or when the environment
    variable ``DPE_REEXEC_DONE`` is already ``"1"`` (set by a previous re-exec
    so the switch happens at most once). Candidate interpreters are looked up
    under ``%LOCALAPPDATA%\\Programs\\Python``; the first one that exists
    replaces the current process via ``os.execve``.
    """
    # Prefer the more stable 3.12 / 3.13 to avoid occasional Mail.tm
    # TLS/compatibility problems seen on 3.14.
    already_reexecuted = os.environ.get("DPE_REEXEC_DONE") == "1"
    if sys.version_info < (3, 14) or already_reexecuted:
        return

    install_root = os.path.join(os.environ.get("LOCALAPPDATA", ""), "Programs", "Python")
    running = os.path.normcase(os.path.abspath(sys.executable))

    for folder in ("Python312", "Python313"):
        interpreter = os.path.join(install_root, folder, "python.exe")
        if not os.path.isfile(interpreter):
            continue
        if os.path.normcase(os.path.abspath(interpreter)) == running:
            return

        print(f"[*] 检测到 Python {sys.version.split()[0]},自动切换到更稳定的解释器: {interpreter}")
        # Mark the child so it does not try to switch interpreters again.
        child_env = {**os.environ, "DPE_REEXEC_DONE": "1"}
        child_argv = [interpreter, os.path.abspath(__file__), *sys.argv[1:]]
        os.execve(interpreter, child_argv, child_env)
|
|
|
|
|
|
def warn_runtime_compatibility():
    """Print a hint when running on Python 3.14+.

    The hint exists so that low-level TLS compatibility problems are not
    mistaken for logic errors in this script.
    """
    if sys.version_info < (3, 14):
        return
    print("[提示] 当前 Python 为 3.14+;若出现 Mail.tm TLS 异常,建议改用 Python 3.12 或 3.13。")
|
|
|
|
|
|
# Runtime checks run at import time, before any browser state is created.
ensure_stable_python_runtime()
warn_runtime_compatibility()

# Launch options shared by every browser instance started by this script.
co = ChromiumOptions()
co.auto_port()
co.set_timeouts(base=1)

# Directory containing this script; the extension and the default output file live next to it.
_SCRIPT_DIR = os.path.dirname(__file__)

# Load the extension that patches MouseEvent.screenX / screenY.
EXTENSION_PATH = os.path.abspath(os.path.join(_SCRIPT_DIR, "turnstilePatch"))
co.add_extension(EXTENSION_PATH)

# The browser stays in visible (non-headless) mode so the registration flow can be watched.

# Current browser instance and active tab; managed by start_browser() / stop_browser().
browser = None
page = None

SIGNUP_URL = "https://accounts.x.ai/sign-up?redirect=grok-com"
DEFAULT_SSO_FILE = os.path.join(_SCRIPT_DIR, "sso.txt")
|
|
|
|
|
|
def start_browser():
    """Launch a fresh browser and select its active tab.

    Every round starts from a brand-new browser to reduce page and cookie
    pollution from long-lived instances. The launch runs in a daemon thread
    and is given 15 seconds.

    Returns:
        tuple: ``(browser, page)``, also stored in the module globals.

    Raises:
        Exception: if the launch fails or does not finish within 15 seconds.
    """
    global browser, page
    browser = None
    page = None

    # The worker reports through this local holder instead of writing the
    # global directly, so a launch that completes after the timeout cannot
    # clobber the state of a later start_browser() call.
    outcome = {}
    outcome_lock = threading.Lock()

    def _create():
        try:
            instance = Chromium(co)
        except Exception as error:
            outcome["error"] = error
            return
        with outcome_lock:
            abandoned = outcome.get("abandoned", False)
            if not abandoned:
                outcome["browser"] = instance
        if abandoned:
            # Nobody is waiting for this instance any more; close it so it does not leak.
            try:
                instance.quit()
            except Exception:
                pass

    launcher = threading.Thread(target=_create, daemon=True)
    launcher.start()
    launcher.join(timeout=15)

    with outcome_lock:
        instance = outcome.get("browser")
        if instance is None:
            outcome["abandoned"] = True

    if instance is None:
        launch_error = outcome.get("error")
        if launch_error is not None:
            # Report the real cause instead of mislabelling it as a timeout.
            raise Exception(f"启动浏览器失败: {launch_error}") from launch_error
        raise Exception("启动浏览器超时(>15秒)")

    browser = instance
    tabs = browser.get_tabs()
    page = tabs[-1] if tabs else browser.new_tab()
    return browser, page
|
|
|
|
|
|
def stop_browser():
    """Close the whole browser instance so the next round can start a new one.

    ``quit()`` runs in a daemon thread and is waited on for at most 5 seconds
    so a hung browser cannot block the caller. Errors from ``quit()`` are
    ignored (best effort). The module globals ``browser`` and ``page`` are
    always reset to ``None``.
    """
    global browser, page

    # Capture the instance locally: the quit thread must not read the global,
    # which is reset below and may already be None (or a newer browser) by the
    # time the thread gets to run.
    instance = browser
    try:
        if instance is not None:
            def _safe_quit():
                try:
                    instance.quit()
                except Exception:
                    pass

            closer = threading.Thread(target=_safe_quit, daemon=True)
            closer.start()
            closer.join(timeout=5)  # wait at most 5 seconds to avoid hanging
    finally:
        browser = None
        page = None
|
|
|
|
|
|
def restart_browser():
    """Tear down the current browser and bring up a new one.

    The same browser is deliberately not reused across rounds, so each round
    ends with a full restart. A 1.5 second pause separates shutdown and launch.
    """
    stop_browser()
    time.sleep(1.5)
    start_browser()
|
|
|
|
|
|
def refresh_active_page():
    """Re-acquire the currently active tab and return it.

    After the verification code is confirmed the site navigates and the old
    page handle may be disconnected, so callers use this to refresh the global
    ``page``. Starts a browser if none exists; if listing or creating tabs
    fails for any reason, the whole browser is restarted.
    """
    global browser, page

    if browser is None:
        start_browser()

    try:
        open_tabs = browser.get_tabs()
        # The last tab is treated as the active one; open a tab when none exist.
        page = open_tabs[-1] if open_tabs else browser.new_tab()
    except Exception:
        restart_browser()

    return page
|
|
|
|
|
|
def open_signup_page():
    """Open the sign-up page at the start of a round and switch to the e-mail flow.

    Navigates the active tab to ``SIGNUP_URL``; if that fails, the active tab
    is refreshed and the URL is opened in a new tab instead. Finally clicks
    the "sign up with e-mail" button.
    """
    global page

    refresh_active_page()
    try:
        page.get(SIGNUP_URL)
    except Exception:
        # Navigation on the current handle failed: re-acquire and use a new tab.
        refresh_active_page()
        page = browser.new_tab(SIGNUP_URL)

    click_email_signup_button()
|
|
|
|
|
|
def close_current_page():
    """Compatibility alias for an older call name.

    Despite the name, this restarts the entire browser rather than closing a
    single page.
    """
    restart_browser()
|
|
|
|
|
|
def has_profile_form():
    """Return True when the final registration form is present.

    The form counts as reached as soon as given-name, family-name and password
    inputs all exist in the document. Any error while querying the page is
    reported as ``False``.
    """
    probe_js = """
    const givenInput = document.querySelector('input[data-testid="givenName"], input[name="givenName"], input[autocomplete="given-name"]');
    const familyInput = document.querySelector('input[data-testid="familyName"], input[name="familyName"], input[autocomplete="family-name"]');
    const passwordInput = document.querySelector('input[data-testid="password"], input[name="password"], input[type="password"]');
    return !!(givenInput && familyInput && passwordInput);
    """

    refresh_active_page()
    try:
        form_present = page.run_js(probe_js)
    except Exception:
        return False
    return bool(form_present)
|
|
|
|
|
|
def click_email_signup_button(timeout=10):
    """Click the "使用邮箱注册" (sign up with e-mail) button once the page shows it.

    Polls every 0.5 seconds until the button is found and clicked.

    Args:
        timeout: maximum number of seconds to keep polling.

    Returns:
        True once the button has been clicked.

    Raises:
        Exception: if the button does not appear before the timeout.
    """
    find_and_click_js = r"""
    const candidates = Array.from(document.querySelectorAll('button, a, [role="button"]'));
    const target = candidates.find((node) => {
        const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
        return text.includes('使用邮箱注册');
    });

    if (!target) {
        return false;
    }

    target.click();
    return true;
    """

    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        if page.run_js(find_and_click_js):
            return True
        time.sleep(0.5)

    raise Exception('未找到“使用邮箱注册”按钮')
|
|
|
|
|
|
def fill_email_and_submit(timeout=15):
    """Obtain a temporary mailbox, type it into the sign-up form and submit.

    Reuses the mailbox helper from ``openai_register``; the address and its
    token are returned so the later verification-code step can keep using them.

    Args:
        timeout: seconds to keep retrying the fill/submit cycle (the mailbox
            is requested before the countdown starts).

    Returns:
        tuple: ``(email, dev_token)``.

    Raises:
        Exception: if no mailbox could be obtained, or the e-mail input or the
            submit button never became usable within the timeout.
    """
    email, dev_token = get_email_and_token()
    if not email or not dev_token:
        raise Exception("获取邮箱失败")

    # Shared prefix of both scripts: visibility helper plus lookup of the
    # first visible, editable e-mail input.
    locate_input_js = """
    function isVisible(node) {
        if (!node) {
            return false;
        }
        const style = window.getComputedStyle(node);
        if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
            return false;
        }
        const rect = node.getBoundingClientRect();
        return rect.width > 0 && rect.height > 0;
    }

    const input = Array.from(document.querySelectorAll('input[data-testid="email"], input[name="email"], input[type="email"], input[autocomplete="email"]')).find((node) => {
        return isVisible(node) && !node.disabled && !node.readOnly;
    }) || null;
    """

    # Returns 'not-ready' (no input yet), false (write did not stick) or 'filled'.
    write_email_js = """
    const email = arguments[0];
    """ + locate_input_js + """
    if (!input) {
        return 'not-ready';
    }

    input.focus();
    input.click();

    // 不能只写 `input.value = xxx`,否则 React / 受控表单可能没有同步内部状态。
    const valueSetter = Object.getOwnPropertyDescriptor(HTMLInputElement.prototype, 'value')?.set;
    const tracker = input._valueTracker;
    if (tracker) {
        tracker.setValue('');
    }
    if (valueSetter) {
        valueSetter.call(input, email);
    } else {
        input.value = email;
    }

    input.dispatchEvent(new InputEvent('beforeinput', {
        bubbles: true,
        data: email,
        inputType: 'insertText',
    }));
    input.dispatchEvent(new InputEvent('input', {
        bubbles: true,
        data: email,
        inputType: 'insertText',
    }));
    input.dispatchEvent(new Event('change', { bubbles: true }));

    if ((input.value || '').trim() !== email || !input.checkValidity()) {
        return false;
    }

    input.blur();
    return 'filled';
    """

    # Re-checks the input, then clicks the visible, enabled button labelled "注册".
    press_submit_js = locate_input_js + r"""
    if (!input || !input.checkValidity() || !(input.value || '').trim()) {
        return false;
    }

    const buttons = Array.from(document.querySelectorAll('button[type="submit"], button')).filter((node) => {
        return isVisible(node) && !node.disabled && node.getAttribute('aria-disabled') !== 'true';
    });
    const submitButton = buttons.find((node) => {
        const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
        return text === '注册' || text.includes('注册');
    });

    if (!submitButton || submitButton.disabled) {
        return false;
    }

    submitButton.click();
    return true;
    """

    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        outcome = page.run_js(write_email_js, email)

        if outcome != 'filled':
            if outcome != 'not-ready':
                print(f"[Debug] 邮箱输入框已出现,但写入失败: {outcome}")
            time.sleep(0.5)
            continue

        # Short pause before looking for the submit button.
        time.sleep(0.8)
        if page.run_js(press_submit_js):
            print(f"[*] 已填写邮箱并点击注册: {email}")
            return email, dev_token

        time.sleep(0.5)

    raise Exception("未找到邮箱输入框或注册按钮")
|
|
|
|
|
|
|
|
def fill_code_and_submit(email, dev_token, timeout=180):
    """Wait for the e-mailed verification code, type it in and confirm.

    Reuses the code-polling helper from ``openai_register``. The page may show
    either one aggregate OTP input or one box per character; both are handled
    by the injected script. A ``PageDisconnectedError`` is treated as a sign
    that the site navigated: the active tab is re-acquired and, if the final
    registration form is visible, the step is considered done.

    Args:
        email: mailbox address the code was sent to.
        dev_token: token for polling that mailbox.
        timeout: seconds to keep retrying the fill/confirm cycle (the code is
            fetched before the countdown starts).

    Returns:
        The verification code that was entered.

    Raises:
        Exception: if no code could be obtained, or the OTP input / confirm
            button could not be operated within the timeout.
    """
    code = get_oai_code(dev_token, email)
    if not code:
        raise Exception("获取验证码失败")

    # Visibility helper shared by all three scripts below.
    is_visible_js = """
    function isVisible(node) {
        if (!node) {
            return false;
        }
        const style = window.getComputedStyle(node);
        if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
            return false;
        }
        const rect = node.getBoundingClientRect();
        return rect.width > 0 && rect.height > 0;
    }
    """

    # Returns 'not-ready', 'filled', or a '*-mismatch' diagnostic string.
    fill_js = """
    const code = String(arguments[0] || '').trim();
    """ + is_visible_js + """
    function setNativeValue(input, value) {
        const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
        const tracker = input._valueTracker;
        if (tracker) {
            tracker.setValue('');
        }
        if (nativeInputValueSetter) {
            nativeInputValueSetter.call(input, '');
            nativeInputValueSetter.call(input, value);
        } else {
            input.value = '';
            input.value = value;
        }
    }

    function dispatchInputEvents(input, value) {
        input.dispatchEvent(new InputEvent('beforeinput', {
            bubbles: true,
            cancelable: true,
            data: value,
            inputType: 'insertText',
        }));
        input.dispatchEvent(new InputEvent('input', {
            bubbles: true,
            cancelable: true,
            data: value,
            inputType: 'insertText',
        }));
        input.dispatchEvent(new Event('change', { bubbles: true }));
    }

    const input = Array.from(document.querySelectorAll('input[data-input-otp="true"], input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"], input[inputmode="text"]')).find((node) => {
        return isVisible(node) && !node.disabled && !node.readOnly && Number(node.maxLength || code.length || 6) > 1;
    }) || null;

    const otpBoxes = Array.from(document.querySelectorAll('input')).filter((node) => {
        if (!isVisible(node) || node.disabled || node.readOnly) {
            return false;
        }
        const maxLength = Number(node.maxLength || 0);
        const autocomplete = String(node.autocomplete || '').toLowerCase();
        return maxLength === 1 || autocomplete === 'one-time-code';
    });

    if (!input && otpBoxes.length < code.length) {
        return 'not-ready';
    }

    if (input) {
        input.focus();
        input.click();
        setNativeValue(input, code);
        dispatchInputEvents(input, code);

        const normalizedValue = String(input.value || '').trim();
        const expectedLength = Number(input.maxLength || code.length || 6);
        const slots = Array.from(document.querySelectorAll('[data-input-otp-slot="true"]'));
        const filledSlots = slots.filter((slot) => (slot.textContent || '').trim()).length;

        if (normalizedValue !== code) {
            // 聚合输入框写入失败,尝试回退到分格输入方式
            if (otpBoxes.length >= code.length) {
                const orderedBoxes = otpBoxes.slice(0, code.length);
                for (let i = 0; i < orderedBoxes.length; i += 1) {
                    const box = orderedBoxes[i];
                    const char = code[i] || '';
                    box.focus();
                    box.click();
                    setNativeValue(box, char);
                    dispatchInputEvents(box, char);
                    box.dispatchEvent(new KeyboardEvent('keydown', { bubbles: true, key: char }));
                    box.dispatchEvent(new KeyboardEvent('keyup', { bubbles: true, key: char }));
                    box.blur();
                }
                const merged = orderedBoxes.map((node) => String(node.value || '').trim()).join('');
                return merged === code ? 'filled' : 'box-mismatch';
            }
            return 'aggregate-mismatch';
        }

        if (expectedLength > 0 && normalizedValue.length !== expectedLength) {
            return 'aggregate-length-mismatch';
        }

        if (slots.length && filledSlots && filledSlots !== normalizedValue.length) {
            return 'aggregate-slot-mismatch';
        }

        input.blur();
        return 'filled';
    }

    const orderedBoxes = otpBoxes.slice(0, code.length);
    for (let i = 0; i < orderedBoxes.length; i += 1) {
        const box = orderedBoxes[i];
        const char = code[i] || '';
        box.focus();
        box.click();
        setNativeValue(box, char);
        dispatchInputEvents(box, char);
        box.dispatchEvent(new KeyboardEvent('keydown', { bubbles: true, key: char }));
        box.dispatchEvent(new KeyboardEvent('keyup', { bubbles: true, key: char }));
        box.blur();
    }

    const merged = orderedBoxes.map((node) => String(node.value || '').trim()).join('');
    return merged === code ? 'filled' : 'box-mismatch';
    """

    # Returns false (code not fully present), 'no-button' or 'clicked'.
    confirm_js = is_visible_js + r"""
    const aggregateInput = Array.from(document.querySelectorAll('input[data-input-otp="true"], input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"], input[inputmode="text"]')).find((node) => {
        return isVisible(node) && !node.disabled && !node.readOnly && Number(node.maxLength || 0) > 1;
    }) || null;

    let value = '';
    if (aggregateInput) {
        value = String(aggregateInput.value || '').trim();
        const expectedLength = Number(aggregateInput.maxLength || value.length || 6);
        if (!value || (expectedLength > 0 && value.length !== expectedLength)) {
            return false;
        }

        const slots = Array.from(document.querySelectorAll('[data-input-otp-slot="true"]'));
        if (slots.length) {
            const filledSlots = slots.filter((slot) => (slot.textContent || '').trim()).length;
            if (filledSlots && filledSlots !== value.length) {
                return false;
            }
        }
    } else {
        const otpBoxes = Array.from(document.querySelectorAll('input')).filter((node) => {
            if (!isVisible(node) || node.disabled || node.readOnly) {
                return false;
            }
            const maxLength = Number(node.maxLength || 0);
            const autocomplete = String(node.autocomplete || '').toLowerCase();
            return maxLength === 1 || autocomplete === 'one-time-code';
        });
        value = otpBoxes.map((node) => String(node.value || '').trim()).join('');
        if (!value || value.length < 6) {
            return false;
        }
    }

    const buttons = Array.from(document.querySelectorAll('button[type="submit"], button')).filter((node) => {
        return isVisible(node) && !node.disabled && node.getAttribute('aria-disabled') !== 'true';
    });
    const confirmButton = buttons.find((node) => {
        const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
        return text === '确认邮箱' || text.includes('确认邮箱') || text === '继续' || text.includes('继续') || text === '下一步' || text.includes('下一步');
    });

    if (!confirmButton) {
        return 'no-button';
    }

    confirmButton.focus();
    confirmButton.click();
    return 'clicked';
    """

    # Summary of visible inputs and buttons, printed when the step gives up.
    snapshot_js = is_visible_js + r"""
    const inputs = Array.from(document.querySelectorAll('input')).filter(isVisible).map((node) => ({
        type: node.type || '',
        name: node.name || '',
        testid: node.getAttribute('data-testid') || '',
        autocomplete: node.autocomplete || '',
        maxLength: Number(node.maxLength || 0),
        value: String(node.value || ''),
    }));

    const buttons = Array.from(document.querySelectorAll('button')).filter(isVisible).map((node) => ({
        text: String(node.innerText || node.textContent || '').replace(/\s+/g, ' ').trim(),
        disabled: !!node.disabled,
        ariaDisabled: node.getAttribute('aria-disabled') || '',
    }));

    return { url: location.href, inputs, buttons };
    """

    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            filled = page.run_js(fill_js, code)
        except PageDisconnectedError:
            # If confirming the e-mail triggered a navigation, the old handle
            # is gone; switch to the new page and keep checking from there.
            refresh_active_page()
            if has_profile_form():
                print("[*] 验证码提交后已跳转到最终注册页。")
                return code
            time.sleep(1)
            continue

        if filled == 'not-ready':
            if has_profile_form():
                print("[*] 已直接进入最终注册页,跳过验证码按钮确认。")
                return code
            time.sleep(0.5)
            continue

        if filled != 'filled':
            print(f"[Debug] 验证码输入框已出现,但写入失败: {filled}")
            time.sleep(0.5)
            continue

        time.sleep(1.2)
        try:
            clicked = page.run_js(confirm_js)
        except PageDisconnectedError:
            refresh_active_page()
            if has_profile_form():
                print("[*] 确认邮箱后页面跳转成功,已进入最终注册页。")
                return code
            clicked = 'disconnected'

        if clicked == 'clicked':
            print(f"[*] 已填写验证码并点击确认邮箱: {code}")
            time.sleep(2)
            refresh_active_page()
            if has_profile_form():
                print("[*] 验证码确认完成,最终注册页已就绪。")
                return code
        elif clicked == 'no-button':
            try:
                current_url = page.url
            except PageDisconnectedError:
                # Same situation as above: the page navigated underneath us.
                refresh_active_page()
                time.sleep(1)
                continue
            # NOTE(review): SIGNUP_URL itself contains 'sign-up', so this check
            # may hold even when the page has not advanced — confirm intent.
            if 'sign-up' in current_url or 'signup' in current_url:
                print(f"[*] 已填写验证码,页面已自动跳转到下一步: {current_url}")
                return code
        elif clicked == 'disconnected':
            time.sleep(1)
            continue

        time.sleep(0.5)

    # The snapshot is diagnostics only; a failure to collect it must not hide
    # the real error raised below.
    try:
        debug_snapshot = page.run_js(snapshot_js)
    except Exception as error:
        debug_snapshot = f"<unavailable: {error}>"
    print(f"[Debug] 验证码页 DOM 摘要: {debug_snapshot}")
    raise Exception("未找到验证码输入框或确认邮箱按钮")
|
|
|
|
|
|
def getTurnstileToken():
    """Try to solve the Turnstile challenge on the current page and return its token.

    Resets the widget (best effort), then makes up to 15 attempts, one per
    second. Each attempt first asks ``turnstile.getResponse()`` for a token;
    if none is available it clicks the widget container in the main document
    and then the input inside the challenge iframe. Errors within an attempt
    are ignored so the next attempt can run.

    Returns:
        The non-empty Turnstile response token.

    Raises:
        Exception: if no token is obtained after all attempts.
    """
    try:
        page.run_js("try { turnstile.reset() } catch(e) { }")
    except Exception:
        pass  # resetting is optional; carry on with the attempts

    for _ in range(15):
        try:
            token = page.run_js("try { return turnstile.getResponse() } catch(e) { return null }")
            if token:
                return token

            # First try clicking the Turnstile container visible in the main document.
            page.run_js("""
            const box = document.querySelector('.cf-turnstile, .turnstile, [data-sitekey]');
            if (box) {
                box.scrollIntoView({ behavior: 'smooth', block: 'center' });
                const rect = box.getBoundingClientRect();
                const x = rect.left + rect.width / 2;
                const y = rect.top + rect.height / 2;
                box.dispatchEvent(new MouseEvent('click', { bubbles: true, clientX: x, clientY: y }));
            }
            """)

            solution_input = page.ele("@name=cf-turnstile-response", timeout=3)
            wrapper = solution_input.parent()
            challenge_iframe = wrapper.shadow_root.ele("tag:iframe", timeout=3)

            challenge_iframe.run_js("""
            window.dtp = 1
            function getRandomInt(min, max) {
                return Math.floor(Math.random() * (max - min + 1)) + min;
            }

            // 旧方案在 4K 屏下不稳定,这里给出更自然的屏幕坐标。
            let screenX = getRandomInt(800, 1200);
            let screenY = getRandomInt(400, 600);

            Object.defineProperty(MouseEvent.prototype, 'screenX', { value: screenX });
            Object.defineProperty(MouseEvent.prototype, 'screenY', { value: screenY });
            """)

            iframe_body = challenge_iframe.ele("tag:body", timeout=3).shadow_root
            challenge_button = iframe_body.ele("tag:input", timeout=3)
            challenge_button.click()
        except Exception:
            # A failed attempt is expected while the widget is still loading.
            # Only Exception is caught so Ctrl-C and SystemExit still stop the loop.
            pass
        time.sleep(1)

    raise Exception("failed to solve turnstile")
|
|
|
|
|
|
def build_profile():
    """Build the profile used on the final registration form.

    The names are fixed; the password is random and always contains an
    upper-case letter, lower-case letters, digits and special characters.

    Returns:
        tuple: ``(given_name, family_name, password)``.
    """
    password = f"N{secrets.token_hex(4)}!a7#{secrets.token_urlsafe(6)}"
    return "Fan", "YuXi", password
|
|
|
|
|
|
def fill_profile_and_submit(timeout=120):
    """Fill in name and password on the final registration form and submit it.

    Only inputs that are visible and writable are targeted, to avoid hidden
    nodes or duplicated controlled inputs. If a Turnstile response field is
    present but empty, the Turnstile solver is run once and its token is
    written into the field. After clicking "完成注册" the function waits up to
    15 seconds for the URL to leave the sign-up flow.

    Args:
        timeout: seconds to keep retrying the fill/submit cycle.

    Returns:
        dict with keys ``given_name``, ``family_name`` and ``password``.

    Raises:
        Exception: if the form or the submit button never became usable, if
            the page shows an error after submitting, or if the Turnstile
            solver fails.
    """
    given_name, family_name, password = build_profile()

    # Helpers shared by the fill script and the verification script.
    pick_input_js = """
    function isVisible(node) {
        if (!node) {
            return false;
        }
        const style = window.getComputedStyle(node);
        if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
            return false;
        }
        const rect = node.getBoundingClientRect();
        return rect.width > 0 && rect.height > 0;
    }

    function pickInput(selector) {
        return Array.from(document.querySelectorAll(selector)).find((node) => {
            return isVisible(node) && !node.disabled && !node.readOnly;
        }) || null;
    }
    """

    # Lookup of the three form inputs, shared by both scripts.
    select_inputs_js = """
    const givenInput = pickInput('input[data-testid="givenName"], input[name="givenName"], input[autocomplete="given-name"]');
    const familyInput = pickInput('input[data-testid="familyName"], input[name="familyName"], input[autocomplete="family-name"]');
    const passwordInput = pickInput('input[data-testid="password"], input[name="password"], input[type="password"]');
    """

    # Returns 'not-ready', 'filled-failed', 'verify-failed' or 'filled'.
    fill_js = """
    const givenName = arguments[0];
    const familyName = arguments[1];
    const password = arguments[2];
    """ + pick_input_js + """
    function setInputValue(input, value) {
        if (!input) {
            return false;
        }
        input.focus();
        input.click();

        const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
        const tracker = input._valueTracker;
        if (tracker) {
            tracker.setValue('');
        }

        if (nativeSetter) {
            nativeSetter.call(input, '');
            nativeSetter.call(input, value);
        } else {
            input.value = '';
            input.value = value;
        }

        input.dispatchEvent(new InputEvent('beforeinput', {
            bubbles: true,
            cancelable: true,
            data: value,
            inputType: 'insertText',
        }));
        input.dispatchEvent(new InputEvent('input', {
            bubbles: true,
            cancelable: true,
            data: value,
            inputType: 'insertText',
        }));
        input.dispatchEvent(new Event('change', { bubbles: true }));
        input.dispatchEvent(new Event('blur', { bubbles: true }));

        return String(input.value || '') === String(value || '');
    }
    """ + select_inputs_js + """
    if (!givenInput || !familyInput || !passwordInput) {
        return 'not-ready';
    }

    const givenOk = setInputValue(givenInput, givenName);
    const familyOk = setInputValue(familyInput, familyName);
    const passwordOk = setInputValue(passwordInput, password);

    if (!givenOk || !familyOk || !passwordOk) {
        return 'filled-failed';
    }

    return [
        String(givenInput.value || '').trim() === String(givenName || '').trim(),
        String(familyInput.value || '').trim() === String(familyName || '').trim(),
        String(passwordInput.value || '') === String(password || ''),
    ].every(Boolean) ? 'filled' : 'verify-failed';
    """

    # Second, independent read-back of the three field values.
    verify_js = """
    const expectedGiven = arguments[0];
    const expectedFamily = arguments[1];
    const expectedPassword = arguments[2];
    """ + pick_input_js + select_inputs_js + """
    if (!givenInput || !familyInput || !passwordInput) {
        return false;
    }

    return String(givenInput.value || '').trim() === String(expectedGiven || '').trim()
        && String(familyInput.value || '').trim() === String(expectedFamily || '').trim()
        && String(passwordInput.value || '') === String(expectedPassword || '');
    """

    # Returns 'not-found', 'ready' or 'pending' for the Turnstile response field.
    turnstile_state_js = """
    const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
    if (!challengeInput) {
        return 'not-found';
    }
    const value = String(challengeInput.value || '').trim();
    return value ? 'ready' : 'pending';
    """

    sync_token_js = """
    const token = arguments[0];
    const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
    if (!challengeInput) {
        return false;
    }
    const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set;
    if (nativeSetter) {
        nativeSetter.call(challengeInput, token);
    } else {
        challengeInput.value = token;
    }
    challengeInput.dispatchEvent(new Event('input', { bubbles: true }));
    challengeInput.dispatchEvent(new Event('change', { bubbles: true }));
    return String(challengeInput.value || '').trim() === String(token || '').trim();
    """

    read_challenge_js = """
    const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
    return challengeInput ? String(challengeInput.value || '').trim() : 'not-found';
    """

    # Guard used when no native button handle exists: refuse to submit while
    # a Turnstile field is present but still empty.
    require_token_js = r"""
    const challengeInput = document.querySelector('input[name="cf-turnstile-response"]');
    if (challengeInput && !String(challengeInput.value || '').trim()) {
        return false;
    }
    """

    click_finish_js = r"""
    const buttons = Array.from(document.querySelectorAll('button[type="submit"], button'));
    const submitButton = buttons.find((node) => {
        const text = (node.innerText || node.textContent || '').replace(/\s+/g, '');
        return text === '完成注册' || text.includes('完成注册');
    });
    if (!submitButton || submitButton.disabled || submitButton.getAttribute('aria-disabled') === 'true') {
        return false;
    }
    submitButton.focus();
    submitButton.click();
    return true;
    """

    error_text_js = r"""
    const errorNode = document.querySelector('[role="alert"], .error, [data-testid="error-message"], .text-red-500, .text-error');
    return errorNode ? (errorNode.innerText || errorNode.textContent || '').trim().substring(0, 200) : '';
    """

    turnstile_token = ""
    deadline = time.time() + timeout

    while time.time() < deadline:
        fill_status = page.run_js(fill_js, given_name, family_name, password)
        if fill_status != 'filled':
            if fill_status != 'not-ready':
                print(f"[Debug] 最终注册页输入框已出现,但姓名/密码写入失败: {fill_status}")
            time.sleep(0.5)
            continue

        if not page.run_js(verify_js, given_name, family_name, password):
            print("[Debug] 最终注册页字段值校验失败,继续重试填写。")
            time.sleep(0.5)
            continue

        turnstile_state = page.run_js(turnstile_state_js)
        if turnstile_state == "pending" and not turnstile_token:
            print("[*] 检测到最终注册页存在 Turnstile,开始使用现有真人化点击逻辑。")
            turnstile_token = getTurnstileToken()
            if turnstile_token:
                if page.run_js(sync_token_js, turnstile_token):
                    print("[*] Turnstile 响应已同步到最终注册表单。")

        time.sleep(1.2)

        try:
            submit_button = page.ele('tag:button@@text()=完成注册', timeout=3)
        except Exception:
            submit_button = None

        if submit_button:
            challenge_value = page.run_js(read_challenge_js)
            if challenge_value not in ('not-found', ''):
                submit_button.click()
                clicked = True
            else:
                # The Turnstile value may be empty while the button is already
                # enabled; fall back to clicking through JavaScript.
                clicked = page.run_js(click_finish_js)
                if clicked:
                    print("[*] Turnstile 值为空,但按钮已启用,已强制点击完成注册。")
        else:
            clicked = page.run_js(require_token_js + click_finish_js)

        if not clicked:
            time.sleep(0.5)
            continue

        print(f"[*] 已填写注册资料并点击完成注册: {given_name} {family_name} / {password}")

        # Wait for the page to navigate away, at most 15 seconds (30 x 0.5 s).
        navigated = False
        for _ in range(30):
            time.sleep(0.5)
            try:
                current_url = page.url
                if 'sign-up' not in current_url and 'signup' not in current_url:
                    print(f"[*] 页面已跳转,当前URL: {current_url}")
                    navigated = True
            except Exception:
                pass
            if navigated:
                break

        if not navigated:
            # No navigation happened: look for an error message on the page.
            try:
                error_text = page.run_js(error_text_js)
            except Exception:
                error_text = ""
            if error_text:
                raise Exception(f"注册提交失败,页面提示错误: {error_text}")
            print("[*] 警告: 点击完成注册后页面未跳转,将继续尝试获取 sso cookie。")

        return {
            "given_name": given_name,
            "family_name": family_name,
            "password": password,
        }

    raise Exception("未找到最终注册表单或完成注册按钮")
|
|
|
|
|
|
def extract_visible_numbers(timeout=60):
    """Collect numbers that appear in visible text on the current page.

    Used after login/registration; it only reads ordinary on-page text and
    does not touch any cookie. Polls once per second until at least one
    number is found, then prints and returns up to 30 matches.

    Args:
        timeout: maximum number of seconds to keep polling.

    Returns:
        list of ``{"value": ..., "text": ...}`` items as produced by the page script.

    Raises:
        Exception: if no number was found before the timeout.
    """
    collect_js = r"""
    function isVisible(el) {
        if (!el) {
            return false;
        }
        const style = window.getComputedStyle(el);
        if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
            return false;
        }
        const rect = el.getBoundingClientRect();
        return rect.width > 0 && rect.height > 0;
    }

    const selector = [
        'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
        'div', 'span', 'p', 'strong', 'b', 'small',
        '[data-testid]', '[class]', '[role="heading"]'
    ].join(',');

    const seen = new Set();
    const matches = [];
    for (const node of document.querySelectorAll(selector)) {
        if (!isVisible(node)) {
            continue;
        }
        const text = String(node.innerText || node.textContent || '').replace(/\s+/g, ' ').trim();
        if (!text) {
            continue;
        }
        const found = text.match(/\d+(?:\.\d+)?/g);
        if (!found) {
            continue;
        }
        for (const value of found) {
            const key = `${value}@@${text}`;
            if (seen.has(key)) {
                continue;
            }
            seen.add(key);
            matches.push({ value, text });
        }
    }

    return matches.slice(0, 30);
    """

    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        matches = page.run_js(collect_js)
        if not matches:
            time.sleep(1)
            continue

        print("[*] 页面可见数字文本提取结果:")
        for entry in matches:
            try:
                print(f" - 数字: {entry['value']} | 上下文: {entry['text']}")
            except Exception:
                pass  # skip malformed entries, keep printing the rest
        return matches

    raise Exception("登录后未提取到可见数字文本")
|
|
|
|
|
|
def wait_for_sso_cookie(timeout=120):
    """Poll the browser cookies until a cookie named exactly ``sso`` has a value.

    Must only be called after registration has been submitted. The active tab
    is re-acquired on every poll; progress (current URL and cookie names seen
    so far) is printed every 10 seconds so the wait does not look like a hang.

    Args:
        timeout: maximum number of seconds to wait.

    Returns:
        The value of the ``sso`` cookie.

    Raises:
        Exception: if the cookie did not appear before the timeout.
    """

    def _name_and_value(cookie):
        # Cookies are read either as dict entries or as attributes of an object.
        if isinstance(cookie, dict):
            raw_name, raw_value = cookie.get("name", ""), cookie.get("value", "")
        else:
            raw_name, raw_value = getattr(cookie, "name", ""), getattr(cookie, "value", "")
        return str(raw_name).strip(), str(raw_value).strip()

    give_up_at = time.time() + timeout
    seen_names = set()
    reported_at = 0

    while time.time() < give_up_at:
        try:
            refresh_active_page()
            if page is None:
                time.sleep(1)
                continue

            for cookie in page.cookies(all_domains=True, all_info=True) or []:
                name, value = _name_and_value(cookie)
                if name:
                    seen_names.add(name)
                if name == "sso" and value:
                    print("[*] 注册完成后已获取到 sso cookie。")
                    return value
        except PageDisconnectedError:
            refresh_active_page()
        except Exception:
            pass

        # Report progress every 10 seconds so the wait does not look stuck.
        if time.time() - reported_at >= 10:
            try:
                current_url = page.url if page else "unknown"
            except Exception:
                current_url = "unknown"
            print(f"[*] 等待 sso cookie 中... 当前URL: {current_url} 已见cookie: {sorted(seen_names)}")
            reported_at = time.time()

        time.sleep(1)

    raise Exception(f"注册完成后未获取到 sso cookie,当前已见 cookie: {sorted(seen_names)}")
|
|
|
|
|
|
def append_sso_to_txt(sso_value, output_path=DEFAULT_SSO_FILE):
    """Append one sso value as its own line to the output text file.

    Args:
        sso_value: the cookie value; it is converted to ``str`` and stripped.
        output_path: file to append to (UTF-8). Missing parent directories
            are created.

    Raises:
        Exception: if the value is empty after stripping.
    """
    normalized = str(sso_value or "").strip()
    if not normalized:
        raise Exception("待写入的 sso 为空")

    # Create the target directory first: a missing directory would otherwise
    # fail the write and lose a value that was just obtained.
    parent_dir = os.path.dirname(os.path.abspath(output_path))
    os.makedirs(parent_dir, exist_ok=True)

    with open(output_path, "a", encoding="utf-8") as file:
        file.write(normalized + "\n")

    print(f"[*] 已追加写入 sso 到文件: {output_path}")
|
|
|
|
|
|
def run_single_registration(output_path=DEFAULT_SSO_FILE, extract_numbers=False):
    """Run one complete round: open sign-up page, register, fetch sso, write it to the file.

    Args:
        output_path: text file the sso value is appended to.
        extract_numbers: when true, also extract visible numbers from the
            page after the sso value has been saved.

    Returns:
        dict with ``email``, ``sso`` and the profile fields returned by
        ``fill_profile_and_submit``.
    """
    open_signup_page()
    email, dev_token = fill_email_and_submit()
    fill_code_and_submit(email, dev_token)
    profile = fill_profile_and_submit()

    sso_value = wait_for_sso_cookie()
    append_sso_to_txt(sso_value, output_path)

    if extract_numbers:
        extract_visible_numbers()

    summary = {"email": email, "sso": sso_value}
    summary.update(profile)
    print(f"[*] 本轮注册完成,邮箱: {email}")
    return summary
|
|
|
|
|
|
def main():
    """Command-line entry point: run registration rounds in a loop.

    Options:
        --count: number of rounds; 0 (or less) loops until interrupted.
        --output: text file that receives one sso value per line.
        --extract-numbers: also extract visible numbers after each round.

    Each round runs in a worker thread and is waited on for 120 seconds. The
    browser is restarted between rounds and closed when the loop ends.
    """
    parser = argparse.ArgumentParser(description="xAI 自动注册并采集 sso")
    parser.add_argument("--count", type=int, default=0, help="执行轮数,0 表示无限循环")
    parser.add_argument("--output", default=DEFAULT_SSO_FILE, help="sso 输出 txt 路径")
    parser.add_argument("--extract-numbers", action="store_true", help="注册完成后额外提取页面数字文本")
    args = parser.parse_args()

    round_timeout_seconds = 120
    endless = args.count <= 0
    current_round = 0

    try:
        start_browser()
        while endless or current_round < args.count:
            current_round += 1
            print(f"\n[*] 开始第 {current_round} 轮注册")
            more_rounds = endless or current_round < args.count

            try:
                # NOTE: leaving the `with` block waits for the worker to
                # finish, so after a timeout the next round still starts only
                # once the timed-out round has returned. The step functions
                # share module-level browser state and must not overlap.
                with ThreadPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(
                        run_single_registration,
                        args.output,
                        extract_numbers=args.extract_numbers,
                    )
                    try:
                        future.result(timeout=round_timeout_seconds)
                    except FutureTimeoutError:
                        print(f"[Error] 第 {current_round} 轮注册超时(>120秒),强制终止并进入下一轮。")
            except KeyboardInterrupt:
                print("\n[Info] 收到中断信号,停止后续轮次。")
                more_rounds = False
            except Exception as error:
                print(f"[Error] 第 {current_round} 轮失败: {error}")

            if not more_rounds:
                # No further round: the outer `finally` closes the browser, so
                # launching a fresh one here would only be thrown away.
                break

            try:
                restart_browser()
            except Exception as error:
                # A failed relaunch must not end the loop; the next round
                # starts a browser itself when none is running.
                print(f"[Error] 重启浏览器失败: {error}")

            time.sleep(2)
    finally:
        stop_browser()


if __name__ == "__main__":
    main()
|