Home
avatar

nax

用 AI 从零实现通用验证码求解服务:OhMyCaptcha 原理剖析

本文深入分析 OhMyCaptcha 的核心实现原理,拆解如何用 Playwright + 多模态大模型构建一个覆盖 19 种验证码类型的自托管求解服务。读完你可以理解每种验证码的破解思路,并自己实现一个。

如果你只需要过 Cloudflare 盾、不需要 AI,推荐阅读姊妹篇:《深入理解 Cloudflare 过盾原理:无需 AI,从零实现 CF Turnstile 绕过

用 AI 从零实现通用验证码求解服务:OhMyCaptcha 原理剖析


两种完全不同的验证码

在动手之前,先理解验证码的分类——因为不同类型的验证码,破解思路完全不同:

环境检测型(不需要 AI)

验证码检测方式破解思路
reCAPTCHA v3浏览器环境 + 行为评分反检测 + 模拟行为
Cloudflare Turnstile浏览器环境 + TLS 指纹反检测 + 点击
hCaptcha (基础)浏览器环境反检测 + 点击

这类验证码不弹出”选图片”的视觉挑战,只在后台检测你的浏览器是否”像真人”。

视觉挑战型(需要 AI)

验证码挑战方式破解思路
reCAPTCHA v2 (高风险)选择包含特定物体的图片视觉模型识别
hCaptcha (图片挑战)选择匹配的图片视觉模型分类
图片验证码识别扭曲文字/点击目标多模态模型推理
FunCaptcha选择正确的旋转图片视觉模型分类

这类验证码需要”看懂”图片内容,必须依赖 AI 视觉模型。

OhMyCaptcha 两种都支持——浏览器自动化处理环境检测型,AI 模型处理视觉挑战型。


整体架构:异步任务队列 + 插件化求解器

OhMyCaptcha 的核心设计是一个 YesCaptcha 兼容的异步任务系统

客户端 createTask(type, params)

TaskManager 创建任务 → 异步派发给对应的 Solver

Solver 执行求解(浏览器 / AI)

客户端 getTaskResult(taskId) → 获取结果

为什么不同步返回结果?

因为验证码求解通常需要 5-30 秒。同步等待会导致 HTTP 连接超时,所以采用异步模式:先返回 taskId,客户端轮询结果。

Solver 协议

所有求解器实现同一个接口——这是标准的策略模式

class Solver(Protocol):
    async def solve(self, params: dict[str, Any]) -> dict[str, Any]: ...

TaskManager 在启动时注册所有求解器:

# 一个 Solver 实例可以处理多种任务类型
v3_solver = RecaptchaV3Solver(config)
task_manager.register_solver("RecaptchaV3TaskProxyless", v3_solver)
task_manager.register_solver("RecaptchaV3TaskProxylessM1", v3_solver)
task_manager.register_solver("RecaptchaV3EnterpriseTask", v3_solver)
# ...

classifier = ClassificationSolver(config)
task_manager.register_solver("HCaptchaClassification", classifier)
task_manager.register_solver("ReCaptchaV2Classification", classifier)
# ...

任务管理器实现

class TaskManager:
    TASK_TTL = timedelta(minutes=10)  # 任务 10 分钟过期

    def __init__(self):
        self._tasks: dict[str, Task] = {}          # taskId -> Task
        self._solvers: dict[str, Solver] = {}      # taskType -> Solver

    def create_task(self, task_type, params) -> str:
        self._cleanup_expired()                     # 清理过期任务
        task_id = str(uuid.uuid4())
        task = Task(id=task_id, type=task_type, params=params)
        self._tasks[task_id] = task
        asyncio.create_task(self._process_task(task))  # 火即忘
        return task_id

    async def _process_task(self, task):
        solver = self._solvers.get(task.type)
        try:
            solution = await solver.solve(task.params)
            task.solution = solution
            task.status = TaskStatus.READY
        except Exception as exc:
            task.status = TaskStatus.FAILED
            task.error_description = str(exc)

纯内存存储,服务重启即丢失。这是设计上的取舍——简单、无外部依赖,但不适合高可用场景。


原理一:reCAPTCHA v3 求解(纯浏览器 · 无需 AI)

reCAPTCHA v3 是最容易理解的类型。它没有视觉挑战,完全靠后台行为评分(0.0-1.0)来判断你是否是人类。只要你是在一个”足够真实”的浏览器环境里调用 grecaptcha.execute(),就能拿到 token。

原理

  1. 用 Playwright 打开目标 URL(注入反检测脚本)
  2. 等待 grecaptcha 库加载(可能已在页面中,也可能需要手动注入)
  3. 调用 grecaptcha.execute(siteKey, {action}) 获取 token
  4. 返回 token

反检测脚本

OhMyCaptcha 的反检测比 CF-Gateway-Pro 简单得多(因为 Playwright 本身的检测特征更少):

// 隐藏自动化特征
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};

关键 JavaScript:获取 reCAPTCHA token

这段 JS 在浏览器里执行,处理了两种情况——页面已有 reCAPTCHA 脚本和需要手动注入:

([key, action]) => new Promise((resolve, reject) => {
    // 检查 grecaptcha 是否已加载(标准版或企业版)
    const gr = window.grecaptcha?.enterprise || window.grecaptcha;
    if (gr && typeof gr.execute === 'function') {
        gr.ready(() => {
            gr.execute(key, {action}).then(resolve).catch(reject);
        });
        return;
    }

    // 没有 → 手动注入 reCAPTCHA 脚本
    const script = document.createElement('script');
    script.src = 'https://www.google.com/recaptcha/api.js?render=' + key;
    script.onerror = () => reject(new Error('Failed to load reCAPTCHA'));
    script.onload = () => {
        const g = window.grecaptcha;
        g.ready(() => {
            g.execute(key, {action}).then(resolve).catch(reject);
        });
    };
    document.head.appendChild(script);
});

模拟人类行为

reCAPTCHA v3 的评分和行为有关,所以需要模拟最基本的鼠标活动:

async def _solve_once(self, website_url, website_key, page_action):
    context = await self._browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
        viewport={"width": 1920, "height": 1080},
        locale="en-US",
    )
    page = await context.new_page()
    await page.add_init_script(STEALTH_JS)

    await page.goto(website_url, wait_until="networkidle")

    # 模拟鼠标移动(提高行为评分)
    await page.mouse.move(400, 300)
    await asyncio.sleep(1)
    await page.mouse.move(600, 400)
    await asyncio.sleep(0.5)

    # 等待 grecaptcha 加载
    try:
        await page.wait_for_function(
            "(typeof grecaptcha !== 'undefined' && typeof grecaptcha.execute === 'function')"
            " || (typeof grecaptcha?.enterprise?.execute === 'function')",
            timeout=10_000,
        )
    except Exception:
        pass  # 不在页面上,后面会注入

    # 执行获取 token
    token = await page.evaluate(EXECUTE_JS, [website_key, page_action])
    return token

重试机制

所有求解器都有统一的重试逻辑:

async def solve(self, params):
    last_error = None
    for attempt in range(self._config.captcha_retries):  # 默认3次
        try:
            token = await self._solve_once(...)
            return {"gRecaptchaResponse": token}
        except Exception as exc:
            last_error = exc
            if attempt < self._config.captcha_retries - 1:
                await asyncio.sleep(2)  # 重试间隔

    raise RuntimeError(f"Failed after {retries} attempts: {last_error}")

原理二:reCAPTCHA v2 求解(浏览器 + AI 音频转写)

reCAPTCHA v2 是最复杂的求解器,因为 Google 可能在你点击 checkbox 后弹出视觉挑战。OhMyCaptcha 的策略是:如果遇到视觉挑战,切换到音频挑战,然后用 AI 转写音频

完整流程

  1. 点击 reCAPTCHA 复选框
  2. 检查是否直接通过(低风险 session)
    • → 提取 token,结束
    • → 弹出了挑战,继续以下步骤
  3. 点击”音频挑战”按钮
  4. 下载音频 MP3 文件
  5. 将音频发送给云端 AI 模型(gpt-5.4)转写
  6. 将转写文本填入输入框并提交
  7. 提取 token

点击 checkbox

reCAPTCHA v2 的 checkbox 在一个 iframe 里,需要用 Playwright 的 frame_locator

async def _solve_checkbox(self, page):
    # checkbox 在 title="reCAPTCHA" 的 iframe 中
    checkbox_frame = page.frame_locator('iframe[title="reCAPTCHA"]').first
    checkbox = checkbox_frame.locator("#recaptcha-anchor")
    await checkbox.click(timeout=10_000)
    await asyncio.sleep(2)

    # 检查是否直接通过了
    token = await page.evaluate("""
    () => {
        const textarea = document.querySelector('#g-recaptcha-response');
        if (textarea && textarea.value && textarea.value.length > 20) {
            return textarea.value;
        }
        return null;
    }
    """)

    if token:
        return token  # 直接通过,不需要解挑战

    # 需要解挑战 → 走音频路径
    return await self._solve_audio_challenge(page)

音频挑战路径

async def _solve_audio_challenge(self, page):
    # 挑战在另一个 iframe 里
    bframe = page.frame_locator('iframe[title*="recaptcha challenge"]')

    # 点击"音频挑战"按钮
    audio_btn = bframe.locator("#recaptcha-audio-button")
    await audio_btn.click(timeout=8_000)
    await asyncio.sleep(3)

    # 找到音频下载链接
    audio_src = None
    for selector in [
        ".rc-audiochallenge-tdownload-link",   # 下载链接
        "a[href*='.mp3']",                      # MP3 链接
        "audio source",                          # audio 元素
    ]:
        try:
            element = bframe.locator(selector).first
            audio_src = await element.get_attribute("href") or \
                        await element.get_attribute("src")
            if audio_src:
                break
        except Exception:
            continue

    # 下载音频文件
    async with httpx.AsyncClient() as client:
        resp = await client.get(audio_src)
        audio_bytes = resp.content

    # AI 转写音频
    transcript = await self._transcribe_audio(audio_bytes)

    # 提交转写结果
    audio_input = bframe.locator("#audio-response")
    await audio_input.fill(transcript.strip().lower())

    verify_btn = bframe.locator("#recaptcha-verify-button")
    await verify_btn.click(timeout=8_000)
    await asyncio.sleep(2)

    # 提取token
    return await page.evaluate(EXTRACT_TOKEN_JS)

AI 音频转写

这里用云端大模型将音频内容转为文字:

async def _transcribe_audio(self, audio_bytes):
    audio_b64 = base64.b64encode(audio_bytes).decode()

    payload = {
        "model": self._config.cloud_model,  # gpt-5.4
        "messages": [{
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "This is a reCAPTCHA audio challenge. "
                            "Transcribe exactly what is spoken, "
                            "digits only, separated by spaces."
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:audio/mp3;base64,{audio_b64}"}
                }
            ]
        }],
        "max_tokens": 50,
        "temperature": 0,
    }

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{self._config.cloud_base_url}/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json=payload,
        )
        data = resp.json()
        return data["choices"][0]["message"]["content"].strip()

原理三:图片验证码识别(多模态视觉模型)

这是 OhMyCaptcha 中最有创意的部分——受 Argus 项目启发,用多模态大模型直接”看”验证码图片并返回结构化的操作指令。

三种验证码类型

类型说明模型输出
click点击指定目标目标坐标列表 [{x, y, label}]
slide滑块滑动到缺口位置缺口坐标 + 滑块坐标 + 拖拽距离
drag_match拖拽匹配(物体→影子)配对坐标列表 [{from, to}]

标准化坐标空间

不同验证码图片尺寸不同,为了让模型输出的坐标通用,先将所有图片缩放到 1440×900 的标准尺寸:

TARGET_WIDTH = 1440
TARGET_HEIGHT = 900

def _preprocess_image(image_bytes):
    img = Image.open(io.BytesIO(image_bytes))
    img = img.resize((TARGET_WIDTH, TARGET_HEIGHT), Image.Resampling.LANCZOS)
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    return buf.getvalue()

System Prompt 设计

这是让 AI 理解验证码的关键——一个精心设计的 system prompt:

// You are a Computer Vision Data Annotation Assistant.
// Input Image: 1440x900 pixels, origin (0,0) at top-left.
// Step 1 -- Identify the CAPTCHA type: "click" / "slide" / "drag_match"
// Step 2 -- Return STRICT JSON only.

// For "click" type:
{"captcha_type": "click", "clicks": [{"x": 123, "y": 456, "label": "cat"}]}

// For "slide" type:
{"captcha_type": "slide",
 "gap": {"x": 300, "y": 200},      // ← 背景上缺口的中心
 "slider": {"x": 30, "y": 870},    // ← 滑块手柄的中心
 "drag_distance": 270}             // ← gap.x - slider.x

// For "drag_match" type:
{"captcha_type": "drag_match",
 "pairs": [{"from": {"x":650,"y":320}, "to": {"x":180,"y":290}}]}

// 重要:滑块类型中,"slider" 是底部滑条上的手柄,
// "gap" 是背景图上的拼图缺口(不是浮动的拼图块!)

注意 prompt 对 slide 类型的特别强调——“gap 是缺口,不是浮动拼图块”。这是因为模型容易混淆这两个概念。

调用模型

async def _call_model(self, data_url):
    response = await self._client.chat.completions.create(
        model=self._config.local_model,  # Qwen3.5-2B
        temperature=0.05,                 # 接近确定性输出
        max_tokens=1024,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": [
                {"type": "image_url", "image_url": {"url": data_url, "detail": "high"}},
                {"type": "text", "text": "Identify the CAPTCHA type and return the annotation JSON."}
            ]}
        ],
    )

    raw = response.choices[0].message.content
    return self._parse_json(raw)

JSON 解析容错

大模型有时会在 JSON 外面包一层 markdown 代码块,需要处理:

def _parse_json(text):
    # 尝试提取 ```json ... ``` 中的内容
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    cleaned = match.group(1) if match else text.strip()
    return json.loads(cleaned)

原理四:图片分类验证码(按类型定制 Prompt)

图片分类和图片识别不同——识别是”看懂图片内容并返回坐标”,分类是”判断图片是否包含指定目标”。

四种分类任务

OhMyCaptcha 为每种分类任务设计了专门的 System Prompt:

hCaptcha 分类(单图/多图判断):

// Given a question and images, determine which images match.

// 单图问题
{"answer": true} // 或 {"answer": false}

// 多图问题: 0-indexed 位置
{"answer": [0, 2, 5]}

reCAPTCHA v2 分类(3×3 或 4×4 宫格):

// Cells numbered 0-8 (3x3) or 0-15 (4x4), left-to-right, top-to-bottom.
// 匹配的格子编号
{"objects": [0, 3, 6]}

FunCaptcha 分类(2×3 宫格,通常只有一个正确答案):

// Cells numbered 0-5.
// 正确的那个
{"objects": [3]}

多图片输入处理

分类任务可能一次传入多张图片,需要统一打包:

def _extract_images(params):
    images = []

    if "image" in params:          # 单张图片
        images.append(params["image"])

    if "images" in params:         # 图片列表
        images.extend(params["images"])

    if "body" in params and not images:   # 兼容 body 字段
        images.append(params["body"])

    if "queries" in params:        # hCaptcha 的 queries 格式
        if isinstance(params["queries"], list):
            images.extend(params["queries"])

    return images

图片格式处理

确保 base64 数据带正确的 MIME 前缀:

def _prepare_image(b64_data):
    if b64_data.startswith("data:image"):
        return b64_data  # 已经是 data URL

    # 尝试检测图片格式
    try:
        img_bytes = base64.b64decode(b64_data)
        img = Image.open(io.BytesIO(img_bytes))
        fmt = img.format or "PNG"
        mime = f"image/{fmt.lower()}"
        return f"data:{mime};base64,{b64_data}"
    except Exception:
        return f"data:image/png;base64,{b64_data}"

原理五:双模型后端架构

OhMyCaptcha 使用两个 AI 模型后端,各有分工:

                    ┌─────────────────────────┐
                    │   本地模型 (SGLang)       │
                    │   Qwen3.5-2B             │
                    │                          │
任务类型:            │   用于:                  │
ImageToTextTask  ──→│   - 图片验证码识别       │
*Classification  ──→│   - 图片分类             │
                    │                          │
                    │   特点: 延迟低、免费       │
                    │   但能力有限              │
                    └─────────────────────────┘

                    ┌─────────────────────────┐
                    │   云端模型 (API)          │
                    │   gpt-5.4               │
                    │                          │
任务类型:            │   用于:                  │
reCAPTCHA v2     ──→│   - 音频转写             │
(音频挑战)           │   - 复杂推理             │
                    │                          │
                    │   特点: 能力强            │
                    │   但有延迟和成本          │
                    └─────────────────────────┘

为什么分两个?

  1. 图片识别/分类调用非常频繁,每次都打远程 API 会很慢且成本高 → 本地部署小模型
  2. 音频转写频率低但需要强推理能力 → 用云端大模型
  3. 两者都暴露 OpenAI 兼容的 /v1/chat/completions 接口,代码可以统一

配置的向后兼容

# 新环境变量优先,旧变量作为 fallback
cloud_base_url = os.environ.get(
    "CLOUD_BASE_URL",                                    # 新
    os.environ.get("CAPTCHA_BASE_URL", "https://...")    # 旧
)

local_model = os.environ.get(
    "LOCAL_MODEL",                                        # 新
    os.environ.get("CAPTCHA_MULTIMODAL_MODEL", "Qwen/Qwen3.5-2B")  # 旧
)

从零实现:最小可运行的通用验证码服务

理解了以上原理,下面是一个最小化但功能完整的示例:

"""最小化验证码求解服务 - 支持 reCAPTCHA v3 + 图片识别"""

import asyncio, uuid, json, base64, re, io
from fastapi import FastAPI
from pydantic import BaseModel
from playwright.async_api import async_playwright
from openai import AsyncOpenAI
from PIL import Image

app = FastAPI()

# ========== 配置 ==========
LOCAL_MODEL_URL = "http://localhost:30000/v1"
LOCAL_MODEL_NAME = "Qwen/Qwen3.5-2B"

# ========== 任务存储 ==========
tasks = {}

# ========== 反检测脚本 ==========
STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
window.chrome = {runtime: {}};
"""

# ========== reCAPTCHA v3 求解 ==========
browser = None

async def solve_recaptcha_v3(params):
    global browser
    if not browser:
        pw = await async_playwright().start()
        browser = await pw.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"]
        )

    ctx = await browser.new_context(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/131.0.0.0",
        viewport={"width": 1920, "height": 1080},
    )
    page = await ctx.new_page()
    await page.add_init_script(STEALTH_JS)

    await page.goto(params["websiteURL"], wait_until="networkidle")
    await page.mouse.move(400, 300)
    await asyncio.sleep(1)

    token = await page.evaluate("""
    ([key, action]) => new Promise((resolve, reject) => {
        const gr = window.grecaptcha?.enterprise || window.grecaptcha;
        if (gr) {
            gr.ready(() => gr.execute(key, {action}).then(resolve).catch(reject));
        } else {
            const s = document.createElement('script');
            s.src = 'https://www.google.com/recaptcha/api.js?render=' + key;
            s.onload = () => grecaptcha.ready(() =>
                grecaptcha.execute(key, {action}).then(resolve).catch(reject));
            document.head.appendChild(s);
        }
    })
    """, [params["websiteKey"], params.get("pageAction", "verify")])

    await ctx.close()
    return {"gRecaptchaResponse": token}

# ========== 图片验证码识别 ==========
client = AsyncOpenAI(base_url=LOCAL_MODEL_URL, api_key="EMPTY")

CAPTCHA_PROMPT = """You are a CAPTCHA annotation assistant.
Image: 1440x900 pixels. Return STRICT JSON only.

For click: {"captcha_type":"click","clicks":[{"x":123,"y":456}]}
For slide: {"captcha_type":"slide","gap":{"x":300},"slider":{"x":30},"drag_distance":270}"""

async def solve_image(params):
    img_bytes = base64.b64decode(params["body"])
    img = Image.open(io.BytesIO(img_bytes)).resize((1440, 900))
    buf = io.BytesIO()
    img.save(buf, format="PNG")
    b64 = base64.b64encode(buf.getvalue()).decode()

    resp = await client.chat.completions.create(
        model=LOCAL_MODEL_NAME,
        temperature=0.05,
        max_tokens=1024,
        messages=[
            {"role": "system", "content": CAPTCHA_PROMPT},
            {"role": "user", "content": [
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}},
                {"type": "text", "text": "Identify and annotate."}
            ]}
        ],
    )
    raw = resp.choices[0].message.content
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", raw, re.DOTALL)
    result = json.loads(match.group(1) if match else raw.strip())
    return {"text": json.dumps(result)}

# ========== 求解器注册 ==========
SOLVERS = {
    "RecaptchaV3TaskProxyless": solve_recaptcha_v3,
    "ImageToTextTask": solve_image,
}

# ========== API ==========
class CreateReq(BaseModel):
    clientKey: str
    task: dict

class ResultReq(BaseModel):
    clientKey: str
    taskId: str

@app.post("/createTask")
async def create_task(req: CreateReq):
    task_type = req.task["type"]
    if task_type not in SOLVERS:
        return {"errorId": 1, "errorCode": "ERROR_TASK_NOT_SUPPORTED"}

    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "processing"}

    async def run():
        try:
            solution = await SOLVERS[task_type](req.task)
            tasks[task_id] = {"status": "ready", "solution": solution}
        except Exception as e:
            tasks[task_id] = {"status": "failed", "error": str(e)}

    asyncio.create_task(run())
    return {"errorId": 0, "taskId": task_id}

@app.post("/getTaskResult")
async def get_result(req: ResultReq):
    task = tasks.get(req.taskId)
    if not task:
        return {"errorId": 1, "errorCode": "ERROR_NO_SUCH_CAPCHA_ID"}
    if task["status"] == "processing":
        return {"errorId": 0, "status": "processing"}
    if task["status"] == "ready":
        return {"errorId": 0, "status": "ready", "solution": task["solution"]}
    return {"errorId": 1, "errorDescription": task.get("error")}

安装依赖:

pip install fastapi uvicorn playwright openai Pillow
playwright install chromium
# 图片识别还需要部署本地模型:
# pip install sglang && python -m sglang.launch_server --model Qwen/Qwen3.5-2B --port 30000

运行:

uvicorn main:app --port 8000

与 CF-Gateway-Pro 的区别

这两个项目解决的是不同层面的问题。如果你看了姊妹篇,这里做个简明对比:

OhMyCaptchaCF-Gateway-Pro
做什么返回验证码 Token返回页面内容
你拿到结果后自己构造请求去访问目标站直接得到数据
AI 依赖重度(图片/音频任务必需)
CF 过盾效果一般专精(Cookie 复用、TLS 指纹、指纹随机化)
适合场景通用验证码服务专注爬取 CF 站点

如果你只需要过 CF 盾,用 CF-Gateway-Pro 更省事。如果你需要解各种验证码(reCAPTCHA、hCaptcha、图片验证码等),OhMyCaptcha 是更好的选择。


[!WARNING] 免责声明:本文仅供安全研究和技术学习用途。请勿对未经授权的网站使用验证码绕过技术,遵守目标网站的服务条款和当地法律法规。

AI 验证码 多模态 Playwright 技术分析