DevOps & Scaling

Auto-Scaling CAPTCHA-Solver-Worker

Statische Worker-Pools verschwenden in ruhigen Zeiten Geld und führen in Spitzenzeiten zu Engpässen. Durch Auto-Scaling wird die Worker-Anzahl an den tatsächlichen Bedarf angepasst, wodurch sowohl Kosten als auch Durchsatz optimiert werden.


Skalierungssignale

Signal Hochskalieren wenn Herunterskalieren wenn
Warteschlangentiefe > 20 ausstehende Aufgaben < 5 ausstehende Aufgaben
Worker-Auslastung > 80 % ausgelastet < 20 % ausgelastet
Lösungslatenz P95 > 60 Sekunden P95 < 20 Sekunden
Fehlerquote > 5 % (neue Worker hinzufügen) Stabil < 1 %
Kontostand N/A Kontostand < 1 $ (Skalierung stoppen)

Thread-basierter Auto-Scaler

Skalieren Sie Worker-Threads innerhalb eines einzelnen Prozesses:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

Prozessbasierter Auto-Scaler

Skalieren Sie Worker-Prozesse für die CPU-Isolierung:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Kontostand-bewusste Skalierung

Skalierung stoppen, wenn der Kontostand zu niedrig wird:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

In die Skalierungsschleife integrieren:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

Skalierungsstrategien im Vergleich

Strategie Am besten für Latenz Komplexität
Thread-Pool I/O-bound (API-Aufrufe) Niedrig Niedrig
Prozesspool CPU-gebundene Vorverarbeitung Mittel Mittel
Kubernetes HPA Cloud-native Bereitstellungen Höher Hoch
KEDA Ereignisgesteuerte Skalierung Mittel Mittel

Fehlerbehebung

Problem Ursache Lösung
Der Worker-Pool wächst immer weiter Die Warteschlange wird nie leer Prüfen Sie, ob Worker tatsächlich Aufgaben verarbeiten
Herunterskalierung zu aggressiv Niedrige Schwelle Verzögerung beim Herunterskalieren auf über 30 Sekunden erhöhen
Zombie-Prozesse Prozesse nicht bereinigt _cleanup_dead() regelmäßig aufrufen
Kontostand sinkt schnell Zu viele Worker Kontostandprüfung in die Skalierungslogik integrieren

FAQ

Was ist das richtige Worker-to-Queue-Verhältnis?

Streben Sie 1 Worker pro 5–10 Aufgaben in der Warteschlange an. Jeder Worker verarbeitet je nach CAPTCHA-Typ ca. 3–6 pro Minute.

Soll ich Threads oder Prozesse verwenden?

Threads für reine API-Aufrufe (CaptchaAI ist I/O-bound). Prozesse, bei denen Sie neben der Lösung auch eine Bildvorverarbeitung oder umfangreiche Berechnungen durchführen.

Wie schnell sollte ich skalieren?

Schnell hochskalieren (alle 10–15 Sekunden überprüfen), langsam herunterskalieren (30–60 Sekunden bei geringer Last warten). Dies verhindert das ständige Wechseln zwischen States (State Thrashing).


Verwandte Leitfäden

  • Kubernetes-Jobwarteschlangen
  • Prometheus/Grafana Überwachung

Skalieren Sie intelligent – Holen Sie sich Ihren CaptchaAI-Schlüssel und starten Sie heute.

Kommentare sind für diesen Artikel deaktiviert.

Verwandte Beiträge

Explainers Auswirkungen der DNS-Auflösung auf die Leistung der CAPTCHA-API
DNS-Auflösung und ihre Auswirkungen auf die CAPTCHA-API-Performance: Latenzmessung, DNS-Caching und Optimierungsstrategien für Captcha AI.

DNS-Auflösung und ihre Auswirkungen auf die CAPTCHA-API-Performance: Latenzmessung, DNS-Caching und Optimierun...

Apr 24, 2026
DevOps & Scaling Azure Functions + CaptchaAI: Cloud-Integration
Captcha AI in Azure Functions integrieren – serverlose CAPTCHA-Lösung mit Python, Key Vault, Queue Storage und Application Insights.

Captcha AI in Azure Functions integrieren – serverlose CAPTCHA-Lösung mit Python, Key Vault, Queue Storage und...

May 02, 2026
DevOps & Scaling CaptchaAI Überwachung mit Datadog: Metriken und Warnungen
Captcha AI mit Datadog überwachen: CAPTCHA-Lösungsraten, Latenz und Fehlerraten als Datadog-Metriken senden und Alerts konfigurieren.

Captcha AI mit Datadog überwachen: CAPTCHA-Lösungsraten, Latenz und Fehlerraten als Datadog-Metriken senden un...

May 06, 2026