asyncio ist leistungsstark, erfordert jedoch das Umschreiben Ihrer gesamten Aufrufkette als asynchron. ThreadPoolExecutor bietet Ihnen Parallelität mit standardmäßigem synchronem Code – fügen Sie ihn ohne Umstrukturierung in bestehende Projekte ein.
Warum ThreadPoolExecutor für CAPTCHAs
Das Lösen von CAPTCHAs erfolgt mit I/O-bound (Warten auf HTTP-Antworten). Python-Threads geben die GIL während I/O-Vorgängen frei, wodurch ThreadPoolExecutor für diese Arbeitslast effizient ist:
| Ansatz | Komplexität | Passt zu vorhandenem Code | Parallelität für I/O |
|---|---|---|---|
| Sequentielle | Keine | Ja | Keine |
| ThreadPoolExecutor | Niedrig | Ja | Gut |
| asynchron | Hoch | Erfordert asynchrones Umschreiben | Am besten |
| Mehrfachverarbeitung | Mittel | Meistens | Overkill für I/O |
Grundlegende Implementierung
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
def solve_captcha(sitekey, pageurl):
"""Synchronous CAPTCHA solve — submit and poll."""
# Submit
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": pageurl,
"json": 1
})
data = resp.json()
if data.get("status") != 1:
raise RuntimeError(data.get("request", "Submit failed"))
captcha_id = data["request"]
# Poll for result
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY,
"action": "get",
"id": captcha_id,
"json": 1
}).json()
if result.get("status") == 1:
return result["request"]
if result.get("request") != "CAPCHA_NOT_READY":
raise RuntimeError(result.get("request", "Unknown error"))
raise TimeoutError("Solve timeout after 300s")
# Batch solve with ThreadPoolExecutor
tasks = [
{"sitekey": "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "pageurl": f"https://example.com/page/{i}"}
for i in range(20)
]
start = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(solve_captcha, t["sitekey"], t["pageurl"]): t
for t in tasks
}
solved = 0
failed = 0
for future in as_completed(futures):
task = futures[future]
try:
solution = future.result()
solved += 1
print(f"[OK] {task['pageurl']}: {solution[:30]}...")
except Exception as e:
failed += 1
print(f"[ERR] {task['pageurl']}: {e}")
elapsed = time.time() - start
print(f"\nDone: {solved} solved, {failed} failed in {elapsed:.1f}s")
Verwenden der Sitzung zur Wiederverwendung von Verbindungen
Das Erstellen einer neuen TCP-Verbindung pro Anfrage verschwendet Zeit. Teilen Sie ein requests.Session pro Thread:
import threading
# Thread-local storage for sessions
thread_local = threading.local()
def get_session():
"""Get or create a thread-local session."""
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
# Configure connection pooling
adapter = requests.adapters.HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=2
)
thread_local.session.mount("https://", adapter)
return thread_local.session
def solve_captcha_pooled(sitekey, pageurl):
"""Solve using thread-local connection pooling."""
session = get_session()
resp = session.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": pageurl,
"json": 1
})
data = resp.json()
if data.get("status") != 1:
raise RuntimeError(data.get("request"))
captcha_id = data["request"]
for _ in range(60):
time.sleep(5)
result = session.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY,
"action": "get",
"id": captcha_id,
"json": 1
}).json()
if result.get("status") == 1:
return result["request"]
if result.get("request") != "CAPCHA_NOT_READY":
raise RuntimeError(result.get("request"))
raise TimeoutError("Solve timeout")
map() für einfache Stapeloperationen
Wenn Sie keine Fehlerbehandlung pro Aufgabe benötigen:
def solve_task(task):
"""Wrapper that returns result dict."""
try:
solution = solve_captcha_pooled(task["sitekey"], task["pageurl"])
return {"url": task["pageurl"], "solution": solution, "error": None}
except Exception as e:
return {"url": task["pageurl"], "solution": None, "error": str(e)}
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(solve_task, tasks))
solved = [r for r in results if r["solution"]]
failed = [r for r in results if r["error"]]
print(f"Solved: {len(solved)}, Failed: {len(failed)}")
Timeout-Schutz
Verhindern Sie, dass außer Kontrolle geratene Threads Ihren Pool blockieren:
from concurrent.futures import TimeoutError as FuturesTimeout
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(solve_captcha_pooled, t["sitekey"], t["pageurl"]): t
for t in tasks
}
for future in as_completed(futures, timeout=600): # 10 min global timeout
task = futures[future]
try:
solution = future.result(timeout=120) # 2 min per task
print(f"[OK] {task['pageurl']}")
except FuturesTimeout:
print(f"[TIMEOUT] {task['pageurl']}")
except Exception as e:
print(f"[ERR] {task['pageurl']}: {e}")
Fortschrittsrückruf
Verfolgen Sie die Fertigstellung in Echtzeit:
import threading
progress_lock = threading.Lock()
progress = {"done": 0, "total": 0}
def solve_with_progress(task):
result = solve_task(task)
with progress_lock:
progress["done"] += 1
pct = progress["done"] / progress["total"] * 100
print(f'\r Progress: {progress["done"]}/{progress["total"]} ({pct:.0f}%)', end="")
return result
progress["total"] = len(tasks)
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(solve_with_progress, tasks))
print() # Newline after progress
Auswahl von max_workers
| Arbeiter | Gleichzeitige Lösungen | Overhead | Am besten für |
|---|---|---|---|
| 5 | 5 | Sehr niedrig | Kleine Mengen, konservative Verwendung |
| 10 | 10 | Niedrig | Allgemeine Verwendung |
| 25 | 25 | Mäßig | Großvolumige Pipelines |
| 50 | 50 | Höher | Maximaler Durchsatz |
Mehr Worker bedeuten mehr gleichzeitige API-Verbindungen. Beginnen Sie mit 10 und steigern Sie sich, während Sie die Fehlerraten überwachen.
ThreadPoolExecutor vs. Asyncio
# ThreadPoolExecutor — drop into existing sync code
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(solve_task, tasks))
# asyncio — requires async function chain
async def main():
async with aiohttp.ClientSession() as session:
tasks = [solve_async(session, t) for t in task_list]
results = await asyncio.gather(*tasks)
Verwenden Sie ThreadPoolExecutor, wenn:
- Ihre vorhandene Codebasis ist synchron
- Sie verwenden Bibliotheken, die Async nicht unterstützen (Selenium, einige ORMs)
- Sie wollen schnelle Parallelität ohne Umstrukturierung
Verwenden Sie Asyncio, wenn:
- Von Grund auf neu bauen
- Maximale Effizienz zählt (weniger Betriebssystem-Threads)
- Bereits in einem asynchronen Framework (FastAPI, aiohttp)
Fehlerbehebung
| Problem | Ursache | Lösung |
|---|---|---|
| Token wird erzeugt, aber vom Ziel abgelehnt | sitekey, pageurl oder Session-Kontext stimmen nicht | Erfasse Parameter erneut und verwende den Token in derselben Browser- oder HTTP-Sitzung |
| Polling endet im Timeout | Intervall, Wartezeit oder Fehlerbehandlung sind zu eng gesetzt | Poll alle 5-10 Sekunden, trenne Timeout von echten Fehlercodes und logge die Ursache |
| Beispiel funktioniert lokal, aber nicht im Workflow | Callback, Form-Feld oder Token-Injektion fehlt in der echten Zielkette | Prüfe den exakten Übergabepfad vom Solver bis zur finalen Zielanfrage |
Verwandte Leitfäden
- Parallele vs. sequenzielle CAPTCHA-Lösung
- Node.js Promise.allSettled Batch-CAPTCHA
- Batch-CAPTCHA-Lösung mehrerer Aufgaben