feat(proxy): integrate Scrapling for enhanced web scraping capabilities

- Replaced Playwright with Scrapling's fetchers in the web crawling and YouTube processing modules for improved performance and flexibility.
- Updated proxy configuration to support dynamic proxy selection via environment variables.
- Enhanced logging to track performance metrics during web scraping operations.
- Refactored related modules to utilize the new proxy utilities and streamline the scraping process.
This commit is contained in:
DESKTOP-RTLN3BA\$punk 2026-06-09 00:15:10 -07:00
parent 41a93ca8fb
commit 640ef5f15d
16 changed files with 5770 additions and 4886 deletions

View file

@ -277,9 +277,13 @@ TURNSTILE_ENABLED=FALSE
TURNSTILE_SECRET_KEY=
# Proxy provider selection. Selects a ProxyProvider implementation registered in
# app/utils/proxy/registry.py. Default: "anonymous_proxies". Add new vendors there.
# PROXY_PROVIDER=anonymous_proxies
# Residential Proxy Configuration (anonymous-proxies.net)
# Used for web crawling, link previews, and YouTube transcript fetching to avoid IP bans.
# Leave commented out to disable proxying.
# Consumed by the "anonymous_proxies" provider. Leave commented out to disable proxying.
# RESIDENTIAL_PROXY_USERNAME=your_proxy_username
# RESIDENTIAL_PROXY_PASSWORD=your_proxy_password
# RESIDENTIAL_PROXY_HOSTNAME=rotating.dnsproxifier.com:31230

View file

@ -109,8 +109,10 @@ RUN --mount=type=secret,id=HF_TOKEN \
HF_TOKEN="$(cat /run/secrets/HF_TOKEN 2>/dev/null || true)" \
python -c "from chonkie import AutoEmbeddings; AutoEmbeddings.get_embeddings('${EMBEDDING_MODEL}')"
# Install Playwright browsers (the playwright python package itself is in deps)
RUN playwright install chromium --with-deps
# Install Scrapling's browser engines (patchright Chromium + Camoufox).
# Scrapling pulls playwright/patchright via the `fetchers` extra; `scrapling install`
# downloads the matching browser binaries used by DynamicFetcher/StealthyFetcher.
RUN scrapling install
# Shared temp directory for file uploads between API and Worker containers.
# Python's tempfile module uses TMPDIR, so uploaded files land here.

View file

@ -8,18 +8,19 @@ transcript directly via the YouTubeTranscriptApi instead of crawling the page.
import hashlib
import logging
import time
from typing import Any
from urllib.parse import urlparse
import aiohttp
from fake_useragent import UserAgent
from langchain_core.tools import tool
from requests import Session
from scrapling.fetchers import AsyncFetcher
from youtube_transcript_api import YouTubeTranscriptApi
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.tasks.document_processors.youtube_processor import get_youtube_video_id
from app.utils.proxy_config import get_requests_proxies
from app.utils.proxy import get_proxy_url, get_requests_proxies
logger = logging.getLogger(__name__)
@ -85,15 +86,20 @@ async def _scrape_youtube_video(
oembed_url = "https://www.youtube.com/oembed"
try:
async with (
aiohttp.ClientSession() as http_session,
http_session.get(
oembed_url,
params=params,
proxy=residential_proxies["http"] if residential_proxies else None,
) as response,
):
video_data = await response.json()
oembed_fetch_start = time.perf_counter()
oembed_page = await AsyncFetcher.get(
oembed_url,
params=params,
proxy=get_proxy_url(),
stealthy_headers=True,
)
logger.info(
"[scrape_webpage][perf] source=oembed video=%s status=%s fetch_ms=%.1f",
video_id,
getattr(oembed_page, "status", None),
(time.perf_counter() - oembed_fetch_start) * 1000,
)
video_data = oembed_page.json()
except Exception:
video_data = {}
@ -102,6 +108,7 @@ async def _scrape_youtube_video(
# --- Transcript via YouTubeTranscriptApi ---
try:
transcript_fetch_start = time.perf_counter()
ua = UserAgent()
http_client = Session()
http_client.headers.update({"User-Agent": ua.random})
@ -115,6 +122,11 @@ async def _scrape_youtube_video(
transcript = next(iter(transcript_list))
captions = transcript.fetch()
logger.info(
"[scrape_webpage][perf] source=transcript video=%s fetch_ms=%.1f",
video_id,
(time.perf_counter() - transcript_fetch_start) * 1000,
)
logger.info(
f"[scrape_webpage] Fetched transcript for {video_id} "
f"in {transcript.language} ({transcript.language_code})"

View file

@ -2,18 +2,19 @@
import hashlib
import logging
import time
from typing import Any
from urllib.parse import urlparse
import aiohttp
from fake_useragent import UserAgent
from langchain_core.tools import tool
from requests import Session
from scrapling.fetchers import AsyncFetcher
from youtube_transcript_api import YouTubeTranscriptApi
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.tasks.document_processors.youtube_processor import get_youtube_video_id
from app.utils.proxy_config import get_requests_proxies
from app.utils.proxy import get_proxy_url, get_requests_proxies
logger = logging.getLogger(__name__)
@ -79,15 +80,20 @@ async def _scrape_youtube_video(
oembed_url = "https://www.youtube.com/oembed"
try:
async with (
aiohttp.ClientSession() as http_session,
http_session.get(
oembed_url,
params=params,
proxy=residential_proxies["http"] if residential_proxies else None,
) as response,
):
video_data = await response.json()
oembed_fetch_start = time.perf_counter()
oembed_page = await AsyncFetcher.get(
oembed_url,
params=params,
proxy=get_proxy_url(),
stealthy_headers=True,
)
logger.info(
"[scrape_webpage][perf] source=oembed video=%s status=%s fetch_ms=%.1f",
video_id,
getattr(oembed_page, "status", None),
(time.perf_counter() - oembed_fetch_start) * 1000,
)
video_data = oembed_page.json()
except Exception:
video_data = {}
@ -96,6 +102,7 @@ async def _scrape_youtube_video(
# --- Transcript via YouTubeTranscriptApi ---
try:
transcript_fetch_start = time.perf_counter()
ua = UserAgent()
http_client = Session()
http_client.headers.update({"User-Agent": ua.random})
@ -109,6 +116,11 @@ async def _scrape_youtube_video(
transcript = next(iter(transcript_list))
captions = transcript.fetch()
logger.info(
"[scrape_webpage][perf] source=transcript video=%s fetch_ms=%.1f",
video_id,
(time.perf_counter() - transcript_fetch_start) * 1000,
)
logger.info(
f"[scrape_webpage] Fetched transcript for {video_id} "
f"in {transcript.language} ({transcript.language_code})"

View file

@ -916,8 +916,13 @@ class Config:
AZURE_DI_ENDPOINT = os.getenv("AZURE_DI_ENDPOINT")
AZURE_DI_KEY = os.getenv("AZURE_DI_KEY")
# Proxy provider selection. Maps to a ProxyProvider implementation registered
# in app/utils/proxy/registry.py. Add new vendors there and switch via this var.
PROXY_PROVIDER = os.getenv("PROXY_PROVIDER", "anonymous_proxies")
# Residential Proxy Configuration (anonymous-proxies.net)
# Used for web crawling and YouTube transcript fetching to avoid IP bans.
# Consumed by the "anonymous_proxies" proxy provider.
RESIDENTIAL_PROXY_USERNAME = os.getenv("RESIDENTIAL_PROXY_USERNAME")
RESIDENTIAL_PROXY_PASSWORD = os.getenv("RESIDENTIAL_PROXY_PASSWORD")
RESIDENTIAL_PROXY_HOSTNAME = os.getenv("RESIDENTIAL_PROXY_HOSTNAME")

View file

@ -1,31 +1,34 @@
"""
WebCrawler Connector Module
A module for crawling web pages and extracting content using Firecrawl,
plain HTTP+Trafilatura, or Playwright. Provides a unified interface for
web scraping.
A module for crawling web pages and extracting content using Firecrawl or
Scrapling's tiered fetchers, with Trafilatura for HTML -> markdown extraction.
Provides a unified interface for web scraping.
Fallback order:
1. Firecrawl (if API key is configured)
2. HTTP + Trafilatura (lightweight, works on any event loop)
3. Playwright / Chromium (runs in a thread to avoid event-loop limitations)
1. Firecrawl (if API key is configured)
2. Scrapling AsyncFetcher (fast static HTTP, no browser subprocess)
3. Scrapling DynamicFetcher (full browser, run in a thread)
4. Scrapling StealthyFetcher (anti-bot stealth browser, run in a thread)
"""
import asyncio
import logging
import time
from typing import Any
import httpx
import trafilatura
import validators
from fake_useragent import UserAgent
from firecrawl import AsyncFirecrawlApp
from playwright.sync_api import sync_playwright
from scrapling.fetchers import AsyncFetcher, DynamicFetcher, StealthyFetcher
from app.utils.proxy_config import get_playwright_proxy, get_residential_proxy_url
from app.utils.proxy import get_proxy_url
logger = logging.getLogger(__name__)
# Prefix for performance/timing log lines so they are easy to grep/filter.
_PERF = "[webcrawler][perf]"
class WebCrawlerConnector:
"""Class for crawling web pages and extracting content."""
@ -36,8 +39,8 @@ class WebCrawlerConnector:
Args:
firecrawl_api_key: Firecrawl API key (optional). If provided, Firecrawl will be tried first
and Chromium will be used as fallback if Firecrawl fails. If not provided,
Chromium will be used directly.
and Scrapling will be used as fallback if Firecrawl fails. If not provided,
Scrapling fetchers are used directly.
"""
self.firecrawl_api_key = firecrawl_api_key
self.use_firecrawl = bool(firecrawl_api_key)
@ -60,8 +63,9 @@ class WebCrawlerConnector:
Fallback order:
1. Firecrawl (if API key configured)
2. Plain HTTP + Trafilatura (lightweight, no subprocess)
3. Playwright / Chromium (needs subprocess-capable event loop)
2. Scrapling AsyncFetcher (fast static HTTP, no subprocess)
3. Scrapling DynamicFetcher (full browser, run in a thread)
4. Scrapling StealthyFetcher (anti-bot stealth browser, run in a thread)
Args:
url: URL to crawl
@ -74,8 +78,8 @@ class WebCrawlerConnector:
- metadata: Page metadata (title, description, etc.)
- source: Original URL
- crawler_type: Type of crawler used
# Validate URL
"""
total_start = time.perf_counter()
try:
if not validators.url(url):
return None, f"Invalid URL: {url}"
@ -84,48 +88,128 @@ class WebCrawlerConnector:
# --- 1. Firecrawl (premium, if configured) ---
if self.use_firecrawl:
tier_start = time.perf_counter()
try:
logger.info(f"[webcrawler] Using Firecrawl for: {url}")
return await self._crawl_with_firecrawl(url, formats), None
result = await self._crawl_with_firecrawl(url, formats)
self._log_tier_outcome("firecrawl", url, tier_start, "success")
self._log_total(url, "firecrawl", total_start)
return result, None
except Exception as exc:
errors.append(f"Firecrawl: {exc!s}")
logger.warning(f"[webcrawler] Firecrawl failed for {url}: {exc!s}")
self._log_tier_outcome("firecrawl", url, tier_start, "error", exc)
# --- 2. HTTP + Trafilatura (no subprocess required) ---
# --- 2. Scrapling AsyncFetcher (fast static HTTP) ---
tier_start = time.perf_counter()
try:
logger.info(f"[webcrawler] Using HTTP+Trafilatura for: {url}")
result = await self._crawl_with_http(url)
logger.info(f"[webcrawler] Using Scrapling AsyncFetcher for: {url}")
result = await self._crawl_with_async_fetcher(url)
if result:
self._log_tier_outcome("scrapling-static", url, tier_start, "success")
self._log_total(url, "scrapling-static", total_start)
return result, None
errors.append("HTTP+Trafilatura: empty extraction")
errors.append("Scrapling static: empty extraction")
self._log_tier_outcome("scrapling-static", url, tier_start, "empty")
except Exception as exc:
errors.append(f"HTTP+Trafilatura: {exc!s}")
logger.warning(
f"[webcrawler] HTTP+Trafilatura failed for {url}: {exc!s}"
)
errors.append(f"Scrapling static: {exc!s}")
self._log_tier_outcome("scrapling-static", url, tier_start, "error", exc)
# --- 3. Playwright / Chromium (full browser, last resort) ---
# --- 3. Scrapling DynamicFetcher (full browser) ---
tier_start = time.perf_counter()
try:
logger.info(f"[webcrawler] Using Chromium+Trafilatura for: {url}")
return await self._crawl_with_chromium(url), None
logger.info(f"[webcrawler] Using Scrapling DynamicFetcher for: {url}")
result = await self._crawl_with_dynamic(url)
if result:
self._log_tier_outcome("scrapling-dynamic", url, tier_start, "success")
self._log_total(url, "scrapling-dynamic", total_start)
return result, None
errors.append("Scrapling dynamic: empty extraction")
self._log_tier_outcome("scrapling-dynamic", url, tier_start, "empty")
except NotImplementedError:
errors.append(
"Chromium: event loop does not support subprocesses "
"Scrapling dynamic: event loop does not support subprocesses "
"(common on Windows with uvicorn --reload)"
)
logger.warning(
f"[webcrawler] Chromium unavailable for {url}: "
"current event loop does not support subprocesses"
self._log_tier_outcome(
"scrapling-dynamic", url, tier_start, "unavailable"
)
except Exception as exc:
errors.append(f"Chromium: {exc!s}")
logger.warning(f"[webcrawler] Chromium failed for {url}: {exc!s}")
errors.append(f"Scrapling dynamic: {exc!s}")
self._log_tier_outcome("scrapling-dynamic", url, tier_start, "error", exc)
# --- 4. Scrapling StealthyFetcher (anti-bot, last resort) ---
tier_start = time.perf_counter()
try:
logger.info(f"[webcrawler] Using Scrapling StealthyFetcher for: {url}")
result = await self._crawl_with_stealthy(url)
if result:
self._log_tier_outcome("scrapling-stealthy", url, tier_start, "success")
self._log_total(url, "scrapling-stealthy", total_start)
return result, None
errors.append("Scrapling stealthy: empty extraction")
self._log_tier_outcome("scrapling-stealthy", url, tier_start, "empty")
except NotImplementedError:
errors.append(
"Scrapling stealthy: event loop does not support subprocesses "
"(common on Windows with uvicorn --reload)"
)
self._log_tier_outcome(
"scrapling-stealthy", url, tier_start, "unavailable"
)
except Exception as exc:
errors.append(f"Scrapling stealthy: {exc!s}")
self._log_tier_outcome(
"scrapling-stealthy", url, tier_start, "error", exc
)
self._log_total(url, "none", total_start)
return None, f"All crawl methods failed for {url}. {'; '.join(errors)}"
except Exception as e:
self._log_total(url, "error", total_start)
return None, f"Error crawling URL {url}: {e!s}"
@staticmethod
def _log_tier_outcome(
    tier: str,
    url: str,
    tier_start: float,
    outcome: str,
    exc: Exception | None = None,
) -> None:
    """Emit one perf line describing how a single crawl tier finished.

    Args:
        tier: Identifier of the tier that was attempted.
        url: URL that was being crawled.
        tier_start: ``time.perf_counter()`` reading taken when the tier began.
        outcome: Short outcome label; ``"error"`` is logged at WARNING level
            together with ``exc``, anything else is logged at INFO level.
        exc: Exception that ended the tier (only reported for ``"error"``).
    """
    duration_ms = 1000 * (time.perf_counter() - tier_start)

    # Non-error outcomes are routine: log at INFO and leave early.
    if outcome != "error":
        logger.info(
            "%s tier=%s url=%s elapsed_ms=%.1f outcome=%s",
            _PERF,
            tier,
            url,
            duration_ms,
            outcome,
        )
        return

    logger.warning(
        "%s tier=%s url=%s elapsed_ms=%.1f outcome=error error=%s",
        _PERF,
        tier,
        url,
        duration_ms,
        exc,
    )
@staticmethod
def _log_total(url: str, selected: str, total_start: float) -> None:
    """Emit one perf line with the wall time spent across all attempted tiers.

    Args:
        url: URL that was being crawled.
        selected: Label written to the ``selected=`` field of the log line.
        total_start: ``time.perf_counter()`` reading taken when crawling began.
    """
    elapsed_ms = 1000 * (time.perf_counter() - total_start)
    logger.info("%s url=%s selected=%s total_ms=%.1f", _PERF, url, selected, elapsed_ms)
async def _crawl_with_firecrawl(
self, url: str, formats: list[str] | None = None
) -> dict[str, Any]:
@ -177,52 +261,172 @@ class WebCrawlerConnector:
"crawler_type": "firecrawl",
}
async def _crawl_with_http(self, url: str) -> dict[str, Any] | None:
async def _crawl_with_async_fetcher(self, url: str) -> dict[str, Any] | None:
"""
Crawl URL using a plain HTTP request + Trafilatura content extraction.
Crawl URL using Scrapling's AsyncFetcher (static HTTP) + Trafilatura.
This method avoids launching a browser subprocess, making it safe to
call from any asyncio event loop (including Windows SelectorEventLoop
which does not support ``create_subprocess_exec``).
Returns ``None`` when Trafilatura cannot extract meaningful content
(e.g. JS-rendered SPAs) so the caller can fall through to Chromium.
AsyncFetcher is httpx/curl_cffi based and does not launch a browser
subprocess, making it safe to call from any asyncio event loop. Returns
``None`` when Trafilatura cannot extract meaningful content (e.g. JS
rendered SPAs) so the caller can fall through to the browser tiers.
"""
ua = UserAgent()
user_agent = ua.random
proxy_url = get_residential_proxy_url()
fetch_start = time.perf_counter()
page = await AsyncFetcher.get(
url,
stealthy_headers=True,
proxy=get_proxy_url(),
timeout=20,
)
fetch_ms = (time.perf_counter() - fetch_start) * 1000
async with httpx.AsyncClient(
timeout=20.0,
follow_redirects=True,
proxy=proxy_url,
headers={
"User-Agent": user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
},
) as client:
response = await client.get(url)
response.raise_for_status()
raw_html = response.text
if not raw_html or len(raw_html.strip()) == 0:
status = getattr(page, "status", None)
if status is not None and status >= 400:
logger.info(
"%s tier=scrapling-static url=%s fetch_ms=%.1f status=%s outcome=http_error",
_PERF,
url,
fetch_ms,
status,
)
return None
extracted_content = trafilatura.extract(
raw_html,
output_format="markdown",
include_comments=False,
include_tables=True,
include_images=True,
include_links=True,
return self._build_result(
page.html_content,
url,
"scrapling-static",
allow_raw_fallback=False,
fetch_ms=fetch_ms,
status=status,
)
if not extracted_content or len(extracted_content.strip()) == 0:
async def _crawl_with_dynamic(self, url: str) -> dict[str, Any] | None:
    """
    Crawl ``url`` with Scrapling's DynamicFetcher (full browser) + Trafilatura.

    The blocking fetch is handed to a worker thread so this coroutine can be
    awaited on any event loop, including Windows ``SelectorEventLoop`` which
    cannot spawn subprocesses.
    """
    blocking_crawl = self._crawl_with_dynamic_sync
    return await asyncio.to_thread(blocking_crawl, url)
def _crawl_with_dynamic_sync(self, url: str) -> dict[str, Any] | None:
    """Synchronous DynamicFetcher crawl executed in a worker thread.

    Fetches ``url`` with a headless browser, waits for network idle, and
    hands the resulting HTML to ``_build_result`` for Trafilatura extraction.

    Args:
        url: URL to crawl.

    Returns:
        The result dict produced by ``_build_result``, or ``None`` when the
        fetch returned an HTTP error status or nothing could be extracted,
        so the caller can fall through to the next tier.
    """
    fetch_start = time.perf_counter()
    page = DynamicFetcher.fetch(
        url,
        headless=True,
        network_idle=True,
        # NOTE(review): presumably milliseconds (browser-fetcher convention);
        # confirm against the Scrapling documentation.
        timeout=30000,
        proxy=get_proxy_url(),
    )
    fetch_ms = (time.perf_counter() - fetch_start) * 1000

    # Treat HTTP error responses as a failed tier, as the static tier does.
    # Otherwise an error or challenge page could be extracted and returned as
    # a "successful" crawl, and the stealthy tier would never be tried.
    status = getattr(page, "status", None)
    if status is not None and status >= 400:
        logger.info(
            "%s tier=scrapling-dynamic url=%s fetch_ms=%.1f status=%s outcome=http_error",
            _PERF,
            url,
            fetch_ms,
            status,
        )
        return None

    return self._build_result(
        page.html_content,
        url,
        "scrapling-dynamic",
        allow_raw_fallback=False,
        fetch_ms=fetch_ms,
        status=status,
    )
async def _crawl_with_stealthy(self, url: str) -> dict[str, Any] | None:
    """
    Crawl ``url`` with Scrapling's StealthyFetcher (Camoufox) + Trafilatura.

    This is the last-resort, anti-bot tier. The blocking fetch is run in a
    worker thread for the same event-loop-safety reasons as DynamicFetcher.
    When Trafilatura extracts nothing, the raw HTML is returned as content.
    """
    blocking_crawl = self._crawl_with_stealthy_sync
    return await asyncio.to_thread(blocking_crawl, url)
def _crawl_with_stealthy_sync(self, url: str) -> dict[str, Any] | None:
    """Blocking StealthyFetcher crawl; meant to be run in a worker thread.

    Fetches ``url`` headlessly with ad blocking, waits for network idle, and
    passes the HTML to ``_build_result`` with the raw-HTML fallback enabled.
    """
    started = time.perf_counter()
    response = StealthyFetcher.fetch(
        url,
        headless=True,
        network_idle=True,
        block_ads=True,
        proxy=get_proxy_url(),
    )
    elapsed_ms = 1000 * (time.perf_counter() - started)

    html = response.html_content
    http_status = getattr(response, "status", None)
    return self._build_result(
        html,
        url,
        "scrapling-stealthy",
        allow_raw_fallback=True,
        fetch_ms=elapsed_ms,
        status=http_status,
    )
def _build_result(
self,
raw_html: str | None,
url: str,
crawler_type: str,
*,
allow_raw_fallback: bool,
fetch_ms: float | None = None,
status: int | None = None,
) -> dict[str, Any] | None:
"""
Extract markdown + metadata from raw HTML using Trafilatura.
Args:
raw_html: Raw HTML source from a fetcher.
url: Original URL (used as the metadata source/title fallback).
crawler_type: Identifier of the tier that produced the HTML.
allow_raw_fallback: When True, return the raw HTML as content if
Trafilatura cannot extract anything (used by the last-resort
stealthy tier). When False, return ``None`` so the caller can
fall through to the next tier.
fetch_ms: Time spent fetching the page (for perf logging).
status: HTTP status code returned by the fetcher (for perf logging).
Returns:
Result dict (content/metadata/crawler_type) or ``None``.
"""
html_len = len(raw_html) if raw_html else 0
if not raw_html or len(raw_html.strip()) == 0:
self._log_build(
crawler_type, url, fetch_ms, 0.0, status, html_len, 0, "empty_html"
)
return None
trafilatura_metadata = trafilatura.extract_metadata(raw_html)
extract_start = time.perf_counter()
extracted_content: str | None = None
trafilatura_metadata = None
try:
extracted_content = trafilatura.extract(
raw_html,
output_format="markdown",
include_comments=False,
include_tables=True,
include_images=True,
include_links=True,
)
trafilatura_metadata = trafilatura.extract_metadata(raw_html)
if extracted_content and len(extracted_content.strip()) == 0:
extracted_content = None
except Exception:
extracted_content = None
extract_ms = (time.perf_counter() - extract_start) * 1000
if not extracted_content and not allow_raw_fallback:
self._log_build(
crawler_type,
url,
fetch_ms,
extract_ms,
status,
html_len,
0,
"no_extraction",
)
return None
metadata: dict[str, str] = {"source": url}
if trafilatura_metadata:
@ -236,105 +440,51 @@ class WebCrawlerConnector:
metadata["date"] = trafilatura_metadata.date
metadata.setdefault("title", url)
return {
"content": extracted_content,
"metadata": metadata,
"crawler_type": "http",
}
async def _crawl_with_chromium(self, url: str) -> dict[str, Any]:
"""
Crawl URL using Playwright with Trafilatura for content extraction.
Falls back to raw HTML if Trafilatura extraction fails.
Runs the sync Playwright API in a thread so it works on any event
loop, including Windows ``SelectorEventLoop`` which cannot spawn
subprocesses.
Args:
url: URL to crawl
Returns:
Dict containing crawled content and metadata
Raises:
Exception: If crawling fails
"""
return await asyncio.to_thread(self._crawl_with_chromium_sync, url)
def _crawl_with_chromium_sync(self, url: str) -> dict[str, Any]:
"""Synchronous Playwright crawl executed in a worker thread."""
ua = UserAgent()
user_agent = ua.random
playwright_proxy = get_playwright_proxy()
with sync_playwright() as p:
launch_kwargs: dict = {"headless": True}
if playwright_proxy:
launch_kwargs["proxy"] = playwright_proxy
browser = p.chromium.launch(**launch_kwargs)
context = browser.new_context(user_agent=user_agent)
page = context.new_page()
try:
page.goto(url, wait_until="domcontentloaded", timeout=30000)
raw_html = page.content()
page_title = page.title()
finally:
browser.close()
if not raw_html:
raise ValueError(f"Failed to load content from {url}")
base_metadata = {"title": page_title} if page_title else {}
extracted_content = None
trafilatura_metadata = None
try:
extracted_content = trafilatura.extract(
raw_html,
output_format="markdown",
include_comments=False,
include_tables=True,
include_images=True,
include_links=True,
)
trafilatura_metadata = trafilatura.extract_metadata(raw_html)
if not extracted_content or len(extracted_content.strip()) == 0:
extracted_content = None
except Exception:
extracted_content = None
metadata = {
"source": url,
"title": (
trafilatura_metadata.title
if trafilatura_metadata and trafilatura_metadata.title
else base_metadata.get("title", url)
),
}
if trafilatura_metadata:
if trafilatura_metadata.description:
metadata["description"] = trafilatura_metadata.description
if trafilatura_metadata.author:
metadata["author"] = trafilatura_metadata.author
if trafilatura_metadata.date:
metadata["date"] = trafilatura_metadata.date
metadata.update(base_metadata)
content = extracted_content if extracted_content else raw_html
self._log_build(
crawler_type,
url,
fetch_ms,
extract_ms,
status,
html_len,
len(content),
"extracted" if extracted_content else "raw_fallback",
)
return {
"content": extracted_content if extracted_content else raw_html,
"content": content,
"metadata": metadata,
"crawler_type": "chromium",
"crawler_type": crawler_type,
}
@staticmethod
def _log_build(
    crawler_type: str,
    url: str,
    fetch_ms: float | None,
    extract_ms: float,
    status: int | None,
    html_len: int,
    content_len: int,
    outcome: str,
) -> None:
    """Log a detailed perf line that separates fetch time from extraction time.

    Args:
        crawler_type: Identifier of the tier that produced the HTML.
        url: URL that was crawled.
        fetch_ms: Time spent fetching, or ``None`` (rendered as ``n/a``).
        extract_ms: Time spent in Trafilatura extraction.
        status: HTTP status reported by the fetcher, if any.
        html_len: Length of the raw HTML.
        content_len: Length of the content that will be returned.
        outcome: Short label describing how the build step ended.
    """
    # fetch_ms is optional, so it is pre-rendered and logged with %s.
    fetch_text = "n/a" if fetch_ms is None else f"{fetch_ms:.1f}"
    template = (
        "%s tier=%s url=%s status=%s fetch_ms=%s extract_ms=%.1f "
        "html_len=%d content_len=%d outcome=%s"
    )
    logger.info(
        template,
        _PERF,
        crawler_type,
        url,
        status,
        fetch_text,
        extract_ms,
        html_len,
        content_len,
        outcome,
    )
def format_to_structured_document(
self, crawl_result: dict[str, Any], exclude_metadata: bool = False
) -> str:

View file

@ -3,14 +3,14 @@
import json
import logging
import re
import time
import aiohttp
from fake_useragent import UserAgent
from fastapi import APIRouter, Depends, HTTPException, Query
from scrapling.fetchers import AsyncFetcher
from app.db import User
from app.users import current_active_user
from app.utils.proxy_config import get_requests_proxies
from app.utils.proxy import get_proxy_url
router = APIRouter()
logger = logging.getLogger(__name__)
@ -69,26 +69,30 @@ async def _fetch_playlist_via_innertube(playlist_id: str) -> list[str]:
"context": {"client": _INNERTUBE_CLIENT},
"browseId": f"VL{playlist_id}",
}
proxies = get_requests_proxies()
try:
async with (
aiohttp.ClientSession() as session,
session.post(
_INNERTUBE_API_URL,
json=payload,
headers={"Content-Type": "application/json"},
proxy=proxies["http"] if proxies else None,
) as response,
):
if response.status != 200:
logger.warning(
"Innertube API returned %d for playlist %s",
response.status,
playlist_id,
)
return []
data = await response.json()
fetch_start = time.perf_counter()
page = await AsyncFetcher.post(
_INNERTUBE_API_URL,
json=payload,
proxy=get_proxy_url(),
stealthy_headers=True,
)
fetch_ms = (time.perf_counter() - fetch_start) * 1000
logger.info(
"[youtube][perf] source=innertube playlist=%s status=%s fetch_ms=%.1f",
playlist_id,
page.status,
fetch_ms,
)
if page.status != 200:
logger.warning(
"Innertube API returned %d for playlist %s",
page.status,
playlist_id,
)
return []
data = page.json()
return _extract_playlist_video_ids(data)
except Exception as e:
@ -98,35 +102,38 @@ async def _fetch_playlist_via_innertube(playlist_id: str) -> list[str]:
async def _fetch_playlist_via_html(playlist_id: str) -> list[str]:
"""Fallback: scrape playlist page HTML with consent cookies set."""
ua = UserAgent()
headers = {
"User-Agent": ua.random,
"Accept-Language": "en-US,en;q=0.9",
}
# Scrapling's stealthy_headers supplies a realistic User-Agent automatically.
headers = {"Accept-Language": "en-US,en;q=0.9"}
cookies = {
"CONSENT": "PENDING+999",
"SOCS": "CAISNQgDEitib3FfaWRlbnRpdHlmcm9udGVuZHVpc2VydmVyXzIwMjMwODI5LjA3X3AxGgJlbiADGgYIgOa_pgY",
}
proxies = get_requests_proxies()
playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}"
try:
async with (
aiohttp.ClientSession(cookies=cookies) as session,
session.get(
playlist_url,
headers=headers,
proxy=proxies["http"] if proxies else None,
) as response,
):
if response.status != 200:
logger.warning(
"HTML fallback returned %d for playlist %s",
response.status,
playlist_id,
)
return []
html = await response.text()
fetch_start = time.perf_counter()
page = await AsyncFetcher.get(
playlist_url,
headers=headers,
cookies=cookies,
proxy=get_proxy_url(),
stealthy_headers=True,
)
fetch_ms = (time.perf_counter() - fetch_start) * 1000
logger.info(
"[youtube][perf] source=html-fallback playlist=%s status=%s fetch_ms=%.1f",
playlist_id,
page.status,
fetch_ms,
)
if page.status != 200:
logger.warning(
"HTML fallback returned %d for playlist %s",
page.status,
playlist_id,
)
return []
html = page.html_content
yt_data = _extract_yt_initial_data(html)
if not yt_data:

View file

@ -7,11 +7,12 @@ Implements 2-phase document status updates for real-time UI feedback:
"""
import logging
import time
from urllib.parse import parse_qs, urlparse
import aiohttp
from fake_useragent import UserAgent
from requests import Session
from scrapling.fetchers import AsyncFetcher
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from youtube_transcript_api import YouTubeTranscriptApi
@ -24,7 +25,7 @@ from app.utils.document_converters import (
generate_content_hash,
generate_unique_identifier_hash,
)
from app.utils.proxy_config import get_requests_proxies
from app.utils.proxy import get_proxy_url, get_requests_proxies
from .base import (
check_document_by_unique_identifier,
@ -218,18 +219,23 @@ async def add_youtube_video_document(
}
oembed_url = "https://www.youtube.com/oembed"
# Build residential proxy URL (if configured)
# Build residential proxy settings (if configured)
residential_proxies = get_requests_proxies()
async with (
aiohttp.ClientSession() as http_session,
http_session.get(
oembed_url,
params=params,
proxy=residential_proxies["http"] if residential_proxies else None,
) as response,
):
video_data = await response.json()
oembed_fetch_start = time.perf_counter()
oembed_page = await AsyncFetcher.get(
oembed_url,
params=params,
proxy=get_proxy_url(),
stealthy_headers=True,
)
logging.info(
"[youtube][perf] source=oembed video=%s status=%s fetch_ms=%.1f",
video_id,
getattr(oembed_page, "status", None),
(time.perf_counter() - oembed_fetch_start) * 1000,
)
video_data = oembed_page.json()
# Update title immediately for better UX (user sees actual title sooner)
document.title = video_data.get("title", f"YouTube Video: {video_id}")
@ -253,6 +259,7 @@ async def add_youtube_video_document(
)
try:
transcript_fetch_start = time.perf_counter()
ua = UserAgent()
http_client = Session()
http_client.headers.update({"User-Agent": ua.random})
@ -265,6 +272,11 @@ async def add_youtube_video_document(
transcript_list = ytt_api.list(video_id)
transcript = next(iter(transcript_list))
captions = transcript.fetch()
logging.info(
"[youtube][perf] source=transcript video=%s fetch_ms=%.1f",
video_id,
(time.perf_counter() - transcript_fetch_start) * 1000,
)
# Include complete caption information with timestamps
transcript_segments = []

View file

@ -0,0 +1,40 @@
"""Modular residential / rotating proxy provider package.
Selects a provider via the ``PROXY_PROVIDER`` env var (see ``registry.py``) and
exposes proxy settings in the formats different HTTP libraries expect. Add new
vendors by implementing :class:`ProxyProvider` in ``providers/`` and registering
them in ``registry.py``.
"""
from app.utils.proxy.base import ProxyProvider
from app.utils.proxy.registry import get_active_provider
def get_proxy_url() -> str | None:
    """Return the active provider's ``http://user:pass@host:port`` proxy URL.

    This is the canonical form for Scrapling/curl_cffi; the value is whatever
    the active provider's ``get_proxy_url`` returns (possibly ``None``).
    """
    provider = get_active_provider()
    return provider.get_proxy_url()
def get_playwright_proxy() -> dict[str, str] | None:
    """Return the active provider's Playwright-style proxy dict.

    ``None`` is returned when the provider has no proxy configured.
    """
    provider = get_active_provider()
    return provider.get_playwright_proxy()
def get_requests_proxies() -> dict[str, str] | None:
    """Return the active provider's ``{"http": ..., "https": ...}`` proxies dict.

    Suitable for requests/aiohttp; ``None`` when no proxy is configured.
    """
    provider = get_active_provider()
    return provider.get_requests_proxies()
def get_residential_proxy_url() -> str | None:
    """Backward-compatible alias that delegates to :func:`get_proxy_url`."""
    proxy_url = get_proxy_url()
    return proxy_url
__all__ = [
"ProxyProvider",
"get_active_provider",
"get_playwright_proxy",
"get_proxy_url",
"get_requests_proxies",
"get_residential_proxy_url",
]

View file

@ -0,0 +1,46 @@
"""Abstract base class for residential / rotating proxy providers.
Each provider reads its own credentials from the application Config and exposes
proxy settings in the formats the different HTTP stacks expect:
* ``get_proxy_url`` -> canonical ``http://user:pass@host:port`` string consumed
by Scrapling's fetchers (curl_cffi / patchright / camoufox).
* ``get_requests_proxies`` -> ``{"http": ..., "https": ...}`` dict for
``requests`` / ``aiohttp``.
* ``get_playwright_proxy`` -> Playwright-style ``{"server", "username",
"password"}`` dict.
Add a new vendor by subclassing :class:`ProxyProvider` in ``providers/`` and
registering it in ``registry.py``.
"""
from abc import ABC, abstractmethod
class ProxyProvider(ABC):
"""Interface every proxy provider must implement."""
#: Unique key used to select this provider via the ``PROXY_PROVIDER`` env var.
name: str = "base"
@abstractmethod
def get_proxy_url(self) -> str | None:
"""Return ``http://user:pass@host:port`` (no trailing slash), or ``None``.
This is the canonical form Scrapling/curl_cffi consume directly.
"""
@abstractmethod
def get_playwright_proxy(self) -> dict[str, str] | None:
"""Return a Playwright proxy dict, or ``None`` when not configured."""
def get_requests_proxies(self) -> dict[str, str] | None:
"""Return a ``requests``/``aiohttp`` proxies dict, or ``None``.
Built from :meth:`get_proxy_url` by default; override if a provider needs
different http vs https endpoints.
"""
proxy_url = self.get_proxy_url()
if proxy_url is None:
return None
return {"http": proxy_url, "https": proxy_url}

View file

@ -0,0 +1 @@
"""Concrete proxy provider implementations."""

View file

@ -0,0 +1,65 @@
"""anonymous-proxies.net residential / rotating proxy provider.
The vendor (``rotating.dnsproxifier.com``) encodes the location and rotation
``type`` options inside a base64-encoded JSON "password". The hostname already
includes the port (e.g. ``rotating.dnsproxifier.com:31230``).
"""
import base64
import json
import logging
from app.config import Config
from app.utils.proxy.base import ProxyProvider
logger = logging.getLogger(__name__)
class AnonymousProxiesProvider(ProxyProvider):
    """Proxy provider backed by the ``RESIDENTIAL_PROXY_*`` config values.

    Targets anonymous-proxies.net: the proxy "password" sent to the vendor is
    a base64-encoded JSON object carrying the real password plus the location
    and type options.
    """

    name = "anonymous_proxies"

    def _password_b64(self) -> str | None:
        """Return the base64-encoded JSON password, or ``None`` if no password is set."""
        raw_password = Config.RESIDENTIAL_PROXY_PASSWORD
        if raw_password:
            payload = json.dumps(
                {
                    "p": raw_password,
                    "l": Config.RESIDENTIAL_PROXY_LOCATION,
                    "t": Config.RESIDENTIAL_PROXY_TYPE,
                }
            )
            return base64.b64encode(payload.encode("utf-8")).decode("utf-8")
        return None

    def _credentials(self) -> tuple[str, str, str] | None:
        """Return ``(username, hostname, encoded_password)`` or ``None``.

        ``None`` is returned when any of the three values is missing or empty.
        """
        user = Config.RESIDENTIAL_PROXY_USERNAME
        host = Config.RESIDENTIAL_PROXY_HOSTNAME
        encoded = self._password_b64()
        if user and host and encoded:
            return user, host, encoded
        return None

    def get_proxy_url(self) -> str | None:
        """Return ``http://user:pass@host`` for the configured proxy, or ``None``.

        The hostname config value is used verbatim as the authority part.
        """
        creds = self._credentials()
        if creds is None:
            return None
        user, host, encoded = creds
        # No trailing slash: curl_cffi (Scrapling static fetcher) expects a bare
        # ``http://user:pass@host:port`` URL.
        # NOTE(review): the credentials are interpolated without percent-encoding,
        # and standard base64 output may contain "/" or "+" - confirm that every
        # consumer of this URL tolerates that.
        return f"http://{user}:{encoded}@{host}"

    def get_playwright_proxy(self) -> dict[str, str] | None:
        """Return a ``{"server", "username", "password"}`` dict, or ``None``."""
        creds = self._credentials()
        if creds is None:
            return None
        user, host, encoded = creds
        return {
            "server": f"http://{host}",
            "username": user,
            "password": encoded,
        }

View file

@ -0,0 +1,44 @@
"""Proxy provider registry.
Maps the ``PROXY_PROVIDER`` config value to a :class:`ProxyProvider`
implementation. To add a new vendor: implement a provider in ``providers/`` and
add a single entry to ``_PROVIDERS`` below - no caller changes required.
"""
import logging
from app.config import Config
from app.utils.proxy.base import ProxyProvider
from app.utils.proxy.providers.anonymous_proxies import AnonymousProxiesProvider
logger = logging.getLogger(__name__)
# Registered proxy providers, keyed by their ``name``.
# ``get_active_provider`` looks the configured key up in this mapping.
_PROVIDERS: dict[str, type[ProxyProvider]] = {
    AnonymousProxiesProvider.name: AnonymousProxiesProvider,
}
# Key used when ``PROXY_PROVIDER`` is unset/empty or names an unregistered provider.
_DEFAULT_PROVIDER = AnonymousProxiesProvider.name
# Process-wide cached instance; stays ``None`` until the first
# ``get_active_provider()`` call creates it.
_active_provider: ProxyProvider | None = None
def get_active_provider() -> ProxyProvider:
    """Return the configured proxy provider instance (cached for the process).

    The provider class is selected by ``Config.PROXY_PROVIDER``. An unset,
    empty, or whitespace-only value selects the default provider silently; an
    unregistered key logs a warning and also falls back to the default. The
    instance is created on the first call and reused afterwards.
    """
    global _active_provider
    if _active_provider is not None:
        return _active_provider

    # Strip *before* applying the default so a whitespace-only value counts as
    # "unset" rather than producing a misleading "Unknown PROXY_PROVIDER ''"
    # warning.
    key = (Config.PROXY_PROVIDER or "").strip() or _DEFAULT_PROVIDER
    provider_cls = _PROVIDERS.get(key)
    if provider_cls is None:
        logger.warning(
            "Unknown PROXY_PROVIDER '%s'; falling back to '%s'. Available: %s",
            key,
            _DEFAULT_PROVIDER,
            ", ".join(sorted(_PROVIDERS)),
        )
        provider_cls = _PROVIDERS[_DEFAULT_PROVIDER]

    _active_provider = provider_cls()
    return _active_provider

View file

@ -1,86 +1,20 @@
"""
Residential proxy configuration utility.
"""Backward-compatible shim for the proxy helpers.
Reads proxy credentials from the application Config and provides helper
functions that return proxy configs in the format expected by different
HTTP libraries (requests, httpx, aiohttp, Playwright).
The implementation now lives in the modular :mod:`app.utils.proxy` package.
Existing imports of ``app.utils.proxy_config`` keep working via these re-exports.
Prefer importing from ``app.utils.proxy`` (and ``get_proxy_url``) in new code.
"""
import base64
import json
import logging
from app.utils.proxy import (
get_playwright_proxy,
get_proxy_url,
get_requests_proxies,
get_residential_proxy_url,
)
from app.config import Config
logger = logging.getLogger(__name__)
def _build_password_b64() -> str | None:
    """Encode the proxy password the way anonymous-proxies.net expects it.

    The result is the base64 text of a JSON object holding the password
    (``"p"``), the location (``"l"``) and the type (``"t"``). ``None`` is
    returned when no password is configured.
    """
    raw_password = Config.RESIDENTIAL_PROXY_PASSWORD
    if raw_password:
        payload = json.dumps(
            {
                "p": raw_password,
                "l": Config.RESIDENTIAL_PROXY_LOCATION,
                "t": Config.RESIDENTIAL_PROXY_TYPE,
            }
        )
        return base64.b64encode(payload.encode("utf-8")).decode("utf-8")
    return None
def get_residential_proxy_url() -> str | None:
    """Build the residential proxy URL, or return ``None`` when not configured.

    The URL has the form::

        http://<username>:<base64_password>@<hostname>/
    """
    user = Config.RESIDENTIAL_PROXY_USERNAME
    host = Config.RESIDENTIAL_PROXY_HOSTNAME
    encoded = _build_password_b64()
    # All three parts are required; any missing/empty one disables the proxy.
    if user and host and encoded:
        return f"http://{user}:{encoded}@{host}/"
    return None
def get_requests_proxies() -> dict[str, str] | None:
    """Return a ``{"http": <url>, "https": <url>}`` proxies dict, or ``None``.

    The dict is suitable for ``requests.Session.proxies`` and the ``aiohttp``
    ``proxy=`` kwarg. Both keys carry the value of
    :func:`get_residential_proxy_url`; ``None`` is returned when that URL is
    not configured.
    """
    url = get_residential_proxy_url()
    return None if url is None else {"http": url, "https": url}
def get_playwright_proxy() -> dict[str, str] | None:
    """Return a Playwright-compatible proxy dict, or ``None`` when not configured.

    The dict has the form::

        {"server": "http://<hostname>", "username": "<username>", "password": "<base64_password>"}
    """
    user = Config.RESIDENTIAL_PROXY_USERNAME
    host = Config.RESIDENTIAL_PROXY_HOSTNAME
    encoded = _build_password_b64()
    # All three parts are required; any missing/empty one disables the proxy.
    if user and host and encoded:
        return {
            "server": f"http://{host}",
            "username": user,
            "password": encoded,
        }
    return None
# Helpers re-exported from ``app.utils.proxy`` so that existing imports of this
# module keep resolving.
__all__ = [
    "get_playwright_proxy",
    "get_proxy_url",
    "get_requests_proxies",
    "get_residential_proxy_url",
]

View file

@ -25,7 +25,6 @@ dependencies = [
"notion-client>=2.3.0",
"numpy>=1.24.0",
"pgvector>=0.3.6",
"playwright>=1.50.0",
"pypdf>=5.1.0",
"python-ffmpeg>=2.0.12",
"rerankers[flashrank]>=0.7.1",
@ -90,6 +89,7 @@ dependencies = [
"opentelemetry-instrumentation-logging>=0.61b0",
"python-telegram-bot>=22.7",
"croniter>=2.0.0",
"scrapling[fetchers]>=0.4.9",
]
[project.optional-dependencies]

9668
surfsense_backend/uv.lock generated

File diff suppressed because it is too large Load diff