Tutorials

Python ThreadPoolExecutor für CAPTCHA zur Lösung von Parallelität

asyncio ist leistungsstark, erfordert jedoch das Umschreiben Ihrer gesamten Aufrufkette als asynchron. ThreadPoolExecutor bietet Ihnen Parallelität mit standardmäßigem synchronem Code – fügen Sie ihn ohne Umstrukturierung in bestehende Projekte ein.

Warum ThreadPoolExecutor für CAPTCHAs

Das Lösen von CAPTCHAs erfolgt mit I/O-bound (Warten auf HTTP-Antworten). Python-Threads geben die GIL während I/O-Vorgängen frei, wodurch ThreadPoolExecutor für diese Arbeitslast effizient ist:

Ansatz Komplexität Passt zu vorhandenem Code Parallelität für I/O
Sequentielle Keine Ja Keine
ThreadPoolExecutor Niedrig Ja Gut
asynchron Hoch Erfordert asynchrones Umschreiben Am besten
Mehrfachverarbeitung Mittel Meistens Overkill für I/O

Grundlegende Implementierung

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


def solve_captcha(sitekey, pageurl):
    """Synchronous CAPTCHA solve — submit and poll."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        raise RuntimeError(data.get("request", "Submit failed"))

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()

        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request", "Unknown error"))

    raise TimeoutError("Solve timeout after 300s")


# Batch solve with ThreadPoolExecutor
tasks = [
    {"sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "pageurl": f"https://example.com/page/{i}"}
    for i in range(20)
]

start = time.time()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }

    solved = 0
    failed = 0

    for future in as_completed(futures):
        task = futures[future]
        try:
            solution = future.result()
            solved += 1
            print(f"[OK] {task['pageurl']}: {solution[:30]}...")
        except Exception as e:
            failed += 1
            print(f"[ERR] {task['pageurl']}: {e}")

elapsed = time.time() - start
print(f"\nDone: {solved} solved, {failed} failed in {elapsed:.1f}s")

Verwenden der Sitzung zur Wiederverwendung von Verbindungen

Das Erstellen einer neuen TCP-Verbindung pro Anfrage verschwendet Zeit. Teilen Sie ein requests.Session pro Thread:

import threading

# Thread-local storage for sessions
thread_local = threading.local()


def get_session():
    """Get or create a thread-local session."""
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
        # Configure connection pooling
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=2
        )
        thread_local.session.mount("https://", adapter)
    return thread_local.session


def solve_captcha_pooled(sitekey, pageurl):
    """Solve using thread-local connection pooling."""
    session = get_session()

    resp = session.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": "userrecaptcha",
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        raise RuntimeError(data.get("request"))

    captcha_id = data["request"]

    for _ in range(60):
        time.sleep(5)
        result = session.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()

        if result.get("status") == 1:
            return result["request"]
        if result.get("request") != "CAPCHA_NOT_READY":
            raise RuntimeError(result.get("request"))

    raise TimeoutError("Solve timeout")

map() für einfache Stapeloperationen

Wenn Sie keine Fehlerbehandlung pro Aufgabe benötigen:

def solve_task(task):
    """Wrapper that returns result dict."""
    try:
        solution = solve_captcha_pooled(task["sitekey"], task["pageurl"])
        return {"url": task["pageurl"], "solution": solution, "error": None}
    except Exception as e:
        return {"url": task["pageurl"], "solution": None, "error": str(e)}


with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

solved = [r for r in results if r["solution"]]
failed = [r for r in results if r["error"]]
print(f"Solved: {len(solved)}, Failed: {len(failed)}")

Timeout-Schutz

Verhindern Sie, dass außer Kontrolle geratene Threads Ihren Pool blockieren:

from concurrent.futures import TimeoutError as FuturesTimeout

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {
        executor.submit(solve_captcha_pooled, t["sitekey"], t["pageurl"]): t
        for t in tasks
    }

    for future in as_completed(futures, timeout=600):  # 10 min global timeout
        task = futures[future]
        try:
            solution = future.result(timeout=120)  # 2 min per task
            print(f"[OK] {task['pageurl']}")
        except FuturesTimeout:
            print(f"[TIMEOUT] {task['pageurl']}")
        except Exception as e:
            print(f"[ERR] {task['pageurl']}: {e}")

Fortschrittsrückruf

Verfolgen Sie die Fertigstellung in Echtzeit:

import threading

progress_lock = threading.Lock()
progress = {"done": 0, "total": 0}


def solve_with_progress(task):
    result = solve_task(task)
    with progress_lock:
        progress["done"] += 1
        pct = progress["done"] / progress["total"] * 100
        print(f'\r  Progress: {progress["done"]}/{progress["total"]} ({pct:.0f}%)', end="")
    return result


progress["total"] = len(tasks)

with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_with_progress, tasks))

print()  # Newline after progress

Auswahl von max_workers

Arbeiter Gleichzeitige Lösungen Overhead Am besten für
5 5 Sehr niedrig Kleine Mengen, konservative Verwendung
10 10 Niedrig Allgemeine Verwendung
25 25 Mäßig Großvolumige Pipelines
50 50 Höher Maximaler Durchsatz

Mehr Worker bedeuten mehr gleichzeitige API-Verbindungen. Beginnen Sie mit 10 und steigern Sie sich, während Sie die Fehlerraten überwachen.

ThreadPoolExecutor vs. Asyncio

# ThreadPoolExecutor — drop into existing sync code
with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(solve_task, tasks))

# asyncio — requires async function chain
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [solve_async(session, t) for t in task_list]
        results = await asyncio.gather(*tasks)

Verwenden Sie ThreadPoolExecutor, wenn:

  • Ihre vorhandene Codebasis ist synchron
  • Sie verwenden Bibliotheken, die Async nicht unterstützen (Selenium, einige ORMs)
  • Sie wollen schnelle Parallelität ohne Umstrukturierung

Verwenden Sie Asyncio, wenn:

  • Von Grund auf neu bauen
  • Maximale Effizienz zählt (weniger Betriebssystem-Threads)
  • Bereits in einem asynchronen Framework (FastAPI, aiohttp)

Fehlerbehebung

Problem Ursache Lösung
Token wird erzeugt, aber vom Ziel abgelehnt sitekey, pageurl oder Session-Kontext stimmen nicht Erfasse Parameter erneut und verwende den Token in derselben Browser- oder HTTP-Sitzung
Polling endet im Timeout Intervall, Wartezeit oder Fehlerbehandlung sind zu eng gesetzt Poll alle 5-10 Sekunden, trenne Timeout von echten Fehlercodes und logge die Ursache
Beispiel funktioniert lokal, aber nicht im Workflow Callback, Form-Feld oder Token-Injektion fehlt in der echten Zielkette Prüfe den exakten Übergabepfad vom Solver bis zur finalen Zielanfrage

Verwandte Leitfäden

  • Parallele vs. sequenzielle CAPTCHA-Lösung
  • Node.js Promise.allSettled Batch-CAPTCHA
  • Batch-CAPTCHA-Lösung mehrerer Aufgaben
Kommentare sind für diesen Artikel deaktiviert.