用 AI 从零实现通用验证码求解服务:OhMyCaptcha 原理剖析
本文深入分析 OhMyCaptcha 的核心实现原理,拆解如何用 Playwright + 多模态大模型构建一个覆盖 19 种验证码类型的自托管求解服务。读完你可以理解每种验证码的破解思路,并自己实现一个。
如果你只需要过 Cloudflare 盾、不需要 AI,推荐阅读姊妹篇:《深入理解 Cloudflare 过盾原理:无需 AI,从零实现 CF Turnstile 绕过》
用 AI 从零实现通用验证码求解服务:OhMyCaptcha 原理剖析
两种完全不同的验证码
在动手之前,先理解验证码的分类——因为不同类型的验证码,破解思路完全不同:
环境检测型(不需要 AI)
| 验证码 | 检测方式 | 破解思路 |
|---|---|---|
| reCAPTCHA v3 | 浏览器环境 + 行为评分 | 反检测 + 模拟行为 |
| Cloudflare Turnstile | 浏览器环境 + TLS 指纹 | 反检测 + 点击 |
| hCaptcha (基础) | 浏览器环境 | 反检测 + 点击 |
这类验证码不弹出”选图片”的视觉挑战,只在后台检测你的浏览器是否”像真人”。
视觉挑战型(需要 AI)
| 验证码 | 挑战方式 | 破解思路 |
|---|---|---|
| reCAPTCHA v2 (高风险) | 选择包含特定物体的图片 | 视觉模型识别 |
| hCaptcha (图片挑战) | 选择匹配的图片 | 视觉模型分类 |
| 图片验证码 | 识别扭曲文字/点击目标 | 多模态模型推理 |
| FunCaptcha | 选择正确的旋转图片 | 视觉模型分类 |
这类验证码需要”看懂”图片内容,必须依赖 AI 视觉模型。
OhMyCaptcha 两种都支持——浏览器自动化处理环境检测型,AI 模型处理视觉挑战型。
整体架构:异步任务队列 + 插件化求解器
OhMyCaptcha 的核心设计是一个 YesCaptcha 兼容的异步任务系统:
客户端 createTask(type, params)
↓
TaskManager 创建任务 → 异步派发给对应的 Solver
↓
Solver 执行求解(浏览器 / AI)
↓
客户端 getTaskResult(taskId) → 获取结果为什么不同步返回结果?
因为验证码求解通常需要 5-30 秒。同步等待会导致 HTTP 连接超时,所以采用异步模式:先返回 taskId,客户端轮询结果。
Solver 协议
所有求解器实现同一个接口——这是标准的策略模式:
class Solver(Protocol):
async def solve(self, params: dict[str, Any]) -> dict[str, Any]: ...TaskManager 在启动时注册所有求解器:
# 一个 Solver 实例可以处理多种任务类型
v3_solver = RecaptchaV3Solver(config)
task_manager.register_solver("RecaptchaV3TaskProxyless", v3_solver)
task_manager.register_solver("RecaptchaV3TaskProxylessM1", v3_solver)
task_manager.register_solver("RecaptchaV3EnterpriseTask", v3_solver)
# ...
classifier = ClassificationSolver(config)
task_manager.register_solver("HCaptchaClassification", classifier)
task_manager.register_solver("ReCaptchaV2Classification", classifier)
# ...任务管理器实现
class TaskManager:
TASK_TTL = timedelta(minutes=10) # 任务 10 分钟过期
def __init__(self):
self._tasks: dict[str, Task] = {} # taskId -> Task
self._solvers: dict[str, Solver] = {} # taskType -> Solver
def create_task(self, task_type, params) -> str:
self._cleanup_expired() # 清理过期任务
task_id = str(uuid.uuid4())
task = Task(id=task_id, type=task_type, params=params)
self._tasks[task_id] = task
asyncio.create_task(self._process_task(task)) # 火即忘
return task_id
async def _process_task(self, task):
solver = self._solvers.get(task.type)
try:
solution = await solver.solve(task.params)
task.solution = solution
task.status = TaskStatus.READY
except Exception as exc:
task.status = TaskStatus.FAILED
task.error_description = str(exc)纯内存存储,服务重启即丢失。这是设计上的取舍——简单、无外部依赖,但不适合高可用场景。
原理一:reCAPTCHA v3 求解(纯浏览器 · 无需 AI)
reCAPTCHA v3 是最容易理解的类型。它没有视觉挑战,完全靠后台行为评分(0.0-1.0)来判断你是否是人类。只要你是在一个”足够真实”的浏览器环境里调用 grecaptcha.execute(),就能拿到 token。
原理
- 用 Playwright 打开目标 URL(注入反检测脚本)
- 等待
grecaptcha库加载(可能已在页面中,也可能需要手动注入) - 调用
grecaptcha.execute(siteKey, {action})获取 token - 返回 token
反检测脚本
OhMyCaptcha 的反检测比 CF-Gateway-Pro 简单得多(因为 Playwright 本身的检测特征更少):
// 隐藏自动化特征
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};关键 JavaScript:获取 reCAPTCHA token
这段 JS 在浏览器里执行,处理了两种情况——页面已有 reCAPTCHA 脚本和需要手动注入:
([key, action]) => new Promise((resolve, reject) => {
// 检查 grecaptcha 是否已加载(标准版或企业版)
const gr = window.grecaptcha?.enterprise || window.grecaptcha;
if (gr && typeof gr.execute === 'function') {
gr.ready(() => {
gr.execute(key, {action}).then(resolve).catch(reject);
});
return;
}
// 没有 → 手动注入 reCAPTCHA 脚本
const script = document.createElement('script');
script.src = 'https://www.google.com/recaptcha/api.js?render=' + key;
script.onerror = () => reject(new Error('Failed to load reCAPTCHA'));
script.onload = () => {
const g = window.grecaptcha;
g.ready(() => {
g.execute(key, {action}).then(resolve).catch(reject);
});
};
document.head.appendChild(script);
});模拟人类行为
reCAPTCHA v3 的评分和行为有关,所以需要模拟最基本的鼠标活动:
async def _solve_once(self, website_url, website_key, page_action):
context = await self._browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
viewport={"width": 1920, "height": 1080},
locale="en-US",
)
page = await context.new_page()
await page.add_init_script(STEALTH_JS)
await page.goto(website_url, wait_until="networkidle")
# 模拟鼠标移动(提高行为评分)
await page.mouse.move(400, 300)
await asyncio.sleep(1)
await page.mouse.move(600, 400)
await asyncio.sleep(0.5)
# 等待 grecaptcha 加载
try:
await page.wait_for_function(
"(typeof grecaptcha !== 'undefined' && typeof grecaptcha.execute === 'function')"
" || (typeof grecaptcha?.enterprise?.execute === 'function')",
timeout=10_000,
)
except Exception:
pass # 不在页面上,后面会注入
# 执行获取 token
token = await page.evaluate(EXECUTE_JS, [website_key, page_action])
return token重试机制
所有求解器都有统一的重试逻辑:
async def solve(self, params):
last_error = None
for attempt in range(self._config.captcha_retries): # 默认3次
try:
token = await self._solve_once(...)
return {"gRecaptchaResponse": token}
except Exception as exc:
last_error = exc
if attempt < self._config.captcha_retries - 1:
await asyncio.sleep(2) # 重试间隔
raise RuntimeError(f"Failed after {retries} attempts: {last_error}")原理二:reCAPTCHA v2 求解(浏览器 + AI 音频转写)
reCAPTCHA v2 是最复杂的求解器,因为 Google 可能在你点击 checkbox 后弹出视觉挑战。OhMyCaptcha 的策略是:如果遇到视觉挑战,切换到音频挑战,然后用 AI 转写音频。
完整流程
- 点击 reCAPTCHA 复选框
- 检查是否直接通过(低风险 session)
- 是 → 提取 token,结束
- 否 → 弹出了挑战,继续以下步骤
- 点击”音频挑战”按钮
- 下载音频 MP3 文件
- 将音频发送给云端 AI 模型(gpt-5.4)转写
- 将转写文本填入输入框并提交
- 提取 token
点击 checkbox
reCAPTCHA v2 的 checkbox 在一个 iframe 里,需要用 Playwright 的 frame_locator:
async def _solve_checkbox(self, page):
# checkbox 在 title="reCAPTCHA" 的 iframe 中
checkbox_frame = page.frame_locator('iframe[title="reCAPTCHA"]').first
checkbox = checkbox_frame.locator("#recaptcha-anchor")
await checkbox.click(timeout=10_000)
await asyncio.sleep(2)
# 检查是否直接通过了
token = await page.evaluate("""
() => {
const textarea = document.querySelector('#g-recaptcha-response');
if (textarea && textarea.value && textarea.value.length > 20) {
return textarea.value;
}
return null;
}
""")
if token:
return token # 直接通过,不需要解挑战
# 需要解挑战 → 走音频路径
return await self._solve_audio_challenge(page)音频挑战路径
async def _solve_audio_challenge(self, page):
# 挑战在另一个 iframe 里
bframe = page.frame_locator('iframe[title*="recaptcha challenge"]')
# 点击"音频挑战"按钮
audio_btn = bframe.locator("#recaptcha-audio-button")
await audio_btn.click(timeout=8_000)
await asyncio.sleep(3)
# 找到音频下载链接
audio_src = None
for selector in [
".rc-audiochallenge-tdownload-link", # 下载链接
"a[href*='.mp3']", # MP3 链接
"audio source", # audio 元素
]:
try:
element = bframe.locator(selector).first
audio_src = await element.get_attribute("href") or \
await element.get_attribute("src")
if audio_src:
break
except Exception:
continue
# 下载音频文件
async with httpx.AsyncClient() as client:
resp = await client.get(audio_src)
audio_bytes = resp.content
# AI 转写音频
transcript = await self._transcribe_audio(audio_bytes)
# 提交转写结果
audio_input = bframe.locator("#audio-response")
await audio_input.fill(transcript.strip().lower())
verify_btn = bframe.locator("#recaptcha-verify-button")
await verify_btn.click(timeout=8_000)
await asyncio.sleep(2)
# 提取token
return await page.evaluate(EXTRACT_TOKEN_JS)AI 音频转写
这里用云端大模型将音频内容转为文字:
async def _transcribe_audio(self, audio_bytes):
audio_b64 = base64.b64encode(audio_bytes).decode()
payload = {
"model": self._config.cloud_model, # gpt-5.4
"messages": [{
"role": "user",
"content": [
{
"type": "text",
"text": "This is a reCAPTCHA audio challenge. "
"Transcribe exactly what is spoken, "
"digits only, separated by spaces."
},
{
"type": "image_url",
"image_url": {"url": f"data:audio/mp3;base64,{audio_b64}"}
}
]
}],
"max_tokens": 50,
"temperature": 0,
}
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self._config.cloud_base_url}/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json=payload,
)
data = resp.json()
return data["choices"][0]["message"]["content"].strip()原理三:图片验证码识别(多模态视觉模型)
这是 OhMyCaptcha 中最有创意的部分——受 Argus 项目启发,用多模态大模型直接”看”验证码图片并返回结构化的操作指令。
三种验证码类型
| 类型 | 说明 | 模型输出 |
|---|---|---|
click | 点击指定目标 | 目标坐标列表 [{x, y, label}] |
slide | 滑块滑动到缺口位置 | 缺口坐标 + 滑块坐标 + 拖拽距离 |
drag_match | 拖拽匹配(物体→影子) | 配对坐标列表 [{from, to}] |
标准化坐标空间
不同验证码图片尺寸不同,为了让模型输出的坐标通用,先将所有图片缩放到 1440×900 的标准尺寸:
TARGET_WIDTH = 1440
TARGET_HEIGHT = 900
def _preprocess_image(image_bytes):
img = Image.open(io.BytesIO(image_bytes))
img = img.resize((TARGET_WIDTH, TARGET_HEIGHT), Image.Resampling.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="PNG")
return buf.getvalue()System Prompt 设计
这是让 AI 理解验证码的关键——一个精心设计的 system prompt:
// You are a Computer Vision Data Annotation Assistant.
// Input Image: 1440x900 pixels, origin (0,0) at top-left.
// Step 1 -- Identify the CAPTCHA type: "click" / "slide" / "drag_match"
// Step 2 -- Return STRICT JSON only.
// For "click" type:
{"captcha_type": "click", "clicks": [{"x": 123, "y": 456, "label": "cat"}]}
// For "slide" type:
{"captcha_type": "slide",
"gap": {"x": 300, "y": 200}, // ← 背景上缺口的中心
"slider": {"x": 30, "y": 870}, // ← 滑块手柄的中心
"drag_distance": 270} // ← gap.x - slider.x
// For "drag_match" type:
{"captcha_type": "drag_match",
"pairs": [{"from": {"x":650,"y":320}, "to": {"x":180,"y":290}}]}
// 重要:滑块类型中,"slider" 是底部滑条上的手柄,
// "gap" 是背景图上的拼图缺口(不是浮动的拼图块!)注意 prompt 对 slide 类型的特别强调——“gap 是缺口,不是浮动拼图块”。这是因为模型容易混淆这两个概念。
调用模型
async def _call_model(self, data_url):
response = await self._client.chat.completions.create(
model=self._config.local_model, # Qwen3.5-2B
temperature=0.05, # 接近确定性输出
max_tokens=1024,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": data_url, "detail": "high"}},
{"type": "text", "text": "Identify the CAPTCHA type and return the annotation JSON."}
]}
],
)
raw = response.choices[0].message.content
return self._parse_json(raw)JSON 解析容错
大模型有时会在 JSON 外面包一层 markdown 代码块,需要处理:
def _parse_json(text):
# 尝试提取 ```json ... ``` 中的内容
match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
cleaned = match.group(1) if match else text.strip()
return json.loads(cleaned)原理四:图片分类验证码(按类型定制 Prompt)
图片分类和图片识别不同——识别是”看懂图片内容并返回坐标”,分类是”判断图片是否包含指定目标”。
四种分类任务
OhMyCaptcha 为每种分类任务设计了专门的 System Prompt:
hCaptcha 分类(单图/多图判断):
// Given a question and images, determine which images match.
// 单图问题
{"answer": true} // 或 {"answer": false}
// 多图问题: 0-indexed 位置
{"answer": [0, 2, 5]}reCAPTCHA v2 分类(3×3 或 4×4 宫格):
// Cells numbered 0-8 (3x3) or 0-15 (4x4), left-to-right, top-to-bottom.
// 匹配的格子编号
{"objects": [0, 3, 6]}FunCaptcha 分类(2×3 宫格,通常只有一个正确答案):
// Cells numbered 0-5.
// 正确的那个
{"objects": [3]}多图片输入处理
分类任务可能一次传入多张图片,需要统一打包:
def _extract_images(params):
images = []
if "image" in params: # 单张图片
images.append(params["image"])
if "images" in params: # 图片列表
images.extend(params["images"])
if "body" in params and not images: # 兼容 body 字段
images.append(params["body"])
if "queries" in params: # hCaptcha 的 queries 格式
if isinstance(params["queries"], list):
images.extend(params["queries"])
return images图片格式处理
确保 base64 数据带正确的 MIME 前缀:
def _prepare_image(b64_data):
if b64_data.startswith("data:image"):
return b64_data # 已经是 data URL
# 尝试检测图片格式
try:
img_bytes = base64.b64decode(b64_data)
img = Image.open(io.BytesIO(img_bytes))
fmt = img.format or "PNG"
mime = f"image/{fmt.lower()}"
return f"data:{mime};base64,{b64_data}"
except Exception:
return f"data:image/png;base64,{b64_data}"原理五:双模型后端架构
OhMyCaptcha 使用两个 AI 模型后端,各有分工:
┌─────────────────────────┐
│ 本地模型 (SGLang) │
│ Qwen3.5-2B │
│ │
任务类型: │ 用于: │
ImageToTextTask ──→│ - 图片验证码识别 │
*Classification ──→│ - 图片分类 │
│ │
│ 特点: 延迟低、免费 │
│ 但能力有限 │
└─────────────────────────┘
┌─────────────────────────┐
│ 云端模型 (API) │
│ gpt-5.4 │
│ │
任务类型: │ 用于: │
reCAPTCHA v2 ──→│ - 音频转写 │
(音频挑战) │ - 复杂推理 │
│ │
│ 特点: 能力强 │
│ 但有延迟和成本 │
└─────────────────────────┘为什么分两个?
- 图片识别/分类调用非常频繁,每次都打远程 API 会很慢且成本高 → 本地部署小模型
- 音频转写频率低但需要强推理能力 → 用云端大模型
- 两者都暴露 OpenAI 兼容的
/v1/chat/completions接口,代码可以统一
配置的向后兼容
# 新环境变量优先,旧变量作为 fallback
cloud_base_url = os.environ.get(
"CLOUD_BASE_URL", # 新
os.environ.get("CAPTCHA_BASE_URL", "https://...") # 旧
)
local_model = os.environ.get(
"LOCAL_MODEL", # 新
os.environ.get("CAPTCHA_MULTIMODAL_MODEL", "Qwen/Qwen3.5-2B") # 旧
)从零实现:最小可运行的通用验证码服务
理解了以上原理,下面是一个最小化但功能完整的示例:
"""最小化验证码求解服务 - 支持 reCAPTCHA v3 + 图片识别"""
import asyncio, uuid, json, base64, re, io
from fastapi import FastAPI
from pydantic import BaseModel
from playwright.async_api import async_playwright
from openai import AsyncOpenAI
from PIL import Image
app = FastAPI()
# ========== 配置 ==========
LOCAL_MODEL_URL = "http://localhost:30000/v1"
LOCAL_MODEL_NAME = "Qwen/Qwen3.5-2B"
# ========== 任务存储 ==========
tasks = {}
# ========== 反检测脚本 ==========
STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
window.chrome = {runtime: {}};
"""
# ========== reCAPTCHA v3 求解 ==========
browser = None
async def solve_recaptcha_v3(params):
global browser
if not browser:
pw = await async_playwright().start()
browser = await pw.chromium.launch(
headless=True,
args=["--disable-blink-features=AutomationControlled"]
)
ctx = await browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0",
viewport={"width": 1920, "height": 1080},
)
page = await ctx.new_page()
await page.add_init_script(STEALTH_JS)
await page.goto(params["websiteURL"], wait_until="networkidle")
await page.mouse.move(400, 300)
await asyncio.sleep(1)
token = await page.evaluate("""
([key, action]) => new Promise((resolve, reject) => {
const gr = window.grecaptcha?.enterprise || window.grecaptcha;
if (gr) {
gr.ready(() => gr.execute(key, {action}).then(resolve).catch(reject));
} else {
const s = document.createElement('script');
s.src = 'https://www.google.com/recaptcha/api.js?render=' + key;
s.onload = () => grecaptcha.ready(() =>
grecaptcha.execute(key, {action}).then(resolve).catch(reject));
document.head.appendChild(s);
}
})
""", [params["websiteKey"], params.get("pageAction", "verify")])
await ctx.close()
return {"gRecaptchaResponse": token}
# ========== 图片验证码识别 ==========
client = AsyncOpenAI(base_url=LOCAL_MODEL_URL, api_key="EMPTY")
CAPTCHA_PROMPT = """You are a CAPTCHA annotation assistant.
Image: 1440x900 pixels. Return STRICT JSON only.
For click: {"captcha_type":"click","clicks":[{"x":123,"y":456}]}
For slide: {"captcha_type":"slide","gap":{"x":300},"slider":{"x":30},"drag_distance":270}"""
async def solve_image(params):
img_bytes = base64.b64decode(params["body"])
img = Image.open(io.BytesIO(img_bytes)).resize((1440, 900))
buf = io.BytesIO()
img.save(buf, format="PNG")
b64 = base64.b64encode(buf.getvalue()).decode()
resp = await client.chat.completions.create(
model=LOCAL_MODEL_NAME,
temperature=0.05,
max_tokens=1024,
messages=[
{"role": "system", "content": CAPTCHA_PROMPT},
{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
{"type": "text", "text": "Identify and annotate."}
]}
],
)
raw = resp.choices[0].message.content
match = re.search(r"```(?:json)?\s*(.*?)\s*```", raw, re.DOTALL)
result = json.loads(match.group(1) if match else raw.strip())
return {"text": json.dumps(result)}
# ========== 求解器注册 ==========
SOLVERS = {
"RecaptchaV3TaskProxyless": solve_recaptcha_v3,
"ImageToTextTask": solve_image,
}
# ========== API ==========
class CreateReq(BaseModel):
clientKey: str
task: dict
class ResultReq(BaseModel):
clientKey: str
taskId: str
@app.post("/createTask")
async def create_task(req: CreateReq):
task_type = req.task["type"]
if task_type not in SOLVERS:
return {"errorId": 1, "errorCode": "ERROR_TASK_NOT_SUPPORTED"}
task_id = str(uuid.uuid4())
tasks[task_id] = {"status": "processing"}
async def run():
try:
solution = await SOLVERS[task_type](req.task)
tasks[task_id] = {"status": "ready", "solution": solution}
except Exception as e:
tasks[task_id] = {"status": "failed", "error": str(e)}
asyncio.create_task(run())
return {"errorId": 0, "taskId": task_id}
@app.post("/getTaskResult")
async def get_result(req: ResultReq):
task = tasks.get(req.taskId)
if not task:
return {"errorId": 1, "errorCode": "ERROR_NO_SUCH_CAPCHA_ID"}
if task["status"] == "processing":
return {"errorId": 0, "status": "processing"}
if task["status"] == "ready":
return {"errorId": 0, "status": "ready", "solution": task["solution"]}
return {"errorId": 1, "errorDescription": task.get("error")}安装依赖:
pip install fastapi uvicorn playwright openai Pillow
playwright install chromium
# 图片识别还需要部署本地模型:
# pip install sglang && python -m sglang.launch_server --model Qwen/Qwen3.5-2B --port 30000运行:
uvicorn main:app --port 8000与 CF-Gateway-Pro 的区别
这两个项目解决的是不同层面的问题。如果你看了姊妹篇,这里做个简明对比:
| OhMyCaptcha | CF-Gateway-Pro | |
|---|---|---|
| 做什么 | 返回验证码 Token | 返回页面内容 |
| 你拿到结果后 | 自己构造请求去访问目标站 | 直接得到数据 |
| AI 依赖 | 重度(图片/音频任务必需) | 无 |
| CF 过盾效果 | 一般 | 专精(Cookie 复用、TLS 指纹、指纹随机化) |
| 适合场景 | 通用验证码服务 | 专注爬取 CF 站点 |
如果你只需要过 CF 盾,用 CF-Gateway-Pro 更省事。如果你需要解各种验证码(reCAPTCHA、hCaptcha、图片验证码等),OhMyCaptcha 是更好的选择。
[!WARNING] 免责声明:本文仅供安全研究和技术学习用途。请勿对未经授权的网站使用验证码绕过技术,遵守目标网站的服务条款和当地法律法规。