Anwendungsfälle

CAPTCHA-Verarbeitung für die Börsendatenerfassung

Finanzportale schützen Börsenkurse, Gewinndaten und Analystenberichte mit Cloudflare Turnstile und reCAPTCHA. CAPTCHAs werden bei schnellen Symbolsuchen, beim Herunterladen historischer Daten und bei Screener-Abfragen ausgelöst. Hier erfahren Sie, wie Sie eine zuverlässige Datenerfassung auf allen Finanzseiten aufrechterhalten.

CAPTCHA-Muster auf Finanzportalen

Datentyp Portalbeispiele CAPTCHA-Typ Auslöser
Echtzeit-Kurse Finanzportale Cloudflare Turnstile Schnelle Symbolsuche
Historische Preise Datenanbieter reCAPTCHA v2 Massen-CSV-Downloads
Finanzberichte SEC-Einreichungsseiten Bild-CAPTCHA Wiederholte EDGAR-Abfragen
Screener-Ergebnisse Aktienprüfer Cloudflare Challenge Komplexe Filterabfragen
Analystenbewertungen Forschungsportale reCAPTCHA v3 Mehrere Seitenaufrufe

Bestandsdatensammler

import requests
import time
import re
from datetime import datetime, timedelta

class StockDataCollector:
    def __init__(self, api_key):
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })

    def get_quote(self, portal_url, symbol):
        """Get current stock quote, solving CAPTCHAs if needed."""
        url = f"{portal_url}/quote/{symbol}"
        response = self.session.get(url)

        if self._is_captcha_page(response):
            response = self._solve_and_retry(response, url)

        return self._parse_quote(response.text, symbol)

    def get_historical(self, portal_url, symbol, days=365):
        """Download historical price data."""
        url = f"{portal_url}/history/{symbol}"
        params = {
            "period": f"{days}d",
            "interval": "1d"
        }
        response = self.session.get(url, params=params)

        if self._is_captcha_page(response):
            response = self._solve_and_retry(response, url)

        return self._parse_historical(response.text)

    def scan_symbols(self, portal_url, symbols, delay=2):
        """Collect quotes for multiple symbols."""
        results = {}

        for symbol in symbols:
            try:
                results[symbol] = self.get_quote(portal_url, symbol)
                time.sleep(delay)
            except Exception as e:
                results[symbol] = {"error": str(e)}

        return results

    def _is_captcha_page(self, response):
        return (
            response.status_code == 403 or
            "cf-turnstile" in response.text or
            "challenges.cloudflare.com" in response.text
        )

    def _solve_and_retry(self, response, url):
        match = re.search(r'data-sitekey="(0x[^"]+)"', response.text)
        if not match:
            # Fall back to reCAPTCHA detection
            match = re.search(r'data-sitekey="([^"]+)"', response.text)
            if match:
                return self._solve_recaptcha_and_retry(match.group(1), url)
            raise ValueError("No CAPTCHA sitekey found")

        resp = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key,
            "method": "turnstile",
            "sitekey": match.group(1),
            "pageurl": url,
            "json": 1
        })
        task_id = resp.json()["request"]

        for _ in range(60):
            time.sleep(3)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": task_id,
                "json": 1
            })
            data = result.json()
            if data["status"] == 1:
                return self.session.post(url, data={
                    "cf-turnstile-response": data["request"]
                })

        raise TimeoutError("CAPTCHA solve timed out")

    def _solve_recaptcha_and_retry(self, site_key, url):
        resp = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key,
            "method": "userrecaptcha",
            "googlekey": site_key,
            "pageurl": url,
            "json": 1
        })
        task_id = resp.json()["request"]

        for _ in range(60):
            time.sleep(3)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": task_id,
                "json": 1
            })
            data = result.json()
            if data["status"] == 1:
                return self.session.post(url, data={
                    "g-recaptcha-response": data["request"]
                })

        raise TimeoutError("reCAPTCHA solve timed out")

    def _parse_quote(self, html, symbol):
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")

        def text_or_none(node):
            return node.text.strip() if node and node.text else None

        return {
            "symbol": symbol,
            "price": text_or_none(soup.select_one("[data-field='regularMarketPrice'], .price")),
            "change": text_or_none(soup.select_one("[data-field='regularMarketChange'], .change")),
            "volume": text_or_none(soup.select_one("[data-field='regularMarketVolume'], .volume")),
            "market_cap": text_or_none(soup.select_one("[data-field='marketCap'], .market-cap")),
            "timestamp": datetime.now().isoformat()
        }

    def _parse_historical(self, html):
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")
        rows = []

        for row in soup.select("table tr")[1:]:  # Skip header
            cells = [td.text.strip() for td in row.select("td")]
            if len(cells) >= 6:
                rows.append({
                    "date": cells[0],
                    "open": cells[1],
                    "high": cells[2],
                    "low": cells[3],
                    "close": cells[4],
                    "volume": cells[5]
                })

        return rows


# Usage
collector = StockDataCollector("YOUR_API_KEY")

# Single quote
quote = collector.get_quote("https://finance.example.com", "AAPL")
print(f"AAPL: ${quote['price']} ({quote['change']})")

# Scan multiple symbols
portfolio = collector.scan_symbols(
    "https://finance.example.com",
    ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
)

Markt-Screener mit CAPTCHA-Verwaltung (JavaScript)

class MarketScreener {
  constructor(apiKey) {
    this.apiKey = apiKey;
  }

  async screenStocks(portalUrl, filters) {
    const params = new URLSearchParams(filters);
    const response = await fetch(`${portalUrl}/screener?${params}`);
    const html = await response.text();

    if (html.includes('cf-turnstile') || response.status === 403) {
      return this.solveAndScreen(portalUrl, filters, html);
    }

    return this.parseScreenerResults(html);
  }

  async solveAndScreen(portalUrl, filters, html) {
    const match = html.match(/data-sitekey="(0x[^"]+)"/);
    if (!match) throw new Error('Turnstile sitekey not found');

    const submitResp = await fetch('https://ocr.captchaai.com/in.php', {
      method: 'POST',
      body: new URLSearchParams({
        key: this.apiKey,
        method: 'turnstile',
        sitekey: match[1],
        pageurl: portalUrl,
        json: '1'
      })
    });
    const { request: taskId } = await submitResp.json();

    for (let i = 0; i < 60; i++) {
      await new Promise(r => setTimeout(r, 3000));
      const result = await fetch(
        `https://ocr.captchaai.com/res.php?key=${this.apiKey}&action=get&id=${taskId}&json=1`
      );
      const data = await result.json();
      if (data.status === 1) {
        const response = await fetch(`${portalUrl}/screener`, {
          method: 'POST',
          body: new URLSearchParams({
            ...filters,
            'cf-turnstile-response': data.request
          })
        });
        return this.parseScreenerResults(await response.text());
      }
    }
    throw new Error('Turnstile solve timed out');
  }

  parseScreenerResults(html) {
    const rows = [];
    const tableMatch = html.match(/<table[^>]*>[\s\S]*?<\/table>/i);
    if (!tableMatch) return rows;

    const rowMatches = tableMatch[0].matchAll(/<tr[^>]*>([\s\S]*?)<\/tr>/gi);
    for (const row of rowMatches) {
      const cells = [...row[1].matchAll(/<td[^>]*>([\s\S]*?)<\/td>/gi)]
        .map(m => m[1].replace(/<[^>]+>/g, '').trim());
      if (cells.length >= 4) {
        rows.push({
          symbol: cells[0],
          price: cells[1],
          change: cells[2],
          volume: cells[3]
        });
      }
    }
    return rows;
  }
}

// Usage
const screener = new MarketScreener('YOUR_API_KEY');
const results = await screener.screenStocks('https://finance.example.com', {
  sector: 'technology',
  marketCap: 'large',
  peRatio: '<25'
});

Erfassungshäufigkeit nach Datentyp

Datentyp Empfohlenes Intervall CAPTCHA-Häufigkeit
Echtzeit-Kurse 1–5 Minuten Hoch – API verwenden, falls verfügbar
Tagesendpreise Einmal täglich nach Geschäftsschluss Niedrig
Finanzberichte Vierteljährlich Minimal
Screener-Ergebnisse Täglich Mäßig
Analystenbewertungen Wöchentlich Niedrig

Fehlerbehebung

Problem Ursache Lösung
Zu viele CAPTCHAs in kurzer Zeit Abrufrate oder Parallelität ist für die Quelle zu aggressiv Drossele Intervalle, halte Sessions stabil und prüfe die Qualität deiner Proxys
Daten fehlen trotz gelöster CAPTCHA Der Parser liest eine alte oder unvollständige Ansicht aus Extrahiere Daten erst nach erfolgreicher Token-Anwendung in derselben Sitzung
Kosten steigen stärker als erwartet Zu viele Wiederholungen oder unnötige Seitenaufrufe lösen zusätzliche Challenges aus Löse nur kritische Schritte und protokolliere Wiederholungen pro Quelle

Verwandte Leitfäden

Diskussionen (0)

Noch keine Kommentare.