Wenn das CAPTCHA-Lösungsvolumen Tausende von Aufgaben pro Stunde erreicht, benötigen Sie mehr als eine einfache Warteschlange. Apache Kafka bietet dauerhaftes, geordnetes Nachrichten-Streaming mit hohem Durchsatz – ideal, um die Übermittlung von CAPTCHA-Aufgaben von der Ergebnisverarbeitung im großen Maßstab zu entkoppeln.
Kafka ist aber nicht automatisch die richtige erste Antwort. Wenn ein einzelner Dienst nur wenige hundert Aufgaben pro Tag verarbeitet, ist eine einfachere Queue oft günstiger und leichter zu betreiben. Kafka wird interessant, wenn Sie Rebalancing, Consumer Groups, Replays und klar getrennte Producer- und Consumer-Pfade wirklich brauchen.
Wann lohnt sich Kafka hier?
| Betriebsrealität | Kafka passt gut | Einfachere Queue reicht oft |
|---|---|---|
| Tausende Aufgaben pro Stunde, mehrere Konsumenten | Ja | – |
| Replay, Audit oder Streaming-Auswertung sind wichtig | Ja | – |
| Ein kleiner interner Workflow mit begrenztem Volumen | – | Häufig ausreichend |
| Sie brauchen nur eine robuste Aufgabenliste, keine Streaming-Plattform | – | Häufig ausreichend |
Architektur
[Scrapers] → Produce → [Kafka: captcha-tasks topic]
↓
[CAPTCHA Worker Group]
(consume tasks, solve via CaptchaAI)
↓
Produce → [Kafka: captcha-results topic]
↓
[Result Consumer Group]
(process solutions, update database)
Zwei Kafka-Themen trennen die Anliegen:
captcha-tasks– CAPTCHA-Parameter warten darauf, gelöst zu werdencaptcha-results– Gelöste Token, bereit für die nachgelagerte Verwendung
Voraussetzungen
# Python
pip install kafka-python requests
# Node.js
npm install kafkajs axios
Kafka-Broker, der auf localhost:9092 (oder Ihrer Cluster-Adresse) ausgeführt wird.
Schritt 1: Themen erstellen
kafka-topics.sh --create --topic captcha-tasks \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
kafka-topics.sh --create --topic captcha-results \
--partitions 6 --replication-factor 1 \
--bootstrap-server localhost:9092
Sechs Partitionen ermöglichen bis zu sechs parallele Verbraucher pro Gruppe.
Schritt 2: Aufgabenersteller (Scraper-Seite)
Python
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
key_serializer=lambda k: k.encode("utf-8") if k else None,
acks="all", # Wait for all replicas to confirm
retries=3
)
def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
"""Send a CAPTCHA task to Kafka."""
task = {
"task_id": task_id,
"method": captcha_type,
"sitekey": sitekey,
"pageurl": pageurl,
"submitted_at": __import__("time").time()
}
future = producer.send(
"captcha-tasks",
key=task_id, # Key ensures same task goes to same partition
value=task
)
future.get(timeout=10) # Block until confirmed
return task_id
# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()
JavaScript
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: "captcha-producer",
brokers: ["localhost:9092"],
});
const producer = kafka.producer();
async function enqueueCaptcha(taskId, sitekey, pageurl) {
await producer.connect();
const task = {
task_id: taskId,
method: "userrecaptcha",
sitekey: sitekey,
pageurl: pageurl,
submitted_at: Date.now(),
};
await producer.send({
topic: "captcha-tasks",
messages: [{ key: taskId, value: JSON.stringify(task) }],
});
}
(async () => {
await enqueueCaptcha(
"task_001",
"6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
"https://example.com"
);
await producer.disconnect();
})();
Schritt 3: CAPTCHA-Worker (Verbraucher + Solver)
Python
import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
consumer = KafkaConsumer(
"captcha-tasks",
bootstrap_servers=["localhost:9092"],
group_id="captcha-workers",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=False, # Manual commit after processing
max_poll_records=10
)
result_producer = KafkaProducer(
bootstrap_servers=["localhost:9092"],
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
def solve_captcha(task):
"""Submit to CaptchaAI and poll for result."""
# Submit
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": task["method"],
"googlekey": task["sitekey"],
"pageurl": task["pageurl"],
"json": 1
})
data = resp.json()
if data.get("status") != 1:
return {"error": data.get("request")}
captcha_id = data["request"]
# Poll for result
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY,
"action": "get",
"id": captcha_id,
"json": 1
}).json()
if result.get("status") == 1:
return {"solution": result["request"]}
if result.get("request") != "CAPCHA_NOT_READY":
return {"error": result.get("request")}
return {"error": "TIMEOUT"}
# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
task = message.value
print(f"Processing {task['task_id']}...")
result = solve_captcha(task)
result["task_id"] = task["task_id"]
result["solved_at"] = time.time()
# Publish result
result_producer.send("captcha-results", value=result)
result_producer.flush()
# Commit offset after successful processing
consumer.commit()
print(f" → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")
JavaScript
const { Kafka } = require("kafkajs");
const axios = require("axios");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const kafka = new Kafka({
clientId: "captcha-worker",
brokers: ["localhost:9092"],
});
const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function solveCaptcha(task) {
const submitResp = await axios.post(
"https://ocr.captchaai.com/in.php",
null,
{
params: {
key: API_KEY,
method: task.method,
googlekey: task.sitekey,
pageurl: task.pageurl,
json: 1,
},
}
);
if (submitResp.data.status !== 1) {
return { error: submitResp.data.request };
}
const captchaId = submitResp.data.request;
for (let i = 0; i < 60; i++) {
await sleep(5000);
const result = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (result.data.status === 1) return { solution: result.data.request };
if (result.data.request !== "CAPCHA_NOT_READY")
return { error: result.data.request };
}
return { error: "TIMEOUT" };
}
async function run() {
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const task = JSON.parse(message.value.toString());
console.log(`Processing ${task.task_id}...`);
const result = await solveCaptcha(task);
result.task_id = task.task_id;
result.solved_at = Date.now();
await producer.send({
topic: "captcha-results",
messages: [{ value: JSON.stringify(result) }],
});
console.log(
` → ${task.task_id}: ${result.solution ? "solved" : result.error}`
);
},
});
}
run();
Skalierende Arbeiter
Kafka-Verbrauchergruppen verteilen Partitionen automatisch an alle Mitarbeiter:
# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5
# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5
Skalieren Sie auf die Anzahl der Partitionen. Fügen Sie darüber hinaus weitere Partitionen hinzu.
Überwachung
Verfolgen Sie wichtige Kennzahlen über die Verbraucherverzögerung von Kafka:
kafka-consumer-groups.sh --describe --group captcha-workers \
--bootstrap-server localhost:9092
| Metrisch | Gesund | Warnung |
|---|---|---|
| Verbraucherverzögerung | < 100 | > 1000 (Arbeiter hinzufügen) |
| Nachrichten/sec in | Entspricht der Schaberrate | Spitzen deuten auf Platzen hin |
| Nachrichten/sec aus | Entspricht der Rate | Zurückfallen = Engpass |
Fehlerbehebung
| Problem | Ursache | Lösung |
|---|---|---|
| Worker ist erreichbar, verarbeitet aber keine Aufgaben | Queue, Credentials oder Eingabestrom stimmen nicht | Prüfe Queue-Tiefe, API-Key, Health-Checks und Fehlerraten pro Worker gemeinsam |
| Fehlerrate steigt nach Rollout | Neue Version verändert Session-, Proxy- oder Retry-Verhalten | Vergleiche erfolgreiche und fehlschlagende Runs zwischen alter und neuer Version und rolle bei Bedarf zurück |
| Canary oder Health-Check bleibt rot | Abhängigkeiten, Zeitlimits oder Secrets weichen von der Zielumgebung ab | Prüfe Secrets, Netzwerkpfade und Schwellenwerte in exakt derselben Umgebung |
Verwandte Leitfäden
- RabbitMQ + CaptchaAI Message-Queue-Integration
- Redis-Warteschlange + CaptchaAI
- NATS Messaging CaptchaAI Aufgabenverteilung