DevOps & Skalierung

Apache Kafka + CaptchaAI: Streaming-CAPTCHA-Aufgabenverarbeitung

Wenn das CAPTCHA-Lösungsvolumen Tausende von Aufgaben pro Stunde erreicht, benötigen Sie mehr als eine einfache Warteschlange. Apache Kafka bietet dauerhaftes, geordnetes Nachrichten-Streaming mit hohem Durchsatz – ideal, um die Übermittlung von CAPTCHA-Aufgaben von der Ergebnisverarbeitung im großen Maßstab zu entkoppeln.

Kafka ist aber nicht automatisch die richtige erste Antwort. Wenn ein einzelner Dienst nur wenige hundert Aufgaben pro Tag verarbeitet, ist eine einfachere Queue oft günstiger und leichter zu betreiben. Kafka wird interessant, wenn Sie Rebalancing, Consumer Groups, Replays und klar getrennte Producer- und Consumer-Pfade wirklich brauchen.

Wann lohnt sich Kafka hier?

Betriebsrealität Kafka passt gut Einfachere Queue reicht oft
Tausende Aufgaben pro Stunde, mehrere Konsumenten Ja
Replay, Audit oder Streaming-Auswertung sind wichtig Ja
Ein kleiner interner Workflow mit begrenztem Volumen Häufig ausreichend
Sie brauchen nur eine robuste Aufgabenliste, keine Streaming-Plattform Häufig ausreichend

Architektur

[Scrapers] → Produce → [Kafka: captcha-tasks topic]
                              ↓
                    [CAPTCHA Worker Group]
                    (consume tasks, solve via CaptchaAI)
                              ↓
                    Produce → [Kafka: captcha-results topic]
                              ↓
                    [Result Consumer Group]
                    (process solutions, update database)

Zwei Kafka-Themen trennen die Anliegen:

  • captcha-tasks – CAPTCHA-Parameter warten darauf, gelöst zu werden
  • captcha-results – Gelöste Token, bereit für die nachgelagerte Verwendung

Voraussetzungen

# Python
pip install kafka-python requests

# Node.js
npm install kafkajs axios

Kafka-Broker, der auf localhost:9092 (oder Ihrer Cluster-Adresse) ausgeführt wird.

Schritt 1: Themen erstellen

kafka-topics.sh --create --topic captcha-tasks \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

kafka-topics.sh --create --topic captcha-results \
  --partitions 6 --replication-factor 1 \
  --bootstrap-server localhost:9092

Sechs Partitionen ermöglichen bis zu sechs parallele Verbraucher pro Gruppe.

Schritt 2: Aufgabenersteller (Scraper-Seite)

Python

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k else None,
    acks="all",  # Wait for all replicas to confirm
    retries=3
)


def enqueue_captcha(task_id, sitekey, pageurl, captcha_type="userrecaptcha"):
    """Send a CAPTCHA task to Kafka."""
    task = {
        "task_id": task_id,
        "method": captcha_type,
        "sitekey": sitekey,
        "pageurl": pageurl,
        "submitted_at": __import__("time").time()
    }

    future = producer.send(
        "captcha-tasks",
        key=task_id,  # Key ensures same task goes to same partition
        value=task
    )
    future.get(timeout=10)  # Block until confirmed
    return task_id


# Submit tasks
enqueue_captcha("task_001", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
enqueue_captcha("task_002", "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-", "https://example.com")
producer.flush()

JavaScript

const { Kafka } = require("kafkajs");

const kafka = new Kafka({
  clientId: "captcha-producer",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

async function enqueueCaptcha(taskId, sitekey, pageurl) {
  await producer.connect();

  const task = {
    task_id: taskId,
    method: "userrecaptcha",
    sitekey: sitekey,
    pageurl: pageurl,
    submitted_at: Date.now(),
  };

  await producer.send({
    topic: "captcha-tasks",
    messages: [{ key: taskId, value: JSON.stringify(task) }],
  });
}

(async () => {
  await enqueueCaptcha(
    "task_001",
    "6Le-wvkSAAAAAPBMRTvw0Q4Muexq9bi0DJwx_mJ-",
    "https://example.com"
  );
  await producer.disconnect();
})();

Schritt 3: CAPTCHA-Worker (Verbraucher + Solver)

Python

import json
import os
import time
import requests
from kafka import KafkaConsumer, KafkaProducer

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

consumer = KafkaConsumer(
    "captcha-tasks",
    bootstrap_servers=["localhost:9092"],
    group_id="captcha-workers",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    auto_offset_reset="earliest",
    enable_auto_commit=False,  # Manual commit after processing
    max_poll_records=10
)

result_producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8")
)


def solve_captcha(task):
    """Submit to CaptchaAI and poll for result."""
    # Submit
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": task["method"],
        "googlekey": task["sitekey"],
        "pageurl": task["pageurl"],
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY,
            "action": "get",
            "id": captcha_id,
            "json": 1
        }).json()

        if result.get("status") == 1:
            return {"solution": result["request"]}
        if result.get("request") != "CAPCHA_NOT_READY":
            return {"error": result.get("request")}

    return {"error": "TIMEOUT"}


# Main consumer loop
print("CAPTCHA worker started. Waiting for tasks...")
for message in consumer:
    task = message.value
    print(f"Processing {task['task_id']}...")

    result = solve_captcha(task)
    result["task_id"] = task["task_id"]
    result["solved_at"] = time.time()

    # Publish result
    result_producer.send("captcha-results", value=result)
    result_producer.flush()

    # Commit offset after successful processing
    consumer.commit()
    print(f"  → {task['task_id']}: {'solved' if 'solution' in result else result.get('error')}")

JavaScript

const { Kafka } = require("kafkajs");
const axios = require("axios");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

const kafka = new Kafka({
  clientId: "captcha-worker",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "captcha-workers" });
const producer = kafka.producer();

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function solveCaptcha(task) {
  const submitResp = await axios.post(
    "https://ocr.captchaai.com/in.php",
    null,
    {
      params: {
        key: API_KEY,
        method: task.method,
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    }
  );

  if (submitResp.data.status !== 1) {
    return { error: submitResp.data.request };
  }

  const captchaId = submitResp.data.request;

  for (let i = 0; i < 60; i++) {
    await sleep(5000);
    const result = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (result.data.status === 1) return { solution: result.data.request };
    if (result.data.request !== "CAPCHA_NOT_READY")
      return { error: result.data.request };
  }

  return { error: "TIMEOUT" };
}

async function run() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: "captcha-tasks", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const task = JSON.parse(message.value.toString());
      console.log(`Processing ${task.task_id}...`);

      const result = await solveCaptcha(task);
      result.task_id = task.task_id;
      result.solved_at = Date.now();

      await producer.send({
        topic: "captcha-results",
        messages: [{ value: JSON.stringify(result) }],
      });

      console.log(
        `  → ${task.task_id}: ${result.solution ? "solved" : result.error}`
      );
    },
  });
}

run();

Skalierende Arbeiter

Kafka-Verbrauchergruppen verteilen Partitionen automatisch an alle Mitarbeiter:

# 6 partitions, 3 workers → each worker gets 2 partitions
Worker-1: partitions 0, 1
Worker-2: partitions 2, 3
Worker-3: partitions 4, 5

# Add Worker-4 → rebalance
Worker-1: partitions 0, 1
Worker-2: partitions 2
Worker-3: partitions 3, 4
Worker-4: partition 5

Skalieren Sie auf die Anzahl der Partitionen. Fügen Sie darüber hinaus weitere Partitionen hinzu.

Überwachung

Verfolgen Sie wichtige Kennzahlen über die Verbraucherverzögerung von Kafka:

kafka-consumer-groups.sh --describe --group captcha-workers \
  --bootstrap-server localhost:9092
Metrisch Gesund Warnung
Verbraucherverzögerung < 100 > 1000 (Arbeiter hinzufügen)
Nachrichten/sec in Entspricht der Schaberrate Spitzen deuten auf Platzen hin
Nachrichten/sec aus Entspricht der Rate Zurückfallen = Engpass

Fehlerbehebung

Problem Ursache Lösung
Worker ist erreichbar, verarbeitet aber keine Aufgaben Queue, Credentials oder Eingabestrom stimmen nicht Prüfe Queue-Tiefe, API-Key, Health-Checks und Fehlerraten pro Worker gemeinsam
Fehlerrate steigt nach Rollout Neue Version verändert Session-, Proxy- oder Retry-Verhalten Vergleiche erfolgreiche und fehlschlagende Runs zwischen alter und neuer Version und rolle bei Bedarf zurück
Canary oder Health-Check bleibt rot Abhängigkeiten, Zeitlimits oder Secrets weichen von der Zielumgebung ab Prüfe Secrets, Netzwerkpfade und Schwellenwerte in exakt derselben Umgebung

Verwandte Leitfäden

  • RabbitMQ + CaptchaAI Message-Queue-Integration
  • Redis-Warteschlange + CaptchaAI
  • NATS Messaging CaptchaAI Aufgabenverteilung
Kommentare sind für diesen Artikel deaktiviert.