Statische Worker-Pools verschwenden in ruhigen Zeiten Geld und führen in Spitzenzeiten zu Engpässen. Durch Auto-Scaling wird die Worker-Anzahl an den tatsächlichen Bedarf angepasst, wodurch sowohl Kosten als auch Durchsatz optimiert werden.
Skalierungssignale
| Signal | Hochskalieren wenn | Herunterskalieren wenn |
|---|---|---|
| Warteschlangentiefe | > 20 ausstehende Aufgaben | < 5 ausstehende Aufgaben |
| Worker-Auslastung | > 80 % ausgelastet | < 20 % ausgelastet |
| Lösungslatenz | P95 > 60 Sekunden | P95 < 20 Sekunden |
| Fehlerquote | > 5 % (neue Worker hinzufügen) | Stabil < 1 % |
| Kontostand | N/A | Kontostand < 1 $ (Skalierung stoppen) |
Thread-basierter Auto-Scaler
Skalieren Sie Worker-Threads innerhalb eines einzelnen Prozesses:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Prozessbasierter Auto-Scaler
Skalieren Sie Worker-Prozesse für die CPU-Isolierung:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Kontostand-bewusste Skalierung
Skalierung stoppen, wenn der Kontostand zu niedrig wird:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
In die Skalierungsschleife integrieren:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
Skalierungsstrategien im Vergleich
| Strategie | Am besten für | Latenz | Komplexität |
|---|---|---|---|
| Thread-Pool | I/O-bound (API-Aufrufe) | Niedrig | Niedrig |
| Prozesspool | CPU-gebundene Vorverarbeitung | Mittel | Mittel |
| Kubernetes HPA | Cloud-native Bereitstellungen | Höher | Hoch |
| KEDA | Ereignisgesteuerte Skalierung | Mittel | Mittel |
Fehlerbehebung
| Problem | Ursache | Lösung |
|---|---|---|
| Der Worker-Pool wächst immer weiter | Die Warteschlange wird nie leer | Prüfen Sie, ob Worker tatsächlich Aufgaben verarbeiten |
| Herunterskalierung zu aggressiv | Niedrige Schwelle | Verzögerung beim Herunterskalieren auf über 30 Sekunden erhöhen |
| Zombie-Prozesse | Prozesse nicht bereinigt | _cleanup_dead() regelmäßig aufrufen |
| Kontostand sinkt schnell | Zu viele Worker | Kontostandprüfung in die Skalierungslogik integrieren |
FAQ
Was ist das richtige Worker-to-Queue-Verhältnis?
Streben Sie 1 Worker pro 5–10 Aufgaben in der Warteschlange an. Jeder Worker verarbeitet je nach CAPTCHA-Typ ca. 3–6 pro Minute.
Soll ich Threads oder Prozesse verwenden?
Threads für reine API-Aufrufe (CaptchaAI ist I/O-bound). Prozesse, bei denen Sie neben der Lösung auch eine Bildvorverarbeitung oder umfangreiche Berechnungen durchführen.
Wie schnell sollte ich skalieren?
Schnell hochskalieren (alle 10–15 Sekunden überprüfen), langsam herunterskalieren (30–60 Sekunden bei geringer Last warten). Dies verhindert das ständige Wechseln zwischen States (State Thrashing).
Verwandte Leitfäden
- Kubernetes-Jobwarteschlangen
- Prometheus/Grafana Überwachung
Skalieren Sie intelligent – Holen Sie sich Ihren CaptchaAI-Schlüssel und starten Sie heute.